Commit 9918a799 authored by acud, committed by GitHub

pusher: disable broken tags and make pusher push chunks in parallel (#330)

* make pushsync push chunks in parallel
parent b3e9ba8b
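
The core of the change: instead of pushing chunks one at a time in the worker loop, each chunk is handed to a goroutine, with a buffered channel acting as a semaphore that caps concurrent pushes at 10 and an inflight map (guarded by a mutex) that prevents the same chunk address from being pushed twice at once. Below is a minimal standalone sketch of that pattern under the same assumptions; pushChunk and addrs are illustrative stand-ins, not bee APIs.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// pushChunk is a stand-in for pushSyncer.PushChunkToClosest.
func pushChunk(addr string) {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("pushed", addr)
}

func main() {
	sem := make(chan struct{}, 10)        // at most 10 pushes in flight
	inflight := make(map[string]struct{}) // addresses currently being pushed
	var mtx sync.Mutex

	addrs := []string{"a", "b", "a", "c"} // duplicate "a" is skipped while the first is still in flight

	for _, addr := range addrs {
		sem <- struct{}{} // acquire a slot; blocks once 10 pushes are running

		mtx.Lock()
		if _, ok := inflight[addr]; ok { // already being pushed: release the slot and skip
			mtx.Unlock()
			<-sem
			continue
		}
		inflight[addr] = struct{}{}
		mtx.Unlock()

		go func(addr string) {
			defer func() {
				mtx.Lock()
				delete(inflight, addr)
				mtx.Unlock()
				<-sem // release the slot
			}()
			pushChunk(addr)
		}(addr)
	}

	// drain: reacquiring every slot proves all pushes have released theirs
	for i := 0; i < cap(sem); i++ {
		sem <- struct{}{}
	}
}
```

Acquiring the semaphore before the duplicate check keeps the number of goroutines bounded even when the subscription redelivers the same chunks in bursts; the diff below uses the same ordering.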
@@ -7,6 +7,7 @@ package pusher
import (
"context"
"errors"
"sync"
"time"
"github.com/ethersphere/bee/pkg/logging"
@@ -66,6 +67,12 @@ func (s *Service) chunksWorker() {
<-s.quit
cancel()
}()
+ sem := make(chan struct{}, 10)
+ inflight := make(map[string]struct{})
+ var mtx sync.Mutex
+ LOOP:
for {
select {
// handle incoming chunks
@@ -81,33 +88,47 @@ func (s *Service) chunksWorker() {
break
}
// postpone a retry only after we've finished processing everything in index
timer.Reset(1 * time.Second)
chunksInBatch++
s.metrics.TotalChunksToBeSentCounter.Inc()
- t, err := s.tag.GetByAddress(ch.Address())
- if err != nil {
- s.logger.Debugf("pusher: get tag by address %s: %v", ch.Address(), err)
- //continue // // until bzz api implements tags dont continue here
- } else {
- // update the tags only if we get it
- t.Inc(tags.StateSent)
- }
- // Later when we process receipt, get the receipt and process it
- // for now ignoring the receipt and checking only for error
- _, err = s.pushSyncer.PushChunkToClosest(ctx, ch)
- if err != nil {
- if !errors.Is(err, topology.ErrNotFound) {
- s.logger.Errorf("pusher: error while sending chunk or receiving receipt: %v", err)
- }
- continue
- }
+ select {
+ case sem <- struct{}{}:
+ case <-s.quit:
+ if unsubscribe != nil {
+ unsubscribe()
+ }
+ return
+ }
+ mtx.Lock()
+ if _, ok := inflight[ch.Address().String()]; ok {
+ mtx.Unlock()
+ <-sem
+ continue
+ }
- // set chunk status to synced
- s.setChunkAsSynced(ctx, ch.Address())
- continue
- // retry interval timer triggers starting from new
+ inflight[ch.Address().String()] = struct{}{}
+ mtx.Unlock()
+ go func(ctx context.Context, ch swarm.Chunk) {
+ defer func() {
+ s.logger.Tracef("pusher pushed chunk %s", ch.Address().String())
+ mtx.Lock()
+ delete(inflight, ch.Address().String())
+ mtx.Unlock()
+ <-sem
+ }()
+ // Later when we process receipt, get the receipt and process it
+ // for now ignoring the receipt and checking only for error
+ _, err := s.pushSyncer.PushChunkToClosest(ctx, ch)
+ if err != nil {
+ if !errors.Is(err, topology.ErrNotFound) {
+ s.logger.Errorf("pusher: error while sending chunk or receiving receipt: %v", err)
+ }
+ return
+ }
+ s.setChunkAsSynced(ctx, ch.Address())
+ }(ctx, ch)
case <-timer.C:
// initially timer is set to go off as well as every time we hit the end of push index
startTime := time.Now()
@@ -128,8 +149,23 @@ func (s *Service) chunksWorker() {
if unsubscribe != nil {
unsubscribe()
}
- return
+ break LOOP
}
}
+ // wait for all pending push operations to terminate
+ closeC := make(chan struct{})
+ go func() {
+ defer func() { close(closeC) }()
+ for i := 0; i < cap(sem); i++ {
+ sem <- struct{}{}
+ }
+ }()
+ select {
+ case <-closeC:
+ case <-time.After(2 * time.Second):
+ s.logger.Error("pusher shutting down with pending operations")
+ }
}
@@ -137,19 +173,6 @@ func (s *Service) setChunkAsSynced(ctx context.Context, addr swarm.Address) {
if err := s.storer.Set(ctx, storage.ModeSetSyncPush, addr); err != nil {
s.logger.Errorf("pusher: error setting chunk as synced: %v", err)
s.metrics.ErrorSettingChunkToSynced.Inc()
- } else {
- s.metrics.TotalChunksSynced.Inc()
- ta, err := s.tag.GetByAddress(addr)
- if err != nil {
- if !errors.Is(err, tags.ErrNotFound) {
- s.logger.Debugf("pusher: get tag by address %s: %v", addr, err)
- }
- // return // until bzz api implements tags dont retunrn here
- } else {
- // update the tags only if we get it
- ta.Inc(tags.StateSynced)
- }
}
}
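
Note on shutdown: the worker now leaves its loop via break LOOP rather than returning, then waits for in-flight pushes by reacquiring every semaphore slot, giving up after a two-second grace period. A standalone sketch of that drain-with-timeout idiom (illustrative only, not bee code):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	sem := make(chan struct{}, 10)

	// simulate one push still in flight that releases its slot shortly
	sem <- struct{}{}
	go func() {
		time.Sleep(500 * time.Millisecond)
		<-sem
	}()

	closeC := make(chan struct{})
	go func() {
		defer close(closeC)
		for i := 0; i < cap(sem); i++ {
			sem <- struct{}{} // blocks until every in-flight push has released its slot
		}
	}()

	select {
	case <-closeC:
		fmt.Println("all pending pushes finished")
	case <-time.After(2 * time.Second):
		fmt.Println("shutting down with pending operations")
	}
}
```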