Commit 330ee558 authored by acud, committed by GitHub

feat(pusher): retry shallow receipts (#2049)

parent c31f442a
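In short, the pusher now compares the proximity order (PO) between a chunk and the peer that issued the storage receipt against the node's own neighborhood depth; a receipt whose PO is below the depth is treated as shallow and the push is retried, up to a bounded number of attempts, after which the chunk is accepted anyway. A minimal, self-contained sketch of that acceptance rule follows; the function and variable names here are illustrative, not the actual pusher code:

    package main

    import "fmt"

    // acceptOrRetry mirrors the rule added in pusher.go: a receipt at proximity
    // order po is accepted only when po >= depth; otherwise the chunk is retried
    // until retryLimit shallow receipts have been seen (retryCount = 6 in the diff).
    func acceptOrRetry(po, depth uint8, retries map[string]int, addr string, retryLimit int) (accepted, retry bool) {
        if po >= depth {
            return true, false
        }
        retries[addr]++
        if retries[addr] < retryLimit {
            return false, true // shallow receipt: try pushing again
        }
        return false, false // retry budget exhausted: give up and accept the result
    }

    func main() {
        retries := make(map[string]int)
        for i := 0; i < 7; i++ {
            ok, retry := acceptOrRetry(1, 31, retries, "chunk-a", 6)
            fmt.Println(i, ok, retry)
        }
    }
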
package pusher

var (
    RetryInterval = &retryInterval
    RetryCount    = &retryCount
)
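This block looks like the usual export_test pattern: the package-internal retryInterval and retryCount are re-exported as pointers so tests can tune them. A hedged sketch of a test helper built on top of these exports; the helper name and values are mine, and the import path is assumed to be the standard bee module path:

    package pusher_test

    import (
        "testing"
        "time"

        "github.com/ethersphere/bee/pkg/pusher"
    )

    // withFastRetries shrinks the retry knobs for one test and restores the
    // previous values when the test finishes.
    func withFastRetries(t *testing.T) {
        t.Helper()
        oldInterval, oldCount := *pusher.RetryInterval, *pusher.RetryCount
        t.Cleanup(func() {
            *pusher.RetryInterval = oldInterval
            *pusher.RetryCount = oldCount
        })
        *pusher.RetryInterval = 10 * time.Millisecond
        *pusher.RetryCount = 3
    }

TestPusherRetryShallow further down does the same thing inline with a deferred restore.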
@@ -17,7 +17,8 @@ type metrics struct {
    SyncTime prometheus.Histogram
    ErrorTime prometheus.Histogram
    ReceiptDepth *prometheus.CounterVec
+   ShallowReceiptDepth *prometheus.CounterVec
}
func newMetrics() metrics {

@@ -72,6 +73,15 @@ func newMetrics() metrics {
            },
            []string{"depth"},
        ),
+       ShallowReceiptDepth: prometheus.NewCounterVec(
+           prometheus.CounterOpts{
+               Namespace: m.Namespace,
+               Subsystem: subsystem,
+               Name:      "shallow_receipt_depth",
+               Help:      "Counter of shallow receipts received at different depths.",
+           },
+           []string{"depth"},
+       ),
    }
}
@@ -34,6 +34,7 @@ type Service struct {
    networkID uint64
    storer storage.Storer
    pushSyncer pushsync.PushSyncer
+   depther topology.NeighborhoodDepther
    logger logging.Logger
    tag *tags.Tags
    tracer *tracing.Tracer

@@ -45,15 +46,20 @@ type Service struct {
var (
    retryInterval = 5 * time.Second // time interval between retries
    concurrentJobs = 10 // how many chunks to push simultaneously
+   retryCount = 6
)
-var ErrInvalidAddress = errors.New("invalid address")
+var (
+   ErrInvalidAddress = errors.New("invalid address")
+   ErrShallowReceipt = errors.New("shallow receipt")
+)

-func New(networkID uint64, storer storage.Storer, peerSuggester topology.ClosestPeerer, pushSyncer pushsync.PushSyncer, tagger *tags.Tags, logger logging.Logger, tracer *tracing.Tracer, warmupTime time.Duration) *Service {
+func New(networkID uint64, storer storage.Storer, depther topology.NeighborhoodDepther, pushSyncer pushsync.PushSyncer, tagger *tags.Tags, logger logging.Logger, tracer *tracing.Tracer, warmupTime time.Duration) *Service {
    service := &Service{
        networkID: networkID,
        storer: storer,
        pushSyncer: pushSyncer,
+       depther: depther,
        tag: tagger,
        logger: logger,
        tracer: tracer,

@@ -80,6 +86,7 @@ func (s *Service) chunksWorker(warmupTime time.Duration) {
        mtx sync.Mutex
        span opentracing.Span
        logger *logrus.Entry
+       retryCounter = make(map[string]int)
    )
    defer timer.Stop()

@@ -115,7 +122,9 @@ LOOP:
        }
        if span == nil {
+           mtx.Lock()
            span, logger, ctx = s.tracer.StartSpanFromContext(cctx, "pusher-sync-batch", s.logger)
+           mtx.Unlock()
        }
        // postpone a retry only after we've finished processing everything in index

@@ -154,19 +163,20 @@ LOOP:
            storerPeer swarm.Address
        )
        defer func() {
+           mtx.Lock()
            if err == nil {
                s.metrics.TotalSynced.Inc()
                s.metrics.SyncTime.Observe(time.Since(startTime).Seconds())
                // only print this if there was no error while sending the chunk
-               logger.Tracef("pusher: pushed chunk %s to node %s", ch.Address().String(), storerPeer.String())
                po := swarm.Proximity(ch.Address().Bytes(), storerPeer.Bytes())
+               logger.Tracef("pusher: pushed chunk %s to node %s, receipt depth %d", ch.Address().String(), storerPeer.String(), po)
                s.metrics.ReceiptDepth.WithLabelValues(strconv.Itoa(int(po))).Inc()
+               delete(retryCounter, ch.Address().ByteString())
            } else {
                s.metrics.TotalErrors.Inc()
                s.metrics.ErrorTime.Observe(time.Since(startTime).Seconds())
                logger.Tracef("pusher: cannot push chunk %s: %v", ch.Address().String(), err)
            }
-           mtx.Lock()
            delete(inflight, ch.Address().String())
            mtx.Unlock()
            <-sem

@@ -200,6 +210,22 @@ LOOP:
                err = fmt.Errorf("pusher: receipt storer address: %w", err)
                return
            }
+           po := swarm.Proximity(ch.Address().Bytes(), storerPeer.Bytes())
+           d := s.depther.NeighborhoodDepth()
+           if po < d {
+               mtx.Lock()
+               retryCounter[ch.Address().ByteString()]++
+               if retryCounter[ch.Address().ByteString()] < retryCount {
+                   mtx.Unlock()
+                   err = fmt.Errorf("pusher: shallow receipt depth %d, want at least %d", po, d)
+                   s.metrics.ShallowReceiptDepth.WithLabelValues(strconv.Itoa(int(po))).Inc()
+                   return
+               }
+               mtx.Unlock()
+           } else {
+               s.metrics.ReceiptDepth.WithLabelValues(strconv.Itoa(int(po))).Inc()
+           }
        }
        if err = s.storer.Set(ctx, storage.ModeSetSync, ch.Address()); err != nil {
@@ -9,6 +9,7 @@ import (
    "errors"
    "io/ioutil"
    "sync"
+   "sync/atomic"
    "testing"
    "time"

@@ -87,7 +88,7 @@ func TestSendChunkToSyncWithTag(t *testing.T) {
        return receipt, nil
    })
-   mtags, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
+   mtags, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
    defer storer.Close()
    defer p.Close()

@@ -143,7 +144,7 @@ func TestSendChunkToPushSyncWithoutTag(t *testing.T) {
        return receipt, nil
    })
-   _, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
+   _, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
    defer storer.Close()
    defer p.Close()

@@ -228,7 +229,7 @@ func TestSendChunkAndTimeoutinReceivingReceipt(t *testing.T) {
        return receipt, nil
    })
-   _, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
+   _, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
    defer storer.Close()
    defer p.Close()

@@ -283,7 +284,7 @@ func TestPusherClose(t *testing.T) {
        return receipt, nil
    })
-   _, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
+   _, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
    chunk := testingc.GenerateTestRandomChunk()

@@ -361,6 +362,61 @@ func TestPusherClose(t *testing.T) {
    }
}
+func TestPusherRetryShallow(t *testing.T) {
+   defer func(d time.Duration, retryCount int) {
+       *pusher.RetryInterval = d
+       *pusher.RetryCount = retryCount
+   }(*pusher.RetryInterval, *pusher.RetryCount)
+   *pusher.RetryInterval = 500 * time.Millisecond
+   *pusher.RetryCount = 3
+   var (
+       pivotPeer = swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
+       closestPeer = swarm.MustParseHexAddress("f000000000000000000000000000000000000000000000000000000000000000")
+       key, _ = crypto.GenerateSecp256k1Key()
+       signer = crypto.NewDefaultSigner(key)
+       callCount = int32(0)
+   )
+   pushSyncService := pushsyncmock.New(func(ctx context.Context, chunk swarm.Chunk) (*pushsync.Receipt, error) {
+       atomic.AddInt32(&callCount, 1)
+       signature, _ := signer.Sign(chunk.Address().Bytes())
+       receipt := &pushsync.Receipt{
+           Address: swarm.NewAddress(chunk.Address().Bytes()),
+           Signature: signature,
+       }
+       return receipt, nil
+   })
+   // create the pivot peer pusher with depth 31; this makes sure that
+   // virtually any receipt generated by the random key will be considered
+   // too shallow
+   _, ps, storer := createPusher(t, pivotPeer, pushSyncService, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(31))
+   defer ps.Close()
+   // generate a chunk at PO 1 with closestPeer, meaning that we get a
+   // receipt which is shallower than the pivot peer's depth, resulting
+   // in retries
+   chunk := testingc.GenerateTestRandomChunkAt(closestPeer, 1)
+   _, err := storer.Put(context.Background(), storage.ModePutUpload, chunk)
+   if err != nil {
+       t.Fatal(err)
+   }
+   c := 0
+   for i := 0; i < 5; i++ {
+       c = int(atomic.LoadInt32(&callCount))
+       if c == *pusher.RetryCount {
+           return
+       }
+       if c > *pusher.RetryCount {
+           t.Fatalf("too many retries. got %d want %d", c, *pusher.RetryCount)
+       }
+       time.Sleep(1 * time.Second)
+   }
+   t.Fatalf("timed out waiting for retries. got %d want %d", c, *pusher.RetryCount)
+}
func createPusher(t *testing.T, addr swarm.Address, pushSyncService pushsync.PushSyncer, mockOpts ...mock.Option) (*tags.Tags, *pusher.Service, *Store) {
    t.Helper()
    logger := logging.New(ioutil.Discard, 0)
@@ -488,7 +488,7 @@ type pushResult struct {
    attempted bool
}
-const failureThreshold = 3
+const failureThreshold = 2
type failedRequestCache struct {
    mtx sync.RWMutex
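For context on the failureThreshold change above: pushsync keeps a failed-request cache keyed by peer and chunk, and once a pair accumulates failureThreshold failures the peer stops being useful for that chunk and is skipped, which is why the tests below now expect exclusion after the second failure. The cache internals are not part of this diff; the following is only a rough sketch of the idea, with assumed field names and plain string keys instead of swarm addresses:

    package main

    import (
        "fmt"
        "sync"
    )

    const failureThreshold = 2

    // failedRequestCache is a sketch, not the real pushsync type: it only
    // illustrates how a failure counter and a threshold interact.
    type failedRequestCache struct {
        mtx      sync.RWMutex
        failures map[string]int
    }

    func (c *failedRequestCache) RecordFailure(peer, chunk string) {
        c.mtx.Lock()
        defer c.mtx.Unlock()
        c.failures[peer+chunk]++
    }

    // RecordSuccess resets the counter; the real cache behaves similarly, since
    // the tests expect a peer to become useful again after an intermittent success.
    func (c *failedRequestCache) RecordSuccess(peer, chunk string) {
        c.mtx.Lock()
        defer c.mtx.Unlock()
        delete(c.failures, peer+chunk)
    }

    // Useful reports whether the peer should still be tried for this chunk.
    func (c *failedRequestCache) Useful(peer, chunk string) bool {
        c.mtx.RLock()
        defer c.mtx.RUnlock()
        return c.failures[peer+chunk] < failureThreshold
    }

    func main() {
        cache := &failedRequestCache{failures: make(map[string]int)}
        cache.RecordFailure("peer", "chunk")
        fmt.Println(cache.Useful("peer", "chunk")) // true after the first failure
        cache.RecordFailure("peer", "chunk")
        fmt.Println(cache.Useful("peer", "chunk")) // false after the second failure
    }
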
@@ -873,14 +873,8 @@ func TestFailureRequestCache(t *testing.T) {
    }
    cache.RecordFailure(peer, chunk)
-   if !cache.Useful(peer, chunk) {
-       t.Fatal("incorrect cache state after 2nd failure")
-   }
-   cache.RecordFailure(peer, chunk)
    if cache.Useful(peer, chunk) {
-       t.Fatal("peer should no longer be useful")
+       t.Fatal("incorrect cache state after 2nd failure")
    }
})

@@ -901,7 +895,6 @@ func TestFailureRequestCache(t *testing.T) {
    // the previous failed request and the peer should still be useful after
    // more failures
    cache.RecordFailure(peer, chunk)
-   cache.RecordFailure(peer, chunk)
    if !cache.Useful(peer, chunk) {
        t.Fatal("peer should still be useful after intermittent success")

@@ -986,7 +979,7 @@ func TestPushChunkToClosestSkipFailed(t *testing.T) {
        t.Fatalf("tags initialization error")
    }
-   for i := 0; i < 3; i++ {
+   for i := 0; i < 2; i++ {
        _, err := psPivot.PushChunkToClosest(context.Background(), chunk)
        if err == nil {
            t.Fatal("expected error while pushing")
@@ -14,6 +14,7 @@ import (
type mock struct {
    peers []swarm.Address
+   depth uint8
    closestPeer swarm.Address
    closestPeerErr error
    peersErr error

@@ -35,6 +36,12 @@ func WithAddPeersErr(err error) Option {
    })
}
+func WithNeighborhoodDepth(dd uint8) Option {
+   return optionFunc(func(d *mock) {
+       d.depth = dd
+   })
+}
func WithClosestPeer(addr swarm.Address) Option {
    return optionFunc(func(d *mock) {
        d.closestPeer = addr

@@ -136,8 +143,8 @@ func (d *mock) SubscribePeersChange() (c <-chan struct{}, unsubscribe func()) {
    return c, unsubscribe
}
-func (*mock) NeighborhoodDepth() uint8 {
-   return 0
+func (m *mock) NeighborhoodDepth() uint8 {
+   return m.depth
}
func (m *mock) IsWithinDepth(addr swarm.Address) bool {
@@ -25,7 +25,7 @@ type Driver interface {
    ClosestPeerer
    EachPeerer
    EachNeighbor
-   NeighborhoodDepth() uint8
+   NeighborhoodDepther
    SubscribePeersChange() (c <-chan struct{}, unsubscribe func())
    io.Closer
    Halter

@@ -137,3 +137,7 @@ type Halter interface {
    // while allowing it to still run.
    Halt()
}
+
+type NeighborhoodDepther interface {
+   NeighborhoodDepth() uint8
+}
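Narrowing the Driver interface this way means components such as the pusher can depend on NeighborhoodDepther alone rather than on the full topology driver, which is also what keeps the one-line mock above small. A short sketch of writing against the narrow interface; the types and helper function here are illustrative only, not part of the commit:

    package main

    import "fmt"

    // NeighborhoodDepther mirrors the interface introduced in topology.go.
    type NeighborhoodDepther interface {
        NeighborhoodDepth() uint8
    }

    // staticDepth is a trivial stand-in, similar in spirit to the mock's
    // WithNeighborhoodDepth option.
    type staticDepth uint8

    func (s staticDepth) NeighborhoodDepth() uint8 { return uint8(s) }

    // receiptDeepEnough applies the pusher's acceptance rule: a receipt at
    // proximity order po counts as deep enough only when po >= depth.
    func receiptDeepEnough(d NeighborhoodDepther, po uint8) bool {
        return po >= d.NeighborhoodDepth()
    }

    func main() {
        depther := staticDepth(4)
        fmt.Println(receiptDeepEnough(depther, 2)) // false: shallow receipt
        fmt.Println(receiptDeepEnough(depther, 5)) // true
    }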