Commit a4ca719e authored by Esad Akar's avatar Esad Akar Committed by GitHub

Push/pull sync - neighborhood syncing (#1537)

* storer nodes send chunks to neighbor peers as the receipt is sent back

* pullsync: dont sync outside nn (#911)

* puller: cancel everything when peer moves out of depth

* pr comments

* adjust tests for puller changes

* recalc when peer moves out of depth

* beekeeper: shorter timeouts for debug

* revert change

* Revert "beekeeper: shorter timeouts for debug"

This reverts commit 756e9bc46bc18300133d62ea48f82f3b8479aecc.

* TestReplicateBeforeReceipt sleep fix

* increase retry-delay for pushsync with clef to 10s

* increase retry-delay for pushsync with clef to 15s

* set retry-delay for pushsync with clef to 10s and chunks-per-node to 1 for chunks test

* increase retry-delay for pushsync with clef to 15s

* revert to 5s, we solved it inside bee-local
Co-authored-by: default avataracud <12988138+acud@users.noreply.github.com>
Co-authored-by: default avatarIvan Vandot <ivan@vandot.rs>
parent dbb66820
......@@ -68,6 +68,18 @@ func (m *Mock) ClosestPeer(addr swarm.Address, skipPeers ...swarm.Address) (peer
panic("not implemented") // TODO: Implement
}
func (m *Mock) IsWithinDepth(adr swarm.Address) bool {
panic("not implemented") // TODO: Implement
}
func (m *Mock) EachNeighbor(topology.EachPeerFunc) error {
panic("not implemented") // TODO: Implement
}
func (m *Mock) EachNeighborRev(topology.EachPeerFunc) error {
panic("not implemented") // TODO: Implement
}
// EachPeer iterates from closest bin to farthest
func (m *Mock) EachPeer(f topology.EachPeerFunc) error {
m.mtx.Lock()
......
......@@ -428,7 +428,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
traversalService := traversal.NewService(ns)
pushSyncProtocol := pushsync.New(p2ps, storer, kad, tagService, pssService.TryUnwrap, logger, acc, pricer, signer, tracer)
pushSyncProtocol := pushsync.New(swarmAddress, p2ps, storer, kad, tagService, pssService.TryUnwrap, logger, acc, pricer, signer, tracer)
// set the pushSyncer in the PSS
pssService.SetPushSyncer(pushSyncProtocol)
......
......@@ -22,15 +22,12 @@ import (
"github.com/ethersphere/bee/pkg/topology"
)
const defaultShallowBinPeers = 2
var (
logMore = false // enable this to get more logging
)
type Options struct {
Bins uint8
ShallowBinPeers int
Bins uint8
}
type Puller struct {
......@@ -52,21 +49,16 @@ type Puller struct {
quit chan struct{}
wg sync.WaitGroup
bins uint8 // how many bins do we support
shallowBinPeers int // how many peers per bin do we want to sync with outside of depth
bins uint8 // how many bins do we support
}
func New(stateStore storage.StateStorer, topology topology.Driver, pullSync pullsync.Interface, logger logging.Logger, o Options) *Puller {
var (
bins uint8 = swarm.MaxBins
shallowBinPeers int = defaultShallowBinPeers
bins uint8 = swarm.MaxBins
)
if o.Bins != 0 {
bins = o.Bins
}
if o.ShallowBinPeers != 0 {
shallowBinPeers = o.ShallowBinPeers
}
p := &Puller{
statestore: stateStore,
......@@ -80,8 +72,7 @@ func New(stateStore storage.StateStorer, topology topology.Driver, pullSync pull
quit: make(chan struct{}),
wg: sync.WaitGroup{},
bins: bins,
shallowBinPeers: shallowBinPeers,
bins: bins,
}
for i := uint8(0); i < bins; i++ {
......@@ -148,23 +139,8 @@ func (p *Puller) manage() {
if _, ok := bp[peerAddr.String()]; ok {
delete(peersDisconnected, peerAddr.String())
}
syncing := len(bp)
if po < depth {
// outside of depth, sync peerPO bin only
if _, ok := bp[peerAddr.String()]; !ok {
if syncing < p.shallowBinPeers {
// peer not syncing yet and we still need more peers in this bin
bp[peerAddr.String()] = newSyncPeer(peerAddr, p.bins)
peerEntry := peer{addr: peerAddr, po: po}
peersToSync = append(peersToSync, peerEntry)
}
} else {
// already syncing, recalc
peerEntry := peer{addr: peerAddr, po: po}
peersToRecalc = append(peersToRecalc, peerEntry)
}
} else {
// within depth, sync everything >= depth
if po >= depth {
// within depth, sync everything
if _, ok := bp[peerAddr.String()]; !ok {
// we're not syncing with this peer yet, start doing so
bp[peerAddr.String()] = newSyncPeer(peerAddr, p.bins)
......@@ -175,6 +151,12 @@ func (p *Puller) manage() {
peerEntry := peer{addr: peerAddr, po: po}
peersToRecalc = append(peersToRecalc, peerEntry)
}
} else {
if _, ok := bp[peerAddr.String()]; ok {
// already syncing, recalc so that existing streams get cleaned up
peerEntry := peer{addr: peerAddr, po: po}
peersToRecalc = append(peersToRecalc, peerEntry)
}
}
return false, false, nil
......@@ -223,45 +205,34 @@ func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8
c := p.cursors[peer.String()]
p.cursorsMtx.Unlock()
var want, dontWant []uint8
if po >= d {
// within depth
var want, dontWant []uint8
for i := d; i < p.bins; i++ {
if i == 0 {
continue
}
want = append(want, i)
}
for i := uint8(0); i < d; i++ {
dontWant = append(dontWant, i)
}
for _, bin := range want {
if !syncCtx.isBinSyncing(bin) {
p.syncPeerBin(ctx, syncCtx, peer, bin, c[bin])
}
}
syncCtx.cancelBins(dontWant...)
} else {
// outside of depth
var (
want = po
dontWant = []uint8{0} // never want bin 0
)
for i := uint8(0); i < p.bins; i++ {
if i == want {
continue
}
// cancel everything outside of depth
for i := uint8(0); i < d; i++ {
dontWant = append(dontWant, i)
}
if !syncCtx.isBinSyncing(want) {
p.syncPeerBin(ctx, syncCtx, peer, want, c[want])
} else {
// peer is outside depth. cancel everything
for i := uint8(0); i < p.bins; i++ {
dontWant = append(dontWant, i)
}
syncCtx.cancelBins(dontWant...)
}
syncCtx.cancelBins(dontWant...)
}
func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8) {
......@@ -290,12 +261,6 @@ func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8)
c = cursors
}
// peer outside depth?
if po < d && po > 0 {
p.syncPeerBin(ctx, syncCtx, peer, po, c[po])
return
}
for bin, cur := range c {
if bin == 0 || uint8(bin) < d {
continue
......
......@@ -32,16 +32,40 @@ var (
reply = mockps.NewReply // alias to make code more readable
)
// test that adding one peer start syncing
// then that adding another peer at the same po
// does not start another syncing session
// test that adding one peer starts syncing
func TestOneSync(t *testing.T) {
var (
addr = test.RandomAddress()
addr2 = test.RandomAddress()
cursors = []uint64{1000, 1000, 1000}
liveReplies = []uint64{1}
shallowBinPeers = 1
addr = test.RandomAddress()
cursors = []uint64{1000, 1000, 1000}
liveReplies = []uint64{1}
)
puller, _, kad, pullsync := newPuller(opts{
kad: []mockk.Option{
mockk.WithEachPeerRevCalls(
mockk.AddrTuple{Addr: addr, PO: 1},
), mockk.WithDepth(1),
},
pullSync: []mockps.Option{mockps.WithCursors(cursors), mockps.WithLiveSyncReplies(liveReplies...)},
bins: 3,
})
defer puller.Close()
defer pullsync.Close()
time.Sleep(100 * time.Millisecond)
kad.Trigger()
waitCursorsCalled(t, pullsync, addr, false)
waitSyncCalled(t, pullsync, addr, false)
}
func TestNoSyncOutsideDepth(t *testing.T) {
var (
addr = test.RandomAddress()
addr2 = test.RandomAddress()
cursors = []uint64{1000, 1000, 1000}
liveReplies = []uint64{1}
)
puller, _, kad, pullsync := newPuller(opts{
......@@ -51,9 +75,8 @@ func TestOneSync(t *testing.T) {
mockk.AddrTuple{Addr: addr2, PO: 1},
), mockk.WithDepth(2),
},
pullSync: []mockps.Option{mockps.WithCursors(cursors), mockps.WithLiveSyncReplies(liveReplies...)},
bins: 3,
shallowBinPeers: &shallowBinPeers,
pullSync: []mockps.Option{mockps.WithCursors(cursors), mockps.WithLiveSyncReplies(liveReplies...)},
bins: 3,
})
defer puller.Close()
defer pullsync.Close()
......@@ -61,14 +84,14 @@ func TestOneSync(t *testing.T) {
kad.Trigger()
waitCursorsCalled(t, pullsync, addr, false)
waitCursorsCalled(t, pullsync, addr, true)
waitCursorsCalled(t, pullsync, addr2, true)
waitSyncCalled(t, pullsync, addr, false)
waitSyncCalled(t, pullsync, addr, true)
waitSyncCalled(t, pullsync, addr2, true)
}
func TestSyncFlow_PeerOutsideDepth_Live(t *testing.T) {
func TestSyncFlow_PeerWithinDepth_Live(t *testing.T) {
addr := test.RandomAddress()
for _, tc := range []struct {
......@@ -80,13 +103,13 @@ func TestSyncFlow_PeerOutsideDepth_Live(t *testing.T) {
expLiveCalls []c // expected live sync calls
}{
{
name: "cursor 0, 1 chunk on live", cursors: []uint64{0, 0, 0},
name: "cursor 0, 1 chunk on live", cursors: []uint64{0, 0},
intervals: "[[1 1]]",
liveReplies: []uint64{1},
expLiveCalls: []c{call(1, 1, max), call(1, 2, max)},
},
{
name: "cursor 0 - calls 1-1, 2-5, 6-10", cursors: []uint64{0, 0, 0},
name: "cursor 0 - calls 1-1, 2-5, 6-10", cursors: []uint64{0, 0},
intervals: "[[1 10]]",
liveReplies: []uint64{1, 5, 10},
expLiveCalls: []c{call(1, 1, max), call(1, 2, max), call(1, 6, max), call(1, 11, max)},
......@@ -97,7 +120,7 @@ func TestSyncFlow_PeerOutsideDepth_Live(t *testing.T) {
kad: []mockk.Option{
mockk.WithEachPeerRevCalls(
mockk.AddrTuple{Addr: addr, PO: 1},
), mockk.WithDepth(2),
), mockk.WithDepth(1),
},
pullSync: []mockps.Option{mockps.WithCursors(tc.cursors), mockps.WithLiveSyncReplies(tc.liveReplies...)},
bins: 5,
......@@ -121,7 +144,7 @@ func TestSyncFlow_PeerOutsideDepth_Live(t *testing.T) {
}
}
func TestSyncFlow_PeerOutsideDepth_Historical(t *testing.T) {
func TestSyncFlow_PeerWithinDepth_Historical(t *testing.T) {
addr := test.RandomAddress()
for _, tc := range []struct {
......@@ -132,7 +155,7 @@ func TestSyncFlow_PeerOutsideDepth_Historical(t *testing.T) {
expLiveCalls []c // expected live sync calls
}{
{
name: "1,1 - 1 call", cursors: []uint64{0, 1, 3}, //the third cursor is to make sure we dont get a request for a bin we dont need
name: "1,1 - 1 call", cursors: []uint64{0, 1}, //the third cursor is to make sure we dont get a request for a bin we dont need
intervals: "[[1 1]]",
expCalls: []c{call(1, 1, 1)},
expLiveCalls: []c{call(1, 2, math.MaxUint64)},
......@@ -173,7 +196,7 @@ func TestSyncFlow_PeerOutsideDepth_Historical(t *testing.T) {
kad: []mockk.Option{
mockk.WithEachPeerRevCalls(
mockk.AddrTuple{Addr: addr, PO: 1},
), mockk.WithDepth(2),
), mockk.WithDepth(1),
},
pullSync: []mockps.Option{mockps.WithCursors(tc.cursors), mockps.WithAutoReply(), mockps.WithLiveSyncBlock()},
bins: 5,
......@@ -197,7 +220,7 @@ func TestSyncFlow_PeerOutsideDepth_Historical(t *testing.T) {
}
}
func TestSyncFlow_PeerWithinDepth_Live(t *testing.T) {
func TestSyncFlow_PeerWithinDepth_Live2(t *testing.T) {
addr := test.RandomAddress()
for _, tc := range []struct {
......@@ -265,8 +288,8 @@ func TestPeerDisconnected(t *testing.T) {
time.Sleep(50 * time.Millisecond)
kad.Trigger()
waitCursorsCalled(t, pullsync, addr, false)
waitLiveSyncCalled(t, pullsync, addr, false)
waitCursorsCalled(t, pullsync, addr, true)
waitLiveSyncCalled(t, pullsync, addr, true)
kad.ResetPeers()
kad.Trigger()
time.Sleep(50 * time.Millisecond)
......@@ -276,6 +299,20 @@ func TestPeerDisconnected(t *testing.T) {
}
}
// TestDepthChange tests that puller reacts correctly to
// depth changes signalled from kademlia.
// Due to the fact that the component does goroutine termination
// and new syncing sessions autonomously, the testing strategy is a bit
// more tricky than usual. The idea is that syncReplies basically allow us
// to somehow see through the inner workings of the syncing strategies.
// When a sync reply is specified with block=true, the protocol mock basically
// returns an interval back to the caller, as if we have successfully synced the
// requested interval. This in turn means that the interval would be persisted
// in the state store, allowing us to inspect for which bins intervals exist when
// we check which bins were synced or not (presence of the key in the db indicates
// the bin was synced). This also means that tweaking these tests needs to
// be done carefully and with the understanding of what each change does to
// the tested unit.
func TestDepthChange(t *testing.T) {
var (
addr = test.RandomAddress()
......@@ -318,11 +355,12 @@ func TestDepthChange(t *testing.T) {
{
name: "peer moves out of depth",
cursors: []uint64{0, 0, 0, 0, 0},
binsNotSyncing: []uint8{1, 2, 4}, // only bins 3,4 are expected to sync
binsSyncing: []uint8{3},
binsNotSyncing: []uint8{1, 2, 3, 4}, // no bins should be syncing
syncReplies: []mockps.SyncReply{
reply(3, 1, 1, false),
reply(3, 2, 1, true),
reply(1, 1, 1, true),
reply(2, 1, 1, true),
reply(3, 1, 1, true),
reply(4, 1, 1, true),
},
depths: []uint8{0, 1, 2, 3, 4},
},
......@@ -345,7 +383,7 @@ func TestDepthChange(t *testing.T) {
kad: []mockk.Option{
mockk.WithEachPeerRevCalls(
mockk.AddrTuple{Addr: addr, PO: 3},
), mockk.WithDepthCalls(tc.depths...), // peer moved from out of depth to depth
), mockk.WithDepthCalls(tc.depths...),
},
pullSync: []mockps.Option{mockps.WithCursors(tc.cursors), mockps.WithLateSyncReply(tc.syncReplies...)},
bins: 5,
......@@ -364,7 +402,7 @@ func TestDepthChange(t *testing.T) {
// check the intervals
for _, b := range tc.binsSyncing {
checkIntervals(t, st, addr, interval, b) // getting errors here
checkIntervals(t, st, addr, interval, b)
}
for _, b := range tc.binsNotSyncing {
......@@ -545,10 +583,9 @@ func waitLiveSyncCalledTimes(t *testing.T, ps *mockps.PullSyncMock, addr swarm.A
}
type opts struct {
pullSync []mockps.Option
kad []mockk.Option
bins uint8
shallowBinPeers *int
pullSync []mockps.Option
kad []mockk.Option
bins uint8
}
func newPuller(ops opts) (*puller.Puller, storage.StateStorer, *mockk.Mock, *mockps.PullSyncMock) {
......@@ -560,9 +597,6 @@ func newPuller(ops opts) (*puller.Puller, storage.StateStorer, *mockk.Mock, *moc
o := puller.Options{
Bins: ops.bins,
}
if ops.shallowBinPeers != nil {
o.ShallowBinPeers = *ops.shallowBinPeers
}
return puller.New(s, kad, ps, logger, o), s, kad, ps
}
......
......@@ -6,6 +6,7 @@ package mock
import (
"context"
"fmt"
"math"
"sync"
......@@ -156,7 +157,7 @@ func (p *PullSyncMock) SyncInterval(ctx context.Context, peer swarm.Address, bin
}
return sr.topmost, 0, nil
}
panic("not found")
panic(fmt.Sprintf("bin %d from %d to %d", bin, from, to))
}
if isLive && p.blockLiveSync {
......@@ -164,7 +165,6 @@ func (p *PullSyncMock) SyncInterval(ctx context.Context, peer swarm.Address, bin
<-p.quit
return 0, 1, context.Canceled
}
if isLive && len(p.liveSyncReplies) > 0 {
if p.liveSyncCalls >= len(p.liveSyncReplies) {
<-p.quit
......
......@@ -10,9 +10,11 @@ import (
)
type metrics struct {
TotalSent prometheus.Counter
TotalReceived prometheus.Counter
TotalErrors prometheus.Counter
TotalSent prometheus.Counter
TotalReceived prometheus.Counter
TotalErrors prometheus.Counter
TotalReplicated prometheus.Counter
TotalReplicatedError prometheus.Counter
}
func newMetrics() metrics {
......@@ -37,6 +39,18 @@ func newMetrics() metrics {
Name: "total_errors",
Help: "Total no of time error received while sending chunk.",
}),
TotalReplicated: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "total_replication",
Help: "Total no of successfully sent replication chunks.",
}),
TotalReplicatedError: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "total_replication_error",
Help: "Total no of failed replication chunks.",
}),
}
}
......
......@@ -40,6 +40,10 @@ const (
maxPeers = 5
)
var (
ErrOutOfDepthReplication = errors.New("replication outside of the neighborhood")
)
type PushSyncer interface {
PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Receipt, error)
}
......@@ -50,34 +54,38 @@ type Receipt struct {
}
type PushSync struct {
streamer p2p.StreamerDisconnecter
storer storage.Putter
peerSuggester topology.ClosestPeerer
tagger *tags.Tags
unwrap func(swarm.Chunk)
logger logging.Logger
accounting accounting.Interface
pricer pricer.Interface
metrics metrics
tracer *tracing.Tracer
signer crypto.Signer
address swarm.Address
streamer p2p.StreamerDisconnecter
storer storage.Putter
topologyDriver topology.Driver
tagger *tags.Tags
unwrap func(swarm.Chunk)
logger logging.Logger
accounting accounting.Interface
pricer pricer.Interface
metrics metrics
tracer *tracing.Tracer
signer crypto.Signer
}
var timeToLive = 5 * time.Second // request time to live
var timeToLive = 5 * time.Second // request time to live
var timeToWaitForPushsyncToNeighbor = 3 * time.Second // time to wait to get a receipt for a chunk
var nPeersToPushsync = 3 // number of peers to replicate to as receipt is sent upstream
func New(streamer p2p.StreamerDisconnecter, storer storage.Putter, closestPeerer topology.ClosestPeerer, tagger *tags.Tags, unwrap func(swarm.Chunk), logger logging.Logger, accounting accounting.Interface, pricer pricer.Interface, signer crypto.Signer, tracer *tracing.Tracer) *PushSync {
func New(address swarm.Address, streamer p2p.StreamerDisconnecter, storer storage.Putter, topologyDriver topology.Driver, tagger *tags.Tags, unwrap func(swarm.Chunk), logger logging.Logger, accounting accounting.Interface, pricer pricer.Interface, signer crypto.Signer, tracer *tracing.Tracer) *PushSync {
ps := &PushSync{
streamer: streamer,
storer: storer,
peerSuggester: closestPeerer,
tagger: tagger,
unwrap: unwrap,
logger: logger,
accounting: accounting,
pricer: pricer,
metrics: newMetrics(),
tracer: tracer,
signer: signer,
address: address,
streamer: streamer,
storer: storer,
topologyDriver: topologyDriver,
tagger: tagger,
unwrap: unwrap,
logger: logger,
accounting: accounting,
pricer: pricer,
metrics: newMetrics(),
tracer: tracer,
signer: signer,
}
return ps
}
......@@ -126,19 +134,40 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
return swarm.ErrInvalidChunk
}
span, _, ctx := ps.tracer.StartSpanFromContext(ctx, "pushsync-handler", ps.logger, opentracing.Tag{Key: "address", Value: chunk.Address().String()})
defer span.Finish()
// Get price we charge for upstream peer read at headler
// Get price we charge for upstream peer read at headler.
responseHeaders := stream.ResponseHeaders()
price, err := headerutils.ParsePriceHeader(responseHeaders)
// if not found in returned header, compute the price we charge for this chunk.
if err != nil {
// if not found in returned header, compute the price we charge for this chunk and
ps.logger.Warningf("push sync: peer %v no price in previously issued response headers: %v", p.Address, err)
ps.logger.Warningf("pushsync: peer %v no price in previously issued response headers: %v", p.Address, err)
price = ps.pricer.PriceForPeer(p.Address, chunk.Address())
}
// if the peer is closer to the chunk, we were selected for replication. Return early.
if dcmp, _ := swarm.DistanceCmp(chunk.Address().Bytes(), p.Address.Bytes(), ps.address.Bytes()); dcmp == 1 {
if ps.topologyDriver.IsWithinDepth(chunk.Address()) {
_, err = ps.storer.Put(ctx, storage.ModePutSync, chunk)
if err != nil {
ps.logger.Errorf("pushsync: chunk store: %v", err)
}
return ps.accounting.Debit(p.Address, price)
}
return ErrOutOfDepthReplication
}
// forwarding replication
if ps.topologyDriver.IsWithinDepth(chunk.Address()) {
_, err = ps.storer.Put(ctx, storage.ModePutSync, chunk)
if err != nil {
ps.logger.Warningf("pushsync: within depth peer's attempt to store chunk failed: %v", err)
}
}
span, _, ctx := ps.tracer.StartSpanFromContext(ctx, "pushsync-handler", ps.logger, opentracing.Tag{Key: "address", Value: chunk.Address().String()})
defer span.Finish()
receipt, err := ps.pushToClosest(ctx, chunk)
if err != nil {
if errors.Is(err, topology.ErrWantSelf) {
......@@ -147,10 +176,89 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
return fmt.Errorf("chunk store: %w", err)
}
count := 0
// Push the chunk to some peers in the neighborhood in parallel for replication.
// Any errors here should NOT impact the rest of the handler.
err = ps.topologyDriver.EachNeighbor(func(peer swarm.Address, po uint8) (bool, bool, error) {
// skip forwarding peer
if peer.Equal(p.Address) {
return false, false, nil
}
if count == nPeersToPushsync {
return true, false, nil
}
count++
go func(peer swarm.Address) {
var err error
defer func() {
if err != nil {
ps.logger.Tracef("pushsync replication: %v", err)
ps.metrics.TotalReplicatedError.Inc()
} else {
ps.metrics.TotalReplicated.Inc()
}
}()
// price for neighborhood replication
const receiptPrice uint64 = 0
headers, err := headerutils.MakePricingHeaders(receiptPrice, chunk.Address())
if err != nil {
err = fmt.Errorf("make pricing headers: %w", err)
return
}
streamer, err := ps.streamer.NewStream(ctx, peer, headers, protocolName, protocolVersion, streamName)
if err != nil {
err = fmt.Errorf("new stream for peer %s: %w", peer.String(), err)
return
}
defer streamer.Close()
returnedHeaders := streamer.Headers()
_, returnedPrice, returnedIndex, err := headerutils.ParsePricingResponseHeaders(returnedHeaders)
if err != nil {
err = fmt.Errorf("push price headers read returned: %w", err)
return
}
// check if returned price matches presumed price, if not, return early.
if returnedPrice != receiptPrice {
err = ps.pricer.NotifyPeerPrice(peer, returnedPrice, returnedIndex)
return
}
w := protobuf.NewWriter(streamer)
ctx, cancel := context.WithTimeout(ctx, timeToWaitForPushsyncToNeighbor)
defer cancel()
err = w.WriteMsgWithContext(ctx, &pb.Delivery{
Address: chunk.Address().Bytes(),
Data: chunk.Data(),
})
if err != nil {
_ = streamer.Reset()
return
}
}(peer)
return false, false, nil
})
if err != nil {
ps.logger.Tracef("pushsync replication closest peer: %w", err)
}
signature, err := ps.signer.Sign(ch.Address)
if err != nil {
return fmt.Errorf("receipt signature: %w", err)
}
// return back receipt
receipt := pb.Receipt{Address: chunk.Address().Bytes(), Signature: signature}
if err := w.WriteMsgWithContext(ctx, &receipt); err != nil {
return fmt.Errorf("send receipt to peer %s: %w", p.Address.String(), err)
......@@ -211,8 +319,8 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk) (rr *pb.R
defersFn()
// find next closest peer
peer, err := ps.peerSuggester.ClosestPeer(ch.Address(), skipPeers...)
// find the next closest peer
peer, err := ps.topologyDriver.ClosestPeer(ch.Address(), skipPeers...)
if err != nil {
// ClosestPeer can return ErrNotFound in case we are not connected to any peers
// in which case we should return immediately.
......
This diff is collapsed.
......@@ -16,6 +16,7 @@ type mock struct {
peers []swarm.Address
closestPeer swarm.Address
closestPeerErr error
peersErr error
addPeersErr error
marshalJSONFunc func() ([]byte, error)
mtx sync.Mutex
......@@ -64,11 +65,10 @@ func (d *mock) AddPeers(_ context.Context, addrs ...swarm.Address) error {
return d.addPeersErr
}
for _, addr := range addrs {
d.mtx.Lock()
d.peers = append(d.peers, addr)
d.mtx.Unlock()
}
d.mtx.Lock()
defer d.mtx.Unlock()
d.peers = append(d.peers, addrs...)
return nil
}
......@@ -85,7 +85,7 @@ func (d *mock) Peers() []swarm.Address {
return d.peers
}
func (d *mock) ClosestPeer(_ swarm.Address, skipPeers ...swarm.Address) (peerAddr swarm.Address, err error) {
func (d *mock) ClosestPeer(addr swarm.Address, skipPeers ...swarm.Address) (peerAddr swarm.Address, err error) {
if len(skipPeers) == 0 {
if d.closestPeerErr != nil {
return d.closestPeer, d.closestPeerErr
......@@ -98,6 +98,10 @@ func (d *mock) ClosestPeer(_ swarm.Address, skipPeers ...swarm.Address) (peerAdd
d.mtx.Lock()
defer d.mtx.Unlock()
if len(d.peers) == 0 {
return peerAddr, topology.ErrNotFound
}
skipPeer := false
for _, p := range d.peers {
for _, a := range skipPeers {
......@@ -111,7 +115,13 @@ func (d *mock) ClosestPeer(_ swarm.Address, skipPeers ...swarm.Address) (peerAdd
continue
}
peerAddr = p
if peerAddr.IsZero() {
peerAddr = p
}
if cmp, _ := swarm.DistanceCmp(addr.Bytes(), p.Bytes(), peerAddr.Bytes()); cmp == 1 {
peerAddr = p
}
}
if peerAddr.IsZero() {
......@@ -128,11 +138,27 @@ func (*mock) NeighborhoodDepth() uint8 {
return 0
}
func (m *mock) IsWithinDepth(addr swarm.Address) bool {
return false
}
func (m *mock) EachNeighbor(f topology.EachPeerFunc) error {
return m.EachPeer(f)
}
func (*mock) EachNeighborRev(topology.EachPeerFunc) error {
panic("not implemented") // TODO: Implement
}
// EachPeer iterates from closest bin to farthest
func (d *mock) EachPeer(f topology.EachPeerFunc) (err error) {
d.mtx.Lock()
defer d.mtx.Unlock()
if d.peersErr != nil {
return d.peersErr
}
for i, p := range d.peers {
_, _, err = f(p, uint8(i))
if err != nil {
......
......@@ -25,6 +25,7 @@ type Driver interface {
PeerAdder
ClosestPeerer
EachPeerer
EachNeighbor
NeighborhoodDepth() uint8
SubscribePeersChange() (c <-chan struct{}, unsubscribe func())
io.Closer
......@@ -51,6 +52,15 @@ type EachPeerer interface {
EachPeerRev(EachPeerFunc) error
}
type EachNeighbor interface {
// EachNeighbor iterates from closest bin to farthest within the neighborhood.
EachNeighbor(EachPeerFunc) error
// EachNeighborRev iterates from farthest bin to closest within the neighborhood.
EachNeighborRev(EachPeerFunc) error
// IsWithinDepth checks if an address is the within neighborhood.
IsWithinDepth(swarm.Address) bool
}
// EachPeerFunc is a callback that is called with a peer and its PO
type EachPeerFunc func(swarm.Address, uint8) (stop, jumpToNext bool, err error)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment