Commit 372b8630 authored by metacertain, committed by GitHub

Receipts in Neighborhood sync (#1573)

pushsync: add unsigned receipts to verify that in-neighborhood replication peers accepted the chunk and can be accounted for
parent e4eb6372
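
At a glance, the forwarding side of the exchange added here looks roughly like the sketch below, condensed from the hunks that follow. Every identifier is taken from the diff itself; the Address and Data fields of pb.Delivery are presumed from the surrounding code, and errInvalidReceipt is a hypothetical sentinel (the actual code simply returns):

	w, r := protobuf.NewWriterAndReader(streamer)
	ctx, cancel = context.WithTimeout(ctx, timeToWaitForPushsyncToNeighbor)
	defer cancel()

	// deliver the chunk (with its postage stamp) to the in-neighborhood peer
	err = w.WriteMsgWithContext(ctx, &pb.Delivery{
		Address: chunk.Address().Bytes(),
		Data:    chunk.Data(),
		Stamp:   stamp,
	})
	if err != nil {
		return
	}

	// block, bounded by the same timeout, until the unsigned receipt arrives,
	// and accept it only if it echoes the chunk address back
	var receipt pb.Receipt
	if err = r.ReadMsgWithContext(ctx, &receipt); err != nil {
		return
	}
	if !chunk.Address().Equal(swarm.NewAddress(receipt.Address)) {
		err = errInvalidReceipt
		return
	}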
@@ -144,11 +144,19 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
 	// if the peer is closer to the chunk, we were selected for replication. Return early.
 	if dcmp, _ := swarm.DistanceCmp(chunk.Address().Bytes(), p.Address.Bytes(), ps.address.Bytes()); dcmp == 1 {
 		if ps.topologyDriver.IsWithinDepth(chunk.Address()) {
-			_, err = ps.storer.Put(ctx, storage.ModePutSync, chunk)
+			ctxd, canceld := context.WithTimeout(context.Background(), timeToWaitForPushsyncToNeighbor)
+			defer canceld()
+			_, err = ps.storer.Put(ctxd, storage.ModePutSync, chunk)
 			if err != nil {
 				ps.logger.Errorf("pushsync: chunk store: %v", err)
 			}
+
+			// return the receipt
+			receipt := pb.Receipt{Address: chunk.Address().Bytes()}
+			if err := w.WriteMsgWithContext(ctxd, &receipt); err != nil {
+				return fmt.Errorf("send receipt to peer %s: %w", p.Address.String(), err)
+			}
 			return ps.accounting.Debit(p.Address, price)
 		}
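
A note on ctxd in the hunk above: the store and the receipt write run on a context detached from the inbound stream's ctx, which dies as soon as the handler returns; the fresh context is bounded by timeToWaitForPushsyncToNeighbor instead, so a dropped upstream connection cannot abort a store that is already underway, while the work still cannot hang forever. A minimal, self-contained sketch of the pattern (names here are illustrative, not from the repository):

	package main

	import (
		"context"
		"fmt"
		"time"
	)

	// runDetached executes fn on a context that ignores the caller's
	// cancellation but is still bounded by its own timeout.
	func runDetached(timeout time.Duration, fn func(context.Context) error) error {
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		defer cancel()
		return fn(ctx)
	}

	func main() {
		err := runDetached(500*time.Millisecond, func(ctx context.Context) error {
			<-ctx.Done() // stand-in for a store that outlives the request
			return ctx.Err()
		})
		fmt.Println(err) // context.DeadlineExceeded, after ~500ms
	}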
@@ -219,11 +227,17 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
 		err = fmt.Errorf("new stream for peer %s: %w", peer.String(), err)
 		return
 	}
-	defer streamer.Close()
-	w := protobuf.NewWriter(streamer)
+	ctx, cancel = context.WithTimeout(ctx, timeToWaitForPushsyncToNeighbor)
+	defer cancel()
+	defer func() {
+		if err != nil {
+			ps.metrics.TotalErrors.Inc()
+			_ = streamer.Reset()
+		} else {
+			_ = streamer.FullClose()
+		}
+	}()
+	w, r := protobuf.NewWriterAndReader(streamer)
 	stamp, err := chunk.Stamp().MarshalBinary()
 	if err != nil {
 		return
@@ -234,7 +248,16 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
 		Stamp: stamp,
 	})
 	if err != nil {
-		_ = streamer.Reset()
 		return
 	}
+
+	var receipt pb.Receipt
+	if err = r.ReadMsgWithContext(ctx, &receipt); err != nil {
+		return
+	}
+
+	if !chunk.Address().Equal(swarm.NewAddress(receipt.Address)) {
+		// if the receipt is invalid, give up
+		return
+	}
......
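
One more detail from the hunks above: the inline _ = streamer.Reset() on the write-error path is gone because teardown is now centralized in the deferred closure, which distinguishes failure from success — Reset aborts the stream so the neighbor's pending read fails fast, while FullClose flushes buffered frames and closes both directions once the receipt exchange succeeded. A generic sketch of the idiom (for any stream exposing these two methods, as bee's p2p.Stream does; the helper name is illustrative, not from the repository):

	// closeStream picks the right teardown for a request/response stream:
	// abort loudly on error, close cleanly on success.
	func closeStream(s interface {
		Reset() error
		FullClose() error
	}, err error) {
		if err != nil {
			_ = s.Reset() // the peer's blocked read errors out immediately
			return
		}
		_ = s.FullClose() // flush, then close both directions
	}

With a named error return, defer func() { closeStream(streamer, err) }() right after stream creation covers every exit path, which is exactly the shape the diff adopts.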
@@ -210,6 +210,105 @@ func TestReplicateBeforeReceipt(t *testing.T) {
 	}
 }
+
+func TestFailToReplicateBeforeReceipt(t *testing.T) {
+	// chunk data to upload
+	chunk := testingc.FixtureChunk("7000") // base 0111
+
+	// create a pivot node and a mocked closest node
+	pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000")   // base is 0000
+	closestPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000") // binary 0110
+	secondPeer := swarm.MustParseHexAddress("4000000000000000000000000000000000000000000000000000000000000000")  // binary 0100
+	emptyPeer := swarm.MustParseHexAddress("5000000000000000000000000000000000000000000000000000000000000000")   // binary 0101, this peer should not get the chunk
+
+	// node that is connected to secondPeer
+	// its address is closer to the chunk than secondPeer's, but it will not receive the chunk
+	_, storerEmpty, _, _ := createPushSyncNode(t, emptyPeer, defaultPrices, nil, nil, defaultSigner)
+	defer storerEmpty.Close()
+
+	wFunc := func(addr swarm.Address) bool {
+		return false
+	}
+
+	// node that is connected to closestPeer
+	// will receive the chunk from closestPeer
+	psSecond, storerSecond, _, secondAccounting := createPushSyncNode(t, secondPeer, defaultPrices, nil, nil, defaultSigner, mock.WithPeers(emptyPeer), mock.WithIsWithinFunc(wFunc))
+	defer storerSecond.Close()
+	secondRecorder := streamtest.New(streamtest.WithProtocols(psSecond.Protocol()), streamtest.WithBaseAddr(closestPeer))
+
+	psStorer, storerPeer, _, storerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, secondRecorder, nil, defaultSigner, mock.WithPeers(secondPeer), mock.WithClosestPeerErr(topology.ErrWantSelf))
+	defer storerPeer.Close()
+	recorder := streamtest.New(streamtest.WithProtocols(psStorer.Protocol()), streamtest.WithBaseAddr(pivotNode))
+
+	// pivot node needs the streamer since the chunk is intercepted by
+	// the chunk worker, then gets sent by opening a new stream
+	psPivot, storerPivot, _, pivotAccounting := createPushSyncNode(t, pivotNode, defaultPrices, recorder, nil, defaultSigner, mock.WithClosestPeer(closestPeer))
+	defer storerPivot.Close()
+
+	// Trigger the sending of the chunk to the closest node
+	receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !chunk.Address().Equal(receipt.Address) {
+		t.Fatal("invalid receipt")
+	}
+
+	// this intercepts the outgoing delivery message
+	waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), chunk.Data())
+	// this intercepts the incoming receipt message
+	waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), nil)
+
+	// sleep for a bit to allow the second peer to store the replicated chunk
+	time.Sleep(time.Millisecond * 500)
+
+	// this intercepts the outgoing delivery message from the storer node to the second storer node
+	waitOnRecordAndTest(t, secondPeer, secondRecorder, chunk.Address(), chunk.Data())
+	// this intercepts the incoming receipt message
+	waitOnRecordAndTest(t, secondPeer, secondRecorder, chunk.Address(), nil)
+
+	_, err = storerEmpty.Get(context.Background(), storage.ModeGetSync, chunk.Address())
+	if !errors.Is(err, storage.ErrNotFound) {
+		t.Fatal(err)
+	}
+
+	balance, err := pivotAccounting.Balance(closestPeer)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if balance.Int64() != -int64(fixedPrice) {
+		t.Fatalf("unexpected balance on pivot node. want %d got %d", -int64(fixedPrice), balance)
+	}
+
+	balance, err = storerAccounting.Balance(pivotNode)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if balance.Int64() != int64(fixedPrice) {
+		t.Fatalf("unexpected balance on storer node. want %d got %d", int64(fixedPrice), balance)
+	}
+
+	balance, err = secondAccounting.Balance(closestPeer)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if balance.Int64() != int64(0) {
+		t.Fatalf("unexpected balance on second storer. want %d got %d", int64(0), balance)
+	}
+
+	balance, err = storerAccounting.Balance(secondPeer)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if balance.Int64() != int64(0) {
+		t.Fatalf("unexpected balance on storer node. want %d got %d", int64(0), balance)
+	}
+}
 
 // PushChunkToClosest tests the sending of a chunk to the closest peer from the originating source's perspective.
 // It also checks whether the tags are incremented properly if they are present.
 func TestPushChunkToClosest(t *testing.T) {
......
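
Why those particular test addresses: Swarm orders peers by XOR distance to the chunk address, so for chunk 7000... (binary 0111) the peer 6000... (0110) is nearest, 5000... (0101) comes next — which is why the comment in the test notes that emptyPeer is closer than secondPeer — and 4000... (0100) is farthest. A standalone sketch reproducing that ordering on the leading byte (illustrative only, independent of bee's swarm package):

	package main

	import "fmt"

	func main() {
		chunk := byte(0x70) // 0111 0000
		peers := []struct {
			name string
			addr byte
		}{
			{"closestPeer (6000...)", 0x60}, // 0110 0000
			{"emptyPeer   (5000...)", 0x50}, // 0101 0000
			{"secondPeer  (4000...)", 0x40}, // 0100 0000
		}
		for _, p := range peers {
			// XOR distance: a smaller value means closer to the chunk
			fmt.Printf("%s distance 0x%02x\n", p.name, chunk^p.addr)
		}
		// Output: 0x10, 0x20, 0x30 -> closestPeer wins
	}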