Commit 04075a19 authored by acud's avatar acud Committed by GitHub

increase amount of concurrent jobs (#748)

parent aed7f865
...@@ -31,7 +31,10 @@ type Service struct { ...@@ -31,7 +31,10 @@ type Service struct {
chunksWorkerQuitC chan struct{} chunksWorkerQuitC chan struct{}
} }
var retryInterval = 5 * time.Second // time interval between retries var (
retryInterval = 5 * time.Second // time interval between retries
concurrentJobs = 20 // how many chunks to push simultaneously
)
func New(storer storage.Storer, peerSuggester topology.ClosestPeerer, pushSyncer pushsync.PushSyncer, tagger *tags.Tags, logger logging.Logger, tracer *tracing.Tracer) *Service { func New(storer storage.Storer, peerSuggester topology.ClosestPeerer, pushSyncer pushsync.PushSyncer, tagger *tags.Tags, logger logging.Logger, tracer *tracing.Tracer) *Service {
service := &Service{ service := &Service{
...@@ -51,23 +54,24 @@ func New(storer storage.Storer, peerSuggester topology.ClosestPeerer, pushSyncer ...@@ -51,23 +54,24 @@ func New(storer storage.Storer, peerSuggester topology.ClosestPeerer, pushSyncer
// chunksWorker is a loop that keeps looking for chunks that are locally uploaded ( by monitoring pushIndex ) // chunksWorker is a loop that keeps looking for chunks that are locally uploaded ( by monitoring pushIndex )
// and pushes them to the closest peer and get a receipt. // and pushes them to the closest peer and get a receipt.
func (s *Service) chunksWorker() { func (s *Service) chunksWorker() {
var chunks <-chan swarm.Chunk var (
var unsubscribe func() chunks <-chan swarm.Chunk
// timer, initially set to 0 to fall through select case on timer.C for initialisation unsubscribe func()
timer := time.NewTimer(0) timer = time.NewTimer(0) // timer, initially set to 0 to fall through select case on timer.C for initialisation
chunksInBatch = -1
cctx, cancel = context.WithCancel(context.Background())
ctx = cctx
sem = make(chan struct{}, concurrentJobs)
inflight = make(map[string]struct{})
mtx sync.Mutex
span opentracing.Span
)
defer timer.Stop() defer timer.Stop()
defer close(s.chunksWorkerQuitC) defer close(s.chunksWorkerQuitC)
chunksInBatch := -1
cctx, cancel := context.WithCancel(context.Background())
go func() { go func() {
<-s.quit <-s.quit
cancel() cancel()
}() }()
sem := make(chan struct{}, 10)
inflight := make(map[string]struct{})
var mtx sync.Mutex
var span opentracing.Span
ctx := cctx
LOOP: LOOP:
for { for {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment