Commit e688199e authored by Rodrigo Q. Saramago's avatar Rodrigo Q. Saramago Committed by GitHub

perf: feeds lookup improvements (#1548)

parent 68e0b591
...@@ -15,6 +15,8 @@ import ( ...@@ -15,6 +15,8 @@ import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt" "fmt"
"sync"
"time"
"github.com/ethersphere/bee/pkg/crypto" "github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/feeds" "github.com/ethersphere/bee/pkg/feeds"
...@@ -26,10 +28,12 @@ import ( ...@@ -26,10 +28,12 @@ import (
// 8 spans 2^8 updates // 8 spans 2^8 updates
const DefaultLevels = 8 const DefaultLevels = 8
var _ feeds.Index = (*index)(nil) var (
var _ feeds.Lookup = (*finder)(nil) _ feeds.Index = (*index)(nil)
var _ feeds.Lookup = (*asyncFinder)(nil) _ feeds.Lookup = (*finder)(nil)
var _ feeds.Updater = (*updater)(nil) _ feeds.Lookup = (*asyncFinder)(nil)
_ feeds.Updater = (*updater)(nil)
)
// index just wraps a uint64. implements the feeds.Index interface // index just wraps a uint64. implements the feeds.Index interface
type index struct { type index struct {
...@@ -139,19 +143,20 @@ type result struct { ...@@ -139,19 +143,20 @@ type result struct {
chunk swarm.Chunk // the chunk found chunk swarm.Chunk // the chunk found
interval *interval // the interval it belongs to interval *interval // the interval it belongs to
level int // the level within the interval level int // the level within the interval
index uint64 // the actual seqeuence index of the update index uint64 // the actual sequence index of the update
} }
// At looks up the version valid at time `at` // At looks up the version valid at time `at`
// after is a unix time hint of the latest known update // after is a unix time hint of the latest known update
func (f *asyncFinder) At(ctx context.Context, at, after int64) (ch swarm.Chunk, cur, next feeds.Index, err error) { func (f *asyncFinder) At(ctx context.Context, at, after int64) (ch swarm.Chunk, cur, next feeds.Index, err error) {
// first lookup update at the 0 index // first lookup update at the 0 index
ch, err = f.get(ctx, at, 0) // TODO: consider receive after as uint
ch, err = f.get(ctx, at, uint64(after))
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, err
} }
if ch == nil { if ch == nil {
return nil, nil, &index{0}, nil return nil, nil, &index{uint64(after)}, nil
} }
// if chunk exists construct an initial interval with base=0 // if chunk exists construct an initial interval with base=0
c := make(chan *result) c := make(chan *result)
...@@ -182,7 +187,7 @@ func (f *asyncFinder) At(ctx context.Context, at, after int64) (ch swarm.Chunk, ...@@ -182,7 +187,7 @@ func (f *asyncFinder) At(ctx context.Context, at, after int64) (ch swarm.Chunk,
} }
i.found = r i.found = r
} }
// below applies even if i.latest==ceilingLevel in which case we just continue with // below applies even if i.latest==ceilingLevel in which case we just continue with
// DefaultLevel lookaheads // DefaultLevel lookaheads
if i.found.level == i.notFound { if i.found.level == i.notFound {
if i.found.level == 0 { if i.found.level == 0 {
...@@ -198,38 +203,52 @@ func (f *asyncFinder) At(ctx context.Context, at, after int64) (ch swarm.Chunk, ...@@ -198,38 +203,52 @@ func (f *asyncFinder) At(ctx context.Context, at, after int64) (ch swarm.Chunk,
return nil, nil, nil, nil return nil, nil, nil, nil
} }
// at launches concurrent lookups at exponential intervals after th c starting from further // at launches concurrent lookups at exponential intervals after the starting from further
func (f *asyncFinder) at(ctx context.Context, at int64, min int, i *interval, c chan<- *result, quit <-chan struct{}) { func (f *asyncFinder) at(ctx context.Context, at int64, min int, i *interval, c chan<- *result, quit <-chan struct{}) {
stop := make(chan struct{}, 1) var wg sync.WaitGroup
defer wg.Wait()
wg.Add(i.level)
for l := i.level; l > min; l-- { for l := i.level; l > min; l-- {
select { select {
case <-stop: // if a chunk is found
return
case <-quit: // if the parent process quit case <-quit: // if the parent process quit
return return
default: default:
} }
go func(l int) { go func(l int) {
// TODO: remove hardcoded timeout and define it as constant or inject in the getter.
reqCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer func() {
wg.Done()
cancel()
}()
index := i.base + (1 << l) - 1 index := i.base + (1 << l) - 1
ch, err := f.get(ctx, at, index) chunk := f.asyncGet(reqCtx, at, index)
if err != nil {
return
}
// if a chunk is found, stop the iterationq
if ch != nil {
select {
case stop <- struct{}{}:
default:
}
}
select { select {
case c <- &result{ch, i, l, index}: case ch := <-chunk:
c <- &result{ch, i, l, index}
case <-reqCtx.Done():
c <- &result{nil, i, l, index}
case <-quit: case <-quit:
} }
}(l) }(l)
} }
} }
func (f *asyncFinder) asyncGet(ctx context.Context, at int64, index uint64) <-chan swarm.Chunk {
c := make(chan swarm.Chunk)
go func() {
defer close(c)
ch, err := f.get(ctx, at, index)
if err != nil {
return
}
c <- ch
}()
return c
}
// get performs a lookup of an update chunk, returns nil (not error) if not found // get performs a lookup of an update chunk, returns nil (not error) if not found
func (f *asyncFinder) get(ctx context.Context, at int64, idx uint64) (swarm.Chunk, error) { func (f *asyncFinder) get(ctx context.Context, at int64, idx uint64) (swarm.Chunk, error) {
u, err := f.getter.Get(ctx, &index{idx}) u, err := f.getter.Get(ctx, &index{idx})
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment