Commit 85b36366 authored by Esad Akar's avatar Esad Akar Committed by GitHub

refactor: pslice variadic add (#1887)

parent 441d5a2c
...@@ -66,7 +66,7 @@ func newTestServer(t *testing.T, o testServerOptions) *testServer { ...@@ -66,7 +66,7 @@ func newTestServer(t *testing.T, o testServerOptions) *testServer {
settlement := swapmock.New(o.SettlementOpts...) settlement := swapmock.New(o.SettlementOpts...)
chequebook := chequebookmock.NewChequebook(o.ChequebookOpts...) chequebook := chequebookmock.NewChequebook(o.ChequebookOpts...)
swapserv := swapmock.New(o.SwapOpts...) swapserv := swapmock.New(o.SwapOpts...)
ln := lightnode.NewContainer() ln := lightnode.NewContainer(o.Overlay)
s := debugapi.New(o.Overlay, o.PublicKey, o.PSSPublicKey, o.EthereumAddress, logging.New(ioutil.Discard, 0), nil, o.CORSAllowedOrigins) s := debugapi.New(o.Overlay, o.PublicKey, o.PSSPublicKey, o.EthereumAddress, logging.New(ioutil.Discard, 0), nil, o.CORSAllowedOrigins)
s.Configure(o.P2P, o.Pingpong, topologyDriver, ln, o.Storer, o.Tags, acc, settlement, true, swapserv, chequebook, o.BatchStore) s.Configure(o.P2P, o.Pingpong, topologyDriver, ln, o.Storer, o.Tags, acc, settlement, true, swapserv, chequebook, o.BatchStore)
ts := httptest.NewServer(s) ts := httptest.NewServer(s)
...@@ -133,7 +133,7 @@ func TestServer_Configure(t *testing.T) { ...@@ -133,7 +133,7 @@ func TestServer_Configure(t *testing.T) {
settlement := swapmock.New(o.SettlementOpts...) settlement := swapmock.New(o.SettlementOpts...)
chequebook := chequebookmock.NewChequebook(o.ChequebookOpts...) chequebook := chequebookmock.NewChequebook(o.ChequebookOpts...)
swapserv := swapmock.New(o.SwapOpts...) swapserv := swapmock.New(o.SwapOpts...)
ln := lightnode.NewContainer() ln := lightnode.NewContainer(o.Overlay)
s := debugapi.New(o.Overlay, o.PublicKey, o.PSSPublicKey, o.EthereumAddress, logging.New(ioutil.Discard, 0), nil, nil) s := debugapi.New(o.Overlay, o.PublicKey, o.PSSPublicKey, o.EthereumAddress, logging.New(ioutil.Discard, 0), nil, nil)
ts := httptest.NewServer(s) ts := httptest.NewServer(s)
t.Cleanup(ts.Close) t.Cleanup(ts.Close)
......
...@@ -45,7 +45,7 @@ var ( ...@@ -45,7 +45,7 @@ var (
type Service struct { type Service struct {
streamer p2p.Streamer streamer p2p.Streamer
addressBook addressbook.GetPutter addressBook addressbook.GetPutter
addPeersHandler func(context.Context, ...swarm.Address) error addPeersHandler func(...swarm.Address)
networkID uint64 networkID uint64
logger logging.Logger logger logging.Logger
metrics metrics metrics metrics
...@@ -98,7 +98,7 @@ func (s *Service) BroadcastPeers(ctx context.Context, addressee swarm.Address, p ...@@ -98,7 +98,7 @@ func (s *Service) BroadcastPeers(ctx context.Context, addressee swarm.Address, p
return nil return nil
} }
func (s *Service) SetAddPeersHandler(h func(ctx context.Context, addr ...swarm.Address) error) { func (s *Service) SetAddPeersHandler(h func(addr ...swarm.Address)) {
s.addPeersHandler = h s.addPeersHandler = h
} }
...@@ -182,9 +182,7 @@ func (s *Service) peersHandler(ctx context.Context, peer p2p.Peer, stream p2p.St ...@@ -182,9 +182,7 @@ func (s *Service) peersHandler(ctx context.Context, peer p2p.Peer, stream p2p.St
} }
if s.addPeersHandler != nil { if s.addPeersHandler != nil {
if err := s.addPeersHandler(ctx, peers...); err != nil { s.addPeersHandler(peers...)
return err
}
} }
return nil return nil
......
...@@ -290,7 +290,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, ...@@ -290,7 +290,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
) )
} }
lightNodes := lightnode.NewContainer() lightNodes := lightnode.NewContainer(swarmAddress)
txHash, err := getTxHash(stateStore, logger, o) txHash, err := getTxHash(stateStore, logger, o)
if err != nil { if err != nil {
......
...@@ -69,7 +69,7 @@ func newService(t *testing.T, networkID uint64, o libp2pServiceOpts) (s *libp2p. ...@@ -69,7 +69,7 @@ func newService(t *testing.T, networkID uint64, o libp2pServiceOpts) (s *libp2p.
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
lightnodes := lightnode.NewContainer() lightnodes := lightnode.NewContainer(overlay)
opts := o.libp2pOpts opts := o.libp2pOpts
opts.Transaction = []byte(hexutil.EncodeUint64(o.PrivateKey.Y.Uint64())) opts.Transaction = []byte(hexutil.EncodeUint64(o.PrivateKey.Y.Uint64()))
......
...@@ -125,8 +125,8 @@ func New( ...@@ -125,8 +125,8 @@ func New(
saturationFunc: o.SaturationFunc, saturationFunc: o.SaturationFunc,
bitSuffixLength: o.BitSuffixLength, bitSuffixLength: o.BitSuffixLength,
commonBinPrefixes: make([][]swarm.Address, int(swarm.MaxBins)), commonBinPrefixes: make([][]swarm.Address, int(swarm.MaxBins)),
connectedPeers: pslice.New(int(swarm.MaxBins)), connectedPeers: pslice.New(int(swarm.MaxBins), base),
knownPeers: pslice.New(int(swarm.MaxBins)), knownPeers: pslice.New(int(swarm.MaxBins), base),
bootnodes: o.Bootnodes, bootnodes: o.Bootnodes,
manageC: make(chan struct{}, 1), manageC: make(chan struct{}, 1),
waitNext: waitnext.New(), waitNext: waitnext.New(),
...@@ -335,7 +335,7 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup, ...@@ -335,7 +335,7 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup,
switch { switch {
case errors.Is(err, addressbook.ErrNotFound): case errors.Is(err, addressbook.ErrNotFound):
k.logger.Debugf("kademlia: empty address book entry for peer %q", peer.addr) k.logger.Debugf("kademlia: empty address book entry for peer %q", peer.addr)
k.knownPeers.Remove(peer.addr, swarm.Proximity(k.base.Bytes(), peer.addr.Bytes())) k.knownPeers.Remove(peer.addr)
return return
case err != nil: case err != nil:
k.logger.Debugf("kademlia: failed to get address book entry for peer %q: %v", peer.addr, err) k.logger.Debugf("kademlia: failed to get address book entry for peer %q: %v", peer.addr, err)
...@@ -344,7 +344,7 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup, ...@@ -344,7 +344,7 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup,
remove := func(peer *peerConnInfo) { remove := func(peer *peerConnInfo) {
k.waitNext.Remove(peer.addr) k.waitNext.Remove(peer.addr)
k.knownPeers.Remove(peer.addr, peer.po) k.knownPeers.Remove(peer.addr)
if err := k.addressBook.Remove(peer.addr); err != nil { if err := k.addressBook.Remove(peer.addr); err != nil {
k.logger.Debugf("kademlia: could not remove peer %q from addressbook", peer.addr) k.logger.Debugf("kademlia: could not remove peer %q from addressbook", peer.addr)
} }
...@@ -367,7 +367,7 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup, ...@@ -367,7 +367,7 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup,
k.waitNext.Set(peer.addr, time.Now().Add(shortRetry), 0) k.waitNext.Set(peer.addr, time.Now().Add(shortRetry), 0)
k.connectedPeers.Add(peer.addr, peer.po) k.connectedPeers.Add(peer.addr)
k.collector.Record(peer.addr, metrics.PeerLogIn(time.Now(), metrics.PeerConnectionDirectionOutbound)) k.collector.Record(peer.addr, metrics.PeerLogIn(time.Now(), metrics.PeerConnectionDirectionOutbound))
...@@ -492,7 +492,8 @@ func (k *Kad) Start(ctx context.Context) error { ...@@ -492,7 +492,8 @@ func (k *Kad) Start(ctx context.Context) error {
return fmt.Errorf("addressbook overlays: %w", err) return fmt.Errorf("addressbook overlays: %w", err)
} }
return k.AddPeers(ctx, addresses...) k.AddPeers(addresses...)
return nil
} }
func (k *Kad) connectBootnodes(ctx context.Context) { func (k *Kad) connectBootnodes(ctx context.Context) {
...@@ -672,7 +673,7 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr) ...@@ -672,7 +673,7 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
if (k.connectedPeers.Length() > 0 && quickPrune) || failedAttempts > maxConnAttempts { if (k.connectedPeers.Length() > 0 && quickPrune) || failedAttempts > maxConnAttempts {
k.waitNext.Remove(peer) k.waitNext.Remove(peer)
k.knownPeers.Remove(peer, swarm.Proximity(k.base.Bytes(), peer.Bytes())) k.knownPeers.Remove(peer)
if err := k.addressBook.Remove(peer); err != nil { if err := k.addressBook.Remove(peer); err != nil {
k.logger.Debugf("could not remove peer from addressbook: %q", peer) k.logger.Debugf("could not remove peer from addressbook: %q", peer)
} }
...@@ -737,17 +738,9 @@ func (k *Kad) Announce(ctx context.Context, peer swarm.Address) error { ...@@ -737,17 +738,9 @@ func (k *Kad) Announce(ctx context.Context, peer swarm.Address) error {
// AddPeers adds peers to the knownPeers list. // AddPeers adds peers to the knownPeers list.
// This does not guarantee that a connection will immediately // This does not guarantee that a connection will immediately
// be made to the peer. // be made to the peer.
func (k *Kad) AddPeers(ctx context.Context, addrs ...swarm.Address) error { func (k *Kad) AddPeers(addrs ...swarm.Address) {
for _, addr := range addrs { k.knownPeers.Add(addrs...)
if k.knownPeers.Exists(addr) {
continue
}
po := swarm.Proximity(k.base.Bytes(), addr.Bytes())
k.knownPeers.Add(addr, po)
}
k.notifyManageLoop() k.notifyManageLoop()
return nil
} }
func (k *Kad) Pick(peer p2p.Peer) bool { func (k *Kad) Pick(peer p2p.Peer) bool {
...@@ -795,10 +788,8 @@ func (k *Kad) connected(ctx context.Context, addr swarm.Address) error { ...@@ -795,10 +788,8 @@ func (k *Kad) connected(ctx context.Context, addr swarm.Address) error {
return err return err
} }
po := swarm.Proximity(k.base.Bytes(), addr.Bytes()) k.knownPeers.Add(addr)
k.connectedPeers.Add(addr)
k.knownPeers.Add(addr, po)
k.connectedPeers.Add(addr, po)
k.collector.Record(addr, metrics.PeerLogIn(time.Now(), metrics.PeerConnectionDirectionInbound)) k.collector.Record(addr, metrics.PeerLogIn(time.Now(), metrics.PeerConnectionDirectionInbound))
...@@ -818,8 +809,7 @@ func (k *Kad) Disconnected(peer p2p.Peer) { ...@@ -818,8 +809,7 @@ func (k *Kad) Disconnected(peer p2p.Peer) {
k.logger.Debugf("kademlia: disconnected peer %s", peer.Address) k.logger.Debugf("kademlia: disconnected peer %s", peer.Address)
po := swarm.Proximity(k.base.Bytes(), peer.Address.Bytes()) k.connectedPeers.Remove(peer.Address)
k.connectedPeers.Remove(peer.Address, po)
k.waitNext.SetTryAfter(peer.Address, time.Now().Add(timeToRetry)) k.waitNext.SetTryAfter(peer.Address, time.Now().Add(timeToRetry))
......
...@@ -742,7 +742,7 @@ func TestAddressBookPrune(t *testing.T) { ...@@ -742,7 +742,7 @@ func TestAddressBookPrune(t *testing.T) {
} }
// add non connectable peer, check connection and failed connection counters // add non connectable peer, check connection and failed connection counters
_ = kad.AddPeers(context.Background(), nonConnPeer.Overlay) kad.AddPeers(nonConnPeer.Overlay)
waitCounter(t, &conns, 0) waitCounter(t, &conns, 0)
waitCounter(t, &failedConns, 1) waitCounter(t, &failedConns, 1)
...@@ -833,7 +833,7 @@ func TestAddressBookQuickPrune(t *testing.T) { ...@@ -833,7 +833,7 @@ func TestAddressBookQuickPrune(t *testing.T) {
waitCounter(t, &failedConns, 0) waitCounter(t, &failedConns, 0)
// add non connectable peer, check connection and failed connection counters // add non connectable peer, check connection and failed connection counters
_ = kad.AddPeers(context.Background(), nonConnPeer.Overlay) kad.AddPeers(nonConnPeer.Overlay)
waitCounter(t, &conns, 0) waitCounter(t, &conns, 0)
waitCounter(t, &failedConns, 1) waitCounter(t, &failedConns, 1)
...@@ -1269,7 +1269,7 @@ func addOne(t *testing.T, signer beeCrypto.Signer, k *kademlia.Kad, ab addressbo ...@@ -1269,7 +1269,7 @@ func addOne(t *testing.T, signer beeCrypto.Signer, k *kademlia.Kad, ab addressbo
if err := ab.Put(peer, *bzzAddr); err != nil { if err := ab.Put(peer, *bzzAddr); err != nil {
t.Fatal(err) t.Fatal(err)
} }
_ = k.AddPeers(context.Background(), peer) k.AddPeers(peer)
} }
func add(t *testing.T, signer beeCrypto.Signer, k *kademlia.Kad, ab addressbook.Putter, peers []swarm.Address, offset, number int) { func add(t *testing.T, signer beeCrypto.Signer, k *kademlia.Kad, ab addressbook.Putter, peers []swarm.Address, offset, number int) {
......
...@@ -60,7 +60,7 @@ func NewMockKademlia(o ...Option) *Mock { ...@@ -60,7 +60,7 @@ func NewMockKademlia(o ...Option) *Mock {
// AddPeers is called when a peers are added to the topology backlog // AddPeers is called when a peers are added to the topology backlog
// for further processing by connectivity strategy. // for further processing by connectivity strategy.
func (m *Mock) AddPeers(ctx context.Context, addr ...swarm.Address) error { func (m *Mock) AddPeers(addr ...swarm.Address) {
panic("not implemented") // TODO: Implement panic("not implemented") // TODO: Implement
} }
......
...@@ -15,27 +15,27 @@ import ( ...@@ -15,27 +15,27 @@ import (
) )
type Container struct { type Container struct {
base swarm.Address
connectedPeers *pslice.PSlice connectedPeers *pslice.PSlice
disconnectedPeers *pslice.PSlice disconnectedPeers *pslice.PSlice
peerMu sync.Mutex peerMu sync.Mutex
} }
func NewContainer() *Container { func NewContainer(base swarm.Address) *Container {
return &Container{ return &Container{
connectedPeers: pslice.New(1), base: base,
disconnectedPeers: pslice.New(1), connectedPeers: pslice.New(1, base),
disconnectedPeers: pslice.New(1, base),
} }
} }
const defaultBin = uint8(0)
func (c *Container) Connected(ctx context.Context, peer p2p.Peer) { func (c *Container) Connected(ctx context.Context, peer p2p.Peer) {
c.peerMu.Lock() c.peerMu.Lock()
defer c.peerMu.Unlock() defer c.peerMu.Unlock()
addr := peer.Address addr := peer.Address
c.connectedPeers.Add(addr, defaultBin) c.connectedPeers.Add(addr)
c.disconnectedPeers.Remove(addr, defaultBin) c.disconnectedPeers.Remove(addr)
} }
func (c *Container) Disconnected(peer p2p.Peer) { func (c *Container) Disconnected(peer p2p.Peer) {
...@@ -44,8 +44,8 @@ func (c *Container) Disconnected(peer p2p.Peer) { ...@@ -44,8 +44,8 @@ func (c *Container) Disconnected(peer p2p.Peer) {
addr := peer.Address addr := peer.Address
if found := c.connectedPeers.Exists(addr); found { if found := c.connectedPeers.Exists(addr); found {
c.connectedPeers.Remove(addr, defaultBin) c.connectedPeers.Remove(addr)
c.disconnectedPeers.Add(addr, defaultBin) c.disconnectedPeers.Add(addr)
} }
} }
......
...@@ -11,13 +11,17 @@ import ( ...@@ -11,13 +11,17 @@ import (
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/swarm/test"
"github.com/ethersphere/bee/pkg/topology" "github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology/lightnode" "github.com/ethersphere/bee/pkg/topology/lightnode"
) )
func TestContainer(t *testing.T) { func TestContainer(t *testing.T) {
base := test.RandomAddress()
t.Run("new container is empty container", func(t *testing.T) { t.Run("new container is empty container", func(t *testing.T) {
c := lightnode.NewContainer() c := lightnode.NewContainer(base)
var empty topology.BinInfo var empty topology.BinInfo
...@@ -27,7 +31,7 @@ func TestContainer(t *testing.T) { ...@@ -27,7 +31,7 @@ func TestContainer(t *testing.T) {
}) })
t.Run("can add peers to container", func(t *testing.T) { t.Run("can add peers to container", func(t *testing.T) {
c := lightnode.NewContainer() c := lightnode.NewContainer(base)
c.Connected(context.Background(), p2p.Peer{Address: swarm.NewAddress([]byte("123"))}) c.Connected(context.Background(), p2p.Peer{Address: swarm.NewAddress([]byte("123"))})
c.Connected(context.Background(), p2p.Peer{Address: swarm.NewAddress([]byte("456"))}) c.Connected(context.Background(), p2p.Peer{Address: swarm.NewAddress([]byte("456"))})
...@@ -39,7 +43,7 @@ func TestContainer(t *testing.T) { ...@@ -39,7 +43,7 @@ func TestContainer(t *testing.T) {
} }
}) })
t.Run("empty container after peer disconnect", func(t *testing.T) { t.Run("empty container after peer disconnect", func(t *testing.T) {
c := lightnode.NewContainer() c := lightnode.NewContainer(base)
peer := p2p.Peer{Address: swarm.NewAddress([]byte("123"))} peer := p2p.Peer{Address: swarm.NewAddress([]byte("123"))}
......
...@@ -67,21 +67,16 @@ func NewTopologyDriver(opts ...Option) topology.Driver { ...@@ -67,21 +67,16 @@ func NewTopologyDriver(opts ...Option) topology.Driver {
return d return d
} }
func (d *mock) AddPeers(_ context.Context, addrs ...swarm.Address) error { func (d *mock) AddPeers(addrs ...swarm.Address) {
if d.addPeersErr != nil {
return d.addPeersErr
}
d.mtx.Lock() d.mtx.Lock()
defer d.mtx.Unlock() defer d.mtx.Unlock()
d.peers = append(d.peers, addrs...) d.peers = append(d.peers, addrs...)
return nil
} }
func (d *mock) Connected(ctx context.Context, addr swarm.Address) error { func (d *mock) Connected(ctx context.Context, addr swarm.Address) error {
return d.AddPeers(ctx, addr) d.AddPeers(addr)
return nil
} }
func (d *mock) Disconnected(swarm.Address) { func (d *mock) Disconnected(swarm.Address) {
......
...@@ -18,15 +18,17 @@ import ( ...@@ -18,15 +18,17 @@ import (
type PSlice struct { type PSlice struct {
peers []swarm.Address // the slice of peers peers []swarm.Address // the slice of peers
bins []uint // the indexes of every proximity order in the peers slice, index is po, value is index of peers slice bins []uint // the indexes of every proximity order in the peers slice, index is po, value is index of peers slice
baseBytes []byte
sync.RWMutex sync.RWMutex
} }
// New creates a new PSlice. // New creates a new PSlice.
func New(maxBins int) *PSlice { func New(maxBins int, base swarm.Address) *PSlice {
return &PSlice{ return &PSlice{
peers: make([]swarm.Address, 0), peers: make([]swarm.Address, 0),
bins: make([]uint, maxBins), bins: make([]uint, maxBins),
baseBytes: base.Bytes(),
} }
} }
...@@ -165,25 +167,30 @@ func (s *PSlice) exists(addr swarm.Address) (bool, int) { ...@@ -165,25 +167,30 @@ func (s *PSlice) exists(addr swarm.Address) (bool, int) {
} }
// Add a peer at a certain PO. // Add a peer at a certain PO.
func (s *PSlice) Add(addr swarm.Address, po uint8) { func (s *PSlice) Add(addrs ...swarm.Address) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
peers, bins := s.copy(len(addrs))
for _, addr := range addrs {
if e, _ := s.exists(addr); e { if e, _ := s.exists(addr); e {
return return
} }
peers, bins := s.copy(1) po := swarm.Proximity(s.baseBytes, addr.Bytes())
peers = insertAddresses(peers, int(s.bins[po]), addr) peers = insertAddresses(peers, int(s.bins[po]), addr)
s.peers = peers s.peers = peers
incDeeper(bins, po) incDeeper(bins, po)
s.bins = bins s.bins = bins
}
} }
// Remove a peer at a certain PO. // Remove a peer at a certain PO.
func (s *PSlice) Remove(addr swarm.Address, po uint8) { func (s *PSlice) Remove(addr swarm.Address) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
...@@ -197,7 +204,7 @@ func (s *PSlice) Remove(addr swarm.Address, po uint8) { ...@@ -197,7 +204,7 @@ func (s *PSlice) Remove(addr swarm.Address, po uint8) {
peers = append(peers[:i], peers[i+1:]...) peers = append(peers[:i], peers[i+1:]...)
s.peers = peers s.peers = peers
decDeeper(bins, po) decDeeper(bins, swarm.Proximity(s.baseBytes, addr.Bytes()))
s.bins = bins s.bins = bins
} }
......
...@@ -17,8 +17,8 @@ import ( ...@@ -17,8 +17,8 @@ import (
// TestShallowestEmpty tests that ShallowestEmpty functionality works correctly. // TestShallowestEmpty tests that ShallowestEmpty functionality works correctly.
func TestShallowestEmpty(t *testing.T) { func TestShallowestEmpty(t *testing.T) {
var ( var (
ps = pslice.New(16)
base = test.RandomAddress() base = test.RandomAddress()
ps = pslice.New(16, base)
peers = make([][]swarm.Address, 16) peers = make([][]swarm.Address, 16)
) )
...@@ -31,7 +31,7 @@ func TestShallowestEmpty(t *testing.T) { ...@@ -31,7 +31,7 @@ func TestShallowestEmpty(t *testing.T) {
for i, v := range peers { for i, v := range peers {
for _, vv := range v { for _, vv := range v {
ps.Add(vv, uint8(i)) ps.Add(vv)
} }
sd, none := ps.ShallowestEmpty() sd, none := ps.ShallowestEmpty()
if i == 15 { if i == 15 {
...@@ -75,15 +75,14 @@ func TestShallowestEmpty(t *testing.T) { ...@@ -75,15 +75,14 @@ func TestShallowestEmpty(t *testing.T) {
}, },
} { } {
for _, v := range peers[tc.removePo] { for _, v := range peers[tc.removePo] {
po := swarm.Proximity(base.Bytes(), v.Bytes()) ps.Remove(v)
ps.Remove(v, po)
} }
sd, none := ps.ShallowestEmpty() sd, none := ps.ShallowestEmpty()
if sd != tc.expectShallowest || none { if sd != tc.expectShallowest || none {
t.Fatalf("empty bin mismatch got %d want %d", sd, tc.expectShallowest) t.Fatalf("empty bin mismatch got %d want %d", sd, tc.expectShallowest)
} }
} }
ps.Add(peers[0][0], 0) ps.Add(peers[0][0])
if sd, none := ps.ShallowestEmpty(); sd != 1 || none { if sd, none := ps.ShallowestEmpty(); sd != 1 || none {
t.Fatalf("expected bin 1 to be empty shallowest but got %d", sd) t.Fatalf("expected bin 1 to be empty shallowest but got %d", sd)
} }
...@@ -92,75 +91,75 @@ func TestShallowestEmpty(t *testing.T) { ...@@ -92,75 +91,75 @@ func TestShallowestEmpty(t *testing.T) {
// TestAddRemove checks that the Add, Remove and Exists methods work as expected. // TestAddRemove checks that the Add, Remove and Exists methods work as expected.
func TestAddRemove(t *testing.T) { func TestAddRemove(t *testing.T) {
var ( var (
ps = pslice.New(4)
base = test.RandomAddress() base = test.RandomAddress()
ps = pslice.New(4, base)
peers = make([]swarm.Address, 8) peers = make([]swarm.Address, 8)
) )
// 2 peers per bin // 2 peers per bin
// indexes {0,1} {2,3} {4,5} {6,7} // indexes {0,1} {2,3} {4,5} {6,7}
for i := 0; i < 8; i += 2 { for i := 0; i < 8; i += 2 {
a := test.RandomAddressAt(base, i) a := test.RandomAddressAt(base, i/2)
peers[i] = a peers[i] = a
b := test.RandomAddressAt(base, i) b := test.RandomAddressAt(base, i/2)
peers[i+1] = b peers[i+1] = b
} }
// add one // add one
ps.Add(peers[0], 0) ps.Add(peers[0])
chkLen(t, ps, 1) chkLen(t, ps, 1)
chkExists(t, ps, peers[:1]...) chkExists(t, ps, peers[:1]...)
chkNotExists(t, ps, peers[1:]...) chkNotExists(t, ps, peers[1:]...)
// check duplicates // check duplicates
ps.Add(peers[0], 0) ps.Add(peers[0])
chkLen(t, ps, 1) chkLen(t, ps, 1)
chkBins(t, ps, []uint{0, 1, 1, 1}) chkBins(t, ps, []uint{0, 1, 1, 1})
chkExists(t, ps, peers[:1]...) chkExists(t, ps, peers[:1]...)
chkNotExists(t, ps, peers[1:]...) chkNotExists(t, ps, peers[1:]...)
// check empty // check empty
ps.Remove(peers[0], 0) ps.Remove(peers[0])
chkLen(t, ps, 0) chkLen(t, ps, 0)
chkBins(t, ps, []uint{0, 0, 0, 0}) chkBins(t, ps, []uint{0, 0, 0, 0})
chkNotExists(t, ps, peers...) chkNotExists(t, ps, peers...)
// add two in bin 0 // add two in bin 0
ps.Add(peers[0], 0) ps.Add(peers[0])
ps.Add(peers[1], 0) ps.Add(peers[1])
chkLen(t, ps, 2) chkLen(t, ps, 2)
chkBins(t, ps, []uint{0, 2, 2, 2}) chkBins(t, ps, []uint{0, 2, 2, 2})
chkExists(t, ps, peers[:2]...) chkExists(t, ps, peers[:2]...)
chkNotExists(t, ps, peers[2:]...) chkNotExists(t, ps, peers[2:]...)
ps.Add(peers[2], 1) ps.Add(peers[2])
ps.Add(peers[3], 1) ps.Add(peers[3])
chkLen(t, ps, 4) chkLen(t, ps, 4)
chkBins(t, ps, []uint{0, 2, 4, 4}) chkBins(t, ps, []uint{0, 2, 4, 4})
chkExists(t, ps, peers[:4]...) chkExists(t, ps, peers[:4]...)
chkNotExists(t, ps, peers[4:]...) chkNotExists(t, ps, peers[4:]...)
ps.Remove(peers[1], 0) ps.Remove(peers[1])
chkLen(t, ps, 3) chkLen(t, ps, 3)
chkBins(t, ps, []uint{0, 1, 3, 3}) chkBins(t, ps, []uint{0, 1, 3, 3})
chkExists(t, ps, peers[0], peers[2], peers[3]) chkExists(t, ps, peers[0], peers[2], peers[3])
chkNotExists(t, ps, append([]swarm.Address{peers[1]}, peers[4:]...)...) chkNotExists(t, ps, append([]swarm.Address{peers[1]}, peers[4:]...)...)
// this should not move the last cursor // this should not move the last cursor
ps.Add(peers[7], 3) ps.Add(peers[7])
chkLen(t, ps, 4) chkLen(t, ps, 4)
chkBins(t, ps, []uint{0, 1, 3, 3}) chkBins(t, ps, []uint{0, 1, 3, 3})
chkExists(t, ps, peers[0], peers[2], peers[3], peers[7]) chkExists(t, ps, peers[0], peers[2], peers[3], peers[7])
chkNotExists(t, ps, append([]swarm.Address{peers[1]}, peers[4:7]...)...) chkNotExists(t, ps, append([]swarm.Address{peers[1]}, peers[4:7]...)...)
ps.Add(peers[5], 2) ps.Add(peers[5])
chkLen(t, ps, 5) chkLen(t, ps, 5)
chkBins(t, ps, []uint{0, 1, 3, 4}) chkBins(t, ps, []uint{0, 1, 3, 4})
chkExists(t, ps, peers[0], peers[2], peers[3], peers[5], peers[7]) chkExists(t, ps, peers[0], peers[2], peers[3], peers[5], peers[7])
chkNotExists(t, ps, []swarm.Address{peers[1], peers[4], peers[6]}...) chkNotExists(t, ps, []swarm.Address{peers[1], peers[4], peers[6]}...)
ps.Remove(peers[2], 1) ps.Remove(peers[2])
chkLen(t, ps, 4) chkLen(t, ps, 4)
chkBins(t, ps, []uint{0, 1, 2, 3}) chkBins(t, ps, []uint{0, 1, 2, 3})
chkExists(t, ps, peers[0], peers[3], peers[5], peers[7]) chkExists(t, ps, peers[0], peers[3], peers[5], peers[7])
...@@ -168,8 +167,8 @@ func TestAddRemove(t *testing.T) { ...@@ -168,8 +167,8 @@ func TestAddRemove(t *testing.T) {
p := uint8(0) p := uint8(0)
for i := 0; i < 8; i += 2 { for i := 0; i < 8; i += 2 {
ps.Remove(peers[i], p) ps.Remove(peers[i])
ps.Remove(peers[i+1], p) ps.Remove(peers[i+1])
p++ p++
} }
...@@ -182,13 +181,13 @@ func TestAddRemove(t *testing.T) { ...@@ -182,13 +181,13 @@ func TestAddRemove(t *testing.T) {
// TestIteratorError checks that error propagation works correctly in the iterators. // TestIteratorError checks that error propagation works correctly in the iterators.
func TestIteratorError(t *testing.T) { func TestIteratorError(t *testing.T) {
var ( var (
ps = pslice.New(4)
base = test.RandomAddress() base = test.RandomAddress()
ps = pslice.New(4, base)
a = test.RandomAddressAt(base, 0) a = test.RandomAddressAt(base, 0)
e = errors.New("err1") e = errors.New("err1")
) )
ps.Add(a, 0) ps.Add(a)
f := func(p swarm.Address, _ uint8) (stop, jumpToNext bool, err error) { f := func(p swarm.Address, _ uint8) (stop, jumpToNext bool, err error) {
return false, false, e return false, false, e
...@@ -202,9 +201,11 @@ func TestIteratorError(t *testing.T) { ...@@ -202,9 +201,11 @@ func TestIteratorError(t *testing.T) {
// TestIterators tests that the EachBin and EachBinRev iterators work as expected. // TestIterators tests that the EachBin and EachBinRev iterators work as expected.
func TestIterators(t *testing.T) { func TestIterators(t *testing.T) {
ps := pslice.New(4)
base := test.RandomAddress() base := test.RandomAddress()
ps := pslice.New(4, base)
peers := make([]swarm.Address, 4) peers := make([]swarm.Address, 4)
for i := 0; i < 4; i++ { for i := 0; i < 4; i++ {
a := test.RandomAddressAt(base, i) a := test.RandomAddressAt(base, i)
...@@ -214,26 +215,26 @@ func TestIterators(t *testing.T) { ...@@ -214,26 +215,26 @@ func TestIterators(t *testing.T) {
testIterator(t, ps, false, false, 0, []swarm.Address{}) testIterator(t, ps, false, false, 0, []swarm.Address{})
testIteratorRev(t, ps, false, false, 0, []swarm.Address{}) testIteratorRev(t, ps, false, false, 0, []swarm.Address{})
for i, v := range peers { for _, v := range peers {
ps.Add(v, uint8(i)) ps.Add(v)
} }
testIterator(t, ps, false, false, 4, []swarm.Address{peers[3], peers[2], peers[1], peers[0]}) testIterator(t, ps, false, false, 4, []swarm.Address{peers[3], peers[2], peers[1], peers[0]})
testIteratorRev(t, ps, false, false, 4, peers) testIteratorRev(t, ps, false, false, 4, peers)
ps.Remove(peers[2], 2) ps.Remove(peers[2])
testIterator(t, ps, false, false, 3, []swarm.Address{peers[3], peers[1], peers[0]}) testIterator(t, ps, false, false, 3, []swarm.Address{peers[3], peers[1], peers[0]})
testIteratorRev(t, ps, false, false, 3, []swarm.Address{peers[0], peers[1], peers[3]}) testIteratorRev(t, ps, false, false, 3, []swarm.Address{peers[0], peers[1], peers[3]})
ps.Remove(peers[0], 0) ps.Remove(peers[0])
testIterator(t, ps, false, false, 2, []swarm.Address{peers[3], peers[1]}) testIterator(t, ps, false, false, 2, []swarm.Address{peers[3], peers[1]})
testIteratorRev(t, ps, false, false, 2, []swarm.Address{peers[1], peers[3]}) testIteratorRev(t, ps, false, false, 2, []swarm.Address{peers[1], peers[3]})
ps.Remove(peers[3], 3) ps.Remove(peers[3])
testIterator(t, ps, false, false, 1, []swarm.Address{peers[1]}) testIterator(t, ps, false, false, 1, []swarm.Address{peers[1]})
testIteratorRev(t, ps, false, false, 1, []swarm.Address{peers[1]}) testIteratorRev(t, ps, false, false, 1, []swarm.Address{peers[1]})
ps.Remove(peers[1], 1) ps.Remove(peers[1])
testIterator(t, ps, false, false, 0, []swarm.Address{}) testIterator(t, ps, false, false, 0, []swarm.Address{})
testIteratorRev(t, ps, false, false, 0, []swarm.Address{}) testIteratorRev(t, ps, false, false, 0, []swarm.Address{})
} }
...@@ -264,15 +265,17 @@ func TestBinPeers(t *testing.T) { ...@@ -264,15 +265,17 @@ func TestBinPeers(t *testing.T) {
t.Run(tc.label, func(t *testing.T) { t.Run(tc.label, func(t *testing.T) {
base := test.RandomAddress()
binPeers := make([][]swarm.Address, len(tc.peersCount)) binPeers := make([][]swarm.Address, len(tc.peersCount))
// prepare slice // prepare slice
ps := pslice.New(len(tc.peersCount)) ps := pslice.New(len(tc.peersCount), base)
for bin, peersCount := range tc.peersCount { for bin, peersCount := range tc.peersCount {
for i := 0; i < peersCount; i++ { for i := 0; i < peersCount; i++ {
peer := test.RandomAddress() peer := test.RandomAddressAt(base, bin)
binPeers[bin] = append(binPeers[bin], peer) binPeers[bin] = append(binPeers[bin], peer)
ps.Add(peer, uint8(bin)) ps.Add(peer)
} }
} }
...@@ -317,16 +320,16 @@ func isEqual(a, b []swarm.Address) bool { ...@@ -317,16 +320,16 @@ func isEqual(a, b []swarm.Address) bool {
// TestIteratorsJumpStop tests that the EachBin and EachBinRev iterators jump to next bin and stop as expected. // TestIteratorsJumpStop tests that the EachBin and EachBinRev iterators jump to next bin and stop as expected.
func TestIteratorsJumpStop(t *testing.T) { func TestIteratorsJumpStop(t *testing.T) {
ps := pslice.New(4)
base := test.RandomAddress() base := test.RandomAddress()
ps := pslice.New(4, base)
peers := make([]swarm.Address, 12) peers := make([]swarm.Address, 12)
j := 0 j := 0
for i := 0; i < 4; i++ { for i := 0; i < 4; i++ {
for ii := 0; ii < 3; ii++ { for ii := 0; ii < 3; ii++ {
a := test.RandomAddressAt(base, i) a := test.RandomAddressAt(base, i)
peers[j] = a peers[j] = a
ps.Add(a, uint8(i)) ps.Add(a)
j++ j++
} }
} }
...@@ -382,16 +385,18 @@ func testIterator(t *testing.T, ps *pslice.PSlice, skipNext, stop bool, iteratio ...@@ -382,16 +385,18 @@ func testIterator(t *testing.T, ps *pslice.PSlice, skipNext, stop bool, iteratio
} }
func chkLen(t *testing.T, ps *pslice.PSlice, l int) { func chkLen(t *testing.T, ps *pslice.PSlice, l int) {
t.Helper()
if lp := ps.Length(); lp != l { if lp := ps.Length(); lp != l {
t.Fatalf("length mismatch, want %d got %d", l, lp) t.Fatalf("length mismatch, want %d got %d", l, lp)
} }
} }
func chkBins(t *testing.T, ps *pslice.PSlice, seq []uint) { func chkBins(t *testing.T, ps *pslice.PSlice, seq []uint) {
t.Helper()
pb := pslice.PSliceBins(ps) pb := pslice.PSliceBins(ps)
for i, v := range seq { for i, v := range seq {
if pb[i] != v { if pb[i] != v {
t.Fatalf("bin seq wrong, get %d want %d, index %v", pb[i], v, pb) t.Fatalf("bin seq wrong, got %d want %d, index %v", pb[i], v, pb)
} }
} }
} }
...@@ -416,13 +421,13 @@ func chkNotExists(t *testing.T, ps *pslice.PSlice, addrs ...swarm.Address) { ...@@ -416,13 +421,13 @@ func chkNotExists(t *testing.T, ps *pslice.PSlice, addrs ...swarm.Address) {
func BenchmarkAdd(b *testing.B) { func BenchmarkAdd(b *testing.B) {
var ( var (
ps = pslice.New(16)
base = test.RandomAddress() base = test.RandomAddress()
ps = pslice.New(16, base)
) )
for i := 0; i < 16; i++ { for i := 0; i < 16; i++ {
for j := 0; j < 300; j++ { for j := 0; j < 300; j++ {
ps.Add(test.RandomAddressAt(base, i), uint8(i)) ps.Add(test.RandomAddressAt(base, i))
} }
} }
...@@ -430,6 +435,6 @@ func BenchmarkAdd(b *testing.B) { ...@@ -430,6 +435,6 @@ func BenchmarkAdd(b *testing.B) {
b.ResetTimer() b.ResetTimer()
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
ps.Add(test.RandomAddressAt(base, po), po) ps.Add(test.RandomAddressAt(base, po))
} }
} }
...@@ -7,7 +7,6 @@ ...@@ -7,7 +7,6 @@
package topology package topology
import ( import (
"context"
"errors" "errors"
"io" "io"
"time" "time"
...@@ -34,7 +33,7 @@ type Driver interface { ...@@ -34,7 +33,7 @@ type Driver interface {
type PeerAdder interface { type PeerAdder interface {
// AddPeers is called when peers are added to the topology backlog // AddPeers is called when peers are added to the topology backlog
AddPeers(ctx context.Context, addr ...swarm.Address) error AddPeers(addr ...swarm.Address)
} }
type ClosestPeerer interface { type ClosestPeerer interface {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment