Commit 1ee1b179 authored by acud's avatar acud Committed by GitHub

puller, pull sync: clean shutdown (#370)

* puller, pullsync: clean shutdown
parent 294b0ca4
......@@ -49,6 +49,7 @@ type Puller struct {
cursorsMtx sync.Mutex
quit chan struct{}
wg sync.WaitGroup
}
func New(o Options) *Puller {
......@@ -62,12 +63,13 @@ func New(o Options) *Puller {
syncPeers: make([]map[string]*syncPeer, bins),
quit: make(chan struct{}),
wg: sync.WaitGroup{},
}
for i := uint8(0); i < bins; i++ {
p.syncPeers[i] = make(map[string]*syncPeer)
}
p.wg.Add(1)
go p.manage()
return p
}
......@@ -78,11 +80,15 @@ type peer struct {
}
func (p *Puller) manage() {
defer p.wg.Done()
c, unsubscribe := p.topology.SubscribePeersChange()
defer unsubscribe()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
<-p.quit
cancel()
}()
for {
select {
case <-c:
......@@ -229,8 +235,10 @@ func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8
binCtx, cancel := context.WithCancel(ctx)
syncCtx.binCancelFuncs[bin] = cancel
if cur > 0 {
p.wg.Add(1)
go p.histSyncWorker(binCtx, peer, bin, cur)
}
p.wg.Add(1)
go p.liveSyncWorker(binCtx, peer, bin, cur)
}
}
......@@ -263,8 +271,10 @@ func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8
binCtx, cancel := context.WithCancel(ctx)
syncCtx.binCancelFuncs[po] = cancel
if cur > 0 {
p.wg.Add(1)
go p.histSyncWorker(binCtx, peer, want, cur)
}
p.wg.Add(1)
go p.liveSyncWorker(binCtx, peer, want, cur)
}
for _, bin := range dontWant {
......@@ -310,8 +320,10 @@ func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8)
binCtx, cancel := context.WithCancel(ctx)
syncCtx.binCancelFuncs[po] = cancel
if cur > 0 {
p.wg.Add(1)
go p.histSyncWorker(binCtx, peer, bin, cur) // start historical
}
p.wg.Add(1)
go p.liveSyncWorker(binCtx, peer, bin, cur) // start live
return
......@@ -324,14 +336,17 @@ func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8)
binCtx, cancel := context.WithCancel(ctx)
syncCtx.binCancelFuncs[uint8(bin)] = cancel
if cur > 0 {
p.wg.Add(1)
go p.histSyncWorker(binCtx, peer, uint8(bin), cur) // start historical
}
// start live
p.wg.Add(1)
go p.liveSyncWorker(binCtx, peer, uint8(bin), cur) // start live
}
}
func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uint8, cur uint64) {
defer p.wg.Done()
if logMore {
p.logger.Tracef("histSyncWorker starting, peer %s bin %d cursor %d", peer, bin, cur)
}
......@@ -387,6 +402,7 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin
}
func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uint8, cur uint64) {
defer p.wg.Done()
if logMore {
p.logger.Tracef("liveSyncWorker starting, peer %s bin %d cursor %d", peer, bin, cur)
}
......@@ -431,6 +447,7 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin
}
func (p *Puller) Close() error {
p.logger.Info("puller shutting down")
close(p.quit)
p.syncPeersMtx.Lock()
defer p.syncPeersMtx.Unlock()
......@@ -444,6 +461,16 @@ func (p *Puller) Close() error {
peer.Unlock()
}
}
cc := make(chan struct{})
go func() {
defer close(cc)
p.wg.Wait()
}()
select {
case <-cc:
case <-time.After(10 * time.Second):
p.logger.Warning("puller shutting down with running goroutines")
}
return nil
}
......
......@@ -61,7 +61,7 @@ func TestOneSync(t *testing.T) {
pullSync: []mockps.Option{mockps.WithCursors(cursors), mockps.WithLiveSyncReplies(liveReplies...)},
})
defer puller.Close()
defer pullsync.Close()
runtime.Gosched()
time.Sleep(10 * time.Millisecond)
......@@ -193,8 +193,8 @@ func TestSyncFlow_PeerOutsideDepth_Historical(t *testing.T) {
},
pullSync: []mockps.Option{mockps.WithCursors(tc.cursors), mockps.WithAutoReply(), mockps.WithLiveSyncBlock()},
})
defer pullsync.Close()
defer puller.Close()
defer pullsync.Close()
runtime.Gosched()
time.Sleep(10 * time.Millisecond)
......@@ -246,8 +246,8 @@ func TestSyncFlow_PeerWithinDepth_Live(t *testing.T) {
},
pullSync: []mockps.Option{mockps.WithCursors(tc.cursors), mockps.WithLateSyncReply(tc.liveReplies...)},
})
defer pullsync.Close()
defer puller.Close()
defer pullsync.Close()
runtime.Gosched()
time.Sleep(100 * time.Millisecond)
......@@ -279,8 +279,8 @@ func TestPeerDisconnected(t *testing.T) {
pullSync: []mockps.Option{mockps.WithCursors(cursors), mockps.WithLiveSyncBlock()},
})
t.Cleanup(func() {
p.Close()
pullsync.Close()
p.Close()
})
runtime.Gosched()
......@@ -376,8 +376,8 @@ func TestDepthChange(t *testing.T) {
},
pullSync: []mockps.Option{mockps.WithCursors(tc.cursors), mockps.WithLateSyncReply(tc.syncReplies...)},
})
defer pullsync.Close()
defer puller.Close()
defer pullsync.Close()
runtime.Gosched()
time.Sleep(100 * time.Millisecond)
......
......@@ -50,6 +50,8 @@ type Syncer struct {
streamer p2p.Streamer
logger logging.Logger
storage pullstorage.Storer
quit chan struct{}
wg sync.WaitGroup
ruidMtx sync.Mutex
ruidCtx map[uint32]func()
......@@ -71,6 +73,8 @@ func New(o Options) *Syncer {
storage: o.Storage,
logger: o.Logger,
ruidCtx: make(map[uint32]func()),
wg: sync.WaitGroup{},
quit: make(chan struct{}),
}
}
......@@ -113,15 +117,10 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8
}
ru.Ruid = binary.BigEndian.Uint32(b)
defer func() {
if err != nil {
_ = stream.FullClose()
return
}
_ = stream.Close()
}()
w, r := protobuf.NewWriterAndReader(stream)
defer stream.Close()
if err = w.WriteMsgWithContext(ctx, &ru); err != nil {
return 0, 0, fmt.Errorf("write ruid: %w", err)
}
......@@ -220,11 +219,24 @@ func (s *Syncer) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) err
s.ruidCtx[ru.Ruid] = cancel
s.ruidMtx.Unlock()
defer func() {
select {
case <-s.quit:
cancel()
case <-ctx.Done():
}
s.ruidMtx.Lock()
delete(s.ruidCtx, ru.Ruid)
s.ruidMtx.Unlock()
}()
defer cancel()
select {
case <-s.quit:
return nil
default:
}
s.wg.Add(1)
defer s.wg.Done()
var rn pb.GetRange
if err := r.ReadMsgWithContext(ctx, &rn); err != nil {
......@@ -398,5 +410,17 @@ func (s *Syncer) cancelHandler(ctx context.Context, p p2p.Peer, stream p2p.Strea
}
func (s *Syncer) Close() error {
s.logger.Info("pull syncer shutting down")
close(s.quit)
cc := make(chan struct{})
go func() {
defer close(cc)
s.wg.Wait()
}()
select {
case <-cc:
case <-time.After(10 * time.Second):
s.logger.Warning("pull syncer shutting down with running goroutines")
}
return nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment