Commit c5ad0c0f authored by Petar Radovic's avatar Petar Radovic Committed by GitHub

Fix loop break bug in discover (#640)

Break at 3 bootnodes
parent f8054e91
Pipeline #449 failed with stages
......@@ -239,26 +239,30 @@ func (k *Kad) Start(ctx context.Context) error {
func (k *Kad) connectBootnodes(ctx context.Context) {
var count int
for _, addr := range k.bootnodes {
if count == 3 {
if count >= 3 {
return
}
if _, err := p2p.Discover(ctx, addr, func(addr ma.Multiaddr) (stop bool, err error) {
k.logger.Tracef("connecting to bootnode %s", addr)
_, err = k.p2p.ConnectNotify(ctx, addr)
bzzAddress, err := k.p2p.Connect(ctx, addr)
if err != nil {
if !errors.Is(err, p2p.ErrAlreadyConnected) {
k.logger.Debugf("connect fail %s: %v", addr, err)
k.logger.Warningf("connect to bootnode %s", addr)
return false, err
}
return false, nil
}
if err := k.connected(ctx, bzzAddress.Overlay); err != nil {
return false, err
}
k.logger.Tracef("connected to bootnode %s", addr)
count++
// connect to max 3 bootnodes
return count == 3, nil
return count >= 3, nil
}); err != nil {
k.logger.Debugf("discover fail %s: %v", addr, err)
k.logger.Warningf("discover to bootnode %s", addr)
......@@ -437,6 +441,19 @@ func (k *Kad) AddPeers(ctx context.Context, addrs ...swarm.Address) error {
// Connected is called when a peer has dialed in.
func (k *Kad) Connected(ctx context.Context, addr swarm.Address) error {
if err := k.connected(ctx, addr); err != nil {
return err
}
select {
case k.manageC <- struct{}{}:
default:
}
return nil
}
func (k *Kad) connected(ctx context.Context, addr swarm.Address) error {
if err := k.announce(ctx, addr); err != nil {
return err
}
......@@ -455,11 +472,6 @@ func (k *Kad) Connected(ctx context.Context, addr swarm.Address) error {
k.notifyPeerSig()
select {
case k.manageC <- struct{}{}:
default:
}
return nil
}
......@@ -475,6 +487,7 @@ func (k *Kad) Disconnected(addr swarm.Address) {
k.depthMu.Lock()
k.depth = recalcDepth(k.connectedPeers)
k.depthMu.Unlock()
select {
case k.manageC <- struct{}{}:
default:
......
......@@ -479,7 +479,7 @@ func TestClosestPeer(t *testing.T) {
ab := addressbook.New(mockstate.NewStateStore())
var conns int32
kad := kademlia.New(base, ab, disc, p2pMock(ab, &conns, nil), logger, kademlia.Options{})
kad := kademlia.New(base, ab, disc, p2pMock(ab, nil, &conns, nil), logger, kademlia.Options{})
if err := kad.Start(context.Background()); err != nil {
t.Fatal(err)
}
......@@ -674,8 +674,7 @@ func TestMarshal(t *testing.T) {
func TestStart(t *testing.T) {
var bootnodes []ma.Multiaddr
for i := 0; i < 5; i++ {
for i := 0; i < 10; i++ {
multiaddr, err := ma.NewMultiaddr(underlayBase + test.RandomAddress().String())
if err != nil {
t.Fatal(err)
......@@ -727,20 +726,22 @@ func TestStart(t *testing.T) {
}
func newTestKademlia(connCounter, failedConnCounter *int32, f func(bin uint8, peers, connected *pslice.PSlice) bool, bootnodes []ma.Multiaddr) (swarm.Address, *kademlia.Kad, addressbook.Interface, *mock.Discovery, beeCrypto.Signer) {
pk, _ := crypto.GenerateSecp256k1Key()
var (
signer = beeCrypto.NewDefaultSigner(pk)
base = test.RandomAddress() // base address
ab = addressbook.New(mockstate.NewStateStore()) // address book
p2p = p2pMock(ab, connCounter, failedConnCounter)
p2p = p2pMock(ab, signer, connCounter, failedConnCounter)
logger = logging.New(ioutil.Discard, 0) // logger
disc = mock.NewDiscovery()
kad = kademlia.New(base, ab, disc, p2p, logger, kademlia.Options{SaturationFunc: f, Bootnodes: bootnodes}) // kademlia instance
)
pk, _ := crypto.GenerateSecp256k1Key()
return base, kad, ab, disc, beeCrypto.NewDefaultSigner(pk)
return base, kad, ab, disc, signer
}
func p2pMock(ab addressbook.Interface, counter, failedCounter *int32) p2p.Service {
func p2pMock(ab addressbook.Interface, signer beeCrypto.Signer, counter, failedCounter *int32) p2p.Service {
p2ps := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (*bzz.Address, error) {
if addr.Equal(nonConnectableAddress) {
_ = atomic.AddInt32(failedCounter, 1)
......@@ -761,7 +762,17 @@ func p2pMock(ab addressbook.Interface, counter, failedCounter *int32) p2p.Servic
}
}
return nil, nil
address := test.RandomAddress()
bzzAddr, err := bzz.NewAddress(signer, addr, address, 0)
if err != nil {
return nil, err
}
if err := ab.Put(address, *bzzAddr); err != nil {
return nil, err
}
return bzzAddr, nil
}))
return p2ps
......
......@@ -14,7 +14,7 @@ import (
madns "github.com/multiformats/go-multiaddr-dns"
)
func Discover(ctx context.Context, addr ma.Multiaddr, f func(ma.Multiaddr) (stop bool, err error)) (stopped bool, err error) {
func Discover(ctx context.Context, addr ma.Multiaddr, f func(ma.Multiaddr) (bool, error)) (bool, error) {
if comp, _ := ma.SplitFirst(addr); comp.Protocol().Name != "dnsaddr" {
return f(addr)
}
......@@ -32,12 +32,13 @@ func Discover(ctx context.Context, addr ma.Multiaddr, f func(ma.Multiaddr) (stop
addrs[i], addrs[j] = addrs[j], addrs[i]
})
for _, addr := range addrs {
stopped, err = Discover(ctx, addr, f)
stopped, err := Discover(ctx, addr, f)
if err != nil {
return false, fmt.Errorf("discover %s: %w", addr, err)
}
if stopped {
break
return true, nil
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment