Commit 799c5116 authored by acud's avatar acud Committed by GitHub

fix(puller): cursors length panic (#1851)

parent 4d918bf3
...@@ -23,7 +23,7 @@ import ( ...@@ -23,7 +23,7 @@ import (
) )
var ( var (
logMore = true // enable this to get more logging logMore = false // enable this to get more logging
) )
type Options struct { type Options struct {
...@@ -167,7 +167,12 @@ func (p *Puller) manage() { ...@@ -167,7 +167,12 @@ func (p *Puller) manage() {
} }
for _, v := range peersToRecalc { for _, v := range peersToRecalc {
p.recalcPeer(ctx, v.addr, v.po, depth) dontSync := p.recalcPeer(ctx, v.addr, v.po, depth)
// stopgap solution for peers that dont return the correct
// amount of cursors we expect
if dontSync {
peersDisconnected[v.addr.String()] = v
}
} }
for _, v := range peersDisconnected { for _, v := range peersDisconnected {
...@@ -186,13 +191,14 @@ func (p *Puller) disconnectPeer(ctx context.Context, peer swarm.Address, po uint ...@@ -186,13 +191,14 @@ func (p *Puller) disconnectPeer(ctx context.Context, peer swarm.Address, po uint
if logMore { if logMore {
p.logger.Debugf("puller disconnect cleanup peer %s po %d", peer, po) p.logger.Debugf("puller disconnect cleanup peer %s po %d", peer, po)
} }
syncCtx := p.syncPeers[po][peer.String()] // disconnectPeer is called under lock, this is safe if syncCtx, ok := p.syncPeers[po][peer.String()]; ok {
syncCtx.gone() // disconnectPeer is called under lock, this is safe
syncCtx.gone()
}
delete(p.syncPeers[po], peer.String()) delete(p.syncPeers[po], peer.String())
} }
func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8) { func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8) (dontSync bool) {
if logMore { if logMore {
p.logger.Debugf("puller recalculating peer %s po %d depth %d", peer, po, d) p.logger.Debugf("puller recalculating peer %s po %d depth %d", peer, po, d)
} }
...@@ -205,6 +211,10 @@ func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8 ...@@ -205,6 +211,10 @@ func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8
c := p.cursors[peer.String()] c := p.cursors[peer.String()]
p.cursorsMtx.Unlock() p.cursorsMtx.Unlock()
if len(c) != int(p.bins) {
return true
}
var want, dontWant []uint8 var want, dontWant []uint8
if po >= d { if po >= d {
// within depth // within depth
...@@ -233,6 +243,7 @@ func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8 ...@@ -233,6 +243,7 @@ func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8
} }
syncCtx.cancelBins(dontWant...) syncCtx.cancelBins(dontWant...)
return false
} }
func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8) { func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8) {
...@@ -261,6 +272,13 @@ func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8) ...@@ -261,6 +272,13 @@ func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8)
c = cursors c = cursors
} }
// if length of returned cursors does not add up to
// what we expect it to be - dont do anything
if len(c) != int(p.bins) {
delete(p.syncPeers[po], peer.String())
return
}
for bin, cur := range c { for bin, cur := range c {
if bin == 0 || uint8(bin) < d { if bin == 0 || uint8(bin) < d {
continue continue
......
...@@ -6,8 +6,8 @@ package puller_test ...@@ -6,8 +6,8 @@ package puller_test
import ( import (
"errors" "errors"
"io/ioutil"
"math" "math"
"os"
"testing" "testing"
"time" "time"
...@@ -123,7 +123,7 @@ func TestSyncFlow_PeerWithinDepth_Live(t *testing.T) { ...@@ -123,7 +123,7 @@ func TestSyncFlow_PeerWithinDepth_Live(t *testing.T) {
), mockk.WithDepth(1), ), mockk.WithDepth(1),
}, },
pullSync: []mockps.Option{mockps.WithCursors(tc.cursors), mockps.WithLiveSyncReplies(tc.liveReplies...)}, pullSync: []mockps.Option{mockps.WithCursors(tc.cursors), mockps.WithLiveSyncReplies(tc.liveReplies...)},
bins: 5, bins: 2,
}) })
t.Cleanup(func() { t.Cleanup(func() {
pullsync.Close() pullsync.Close()
...@@ -199,7 +199,7 @@ func TestSyncFlow_PeerWithinDepth_Historical(t *testing.T) { ...@@ -199,7 +199,7 @@ func TestSyncFlow_PeerWithinDepth_Historical(t *testing.T) {
), mockk.WithDepth(1), ), mockk.WithDepth(1),
}, },
pullSync: []mockps.Option{mockps.WithCursors(tc.cursors), mockps.WithAutoReply(), mockps.WithLiveSyncBlock()}, pullSync: []mockps.Option{mockps.WithCursors(tc.cursors), mockps.WithAutoReply(), mockps.WithLiveSyncBlock()},
bins: 5, bins: 2,
}) })
defer puller.Close() defer puller.Close()
defer pullsync.Close() defer pullsync.Close()
...@@ -592,7 +592,7 @@ func newPuller(ops opts) (*puller.Puller, storage.StateStorer, *mockk.Mock, *moc ...@@ -592,7 +592,7 @@ func newPuller(ops opts) (*puller.Puller, storage.StateStorer, *mockk.Mock, *moc
s := mock.NewStateStore() s := mock.NewStateStore()
ps := mockps.NewPullSync(ops.pullSync...) ps := mockps.NewPullSync(ops.pullSync...)
kad := mockk.NewMockKademlia(ops.kad...) kad := mockk.NewMockKademlia(ops.kad...)
logger := logging.New(os.Stdout, 5) logger := logging.New(ioutil.Discard, 0)
o := puller.Options{ o := puller.Options{
Bins: ops.bins, Bins: ops.bins,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment