Commit f5aa9716 authored by acud's avatar acud Committed by GitHub

pusher: add tracing (#746)

* add tracing to pusher
parent a1765172
......@@ -353,7 +353,7 @@ func NewBee(addr string, swarmAddress swarm.Address, keystore keystore.Service,
b.recoveryHandleCleanup = psss.Register(recovery.RecoveryTopic, chunkRepairHandler)
}
pushSyncPusher := pusher.New(storer, kad, pushSyncProtocol, tagg, logger)
pushSyncPusher := pusher.New(storer, kad, pushSyncProtocol, tagg, logger, tracer)
b.pusherCloser = pushSyncPusher
pullStorage := pullstorage.New(storer)
......
......@@ -16,6 +16,8 @@ import (
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/opentracing/opentracing-go"
)
type Service struct {
......@@ -23,19 +25,21 @@ type Service struct {
pushSyncer pushsync.PushSyncer
logger logging.Logger
tagg *tags.Tags
tracer *tracing.Tracer
metrics metrics
quit chan struct{}
chunksWorkerQuitC chan struct{}
}
var retryInterval = 10 * time.Second // time interval between retries
var retryInterval = 5 * time.Second // time interval between retries
func New(storer storage.Storer, peerSuggester topology.ClosestPeerer, pushSyncer pushsync.PushSyncer, tagger *tags.Tags, logger logging.Logger) *Service {
func New(storer storage.Storer, peerSuggester topology.ClosestPeerer, pushSyncer pushsync.PushSyncer, tagger *tags.Tags, logger logging.Logger, tracer *tracing.Tracer) *Service {
service := &Service{
storer: storer,
pushSyncer: pushSyncer,
tagg: tagger,
logger: logger,
tracer: tracer,
metrics: newMetrics(),
quit: make(chan struct{}),
chunksWorkerQuitC: make(chan struct{}),
......@@ -54,7 +58,7 @@ func (s *Service) chunksWorker() {
defer timer.Stop()
defer close(s.chunksWorkerQuitC)
chunksInBatch := -1
ctx, cancel := context.WithCancel(context.Background())
cctx, cancel := context.WithCancel(context.Background())
go func() {
<-s.quit
cancel()
......@@ -62,13 +66,15 @@ func (s *Service) chunksWorker() {
sem := make(chan struct{}, 10)
inflight := make(map[string]struct{})
var mtx sync.Mutex
var span opentracing.Span
ctx := cctx
LOOP:
for {
select {
// handle incoming chunks
case ch, more := <-chunks:
// if no more, set to nil, reset timer to 0 to finalise batch immediately
// if no more, set to nil, reset timer to finalise batch
if !more {
chunks = nil
var dur time.Duration
......@@ -79,6 +85,10 @@ LOOP:
break
}
if span == nil {
span, _, ctx = s.tracer.StartSpanFromContext(cctx, "pusher-sync-batch", s.logger)
}
// postpone a retry only after we've finished processing everything in index
timer.Reset(retryInterval)
chunksInBatch++
......@@ -89,6 +99,10 @@ LOOP:
if unsubscribe != nil {
unsubscribe()
}
if span != nil {
span.Finish()
}
return
}
mtx.Lock()
......@@ -138,6 +152,8 @@ LOOP:
unsubscribe()
}
chunksInBatch = 0
// and start iterating on Push index from the beginning
chunks, unsubscribe = s.storer.SubscribePush(ctx)
......@@ -145,10 +161,19 @@ LOOP:
timer.Reset(retryInterval)
s.metrics.MarkAndSweepTimer.Observe(time.Since(startTime).Seconds())
if span != nil {
span.Finish()
span = nil
}
case <-s.quit:
if unsubscribe != nil {
unsubscribe()
}
if span != nil {
span.Finish()
}
break LOOP
}
}
......
......@@ -7,12 +7,13 @@ package pusher_test
import (
"context"
"errors"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"io/ioutil"
"sync"
"testing"
"time"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/localstore"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/pusher"
......@@ -237,7 +238,7 @@ func createPusher(t *testing.T, addr swarm.Address, pushSyncService pushsync.Pus
}
peerSuggester := mock.NewTopologyDriver(mockOpts...)
pusherService := pusher.New(pusherStorer, peerSuggester, pushSyncService, mtags, logger)
pusherService := pusher.New(pusherStorer, peerSuggester, pushSyncService, mtags, logger, nil)
return mtags, pusherService, pusherStorer
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment