Commit c85e55ad authored by Petar Radovic, committed by GitHub

Clean addressbook after failed connection attempts (#309)

* Forget about non connectable peers in kademlia
parent 3c0c02e2
......@@ -19,6 +19,7 @@ var _ Interface = (*store)(nil)
// Interface is the complete addressbook API. It combines the embedded
// GetPutter and Remover interfaces with methods that enumerate what is
// stored. *store is asserted to implement it.
type Interface interface {
GetPutter
Remover
// Overlays lists overlay addresses; presumably one per stored entry —
// confirm against the implementation.
Overlays() ([]swarm.Address, error)
// Addresses lists bzz addresses; presumably every stored one — confirm
// against the implementation.
Addresses() ([]bzz.Address, error)
}
......@@ -36,6 +37,10 @@ type Putter interface {
Put(overlay swarm.Address, addr bzz.Address) (err error)
}
// Remover deletes entries from the addressbook.
type Remover interface {
// Remove deletes the entry kept for the given overlay address.
Remove(overlay swarm.Address) error
}
// store is the addressbook implementation. Its methods read and write
// entries through the wrapped storage.StateStorer.
type store struct {
store storage.StateStorer // backing state store used by Put, Remove and Overlays
}
......@@ -61,6 +66,10 @@ func (s *store) Put(overlay swarm.Address, addr bzz.Address) (err error) {
return s.store.Put(key, &addr)
}
// Remove deletes the addressbook entry recorded for the given overlay.
// The key is keyPrefix followed by the overlay's string form, and the
// result of the state store's Delete call is returned unchanged.
func (s *store) Remove(overlay swarm.Address) error {
	key := keyPrefix + overlay.String()
	return s.store.Delete(key)
}
func (s *store) Overlays() (overlays []swarm.Address, err error) {
err = s.store.Iterate(keyPrefix, func(key, _ []byte) (stop bool, err error) {
k := string(key)
......
......@@ -23,8 +23,9 @@ import (
)
const (
maxBins = 16
nnLowWatermark = 2 // the number of peers in consecutive deepest bins that constitute as nearest neighbours
maxBins = 16
nnLowWatermark = 2 // the number of peers in consecutive deepest bins that constitute as nearest neighbours
maxConnAttempts = 3 // failed connect calls tolerated for a given peer; once its failure count exceeds this value the peer is considered non-connectable and is removed from the addressbook
)
var (
......@@ -56,13 +57,18 @@ type Kad struct {
depth uint8 // current neighborhood depth
depthMu sync.RWMutex // protect depth changes
manageC chan struct{} // trigger the manage forever loop to connect to new peers
waitNext map[string]time.Time // sanction connections to a peer, key is overlay string and value is time to next retry
waitNext map[string]retryInfo // sanction connections to a peer, key is overlay string and value is a retry information
waitNextMu sync.Mutex // synchronize map
logger logging.Logger // logger
quit chan struct{} // quit channel
done chan struct{} // signal that `manage` has quit
}
// retryInfo is the per-peer value kept in Kad.waitNext. It is written after a
// failed connect call and after a disconnect.
type retryInfo struct {
tryAfter time.Time // manage skips the peer while time.Now() is before this instant
failedAttempts int // failed connect calls counted so far; stored as 0 after a disconnect or a connection backoff error
}
// New returns a new Kademlia.
func New(o Options) *Kad {
if o.SaturationFunc == nil {
......@@ -78,7 +84,7 @@ func New(o Options) *Kad {
connectedPeers: pslice.New(maxBins),
knownPeers: pslice.New(maxBins),
manageC: make(chan struct{}, 1),
waitNext: make(map[string]time.Time),
waitNext: make(map[string]retryInfo),
logger: o.Logger,
quit: make(chan struct{}),
done: make(chan struct{}),
......@@ -115,7 +121,7 @@ func (k *Kad) manage() {
}
k.waitNextMu.Lock()
if next, ok := k.waitNext[peer.String()]; ok && time.Now().Before(next) {
if next, ok := k.waitNext[peer.String()]; ok && time.Now().Before(next.tryAfter) {
k.waitNextMu.Unlock()
return false, false, nil
}
......@@ -248,12 +254,33 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr,
if errors.Is(err, p2p.ErrAlreadyConnected) {
return nil
}
k.logger.Debugf("error connecting to peer %s: %v", peer, err)
retryTime := time.Now().Add(timeToRetry)
var e *p2p.ConnectionBackoffError
k.waitNextMu.Lock()
k.waitNext[peer.String()] = time.Now().Add(timeToRetry)
k.waitNextMu.Unlock()
failedAttempts := 0
if errors.As(err, &e) {
retryTime = e.TryAfter()
} else {
info, ok := k.waitNext[peer.String()]
if ok {
failedAttempts = info.failedAttempts
}
// TODO: somehow keep track of attempts and at some point forget about the peer
failedAttempts++
}
if failedAttempts > maxConnAttempts {
delete(k.waitNext, peer.String())
if err := k.addressBook.Remove(peer); err != nil {
k.logger.Debugf("could not remove peer from addressbook: %s", peer.String())
}
} else {
k.waitNext[peer.String()] = retryInfo{tryAfter: retryTime, failedAttempts: failedAttempts}
}
k.waitNextMu.Unlock()
return err
}
......@@ -335,7 +362,7 @@ func (k *Kad) Disconnected(addr swarm.Address) {
k.connectedPeers.Remove(addr, po)
k.waitNextMu.Lock()
k.waitNext[addr.String()] = time.Now().Add(timeToRetry)
k.waitNext[addr.String()] = retryInfo{tryAfter: time.Now().Add(timeToRetry), failedAttempts: 0}
k.waitNextMu.Unlock()
k.depthMu.Lock()
......
......@@ -35,6 +35,8 @@ func init() {
rand.Seed(time.Now().UnixNano())
}
// nonConnectableAddress is the underlay for which the p2p mock's connect
// function returns an error instead of a connection. The error returned by
// ma.NewMultiaddr is discarded.
var nonConnectableAddress, _ = ma.NewMultiaddr(underlayBase + "16Uiu2HAkx8ULY8cTXhdVAcMmLcH9AsTKz6uBQ7DPLKRjMLgBVYkA")
// TestNeighborhoodDepth tests that the kademlia depth changes correctly
// according to the change to known peers slice. This inadvertently tests
// the functionality in `manage()` method, however this is not the main aim of the
......@@ -45,7 +47,7 @@ func TestNeighborhoodDepth(t *testing.T) {
var (
conns int32 // how many connect calls were made to the p2p mock
base, kad, ab, _, signer = newTestKademlia(&conns, nil)
base, kad, ab, _, signer = newTestKademlia(&conns, nil, nil)
peers []swarm.Address
binEight []swarm.Address
)
......@@ -73,7 +75,7 @@ func TestNeighborhoodDepth(t *testing.T) {
add(t, signer, kad, ab, peers, 0, 2)
// wait for 4 connections
waitConns(t, &conns, 4)
waitCounter(t, &conns, 4)
// depth 2 (shallowest empty bin)
kDepth(t, kad, 2)
......@@ -117,7 +119,7 @@ func TestNeighborhoodDepth(t *testing.T) {
addOne(t, signer, kad, ab, addr)
}
waitConns(t, &conns, 15)
waitCounter(t, &conns, 15)
kDepth(t, kad, 13)
// add one at 14 - depth should be now 14
......@@ -162,7 +164,7 @@ func TestManage(t *testing.T) {
saturationFunc = func(bin, depth uint8, peers *pslice.PSlice) bool {
return saturationVal
}
base, kad, ab, _, signer = newTestKademlia(&conns, saturationFunc)
base, kad, ab, _, signer = newTestKademlia(&conns, nil, saturationFunc)
)
// first, saturationFunc returns always false, this means that the bin is not saturated,
// hence we expect that every peer we add to kademlia will be connected to
......@@ -171,7 +173,7 @@ func TestManage(t *testing.T) {
addOne(t, signer, kad, ab, addr)
}
waitConns(t, &conns, 50)
waitCounter(t, &conns, 50)
saturationVal = true
// now since the bin is "saturated", no new connections should be made
......@@ -180,7 +182,7 @@ func TestManage(t *testing.T) {
addOne(t, signer, kad, ab, addr)
}
waitConns(t, &conns, 0)
waitCounter(t, &conns, 0)
// check other bins just for fun
for i := 0; i < 16; i++ {
......@@ -189,7 +191,7 @@ func TestManage(t *testing.T) {
addOne(t, signer, kad, ab, addr)
}
}
waitConns(t, &conns, 0)
waitCounter(t, &conns, 0)
}
// TestBinSaturation tests the builtin binSaturated function.
......@@ -201,7 +203,7 @@ func TestManage(t *testing.T) {
func TestBinSaturation(t *testing.T) {
var (
conns int32 // how many connect calls were made to the p2p mock
base, kad, ab, _, signer = newTestKademlia(&conns, nil)
base, kad, ab, _, signer = newTestKademlia(&conns, nil, nil)
peers []swarm.Address
)
......@@ -215,7 +217,7 @@ func TestBinSaturation(t *testing.T) {
peers = append(peers, addr)
}
}
waitConns(t, &conns, 10)
waitCounter(t, &conns, 10)
// add one more peer in each bin shallower than depth and
// expect no connections due to saturation. if we add a peer within
......@@ -224,30 +226,30 @@ func TestBinSaturation(t *testing.T) {
addr := test.RandomAddressAt(base, i)
addOne(t, signer, kad, ab, addr)
}
waitConns(t, &conns, 0)
waitCounter(t, &conns, 0)
// add one peer in a bin higher (unsaturated) and expect one connection
addr := test.RandomAddressAt(base, 6)
addOne(t, signer, kad, ab, addr)
waitConns(t, &conns, 1)
waitCounter(t, &conns, 1)
// again, one bin higher
addr = test.RandomAddressAt(base, 7)
addOne(t, signer, kad, ab, addr)
waitConns(t, &conns, 1)
waitCounter(t, &conns, 1)
// this is in order to hit the `if size < 2` in the saturation func
removeOne(kad, peers[2])
waitConns(t, &conns, 1)
waitCounter(t, &conns, 1)
}
// TestNotifierHooks tests that the Connected/Disconnected hooks
// result in the correct behavior once called.
func TestNotifierHooks(t *testing.T) {
var (
base, kad, ab, _, signer = newTestKademlia(nil, nil)
base, kad, ab, _, signer = newTestKademlia(nil, nil, nil)
peer = test.RandomAddressAt(base, 3)
addr = test.RandomAddressAt(peer, 4) // address which is closer to peer
)
......@@ -277,7 +279,7 @@ func TestNotifierHooks(t *testing.T) {
func TestDiscoveryHooks(t *testing.T) {
var (
conns int32
_, kad, ab, disc, signer = newTestKademlia(&conns, nil)
_, kad, ab, disc, signer = newTestKademlia(&conns, nil, nil)
p1, p2, p3 = test.RandomAddress(), test.RandomAddress(), test.RandomAddress()
)
......@@ -312,14 +314,14 @@ func TestBackoff(t *testing.T) {
var (
conns int32 // how many connect calls were made to the p2p mock
base, kad, ab, _, signer = newTestKademlia(&conns, nil)
base, kad, ab, _, signer = newTestKademlia(&conns, nil, nil)
)
// add one peer, wait for connection
addr := test.RandomAddressAt(base, 1)
addOne(t, signer, kad, ab, addr)
waitConns(t, &conns, 1)
waitCounter(t, &conns, 1)
// remove that peer
removeOne(kad, addr)
......@@ -329,14 +331,100 @@ func TestBackoff(t *testing.T) {
addr = test.RandomAddressAt(base, 1)
addOne(t, signer, kad, ab, addr)
waitConns(t, &conns, 1)
waitCounter(t, &conns, 1)
// wait for another 400ms, add another, expect 2 connections
time.Sleep(400 * time.Millisecond)
addr = test.RandomAddressAt(base, 1)
addOne(t, signer, kad, ab, addr)
waitConns(t, &conns, 2)
waitCounter(t, &conns, 2)
}
// TestAddressBookPrune tests that a peer which keeps failing to connect is
// eventually removed from the addressbook: its entry must still be present
// after the first three failed connect attempts and be gone after the fourth.
func TestAddressBookPrune(t *testing.T) {
// shorten the retry interval for the duration of the test so that retries
// become due quickly; the deferred func restores the previous value.
// note: the closure parameter t shadows the *testing.T inside the closure.
defer func(t time.Duration) {
*kademlia.TimeToRetry = t
}(*kademlia.TimeToRetry)
*kademlia.TimeToRetry = 50 * time.Millisecond
var (
conns, failedConns int32 // successful and failed connect calls made to the p2p mock
base, kad, ab, _, signer = newTestKademlia(&conns, &failedConns, nil)
)
// build a peer whose underlay is the non-connectable address and store it
// in the addressbook
nonConnPeer, err := bzz.NewAddress(signer, nonConnectableAddress, test.RandomAddressAt(base, 1), 0)
if err != nil {
t.Fatal(err)
}
if err := ab.Put(nonConnPeer.Overlay, *nonConnPeer); err != nil {
t.Fatal(err)
}
// first attempt: add the non-connectable peer and expect no successful
// connection and exactly one failed connect call
_ = kad.AddPeer(context.Background(), nonConnPeer.Overlay)
waitCounter(t, &conns, 0)
waitCounter(t, &failedConns, 1)
// connectable peers, added one per round below to trigger a retry
addr := test.RandomAddressAt(base, 1)
addr1 := test.RandomAddressAt(base, 1)
addr2 := test.RandomAddressAt(base, 1)
// one failure so far: the peer must still be in the addressbook
p, err := ab.Get(nonConnPeer.Overlay)
if err != nil {
t.Fatal(err)
}
if !nonConnPeer.Equal(&p) {
t.Fatalf("expected %+v, got %+v", nonConnPeer, p)
}
// let the shortened retry interval elapse
time.Sleep(50 * time.Millisecond)
// second attempt: adding a valid peer triggers the retry; expect one
// successful connection (the new peer) and one more failed connect call
addOne(t, signer, kad, ab, addr)
waitCounter(t, &conns, 1)
waitCounter(t, &failedConns, 1)
// two failures: the peer must still be in the addressbook
p, err = ab.Get(nonConnPeer.Overlay)
if err != nil {
t.Fatal(err)
}
if !nonConnPeer.Equal(&p) {
t.Fatalf("expected %+v, got %+v", nonConnPeer, p)
}
time.Sleep(50 * time.Millisecond)
// third attempt: same expectations as the previous round
addOne(t, signer, kad, ab, addr1)
waitCounter(t, &conns, 1)
waitCounter(t, &failedConns, 1)
// three failures: the peer must still be in the addressbook
p, err = ab.Get(nonConnPeer.Overlay)
if err != nil {
t.Fatal(err)
}
if !nonConnPeer.Equal(&p) {
t.Fatalf("expected %+v, got %+v", nonConnPeer, p)
}
time.Sleep(50 * time.Millisecond)
// fourth attempt: one more successful connection and one more failed
// connect call, after which the peer is expected to have been pruned
addOne(t, signer, kad, ab, addr2)
waitCounter(t, &conns, 1)
waitCounter(t, &failedConns, 1)
// the lookup must now fail and must not return the pruned peer
p, err = ab.Get(nonConnPeer.Overlay)
if err == nil {
t.Fatal("expected not found error")
}
if nonConnPeer.Equal(&p) {
t.Fatal("peer found in addressbook")
}
}
// TestClosestPeer tests that ClosestPeer method returns closest connected peer to a given address.
......@@ -359,14 +447,14 @@ func TestClosestPeer(t *testing.T) {
ab := addressbook.New(mockstate.NewStateStore())
var conns int32
kad := kademlia.New(kademlia.Options{Base: base, Discovery: disc, AddressBook: ab, P2P: p2pMock(&conns), Logger: logger})
kad := kademlia.New(kademlia.Options{Base: base, Discovery: disc, AddressBook: ab, P2P: p2pMock(&conns, nil), Logger: logger})
defer kad.Close()
pk, _ := crypto.GenerateSecp256k1Key()
for _, v := range connectedPeers {
addOne(t, beeCrypto.NewDefaultSigner(pk), kad, ab, v.Address)
}
waitConns(t, &conns, 3)
waitCounter(t, &conns, 3)
for _, tc := range []struct {
chunkAddress swarm.Address // chunk address to test
......@@ -419,7 +507,7 @@ func TestClosestPeer(t *testing.T) {
func TestMarshal(t *testing.T) {
var (
_, kad, ab, _, signer = newTestKademlia(nil, nil)
_, kad, ab, _, signer = newTestKademlia(nil, nil, nil)
)
a := test.RandomAddress()
addOne(t, signer, kad, ab, a)
......@@ -429,10 +517,10 @@ func TestMarshal(t *testing.T) {
}
}
func newTestKademlia(connCounter *int32, f func(bin, depth uint8, peers *pslice.PSlice) bool) (swarm.Address, *kademlia.Kad, addressbook.Interface, *mock.Discovery, beeCrypto.Signer) {
func newTestKademlia(connCounter, failedConnCounter *int32, f func(bin, depth uint8, peers *pslice.PSlice) bool) (swarm.Address, *kademlia.Kad, addressbook.Interface, *mock.Discovery, beeCrypto.Signer) {
var (
base = test.RandomAddress() // base address
p2p = p2pMock(connCounter)
p2p = p2pMock(connCounter, failedConnCounter)
logger = logging.New(ioutil.Discard, 0) // logger
ab = addressbook.New(mockstate.NewStateStore()) // address book
disc = mock.NewDiscovery() // mock discovery
......@@ -443,8 +531,12 @@ func newTestKademlia(connCounter *int32, f func(bin, depth uint8, peers *pslice.
return base, kad, ab, disc, beeCrypto.NewDefaultSigner(pk)
}
func p2pMock(counter *int32) p2p.Service {
func p2pMock(counter, failedCounter *int32) p2p.Service {
p2ps := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (*bzz.Address, error) {
if addr.Equal(nonConnectableAddress) {
_ = atomic.AddInt32(failedCounter, 1)
return nil, errors.New("non reachable node")
}
if counter != nil {
_ = atomic.AddInt32(counter, 1)
}
......@@ -487,10 +579,6 @@ func addOne(t *testing.T, signer beeCrypto.Signer, k *kademlia.Kad, ab addressbo
if err != nil {
t.Fatal(err)
}
if err := ab.Put(peer, *bzzAddr); err != nil {
t.Fatal(err)
}
if err := ab.Put(peer, *bzzAddr); err != nil {
t.Fatal(err)
}
......@@ -519,29 +607,28 @@ func kDepth(t *testing.T, k *kademlia.Kad, d int) {
func waitConn(t *testing.T, conns *int32) {
t.Helper()
waitConns(t, conns, 1)
waitCounter(t, conns, 1)
}
// waits for some connections for some time. resets the pointer value
// if the correct number of connections have been reached.
func waitConns(t *testing.T, conns *int32, exp int32) {
// waits for the counter for some time. resets the pointer value
// if the correct number has been reached.
func waitCounter(t *testing.T, conns *int32, exp int32) {
t.Helper()
var got int32
if exp == 0 {
// sleep for some time before checking for a 0.
// this gives some time for unwanted connections to be
// established.
// this gives some time for unwanted counter increments to happen
time.Sleep(50 * time.Millisecond)
}
for i := 0; i < 50; i++ {
if atomic.LoadInt32(conns) == exp {
if got = atomic.LoadInt32(conns); got == exp {
atomic.StoreInt32(conns, 0)
return
}
time.Sleep(50 * time.Millisecond)
}
t.Fatalf("timed out waiting for connections to be established. got %d want %d", got, exp)
t.Fatalf("timed out waiting for counter to reach expected value. got %d want %d", got, exp)
}
// wait for discovery BroadcastPeers to happen
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment