Commit 8d7e9e9b authored by Esad Akar's avatar Esad Akar Committed by GitHub

feat: quick prune failed peers with no history (#1730)

parent d7970054
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
package metrics package metrics
import ( import (
"errors"
"fmt" "fmt"
"sync" "sync"
"time" "time"
...@@ -16,6 +17,8 @@ import ( ...@@ -16,6 +17,8 @@ import (
"github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
) )
var ErrPeerNotFound = errors.New("peer no found")
const ( const (
peerLastSeenTimestamp string = "peer-last-seen-timestamp" peerLastSeenTimestamp string = "peer-last-seen-timestamp"
peerTotalConnectionDuration string = "peer-total-connection-duration" peerTotalConnectionDuration string = "peer-total-connection-duration"
...@@ -208,42 +211,16 @@ func (c *Collector) Record(addr swarm.Address, rop ...RecordOp) error { ...@@ -208,42 +211,16 @@ func (c *Collector) Record(addr swarm.Address, rop ...RecordOp) error {
// error, it rather continues and all the execution errors are returned together // error, it rather continues and all the execution errors are returned together
// with the successful metrics snapshots. // with the successful metrics snapshots.
func (c *Collector) Snapshot(t time.Time, addresses ...swarm.Address) (map[string]*Snapshot, error) { func (c *Collector) Snapshot(t time.Time, addresses ...swarm.Address) (map[string]*Snapshot, error) {
c.mu.RLock()
defer c.mu.RUnlock()
var mErr error var mErr error
snapshot := make(map[string]*Snapshot) snapshot := make(map[string]*Snapshot)
take := func(addr string) { take := func(addr string) {
cs := c.counters[addr] peerSnapshot, err := c.peer(t, addr)
if cs == nil {
return
}
ls, err := cs.lastSeenTimestamp.Get()
if err != nil {
mErr = multierror.Append(mErr, fmt.Errorf("unable to take last seen snapshot for %q: %w", addr, err))
}
lastSeenTimestamp := int64(ls)
cn, err := cs.connTotalDuration.Get()
if err != nil { if err != nil {
mErr = multierror.Append(mErr, fmt.Errorf("unable to take connection duration snapshot for %q: %w", addr, err)) mErr = multierror.Append(err)
} } else {
connTotalDuration := time.Duration(cn) snapshot[addr] = peerSnapshot
sessionConnDuration := cs.sessionConnDuration
if cs.loggedIn {
sessionConnDuration = t.Sub(time.Unix(0, lastSeenTimestamp))
connTotalDuration += sessionConnDuration
}
snapshot[addr] = &Snapshot{
LastSeenTimestamp: lastSeenTimestamp,
SessionConnectionRetry: cs.sessionConnRetry,
ConnectionTotalDuration: connTotalDuration,
SessionConnectionDuration: sessionConnDuration,
SessionConnectionDirection: cs.sessionConnDirection,
} }
} }
...@@ -251,14 +228,58 @@ func (c *Collector) Snapshot(t time.Time, addresses ...swarm.Address) (map[strin ...@@ -251,14 +228,58 @@ func (c *Collector) Snapshot(t time.Time, addresses ...swarm.Address) (map[strin
take(addr.String()) take(addr.String())
} }
if len(addresses) == 0 { if len(addresses) == 0 {
c.mu.RLock()
for addr := range c.counters { for addr := range c.counters {
take(addr) take(addr)
} }
c.mu.RUnlock()
} }
return snapshot, mErr return snapshot, mErr
} }
// Peer returns the metrics snapshot of a single peer taken at time t.
// It returns ErrPeerNotFound if no counters are recorded for the address.
func (c *Collector) Peer(t time.Time, addr swarm.Address) (*Snapshot, error) {
	return c.peer(t, addr.String())
}
// peer builds a point-in-time metrics snapshot for the peer keyed by addr
// (the string form of its overlay address), holding the collector's read
// lock for the duration.
//
// It returns ErrPeerNotFound when no counters exist for addr. Failures
// reading individual counters are accumulated into a multierror; in that
// case a Snapshot assembled from the values that did read successfully is
// still returned alongside the non-nil error — callers that need strict
// all-or-nothing semantics must check the error.
func (c *Collector) peer(t time.Time, addr string) (*Snapshot, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	var mErr error

	cs := c.counters[addr]
	if cs == nil {
		return nil, ErrPeerNotFound
	}

	ls, err := cs.lastSeenTimestamp.Get()
	if err != nil {
		mErr = multierror.Append(mErr, fmt.Errorf("unable to take last seen snapshot for %q: %w", addr, err))
	}
	// NOTE(review): if the Get above failed, ls is the counter's zero/error
	// value and lastSeenTimestamp may be 0 — the snapshot is best-effort.
	lastSeenTimestamp := int64(ls)

	cn, err := cs.connTotalDuration.Get()
	if err != nil {
		mErr = multierror.Append(mErr, fmt.Errorf("unable to take connection duration snapshot for %q: %w", addr, err))
	}
	connTotalDuration := time.Duration(cn)

	sessionConnDuration := cs.sessionConnDuration
	if cs.loggedIn {
		// Peer session is still open: extend the session duration up to t,
		// measured from the last-seen timestamp (presumably the login
		// instant — confirm against the Record ops), and fold it into the
		// running total.
		sessionConnDuration = t.Sub(time.Unix(0, lastSeenTimestamp))
		connTotalDuration += sessionConnDuration
	}

	return &Snapshot{
		LastSeenTimestamp:          lastSeenTimestamp,
		ConnectionTotalDuration:    connTotalDuration,
		SessionConnectionRetry:     cs.sessionConnRetry,
		SessionConnectionDuration:  sessionConnDuration,
		SessionConnectionDirection: cs.sessionConnDirection,
	}, mErr
}
// Finalize logs out all ongoing peer sessions // Finalize logs out all ongoing peer sessions
// and flushes all in-memory metrics counters. // and flushes all in-memory metrics counters.
func (c *Collector) Finalize(t time.Time) error { func (c *Collector) Finalize(t time.Time) error {
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
package metrics_test package metrics_test
import ( import (
"errors"
"testing" "testing"
"time" "time"
...@@ -137,7 +138,7 @@ func TestPeerMetricsCollector(t *testing.T) { ...@@ -137,7 +138,7 @@ func TestPeerMetricsCollector(t *testing.T) {
t.Fatalf("Finalize(%s): unexpected error: %v", t3, err) t.Fatalf("Finalize(%s): unexpected error: %v", t3, err)
} }
sss, err := mc.Snapshot(t2, addr) sss, err := mc.Snapshot(t2, addr)
if err != nil { if !errors.Is(err, metrics.ErrPeerNotFound) {
t.Fatalf("Snapshot(%q, ...): unexpected error: %v", addr, err) t.Fatalf("Snapshot(%q, ...): unexpected error: %v", addr, err)
} }
if have, want := len(sss), 0; have != want { if have, want := len(sss), 0; have != want {
......
...@@ -689,7 +689,7 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr) ...@@ -689,7 +689,7 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
k.logger.Debugf("kademlia: unable to record session connection retry metrics for %q: %v", peer, err) k.logger.Debugf("kademlia: unable to record session connection retry metrics for %q: %v", peer, err)
} }
if failedAttempts > maxConnAttempts { if k.quickPrune(peer) || failedAttempts > maxConnAttempts {
delete(k.waitNext, peer.String()) delete(k.waitNext, peer.String())
if err := k.addressBook.Remove(peer); err != nil { if err := k.addressBook.Remove(peer); err != nil {
k.logger.Debugf("could not remove peer from addressbook: %s", peer.String()) k.logger.Debugf("could not remove peer from addressbook: %s", peer.String())
...@@ -700,6 +700,7 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr) ...@@ -700,6 +700,7 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
} }
k.waitNextMu.Unlock() k.waitNextMu.Unlock()
return err return err
} }
...@@ -712,6 +713,22 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr) ...@@ -712,6 +713,22 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
return k.Announce(ctx, peer) return k.Announce(ctx, peer)
} }
// quickPrune reports whether addr should be pruned immediately: there must
// be at least one other connected peer, and the address must either have no
// metrics record at all, or have never been seen while carrying at most one
// session connection retry (i.e. this is its first failed attempt).
func (k *Kad) quickPrune(addr swarm.Address) bool {
	if k.connectedPeers.Length() == 0 {
		return false
	}

	ss, _ := k.collector.Peer(time.Now(), addr)
	switch {
	case ss == nil:
		return true
	case ss.LastSeenTimestamp == 0 && ss.SessionConnectionRetry <= 1:
		return true
	default:
		return false
	}
}
// announce a newly connected peer to our connected peers, but also // announce a newly connected peer to our connected peers, but also
// notify the peer about our already connected peers // notify the peer about our already connected peers
func (k *Kad) Announce(ctx context.Context, peer swarm.Address) error { func (k *Kad) Announce(ctx context.Context, peer swarm.Address) error {
......
...@@ -735,20 +735,18 @@ func TestAddressBookPrune(t *testing.T) { ...@@ -735,20 +735,18 @@ func TestAddressBookPrune(t *testing.T) {
waitCounter(t, &conns, 0) waitCounter(t, &conns, 0)
waitCounter(t, &failedConns, 1) waitCounter(t, &failedConns, 1)
addr := test.RandomAddressAt(base, 1)
addr1 := test.RandomAddressAt(base, 1)
addr2 := test.RandomAddressAt(base, 1)
p, err := ab.Get(nonConnPeer.Overlay) p, err := ab.Get(nonConnPeer.Overlay)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if !nonConnPeer.Equal(p) { if !nonConnPeer.Equal(p) {
t.Fatalf("expected %+v, got %+v", nonConnPeer, p) t.Fatalf("expected %+v, got %+v", nonConnPeer, p)
} }
time.Sleep(50 * time.Millisecond) addr := test.RandomAddressAt(base, 1)
addr1 := test.RandomAddressAt(base, 1)
addr2 := test.RandomAddressAt(base, 1)
// add one valid peer to initiate the retry, check connection and failed connection counters // add one valid peer to initiate the retry, check connection and failed connection counters
addOne(t, signer, kad, ab, addr) addOne(t, signer, kad, ab, addr)
waitCounter(t, &conns, 1) waitCounter(t, &conns, 1)
...@@ -790,6 +788,50 @@ func TestAddressBookPrune(t *testing.T) { ...@@ -790,6 +788,50 @@ func TestAddressBookPrune(t *testing.T) {
} }
} }
// TestAddressBookQuickPrune verifies that a peer with no connection history
// is removed from the addressbook after a single failed connect attempt,
// provided at least one other peer is already connected.
func TestAddressBookQuickPrune(t *testing.T) {

	// cheat and decrease the timer so retries happen quickly in the test
	defer func(t time.Duration) {
		*kademlia.TimeToRetry = t
	}(*kademlia.TimeToRetry)

	*kademlia.TimeToRetry = 50 * time.Millisecond

	var (
		conns, failedConns int32 // how many connect calls were made to the p2p mock
		base, kad, ab, _, signer = newTestKademlia(t, &conns, &failedConns, kademlia.Options{})
	)

	if err := kad.Start(context.Background()); err != nil {
		t.Fatal(err)
	}
	defer kad.Close()

	nonConnPeer, err := bzz.NewAddress(signer, nonConnectableAddress, test.RandomAddressAt(base, 1), 0)
	if err != nil {
		t.Fatal(err)
	}
	if err := ab.Put(nonConnPeer.Overlay, *nonConnPeer); err != nil {
		t.Fatal(err)
	}

	addr := test.RandomAddressAt(base, 1)
	// add one valid peer so quickPrune's connected-peers precondition holds
	addOne(t, signer, kad, ab, addr)
	waitCounter(t, &conns, 1)
	waitCounter(t, &failedConns, 0)

	// add non connectable peer, check connection and failed connection counters
	_ = kad.AddPeers(context.Background(), nonConnPeer.Overlay)
	waitCounter(t, &conns, 0)
	waitCounter(t, &failedConns, 1)

	// a single failed attempt should already have pruned the peer
	_, err = ab.Get(nonConnPeer.Overlay)
	if !errors.Is(err, addressbook.ErrNotFound) {
		t.Fatal(err)
	}
}
// TestClosestPeer tests that ClosestPeer method returns closest connected peer to a given address. // TestClosestPeer tests that ClosestPeer method returns closest connected peer to a given address.
func TestClosestPeer(t *testing.T) { func TestClosestPeer(t *testing.T) {
metricsDB, err := shed.NewDB("", nil) metricsDB, err := shed.NewDB("", nil)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment