Commit 2922c5b1 authored by Zahoor Mohamed, committed by GitHub

Pusher: Move push sync logic in to pusher (#171)

parent 50eaca85
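In outline, this commit splits the old pushsync background worker in two: pushsync keeps only the synchronous wire protocol and now exposes a PushSyncer interface whose PushChunkToClosest returns a Receipt, while the new pusher package owns the loop that subscribes to the push index and marks chunks as synced. A condensed sketch of the resulting wiring, taken from the NewBee hunk below (p2ps, storer, topologyDriver and logger are the values already constructed earlier in NewBee):

// condensed from the node.go changes in this diff; error handling omitted
pushSyncProtocol := pushsync.New(pushsync.Options{
	Streamer:      p2ps,
	Storer:        storer, // pushsync now only needs the storage.Putter part
	ClosestPeerer: topologyDriver,
	Logger:        logger,
})
pushSyncPusher := pusher.New(pusher.Options{
	Storer:        storer,
	PeerSuggester: topologyDriver,
	PushSyncer:    pushSyncProtocol,
	Logger:        logger,
})
// the node keeps pushSyncPusher as pusherCloser and closes it in Shutdown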
...@@ -15,9 +15,6 @@ import (
"sync"
"sync/atomic"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"github.com/ethersphere/bee/pkg/addressbook" "github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/api" "github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/crypto" "github.com/ethersphere/bee/pkg/crypto"
...@@ -32,6 +29,8 @@ import ( ...@@ -32,6 +29,8 @@ import (
"github.com/ethersphere/bee/pkg/netstore" "github.com/ethersphere/bee/pkg/netstore"
"github.com/ethersphere/bee/pkg/p2p/libp2p" "github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/ethersphere/bee/pkg/pingpong" "github.com/ethersphere/bee/pkg/pingpong"
"github.com/ethersphere/bee/pkg/pusher"
"github.com/ethersphere/bee/pkg/pushsync"
"github.com/ethersphere/bee/pkg/retrieval" "github.com/ethersphere/bee/pkg/retrieval"
"github.com/ethersphere/bee/pkg/statestore/leveldb" "github.com/ethersphere/bee/pkg/statestore/leveldb"
mockinmem "github.com/ethersphere/bee/pkg/statestore/mock" mockinmem "github.com/ethersphere/bee/pkg/statestore/mock"
...@@ -41,6 +40,8 @@ import ( ...@@ -41,6 +40,8 @@ import (
"github.com/ethersphere/bee/pkg/tracing" "github.com/ethersphere/bee/pkg/tracing"
"github.com/ethersphere/bee/pkg/validator" "github.com/ethersphere/bee/pkg/validator"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)
type Bee struct {
...@@ -53,6 +54,7 @@ type Bee struct {
stateStoreCloser io.Closer
localstoreCloser io.Closer
topologyCloser io.Closer
pusherCloser io.Closer
}
type Options struct {
...@@ -206,6 +208,21 @@ func NewBee(o Options) (*Bee, error) {
ns := netstore.New(storer, retrieve, validator.NewContentAddressValidator())
pushSyncProtocol := pushsync.New(pushsync.Options{
Streamer: p2ps,
Storer: storer,
ClosestPeerer: topologyDriver,
Logger: logger,
})
pushSyncPusher := pusher.New(pusher.Options{
Storer: storer,
PeerSuggester: topologyDriver,
PushSyncer: pushSyncProtocol,
Logger: logger,
})
b.pusherCloser = pushSyncPusher
var apiService api.Service
if o.APIAddr != "" {
// API server
...@@ -374,6 +391,10 @@ func (b *Bee) Shutdown(ctx context.Context) error {
return err
}
if err := b.pusherCloser.Close(); err != nil {
return fmt.Errorf("pusher: %w", err)
}
b.p2pCancel()
if err := b.p2pService.Close(); err != nil {
return fmt.Errorf("p2p server: %w", err)
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pusher
import (
m "github.com/ethersphere/bee/pkg/metrics"
"github.com/prometheus/client_golang/prometheus"
)
type metrics struct {
// all metrics fields must be exported
// to be able to return them by Metrics()
// using reflection
TotalChunksToBeSentCounter prometheus.Counter
TotalChunksSynced prometheus.Counter
ErrorSettingChunkToSynced prometheus.Counter
MarkAndSweepTimer prometheus.Histogram
}
func newMetrics() metrics {
subsystem := "pushsync"
return metrics{
TotalChunksToBeSentCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "total_chunk_to_be_sent",
Help: "Total chunks to be sent.",
}),
TotalChunksSynced: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "total_chunk_synced",
Help: "Total chunks synced successfully with valid receipts.",
}),
ErrorSettingChunkToSynced: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "cannot_set_chunk_sync_in_db",
Help: "Total no of times the chunk cannot be synced in DB.",
}),
MarkAndSweepTimer: prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "mark_and_sweep_time_histogram",
Help: "Histogram of time spent in mark and sweep.",
Buckets: []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10, 60},
}),
}
}
func (s *Service) Metrics() []prometheus.Collector {
return m.PrometheusCollectorsFromFields(s.metrics)
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pusher
import (
"context"
"time"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/pushsync"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
)
type Service struct {
storer storage.Storer
pushSyncer pushsync.PushSyncer
logger logging.Logger
metrics metrics
quit chan struct{}
chunksWorkerQuitC chan struct{}
}
type Options struct {
Storer storage.Storer
PeerSuggester topology.ClosestPeerer
PushSyncer pushsync.PushSyncer
Logger logging.Logger
}
var retryInterval = 10 * time.Second // time interval between retries
func New(o Options) *Service {
service := &Service{
storer: o.Storer,
pushSyncer: o.PushSyncer,
logger: o.Logger,
metrics: newMetrics(),
quit: make(chan struct{}),
chunksWorkerQuitC: make(chan struct{}),
}
go service.chunksWorker()
return service
}
// chunksWorker is a loop that keeps looking for chunks that are locally uploaded (by monitoring the push index)
// and pushes them to the closest peer to get a receipt.
func (s *Service) chunksWorker() {
var chunks <-chan swarm.Chunk
var unsubscribe func()
// timer, initially set to 0 to fall through select case on timer.C for initialisation
timer := time.NewTimer(0)
defer timer.Stop()
defer close(s.chunksWorkerQuitC)
chunksInBatch := -1
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-s.quit
cancel()
}()
for {
select {
// handle incoming chunks
case ch, more := <-chunks:
// if no more, set to nil, reset timer to 0 to finalise batch immediately
if !more {
chunks = nil
var dur time.Duration
if chunksInBatch == 0 {
dur = 500 * time.Millisecond
}
timer.Reset(dur)
break
}
chunksInBatch++
s.metrics.TotalChunksToBeSentCounter.Inc()
// TODO: when receipt processing is implemented, use the returned receipt;
// for now the receipt is ignored and only the error is checked
_, err := s.pushSyncer.PushChunkToClosest(ctx, ch)
if err != nil {
s.logger.Errorf("pusher: error while sending chunk or receiving receipt: %v", err)
continue
}
// set chunk status to synced
s.setChunkAsSynced(ctx, ch.Address())
continue
// the retry interval timer fired: restart the push index subscription from the beginning
case <-timer.C:
// the timer goes off initially and every time we hit the end of the push index
startTime := time.Now()
// if subscribe was running, stop it
if unsubscribe != nil {
unsubscribe()
}
// and start iterating on Push index from the beginning
chunks, unsubscribe = s.storer.SubscribePush(ctx)
// reset timer to go off after retryInterval
timer.Reset(retryInterval)
s.metrics.MarkAndSweepTimer.Observe(time.Since(startTime).Seconds())
case <-s.quit:
if unsubscribe != nil {
unsubscribe()
}
return
}
}
}
func (s *Service) setChunkAsSynced(ctx context.Context, addr swarm.Address) {
if err := s.storer.Set(ctx, storage.ModeSetSyncPush, addr); err != nil {
s.logger.Errorf("pusher: error setting chunk as synced: %v", err)
s.metrics.ErrorSettingChunkToSynced.Inc()
} else {
s.metrics.TotalChunksSynced.Inc()
}
}
func (s *Service) Close() error {
close(s.quit)
// Wait for chunks worker to finish
select {
case <-s.chunksWorkerQuitC:
case <-time.After(3 * time.Second):
}
return nil
}
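For reference, a condensed usage sketch of this service, mirroring the createPusher helper in the test file below; storer, closestPeer and logger are assumed to be set up as they are there, and the mock packages are the ones added in this diff:

// behave like a closest peer that stored the chunk and returned a matching receipt
pushSyncMock := pushsyncmock.New(func(ctx context.Context, ch swarm.Chunk) (*pushsync.Receipt, error) {
	return &pushsync.Receipt{Address: swarm.NewAddress(ch.Address().Bytes())}, nil
})
p := pusher.New(pusher.Options{
	Storer:        storer, // e.g. a localstore.DB
	PeerSuggester: mock.NewTopologyDriver(mock.WithClosestPeer(closestPeer)),
	PushSyncer:    pushSyncMock,
	Logger:        logger,
})
defer p.Close()
// anything stored with storage.ModePutUpload is picked up via SubscribePush,
// pushed through the PushSyncer, and set to storage.ModeSetSyncPush on success.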
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pusher_test
import (
"context"
"errors"
"io/ioutil"
"sync"
"testing"
"time"
"github.com/ethersphere/bee/pkg/localstore"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/pusher"
"github.com/ethersphere/bee/pkg/pushsync"
pushsyncmock "github.com/ethersphere/bee/pkg/pushsync/mock"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology/mock"
)
// number of times to retry while waiting for a response from pushsync
var noOfRetries = 10
// Wrap the actual storer to intercept the modeSet that the pusher will call when a valid receipt is received
type Store struct {
storage.Storer
modeSet map[string]storage.ModeSet
modeSetMu *sync.Mutex
}
// Override the Set function to capture the ModeSetSyncPush
func (s Store) Set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Address) error {
s.modeSetMu.Lock()
defer s.modeSetMu.Unlock()
for _, addr := range addrs {
s.modeSet[addr.String()] = mode
}
return nil
}
// TestSendChunkToPushSync sends a chunk to pushsync to be sent to its closest peer and to get a receipt.
// Once the receipt is received, the test checks that the localstore is updated and the chunk is set
// to the ModeSetSyncPush status.
func TestSendChunkToPushSync(t *testing.T) {
chunk := createChunk()
// create a trigger and a closestpeer
triggerPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
closestPeer := swarm.MustParseHexAddress("f000000000000000000000000000000000000000000000000000000000000000")
pushSyncService := pushsyncmock.New(func(ctx context.Context, chunk swarm.Chunk) (*pushsync.Receipt, error) {
receipt := &pushsync.Receipt{
Address: swarm.NewAddress(chunk.Address().Bytes()),
}
return receipt, nil
})
p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
defer storer.Close()
_, err := storer.Put(context.Background(), storage.ModePutUpload, chunk)
if err != nil {
t.Fatal(err)
}
// Check if the chunk is set as synced in the DB.
for i := 0; i < noOfRetries; i++ {
// Give some time for chunk to be pushed and receipt to be received
time.Sleep(10 * time.Millisecond)
err = checkIfModeSet(chunk.Address(), storage.ModeSetSyncPush, storer)
if err == nil {
break
}
}
if err != nil {
t.Fatal(err)
}
p.Close()
}
// TestSendChunkAndReceiveInvalidReceipt sends a chunk to pushsync to be sent to its closest peer and
// receives an invalid receipt (not with the address of the chunk sent). The test makes sure that this error
// is received and that ModeSetSyncPush is not set for the chunk.
func TestSendChunkAndReceiveInvalidReceipt(t *testing.T) {
chunk := createChunk()
// create a trigger and a closestpeer
triggerPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
closestPeer := swarm.MustParseHexAddress("f000000000000000000000000000000000000000000000000000000000000000")
pushSyncService := pushsyncmock.New(func(ctx context.Context, chunk swarm.Chunk) (*pushsync.Receipt, error) {
return nil, errors.New("invalid receipt")
})
p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
defer storer.Close()
_, err := storer.Put(context.Background(), storage.ModePutUpload, chunk)
if err != nil {
t.Fatal(err)
}
// Check if the chunk is set as synced in the DB.
for i := 0; i < noOfRetries; i++ {
// Give some time for chunk to be pushed and receipt to be received
time.Sleep(10 * time.Millisecond)
err = checkIfModeSet(chunk.Address(), storage.ModeSetSyncPush, storer)
if err != nil {
continue
}
}
if err == nil {
t.Fatalf("chunk not syned error expected")
}
p.Close()
}
// TestSendChunkAndTimeoutinReceivingReceipt sends a chunk to pushsync to be sent to its closest peer and
// expects a timeout instead of a receipt. The test makes sure that the timeout error
// is received and that ModeSetSyncPush is not set for the chunk.
func TestSendChunkAndTimeoutinReceivingReceipt(t *testing.T) {
chunk := createChunk()
// create a trigger and a closestpeer
triggerPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
closestPeer := swarm.MustParseHexAddress("f000000000000000000000000000000000000000000000000000000000000000")
pushSyncService := pushsyncmock.New(func(ctx context.Context, chunk swarm.Chunk) (*pushsync.Receipt, error) {
// Sleep 10 times longer than the time the test waits so that
// the response never reaches our test case
time.Sleep(1 * time.Second)
return nil, nil
})
p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
defer storer.Close()
_, err := storer.Put(context.Background(), storage.ModePutUpload, chunk)
if err != nil {
t.Fatal(err)
}
// Check if the chunk is set as synced in the DB.
for i := 0; i < noOfRetries; i++ {
// Give some time for chunk to be pushed and receipt to be received
time.Sleep(10 * time.Millisecond)
err = checkIfModeSet(chunk.Address(), storage.ModeSetSyncPush, storer)
if err != nil {
continue
}
}
if err == nil {
t.Fatalf("chunk not syned error expected")
}
p.Close()
}
func createChunk() swarm.Chunk {
// chunk data to upload
chunkAddress := swarm.MustParseHexAddress("7000000000000000000000000000000000000000000000000000000000000000")
chunkData := []byte("1234")
return swarm.NewChunk(chunkAddress, chunkData)
}
func createPusher(t *testing.T, addr swarm.Address, pushSyncService pushsync.PushSyncer, mockOpts ...mock.Option) (*pusher.Service, *Store) {
t.Helper()
logger := logging.New(ioutil.Discard, 0)
storer, err := localstore.New("", addr.Bytes(), nil, logger)
if err != nil {
t.Fatal(err)
}
pusherStorer := &Store{
Storer: storer,
modeSet: make(map[string]storage.ModeSet),
modeSetMu: &sync.Mutex{},
}
peerSuggester := mock.NewTopologyDriver(mockOpts...)
pusherService := pusher.New(pusher.Options{Storer: pusherStorer, PushSyncer: pushSyncService, PeerSuggester: peerSuggester, Logger: logger})
return pusherService, pusherStorer
}
func checkIfModeSet(addr swarm.Address, mode storage.ModeSet, storer *Store) error {
var found bool
storer.modeSetMu.Lock()
defer storer.modeSetMu.Unlock()
for k, v := range storer.modeSet {
if addr.String() == k {
found = true
if v != mode {
return errors.New("chunk mode is not properly set as synced")
}
}
}
if !found {
return errors.New("Chunk not synced")
}
return nil
}
// To run the race detector repeatedly without hitting the default test timeout:
// cd pkg/pusher
// go test -race -count 1000 -timeout 60m .
...@@ -14,8 +14,6 @@ type metrics struct {
// to be able to return them by Metrics()
// using reflection
TotalChunksToBeSentCounter prometheus.Counter
TotalChunksSynced prometheus.Counter
TotalChunksStoredInDB prometheus.Counter
ChunksSentCounter prometheus.Counter
ChunksReceivedCounter prometheus.Counter
...@@ -25,30 +23,16 @@ type metrics struct {
ReceiptsSentCounter prometheus.Counter
SendReceiptErrorCounter prometheus.Counter
ReceiveReceiptErrorCounter prometheus.Counter
ErrorSettingChunkToSynced prometheus.Counter
RetriesExhaustedCounter prometheus.Counter
InvalidReceiptReceived prometheus.Counter
SendChunkTimer prometheus.Histogram
ReceiptRTT prometheus.Histogram
MarkAndSweepTimer prometheus.Histogram
}
func newMetrics() metrics {
subsystem := "pushsync"
return metrics{
TotalChunksToBeSentCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "total_chunk_to_be_sent",
Help: "Total chunks to be sent.",
}),
TotalChunksSynced: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "total_chunk_synced",
Help: "Total chunks synced successfully with valid receipts.",
}),
TotalChunksStoredInDB: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
...@@ -103,13 +87,6 @@ func newMetrics() metrics {
Name: "receive_receipt_error",
Help: "Total no of time error received while receiving receipt.",
}),
ErrorSettingChunkToSynced: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "cannot_set_chunk_sync_in_DB",
Help: "Total no of times the chunk cannot be synced in DB.",
}),
RetriesExhaustedCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
...@@ -129,13 +106,6 @@ func newMetrics() metrics {
Help: "Histogram for Time taken to send a chunk.",
Buckets: []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10, 60},
}),
MarkAndSweepTimer: prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "mark_and_sweep_time_histogram",
Help: "Histogram of time spent in mark and sweep.",
Buckets: []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10, 60},
}),
ReceiptRTT: prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mock
import (
"context"
"github.com/ethersphere/bee/pkg/pushsync"
"github.com/ethersphere/bee/pkg/swarm"
)
type PushSync struct {
sendChunk func(ctx context.Context, chunk swarm.Chunk) (*pushsync.Receipt, error)
}
func New(sendChunk func(ctx context.Context, chunk swarm.Chunk) (*pushsync.Receipt, error)) *PushSync {
return &PushSync{sendChunk: sendChunk}
}
func (s *PushSync) PushChunkToClosest(ctx context.Context, chunk swarm.Chunk) (*pushsync.Receipt, error) {
return s.sendChunk(ctx, chunk)
}
...@@ -25,40 +25,39 @@ const (
streamName = "pushsync"
)
type PushSyncer interface {
PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Receipt, error)
}
type Receipt struct {
Address swarm.Address
}
type PushSync struct {
streamer p2p.Streamer
storer storage.Storer
storer storage.Putter
peerSuggester topology.ClosestPeerer
logger logging.Logger
metrics metrics
quit chan struct{}
chunksWorkerQuitC chan struct{}
}
type Options struct {
Streamer p2p.Streamer
Storer storage.Storer
Storer storage.Putter
ClosestPeerer topology.ClosestPeerer
Logger logging.Logger
}
var (
retryInterval = 10 * time.Second // time interval between retries
timeToWaitForReceipt = 3 * time.Second // time to wait to get a receipt for a chunk
)
var timeToWaitForReceipt = 3 * time.Second // time to wait to get a receipt for a chunk
func New(o Options) *PushSync {
ps := &PushSync{
streamer: o.Streamer,
storer: o.Storer,
peerSuggester: o.ClosestPeerer,
logger: o.Logger,
metrics: newMetrics(),
quit: make(chan struct{}),
chunksWorkerQuitC: make(chan struct{}),
}
go ps.chunksWorker()
return ps
}
...@@ -75,17 +74,6 @@ func (s *PushSync) Protocol() p2p.ProtocolSpec {
}
}
func (ps *PushSync) Close() error {
close(ps.quit)
// Wait for chunks worker to finish
select {
case <-ps.chunksWorkerQuitC:
case <-time.After(3 * time.Second):
}
return nil
}
// handler handles chunk delivery from other node and forwards to its destination node.
// If the current node is the destination, it stores in the local store and sends a receipt.
func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) error {
...@@ -111,7 +99,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
}
ps.metrics.TotalChunksStoredInDB.Inc()
// Send a receipt immediately once the storage of the chunk is successful
// Send a receipt immediately once the storage of the chunk is successfully
receipt := &pb.Receipt{Address: chunk.Address().Bytes()}
err = ps.sendReceipt(w, receipt)
if err != nil {
...@@ -133,7 +121,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
}
ps.metrics.TotalChunksStoredInDB.Inc()
// Send a receipt immediately once the storage of the chunk is successful
// Send a receipt immediately once the storage of the chunk is successfully
receipt := &pb.Receipt{Address: chunk.Address().Bytes()}
return ps.sendReceipt(w, receipt)
}
...@@ -220,115 +208,48 @@ func (ps *PushSync) receiveReceipt(r protobuf.Reader) (receipt pb.Receipt, err e
return receipt, nil
}
// chunksWorker is a loop that keeps looking for chunks that are locally uploaded ( by monitoring pushIndex )
// and pushes them to the closest peer and get a receipt.
func (ps *PushSync) chunksWorker() {
var chunks <-chan swarm.Chunk
var unsubscribe func()
// timer, initially set to 0 to fall through select case on timer.C for initialisation
timer := time.NewTimer(0)
defer timer.Stop()
defer close(ps.chunksWorkerQuitC)
chunksInBatch := -1
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-ps.quit
cancel()
}()
for {
select {
// handle incoming chunks
case ch, more := <-chunks:
// if no more, set to nil, reset timer to 0 to finalise batch immediately
if !more {
chunks = nil
var dur time.Duration
if chunksInBatch == 0 {
dur = 500 * time.Millisecond
}
timer.Reset(dur)
break
}
chunksInBatch++
ps.metrics.TotalChunksToBeSentCounter.Inc()
peer, err := ps.peerSuggester.ClosestPeer(ch.Address())
if err != nil {
if errors.Is(err, topology.ErrWantSelf) {
if err := ps.storer.Set(ctx, storage.ModeSetSyncPush, ch.Address()); err != nil {
ps.logger.Errorf("pushsync: error setting chunks to synced: %v", err)
}
continue
}
}
// TODO: make this function as a go routine and process several chunks in parallel
if err := ps.SendChunkAndReceiveReceipt(ctx, peer, ch); err != nil {
ps.logger.Errorf("pushsync: error while sending chunk or receiving receipt: %v", err)
continue
}
// retry interval timer triggers starting from new
case <-timer.C:
// initially timer is set to go off as well as every time we hit the end of push index
startTime := time.Now()
// if subscribe was running, stop it
if unsubscribe != nil {
unsubscribe()
}
// and start iterating on Push index from the beginning
chunks, unsubscribe = ps.storer.SubscribePush(ctx)
// reset timer to go off after retryInterval
timer.Reset(retryInterval)
ps.metrics.MarkAndSweepTimer.Observe(time.Since(startTime).Seconds())
case <-ps.quit:
if unsubscribe != nil {
unsubscribe()
}
return
}
}
}
// sendChunkAndReceiveReceipt sends chunk to a given peer
// by opening a stream. It then waits for a receipt from that peer.
// Once the receipt is received within a given time frame it marks that this chunk
// as synced in the localstore.
func (ps *PushSync) SendChunkAndReceiveReceipt(ctx context.Context, peer swarm.Address, ch swarm.Chunk) error {
streamer, err := ps.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
if err != nil {
return fmt.Errorf("new stream: %w", err)
}
defer streamer.Close()
w, r := protobuf.NewWriterAndReader(streamer)
if err := ps.sendChunkDelivery(w, ch); err != nil {
return fmt.Errorf("chunk deliver: %w", err)
}
receiptRTTTimer := time.Now()
receipt, err := ps.receiveReceipt(r)
if err != nil {
return fmt.Errorf("receive receipt: %w", err)
}
ps.metrics.ReceiptRTT.Observe(time.Since(receiptRTTTimer).Seconds())
// Check if the receipt is valid
if !ch.Address().Equal(swarm.NewAddress(receipt.Address)) {
ps.metrics.InvalidReceiptReceived.Inc()
return errors.New("invalid receipt")
}
// set chunk status to synced, insert to db GC index
if err := ps.storer.Set(ctx, storage.ModeSetSyncPush, ch.Address()); err != nil {
ps.metrics.ErrorSettingChunkToSynced.Inc()
return fmt.Errorf("chunk store: %w", err)
}
ps.metrics.TotalChunksSynced.Inc()
return nil
}
// PushChunkToClosest sends chunk to the closest peer by opening a stream. It then waits for
// a receipt from that peer and returns error or nil based on the receiving and
// the validity of the receipt.
func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Receipt, error) {
peer, err := ps.peerSuggester.ClosestPeer(ch.Address())
if err != nil {
if errors.Is(err, topology.ErrWantSelf) {
// if you are the closest node return a receipt immediately
return &Receipt{
Address: ch.Address(),
}, nil
}
return nil, fmt.Errorf("closest peer: %w", err)
}
streamer, err := ps.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
if err != nil {
return nil, fmt.Errorf("new stream: %w", err)
}
defer streamer.Close()
w, r := protobuf.NewWriterAndReader(streamer)
if err := ps.sendChunkDelivery(w, ch); err != nil {
return nil, fmt.Errorf("chunk deliver: %w", err)
}
receiptRTTTimer := time.Now()
receipt, err := ps.receiveReceipt(r)
if err != nil {
return nil, fmt.Errorf("receive receipt: %w", err)
}
ps.metrics.ReceiptRTT.Observe(time.Since(receiptRTTTimer).Seconds())
// Check if the receipt is valid
if !ch.Address().Equal(swarm.NewAddress(receipt.Address)) {
ps.metrics.InvalidReceiptReceived.Inc()
return nil, errors.New("invalid receipt")
}
rec := &Receipt{
Address: swarm.NewAddress(receipt.Address),
}
return rec, nil
}
...@@ -17,7 +17,6 @@ import (
"github.com/ethersphere/bee/pkg/p2p/streamtest"
"github.com/ethersphere/bee/pkg/pushsync"
"github.com/ethersphere/bee/pkg/pushsync/pb"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology/mock"
...@@ -39,7 +38,6 @@ func TestSendChunkAndReceiveReceipt(t *testing.T) {
// mock should return ErrWantSelf since there's no one to forward to
psPeer, storerPeer := createPushSyncNode(t, closestPeer, nil, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer storerPeer.Close()
defer psPeer.Close()
recorder := streamtest.New(
streamtest.WithProtocols(psPeer.Protocol()),
...@@ -53,21 +51,22 @@ func TestSendChunkAndReceiveReceipt(t *testing.T) {
psPivot, storerPivot := createPushSyncNode(t, pivotNode, recorder, mock.WithClosestPeer(closestPeer))
defer storerPivot.Close()
// upload the chunk to the pivot node
_, err := storerPivot.Put(context.Background(), storage.ModePutUpload, chunk)
// Trigger the sending of chunk to the closest node
receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk)
if err != nil {
t.Fatal(err)
}
if !chunk.Address().Equal(receipt.Address) {
t.Fatal("invalid receipt")
}
// this intercepts the outgoing delivery message
waitOnRecordAndTest(t, closestPeer, recorder, chunkAddress, chunkData)
// this intercepts the incoming receipt message
waitOnRecordAndTest(t, closestPeer, recorder, chunkAddress, nil)
// Close the pushsync and then the DB
psPivot.Close()
}
// TestHandler expect a chunk from a node on a stream. It then stores the chunk in the local store and
...@@ -113,13 +112,15 @@ func TestHandler(t *testing.T) {
psTriggerPeer, triggerStorerDB := createPushSyncNode(t, triggerPeer, pivotRecorder, mock.WithClosestPeer(pivotPeer))
defer triggerStorerDB.Close()
// upload the chunk to the trigger node DB which triggers the chain reaction of sending this chunk
// from trigger peer to pivot peer to closest peer.
_, err := triggerStorerDB.Put(context.Background(), storage.ModePutUpload, chunk)
receipt, err := psTriggerPeer.PushChunkToClosest(context.Background(), chunk)
if err != nil {
t.Fatal(err)
}
if !chunk.Address().Equal(receipt.Address) {
t.Fatal("invalid receipt")
}
// In pivot peer, intercept the incoming delivery chunk from the trigger peer and check for correctness
waitOnRecordAndTest(t, pivotPeer, pivotRecorder, chunkAddress, chunkData)
...@@ -132,11 +133,6 @@ func TestHandler(t *testing.T) {
// In the received stream, check if a receipt is sent from pivot peer and check for its correctness.
waitOnRecordAndTest(t, pivotPeer, pivotRecorder, chunkAddress, nil)
// close push sync before the storers are closed to avoid data race
psClosestPeer.Close()
psPivot.Close()
psTriggerPeer.Close()
}
func createPushSyncNode(t *testing.T, addr swarm.Address, recorder *streamtest.Recorder, mockOpts ...mock.Option) (*pushsync.PushSync, *localstore.DB) {
...@@ -207,5 +203,4 @@ func waitOnRecordAndTest(t *testing.T, peer swarm.Address, recorder *streamtest.
t.Fatalf("receipt address mismatch")
}
}
}
...@@ -129,7 +129,7 @@ func (d *Descriptor) String() string {
type Storer interface {
Get(ctx context.Context, mode ModeGet, addr swarm.Address) (ch swarm.Chunk, err error)
GetMulti(ctx context.Context, mode ModeGet, addrs ...swarm.Address) (ch []swarm.Chunk, err error)
Put(ctx context.Context, mode ModePut, chs ...swarm.Chunk) (exist []bool, err error)
Putter
Has(ctx context.Context, addr swarm.Address) (yes bool, err error)
HasMulti(ctx context.Context, addrs ...swarm.Address) (yes []bool, err error)
Set(ctx context.Context, mode ModeSet, addrs ...swarm.Address) (err error)
...@@ -139,6 +139,10 @@ type Storer interface {
io.Closer
}
type Putter interface {
Put(ctx context.Context, mode ModePut, chs ...swarm.Chunk) (exist []bool, err error)
}
// StateStorer defines methods required to get, set, delete values for different keys
// and close the underlying resources.
type StateStorer interface {
......
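The Storer-to-Putter narrowing above means pushsync only declares the capability it actually uses (storing forwarded chunks), so any type with a matching Put method satisfies the dependency. A hypothetical illustration of that design choice (not part of this change; the helper and its names are invented for the example):

package pushsync_test

import (
	"context"

	"github.com/ethersphere/bee/pkg/logging"
	"github.com/ethersphere/bee/pkg/p2p"
	"github.com/ethersphere/bee/pkg/pushsync"
	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/swarm"
	"github.com/ethersphere/bee/pkg/topology"
)

// countingPutter is a hypothetical stand-in that only counts stored chunks;
// having this Put method is enough to satisfy storage.Putter.
type countingPutter struct{ n int }

func (c *countingPutter) Put(_ context.Context, _ storage.ModePut, chs ...swarm.Chunk) ([]bool, error) {
	c.n += len(chs)
	return make([]bool, len(chs)), nil
}

// newCountingPushSync wires the narrowed dependency into pushsync.New;
// no full localstore is needed any more to exercise the handler.
func newCountingPushSync(streamer p2p.Streamer, peers topology.ClosestPeerer, logger logging.Logger) *pushsync.PushSync {
	return pushsync.New(pushsync.Options{
		Streamer:      streamer,
		Storer:        &countingPutter{},
		ClosestPeerer: peers,
		Logger:        logger,
	})
}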