Commit 42c12ede authored by acud's avatar acud Committed by GitHub

feeds: add feed resolution (#1120)

* feeds: 
   - generic feed support building on single owner chunks
   - subpackages for epochs and sequence based indexing scheme implementing timestamped feeds 
   - Lookup interface with both sequential and concurrent lookup algorithms 
   - Updater interface 
Co-authored-by: default avatarzelig <viktor.tron@gmail.com>
parent 824636a2
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package epochs implements time-based feeds using epochs as index
// and provide sequential as well as concurrent lookup algorithms
package epochs
import (
"encoding/binary"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/feeds"
)
const (
maxLevel = 32
)
var _ feeds.Index = (*epoch)(nil)
// epoch is referencing a slot in the epoch grid
type epoch struct {
start uint64
level uint8
}
// MarshalBinary implements the BinaryMarshaler interface
func (e *epoch) MarshalBinary() ([]byte, error) {
epochBytes := make([]byte, 8)
binary.BigEndian.PutUint64(epochBytes, e.start)
return crypto.LegacyKeccak256(append(epochBytes, e.level))
}
// lca calculates the lowest common ancestor epoch given two unix times
func lca(at, after int64) *epoch {
if after == 0 {
return &epoch{0, maxLevel}
}
diff := uint64(at - after)
length := uint64(1)
var level uint8
for level < maxLevel && (length < diff || uint64(at)/length != uint64(after)/length) {
length <<= 1
level++
}
start := (uint64(after) / length) * length
return &epoch{start, level}
}
func next(e *epoch, last int64, at uint64) *epoch {
if e == nil {
return &epoch{0, maxLevel}
}
if e.start+e.length() > at {
return e.childAt(at)
}
return lca(int64(at), last).childAt(at)
}
// parent returns the ancestor of an epoch
// the call is unsafe in that it must not be called on a toplevel epoch
func (e *epoch) parent() *epoch {
length := e.length() << 1
start := (e.start / length) * length
return &epoch{start, e.level + 1}
}
// left returns the left sister of an epoch
// it is unsafe in that it must not be called on a left sister epoch
func (e *epoch) left() *epoch {
return &epoch{e.start - e.length(), e.level}
}
// at returns the left of right child epoch of an epoch depending on where `at` falls
// it is unsafe in that it must not be called with an at that does not fall within the epoch
func (e *epoch) childAt(at uint64) *epoch {
e = &epoch{e.start, e.level - 1}
if at&e.length() > 0 {
e.start |= e.length()
}
return e
}
// isLeft returns true if epoch is a left sister of its parent
func (e *epoch) isLeft() bool {
return e.start&e.length() == 0
}
// length returns the span of the epoch
func (e *epoch) length() uint64 {
return 1 << e.level
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package epochs
import (
"context"
"errors"
"github.com/ethersphere/bee/pkg/feeds"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
var _ feeds.Lookup = (*finder)(nil)
var _ feeds.Lookup = (*asyncFinder)(nil)
// finder encapsulates a chunk store getter and a feed and provides
// non-concurrent lookup methods
type finder struct {
getter *feeds.Getter
}
// NewFinder constructs an AsyncFinder
func NewFinder(getter storage.Getter, feed *feeds.Feed) feeds.Lookup {
return &finder{feeds.NewGetter(getter, feed)}
}
// At looks up the version valid at time `at`
// after is a unix time hint of the latest known update
func (f *finder) At(ctx context.Context, at, after int64) (swarm.Chunk, error) {
e, ch, err := f.common(ctx, at, after)
if err != nil {
return nil, err
}
return f.at(ctx, uint64(at), e, ch)
}
// common returns the lowest common ancestor for which a feed update chunk is found in the chunk store
func (f *finder) common(ctx context.Context, at, after int64) (*epoch, swarm.Chunk, error) {
for e := lca(at, after); ; e = e.parent() {
ch, err := f.getter.Get(ctx, e)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
if e.level == maxLevel {
return e, nil, nil
}
continue
}
return e, nil, err
}
ts, err := feeds.UpdatedAt(ch)
if err != nil {
return e, nil, err
}
if ts <= uint64(at) {
return e, ch, nil
}
}
}
// at is a non-concurrent recursive Finder function to find the version update chunk at time `at`
func (f *finder) at(ctx context.Context, at uint64, e *epoch, ch swarm.Chunk) (swarm.Chunk, error) {
uch, err := f.getter.Get(ctx, e)
if err != nil {
// error retrieving
if !errors.Is(err, storage.ErrNotFound) {
return nil, err
}
// epoch not found on branch
if e.isLeft() { // no lower resolution
return ch, nil
}
// traverse earlier branch
return f.at(ctx, e.start-1, e.left(), ch)
}
// epoch found
// check if timestamp is later then target
ts, err := feeds.UpdatedAt(uch)
if err != nil {
return nil, err
}
if ts > at {
if e.isLeft() {
return ch, nil
}
return f.at(ctx, e.start-1, e.left(), ch)
}
if e.level == 0 { // matching update time or finest resolution
return uch, nil
}
// continue traversing based on at
return f.at(ctx, at, e.childAt(at), uch)
}
type result struct {
path *path
chunk swarm.Chunk
*epoch
}
// asyncFinder encapsulates a chunk store getter and a feed and provides
// non-concurrent lookup methods
type asyncFinder struct {
getter *feeds.Getter
}
type path struct {
at int64
top *result
bottom *result
cancel chan struct{}
}
func newPath(at int64) *path {
return &path{at, nil, nil, make(chan struct{})}
}
// NewAsyncFinder constructs an AsyncFinder
func NewAsyncFinder(getter storage.Getter, feed *feeds.Feed) feeds.Lookup {
return &asyncFinder{feeds.NewGetter(getter, feed)}
}
func (f *asyncFinder) get(ctx context.Context, at int64, e *epoch) (swarm.Chunk, error) {
u, err := f.getter.Get(ctx, e)
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
return nil, err
}
return nil, nil
}
ts, err := feeds.UpdatedAt(u)
if err != nil {
return nil, err
}
diff := at - int64(ts)
if diff < 0 {
return nil, nil
}
return u, nil
}
// at attempts to retrieve all epoch chunks on the path for `at` concurrently
func (f *asyncFinder) at(ctx context.Context, at int64, p *path, e *epoch, c chan<- *result) {
for ; ; e = e.childAt(uint64(at)) {
select {
case <-p.cancel:
return
default:
}
go func(e *epoch) {
uch, err := f.get(ctx, at, e)
if err != nil {
return
}
select {
case c <- &result{p, uch, e}:
case <-p.cancel:
}
}(e)
if e.level == 0 {
return
}
}
}
// At looks up the version valid at time `at`
// after is a unix time hint of the latest known update
func (f *asyncFinder) At(ctx context.Context, at, after int64) (swarm.Chunk, error) {
c := make(chan *result)
go f.at(ctx, at, newPath(at), &epoch{0, maxLevel}, c)
LOOP:
for r := range c {
p := r.path
// ignore result from paths already cancelled
select {
case <-p.cancel:
continue LOOP
default:
}
if r.chunk != nil { // update chunk for epoch found
if r.level == 0 { // return if deepest level epoch
return r.chunk, nil
}
// ignore if higher level than the deepest epoch found
if p.top != nil && p.top.level < r.level {
continue LOOP
}
p.top = r
} else { // update chunk for epoch not found
// if top level than return with no update found
if r.level == 32 {
close(p.cancel)
return nil, nil
}
// if topmost epoch not found, then set bottom
if p.bottom == nil || p.bottom.level < r.level {
p.bottom = r
}
}
// found - not found for two consecutive epochs
if p.top != nil && p.bottom != nil && p.top.level == p.bottom.level+1 {
// cancel path
close(p.cancel)
if p.bottom.isLeft() {
return p.top.chunk, nil
}
// recursive call on new path through left sister
np := newPath(at)
np.top = &result{np, p.top.chunk, p.top.epoch}
go f.at(ctx, int64(p.bottom.start-1), np, p.bottom.left(), c)
}
}
return nil, nil
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package epochs_test
import (
"context"
"errors"
"fmt"
"math/rand"
"testing"
"time"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/feeds"
"github.com/ethersphere/bee/pkg/feeds/epochs"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
)
type timeout struct {
storage.Storer
}
var searchTimeout = 30 * time.Millisecond
// Get overrides the mock storer and introduces latency
func (t *timeout) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address) (swarm.Chunk, error) {
ch, err := t.Storer.Get(ctx, mode, addr)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
time.Sleep(searchTimeout)
}
return ch, err
}
time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
return ch, nil
}
func BenchmarkFinder(b *testing.B) {
for _, i := range []int{0, 8, 30} {
for _, prefill := range []int64{1, 50} {
after := int64(50)
storer := &timeout{mock.NewStorer()}
topic := "testtopic"
pk, _ := crypto.GenerateSecp256k1Key()
signer := crypto.NewDefaultSigner(pk)
updater, err := epochs.NewUpdater(storer, signer, topic)
if err != nil {
b.Fatal(err)
}
payload := []byte("payload")
ctx := context.Background()
for at := int64(0); at < prefill; at++ {
err = updater.Update(ctx, at, payload)
if err != nil {
b.Fatal(err)
}
}
latest := after + (1 << i)
err = updater.Update(ctx, latest, payload)
if err != nil {
b.Fatal(err)
}
for _, j := range []int64{0, 8, 30} {
now := latest + 1<<j
for k, finder := range []feeds.Lookup{
epochs.NewFinder(storer, updater.Feed()),
epochs.NewAsyncFinder(storer, updater.Feed()),
} {
names := []string{"sync", "async"}
b.Run(fmt.Sprintf("%s:prefill=%d, latest=%d, now=%d", names[k], prefill, latest, now), func(b *testing.B) {
for n := 0; n < b.N; n++ {
_, err := finder.At(ctx, now, after)
if err != nil {
b.Fatal(err)
}
}
})
}
}
}
}
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package epochs_test
import (
"testing"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/feeds"
"github.com/ethersphere/bee/pkg/feeds/epochs"
feedstesting "github.com/ethersphere/bee/pkg/feeds/testing"
"github.com/ethersphere/bee/pkg/storage"
)
func TestFinder(t *testing.T) {
testf := func(t *testing.T, finderf func(storage.Getter, *feeds.Feed) feeds.Lookup, updaterf func(putter storage.Putter, signer crypto.Signer, topic string) (feeds.Updater, error)) {
t.Run("basic", func(t *testing.T) {
feedstesting.TestFinderBasic(t, finderf, updaterf)
})
t.Run("fixed", func(t *testing.T) {
feedstesting.TestFinderFixIntervals(t, finderf, updaterf)
})
t.Run("random", func(t *testing.T) {
feedstesting.TestFinderRandomIntervals(t, finderf, updaterf)
})
}
t.Run("sync", func(t *testing.T) {
testf(t, epochs.NewFinder, epochs.NewUpdater)
})
t.Run("async", func(t *testing.T) {
testf(t, epochs.NewAsyncFinder, epochs.NewUpdater)
})
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package epochs
import (
"context"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/feeds"
"github.com/ethersphere/bee/pkg/storage"
)
var _ feeds.Updater = (*updater)(nil)
// Updater encapsulates a feeds putter to generate successive updates for epoch based feeds
// it persists the last update
type updater struct {
*feeds.Putter
last int64
epoch *epoch
}
// NewUpdater constructs a feed updater
func NewUpdater(putter storage.Putter, signer crypto.Signer, topic string) (feeds.Updater, error) {
p, err := feeds.NewPutter(putter, signer, topic)
if err != nil {
return nil, err
}
return &updater{Putter: p}, nil
}
// Update pushes an update to the feed through the chunk stores
func (u *updater) Update(ctx context.Context, at int64, payload []byte) error {
e := next(u.epoch, u.last, uint64(at))
err := u.Put(ctx, e, at, payload)
if err != nil {
return err
}
u.last = at
u.epoch = e
return nil
}
func (u *updater) Feed() *feeds.Feed {
return u.Putter.Feed
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package feeds implements generic interfaces and methods for time-based feeds
// indexing schemes are implemented in subpackages
// - epochs
// - sequence
package feeds
import (
"encoding"
"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/soc"
"github.com/ethersphere/bee/pkg/swarm"
)
type id struct {
topic []byte
index []byte
}
var _ encoding.BinaryMarshaler = (*id)(nil)
func (i *id) MarshalBinary() ([]byte, error) {
return crypto.LegacyKeccak256(append(i.topic, i.index...))
}
// Feed is representing an epoch based feed
type Feed struct {
Topic []byte
Owner common.Address
}
// New constructs an epoch based feed from a human readable topic and an ether address
func New(topic string, owner common.Address) (*Feed, error) {
th, err := crypto.LegacyKeccak256([]byte(topic))
if err != nil {
return nil, err
}
return &Feed{th, owner}, nil
}
// Index is the interface for feed implementations
type Index interface {
encoding.BinaryMarshaler
}
// Update represents an update instance of a feed, i.e., pairing of a Feed with an Epoch
type Update struct {
*Feed
index Index
}
// Update called on a feed with an index and returns an Update
func (f *Feed) Update(index Index) *Update {
return &Update{f, index}
}
// Id calculates the identifier if a feed update to be used in single owner chunks
func (u *Update) Id() ([]byte, error) {
index, err := u.index.MarshalBinary()
if err != nil {
return nil, err
}
i := &id{u.Topic, index}
return i.MarshalBinary()
}
// Address calculates the soc address of a feed update
func (u *Update) Address() (swarm.Address, error) {
var addr swarm.Address
i, err := u.Id()
if err != nil {
return addr, err
}
owner, err := soc.NewOwner(u.Owner[:])
if err != nil {
return addr, err
}
return soc.CreateAddress(i, owner)
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package feeds
import (
"context"
"encoding/binary"
"fmt"
"time"
"github.com/ethersphere/bee/pkg/soc"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
// Lookup is the interface for time based feed lookup
type Lookup interface {
At(ctx context.Context, at, after int64) (swarm.Chunk, error)
}
// Getter encapsulates a chunk Getter getter and a feed and provides
// non-concurrent lookup methods
type Getter struct {
getter storage.Getter
*Feed
}
// NewGetter constructs a feed Getter
func NewGetter(getter storage.Getter, feed *Feed) *Getter {
return &Getter{getter, feed}
}
// Latest looks up the latest update of the feed
// after is a unix time hint of the latest known update
func Latest(ctx context.Context, l Lookup, after int64) (swarm.Chunk, error) {
return l.At(ctx, time.Now().Unix(), after)
}
// Get creates an update of the underlying feed at the given epoch
// and looks it up in the chunk Getter based on its address
func (f *Getter) Get(ctx context.Context, i Index) (swarm.Chunk, error) {
addr, err := f.Feed.Update(i).Address()
if err != nil {
return nil, err
}
return f.getter.Get(ctx, storage.ModeGetRequest, addr)
}
// FromChunk parses out the timestamp and the payload
func FromChunk(ch swarm.Chunk) (uint64, []byte, error) {
s, err := soc.FromChunk(ch)
if err != nil {
return 0, nil, err
}
cac := s.Chunk
if len(cac.Data()) < 16 {
return 0, nil, fmt.Errorf("feed update payload too short")
}
payload := cac.Data()[16:]
at := binary.BigEndian.Uint64(cac.Data()[8:16])
return at, payload, nil
}
// UpdatedAt extracts the time of feed other than update
func UpdatedAt(ch swarm.Chunk) (uint64, error) {
d := ch.Data()
if len(d) < 113 {
return 0, fmt.Errorf("too short: %d", len(d))
}
// a soc chunk with time information in the wrapped content addressed chunk
// 0-32 index,
// 65-97 signature,
// 98-105 span of wrapped chunk
// 105-113 timestamp
return binary.BigEndian.Uint64(d[105:113]), nil
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package feeds
import (
"context"
"encoding/binary"
"github.com/ethersphere/bee/pkg/bmtpool"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/soc"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
// Updater is the generic interface f
type Updater interface {
Update(ctx context.Context, at int64, payload []byte) error
Feed() *Feed
}
// Putter encapsulates a chunk store putter and a Feed to store feed updates
type Putter struct {
putter storage.Putter
signer crypto.Signer
*Feed
}
// NewPutter constructs a feed Putter
func NewPutter(putter storage.Putter, signer crypto.Signer, topic string) (*Putter, error) {
owner, err := signer.EthereumAddress()
if err != nil {
return nil, err
}
feed, err := New(topic, owner)
if err != nil {
return nil, err
}
return &Putter{putter, signer, feed}, nil
}
// Put pushes an update to the feed through the chunk stores
func (u *Putter) Put(ctx context.Context, i Index, at int64, payload []byte) error {
id, err := u.Feed.Update(i).Id()
if err != nil {
return err
}
cac, err := toChunk(uint64(at), payload)
if err != nil {
return err
}
ch, err := soc.NewChunk(id, cac, u.signer)
if err != nil {
return err
}
_, err = u.putter.Put(ctx, storage.ModePutUpload, ch)
return err
}
func toChunk(at uint64, payload []byte) (swarm.Chunk, error) {
hasher := bmtpool.Get()
defer bmtpool.Put(hasher)
ts := make([]byte, 8)
binary.BigEndian.PutUint64(ts, at)
content := append(ts, payload...)
_, err := hasher.Write(content)
if err != nil {
return nil, err
}
span := make([]byte, 8)
binary.LittleEndian.PutUint64(span, uint64(len(content)))
err = hasher.SetSpanBytes(span)
if err != nil {
return nil, err
}
return swarm.NewChunk(swarm.NewAddress(hasher.Sum(nil)), append(append([]byte{}, span...), content...)), nil
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package sequence_test
import (
"context"
"errors"
"fmt"
"math/rand"
"testing"
"time"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/feeds"
"github.com/ethersphere/bee/pkg/feeds/sequence"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
)
type timeout struct {
storage.Storer
}
var searchTimeout = 30 * time.Millisecond
// Get overrides the mock storer and introduces latency
func (t *timeout) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address) (swarm.Chunk, error) {
ch, err := t.Storer.Get(ctx, mode, addr)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
time.Sleep(searchTimeout)
}
return ch, err
}
time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
return ch, nil
}
func BenchmarkFinder(b *testing.B) {
for _, prefill := range []int64{1, 100, 1000, 5000} {
storer := &timeout{mock.NewStorer()}
topic := "testtopic"
pk, _ := crypto.GenerateSecp256k1Key()
signer := crypto.NewDefaultSigner(pk)
updater, err := sequence.NewUpdater(storer, signer, topic)
if err != nil {
b.Fatal(err)
}
payload := []byte("payload")
ctx := context.Background()
for at := int64(0); at < prefill; at++ {
err = updater.Update(ctx, at, payload)
if err != nil {
b.Fatal(err)
}
}
latest := prefill
err = updater.Update(ctx, latest, payload)
if err != nil {
b.Fatal(err)
}
now := prefill
for k, finder := range []feeds.Lookup{
sequence.NewFinder(storer, updater.Feed()),
sequence.NewAsyncFinder(storer, updater.Feed()),
} {
names := []string{"sync", "async"}
b.Run(fmt.Sprintf("%s:prefill=%d, latest/now=%d", names[k], prefill, now), func(b *testing.B) {
for n := 0; n < b.N; n++ {
_, err := finder.At(ctx, now, 0)
if err != nil {
b.Fatal(err)
}
}
})
}
}
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package sequence_test
import (
"testing"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/feeds"
"github.com/ethersphere/bee/pkg/feeds/sequence"
feedstesting "github.com/ethersphere/bee/pkg/feeds/testing"
"github.com/ethersphere/bee/pkg/storage"
)
func TestFinder(t *testing.T) {
testf := func(t *testing.T, finderf func(storage.Getter, *feeds.Feed) feeds.Lookup, updaterf func(putter storage.Putter, signer crypto.Signer, topic string) (feeds.Updater, error)) {
t.Run("basic", func(t *testing.T) {
feedstesting.TestFinderBasic(t, finderf, updaterf)
})
t.Run("fixed", func(t *testing.T) {
feedstesting.TestFinderFixIntervals(t, finderf, updaterf)
})
t.Run("random", func(t *testing.T) {
feedstesting.TestFinderRandomIntervals(t, finderf, updaterf)
})
}
t.Run("sync", func(t *testing.T) {
testf(t, sequence.NewFinder, sequence.NewUpdater)
})
t.Run("async", func(t *testing.T) {
testf(t, sequence.NewAsyncFinder, sequence.NewUpdater)
})
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package sequence provides implementation of sequential indexing for
// time-based feeds
// this feed type is best suited for
// - version updates
// - followed updates
// - frequent or regular-interval updates
package sequence
import (
"context"
"encoding/binary"
"errors"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/feeds"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
var _ feeds.Index = (*index)(nil)
var _ feeds.Lookup = (*finder)(nil)
var _ feeds.Lookup = (*asyncFinder)(nil)
var _ feeds.Updater = (*updater)(nil)
type index struct {
index uint64
}
func (i *index) MarshalBinary() ([]byte, error) {
indexBytes := make([]byte, 8)
binary.BigEndian.PutUint64(indexBytes, i.index)
return indexBytes, nil
}
// finder encapsulates a chunk store getter and a feed and provides
// non-concurrent lookup methods
type finder struct {
getter *feeds.Getter
}
// NewFinder constructs an Finder
func NewFinder(getter storage.Getter, feed *feeds.Feed) feeds.Lookup {
return &finder{feeds.NewGetter(getter, feed)}
}
// At looks up the version valid at time `at`
// after is a unix time hint of the latest known update
func (f *finder) At(ctx context.Context, at, after int64) (ch swarm.Chunk, err error) {
for i := uint64(0); ; i++ {
u, err := f.getter.Get(ctx, &index{i})
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
return nil, err
}
return ch, nil
}
ts, err := feeds.UpdatedAt(u)
if err != nil {
return nil, err
}
if ts > uint64(at) {
return ch, nil
}
ch = u
}
}
// asyncFinder encapsulates a chunk store getter and a feed and provides
// non-concurrent lookup methods
type asyncFinder struct {
getter *feeds.Getter
}
// NewAsyncFinder constructs an AsyncFinder
func NewAsyncFinder(getter storage.Getter, feed *feeds.Feed) feeds.Lookup {
return &asyncFinder{feeds.NewGetter(getter, feed)}
}
type path struct {
latest result
base uint64
level int
cancel chan struct{}
cancelled bool
}
func (p *path) close() {
if !p.cancelled {
close(p.cancel)
p.cancelled = true
}
}
func newPath(base uint64) *path {
return &path{base: base, cancel: make(chan struct{})}
}
type result struct {
chunk swarm.Chunk
path *path
level int
seq uint64
diff int64
}
// At looks up the version valid at time `at`
// after is a unix time hint of the latest known update
func (f *asyncFinder) At(ctx context.Context, at, after int64) (ch swarm.Chunk, err error) {
ch, diff, err := f.get(ctx, at, 0)
if err != nil {
return nil, err
}
if ch == nil {
return nil, nil
}
if diff == 0 {
return ch, nil
}
c := make(chan result)
p := newPath(0)
p.latest.chunk = ch
for p.level = 1; diff>>p.level > 0; p.level++ {
}
quit := make(chan struct{})
defer close(quit)
go f.at(ctx, at, p, c, quit)
for r := range c {
p = r.path
if r.chunk == nil {
if r.level == 0 {
return p.latest.chunk, nil
}
if p.level < r.level {
continue
}
p.level = r.level - 1
} else {
if r.diff == 0 {
return r.chunk, nil
}
if p.latest.level > r.level {
continue
}
p.close()
p.latest = r
}
// below applies even if p.latest==maxLevel
if p.latest.level == p.level {
if p.level == 0 {
return p.latest.chunk, nil
}
p.close()
np := newPath(p.latest.seq)
np.level = p.level
np.latest.chunk = p.latest.chunk
go f.at(ctx, at, np, c, quit)
}
}
return nil, nil
}
func (f *asyncFinder) at(ctx context.Context, at int64, p *path, c chan<- result, quit <-chan struct{}) {
for i := p.level; i > 0; i-- {
select {
case <-p.cancel:
return
case <-quit:
return
default:
}
go func(i int) {
seq := p.base + (1 << i) - 1
ch, diff, err := f.get(ctx, at, seq)
if err != nil {
return
}
select {
case c <- result{ch, p, i, seq, diff}:
case <-quit:
}
}(i)
}
}
func (f *asyncFinder) get(ctx context.Context, at int64, seq uint64) (swarm.Chunk, int64, error) {
u, err := f.getter.Get(ctx, &index{seq})
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
return nil, 0, err
}
// if 'not-found' error, then just silence and return nil chunk
return nil, 0, nil
}
ts, err := feeds.UpdatedAt(u)
if err != nil {
return nil, 0, err
}
diff := at - int64(ts)
// this means the update timestamp is later than the pivot time we are looking for
// handled as if the update was missing but with no uncertainty due to timeout
if diff < 0 {
return nil, 0, nil
}
return u, diff, nil
}
// updater encapsulates a feeds putter to generate successive updates for epoch based feeds
// it persists the last update
type updater struct {
*feeds.Putter
next uint64
}
// NewUpdater constructs a feed updater
func NewUpdater(putter storage.Putter, signer crypto.Signer, topic string) (feeds.Updater, error) {
p, err := feeds.NewPutter(putter, signer, topic)
if err != nil {
return nil, err
}
return &updater{Putter: p}, nil
}
// Update pushes an update to the feed through the chunk stores
func (u *updater) Update(ctx context.Context, at int64, payload []byte) error {
err := u.Put(ctx, &index{u.next}, at, payload)
if err != nil {
return err
}
u.next++
return nil
}
func (u *updater) Feed() *feeds.Feed {
return u.Putter.Feed
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// package testing provides tests for update and resolution of time-based feeds
package testing
import (
"bytes"
"context"
"fmt"
"math/rand"
"testing"
"time"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/feeds"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
)
func TestFinderBasic(t *testing.T, finderf func(storage.Getter, *feeds.Feed) feeds.Lookup, updaterf func(putter storage.Putter, signer crypto.Signer, topic string) (feeds.Updater, error)) {
storer := mock.NewStorer()
topic := "testtopic"
pk, _ := crypto.GenerateSecp256k1Key()
signer := crypto.NewDefaultSigner(pk)
updater, err := updaterf(storer, signer, topic)
if err != nil {
t.Fatal(err)
}
ctx := context.Background()
finder := finderf(storer, updater.Feed())
t.Run("no update", func(t *testing.T) {
ch, err := feeds.Latest(ctx, finder, 0)
if err != nil {
t.Fatal(err)
}
if ch != nil {
t.Fatalf("expected no update, got addr %v", ch.Address())
}
})
t.Run("first update", func(t *testing.T) {
payload := []byte("payload")
at := time.Now().Unix()
err = updater.Update(ctx, at, payload)
if err != nil {
t.Fatal(err)
}
ch, err := feeds.Latest(ctx, finder, 0)
if err != nil {
t.Fatal(err)
}
if ch == nil {
t.Fatalf("expected to find update, got none")
}
exp := payload
ts, payload, err := feeds.FromChunk(ch)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(payload, exp) {
t.Fatalf("result mismatch. want %8x... got %8x...", exp, payload)
}
if ts != uint64(at) {
t.Fatalf("timestamp mismatch: expected %v, got %v", at, ts)
}
})
}
func TestFinderFixIntervals(t *testing.T, finderf func(storage.Getter, *feeds.Feed) feeds.Lookup, updaterf func(putter storage.Putter, signer crypto.Signer, topic string) (feeds.Updater, error)) {
for _, tc := range []struct {
count int64
step int64
offset int64
}{
{50, 1, 0},
{50, 1, 10000},
{50, 100, 0},
{50, 100, 100000},
} {
t.Run(fmt.Sprintf("count=%d,step=%d,offset=%d", tc.count, tc.step, tc.offset), func(t *testing.T) {
storer := mock.NewStorer()
topic := "testtopic"
pk, _ := crypto.GenerateSecp256k1Key()
signer := crypto.NewDefaultSigner(pk)
updater, err := updaterf(storer, signer, topic)
if err != nil {
t.Fatal(err)
}
ctx := context.Background()
payload := []byte("payload")
for at := tc.offset; at < tc.offset+tc.count*tc.step; at += tc.step {
err = updater.Update(ctx, at, payload)
if err != nil {
t.Fatal(err)
}
}
finder := finderf(storer, updater.Feed())
for at := tc.offset; at < tc.offset+tc.count*tc.step; at += tc.step {
for after := tc.offset; after < at; after += tc.step {
step := int64(1)
if tc.step > 1 {
step = tc.step / 4
}
for now := at; now < at+tc.step; now += step {
ch, err := finder.At(ctx, now, after)
if err != nil {
t.Fatal(err)
}
if ch == nil {
t.Fatalf("expected to find update, got none")
}
exp := payload
ts, payload, err := feeds.FromChunk(ch)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(payload, exp) {
t.Fatalf("payload mismatch: expected %x, got %x", exp, payload)
}
if ts != uint64(at) {
t.Fatalf("timestamp mismatch: expected %v, got %v", at, ts)
}
}
}
}
})
}
}
func TestFinderRandomIntervals(t *testing.T, finderf func(storage.Getter, *feeds.Feed) feeds.Lookup, updaterf func(putter storage.Putter, signer crypto.Signer, topic string) (feeds.Updater, error)) {
for i := 0; i < 5; i++ {
t.Run(fmt.Sprintf("random intervals %d", i), func(t *testing.T) {
storer := mock.NewStorer()
topic := "testtopic"
pk, _ := crypto.GenerateSecp256k1Key()
signer := crypto.NewDefaultSigner(pk)
updater, err := updaterf(storer, signer, topic)
if err != nil {
t.Fatal(err)
}
ctx := context.Background()
payload := []byte("payload")
var at int64
ats := make([]int64, 100)
for j := 0; j < 50; j++ {
ats[j] = at
at += int64(rand.Intn(1<<10) + 1)
err = updater.Update(ctx, ats[j], payload)
if err != nil {
t.Fatal(err)
}
}
finder := finderf(storer, updater.Feed())
for j := 1; j < 49; j++ {
diff := ats[j+1] - ats[j]
for at := ats[j]; at < ats[j+1]; at += int64(rand.Intn(int(diff)) + 1) {
for after := int64(0); after < at; after += int64(rand.Intn(int(at))) {
ch, err := finder.At(ctx, at, after)
if err != nil {
t.Fatal(err)
}
if ch == nil {
t.Fatalf("expected to find update, got none")
}
exp := payload
ts, payload, err := feeds.FromChunk(ch)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(payload, exp) {
t.Fatalf("payload mismatch: expected %x, got %x", exp, payload)
}
if ts != uint64(ats[j]) {
t.Fatalf("timestamp mismatch: expected %v, got %v", ats[j], ts)
}
}
}
}
})
}
}
......@@ -17,7 +17,6 @@ import (
const (
IdSize = 32
SignatureSize = 65
AddressSize = crypto.AddressSize
minChunkSize = IdSize + SignatureSize + swarm.SpanSize
)
......@@ -31,7 +30,7 @@ type Owner struct {
// NewOwner creates a new Owner.
func NewOwner(address []byte) (*Owner, error) {
if len(address) != AddressSize {
if len(address) != crypto.AddressSize {
return nil, fmt.Errorf("invalid address %x", address)
}
return &Owner{
......@@ -247,7 +246,7 @@ func contentAddressedChunk(data, spanBytes []byte) (swarm.Chunk, error) {
}
s := hasher.Sum(nil)
payload := append(spanBytes, data...)
payload := append(append([]byte{}, spanBytes...), data...)
address := swarm.NewAddress(s)
return swarm.NewChunk(address, payload), nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment