Commit 553a1b56 authored by acud, committed by GitHub

perf(pullstorage): share common iterators in pullstorage (#1683)

reuses localstore SubscribePull results for peers that request the same interval at the same time
parent 425891bb
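For context, the new code coalesces concurrent identical requests in the spirit of a keyed single-flight: the first caller for a key opens the subscription, and later callers with the same key register a channel and receive the first caller's result. Below is a minimal, self-contained sketch of that pattern; the names (coalescer, do, result) are hypothetical and simplified, not the actual pullstorage code.

// request coalescing sketch, illustrative only, hypothetical names
package main

import (
	"fmt"
	"sync"
)

type result struct {
	val int
	err error
}

type coalescer struct {
	mu      sync.Mutex
	waiters map[string][]chan result
}

// do runs fn at most once per in-flight key; concurrent callers with the
// same key wait on a buffered channel and reuse the first caller's result.
func (c *coalescer) do(key string, fn func() (int, error)) (int, error) {
	c.mu.Lock()
	if _, ok := c.waiters[key]; ok {
		ch := make(chan result, 1)
		c.waiters[key] = append(c.waiters[key], ch)
		c.mu.Unlock()
		r := <-ch
		return r.val, r.err
	}
	c.waiters[key] = nil // mark the key as in flight
	c.mu.Unlock()

	val, err := fn()

	c.mu.Lock()
	for _, ch := range c.waiters[key] {
		ch <- result{val, err} // buffered: never blocks the notifier
	}
	delete(c.waiters, key)
	c.mu.Unlock()
	return val, err
}

func main() {
	c := &coalescer{waiters: make(map[string][]chan result)}
	v, _ := c.do("0_0_5_50", func() (int, error) { return 42, nil })
	fmt.Println(v) // 42
}

The actual change differs in that it uses unbuffered channels together with a non-blocking send, so a waiter that was cancelled and has stopped reading does not block the notifier.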
......@@ -7,6 +7,8 @@ package pullstorage
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/ethersphere/bee/pkg/storage"
......@@ -40,32 +42,88 @@ type Storer interface {
Has(ctx context.Context, addr swarm.Address) (bool, error)
}
type intervalChunks struct {
chs []swarm.Address
topmost uint64
err error
}
// ps wraps storage.Storer.
type ps struct {
storage.Storer
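// openSubs maps a subscription key (bin, from, to, limit) to the channels
// of callers waiting on an identical, already running subscription.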
openSubs map[string][]chan intervalChunks
openSubsMu sync.Mutex
}
// New returns a new pullstorage Storer instance.
func New(storer storage.Storer) Storer {
return &ps{
Storer:   storer,
openSubs: make(map[string][]chan intervalChunks),
}
}
// IntervalChunks collects chunks for a requested interval.
func (s *ps) IntervalChunks(ctx context.Context, bin uint8, from, to uint64, limit int) (chs []swarm.Address, topmost uint64, err error) {
var (
k = subKey(bin, from, to, limit)
c = make(chan intervalChunks)
)
s.openSubsMu.Lock()
if subs, ok := s.openSubs[k]; ok {
// some subscription already exists, add ours
// and wait for the result
subs = append(subs, c)
s.openSubs[k] = subs
s.openSubsMu.Unlock()
select {
case res := <-c:
// since this is a simple read from a channel, one slow
// peer requesting chunks will not affect another
return res.chs, res.topmost, res.err
case <-ctx.Done():
// note that we don't clean up our channel from openSubs here.
// doing so would require acquiring the lock, which could
// deadlock if a notification is already in progress. since we
// stop selecting on the channel, the notifying goroutine uses a
// non-blocking send and will simply skip us instead.
return nil, 0, ctx.Err()
}
}
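// no identical subscription is running; register an empty waiter list
// under this key and run the subscription ourselves.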
s.openSubs[k] = make([]chan intervalChunks, 0)
s.openSubsMu.Unlock()
// call iterator, iterate either until upper bound or limit reached
// return addresses, topmost is the topmost bin ID
var (
timer *time.Timer
timerC <-chan time.Time
)
ch, dbClosed, stop := s.SubscribePull(ctx, bin, from, to)
defer func(start time.Time) {
stop()
if timer != nil {
timer.Stop()
}
// tell others about the results
s.openSubsMu.Lock()
for _, c := range s.openSubs[k] {
select {
case c <- intervalChunks{chs: chs, topmost: topmost, err: err}:
default:
// the non-blocking send is needed because the waiting goroutine
// might have gone away in the meanwhile due to context
// cancellation, in which case there's no one reading on the
// other side and a blocking send would deadlock this goroutine.
}
}
delete(s.openSubs, k)
s.openSubsMu.Unlock()
}(time.Now())
var nomore bool
......@@ -141,3 +199,7 @@ func (s *ps) Put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk)
_, err := s.Storer.Put(ctx, mode, chs...)
return err
}
func subKey(bin uint8, from, to uint64, limit int) string {
return fmt.Sprintf("%d_%d_%d_%d", bin, from, to, limit)
}
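For illustration only (arbitrary values): requests with identical parameters format to the same key and therefore share one iterator, while any differing parameter yields a separate subscription.

subKey(0, 0, 5, 50) // "0_0_5_50"
subKey(0, 0, 5, 50) // "0_0_5_50", same key, result is shared
subKey(0, 0, 6, 50) // "0_0_6_50", different interval, separate subscription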
......@@ -9,8 +9,10 @@ import (
"crypto/rand"
"errors"
"io/ioutil"
"reflect"
"testing"
"time"
"unsafe"
"github.com/ethersphere/bee/pkg/localstore"
"github.com/ethersphere/bee/pkg/logging"
......@@ -298,6 +300,68 @@ func TestIntervalChunks_Localstore(t *testing.T) {
}
}
// TestIntervalChunks_IteratorShare tests that two goroutines
// requesting the same interval result in just one SubscribePull
// call and that the results are shared between them.
func TestIntervalChunks_IteratorShare(t *testing.T) {
desc := someDescriptors(0, 2)
ps, db := newPullStorage(t, mock.WithSubscribePullChunks(desc...), mock.WithPartialInterval(true))
go func() {
// the delay makes the iterator linger a bit
// longer, waiting for more chunks to arrive.
<-time.After(200 * time.Millisecond)
// add chunks to subscribe pull on the storage mock
db.MorePull(someDescriptors(1, 3, 4)...)
}()
type result struct {
addrs []swarm.Address
top uint64
}
sched := make(chan struct{})
c := make(chan result)
go func() {
close(sched)
addrs, topmost, err := ps.IntervalChunks(context.Background(), 0, 0, 5, limit)
if err != nil {
t.Errorf("internal goroutine: %v", err)
}
c <- result{addrs, topmost}
}()
<-sched // wait for goroutine to get scheduled
addrs, topmost, err := ps.IntervalChunks(context.Background(), 0, 0, 5, limit)
if err != nil {
t.Fatal(err)
}
res := <-c
if l := len(addrs); l != 5 {
t.Fatalf("want %d addrs but got %d", 5, l)
}
// highest chunk we sent had BinID 5
exp := uint64(5)
if topmost != exp {
t.Fatalf("expected topmost %d but got %d", exp, topmost)
}
if c := db.SubscribePullCalls(); c != 1 {
t.Fatalf("wanted 1 subscribe pull calls, got %d", c)
}
// check that results point to same array
sh := (*reflect.SliceHeader)(unsafe.Pointer(&res.addrs))
sh2 := (*reflect.SliceHeader)(unsafe.Pointer(&addrs))
if sh.Data != sh2.Data {
t.Fatalf("results not shared between goroutines. ptr1 %d ptr2 %d", sh.Data, sh2.Data)
}
}
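As an aside, an equivalent backing-array identity check is possible without reflect and unsafe, assuming both slices are non-empty; a sketch, not part of the change:

if len(addrs) == 0 || len(res.addrs) == 0 || &addrs[0] != &res.addrs[0] {
	t.Fatalf("results not shared between goroutines")
}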
func newPullStorage(t *testing.T, o ...mock.Option) (pullstorage.Storer, *mock.MockStorer) {
db := mock.NewStorer(o...)
ps := pullstorage.New(db)
......
......@@ -499,6 +499,14 @@ func (s *Syncer) Close() error {
defer close(cc)
s.wg.Wait()
}()
// cancel all contexts
s.ruidMtx.Lock()
for _, c := range s.ruidCtx {
c()
}
s.ruidMtx.Unlock()
select {
case <-cc:
case <-time.After(10 * time.Second):
......
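The ruidCtx and ruidMtx fields are not shown in this hunk. Assuming ruidCtx maps a request id (ruid) to a context.CancelFunc and is guarded by ruidMtx, the registration side would look roughly like the sketch below; this is illustrative, not the actual handler code.

ctx, cancel := context.WithCancel(ctx)
s.ruidMtx.Lock()
s.ruidCtx[ruid] = cancel
s.ruidMtx.Unlock()
defer func() {
	cancel()
	s.ruidMtx.Lock()
	delete(s.ruidCtx, ruid)
	s.ruidMtx.Unlock()
}()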
......@@ -21,6 +21,7 @@ type MockStorer struct {
pinnedAddress []swarm.Address // Stores the pinned address
pinnedCounter []uint64 // and its respective counter. These are stored as slices to preserve the order.
subpull []storage.Descriptor
subpullCalls int
partialInterval bool
morePull chan struct{}
mtx sync.Mutex
......@@ -220,6 +221,7 @@ func (m *MockStorer) LastPullSubscriptionBinID(bin uint8) (id uint64, err error)
}
func (m *MockStorer) SubscribePull(ctx context.Context, bin uint8, since, until uint64) (<-chan storage.Descriptor, <-chan struct{}, func()) {
m.subpullCalls++
c := make(chan storage.Descriptor)
done := make(chan struct{})
stop := func() {
......@@ -275,6 +277,10 @@ func (m *MockStorer) SubscribePull(ctx context.Context, bin uint8, since, until
return c, m.quit, stop
}
func (m *MockStorer) SubscribePullCalls() int {
return m.subpullCalls
}
func (m *MockStorer) MorePull(d ...storage.Descriptor) {
// clear out what we already have in subpull
m.mtx.Lock()
......