Commit db5a1bba authored by acud's avatar acud Committed by GitHub

* add tracing to pushsync and retrieval protocols (#714)

parent 2f565f9f
...@@ -241,7 +241,7 @@ func NewBee(addr string, swarmAddress swarm.Address, keystore keystore.Service, ...@@ -241,7 +241,7 @@ func NewBee(addr string, swarmAddress swarm.Address, keystore keystore.Service,
chunkvalidator := swarm.NewChunkValidator(soc.NewValidator(), content.NewValidator()) chunkvalidator := swarm.NewChunkValidator(soc.NewValidator(), content.NewValidator())
retrieve := retrieval.New(swarmAddress, p2ps, kad, logger, acc, accounting.NewFixedPricer(swarmAddress, 10), chunkvalidator) retrieve := retrieval.New(swarmAddress, p2ps, kad, logger, acc, accounting.NewFixedPricer(swarmAddress, 10), chunkvalidator, tracer)
tagg := tags.NewTags(stateStore, logger) tagg := tags.NewTags(stateStore, logger)
b.tagsCloser = tagg b.tagsCloser = tagg
...@@ -263,7 +263,7 @@ func NewBee(addr string, swarmAddress swarm.Address, keystore keystore.Service, ...@@ -263,7 +263,7 @@ func NewBee(addr string, swarmAddress swarm.Address, keystore keystore.Service,
} }
retrieve.SetStorer(ns) retrieve.SetStorer(ns)
pushSyncProtocol := pushsync.New(p2ps, storer, kad, tagg, psss.TryUnwrap, logger, acc, accounting.NewFixedPricer(swarmAddress, 10)) pushSyncProtocol := pushsync.New(p2ps, storer, kad, tagg, psss.TryUnwrap, logger, acc, accounting.NewFixedPricer(swarmAddress, 10), tracer)
// set the pushSyncer in the PSS // set the pushSyncer in the PSS
psss.SetPushSyncer(pushSyncProtocol) psss.SetPushSyncer(pushSyncProtocol)
......
...@@ -19,6 +19,7 @@ import ( ...@@ -19,6 +19,7 @@ import (
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags" "github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/topology" "github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/tracing"
) )
const ( const (
...@@ -45,11 +46,12 @@ type PushSync struct { ...@@ -45,11 +46,12 @@ type PushSync struct {
accounting accounting.Interface accounting accounting.Interface
pricer accounting.Pricer pricer accounting.Pricer
metrics metrics metrics metrics
tracer *tracing.Tracer
} }
var timeToWaitForReceipt = 3 * time.Second // time to wait to get a receipt for a chunk var timeToWaitForReceipt = 3 * time.Second // time to wait to get a receipt for a chunk
func New(streamer p2p.Streamer, storer storage.Putter, closestPeerer topology.ClosestPeerer, tagger *tags.Tags, deliveryCallback func(context.Context, swarm.Chunk) error, logger logging.Logger, accounting accounting.Interface, pricer accounting.Pricer) *PushSync { func New(streamer p2p.Streamer, storer storage.Putter, closestPeerer topology.ClosestPeerer, tagger *tags.Tags, deliveryCallback func(context.Context, swarm.Chunk) error, logger logging.Logger, accounting accounting.Interface, pricer accounting.Pricer, tracer *tracing.Tracer) *PushSync {
ps := &PushSync{ ps := &PushSync{
streamer: streamer, streamer: streamer,
storer: storer, storer: storer,
...@@ -88,12 +90,14 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) ...@@ -88,12 +90,14 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
_ = stream.FullClose() _ = stream.FullClose()
} }
}() }()
// Get the delivery // Get the delivery
chunk, err := ps.getChunkDelivery(r) chunk, err := ps.getChunkDelivery(r)
if err != nil { if err != nil {
return fmt.Errorf("chunk delivery from peer %s: %w", p.Address.String(), err) return fmt.Errorf("chunk delivery from peer %s: %w", p.Address.String(), err)
} }
span, _, ctx := ps.tracer.StartSpanFromContext(ctx, "pushsync-handler", ps.logger)
span = span.SetTag("address", chunk.Address().String())
defer span.Finish()
// Select the closest peer to forward the chunk // Select the closest peer to forward the chunk
peer, err := ps.peerSuggester.ClosestPeer(chunk.Address()) peer, err := ps.peerSuggester.ClosestPeer(chunk.Address())
...@@ -215,6 +219,10 @@ func (ps *PushSync) receiveReceipt(r protobuf.Reader) (receipt pb.Receipt, err e ...@@ -215,6 +219,10 @@ func (ps *PushSync) receiveReceipt(r protobuf.Reader) (receipt pb.Receipt, err e
// a receipt from that peer and returns error or nil based on the receiving and // a receipt from that peer and returns error or nil based on the receiving and
// the validity of the receipt. // the validity of the receipt.
func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Receipt, error) { func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Receipt, error) {
span, _, ctx := ps.tracer.StartSpanFromContext(ctx, "pushsync-push", ps.logger)
span = span.SetTag("address", ch.Address().String())
defer span.Finish()
peer, err := ps.peerSuggester.ClosestPeer(ch.Address()) peer, err := ps.peerSuggester.ClosestPeer(ch.Address())
if err != nil { if err != nil {
if errors.Is(err, topology.ErrWantSelf) { if errors.Is(err, topology.ErrWantSelf) {
......
...@@ -7,11 +7,12 @@ package pushsync_test ...@@ -7,11 +7,12 @@ package pushsync_test
import ( import (
"bytes" "bytes"
"context" "context"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"io/ioutil" "io/ioutil"
"testing" "testing"
"time" "time"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/accounting" "github.com/ethersphere/bee/pkg/accounting"
accountingmock "github.com/ethersphere/bee/pkg/accounting/mock" accountingmock "github.com/ethersphere/bee/pkg/accounting/mock"
"github.com/ethersphere/bee/pkg/localstore" "github.com/ethersphere/bee/pkg/localstore"
...@@ -293,7 +294,7 @@ func createPushSyncNode(t *testing.T, addr swarm.Address, recorder *streamtest.R ...@@ -293,7 +294,7 @@ func createPushSyncNode(t *testing.T, addr swarm.Address, recorder *streamtest.R
mockAccounting := accountingmock.NewAccounting() mockAccounting := accountingmock.NewAccounting()
mockPricer := accountingmock.NewPricer(fixedPrice, fixedPrice) mockPricer := accountingmock.NewPricer(fixedPrice, fixedPrice)
return pushsync.New(recorder, storer, mockTopology, mtag, pssDeliver, logger, mockAccounting, mockPricer), storer, mtag, mockAccounting return pushsync.New(recorder, storer, mockTopology, mtag, pssDeliver, logger, mockAccounting, mockPricer, nil), storer, mtag, mockAccounting
} }
func waitOnRecordAndTest(t *testing.T, peer swarm.Address, recorder *streamtest.Recorder, add swarm.Address, data []byte) { func waitOnRecordAndTest(t *testing.T, peer swarm.Address, recorder *streamtest.Recorder, add swarm.Address, data []byte) {
......
...@@ -260,12 +260,12 @@ func newTestNetStore(t *testing.T, recoveryFunc recovery.RecoveryHook) storage.S ...@@ -260,12 +260,12 @@ func newTestNetStore(t *testing.T, recoveryFunc recovery.RecoveryHook) storage.S
_, _, _ = f(peerID, 0) _, _, _ = f(peerID, 0)
return nil return nil
}} }}
server := retrieval.New(swarm.ZeroAddress, nil, nil, logger, serverMockAccounting, nil, nil) server := retrieval.New(swarm.ZeroAddress, nil, nil, logger, serverMockAccounting, nil, nil, nil)
server.SetStorer(mockStorer) server.SetStorer(mockStorer)
recorder := streamtest.New( recorder := streamtest.New(
streamtest.WithProtocols(server.Protocol()), streamtest.WithProtocols(server.Protocol()),
) )
retrieve := retrieval.New(swarm.ZeroAddress, recorder, ps, logger, serverMockAccounting, pricerMock, nil) retrieve := retrieval.New(swarm.ZeroAddress, recorder, ps, logger, serverMockAccounting, pricerMock, nil, nil)
retrieve.SetStorer(mockStorer) retrieve.SetStorer(mockStorer)
ns := netstore.New(storer, recoveryFunc, retrieve, logger, nil) ns := netstore.New(storer, recoveryFunc, retrieve, logger, nil)
return ns return ns
......
...@@ -17,6 +17,7 @@ import ( ...@@ -17,6 +17,7 @@ import (
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology" "github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/tracing"
"golang.org/x/sync/singleflight" "golang.org/x/sync/singleflight"
) )
...@@ -44,9 +45,10 @@ type Service struct { ...@@ -44,9 +45,10 @@ type Service struct {
accounting accounting.Interface accounting accounting.Interface
pricer accounting.Pricer pricer accounting.Pricer
validator swarm.Validator validator swarm.Validator
tracer *tracing.Tracer
} }
func New(addr swarm.Address, streamer p2p.Streamer, chunkPeerer topology.EachPeerer, logger logging.Logger, accounting accounting.Interface, pricer accounting.Pricer, validator swarm.Validator) *Service { func New(addr swarm.Address, streamer p2p.Streamer, chunkPeerer topology.EachPeerer, logger logging.Logger, accounting accounting.Interface, pricer accounting.Pricer, validator swarm.Validator, tracer *tracing.Tracer) *Service {
return &Service{ return &Service{
addr: addr, addr: addr,
streamer: streamer, streamer: streamer,
...@@ -55,6 +57,7 @@ func New(addr swarm.Address, streamer p2p.Streamer, chunkPeerer topology.EachPee ...@@ -55,6 +57,7 @@ func New(addr swarm.Address, streamer p2p.Streamer, chunkPeerer topology.EachPee
accounting: accounting, accounting: accounting,
pricer: pricer, pricer: pricer,
validator: validator, validator: validator,
tracer: tracer,
} }
} }
...@@ -81,6 +84,10 @@ func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (swarm. ...@@ -81,6 +84,10 @@ func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (swarm.
defer cancel() defer cancel()
v, err, _ := s.singleflight.Do(addr.String(), func() (interface{}, error) { v, err, _ := s.singleflight.Do(addr.String(), func() (interface{}, error) {
span, logger, ctx := s.tracer.StartSpanFromContext(ctx, "retrieve-chunk", s.logger)
span = span.SetTag("address", addr.String())
defer span.Finish()
var skipPeers []swarm.Address var skipPeers []swarm.Address
for i := 0; i < maxPeers; i++ { for i := 0; i < maxPeers; i++ {
var peer swarm.Address var peer swarm.Address
...@@ -89,14 +96,14 @@ func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (swarm. ...@@ -89,14 +96,14 @@ func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (swarm.
if peer.IsZero() { if peer.IsZero() {
return nil, err return nil, err
} }
s.logger.Debugf("retrieval: failed to get chunk %s from peer %s: %v", addr, peer, err) logger.Debugf("retrieval: failed to get chunk %s from peer %s: %v", addr, peer, err)
skipPeers = append(skipPeers, peer) skipPeers = append(skipPeers, peer)
continue continue
} }
s.logger.Tracef("retrieval: got chunk %s from peer %s", addr, peer) logger.Tracef("retrieval: got chunk %s from peer %s", addr, peer)
return chunk, nil return chunk, nil
} }
s.logger.Tracef("retrieval: failed to get chunk %s: reached max peers of %v", addr, maxPeers) logger.Tracef("retrieval: failed to get chunk %s: reached max peers of %v", addr, maxPeers)
return nil, storage.ErrNotFound return nil, storage.ErrNotFound
}) })
if err != nil { if err != nil {
...@@ -231,6 +238,10 @@ func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (e ...@@ -231,6 +238,10 @@ func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (e
if err := r.ReadMsg(&req); err != nil { if err := r.ReadMsg(&req); err != nil {
return fmt.Errorf("read request: %w peer %s", err, p.Address.String()) return fmt.Errorf("read request: %w peer %s", err, p.Address.String())
} }
span, _, ctx := s.tracer.StartSpanFromContext(ctx, "handle-retrieve-chunk", s.logger)
span = span.SetTag("address", swarm.NewAddress(req.Addr).String())
defer span.Finish()
ctx = context.WithValue(ctx, requestSourceContextKey{}, p.Address.String()) ctx = context.WithValue(ctx, requestSourceContextKey{}, p.Address.String())
chunk, err := s.storer.Get(ctx, storage.ModeGetRequest, swarm.NewAddress(req.Addr)) chunk, err := s.storer.Get(ctx, storage.ModeGetRequest, swarm.NewAddress(req.Addr))
if err != nil { if err != nil {
......
...@@ -51,7 +51,7 @@ func TestDelivery(t *testing.T) { ...@@ -51,7 +51,7 @@ func TestDelivery(t *testing.T) {
pricerMock := accountingmock.NewPricer(price, price) pricerMock := accountingmock.NewPricer(price, price)
// create the server that will handle the request and will serve the response // create the server that will handle the request and will serve the response
server := retrieval.New(swarm.MustParseHexAddress("00112234"), nil, nil, logger, serverMockAccounting, pricerMock, mockValidator) server := retrieval.New(swarm.MustParseHexAddress("00112234"), nil, nil, logger, serverMockAccounting, pricerMock, mockValidator, nil)
server.SetStorer(mockStorer) server.SetStorer(mockStorer)
recorder := streamtest.New( recorder := streamtest.New(
streamtest.WithProtocols(server.Protocol()), streamtest.WithProtocols(server.Protocol()),
...@@ -70,7 +70,7 @@ func TestDelivery(t *testing.T) { ...@@ -70,7 +70,7 @@ func TestDelivery(t *testing.T) {
_, _, _ = f(peerID, 0) _, _, _ = f(peerID, 0)
return nil return nil
}} }}
client := retrieval.New(swarm.MustParseHexAddress("9ee7add8"), recorder, ps, logger, clientMockAccounting, pricerMock, mockValidator) client := retrieval.New(swarm.MustParseHexAddress("9ee7add8"), recorder, ps, logger, clientMockAccounting, pricerMock, mockValidator, nil)
client.SetStorer(clientMockStorer) client.SetStorer(clientMockStorer)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout) ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel() defer cancel()
...@@ -153,7 +153,7 @@ func TestRetrieveChunk(t *testing.T) { ...@@ -153,7 +153,7 @@ func TestRetrieveChunk(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
server := retrieval.New(serverAddress, nil, nil, logger, accountingmock.NewAccounting(), pricer, mockValidator) server := retrieval.New(serverAddress, nil, nil, logger, accountingmock.NewAccounting(), pricer, mockValidator, nil)
server.SetStorer(serverStorer) server.SetStorer(serverStorer)
recorder := streamtest.New(streamtest.WithProtocols(server.Protocol())) recorder := streamtest.New(streamtest.WithProtocols(server.Protocol()))
...@@ -162,7 +162,7 @@ func TestRetrieveChunk(t *testing.T) { ...@@ -162,7 +162,7 @@ func TestRetrieveChunk(t *testing.T) {
_, _, _ = f(serverAddress, 0) _, _, _ = f(serverAddress, 0)
return nil return nil
}} }}
client := retrieval.New(clientAddress, recorder, clientSuggester, logger, accountingmock.NewAccounting(), pricer, mockValidator) client := retrieval.New(clientAddress, recorder, clientSuggester, logger, accountingmock.NewAccounting(), pricer, mockValidator, nil)
got, err := client.RetrieveChunk(context.Background(), chunkAddress) got, err := client.RetrieveChunk(context.Background(), chunkAddress)
if err != nil { if err != nil {
...@@ -179,7 +179,7 @@ func TestRetrieveChunk(t *testing.T) { ...@@ -179,7 +179,7 @@ func TestRetrieveChunk(t *testing.T) {
chunkAddress := swarm.MustParseHexAddress("02") chunkAddress := swarm.MustParseHexAddress("02")
clientAddress := swarm.MustParseHexAddress("03") clientAddress := swarm.MustParseHexAddress("03")
server := retrieval.New(serverAddress, nil, nil, logger, accountingmock.NewAccounting(), pricer, mockValidator) server := retrieval.New(serverAddress, nil, nil, logger, accountingmock.NewAccounting(), pricer, mockValidator, nil)
recorder := streamtest.New(streamtest.WithProtocols(server.Protocol())) recorder := streamtest.New(streamtest.WithProtocols(server.Protocol()))
...@@ -187,7 +187,7 @@ func TestRetrieveChunk(t *testing.T) { ...@@ -187,7 +187,7 @@ func TestRetrieveChunk(t *testing.T) {
_, _, _ = f(serverAddress, 0) _, _, _ = f(serverAddress, 0)
return nil return nil
}} }}
client := retrieval.New(clientAddress, recorder, clientSuggester, logger, accountingmock.NewAccounting(), pricer, mockValidator) client := retrieval.New(clientAddress, recorder, clientSuggester, logger, accountingmock.NewAccounting(), pricer, mockValidator, nil)
// do not request from the upstream peer // do not request from the upstream peer
_, err := client.RetrieveChunk(context.Background(), chunkAddress) _, err := client.RetrieveChunk(context.Background(), chunkAddress)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment