Commit 1b0839dd authored by Janoš Guljaš, committed by GitHub

feat: pullstorage iterator share (#1891)

parent a703f308
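This change routes concurrent IntervalChunks requests for the same (bin, from, to, limit) key through resenje.org/singleflight, so identical interval subscriptions trigger a single SubscribePull iteration and all waiting callers receive the shared result. Below is a minimal sketch of the sharing behaviour of singleflight.Group as it is used here; the key string, the sleep and the returned values are illustrative only and are not taken from this change.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"resenje.org/singleflight"
)

func main() {
	var (
		g  singleflight.Group
		wg sync.WaitGroup
	)
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// concurrent callers with the same key share one execution of fn
			v, shared, err := g.Do(context.Background(), "0-0-5-50", func(ctx context.Context) (interface{}, error) {
				time.Sleep(100 * time.Millisecond) // stand-in for a SubscribePull iteration
				return []string{"chunk-a", "chunk-b"}, nil
			})
			fmt.Println(i, v, shared, err)
		}(i)
	}
	wg.Wait()
}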
@@ -76,5 +76,6 @@ require (
gopkg.in/yaml.v2 v2.3.0
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect
honnef.co/go/tools v0.0.1-2020.1.4 // indirect
resenje.org/singleflight v0.2.0 // indirect
resenje.org/web v0.4.3
)
@@ -1318,6 +1318,10 @@ resenje.org/jsonhttp v0.2.0/go.mod h1:EDyeguyTWj2fU3D3SCE0qNTgthzyEkHYLM1uu0uikH
resenje.org/logging v0.1.5/go.mod h1:1IdoCm3+UwYfsplxDGV2pHCkUrLlQzlWwp4r28XfPx4=
resenje.org/marshal v0.1.1/go.mod h1:P7Cla6Ju5CFvW4Y8JbRgWX1Hcy4L1w4qcCsyadO7G94=
resenje.org/recovery v0.1.1/go.mod h1:3S6aCVKMJEWsSAb61oZTteaiqkIfQPTr1RdiWnRbhME=
resenje.org/singleflight v0.1.0 h1:jrf13EkBy4eX0514qzn4cfIww46YmAWSb/PNreARPzA=
resenje.org/singleflight v0.1.0/go.mod h1:RLR5DMmLXBtth52XGBWJuX3wrOFckksRzIKvWW08FFo=
resenje.org/singleflight v0.2.0 h1:nJ17VAZunMiFrfrltQ4Qs4r9MIP1pZC8u+0iSUTNnvQ=
resenje.org/singleflight v0.2.0/go.mod h1:plheHgw2rd77IH3J6aN0Lu2JvMvHXoLknDwb6vN0dsE=
resenje.org/web v0.4.3 h1:G9vceKKGvsVg0WpyafJEEMHfstoxSO8rG/1Bo7fOkhw=
resenje.org/web v0.4.3/go.mod h1:GZw/Jt7IGIYlytsyGdAV5CytZnaQu7GV2u1LLuViihc=
resenje.org/x v0.2.4/go.mod h1:1b2Xpo29FRc3IMvg/u46/IyjySl5IjvtuSjXTA/AOnk=
......
@@ -7,10 +7,12 @@ package pullstorage
import (
"context"
"errors"
"fmt"
"time"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"resenje.org/singleflight"
)
var (
@@ -43,6 +45,7 @@ type Storer interface {
// ps wraps storage.Storer.
type ps struct {
storage.Storer
intervalsSF singleflight.Group
}
// New returns a new pullstorage Storer instance.
@@ -54,68 +57,82 @@ func New(storer storage.Storer) Storer {
// IntervalChunks collects chunks for a requested interval.
func (s *ps) IntervalChunks(ctx context.Context, bin uint8, from, to uint64, limit int) (chs []swarm.Address, topmost uint64, err error) {
-	// call iterator, iterate either until upper bound or limit reached
-	// return addresses, topmost is the topmost bin ID
-	var (
-		timer  *time.Timer
-		timerC <-chan time.Time
-	)
-	ch, dbClosed, stop := s.SubscribePull(ctx, bin, from, to)
-	defer func(start time.Time) {
-		stop()
-		if timer != nil {
-			timer.Stop()
-		}
-	}(time.Now())
-	var nomore bool
-LOOP:
-	for limit > 0 {
-		select {
-		case v, ok := <-ch:
-			if !ok {
-				nomore = true
-				break LOOP
-			}
-			chs = append(chs, v.Address)
-			if v.BinID > topmost {
-				topmost = v.BinID
-			}
-			limit--
-			if timer == nil {
-				timer = time.NewTimer(batchTimeout)
-			} else {
-				if !timer.Stop() {
-					<-timer.C
-				}
-				timer.Reset(batchTimeout)
-			}
-			timerC = timer.C
-		case <-ctx.Done():
-			return nil, 0, ctx.Err()
-		case <-timerC:
-			// return batch if new chunks are not received after some time
-			break LOOP
-		}
-	}
-	select {
-	case <-ctx.Done():
-		return nil, 0, ctx.Err()
-	case <-dbClosed:
-		return nil, 0, ErrDbClosed
-	default:
-	}
-	if nomore {
-		// end of interval reached. no more chunks so interval is complete
-		// return requested `to`. it could be that len(chs) == 0 if the interval
-		// is empty
-		topmost = to
-	}
-	return chs, topmost, nil
+	type result struct {
+		chs     []swarm.Address
+		topmost uint64
+	}
+	v, _, err := s.intervalsSF.Do(ctx, fmt.Sprintf("%v-%v-%v-%v", bin, from, to, limit), func(ctx context.Context) (interface{}, error) {
+		// call iterator, iterate either until upper bound or limit reached
+		// return addresses, topmost is the topmost bin ID
+		var (
+			timer  *time.Timer
+			timerC <-chan time.Time
+		)
+		ch, dbClosed, stop := s.SubscribePull(ctx, bin, from, to)
+		defer func(start time.Time) {
+			stop()
+			if timer != nil {
+				timer.Stop()
+			}
+		}(time.Now())
+		var nomore bool
+	LOOP:
+		for limit > 0 {
+			select {
+			case v, ok := <-ch:
+				if !ok {
+					nomore = true
+					break LOOP
+				}
+				chs = append(chs, v.Address)
+				if v.BinID > topmost {
+					topmost = v.BinID
+				}
+				limit--
+				if timer == nil {
+					timer = time.NewTimer(batchTimeout)
+				} else {
+					if !timer.Stop() {
+						<-timer.C
+					}
+					timer.Reset(batchTimeout)
+				}
+				timerC = timer.C
+			case <-ctx.Done():
+				return nil, ctx.Err()
+			case <-timerC:
+				// return batch if new chunks are not received after some time
+				break LOOP
+			}
+		}
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		case <-dbClosed:
+			return nil, ErrDbClosed
+		default:
+		}
+		if nomore {
+			// end of interval reached. no more chunks so interval is complete
+			// return requested `to`. it could be that len(chs) == 0 if the interval
+			// is empty
+			topmost = to
+		}
+		return &result{chs: chs, topmost: topmost}, nil
+	})
+	if err != nil {
+		return nil, 0, err
+	}
+	r := v.(*result)
+	return r.chs, r.topmost, nil
}
// Cursors gets the last BinID for every bin in the local storage
......
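The loop inside the singleflight callback above batches descriptors with an idle timeout: the timer is (re)armed after every received descriptor, and the batch is returned when the limit is hit, the subscription channel closes, or no new descriptor arrives within batchTimeout. Here is a standalone sketch of that pattern with assumed names (collect, in) and a plain int channel instead of storage descriptors; it is not code from the repository.

package main

import (
	"context"
	"fmt"
	"time"
)

// collect drains up to limit items from in, flushing early if no new item
// arrives within batchTimeout or if the producer closes the channel.
func collect(ctx context.Context, in <-chan int, limit int, batchTimeout time.Duration) ([]int, error) {
	var (
		batch  []int
		timer  *time.Timer
		timerC <-chan time.Time
	)
	defer func() {
		if timer != nil {
			timer.Stop()
		}
	}()
	for limit > 0 {
		select {
		case v, ok := <-in:
			if !ok {
				return batch, nil // producer is done, interval is complete
			}
			batch = append(batch, v)
			limit--
			// (re)arm the idle timer after every received item
			if timer == nil {
				timer = time.NewTimer(batchTimeout)
			} else {
				if !timer.Stop() {
					<-timer.C
				}
				timer.Reset(batchTimeout)
			}
			timerC = timer.C
		case <-timerC:
			return batch, nil // idle for batchTimeout: flush what we have
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	return batch, nil
}

func main() {
	in := make(chan int)
	go func() {
		for i := 1; i <= 3; i++ {
			in <- i
		}
		// channel stays open: collect returns after batchTimeout of silence
	}()
	batch, err := collect(context.Background(), in, 10, 50*time.Millisecond)
	fmt.Println(batch, err) // [1 2 3] <nil>
}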
@@ -5,12 +5,17 @@
package pullstorage_test
import (
"bytes"
"context"
"crypto/rand"
"errors"
"io/ioutil"
"reflect"
"runtime/pprof"
"strings"
"testing"
"time"
"unsafe"
"github.com/ethersphere/bee/pkg/localstore"
"github.com/ethersphere/bee/pkg/logging"
@@ -298,6 +303,249 @@ func TestIntervalChunks_Localstore(t *testing.T) {
}
}
// TestIntervalChunks_IteratorShare tests that two goroutines
// requesting the same subscription interval trigger only one SubscribePull call
// and that the results are shared between them.
func TestIntervalChunks_IteratorShare(t *testing.T) {
desc := someDescriptors(0, 2)
ps, db := newPullStorage(t, mock.WithSubscribePullChunks(desc...), mock.WithPartialInterval(true))
go func() {
// delay is needed in order to have the iterator
// linger for a bit longer for more chunks.
time.Sleep(200 * time.Millisecond)
// add chunks to subscribe pull on the storage mock
db.MorePull(someDescriptors(1, 3, 4)...)
}()
type result struct {
addrs []swarm.Address
top uint64
}
sched := make(chan struct{})
c := make(chan result)
go func() {
close(sched)
addrs, topmost, err := ps.IntervalChunks(context.Background(), 0, 0, 5, limit)
if err != nil {
t.Errorf("internal goroutine: %v", err)
}
c <- result{addrs, topmost}
}()
<-sched // wait for goroutine to get scheduled
addrs, topmost, err := ps.IntervalChunks(context.Background(), 0, 0, 5, limit)
if err != nil {
t.Fatal(err)
}
res := <-c
if l := len(addrs); l != 5 {
t.Fatalf("want %d addrs but got %d", 5, l)
}
// highest chunk we sent had BinID 5
exp := uint64(5)
if topmost != exp {
t.Fatalf("expected topmost %d but got %d", exp, topmost)
}
if c := db.SubscribePullCalls(); c != 1 {
t.Fatalf("wanted 1 subscribe pull calls, got %d", c)
}
// check that results point to same array
sh := (*reflect.SliceHeader)(unsafe.Pointer(&res.addrs))
sh2 := (*reflect.SliceHeader)(unsafe.Pointer(&addrs))
if sh.Data != sh2.Data {
t.Fatalf("results not shared between goroutines. ptr1 %d ptr2 %d", sh.Data, sh2.Data)
}
}
// TestIntervalChunks_IteratorShareContextCancellation covers two cases:
// 1. "cancel first caller" tests that if one of the goroutines waiting on a
// shared subscription is cancelled, the inflight request is not cancelled and the
// remaining callers still get the result of the shared call.
// 2. "cancel all callers" tests that if all goroutines sharing the same subscription
// call are cancelled, the call exits. If a new goroutine arrives afterwards, a fresh
// subscription call is made and its results are shared again.
func TestIntervalChunks_IteratorShareContextCancellation(t *testing.T) {
type result struct {
addrs []swarm.Address
top uint64
err error
}
t.Run("cancel first caller", func(t *testing.T) {
ps, db := newPullStorage(t, mock.WithPartialInterval(true))
sched := make(chan struct{})
c := make(chan result, 3)
defer close(sched)
defer close(c)
ctx, cancel := context.WithCancel(context.Background())
go func() {
sched <- struct{}{}
addrs, topmost, err := ps.IntervalChunks(ctx, 0, 0, 5, limit)
c <- result{addrs, topmost, err}
// add more descriptors to unblock SubscribePull call after the first
// caller is cancelled
db.MorePull(someDescriptors(0, 1, 2, 3, 4)...)
}()
<-sched // wait for goroutine to get scheduled
go func() {
sched <- struct{}{}
addrs, topmost, err := ps.IntervalChunks(context.Background(), 0, 0, 5, limit)
c <- result{addrs, topmost, err}
}()
<-sched // wait for goroutine to get scheduled
go func() {
sched <- struct{}{}
addrs, topmost, err := ps.IntervalChunks(context.Background(), 0, 0, 5, limit)
c <- result{addrs, topmost, err}
}()
<-sched // wait for goroutine to get scheduled
// wait till all the routines are scheduled
waitStacks(t, "ethswarm.org/bee/pkg/pullsync/pullstorage/pullstorage.go:66", 3, 2*time.Second)
// cancel the first caller
cancel()
i := 0
var expected *result
for res := range c {
if i == 0 {
if res.err == nil {
t.Fatal("expected error for 1st attempt")
}
if !errors.Is(res.err, context.Canceled) {
t.Fatalf("invalid error type %v", res.err)
}
i++
continue
}
if expected == nil {
expected = &res
} else {
if res.top != expected.top || len(res.addrs) != 5 {
t.Fatalf("results are different expected: %v got: %v", expected, res)
}
// check that results point to same array
sh := (*reflect.SliceHeader)(unsafe.Pointer(&res.addrs))
sh2 := (*reflect.SliceHeader)(unsafe.Pointer(&expected.addrs))
if sh.Data != sh2.Data {
t.Fatalf("results not shared between goroutines. ptr1 %d ptr2 %d", sh.Data, sh2.Data)
}
}
i++
if i == 3 {
break
}
}
if c := db.SubscribePullCalls(); c != 1 {
t.Fatalf("wanted 1 subscribe pull calls, got %d", c)
}
})
t.Run("cancel all callers", func(t *testing.T) {
ps, db := newPullStorage(t, mock.WithPartialInterval(true))
sched := make(chan struct{})
c := make(chan result, 3)
defer close(sched)
defer close(c)
ctx, cancel := context.WithCancel(context.Background())
go func() {
sched <- struct{}{}
addrs, topmost, err := ps.IntervalChunks(ctx, 0, 0, 5, limit)
c <- result{addrs, topmost, err}
}()
<-sched // wait for goroutine to get scheduled
go func() {
sched <- struct{}{}
addrs, topmost, err := ps.IntervalChunks(ctx, 0, 0, 5, limit)
c <- result{addrs, topmost, err}
}()
<-sched // wait for goroutine to get scheduled
go func() {
sched <- struct{}{}
addrs, topmost, err := ps.IntervalChunks(ctx, 0, 0, 5, limit)
c <- result{addrs, topmost, err}
}()
<-sched // wait for goroutine to get scheduled
// wait till all the routines are scheduled
waitStacks(t, "ethswarm.org/bee/pkg/pullsync/pullstorage/pullstorage.go:66", 3, 2*time.Second)
// cancel all callers
cancel()
i := 0
for res := range c {
if res.err == nil {
t.Fatal("expected error for 1st attempt")
}
if !errors.Is(res.err, context.Canceled) {
t.Fatalf("invalid error type %v", res.err)
}
i++
if i == 3 {
break
}
}
go func() {
time.Sleep(time.Millisecond * 500)
db.MorePull(someDescriptors(0, 1, 2, 3, 4)...)
}()
addrs, topmost, err := ps.IntervalChunks(context.Background(), 0, 0, 5, limit)
if err != nil {
t.Fatalf("failed getting intervals %s", err.Error())
}
if topmost != uint64(5) {
t.Fatalf("expected topmost %d found %d", 5, topmost)
}
if len(addrs) != 5 {
t.Fatalf("wanted %d addresses found %d", 5, len(addrs))
}
	// after all callers are cancelled, the shared SubscribePull call should exit and
	// the next caller will issue a fresh call
if c := db.SubscribePullCalls(); c != 2 {
t.Fatalf("wanted 2 subscribe pull calls, got %d", c)
}
})
}
// Taken from https://github.com/janos/singleflight/blob/master/singleflight_test.go#L344
// this is required to verify the goroutine scheduling for the tests
func waitStacks(t *testing.T, loc string, count int, timeout time.Duration) {
t.Helper()
for deadline := time.Now().Add(timeout); time.Now().Before(deadline); {
		// Ensure that exactly count goroutines are waiting at the desired stack trace.
var buf bytes.Buffer
if err := pprof.Lookup("goroutine").WriteTo(&buf, 2); err != nil {
t.Fatal(err)
}
c := strings.Count(buf.String(), loc)
if c == count {
break
}
time.Sleep(10 * time.Millisecond)
}
}
func newPullStorage(t *testing.T, o ...mock.Option) (pullstorage.Storer, *mock.MockStorer) {
db := mock.NewStorer(o...)
ps := pullstorage.New(db)
......
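The cancellation tests above rely on the resenje.org/singleflight v0.2.0 behaviour that a single caller cancelling its context does not cancel the shared in-flight call; the callback's context is cancelled only once every caller has given up. A rough standalone sketch of that behaviour, with an illustrative key, durations and return value that are assumptions rather than code from the tests:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"resenje.org/singleflight"
)

func main() {
	var (
		g  singleflight.Group
		wg sync.WaitGroup
	)

	call := func(ctx context.Context, name string) {
		defer wg.Done()
		v, _, err := g.Do(ctx, "shared-key", func(ctx context.Context) (interface{}, error) {
			select {
			case <-time.After(300 * time.Millisecond): // stand-in for a slow SubscribePull iteration
				return "interval result", nil
			case <-ctx.Done(): // cancelled only once every caller has cancelled
				return nil, ctx.Err()
			}
		})
		fmt.Printf("%s: v=%v err=%v\n", name, v, err)
	}

	ctx1, cancel1 := context.WithCancel(context.Background())
	wg.Add(2)
	go call(ctx1, "caller-1")
	go call(context.Background(), "caller-2")

	time.Sleep(50 * time.Millisecond) // let both callers join the same in-flight call
	cancel1()                         // caller-1 gets context.Canceled, caller-2 still gets the result

	wg.Wait()
}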
@@ -27,6 +27,7 @@ type MockStorer struct {
quit chan struct{}
baseAddress []byte
bins []uint64
subPullCalls int
}
func WithSubscribePullChunks(chs ...storage.Descriptor) Option {
@@ -220,6 +221,10 @@ func (m *MockStorer) LastPullSubscriptionBinID(bin uint8) (id uint64, err error)
}
func (m *MockStorer) SubscribePull(ctx context.Context, bin uint8, since, until uint64) (<-chan storage.Descriptor, <-chan struct{}, func()) {
m.mtx.Lock()
m.subPullCalls++
m.mtx.Unlock()
c := make(chan storage.Descriptor)
done := make(chan struct{})
stop := func() {
@@ -287,6 +292,12 @@ func (m *MockStorer) MorePull(d ...storage.Descriptor) {
close(m.morePull)
}
func (m *MockStorer) SubscribePullCalls() int {
m.mtx.Lock()
defer m.mtx.Unlock()
return m.subPullCalls
}
func (m *MockStorer) SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, stop func()) {
panic("not implemented") // TODO: Implement
}
......