Commit 32b82033 authored by acud's avatar acud Committed by GitHub

kademlia: handle overlay mismatch (#356)

* handle overlay mismatch
Co-authored-by: default avatarPetar Radovic <petar.radovic@gmail.com>
parent 3e7a11bd
...@@ -29,6 +29,7 @@ const ( ...@@ -29,6 +29,7 @@ const (
var ( var (
errMissingAddressBookEntry = errors.New("addressbook underlay entry not found") errMissingAddressBookEntry = errors.New("addressbook underlay entry not found")
errOverlayMismatch = errors.New("overlay mismatch")
timeToRetry = 60 * time.Second timeToRetry = 60 * time.Second
shortRetry = 30 * time.Second shortRetry = 30 * time.Second
) )
...@@ -156,6 +157,12 @@ func (k *Kad) manage() { ...@@ -156,6 +157,12 @@ func (k *Kad) manage() {
err = k.connect(ctx, peer, bzzAddr.Underlay, po) err = k.connect(ctx, peer, bzzAddr.Underlay, po)
if err != nil { if err != nil {
if errors.Is(err, errOverlayMismatch) {
k.knownPeers.Remove(peer, po)
if err := k.addressBook.Remove(peer); err != nil {
k.logger.Debugf("could not remove peer from addressbook: %s", peer.String())
}
}
k.logger.Debugf("error connecting to peer from kademlia %s: %v", bzzAddr.String(), err) k.logger.Debugf("error connecting to peer from kademlia %s: %v", bzzAddr.String(), err)
k.logger.Warningf("connecting to peer %s: %v", bzzAddr.ShortString(), err) k.logger.Warningf("connecting to peer %s: %v", bzzAddr.ShortString(), err)
// continue to next // continue to next
...@@ -260,7 +267,7 @@ func (k *Kad) recalcDepth() uint8 { ...@@ -260,7 +267,7 @@ func (k *Kad) recalcDepth() uint8 {
func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr, po uint8) error { func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr, po uint8) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second) ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel() defer cancel()
_, err := k.p2p.Connect(ctx, ma) i, err := k.p2p.Connect(ctx, ma)
if err != nil { if err != nil {
if errors.Is(err, p2p.ErrAlreadyConnected) { if errors.Is(err, p2p.ErrAlreadyConnected) {
return nil return nil
...@@ -287,6 +294,7 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr, ...@@ -287,6 +294,7 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr,
if err := k.addressBook.Remove(peer); err != nil { if err := k.addressBook.Remove(peer); err != nil {
k.logger.Debugf("could not remove peer from addressbook: %s", peer.String()) k.logger.Debugf("could not remove peer from addressbook: %s", peer.String())
} }
k.logger.Debugf("kademlia pruned peer from address book %s", peer.String())
} else { } else {
k.waitNext[peer.String()] = retryInfo{tryAfter: retryTime, failedAttempts: failedAttempts} k.waitNext[peer.String()] = retryInfo{tryAfter: retryTime, failedAttempts: failedAttempts}
} }
...@@ -295,6 +303,12 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr, ...@@ -295,6 +303,12 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr,
return err return err
} }
if !i.Overlay.Equal(peer) {
_ = k.p2p.Disconnect(peer)
_ = k.p2p.Disconnect(i.Overlay)
return errOverlayMismatch
}
return k.announce(ctx, peer) return k.announce(ctx, peer)
} }
......
...@@ -45,8 +45,7 @@ var nonConnectableAddress, _ = ma.NewMultiaddr(underlayBase + "16Uiu2HAkx8ULY8cT ...@@ -45,8 +45,7 @@ var nonConnectableAddress, _ = ma.NewMultiaddr(underlayBase + "16Uiu2HAkx8ULY8cT
// tested in TestManage below. // tested in TestManage below.
func TestNeighborhoodDepth(t *testing.T) { func TestNeighborhoodDepth(t *testing.T) {
var ( var (
conns int32 // how many connect calls were made to the p2p mock conns int32 // how many connect calls were made to the p2p mock
base, kad, ab, _, signer = newTestKademlia(&conns, nil, nil) base, kad, ab, _, signer = newTestKademlia(&conns, nil, nil)
peers []swarm.Address peers []swarm.Address
binEight []swarm.Address binEight []swarm.Address
...@@ -443,7 +442,7 @@ func TestClosestPeer(t *testing.T) { ...@@ -443,7 +442,7 @@ func TestClosestPeer(t *testing.T) {
ab := addressbook.New(mockstate.NewStateStore()) ab := addressbook.New(mockstate.NewStateStore())
var conns int32 var conns int32
kad := kademlia.New(kademlia.Options{Base: base, Discovery: disc, AddressBook: ab, P2P: p2pMock(&conns, nil), Logger: logger}) kad := kademlia.New(kademlia.Options{Base: base, Discovery: disc, AddressBook: ab, P2P: p2pMock(ab, &conns, nil), Logger: logger})
defer kad.Close() defer kad.Close()
pk, _ := crypto.GenerateSecp256k1Key() pk, _ := crypto.GenerateSecp256k1Key()
...@@ -613,10 +612,10 @@ func TestMarshal(t *testing.T) { ...@@ -613,10 +612,10 @@ func TestMarshal(t *testing.T) {
func newTestKademlia(connCounter, failedConnCounter *int32, f func(bin, depth uint8, peers *pslice.PSlice) bool) (swarm.Address, *kademlia.Kad, addressbook.Interface, *mock.Discovery, beeCrypto.Signer) { func newTestKademlia(connCounter, failedConnCounter *int32, f func(bin, depth uint8, peers *pslice.PSlice) bool) (swarm.Address, *kademlia.Kad, addressbook.Interface, *mock.Discovery, beeCrypto.Signer) {
var ( var (
base = test.RandomAddress() // base address base = test.RandomAddress() // base address
p2p = p2pMock(connCounter, failedConnCounter) ab = addressbook.New(mockstate.NewStateStore()) // address book
p2p = p2pMock(ab, connCounter, failedConnCounter)
logger = logging.New(ioutil.Discard, 0) // logger logger = logging.New(ioutil.Discard, 0) // logger
ab = addressbook.New(mockstate.NewStateStore()) // address book
disc = mock.NewDiscovery() // mock discovery disc = mock.NewDiscovery() // mock discovery
kad = kademlia.New(kademlia.Options{Base: base, Discovery: disc, AddressBook: ab, P2P: p2p, Logger: logger, SaturationFunc: f}) // kademlia instance kad = kademlia.New(kademlia.Options{Base: base, Discovery: disc, AddressBook: ab, P2P: p2p, Logger: logger, SaturationFunc: f}) // kademlia instance
) )
...@@ -625,7 +624,7 @@ func newTestKademlia(connCounter, failedConnCounter *int32, f func(bin, depth ui ...@@ -625,7 +624,7 @@ func newTestKademlia(connCounter, failedConnCounter *int32, f func(bin, depth ui
return base, kad, ab, disc, beeCrypto.NewDefaultSigner(pk) return base, kad, ab, disc, beeCrypto.NewDefaultSigner(pk)
} }
func p2pMock(counter, failedCounter *int32) p2p.Service { func p2pMock(ab addressbook.Interface, counter, failedCounter *int32) p2p.Service {
p2ps := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (*bzz.Address, error) { p2ps := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (*bzz.Address, error) {
if addr.Equal(nonConnectableAddress) { if addr.Equal(nonConnectableAddress) {
_ = atomic.AddInt32(failedCounter, 1) _ = atomic.AddInt32(failedCounter, 1)
...@@ -634,6 +633,18 @@ func p2pMock(counter, failedCounter *int32) p2p.Service { ...@@ -634,6 +633,18 @@ func p2pMock(counter, failedCounter *int32) p2p.Service {
if counter != nil { if counter != nil {
_ = atomic.AddInt32(counter, 1) _ = atomic.AddInt32(counter, 1)
} }
addresses, err := ab.Addresses()
if err != nil {
return nil, errors.New("could not fetch addresbook addresses")
}
for _, a := range addresses {
if a.Underlay.Equal(addr) {
return &a, nil
}
}
return nil, nil return nil, nil
})) }))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment