Commit 8598ed22 authored by Alok Nerurkar, committed by GitHub

refactor(pushsync): Retry improvements (#1662)

- Don't account for failed attempts during retries
- LRU Cache to remember failed traces and select better peers
- Metrics
parent 553a1b56
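
Before the diff, a minimal, self-contained sketch of the failed-request cache this change introduces: a bounded LRU keyed by peer/chunk that counts failures, so the retry loop can skip peers that recently failed for the same chunk. The names failureThreshold, Useful, RecordFailure and RecordSuccess mirror the pushsync diff below, but the string keys, constructor signature and main function here are illustrative stand-ins, not the production code.

package main

import (
    "fmt"

    lru "github.com/hashicorp/golang-lru"
)

// After this many recorded failures a peer/chunk pair is skipped.
const failureThreshold = 3

// failedRequestCache counts failed push attempts per peer/chunk key in a
// bounded LRU (old entries are evicted automatically).
type failedRequestCache struct {
    cache *lru.Cache // hashicorp/golang-lru's Cache is safe for concurrent use
}

func newFailedRequestCache() (*failedRequestCache, error) {
    c, err := lru.New(1000)
    if err != nil {
        return nil, err
    }
    return &failedRequestCache{cache: c}, nil
}

func key(peer, chunk string) string { return peer + "/" + chunk }

// RecordFailure counts one more failed push for the peer/chunk pair.
func (f *failedRequestCache) RecordFailure(peer, chunk string) {
    k := key(peer, chunk)
    if v, ok := f.cache.Get(k); ok {
        f.cache.Add(k, v.(int)+1)
        return
    }
    f.cache.Add(k, 1)
}

// RecordSuccess forgets earlier failures for the pair.
func (f *failedRequestCache) RecordSuccess(peer, chunk string) {
    f.cache.Remove(key(peer, chunk))
}

// Useful reports whether the peer is still worth trying for this chunk.
func (f *failedRequestCache) Useful(peer, chunk string) bool {
    v, ok := f.cache.Get(key(peer, chunk))
    return !ok || v.(int) < failureThreshold
}

func main() {
    cache, err := newFailedRequestCache()
    if err != nil {
        panic(err)
    }
    // Selection-loop sketch: a peer that failed three times for the same
    // chunk is skipped; a later success clears its history.
    for i := 0; i < 3; i++ {
        cache.RecordFailure("peer-a", "chunk-1")
    }
    fmt.Println(cache.Useful("peer-a", "chunk-1")) // false: skip this peer
    cache.RecordSuccess("peer-a", "chunk-1")
    fmt.Println(cache.Useful("peer-a", "chunk-1")) // true again
}

The real implementation in the diff keys the cache on swarm.Address values, guards it with an RWMutex, and is consulted from pushToClosest before every send attempt.
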
@@ -20,6 +20,7 @@ require (
    github.com/gorilla/mux v1.7.4
    github.com/gorilla/websocket v1.4.2
    github.com/hashicorp/go-multierror v1.1.1
+   github.com/hashicorp/golang-lru v0.5.4
    github.com/kardianos/service v1.2.0
    github.com/koron/go-ssdp v0.0.2 // indirect
    github.com/kr/text v0.2.0 // indirect
...
@@ -30,11 +30,13 @@ var (
)

type Recorder struct {
    base               swarm.Address
    records            map[string][]*Record
    recordsMu          sync.Mutex
    protocols          []p2p.ProtocolSpec
    middlewares        []p2p.HandlerMiddleware
+   streamErr          func(swarm.Address, string, string, string) error
+   protocolsWithPeers map[string]p2p.ProtocolSpec
}

func WithProtocols(protocols ...p2p.ProtocolSpec) Option {
@@ -43,6 +45,12 @@ func WithProtocols(protocols ...p2p.ProtocolSpec) Option {
    })
}

+func WithPeerProtocols(protocolsWithPeers map[string]p2p.ProtocolSpec) Option {
+   return optionFunc(func(r *Recorder) {
+       r.protocolsWithPeers = protocolsWithPeers
+   })
+}
+
func WithMiddlewares(middlewares ...p2p.HandlerMiddleware) Option {
    return optionFunc(func(r *Recorder) {
        r.middlewares = append(r.middlewares, middlewares...)
@@ -55,6 +63,12 @@ func WithBaseAddr(a swarm.Address) Option {
    })
}

+func WithStreamError(streamErr func(swarm.Address, string, string, string) error) Option {
+   return optionFunc(func(r *Recorder) {
+       r.streamErr = streamErr
+   })
+}
+
func New(opts ...Option) *Recorder {
    r := &Recorder{
        records: make(map[string][]*Record),
@@ -73,6 +87,12 @@ func (r *Recorder) SetProtocols(protocols ...p2p.ProtocolSpec) {
}

func (r *Recorder) NewStream(ctx context.Context, addr swarm.Address, h p2p.Headers, protocolName, protocolVersion, streamName string) (p2p.Stream, error) {
+   if r.streamErr != nil {
+       err := r.streamErr(addr, protocolName, protocolVersion, streamName)
+       if err != nil {
+           return nil, err
+       }
+   }
    recordIn := newRecord()
    recordOut := newRecord()
    streamOut := newStream(recordIn, recordOut)
@@ -80,16 +100,20 @@ func (r *Recorder) NewStream(ctx context.Context, addr swarm.Address, h p2p.Head
    var handler p2p.HandlerFunc
    var headler p2p.HeadlerFunc
-   for _, p := range r.protocols {
-       if p.Name == protocolName && p.Version == protocolVersion {
-           for _, s := range p.StreamSpecs {
-               if s.Name == streamName {
-                   handler = s.Handler
-                   headler = s.Headler
-               }
-           }
-       }
-   }
+   peerHandlers, ok := r.protocolsWithPeers[addr.String()]
+   if !ok {
+       for _, p := range r.protocols {
+           if p.Name == protocolName && p.Version == protocolVersion {
+               peerHandlers = p
+           }
+       }
+   }
+   for _, s := range peerHandlers.StreamSpecs {
+       if s.Name == streamName {
+           handler = s.Handler
+           headler = s.Headler
+       }
+   }
    if handler == nil {
        return nil, ErrStreamNotSupported
    }
...
@@ -579,6 +579,185 @@ func TestRecorder_recordErr(t *testing.T) {
    }, testErr)
}

func TestRecorder_withPeerProtocols(t *testing.T) {
peer1 := swarm.MustParseHexAddress("1000000000000000000000000000000000000000000000000000000000000000")
peer2 := swarm.MustParseHexAddress("2000000000000000000000000000000000000000000000000000000000000000")
recorder := streamtest.New(
streamtest.WithPeerProtocols(map[string]p2p.ProtocolSpec{
peer1.String(): newTestProtocol(func(_ context.Context, peer p2p.Peer, stream p2p.Stream) error {
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
if _, err := rw.ReadString('\n'); err != nil {
return err
}
if _, err := rw.WriteString("handler 1\n"); err != nil {
return err
}
if err := rw.Flush(); err != nil {
return err
}
return nil
}),
peer2.String(): newTestProtocol(func(_ context.Context, peer p2p.Peer, stream p2p.Stream) error {
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
if _, err := rw.ReadString('\n'); err != nil {
return err
}
if _, err := rw.WriteString("handler 2\n"); err != nil {
return err
}
if err := rw.Flush(); err != nil {
return err
}
return nil
}),
}),
)
request := func(ctx context.Context, s p2p.Streamer, address swarm.Address) error {
stream, err := s.NewStream(ctx, address, nil, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
return fmt.Errorf("new stream: %w", err)
}
defer stream.Close()
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
if _, err := rw.WriteString("req\n"); err != nil {
return err
}
if err := rw.Flush(); err != nil {
return err
}
_, err = rw.ReadString('\n')
return err
}
err := request(context.Background(), recorder, peer1)
if err != nil {
t.Fatal(err)
}
records, err := recorder.Records(peer1, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
t.Fatal(err)
}
testRecords(t, records, [][2]string{
{
"req\n",
"handler 1\n",
},
}, nil)
err = request(context.Background(), recorder, peer2)
if err != nil {
t.Fatal(err)
}
records, err = recorder.Records(peer2, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
t.Fatal(err)
}
testRecords(t, records, [][2]string{
{
"req\n",
"handler 2\n",
},
}, nil)
}
func TestRecorder_withStreamError(t *testing.T) {
peer1 := swarm.MustParseHexAddress("1000000000000000000000000000000000000000000000000000000000000000")
peer2 := swarm.MustParseHexAddress("2000000000000000000000000000000000000000000000000000000000000000")
testErr := errors.New("dummy stream error")
recorder := streamtest.New(
streamtest.WithPeerProtocols(map[string]p2p.ProtocolSpec{
peer1.String(): newTestProtocol(func(_ context.Context, peer p2p.Peer, stream p2p.Stream) error {
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
if _, err := rw.ReadString('\n'); err != nil {
return err
}
if _, err := rw.WriteString("handler 1\n"); err != nil {
return err
}
if err := rw.Flush(); err != nil {
return err
}
return nil
}),
peer2.String(): newTestProtocol(func(_ context.Context, peer p2p.Peer, stream p2p.Stream) error {
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
if _, err := rw.ReadString('\n'); err != nil {
return err
}
if _, err := rw.WriteString("handler 2\n"); err != nil {
return err
}
if err := rw.Flush(); err != nil {
return err
}
return nil
}),
}),
streamtest.WithStreamError(func(addr swarm.Address, _, _, _ string) error {
if addr.String() == peer1.String() {
return testErr
}
return nil
}),
)
request := func(ctx context.Context, s p2p.Streamer, address swarm.Address) error {
stream, err := s.NewStream(ctx, address, nil, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
return fmt.Errorf("new stream: %w", err)
}
defer stream.Close()
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
if _, err := rw.WriteString("req\n"); err != nil {
return err
}
if err := rw.Flush(); err != nil {
return err
}
_, err = rw.ReadString('\n')
return err
}
err := request(context.Background(), recorder, peer1)
if err == nil {
t.Fatal("expected error on NewStream for peer")
}
err = request(context.Background(), recorder, peer2)
if err != nil {
t.Fatal(err)
}
records, err := recorder.Records(peer2, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
t.Fatal(err)
}
testRecords(t, records, [][2]string{
{
"req\n",
"handler 2\n",
},
}, nil)
}
const (
    testProtocolName    = "testing"
    testProtocolVersion = "1.0.1"
...
@@ -5,7 +5,8 @@
package pushsync

var (
    ProtocolName       = protocolName
    ProtocolVersion    = protocolVersion
    StreamName         = streamName
+   FailedRequestCache = newFailedRequestCache
)
@@ -10,11 +10,14 @@ import (
)

type metrics struct {
    TotalSent               prometheus.Counter
    TotalReceived           prometheus.Counter
    TotalErrors             prometheus.Counter
    TotalReplicated         prometheus.Counter
    TotalReplicatedError    prometheus.Counter
+   TotalSendAttempts       prometheus.Counter
+   TotalFailedSendAttempts prometheus.Counter
+   FailedCacheHits         *prometheus.CounterVec
}

func newMetrics() metrics {
@@ -51,6 +54,27 @@ func newMetrics() metrics {
            Name: "total_replication_error",
            Help: "Total no of failed replication chunks.",
        }),
+       TotalSendAttempts: prometheus.NewCounter(prometheus.CounterOpts{
+           Namespace: m.Namespace,
+           Subsystem: subsystem,
+           Name:      "total_send_attempts",
+           Help:      "Total no of attempts to push chunk.",
+       }),
+       TotalFailedSendAttempts: prometheus.NewCounter(prometheus.CounterOpts{
+           Namespace: m.Namespace,
+           Subsystem: subsystem,
+           Name:      "total_failed_send_attempts",
+           Help:      "Total no of failed attempts to push chunk.",
+       }),
+       FailedCacheHits: prometheus.NewCounterVec(
+           prometheus.CounterOpts{
+               Namespace: m.Namespace,
+               Subsystem: subsystem,
+               Name:      "failed_cache_hits",
+               Help:      "FailedRequestCache hits",
+           },
+           []string{"peer", "chunk"},
+       ),
    }
}
...
@@ -10,6 +10,7 @@ import (
    "context"
    "errors"
    "fmt"
+   "sync"
    "time"

    "github.com/ethersphere/bee/pkg/accounting"
@@ -26,6 +27,7 @@ import (
    "github.com/ethersphere/bee/pkg/tags"
    "github.com/ethersphere/bee/pkg/topology"
    "github.com/ethersphere/bee/pkg/tracing"
+   "github.com/hashicorp/golang-lru"
    opentracing "github.com/opentracing/opentracing-go"
)

@@ -36,11 +38,13 @@ const (
)

const (
-   maxPeers = 5
+   maxPeers    = 3
+   maxAttempts = 16
)

var (
    ErrOutOfDepthReplication = errors.New("replication outside of the neighborhood")
+   ErrNoPush                = errors.New("could not push chunk")
)
type PushSyncer interface {
@@ -67,9 +71,10 @@ type PushSync struct {
    validStamp     func(swarm.Chunk, []byte) (swarm.Chunk, error)
    signer         crypto.Signer
    isFullNode     bool
+   failedRequests *failedRequestCache
}

-var timeToLive = 5 * time.Second // request time to live
+var defaultTTL = 20 * time.Second // request time to live
var timeToWaitForPushsyncToNeighbor = 3 * time.Second // time to wait to get a receipt for a chunk
var nPeersToPushsync = 3 // number of peers to replicate to as receipt is sent upstream

@@ -89,6 +94,7 @@ func New(address swarm.Address, streamer p2p.StreamerDisconnecter, storer storag
        tracer:     tracer,
        validStamp: validStamp,
        signer:     signer,
+       failedRequests: newFailedRequestCache(),
    }
    return ps
}
@@ -110,7 +116,7 @@ func (s *PushSync) Protocol() p2p.ProtocolSpec {
// If the current node is the destination, it stores in the local store and sends a receipt.
func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (err error) {
    w, r := protobuf.NewWriterAndReader(stream)
-   ctx, cancel := context.WithTimeout(ctx, timeToLive)
+   ctx, cancel := context.WithTimeout(ctx, defaultTTL)
    defer cancel()
    defer func() {
        if err != nil {
@@ -174,7 +180,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
    span, _, ctx := ps.tracer.StartSpanFromContext(ctx, "pushsync-handler", ps.logger, opentracing.Tag{Key: "address", Value: chunk.Address().String()})
    defer span.Finish()

-   receipt, err := ps.pushToClosest(ctx, chunk)
+   receipt, err := ps.pushToClosest(ctx, chunk, false)
    if err != nil {
        if errors.Is(err, topology.ErrWantSelf) {
            _, err = ps.storer.Put(ctx, storage.ModePutSync, chunk)
@@ -300,7 +306,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
// a receipt from that peer and returns error or nil based on the receiving and
// the validity of the receipt.
func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Receipt, error) {
-   r, err := ps.pushToClosest(ctx, ch)
+   r, err := ps.pushToClosest(ctx, ch, true)
    if err != nil {
        return nil, err
    }
@@ -309,41 +315,23 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
        Signature: r.Signature}, nil
}
-func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk) (rr *pb.Receipt, reterr error) {
+func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllowed bool) (*pb.Receipt, error) {
    span, logger, ctx := ps.tracer.StartSpanFromContext(ctx, "push-closest", ps.logger, opentracing.Tag{Key: "address", Value: ch.Address().String()})
    defer span.Finish()

    var (
        skipPeers []swarm.Address
-       lastErr   error
+       allowedRetries = 1
+       resultC        = make(chan *pushResult)
+       includeSelf    = ps.isFullNode
    )

-   stamp, err := ch.Stamp().MarshalBinary()
-   if err != nil {
-       return nil, err
-   }
-
-   deferFuncs := make([]func(), 0)
-   defersFn := func() {
-       if len(deferFuncs) > 0 {
-           for _, deferFn := range deferFuncs {
-               deferFn()
-           }
-           deferFuncs = deferFuncs[:0]
-       }
-   }
-   defer defersFn()
-
-   for i := 0; i < maxPeers; i++ {
-       select {
-       case <-ctx.Done():
-           return nil, ctx.Err()
-       default:
-       }
-
-       defersFn()
-
-       includeSelf := ps.isFullNode
-
+   if retryAllowed {
+       // only originator retries
+       allowedRetries = maxPeers
+   }
+
+   for i := maxAttempts; allowedRetries > 0 && i > 0; i-- {
        // find the next closest peer
        peer, err := ps.topologyDriver.ClosestPeer(ch.Address(), includeSelf, skipPeers...)
        if err != nil {
@@ -352,86 +340,165 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk) (rr *pb.R
            // if ErrWantSelf is returned, it means we are the closest peer.
            return nil, fmt.Errorf("closest peer: %w", err)
        }

-       // save found peer (to be skipped if there is some error with him)
+       if !ps.failedRequests.Useful(peer, ch.Address()) {
+           skipPeers = append(skipPeers, peer)
+           ps.metrics.FailedCacheHits.WithLabelValues(peer.String(), ch.Address().String()).Inc()
+           continue
+       }
+
        skipPeers = append(skipPeers, peer)

-       deferFuncs = append(deferFuncs, func() {
-           if lastErr != nil {
-               ps.metrics.TotalErrors.Inc()
-               logger.Errorf("pushsync: %v", lastErr)
-           }
-       })
-
-       // compute the price we pay for this receipt and reserve it for the rest of this function
-       receiptPrice := ps.pricer.PeerPrice(peer, ch.Address())
-
-       streamer, err := ps.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
-       if err != nil {
-           lastErr = fmt.Errorf("new stream for peer %s: %w", peer.String(), err)
-           continue
-       }
-       deferFuncs = append(deferFuncs, func() { go streamer.FullClose() })
-
-       // Reserve to see whether we can make the request
-       err = ps.accounting.Reserve(ctx, peer, receiptPrice)
-       if err != nil {
-           return nil, fmt.Errorf("reserve balance for peer %s: %w", peer.String(), err)
-       }
-       deferFuncs = append(deferFuncs, func() { ps.accounting.Release(peer, receiptPrice) })
-
-       w, r := protobuf.NewWriterAndReader(streamer)
-       ctxd, canceld := context.WithTimeout(ctx, timeToLive)
-       deferFuncs = append(deferFuncs, func() { canceld() })
-       if err := w.WriteMsgWithContext(ctxd, &pb.Delivery{
-           Address: ch.Address().Bytes(),
-           Data:    ch.Data(),
-           Stamp:   stamp,
-       }); err != nil {
-           _ = streamer.Reset()
-           lastErr = fmt.Errorf("chunk %s deliver to peer %s: %w", ch.Address().String(), peer.String(), err)
-           continue
-       }
-
-       ps.metrics.TotalSent.Inc()
-
-       // if you manage to get a tag, just increment the respective counter
-       t, err := ps.tagger.Get(ch.TagID())
-       if err == nil && t != nil {
-           err = t.Inc(tags.StateSent)
-           if err != nil {
-               lastErr = fmt.Errorf("tag %d increment: %v", ch.TagID(), err)
-               err = lastErr
-               return nil, err
-           }
-       }
-
-       var receipt pb.Receipt
-       if err := r.ReadMsgWithContext(ctxd, &receipt); err != nil {
-           _ = streamer.Reset()
-           lastErr = fmt.Errorf("chunk %s receive receipt from peer %s: %w", ch.Address().String(), peer.String(), err)
-           continue
-       }
-
-       if !ch.Address().Equal(swarm.NewAddress(receipt.Address)) {
-           // if the receipt is invalid, try to push to the next peer
-           lastErr = fmt.Errorf("invalid receipt. chunk %s, peer %s", ch.Address().String(), peer.String())
-           continue
-       }
-
-       err = ps.accounting.Credit(peer, receiptPrice)
-       if err != nil {
-           return nil, err
-       }
-
-       return &receipt, nil
-   }
-
-   logger.Tracef("pushsync: chunk %s: reached %v peers", ch.Address(), maxPeers)
-
-   if lastErr != nil {
-       return nil, lastErr
-   }
-
-   return nil, topology.ErrNotFound
+       ps.metrics.TotalSendAttempts.Inc()
+
+       go func(peer swarm.Address, ch swarm.Chunk) {
+           ctxd, canceld := context.WithTimeout(ctx, defaultTTL)
+           defer canceld()
+
+           r, attempted, err := ps.pushPeer(ctxd, peer, ch)
+           // attempted is true if we get past accounting and actually attempt
+           // to send the request to the peer. If we dont get past accounting, we
+           // should not count the retry and try with a different peer again
+           if attempted {
+               allowedRetries--
+           }
+           if err != nil {
+               logger.Debugf("could not push to peer %s: %v", peer, err)
+               resultC <- &pushResult{err: err, attempted: attempted}
+               return
+           }
+           select {
+           case resultC <- &pushResult{receipt: r}:
+           case <-ctx.Done():
+           }
+       }(peer, ch)
+
+       select {
+       case r := <-resultC:
+           if r.receipt != nil {
+               ps.failedRequests.RecordSuccess(peer, ch.Address())
+               return r.receipt, nil
+           }
+           if r.err != nil && r.attempted {
+               ps.failedRequests.RecordFailure(peer, ch.Address())
+               ps.metrics.TotalFailedSendAttempts.Inc()
+           }
+           // proceed to retrying if applicable
+       case <-ctx.Done():
+           return nil, ctx.Err()
+       }
+   }
+
+   return nil, ErrNoPush
+}
+
+func (ps *PushSync) pushPeer(ctx context.Context, peer swarm.Address, ch swarm.Chunk) (*pb.Receipt, bool, error) {
+   // compute the price we pay for this receipt and reserve it for the rest of this function
+   receiptPrice := ps.pricer.PeerPrice(peer, ch.Address())
+
+   // Reserve to see whether we can make the request
+   err := ps.accounting.Reserve(ctx, peer, receiptPrice)
+   if err != nil {
+       return nil, false, fmt.Errorf("reserve balance for peer %s: %w", peer, err)
+   }
+   defer ps.accounting.Release(peer, receiptPrice)
+
+   stamp, err := ch.Stamp().MarshalBinary()
+   if err != nil {
+       return nil, false, err
+   }
+
+   streamer, err := ps.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
+   if err != nil {
+       return nil, true, fmt.Errorf("new stream for peer %s: %w", peer, err)
+   }
+   defer streamer.Close()
+
+   w, r := protobuf.NewWriterAndReader(streamer)
+   if err := w.WriteMsgWithContext(ctx, &pb.Delivery{
+       Address: ch.Address().Bytes(),
+       Data:    ch.Data(),
+       Stamp:   stamp,
+   }); err != nil {
+       _ = streamer.Reset()
+       return nil, true, fmt.Errorf("chunk %s deliver to peer %s: %w", ch.Address(), peer, err)
+   }
+
+   ps.metrics.TotalSent.Inc()
+
+   // if you manage to get a tag, just increment the respective counter
+   t, err := ps.tagger.Get(ch.TagID())
+   if err == nil && t != nil {
+       err = t.Inc(tags.StateSent)
+       if err != nil {
+           return nil, true, fmt.Errorf("tag %d increment: %v", ch.TagID(), err)
+       }
+   }
+
+   var receipt pb.Receipt
+   if err := r.ReadMsgWithContext(ctx, &receipt); err != nil {
+       _ = streamer.Reset()
+       return nil, true, fmt.Errorf("chunk %s receive receipt from peer %s: %w", ch.Address(), peer, err)
+   }
+
+   if !ch.Address().Equal(swarm.NewAddress(receipt.Address)) {
+       // if the receipt is invalid, try to push to the next peer
+       return nil, true, fmt.Errorf("invalid receipt. chunk %s, peer %s", ch.Address(), peer)
+   }
+
+   err = ps.accounting.Credit(peer, receiptPrice)
+   if err != nil {
+       return nil, true, err
+   }
+
+   return &receipt, true, nil
+}
+
+type pushResult struct {
+   receipt   *pb.Receipt
+   err       error
+   attempted bool
+}
+
+const failureThreshold = 3
+
+type failedRequestCache struct {
+   mtx   sync.RWMutex
+   cache *lru.Cache
+}
+
+func newFailedRequestCache() *failedRequestCache {
+   // not necessary to check error here if we use constant value
+   cache, _ := lru.New(1000)
+   return &failedRequestCache{cache: cache}
+}
+
+func keyForReq(peer swarm.Address, chunk swarm.Address) string {
+   return fmt.Sprintf("%s/%s", peer, chunk)
+}
+
+func (f *failedRequestCache) RecordFailure(peer swarm.Address, chunk swarm.Address) {
+   f.mtx.Lock()
+   defer f.mtx.Unlock()
+
+   val, found := f.cache.Get(keyForReq(peer, chunk))
+   if !found {
+       f.cache.Add(keyForReq(peer, chunk), 1)
+       return
+   }
+   count := val.(int) + 1
+   f.cache.Add(keyForReq(peer, chunk), count)
+}
+
+func (f *failedRequestCache) RecordSuccess(peer swarm.Address, chunk swarm.Address) {
+   f.mtx.Lock()
+   defer f.mtx.Unlock()
+   f.cache.Remove(keyForReq(peer, chunk))
+}
+
+func (f *failedRequestCache) Useful(peer swarm.Address, chunk swarm.Address) bool {
+   f.mtx.RLock()
+   val, found := f.cache.Get(keyForReq(peer, chunk))
+   f.mtx.RUnlock()
+   if !found {
+       return true
+   }
+   return val.(int) < failureThreshold
}

@@ -51,9 +51,9 @@ var (
    }))
)

-// TestSendChunkAndGetReceipt inserts a chunk as uploaded chunk in db. This triggers sending a chunk to the closest node
+// TestPushClosest inserts a chunk as uploaded chunk in db. This triggers sending a chunk to the closest node
// and expects a receipt. The message are intercepted in the outgoing stream to check for correctness.
-func TestSendChunkAndReceiveReceipt(t *testing.T) {
+func TestPushClosest(t *testing.T) {
    // chunk data to upload
    chunk := testingc.FixtureChunk("7000")
@@ -432,6 +432,7 @@ func TestPushChunkToNextClosest(t *testing.T) {
        defer lock.Unlock()
        if fail {
            fail = false
+           stream.Close()
            return errors.New("peer not reachable")
        }
@@ -519,6 +520,133 @@ func TestPushChunkToNextClosest(t *testing.T) {
    }
}

func TestPushChunkToClosestFailedAttemptRetry(t *testing.T) {
// chunk data to upload
chunk := testingc.FixtureChunk("7000")
// create a pivot node and a mocked closest node
pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") // base is 0000
peer1 := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
peer2 := swarm.MustParseHexAddress("5000000000000000000000000000000000000000000000000000000000000000")
peer3 := swarm.MustParseHexAddress("9000000000000000000000000000000000000000000000000000000000000000")
peer4 := swarm.MustParseHexAddress("4000000000000000000000000000000000000000000000000000000000000000")
// peer is the node responding to the chunk receipt message
// mock should return ErrWantSelf since there's no one to forward to
psPeer1, storerPeer1, _, peerAccounting1 := createPushSyncNode(t, peer1, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer storerPeer1.Close()
psPeer2, storerPeer2, _, peerAccounting2 := createPushSyncNode(t, peer2, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer storerPeer2.Close()
psPeer3, storerPeer3, _, peerAccounting3 := createPushSyncNode(t, peer3, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer storerPeer3.Close()
psPeer4, storerPeer4, _, peerAccounting4 := createPushSyncNode(t, peer4, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer storerPeer4.Close()
recorder := streamtest.New(
streamtest.WithProtocols(
psPeer1.Protocol(),
psPeer2.Protocol(),
psPeer3.Protocol(),
psPeer4.Protocol(),
),
streamtest.WithBaseAddr(pivotNode),
)
pivotAccounting := accountingmock.NewAccounting(
accountingmock.WithReserveFunc(func(ctx context.Context, peer swarm.Address, price uint64) error {
if peer.String() == peer4.String() {
return nil
}
return errors.New("unable to reserve")
}),
)
psPivot, storerPivot, pivotTags := createPushSyncNodeWithAccounting(t, pivotNode, defaultPrices, recorder, nil, defaultSigner, pivotAccounting, mock.WithPeers(peer1, peer2, peer3, peer4))
defer storerPivot.Close()
ta, err := pivotTags.Create(1)
if err != nil {
t.Fatal(err)
}
chunk = chunk.WithTagID(ta.Uid)
ta1, err := pivotTags.Get(ta.Uid)
if err != nil {
t.Fatal(err)
}
if ta1.Get(tags.StateSent) != 0 || ta1.Get(tags.StateSynced) != 0 {
t.Fatalf("tags initialization error")
}
// Trigger the sending of chunk to the closest node
receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk)
if err != nil {
t.Fatal(err)
}
if !chunk.Address().Equal(receipt.Address) {
t.Fatal("invalid receipt")
}
// this intercepts the outgoing delivery message
waitOnRecordAndTest(t, peer4, recorder, chunk.Address(), chunk.Data())
// this intercepts the incoming receipt message
waitOnRecordAndTest(t, peer4, recorder, chunk.Address(), nil)
ta2, err := pivotTags.Get(ta.Uid)
if err != nil {
t.Fatal(err)
}
// out of 4, 3 peers should return accounting error. So we should have effectively
// sent only 1 msg
if ta2.Get(tags.StateSent) != 1 {
t.Fatalf("tags error")
}
balance, err := pivotAccounting.Balance(peer4)
if err != nil {
t.Fatal(err)
}
if balance.Int64() != -int64(fixedPrice) {
t.Fatalf("unexpected balance on pivot. want %d got %d", -int64(fixedPrice), balance)
}
balance4, err := peerAccounting4.Balance(pivotNode)
if err != nil {
t.Fatal(err)
}
if balance4.Int64() != int64(fixedPrice) {
t.Fatalf("unexpected balance on peer4. want %d got %d", int64(fixedPrice), balance4)
}
for _, p := range []struct {
addr swarm.Address
acct accounting.Interface
}{
{peer1, peerAccounting1},
{peer2, peerAccounting2},
{peer3, peerAccounting3},
} {
bal, err := p.acct.Balance(p.addr)
if err != nil {
t.Fatal(err)
}
if bal.Int64() != 0 {
t.Fatalf("unexpected balance on %s. want %d got %d", p.addr, 0, bal)
}
}
}
// TestHandler expect a chunk from a node on a stream. It then stores the chunk in the local store and
// sends back a receipt. This is tested by intercepting the incoming stream for proper messages.
// It also sends the chunk to the closest peer and receives a receipt.
@@ -651,6 +779,13 @@ func TestSignsReceipt(t *testing.T) {
}

func createPushSyncNode(t *testing.T, addr swarm.Address, prices pricerParameters, recorder *streamtest.Recorder, unwrap func(swarm.Chunk), signer crypto.Signer, mockOpts ...mock.Option) (*pushsync.PushSync, *mocks.MockStorer, *tags.Tags, accounting.Interface) {
+   t.Helper()
+
+   mockAccounting := accountingmock.NewAccounting()
+   ps, mstorer, ts := createPushSyncNodeWithAccounting(t, addr, prices, recorder, unwrap, signer, mockAccounting, mockOpts...)
+   return ps, mstorer, ts, mockAccounting
+}
+
+func createPushSyncNodeWithAccounting(t *testing.T, addr swarm.Address, prices pricerParameters, recorder *streamtest.Recorder, unwrap func(swarm.Chunk), signer crypto.Signer, acct accounting.Interface, mockOpts ...mock.Option) (*pushsync.PushSync, *mocks.MockStorer, *tags.Tags) {
    t.Helper()
    logger := logging.New(ioutil.Discard, 0)
    storer := mocks.NewStorer()
@@ -658,7 +793,6 @@ func createPushSyncNode(t *testing.T, addr swarm.Address, prices pricerParameter
    mockTopology := mock.NewTopologyDriver(mockOpts...)
    mockStatestore := statestore.NewStateStore()
    mtag := tags.NewTags(mockStatestore, logger)
-   mockAccounting := accountingmock.NewAccounting()

    mockPricer := pricermock.NewMockService(prices.price, prices.peerPrice)
@@ -670,7 +804,7 @@ func createPushSyncNode(t *testing.T, addr swarm.Address, prices pricerParameter
        return ch.WithStamp(postage.NewStamp(nil, nil)), nil
    }

-   return pushsync.New(addr, recorderDisconnecter, storer, mockTopology, mtag, true, unwrap, validStamp, logger, mockAccounting, mockPricer, signer, nil), storer, mtag, mockAccounting
+   return pushsync.New(addr, recorderDisconnecter, storer, mockTopology, mtag, true, unwrap, validStamp, logger, acct, mockPricer, signer, nil), storer, mtag
}

func waitOnRecordAndTest(t *testing.T, peer swarm.Address, recorder *streamtest.Recorder, add swarm.Address, data []byte) {
@@ -723,6 +857,205 @@ func waitOnRecordAndTest(t *testing.T, peer swarm.Address, recorder *streamtest.
    }
}

func TestFailureRequestCache(t *testing.T) {
cache := pushsync.FailedRequestCache()
peer := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000")
chunk := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
t.Run("not useful after threshold", func(t *testing.T) {
if !cache.Useful(peer, chunk) {
t.Fatal("incorrect initial cache state")
}
cache.RecordFailure(peer, chunk)
if !cache.Useful(peer, chunk) {
t.Fatal("incorrect cache state after 1st failure")
}
cache.RecordFailure(peer, chunk)
if !cache.Useful(peer, chunk) {
t.Fatal("incorrect cache state after 2nd failure")
}
cache.RecordFailure(peer, chunk)
if cache.Useful(peer, chunk) {
t.Fatal("peer should no longer be useful")
}
})
t.Run("reset after success", func(t *testing.T) {
cache.RecordSuccess(peer, chunk)
if !cache.Useful(peer, chunk) {
t.Fatal("incorrect cache state after success")
}
cache.RecordFailure(peer, chunk)
if !cache.Useful(peer, chunk) {
t.Fatal("incorrect cache state after first failure")
}
cache.RecordSuccess(peer, chunk)
// success should remove the peer from failed cache. We should have swallowed
// the previous failed request and the peer should still be useful after
// more failures
cache.RecordFailure(peer, chunk)
cache.RecordFailure(peer, chunk)
if !cache.Useful(peer, chunk) {
t.Fatal("peer should still be useful after intermittent success")
}
})
}
func TestPushChunkToClosestSkipFailed(t *testing.T) {
// chunk data to upload
chunk := testingc.FixtureChunk("7000")
// create a pivot node and a mocked closest node
pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") // base is 0000
peer1 := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
peer2 := swarm.MustParseHexAddress("5000000000000000000000000000000000000000000000000000000000000000")
peer3 := swarm.MustParseHexAddress("4000000000000000000000000000000000000000000000000000000000000000")
peer4 := swarm.MustParseHexAddress("9000000000000000000000000000000000000000000000000000000000000000")
// peer is the node responding to the chunk receipt message
// mock should return ErrWantSelf since there's no one to forward to
psPeer1, storerPeer1, _, peerAccounting1 := createPushSyncNode(t, peer1, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer storerPeer1.Close()
psPeer2, storerPeer2, _, peerAccounting2 := createPushSyncNode(t, peer2, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer storerPeer2.Close()
psPeer3, storerPeer3, _, peerAccounting3 := createPushSyncNode(t, peer3, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer storerPeer3.Close()
psPeer4, storerPeer4, _, peerAccounting4 := createPushSyncNode(
t, peer4, defaultPrices, nil, nil, defaultSigner,
mock.WithClosestPeerErr(topology.ErrWantSelf),
mock.WithIsWithinFunc(func(_ swarm.Address) bool { return true }),
)
defer storerPeer4.Close()
var (
fail = true
lock sync.Mutex
)
recorder := streamtest.New(
streamtest.WithPeerProtocols(
map[string]p2p.ProtocolSpec{
peer1.String(): psPeer1.Protocol(),
peer2.String(): psPeer2.Protocol(),
peer3.String(): psPeer3.Protocol(),
peer4.String(): psPeer4.Protocol(),
},
),
streamtest.WithStreamError(
func(addr swarm.Address, _, _, _ string) error {
lock.Lock()
defer lock.Unlock()
if fail && addr.String() != peer4.String() {
return errors.New("peer not reachable")
}
return nil
},
),
streamtest.WithBaseAddr(pivotNode),
)
psPivot, storerPivot, pivotTags, pivotAccounting := createPushSyncNode(t, pivotNode, defaultPrices, recorder, nil, defaultSigner, mock.WithPeers(peer1, peer2, peer3, peer4))
defer storerPivot.Close()
ta, err := pivotTags.Create(1)
if err != nil {
t.Fatal(err)
}
chunk = chunk.WithTagID(ta.Uid)
ta1, err := pivotTags.Get(ta.Uid)
if err != nil {
t.Fatal(err)
}
if ta1.Get(tags.StateSent) != 0 || ta1.Get(tags.StateSynced) != 0 {
t.Fatalf("tags initialization error")
}
for i := 0; i < 3; i++ {
_, err := psPivot.PushChunkToClosest(context.Background(), chunk)
if err == nil {
t.Fatal("expected error while pushing")
}
}
// Trigger the sending of chunk to the closest node
receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk)
if err != nil {
t.Fatal(err)
}
if !chunk.Address().Equal(receipt.Address) {
t.Fatal("invalid receipt")
}
// this intercepts the outgoing delivery message
waitOnRecordAndTest(t, peer4, recorder, chunk.Address(), chunk.Data())
// this intercepts the incoming receipt message
waitOnRecordAndTest(t, peer4, recorder, chunk.Address(), nil)
ta2, err := pivotTags.Get(ta.Uid)
if err != nil {
t.Fatal(err)
}
// out of 4, 3 peers fail with the simulated stream error. So we should have effectively
// sent only 1 msg
if ta2.Get(tags.StateSent) != 1 {
t.Fatalf("tags error")
}
balance, err := pivotAccounting.Balance(peer4)
if err != nil {
t.Fatal(err)
}
if balance.Int64() != -int64(fixedPrice) {
t.Fatalf("unexpected balance on pivot. want %d got %d", -int64(fixedPrice), balance)
}
balance4, err := peerAccounting4.Balance(pivotNode)
if err != nil {
t.Fatal(err)
}
if balance4.Int64() != int64(fixedPrice) {
t.Fatalf("unexpected balance on peer4. want %d got %d", int64(fixedPrice), balance4)
}
for _, p := range []struct {
addr swarm.Address
acct accounting.Interface
}{
{peer1, peerAccounting1},
{peer2, peerAccounting2},
{peer3, peerAccounting3},
} {
bal, err := p.acct.Balance(p.addr)
if err != nil {
t.Fatal(err)
}
if bal.Int64() != 0 {
t.Fatalf("unexpected balance on %s. want %d got %d", p.addr, 0, bal)
}
}
}
func chanFunc(c chan<- struct{}) func(swarm.Chunk) {
    return func(_ swarm.Chunk) {
        c <- struct{}{}
...