Commit f22dac93 authored by Andreas Bigger

feat: a foray into opnode p2p - peer score metrics

parent 1bd4e6f7
 package p2p

 import (
+	"bufio"
 	"crypto/rand"
 	"encoding/hex"
 	"fmt"
@@ -51,7 +52,8 @@ func Pub2PeerID(r io.Reader) (string, error) {
 }

 func readHexData(r io.Reader) ([]byte, error) {
-	data, err := io.ReadAll(r)
+	reader := bufio.NewReader(os.Stdin)
+	data, err := reader.ReadString('\n')
 	if err != nil {
 		return nil, err
 	}
......
@@ -22,6 +22,8 @@ type Payload struct {
 	ChainID uint64 `json:"chainID"`
 }

+// Beat sends a heartbeat to the server at the given URL. It will send a heartbeat immediately, and then every SendInterval.
+// Beat spawns a goroutine that will send heartbeats until the context is canceled.
 func Beat(
 	ctx context.Context,
 	log log.Logger,
......
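The Beat doc comment added above describes a send-immediately-then-tick loop that runs until the context is canceled. A minimal sketch of that pattern under stated assumptions (hypothetical function, URL handling, and payload; not the op-node implementation):

    package heartbeat

    import (
        "bytes"
        "context"
        "log"
        "net/http"
        "time"
    )

    // heartbeat POSTs payload to url immediately, then on every tick of interval,
    // until ctx is canceled. It only illustrates the immediate-send-plus-ticker shape.
    func heartbeat(ctx context.Context, url string, payload []byte, interval time.Duration) {
        send := func() {
            req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payload))
            if err != nil {
                log.Printf("heartbeat: build request: %v", err)
                return
            }
            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                log.Printf("heartbeat: send: %v", err)
                return
            }
            resp.Body.Close()
        }

        send() // send one heartbeat right away
        ticker := time.NewTicker(interval)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                send()
            case <-ctx.Done():
                return
            }
        }
    }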
+// Package metrics provides a set of metrics for the op-node.
 package metrics

 import (
@@ -14,6 +15,7 @@ import (
 	pb "github.com/libp2p/go-libp2p-pubsub/pb"
 	libp2pmetrics "github.com/libp2p/go-libp2p/core/metrics"
+	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/collectors"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -52,7 +54,6 @@ type Metricer interface {
 	RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID)
 	CountSequencedTxs(count int)
 	RecordL1ReorgDepth(d uint64)
-	RecordGossipEvent(evType int32)
 	IncPeerCount()
 	DecPeerCount()
 	IncStreamCount()
@@ -61,8 +62,12 @@ type Metricer interface {
 	RecordSequencerBuildingDiffTime(duration time.Duration)
 	RecordSequencerSealingTime(duration time.Duration)
 	Document() []metrics.DocumentedMetric
+	// P2P Metrics
+	RecordGossipEvent(evType int32)
+	RecordPeerScoring(peerID peer.ID, score float64)
 }

+// Metrics tracks all the metrics for the op-node.
 type Metrics struct {
 	Info *prometheus.GaugeVec
 	Up   prometheus.Gauge
@@ -109,6 +114,7 @@ type Metrics struct {
 	// P2P Metrics
 	PeerCount         prometheus.Gauge
 	StreamCount       prometheus.Gauge
+	PeerScores        map[peer.ID]float64
 	GossipEventsTotal *prometheus.CounterVec
 	BandwidthTotal    *prometheus.GaugeVec
@@ -456,6 +462,10 @@ func (m *Metrics) RecordGossipEvent(evType int32) {
 	m.GossipEventsTotal.WithLabelValues(pb.TraceEvent_Type_name[evType]).Inc()
 }

+func (m *Metrics) RecordPeerScoring(peerID peer.ID, score float64) {
+	m.PeerScores[peerID] = score
+}
+
 func (m *Metrics) IncPeerCount() {
 	m.PeerCount.Inc()
 }
@@ -584,6 +594,9 @@ func (n *noopMetricer) RecordL1ReorgDepth(d uint64) {
 func (n *noopMetricer) RecordGossipEvent(evType int32) {
 }

+func (n *noopMetricer) RecordPeerScoring(peerID peer.ID, score float64) {
+}
+
 func (n *noopMetricer) IncPeerCount() {
 }
......
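The diff stores the latest score per peer in a PeerScores map. As a hedged illustration of another way to surface the same data, here is a sketch that backs RecordPeerScoring with a Prometheus GaugeVec labeled by peer ID; the namespace, metric name, and type names are assumptions, not part of this commit.

    package example

    import (
        "github.com/libp2p/go-libp2p/core/peer"
        "github.com/prometheus/client_golang/prometheus"
    )

    // PeerScoreMetrics is an illustrative metricer that exports gossip peer scores
    // as a Prometheus gauge labeled by peer ID, instead of holding them in a map.
    type PeerScoreMetrics struct {
        peerScores *prometheus.GaugeVec
    }

    func NewPeerScoreMetrics(registry *prometheus.Registry) *PeerScoreMetrics {
        g := prometheus.NewGaugeVec(prometheus.GaugeOpts{
            Namespace: "op_node", // assumed namespace, for illustration only
            Name:      "peer_scores",
            Help:      "Gossip peer score per peer",
        }, []string{"peer"})
        registry.MustRegister(g)
        return &PeerScoreMetrics{peerScores: g}
    }

    func (m *PeerScoreMetrics) RecordPeerScoring(peerID peer.ID, score float64) {
        m.peerScores.WithLabelValues(peerID.String()).Set(score)
    }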
@@ -74,7 +74,7 @@ func (s *rpcServer) Start() error {
 	// The CORS and VHosts arguments below must be set in order for
 	// other services to connect to the opnode. VHosts in particular
 	// defaults to localhost, which will prevent containers from
-	// calling into the opnode without an "invalid host" error.
+	// calling into the opnode with an "invalid host" error.
 	nodeHandler := node.NewHTTPHandlerStack(srv, []string{"*"}, []string{"*"}, nil)
 	mux := http.NewServeMux()
......
@@ -39,6 +39,8 @@ const (
 	DefaultMeshDlo   = 6  // topic stable mesh low watermark
 	DefaultMeshDhi   = 12 // topic stable mesh high watermark
 	DefaultMeshDlazy = 6  // gossip target
+	// peerScoreInspectFrequency is the frequency at which peer scores are inspected
+	peerScoreInspectFrequency = 1 * time.Minute
 )

 // Message domains, the msg id function uncompresses to keep data monomorphic,
@@ -57,6 +59,7 @@ type GossipRuntimeConfig interface {
 type GossipMetricer interface {
 	RecordGossipEvent(evType int32)
+	RecordPeerScoring(peerID peer.ID, score float64)
 }

 func blocksTopicV1(cfg *rollup.Config) string {
@@ -140,6 +143,8 @@ func BuildGlobalGossipParams(cfg *rollup.Config) pubsub.GossipSubParams {
 	return params
 }

+// NewGossipSub configures a new pubsub instance with the specified parameters.
+// PubSub uses a GossipSubRouter as its router under the hood.
 func NewGossipSub(p2pCtx context.Context, h host.Host, cfg *rollup.Config, gossipConf GossipSetupConfigurables, m GossipMetricer) (*pubsub.PubSub, error) {
 	denyList, err := pubsub.NewTimeCachedBlacklist(30 * time.Second)
 	if err != nil {
@@ -160,10 +165,21 @@ func NewGossipSub(p2pCtx context.Context, h host.Host, cfg *rollup.Config, gossi
 		pubsub.WithBlacklist(denyList),
 		pubsub.WithGossipSubParams(params),
 		pubsub.WithEventTracer(&gossipTracer{m: m}),
+		pubsub.WithPeerScoreInspect(BuildPeerScoreInspector(m), peerScoreInspectFrequency),
 	}
 	gossipOpts = append(gossipOpts, gossipConf.ConfigureGossip(&params)...)
 	return pubsub.NewGossipSub(p2pCtx, h, gossipOpts...)
-	// TODO: pubsub.WithPeerScoreInspect(inspect, InspectInterval) to update peerstore scores with gossip scores
 }

+// BuildPeerScoreInspector returns a function that is called periodically by the pubsub library to inspect the peer scores.
+// It is passed to the pubsub library as a [pubsub.ExtendedPeerScoreInspectFn] via the [pubsub.WithPeerScoreInspect] option.
+// The inspector is called with a mapping of peer IDs to peer score snapshots.
+func BuildPeerScoreInspector(metricer GossipMetricer) pubsub.ExtendedPeerScoreInspectFn {
+	return func(m map[peer.ID]*pubsub.PeerScoreSnapshot) {
+		for id, s := range m {
+			metricer.RecordPeerScoring(id, s.Score)
+		}
+	}
+}
+
 func validationResultString(v pubsub.ValidationResult) string {
......
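To make the peer-score data flow concrete, here is a small self-contained sketch that drives the inspector returned by BuildPeerScoreInspector by hand. The recordingMetricer type and the fabricated snapshot are hypothetical; in the node, go-libp2p-pubsub invokes the inspector itself every peerScoreInspectFrequency. Note that the pubsub.WithPeerScoreInspect option generally expects peer scoring to be enabled on the gossipsub router, which is worth verifying against the pinned library version.

    package main

    import (
        "fmt"
        "sync"

        pubsub "github.com/libp2p/go-libp2p-pubsub"
        "github.com/libp2p/go-libp2p/core/peer"

        "github.com/ethereum-optimism/optimism/op-node/p2p"
    )

    // recordingMetricer is a toy p2p.GossipMetricer that keeps the latest score per peer in memory.
    type recordingMetricer struct {
        mu     sync.Mutex
        scores map[peer.ID]float64
    }

    func (r *recordingMetricer) RecordGossipEvent(evType int32) {}

    func (r *recordingMetricer) RecordPeerScoring(id peer.ID, score float64) {
        r.mu.Lock()
        defer r.mu.Unlock()
        r.scores[id] = score
    }

    func main() {
        m := &recordingMetricer{scores: map[peer.ID]float64{}}
        inspect := p2p.BuildPeerScoreInspector(m)

        // In the node, pubsub calls the inspector on its own schedule; here we call it
        // directly with a fabricated snapshot to show what gets recorded.
        inspect(map[peer.ID]*pubsub.PeerScoreSnapshot{
            peer.ID("example-peer"): {Score: -0.25},
        })

        fmt.Println(m.scores[peer.ID("example-peer")]) // -0.25
    }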
@@ -34,6 +34,8 @@ type NodeP2P struct {
 	gsOut GossipOut // p2p gossip application interface for publishing
 }

+// NewNodeP2P creates a new p2p node, and returns a reference to it. If p2p is disabled, it returns nil.
+// If metrics are configured, a bandwidth monitor will be spawned in a goroutine.
 func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, runCfg GossipRuntimeConfig, metrics metrics.Metricer) (*NodeP2P, error) {
 	if setup == nil {
 		return nil, errors.New("p2p node cannot be created without setup")
......
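The NewNodeP2P comment mentions a bandwidth monitor goroutine; that monitor is not shown in this diff. As a rough sketch only (function name, logger, and interval are assumptions, not the op-node code), a periodic reporter over libp2p's BandwidthCounter could look like:

    package example

    import (
        "context"
        "time"

        "github.com/ethereum/go-ethereum/log"
        libp2pmetrics "github.com/libp2p/go-libp2p/core/metrics"
    )

    // monitorBandwidth periodically logs the totals tracked by a libp2p BandwidthCounter
    // until the context is canceled. It would typically be started with `go monitorBandwidth(...)`.
    func monitorBandwidth(ctx context.Context, lgr log.Logger, bwc *libp2pmetrics.BandwidthCounter, every time.Duration) {
        ticker := time.NewTicker(every)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                totals := bwc.GetBandwidthTotals()
                lgr.Info("p2p bandwidth", "in_bytes", totals.TotalIn, "out_bytes", totals.TotalOut,
                    "rate_in", totals.RateIn, "rate_out", totals.RateOut)
            case <-ctx.Done():
                return
            }
        }
    }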
@@ -12,9 +12,9 @@ import (
 	"github.com/ethereum/go-ethereum/rpc"
 )

-// IterativeBatchCall is an util to create a job to fetch many RPC requests in batches,
-// and enable the caller to parallelize easily and safely, handle and re-try errors,
-// and pick a batch size all by simply calling Fetch again and again until it returns io.EOF.
+// IterativeBatchCall batches many RPC requests with safe and easy parallelization.
+// Request errors are handled and re-tried, and the batch size is configurable.
+// Executing IterativeBatchCall is as simple as calling Fetch repeatedly until it returns io.EOF.
 type IterativeBatchCall[K any, V any] struct {
 	completed uint32       // tracks how far to completing all requests we are
 	resetLock sync.RWMutex // ensures we do not concurrently read (incl. fetch) / reset
@@ -77,7 +77,7 @@ func (ibc *IterativeBatchCall[K, V]) Reset() {
 }

 // Fetch fetches more of the data, and returns io.EOF when all data has been fetched.
-// This method is safe to call concurrently: it will parallelize the fetching work.
+// This method is safe to call concurrently; it will parallelize the fetching work.
 // If no work is available, but the fetching is not done yet,
 // then Fetch will block until the next thing can be fetched, or until the context expires.
 func (ibc *IterativeBatchCall[K, V]) Fetch(ctx context.Context) error {
@@ -115,7 +115,6 @@ func (ibc *IterativeBatchCall[K, V]) Fetch(ctx context.Context) error {
 		}
 		return ctx.Err()
 	default:
-		break
 	}
 	break
 }
......
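The reworded IterativeBatchCall comment describes the calling pattern: keep calling Fetch until it returns io.EOF. A minimal sketch of that driver loop, written against a hypothetical fetcher interface so it stays self-contained (a real caller would pass the *IterativeBatchCall[K, V] itself):

    package example

    import (
        "context"
        "errors"
        "io"
    )

    // fetcher captures just the part of IterativeBatchCall's surface this loop needs.
    type fetcher interface {
        Fetch(ctx context.Context) error
    }

    // drain calls Fetch repeatedly until all batched requests have completed (io.EOF)
    // or the context expires.
    func drain(ctx context.Context, f fetcher) error {
        for {
            err := f.Fetch(ctx)
            if errors.Is(err, io.EOF) {
                return nil // every request has completed
            }
            if ctx.Err() != nil {
                return ctx.Err() // stop re-trying once the context expires
            }
            // Any other error leaves the failed requests pending;
            // the next Fetch call re-tries them.
        }
    }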
+// Package sources exports a number of clients used to access ethereum chain data.
+//
+// There are a number of these exported clients used by the op-node:
+// [L1Client] wraps an RPC client to retrieve L1 ethereum data.
+// [L2Client] wraps an RPC client to retrieve L2 ethereum data.
+// [RollupClient] wraps an RPC client to retrieve rollup data.
+// [EngineClient] extends the [L2Client] providing engine API bindings.
+//
+// Internally, the listed clients wrap an [EthClient] which itself wraps a specified RPC client.
 package sources

 import (
@@ -126,8 +135,8 @@ func (s *EthClient) OnReceiptsMethodErr(m ReceiptsFetchingMethod, err error) {
 	}
 }

-// NewEthClient wraps a RPC with bindings to fetch ethereum data,
-// while logging errors, parallel-requests constraint, tracking metrics (optional), and caching.
+// NewEthClient returns an [EthClient], wrapping an RPC with bindings to fetch ethereum data with added error logging,
+// metric tracking, and caching. The [EthClient] uses a [LimitRPC] wrapper to limit the number of concurrent RPC requests.
 func NewEthClient(client client.RPC, log log.Logger, metrics caching.Metrics, config *EthClientConfig) (*EthClient, error) {
 	if err := config.Check(); err != nil {
 		return nil, fmt.Errorf("bad config, cannot create L1 source: %w", err)
......
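The new NewEthClient comment refers to a LimitRPC wrapper that caps concurrent RPC requests; that wrapper is not part of this diff. As an illustration of the underlying technique only, a concurrency cap can be enforced with a buffered channel used as a counting semaphore (the caller interface and limitedCaller type below are hypothetical):

    package example

    import "context"

    // caller is a stand-in for whatever RPC interface is being wrapped.
    type caller interface {
        CallContext(ctx context.Context, result any, method string, args ...any) error
    }

    // limitedCaller allows at most cap(sem) calls to be in flight at once.
    type limitedCaller struct {
        inner caller
        sem   chan struct{} // buffered channel used as a counting semaphore
    }

    func newLimitedCaller(inner caller, limit int) *limitedCaller {
        return &limitedCaller{inner: inner, sem: make(chan struct{}, limit)}
    }

    func (l *limitedCaller) CallContext(ctx context.Context, result any, method string, args ...any) error {
        select {
        case l.sem <- struct{}{}: // acquire a slot
        case <-ctx.Done():
            return ctx.Err()
        }
        defer func() { <-l.sem }() // release the slot
        return l.inner.CallContext(ctx, result, method, args...)
    }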
@@ -68,6 +68,8 @@ func NewL1Client(client client.RPC, log log.Logger, metrics caching.Metrics, con
 	}, nil
 }

+// L1BlockRefByLabel returns the L1BlockRef for the given block label.
+// Note that we cannot cache a block reference by label, because labels are not guaranteed to be unique.
 func (s *L1Client) L1BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L1BlockRef, error) {
 	info, err := s.InfoByLabel(ctx, label)
 	if err != nil {
@@ -83,6 +85,8 @@ func (s *L1Client) L1BlockRefByLabel(ctx context.Context, label eth.BlockLabel)
 	return ref, nil
 }

+// L1BlockRefByNumber returns the L1BlockRef for the given block number.
+// Note that we cannot cache a block reference by number, because L1 re-orgs can invalidate the cached block reference.
 func (s *L1Client) L1BlockRefByNumber(ctx context.Context, num uint64) (eth.L1BlockRef, error) {
 	info, err := s.InfoByNumber(ctx, num)
 	if err != nil {
@@ -93,6 +97,8 @@ func (s *L1Client) L1BlockRefByNumber(ctx context.Context, num uint64) (eth.L1Bl
 	return ref, nil
 }

+// L1BlockRefByHash returns the L1BlockRef for the given block hash.
+// We cache the block reference by hash, as it is safe to assume a hash collision will not occur.
 func (s *L1Client) L1BlockRefByHash(ctx context.Context, hash common.Hash) (eth.L1BlockRef, error) {
 	if v, ok := s.l1BlockRefsCache.Get(hash); ok {
 		return v.(eth.L1BlockRef), nil
......
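The three comments added above encode a caching rule: block references looked up by label or number are not cached, because labels move and numbers can be re-orged, while references looked up by hash are safe to cache since the hash pins one exact block. A generic sketch of that check-cache-then-fetch-by-hash shape (hypothetical types; the real client uses its caching.LRUCache):

    package example

    import (
        "context"
        "sync"
    )

    type Hash [32]byte

    type BlockRef struct {
        Hash   Hash
        Number uint64
    }

    // refCache memoizes block references keyed by hash only: a hash identifies exactly
    // one block, so the cached value can never be invalidated by a re-org or a moving label.
    type refCache struct {
        mu     sync.Mutex
        byHash map[Hash]BlockRef
        fetch  func(ctx context.Context, h Hash) (BlockRef, error) // e.g. an RPC lookup
    }

    func (c *refCache) RefByHash(ctx context.Context, h Hash) (BlockRef, error) {
        c.mu.Lock()
        if ref, ok := c.byHash[h]; ok {
            c.mu.Unlock()
            return ref, nil
        }
        c.mu.Unlock()

        ref, err := c.fetch(ctx, h)
        if err != nil {
            return BlockRef{}, err
        }

        c.mu.Lock()
        c.byHash[h] = ref
        c.mu.Unlock()
        return ref, nil
    }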
@@ -70,6 +70,9 @@ type L2Client struct {
 	systemConfigsCache *caching.LRUCache
 }

+// NewL2Client constructs a new L2Client instance. The L2Client is a thin wrapper around the EthClient with added functions
+// for fetching and caching eth.L2BlockRef values. This includes fetching an L2BlockRef by block number, label, or hash.
+// See: [L2BlockRefByLabel], [L2BlockRefByNumber], [L2BlockRefByHash]
 func NewL2Client(client client.RPC, log log.Logger, metrics caching.Metrics, config *L2ClientConfig) (*L2Client, error) {
 	ethClient, err := NewEthClient(client, log, metrics, &config.EthClientConfig)
 	if err != nil {
......