Commit 8d961da1 authored by acud, committed by GitHub

refactor: improve puller cancellations (#1890)

parent 0ce90358
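The change that runs through most hunks below is the map key used for the puller's per-peer state (syncPeers, cursors, peersDisconnected): lookups switch from peerAddr.String() to peerAddr.ByteString(), presumably to avoid hex-encoding on every lookup and to keep all of these maps keyed consistently (the isSyncing helper at the end is updated to match). A minimal standalone sketch of the distinction, assuming String() hex-encodes the overlay address while ByteString() returns the raw bytes reinterpreted as a Go string; the addr type below is a hypothetical stand-in for swarm.Address, not bee code:

package main

import (
	"encoding/hex"
	"fmt"
)

// addr is a hypothetical stand-in for swarm.Address: a thin wrapper around
// the raw overlay address bytes.
type addr struct{ b []byte }

// String mirrors the old map key: it hex-encodes the bytes on every call.
func (a addr) String() string { return hex.EncodeToString(a.b) }

// ByteString mirrors the new map key: the raw bytes reinterpreted as a
// string - no encoding work and half the key length.
func (a addr) ByteString() string { return string(a.b) }

func main() {
	a := addr{b: []byte{0xde, 0xad, 0xbe, 0xef}}

	syncPeers := make(map[string]struct{})
	syncPeers[a.ByteString()] = struct{}{}

	// the same address yields an 8-character hex key or a 4-byte raw key
	fmt.Println(len(a.String()), len(a.ByteString()))
	_, ok := syncPeers[a.ByteString()]
	fmt.Println(ok) // true
}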
@@ -134,14 +134,15 @@ func (p *Puller) manage() {
 			// way that it returns an error - the value must be checked.
 			_ = p.topology.EachPeerRev(func(peerAddr swarm.Address, po uint8) (stop, jumpToNext bool, err error) {
 				bp := p.syncPeers[po]
-				if _, ok := bp[peerAddr.String()]; ok {
-					delete(peersDisconnected, peerAddr.String())
-				}
 				if po >= depth {
+					// delete from peersDisconnected since we'd like to sync
+					// with this peer
+					delete(peersDisconnected, peerAddr.ByteString())
 					// within depth, sync everything
-					if _, ok := bp[peerAddr.String()]; !ok {
+					if _, ok := bp[peerAddr.ByteString()]; !ok {
 						// we're not syncing with this peer yet, start doing so
-						bp[peerAddr.String()] = newSyncPeer(peerAddr, p.bins)
+						bp[peerAddr.ByteString()] = newSyncPeer(peerAddr, p.bins)
 						peerEntry := peer{addr: peerAddr, po: po}
 						peersToSync = append(peersToSync, peerEntry)
 					} else {
@@ -149,14 +150,12 @@ func (p *Puller) manage() {
 						peerEntry := peer{addr: peerAddr, po: po}
 						peersToRecalc = append(peersToRecalc, peerEntry)
 					}
-				} else {
-					if _, ok := bp[peerAddr.String()]; ok {
-						// already syncing, recalc so that existing streams get cleaned up
-						peerEntry := peer{addr: peerAddr, po: po}
-						peersToRecalc = append(peersToRecalc, peerEntry)
-					}
 				}
+				// if peer is outside of depth, do nothing here, this
+				// will cause the peer to stay in the peersDisconnected
+				// map, leading to cancelling of its running syncing contexts.
 				return false, false, nil
 			})
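Taken together, the two hunks above leave the EachPeerRev callback with roughly this shape (a condensed sketch of the new control flow only; collapsed context lines are omitted, and surrounding variables such as depth, peersDisconnected, peersToSync and peersToRecalc are declared earlier in manage()):

_ = p.topology.EachPeerRev(func(peerAddr swarm.Address, po uint8) (stop, jumpToNext bool, err error) {
	bp := p.syncPeers[po]
	if po >= depth {
		// we want to keep syncing with this peer, so make sure it is
		// not scheduled for disconnection
		delete(peersDisconnected, peerAddr.ByteString())
		if _, ok := bp[peerAddr.ByteString()]; !ok {
			// not syncing with this peer yet - start
			bp[peerAddr.ByteString()] = newSyncPeer(peerAddr, p.bins)
			peersToSync = append(peersToSync, peer{addr: peerAddr, po: po})
		} else {
			// already syncing - recalculate which bins to sync
			peersToRecalc = append(peersToRecalc, peer{addr: peerAddr, po: po})
		}
	}
	// peers outside depth are left in peersDisconnected, so their
	// running syncing contexts get cancelled further down in manage()
	return false, false, nil
})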
@@ -171,13 +170,13 @@ func (p *Puller) manage() {
 				// stopgap solution for peers that dont return the correct
 				// amount of cursors we expect
 				if dontSync {
-					peersDisconnected[v.addr.String()] = v
+					peersDisconnected[v.addr.ByteString()] = v
 				}
 			}
 			p.syncPeersMtx.Lock()
 			for _, v := range peersDisconnected {
-				p.disconnectPeer(ctx, v.addr, v.po)
+				p.disconnectPeer(v.addr, v.po)
 			}
 			p.syncPeersMtx.Unlock()
@@ -187,15 +186,15 @@ func (p *Puller) manage() {
 	}
 }

-func (p *Puller) disconnectPeer(ctx context.Context, peer swarm.Address, po uint8) {
+func (p *Puller) disconnectPeer(peer swarm.Address, po uint8) {
 	if logMore {
 		p.logger.Debugf("puller disconnect cleanup peer %s po %d", peer, po)
 	}
-	if syncCtx, ok := p.syncPeers[po][peer.String()]; ok {
+	if syncCtx, ok := p.syncPeers[po][peer.ByteString()]; ok {
 		// disconnectPeer is called under lock, this is safe
 		syncCtx.gone()
 	}
-	delete(p.syncPeers[po], peer.String())
+	delete(p.syncPeers[po], peer.ByteString())
 }

 func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8) (dontSync bool) {
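The cancellation itself goes through syncCtx.gone(), whose definition is not part of this diff. The sketch below is only a plausible reconstruction inferred from the calls visible here (newSyncPeer, the embedded Lock/Unlock, gone); the binCancelFuncs field and its exact shape are assumptions, not the actual bee type:

package puller

import (
	"sync"

	"github.com/ethersphere/bee/pkg/swarm"
)

// syncPeer sketches the per-peer entry kept in p.syncPeers[po].
type syncPeer struct {
	sync.Mutex
	address        swarm.Address
	binCancelFuncs map[uint8]func() // assumed: one cancel func per bin being synced
}

func newSyncPeer(addr swarm.Address, bins uint8) *syncPeer {
	return &syncPeer{
		address:        addr,
		binCancelFuncs: make(map[uint8]func(), bins),
	}
}

// gone cancels every running bin sync for this peer; disconnectPeer calls it
// when the peer is scheduled for disconnection.
func (p *syncPeer) gone() {
	p.Lock()
	defer p.Unlock()

	for _, cancel := range p.binCancelFuncs {
		cancel()
	}
}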
@@ -204,14 +203,14 @@ func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8) (dontSync bool) {
 	}
 	p.syncPeersMtx.Lock()
-	syncCtx := p.syncPeers[po][peer.String()]
+	syncCtx := p.syncPeers[po][peer.ByteString()]
 	p.syncPeersMtx.Unlock()

 	syncCtx.Lock()
 	defer syncCtx.Unlock()

 	p.cursorsMtx.Lock()
-	c := p.cursors[peer.String()]
+	c := p.cursors[peer.ByteString()]
 	p.cursorsMtx.Unlock()

 	if len(c) != int(p.bins) {
@@ -252,14 +251,14 @@ func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8) (dontSync bool) {
 func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8) {
 	p.syncPeersMtx.Lock()
-	syncCtx := p.syncPeers[po][peer.String()]
+	syncCtx := p.syncPeers[po][peer.ByteString()]
 	p.syncPeersMtx.Unlock()

 	syncCtx.Lock()
 	defer syncCtx.Unlock()

 	p.cursorsMtx.Lock()
-	c, ok := p.cursors[peer.String()]
+	c, ok := p.cursors[peer.ByteString()]
 	p.cursorsMtx.Unlock()

 	if !ok {
@@ -269,7 +268,7 @@ func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8) {
 			p.logger.Debugf("could not get cursors from peer %s: %v", peer.String(), err)
 		}
 		p.syncPeersMtx.Lock()
-		delete(p.syncPeers[po], peer.String())
+		delete(p.syncPeers[po], peer.ByteString())
 		p.syncPeersMtx.Unlock()
 		return
@@ -277,7 +276,7 @@ func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8) {
 			// maybe blacklist for some time
 		}
 		p.cursorsMtx.Lock()
-		p.cursors[peer.String()] = cursors
+		p.cursors[peer.ByteString()] = cursors
 		p.cursorsMtx.Unlock()
 		c = cursors
 	}
@@ -286,7 +285,7 @@ func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8) {
 	// what we expect it to be - dont do anything
 	if len(c) != int(p.bins) {
 		p.syncPeersMtx.Lock()
-		delete(p.syncPeers[po], peer.String())
+		delete(p.syncPeers[po], peer.ByteString())
 		p.syncPeersMtx.Unlock()
 		return
 	}
@@ -543,7 +542,7 @@ func isSyncing(p *Puller, addr swarm.Address) bool {
 	defer p.syncPeersMtx.Unlock()
 	for _, bin := range p.syncPeers {
 		for peer := range bin {
-			if addr.String() == peer {
+			if addr.ByteString() == peer {
 				return true
 			}
 		}