Commit a7c05c20 authored by Peter Mrekaj, committed by GitHub

feat(kademlia): extend information kept about peers (#1658)

The following extended metrics information is exposed through
the debug API (see the usage sketch below):
- last seen timestamp - updated every time the peer
  connects or disconnects
- connection total duration - sum of the durations of the sessions
  (in seconds)
- session connection retry count - number of attempts to create
  a new connection
- session connection duration - duration (in seconds) of the last or
  current (if in progress) session
- session connection direction - specifies whether the connection is
  outbound or inbound
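
A minimal usage sketch of the collector that backs these counters (not part of the commit; it uses the in-memory shed DB that the package tests use, and has to live next to the package tests because metrics is an internal package):

```go
package metrics_test

import (
	"fmt"
	"time"

	"github.com/ethersphere/bee/pkg/shed"
	"github.com/ethersphere/bee/pkg/swarm"
	"github.com/ethersphere/bee/pkg/topology/kademlia/internal/metrics"
)

func Example() {
	db, err := shed.NewDB("", nil) // empty path: in-memory storage, as in the tests
	if err != nil {
		panic(err)
	}
	defer db.Close()

	mc := metrics.NewCollector(db)
	addr := swarm.MustParseHexAddress("0123456789")

	// One failed dial, then a successful inbound session lasting one minute.
	login := time.Now()
	_ = mc.Record(addr, metrics.IncSessionConnectionRetry())
	_ = mc.Record(addr, metrics.PeerLogIn(login, metrics.PeerConnectionDirectionInbound))
	_ = mc.Record(addr, metrics.PeerLogOut(login.Add(time.Minute)))

	ss, _ := mc.Snapshot(time.Now(), addr)
	fmt.Println(ss[addr.String()].SessionConnectionDuration)
	fmt.Println(ss[addr.String()].SessionConnectionRetry)
	// Output:
	// 1m0s
	// 1
}
```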
parent 0a1ad9a7
@@ -58,6 +58,7 @@ import (
    "github.com/ethersphere/bee/pkg/settlement/swap"
    "github.com/ethersphere/bee/pkg/settlement/swap/chequebook"
    "github.com/ethersphere/bee/pkg/settlement/swap/transaction"
+   "github.com/ethersphere/bee/pkg/shed"
    "github.com/ethersphere/bee/pkg/steward"
    "github.com/ethersphere/bee/pkg/storage"
    "github.com/ethersphere/bee/pkg/swarm"
@@ -425,7 +426,12 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
    var swapService *swap.Service
-   kad := kademlia.New(swarmAddress, addressbook, hive, p2ps, logger, kademlia.Options{Bootnodes: bootnodes, StandaloneMode: o.Standalone, BootnodeMode: o.BootnodeMode})
+   metricsDB, err := shed.NewDBWrap(stateStore.DB())
+   if err != nil {
+       return nil, fmt.Errorf("unable to create metrics storage for kademlia: %w", err)
+   }
+   kad := kademlia.New(swarmAddress, addressbook, hive, p2ps, metricsDB, logger, kademlia.Options{Bootnodes: bootnodes, StandaloneMode: o.Standalone, BootnodeMode: o.BootnodeMode})
    b.topologyCloser = kad
    hive.SetAddPeersHandler(kad.AddPeers)
    p2ps.SetPickyNotifier(kad)
...
@@ -83,6 +83,16 @@ func NewDB(path string, o *Options) (db *DB, err error) {
        return nil, err
    }
+   return NewDBWrap(ldb)
+}
+
+// NewDBWrap returns a new DB which uses the given ldb as its underlying storage.
+// The function panics if the given ldb is nil.
+func NewDBWrap(ldb *leveldb.DB) (db *DB, err error) {
+   if ldb == nil {
+       panic(errors.New("shed: NewDBWrap: nil ldb"))
+   }
    db = &DB{
        ldb:     ldb,
        metrics: newMetrics(),
@@ -90,7 +100,7 @@ func NewDB(path string, o *Options) (db *DB, err error) {
    if _, err = db.getSchema(); err != nil {
        if errors.Is(err, leveldb.ErrNotFound) {
-           // save schema with initialized default fields
+           // Save schema with initialized default fields.
            if err = db.putSchema(schema{
                Fields:  make(map[string]fieldSpec),
                Indexes: make(map[byte]indexSpec),
@@ -102,7 +112,7 @@ func NewDB(path string, o *Options) (db *DB, err error) {
        }
    }
-   // Create a quit channel for the periodic metrics collector and run it
+   // Create a quit channel for the periodic metrics collector and run it.
    db.quit = make(chan struct{})
    return db, nil
...
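
A minimal sketch of using the new NewDBWrap constructor to layer shed on top of an externally managed leveldb handle; it assumes goleveldb's in-memory storage backend (not part of the commit):

```go
package main

import (
	"log"

	"github.com/ethersphere/bee/pkg/shed"
	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/storage"
)

func main() {
	// The caller owns the leveldb handle; shed only wraps it.
	ldb, err := leveldb.Open(storage.NewMemStorage(), nil)
	if err != nil {
		log.Fatal(err)
	}

	db, err := shed.NewDBWrap(ldb) // panics if ldb is nil
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	counter, err := db.NewUint64Field("example-counter")
	if err != nil {
		log.Fatal(err)
	}
	if err := counter.Put(42); err != nil {
		log.Fatal(err)
	}
	v, _ := counter.Get()
	log.Println(v) // 42
}
```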
@@ -25,7 +25,7 @@ type store struct {
    logger logging.Logger
}
-// New creates a new persistent state storage.
+// NewStateStore creates a new persistent state storage.
func NewStateStore(path string, l logging.Logger) (storage.StateStorer, error) {
    db, err := leveldb.OpenFile(path, nil)
    if err != nil {
@@ -138,6 +138,11 @@ func (s *store) putSchemaName(val string) error {
    return s.db.Put([]byte(dbSchemaKey), []byte(val), nil)
}
+// DB implements StateStorer.DB method.
+func (s *store) DB() *leveldb.DB {
+   return s.db
+}
+
// Close releases the resources used by the store.
func (s *store) Close() error {
    return s.db.Close()
...
@@ -12,6 +12,7 @@ import (
    "sync"
    "github.com/ethersphere/bee/pkg/storage"
+   "github.com/syndtr/goleveldb/leveldb"
)
var _ storage.StateStorer = (*store)(nil)
@@ -99,6 +100,11 @@ func (s *store) Iterate(prefix string, iterFunc storage.StateIterFunc) (err erro
    return nil
}
+// DB implements StateStorer.DB method.
+func (s *store) DB() *leveldb.DB {
+   return nil
+}
+
func (s *store) Close() (err error) {
    return nil
}
@@ -13,6 +13,7 @@ import (
    "io"
    "github.com/ethersphere/bee/pkg/swarm"
+   "github.com/syndtr/goleveldb/leveldb"
)
var (
@@ -173,6 +174,8 @@ type StateStorer interface {
    Put(key string, i interface{}) (err error)
    Delete(key string) (err error)
    Iterate(prefix string, iterFunc StateIterFunc) (err error)
+   // DB returns the underlying DB storage.
+   DB() *leveldb.DB
    io.Closer
}
...
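
A hypothetical helper (not in the commit) showing how a consumer of the extended interface might use the new DB accessor; the name metricsStore is made up, and it mirrors the NewBee wiring shown in the first hunk above:

```go
package example

import (
	"errors"

	"github.com/ethersphere/bee/pkg/shed"
	"github.com/ethersphere/bee/pkg/storage"
)

// metricsStore is a hypothetical helper: it borrows the statestore's
// underlying leveldb handle and wraps it with shed for metrics persistence,
// so both stores share a single on-disk database.
func metricsStore(st storage.StateStorer) (*shed.DB, error) {
	ldb := st.DB()
	if ldb == nil { // the mock statestore, for instance, returns nil
		return nil, errors.New("statestore: no underlying leveldb")
	}
	return shed.NewDBWrap(ldb)
}
```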
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package metrics provides a service for collecting various metrics about peers.
// It is intended to be used with kademlia, where the metrics are collected.
package metrics
import (
"fmt"
"sync"
"time"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/hashicorp/go-multierror"
)
const (
peerLastSeenTimestamp string = "peer-last-seen-timestamp"
peerTotalConnectionDuration string = "peer-total-connection-duration"
)
// PeerConnectionDirection represents peer connection direction.
type PeerConnectionDirection string
const (
PeerConnectionDirectionInbound PeerConnectionDirection = "inbound"
PeerConnectionDirectionOutbound PeerConnectionDirection = "outbound"
)
// peerKey is used to store peers' persistent metrics counters.
type peerKey struct {
prefix string
address string
}
// String implements Stringer.String method.
func (pk peerKey) String() string {
return fmt.Sprintf("%s-%s", pk.prefix, pk.address)
}
// newPeerKey is a convenience constructor for creating a new peerKey.
func newPeerKey(p, a string) *peerKey {
return &peerKey{
prefix: p,
address: a,
}
}
// RecordOp is a definition of a peer metrics Record
// operation whose execution modifies specific metrics.
type RecordOp func(*Counters) error
// PeerLogIn will first update the current last seen to the given time t and
// then set the direction of the session connection to the given value. If
// the peer is already logged in, the operation is ignored. The time is set
// as a Unix timestamp ignoring the timezone. The operation panics if the
// given time is before the Unix epoch.
func PeerLogIn(t time.Time, dir PeerConnectionDirection) RecordOp {
return func(cs *Counters) error {
if cs.loggedIn {
return nil // Ignore when the peer is already logged in.
}
cs.loggedIn = true
ls := t.UnixNano()
if ls < 0 {
panic(fmt.Errorf("time before unix epoch: %s", t))
}
cs.sessionConnDirection = dir
return cs.lastSeenTimestamp.Put(uint64(ls))
}
}
// PeerLogOut will first update the connection session and total duration with
// the difference between the given time t and the current last seen value,
// and then update the last seen peer metrics to the given time t. If the peer
// is not logged in, the operation is ignored. The time is set as a Unix
// timestamp ignoring the timezone. The operation panics if the given time is
// before the Unix epoch.
func PeerLogOut(t time.Time) RecordOp {
return func(cs *Counters) error {
if !cs.loggedIn {
return nil // Ignore when the peer is not logged in.
}
cs.loggedIn = false
unixt := t.UnixNano()
newLs := uint64(unixt)
if unixt < 0 {
panic(fmt.Errorf("time before unix epoch: %s", t))
}
curLs, err := cs.lastSeenTimestamp.Get()
if err != nil {
return err
}
ctd, err := cs.connTotalDuration.Get()
if err != nil {
return err
}
diff := newLs - curLs
cs.sessionConnDuration = time.Duration(diff)
err = cs.connTotalDuration.Put(ctd + diff)
if err != nil {
return err
}
return cs.lastSeenTimestamp.Put(newLs)
}
}
// IncSessionConnectionRetry increments the session connection retry
// counter by 1.
func IncSessionConnectionRetry() RecordOp {
return func(cs *Counters) error {
cs.sessionConnRetry++
return nil
}
}
// Snapshot represents a snapshot of peers' metrics counters.
type Snapshot struct {
LastSeenTimestamp int64
SessionConnectionRetry uint
ConnectionTotalDuration time.Duration
SessionConnectionDuration time.Duration
SessionConnectionDirection PeerConnectionDirection
}
// Counters represents a collection of peer metrics
// mainly collected for statistics and debugging.
type Counters struct {
loggedIn bool
// Persistent.
lastSeenTimestamp *shed.Uint64Field
connTotalDuration *shed.Uint64Field
// In memory.
sessionConnRetry uint
sessionConnDuration time.Duration
sessionConnDirection PeerConnectionDirection
}
// NewCollector is a convenience constructor for creating a new Collector.
func NewCollector(db *shed.DB) *Collector {
return &Collector{
db: db,
counters: make(map[string]*Counters),
}
}
// Collector collects various metrics about
// peers specified by the swarm.Address.
type Collector struct {
db *shed.DB
mu sync.RWMutex // mu guards counters.
counters map[string]*Counters
}
// Record records a set of metrics for the peer specified by the given address.
// The execution doesn't stop if some metric operation returns an error; it
// rather continues, and all the execution errors are returned.
func (c *Collector) Record(addr swarm.Address, rop ...RecordOp) error {
c.mu.Lock()
defer c.mu.Unlock()
key := addr.String()
cs, ok := c.counters[key]
if !ok {
mk := newPeerKey(peerLastSeenTimestamp, key)
ls, err := c.db.NewUint64Field(mk.String())
if err != nil {
return fmt.Errorf("field initialization for %q failed: %w", mk, err)
}
mk = newPeerKey(peerTotalConnectionDuration, key)
cd, err := c.db.NewUint64Field(mk.String())
if err != nil {
return fmt.Errorf("field initialization for %q failed: %w", mk, err)
}
cs = &Counters{
lastSeenTimestamp: &ls,
connTotalDuration: &cd,
}
}
c.counters[key] = cs
var err error
for i, op := range rop {
if opErr := op(cs); opErr != nil {
err = multierror.Append(err, fmt.Errorf("operation #%d for %q failed: %w", i, key, opErr))
}
}
return err
}
// Snapshot returns the current state of the metrics collector for peer(s).
// The given time t is used to calculate the duration of the current session,
// if any. If an address or a set of addresses is specified, then only metrics
// related to them will be returned; otherwise metrics for all peers will be
// returned. If the peer is still logged in, the session-related counters will
// be evaluated against the last seen time, which equals the login time. If
// the peer is logged out, then the session counters will reflect its last
// session. The execution doesn't stop if some metric collection returns an
// error; it rather continues, and all the execution errors are returned
// together with the successful metrics snapshots.
func (c *Collector) Snapshot(t time.Time, addresses ...swarm.Address) (map[string]*Snapshot, error) {
c.mu.RLock()
defer c.mu.RUnlock()
var mErr error
snapshot := make(map[string]*Snapshot)
take := func(addr string) {
cs := c.counters[addr]
if cs == nil {
return
}
ls, err := cs.lastSeenTimestamp.Get()
if err != nil {
mErr = multierror.Append(mErr, fmt.Errorf("unable to take last seen snapshot for %q: %w", addr, err))
}
lastSeenTimestamp := int64(ls)
cn, err := cs.connTotalDuration.Get()
if err != nil {
mErr = multierror.Append(mErr, fmt.Errorf("unable to take connection duration snapshot for %q: %w", addr, err))
}
connTotalDuration := time.Duration(cn)
sessionConnDuration := cs.sessionConnDuration
if cs.loggedIn {
sessionConnDuration = t.Sub(time.Unix(0, lastSeenTimestamp))
connTotalDuration += sessionConnDuration
}
snapshot[addr] = &Snapshot{
LastSeenTimestamp: lastSeenTimestamp,
SessionConnectionRetry: cs.sessionConnRetry,
ConnectionTotalDuration: connTotalDuration,
SessionConnectionDuration: sessionConnDuration,
SessionConnectionDirection: cs.sessionConnDirection,
}
}
for _, addr := range addresses {
take(addr.String())
}
if len(addresses) == 0 {
for addr := range c.counters {
take(addr)
}
}
return snapshot, mErr
}
// Finalize logs out all ongoing peer sessions
// and flushes all in-memory metrics counters.
func (c *Collector) Finalize(t time.Time) error {
c.mu.Lock()
defer c.mu.Unlock()
var mErr error
for addr, cs := range c.counters {
if err := PeerLogOut(t)(cs); err != nil {
mErr = multierror.Append(mErr, fmt.Errorf("unable to logout peer %q: %w", addr, err))
}
delete(c.counters, addr)
}
return mErr
}
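
One subtlety in Snapshot above is worth an example: while a peer is still logged in, the session and total durations are evaluated against the time passed to Snapshot, so an open session is included in both. A minimal sketch, with the same in-memory DB and internal-package caveats as the earlier one:

```go
package metrics_test

import (
	"fmt"
	"time"

	"github.com/ethersphere/bee/pkg/shed"
	"github.com/ethersphere/bee/pkg/swarm"
	"github.com/ethersphere/bee/pkg/topology/kademlia/internal/metrics"
)

func Example_openSession() {
	db, err := shed.NewDB("", nil) // in-memory
	if err != nil {
		panic(err)
	}
	defer db.Close()

	mc := metrics.NewCollector(db)
	addr := swarm.MustParseHexAddress("ca1e")
	login := time.Now()
	_ = mc.Record(addr, metrics.PeerLogIn(login, metrics.PeerConnectionDirectionOutbound))

	// The peer never logged out: both durations reflect the open session.
	ss, _ := mc.Snapshot(login.Add(30*time.Second), addr)
	fmt.Println(ss[addr.String()].SessionConnectionDuration)
	fmt.Println(ss[addr.String()].ConnectionTotalDuration)
	// Output:
	// 30s
	// 30s
}
```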
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package metrics_test
import (
"testing"
"time"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology/kademlia/internal/metrics"
)
func snapshot(t *testing.T, mc *metrics.Collector, sst time.Time, addr swarm.Address) *metrics.Snapshot {
t.Helper()
ss, err := mc.Snapshot(sst, addr)
if err != nil {
t.Fatalf("Snapshot(%q, ...): unexpected error: %v", addr, err)
}
if have, want := len(ss), 1; have != want {
t.Fatalf("Snapshot(%q, ...): length mismatch: have: %d; want: %d", addr, have, want)
}
pm, ok := ss[addr.String()]
if !ok {
t.Fatalf("Snapshot(%q, ...): missing peer metrics", addr)
}
return pm
}
func TestPeerMetricsCollector(t *testing.T) {
db, err := shed.NewDB("", nil)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
if err := db.Close(); err != nil {
t.Fatal(err)
}
})
var (
mc = metrics.NewCollector(db)
addr = swarm.MustParseHexAddress("0123456789")
t1 = time.Now() // Login time.
t2 = t1.Add(10 * time.Second) // Snapshot time.
t3 = t2.Add(55 * time.Second) // Logout time.
)
// Inc session conn retry.
err = mc.Record(addr, metrics.IncSessionConnectionRetry())
if err != nil {
t.Fatalf("Record(%q, ...): unexpected error: %v", addr, err)
}
ss := snapshot(t, mc, t2, addr)
if have, want := ss.SessionConnectionRetry, uint(1); have != want {
t.Fatalf("Snapshot(%q, ...): session connection retry counter mismatch: have %d; want %d", addr, have, want)
}
// Login.
err = mc.Record(addr, metrics.PeerLogIn(t1, metrics.PeerConnectionDirectionInbound))
if err != nil {
t.Fatalf("Record(%q, ...): unexpected error: %v", addr, err)
}
ss = snapshot(t, mc, t2, addr)
if have, want := ss.LastSeenTimestamp, t1.UnixNano(); have != want {
t.Fatalf("Snapshot(%q, ...): last seen counter mismatch: have %d; want %d", addr, have, want)
}
if have, want := ss.SessionConnectionDirection, metrics.PeerConnectionDirectionInbound; have != want {
t.Fatalf("Snapshot(%q, ...): session connection direction counter mismatch: have %q; want %q", addr, have, want)
}
if have, want := ss.SessionConnectionDuration, t2.Sub(t1); have != want {
t.Fatalf("Snapshot(%q, ...): session connection duration counter mismatch: have %s; want %s", addr, have, want)
}
if have, want := ss.ConnectionTotalDuration, t2.Sub(t1); have != want {
t.Fatalf("Snapshot(%q, ...): connection total duration counter mismatch: have %s; want %s", addr, have, want)
}
// Login when already logged in.
err = mc.Record(addr, metrics.PeerLogIn(t1.Add(1*time.Second), metrics.PeerConnectionDirectionOutbound))
if err != nil {
t.Fatalf("Record(%q, ...): unexpected error: %v", addr, err)
}
ss = snapshot(t, mc, t2, addr)
if have, want := ss.LastSeenTimestamp, t1.UnixNano(); have != want {
t.Fatalf("Snapshot(%q, ...): last seen counter mismatch: have %d; want %d", addr, have, want)
}
if have, want := ss.SessionConnectionDirection, metrics.PeerConnectionDirectionInbound; have != want {
t.Fatalf("Snapshot(%q, ...): session connection direction counter mismatch: have %q; want %q", addr, have, want)
}
if have, want := ss.SessionConnectionDuration, t2.Sub(t1); have != want {
t.Fatalf("Snapshot(%q, ...): session connection duration counter mismatch: have %s; want %s", addr, have, want)
}
if have, want := ss.ConnectionTotalDuration, t2.Sub(t1); have != want {
t.Fatalf("Snapshot(%q, ...): connection total duration counter mismatch: have %s; want %s", addr, have, want)
}
// Inc session conn retry.
err = mc.Record(addr, metrics.IncSessionConnectionRetry())
if err != nil {
t.Fatalf("Record(%q, ...): unexpected error: %v", addr, err)
}
ss = snapshot(t, mc, t2, addr)
if have, want := ss.SessionConnectionRetry, uint(2); have != want {
t.Fatalf("Snapshot(%q, ...): session connection retry counter mismatch: have %d; want %d", addr, have, want)
}
// Logout.
err = mc.Record(addr, metrics.PeerLogOut(t3))
if err != nil {
t.Fatalf("Record(%q, ...): unexpected error: %v", addr, err)
}
ss = snapshot(t, mc, t2, addr)
if have, want := ss.LastSeenTimestamp, t3.UnixNano(); have != want {
t.Fatalf("Snapshot(%q, ...): last seen counter mismatch: have %d; want %d", addr, have, want)
}
if have, want := ss.ConnectionTotalDuration, t3.Sub(t1); have != want {
t.Fatalf("Snapshot(%q, ...): connection total duration counter mismatch: have %s; want %s", addr, have, want)
}
if have, want := ss.SessionConnectionRetry, uint(2); have != want {
t.Fatalf("Snapshot(%q, ...): session connection retry counter mismatch: have %d; want %d", addr, have, want)
}
if have, want := ss.SessionConnectionDuration, t3.Sub(t1); have != want {
t.Fatalf("Snapshot(%q, ...): session connection duration counter mismatch: have %q; want %q", addr, have, want)
}
// Finalize.
err = mc.Record(addr, metrics.PeerLogIn(t1, metrics.PeerConnectionDirectionInbound))
if err != nil {
t.Fatalf("Record(%q, ...): unexpected error: %v", addr, err)
}
err = mc.Finalize(t3)
if err != nil {
t.Fatalf("Finalize(%s): unexpected error: %v", t3, err)
}
sss, err := mc.Snapshot(t2, addr)
if err != nil {
t.Fatalf("Snapshot(%q, ...): unexpected error: %v", addr, err)
}
if have, want := len(sss), 0; have != want {
t.Fatalf("Finalize(%s): counters length mismatch: have %d; want %d", t3, have, want)
}
}
@@ -20,8 +20,10 @@ import (
    "github.com/ethersphere/bee/pkg/discovery"
    "github.com/ethersphere/bee/pkg/logging"
    "github.com/ethersphere/bee/pkg/p2p"
+   "github.com/ethersphere/bee/pkg/shed"
    "github.com/ethersphere/bee/pkg/swarm"
    "github.com/ethersphere/bee/pkg/topology"
+   "github.com/ethersphere/bee/pkg/topology/kademlia/internal/metrics"
    "github.com/ethersphere/bee/pkg/topology/pslice"
    ma "github.com/multiformats/go-multiaddr"
)
@@ -85,6 +87,7 @@ type Kad struct {
    logger     logging.Logger // logger
    standalone bool           // indicates whether the node is working in standalone mode
    bootnode   bool           // indicates whether the node is working in bootnode mode
+   collector  *metrics.Collector
    quit       chan struct{} // quit channel
    done       chan struct{} // signal that `manage` has quit
    wg         sync.WaitGroup
@@ -96,12 +99,15 @@ type retryInfo struct {
}
// New returns a new Kademlia.
-func New(base swarm.Address,
+func New(
+   base swarm.Address,
    addressbook addressbook.Interface,
    discovery discovery.Driver,
    p2p p2p.Service,
+   metricsDB *shed.DB,
    logger logging.Logger,
-   o Options) *Kad {
+   o Options,
+) *Kad {
    if o.SaturationFunc == nil {
        os := overSaturationPeers
        if o.BootnodeMode {
@@ -129,6 +135,7 @@ func New(base swarm.Address,
        logger:     logger,
        standalone: o.StandaloneMode,
        bootnode:   o.BootnodeMode,
+       collector:  metrics.NewCollector(metricsDB),
        quit:       make(chan struct{}),
        done:       make(chan struct{}),
        wg:         sync.WaitGroup{},
@@ -371,6 +378,13 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup,
    k.connectedPeers.Add(peer.addr, peer.po)
+
+   if err := k.collector.Record(
+       peer.addr,
+       metrics.PeerLogIn(time.Now(), metrics.PeerConnectionDirectionOutbound),
+   ); err != nil {
+       k.logger.Debugf("kademlia: unable to record login outbound metrics for %q: %v", peer.addr, err)
+   }
    k.depthMu.Lock()
    k.depth = recalcDepth(k.connectedPeers, k.radius)
    k.depthMu.Unlock()
@@ -671,6 +685,10 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
        failedAttempts++
    }
+
+   if err := k.collector.Record(peer, metrics.IncSessionConnectionRetry()); err != nil {
+       k.logger.Debugf("kademlia: unable to record session connection retry metrics for %q: %v", peer, err)
+   }
    if failedAttempts > maxConnAttempts {
        delete(k.waitNext, peer.String())
        if err := k.addressBook.Remove(peer); err != nil {
@@ -812,6 +830,13 @@ func (k *Kad) connected(ctx context.Context, addr swarm.Address) error {
    k.knownPeers.Add(addr, po)
    k.connectedPeers.Add(addr, po)
+
+   if err := k.collector.Record(
+       addr,
+       metrics.PeerLogIn(time.Now(), metrics.PeerConnectionDirectionInbound),
+   ); err != nil {
+       k.logger.Debugf("kademlia: unable to record login inbound metrics for %q: %v", addr, err)
+   }
    k.waitNextMu.Lock()
    delete(k.waitNext, addr.String())
    k.waitNextMu.Unlock()
@@ -837,6 +862,13 @@ func (k *Kad) Disconnected(peer p2p.Peer) {
    k.waitNext[peer.Address.String()] = retryInfo{tryAfter: time.Now().Add(timeToRetry), failedAttempts: 0}
    k.waitNextMu.Unlock()
+
+   if err := k.collector.Record(
+       peer.Address,
+       metrics.PeerLogOut(time.Now()),
+   ); err != nil {
+       k.logger.Debugf("kademlia: unable to record logout metrics for %q: %v", peer.Address, err)
+   }
    k.depthMu.Lock()
    k.depth = recalcDepth(k.connectedPeers, k.radius)
    k.depthMu.Unlock()
@@ -1103,9 +1135,20 @@ func (k *Kad) Snapshot() *topology.KadParams {
        infos = append(infos, topology.BinInfo{})
    }
+
+   ss, err := k.collector.Snapshot(time.Now())
+   if err != nil {
+       k.logger.Debugf("kademlia: unable to take metrics snapshot: %v", err)
+   }
    _ = k.connectedPeers.EachBin(func(addr swarm.Address, po uint8) (bool, bool, error) {
        infos[po].BinConnected++
-       infos[po].ConnectedPeers = append(infos[po].ConnectedPeers, addr.String())
+       infos[po].ConnectedPeers = append(
+           infos[po].ConnectedPeers,
+           &topology.PeerInfo{
+               Address: addr,
+               Metrics: createMetricsSnapshotView(ss[addr.String()]),
+           },
+       )
        return false, false, nil
    })
@@ -1115,12 +1158,18 @@ func (k *Kad) Snapshot() *topology.KadParams {
        for _, v := range infos[po].ConnectedPeers {
            // peer already connected, don't show in the known peers list
-           if v == addr.String() {
+           if v.Address.Equal(addr) {
                return false, false, nil
            }
        }
-       infos[po].DisconnectedPeers = append(infos[po].DisconnectedPeers, addr.String())
+       infos[po].DisconnectedPeers = append(
+           infos[po].DisconnectedPeers,
+           &topology.PeerInfo{
+               Address: addr,
+               Metrics: createMetricsSnapshotView(ss[addr.String()]),
+           },
+       )
        return false, false, nil
    })
@@ -1190,6 +1239,10 @@ func (k *Kad) Close() error {
        k.wg.Wait()
    }()
+
+   if err := k.collector.Finalize(time.Now()); err != nil {
+       k.logger.Debugf("kademlia: unable to finalize open sessions: %v", err)
+   }
    select {
    case <-cc:
    case <-time.After(10 * time.Second):
@@ -1238,3 +1291,19 @@ func (k *Kad) randomPeer(bin uint8) (swarm.Address, error) {
    return peers[rndIndx.Int64()], nil
}
+
+// createMetricsSnapshotView creates a new topology.MetricSnapshotView from the
+// given metrics.Snapshot, truncating all the timestamps and durations to
+// whole seconds.
+func createMetricsSnapshotView(ss *metrics.Snapshot) *topology.MetricSnapshotView {
+   if ss == nil {
+       return nil
+   }
+   return &topology.MetricSnapshotView{
+       LastSeenTimestamp:          time.Unix(0, ss.LastSeenTimestamp).Unix(),
+       SessionConnectionRetry:     ss.SessionConnectionRetry,
+       ConnectionTotalDuration:    ss.ConnectionTotalDuration.Truncate(time.Second).Seconds(),
+       SessionConnectionDuration:  ss.SessionConnectionDuration.Truncate(time.Second).Seconds(),
+       SessionConnectionDirection: string(ss.SessionConnectionDirection),
+   }
+}
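
Note that Truncate rounds toward zero rather than to the nearest second, so the durations exposed through the debug API are floored to whole seconds. A quick standalone illustration:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	d := 90*time.Second + 900*time.Millisecond
	// Truncate(time.Second) drops the sub-second remainder: 1m30.9s -> 90s.
	fmt.Println(d.Truncate(time.Second).Seconds()) // 90
}
```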
@@ -15,6 +15,7 @@ import (
    "testing"
    "time"
+   "github.com/ethersphere/bee/pkg/shed"
    ma "github.com/multiformats/go-multiaddr"
    "github.com/ethersphere/bee/pkg/addressbook"
@@ -47,7 +48,7 @@ var nonConnectableAddress, _ = ma.NewMultiaddr(underlayBase + "16Uiu2HAkx8ULY8cT
func TestNeighborhoodDepth(t *testing.T) {
    var (
        conns int32 // how many connect calls were made to the p2p mock
-       base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{})
+       base, kad, ab, _, signer = newTestKademlia(t, &conns, nil, kademlia.Options{})
    )
    kad.SetRadius(swarm.MaxPO) // initial tests do not check for radius
@@ -196,7 +197,7 @@ func TestNeighborhoodDepth(t *testing.T) {
func TestEachNeighbor(t *testing.T) {
    var (
        conns int32 // how many connect calls were made to the p2p mock
-       base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{})
+       base, kad, ab, _, signer = newTestKademlia(t, &conns, nil, kademlia.Options{})
        peers []swarm.Address
    )
@@ -268,7 +269,7 @@ func TestManage(t *testing.T) {
        saturationFunc = func(bin uint8, peers, connected *pslice.PSlice) (bool, bool) {
            return saturationVal, overSaturationVal
        }
-       base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{BitSuffixLength: -1, SaturationFunc: saturationFunc})
+       base, kad, ab, _, signer = newTestKademlia(t, &conns, nil, kademlia.Options{BitSuffixLength: -1, SaturationFunc: saturationFunc})
    )
    if err := kad.Start(context.Background()); err != nil {
@@ -315,7 +316,7 @@ func TestManageWithBalancing(t *testing.T) {
            f := *saturationFuncImpl
            return f(bin, peers, connected)
        }
-       base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{SaturationFunc: saturationFunc, BitSuffixLength: 2})
+       base, kad, ab, _, signer = newTestKademlia(t, &conns, nil, kademlia.Options{SaturationFunc: saturationFunc, BitSuffixLength: 2})
    )
    kad.SetRadius(swarm.MaxPO) // don't use radius for checks
@@ -378,7 +379,7 @@ func TestBinSaturation(t *testing.T) {
    var (
        conns int32 // how many connect calls were made to the p2p mock
-       base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{BitSuffixLength: -1})
+       base, kad, ab, _, signer = newTestKademlia(t, &conns, nil, kademlia.Options{BitSuffixLength: -1})
        peers []swarm.Address
    )
@@ -433,7 +434,7 @@ func TestOversaturation(t *testing.T) {
    var (
        conns int32 // how many connect calls were made to the p2p mock
-       base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{})
+       base, kad, ab, _, signer = newTestKademlia(t, &conns, nil, kademlia.Options{})
    )
    kad.SetRadius(swarm.MaxPO) // don't use radius for checks
@@ -489,7 +490,7 @@ func TestOversaturationBootnode(t *testing.T) {
    var (
        conns int32 // how many connect calls were made to the p2p mock
-       base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{BootnodeMode: true})
+       base, kad, ab, _, signer = newTestKademlia(t, &conns, nil, kademlia.Options{BootnodeMode: true})
    )
    kad.SetRadius(swarm.MaxPO) // don't use radius for checks
@@ -545,7 +546,7 @@ func TestBootnodeMaxConnections(t *testing.T) {
    var (
        conns int32 // how many connect calls were made to the p2p mock
-       base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{BootnodeMode: true})
+       base, kad, ab, _, signer = newTestKademlia(t, &conns, nil, kademlia.Options{BootnodeMode: true})
    )
    kad.SetRadius(swarm.MaxPO) // don't use radius for checks
@@ -596,7 +597,7 @@ func TestBootnodeMaxConnections(t *testing.T) {
func TestNotifierHooks(t *testing.T) {
    t.Skip("disabled due to kademlia inconsistencies hotfix")
    var (
-       base, kad, ab, _, signer = newTestKademlia(nil, nil, kademlia.Options{})
+       base, kad, ab, _, signer = newTestKademlia(t, nil, nil, kademlia.Options{})
        peer = test.RandomAddressAt(base, 3)
        addr = test.RandomAddressAt(peer, 4) // address which is closer to peer
    )
@@ -631,7 +632,7 @@ func TestNotifierHooks(t *testing.T) {
func TestDiscoveryHooks(t *testing.T) {
    var (
        conns int32
-       _, kad, ab, disc, signer = newTestKademlia(&conns, nil, kademlia.Options{})
+       _, kad, ab, disc, signer = newTestKademlia(t, &conns, nil, kademlia.Options{})
        p1, p2, p3 = test.RandomAddress(), test.RandomAddress(), test.RandomAddress()
    )
@@ -670,7 +671,7 @@ func TestBackoff(t *testing.T) {
    var (
        conns int32 // how many connect calls were made to the p2p mock
-       base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{})
+       base, kad, ab, _, signer = newTestKademlia(t, &conns, nil, kademlia.Options{})
    )
    if err := kad.Start(context.Background()); err != nil {
@@ -713,7 +714,7 @@ func TestAddressBookPrune(t *testing.T) {
    var (
        conns, failedConns int32 // how many connect calls were made to the p2p mock
-       base, kad, ab, _, signer = newTestKademlia(&conns, &failedConns, kademlia.Options{})
+       base, kad, ab, _, signer = newTestKademlia(t, &conns, &failedConns, kademlia.Options{})
    )
    if err := kad.Start(context.Background()); err != nil {
@@ -791,6 +792,16 @@ func TestAddressBookPrune(t *testing.T) {
// TestClosestPeer tests that ClosestPeer method returns closest connected peer to a given address.
func TestClosestPeer(t *testing.T) {
+   metricsDB, err := shed.NewDB("", nil)
+   if err != nil {
+       t.Fatal(err)
+   }
+   t.Cleanup(func() {
+       if err := metricsDB.Close(); err != nil {
+           t.Fatal(err)
+       }
+   })
+
    _ = waitPeers
    t.Skip("disabled due to kademlia inconsistencies hotfix")
@@ -811,7 +822,7 @@ func TestClosestPeer(t *testing.T) {
    disc := mock.NewDiscovery()
    ab := addressbook.New(mockstate.NewStateStore())
-   kad := kademlia.New(base, ab, disc, p2pMock(ab, nil, nil, nil), logger, kademlia.Options{})
+   kad := kademlia.New(base, ab, disc, p2pMock(ab, nil, nil, nil), metricsDB, logger, kademlia.Options{})
    if err := kad.Start(context.Background()); err != nil {
        t.Fatal(err)
    }
@@ -901,7 +912,7 @@ func TestKademlia_SubscribePeersChange(t *testing.T) {
    }
    t.Run("single subscription", func(t *testing.T) {
-       base, kad, ab, _, sg := newTestKademlia(nil, nil, kademlia.Options{})
+       base, kad, ab, _, sg := newTestKademlia(t, nil, nil, kademlia.Options{})
        if err := kad.Start(context.Background()); err != nil {
            t.Fatal(err)
        }
@@ -917,7 +928,7 @@ func TestKademlia_SubscribePeersChange(t *testing.T) {
    })
    t.Run("single subscription, remove peer", func(t *testing.T) {
-       base, kad, ab, _, sg := newTestKademlia(nil, nil, kademlia.Options{})
+       base, kad, ab, _, sg := newTestKademlia(t, nil, nil, kademlia.Options{})
        if err := kad.Start(context.Background()); err != nil {
            t.Fatal(err)
        }
@@ -936,7 +947,7 @@ func TestKademlia_SubscribePeersChange(t *testing.T) {
    })
    t.Run("multiple subscriptions", func(t *testing.T) {
-       base, kad, ab, _, sg := newTestKademlia(nil, nil, kademlia.Options{})
+       base, kad, ab, _, sg := newTestKademlia(t, nil, nil, kademlia.Options{})
        if err := kad.Start(context.Background()); err != nil {
            t.Fatal(err)
        }
@@ -957,7 +968,7 @@ func TestKademlia_SubscribePeersChange(t *testing.T) {
    })
    t.Run("multiple changes", func(t *testing.T) {
-       base, kad, ab, _, sg := newTestKademlia(nil, nil, kademlia.Options{})
+       base, kad, ab, _, sg := newTestKademlia(t, nil, nil, kademlia.Options{})
        if err := kad.Start(context.Background()); err != nil {
            t.Fatal(err)
        }
@@ -982,7 +993,7 @@ func TestKademlia_SubscribePeersChange(t *testing.T) {
    })
    t.Run("no depth change", func(t *testing.T) {
-       _, kad, _, _, _ := newTestKademlia(nil, nil, kademlia.Options{})
+       _, kad, _, _, _ := newTestKademlia(t, nil, nil, kademlia.Options{})
        if err := kad.Start(context.Background()); err != nil {
            t.Fatal(err)
        }
@@ -1005,7 +1016,7 @@ func TestKademlia_SubscribePeersChange(t *testing.T) {
func TestSnapshot(t *testing.T) {
    var conns = new(int32)
-   sa, kad, ab, _, signer := newTestKademlia(conns, nil, kademlia.Options{})
+   sa, kad, ab, _, signer := newTestKademlia(t, conns, nil, kademlia.Options{})
    if err := kad.Start(context.Background()); err != nil {
        t.Fatal(err)
    }
@@ -1053,7 +1064,7 @@ func TestStart(t *testing.T) {
    t.Run("non-empty addressbook", func(t *testing.T) {
        var conns, failedConns int32 // how many connect calls were made to the p2p mock
-       _, kad, ab, _, signer := newTestKademlia(&conns, &failedConns, kademlia.Options{Bootnodes: bootnodes})
+       _, kad, ab, _, signer := newTestKademlia(t, &conns, &failedConns, kademlia.Options{Bootnodes: bootnodes})
        defer kad.Close()
        for i := 0; i < 3; i++ {
@@ -1081,7 +1092,7 @@ func TestStart(t *testing.T) {
    t.Run("empty addressbook", func(t *testing.T) {
        var conns, failedConns int32 // how many connect calls were made to the p2p mock
-       _, kad, _, _, _ := newTestKademlia(&conns, &failedConns, kademlia.Options{Bootnodes: bootnodes})
+       _, kad, _, _, _ := newTestKademlia(t, &conns, &failedConns, kademlia.Options{Bootnodes: bootnodes})
        defer kad.Close()
        if err := kad.Start(context.Background()); err != nil {
@@ -1093,7 +1104,18 @@ func TestStart(t *testing.T) {
    })
}
-func newTestKademlia(connCounter, failedConnCounter *int32, kadOpts kademlia.Options) (swarm.Address, *kademlia.Kad, addressbook.Interface, *mock.Discovery, beeCrypto.Signer) {
+func newTestKademlia(t *testing.T, connCounter, failedConnCounter *int32, kadOpts kademlia.Options) (swarm.Address, *kademlia.Kad, addressbook.Interface, *mock.Discovery, beeCrypto.Signer) {
+   t.Helper()
+
+   metricsDB, err := shed.NewDB("", nil)
+   if err != nil {
+       t.Fatal(err)
+   }
+   t.Cleanup(func() {
+       if err := metricsDB.Close(); err != nil {
+           t.Fatal(err)
+       }
+   })
    var (
        pk, _  = beeCrypto.GenerateSecp256k1Key() // random private key
        signer = beeCrypto.NewDefaultSigner(pk)   // signer
@@ -1102,7 +1124,7 @@ func newTestKademlia(connCounter, failedConnCounter *int32, kadOpts kademlia.Opt
        p2p    = p2pMock(ab, signer, connCounter, failedConnCounter) // p2p mock
        logger = logging.New(ioutil.Discard, 0)                      // logger
        disc   = mock.NewDiscovery()                                 // mock discovery protocol
-       kad    = kademlia.New(base, ab, disc, p2p, logger, kadOpts)            // kademlia instance
+       kad    = kademlia.New(base, ab, disc, p2p, metricsDB, logger, kadOpts) // kademlia instance
    )
    return base, kad, ab, disc, signer
...
@@ -53,16 +53,19 @@ func (c *Container) PeerInfo() topology.BinInfo {
    return topology.BinInfo{
        BinPopulation:     uint(c.connectedPeers.Length()),
        BinConnected:      uint(c.connectedPeers.Length()),
-       DisconnectedPeers: toAddrs(c.disconnectedPeers),
-       ConnectedPeers:    toAddrs(c.connectedPeers),
+       DisconnectedPeers: peersInfo(c.disconnectedPeers),
+       ConnectedPeers:    peersInfo(c.connectedPeers),
    }
}
-func toAddrs(s *pslice.PSlice) (addrs []string) {
+func peersInfo(s *pslice.PSlice) []*topology.PeerInfo {
+   if s.Length() == 0 {
+       return nil
+   }
+   peers := make([]*topology.PeerInfo, 0, s.Length())
    _ = s.EachBin(func(addr swarm.Address, po uint8) (bool, bool, error) {
-       addrs = append(addrs, addr.String())
+       peers = append(peers, &topology.PeerInfo{Address: addr})
        return false, false, nil
    })
-   return
+   return peers
}
@@ -64,11 +64,26 @@ type EachNeighbor interface {
// EachPeerFunc is a callback that is called with a peer and its PO
type EachPeerFunc func(swarm.Address, uint8) (stop, jumpToNext bool, err error)
+
+// PeerInfo is a view of peer information exposed to a user.
+type PeerInfo struct {
+   Address swarm.Address       `json:"address"`
+   Metrics *MetricSnapshotView `json:"metrics,omitempty"`
+}
+
+// MetricSnapshotView represents a snapshot of the metrics counters in a more
+// human-readable form.
+type MetricSnapshotView struct {
+   LastSeenTimestamp          int64   `json:"lastSeenTimestamp"`
+   SessionConnectionRetry     uint    `json:"sessionConnectionRetry"`
+   ConnectionTotalDuration    float64 `json:"connectionTotalDuration"`
+   SessionConnectionDuration  float64 `json:"sessionConnectionDuration"`
+   SessionConnectionDirection string  `json:"sessionConnectionDirection"`
+}
+
type BinInfo struct {
    BinPopulation uint `json:"population"`
    BinConnected  uint `json:"connected"`
-   DisconnectedPeers []string `json:"disconnectedPeers"`
-   ConnectedPeers    []string `json:"connectedPeers"`
+   DisconnectedPeers []*PeerInfo `json:"disconnectedPeers"`
+   ConnectedPeers    []*PeerInfo `json:"connectedPeers"`
}
type KadBins struct {
...
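
For reference, marshaling a populated PeerInfo might yield JSON like the following; this is a sketch with made-up values, assuming swarm.Address marshals to its hex string:

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"

	"github.com/ethersphere/bee/pkg/swarm"
	"github.com/ethersphere/bee/pkg/topology"
)

func main() {
	pi := topology.PeerInfo{
		Address: swarm.MustParseHexAddress("0123456789"),
		Metrics: &topology.MetricSnapshotView{
			LastSeenTimestamp:          1617035331, // Unix seconds
			SessionConnectionRetry:     2,
			ConnectionTotalDuration:    65, // whole seconds
			SessionConnectionDuration:  10,
			SessionConnectionDirection: "inbound",
		},
	}
	b, err := json.MarshalIndent(pi, "", "  ")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(b))
	// Expected shape (values illustrative):
	// {
	//   "address": "0123456789",
	//   "metrics": {
	//     "lastSeenTimestamp": 1617035331,
	//     "sessionConnectionRetry": 2,
	//     "connectionTotalDuration": 65,
	//     "sessionConnectionDuration": 10,
	//     "sessionConnectionDirection": "inbound"
	//   }
	// }
}
```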