Commit 40dac40d authored by Esad Akar, committed by GitHub

feat: pushsync sanctions closest node on push error (#2066)

parent 4a0558e2
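
At a high level, the change swaps the per-(peer, chunk) failure counter (failedRequestCache, backed by an LRU) for a time-bounded peer skip list: when an attempted push to the closest peer fails, that peer is sanctioned for a minute and the next retry resolves the next-closest peer instead. The sketch below condenses that flow; it is illustrative only, and names such as pushWithSanction, closest and push are hypothetical stand-ins for pushToClosest, the topology driver and the stream push in the diff that follows.

// Illustrative sketch only, not part of the commit. It is also not
// concurrency-safe; the real code guards its skip list with a mutex.
package main

import (
	"errors"
	"fmt"
	"time"
)

type peer string

// sanctionedUntil maps a peer to the time until which it is skipped.
var sanctionedUntil = map[peer]time.Time{}

// pushWithSanction walks successively closer peers, skipping peers that
// are currently sanctioned and sanctioning a peer whose attempted push
// failed, so the next retry falls through to the next-closest peer.
func pushWithSanction(chunk string,
	closest func(skip []peer) (peer, error),
	push func(peer, string) error) error {
	var skip []peer
	for i := 0; i < 16; i++ { // mirrors maxAttempts in the diff
		p, err := closest(skip)
		if err != nil {
			return err
		}
		skip = append(skip, p)
		if time.Now().Before(sanctionedUntil[p]) {
			continue // sanctioned: do not even attempt this peer
		}
		if err := push(p, chunk); err != nil {
			// sanction for a minute, like skipPeerExpiration below
			sanctionedUntil[p] = time.Now().Add(time.Minute)
			continue
		}
		return nil
	}
	return errors.New("no peer accepted the chunk")
}

func main() {
	err := pushWithSanction("chunk",
		func(skip []peer) (peer, error) { return peer(fmt.Sprintf("p%d", len(skip))), nil },
		func(peer, string) error { return nil })
	fmt.Println("pushed:", err == nil)
}
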
......@@ -5,8 +5,8 @@
package pushsync
var (
ProtocolName = protocolName
ProtocolVersion = protocolVersion
StreamName = streamName
FailedRequestCache = newFailedRequestCache
ProtocolName = protocolName
ProtocolVersion = protocolVersion
StreamName = streamName
NewPeerSkipList = newPeerSkipList
)
......@@ -17,7 +17,7 @@ type metrics struct {
TotalReplicatedError prometheus.Counter
TotalSendAttempts prometheus.Counter
TotalFailedSendAttempts prometheus.Counter
TotalFailedCacheHits prometheus.Counter
TotalSkippedPeers prometheus.Counter
}
func newMetrics() metrics {
......@@ -66,11 +66,11 @@ func newMetrics() metrics {
Name: "total_failed_send_attempts",
Help: "Total no of failed attempts to push chunk.",
}),
TotalFailedCacheHits: prometheus.NewCounter(prometheus.CounterOpts{
TotalSkippedPeers: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "total_failed_cache_hits",
Help: "Total FailedRequestCache hits",
Name: "total_skipped_peers",
Help: "Total no of peers skipped",
}),
}
}
......
......@@ -27,7 +27,6 @@ import (
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/tracing"
lru "github.com/hashicorp/golang-lru"
opentracing "github.com/opentracing/opentracing-go"
)
......@@ -38,8 +37,9 @@ const (
)
const (
maxPeers = 3
maxAttempts = 16
maxPeers = 3
maxAttempts = 16
skipPeerExpiration = time.Minute
)
var (
......@@ -72,8 +72,8 @@ type PushSync struct {
validStamp func(swarm.Chunk, []byte) (swarm.Chunk, error)
signer crypto.Signer
isFullNode bool
failedRequests *failedRequestCache
warmupPeriod time.Time
skipList *peerSkipList
}
var defaultTTL = 20 * time.Second // request time to live
......@@ -96,7 +96,7 @@ func New(address swarm.Address, streamer p2p.StreamerDisconnecter, storer storag
tracer: tracer,
validStamp: validStamp,
signer: signer,
failedRequests: newFailedRequestCache(),
skipList: newPeerSkipList(),
warmupPeriod: time.Now().Add(warmupTime),
}
return ps
......@@ -355,9 +355,16 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
Signature: r.Signature}, nil
}
type pushResult struct {
receipt *pb.Receipt
err error
attempted bool
}
func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllowed bool) (*pb.Receipt, error) {
span, logger, ctx := ps.tracer.StartSpanFromContext(ctx, "push-closest", ps.logger, opentracing.Tag{Key: "address", Value: ch.Address().String()})
defer span.Finish()
defer ps.skipList.PruneExpired()
var (
skipPeers []swarm.Address
......@@ -380,12 +387,14 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo
// if ErrWantSelf is returned, it means we are the closest peer.
return nil, fmt.Errorf("closest peer: %w", err)
}
if !ps.failedRequests.Useful(peer, ch.Address()) {
skipPeers = append(skipPeers, peer)
ps.metrics.TotalFailedCacheHits.Inc()
skipPeers = append(skipPeers, peer)
if ps.skipList.ShouldSkip(peer) {
ps.metrics.TotalSkippedPeers.Inc()
continue
}
skipPeers = append(skipPeers, peer)
ps.metrics.TotalSendAttempts.Inc()
go func(peer swarm.Address, ch swarm.Chunk) {
......@@ -412,15 +421,19 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo
select {
case r := <-resultC:
// receipt received for chunk
if r.receipt != nil {
ps.failedRequests.RecordSuccess(peer, ch.Address())
ps.skipList.PruneChunk(ch.Address())
return r.receipt, nil
}
if r.err != nil && r.attempted {
ps.failedRequests.RecordFailure(peer, ch.Address())
ps.metrics.TotalFailedSendAttempts.Inc()
// if the node has warmed up AND no other closer peer has been tried
if time.Now().After(ps.warmupPeriod) && !ps.skipList.HasChunk(ch.Address()) {
ps.skipList.Add(peer, ch.Address(), skipPeerExpiration)
}
}
// proceed to retrying if applicable
case <-ctx.Done():
return nil, ctx.Err()
}
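
The receipt/error handling above gates the sanction on three conditions, spelled out in the hypothetical helper below (shouldSanction is not part of the commit): the failure must come from an actual attempt, the node must be past its warmup period, and no peer may already be listed for this chunk (HasChunk), presumably so that a problem with the chunk or the request itself does not end up sanctioning every peer that is tried.

// Minimal sketch of the gate guarding skipList.Add above; shouldSanction
// is a hypothetical helper name, not part of the commit.
package sketch

import "time"

func shouldSanction(attempted bool, pushErr error, warmupEnd time.Time, chunkAlreadyListed bool) bool {
	return pushErr != nil && attempted &&
		time.Now().After(warmupEnd) &&
		!chunkAlreadyListed
}
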
......@@ -491,54 +504,69 @@ func (ps *PushSync) pushPeer(ctx context.Context, peer swarm.Address, ch swarm.C
return &receipt, true, nil
}
type pushResult struct {
receipt *pb.Receipt
err error
attempted bool
type peerSkipList struct {
sync.Mutex
chunks map[string]struct{}
skipExpiration map[string]time.Time
}
const failureThreshold = 2
type failedRequestCache struct {
mtx sync.RWMutex
cache *lru.Cache
func newPeerSkipList() *peerSkipList {
return &peerSkipList{
chunks: make(map[string]struct{}),
skipExpiration: make(map[string]time.Time),
}
}
func newFailedRequestCache() *failedRequestCache {
// not necessary to check error here if we use constant value
cache, _ := lru.New(1000)
return &failedRequestCache{cache: cache}
}
func (l *peerSkipList) Add(peer swarm.Address, chunk swarm.Address, expire time.Duration) {
l.Lock()
defer l.Unlock()
func keyForReq(peer swarm.Address, chunk swarm.Address) string {
return fmt.Sprintf("%s/%s", peer, chunk)
l.skipExpiration[peer.ByteString()] = time.Now().Add(expire)
l.chunks[chunk.ByteString()] = struct{}{}
}
func (f *failedRequestCache) RecordFailure(peer swarm.Address, chunk swarm.Address) {
f.mtx.Lock()
defer f.mtx.Unlock()
func (l *peerSkipList) ShouldSkip(peer swarm.Address) bool {
l.Lock()
defer l.Unlock()
peerStr := peer.ByteString()
val, found := f.cache.Get(keyForReq(peer, chunk))
if !found {
f.cache.Add(keyForReq(peer, chunk), 1)
return
if exp, has := l.skipExpiration[peerStr]; has {
// entry is expired
if exp.Before(time.Now()) {
delete(l.skipExpiration, peerStr)
return false
} else {
return true
}
}
count := val.(int) + 1
f.cache.Add(keyForReq(peer, chunk), count)
return false
}
func (l *peerSkipList) HasChunk(chunk swarm.Address) bool {
l.Lock()
defer l.Unlock()
_, has := l.chunks[chunk.ByteString()]
return has
}
func (f *failedRequestCache) RecordSuccess(peer swarm.Address, chunk swarm.Address) {
f.mtx.Lock()
defer f.mtx.Unlock()
f.cache.Remove(keyForReq(peer, chunk))
func (l *peerSkipList) PruneChunk(chunk swarm.Address) {
l.Lock()
defer l.Unlock()
delete(l.chunks, chunk.ByteString())
}
func (f *failedRequestCache) Useful(peer swarm.Address, chunk swarm.Address) bool {
f.mtx.RLock()
val, found := f.cache.Get(keyForReq(peer, chunk))
f.mtx.RUnlock()
if !found {
return true
func (l *peerSkipList) PruneExpired() {
l.Lock()
defer l.Unlock()
now := time.Now()
for k, v := range l.skipExpiration {
if v.Before(now) {
delete(l.skipExpiration, k)
}
}
return val.(int) < failureThreshold
}
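
The skip list itself is a small mutex-guarded pair of maps. Below is a usage sketch as it might look from inside package pushsync, where the unexported constructor is visible (illustrative only; TestPeerSkipList further down drives the same calls through the exported NewPeerSkipList hook).

// Illustrative only: assumes it lives inside package pushsync.
package pushsync

import (
	"time"

	"github.com/ethersphere/bee/pkg/swarm"
)

func exampleSkipListUsage() bool {
	sl := newPeerSkipList()
	peer := swarm.MustParseHexAddress("5000000000000000000000000000000000000000000000000000000000000000")
	chunk := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")

	// Sanction the peer for one minute and remember the chunk it failed for.
	sl.Add(peer, chunk, time.Minute)

	skipped := sl.ShouldSkip(peer) // true until the minute elapses
	listed := sl.HasChunk(chunk)   // true: a peer is already listed for this chunk

	// On a successful receipt the chunk marker is cleared ...
	sl.PruneChunk(chunk)
	// ... and expired peer entries are swept by the deferred PruneExpired
	// in pushToClosest.
	sl.PruneExpired()

	return skipped && listed
}
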
......@@ -857,49 +857,35 @@ func waitOnRecordAndTest(t *testing.T, peer swarm.Address, recorder *streamtest.
}
}
func TestFailureRequestCache(t *testing.T) {
cache := pushsync.FailedRequestCache()
peer := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000")
chunk := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
t.Run("not useful after threshold", func(t *testing.T) {
if !cache.Useful(peer, chunk) {
t.Fatal("incorrect initial cache state")
}
func TestPeerSkipList(t *testing.T) {
cache.RecordFailure(peer, chunk)
if !cache.Useful(peer, chunk) {
t.Fatal("incorrect cache state after 1st failure")
}
skipList := pushsync.NewPeerSkipList()
cache.RecordFailure(peer, chunk)
if cache.Useful(peer, chunk) {
t.Fatal("incorrect cache state after 2nd failure")
}
})
addr1 := testingc.GenerateTestRandomChunk().Address()
addr2 := testingc.GenerateTestRandomChunk().Address()
t.Run("reset after success", func(t *testing.T) {
cache.RecordSuccess(peer, chunk)
if !cache.Useful(peer, chunk) {
t.Fatal("incorrect cache state after success")
}
skipList.Add(addr1, addr2, time.Millisecond*10)
cache.RecordFailure(peer, chunk)
if !skipList.ShouldSkip(addr1) {
t.Fatal("peer should be skipped")
}
if !cache.Useful(peer, chunk) {
t.Fatal("incorrect cache state after first failure")
}
if !skipList.HasChunk(addr2) {
t.Fatal("chunk is missing")
}
cache.RecordSuccess(peer, chunk)
// success should remove the peer from failed cache. We should have swallowed
// the previous failed request and the peer should still be useful after
// more failures
cache.RecordFailure(peer, chunk)
time.Sleep(time.Millisecond * 11)
if !cache.Useful(peer, chunk) {
t.Fatal("peer should still be useful after intermittent success")
}
})
skipList.PruneExpired()
if skipList.ShouldSkip(addr1) {
t.Fatal("peer should be not be skipped")
}
skipList.PruneChunk(addr2)
if skipList.HasChunk(addr2) {
t.Fatal("chunk should be missing")
}
}
func TestPushChunkToClosestSkipFailed(t *testing.T) {
......@@ -910,31 +896,25 @@ func TestPushChunkToClosestSkipFailed(t *testing.T) {
// create a pivot node and a mocked closest node
pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") // base is 0000
peer1 := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
peer2 := swarm.MustParseHexAddress("5000000000000000000000000000000000000000000000000000000000000000")
peer3 := swarm.MustParseHexAddress("4000000000000000000000000000000000000000000000000000000000000000")
peer4 := swarm.MustParseHexAddress("9000000000000000000000000000000000000000000000000000000000000000")
peer1 := swarm.MustParseHexAddress("5000000000000000000000000000000000000000000000000000000000000000")
peer2 := swarm.MustParseHexAddress("4000000000000000000000000000000000000000000000000000000000000000")
peer3 := swarm.MustParseHexAddress("3000000000000000000000000000000000000000000000000000000000000000")
// peer is the node responding to the chunk receipt message
// mock should return ErrWantSelf since there's no one to forward to
psPeer1, storerPeer1, _, peerAccounting1 := createPushSyncNode(t, peer1, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
psPeer1, storerPeer1, _, _ := createPushSyncNode(t, peer1, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer storerPeer1.Close()
psPeer2, storerPeer2, _, peerAccounting2 := createPushSyncNode(t, peer2, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
psPeer2, storerPeer2, _, _ := createPushSyncNode(t, peer2, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer storerPeer2.Close()
psPeer3, storerPeer3, _, peerAccounting3 := createPushSyncNode(t, peer3, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
psPeer3, storerPeer3, _, _ := createPushSyncNode(t, peer3, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer storerPeer3.Close()
psPeer4, storerPeer4, _, peerAccounting4 := createPushSyncNode(
t, peer4, defaultPrices, nil, nil, defaultSigner,
mock.WithClosestPeerErr(topology.ErrWantSelf),
mock.WithIsWithinFunc(func(_ swarm.Address) bool { return true }),
var (
lock sync.Mutex
count int
)
defer storerPeer4.Close()
triggerCount := 0
var lock sync.Mutex
recorder := streamtest.New(
streamtest.WithPeerProtocols(
......@@ -942,7 +922,6 @@ func TestPushChunkToClosestSkipFailed(t *testing.T) {
peer1.String(): psPeer1.Protocol(),
peer2.String(): psPeer2.Protocol(),
peer3.String(): psPeer3.Protocol(),
peer4.String(): psPeer4.Protocol(),
},
),
streamtest.WithMiddlewares(
......@@ -950,11 +929,11 @@ func TestPushChunkToClosestSkipFailed(t *testing.T) {
return func(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error {
lock.Lock()
defer lock.Unlock()
count++
if triggerCount < 6 {
triggerCount++
if count <= 2 {
stream.Close()
return errors.New("new error")
return errors.New("peer error")
}
if err := h(ctx, peer, stream); err != nil {
......@@ -969,32 +948,9 @@ func TestPushChunkToClosestSkipFailed(t *testing.T) {
streamtest.WithBaseAddr(pivotNode),
)
psPivot, storerPivot, pivotTags, pivotAccounting := createPushSyncNode(t, pivotNode, defaultPrices, recorder, nil, defaultSigner, mock.WithPeers(peer1, peer2, peer3, peer4))
psPivot, storerPivot, _, _ := createPushSyncNode(t, pivotNode, defaultPrices, recorder, nil, defaultSigner, mock.WithPeers(peer1, peer2, peer3))
defer storerPivot.Close()
ta, err := pivotTags.Create(1)
if err != nil {
t.Fatal(err)
}
chunk = chunk.WithTagID(ta.Uid)
ta1, err := pivotTags.Get(ta.Uid)
if err != nil {
t.Fatal(err)
}
if ta1.Get(tags.StateSent) != 0 || ta1.Get(tags.StateSynced) != 0 {
t.Fatalf("tags initialization error")
}
for i := 0; i < 2; i++ {
_, err := psPivot.PushChunkToClosest(context.Background(), chunk)
if err == nil {
t.Fatal("expected error while pushing")
}
}
// Trigger the sending of chunk to the closest node
receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk)
if err != nil {
t.Fatal(err)
......@@ -1005,55 +961,19 @@ func TestPushChunkToClosestSkipFailed(t *testing.T) {
}
// this intercepts the outgoing delivery message
waitOnRecordAndTest(t, peer4, recorder, chunk.Address(), chunk.Data())
waitOnRecordAndTest(t, peer3, recorder, chunk.Address(), chunk.Data())
// this intercepts the incoming receipt message
waitOnRecordAndTest(t, peer4, recorder, chunk.Address(), nil)
waitOnRecordAndTest(t, peer3, recorder, chunk.Address(), nil)
ta2, err := pivotTags.Get(ta.Uid)
if err != nil {
t.Fatal(err)
}
// out of 4, 3 peers should return accounting error. So we should have effectively
// sent only 1 msg
if ta2.Get(tags.StateSent) != 7 {
t.Fatalf("tags error")
}
balance, err := pivotAccounting.Balance(peer4)
if err != nil {
t.Fatal(err)
}
if balance.Int64() != -int64(fixedPrice) {
t.Fatalf("unexpected balance on pivot. want %d got %d", -int64(fixedPrice), balance)
}
balance4, err := peerAccounting4.Balance(pivotNode)
if err != nil {
t.Fatal(err)
want := false
if got, _ := storerPeer2.Has(context.Background(), chunk.Address()); got != want {
t.Fatalf("got %v, want %v", got, want)
}
if balance4.Int64() != int64(fixedPrice) {
t.Fatalf("unexpected balance on peer4. want %d got %d", int64(fixedPrice), balance4)
}
for _, p := range []struct {
addr swarm.Address
acct accounting.Interface
}{
{peer1, peerAccounting1},
{peer2, peerAccounting2},
{peer3, peerAccounting3},
} {
bal, err := p.acct.Balance(p.addr)
if err != nil {
t.Fatal(err)
}
if bal.Int64() != 0 {
t.Fatalf("unexpected balance on %s. want %d got %d", p.addr, 0, bal)
}
want = true
if got, _ := storerPeer3.Has(context.Background(), chunk.Address()); got != want {
t.Fatalf("got %v, want %v", got, want)
}
}
......