Commit a1842698 authored by Matthew Slipper's avatar Matthew Slipper Committed by GitHub

op-node: Derivation metrics (#3156)

* op-node: Derivation metrics

Adds support for the following batch derivation metrics:

- 0/1 gauge for if the derivation is idle
- Count of total pipeline resets
- Count of total unsafe payloads received
- Count of total derivation errors
- Gauge tracking the various L1/L2 safe/unsafe heads

* review updates + more metrics
parent 9673eca7
......@@ -7,6 +7,7 @@ import (
"net"
"net/http"
"strconv"
"time"
"github.com/ethereum/go-ethereum"
"github.com/prometheus/client_golang/prometheus/collectors"
......@@ -27,14 +28,22 @@ const (
)
type Metrics struct {
Info *prometheus.GaugeVec
Up prometheus.Gauge
Info *prometheus.GaugeVec
Up prometheus.Gauge
RPCServerRequestsTotal *prometheus.CounterVec
RPCServerRequestDurationSeconds *prometheus.HistogramVec
RPCClientRequestsTotal *prometheus.CounterVec
RPCClientRequestDurationSeconds *prometheus.HistogramVec
RPCClientResponsesTotal *prometheus.CounterVec
DerivationIdle prometheus.Gauge
PipelineResetsTotal prometheus.Counter
LastPipelineResetUnix prometheus.Gauge
UnsafePayloadsTotal prometheus.Counter
DerivationErrorsTotal prometheus.Counter
Heads *prometheus.GaugeVec
registry *prometheus.Registry
}
......@@ -60,6 +69,7 @@ func NewMetrics(procName string) *Metrics {
Name: "up",
Help: "1 if the op node has finished starting up",
}),
RPCServerRequestsTotal: promauto.With(registry).NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Subsystem: RPCServerSubsystem,
......@@ -103,6 +113,40 @@ func NewMetrics(procName string) *Metrics {
"method",
"error",
}),
DerivationIdle: promauto.With(registry).NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Name: "derivation_idle",
Help: "1 if the derivation pipeline is idle",
}),
PipelineResetsTotal: promauto.With(registry).NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "pipeline_resets_total",
Help: "Count of derivation pipeline resets",
}),
LastPipelineResetUnix: promauto.With(registry).NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Name: "last_pipeline_reset_unix",
Help: "Timestamp of last pipeline reset",
}),
UnsafePayloadsTotal: promauto.With(registry).NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "unsafe_payloads_total",
Help: "Count of unsafe payloads received via p2p",
}),
DerivationErrorsTotal: promauto.With(registry).NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "derivation_errors_total",
Help: "Count of total derivation errors",
}),
Heads: promauto.With(registry).NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "heads",
Help: "Gauge representing the different L1/L2 heads",
}, []string{
"type",
}),
registry: registry,
}
}
......@@ -166,6 +210,24 @@ func (m *Metrics) RecordRPCClientResponse(method string, err error) {
m.RPCClientResponsesTotal.WithLabelValues(method, errStr).Inc()
}
func (m *Metrics) SetDerivationIdle(status bool) {
var val float64
if status {
val = 1
}
m.DerivationIdle.Set(val)
}
func (m *Metrics) SetHead(kind string, num uint64) {
m.Heads.WithLabelValues(kind).Set(float64(num))
}
func (m *Metrics) RecordPipelineReset() {
m.PipelineResetsTotal.Inc()
m.DerivationErrorsTotal.Inc()
m.LastPipelineResetUnix.Set(float64(time.Now().Unix()))
}
// Serve starts the metrics server on the given hostname and port.
// The server will be closed when the passed-in context is cancelled.
func (m *Metrics) Serve(ctx context.Context, hostname string, port int) error {
......
......@@ -144,7 +144,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
return err
}
n.l2Engine = driver.NewDriver(&cfg.Driver, &cfg.Rollup, source, n.l1Source, n, n.log, snapshotLog)
n.l2Engine = driver.NewDriver(&cfg.Driver, &cfg.Rollup, source, n.l1Source, n, n.log, snapshotLog, n.metrics)
return nil
}
......
......@@ -4,6 +4,8 @@ import (
"context"
"math/big"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/l1"
"github.com/ethereum-optimism/optimism/op-node/l2"
......@@ -56,7 +58,7 @@ type Network interface {
PublishL2Payload(ctx context.Context, payload *eth.ExecutionPayload) error
}
func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 *l2.Source, l1 *l1.Source, network Network, log log.Logger, snapshotLog log.Logger) *Driver {
func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 *l2.Source, l1 *l1.Source, network Network, log log.Logger, snapshotLog log.Logger, metrics *metrics.Metrics) *Driver {
output := &outputImpl{
Config: cfg,
dl: l1,
......@@ -67,7 +69,7 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 *l2.Source, l1 *l1.Sour
var state *state
verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, func() eth.L1BlockRef { return state.l1Head }, l1)
derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l2)
state = NewState(driverCfg, log, snapshotLog, cfg, l1, l2, output, derivationPipeline, network)
state = NewState(driverCfg, log, snapshotLog, cfg, l1, l2, output, derivationPipeline, network, metrics)
return &Driver{s: state}
}
......
......@@ -9,6 +9,7 @@ import (
"time"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/log"
)
......@@ -67,6 +68,7 @@ type state struct {
output outputInterface
network Network // may be nil, network for is optional
metrics *metrics.Metrics
log log.Logger
snapshotLog log.Logger
done chan struct{}
......@@ -77,7 +79,7 @@ type state struct {
// NewState creates a new driver state. State changes take effect though
// the given output, derivation pipeline and network interfaces.
func NewState(driverCfg *Config, log log.Logger, snapshotLog log.Logger, config *rollup.Config, l1Chain L1Chain, l2Chain L2Chain,
output outputInterface, derivationPipeline DerivationPipeline, network Network) *state {
output outputInterface, derivationPipeline DerivationPipeline, network Network, metrics *metrics.Metrics) *state {
return &state{
derivation: derivationPipeline,
idleDerivation: false,
......@@ -91,6 +93,7 @@ func NewState(driverCfg *Config, log log.Logger, snapshotLog log.Logger, config
l2: l2Chain,
output: output,
network: network,
metrics: metrics,
l1Heads: make(chan eth.L1BlockRef, 10),
unsafeL2Payloads: make(chan *eth.ExecutionPayload, 10),
}
......@@ -105,6 +108,8 @@ func (s *state) Start(ctx context.Context) error {
}
s.l1Head = l1Head
s.l2Head, _ = s.l2.L2BlockRefByNumber(ctx, nil)
s.metrics.SetHead("l1", s.l1Head.Number)
s.metrics.SetHead("l2_unsafe", s.l2Head.Number)
s.derivation.Reset()
......@@ -151,6 +156,7 @@ func (s *state) handleNewL1Block(newL1Head eth.L1BlockRef) {
// This could either be a long L1 extension, or a reorg. Both can be handled the same way.
s.log.Warn("L1 Head signal indicates an L1 re-org", "old_l1_head", s.l1Head, "new_l1_head_parent", newL1Head.ParentHash, "new_l1_head", newL1Head)
}
s.metrics.SetHead("l1", newL1Head.Number)
s.l1Head = newL1Head
}
......@@ -315,6 +321,7 @@ func (s *state) eventLoop() {
cancel()
if err != nil {
s.log.Error("Error creating new L2 block", "err", err)
s.metrics.DerivationErrorsTotal.Inc()
}
// We need to catch up to the next origin as quickly as possible. We can do this by
......@@ -330,6 +337,7 @@ func (s *state) eventLoop() {
s.snapshot("New unsafe payload")
s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", payload.ID())
s.derivation.AddUnsafePayload(payload)
s.metrics.UnsafePayloadsTotal.Inc()
reqStep()
case newL1Head := <-s.l1Heads:
......@@ -338,6 +346,7 @@ func (s *state) eventLoop() {
s.handleNewL1Block(newL1Head)
reqStep() // a new L1 head may mean we have the data to not get an EOF again.
case <-stepReqCh:
s.metrics.SetDerivationIdle(false)
s.idleDerivation = false
s.log.Debug("Derivation process step", "onto_origin", s.derivation.Progress().Origin, "onto_closed", s.derivation.Progress().Closed)
stepCtx, cancel := context.WithTimeout(ctx, time.Second*10) // TODO pick a timeout for executing a single step
......@@ -346,16 +355,21 @@ func (s *state) eventLoop() {
if err == io.EOF {
s.log.Debug("Derivation process went idle", "progress", s.derivation.Progress().Origin)
s.idleDerivation = true
s.metrics.SetDerivationIdle(true)
continue
} else if err != nil {
// If the pipeline corrupts, e.g. due to a reorg, simply reset it
s.log.Warn("Derivation pipeline is reset", "err", err)
s.derivation.Reset()
s.metrics.RecordPipelineReset()
} else {
finalized, safe, unsafe := s.derivation.Finalized(), s.derivation.SafeL2Head(), s.derivation.UnsafeL2Head()
// log sync progress when it changes
if s.l2Finalized != finalized || s.l2SafeHead != safe || s.l2Head != unsafe {
s.log.Info("Sync progress", "finalized", finalized, "safe", safe, "unsafe", unsafe)
s.metrics.SetHead("l2_finalized", finalized.Number)
s.metrics.SetHead("l2_safe", safe.Number)
s.metrics.SetHead("l2_unsafe", unsafe.Number)
}
// update the heads
s.l2Finalized = finalized
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment