Commit 9d7d98ff authored by acud, committed by GitHub

pullsync: address technical debt (#810)

parent 9917a715
@@ -202,13 +202,7 @@ func (p *Puller) disconnectPeer(ctx context.Context, peer swarm.Address, po uint
p.logger.Debugf("puller disconnect cleanup peer %s po %d", peer, po)
}
syncCtx := p.syncPeers[po][peer.String()] // disconnectPeer is called under lock, this is safe
- syncCtx.Lock()
- defer syncCtx.Unlock()
- for _, f := range syncCtx.binCancelFuncs {
- f()
- }
+ syncCtx.gone()
delete(p.syncPeers[po], peer.String())
}
@@ -241,30 +235,11 @@ func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8
}
for _, bin := range want {
- // question: do we want to have the separate cancel funcs per live/hist
- // for known whether syncing is running on that bin/stream? could be some race here
- if _, ok := syncCtx.binCancelFuncs[bin]; !ok {
- // if there's no bin cancel func it means there's no
- // sync running on this bin. start syncing both hist and live
- cur := c[bin]
- binCtx, cancel := context.WithCancel(ctx)
- syncCtx.binCancelFuncs[bin] = cancel
- if cur > 0 {
- p.wg.Add(1)
- go p.histSyncWorker(binCtx, peer, bin, cur)
- }
- p.wg.Add(1)
- go p.liveSyncWorker(binCtx, peer, bin, cur)
- }
- }
- for _, bin := range dontWant {
- if c, ok := syncCtx.binCancelFuncs[bin]; ok {
- // we have sync running on this bin, cancel it
- c()
- delete(syncCtx.binCancelFuncs, bin)
+ if !syncCtx.isBinSyncing(bin) {
+ p.syncPeerBin(ctx, syncCtx, peer, bin, c[bin])
+ }
}
+ syncCtx.cancelBins(dontWant...)
} else {
// outside of depth
var (
@@ -279,26 +254,10 @@ func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8
dontWant = append(dontWant, i)
}
- if _, ok := syncCtx.binCancelFuncs[want]; !ok {
- // if there's no bin cancel func it means there's no
- // sync running on this bin. start syncing both hist and live
- cur := c[want]
- binCtx, cancel := context.WithCancel(ctx)
- syncCtx.binCancelFuncs[po] = cancel
- if cur > 0 {
- p.wg.Add(1)
- go p.histSyncWorker(binCtx, peer, want, cur)
- }
- p.wg.Add(1)
- go p.liveSyncWorker(binCtx, peer, want, cur)
- }
- for _, bin := range dontWant {
- if c, ok := syncCtx.binCancelFuncs[bin]; ok {
- // we have sync running on this bin, cancel it
- c()
- delete(syncCtx.binCancelFuncs, bin)
- }
+ if !syncCtx.isBinSyncing(want) {
+ p.syncPeerBin(ctx, syncCtx, peer, want, c[want])
+ }
+ syncCtx.cancelBins(dontWant...)
}
}
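Reading the two branches above together: a peer that falls within the neighborhood depth is synced on every bin from the depth upward, while a peer outside the depth is synced only on its own proximity-order bin; bins that are no longer wanted are torn down with cancelBins. A rough, hypothetical helper expressing that selection rule (wantedBins is illustrative only and not part of the commit; the exact boundary conditions are an assumption drawn from the surrounding code):

```go
package main

import "fmt"

// wantedBins is a hypothetical illustration of the bin selection recalcPeer
// appears to perform: for a peer within depth d, sync all bins in [d, bins);
// for a peer outside depth, sync only the peer's own bin po.
func wantedBins(po, d, bins uint8) (want, dontWant []uint8) {
	if po >= d {
		for i := uint8(0); i < bins; i++ {
			if i >= d {
				want = append(want, i)
			} else {
				dontWant = append(dontWant, i)
			}
		}
		return want, dontWant
	}
	for i := uint8(0); i < bins; i++ {
		if i == po {
			want = append(want, i)
		} else {
			dontWant = append(dontWant, i)
		}
	}
	return want, dontWant
}

func main() {
	w, dw := wantedBins(3, 2, 8) // peer in bin 3, depth 2, 8 bins total
	fmt.Println("want:", w, "dontWant:", dw)
}
```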
@@ -330,17 +289,7 @@ func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8)
// peer outside depth?
if po < d && po > 0 {
- cur, bin := c[po], po
- // start just one bin for historical and live
- binCtx, cancel := context.WithCancel(ctx)
- syncCtx.binCancelFuncs[po] = cancel
- if cur > 0 {
- p.wg.Add(1)
- go p.histSyncWorker(binCtx, peer, bin, cur) // start historical
- }
- p.wg.Add(1)
- go p.liveSyncWorker(binCtx, peer, bin, cur) // start live
+ p.syncPeerBin(ctx, syncCtx, peer, po, c[po])
return
}
@@ -348,16 +297,20 @@ func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8)
if bin == 0 || uint8(bin) < d {
continue
}
- binCtx, cancel := context.WithCancel(ctx)
- syncCtx.binCancelFuncs[uint8(bin)] = cancel
- if cur > 0 {
- p.wg.Add(1)
- go p.histSyncWorker(binCtx, peer, uint8(bin), cur) // start historical
- }
- // start live
+ p.syncPeerBin(ctx, syncCtx, peer, uint8(bin), cur)
+ }
+ }
+ func (p *Puller) syncPeerBin(ctx context.Context, syncCtx *syncPeer, peer swarm.Address, bin uint8, cur uint64) {
+ binCtx, cancel := context.WithCancel(ctx)
+ syncCtx.setBinCancel(cancel, bin)
+ if cur > 0 {
p.wg.Add(1)
- go p.liveSyncWorker(binCtx, peer, uint8(bin), cur) // start live
+ go p.histSyncWorker(binCtx, peer, bin, cur)
}
+ // start live
+ p.wg.Add(1)
+ go p.liveSyncWorker(binCtx, peer, bin, cur)
}
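The binCtx derived in syncPeerBin is what couples the two workers' lifetimes: cancelling the stored per-bin cancel func (via cancelBins or gone) or cancelling the parent peer context stops both the historical and the live worker for that bin. A self-contained sketch of that standard context.WithCancel behaviour, illustrative only and independent of the puller code:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	parent, parentCancel := context.WithCancel(context.Background())
	defer parentCancel()

	// per-bin context derived from the parent, as syncPeerBin does
	binCtx, binCancel := context.WithCancel(parent)

	var wg sync.WaitGroup
	worker := func(name string) {
		defer wg.Done()
		<-binCtx.Done() // both workers stop when either cancel fires
		fmt.Println(name, "stopped:", binCtx.Err())
	}
	wg.Add(2)
	go worker("hist")
	go worker("live")

	time.Sleep(10 * time.Millisecond)
	binCancel() // e.g. cancelBins(bin) on a depth change; parentCancel() would have the same effect
	wg.Wait()
}
```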
func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uint8, cur uint64) {
@@ -454,9 +407,6 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin
}
return
}
- if top == 0 {
- return //TODO need to deal with this somehow. not right
- }
err = p.addPeerInterval(peer, bin, from, top)
if err != nil {
p.metrics.LiveWorkerErrCounter.Inc()
@@ -475,11 +425,7 @@ func (p *Puller) Close() error {
for i := uint8(0); i < p.bins; i++ {
binPeers := p.syncPeers[i]
for _, peer := range binPeers {
- peer.Lock()
- for _, f := range peer.binCancelFuncs {
- f()
- }
- peer.Unlock()
+ peer.gone()
}
}
cc := make(chan struct{})
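The cc channel created above is presumably the usual wait-or-time-out shutdown idiom: close it once the WaitGroup drains and select against a timer. A generic sketch of that pattern, with the timeout value and the exact wiring inside Close() being assumptions rather than code from this commit:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitWithTimeout waits for wg to drain, but gives up after d.
// This is the idiom the cc channel in Close() appears to implement.
func waitWithTimeout(wg *sync.WaitGroup, d time.Duration) error {
	cc := make(chan struct{})
	go func() {
		wg.Wait()
		close(cc)
	}()
	select {
	case <-cc:
		return nil
	case <-time.After(d):
		return fmt.Errorf("shutdown timed out after %v", d)
	}
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { defer wg.Done(); time.Sleep(20 * time.Millisecond) }()
	fmt.Println(waitWithTimeout(&wg, time.Second)) // <nil>
}
```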
@@ -562,6 +508,34 @@ func newSyncPeer(addr swarm.Address, bins uint8) *syncPeer {
}
}
+ // called when peer disconnects or on shutdown, cleans up ongoing sync operations
+ func (p *syncPeer) gone() {
+ p.Lock()
+ defer p.Unlock()
+ for _, f := range p.binCancelFuncs {
+ f()
+ }
+ }
+ func (p *syncPeer) setBinCancel(cf func(), bin uint8) {
+ p.binCancelFuncs[bin] = cf
+ }
+ func (p *syncPeer) cancelBins(bins ...uint8) {
+ for _, bin := range bins {
+ if c, ok := p.binCancelFuncs[bin]; ok {
+ c()
+ delete(p.binCancelFuncs, bin)
+ }
+ }
+ }
+ func (p *syncPeer) isBinSyncing(bin uint8) bool {
+ _, ok := p.binCancelFuncs[bin]
+ return ok
+ }
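The helpers above reference fields that this diff never shows. For orientation, a plausible shape of the syncPeer struct they imply, inferred from p.Lock(), p.binCancelFuncs and the newSyncPeer(addr, bins) constructor; treat it as a sketch, not the actual definition. Note also that only gone() takes the mutex here; setBinCancel, cancelBins and isBinSyncing appear to rely on the caller already holding the puller's lock, as the comment in disconnectPeer suggests.

```go
// Sketch only, under the assumptions stated above; not the actual definition.
package puller

import (
	"sync"

	"github.com/ethersphere/bee/pkg/swarm"
)

type syncPeer struct {
	address        swarm.Address
	binCancelFuncs map[uint8]func() // per-bin cancel funcs for the running sync workers

	sync.Mutex
}
```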
func isSyncing(p *Puller, addr swarm.Address) bool {
// this is needed for testing purposes in order
// to verify that a peer is no longer syncing on
......
@@ -6,7 +6,6 @@ package mock
import (
"context"
"io"
- "math"
"sync"
@@ -163,13 +162,15 @@ func (p *PullSyncMock) SyncInterval(ctx context.Context, peer swarm.Address, bin
if isLive && p.blockLiveSync {
// don't respond, wait for quit
<-p.quit
- return 0, 1, io.EOF
+ return 0, 1, context.Canceled
}
if isLive && len(p.liveSyncReplies) > 0 {
if p.liveSyncCalls >= len(p.liveSyncReplies) {
<-p.quit
- return
+ // when shutting down, on the puller side we cancel the context going into the pullsync protocol request
+ // this results in SyncInterval returning with a context canceled error
+ return 0, 0, context.Canceled
}
p.mtx.Lock()
v := p.liveSyncReplies[p.liveSyncCalls]
......
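The comment above explains the switch from io.EOF to context.Canceled: on shutdown the puller cancels the context it passes into the pullsync request, so SyncInterval now surfaces a cancellation rather than a stream error. On the consuming side the natural way to treat that as a clean stop is an errors.Is check; a generic sketch (how the real worker classifies the error is not shown in this diff):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// handleSyncErr shows the usual classification: a cancelled context means
// we are shutting down (or re-syncing the bin), anything else is a real error.
func handleSyncErr(err error) {
	switch {
	case err == nil:
		fmt.Println("interval synced")
	case errors.Is(err, context.Canceled):
		fmt.Println("sync stopped: context canceled, shutting down")
	default:
		fmt.Println("sync failed:", err)
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	handleSyncErr(ctx.Err()) // context.Canceled
	handleSyncErr(nil)
}
```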