Commit 4bae1a0e authored by acud's avatar acud Committed by GitHub

puller: extract maxpo and shallow bin peers (#411)

* extract maxpo and shallow bin peers
parent 5f5c6a13
......@@ -286,6 +286,7 @@ func NewBee(o Options) (*Bee, error) {
PullSync: pullSync,
Logger: logger,
})
b.pullerCloser = puller
var apiService api.Service
......
......@@ -2,7 +2,5 @@ package puller
var (
PeerIntervalKey = peerIntervalKey
Bins = &bins
ShallowBinPeers = &shallowBinPeers
IsSyncing = isSyncing
)
......@@ -19,19 +19,19 @@ import (
"github.com/ethersphere/bee/pkg/topology"
)
var (
bins = uint8(16)
const defaultShallowBinPeers = 2
// how many peers per bin do we want to sync with outside of depth
shallowBinPeers = 2
logMore = false // enable this to get more logging
var (
logMore = false // enable this to get more logging
)
type Options struct {
StateStore storage.StateStorer
Topology topology.Driver
PullSync pullsync.Interface
Logger logging.Logger
StateStore storage.StateStorer
Topology topology.Driver
PullSync pullsync.Interface
Logger logging.Logger
Bins uint8
ShallowBinPeers int
}
type Puller struct {
......@@ -40,7 +40,8 @@ type Puller struct {
statestore storage.StateStorer
intervalMtx sync.Mutex
syncer pullsync.Interface
logger logging.Logger
logger logging.Logger
syncPeers []map[string]*syncPeer // index is bin, map key is peer address
syncPeersMtx sync.Mutex
......@@ -50,20 +51,36 @@ type Puller struct {
quit chan struct{}
wg sync.WaitGroup
bins uint8 // how many bins do we support
shallowBinPeers int // how many peers per bin do we want to sync with outside of depth
}
func New(o Options) *Puller {
var (
bins uint8 = swarm.MaxBins
shallowBinPeers int = defaultShallowBinPeers
)
if o.Bins != 0 {
bins = o.Bins
}
if o.ShallowBinPeers != 0 {
shallowBinPeers = o.ShallowBinPeers
}
p := &Puller{
statestore: o.StateStore,
topology: o.Topology,
syncer: o.PullSync,
logger: o.Logger,
cursors: make(map[string][]uint64),
cursors: make(map[string][]uint64),
syncPeers: make([]map[string]*syncPeer, bins),
quit: make(chan struct{}),
wg: sync.WaitGroup{},
bins: bins,
shallowBinPeers: shallowBinPeers,
}
for i := uint8(0); i < bins; i++ {
......@@ -134,9 +151,9 @@ func (p *Puller) manage() {
if po < depth {
// outside of depth, sync peerPO bin only
if _, ok := bp[peerAddr.String()]; !ok {
if syncing < shallowBinPeers {
if syncing < p.shallowBinPeers {
// peer not syncing yet and we still need more peers in this bin
bp[peerAddr.String()] = newSyncPeer(peerAddr)
bp[peerAddr.String()] = newSyncPeer(peerAddr, p.bins)
peerEntry := peer{addr: peerAddr, po: po}
peersToSync = append(peersToSync, peerEntry)
}
......@@ -149,7 +166,7 @@ func (p *Puller) manage() {
// within depth, sync everything >= depth
if _, ok := bp[peerAddr.String()]; !ok {
// we're not syncing with this peer yet, start doing so
bp[peerAddr.String()] = newSyncPeer(peerAddr)
bp[peerAddr.String()] = newSyncPeer(peerAddr, p.bins)
peerEntry := peer{addr: peerAddr, po: po}
peersToSync = append(peersToSync, peerEntry)
} else {
......@@ -215,7 +232,7 @@ func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8
// within depth
var want, dontWant []uint8
for i := d; i < bins; i++ {
for i := d; i < p.bins; i++ {
if i == 0 {
continue
}
......@@ -257,7 +274,7 @@ func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8
dontWant = []uint8{0} // never want bin 0
)
for i := uint8(0); i < bins; i++ {
for i := uint8(0); i < p.bins; i++ {
if i == want {
continue
}
......@@ -451,7 +468,7 @@ func (p *Puller) Close() error {
close(p.quit)
p.syncPeersMtx.Lock()
defer p.syncPeersMtx.Unlock()
for i := uint8(0); i < bins; i++ {
for i := uint8(0); i < p.bins; i++ {
binPeers := p.syncPeers[i]
for _, peer := range binPeers {
peer.Lock()
......@@ -534,7 +551,7 @@ type syncPeer struct {
sync.Mutex
}
func newSyncPeer(addr swarm.Address) *syncPeer {
func newSyncPeer(addr swarm.Address, bins uint8) *syncPeer {
return &syncPeer{
address: addr,
binCancelFuncs: make(map[uint8]func(), bins),
......
......@@ -37,20 +37,14 @@ var (
// then that adding another peer at the same po
// does not start another syncing session
func TestOneSync(t *testing.T) {
defer func(b uint8, p int) {
*puller.Bins = b
*puller.ShallowBinPeers = p
}(*puller.Bins, *puller.ShallowBinPeers)
*puller.Bins = 3
*puller.ShallowBinPeers = 1
var (
addr = test.RandomAddress()
addr2 = test.RandomAddress()
cursors = []uint64{1000, 1000, 1000}
liveReplies = []uint64{1}
addr = test.RandomAddress()
addr2 = test.RandomAddress()
cursors = []uint64{1000, 1000, 1000}
liveReplies = []uint64{1}
shallowBinPeers = 1
)
puller, _, kad, pullsync := newPuller(opts{
kad: []mockk.Option{
mockk.WithEachPeerRevCalls(
......@@ -58,7 +52,9 @@ func TestOneSync(t *testing.T) {
mockk.AddrTuple{Addr: addr2, PO: 1},
), mockk.WithDepth(2),
},
pullSync: []mockps.Option{mockps.WithCursors(cursors), mockps.WithLiveSyncReplies(liveReplies...)},
pullSync: []mockps.Option{mockps.WithCursors(cursors), mockps.WithLiveSyncReplies(liveReplies...)},
bins: 3,
shallowBinPeers: &shallowBinPeers,
})
defer puller.Close()
defer pullsync.Close()
......@@ -75,11 +71,6 @@ func TestOneSync(t *testing.T) {
}
func TestSyncFlow_PeerOutsideDepth_Live(t *testing.T) {
defer func(b uint8) {
*puller.Bins = b
}(*puller.Bins)
*puller.Bins = 5
addr := test.RandomAddress()
for _, tc := range []struct {
......@@ -111,6 +102,7 @@ func TestSyncFlow_PeerOutsideDepth_Live(t *testing.T) {
), mockk.WithDepth(2),
},
pullSync: []mockps.Option{mockps.WithCursors(tc.cursors), mockps.WithLiveSyncReplies(tc.liveReplies...)},
bins: 5,
})
t.Cleanup(func() {
pullsync.Close()
......@@ -133,11 +125,6 @@ func TestSyncFlow_PeerOutsideDepth_Live(t *testing.T) {
}
func TestSyncFlow_PeerOutsideDepth_Historical(t *testing.T) {
defer func(b uint8) {
*puller.Bins = b
}(*puller.Bins)
*puller.Bins = 5
addr := test.RandomAddress()
for _, tc := range []struct {
......@@ -192,6 +179,7 @@ func TestSyncFlow_PeerOutsideDepth_Historical(t *testing.T) {
), mockk.WithDepth(2),
},
pullSync: []mockps.Option{mockps.WithCursors(tc.cursors), mockps.WithAutoReply(), mockps.WithLiveSyncBlock()},
bins: 5,
})
defer puller.Close()
defer pullsync.Close()
......@@ -214,13 +202,7 @@ func TestSyncFlow_PeerOutsideDepth_Historical(t *testing.T) {
}
func TestSyncFlow_PeerWithinDepth_Live(t *testing.T) {
defer func(b uint8) {
*puller.Bins = b
}(*puller.Bins)
*puller.Bins = 5
addr := test.RandomAddress()
const max = math.MaxUint64
for _, tc := range []struct {
name string // name of test
......@@ -245,6 +227,7 @@ func TestSyncFlow_PeerWithinDepth_Live(t *testing.T) {
), mockk.WithDepth(2),
},
pullSync: []mockps.Option{mockps.WithCursors(tc.cursors), mockps.WithLateSyncReply(tc.liveReplies...)},
bins: 5,
})
defer puller.Close()
defer pullsync.Close()
......@@ -277,6 +260,7 @@ func TestPeerDisconnected(t *testing.T) {
), mockk.WithDepthCalls(2, 2, 2), // peer moved from out of depth to depth
},
pullSync: []mockps.Option{mockps.WithCursors(cursors), mockps.WithLiveSyncBlock()},
bins: 5,
})
t.Cleanup(func() {
pullsync.Close()
......@@ -299,11 +283,6 @@ func TestPeerDisconnected(t *testing.T) {
}
func TestDepthChange(t *testing.T) {
defer func(b uint8) {
*puller.Bins = b
}(*puller.Bins)
*puller.Bins = 5
var (
addr = test.RandomAddress()
interval = "[[1 1]]"
......@@ -375,6 +354,7 @@ func TestDepthChange(t *testing.T) {
), mockk.WithDepthCalls(tc.depths...), // peer moved from out of depth to depth
},
pullSync: []mockps.Option{mockps.WithCursors(tc.cursors), mockps.WithLateSyncReply(tc.syncReplies...)},
bins: 5,
})
defer puller.Close()
defer pullsync.Close()
......@@ -403,7 +383,6 @@ func TestDepthChange(t *testing.T) {
func checkIntervals(t *testing.T, s storage.StateStorer, addr swarm.Address, expInterval string, bin uint8) {
t.Helper()
key := puller.PeerIntervalKey(addr, bin)
i := &intervalstore.Intervals{}
err := s.Get(key, i)
......@@ -434,7 +413,6 @@ func checkNotFound(t *testing.T, s storage.StateStorer, addr swarm.Address, bin
func checkCalls(t *testing.T, expCalls []c, calls []mockps.SyncCall) {
t.Helper()
exp := len(expCalls)
if l := len(calls); l != exp {
t.Fatalf("expected %d calls but got %d. calls: %v", exp, l, calls)
......@@ -457,7 +435,6 @@ func checkCalls(t *testing.T, expCalls []c, calls []mockps.SyncCall) {
// so the call list in the test is no longer expected to be in order
func checkCallsUnordered(t *testing.T, expCalls []c, calls []mockps.SyncCall) {
t.Helper()
exp := len(expCalls)
if l := len(calls); l != exp {
t.Fatalf("expected %d calls but got %d. calls: %v", exp, l, calls)
......@@ -568,8 +545,10 @@ func waitLiveSyncCalledTimes(t *testing.T, ps *mockps.PullSyncMock, addr swarm.A
}
type opts struct {
pullSync []mockps.Option
kad []mockk.Option
pullSync []mockps.Option
kad []mockk.Option
bins uint8
shallowBinPeers *int
}
func newPuller(ops opts) (*puller.Puller, storage.StateStorer, *mockk.Mock, *mockps.PullSyncMock) {
......@@ -583,6 +562,10 @@ func newPuller(ops opts) (*puller.Puller, storage.StateStorer, *mockk.Mock, *moc
StateStore: s,
PullSync: ps,
Logger: logger,
Bins: ops.bins,
}
if ops.shallowBinPeers != nil {
o.ShallowBinPeers = *ops.shallowBinPeers
}
return puller.New(o), s, kad, ps
}
......
......@@ -18,6 +18,7 @@ const (
ChunkSize = SectionSize * Branches
HashSize = 32
MaxPO uint8 = 15
MaxBins = MaxPO + 1
)
// Address represents an address in Swarm metric space of
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment