Commit 4252adec authored by Peter Mrekaj's avatar Peter Mrekaj Committed by GitHub

fix(kademlia): inspect peer metrics snapshot in a safe manner (#1804)

parent 814910ce
......@@ -13,6 +13,7 @@ require (
github.com/ethersphere/langos v1.0.0
github.com/gogo/protobuf v1.3.1
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/google/go-cmp v0.5.0
github.com/google/gopacket v1.1.19 // indirect
github.com/google/uuid v1.1.4 // indirect
github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 // indirect
......
......@@ -143,7 +143,6 @@ github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f/go.mod h1:SqUrOPUn
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/dlclark/regexp2 v1.2.0/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc=
github.com/docker/docker v1.4.2-0.20180625184442-8e610b2b55bf h1:sh8rkQZavChcmakYiSlqu2425CHyFXLZZnvm7PDpU8M=
github.com/docker/docker v1.4.2-0.20180625184442-8e610b2b55bf/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/dop251/goja v0.0.0-20200721192441-a695b0cdd498/go.mod h1:Mw6PkjjMXWbTj+nnj4s3QPXq1jaT0s5pC0iFD4+BOAA=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
......
......@@ -130,6 +130,13 @@ type Snapshot struct {
SessionConnectionDirection PeerConnectionDirection
}
// HasAtMaxOneConnectionAttempt returns true if the snapshot represents a new
// peer which has at maximum one session connection attempt but it still isn't
// logged in.
func (ss *Snapshot) HasAtMaxOneConnectionAttempt() bool {
return ss.LastSeenTimestamp == 0 && ss.SessionConnectionRetry <= 1
}
// Counters represents a collection of peer metrics
// mainly collected for statistics and debugging.
type Counters struct {
......@@ -259,6 +266,22 @@ func (c *Collector) Snapshot(t time.Time, addresses ...swarm.Address) (map[strin
return snapshot, mErr
}
// Inspect allows to inspect current snapshot for the given peer address by
// executing the given fn in a safe manner when write to the counters is
// blocked while the performing inspection function is executed.
func (c *Collector) Inspect(addr swarm.Address, fn func(ss *Snapshot)) error {
c.mu.RLock()
defer c.mu.RUnlock()
snapshots, err := c.Snapshot(time.Now(), addr)
if err != nil {
return err
}
fn(snapshots[addr.String()])
return nil
}
// Finalize logs out all ongoing peer sessions
// and flushes all in-memory metrics counters.
func (c *Collector) Finalize(t time.Time) error {
......
......@@ -11,6 +11,7 @@ import (
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology/kademlia/internal/metrics"
"github.com/google/go-cmp/cmp"
)
func snapshot(t *testing.T, mc *metrics.Collector, sst time.Time, addr swarm.Address) *metrics.Snapshot {
......@@ -127,6 +128,16 @@ func TestPeerMetricsCollector(t *testing.T) {
t.Fatalf("Snapshot(%q, ...): session connection duration counter mismatch: have %q; want %q", addr, have, want)
}
// Inspect.
if err := mc.Inspect(addr, func(have *metrics.Snapshot) {
want := ss
if diff := cmp.Diff(have, want); diff != "" {
t.Fatalf("unexpected snapshot diffrence:\n%s", diff)
}
}); err != nil {
t.Fatalf("Inspect(%q, ...): unexpected error: %v", addr, err)
}
// Finalize.
err = mc.Record(addr, metrics.PeerLogIn(t1, metrics.PeerConnectionDirectionInbound))
if err != nil {
......@@ -136,11 +147,11 @@ func TestPeerMetricsCollector(t *testing.T) {
if err != nil {
t.Fatalf("Finalize(%s): unexpected error: %v", t3, err)
}
sss, err := mc.Snapshot(t2, addr)
snapshots, err := mc.Snapshot(t2, addr)
if err != nil {
t.Fatalf("Snapshot(%q, ...): unexpected error: %v", addr, err)
}
if have, want := len(sss), 0; have != want {
if have, want := len(snapshots), 0; have != want {
t.Fatalf("Finalize(%s): counters length mismatch: have %d; want %d", t3, have, want)
}
}
......@@ -691,7 +691,9 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
k.logger.Debugf("kademlia: unable to record session connection retry metrics for %q: %v", peer, err)
}
if k.quickPrune(peer) || failedAttempts > maxConnAttempts {
if err := k.collector.Inspect(peer, func(ss *metrics.Snapshot) {
quickPrune := ss == nil || ss.HasAtMaxOneConnectionAttempt()
if (k.connectedPeers.Length() > 0 && quickPrune) || failedAttempts > maxConnAttempts {
delete(k.waitNext, peer.String())
if err := k.addressBook.Remove(peer); err != nil {
k.logger.Debugf("could not remove peer from addressbook: %q", peer)
......@@ -703,6 +705,9 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
failedAttempts: failedAttempts,
}
}
}); err != nil {
k.logger.Debugf("kademlia: connect: unable to inspect snapshot for %q: %v", peer, err)
}
k.waitNextMu.Unlock()
return err
......@@ -715,24 +720,7 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
return k.Announce(ctx, peer)
}
// quickPrune will return true for cases where:
// - there are other connected peers
// - the addr has never been seen before and it's the first failed attempt
func (k *Kad) quickPrune(addr swarm.Address) bool {
if k.connectedPeers.Length() == 0 {
return false
}
sss, err := k.collector.Snapshot(time.Now(), addr)
if err != nil {
k.logger.Debugf("kademlia: quickPrune: unable to take snapshot for %q: %v", addr, err)
}
snapshot := sss[addr.String()]
return snapshot == nil ||
(snapshot.LastSeenTimestamp == 0 && snapshot.SessionConnectionRetry <= 1)
}
// announce a newly connected peer to our connected peers, but also
// Announce a newly connected peer to our connected peers, but also
// notify the peer about our already connected peers
func (k *Kad) Announce(ctx context.Context, peer swarm.Address) error {
addrs := []swarm.Address{}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment