pullstorage.go 4.37 KB
Newer Older
1 2 3 4 5 6 7 8 9
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package pullstorage

import (
	"context"
	"errors"
10
	"fmt"
11 12 13 14
	"time"

	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/swarm"
15
	"resenje.org/singleflight"
16 17 18
)

var (
19
	_ Storer = (*PullStorer)(nil)
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
	// ErrDbClosed is used to signal the underlying database was closed
	ErrDbClosed = errors.New("db closed")

	// after how long to return a non-empty batch
	batchTimeout = 500 * time.Millisecond
)

// Storer is a thin wrapper around storage.Storer.
// It exposes the operations used to collect and provide information about
// the chunks that are currently present in the local store.
type Storer interface {
	// Cursors gets the last BinID for every bin in the local storage.
	Cursors(ctx context.Context) ([]uint64, error)
	// IntervalChunks collects the chunk addresses of bin for the requested
	// from-to interval, together with the topmost bin ID.
	IntervalChunks(ctx context.Context, bin uint8, from, to uint64, limit int) (chunks []swarm.Address, topmost uint64, err error)

	// Has checks a single chunk address.
	Has(ctx context.Context, addr swarm.Address) (bool, error)
	// Get returns the chunks for the given addresses using mode.
	Get(ctx context.Context, mode storage.ModeGet, addrs ...swarm.Address) ([]swarm.Chunk, error)
	// Put stores the given chunks using mode.
	Put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) error
	// Set updates the chunks with the given addresses using mode.
	Set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Address) error
}

45 46
// PullStorer wraps storage.Storer.
type PullStorer struct {
47
	storage.Storer
48
	intervalsSF singleflight.Group
49
	metrics     metrics
50 51 52
}

// New returns a new pullstorage Storer instance.
53 54 55 56
func New(storer storage.Storer) *PullStorer {
	return &PullStorer{
		Storer:  storer,
		metrics: newMetrics(),
57 58 59 60
	}
}

// IntervalChunks collects chunk for a requested interval.
61
func (s *PullStorer) IntervalChunks(ctx context.Context, bin uint8, from, to uint64, limit int) (chs []swarm.Address, topmost uint64, err error) {
62

63 64 65 66
	type result struct {
		chs     []swarm.Address
		topmost uint64
	}
67 68
	s.metrics.TotalSubscribePullRequests.Inc()
	defer s.metrics.TotalSubscribePullRequestsComplete.Inc()
69

70 71 72 73 74 75 76
	v, _, err := s.intervalsSF.Do(ctx, fmt.Sprintf("%v-%v-%v-%v", bin, from, to, limit), func(ctx context.Context) (interface{}, error) {
		// call iterator, iterate either until upper bound or limit reached
		// return addresses, topmost is the topmost bin ID
		var (
			timer  *time.Timer
			timerC <-chan time.Time
		)
77
		s.metrics.SubscribePullsStarted.Inc()
78 79 80 81 82
		ch, dbClosed, stop := s.SubscribePull(ctx, bin, from, to)
		defer func(start time.Time) {
			stop()
			if timer != nil {
				timer.Stop()
83
			}
84
			s.metrics.SubscribePullsComplete.Inc()
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
		}(time.Now())

		var nomore bool

	LOOP:
		for limit > 0 {
			select {
			case v, ok := <-ch:
				if !ok {
					nomore = true
					break LOOP
				}
				chs = append(chs, v.Address)
				if v.BinID > topmost {
					topmost = v.BinID
				}
				limit--
				if timer == nil {
					timer = time.NewTimer(batchTimeout)
				} else {
					if !timer.Stop() {
						<-timer.C
					}
					timer.Reset(batchTimeout)
109
				}
110 111 112 113 114 115
				timerC = timer.C
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-timerC:
				// return batch if new chunks are not received after some time
				break LOOP
116
			}
117 118 119
		}

		select {
120
		case <-ctx.Done():
121 122 123 124
			return nil, ctx.Err()
		case <-dbClosed:
			return nil, ErrDbClosed
		default:
125 126
		}

127 128 129 130 131 132
		if nomore {
			// end of interval reached. no more chunks so interval is complete
			// return requested `to`. it could be that len(chs) == 0 if the interval
			// is empty
			topmost = to
		}
133

134 135
		return &result{chs: chs, topmost: topmost}, nil
	})
136

137
	if err != nil {
138
		s.metrics.SubscribePullsFailures.Inc()
139 140 141 142
		return nil, 0, err
	}
	r := v.(*result)
	return r.chs, r.topmost, nil
143 144 145
}

// Cursors gets the last BinID for every bin in the local storage
146
func (s *PullStorer) Cursors(ctx context.Context) (curs []uint64, err error) {
147 148
	curs = make([]uint64, swarm.MaxBins)
	for i := uint8(0); i < swarm.MaxBins; i++ {
149 150 151 152 153 154 155 156 157 158
		binID, err := s.Storer.LastPullSubscriptionBinID(i)
		if err != nil {
			return nil, err
		}
		curs[i] = binID
	}
	return curs, nil
}

// Get chunks.
159
func (s *PullStorer) Get(ctx context.Context, mode storage.ModeGet, addrs ...swarm.Address) ([]swarm.Chunk, error) {
160 161 162 163
	return s.Storer.GetMulti(ctx, mode, addrs...)
}

// Put chunks.
164
func (s *PullStorer) Put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) error {
165 166 167
	_, err := s.Storer.Put(ctx, mode, chs...)
	return err
}