sequence.go 7.93 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package sequence provides implementation of sequential indexing for
// time-based feeds
// this feed type is best suited for
// - version updates
// - followed updates
// - frequent or regular-interval updates
package sequence

import (
	"context"
	"encoding/binary"
	"errors"
17
	"fmt"
18 19 20 21 22 23 24

	"github.com/ethersphere/bee/pkg/crypto"
	"github.com/ethersphere/bee/pkg/feeds"
	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/swarm"
)

25 26 27 28
// DefaultLevels is the number of concurrent lookaheads
// it is the maximum level of a top-level interval, which is probed at
// offsets 2^k-1 from its base for k up to DefaultLevels
// 8 spans 2^8 updates
const DefaultLevels = 8

29 30 31 32 33
// compile-time checks that the types of this package satisfy the feeds interfaces
var (
	_ feeds.Index   = (*index)(nil)
	_ feeds.Lookup  = (*finder)(nil)
	_ feeds.Lookup  = (*asyncFinder)(nil)
	_ feeds.Updater = (*updater)(nil)
)

34
// index just wraps a uint64. implements the feeds.Index interface
35 36 37 38
type index struct {
	index uint64 // position of the update in the sequence, starting at 0
}

39 40 41 42
// String renders the index as a decimal number
func (i *index) String() string {
	return fmt.Sprint(i.index)
}

43 44 45 46 47 48
// MarshalBinary encodes the index as 8 bytes in big-endian byte order
// the returned error is always nil
func (i *index) MarshalBinary() ([]byte, error) {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], i.index)
	return buf[:], nil
}

49 50 51 52 53
// Next returns the index that directly follows i in the sequence
// the last and at arguments are not used by sequential indexing
func (i *index) Next(last int64, at uint64) feeds.Index {
	successor := i.index + 1
	return &index{index: successor}
}

54
// finder encapsulates a chunk store getter and a feed and provides
55
// non-concurrent lookup
56 57 58 59
type finder struct {
	getter *feeds.Getter // fetches the update chunk stored at a given index
}

60
// NewFinder constructs an finder (feeds.Lookup interface)
61 62 63 64 65 66
func NewFinder(getter storage.Getter, feed *feeds.Feed) feeds.Lookup {
	return &finder{feeds.NewGetter(getter, feed)}
}

// At looks up the version valid at time `at`
// after is a unix time hint of the latest known update
67
func (f *finder) At(ctx context.Context, at, after int64) (ch swarm.Chunk, current, next feeds.Index, err error) {
68 69 70 71
	for i := uint64(0); ; i++ {
		u, err := f.getter.Get(ctx, &index{i})
		if err != nil {
			if !errors.Is(err, storage.ErrNotFound) {
72
				return nil, nil, nil, err
73
			}
74 75 76 77
			if i > 0 {
				current = &index{i - 1}
			}
			return ch, current, &index{i}, nil
78 79 80
		}
		ts, err := feeds.UpdatedAt(u)
		if err != nil {
81
			return nil, nil, nil, err
82
		}
83
		// if timestamp is later than the `at` target datetime, then return previous chunk  and index
84
		if ts > uint64(at) {
85
			return ch, &index{i - 1}, &index{i}, nil
86 87 88 89 90 91
		}
		ch = u
	}
}

// asyncFinder encapsulates a chunk store getter and a feed and provides
92
// concurrent lookup
93 94 95 96 97 98 99 100 101
type asyncFinder struct {
	getter *feeds.Getter // fetches the update chunk stored at a given index
}

// NewAsyncFinder constructs an asyncFinder (feeds.Lookup interface)
// the finder reads the updates of feed through the given chunk store getter
func NewAsyncFinder(getter storage.Getter, feed *feeds.Feed) feeds.Lookup {
	g := feeds.NewGetter(getter, feed)
	return &asyncFinder{getter: g}
}

102 103 104 105 106 107 108 109 110
// interval represents a batch of concurrent retreieve requests
// that probe the interval (base,b+2^level) at offsets 2^k-1 for k=1,...,max
// recording  the level of the latest found update chunk and the earliest not found update
// the actual latest update is guessed to be within a subinterval
type interval struct {
	base     uint64  // beginning of the interval, guaranteed to have an  update
	level    int     // maximum level to check
	found    *result // the result with the latest chunk found
	notFound int     // the earliest level where no update is found
111 112
}

113 114 115 116 117 118 119 120 121 122
// when a subinterval is identified to contain the latest update
// next returns an interval matching it
func (i *interval) next() *interval {
	found := i.found.level
	i.found.level = 0
	return &interval{
		base:     i.found.index, // set base to index of latest chunk found
		level:    found,         // set max level to the latest update level
		notFound: found,         // set notFound to the latest update level
		found:    i.found,       // inherit latest found  result
123 124 125
	}
}

126 127 128 129 130 131 132 133 134
// retry returns the same interval as next, except that the maximum level and
// the notFound level are reset to the maximum level of i
func (i *interval) retry() *interval {
	again := i.next()
	again.level, again.notFound = i.level, i.level
	return again
}

func newInterval(base uint64) *interval {
	return &interval{base: base, level: DefaultLevels, notFound: DefaultLevels}
135 136
}

137
// results capture a chunk lookup on a interval
138
type result struct {
139 140 141 142
	chunk    swarm.Chunk // the chunk found
	interval *interval   // the interval it belongs to
	level    int         // the level within the interval
	index    uint64      // the actual seqeuence index of the update
143 144 145 146
}

// At looks up the version valid at time `at`
// after is a unix time hint of the latest known update
147
func (f *asyncFinder) At(ctx context.Context, at, after int64) (ch swarm.Chunk, cur, next feeds.Index, err error) {
148 149
	// first lookup update at the 0 index
	ch, err = f.get(ctx, at, 0)
150
	if err != nil {
151
		return nil, nil, nil, err
152 153
	}
	if ch == nil {
154
		return nil, nil, &index{0}, nil
155
	}
156 157 158 159 160
	// if chunk exists construct an initial interval with base=0
	c := make(chan *result)
	i := newInterval(0)
	i.found = &result{ch, nil, 0, 0}

161 162
	quit := make(chan struct{})
	defer close(quit)
163 164 165

	// launch concurrent request at  doubling intervals
	go f.at(ctx, at, 0, i, c, quit)
166
	for r := range c {
167 168
		// collect the results into the interval
		i = r.interval
169
		if r.chunk == nil {
170
			if i.notFound < r.level {
171 172
				continue
			}
173
			i.notFound = r.level - 1
174
		} else {
175
			if i.found.level > r.level {
176 177
				continue
			}
178 179 180 181 182 183
			// if a chunk is found on the max level, and this is already a subinterval
			// then found.index+1 is already known to be not found
			if i.level == r.level && r.level < DefaultLevels {
				return r.chunk, &index{r.index}, &index{r.index + 1}, nil
			}
			i.found = r
184
		}
185 186 187 188 189
		// below applies even  if  i.latest==ceilingLevel in which case we just continue with
		// DefaultLevel lookaheads
		if i.found.level == i.notFound {
			if i.found.level == 0 {
				return i.found.chunk, &index{i.found.index}, &index{i.found.index + 1}, nil
190
			}
191 192 193 194 195
			go f.at(ctx, at, 0, i.next(), c, quit)
		}
		// inconsistent feed, retry
		if i.notFound < i.found.level {
			go f.at(ctx, at, i.found.level, i.retry(), c, quit)
196 197
		}
	}
198
	return nil, nil, nil, nil
199 200
}

201 202 203 204
// at launches concurrent lookups at exponential intervals after th c starting from further
func (f *asyncFinder) at(ctx context.Context, at int64, min int, i *interval, c chan<- *result, quit <-chan struct{}) {
	stop := make(chan struct{}, 1)
	for l := i.level; l > min; l-- {
205
		select {
206
		case <-stop: // if a chunk is found
207
			return
208
		case <-quit: // if the parent process quit
209 210 211
			return
		default:
		}
212 213 214
		go func(l int) {
			index := i.base + (1 << l) - 1
			ch, err := f.get(ctx, at, index)
215 216 217
			if err != nil {
				return
			}
218 219 220 221 222 223 224
			// if a chunk is found, stop the iterationq
			if ch != nil {
				select {
				case stop <- struct{}{}:
				default:
				}
			}
225
			select {
226
			case c <- &result{ch, i, l, index}:
227 228
			case <-quit:
			}
229
		}(l)
230 231 232
	}
}

233 234 235
// get performs a lookup of an update chunk, returns nil (not error) if not found
func (f *asyncFinder) get(ctx context.Context, at int64, idx uint64) (swarm.Chunk, error) {
	u, err := f.getter.Get(ctx, &index{idx})
236 237
	if err != nil {
		if !errors.Is(err, storage.ErrNotFound) {
238
			return nil, err
239 240
		}
		// if 'not-found' error, then just silence and return nil chunk
241
		return nil, nil
242 243 244
	}
	ts, err := feeds.UpdatedAt(u)
	if err != nil {
245
		return nil, err
246 247 248
	}
	// this means the update timestamp is later than the pivot time we are looking for
	// handled as if the update was missing but with no uncertainty due to timeout
249 250
	if at < int64(ts) {
		return nil, nil
251
	}
252
	return u, nil
253 254 255 256 257 258 259 260 261 262
}

// updater encapsulates a feeds putter to generate successive updates for sequence based feeds
// it keeps track of the index of the next update
type updater struct {
	*feeds.Putter
	next uint64 // sequence index of the next update, incremented after each successful Update
}

// NewUpdater constructs a feed updater
263
func NewUpdater(putter storage.Putter, signer crypto.Signer, topic []byte) (feeds.Updater, error) {
264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283
	p, err := feeds.NewPutter(putter, signer, topic)
	if err != nil {
		return nil, err
	}
	return &updater{Putter: p}, nil
}

// Update pushes an update to the feed through the chunk stores
// at and payload are handed to the embedded putter together with the next
// sequence index; the index only advances if the put succeeds
func (u *updater) Update(ctx context.Context, at int64, payload []byte) error {
	if err := u.Put(ctx, &index{index: u.next}, at, payload); err != nil {
		return err
	}
	u.next++
	return nil
}

// Feed returns the feed held by the embedded putter
func (u *updater) Feed() *feeds.Feed {
	p := u.Putter
	return p.Feed
}