Commit 48f0a205 authored by acud, committed by GitHub

revert: pullstorage iterator share (#1832)

Reverts the pullstorage iterator share change from #1832 due to a goroutine leak
parent 65c1774d
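The hunks below remove the subscription-sharing path from pullstorage. As a rough illustration of why that pattern can leak a goroutine, here is a minimal, self-contained Go sketch. The names (sharer, result, wait, notify) are made up for the example, and the assumption that the leak in #1832 follows this shape is inferred from the reverted code, not confirmed by the commit: a subscriber registers an unbuffered channel and blocks on it, while the notifier's non-blocking send may fire before the subscriber is parked on the receive, leaving that subscriber blocked forever.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// result stands in for the reverted intervalChunks value that was
// delivered to waiting subscribers over an unbuffered channel.
type result struct{ topmost uint64 }

type sharer struct {
	mu   sync.Mutex
	subs []chan result
}

// wait registers interest in a shared result and blocks for it,
// roughly like the duplicate-subscription branch of the reverted
// IntervalChunks (illustrative, not the bee code).
func (s *sharer) wait(ctx context.Context) (result, error) {
	c := make(chan result) // unbuffered, as in the reverted code
	s.mu.Lock()
	s.subs = append(s.subs, c)
	s.mu.Unlock()

	select {
	case r := <-c:
		return r, nil
	case <-ctx.Done():
		// no cleanup of c here, mirroring the reverted code.
		return result{}, ctx.Err()
	}
}

// notify mimics the deferred fan-out: one non-blocking send per
// registered subscriber, then the registry is cleared.
func (s *sharer) notify(r result) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, c := range s.subs {
		select {
		case c <- r:
		default:
			// if the subscriber is not yet parked on <-c at this
			// instant, the send is skipped and that subscriber will
			// never receive a result.
		}
	}
	s.subs = nil
}

func main() {
	s := &sharer{}
	go func() {
		// with a non-cancellable context, a missed notification
		// strands this goroutine on <-c forever.
		_, err := s.wait(context.Background())
		fmt.Println("subscriber done:", err) // may never print
	}()
	// notify can run before the subscriber reaches its select.
	s.notify(result{topmost: 5})
	time.Sleep(100 * time.Millisecond)
}
```

Depending on scheduling, the subscriber line may never print, which is the leak in miniature.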
@@ -23,7 +23,7 @@ import (
)
var (
logMore = false // enable this to get more logging
logMore = true // enable this to get more logging
)
type Options struct {
@@ -337,6 +337,9 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin
p.logger.Errorf("could not persist interval for peer %s, quitting", peer)
return
}
if logMore {
p.logger.Debugf("histSyncWorker pulled bin %d [%d:%d], peer %s", bin, s, top, peer)
}
}
}
@@ -381,6 +384,10 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin
p.logger.Errorf("liveSyncWorker exit on add peer interval. peer %s bin %d from %d err %v", peer, bin, from, err)
return
}
if logMore {
p.logger.Debugf("liveSyncWorker pulled bin %d [%d:%d], peer %s", bin, from, top, peer)
}
from = top + 1
}
}
@@ -6,8 +6,8 @@ package puller_test
import (
"errors"
"io/ioutil"
"math"
"os"
"testing"
"time"
@@ -37,7 +37,7 @@ func TestOneSync(t *testing.T) {
var (
addr = test.RandomAddress()
cursors = []uint64{1000, 1000, 1000}
liveReplies = []uint64{1}
liveReplies = []uint64{1001}
)
puller, _, kad, pullsync := newPuller(opts{
@@ -592,7 +592,7 @@ func newPuller(ops opts) (*puller.Puller, storage.StateStorer, *mockk.Mock, *moc
s := mock.NewStateStore()
ps := mockps.NewPullSync(ops.pullSync...)
kad := mockk.NewMockKademlia(ops.kad...)
logger := logging.New(ioutil.Discard, 6)
logger := logging.New(os.Stdout, 5)
o := puller.Options{
Bins: ops.bins,
@@ -7,8 +7,6 @@ package pullstorage
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/ethersphere/bee/pkg/storage"
@@ -42,88 +40,32 @@ type Storer interface {
Has(ctx context.Context, addr swarm.Address) (bool, error)
}
type intervalChunks struct {
chs []swarm.Address
topmost uint64
err error
}
// ps wraps storage.Storer.
type ps struct {
storage.Storer
openSubs map[string][]chan intervalChunks
openSubsMu sync.Mutex
}
// New returns a new pullstorage Storer instance.
func New(storer storage.Storer) Storer {
return &ps{
Storer: storer,
openSubs: make(map[string][]chan intervalChunks),
Storer: storer,
}
}
// IntervalChunks collects chunks for a requested interval.
func (s *ps) IntervalChunks(ctx context.Context, bin uint8, from, to uint64, limit int) (chs []swarm.Address, topmost uint64, err error) {
var (
k = subKey(bin, from, to, limit)
c = make(chan intervalChunks)
)
s.openSubsMu.Lock()
if subs, ok := s.openSubs[k]; ok {
// some subscription already exists, add ours
// and wait for the result
subs = append(subs, c)
s.openSubs[k] = subs
s.openSubsMu.Unlock()
select {
case res := <-c:
// since this is a simple read from a channel, one slow
// peer requesting chunks will not affect another
return res.chs, res.topmost, res.err
case <-ctx.Done():
// note that there is no cleanup of the existing channel here.
// this is due to a possible deadlock in case a notification is
// already ongoing: we cannot acquire the lock, and the notifying
// goroutine may still try to write to our channel; since we are no
// longer selecting on the channel, that write would deadlock.
return nil, 0, ctx.Err()
}
}
s.openSubs[k] = make([]chan intervalChunks, 0)
s.openSubsMu.Unlock()
// call iterator, iterate either until upper bound or limit reached
// return addresses, topmost is the topmost bin ID
var (
timer *time.Timer
timerC <-chan time.Time
)
ch, dbClosed, stop := s.SubscribePull(ctx, bin, from, to)
defer func(start time.Time) {
stop()
if timer != nil {
timer.Stop()
}
// tell others about the results
s.openSubsMu.Lock()
for _, c := range s.openSubs[k] {
select {
case c <- intervalChunks{chs: chs, topmost: topmost, err: err}:
default:
// the non-blocking send is needed because the polling goroutine
// might have gone away in the meantime due to context cancellation;
// a plain send on the channel would then block forever, since no
// one is reading on the other side, deadlocking this goroutine.
}
}
delete(s.openSubs, k)
s.openSubsMu.Unlock()
}(time.Now())
var nomore bool
@@ -199,7 +141,3 @@ func (s *ps) Put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk)
_, err := s.Storer.Put(ctx, mode, chs...)
return err
}
func subKey(bin uint8, from, to uint64, limit int) string {
return fmt.Sprintf("%d_%d_%d_%d", bin, from, to, limit)
}
@@ -9,10 +9,8 @@ import (
"crypto/rand"
"errors"
"io/ioutil"
"reflect"
"testing"
"time"
"unsafe"
"github.com/ethersphere/bee/pkg/localstore"
"github.com/ethersphere/bee/pkg/logging"
@@ -300,68 +298,6 @@ func TestIntervalChunks_Localstore(t *testing.T) {
}
}
// TestIntervalChunks_IteratorShare tests that two goroutines
// requesting the same subscription result in only one SubscribePull
// call and that the results are shared between both of them.
func TestIntervalChunks_IteratorShare(t *testing.T) {
desc := someDescriptors(0, 2)
ps, db := newPullStorage(t, mock.WithSubscribePullChunks(desc...), mock.WithPartialInterval(true))
go func() {
// the delay is needed so that the iterator lingers
// a bit longer, waiting for more chunks.
<-time.After(200 * time.Millisecond)
// add chunks to subscribe pull on the storage mock
db.MorePull(someDescriptors(1, 3, 4)...)
}()
type result struct {
addrs []swarm.Address
top uint64
}
sched := make(chan struct{})
c := make(chan result)
go func() {
close(sched)
addrs, topmost, err := ps.IntervalChunks(context.Background(), 0, 0, 5, limit)
if err != nil {
t.Errorf("internal goroutine: %v", err)
}
c <- result{addrs, topmost}
}()
<-sched // wait for goroutine to get scheduled
addrs, topmost, err := ps.IntervalChunks(context.Background(), 0, 0, 5, limit)
if err != nil {
t.Fatal(err)
}
res := <-c
if l := len(addrs); l != 5 {
t.Fatalf("want %d addrs but got %d", 5, l)
}
// highest chunk we sent had BinID 5
exp := uint64(5)
if topmost != exp {
t.Fatalf("expected topmost %d but got %d", exp, topmost)
}
if c := db.SubscribePullCalls(); c != 1 {
t.Fatalf("wanted 1 subscribe pull calls, got %d", c)
}
// check that the results point to the same backing array
sh := (*reflect.SliceHeader)(unsafe.Pointer(&res.addrs))
sh2 := (*reflect.SliceHeader)(unsafe.Pointer(&addrs))
if sh.Data != sh2.Data {
t.Fatalf("results not shared between goroutines. ptr1 %d ptr2 %d", sh.Data, sh2.Data)
}
}
func newPullStorage(t *testing.T, o ...mock.Option) (pullstorage.Storer, *mock.MockStorer) {
db := mock.NewStorer(o...)
ps := pullstorage.New(db)
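An aside on the removed TestIntervalChunks_IteratorShare above: it proved result sharing by comparing the Data fields of the two slice headers. The standalone sketch below shows the same identity check on illustrative slices; none of it is bee code.

```go
package main

import (
	"fmt"
	"reflect"
	"unsafe"
)

func main() {
	base := []string{"a", "b", "c", "d", "e"}
	shared := base[:3]                       // same backing array as base
	copied := append([]string(nil), base...) // independent backing array

	// Comparing reflect.SliceHeader.Data reveals whether two slices
	// reference the same backing array, which is what the removed
	// test used to prove the iterator results were shared.
	p1 := (*reflect.SliceHeader)(unsafe.Pointer(&base)).Data
	p2 := (*reflect.SliceHeader)(unsafe.Pointer(&shared)).Data
	p3 := (*reflect.SliceHeader)(unsafe.Pointer(&copied)).Data

	fmt.Println(p1 == p2) // true: shared
	fmt.Println(p1 == p3) // false: copied
}
```

For non-empty slices, comparing &a[0] == &b[0] gives the same answer without reflect or unsafe.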
@@ -21,7 +21,6 @@ type MockStorer struct {
pinnedAddress []swarm.Address // Stores the pinned address
pinnedCounter []uint64 // and its respective counter. These are stored as slices to preserve the order.
subpull []storage.Descriptor
subpullCalls int
partialInterval bool
morePull chan struct{}
mtx sync.Mutex
@@ -221,7 +220,6 @@ func (m *MockStorer) LastPullSubscriptionBinID(bin uint8) (id uint64, err error)
}
func (m *MockStorer) SubscribePull(ctx context.Context, bin uint8, since, until uint64) (<-chan storage.Descriptor, <-chan struct{}, func()) {
m.subpullCalls++
c := make(chan storage.Descriptor)
done := make(chan struct{})
stop := func() {
@@ -277,10 +275,6 @@ func (m *MockStorer) SubscribePull(ctx context.Context, bin uint8, since, until
return c, m.quit, stop
}
func (m *MockStorer) SubscribePullCalls() int {
return m.subpullCalls
}
func (m *MockStorer) MorePull(d ...storage.Descriptor) {
// clear out what we already have in subpull
m.mtx.Lock()