Commit 6eedd003 authored by Zahoor Mohamed, committed by GitHub

Fix getting tags by address in pusher (#334)

* Fix tags in pusher
parent d39c5638
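In short: instead of matching a tag to a chunk by address (the old tag.Address field), the chunk now carries its tag id, and both pushsync and the pusher resolve the tag through tagg.Get(ch.TagID()). A minimal sketch of that flow, using only the pkg/tags and pkg/swarm calls that appear in this diff (the tag name, address, and payload are illustrative):

package main

import (
	"fmt"

	"github.com/ethersphere/bee/pkg/swarm"
	"github.com/ethersphere/bee/pkg/tags"
)

func main() {
	tagg := tags.NewTags()

	// create a tag for an upload of a single chunk
	ta, err := tagg.Create("upload", 1, false)
	if err != nil {
		panic(err)
	}

	// stamp the chunk with the tag's Uid; no tag.Address bookkeeping required
	addr := swarm.MustParseHexAddress("7000000000000000000000000000000000000000000000000000000000000000")
	ch := swarm.NewChunk(addr, []byte("1234")).WithTagID(ta.Uid)

	// any component holding the chunk can recover its tag by id
	// and increment the relevant state counter
	if t, err := tagg.Get(ch.TagID()); err == nil && t != nil {
		t.Inc(tags.StateSynced)
	}

	fmt.Println(ta.Get(tags.StateSynced)) // 1
}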
......@@ -237,7 +237,7 @@ func NewBee(o Options) (*Bee, error) {
ChunkPeerer: topologyDriver,
Logger: logger,
})
tag := tags.NewTags()
tagg := tags.NewTags()
if err = p2ps.AddProtocol(retrieve.Protocol()); err != nil {
return nil, fmt.Errorf("retrieval service: %w", err)
......@@ -251,6 +251,7 @@ func NewBee(o Options) (*Bee, error) {
Streamer: p2ps,
Storer: storer,
ClosestPeerer: topologyDriver,
Tagger: tagg,
Logger: logger,
})
......@@ -262,7 +263,7 @@ func NewBee(o Options) (*Bee, error) {
Storer: storer,
PeerSuggester: topologyDriver,
PushSyncer: pushSyncProtocol,
Tags: tag,
Tagger: tagg,
Logger: logger,
})
b.pusherCloser = pushSyncPusher
......@@ -293,7 +294,7 @@ func NewBee(o Options) (*Bee, error) {
if o.APIAddr != "" {
// API server
apiService = api.New(api.Options{
Tags: tag,
Tags: tagg,
Storer: ns,
CORSAllowedOrigins: o.CORSAllowedOrigins,
Logger: logger,
......
......@@ -21,8 +21,8 @@ import (
type Service struct {
storer storage.Storer
pushSyncer pushsync.PushSyncer
tag *tags.Tags
logger logging.Logger
tagg *tags.Tags
metrics metrics
quit chan struct{}
chunksWorkerQuitC chan struct{}
......@@ -31,8 +31,8 @@ type Service struct {
type Options struct {
Storer storage.Storer
PeerSuggester topology.ClosestPeerer
Tags *tags.Tags
PushSyncer pushsync.PushSyncer
Tagger *tags.Tags
Logger logging.Logger
}
......@@ -42,7 +42,7 @@ func New(o Options) *Service {
service := &Service{
storer: o.Storer,
pushSyncer: o.PushSyncer,
tag: o.Tags,
tagg: o.Tagger,
logger: o.Logger,
metrics: newMetrics(),
quit: make(chan struct{}),
......@@ -131,7 +131,7 @@ LOOP:
}
return
}
s.setChunkAsSynced(ctx, ch.Address())
s.setChunkAsSynced(ctx, ch)
}(ctx, ch)
case <-timer.C:
// initially the timer is set to go off immediately, and then again every time we hit the end of the push index
......@@ -173,11 +173,15 @@ LOOP:
}
}
func (s *Service) setChunkAsSynced(ctx context.Context, addr swarm.Address) {
if err := s.storer.Set(ctx, storage.ModeSetSyncPush, addr); err != nil {
func (s *Service) setChunkAsSynced(ctx context.Context, ch swarm.Chunk) {
if err := s.storer.Set(ctx, storage.ModeSetSyncPush, ch.Address()); err != nil {
s.logger.Errorf("pusher: error setting chunk as synced: %v", err)
s.metrics.ErrorSettingChunkToSynced.Inc()
}
t, err := s.tagg.Get(ch.TagID())
if err == nil && t != nil {
t.Inc(tags.StateSynced)
}
}
func (s *Service) Close() error {
......
......@@ -12,8 +12,6 @@ import (
"testing"
"time"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/localstore"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/pusher"
......@@ -21,6 +19,7 @@ import (
pushsyncmock "github.com/ethersphere/bee/pkg/pushsync/mock"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/topology/mock"
)
......@@ -47,8 +46,7 @@ func (s Store) Set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Add
// TestSendChunkToPushSyncWithTag sends a chunk to pushsync to be sent to its closest peer and get a receipt.
// Once the receipt is received, the test checks that the localstore has been updated and the chunk is set
// as ModeSetSyncPush status.
func TestSendChunkToPushSync(t *testing.T) {
chunk := createChunk()
func TestSendChunkToPushSyncWithTag(t *testing.T) {
// create a trigger and a closest peer
triggerPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
......@@ -60,14 +58,15 @@ func TestSendChunkToPushSync(t *testing.T) {
}
return receipt, nil
})
mtag := tags.NewTags()
tag, err := mtag.Create("name", 1, false)
mtags, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
defer storer.Close()
ta, err := mtags.Create("test", 1, false)
if err != nil {
t.Fatal(err)
}
tag.Address = chunk.Address()
p, storer := createPusher(t, triggerPeer, pushSyncService, mtag, mock.WithClosestPeer(closestPeer))
defer storer.Close()
chunk := createChunk().WithTagID(ta.Uid)
_, err = storer.Put(context.Background(), storage.ModePutUpload, chunk)
if err != nil {
......@@ -87,6 +86,11 @@ func TestSendChunkToPushSync(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if ta.Get(tags.StateSynced) != 1 {
t.Fatalf("tags error")
}
p.Close()
}
......@@ -105,15 +109,11 @@ func TestSendChunkToPushSyncWithoutTag(t *testing.T) {
}
return receipt, nil
})
mtag := tags.NewTags()
_, err := mtag.Create("name", 1, false)
if err != nil {
t.Fatal(err)
}
p, storer := createPusher(t, triggerPeer, pushSyncService, mtag, mock.WithClosestPeer(closestPeer))
_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
defer storer.Close()
_, err = storer.Put(context.Background(), storage.ModePutUpload, chunk)
_, err := storer.Put(context.Background(), storage.ModePutUpload, chunk)
if err != nil {
t.Fatal(err)
}
......@@ -148,16 +148,10 @@ func TestSendChunkAndReceiveInvalidReceipt(t *testing.T) {
return nil, errors.New("invalid receipt")
})
mtag := tags.NewTags()
tag, err := mtag.Create("name", 1, false)
if err != nil {
t.Fatal(err)
}
tag.Address = chunk.Address()
p, storer := createPusher(t, triggerPeer, pushSyncService, mtag, mock.WithClosestPeer(closestPeer))
_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
defer storer.Close()
_, err = storer.Put(context.Background(), storage.ModePutUpload, chunk)
_, err := storer.Put(context.Background(), storage.ModePutUpload, chunk)
if err != nil {
t.Fatal(err)
}
......@@ -195,17 +189,11 @@ func TestSendChunkAndTimeoutinReceivingReceipt(t *testing.T) {
return nil, nil
})
mtag := tags.NewTags()
tag, err := mtag.Create("name", 1, false)
if err != nil {
t.Fatal(err)
}
tag.Address = chunk.Address()
p, storer := createPusher(t, triggerPeer, pushSyncService, mtag, mock.WithClosestPeer(closestPeer))
_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
defer storer.Close()
defer p.Close()
_, err = storer.Put(context.Background(), storage.ModePutUpload, chunk)
_, err := storer.Put(context.Background(), storage.ModePutUpload, chunk)
if err != nil {
t.Fatal(err)
}
......@@ -229,10 +217,10 @@ func createChunk() swarm.Chunk {
// chunk data to upload
chunkAddress := swarm.MustParseHexAddress("7000000000000000000000000000000000000000000000000000000000000000")
chunkData := []byte("1234")
return swarm.NewChunk(chunkAddress, chunkData)
return swarm.NewChunk(chunkAddress, chunkData).WithTagID(666)
}
func createPusher(t *testing.T, addr swarm.Address, pushSyncService pushsync.PushSyncer, tag *tags.Tags, mockOpts ...mock.Option) (*pusher.Service, *Store) {
func createPusher(t *testing.T, addr swarm.Address, pushSyncService pushsync.PushSyncer, mockOpts ...mock.Option) (*tags.Tags, *pusher.Service, *Store) {
t.Helper()
logger := logging.New(ioutil.Discard, 0)
storer, err := localstore.New("", addr.Bytes(), nil, logger)
......@@ -240,6 +228,7 @@ func createPusher(t *testing.T, addr swarm.Address, pushSyncService pushsync.Pus
t.Fatal(err)
}
mtags := tags.NewTags()
pusherStorer := &Store{
Storer: storer,
modeSet: make(map[string]storage.ModeSet),
......@@ -247,8 +236,8 @@ func createPusher(t *testing.T, addr swarm.Address, pushSyncService pushsync.Pus
}
peerSuggester := mock.NewTopologyDriver(mockOpts...)
pusherService := pusher.New(pusher.Options{Storer: pusherStorer, Tags: tag, PushSyncer: pushSyncService, PeerSuggester: peerSuggester, Logger: logger})
return pusherService, pusherStorer
pusherService := pusher.New(pusher.Options{Storer: pusherStorer, PushSyncer: pushSyncService, Tagger: mtags, PeerSuggester: peerSuggester, Logger: logger})
return mtags, pusherService, pusherStorer
}
func checkIfModeSet(addr swarm.Address, mode storage.ModeSet, storer *Store) error {
......
......@@ -16,6 +16,7 @@ import (
"github.com/ethersphere/bee/pkg/pushsync/pb"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/topology"
)
......@@ -37,6 +38,7 @@ type PushSync struct {
streamer p2p.Streamer
storer storage.Putter
peerSuggester topology.ClosestPeerer
tagg *tags.Tags
logger logging.Logger
metrics metrics
}
......@@ -45,6 +47,7 @@ type Options struct {
Streamer p2p.Streamer
Storer storage.Putter
ClosestPeerer topology.ClosestPeerer
Tagger *tags.Tags
Logger logging.Logger
}
......@@ -55,6 +58,7 @@ func New(o Options) *PushSync {
streamer: o.Streamer,
storer: o.Storer,
peerSuggester: o.ClosestPeerer,
tagg: o.Tagger,
logger: o.Logger,
metrics: newMetrics(),
}
......@@ -134,7 +138,6 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
defer streamer.Close()
wc, rc := protobuf.NewWriterAndReader(streamer)
if err := ps.sendChunkDelivery(wc, chunk); err != nil {
return fmt.Errorf("forward chunk to peer %s: %w", peer.String(), err)
}
......@@ -233,8 +236,13 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
if err := ps.sendChunkDelivery(w, ch); err != nil {
return nil, fmt.Errorf("chunk deliver to peer %s: %w", peer.String(), err)
}
receiptRTTTimer := time.Now()
// if a tag is registered for this chunk, increment its sent counter
t, err := ps.tagg.Get(ch.TagID())
if err == nil && t != nil {
t.Inc(tags.StateSent)
}
receiptRTTTimer := time.Now()
receipt, err := ps.receiveReceipt(r)
if err != nil {
return nil, fmt.Errorf("receive receipt from peer %s: %w", peer.String(), err)
......
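The send-side counterpart of the pusher change above: StateSent is incremented inside PushChunkToClosest right after the delivery is written, before the receipt round trip completes. A hypothetical caller-side helper (not part of this diff; assumes imports of context, errors, and the bee pushsync, swarm, and tags packages) sketching what can be asserted after a successful push:

// pushAndVerify pushes a tagged chunk and checks the receipt and tag state.
func pushAndVerify(ctx context.Context, ps *pushsync.PushSync, tagg *tags.Tags, ch swarm.Chunk) error {
	// PushChunkToClosest delivers the chunk, bumps the chunk's tag if one
	// is registered, and then waits for a receipt from the closest peer.
	receipt, err := ps.PushChunkToClosest(ctx, ch)
	if err != nil {
		return err
	}
	if !ch.Address().Equal(receipt.Address) {
		return errors.New("receipt address mismatch")
	}
	// the tag registered for this chunk now counts one send
	if t, err := tagg.Get(ch.TagID()); err == nil && t != nil && t.Get(tags.StateSent) < 1 {
		return errors.New("tag StateSent not incremented")
	}
	return nil
}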
......@@ -17,6 +17,7 @@ import (
"github.com/ethersphere/bee/pkg/pushsync"
"github.com/ethersphere/bee/pkg/pushsync/pb"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology/mock"
)
......@@ -35,14 +36,14 @@ func TestSendChunkAndReceiveReceipt(t *testing.T) {
// peer is the node responding to the chunk receipt message
// mock should return ErrWantSelf since there's no one to forward to
psPeer, storerPeer := createPushSyncNode(t, closestPeer, nil, mock.WithClosestPeerErr(topology.ErrWantSelf))
psPeer, storerPeer, _ := createPushSyncNode(t, closestPeer, nil, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer storerPeer.Close()
recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()))
// pivot node needs the streamer since the chunk is intercepted by
// the chunk worker, then gets sent by opening a new stream
psPivot, storerPivot := createPushSyncNode(t, pivotNode, recorder, mock.WithClosestPeer(closestPeer))
psPivot, storerPivot, _ := createPushSyncNode(t, pivotNode, recorder, mock.WithClosestPeer(closestPeer))
defer storerPivot.Close()
// Trigger the sending of chunk to the closest node
......@@ -63,6 +64,70 @@ func TestSendChunkAndReceiveReceipt(t *testing.T) {
}
// TestPushChunkToClosest tests the sending of a chunk to the closest peer from the origination source's perspective.
// It also checks whether the tags are incremented properly if they are present.
func TestPushChunkToClosest(t *testing.T) {
// chunk data to upload
chunkAddress := swarm.MustParseHexAddress("7000000000000000000000000000000000000000000000000000000000000000")
chunkData := []byte("1234")
// create a pivot node and a mocked closest node
pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") // base is 0000
closestPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000") // binary 0110 -> po 1
// peer is the node responding to the chunk receipt message
// mock should return ErrWantSelf since there's no one to forward to
psPeer, storerPeer, _ := createPushSyncNode(t, closestPeer, nil, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer storerPeer.Close()
recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()))
// pivot node needs the streamer since the chunk is intercepted by
// the chunk worker, then gets sent by opening a new stream
psPivot, storerPivot, pivotTags := createPushSyncNode(t, pivotNode, recorder, mock.WithClosestPeer(closestPeer))
defer storerPivot.Close()
ta, err := pivotTags.Create("test", 1, false)
if err != nil {
t.Fatal(err)
}
chunk := swarm.NewChunk(chunkAddress, chunkData).WithTagID(ta.Uid)
ta1, err := pivotTags.Get(ta.Uid)
if err != nil {
t.Fatal(err)
}
if ta1.Get(tags.StateSent) != 0 || ta1.Get(tags.StateSynced) != 0 {
t.Fatalf("tags initialization error")
}
// Trigger the sending of chunk to the closest node
receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk)
if err != nil {
t.Fatal(err)
}
if !chunk.Address().Equal(receipt.Address) {
t.Fatal("invalid receipt")
}
// this intercepts the outgoing delivery message
waitOnRecordAndTest(t, closestPeer, recorder, chunkAddress, chunkData)
// this intercepts the incoming receipt message
waitOnRecordAndTest(t, closestPeer, recorder, chunkAddress, nil)
ta2, err := pivotTags.Get(ta.Uid)
if err != nil {
t.Fatal(err)
}
if ta2.Get(tags.StateSent) != 1 {
t.Fatalf("tags error")
}
}
// TestHandler expects a chunk from a node on a stream. It then stores the chunk in the local store and
// sends back a receipt. This is tested by intercepting the incoming stream for proper messages.
// It also sends the chunk to the closest peer and receives a receipt.
......@@ -81,19 +146,19 @@ func TestHandler(t *testing.T) {
closestPeer := swarm.MustParseHexAddress("f000000000000000000000000000000000000000000000000000000000000000")
// Create the closest peer
psClosestPeer, closestStorerPeerDB := createPushSyncNode(t, closestPeer, nil, mock.WithClosestPeerErr(topology.ErrWantSelf))
psClosestPeer, closestStorerPeerDB, _ := createPushSyncNode(t, closestPeer, nil, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer closestStorerPeerDB.Close()
closestRecorder := streamtest.New(streamtest.WithProtocols(psClosestPeer.Protocol()))
// creating the pivot peer
psPivot, storerPivotDB := createPushSyncNode(t, pivotPeer, closestRecorder, mock.WithClosestPeer(closestPeer))
psPivot, storerPivotDB, _ := createPushSyncNode(t, pivotPeer, closestRecorder, mock.WithClosestPeer(closestPeer))
defer storerPivotDB.Close()
pivotRecorder := streamtest.New(streamtest.WithProtocols(psPivot.Protocol()))
// Creating the trigger peer
psTriggerPeer, triggerStorerDB := createPushSyncNode(t, triggerPeer, pivotRecorder, mock.WithClosestPeer(pivotPeer))
psTriggerPeer, triggerStorerDB, _ := createPushSyncNode(t, triggerPeer, pivotRecorder, mock.WithClosestPeer(pivotPeer))
defer triggerStorerDB.Close()
receipt, err := psTriggerPeer.PushChunkToClosest(context.Background(), chunk)
......@@ -119,7 +184,7 @@ func TestHandler(t *testing.T) {
waitOnRecordAndTest(t, pivotPeer, pivotRecorder, chunkAddress, nil)
}
func createPushSyncNode(t *testing.T, addr swarm.Address, recorder *streamtest.Recorder, mockOpts ...mock.Option) (*pushsync.PushSync, *localstore.DB) {
func createPushSyncNode(t *testing.T, addr swarm.Address, recorder *streamtest.Recorder, mockOpts ...mock.Option) (*pushsync.PushSync, *localstore.DB, *tags.Tags) {
logger := logging.New(ioutil.Discard, 0)
storer, err := localstore.New("", addr.Bytes(), nil, logger)
......@@ -128,15 +193,17 @@ func createPushSyncNode(t *testing.T, addr swarm.Address, recorder *streamtest.R
}
mockTopology := mock.NewTopologyDriver(mockOpts...)
mtag := tags.NewTags()
ps := pushsync.New(pushsync.Options{
Streamer: recorder,
Storer: storer,
Tagger: mtag,
ClosestPeerer: mockTopology,
Logger: logger,
})
return ps, storer
return ps, storer, mtag
}
func waitOnRecordAndTest(t *testing.T, peer swarm.Address, recorder *streamtest.Recorder, add swarm.Address, data []byte) {
......