Commit d5723357 authored by Petar Radovic, committed by GitHub

Fallback to bootnodes (#558)

handle bootnodes in kademlia
parent d2b3746a
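In short: bootnode handling moves out of node startup and into the kademlia driver. Kademlia now keeps the configured bootnode multiaddresses and, from its manage loop, falls back to dialing them whenever the node has no connected peers, instead of node.go dialing bootnodes once at construction time. The pivot of the whole change is the small fallback check added to the manage loop (quoted from the kademlia diff below):

    if k.connectedPeers.Length() == 0 {
        k.connectBootnodes(ctx)
    }

A new exported Start method takes over launching the manage loop and re-adding address-book overlays, so NewBee only wires dependencies together and then calls kad.Start.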
@@ -27,11 +27,11 @@ const (
 )
 
 type Service struct {
 	streamer        p2p.Streamer
 	addressBook     addressbook.GetPutter
-	peerHandler     func(context.Context, swarm.Address) error
+	addPeersHandler func(context.Context, ...swarm.Address) error
 	networkID       uint64
 	logger          logging.Logger
 }
 
 type Options struct {
@@ -79,8 +79,8 @@ func (s *Service) BroadcastPeers(ctx context.Context, addressee swarm.Address, p
 	return nil
 }
 
-func (s *Service) SetPeerAddedHandler(h func(ctx context.Context, addr swarm.Address) error) {
-	s.peerHandler = h
+func (s *Service) SetAddPeersHandler(h func(ctx context.Context, addr ...swarm.Address) error) {
+	s.addPeersHandler = h
 }
 
 func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swarm.Address) (err error) {
@@ -134,6 +134,7 @@ func (s *Service) peersHandler(ctx context.Context, peer p2p.Peer, stream p2p.St
 	// but we still want to handle not closed stream from the other side to avoid zombie stream
 	go stream.FullClose()
 
+	var peers []swarm.Address
 	for _, newPeer := range peersReq.Peers {
 		bzzAddress, err := bzz.ParseAddress(newPeer.Underlay, newPeer.Overlay, newPeer.Signature, s.networkID)
 		if err != nil {
@@ -147,10 +148,12 @@ func (s *Service) peersHandler(ctx context.Context, peer p2p.Peer, stream p2p.St
 			continue
 		}
 
-		if s.peerHandler != nil {
-			if err := s.peerHandler(ctx, bzzAddress.Overlay); err != nil {
-				return err
-			}
-		}
+		peers = append(peers, bzzAddress.Overlay)
+	}
+
+	if s.addPeersHandler != nil {
+		if err := s.addPeersHandler(ctx, peers...); err != nil {
+			return err
+		}
 	}
 }
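Note the behavioral detail in the hive change above: peersHandler now collects every successfully parsed overlay into a slice and hands the whole batch to addPeersHandler in one call, where it previously invoked the handler once per peer and aborted the loop on the first handler error. One gossip message therefore yields one topology notification.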
...
@@ -15,7 +15,7 @@ was persisted in the address book and the node has been restarted).
 So the information has been changed, and potentially upon disconnection,
 the depth can travel to a shallower depth in result.
 
-If a peer gets added through AddPeer, this does not necessarily infer
+If a peer gets added through AddPeers, this does not necessarily infer
 an immediate depth change, since the peer might end up in the backlog for
 a long time until we actually need to connect to her.
...
@@ -8,6 +8,7 @@ import (
 	"context"
 	"encoding/json"
 	"errors"
+	"fmt"
 	"sync"
 	"time"
@@ -43,6 +44,7 @@ type Options struct {
 	AddressBook    addressbook.Interface
 	P2P            p2p.Service
 	SaturationFunc binSaturationFunc
+	Bootnodes      []ma.Multiaddr
 	Logger         logging.Logger
 }
@@ -55,11 +57,12 @@ type Kad struct {
 	saturationFunc binSaturationFunc // pluggable saturation function
 	connectedPeers *pslice.PSlice    // a slice of peers sorted and indexed by po, indexes kept in `bins`
 	knownPeers     *pslice.PSlice    // both are po aware slice of addresses
+	bootnodes      []ma.Multiaddr
 	depth          uint8             // current neighborhood depth
 	depthMu        sync.RWMutex      // protect depth changes
 	manageC        chan struct{}     // trigger the manage forever loop to connect to new peers
 	waitNext       map[string]retryInfo // sanction connections to a peer, key is overlay string and value is a retry information
 	waitNextMu     sync.Mutex        // synchronize map
 	peerSig        []chan struct{}
 	peerSigMtx     sync.Mutex
 	logger         logging.Logger // logger
@@ -87,6 +90,7 @@ func New(o Options) *Kad {
 		saturationFunc: o.SaturationFunc,
 		connectedPeers: pslice.New(int(swarm.MaxBins)),
 		knownPeers:     pslice.New(int(swarm.MaxBins)),
+		bootnodes:      o.Bootnodes,
 		manageC:        make(chan struct{}, 1),
 		waitNext:       make(map[string]retryInfo),
 		logger:         o.Logger,
@@ -94,8 +98,7 @@ func New(o Options) *Kad {
 		done:           make(chan struct{}),
 		wg:             sync.WaitGroup{},
 	}
-	k.wg.Add(1)
-	go k.manage()
+
 	return k
 }
@@ -132,6 +135,7 @@ func (k *Kad) manage() {
 			return
 		default:
 		}
+
 		err := k.knownPeers.EachBinRev(func(peer swarm.Address, po uint8) (bool, bool, error) {
 			if k.connectedPeers.Exists(peer) {
 				return false, false, nil
@@ -211,10 +215,58 @@ func (k *Kad) manage() {
 				k.logger.Errorf("kademlia manage loop iterator: %v", err)
 			}
 
+			if k.connectedPeers.Length() == 0 {
+				k.connectBootnodes(ctx)
+			}
 		}
 	}
 }
 
+func (k *Kad) Start(ctx context.Context) error {
+	k.wg.Add(1)
+	go k.manage()
+
+	addresses, err := k.addressBook.Overlays()
+	if err != nil {
+		return fmt.Errorf("addressbook overlays: %w", err)
+	}
+
+	return k.AddPeers(ctx, addresses...)
+}
+
+func (k *Kad) connectBootnodes(ctx context.Context) {
+	var wg sync.WaitGroup
+	for _, addr := range k.bootnodes {
+		wg.Add(1)
+		go func(a ma.Multiaddr) {
+			defer wg.Done()
+			var count int
+			if _, err := p2p.Discover(ctx, a, func(addr ma.Multiaddr) (stop bool, err error) {
+				k.logger.Tracef("connecting to bootnode %s", addr)
+				_, err = k.p2p.ConnectNotify(ctx, addr)
+				if err != nil {
+					if !errors.Is(err, p2p.ErrAlreadyConnected) {
+						k.logger.Debugf("connect fail %s: %v", addr, err)
+						k.logger.Warningf("connect to bootnode %s", addr)
+					}
+					return false, nil
+				}
+				k.logger.Tracef("connected to bootnode %s", addr)
+				count++
+				// connect to max 3 bootnodes
+				return count > 3, nil
+			}); err != nil {
+				k.logger.Debugf("discover fail %s: %v", a, err)
+				k.logger.Warningf("discover to bootnode %s", a)
+				return
+			}
+		}(addr)
+	}
+
+	wg.Wait()
+}
 
 // binSaturated indicates whether a certain bin is saturated or not.
 // when a bin is not saturated it means we would like to proactively
 // initiate connections to other peers in the bin.
@@ -362,16 +414,18 @@ func (k *Kad) announce(ctx context.Context, peer swarm.Address) error {
 	return err
 }
 
-// AddPeer adds a peer to the knownPeers list.
+// AddPeers adds peers to the knownPeers list.
 // This does not guarantee that a connection will immediately
 // be made to the peer.
-func (k *Kad) AddPeer(ctx context.Context, addr swarm.Address) error {
-	if k.knownPeers.Exists(addr) {
-		return nil
-	}
+func (k *Kad) AddPeers(ctx context.Context, addrs ...swarm.Address) error {
+	for _, addr := range addrs {
+		if k.knownPeers.Exists(addr) {
+			continue
+		}
 
-	po := swarm.Proximity(k.base.Bytes(), addr.Bytes())
-	k.knownPeers.Add(addr, po)
+		po := swarm.Proximity(k.base.Bytes(), addr.Bytes())
+		k.knownPeers.Add(addr, po)
+	}
 
 	select {
 	case k.manageC <- struct{}{}:
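Two notes on the new kademlia surface above. First, Start replaces the goroutine launch that New used to perform, so a caller fully constructs the driver before any connection activity begins; a minimal usage sketch, assuming only the Options fields and calls visible in this commit:

    kad := kademlia.New(kademlia.Options{
        Base:        address,
        Discovery:   hive,
        AddressBook: addressbook,
        P2P:         p2ps,
        Bootnodes:   bootnodes, // parsed ma.Multiaddr values
        Logger:      logger,
    })
    if err := kad.Start(ctx); err != nil {
        // addressbook overlays could not be re-added
    }

Second, in connectBootnodes a failed dial returns (false, nil) so discovery keeps iterating over further underlay addresses, and count increments only on success. Because the callback stops on count > 3, it actually stops after the fourth successful connection, one more than the "connect to max 3 bootnodes" comment suggests; count >= 3 would cap it at three. Each bootnode is dialed in its own goroutine, and wg.Wait() holds the manage loop until every attempt finishes.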
...
(collapsed file diff not shown)
@@ -58,9 +58,9 @@ func NewMockKademlia(o ...Option) *Mock {
 	return m
 }
 
-// AddPeer is called when a peer is added to the topology backlog
+// AddPeers is called when peers are added to the topology backlog
 // for further processing by connectivity strategy.
-func (m *Mock) AddPeer(ctx context.Context, addr swarm.Address) error {
+func (m *Mock) AddPeers(ctx context.Context, addr ...swarm.Address) error {
 	panic("not implemented") // TODO: Implement
 }
...
@@ -6,14 +6,12 @@ package node
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"io"
 	"log"
 	"net"
 	"net/http"
 	"path/filepath"
-	"sync"
 	"time"
 
 	"github.com/ethersphere/bee/pkg/accounting"
@@ -31,7 +29,6 @@ import (
 	"github.com/ethersphere/bee/pkg/logging"
 	"github.com/ethersphere/bee/pkg/metrics"
 	"github.com/ethersphere/bee/pkg/netstore"
-	"github.com/ethersphere/bee/pkg/p2p"
 	"github.com/ethersphere/bee/pkg/p2p/libp2p"
 	"github.com/ethersphere/bee/pkg/pingpong"
 	"github.com/ethersphere/bee/pkg/puller"
@@ -202,10 +199,22 @@ func NewBee(addr string, logger logging.Logger, o Options) (*Bee, error) {
 		return nil, fmt.Errorf("hive service: %w", err)
 	}
 
-	topologyDriver := kademlia.New(kademlia.Options{Base: address, Discovery: hive, AddressBook: addressbook, P2P: p2ps, Logger: logger})
-	b.topologyCloser = topologyDriver
-	hive.SetPeerAddedHandler(topologyDriver.AddPeer)
-	p2ps.AddNotifier(topologyDriver)
+	var bootnodes []ma.Multiaddr
+	for _, a := range o.Bootnodes {
+		addr, err := ma.NewMultiaddr(a)
+		if err != nil {
+			logger.Debugf("multiaddress fail %s: %v", a, err)
+			logger.Warningf("invalid bootnode address %s", a)
+			continue
+		}
+
+		bootnodes = append(bootnodes, addr)
+	}
+
+	kad := kademlia.New(kademlia.Options{Base: address, Discovery: hive, AddressBook: addressbook, P2P: p2ps, Bootnodes: bootnodes, Logger: logger})
+	b.topologyCloser = kad
+	hive.SetAddPeersHandler(kad.AddPeers)
+	p2ps.AddNotifier(kad)
 
 	addrs, err := p2ps.Addresses()
 	if err != nil {
 		return nil, fmt.Errorf("get server addresses: %w", err)
@@ -255,7 +264,7 @@ func NewBee(addr string, logger logging.Logger, o Options) (*Bee, error) {
 	retrieve := retrieval.New(retrieval.Options{
 		Streamer:    p2ps,
-		ChunkPeerer: topologyDriver,
+		ChunkPeerer: kad,
 		Logger:      logger,
 		Accounting:  acc,
 		Pricer:      accounting.NewFixedPricer(address, 10),
@@ -274,7 +283,7 @@ func NewBee(addr string, logger logging.Logger, o Options) (*Bee, error) {
 	pushSyncProtocol := pushsync.New(pushsync.Options{
 		Streamer:      p2ps,
 		Storer:        storer,
-		ClosestPeerer: topologyDriver,
+		ClosestPeerer: kad,
 		Tagger:        tagg,
 		Logger:        logger,
 	})
@@ -285,7 +294,7 @@ func NewBee(addr string, logger logging.Logger, o Options) (*Bee, error) {
 	pushSyncPusher := pusher.New(pusher.Options{
 		Storer:        storer,
-		PeerSuggester: topologyDriver,
+		PeerSuggester: kad,
 		PushSyncer:    pushSyncProtocol,
 		Tagger:        tagg,
 		Logger:        logger,
@@ -307,7 +316,7 @@ func NewBee(addr string, logger logging.Logger, o Options) (*Bee, error) {
 	puller := puller.New(puller.Options{
 		StateStore: stateStore,
-		Topology:   topologyDriver,
+		Topology:   kad,
 		PullSync:   pullSync,
 		Logger:     logger,
 	})
@@ -348,7 +357,7 @@ func NewBee(addr string, logger logging.Logger, o Options) (*Bee, error) {
 		Pingpong:       pingPong,
 		Logger:         logger,
 		Tracer:         tracer,
-		TopologyDriver: topologyDriver,
+		TopologyDriver: kad,
 		Storer:         storer,
 		Tags:           tagg,
 		Accounting:     acc,
@@ -387,59 +396,8 @@ func NewBee(addr string, logger logging.Logger, o Options) (*Bee, error) {
 		b.debugAPIServer = debugAPIServer
 	}
 
-	addresses, err := addressbook.Overlays()
-	if err != nil {
-		return nil, fmt.Errorf("addressbook overlays: %w", err)
-	}
-
-	var count int32
-	// add the peers to topology and allow it to connect independently
-	for _, o := range addresses {
-		err = topologyDriver.AddPeer(p2pCtx, o)
-		if err != nil {
-			logger.Debugf("topology add peer from addressbook: %v", err)
-		} else {
-			count++
-		}
-	}
-
-	// Connect bootnodes if the address book is clean
-	if count == 0 {
-		var wg sync.WaitGroup
-		for _, a := range o.Bootnodes {
-			wg.Add(1)
-			go func(a string) {
-				defer wg.Done()
-				addr, err := ma.NewMultiaddr(a)
-				if err != nil {
-					logger.Debugf("multiaddress fail %s: %v", a, err)
-					logger.Warningf("connect to bootnode %s", a)
-					return
-				}
-				var count int
-				if _, err := p2p.Discover(p2pCtx, addr, func(addr ma.Multiaddr) (stop bool, err error) {
-					logger.Tracef("connecting to bootnode %s", addr)
-					_, err = p2ps.ConnectNotify(p2pCtx, addr)
-					if err != nil {
-						if !errors.Is(err, p2p.ErrAlreadyConnected) {
-							logger.Debugf("connect fail %s: %v", addr, err)
-							logger.Warningf("connect to bootnode %s", addr)
-						}
-						return false, nil
-					}
-					logger.Tracef("connected to bootnode %s", addr)
-					count++
-					// connect to max 3 bootnodes
-					return count > 3, nil
-				}); err != nil {
-					logger.Debugf("discover fail %s: %v", a, err)
-					logger.Warningf("discover to bootnode %s", a)
-					return
-				}
-			}(a)
-		}
-		wg.Wait()
-	}
+	if err := kad.Start(p2pCtx); err != nil {
+		return nil, err
+	}
 
 	return b, nil
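The net effect in node.go: bootnode strings are parsed eagerly into multiaddresses (invalid ones are logged and skipped) and handed to kademlia, while the old one-shot block, which dialed bootnodes only when the address book yielded zero peers at startup, is deleted along with the now-unused errors, sync, and p2p imports. The fallback becomes recurring rather than one-shot: kademlia retries the bootnodes any time its manage loop sees zero connected peers. A sketch of the expected caller input, assuming Options.Bootnodes stays a slice of multiaddr strings as the parsing loop implies (the address is a placeholder):

    o := Options{
        Bootnodes: []string{
            "/ip4/203.0.113.10/tcp/1634/p2p/16Uiu2HA...", // placeholder bootnode
        },
    }
    b, err := NewBee(addr, logger, o)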
...
@@ -40,75 +40,99 @@ type driver struct {
 	backoffActive bool
 	logger        logging.Logger
 	mtx           sync.Mutex
+	addPeerCh     chan swarm.Address
 	quit          chan struct{}
 }
 
 func New(disc discovery.Driver, addressBook addressbook.Interface, p2pService p2p.Service, logger logging.Logger, baseAddress swarm.Address) topology.Driver {
-	return &driver{
+	d := &driver{
 		base:          baseAddress,
 		discovery:     disc,
 		addressBook:   addressBook,
 		p2pService:    p2pService,
 		receivedPeers: make(map[string]struct{}),
 		logger:        logger,
+		addPeerCh:     make(chan swarm.Address, 64),
 		quit:          make(chan struct{}),
 	}
+
+	go d.manage()
+	return d
 }
 
-// AddPeer adds a new peer to the topology driver.
-// The peer would be subsequently broadcasted to all connected peers.
-// All connected peers are also broadcasted to the new peer.
-func (d *driver) AddPeer(ctx context.Context, addr swarm.Address) error {
-	d.mtx.Lock()
-	if _, ok := d.receivedPeers[addr.ByteString()]; ok {
-		d.mtx.Unlock()
-		return nil
-	}
-
-	d.receivedPeers[addr.ByteString()] = struct{}{}
-	d.mtx.Unlock()
-
-	connectedPeers := d.p2pService.Peers()
-	bzzAddress, err := d.addressBook.Get(addr)
-	if err != nil {
-		if err == addressbook.ErrNotFound {
-			return topology.ErrNotFound
-		}
-		return err
-	}
-
-	if !isConnected(addr, connectedPeers) {
-		_, err := d.p2pService.Connect(ctx, bzzAddress.Underlay)
-		if err != nil {
-			d.mtx.Lock()
-			delete(d.receivedPeers, addr.ByteString())
-			d.mtx.Unlock()
-			var e *p2p.ConnectionBackoffError
-			if errors.As(err, &e) {
-				d.backoff(e.TryAfter())
-				return err
-			}
-			return err
-		}
-	}
-
-	connectedAddrs := []swarm.Address{}
-	for _, addressee := range connectedPeers {
-		// skip newly added peer
-		if addressee.Address.Equal(addr) {
-			continue
-		}
-
-		connectedAddrs = append(connectedAddrs, addressee.Address)
-		if err := d.discovery.BroadcastPeers(ctx, addressee.Address, addr); err != nil {
-			return err
-		}
-	}
-
-	if len(connectedAddrs) == 0 {
-		return nil
-	}
-
-	return d.discovery.BroadcastPeers(ctx, addr, connectedAddrs...)
+func (d *driver) manage() {
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		<-d.quit
+		cancel()
+	}()
+
+	for {
+		select {
+		case <-d.quit:
+			return
+		case addr := <-d.addPeerCh:
+			d.mtx.Lock()
+			if _, ok := d.receivedPeers[addr.ByteString()]; ok {
+				d.mtx.Unlock()
+				return
+			}
+			d.receivedPeers[addr.ByteString()] = struct{}{}
+			d.mtx.Unlock()
+
+			connectedPeers := d.p2pService.Peers()
+			bzzAddress, err := d.addressBook.Get(addr)
+			if err != nil {
+				return
+			}
+
+			if !isConnected(addr, connectedPeers) {
+				_, err := d.p2pService.Connect(ctx, bzzAddress.Underlay)
+				if err != nil {
+					d.mtx.Lock()
+					delete(d.receivedPeers, addr.ByteString())
+					d.mtx.Unlock()
+					var e *p2p.ConnectionBackoffError
+					if errors.As(err, &e) {
+						d.backoff(e.TryAfter())
+					}
+					return
+				}
+			}
+
+			connectedAddrs := []swarm.Address{}
+			for _, addressee := range connectedPeers {
+				// skip newly added peer
+				if addressee.Address.Equal(addr) {
+					continue
+				}
+
+				connectedAddrs = append(connectedAddrs, addressee.Address)
+				if err := d.discovery.BroadcastPeers(ctx, addressee.Address, addr); err != nil {
+					return
+				}
+			}
+
+			if len(connectedAddrs) == 0 {
+				return
+			}
+
+			_ = d.discovery.BroadcastPeers(ctx, addr, connectedAddrs...)
+		}
+	}
+}
+
+// AddPeers adds new peers to the topology driver.
+// The peers are subsequently broadcast to all connected peers.
+// All connected peers are also broadcast to the new peers.
+func (d *driver) AddPeers(ctx context.Context, addrs ...swarm.Address) error {
+	for _, addr := range addrs {
+		d.addPeerCh <- addr
+	}
+
+	return nil
 }
 // ClosestPeer returns the closest connected peer we have in relation to a
@@ -147,7 +171,7 @@ func (d *driver) ClosestPeer(addr swarm.Address) (swarm.Address, error) {
 }
 
 func (d *driver) Connected(ctx context.Context, addr swarm.Address) error {
-	return d.AddPeer(ctx, addr)
+	return d.AddPeers(ctx, addr)
 }
 
 func (_ *driver) Disconnected(swarm.Address) {
@@ -223,7 +247,7 @@ func (d *driver) backoff(tryAfter time.Time) {
 		case <-d.quit:
 			return
 		default:
-			if err := d.AddPeer(ctx, addr); err != nil {
+			if err := d.AddPeers(ctx, addr); err != nil {
 				var e *p2p.ConnectionBackoffError
 				if errors.As(err, &e) {
 					d.backoff(e.TryAfter())
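Because the full driver now funnels AddPeers through a buffered channel into a single manage goroutine, the call returns before any connect or broadcast work happens, and errors from that work no longer reach the caller: AddPeers always returns nil once its sends complete, and a full 64-slot buffer makes the send block. The test changes below adapt to this by polling instead of asserting immediately. If blocking on a full backlog during shutdown were a concern, a guarded send would be one hedged alternative (errTopologyClosed is a hypothetical sentinel, not part of this commit):

    select {
    case d.addPeerCh <- addr:
    case <-d.quit:
        return errTopologyClosed // hypothetical; signals the driver is closing
    }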
...
@@ -10,6 +10,7 @@ import (
 	"fmt"
 	"io/ioutil"
 	"testing"
+	"time"
 
 	"github.com/ethersphere/bee/pkg/addressbook"
 	"github.com/ethersphere/bee/pkg/bzz"
@@ -25,7 +26,7 @@ import (
 	ma "github.com/multiformats/go-multiaddr"
 )
 
-func TestAddPeer(t *testing.T) {
+func TestAddPeers(t *testing.T) {
 	logger := logging.New(ioutil.Discard, 0)
 
 	underlay, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/7070/p2p/16Uiu2HAkx8ULY8cTXhdVAcMmLcH9AsTKz6uBQ7DPLKRjMLgBVYkS")
 	if err != nil {
@@ -73,35 +74,12 @@ func TestAddPeer(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		err = fullDriver.AddPeer(context.Background(), overlay)
+		err = fullDriver.AddPeers(context.Background(), overlay)
 		if err != nil {
 			t.Fatalf("full conn driver returned err %s", err.Error())
 		}
 
-		if discovery.Broadcasts() != 0 {
-			t.Fatalf("broadcasts expected %v, got %v ", 0, discovery.Broadcasts())
-		}
-	})
-
-	t.Run("ERROR - peer not added", func(t *testing.T) {
-		discovery := mock.NewDiscovery()
-		statestore := mockstate.NewStateStore()
-		ab := addressbook.New(statestore)
-
-		p2p := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (*bzz.Address, error) {
-			t.Fatal("should not be called")
-			return nil, nil
-		}))
-
-		fullDriver := full.New(discovery, ab, p2p, logger, overlay)
-		defer fullDriver.Close()
-		err := fullDriver.AddPeer(context.Background(), overlay)
-		if !errors.Is(err, topology.ErrNotFound) {
-			t.Fatalf("full conn driver returned err %v", err)
-		}
-
-		if discovery.Broadcasts() != 0 {
-			t.Fatalf("broadcasts expected %v, got %v ", 0, discovery.Broadcasts())
-		}
+		expectBroadcastsEventually(t, discovery, 0)
 	})
 
 	t.Run("OK - connected peers - peer already connected", func(t *testing.T) {
@@ -129,14 +107,12 @@ func TestAddPeer(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		err = fullDriver.AddPeer(context.Background(), alreadyConnected)
+		err = fullDriver.AddPeers(context.Background(), alreadyConnected)
 		if err != nil {
 			t.Fatalf("full conn driver returned err %s", err.Error())
 		}
 
-		if discovery.Broadcasts() != 3 {
-			t.Fatalf("broadcasts expected %v, got %v ", 3, discovery.Broadcasts())
-		}
+		expectBroadcastsEventually(t, discovery, 3)
 
 		// check newly added node
 		if err := checkAddreseeRecords(discovery, alreadyConnected, connectedPeers[1:]); err != nil {
t.Fatal(err) t.Fatal(err)
} }
err = fullDriver.AddPeer(context.Background(), overlay) err = fullDriver.AddPeers(context.Background(), overlay)
if err != nil { if err != nil {
t.Fatalf("full conn driver returned err %s", err.Error()) t.Fatalf("full conn driver returned err %s", err.Error())
} }
if discovery.Broadcasts() != 4 { expectBroadcastsEventually(t, discovery, 4)
t.Fatalf("broadcasts expected %v, got %v ", 4, discovery.Broadcasts())
}
// check newly added node // check newly added node
if err := checkAddreseeRecords(discovery, overlay, connectedPeers); err != nil { if err := checkAddreseeRecords(discovery, overlay, connectedPeers); err != nil {
...@@ -195,6 +169,17 @@ func TestAddPeer(t *testing.T) { ...@@ -195,6 +169,17 @@ func TestAddPeer(t *testing.T) {
}) })
} }
+
+func expectBroadcastsEventually(t *testing.T, discovery *mock.Discovery, expected int) {
+	for i := 0; i < 100; i++ {
+		time.Sleep(50 * time.Millisecond)
+		if discovery.Broadcasts() == expected {
+			return
+		}
+	}
+
+	t.Fatalf("broadcasts expected %v, got %v ", expected, discovery.Broadcasts())
+}
 
 // TestSyncPeer tests that SyncPeer method returns closest connected peer to a given chunk.
 func TestClosestPeer(t *testing.T) {
 	logger := logging.New(ioutil.Discard, 0)
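expectBroadcastsEventually polls for up to five seconds (100 iterations at 50ms), a direct consequence of AddPeers becoming asynchronous: broadcasts now happen on the driver's manage goroutine some time after AddPeers returns. One caveat for the expected-zero case: the helper returns as soon as the count matches, so it can pass after the first 50ms tick even if a broadcast would have arrived later; it is a best-effort negative check rather than a strict one.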
...
@@ -16,14 +16,14 @@ type mock struct {
 	peers           []swarm.Address
 	closestPeer     swarm.Address
 	closestPeerErr  error
-	addPeerErr      error
+	addPeersErr     error
 	marshalJSONFunc func() ([]byte, error)
 	mtx             sync.Mutex
 }
 
-func WithAddPeerErr(err error) Option {
+func WithAddPeersErr(err error) Option {
 	return optionFunc(func(d *mock) {
-		d.addPeerErr = err
+		d.addPeersErr = err
 	})
 }
@@ -53,18 +53,21 @@ func NewTopologyDriver(opts ...Option) topology.Driver {
 	return d
 }
 
-func (d *mock) AddPeer(_ context.Context, addr swarm.Address) error {
-	if d.addPeerErr != nil {
-		return d.addPeerErr
-	}
-
-	d.mtx.Lock()
-	d.peers = append(d.peers, addr)
-	d.mtx.Unlock()
-
-	return nil
-}
+func (d *mock) AddPeers(_ context.Context, addrs ...swarm.Address) error {
+	if d.addPeersErr != nil {
+		return d.addPeersErr
+	}
+
+	for _, addr := range addrs {
+		d.mtx.Lock()
+		d.peers = append(d.peers, addr)
+		d.mtx.Unlock()
+	}
+
+	return nil
+}
 
 func (d *mock) Connected(ctx context.Context, addr swarm.Address) error {
-	return d.AddPeer(ctx, addr)
+	return d.AddPeers(ctx, addr)
 }
 
 func (d *mock) Disconnected(swarm.Address) {
...
@@ -33,8 +33,8 @@ type Notifier interface {
 }
 
 type PeerAdder interface {
-	// AddPeer is called when a peer is added to the topology backlog
-	AddPeer(ctx context.Context, addr swarm.Address) error
+	// AddPeers is called when peers are added to the topology backlog
+	AddPeers(ctx context.Context, addr ...swarm.Address) error
 }
 
 type Connecter interface {
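With the PeerAdder interface now variadic, every implementation has to adopt the new signature, as the kademlia, full-driver, and mock changes above do. A minimal conforming stub, for illustration only:

    type noopPeerAdder struct{}

    // AddPeers satisfies topology.PeerAdder; it accepts and drops all peers.
    func (noopPeerAdder) AddPeers(_ context.Context, _ ...swarm.Address) error {
        return nil
    }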
...