Commit 15e1b509 authored by Nemanja Zbiljić, committed by GitHub

Pushsync retry next peer after error (#947)

parent 11fb3bea
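
The gist of the change: instead of failing the whole push when the single closest peer errors out, PushChunkToClosest now loops up to maxPeers times, appending each attempted peer to a skip list that is passed back into ClosestPeer, so every retry targets the next-closest candidate; peers that time out are additionally blocklisted for blocklistDuration. A minimal, self-contained sketch of that retry-with-skip-list pattern (only maxPeers mirrors the diff; closestPeer, push, and the string addresses are illustrative stand-ins for the real bee types):

package main

import (
    "errors"
    "fmt"
)

const maxPeers = 5 // retry bound, as in the diff

var errNotFound = errors.New("no peer found")

// closestPeer stands in for topology.ClosestPeerer.ClosestPeer: it returns
// the first candidate not present in skip (peers assumed sorted by proximity).
func closestPeer(peers, skip []string) (string, error) {
    for _, p := range peers {
        skipped := false
        for _, s := range skip {
            if s == p {
                skipped = true
                break
            }
        }
        if !skipped {
            return p, nil
        }
    }
    return "", errNotFound
}

// push stands in for the actual chunk delivery; here the closest peer fails.
func push(peer string) error {
    if peer == "b" {
        return errors.New("peer not reachable")
    }
    return nil
}

func main() {
    peers := []string{"b", "c", "d"}
    var (
        skip    []string
        lastErr error
    )
    for i := 0; i < maxPeers; i++ {
        peer, err := closestPeer(peers, skip)
        if err != nil {
            break // no candidates left
        }
        skip = append(skip, peer) // never retry the same peer
        if err := push(peer); err != nil {
            lastErr = err
            continue // try the next-closest peer
        }
        fmt.Println("pushed to", peer) // prints: pushed to c
        return
    }
    fmt.Println("push failed:", lastErr)
}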
@@ -511,13 +511,19 @@ func (k *Kad) notifyPeerSig() {
 }
 
 // ClosestPeer returns the closest peer to a given address.
-func (k *Kad) ClosestPeer(addr swarm.Address) (swarm.Address, error) {
+func (k *Kad) ClosestPeer(addr swarm.Address, skipPeers ...swarm.Address) (swarm.Address, error) {
     if k.connectedPeers.Length() == 0 {
         return swarm.Address{}, topology.ErrNotFound
     }
 
     closest := k.base
     err := k.connectedPeers.EachBinRev(func(peer swarm.Address, po uint8) (bool, bool, error) {
+        for _, a := range skipPeers {
+            if a.Equal(peer) {
+                return false, false, nil
+            }
+        }
+
         dcmp, err := swarm.DistanceCmp(addr.Bytes(), closest.Bytes(), peer.Bytes())
         if err != nil {
             return false, false, err
...
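Two notes on the kademlia side of this hunk. Returning false, false, nil from the EachBinRev callback skips just the current peer and keeps iterating; by the topology package's convention the two booleans mean "stop" and "jump to next bin". The membership test is a linear scan of skipPeers per visited peer, which is fine because the skip list is bounded by pushsync's retry limit; if it could grow large, a set would be the usual alternative. A hypothetical helper along those lines (not part of the diff, which deliberately inlines the loop):

package kademlia // illustrative placement

import "github.com/ethersphere/bee/pkg/swarm"

// skipSet indexes skipPeers for O(1) membership checks. Using String() (the
// hex form of the address) as the map key is an assumption; the diff itself
// compares addresses with Equal instead.
func skipSet(skipPeers []swarm.Address) map[string]struct{} {
    set := make(map[string]struct{}, len(skipPeers))
    for _, a := range skipPeers {
        set[a.String()] = struct{}{}
    }
    return set
}

Inside the callback the check would then read: if _, ok := set[peer.String()]; ok { return false, false, nil }.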
@@ -64,7 +64,7 @@ func (m *Mock) AddPeers(ctx context.Context, addr ...swarm.Address) error {
     panic("not implemented") // TODO: Implement
 }
 
-func (m *Mock) ClosestPeer(addr swarm.Address) (peerAddr swarm.Address, err error) {
+func (m *Mock) ClosestPeer(addr swarm.Address, skipPeers ...swarm.Address) (peerAddr swarm.Address, err error) {
     panic("not implemented") // TODO: Implement
 }
...
@@ -29,6 +29,11 @@ const (
     streamName = "pushsync"
 )
 
+const (
+    maxPeers          = 5
+    blocklistDuration = time.Minute
+)
+
 type PushSyncer interface {
     PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Receipt, error)
 }
@@ -38,7 +43,7 @@ type Receipt struct {
 }
 
 type PushSync struct {
-    streamer      p2p.Streamer
+    streamer      p2p.StreamerDisconnecter
     storer        storage.Putter
     peerSuggester topology.ClosestPeerer
     tagger        *tags.Tags
@@ -52,7 +57,7 @@ type PushSync struct {
 var timeToWaitForReceipt = 3 * time.Second // time to wait to get a receipt for a chunk
 
-func New(streamer p2p.Streamer, storer storage.Putter, closestPeerer topology.ClosestPeerer, tagger *tags.Tags, validator swarm.ValidatorWithCallback, logger logging.Logger, accounting accounting.Interface, pricer accounting.Pricer, tracer *tracing.Tracer) *PushSync {
+func New(streamer p2p.StreamerDisconnecter, storer storage.Putter, closestPeerer topology.ClosestPeerer, tagger *tags.Tags, validator swarm.ValidatorWithCallback, logger logging.Logger, accounting accounting.Interface, pricer accounting.Pricer, tracer *tracing.Tracer) *PushSync {
     ps := &PushSync{
         streamer: streamer,
         storer:   storer,
@@ -229,8 +234,40 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Receipt, error) {
     span, _, ctx := ps.tracer.StartSpanFromContext(ctx, "pushsync-push", ps.logger, opentracing.Tag{Key: "address", Value: ch.Address().String()})
     defer span.Finish()
 
-    peer, err := ps.peerSuggester.ClosestPeer(ch.Address())
+    var (
+        skipPeers []swarm.Address
+        lastErr   error
+    )
+
+    deferFuncs := make([]func(), 0)
+    defersFn := func() {
+        if len(deferFuncs) > 0 {
+            for _, deferFn := range deferFuncs {
+                deferFn()
+            }
+            deferFuncs = deferFuncs[:0]
+        }
+    }
+    defer defersFn()
+
+    for i := 0; i < maxPeers; i++ {
+        select {
+        case <-ctx.Done():
+            return nil, ctx.Err()
+        default:
+        }
+
+        defersFn()
+
+        // find the next closest peer
+        peer, err := ps.peerSuggester.ClosestPeer(ch.Address(), skipPeers...)
         if err != nil {
+            if errors.Is(err, topology.ErrNotFound) {
+                // NOTE: needed for tests
+                skipPeers = append(skipPeers, peer)
+                continue
+            }
+
             if errors.Is(err, topology.ErrWantSelf) {
                 // this is to make sure that the sent number does not diverge from the synced counter
                 t, err := ps.tagger.Get(ch.TagID())
@@ -246,27 +283,38 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Receipt, error) {
                     Address: ch.Address(),
                 }, nil
             }
 
             return nil, fmt.Errorf("closest peer: %w", err)
         }
 
+        // save found peer (to be skipped if there is some error with it)
+        skipPeers = append(skipPeers, peer)
+
         // compute the price we pay for this receipt and reserve it for the rest of this function
         receiptPrice := ps.pricer.PeerPrice(peer, ch.Address())
         err = ps.accounting.Reserve(ctx, peer, receiptPrice)
         if err != nil {
             return nil, fmt.Errorf("reserve balance for peer %s: %w", peer.String(), err)
         }
-        defer ps.accounting.Release(peer, receiptPrice)
+        deferFuncs = append(deferFuncs, func() { ps.accounting.Release(peer, receiptPrice) })
 
         streamer, err := ps.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
         if err != nil {
-            return nil, fmt.Errorf("new stream for peer %s: %w", peer.String(), err)
+            lastErr = fmt.Errorf("new stream for peer %s: %w", peer.String(), err)
+            ps.logger.Debugf("pushsync-push: %v", lastErr)
+            continue
         }
-        defer func() { go streamer.FullClose() }()
+        deferFuncs = append(deferFuncs, func() { go streamer.FullClose() })
 
         w, r := protobuf.NewWriterAndReader(streamer)
         if err := ps.sendChunkDelivery(ctx, w, ch); err != nil {
             _ = streamer.Reset()
-            return nil, fmt.Errorf("chunk deliver to peer %s: %w", peer.String(), err)
+            lastErr = fmt.Errorf("chunk delivery to peer %s: %w", peer.String(), err)
+            ps.logger.Debugf("pushsync-push: %v", lastErr)
+            if errors.Is(err, context.DeadlineExceeded) {
+                ps.blocklistPeer(peer)
+            }
+            continue
         }
 
         // if you manage to get a tag, just increment the respective counter
@@ -282,7 +330,12 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Receipt, error) {
         receipt, err := ps.receiveReceipt(ctx, r)
         if err != nil {
             _ = streamer.Reset()
-            return nil, fmt.Errorf("receive receipt from peer %s: %w", peer.String(), err)
+            lastErr = fmt.Errorf("receive receipt from peer %s: %w", peer.String(), err)
+            ps.logger.Debugf("pushsync-push: %v", lastErr)
+            if errors.Is(err, context.DeadlineExceeded) {
+                ps.blocklistPeer(peer)
+            }
+            continue
         }
 
         ps.metrics.ReceiptRTT.Observe(time.Since(receiptRTTTimer).Seconds())
@@ -303,6 +356,24 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Receipt, error) {
         }
 
         return rec, nil
+    }
+
+    ps.logger.Tracef("pushsync-push: failed to push chunk %s: reached max peers of %v", ch.Address(), maxPeers)
+
+    if lastErr != nil {
+        return nil, lastErr
+    }
+
+    return nil, topology.ErrNotFound
+}
+
+func (ps *PushSync) blocklistPeer(peer swarm.Address) {
+    if err := ps.streamer.Blocklist(peer, blocklistDuration); err != nil {
+        ps.logger.Errorf("pushsync-push: unable to block peer %s", peer)
+        ps.logger.Debugf("pushsync-push: blocking peer %s: %v", peer, err)
+    } else {
+        ps.logger.Warningf("pushsync-push: peer %s blocked as unresponsive", peer)
+    }
 }
 
 func (ps *PushSync) handleDeliveryResponse(ctx context.Context, w protobuf.Writer, p p2p.Peer, chunk swarm.Chunk) error {
...
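Why the deferFuncs slice instead of plain defer: defer only fires when PushChunkToClosest returns, so deferring ps.accounting.Release and streamer.FullClose inside the retry loop would pin the reserved balance and open stream of every failed attempt until the function exits. Collecting each iteration's cleanups and flushing them at the top of the next iteration (plus once more via defer, covering the final or successful iteration) releases per-peer resources promptly. A stripped-down sketch of the idiom with placeholder resources:

package main

import "fmt"

func main() {
    deferFuncs := make([]func(), 0)
    defersFn := func() { // run and clear the accumulated cleanups
        for _, fn := range deferFuncs {
            fn()
        }
        deferFuncs = deferFuncs[:0]
    }
    defer defersFn() // covers the last iteration

    for i := 0; i < 3; i++ {
        defersFn() // release the previous iteration's resources first

        res := fmt.Sprintf("resource-%d", i) // stand-in for a reserved balance or stream
        deferFuncs = append(deferFuncs, func() { fmt.Println("released", res) })
        fmt.Println("using", res)
    }
    // output interleaves: using resource-0, released resource-0, using resource-1, ...
}

Note that res is declared inside the loop body, so each appended closure captures its own copy rather than the final iteration's value.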
@@ -7,6 +7,7 @@ package pushsync_test
 import (
     "bytes"
     "context"
+    "fmt"
     "io/ioutil"
     "testing"
     "time"
@@ -15,6 +16,7 @@ import (
     accountingmock "github.com/ethersphere/bee/pkg/accounting/mock"
     "github.com/ethersphere/bee/pkg/localstore"
     "github.com/ethersphere/bee/pkg/logging"
+    "github.com/ethersphere/bee/pkg/p2p"
     "github.com/ethersphere/bee/pkg/p2p/protobuf"
     "github.com/ethersphere/bee/pkg/p2p/streamtest"
     "github.com/ethersphere/bee/pkg/pushsync"
@@ -181,6 +183,130 @@ func TestPushChunkToClosest(t *testing.T) {
     }
 }
 
+func TestPushChunkToNextClosest(t *testing.T) {
+    // chunk data to upload
+    chunkAddress := swarm.MustParseHexAddress("7000000000000000000000000000000000000000000000000000000000000000")
+    chunkData := []byte("1234")
+
+    validator := testValidator(chunkAddress, chunkData, nil)
+
+    // create a pivot node and a mocked closest node
+    pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") // base is 0000
+
+    peer1 := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
+    peer2 := swarm.MustParseHexAddress("5000000000000000000000000000000000000000000000000000000000000000")
+    peers := []swarm.Address{
+        peer1,
+        peer2,
+    }
+
+    // peer is the node responding to the chunk receipt message
+    // mock should return ErrWantSelf since there's no one to forward to
+    psPeer1, storerPeer1, _, peerAccounting1 := createPushSyncNode(t, peer1, nil, validator, mock.WithClosestPeerErr(topology.ErrWantSelf))
+    defer storerPeer1.Close()
+
+    psPeer2, storerPeer2, _, peerAccounting2 := createPushSyncNode(t, peer2, nil, validator, mock.WithClosestPeerErr(topology.ErrWantSelf))
+    defer storerPeer2.Close()
+
+    recorder := streamtest.New(
+        streamtest.WithProtocols(
+            psPeer1.Protocol(),
+            psPeer2.Protocol(),
+        ),
+        streamtest.WithMiddlewares(
+            func(h p2p.HandlerFunc) p2p.HandlerFunc {
+                return func(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error {
+                    // NOTE: return error for peer1
+                    if peer1.Equal(peer.Address) {
+                        return fmt.Errorf("peer not reachable: %s", peer.Address.String())
+                    }
+
+                    if err := h(ctx, peer, stream); err != nil {
+                        return err
+                    }
+
+                    // close stream after all previous middlewares wrote to it
+                    // so that the receiving peer can get all the post messages
+                    return stream.Close()
+                }
+            },
+        ),
+    )
+
+    // pivot node needs the streamer since the chunk is intercepted by
+    // the chunk worker, then gets sent by opening a new stream
+    psPivot, storerPivot, pivotTags, pivotAccounting := createPushSyncNode(t, pivotNode, recorder, nil,
+        mock.WithClosestPeerErr(topology.ErrNotFound),
+        mock.WithPeers(peers...),
+    )
+    defer storerPivot.Close()
+
+    ta, err := pivotTags.Create("test", 1)
+    if err != nil {
+        t.Fatal(err)
+    }
+    chunk := swarm.NewChunk(chunkAddress, chunkData).WithTagID(ta.Uid)
+
+    ta1, err := pivotTags.Get(ta.Uid)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    if ta1.Get(tags.StateSent) != 0 || ta1.Get(tags.StateSynced) != 0 {
+        t.Fatalf("tags initialization error")
+    }
+
+    // Trigger the sending of chunk to the closest node
+    receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    if !chunk.Address().Equal(receipt.Address) {
+        t.Fatal("invalid receipt")
+    }
+
+    // this intercepts the outgoing delivery message
+    waitOnRecordAndTest(t, peer2, recorder, chunkAddress, chunkData)
+
+    // this intercepts the incoming receipt message
+    waitOnRecordAndTest(t, peer2, recorder, chunkAddress, nil)
+
+    ta2, err := pivotTags.Get(ta.Uid)
+    if err != nil {
+        t.Fatal(err)
+    }
+    if ta2.Get(tags.StateSent) != 1 {
+        t.Fatalf("tags error")
+    }
+
+    balance, err := pivotAccounting.Balance(peer2)
+    if err != nil {
+        t.Fatal(err)
+    }
+    if balance != -int64(fixedPrice) {
+        t.Fatalf("unexpected balance on pivot. want %d got %d", -int64(fixedPrice), balance)
+    }
+
+    balance2, err := peerAccounting2.Balance(peer2)
+    if err != nil {
+        t.Fatal(err)
+    }
+    if balance2 != int64(fixedPrice) {
+        t.Fatalf("unexpected balance on peer2. want %d got %d", int64(fixedPrice), balance2)
+    }
+
+    balance1, err := peerAccounting1.Balance(peer1)
+    if err != nil {
+        t.Fatal(err)
+    }
+    if balance1 != 0 {
+        t.Fatalf("unexpected balance on peer1. want %d got %d", 0, balance1)
+    }
+}
+
 // TestHandler expects a chunk from a node on a stream. It then stores the chunk in the local store and
 // sends back a receipt. This is tested by intercepting the incoming stream for proper messages.
 // It also sends the chunk to the closest peer and receives a receipt.
@@ -290,7 +416,9 @@ func createPushSyncNode(t *testing.T, addr swarm.Address, recorder *streamtest.R
     mockAccounting := accountingmock.NewAccounting()
     mockPricer := accountingmock.NewPricer(fixedPrice, fixedPrice)
 
-    return pushsync.New(recorder, storer, mockTopology, mtag, validator, logger, mockAccounting, mockPricer, nil), storer, mtag, mockAccounting
+    recorderDisconnecter := streamtest.NewRecorderDisconnecter(recorder)
+
+    return pushsync.New(recorderDisconnecter, storer, mockTopology, mtag, validator, logger, mockAccounting, mockPricer, nil), storer, mtag, mockAccounting
 }
 
 func waitOnRecordAndTest(t *testing.T, peer swarm.Address, recorder *streamtest.Recorder, add swarm.Address, data []byte) {
...
@@ -137,7 +137,7 @@ func (d *driver) AddPeers(ctx context.Context, addrs ...swarm.Address) error {
 
 // ClosestPeer returns the closest connected peer we have in relation to a
 // given chunk address. Returns topology.ErrWantSelf in case base is the closest to the chunk.
-func (d *driver) ClosestPeer(addr swarm.Address) (swarm.Address, error) {
+func (d *driver) ClosestPeer(addr swarm.Address, skipPeers ...swarm.Address) (swarm.Address, error) {
     connectedPeers := d.p2pService.Peers()
     if len(connectedPeers) == 0 {
         return swarm.Address{}, topology.ErrNotFound
@@ -145,7 +145,21 @@ func (d *driver) ClosestPeer(addr swarm.Address) (swarm.Address, error) {
     // start checking closest from _self_
     closest := d.base
+    skipPeer := false
     for _, peer := range connectedPeers {
+        if len(skipPeers) > 0 {
+            for _, a := range skipPeers {
+                if a.Equal(peer.Address) {
+                    skipPeer = true
+                    break
+                }
+            }
+            if skipPeer {
+                skipPeer = false
+                continue
+            }
+        }
+
         dcmp, err := swarm.DistanceCmp(addr.Bytes(), closest.Bytes(), peer.Address.Bytes())
         if err != nil {
             return swarm.Address{}, err
@@ -174,25 +188,25 @@ func (d *driver) Connected(ctx context.Context, addr swarm.Address) error {
     return d.AddPeers(ctx, addr)
 }
 
-func (_ *driver) Disconnected(swarm.Address) {
+func (*driver) Disconnected(swarm.Address) {
     // TODO: implement if necessary
 }
 
-func (_ *driver) NeighborhoodDepth() uint8 {
+func (*driver) NeighborhoodDepth() uint8 {
     return 0
 }
 
 // EachPeer iterates from closest bin to farthest
-func (_ *driver) EachPeer(_ topology.EachPeerFunc) error {
+func (*driver) EachPeer(_ topology.EachPeerFunc) error {
     panic("not implemented") // TODO: Implement
 }
 
 // EachPeerRev iterates from farthest bin to closest
-func (_ *driver) EachPeerRev(_ topology.EachPeerFunc) error {
+func (*driver) EachPeerRev(_ topology.EachPeerFunc) error {
     panic("not implemented") // TODO: Implement
 }
 
-func (_ *driver) SubscribePeersChange() (c <-chan struct{}, unsubscribe func()) {
+func (*driver) SubscribePeersChange() (c <-chan struct{}, unsubscribe func()) {
     //TODO implement if necessary
     return c, unsubscribe
 }
...
@@ -21,6 +21,12 @@ type mock struct {
     mtx sync.Mutex
 }
 
+func WithPeers(peers ...swarm.Address) Option {
+    return optionFunc(func(d *mock) {
+        d.peers = peers
+    })
+}
+
 func WithAddPeersErr(err error) Option {
     return optionFunc(func(d *mock) {
         d.addPeersErr = err
@@ -66,6 +72,7 @@ func (d *mock) AddPeers(_ context.Context, addrs ...swarm.Address) error {
     return nil
 }
+
 func (d *mock) Connected(ctx context.Context, addr swarm.Address) error {
     return d.AddPeers(ctx, addr)
 }
@@ -78,26 +85,74 @@ func (d *mock) Peers() []swarm.Address {
     return d.peers
 }
 
-func (d *mock) ClosestPeer(addr swarm.Address) (peerAddr swarm.Address, err error) {
+func (d *mock) ClosestPeer(_ swarm.Address, skipPeers ...swarm.Address) (peerAddr swarm.Address, err error) {
+    if len(skipPeers) == 0 {
         return d.closestPeer, d.closestPeerErr
+    }
+
+    d.mtx.Lock()
+    defer d.mtx.Unlock()
+
+    skipPeer := false
+    for _, p := range d.peers {
+        for _, a := range skipPeers {
+            if a.Equal(p) {
+                skipPeer = true
+                break
+            }
+        }
+        if skipPeer {
+            skipPeer = false
+            continue
+        }
+
+        peerAddr = p
+    }
+
+    if peerAddr.IsZero() {
+        return peerAddr, topology.ErrNotFound
+    }
+
+    return peerAddr, nil
 }
 
 func (d *mock) SubscribePeersChange() (c <-chan struct{}, unsubscribe func()) {
     return c, unsubscribe
 }
 
-func (_ *mock) NeighborhoodDepth() uint8 {
+func (*mock) NeighborhoodDepth() uint8 {
     return 0
 }
 
 // EachPeer iterates from closest bin to farthest
-func (_ *mock) EachPeer(_ topology.EachPeerFunc) error {
-    panic("not implemented") // TODO: Implement
+func (d *mock) EachPeer(f topology.EachPeerFunc) (err error) {
+    d.mtx.Lock()
+    defer d.mtx.Unlock()
+
+    for i, p := range d.peers {
+        _, _, err = f(p, uint8(i))
+        if err != nil {
+            return
+        }
+    }
+
+    return nil
 }
 
 // EachPeerRev iterates from farthest bin to closest
-func (_ *mock) EachPeerRev(_ topology.EachPeerFunc) error {
-    panic("not implemented") // TODO: Implement
+func (d *mock) EachPeerRev(f topology.EachPeerFunc) (err error) {
+    d.mtx.Lock()
+    defer d.mtx.Unlock()
+
+    for i := len(d.peers) - 1; i >= 0; i-- {
+        _, _, err = f(d.peers[i], uint8(i))
+        if err != nil {
+            return
+        }
+    }
+
+    return nil
 }
 
 func (d *mock) MarshalJSON() ([]byte, error) {
...
@@ -32,7 +32,11 @@ type PeerAdder interface {
 }
 
 type ClosestPeerer interface {
-    ClosestPeer(addr swarm.Address) (peerAddr swarm.Address, err error)
+    // ClosestPeer returns the closest connected peer we have in relation to a
+    // given chunk address.
+    // This function will ignore peers with addresses provided in skipPeers.
+    // Returns topology.ErrWantSelf in case base is the closest to the address.
+    ClosestPeer(addr swarm.Address, skipPeers ...swarm.Address) (peerAddr swarm.Address, err error)
 }
 
 type EachPeerer interface {
...
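Because skipPeers is variadic, every existing call site of ClosestPeer(addr) keeps compiling unchanged; only retry loops need to thread their skip list through. A caller-side sketch against the updated interface (the stub type and the short addresses are invented for illustration; only the interface and the sentinel errors come from the diff):

package main

import (
    "errors"
    "fmt"

    "github.com/ethersphere/bee/pkg/swarm"
    "github.com/ethersphere/bee/pkg/topology"
)

// stub satisfies topology.ClosestPeerer over a fixed, proximity-ordered list.
type stub struct{ peers []swarm.Address }

func (s *stub) ClosestPeer(_ swarm.Address, skipPeers ...swarm.Address) (swarm.Address, error) {
    for _, p := range s.peers {
        skipped := false
        for _, a := range skipPeers {
            if a.Equal(p) {
                skipped = true
                break
            }
        }
        if !skipped {
            return p, nil
        }
    }
    return swarm.Address{}, topology.ErrNotFound
}

func main() {
    var cp topology.ClosestPeerer = &stub{peers: []swarm.Address{
        swarm.MustParseHexAddress("60"),
        swarm.MustParseHexAddress("50"),
    }}
    target := swarm.MustParseHexAddress("70")

    var skip []swarm.Address
    for {
        peer, err := cp.ClosestPeer(target, skip...)
        if errors.Is(err, topology.ErrNotFound) {
            fmt.Println("ran out of peers")
            return
        }
        if len(skip) == 0 { // pretend the closest peer fails once
            skip = append(skip, peer)
            continue
        }
        fmt.Println("selected", peer.String()) // selected 50
        return
    }
}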