Commit 36efbe0e authored by istae's avatar istae Committed by GitHub

fix: add to skiplist on error in goroutine (#2311)

parent 8500410a
...@@ -318,7 +318,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo ...@@ -318,7 +318,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo
// in which case we should return immediately. // in which case we should return immediately.
// if ErrWantSelf is returned, it means we are the closest peer. // if ErrWantSelf is returned, it means we are the closest peer.
if errors.Is(err, topology.ErrWantSelf) { if errors.Is(err, topology.ErrWantSelf) {
if time.Now().Before(ps.warmupPeriod) { if !ps.warmedUp() {
return nil, ErrWarmup return nil, ErrWarmup
} }
...@@ -375,6 +375,12 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo ...@@ -375,6 +375,12 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo
} }
if err != nil { if err != nil {
logger.Debugf("could not push to peer %s: %v", peer, err) logger.Debugf("could not push to peer %s: %v", peer, err)
// if the node has warmed up AND no other closer peer has been tried
if ps.warmedUp() && !ps.skipList.HasChunk(ch.Address()) {
ps.skipList.Add(peer, ch.Address(), skipPeerExpiration)
}
resultC <- &pushResult{err: err, attempted: attempted} resultC <- &pushResult{err: err, attempted: attempted}
return return
} }
...@@ -393,11 +399,6 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo ...@@ -393,11 +399,6 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo
} }
if r.err != nil && r.attempted { if r.err != nil && r.attempted {
ps.metrics.TotalFailedSendAttempts.Inc() ps.metrics.TotalFailedSendAttempts.Inc()
// if the node has warmed up AND no other closer peer has been tried
if time.Now().After(ps.warmupPeriod) && !ps.skipList.HasChunk(ch.Address()) {
ps.skipList.Add(peer, ch.Address(), skipPeerExpiration)
}
} }
case <-ctx.Done(): case <-ctx.Done():
return nil, ctx.Err() return nil, ctx.Err()
...@@ -536,6 +537,10 @@ func (ps *PushSync) pushToNeighbour(peer swarm.Address, ch swarm.Chunk, origin b ...@@ -536,6 +537,10 @@ func (ps *PushSync) pushToNeighbour(peer swarm.Address, ch swarm.Chunk, origin b
err = ps.accounting.Credit(peer, receiptPrice, origin) err = ps.accounting.Credit(peer, receiptPrice, origin)
} }
// warmedUp reports whether the instant stored in ps.warmupPeriod lies
// strictly in the past, i.e. the current time is after it.
func (ps *PushSync) warmedUp() bool {
	now := time.Now()
	return ps.warmupPeriod.Before(now)
}
type peerSkipList struct { type peerSkipList struct {
sync.Mutex sync.Mutex
chunks map[string]struct{} chunks map[string]struct{}
......
...@@ -130,16 +130,19 @@ func TestReplicateBeforeReceipt(t *testing.T) { ...@@ -130,16 +130,19 @@ func TestReplicateBeforeReceipt(t *testing.T) {
// its address is closer to the chunk than secondPeer but it will not receive the chunk // its address is closer to the chunk than secondPeer but it will not receive the chunk
psEmpty, storerEmpty, _, _ := createPushSyncNode(t, emptyPeer, defaultPrices, nil, nil, defaultSigner) psEmpty, storerEmpty, _, _ := createPushSyncNode(t, emptyPeer, defaultPrices, nil, nil, defaultSigner)
defer storerEmpty.Close() defer storerEmpty.Close()
emptyRecorder := streamtest.New(streamtest.WithProtocols(psEmpty.Protocol()), streamtest.WithBaseAddr(secondPeer)) emptyRecorder := streamtest.New(streamtest.WithProtocols(psEmpty.Protocol()), streamtest.WithBaseAddr(secondPeer))
// node that is connected to closestPeer // node that is connected to closestPeer
// will receive chunk from closestPeer // will receive chunk from closestPeer
psSecond, storerSecond, _, secondAccounting := createPushSyncNode(t, secondPeer, defaultPrices, emptyRecorder, nil, defaultSigner, mock.WithPeers(emptyPeer), WithinDepthMock) psSecond, storerSecond, _, secondAccounting := createPushSyncNode(t, secondPeer, defaultPrices, emptyRecorder, nil, defaultSigner, mock.WithPeers(emptyPeer), WithinDepthMock)
defer storerSecond.Close() defer storerSecond.Close()
secondRecorder := streamtest.New(streamtest.WithProtocols(psSecond.Protocol()), streamtest.WithBaseAddr(closestPeer)) secondRecorder := streamtest.New(streamtest.WithProtocols(psSecond.Protocol()), streamtest.WithBaseAddr(closestPeer))
psStorer, storerPeer, _, storerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, secondRecorder, nil, defaultSigner, mock.WithPeers(secondPeer), mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock) psStorer, storerPeer, _, storerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, secondRecorder, nil, defaultSigner, mock.WithPeers(secondPeer), mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock)
defer storerPeer.Close() defer storerPeer.Close()
recorder := streamtest.New(streamtest.WithProtocols(psStorer.Protocol()), streamtest.WithBaseAddr(pivotNode)) recorder := streamtest.New(streamtest.WithProtocols(psStorer.Protocol()), streamtest.WithBaseAddr(pivotNode))
// pivot node needs the streamer since the chunk is intercepted by // pivot node needs the streamer since the chunk is intercepted by
...@@ -163,9 +166,6 @@ func TestReplicateBeforeReceipt(t *testing.T) { ...@@ -163,9 +166,6 @@ func TestReplicateBeforeReceipt(t *testing.T) {
// this intercepts the incoming receipt message // this intercepts the incoming receipt message
waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), nil) waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), nil)
// sleep for a bit to allow the second peer to store the replicated chunk
time.Sleep(time.Millisecond * 500)
// this intercepts the outgoing delivery message from storer node to second storer node // this intercepts the outgoing delivery message from storer node to second storer node
waitOnRecordAndTest(t, secondPeer, secondRecorder, chunk.Address(), chunk.Data()) waitOnRecordAndTest(t, secondPeer, secondRecorder, chunk.Address(), chunk.Data())
...@@ -262,9 +262,6 @@ func TestFailToReplicateBeforeReceipt(t *testing.T) { ...@@ -262,9 +262,6 @@ func TestFailToReplicateBeforeReceipt(t *testing.T) {
// this intercepts the incoming receipt message // this intercepts the incoming receipt message
waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), nil) waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), nil)
// sleep for a bit to allow the second peer to store the replicated chunk
time.Sleep(time.Millisecond * 500)
// this intercepts the outgoing delivery message from storer node to second storer node // this intercepts the outgoing delivery message from storer node to second storer node
waitOnRecordAndTest(t, secondPeer, secondRecorder, chunk.Address(), chunk.Data()) waitOnRecordAndTest(t, secondPeer, secondRecorder, chunk.Address(), chunk.Data())
...@@ -929,7 +926,7 @@ func createPushSyncNodeWithAccounting(t *testing.T, addr swarm.Address, prices p ...@@ -929,7 +926,7 @@ func createPushSyncNodeWithAccounting(t *testing.T, addr swarm.Address, prices p
return ch, nil return ch, nil
} }
return pushsync.New(addr, blockHash.Bytes(), recorderDisconnecter, storer, mockTopology, mtag, true, unwrap, validStamp, logger, acct, mockPricer, signer, nil, 0), storer, mtag return pushsync.New(addr, blockHash.Bytes(), recorderDisconnecter, storer, mockTopology, mtag, true, unwrap, validStamp, logger, acct, mockPricer, signer, nil, -1), storer, mtag
} }
func waitOnRecordAndTest(t *testing.T, peer swarm.Address, recorder *streamtest.Recorder, add swarm.Address, data []byte) { func waitOnRecordAndTest(t *testing.T, peer swarm.Address, recorder *streamtest.Recorder, add swarm.Address, data []byte) {
......
...@@ -107,7 +107,7 @@ func (d *mock) Peers() []swarm.Address { ...@@ -107,7 +107,7 @@ func (d *mock) Peers() []swarm.Address {
return d.peers return d.peers
} }
func (d *mock) ClosestPeer(addr swarm.Address, _ bool, skipPeers ...swarm.Address) (peerAddr swarm.Address, err error) { func (d *mock) ClosestPeer(addr swarm.Address, wantSelf bool, skipPeers ...swarm.Address) (peerAddr swarm.Address, err error) {
if len(skipPeers) == 0 { if len(skipPeers) == 0 {
if d.closestPeerErr != nil { if d.closestPeerErr != nil {
return d.closestPeer, d.closestPeerErr return d.closestPeer, d.closestPeerErr
...@@ -147,8 +147,13 @@ func (d *mock) ClosestPeer(addr swarm.Address, _ bool, skipPeers ...swarm.Addres ...@@ -147,8 +147,13 @@ func (d *mock) ClosestPeer(addr swarm.Address, _ bool, skipPeers ...swarm.Addres
} }
if peerAddr.IsZero() { if peerAddr.IsZero() {
if wantSelf {
return peerAddr, topology.ErrWantSelf
} else {
return peerAddr, topology.ErrNotFound return peerAddr, topology.ErrNotFound
} }
}
return peerAddr, nil return peerAddr, nil
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment