Commit 9dd380c4, authored by Peter Mrekaj, committed by GitHub

feat(kademlia): add prometheus metrics (#1960)

parent 3e60bfc7
...@@ -652,6 +652,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, ...@@ -652,6 +652,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
debugAPIService.MustRegisterMetrics(pingPong.Metrics()...) debugAPIService.MustRegisterMetrics(pingPong.Metrics()...)
debugAPIService.MustRegisterMetrics(acc.Metrics()...) debugAPIService.MustRegisterMetrics(acc.Metrics()...)
debugAPIService.MustRegisterMetrics(storer.Metrics()...) debugAPIService.MustRegisterMetrics(storer.Metrics()...)
debugAPIService.MustRegisterMetrics(kad.Metrics()...)
if pullerService != nil { if pullerService != nil {
debugAPIService.MustRegisterMetrics(pullerService.Metrics()...) debugAPIService.MustRegisterMetrics(pullerService.Metrics()...)
......
...@@ -23,7 +23,7 @@ import ( ...@@ -23,7 +23,7 @@ import (
"github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology" "github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology/kademlia/internal/metrics" im "github.com/ethersphere/bee/pkg/topology/kademlia/internal/metrics"
"github.com/ethersphere/bee/pkg/topology/kademlia/internal/waitnext" "github.com/ethersphere/bee/pkg/topology/kademlia/internal/waitnext"
"github.com/ethersphere/bee/pkg/topology/pslice" "github.com/ethersphere/bee/pkg/topology/pslice"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
...@@ -89,12 +89,13 @@ type Kad struct { ...@@ -89,12 +89,13 @@ type Kad struct {
logger logging.Logger // logger logger logging.Logger // logger
standalone bool // indicates whether the node is working in standalone mode standalone bool // indicates whether the node is working in standalone mode
bootnode bool // indicates whether the node is working in bootnode mode bootnode bool // indicates whether the node is working in bootnode mode
collector *metrics.Collector collector *im.Collector
quit chan struct{} // quit channel quit chan struct{} // quit channel
halt chan struct{} // halt channel halt chan struct{} // halt channel
done chan struct{} // signal that `manage` has quit done chan struct{} // signal that `manage` has quit
wg sync.WaitGroup wg sync.WaitGroup
waitNext *waitnext.WaitNext waitNext *waitnext.WaitNext
metrics metrics
} }
// New returns a new Kademlia. // New returns a new Kademlia.
...@@ -134,11 +135,12 @@ func New( ...@@ -134,11 +135,12 @@ func New(
logger: logger, logger: logger,
standalone: o.StandaloneMode, standalone: o.StandaloneMode,
bootnode: o.BootnodeMode, bootnode: o.BootnodeMode,
collector: metrics.NewCollector(metricsDB), collector: im.NewCollector(metricsDB),
quit: make(chan struct{}), quit: make(chan struct{}),
halt: make(chan struct{}), halt: make(chan struct{}),
done: make(chan struct{}), done: make(chan struct{}),
wg: sync.WaitGroup{}, wg: sync.WaitGroup{},
metrics: newMetrics(),
} }
if k.bitSuffixLength > 0 { if k.bitSuffixLength > 0 {
...@@ -235,7 +237,11 @@ type peerConnInfo struct { ...@@ -235,7 +237,11 @@ type peerConnInfo struct {
// connectBalanced attempts to connect to the balanced peers first. // connectBalanced attempts to connect to the balanced peers first.
func (k *Kad) connectBalanced(wg *sync.WaitGroup, peerConnChan chan<- *peerConnInfo) { func (k *Kad) connectBalanced(wg *sync.WaitGroup, peerConnChan chan<- *peerConnInfo) {
skipPeers := func(peer swarm.Address) bool { skipPeers := func(peer swarm.Address) bool {
return k.waitNext.Waiting(peer) if k.waitNext.Waiting(peer) {
k.metrics.TotalBeforeExpireWaits.Inc()
return true
}
return false
} }
for i := range k.commonBinPrefixes { for i := range k.commonBinPrefixes {
...@@ -309,6 +315,7 @@ func (k *Kad) connectNeighbours(wg *sync.WaitGroup, peerConnChan chan<- *peerCon ...@@ -309,6 +315,7 @@ func (k *Kad) connectNeighbours(wg *sync.WaitGroup, peerConnChan chan<- *peerCon
} }
if k.waitNext.Waiting(addr) { if k.waitNext.Waiting(addr) {
k.metrics.TotalBeforeExpireWaits.Inc()
return false, false, nil return false, false, nil
} }
...@@ -371,7 +378,8 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup, ...@@ -371,7 +378,8 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup,
k.connectedPeers.Add(peer.addr) k.connectedPeers.Add(peer.addr)
k.collector.Record(peer.addr, metrics.PeerLogIn(time.Now(), metrics.PeerConnectionDirectionOutbound)) k.metrics.TotalOutboundConnections.Inc()
k.collector.Record(peer.addr, im.PeerLogIn(time.Now(), im.PeerConnectionDirectionOutbound))
k.depthMu.Lock() k.depthMu.Lock()
k.depth = recalcDepth(k.connectedPeers, k.radius) k.depth = recalcDepth(k.connectedPeers, k.radius)
...@@ -398,6 +406,7 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup, ...@@ -398,6 +406,7 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup,
addr := peer.addr.String() addr := peer.addr.String()
if k.waitNext.Waiting(peer.addr) { if k.waitNext.Waiting(peer.addr) {
k.metrics.TotalBeforeExpireWaits.Inc()
wg.Done() wg.Done()
continue continue
} }
...@@ -450,8 +459,12 @@ func (k *Kad) manage() { ...@@ -450,8 +459,12 @@ func (k *Kad) manage() {
case <-k.quit: case <-k.quit:
return return
case <-time.After(30 * time.Second): case <-time.After(30 * time.Second):
start := time.Now()
if err := k.collector.Flush(); err != nil { if err := k.collector.Flush(); err != nil {
k.metrics.InternalMetricsFlushTotalErrors.Inc()
k.logger.Debugf("kademlia: unable to flush metrics counters to the persistent store: %v", err) k.logger.Debugf("kademlia: unable to flush metrics counters to the persistent store: %v", err)
} else {
k.metrics.InternalMetricsFlushTime.Observe(float64(time.Since(start).Nanoseconds()))
} }
k.notifyManageLoop() k.notifyManageLoop()
case <-k.manageC: case <-k.manageC:
...@@ -474,13 +487,24 @@ func (k *Kad) manage() { ...@@ -474,13 +487,24 @@ func (k *Kad) manage() {
k.connectBalanced(&wg, peerConnChan) k.connectBalanced(&wg, peerConnChan)
k.connectNeighbours(&wg, peerConnChan) k.connectNeighbours(&wg, peerConnChan)
wg.Wait() wg.Wait()
k.depthMu.Lock()
depth := k.depth
radius := k.radius
k.depthMu.Unlock()
k.logger.Tracef( k.logger.Tracef(
"kademlia: connector took %s to finish: old depth %d; new depth %d", "kademlia: connector took %s to finish: old depth %d; new depth %d",
time.Since(start), time.Since(start),
oldDepth, oldDepth,
k.NeighborhoodDepth(), depth,
) )
k.metrics.CurrentDepth.Set(float64(depth))
k.metrics.CurrentRadius.Set(float64(radius))
k.metrics.CurrentlyKnownPeers.Set(float64(k.knownPeers.Length()))
k.metrics.CurrentlyConnectedPeers.Set(float64(k.connectedPeers.Length()))
if k.connectedPeers.Length() == 0 { if k.connectedPeers.Length() == 0 {
select { select {
case <-k.halt: case <-k.halt:
...@@ -527,6 +551,8 @@ func (k *Kad) connectBootnodes(ctx context.Context) { ...@@ -527,6 +551,8 @@ func (k *Kad) connectBootnodes(ctx context.Context) {
bzzAddress, err := k.p2p.Connect(ctx, addr) bzzAddress, err := k.p2p.Connect(ctx, addr)
attempts++ attempts++
k.metrics.TotalBootNodesConnectionAttempts.Inc()
if err != nil { if err != nil {
if errors.Is(err, p2p.ErrDialLightNode) { if errors.Is(err, p2p.ErrDialLightNode) {
k.logger.Debugf("connect fail %s: %v", addr, err) k.logger.Debugf("connect fail %s: %v", addr, err)
...@@ -654,6 +680,8 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr) ...@@ -654,6 +680,8 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
ctx, cancel := context.WithTimeout(ctx, peerConnectionAttemptTimeout) ctx, cancel := context.WithTimeout(ctx, peerConnectionAttemptTimeout)
defer cancel() defer cancel()
k.metrics.TotalOutboundConnectionAttempts.Inc()
switch i, err := k.p2p.Connect(ctx, ma); { switch i, err := k.p2p.Connect(ctx, ma); {
case errors.Is(err, p2p.ErrDialLightNode): case errors.Is(err, p2p.ErrDialLightNode):
return errPruneEntry return errPruneEntry
...@@ -677,9 +705,10 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr) ...@@ -677,9 +705,10 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
failedAttempts++ failedAttempts++
} }
k.collector.Record(peer, metrics.IncSessionConnectionRetry()) k.metrics.TotalOutboundConnectionFailedAttempts.Inc()
k.collector.Record(peer, im.IncSessionConnectionRetry())
k.collector.Inspect(peer, func(ss *metrics.Snapshot) { k.collector.Inspect(peer, func(ss *im.Snapshot) {
quickPrune := ss == nil || ss.HasAtMaxOneConnectionAttempt() quickPrune := ss == nil || ss.HasAtMaxOneConnectionAttempt()
if (k.connectedPeers.Length() > 0 && quickPrune) || failedAttempts > maxConnAttempts { if (k.connectedPeers.Length() > 0 && quickPrune) || failedAttempts > maxConnAttempts {
...@@ -758,6 +787,7 @@ func (k *Kad) AddPeers(addrs ...swarm.Address) { ...@@ -758,6 +787,7 @@ func (k *Kad) AddPeers(addrs ...swarm.Address) {
} }
func (k *Kad) Pick(peer p2p.Peer) bool { func (k *Kad) Pick(peer p2p.Peer) bool {
k.metrics.PickCalls.Inc()
if k.bootnode { if k.bootnode {
// shortcircuit for bootnode mode - always accept connections, // shortcircuit for bootnode mode - always accept connections,
// at least until we find a better solution. // at least until we find a better solution.
...@@ -766,7 +796,11 @@ func (k *Kad) Pick(peer p2p.Peer) bool { ...@@ -766,7 +796,11 @@ func (k *Kad) Pick(peer p2p.Peer) bool {
po := swarm.Proximity(k.base.Bytes(), peer.Address.Bytes()) po := swarm.Proximity(k.base.Bytes(), peer.Address.Bytes())
_, oversaturated := k.saturationFunc(po, k.knownPeers, k.connectedPeers) _, oversaturated := k.saturationFunc(po, k.knownPeers, k.connectedPeers)
// pick the peer if we are not oversaturated // pick the peer if we are not oversaturated
return !oversaturated if !oversaturated {
return true
}
k.metrics.PickCallsFalse.Inc()
return false
} }
// Connected is called when a peer has dialed in. // Connected is called when a peer has dialed in.
...@@ -805,7 +839,8 @@ func (k *Kad) connected(ctx context.Context, addr swarm.Address) error { ...@@ -805,7 +839,8 @@ func (k *Kad) connected(ctx context.Context, addr swarm.Address) error {
k.knownPeers.Add(addr) k.knownPeers.Add(addr)
k.connectedPeers.Add(addr) k.connectedPeers.Add(addr)
k.collector.Record(addr, metrics.PeerLogIn(time.Now(), metrics.PeerConnectionDirectionInbound)) k.metrics.TotalInboundConnections.Inc()
k.collector.Record(addr, im.PeerLogIn(time.Now(), im.PeerConnectionDirectionInbound))
k.waitNext.Remove(addr) k.waitNext.Remove(addr)
...@@ -827,7 +862,8 @@ func (k *Kad) Disconnected(peer p2p.Peer) { ...@@ -827,7 +862,8 @@ func (k *Kad) Disconnected(peer p2p.Peer) {
k.waitNext.SetTryAfter(peer.Address, time.Now().Add(timeToRetry)) k.waitNext.SetTryAfter(peer.Address, time.Now().Add(timeToRetry))
k.collector.Record(peer.Address, metrics.PeerLogOut(time.Now())) k.metrics.TotalInboundDisconnections.Inc()
k.collector.Record(peer.Address, im.PeerLogOut(time.Now()))
k.depthMu.Lock() k.depthMu.Lock()
k.depth = recalcDepth(k.connectedPeers, k.radius) k.depth = recalcDepth(k.connectedPeers, k.radius)
...@@ -1254,7 +1290,7 @@ func (k *Kad) randomPeer(bin uint8) (swarm.Address, error) { ...@@ -1254,7 +1290,7 @@ func (k *Kad) randomPeer(bin uint8) (swarm.Address, error) {
// createMetricsSnapshotView creates new topology.MetricSnapshotView from the // createMetricsSnapshotView creates new topology.MetricSnapshotView from the
// given metrics.Snapshot and rounds all the timestamps and durations to its // given metrics.Snapshot and rounds all the timestamps and durations to its
// nearest second. // nearest second.
func createMetricsSnapshotView(ss *metrics.Snapshot) *topology.MetricSnapshotView { func createMetricsSnapshotView(ss *im.Snapshot) *topology.MetricSnapshotView {
if ss == nil { if ss == nil {
return nil return nil
} }
......
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package kademlia
import (
m "github.com/ethersphere/bee/pkg/metrics"
"github.com/prometheus/client_golang/prometheus"
)
// metrics bundles every prometheus collector maintained by the kademlia
// driver. The field order is kept as-is because the collectors are handed to
// m.PrometheusCollectorsFromFields as a whole struct.
type metrics struct {
	// Pick bookkeeping: every call to Kad.Pick, and the subset that
	// returned false.
	PickCalls      prometheus.Counter
	PickCallsFalse prometheus.Counter

	// Gauges refreshed by the manage loop after each connector run.
	CurrentDepth            prometheus.Gauge
	CurrentRadius           prometheus.Gauge
	CurrentlyKnownPeers     prometheus.Gauge
	CurrentlyConnectedPeers prometheus.Gauge

	// Flushing of the internal per-peer metrics collector: duration of the
	// successful flushes (observed in nanoseconds) and the number of
	// failed ones.
	InternalMetricsFlushTime        prometheus.Histogram
	InternalMetricsFlushTotalErrors prometheus.Counter

	// Incremented whenever a peer is skipped because waitNext reports it
	// as still waiting.
	TotalBeforeExpireWaits prometheus.Counter

	// Connection bookkeeping.
	// NOTE(review): TotalInboundDisconnections is incremented in
	// Kad.Disconnected for any peer; confirm it is meant to cover inbound
	// peers only.
	TotalInboundConnections               prometheus.Counter
	TotalInboundDisconnections            prometheus.Counter
	TotalOutboundConnections              prometheus.Counter
	TotalOutboundConnectionAttempts       prometheus.Counter
	TotalOutboundConnectionFailedAttempts prometheus.Counter
	TotalBootNodesConnectionAttempts      prometheus.Counter
}
// newMetrics builds the full set of kademlia prometheus collectors. All of
// them share the m.Namespace namespace and the "kademlia" subsystem; none of
// them is registered here, registration is left to the caller of Kad.Metrics.
func newMetrics() metrics {
	const subsystem = "kademlia"

	return metrics{
		PickCalls: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "pick_calls",
			Help:      "The number of pick method call made.",
		}),
		PickCallsFalse: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "pick_calls_false",
			Help:      "The number of pick method call made which returned false.",
		}),
		CurrentDepth: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "current_depth",
			Help:      "The current value of depth.",
		}),
		CurrentRadius: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "current_radius",
			Help:      "The current value of radius.",
		}),
		CurrentlyKnownPeers: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "currently_known_peers",
			Help:      "Number of currently known peers.",
		}),
		CurrentlyConnectedPeers: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "currently_connected_peers",
			Help:      "Number of currently connected peers.",
		}),
		InternalMetricsFlushTime: prometheus.NewHistogram(prometheus.HistogramOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "internal_metrics_flush_time",
			Help:      "The time spent flushing the internal metrics about peers to the state-store.",
			// The manage loop observes this histogram in nanoseconds. The
			// client's default buckets top out at 10, so with them every
			// sample would fall into the +Inf bucket. Use 15 exponential
			// buckets from 1e6 ns (1ms) doubling up to ~1.6e10 ns (~16s).
			Buckets: prometheus.ExponentialBuckets(1e6, 2, 15),
		}),
		InternalMetricsFlushTotalErrors: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "internal_metrics_flush_total_errors",
			Help:      "Number of total errors occurred during flushing the internal metrics to the state-store.",
		}),
		TotalBeforeExpireWaits: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "total_before_expire_waits",
			Help:      "Total before expire waits made.",
		}),
		TotalInboundConnections: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "total_inbound_connections",
			Help:      "Total inbound connections made.",
		}),
		TotalInboundDisconnections: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "total_inbound_disconnections",
			Help:      "Total inbound disconnections made.",
		}),
		TotalOutboundConnections: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "total_outbound_connections",
			Help:      "Total outbound connections made.",
		}),
		TotalOutboundConnectionAttempts: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "total_outbound_connection_attempts",
			Help:      "Total outbound connection attempts made.",
		}),
		TotalOutboundConnectionFailedAttempts: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "total_outbound_connection_failed_attempts",
			Help:      "Total outbound connection failed attempts made.",
		}),
		TotalBootNodesConnectionAttempts: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "total_bootnodes_connection_attempts",
			Help:      "Total boot-nodes connection attempts made.",
		}),
	}
}
// Metrics exposes the kademlia prometheus collectors, gathered from k.metrics
// via m.PrometheusCollectorsFromFields, so that callers can register them.
func (k *Kad) Metrics() []prometheus.Collector {
	collectors := m.PrometheusCollectorsFromFields(k.metrics)
	return collectors
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment