Commit b4b6245d authored by Viktor Trón, committed by GitHub

feeds: fix sequence feed resolution, drop timebase heuristic (#1259)

Co-authored-by: Metacertain <metacertain@gmail.com>
parent c4338dad
@@ -271,3 +271,11 @@ type id struct {
 func (i *id) MarshalBinary() ([]byte, error) {
     return []byte("accd"), nil
 }
+
+func (i *id) String() string {
+    return "44237"
+}
+
+func (*id) Next(last int64, at uint64) feeds.Index {
+    return &id{}
+}
@@ -8,6 +8,7 @@ package epochs
 import (
     "encoding/binary"
+    "fmt"
 
     "github.com/ethersphere/bee/pkg/crypto"
     "github.com/ethersphere/bee/pkg/feeds"
@@ -19,12 +20,17 @@ const (
 var _ feeds.Index = (*epoch)(nil)
 
-// epoch is referencing a slot in the epoch grid
+// epoch is referencing a slot in the epoch grid and represents an update
+// it implements the feeds.Index interface
 type epoch struct {
     start uint64
     level uint8
 }
 
+func (e *epoch) String() string {
+    return fmt.Sprintf("%d/%d", e.start, e.level)
+}
+
 // MarshalBinary implements the BinaryMarshaler interface
 func (e *epoch) MarshalBinary() ([]byte, error) {
     epochBytes := make([]byte, 8)
@@ -32,6 +38,21 @@ func (e *epoch) MarshalBinary() ([]byte, error) {
     return crypto.LegacyKeccak256(append(epochBytes, e.level))
 }
 
+func next(e feeds.Index, last int64, at uint64) feeds.Index {
+    if e == nil {
+        return &epoch{0, maxLevel}
+    }
+    return e.Next(last, at)
+}
+
+// Next implements feeds.Index advancement
+func (e *epoch) Next(last int64, at uint64) feeds.Index {
+    if e.start+e.length() > at {
+        return e.childAt(at)
+    }
+    return lca(int64(at), last).childAt(at)
+}
+
 // lca calculates the lowest common ancestor epoch given two unix times
 func lca(at, after int64) *epoch {
     if after == 0 {
@@ -48,16 +69,6 @@ func lca(at, after int64) *epoch {
     return &epoch{start, level}
 }
 
-func next(e *epoch, last int64, at uint64) *epoch {
-    if e == nil {
-        return &epoch{0, maxLevel}
-    }
-    if e.start+e.length() > at {
-        return e.childAt(at)
-    }
-    return lca(int64(at), last).childAt(at)
-}
-
 // parent returns the ancestor of an epoch
 // the call is unsafe in that it must not be called on a toplevel epoch
 func (e *epoch) parent() *epoch {
......
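The epoch-grid arithmetic that `Next` relies on is easier to follow with concrete numbers. The sketch below is illustrative only, not the package's code: it assumes the conventional epoch-feed definitions that are not shown in this hunk, namely that an epoch at level `l` spans `2^l` seconds from `start`, that `length()` is `1 << level`, that `childAt(at)` is the half of the epoch one level down that contains `at`, and that `maxLevel` is 32.

```go
package main

import "fmt"

// epoch mirrors the shape of the epochs package's type; the helpers below are
// re-derived from the usual epoch-grid rules, not copied from the bee source.
type epoch struct {
	start uint64
	level uint8
}

const maxLevel = 32 // assumed top level of the grid

func (e *epoch) length() uint64 { return 1 << e.level }

// childAt returns the child (level-1) epoch of e whose window contains `at`.
func (e *epoch) childAt(at uint64) *epoch {
	child := &epoch{start: e.start, level: e.level - 1}
	if at >= e.start+child.length() {
		child.start += child.length()
	}
	return child
}

// lca returns the lowest epoch that contains both unix times.
func lca(at, after uint64) *epoch {
	level := uint8(0)
	for at>>level != after>>level {
		level++
	}
	start := (at >> level) << level
	return &epoch{start, level}
}

// next mimics the advancement rule from the diff: stay inside the current
// epoch while `at` still falls into it, otherwise descend from the lca of the
// last update time and `at`.
func next(e *epoch, last, at uint64) *epoch {
	if e == nil {
		return &epoch{0, maxLevel}
	}
	if e.start+e.length() > at {
		return e.childAt(at)
	}
	return lca(at, last).childAt(at)
}

func main() {
	var e *epoch
	last := uint64(0)
	for _, at := range []uint64{1000, 1010, 1500} {
		e = next(e, last, at)
		fmt.Printf("update at %d -> epoch %d/%d\n", at, e.start, e.level)
		last = at
	}
}
```

Each successive update lands one level deeper in the grid as long as it stays inside the previous epoch's window, which is exactly what the lookup later exploits.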
@@ -6,44 +6,21 @@ package epochs_test
 import (
     "context"
-    "errors"
     "fmt"
-    "math/rand"
     "testing"
-    "time"
 
     "github.com/ethersphere/bee/pkg/crypto"
     "github.com/ethersphere/bee/pkg/feeds"
     "github.com/ethersphere/bee/pkg/feeds/epochs"
-    "github.com/ethersphere/bee/pkg/storage"
+    feedstesting "github.com/ethersphere/bee/pkg/feeds/testing"
     "github.com/ethersphere/bee/pkg/storage/mock"
-    "github.com/ethersphere/bee/pkg/swarm"
 )
 
-type timeout struct {
-    storage.Storer
-}
-
-var searchTimeout = 30 * time.Millisecond
-
-// Get overrides the mock storer and introduces latency
-func (t *timeout) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address) (swarm.Chunk, error) {
-    ch, err := t.Storer.Get(ctx, mode, addr)
-    if err != nil {
-        if errors.Is(err, storage.ErrNotFound) {
-            time.Sleep(searchTimeout)
-        }
-        return ch, err
-    }
-    time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
-    return ch, nil
-}
-
 func BenchmarkFinder(b *testing.B) {
     for _, i := range []int{0, 8, 30} {
         for _, prefill := range []int64{1, 50} {
             after := int64(50)
-            storer := &timeout{mock.NewStorer()}
+            storer := &feedstesting.Timeout{Storer: mock.NewStorer()}
             topicStr := "testtopic"
             topic, err := crypto.LegacyKeccak256([]byte(topicStr))
             if err != nil {
......
@@ -20,8 +20,13 @@ func TestFinder(t *testing.T) {
     t.Run("basic", func(t *testing.T) {
         feedstesting.TestFinderBasic(t, finderf, updaterf)
     })
+    i := int64(0)
+    nextf := func() (bool, int64) {
+        defer func() { i++ }()
+        return i == 50, i
+    }
t.Run("fixed", func(t *testing.T) { t.Run("fixed", func(t *testing.T) {
feedstesting.TestFinderFixIntervals(t, finderf, updaterf) feedstesting.TestFinderFixIntervals(t, nextf, finderf, updaterf)
}) })
t.Run("random", func(t *testing.T) { t.Run("random", func(t *testing.T) {
feedstesting.TestFinderRandomIntervals(t, finderf, updaterf) feedstesting.TestFinderRandomIntervals(t, finderf, updaterf)
......
@@ -19,7 +19,7 @@ var _ feeds.Updater = (*updater)(nil)
 type updater struct {
     *feeds.Putter
     last  int64
-    epoch *epoch
+    epoch feeds.Index
 }
 
 // NewUpdater constructs a feed updater
......
@@ -28,6 +28,7 @@ type Factory interface {
     NewLookup(Type, *Feed) (Lookup, error)
 }
 
+// Type enumerates the time-based feed types
 type Type int
 
 const (
@@ -46,6 +47,7 @@ func (t Type) String() string {
     }
 }
 
+// FromString constructs the type from a string
 func (t *Type) FromString(s string) error {
     switch s = strings.ToLower(s); s {
     case "sequence":
@@ -66,7 +68,7 @@ type id struct {
 var _ encoding.BinaryMarshaler = (*id)(nil)
 
 func (i *id) MarshalBinary() ([]byte, error) {
-    return crypto.LegacyKeccak256(append(i.topic, i.index...))
+    return crypto.LegacyKeccak256(append(append([]byte{}, i.topic...), i.index...))
 }
 
 // Feed is representing an epoch based feed
@@ -84,6 +86,8 @@ func New(topic []byte, owner common.Address) *Feed {
 // Index is the interface for feed implementations.
 type Index interface {
     encoding.BinaryMarshaler
+    Next(last int64, at uint64) Index
+    fmt.Stringer
 }
 
 // Update represents an update instance of a feed, i.e., pairing of a Feed with an Epoch
@@ -97,6 +101,7 @@ func (f *Feed) Update(index Index) *Update {
     return &Update{f, index}
 }
 
+// NewUpdate creates an update from an index, timestamp, payload and signature
 func NewUpdate(f *Feed, idx Index, timestamp int64, payload []byte, sig []byte) (swarm.Chunk, error) {
     id, err := f.Update(idx).Id()
     if err != nil {
@@ -119,12 +124,18 @@ func NewUpdate(f *Feed, idx Index, timestamp int64, payload []byte, sig []byte)
 // Id calculates the identifier if a feed update to be used in single owner chunks
 func (u *Update) Id() ([]byte, error) {
-    index, err := u.index.MarshalBinary()
+    return Id(u.Topic, u.index)
+}
+
+// Id calculates the feed id from a topic and an index
+func Id(topic []byte, index Index) ([]byte, error) {
+    indexBytes, err := index.MarshalBinary()
     if err != nil {
         return nil, err
     }
-    i := &id{u.Topic, index}
+    i := &id{topic, indexBytes}
     return i.MarshalBinary()
 }
 
 // Address calculates the soc address of a feed update
......
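For orientation, here is a minimal sketch of what the extended Index contract above demands of an implementation: besides BinaryMarshaler it must now provide Next, which advances the index given the last update time and the current time, and String. The `counter` type is hypothetical and exists only to illustrate the interface shape; the call to the newly exported feeds.Id helper matches the signature introduced in this hunk.

```go
package main

import (
	"encoding/binary"
	"fmt"

	"github.com/ethersphere/bee/pkg/feeds"
)

// counter is a hypothetical, minimal feeds.Index implementation; it is not
// part of this commit and only illustrates the extended interface.
type counter struct {
	n uint64
}

var _ feeds.Index = (*counter)(nil)

func (c *counter) String() string { return fmt.Sprintf("%d", c.n) }

// MarshalBinary encodes the index as an 8-byte big-endian integer.
func (c *counter) MarshalBinary() ([]byte, error) {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, c.n)
	return b, nil
}

// Next ignores the timestamps and simply moves to the following slot.
func (c *counter) Next(last int64, at uint64) feeds.Index {
	return &counter{n: c.n + 1}
}

func main() {
	topic := []byte("example topic")
	// feeds.Id (added in this commit) derives the single-owner-chunk
	// identifier of an update at a given index.
	id, err := feeds.Id(topic, &counter{n: 5})
	if err != nil {
		panic(err)
	}
	fmt.Printf("update id: %x, next index: %s\n", id, (&counter{n: 5}).Next(0, 0))
}
```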
@@ -6,42 +6,19 @@ package sequence_test
 import (
     "context"
-    "errors"
     "fmt"
-    "math/rand"
     "testing"
-    "time"
 
     "github.com/ethersphere/bee/pkg/crypto"
     "github.com/ethersphere/bee/pkg/feeds"
     "github.com/ethersphere/bee/pkg/feeds/sequence"
-    "github.com/ethersphere/bee/pkg/storage"
+    feedstesting "github.com/ethersphere/bee/pkg/feeds/testing"
     "github.com/ethersphere/bee/pkg/storage/mock"
-    "github.com/ethersphere/bee/pkg/swarm"
 )
 
-type timeout struct {
-    storage.Storer
-}
-
-var searchTimeout = 30 * time.Millisecond
-
-// Get overrides the mock storer and introduces latency
-func (t *timeout) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address) (swarm.Chunk, error) {
-    ch, err := t.Storer.Get(ctx, mode, addr)
-    if err != nil {
-        if errors.Is(err, storage.ErrNotFound) {
-            time.Sleep(searchTimeout)
-        }
-        return ch, err
-    }
-    time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
-    return ch, nil
-}
-
 func BenchmarkFinder(b *testing.B) {
     for _, prefill := range []int64{1, 100, 1000, 5000} {
-        storer := &timeout{mock.NewStorer()}
+        storer := &feedstesting.Timeout{Storer: mock.NewStorer()}
         topicStr := "testtopic"
         topic, err := crypto.LegacyKeccak256([]byte(topicStr))
         if err != nil {
......
@@ -19,8 +19,13 @@ func TestFinder(t *testing.T) {
     t.Run("basic", func(t *testing.T) {
         feedstesting.TestFinderBasic(t, finderf, updaterf)
     })
+    i := 0
+    nextf := func() (bool, int64) {
+        i++
+        return i == 40, int64(i)
+    }
     t.Run("fixed", func(t *testing.T) {
-        feedstesting.TestFinderFixIntervals(t, finderf, updaterf)
+        feedstesting.TestFinderFixIntervals(t, nextf, finderf, updaterf)
     })
     t.Run("random", func(t *testing.T) {
         feedstesting.TestFinderRandomIntervals(t, finderf, updaterf)
......
@@ -14,6 +14,7 @@ import (
     "context"
     "encoding/binary"
     "errors"
+    "fmt"
 
     "github.com/ethersphere/bee/pkg/crypto"
     "github.com/ethersphere/bee/pkg/feeds"
@@ -21,28 +22,42 @@ import (
     "github.com/ethersphere/bee/pkg/swarm"
 )
 
+// DefaultLevels is the number of concurrent lookaheads
+// 8 spans 2^8 updates
+const DefaultLevels = 8
+
 var _ feeds.Index = (*index)(nil)
 var _ feeds.Lookup = (*finder)(nil)
 var _ feeds.Lookup = (*asyncFinder)(nil)
 var _ feeds.Updater = (*updater)(nil)
 
+// index just wraps a uint64. implements the feeds.Index interface
 type index struct {
     index uint64
 }
 
+func (i *index) String() string {
+    return fmt.Sprintf("%d", i.index)
+}
+
 func (i *index) MarshalBinary() ([]byte, error) {
     indexBytes := make([]byte, 8)
     binary.BigEndian.PutUint64(indexBytes, i.index)
     return indexBytes, nil
 }
+
+// Next returns the subsequent index
+func (i *index) Next(last int64, at uint64) feeds.Index {
+    return &index{i.index + 1}
+}
 
 // finder encapsulates a chunk store getter and a feed and provides
-// non-concurrent lookup methods
+// non-concurrent lookup
 type finder struct {
     getter *feeds.Getter
 }
 
-// NewFinder constructs an Finder
+// NewFinder constructs a finder (feeds.Lookup interface)
 func NewFinder(getter storage.Getter, feed *feeds.Feed) feeds.Lookup {
     return &finder{feeds.NewGetter(getter, feed)}
 }
@@ -56,21 +71,25 @@ func (f *finder) At(ctx context.Context, at, after int64) (ch swarm.Chunk, curre
             if !errors.Is(err, storage.ErrNotFound) {
                 return nil, nil, nil, err
             }
-            return ch, &index{i - 1}, &index{i}, nil
+            if i > 0 {
+                current = &index{i - 1}
+            }
+            return ch, current, &index{i}, nil
         }
         ts, err := feeds.UpdatedAt(u)
         if err != nil {
             return nil, nil, nil, err
         }
+        // if timestamp is later than the `at` target datetime, then return previous chunk and index
         if ts > uint64(at) {
-            return ch, &index{i}, nil, nil
+            return ch, &index{i - 1}, &index{i}, nil
        }
         ch = u
     }
 }
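To make the boundary handling in the synchronous finder above concrete, here is a tiny standalone simulation of the same scan, with purely hypothetical data and names: walk the sequence indexes from zero, stop at the first missing update or the first update whose timestamp is past `at`, and report the previous update as current together with the next index to probe.

```go
package main

import "fmt"

// updates maps sequence index -> update timestamp; a missing entry stands in
// for a chunk that is not stored. Illustrative data only.
var updates = map[uint64]int64{0: 10, 1: 20, 2: 30}

// at mimics finder.At's scan: it returns the index of the latest update with
// timestamp <= target (found=false when there is none) and the next index to try.
func at(target int64) (current uint64, found bool, next uint64) {
	for i := uint64(0); ; i++ {
		ts, ok := updates[i]
		if !ok || ts > target {
			if i == 0 {
				return 0, false, 0 // no update at or before the target time
			}
			return i - 1, true, i
		}
	}
}

func main() {
	for _, target := range []int64{5, 25, 100} {
		cur, ok, next := at(target)
		fmt.Printf("at=%d -> current=%d (found=%v), next=%d\n", target, cur, ok, next)
	}
}
```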
 
 // asyncFinder encapsulates a chunk store getter and a feed and provides
-// non-concurrent lookup methods
+// concurrent lookup
 type asyncFinder struct {
     getter *feeds.Getter
 }
@@ -80,132 +99,157 @@ func NewAsyncFinder(getter storage.Getter, feed *feeds.Feed) feeds.Lookup {
     return &asyncFinder{feeds.NewGetter(getter, feed)}
 }
-type path struct {
-    latest    result
-    base      uint64
-    level     int
-    cancel    chan struct{}
-    cancelled bool
-}
-
-func (p *path) close() {
-    if !p.cancelled {
-        close(p.cancel)
-        p.cancelled = true
-    }
-}
-
-func newPath(base uint64) *path {
-    return &path{base: base, cancel: make(chan struct{})}
-}
+// interval represents a batch of concurrent retrieve requests
+// that probe the interval (base, base+2^level) at offsets 2^k-1 for k=1,...,max
+// recording the level of the latest found update chunk and the earliest not found update
+// the actual latest update is guessed to be within a subinterval
+type interval struct {
+    base     uint64  // beginning of the interval, guaranteed to have an update
+    level    int     // maximum level to check
+    found    *result // the result with the latest chunk found
+    notFound int     // the earliest level where no update is found
+}
+
+// when a subinterval is identified to contain the latest update
+// next returns an interval matching it
+func (i *interval) next() *interval {
+    found := i.found.level
+    i.found.level = 0
+    return &interval{
+        base:     i.found.index, // set base to index of latest chunk found
+        level:    found,         // set max level to the latest update level
+        notFound: found,         // set notFound to the latest update level
+        found:    i.found,       // inherit latest found result
+    }
+}
+
+func (i *interval) retry() *interval {
+    r := i.next()
+    r.level = i.level    // reset to max
+    r.notFound = i.level // reset to max
+    return r
+}
+
+func newInterval(base uint64) *interval {
+    return &interval{base: base, level: DefaultLevels, notFound: DefaultLevels}
+}
 
+// results capture a chunk lookup on an interval
 type result struct {
-    chunk swarm.Chunk
-    path  *path
-    level int
-    seq   uint64
-    diff  int64
+    chunk    swarm.Chunk // the chunk found
+    interval *interval   // the interval it belongs to
+    level    int         // the level within the interval
+    index    uint64      // the actual sequence index of the update
 }
 
 // At looks up the version valid at time `at`
 // after is a unix time hint of the latest known update
 func (f *asyncFinder) At(ctx context.Context, at, after int64) (ch swarm.Chunk, cur, next feeds.Index, err error) {
-    ch, diff, err := f.get(ctx, at, 0)
+    // first lookup update at the 0 index
+    ch, err = f.get(ctx, at, 0)
     if err != nil {
         return nil, nil, nil, err
     }
     if ch == nil {
-        return nil, nil, nil, nil
+        return nil, nil, &index{0}, nil
     }
-    if diff == 0 {
-        return ch, &index{0}, &index{1}, nil
-    }
-    c := make(chan result)
-    p := newPath(0)
-    p.latest.chunk = ch
-    for p.level = 1; diff>>p.level > 0; p.level++ {
-    }
+    // if chunk exists construct an initial interval with base=0
+    c := make(chan *result)
+    i := newInterval(0)
+    i.found = &result{ch, nil, 0, 0}
+
     quit := make(chan struct{})
     defer close(quit)
-    go f.at(ctx, at, p, c, quit)
+
+    // launch concurrent request at doubling intervals
+    go f.at(ctx, at, 0, i, c, quit)
     for r := range c {
-        p = r.path
+        // collect the results into the interval
+        i = r.interval
         if r.chunk == nil {
-            if r.level == 0 {
-                return p.latest.chunk, &index{p.latest.seq}, &index{p.latest.seq + 1}, nil
-            }
-            if p.level < r.level {
+            if i.notFound < r.level {
                 continue
             }
-            p.level = r.level - 1
+            i.notFound = r.level - 1
         } else {
-            if r.diff == 0 {
-                return r.chunk, &index{r.seq}, &index{r.seq + 1}, nil
-            }
-            if p.latest.level > r.level {
+            if i.found.level > r.level {
                 continue
             }
-            p.close()
-            p.latest = r
+            // if a chunk is found on the max level, and this is already a subinterval
+            // then found.index+1 is already known to be not found
+            if i.level == r.level && r.level < DefaultLevels {
+                return r.chunk, &index{r.index}, &index{r.index + 1}, nil
+            }
+            i.found = r
         }
-        // below applies even if p.latest==maxLevel
-        if p.latest.level == p.level {
-            if p.level == 0 {
-                return p.latest.chunk, &index{p.latest.seq}, &index{p.latest.seq + 1}, nil
+        // below applies even if i.latest==ceilingLevel in which case we just continue with
+        // DefaultLevel lookaheads
+        if i.found.level == i.notFound {
+            if i.found.level == 0 {
+                return i.found.chunk, &index{i.found.index}, &index{i.found.index + 1}, nil
             }
-            p.close()
-            np := newPath(p.latest.seq)
-            np.level = p.level
-            np.latest.chunk = p.latest.chunk
-            go f.at(ctx, at, np, c, quit)
+            go f.at(ctx, at, 0, i.next(), c, quit)
+        }
+        // inconsistent feed, retry
+        if i.notFound < i.found.level {
+            go f.at(ctx, at, i.found.level, i.retry(), c, quit)
         }
     }
     return nil, nil, nil, nil
 }
 
-func (f *asyncFinder) at(ctx context.Context, at int64, p *path, c chan<- result, quit <-chan struct{}) {
-    for i := p.level; i > 0; i-- {
+// at launches concurrent lookups at exponentially spaced indexes after the interval base, starting from the furthest
+func (f *asyncFinder) at(ctx context.Context, at int64, min int, i *interval, c chan<- *result, quit <-chan struct{}) {
+    stop := make(chan struct{}, 1)
+    for l := i.level; l > min; l-- {
         select {
-        case <-p.cancel:
+        case <-stop: // if a chunk is found
             return
-        case <-quit:
+        case <-quit: // if the parent process quit
             return
         default:
         }
-        go func(i int) {
-            seq := p.base + (1 << i) - 1
-            ch, diff, err := f.get(ctx, at, seq)
+        go func(l int) {
+            index := i.base + (1 << l) - 1
+            ch, err := f.get(ctx, at, index)
            if err != nil {
                 return
             }
+            // if a chunk is found, stop the iteration
+            if ch != nil {
+                select {
+                case stop <- struct{}{}:
+                default:
+                }
+            }
             select {
-            case c <- result{ch, p, i, seq, diff}:
+            case c <- &result{ch, i, l, index}:
             case <-quit:
             }
-        }(i)
+        }(l)
     }
 }
 
-func (f *asyncFinder) get(ctx context.Context, at int64, seq uint64) (swarm.Chunk, int64, error) {
-    u, err := f.getter.Get(ctx, &index{seq})
+// get performs a lookup of an update chunk, returns nil (not error) if not found
+func (f *asyncFinder) get(ctx context.Context, at int64, idx uint64) (swarm.Chunk, error) {
+    u, err := f.getter.Get(ctx, &index{idx})
     if err != nil {
         if !errors.Is(err, storage.ErrNotFound) {
-            return nil, 0, err
+            return nil, err
         }
         // if 'not-found' error, then just silence and return nil chunk
-        return nil, 0, nil
+        return nil, nil
     }
     ts, err := feeds.UpdatedAt(u)
     if err != nil {
-        return nil, 0, err
+        return nil, err
     }
-    diff := at - int64(ts)
     // this means the update timestamp is later than the pivot time we are looking for
     // handled as if the update was missing but with no uncertainty due to timeout
-    if diff < 0 {
-        return nil, 0, nil
+    if at < int64(ts) {
+        return nil, nil
     }
-    return u, diff, nil
+    return u, nil
 }
 
 // updater encapsulates a feeds putter to generate successive updates for epoch based feeds
......
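The interval bookkeeping above is easier to follow once the probe positions are written out. The sketch below is illustrative only; it reuses the arithmetic visible in the diff (base + 2^l - 1 per level, DefaultLevels = 8) rather than calling the package itself, and the narrowing scenario in main is a made-up trace of what next() does when the highest found level and the lowest not-found level meet.

```go
package main

import "fmt"

const defaultLevels = 8 // mirrors DefaultLevels in the diff

// probes returns the sequence indexes that one round of asyncFinder.at would
// request for an interval with the given base and maximum level:
// base + 2^l - 1 for l = level ... min+1.
func probes(base uint64, level, min int) []uint64 {
	var idxs []uint64
	for l := level; l > min; l-- {
		idxs = append(idxs, base+(1<<uint(l))-1)
	}
	return idxs
}

func main() {
	// First round: base 0, levels 8..1 probe indexes 255, 127, ..., 3, 1.
	fmt.Println("round 1:", probes(0, defaultLevels, 0))

	// Suppose the latest update found in round 1 sits at index 127 (level 7)
	// and index 255 (level 8) is missing. next() then makes 127 the new base
	// with max level 7, so the second round probes 127 + 2^l - 1 for l = 7..1.
	fmt.Println("round 2:", probes(127, 7, 0))
}
```

Each round therefore issues at most DefaultLevels concurrent requests, and every narrowing step roughly halves the remaining uncertainty about the latest update's index.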
@@ -8,6 +8,8 @@ package testing
 import (
     "bytes"
     "context"
+    "encoding/binary"
+    "errors"
     "fmt"
     "math/rand"
     "testing"
@@ -17,10 +19,30 @@ import (
     "github.com/ethersphere/bee/pkg/feeds"
     "github.com/ethersphere/bee/pkg/storage"
     "github.com/ethersphere/bee/pkg/storage/mock"
+    "github.com/ethersphere/bee/pkg/swarm"
 )
 
+type Timeout struct {
+    storage.Storer
+}
+
+var searchTimeout = 30 * time.Millisecond
+
+// Get overrides the mock storer and introduces latency
+func (t *Timeout) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address) (swarm.Chunk, error) {
+    ch, err := t.Storer.Get(ctx, mode, addr)
+    if err != nil {
+        if errors.Is(err, storage.ErrNotFound) {
+            time.Sleep(searchTimeout)
+        }
+        return ch, err
+    }
+    time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
+    return ch, nil
+}
+
 func TestFinderBasic(t *testing.T, finderf func(storage.Getter, *feeds.Feed) feeds.Lookup, updaterf func(putter storage.Putter, signer crypto.Signer, topic []byte) (feeds.Updater, error)) {
-    storer := mock.NewStorer()
+    storer := &Timeout{mock.NewStorer()}
     topicStr := "testtopic"
     topic, err := crypto.LegacyKeccak256([]byte(topicStr))
     if err != nil {
@@ -73,128 +95,114 @@ func TestFinderBasic(t *testing.T, finderf func(storage.Getter, *feeds.Feed) fee
     })
 }
 
-func TestFinderFixIntervals(t *testing.T, finderf func(storage.Getter, *feeds.Feed) feeds.Lookup, updaterf func(putter storage.Putter, signer crypto.Signer, topic []byte) (feeds.Updater, error)) {
-    for _, tc := range []struct {
-        count  int64
-        step   int64
-        offset int64
-    }{
-        {50, 1, 0},
-        {50, 1, 10000},
-        {50, 100, 0},
-        {50, 100, 100000},
-    } {
-        t.Run(fmt.Sprintf("count=%d,step=%d,offset=%d", tc.count, tc.step, tc.offset), func(t *testing.T) {
-            storer := mock.NewStorer()
-            topicStr := "testtopic"
-            topic, err := crypto.LegacyKeccak256([]byte(topicStr))
-            if err != nil {
-                t.Fatal(err)
-            }
-            pk, _ := crypto.GenerateSecp256k1Key()
-            signer := crypto.NewDefaultSigner(pk)
-
-            updater, err := updaterf(storer, signer, topic)
-            if err != nil {
-                t.Fatal(err)
-            }
-            ctx := context.Background()
-
-            payload := []byte("payload")
-            for at := tc.offset; at < tc.offset+tc.count*tc.step; at += tc.step {
-                err = updater.Update(ctx, at, payload)
-                if err != nil {
-                    t.Fatal(err)
-                }
-            }
-            finder := finderf(storer, updater.Feed())
-            for at := tc.offset; at < tc.offset+tc.count*tc.step; at += tc.step {
-                for after := tc.offset; after < at; after += tc.step {
-                    step := int64(1)
-                    if tc.step > 1 {
-                        step = tc.step / 4
-                    }
-                    for now := at; now < at+tc.step; now += step {
-                        ch, _, _, err := finder.At(ctx, now, after)
-                        if err != nil {
-                            t.Fatal(err)
-                        }
-                        if ch == nil {
-                            t.Fatalf("expected to find update, got none")
-                        }
-                        exp := payload
-                        ts, payload, err := feeds.FromChunk(ch)
-                        if err != nil {
-                            t.Fatal(err)
-                        }
-                        if !bytes.Equal(payload, exp) {
-                            t.Fatalf("payload mismatch: expected %x, got %x", exp, payload)
-                        }
-                        if ts != uint64(at) {
-                            t.Fatalf("timestamp mismatch: expected %v, got %v", at, ts)
-                        }
-                    }
-                }
-            }
-        })
-    }
-}
+func TestFinderFixIntervals(t *testing.T, nextf func() (bool, int64), finderf func(storage.Getter, *feeds.Feed) feeds.Lookup, updaterf func(putter storage.Putter, signer crypto.Signer, topic []byte) (feeds.Updater, error)) {
+    var stop bool
+    for j := 10; !stop; j += 10 {
+        t.Run(fmt.Sprintf("custom intervals up to %d", j), func(t *testing.T) {
+            var i int64
+            var n int
+            f := func() (bool, int64) {
+                n++
+                stop, i = nextf()
+                return n == j || stop, i
+            }
+            TestFinderIntervals(t, f, finderf, updaterf)
+        })
+    }
+}
+
+func TestFinderIntervals(t *testing.T, nextf func() (bool, int64), finderf func(storage.Getter, *feeds.Feed) feeds.Lookup, updaterf func(putter storage.Putter, signer crypto.Signer, topic []byte) (feeds.Updater, error)) {
+    storer := &Timeout{mock.NewStorer()}
+    topicStr := "testtopic"
+    topic, err := crypto.LegacyKeccak256([]byte(topicStr))
+    if err != nil {
+        t.Fatal(err)
+    }
+    pk, _ := crypto.GenerateSecp256k1Key()
+    signer := crypto.NewDefaultSigner(pk)
+
+    updater, err := updaterf(storer, signer, topic)
+    if err != nil {
+        t.Fatal(err)
+    }
+    finder := finderf(storer, updater.Feed())
+    ctx := context.Background()
+
+    var ats []int64
+    for stop, at := nextf(); !stop; stop, at = nextf() {
+        ats = append(ats, at)
+        payload := make([]byte, 8)
+        binary.BigEndian.PutUint64(payload, uint64(at))
+        err = updater.Update(ctx, at, payload)
+        if err != nil {
+            t.Fatal(err)
+        }
+    }
+    for j := 0; j < len(ats)-1; j++ {
+        at := ats[j]
+        diff := ats[j+1] - at
+        for now := at; now < ats[j+1]; now += int64(rand.Intn(int(diff)) + 1) {
+            after := int64(0)
+            ch, current, next, err := finder.At(ctx, now, after)
+            if err != nil {
+                t.Fatal(err)
+            }
+            if ch == nil {
+                t.Fatalf("expected to find update, got none")
+            }
+            ts, payload, err := feeds.FromChunk(ch)
+            if err != nil {
+                t.Fatal(err)
+            }
+            content := binary.BigEndian.Uint64(payload)
+            if content != uint64(at) {
+                t.Fatalf("payload mismatch: expected %v, got %v", at, content)
+            }
+            if ts != uint64(at) {
+                t.Fatalf("timestamp mismatch: expected %v, got %v", at, ts)
+            }
+            if current != nil {
+                expectedId := ch.Data()[:32]
+                id, err := feeds.Id(topic, current)
+                if err != nil {
+                    t.Fatal(err)
+                }
+                if !bytes.Equal(id, expectedId) {
+                    t.Fatalf("current mismatch: expected %x, got %x", expectedId, id)
+                }
+            }
+            if next != nil {
+                expectedNext := current.Next(at, uint64(now))
+                expectedIdx, err := expectedNext.MarshalBinary()
+                if err != nil {
+                    t.Fatal(err)
+                }
+                idx, err := next.MarshalBinary()
+                if err != nil {
+                    t.Fatal(err)
+                }
+                if !bytes.Equal(idx, expectedIdx) {
+                    t.Fatalf("next mismatch: expected %x, got %x", expectedIdx, idx)
+                }
+            }
+        }
+    }
+}
 func TestFinderRandomIntervals(t *testing.T, finderf func(storage.Getter, *feeds.Feed) feeds.Lookup, updaterf func(putter storage.Putter, signer crypto.Signer, topic []byte) (feeds.Updater, error)) {
-    for i := 0; i < 5; i++ {
-        t.Run(fmt.Sprintf("random intervals %d", i), func(t *testing.T) {
-            storer := mock.NewStorer()
-            topicStr := "testtopic"
-            topic, err := crypto.LegacyKeccak256([]byte(topicStr))
-            if err != nil {
-                t.Fatal(err)
-            }
-            pk, _ := crypto.GenerateSecp256k1Key()
-            signer := crypto.NewDefaultSigner(pk)
-
-            updater, err := updaterf(storer, signer, topic)
-            if err != nil {
-                t.Fatal(err)
-            }
-            ctx := context.Background()
-
-            payload := []byte("payload")
-            var at int64
-            ats := make([]int64, 100)
-            for j := 0; j < 50; j++ {
-                ats[j] = at
-                at += int64(rand.Intn(1<<10) + 1)
-                err = updater.Update(ctx, ats[j], payload)
-                if err != nil {
-                    t.Fatal(err)
-                }
-            }
-            finder := finderf(storer, updater.Feed())
-            for j := 1; j < 49; j++ {
-                diff := ats[j+1] - ats[j]
-                for at := ats[j]; at < ats[j+1]; at += int64(rand.Intn(int(diff)) + 1) {
-                    for after := int64(0); after < at; after += int64(rand.Intn(int(at))) {
-                        ch, _, _, err := finder.At(ctx, at, after)
-                        if err != nil {
-                            t.Fatal(err)
-                        }
-                        if ch == nil {
-                            t.Fatalf("expected to find update, got none")
-                        }
-                        exp := payload
-                        ts, payload, err := feeds.FromChunk(ch)
-                        if err != nil {
-                            t.Fatal(err)
-                        }
-                        if !bytes.Equal(payload, exp) {
-                            t.Fatalf("payload mismatch: expected %x, got %x", exp, payload)
-                        }
-                        if ts != uint64(ats[j]) {
-                            t.Fatalf("timestamp mismatch: expected %v, got %v", ats[j], ts)
-                        }
-                    }
-                }
-            }
+    for j := 0; j < 3; j++ {
+        t.Run(fmt.Sprintf("random intervals %d", j), func(t *testing.T) {
+            var i int64
+            var n int
+            nextf := func() (bool, int64) {
+                i += int64(rand.Intn(1<<10) + 1)
+                n++
+                return n == 40, i
+            }
+            TestFinderIntervals(t, nextf, finderf, updaterf)
         })
     }
 }
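Taken together, these shared helpers give every feed implementation a uniform test harness. For reference, this is how an implementation package plugs into them, following the pattern of the sequence and epochs test files in this commit; the test below is a sketch rather than the repository's file, and it assumes sequence.NewUpdater keeps its usual putter/signer/topic constructor signature.

```go
package sequence_test

import (
	"testing"

	"github.com/ethersphere/bee/pkg/crypto"
	"github.com/ethersphere/bee/pkg/feeds"
	"github.com/ethersphere/bee/pkg/feeds/sequence"
	feedstesting "github.com/ethersphere/bee/pkg/feeds/testing"
	"github.com/ethersphere/bee/pkg/storage"
)

func TestAsyncFinder(t *testing.T) {
	// constructor closures handed to the shared harness
	finderf := func(g storage.Getter, f *feeds.Feed) feeds.Lookup {
		return sequence.NewAsyncFinder(g, f)
	}
	updaterf := func(p storage.Putter, signer crypto.Signer, topic []byte) (feeds.Updater, error) {
		return sequence.NewUpdater(p, signer, topic) // assumed constructor signature
	}
	t.Run("basic", func(t *testing.T) {
		feedstesting.TestFinderBasic(t, finderf, updaterf)
	})
	// nextf yields successive update times; the first return value is the stop signal
	i := int64(0)
	nextf := func() (bool, int64) {
		i++
		return i == 40, i
	}
	t.Run("fixed", func(t *testing.T) {
		feedstesting.TestFinderFixIntervals(t, nextf, finderf, updaterf)
	})
	t.Run("random", func(t *testing.T) {
		feedstesting.TestFinderRandomIntervals(t, finderf, updaterf)
	})
}
```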