pusher.go
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package pusher

import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/ethersphere/bee/pkg/logging"
	"github.com/ethersphere/bee/pkg/pushsync"
	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/swarm"
	"github.com/ethersphere/bee/pkg/tags"
	"github.com/ethersphere/bee/pkg/topology"
)

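// Service listens for newly uploaded chunks on the push subscription of the local
// store and pushes each of them to its closest peer, marking a chunk as synced
// once the push succeeds.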
type Service struct {
	storer            storage.Storer
	pushSyncer        pushsync.PushSyncer
	tag               *tags.Tags
	logger            logging.Logger
	metrics           metrics
	quit              chan struct{}
	chunksWorkerQuitC chan struct{}
}

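// Options holds the dependencies used to construct a pusher Service.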
type Options struct {
	Storer        storage.Storer
	PeerSuggester topology.ClosestPeerer
	Tags          *tags.Tags
	PushSyncer    pushsync.PushSyncer
	Logger        logging.Logger
}

var retryInterval = 10 * time.Second // time interval between retries

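// New constructs a pusher Service from the given options and starts its background chunks worker.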
func New(o Options) *Service {
	service := &Service{
		storer:            o.Storer,
		pushSyncer:        o.PushSyncer,
		tag:               o.Tags,
		logger:            o.Logger,
		metrics:           newMetrics(),
		quit:              make(chan struct{}),
		chunksWorkerQuitC: make(chan struct{}),
	}
	go service.chunksWorker()
	return service
}

// chunksWorker is a loop that keeps looking for chunks that have been uploaded locally (by monitoring the push index)
// and pushes them to the closest peer, collecting a receipt for each.
func (s *Service) chunksWorker() {
	var chunks <-chan swarm.Chunk
	var unsubscribe func()
	// timer is initially set to 0 so that the select case on timer.C fires immediately and performs initialisation
	timer := time.NewTimer(0)
	defer timer.Stop()
	defer close(s.chunksWorkerQuitC)
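	// chunksInBatch counts chunks received from the push subscription; it starts at -1 so the first chunk brings it to 0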
	chunksInBatch := -1
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		<-s.quit
		cancel()
	}()

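	// sem limits the number of concurrent push operations, and inflight tracks
	// chunk addresses that are currently being pushed so that the same chunk
	// is not pushed twice concurrently.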
	sem := make(chan struct{}, 10)
	inflight := make(map[string]struct{})
	var mtx sync.Mutex

LOOP:
	for {
		select {
		// handle incoming chunks
		case ch, more := <-chunks:
			// if there are no more chunks, set the channel to nil and reset the timer so the batch is finalised promptly
			if !more {
				chunks = nil
				var dur time.Duration
				if chunksInBatch == 0 {
					dur = 500 * time.Millisecond
				}
				timer.Reset(dur)
				break
			}

			// postpone a retry only after we have finished processing everything in the index
			timer.Reset(retryInterval)
			chunksInBatch++
			s.metrics.TotalChunksToBeSentCounter.Inc()
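			// acquire a slot in the semaphore before spawning a push, or abort if the service is shutting down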
			select {
			case sem <- struct{}{}:
			case <-s.quit:
				if unsubscribe != nil {
					unsubscribe()
				}
				return
			}
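			// skip the chunk if it is already being pushed by another goroutine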
			mtx.Lock()
			if _, ok := inflight[ch.Address().String()]; ok {
				mtx.Unlock()
				<-sem
				continue
			}

			inflight[ch.Address().String()] = struct{}{}
			mtx.Unlock()

			go func(ctx context.Context, ch swarm.Chunk) {
				var err error
				defer func() {
					if err == nil {
						// only print this if there was no error while sending the chunk
						s.logger.Tracef("pusher pushed chunk %s", ch.Address().String())
					}
					mtx.Lock()
					delete(inflight, ch.Address().String())
					mtx.Unlock()
					<-sem
				}()
				// Later, when receipts are processed, the receipt returned here will be used;
				// for now it is ignored and only the error is checked.
				_, err = s.pushSyncer.PushChunkToClosest(ctx, ch)
				if err != nil {
					if !errors.Is(err, topology.ErrNotFound) {
						s.logger.Debugf("pusher: error while sending chunk or receiving receipt: %v", err)
					}
					return
				}
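				// the push succeeded, so mark the chunk as synced in the local store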
				s.setChunkAsSynced(ctx, ch.Address())
			}(ctx, ch)
		case <-timer.C:
			// the timer fires once at startup and then every time we reach the end of the push index
			startTime := time.Now()

			// if subscribe was running, stop it
			if unsubscribe != nil {
				unsubscribe()
			}

			// and start iterating over the push index from the beginning
			chunks, unsubscribe = s.storer.SubscribePush(ctx)

			// reset timer to go off after retryInterval
			timer.Reset(retryInterval)
			s.metrics.MarkAndSweepTimer.Observe(time.Since(startTime).Seconds())

		case <-s.quit:
			if unsubscribe != nil {
				unsubscribe()
			}
			break LOOP
		}
	}

	// wait for all pending push operations to terminate
	closeC := make(chan struct{})
	go func() {
		defer func() { close(closeC) }()
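		// acquiring every slot of the semaphore guarantees that all in-flight pushes have released theirs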
		for i := 0; i < cap(sem); i++ {
			sem <- struct{}{}
		}
	}()

	select {
	case <-closeC:
	case <-time.After(2 * time.Second):
		s.logger.Warning("pusher shutting down with pending operations")
	}
}

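// setChunkAsSynced marks the chunk as synced (pushed) in the local store; failures
// are logged and counted in the metrics but not retried here.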
func (s *Service) setChunkAsSynced(ctx context.Context, addr swarm.Address) {
	if err := s.storer.Set(ctx, storage.ModeSetSyncPush, addr); err != nil {
		s.logger.Errorf("pusher: error setting chunk as synced: %v", err)
		s.metrics.ErrorSettingChunkToSynced.Inc()
	}
}

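// Close signals the chunks worker to quit and waits for it to finish, giving up after a timeout.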
func (s *Service) Close() error {
	close(s.quit)

	// Wait for chunks worker to finish
	select {
	case <-s.chunksWorkerQuitC:
	case <-time.After(3 * time.Second):
	}
	return nil
}