Commit 95b8441b authored by Peter Mrekaj, committed by GitHub

refactor(kademlia,metrics): make metrics persistent writes asynchronous (#1873)

This optimization includes:
- all counters are stored in memory by default in order to minimize disk I/O operations
- counters that need to be persisted are flushed to the persistent store periodically
- a switch to sync.Map, which is better suited to the disjoint sets of keys used here (see the sketch below)
- lock-free data structures in the critical parts
parent 14ef5446
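In outline, the change replaces synchronous per-update disk writes with a write-behind scheme: record operations mutate in-memory counters and mark them dirty, and a background pass batches the dirty counters to disk. A minimal standalone sketch of that pattern (hypothetical names, stdlib sync/atomic from Go 1.19+ rather than the go.uber.org/atomic dependency added below; not the Bee code itself):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counter keeps its value in memory; dirty flags pending persistence.
type counter struct {
	sync.Mutex
	value int64
	dirty atomic.Int32 // 1 = needs persisting, 0 = clean
}

// collector maps peer keys to counters; each goroutine touches its own
// disjoint keys, the access pattern sync.Map is optimized for.
type collector struct {
	counters sync.Map // string -> *counter
}

// record updates memory only; no disk I/O on the hot path.
func (c *collector) record(key string, delta int64) {
	val, _ := c.counters.LoadOrStore(key, &counter{})
	cnt := val.(*counter)
	cnt.Lock()
	cnt.value += delta
	cnt.Unlock()
	cnt.dirty.Store(1)
}

// flush runs periodically and persists only the counters marked dirty.
func (c *collector) flush(persist func(key string, value int64)) {
	c.counters.Range(func(key, val interface{}) bool {
		cnt := val.(*counter)
		if !cnt.dirty.CompareAndSwap(1, 0) {
			return true // clean; nothing to write
		}
		cnt.Lock()
		v := cnt.value
		cnt.Unlock()
		persist(key.(string), v)
		return true
	})
}

func main() {
	c := &collector{}
	c.record("peer-a", 1)
	c.record("peer-a", 1)
	c.flush(func(k string, v int64) { fmt.Printf("persist %s=%d\n", k, v) })
}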
@@ -59,6 +59,7 @@ require (
 	github.com/wealdtech/go-ens/v3 v3.4.4
 	gitlab.com/nolash/go-mockbytes v0.0.7
 	go.opencensus.io v0.22.5 // indirect
+	go.uber.org/atomic v1.7.0
 	go.uber.org/multierr v1.6.0 // indirect
 	go.uber.org/zap v1.16.0 // indirect
 	golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad
...
@@ -14,6 +14,8 @@ import (
 	"github.com/ethersphere/bee/pkg/shed"
 	"github.com/ethersphere/bee/pkg/swarm"
 	"github.com/hashicorp/go-multierror"
+	"github.com/syndtr/goleveldb/leveldb"
+	"go.uber.org/atomic"
 )

 const (
@@ -50,7 +52,7 @@ func newPeerKey(p, a string) *peerKey {

 // RecordOp is a definition of a peer metrics Record
 // operation whose execution modifies a specific metrics.
-type RecordOp func(*Counters) error
+type RecordOp func(*Counters)

 // PeerLogIn will first update the current last seen to the give time t and as
 // the second it'll set the direction of the session connection to the given
@@ -58,13 +60,12 @@ type RecordOp func(*Counters) error
 // The time is set as Unix timestamp ignoring the timezone. The operation will
 // panics if the given time is before the Unix epoch.
 func PeerLogIn(t time.Time, dir PeerConnectionDirection) RecordOp {
-	return func(cs *Counters) error {
+	return func(cs *Counters) {
 		cs.Lock()
 		defer cs.Unlock()

 		if cs.loggedIn {
-			return nil // Ignore when the peer is already logged in.
+			return // Ignore when the peer is already logged in.
 		}
 		cs.loggedIn = true
@@ -73,7 +74,8 @@ func PeerLogIn(t time.Time, dir PeerConnectionDirection) RecordOp {
 			panic(fmt.Errorf("time before unix epoch: %s", t))
 		}
 		cs.sessionConnDirection = dir
-		return cs.lastSeenTimestamp.Put(uint64(ls))
+		cs.lastSeenTimestamp = ls
+		cs.dirty.Store(3)
 	}
 }
@@ -83,59 +85,43 @@ func PeerLogIn(t time.Time, dir PeerConnectionDirection) RecordOp {
 // The time is set as Unix timestamp ignoring the timezone. The operation will
 // panics if the given time is before the Unix epoch.
 func PeerLogOut(t time.Time) RecordOp {
-	return func(cs *Counters) error {
+	return func(cs *Counters) {
 		cs.Lock()
 		defer cs.Unlock()

 		if !cs.loggedIn {
-			return nil // Ignore when the peer is not logged in.
+			return // Ignore when the peer is not logged in.
 		}
 		cs.loggedIn = false

-		unixt := t.UnixNano()
-		newLs := uint64(unixt)
-		if unixt < 0 {
+		curLs := cs.lastSeenTimestamp
+		newLs := t.UnixNano()
+		if newLs < 0 {
 			panic(fmt.Errorf("time before unix epoch: %s", t))
 		}

-		curLs, err := cs.lastSeenTimestamp.Get()
-		if err != nil {
-			return err
-		}
-		ctd, err := cs.connTotalDuration.Get()
-		if err != nil {
-			return err
-		}
-		diff := newLs - curLs
-		cs.sessionConnDuration = time.Duration(diff)
-		err = cs.connTotalDuration.Put(ctd + diff)
-		if err != nil {
-			return err
-		}
-		return cs.lastSeenTimestamp.Put(newLs)
+		cs.sessionConnDuration = time.Duration(newLs - curLs)
+		cs.connTotalDuration += cs.sessionConnDuration
+		cs.lastSeenTimestamp = newLs
+		cs.dirty.Store(3)
 	}
 }

 // IncSessionConnectionRetry increments the session connection retry
 // counter by 1.
 func IncSessionConnectionRetry() RecordOp {
-	return func(cs *Counters) error {
+	return func(cs *Counters) {
 		cs.Lock()
 		defer cs.Unlock()

 		cs.sessionConnRetry++
-		return nil
 	}
 }

 // Snapshot represents a snapshot of peers' metrics counters.
 type Snapshot struct {
 	LastSeenTimestamp          int64
-	SessionConnectionRetry     uint
+	SessionConnectionRetry     uint64
 	ConnectionTotalDuration    time.Duration
 	SessionConnectionDuration  time.Duration
 	SessionConnectionDirection PeerConnectionDirection
@@ -151,74 +137,115 @@ func (ss *Snapshot) HasAtMaxOneConnectionAttempt() bool {
 // Counters represents a collection of peer metrics
 // mainly collected for statistics and debugging.
 type Counters struct {
-	sync.Mutex
-
+	// Bookkeeping.
+	peer     *swarm.Address
 	loggedIn bool

-	// Persistent.
-	lastSeenTimestamp *shed.Uint64Field
-	connTotalDuration *shed.Uint64Field
-
-	// In memory.
-	sessionConnRetry     uint
+	// Watches in-memory counters which has to be persisted.
+	// 3 - dirty, need to be persisted
+	// 2 - snapshot of counters in progress
+	// 1 - batched for persistent write
+	// 0 - persisted
+	dirty atomic.Int32
+
+	// In-memory counters.
+	lastSeenTimestamp    int64
+	connTotalDuration    time.Duration
+	sessionConnRetry     uint64
 	sessionConnDuration  time.Duration
 	sessionConnDirection PeerConnectionDirection
+	sync.Mutex
+
+	// Persistent counters.
+	persistentLastSeenTimestamp atomic.Value
+	persistentConnTotalDuration atomic.Value
 }

-// NewCollector is a convenient constructor for creating new Collector.
-func NewCollector(db *shed.DB) *Collector {
-	return &Collector{
-		db:       db,
-		counters: make(map[string]*Counters),
-	}
-}
-
-// Collector collects various metrics about
-// peers specified be the swarm.Address.
-type Collector struct {
-	db       *shed.DB
-	mu       sync.RWMutex // mu guards counters.
-	counters map[string]*Counters
-}
-
-// Record records a set of metrics for peer specified by the given address.
-// The execution doesn't stop if some metric operation returns an error, it
-// rather continues and all the execution errors are returned.
-func (c *Collector) Record(addr swarm.Address, rop ...RecordOp) error {
-	key := addr.String()
-
-	c.mu.RLock()
-	cs, ok := c.counters[key]
-	c.mu.RUnlock()
-
+// flush writes the current state of in memory counters into the given db.
+func (cs *Counters) flush(db *shed.DB, batch *leveldb.Batch) error {
+	if cs.dirty.Load() > 1 {
+		return nil
+	}
+	cs.dirty.CAS(3, 2)
+
+	cs.Lock()
+	var (
+		key                             = cs.peer.String()
+		lastSeenTimestampSnapshot       = cs.lastSeenTimestamp
+		connectionTotalDurationSnapshot = cs.connTotalDuration
+	)
+	cs.Unlock()
+
+	ls, ok := cs.persistentLastSeenTimestamp.Load().(*shed.Uint64Field)
 	if !ok {
 		mk := newPeerKey(peerLastSeenTimestamp, key)
-		ls, err := c.db.NewUint64Field(mk.String())
+		field, err := db.NewUint64Field(mk.String())
 		if err != nil {
 			return fmt.Errorf("field initialization for %q failed: %w", mk, err)
 		}
+		ls = &field
+		cs.persistentLastSeenTimestamp.Store(ls)
+	}

-		mk = newPeerKey(peerTotalConnectionDuration, key)
-		cd, err := c.db.NewUint64Field(mk.String())
+	cd, ok := cs.persistentConnTotalDuration.Load().(*shed.Uint64Field)
+	if !ok {
+		mk := newPeerKey(peerTotalConnectionDuration, key)
+		field, err := db.NewUint64Field(mk.String())
 		if err != nil {
 			return fmt.Errorf("field initialization for %q failed: %w", mk, err)
 		}
+		cd = &field
+		cs.persistentConnTotalDuration.Store(cd)
+	}
+
+	ls.PutInBatch(batch, uint64(lastSeenTimestampSnapshot))
+	cd.PutInBatch(batch, uint64(connectionTotalDurationSnapshot))

-		cs = &Counters{
-			lastSeenTimestamp: &ls,
-			connTotalDuration: &cd,
-		}
+	cs.dirty.CAS(2, 1)

-		c.mu.Lock()
-		c.counters[key] = cs
-		c.mu.Unlock()
-	}
+	return nil
+}

-	var err error
-	for i, op := range rop {
-		if opErr := op(cs); opErr != nil {
-			err = multierror.Append(err, fmt.Errorf("operation #%d for %q failed: %w", i, key, opErr))
-		}
-	}
+// snapshot returns current snapshot of counters referenced to the given t.
+func (cs *Counters) snapshot(t time.Time) *Snapshot {
+	cs.Lock()
+	defer cs.Unlock()
+
+	connTotalDuration := cs.connTotalDuration
+	sessionConnDuration := cs.sessionConnDuration
+	if cs.loggedIn {
+		sessionConnDuration = t.Sub(time.Unix(0, cs.lastSeenTimestamp))
+		connTotalDuration += sessionConnDuration
+	}
+
+	return &Snapshot{
+		LastSeenTimestamp:          cs.lastSeenTimestamp,
+		SessionConnectionRetry:     cs.sessionConnRetry,
+		ConnectionTotalDuration:    connTotalDuration,
+		SessionConnectionDuration:  sessionConnDuration,
+		SessionConnectionDirection: cs.sessionConnDirection,
+	}
+}

-	return err
+// NewCollector is a convenient constructor for creating new Collector.
+func NewCollector(db *shed.DB) *Collector {
+	return &Collector{db: db}
+}
+
+// Collector collects various metrics about
+// peers specified be the swarm.Address.
+type Collector struct {
+	db       *shed.DB
+	counters sync.Map
+}
+
+// Record records a set of metrics for peer specified by the given address.
+func (c *Collector) Record(addr swarm.Address, rop ...RecordOp) {
+	val, _ := c.counters.LoadOrStore(addr.ByteString(), &Counters{peer: &addr})
+	for _, op := range rop {
+		op(val.(*Counters))
+	}
 }

 // Snapshot returns the current state of the metrics collector for peer(s).
@@ -228,105 +255,120 @@ func (c *Collector) Record(addr swarm.Address, rop ...RecordOp) error {
 // returned. If the peer is still logged in, the session-related counters will
 // be evaluated against the last seen time, which equals to the login time. If
 // the peer is logged out, then the session counters will reflect its last
-// session. The execution doesn't stop if some metric collection returns an
-// error, it rather continues and all the execution errors are returned together
-// with the successful metrics snapshots.
-func (c *Collector) Snapshot(t time.Time, addresses ...swarm.Address) (map[string]*Snapshot, error) {
-	var mErr error
+// session.
+func (c *Collector) Snapshot(t time.Time, addresses ...swarm.Address) map[string]*Snapshot {
 	snapshot := make(map[string]*Snapshot)

-	take := func(addr string) {
-		c.mu.RLock()
-		cs := c.counters[addr]
-		c.mu.RUnlock()
-
-		if cs == nil {
-			return
+	for _, addr := range addresses {
+		val, ok := c.counters.Load(addr.ByteString())
+		if !ok {
+			continue
 		}
+		cs := val.(*Counters)
+		snapshot[addr.ByteString()] = cs.snapshot(t)
+	}

-		ls, err := cs.lastSeenTimestamp.Get()
-		if err != nil {
-			mErr = multierror.Append(mErr, fmt.Errorf("unable to take last seen snapshot for %q: %w", addr, err))
-		}
-		lastSeenTimestamp := int64(ls)
-
-		cn, err := cs.connTotalDuration.Get()
-		if err != nil {
-			mErr = multierror.Append(mErr, fmt.Errorf("unable to take connection duration snapshot for %q: %w", addr, err))
-		}
-		connTotalDuration := time.Duration(cn)
-
-		cs.Lock()
-		sessionConnDuration := cs.sessionConnDuration
-		if cs.loggedIn {
-			sessionConnDuration = t.Sub(time.Unix(0, lastSeenTimestamp))
-			connTotalDuration += sessionConnDuration
-		}
-
-		snapshot[addr] = &Snapshot{
-			LastSeenTimestamp:          lastSeenTimestamp,
-			SessionConnectionRetry:     cs.sessionConnRetry,
-			ConnectionTotalDuration:    connTotalDuration,
-			SessionConnectionDuration:  sessionConnDuration,
-			SessionConnectionDirection: cs.sessionConnDirection,
-		}
-		cs.Unlock()
+	if len(addresses) == 0 {
+		c.counters.Range(func(key, val interface{}) bool {
+			cs := val.(*Counters)
+			snapshot[cs.peer.ByteString()] = cs.snapshot(t)
+			return true
+		})
 	}

-	for _, addr := range addresses {
-		take(addr.String())
-	}
-	if len(addresses) == 0 {
-		for _, addr := range c.keys() {
-			take(addr)
-		}
-	}
-
-	return snapshot, mErr
-}
-
-func (c *Collector) keys() []string {
-	c.mu.RLock()
-	defer c.mu.RUnlock()
-
-	keys := make([]string, 0, len(c.counters))
-	for k := range c.counters {
-		keys = append(keys, k)
-	}
-	return keys
+	return snapshot
 }

-// Inspect allows to inspect current snapshot for the given peer address by
-// executing the given fn in a safe manner when write to the counters is
-// blocked while the performing inspection function is executed.
-func (c *Collector) Inspect(addr swarm.Address, fn func(ss *Snapshot)) error {
-	snapshots, err := c.Snapshot(time.Now(), addr)
-	if err != nil {
-		return err
-	}
-	fn(snapshots[addr.String()])
-	return nil
+// Inspect allows to inspect current snapshot for the given
+// peer address by executing the inspection function.
+func (c *Collector) Inspect(addr swarm.Address, fn func(ss *Snapshot)) {
+	snapshots := c.Snapshot(time.Now(), addr)
+	fn(snapshots[addr.ByteString()])
+}
+
+// Flush sync the dirty in memory counters by flushing their values to the
+// underlying storage. If an address or a set of addresses is specified then
+// only counters related to them will be flushed, otherwise counters for all
+// peers will be flushed.
+func (c *Collector) Flush(addresses ...swarm.Address) error {
+	var (
+		mErr  error
+		dirty []string
+		batch = new(leveldb.Batch)
+	)
+
+	for _, addr := range addresses {
+		val, ok := c.counters.Load(addr.ByteString())
+		if !ok {
+			continue
+		}
+		cs := val.(*Counters)
+		if err := cs.flush(c.db, batch); err != nil {
+			mErr = multierror.Append(mErr, fmt.Errorf("unable to batch the counters of peer %q for flash: %w", addr, err))
+			continue
+		}
+		dirty = append(dirty, addr.ByteString())
+	}
+
+	if len(addresses) == 0 {
+		c.counters.Range(func(_, val interface{}) bool {
+			cs := val.(*Counters)
+			if err := cs.flush(c.db, batch); err != nil {
+				mErr = multierror.Append(mErr, fmt.Errorf("unable to batch the counters of peer %q for flash: %w", cs.peer, err))
+				return true
+			}
+			dirty = append(dirty, cs.peer.ByteString())
+			return true
+		})
+	}
+
+	if batch.Len() == 0 {
+		return mErr
+	}
+
+	if err := c.db.WriteBatch(batch); err != nil {
+		mErr = multierror.Append(mErr, fmt.Errorf("unable to persist counters in batch: %w", err))
+	}
+
+	for _, addr := range dirty {
+		val, ok := c.counters.Load(addr)
+		if !ok {
+			continue
+		}
+		cs := val.(*Counters)
+		cs.dirty.CAS(1, 0)
+	}
+
+	return mErr
 }

 // Finalize logs out all ongoing peer sessions
 // and flushes all in-memory metrics counters.
 func (c *Collector) Finalize(t time.Time) error {
-	c.mu.Lock()
-	defer c.mu.Unlock()
-
-	var mErr error
-	for addr, cs := range c.counters {
-		if err := PeerLogOut(t)(cs); err != nil {
-			mErr = multierror.Append(mErr, fmt.Errorf("unable to logout peer %q: %w", addr, err))
-		}
-		delete(c.counters, addr)
-	}
+	var (
+		mErr  error
+		batch = new(leveldb.Batch)
+	)
+	c.counters.Range(func(_, val interface{}) bool {
+		cs := val.(*Counters)
+		PeerLogOut(t)(cs)
+		if err := cs.flush(c.db, batch); err != nil {
+			mErr = multierror.Append(mErr, fmt.Errorf("unable to flush counters for peer %q: %w", cs.peer, err))
+		}
+		return true
+	})
+
+	if batch.Len() > 0 {
+		if err := c.db.WriteBatch(batch); err != nil {
+			mErr = multierror.Append(mErr, fmt.Errorf("unable to persist counters in batch: %w", err))
+		}
+	}
+
+	c.counters.Range(func(_, val interface{}) bool {
+		cs := val.(*Counters)
+		c.counters.Delete(cs.peer.ByteString())
+		return true
+	})

 	return mErr
 }
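The dirty field above encodes a small state machine (3 dirty, 2 snapshot in progress, 1 batched, 0 persisted), so a Record that lands mid-flush is not lost: it resets the state to 3, the final CAS(1, 0) fails, and the counter is picked up again on the next flush pass. A hedged sketch of those transitions in isolation (hypothetical names, stdlib atomics from Go 1.19+ instead of go.uber.org/atomic; not the commit's code):

package sketch

import "sync/atomic"

// counter mirrors the dirty-state protocol of flush/Flush above.
// States: 3 = dirty, 2 = snapshot in progress, 1 = batched, 0 = persisted.
type counter struct {
	dirty atomic.Int32
	value int64
}

// markDirty is what a record operation does after mutating value.
func (c *counter) markDirty() { c.dirty.Store(3) }

// batchForWrite snapshots the value for a batched write; it reports
// false when there is nothing new to persist.
func (c *counter) batchForWrite() (int64, bool) {
	if c.dirty.Load() <= 1 {
		return 0, false // already persisted or batched
	}
	c.dirty.CompareAndSwap(3, 2) // snapshotting
	v := c.value                 // read under the counter's lock in the real code
	c.dirty.CompareAndSwap(2, 1) // batched, awaiting the DB write
	return v, true
}

// confirmPersisted runs after the batch is committed. If a record
// arrived meanwhile, the state is back at 3, the CAS fails, and the
// counter stays scheduled for the next flush.
func (c *counter) confirmPersisted() { c.dirty.CompareAndSwap(1, 0) }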
@@ -17,18 +17,15 @@ import (
 func snapshot(t *testing.T, mc *metrics.Collector, sst time.Time, addr swarm.Address) *metrics.Snapshot {
 	t.Helper()

-	ss, err := mc.Snapshot(sst, addr)
-	if err != nil {
-		t.Fatalf("Snapshot(%q, ...): unexpected error: %v", addr, err)
-	}
+	ss := mc.Snapshot(sst, addr)
 	if have, want := len(ss), 1; have != want {
 		t.Fatalf("Snapshot(%q, ...): length mismatch: have: %d; want: %d", addr, have, want)
 	}
-	pm, ok := ss[addr.String()]
+	cs, ok := ss[addr.ByteString()]
 	if !ok {
 		t.Fatalf("Snapshot(%q, ...): missing peer metrics", addr)
 	}
-	return pm
+	return cs
 }

 func TestPeerMetricsCollector(t *testing.T) {
@@ -52,20 +49,14 @@ func TestPeerMetricsCollector(t *testing.T) {
 	)

 	// Inc session conn retry.
-	err = mc.Record(addr, metrics.IncSessionConnectionRetry())
-	if err != nil {
-		t.Fatalf("Record(%q, ...): unexpected error: %v", addr, err)
-	}
+	mc.Record(addr, metrics.IncSessionConnectionRetry())
 	ss := snapshot(t, mc, t2, addr)
-	if have, want := ss.SessionConnectionRetry, uint(1); have != want {
+	if have, want := ss.SessionConnectionRetry, uint64(1); have != want {
 		t.Fatalf("Snapshot(%q, ...): session connection retry counter mismatch: have %d; want %d", addr, have, want)
 	}

 	// Login.
-	err = mc.Record(addr, metrics.PeerLogIn(t1, metrics.PeerConnectionDirectionInbound))
-	if err != nil {
-		t.Fatalf("Record(%q, ...): unexpected error: %v", addr, err)
-	}
+	mc.Record(addr, metrics.PeerLogIn(t1, metrics.PeerConnectionDirectionInbound))
 	ss = snapshot(t, mc, t2, addr)
 	if have, want := ss.LastSeenTimestamp, t1.UnixNano(); have != want {
 		t.Fatalf("Snapshot(%q, ...): last seen counter mismatch: have %d; want %d", addr, have, want)
@@ -81,10 +72,7 @@ func TestPeerMetricsCollector(t *testing.T) {
 	}

 	// Login when already logged in.
-	err = mc.Record(addr, metrics.PeerLogIn(t1.Add(1*time.Second), metrics.PeerConnectionDirectionOutbound))
-	if err != nil {
-		t.Fatalf("Record(%q, ...): unexpected error: %v", addr, err)
-	}
+	mc.Record(addr, metrics.PeerLogIn(t1.Add(1*time.Second), metrics.PeerConnectionDirectionOutbound))
 	ss = snapshot(t, mc, t2, addr)
 	if have, want := ss.LastSeenTimestamp, t1.UnixNano(); have != want {
 		t.Fatalf("Snapshot(%q, ...): last seen counter mismatch: have %d; want %d", addr, have, want)
@@ -100,20 +88,14 @@ func TestPeerMetricsCollector(t *testing.T) {
 	}

 	// Inc session conn retry.
-	err = mc.Record(addr, metrics.IncSessionConnectionRetry())
-	if err != nil {
-		t.Fatalf("Record(%q, ...): unexpected error: %v", addr, err)
-	}
+	mc.Record(addr, metrics.IncSessionConnectionRetry())
 	ss = snapshot(t, mc, t2, addr)
-	if have, want := ss.SessionConnectionRetry, uint(2); have != want {
+	if have, want := ss.SessionConnectionRetry, uint64(2); have != want {
 		t.Fatalf("Snapshot(%q, ...): session connection retry counter mismatch: have %d; want %d", addr, have, want)
 	}

 	// Logout.
-	err = mc.Record(addr, metrics.PeerLogOut(t3))
-	if err != nil {
-		t.Fatalf("Record(%q, ...): unexpected error: %v", addr, err)
-	}
+	mc.Record(addr, metrics.PeerLogOut(t3))
 	ss = snapshot(t, mc, t2, addr)
 	if have, want := ss.LastSeenTimestamp, t3.UnixNano(); have != want {
 		t.Fatalf("Snapshot(%q, ...): last seen counter mismatch: have %d; want %d", addr, have, want)
@@ -121,7 +103,7 @@ func TestPeerMetricsCollector(t *testing.T) {
 	if have, want := ss.ConnectionTotalDuration, t3.Sub(t1); have != want {
 		t.Fatalf("Snapshot(%q, ...): connection total duration counter mismatch: have %s; want %s", addr, have, want)
 	}
-	if have, want := ss.SessionConnectionRetry, uint(2); have != want {
+	if have, want := ss.SessionConnectionRetry, uint64(2); have != want {
 		t.Fatalf("Snapshot(%q, ...): session connection retry counter mismatch: have %d; want %d", addr, have, want)
 	}
 	if have, want := ss.SessionConnectionDuration, t3.Sub(t1); have != want {
@@ -129,29 +111,24 @@ func TestPeerMetricsCollector(t *testing.T) {
 	}

 	// Inspect.
-	if err := mc.Inspect(addr, func(have *metrics.Snapshot) {
+	mc.Inspect(addr, func(have *metrics.Snapshot) {
 		want := ss
 		if diff := cmp.Diff(have, want); diff != "" {
 			t.Fatalf("unexpected snapshot diffrence:\n%s", diff)
 		}
-	}); err != nil {
-		t.Fatalf("Inspect(%q, ...): unexpected error: %v", addr, err)
+	})
+
+	// Flush.
+	if err := mc.Flush(addr); err != nil {
+		t.Fatalf("Flush(): unexpected error: %v", err)
 	}

 	// Finalize.
-	err = mc.Record(addr, metrics.PeerLogIn(t1, metrics.PeerConnectionDirectionInbound))
-	if err != nil {
-		t.Fatalf("Record(%q, ...): unexpected error: %v", addr, err)
-	}
-	err = mc.Finalize(t3)
-	if err != nil {
+	mc.Record(addr, metrics.PeerLogIn(t1, metrics.PeerConnectionDirectionInbound))
+	if err := mc.Finalize(t3); err != nil {
 		t.Fatalf("Finalize(%s): unexpected error: %v", t3, err)
 	}
-	snapshots, err := mc.Snapshot(t2, addr)
-	if err != nil {
-		t.Fatalf("Snapshot(%q, ...): unexpected error: %v", addr, err)
-	}
-	if have, want := len(snapshots), 0; have != want {
+	if have, want := len(mc.Snapshot(t2, addr)), 0; have != want {
 		t.Fatalf("Finalize(%s): counters length mismatch: have %d; want %d", t3, have, want)
 	}
 }
@@ -364,12 +364,7 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup,
 				k.connectedPeers.Add(peer.addr, peer.po)

-				if err := k.collector.Record(
-					peer.addr,
-					metrics.PeerLogIn(time.Now(), metrics.PeerConnectionDirectionOutbound),
-				); err != nil {
-					k.logger.Debugf("kademlia: unable to record login outbound metrics for %q: %v", peer.addr, err)
-				}
+				k.collector.Record(peer.addr, metrics.PeerLogIn(time.Now(), metrics.PeerConnectionDirectionOutbound))

 				k.depthMu.Lock()
 				k.depth = recalcDepth(k.connectedPeers, k.radius)
@@ -447,6 +442,9 @@ func (k *Kad) manage() {
 		case <-k.quit:
 			return
 		case <-time.After(30 * time.Second):
+			if err := k.collector.Flush(); err != nil {
+				k.logger.Debugf("kademlia: unable to flush metrics counters to the persistent store: %v", err)
+			}
 			k.notifyManageLoop()
 		case <-k.manageC:
 			start := time.Now()
@@ -663,11 +661,9 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
 			failedAttempts++
 		}

-		if err := k.collector.Record(peer, metrics.IncSessionConnectionRetry()); err != nil {
-			k.logger.Debugf("kademlia: unable to record session connection retry metrics for %q: %v", peer, err)
-		}
+		k.collector.Record(peer, metrics.IncSessionConnectionRetry())

-		if err := k.collector.Inspect(peer, func(ss *metrics.Snapshot) {
+		k.collector.Inspect(peer, func(ss *metrics.Snapshot) {
 			quickPrune := ss == nil || ss.HasAtMaxOneConnectionAttempt()

 			if (k.connectedPeers.Length() > 0 && quickPrune) || failedAttempts > maxConnAttempts {
@@ -680,9 +676,7 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
 			} else {
 				k.waitNext.Set(peer, retryTime, failedAttempts)
 			}
-		}); err != nil {
-			k.logger.Debugf("kademlia: connect: unable to inspect snapshot for %q: %v", peer, err)
-		}
+		})

 		return err

 	case !i.Overlay.Equal(peer):
@@ -802,12 +796,7 @@ func (k *Kad) connected(ctx context.Context, addr swarm.Address) error {
 	k.knownPeers.Add(addr, po)
 	k.connectedPeers.Add(addr, po)

-	if err := k.collector.Record(
-		addr,
-		metrics.PeerLogIn(time.Now(), metrics.PeerConnectionDirectionInbound),
-	); err != nil {
-		k.logger.Debugf("kademlia: unable to record login inbound metrics for %q: %v", addr, err)
-	}
+	k.collector.Record(addr, metrics.PeerLogIn(time.Now(), metrics.PeerConnectionDirectionInbound))

 	k.waitNext.Remove(addr)
@@ -830,12 +819,7 @@ func (k *Kad) Disconnected(peer p2p.Peer) {
 	k.waitNext.SetTryAfter(peer.Address, time.Now().Add(timeToRetry))

-	if err := k.collector.Record(
-		peer.Address,
-		metrics.PeerLogOut(time.Now()),
-	); err != nil {
-		k.logger.Debugf("kademlia: unable to record logout metrics for %q: %v", peer.Address, err)
-	}
+	k.collector.Record(peer.Address, metrics.PeerLogOut(time.Now()))

 	k.depthMu.Lock()
 	k.depth = recalcDepth(k.connectedPeers, k.radius)
@@ -1097,10 +1081,7 @@ func (k *Kad) Snapshot() *topology.KadParams {
 		infos = append(infos, topology.BinInfo{})
 	}

-	ss, err := k.collector.Snapshot(time.Now())
-	if err != nil {
-		k.logger.Debugf("kademlia: unable to take metrics snapshot: %v", err)
-	}
+	ss := k.collector.Snapshot(time.Now())

 	_ = k.connectedPeers.EachBin(func(addr swarm.Address, po uint8) (bool, bool, error) {
 		infos[po].BinConnected++
@@ -1108,7 +1089,7 @@ func (k *Kad) Snapshot() *topology.KadParams {
 			infos[po].ConnectedPeers,
 			&topology.PeerInfo{
 				Address: addr,
-				Metrics: createMetricsSnapshotView(ss[addr.String()]),
+				Metrics: createMetricsSnapshotView(ss[addr.ByteString()]),
 			},
 		)
 		return false, false, nil
@@ -1129,7 +1110,7 @@ func (k *Kad) Snapshot() *topology.KadParams {
 			infos[po].DisconnectedPeers,
 			&topology.PeerInfo{
 				Address: addr,
-				Metrics: createMetricsSnapshotView(ss[addr.ByteString()]),
+				Metrics: createMetricsSnapshotView(ss[addr.ByteString()]),
 			},
 		)
 		return false, false, nil
...
@@ -73,7 +73,7 @@ type PeerInfo struct {
 // MetricSnapshotView represents snapshot of metrics counters in more human readable form.
 type MetricSnapshotView struct {
 	LastSeenTimestamp          int64   `json:"lastSeenTimestamp"`
-	SessionConnectionRetry     uint    `json:"sessionConnectionRetry"`
+	SessionConnectionRetry     uint64  `json:"sessionConnectionRetry"`
 	ConnectionTotalDuration    float64 `json:"connectionTotalDuration"`
 	SessionConnectionDuration  float64 `json:"sessionConnectionDuration"`
 	SessionConnectionDirection string  `json:"sessionConnectionDirection"`
...
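Taken together, the refactored API is fire-and-forget on the hot path and makes persistence an explicit, batched step. A hedged usage sketch (a hypothetical helper, not part of the commit; assumes the shed, swarm, time, and metrics packages imported by the files above and an initialized *shed.DB):

// demoCollector exercises the refactored collector API changed above.
func demoCollector(db *shed.DB, addr swarm.Address) error {
	mc := metrics.NewCollector(db)

	// Hot path: Record no longer returns an error and touches memory only.
	mc.Record(addr, metrics.PeerLogIn(time.Now(), metrics.PeerConnectionDirectionInbound))
	mc.Record(addr, metrics.IncSessionConnectionRetry())

	// Reads are computed from the in-memory counters.
	ss := mc.Snapshot(time.Now(), addr)
	_ = ss[addr.ByteString()].SessionConnectionRetry

	// Periodic work (the 30s manage loop above does this for all peers).
	if err := mc.Flush(addr); err != nil {
		return err
	}

	// Shutdown: log out all peers and persist everything.
	return mc.Finalize(time.Now())
}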