pushsync_test.go 14.8 KB
Newer Older
1 2 3 4
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

5 6 7 8 9
package pushsync_test

import (
	"bytes"
	"context"
10
	"fmt"
acud's avatar
acud committed
11
	"os"
12 13 14
	"testing"
	"time"

15 16
	"github.com/ethersphere/bee/pkg/accounting"
	accountingmock "github.com/ethersphere/bee/pkg/accounting/mock"
17 18
	"github.com/ethersphere/bee/pkg/localstore"
	"github.com/ethersphere/bee/pkg/logging"
19
	"github.com/ethersphere/bee/pkg/p2p"
20 21
	"github.com/ethersphere/bee/pkg/p2p/protobuf"
	"github.com/ethersphere/bee/pkg/p2p/streamtest"
22 23
	"github.com/ethersphere/bee/pkg/pushsync"
	"github.com/ethersphere/bee/pkg/pushsync/pb"
24
	statestore "github.com/ethersphere/bee/pkg/statestore/mock"
acud's avatar
acud committed
25
	testingc "github.com/ethersphere/bee/pkg/storage/testing"
26
	"github.com/ethersphere/bee/pkg/swarm"
27
	"github.com/ethersphere/bee/pkg/tags"
28 29
	"github.com/ethersphere/bee/pkg/topology"
	"github.com/ethersphere/bee/pkg/topology/mock"
30 31
)

32 33 34 35
// fixedPrice is the flat per-chunk price used by the mock pricer in every
// node created by these tests; balance assertions below are stated in
// multiples of it.
const (
	fixedPrice = uint64(10)
)

36 37 38
// TestSendChunkAndReceiveReceipt inserts a chunk as uploaded chunk in db. This triggers sending a chunk to the closest node
// and expects a receipt. The messages are intercepted in the outgoing stream to check for correctness.
func TestSendChunkAndReceiveReceipt(t *testing.T) {
	// chunk data to upload
	chunk := testingc.FixtureChunk("7000")

	// create a pivot node and a mocked closest node
	pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000")   // base is 0000
	closestPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000") // binary 0110 -> po 1

	// peer is the node responding to the chunk receipt message
	// mock should return ErrWantSelf since there's no one to forward to
	psPeer, storerPeer, _, peerAccounting := createPushSyncNode(t, closestPeer, nil, nil, mock.WithClosestPeerErr(topology.ErrWantSelf))
	defer storerPeer.Close()

	recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()))

	// pivot node needs the streamer since the chunk is intercepted by
	// the chunk worker, then gets sent by opening a new stream
	psPivot, storerPivot, _, pivotAccounting := createPushSyncNode(t, pivotNode, recorder, nil, mock.WithClosestPeer(closestPeer))
	defer storerPivot.Close()

	// Trigger the sending of chunk to the closest node
	receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk)
	if err != nil {
		t.Fatal(err)
	}

	// the receipt must reference the pushed chunk's address
	if !chunk.Address().Equal(receipt.Address) {
		t.Fatal("invalid receipt")
	}

	// this intercepts the outgoing delivery message
	waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), chunk.Data())

	// this intercepts the incoming receipt message
	waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), nil)

	// the pivot was debited the fixed price for pushing to the closest peer
	balance, err := pivotAccounting.Balance(closestPeer)
	if err != nil {
		t.Fatal(err)
	}

	if balance.Int64() != -int64(fixedPrice) {
		t.Fatalf("unexpected balance on pivot. want %d got %d", -int64(fixedPrice), balance)
	}

	// the receiving peer was credited the same amount
	balance, err = peerAccounting.Balance(closestPeer)
	if err != nil {
		t.Fatal(err)
	}

	if balance.Int64() != int64(fixedPrice) {
		t.Fatalf("unexpected balance on peer. want %d got %d", int64(fixedPrice), balance)
	}
}
92

93 94 95 96
// TestPushChunkToClosest tests the sending of chunk to closest peer from the origination source perspective.
// it also checks whether the tags are incremented properly if they are present
func TestPushChunkToClosest(t *testing.T) {
	// chunk data to upload
	chunk := testingc.FixtureChunk("7000")
	// create a pivot node and a mocked closest node
	pivotNode := swarm.MustParseHexAddress("0000")   // base is 0000
	closestPeer := swarm.MustParseHexAddress("6000") // binary 0110 -> po 1
	// buffered with capacity 1 so the delivery hook can signal without blocking
	callbackC := make(chan struct{}, 1)
	// peer is the node responding to the chunk receipt message
	// mock should return ErrWantSelf since there's no one to forward to
	psPeer, storerPeer, _, peerAccounting := createPushSyncNode(t, closestPeer, nil, chanFunc(callbackC), mock.WithClosestPeerErr(topology.ErrWantSelf))
	defer storerPeer.Close()

	recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()))

	// pivot node needs the streamer since the chunk is intercepted by
	// the chunk worker, then gets sent by opening a new stream
	psPivot, storerPivot, pivotTags, pivotAccounting := createPushSyncNode(t, pivotNode, recorder, nil, mock.WithClosestPeer(closestPeer))
	defer storerPivot.Close()

	// create a tag and attach it to the chunk so sent/synced counters can be verified
	ta, err := pivotTags.Create(1)
	if err != nil {
		t.Fatal(err)
	}
	chunk = chunk.WithTagID(ta.Uid)

	ta1, err := pivotTags.Get(ta.Uid)
	if err != nil {
		t.Fatal(err)
	}

	// counters must start at zero before the push
	if ta1.Get(tags.StateSent) != 0 || ta1.Get(tags.StateSynced) != 0 {
		t.Fatalf("tags initialization error")
	}

	// Trigger the sending of chunk to the closest node
	receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk)
	if err != nil {
		t.Fatal(err)
	}

	if !chunk.Address().Equal(receipt.Address) {
		t.Fatal("invalid receipt")
	}

	// this intercepts the outgoing delivery message
	waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), chunk.Data())

	// this intercepts the incoming receipt message
	waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), nil)

	// the sent counter must have been incremented exactly once
	ta2, err := pivotTags.Get(ta.Uid)
	if err != nil {
		t.Fatal(err)
	}
	if ta2.Get(tags.StateSent) != 1 {
		t.Fatalf("tags error")
	}

	// pivot was debited the fixed price for the push
	balance, err := pivotAccounting.Balance(closestPeer)
	if err != nil {
		t.Fatal(err)
	}

	if balance.Int64() != -int64(fixedPrice) {
		t.Fatalf("unexpected balance on pivot. want %d got %d", -int64(fixedPrice), balance)
	}

	// receiving peer was credited the same amount
	balance, err = peerAccounting.Balance(closestPeer)
	if err != nil {
		t.Fatal(err)
	}

	if balance.Int64() != int64(fixedPrice) {
		t.Fatalf("unexpected balance on peer. want %d got %d", int64(fixedPrice), balance)
	}

	// check if the pss delivery hook is called
	select {
	case <-callbackC:
	case <-time.After(100 * time.Millisecond):
		t.Fatalf("delivery hook was not called")
	}
}

179 180
// TestPushChunkToNextClosest exercises the fallback path: when the closest
// peer (peer1) is unreachable, the pivot must deliver the chunk to the next
// closest peer (peer2), increment the tag's sent counter once, and settle
// accounting only with peer2.
func TestPushChunkToNextClosest(t *testing.T) {
	// chunk data to upload
	chunk := testingc.FixtureChunk("7000")

	// create a pivot node and a mocked closest node
	pivotNode := swarm.MustParseHexAddress("0000") // base is 0000

	peer1 := swarm.MustParseHexAddress("6000")
	peer2 := swarm.MustParseHexAddress("5000")
	peers := []swarm.Address{
		peer1,
		peer2,
	}

	// peer is the node responding to the chunk receipt message
	// mock should return ErrWantSelf since there's no one to forward to
	psPeer1, storerPeer1, _, peerAccounting1 := createPushSyncNode(t, peer1, nil, nil, mock.WithClosestPeerErr(topology.ErrWantSelf))
	defer storerPeer1.Close()

	psPeer2, storerPeer2, _, peerAccounting2 := createPushSyncNode(t, peer2, nil, nil, mock.WithClosestPeerErr(topology.ErrWantSelf))
	defer storerPeer2.Close()

	recorder := streamtest.New(
		streamtest.WithProtocols(
			psPeer1.Protocol(),
			psPeer2.Protocol(),
		),
		streamtest.WithMiddlewares(
			func(h p2p.HandlerFunc) p2p.HandlerFunc {
				return func(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error {
					// NOTE: return error for peer1
					if peer1.Equal(peer.Address) {
						return fmt.Errorf("peer not reachable: %s", peer.Address.String())
					}

					if err := h(ctx, peer, stream); err != nil {
						return err
					}
					// close stream after all previous middlewares wrote to it
					// so that the receiving peer can get all the post messages
					return stream.Close()
				}
			},
		),
	)

	// pivot node needs the streamer since the chunk is intercepted by
	// the chunk worker, then gets sent by opening a new stream
	psPivot, storerPivot, pivotTags, pivotAccounting := createPushSyncNode(t, pivotNode, recorder, nil,
		mock.WithPeers(peers...),
	)
	defer storerPivot.Close()

	// create a tag and attach it to the chunk so the sent counter can be verified
	ta, err := pivotTags.Create(1)
	if err != nil {
		t.Fatal(err)
	}
	chunk = chunk.WithTagID(ta.Uid)

	ta1, err := pivotTags.Get(ta.Uid)
	if err != nil {
		t.Fatal(err)
	}

	// counters must start at zero before the push
	if ta1.Get(tags.StateSent) != 0 || ta1.Get(tags.StateSynced) != 0 {
		t.Fatalf("tags initialization error")
	}

	// Trigger the sending of chunk to the closest node
	receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk)
	if err != nil {
		t.Fatal(err)
	}

	if !chunk.Address().Equal(receipt.Address) {
		t.Fatal("invalid receipt")
	}

	// this intercepts the outgoing delivery message; peer1 is unreachable,
	// so the delivery must have gone to peer2
	waitOnRecordAndTest(t, peer2, recorder, chunk.Address(), chunk.Data())

	// this intercepts the incoming receipt message
	waitOnRecordAndTest(t, peer2, recorder, chunk.Address(), nil)

	// despite the failed first attempt, the sent counter is incremented exactly once
	ta2, err := pivotTags.Get(ta.Uid)
	if err != nil {
		t.Fatal(err)
	}
	if ta2.Get(tags.StateSent) != 1 {
		t.Fatalf("tags error")
	}

	// pivot was debited the fixed price for the successful push to peer2
	balance, err := pivotAccounting.Balance(peer2)
	if err != nil {
		t.Fatal(err)
	}

	if balance.Int64() != -int64(fixedPrice) {
		t.Fatalf("unexpected balance on pivot. want %d got %d", -int64(fixedPrice), balance)
	}

	// peer2 was credited the same amount
	balance2, err := peerAccounting2.Balance(peer2)
	if err != nil {
		t.Fatal(err)
	}

	if balance2.Int64() != int64(fixedPrice) {
		t.Fatalf("unexpected balance on peer2. want %d got %d", int64(fixedPrice), balance2)
	}

	// the failed attempt to peer1 must not have moved any funds
	balance1, err := peerAccounting1.Balance(peer1)
	if err != nil {
		t.Fatal(err)
	}

	if balance1.Int64() != 0 {
		t.Fatalf("unexpected balance on peer1. want %d got %d", 0, balance1)
	}
}

299 300
// TestHandler expects a chunk from a node on a stream. It then stores the chunk in the local store and
// sends back a receipt. This is tested by intercepting the incoming stream for proper messages.
// It also sends the chunk to the closest peer and receives a receipt.
//
// Chunk moves from   TriggerPeer -> PivotPeer -> ClosestPeer
//
func TestHandler(t *testing.T) {
	// chunk data to upload
	chunk := testingc.FixtureChunk("7000")

	// create a pivot node and a mocked closest node
	pivotPeer := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000")
	triggerPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
	closestPeer := swarm.MustParseHexAddress("f000000000000000000000000000000000000000000000000000000000000000")

	// Create the closest peer
	psClosestPeer, closestStorerPeerDB, _, closestAccounting := createPushSyncNode(t, closestPeer, nil, nil, mock.WithClosestPeerErr(topology.ErrWantSelf))
	defer closestStorerPeerDB.Close()

	closestRecorder := streamtest.New(streamtest.WithProtocols(psClosestPeer.Protocol()))

	// creating the pivot peer
	psPivot, storerPivotDB, _, pivotAccounting := createPushSyncNode(t, pivotPeer, closestRecorder, nil, mock.WithClosestPeer(closestPeer))
	defer storerPivotDB.Close()

	pivotRecorder := streamtest.New(streamtest.WithProtocols(psPivot.Protocol()))

	// Creating the trigger peer
	psTriggerPeer, triggerStorerDB, _, triggerAccounting := createPushSyncNode(t, triggerPeer, pivotRecorder, nil, mock.WithClosestPeer(pivotPeer))
	defer triggerStorerDB.Close()

	receipt, err := psTriggerPeer.PushChunkToClosest(context.Background(), chunk)
	if err != nil {
		t.Fatal(err)
	}

	if !chunk.Address().Equal(receipt.Address) {
		t.Fatal("invalid receipt")
	}

	// In pivot peer, intercept the incoming delivery chunk from the trigger peer and check for correctness
	waitOnRecordAndTest(t, pivotPeer, pivotRecorder, chunk.Address(), chunk.Data())

	// Pivot peer will forward the chunk to its closest peer. Intercept the incoming stream from pivot node and check
	// for the correctness of the chunk
	waitOnRecordAndTest(t, closestPeer, closestRecorder, chunk.Address(), chunk.Data())

	// Similarly intercept the same incoming stream to see if the closest peer is sending a proper receipt
	waitOnRecordAndTest(t, closestPeer, closestRecorder, chunk.Address(), nil)

	// In the received stream, check if a receipt is sent from pivot peer and check for its correctness.
	waitOnRecordAndTest(t, pivotPeer, pivotRecorder, chunk.Address(), nil)

	// the trigger peer paid the pivot for the forwarded chunk
	balance, err := triggerAccounting.Balance(pivotPeer)
	if err != nil {
		t.Fatal(err)
	}

	if balance.Int64() != -int64(fixedPrice) {
		t.Fatalf("unexpected balance on trigger. want %d got %d", -int64(fixedPrice), balance)
	}

	// we need to check here for pivotPeer instead of triggerPeer because during streamtest the peer in the handler is actually the receiver
	balance, err = pivotAccounting.Balance(pivotPeer)
	if err != nil {
		t.Fatal(err)
	}

	if balance.Int64() != int64(fixedPrice) {
		t.Fatalf("unexpected balance on pivot. want %d got %d", int64(fixedPrice), balance)
	}

	// the pivot in turn paid the closest peer for the onward push
	balance, err = pivotAccounting.Balance(closestPeer)
	if err != nil {
		t.Fatal(err)
	}

	if balance.Int64() != -int64(fixedPrice) {
		t.Fatalf("unexpected balance on pivot. want %d got %d", -int64(fixedPrice), balance)
	}

	// and the closest peer recorded the matching credit
	balance, err = closestAccounting.Balance(closestPeer)
	if err != nil {
		t.Fatal(err)
	}

	if balance.Int64() != int64(fixedPrice) {
		t.Fatalf("unexpected balance on closest. want %d got %d", int64(fixedPrice), balance)
	}
}
389

acud's avatar
acud committed
390
func createPushSyncNode(t *testing.T, addr swarm.Address, recorder *streamtest.Recorder, unwrap func(swarm.Chunk), mockOpts ...mock.Option) (*pushsync.PushSync, *localstore.DB, *tags.Tags, accounting.Interface) {
391
	t.Helper()
acud's avatar
acud committed
392
	logger := logging.New(os.Stdout, 5)
393 394

	storer, err := localstore.New("", addr.Bytes(), nil, logger)
395 396 397 398
	if err != nil {
		t.Fatal(err)
	}

399
	mockTopology := mock.NewTopologyDriver(mockOpts...)
400 401
	mockStatestore := statestore.NewStateStore()
	mtag := tags.NewTags(mockStatestore, logger)
402 403 404
	mockAccounting := accountingmock.NewAccounting()
	mockPricer := accountingmock.NewPricer(fixedPrice, fixedPrice)

405
	recorderDisconnecter := streamtest.NewRecorderDisconnecter(recorder)
acud's avatar
acud committed
406 407 408
	if unwrap == nil {
		unwrap = func(swarm.Chunk) {}
	}
409

acud's avatar
acud committed
410
	return pushsync.New(recorderDisconnecter, storer, mockTopology, mtag, unwrap, logger, mockAccounting, mockPricer, nil), storer, mtag, mockAccounting
411
}
412

413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459
func waitOnRecordAndTest(t *testing.T, peer swarm.Address, recorder *streamtest.Recorder, add swarm.Address, data []byte) {
	t.Helper()
	records := recorder.WaitRecords(t, peer, pushsync.ProtocolName, pushsync.ProtocolVersion, pushsync.StreamName, 1, 5)

	if data != nil {
		messages, err := protobuf.ReadMessages(
			bytes.NewReader(records[0].In()),
			func() protobuf.Message { return new(pb.Delivery) },
		)
		if err != nil {
			t.Fatal(err)
		}
		if messages == nil {
			t.Fatal("nil rcvd. for message")
		}
		if len(messages) > 1 {
			t.Fatal("too many messages")
		}
		delivery := messages[0].(*pb.Delivery)

		if !bytes.Equal(delivery.Address, add.Bytes()) {
			t.Fatalf("chunk address mismatch")
		}

		if !bytes.Equal(delivery.Data, data) {
			t.Fatalf("chunk data mismatch")
		}
	} else {
		messages, err := protobuf.ReadMessages(
			bytes.NewReader(records[0].In()),
			func() protobuf.Message { return new(pb.Receipt) },
		)
		if err != nil {
			t.Fatal(err)
		}
		if messages == nil {
			t.Fatal("nil rcvd. for message")
		}
		if len(messages) > 1 {
			t.Fatal("too many messages")
		}
		receipt := messages[0].(*pb.Receipt)
		receiptAddress := swarm.NewAddress(receipt.Address)

		if !receiptAddress.Equal(add) {
			t.Fatalf("receipt address mismatch")
		}
460 461
	}
}
462

acud's avatar
acud committed
463 464 465
func chanFunc(c chan<- struct{}) func(swarm.Chunk) {
	return func(_ swarm.Chunk) {
		c <- struct{}{}
466 467
	}
}