// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package pusher_test

import (
	"context"
	"errors"
	"io/ioutil"
	"sync"
	"testing"
	"time"

	"github.com/ethersphere/bee/pkg/localstore"
	"github.com/ethersphere/bee/pkg/logging"
	"github.com/ethersphere/bee/pkg/pusher"
	"github.com/ethersphere/bee/pkg/pushsync"
	pushsyncmock "github.com/ethersphere/bee/pkg/pushsync/mock"
	statestore "github.com/ethersphere/bee/pkg/statestore/mock"
	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/swarm"
	"github.com/ethersphere/bee/pkg/tags"
	"github.com/ethersphere/bee/pkg/topology/mock"
)

// noOfRetries is the number of times a test polls the store (sleeping 10ms
// before each poll) to see whether a response from pushsync has been handled.
var noOfRetries = 20

// Store wraps the real storer so the tests can intercept the Set calls the
// pusher makes once a valid receipt has been received.
type Store struct {
	storage.Storer

	// modeSetMu guards modeSet.
	modeSetMu *sync.Mutex
	// modeSet records, per chunk address string, the last mode passed to Set.
	modeSet map[string]storage.ModeSet
}

// Set shadows the embedded Storer's Set so the tests can observe which mode
// (e.g. ModeSetSyncPush) was requested for each address. It only records the
// mode in modeSet while holding modeSetMu; the call is not forwarded to the
// embedded Storer and the returned error is always nil.
func (s Store) Set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Address) error {
	s.modeSetMu.Lock()
	defer s.modeSetMu.Unlock()

	for i := range addrs {
		key := addrs[i].String()
		s.modeSet[key] = mode
	}
	return nil
}

// TestSendChunkToPushSync sends a chunk to pushsync to be sent ot its closest peer and get a receipt.
// once the receipt is got this check to see if the localstore is updated to see if the chunk is set
// as ModeSetSyncPush status.
50
func TestSendChunkToPushSyncWithTag(t *testing.T) {
51 52 53 54 55 56 57 58 59 60
	// create a trigger  and a closestpeer
	triggerPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
	closestPeer := swarm.MustParseHexAddress("f000000000000000000000000000000000000000000000000000000000000000")

	pushSyncService := pushsyncmock.New(func(ctx context.Context, chunk swarm.Chunk) (*pushsync.Receipt, error) {
		receipt := &pushsync.Receipt{
			Address: swarm.NewAddress(chunk.Address().Bytes()),
		}
		return receipt, nil
	})
61 62 63
	mtags, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
	defer storer.Close()

64
	ta, err := mtags.Create("test", 1)
Zahoor Mohamed's avatar
Zahoor Mohamed committed
65 66 67
	if err != nil {
		t.Fatal(err)
	}
68 69

	chunk := createChunk().WithTagID(ta.Uid)
70

Zahoor Mohamed's avatar
Zahoor Mohamed committed
71
	_, err = storer.Put(context.Background(), storage.ModePutUpload, chunk)
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
	if err != nil {
		t.Fatal(err)
	}

	// Check is the chunk is set as synced in the DB.
	for i := 0; i < noOfRetries; i++ {
		// Give some time for chunk to be pushed and receipt to be received
		time.Sleep(10 * time.Millisecond)

		err = checkIfModeSet(chunk.Address(), storage.ModeSetSyncPush, storer)
		if err == nil {
			break
		}
	}
	if err != nil {
		t.Fatal(err)
	}
89 90 91 92 93

	if ta.Get(tags.StateSynced) != 1 {
		t.Fatalf("tags error")
	}

94 95 96
	p.Close()
}

97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
// TestSendChunkToPushSyncWithoutTag is similar to TestSendChunkToPushSync, excep that the tags are not
// present to simulate bzz api withotu splitter condition
func TestSendChunkToPushSyncWithoutTag(t *testing.T) {
	chunk := createChunk()

	// create a trigger  and a closestpeer
	triggerPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
	closestPeer := swarm.MustParseHexAddress("f000000000000000000000000000000000000000000000000000000000000000")

	pushSyncService := pushsyncmock.New(func(ctx context.Context, chunk swarm.Chunk) (*pushsync.Receipt, error) {
		receipt := &pushsync.Receipt{
			Address: swarm.NewAddress(chunk.Address().Bytes()),
		}
		return receipt, nil
	})
112 113

	_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
114 115
	defer storer.Close()

116
	_, err := storer.Put(context.Background(), storage.ModePutUpload, chunk)
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
	if err != nil {
		t.Fatal(err)
	}

	// Check is the chunk is set as synced in the DB.
	for i := 0; i < noOfRetries; i++ {
		// Give some time for chunk to be pushed and receipt to be received
		time.Sleep(10 * time.Millisecond)

		err = checkIfModeSet(chunk.Address(), storage.ModeSetSyncPush, storer)
		if err == nil {
			break
		}
	}
	if err != nil {
		t.Fatal(err)
	}
	p.Close()
}

137 138 139 140 141 142 143 144 145 146 147 148 149 150
// TestSendChunkAndReceiveInvalidReceipt sends a chunk to pushsync to be sent ot its closest peer and
// get a invalid receipt (not with the address of the chunk sent). The test makes sure that this error
// is received and the ModeSetSyncPush is not set for the chunk.
func TestSendChunkAndReceiveInvalidReceipt(t *testing.T) {
	chunk := createChunk()

	// create a trigger  and a closestpeer
	triggerPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
	closestPeer := swarm.MustParseHexAddress("f000000000000000000000000000000000000000000000000000000000000000")

	pushSyncService := pushsyncmock.New(func(ctx context.Context, chunk swarm.Chunk) (*pushsync.Receipt, error) {
		return nil, errors.New("invalid receipt")
	})

151
	_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
152 153
	defer storer.Close()

154
	_, err := storer.Put(context.Background(), storage.ModePutUpload, chunk)
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
	if err != nil {
		t.Fatal(err)
	}

	// Check is the chunk is set as synced in the DB.
	for i := 0; i < noOfRetries; i++ {
		// Give some time for chunk to be pushed and receipt to be received
		time.Sleep(10 * time.Millisecond)

		err = checkIfModeSet(chunk.Address(), storage.ModeSetSyncPush, storer)
		if err != nil {
			continue
		}
	}
	if err == nil {
		t.Fatalf("chunk not syned error expected")
	}
	p.Close()
}

// TestSendChunkAndTimeoutinReceivingReceipt sends a chunk to pushsync to be sent ot its closest peer and
// expects a timeout to get instead of getting a receipt. The test makes sure that timeout error
// is received and the ModeSetSyncPush is not set for the chunk.
func TestSendChunkAndTimeoutinReceivingReceipt(t *testing.T) {
	chunk := createChunk()

	// create a trigger  and a closestpeer
	triggerPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
	closestPeer := swarm.MustParseHexAddress("f000000000000000000000000000000000000000000000000000000000000000")

	pushSyncService := pushsyncmock.New(func(ctx context.Context, chunk swarm.Chunk) (*pushsync.Receipt, error) {
		// Set 10 times more than the time we wait for the test to complete so that
		// the response never reaches our testcase
		time.Sleep(1 * time.Second)
		return nil, nil
	})

192
	_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
193
	defer storer.Close()
194
	defer p.Close()
195

196
	_, err := storer.Put(context.Background(), storage.ModePutUpload, chunk)
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219
	if err != nil {
		t.Fatal(err)
	}

	// Check is the chunk is set as synced in the DB.
	for i := 0; i < noOfRetries; i++ {
		// Give some time for chunk to be pushed and receipt to be received
		time.Sleep(10 * time.Millisecond)

		err = checkIfModeSet(chunk.Address(), storage.ModeSetSyncPush, storer)
		if err != nil {
			continue
		}
	}
	if err == nil {
		t.Fatalf("chunk not syned error expected")
	}
}

func createChunk() swarm.Chunk {
	// chunk data to upload
	chunkAddress := swarm.MustParseHexAddress("7000000000000000000000000000000000000000000000000000000000000000")
	chunkData := []byte("1234")
220
	return swarm.NewChunk(chunkAddress, chunkData).WithTagID(666)
221 222
}

223
func createPusher(t *testing.T, addr swarm.Address, pushSyncService pushsync.PushSyncer, mockOpts ...mock.Option) (*tags.Tags, *pusher.Service, *Store) {
224 225 226 227 228 229 230
	t.Helper()
	logger := logging.New(ioutil.Discard, 0)
	storer, err := localstore.New("", addr.Bytes(), nil, logger)
	if err != nil {
		t.Fatal(err)
	}

231 232
	mockStatestore := statestore.NewStateStore()
	mtags := tags.NewTags(mockStatestore, logger)
233 234 235 236 237 238
	pusherStorer := &Store{
		Storer:    storer,
		modeSet:   make(map[string]storage.ModeSet),
		modeSetMu: &sync.Mutex{},
	}
	peerSuggester := mock.NewTopologyDriver(mockOpts...)
Zahoor Mohamed's avatar
Zahoor Mohamed committed
239

240
	pusherService := pusher.New(pusherStorer, peerSuggester, pushSyncService, mtags, logger)
241
	return mtags, pusherService, pusherStorer
242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265
}

// checkIfModeSet reports whether the pusher has set the given mode for addr on
// the intercepting store. It returns nil when the recorded mode equals mode, an
// error when no mode has been recorded for addr yet, and a different error when
// a mode was recorded but it is not the expected one.
func checkIfModeSet(addr swarm.Address, mode storage.ModeSet, storer *Store) error {
	storer.modeSetMu.Lock()
	defer storer.modeSetMu.Unlock()

	// modeSet is keyed by the address string, so look the entry up directly.
	got, ok := storer.modeSet[addr.String()]
	if !ok {
		return errors.New("Chunk not synced")
	}
	if got != mode {
		return errors.New("chunk mode is not properly set as synced")
	}
	return nil
}

// When stress-testing with the race detector, raise the test timeout to avoid
// hitting it:
// cd pkg/pusher
// go test -race -count 1000 -timeout 60m .