Commit b2d7a73e authored by Matthew Slipper's avatar Matthew Slipper

op-node: Add P2P metrics

Adds the following peer-to-peer metrics:

- Peer count
- Stream count
- Number of gossip events by type
- Peer-to-peer bandwidth usage by direction

I used libp2p's existing interfaces wherever I could. Note that we still need a backgorund goroutine to pull banwdiwth stats out of the libp2p bandwidth counter.

Meta:

- Fixes ENG-2338
- Fixes ENG-2336
- Fixes ENG-2329
parent 14719116
......@@ -10,6 +10,8 @@ import (
"strconv"
"time"
"github.com/libp2p/go-libp2p-core/metrics"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promauto"
......@@ -67,6 +69,12 @@ type Metrics struct {
TransactionsSequencedTotal prometheus.Counter
// P2P Metrics
PeerCount prometheus.Gauge
StreamCount prometheus.Gauge
GossipEventsTotal *prometheus.CounterVec
BandwidthTotal *prometheus.GaugeVec
registry *prometheus.Registry
}
......@@ -217,6 +225,35 @@ func NewMetrics(procName string) *Metrics {
Help: "Count of total transactions sequenced",
}),
PeerCount: promauto.With(registry).NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Subsystem: "p2p",
Name: "peer_count",
Help: "Count of currently connected p2p peers",
}),
StreamCount: promauto.With(registry).NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Subsystem: "p2p",
Name: "stream_count",
Help: "Count of currently connected p2p streams",
}),
GossipEventsTotal: promauto.With(registry).NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Subsystem: "p2p",
Name: "gossip_events_total",
Help: "Count of gossip events by type",
}, []string{
"type",
}),
BandwidthTotal: promauto.With(registry).NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Subsystem: "p2p",
Name: "bandwidth_bytes_total",
Help: "P2P bandwidth by direction",
}, []string{
"direction",
}),
registry: registry,
}
}
......@@ -348,6 +385,26 @@ func (m *Metrics) RecordL1ReorgDepth(d uint64) {
m.L1ReorgDepth.Observe(float64(d))
}
func (m *Metrics) RecordGossipEvent(evType int32) {
m.GossipEventsTotal.WithLabelValues(pb.TraceEvent_Type_name[evType]).Inc()
}
func (m *Metrics) RecordBandwidth(ctx context.Context, bwc *metrics.BandwidthCounter) {
tick := time.NewTicker(10 * time.Second)
defer tick.Stop()
for {
select {
case <-tick.C:
bwTotals := bwc.GetBandwidthTotals()
m.BandwidthTotal.WithLabelValues("in").Set(float64(bwTotals.TotalIn))
m.BandwidthTotal.WithLabelValues("out").Set(float64(bwTotals.TotalOut))
case <-ctx.Done():
return
}
}
}
// Serve starts the metrics server on the given hostname and port.
// The server will be closed when the passed-in context is cancelled.
func (m *Metrics) Serve(ctx context.Context, hostname string, port int) error {
......
......@@ -196,7 +196,7 @@ func (n *OpNode) initMetricsServer(ctx context.Context, cfg *Config) error {
func (n *OpNode) initP2P(ctx context.Context, cfg *Config) error {
if cfg.P2P != nil {
p2pNode, err := p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n)
p2pNode, err := p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.metrics)
if err != nil || p2pNode == nil {
return err
}
......
......@@ -41,7 +41,7 @@ import (
type SetupP2P interface {
Check() error
// Host creates a libp2p host service. Returns nil, nil if p2p is disabled.
Host(log log.Logger) (host.Host, error)
Host(log log.Logger, reporter metrics.Reporter) (host.Host, error)
// Discovery creates a disc-v5 service. Returns nil, nil, nil if discovery is disabled.
Discovery(log log.Logger, rollupCfg *rollup.Config, tcpPort uint16) (*enode.LocalNode, *discover.UDPv5, error)
TargetPeers() uint
......@@ -91,8 +91,6 @@ type Config struct {
ConnGater func(conf *Config) (connmgr.ConnectionGater, error)
ConnMngr func(conf *Config) (connmgr.ConnManager, error)
// nil to disable bandwidth metrics
BandwidthMetrics metrics.Reporter
}
type ConnectionGater interface {
......
......@@ -9,6 +9,7 @@ import (
"sync"
"time"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/golang/snappy"
lru "github.com/hashicorp/golang-lru"
"github.com/libp2p/go-libp2p-core/host"
......@@ -115,7 +116,7 @@ func BuildGlobalGossipParams(cfg *rollup.Config) pubsub.GossipSubParams {
return params
}
func NewGossipSub(p2pCtx context.Context, h host.Host, cfg *rollup.Config) (*pubsub.PubSub, error) {
func NewGossipSub(p2pCtx context.Context, h host.Host, cfg *rollup.Config, metrics *metrics.Metrics) (*pubsub.PubSub, error) {
denyList, err := pubsub.NewTimeCachedBlacklist(30 * time.Second)
if err != nil {
return nil, err
......@@ -132,6 +133,7 @@ func NewGossipSub(p2pCtx context.Context, h host.Host, cfg *rollup.Config) (*pub
pubsub.WithPeerExchange(false),
pubsub.WithBlacklist(denyList),
pubsub.WithGossipSubParams(BuildGlobalGossipParams(cfg)),
pubsub.WithEventTracer(&gossipTracer{m: metrics}),
)
// TODO: pubsub.WithPeerScoreInspect(inspect, InspectInterval) to update peerstore scores with gossip scores
}
......@@ -441,3 +443,11 @@ func LogTopicEvents(ctx context.Context, log log.Logger, evHandler *pubsub.Topic
}
}
}
type gossipTracer struct {
m *metrics.Metrics
}
func (g *gossipTracer) Trace(evt *pb.TraceEvent) {
g.m.RecordGossipEvent(int32(*evt.Type))
}
......@@ -8,6 +8,7 @@ import (
"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/metrics"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-peerstore/pstoreds"
lconf "github.com/libp2p/go-libp2p/config"
......@@ -41,7 +42,7 @@ func (e *extraHost) ConnectionManager() connmgr.ConnManager {
var _ ExtraHostFeatures = (*extraHost)(nil)
func (conf *Config) Host(log log.Logger) (host.Host, error) {
func (conf *Config) Host(log log.Logger, reporter metrics.Reporter) (host.Host, error) {
if conf.DisableP2P {
return nil, nil
}
......@@ -115,7 +116,7 @@ func (conf *Config) Host(log log.Logger) (host.Host, error) {
ResourceManager: nil, // TODO use resource manager interface to manage resources per peer better.
NATManager: nat,
Peerstore: ps,
Reporter: conf.BandwidthMetrics, // may be nil if disabled
Reporter: reporter, // may be nil if disabled
MultiaddrResolver: madns.DefaultResolver,
// Ping is a small built-in libp2p protocol that helps us check/debug latency between peers.
DisablePing: false,
......
......@@ -65,10 +65,10 @@ func TestingConfig(t *testing.T) *Config {
func TestP2PSimple(t *testing.T) {
confA := TestingConfig(t)
confB := TestingConfig(t)
hostA, err := confA.Host(testlog.Logger(t, log.LvlError).New("host", "A"))
hostA, err := confA.Host(testlog.Logger(t, log.LvlError).New("host", "A"), nil)
require.NoError(t, err, "failed to launch host A")
defer hostA.Close()
hostB, err := confB.Host(testlog.Logger(t, log.LvlError).New("host", "B"))
hostB, err := confB.Host(testlog.Logger(t, log.LvlError).New("host", "B"), nil)
require.NoError(t, err, "failed to launch host B")
defer hostB.Close()
err = hostA.Connect(context.Background(), peer.AddrInfo{ID: hostB.ID(), Addrs: hostB.Addrs()})
......@@ -132,7 +132,7 @@ func TestP2PFull(t *testing.T) {
// TODO: maybe swap the order of sec/mux preferences, to test that negotiation works
logA := testlog.Logger(t, log.LvlError).New("host", "A")
nodeA, err := NewNodeP2P(context.Background(), &rollup.Config{}, logA, &confA, &mockGossipIn{})
nodeA, err := NewNodeP2P(context.Background(), &rollup.Config{}, logA, &confA, &mockGossipIn{}, metrics.NewMetrics(""))
require.NoError(t, err)
defer nodeA.Close()
......@@ -155,7 +155,7 @@ func TestP2PFull(t *testing.T) {
logB := testlog.Logger(t, log.LvlError).New("host", "B")
nodeB, err := NewNodeP2P(context.Background(), &rollup.Config{}, logB, &confB, &mockGossipIn{})
nodeB, err := NewNodeP2P(context.Background(), &rollup.Config{}, logB, &confB, &mockGossipIn{}, metrics.NewMetrics(""))
require.NoError(t, err)
defer nodeB.Close()
hostB := nodeB.Host()
......@@ -289,7 +289,7 @@ func TestDiscovery(t *testing.T) {
resourcesCtx, resourcesCancel := context.WithCancel(context.Background())
defer resourcesCancel()
nodeA, err := NewNodeP2P(context.Background(), rollupCfg, logA, &confA, &mockGossipIn{})
nodeA, err := NewNodeP2P(context.Background(), rollupCfg, logA, &confA, &mockGossipIn{}, metrics.NewMetrics(""))
require.NoError(t, err)
defer nodeA.Close()
hostA := nodeA.Host()
......@@ -304,7 +304,7 @@ func TestDiscovery(t *testing.T) {
confB.DiscoveryDB = discDBC
// Start B
nodeB, err := NewNodeP2P(context.Background(), rollupCfg, logB, &confB, &mockGossipIn{})
nodeB, err := NewNodeP2P(context.Background(), rollupCfg, logB, &confB, &mockGossipIn{}, metrics.NewMetrics(""))
require.NoError(t, err)
defer nodeB.Close()
hostB := nodeB.Host()
......@@ -319,7 +319,7 @@ func TestDiscovery(t *testing.T) {
}})
// Start C
nodeC, err := NewNodeP2P(context.Background(), rollupCfg, logC, &confC, &mockGossipIn{})
nodeC, err := NewNodeP2P(context.Background(), rollupCfg, logC, &confC, &mockGossipIn{}, metrics.NewMetrics(""))
require.NoError(t, err)
defer nodeC.Close()
hostC := nodeC.Host()
......
......@@ -6,9 +6,11 @@ import (
"fmt"
"strconv"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/hashicorp/go-multierror"
"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/host"
p2pmetrics "github.com/libp2p/go-libp2p-core/metrics"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
ma "github.com/multiformats/go-multiaddr"
......@@ -30,12 +32,12 @@ type NodeP2P struct {
gsOut GossipOut // p2p gossip application interface for publishing
}
func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn) (*NodeP2P, error) {
func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, metrics *metrics.Metrics) (*NodeP2P, error) {
if setup == nil {
return nil, errors.New("p2p node cannot be created without setup")
}
var n NodeP2P
if err := n.init(resourcesCtx, rollupCfg, log, setup, gossipIn); err != nil {
if err := n.init(resourcesCtx, rollupCfg, log, setup, gossipIn, metrics); err != nil {
closeErr := n.Close()
if closeErr != nil {
log.Error("failed to close p2p after starting with err", "closeErr", closeErr, "err", err)
......@@ -48,10 +50,12 @@ func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.
return &n, nil
}
func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn) error {
func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, metrics *metrics.Metrics) error {
bwc := p2pmetrics.NewBandwidthCounter()
var err error
// nil if disabled.
n.host, err = setup.Host(log)
n.host, err = setup.Host(log, bwc)
if err != nil {
if n.dv5Udp != nil {
n.dv5Udp.Close()
......@@ -66,10 +70,10 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
n.connMgr = extra.ConnectionManager()
}
// notify of any new connections/streams/etc.
n.host.Network().Notify(NewNetworkNotifier(log))
n.host.Network().Notify(NewNetworkNotifier(log, metrics))
// unregister identify-push handler. Only identifying on dial is fine, and more robust against spam
n.host.RemoveStreamHandler(identify.IDDelta)
n.gs, err = NewGossipSub(resourcesCtx, n.host, rollupCfg)
n.gs, err = NewGossipSub(resourcesCtx, n.host, rollupCfg, metrics)
if err != nil {
return fmt.Errorf("failed to start gossipsub router: %w", err)
}
......@@ -90,6 +94,8 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
if err != nil {
return fmt.Errorf("failed to start discv5: %w", err)
}
go metrics.RecordBandwidth(resourcesCtx, bwc)
}
return nil
}
......
package p2p
import (
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/libp2p/go-libp2p-core/network"
ma "github.com/multiformats/go-multiaddr"
......@@ -11,6 +12,7 @@ import (
type notifications struct {
log log.Logger
m *metrics.Metrics
}
func (notif *notifications) Listen(n network.Network, a ma.Multiaddr) {
......@@ -20,20 +22,24 @@ func (notif *notifications) ListenClose(n network.Network, a ma.Multiaddr) {
notif.log.Info("stopped listening network address", "addr", a)
}
func (notif *notifications) Connected(n network.Network, v network.Conn) {
notif.m.PeerCount.Inc()
notif.log.Info("connected to peer", "peer", v.RemotePeer(), "addr", v.RemoteMultiaddr())
}
func (notif *notifications) Disconnected(n network.Network, v network.Conn) {
notif.m.PeerCount.Dec()
notif.log.Info("disconnected from peer", "peer", v.RemotePeer(), "addr", v.RemoteMultiaddr())
}
func (notif *notifications) OpenedStream(n network.Network, v network.Stream) {
notif.m.StreamCount.Inc()
c := v.Conn()
notif.log.Trace("opened stream", "protocol", v.Protocol(), "peer", c.RemotePeer(), "addr", c.RemoteMultiaddr())
}
func (notif *notifications) ClosedStream(n network.Network, v network.Stream) {
notif.m.StreamCount.Dec()
c := v.Conn()
notif.log.Trace("opened stream", "protocol", v.Protocol(), "peer", c.RemotePeer(), "addr", c.RemoteMultiaddr())
}
func NewNetworkNotifier(log log.Logger) network.Notifiee {
return &notifications{log: log}
func NewNetworkNotifier(log log.Logger, metrics *metrics.Metrics) network.Notifiee {
return &notifications{log: log, m: metrics}
}
......@@ -5,6 +5,7 @@ import (
"fmt"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/log"
......@@ -38,7 +39,7 @@ func (p *Prepared) Check() error {
}
// Host creates a libp2p host service. Returns nil, nil if p2p is disabled.
func (p *Prepared) Host(log log.Logger) (host.Host, error) {
func (p *Prepared) Host(log log.Logger, reporter metrics.Reporter) (host.Host, error) {
return p.HostP2P, nil
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment