Commit 507f566f authored by Diederik Loerakker, committed by GitHub

op-node: improve sync metrics (#3275)

Co-authored-by: Matthew Slipper <me@matthewslipper.com>
parent 5f9b8919
package metrics
import (
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// EventMetrics pairs a counter of how often an event happened with a gauge
// holding the time of its most recent occurrence.
type EventMetrics struct {
// Total is incremented by one on every RecordEvent call.
Total prometheus.Counter
// LastTime is set to the current Unix time, in whole seconds, on every RecordEvent call.
LastTime prometheus.Gauge
}
// RecordEvent registers one occurrence of the event: it bumps the Total
// counter and stamps LastTime with the current Unix time in seconds.
func (e *EventMetrics) RecordEvent() {
	e.Total.Inc()

	nowUnix := time.Now().Unix()
	e.LastTime.Set(float64(nowUnix))
}
// NewEventMetrics creates and registers the counter/gauge pair for one kind of event.
//
// Both metrics are registered on registry under namespace ns. The counter is
// named "<name>_total" and the gauge "last_<name>_unix"; displayName is only
// used to build the human-readable help texts.
func NewEventMetrics(registry prometheus.Registerer, ns string, name string, displayName string) *EventMetrics {
	factory := promauto.With(registry)

	// The counter is registered before the gauge, same as a single literal would do.
	total := factory.NewCounter(prometheus.CounterOpts{
		Namespace: ns,
		Name:      fmt.Sprintf("%s_total", name),
		Help:      fmt.Sprintf("Count of %s events", displayName),
	})
	lastTime := factory.NewGauge(prometheus.GaugeOpts{
		Namespace: ns,
		Name:      fmt.Sprintf("last_%s_unix", name),
		Help:      fmt.Sprintf("Timestamp of last %s event", displayName),
	})

	return &EventMetrics{
		Total:    total,
		LastTime: lastTime,
	}
}
...@@ -2,6 +2,7 @@ package metrics ...@@ -2,6 +2,7 @@ package metrics
import ( import (
"context" "context"
"encoding/binary"
"errors" "errors"
"fmt" "fmt"
"net" "net"
...@@ -9,6 +10,9 @@ import ( ...@@ -9,6 +10,9 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/collectors"
...@@ -41,13 +45,22 @@ type Metrics struct { ...@@ -41,13 +45,22 @@ type Metrics struct {
// TODO: L2SourceCache *CacheMetrics // TODO: L2SourceCache *CacheMetrics
DerivationIdle prometheus.Gauge DerivationIdle prometheus.Gauge
PipelineResetsTotal prometheus.Counter
LastPipelineResetUnix prometheus.Gauge PipelineResets *EventMetrics
UnsafePayloadsTotal prometheus.Counter UnsafePayloads *EventMetrics
DerivationErrorsTotal prometheus.Counter DerivationErrors *EventMetrics
SequencingErrorsTotal prometheus.Counter SequencingErrors *EventMetrics
PublishingErrorsTotal prometheus.Counter PublishingErrors *EventMetrics
Heads *prometheus.GaugeVec
RefsNumber *prometheus.GaugeVec
RefsTime *prometheus.GaugeVec
RefsHash *prometheus.GaugeVec
RefsSeqNr *prometheus.GaugeVec
RefsLatency *prometheus.GaugeVec
// hash of the last seen block per name, so we don't reduce/increase latency on updates of the same data,
// and only count the first occurrence
LatencySeen map[string]common.Hash
L1ReorgDepth prometheus.Histogram L1ReorgDepth prometheus.Histogram
TransactionsSequencedTotal prometheus.Counter TransactionsSequencedTotal prometheus.Counter
...@@ -129,43 +142,54 @@ func NewMetrics(procName string) *Metrics { ...@@ -129,43 +142,54 @@ func NewMetrics(procName string) *Metrics {
Name: "derivation_idle", Name: "derivation_idle",
Help: "1 if the derivation pipeline is idle", Help: "1 if the derivation pipeline is idle",
}), }),
PipelineResetsTotal: promauto.With(registry).NewCounter(prometheus.CounterOpts{
Namespace: ns, PipelineResets: NewEventMetrics(registry, ns, "pipeline_resets", "derivation pipeline resets"),
Name: "pipeline_resets_total", UnsafePayloads: NewEventMetrics(registry, ns, "unsafe_payloads", "unsafe payloads"),
Help: "Count of derivation pipeline resets", DerivationErrors: NewEventMetrics(registry, ns, "derivation_errors", "derivation errors"),
}), SequencingErrors: NewEventMetrics(registry, ns, "sequencing_errors", "sequencing errors"),
LastPipelineResetUnix: promauto.With(registry).NewGauge(prometheus.GaugeOpts{ PublishingErrors: NewEventMetrics(registry, ns, "publishing_errors", "p2p publishing errors"),
Namespace: ns,
Name: "last_pipeline_reset_unix", RefsNumber: promauto.With(registry).NewGaugeVec(prometheus.GaugeOpts{
Help: "Timestamp of last pipeline reset",
}),
UnsafePayloadsTotal: promauto.With(registry).NewCounter(prometheus.CounterOpts{
Namespace: ns, Namespace: ns,
Name: "unsafe_payloads_total", Name: "refs_number",
Help: "Count of unsafe payloads received via p2p", Help: "Gauge representing the different L1/L2 reference block numbers",
}, []string{
"layer",
"type",
}), }),
DerivationErrorsTotal: promauto.With(registry).NewCounter(prometheus.CounterOpts{ RefsTime: promauto.With(registry).NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Namespace: ns,
Name: "derivation_errors_total", Name: "refs_time",
Help: "Count of total derivation errors", Help: "Gauge representing the different L1/L2 reference block timestamps",
}, []string{
"layer",
"type",
}), }),
SequencingErrorsTotal: promauto.With(registry).NewCounter(prometheus.CounterOpts{ RefsHash: promauto.With(registry).NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Namespace: ns,
Name: "sequencing_errors_total", Name: "refs_hash",
Help: "Count of total sequencing errors", Help: "Gauge representing the different L1/L2 reference block hashes truncated to float values",
}, []string{
"layer",
"type",
}), }),
PublishingErrorsTotal: promauto.With(registry).NewCounter(prometheus.CounterOpts{ RefsSeqNr: promauto.With(registry).NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Namespace: ns,
Name: "publishing_errors_total", Name: "refs_seqnr",
Help: "Count of total p2p publishing errors", Help: "Gauge representing the different L2 reference sequence numbers",
}, []string{
"type",
}), }),
Heads: promauto.With(registry).NewGaugeVec(prometheus.GaugeOpts{ RefsLatency: promauto.With(registry).NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Namespace: ns,
Name: "heads", Name: "refs_latency",
Help: "Gauge representing the different L1/L2 heads", Help: "Gauge representing the different L1/L2 reference block timestamps minus current time, in seconds",
}, []string{ }, []string{
"layer",
"type", "type",
}), }),
LatencySeen: make(map[string]common.Hash),
L1ReorgDepth: promauto.With(registry).NewHistogram(prometheus.HistogramOpts{ L1ReorgDepth: promauto.With(registry).NewHistogram(prometheus.HistogramOpts{
Namespace: ns, Namespace: ns,
Name: "l1_reorg_depth", Name: "l1_reorg_depth",
...@@ -250,13 +274,54 @@ func (m *Metrics) SetDerivationIdle(status bool) { ...@@ -250,13 +274,54 @@ func (m *Metrics) SetDerivationIdle(status bool) {
m.DerivationIdle.Set(val) m.DerivationIdle.Set(val)
} }
func (m *Metrics) SetHead(kind string, num uint64) { func (m *Metrics) RecordPipelineReset() {
m.Heads.WithLabelValues(kind).Set(float64(num)) m.PipelineResets.RecordEvent()
} }
func (m *Metrics) RecordPipelineReset() { func (m *Metrics) RecordSequencingError() {
m.PipelineResetsTotal.Inc() m.SequencingErrors.RecordEvent()
m.LastPipelineResetUnix.Set(float64(time.Now().Unix())) }
func (m *Metrics) RecordPublishingError() {
m.PublishingErrors.RecordEvent()
}
func (m *Metrics) RecordDerivationError() {
m.DerivationErrors.RecordEvent()
}
// RecordReceivedUnsafePayload counts one unsafe-payload event and records the
// payload's block number, timestamp and hash as the "received_payload"
// reference on the "l2" layer.
func (m *Metrics) RecordReceivedUnsafePayload(payload *eth.ExecutionPayload) {
	m.UnsafePayloads.RecordEvent()

	blockNum := uint64(payload.BlockNumber)
	blockTime := uint64(payload.Timestamp)
	m.recordRef("l2", "received_payload", blockNum, blockTime, payload.BlockHash)
}
// recordRef updates the reference gauges for the (layer, name) label pair.
//
// The block number and hash gauges are always written. The time gauge is only
// written when timestamp is non-zero, and the latency gauge only when, in
// addition, h differs from the hash last stored for name in LatencySeen.
func (m *Metrics) recordRef(layer string, name string, num uint64, timestamp uint64, h common.Hash) {
	m.RefsNumber.WithLabelValues(layer, name).Set(float64(num))

	if timestamp != 0 {
		blockTime := float64(timestamp)
		m.RefsTime.WithLabelValues(layer, name).Set(blockTime)

		// Latency is metered once per hash and label name; repeated updates
		// carrying the same block must not move the value.
		if lastSeen := m.LatencySeen[name]; lastSeen != h {
			m.LatencySeen[name] = h
			nowSeconds := float64(time.Now().UnixNano()) / 1e9
			m.RefsLatency.WithLabelValues(layer, name).Set(blockTime - nowSeconds)
		}
	}

	// The first 8 bytes of the hash are read as an integer and converted to
	// float64, so hash changes can be graphed to spot divergences visually.
	// A plain numeric conversion is used instead of math.Float64frombits to
	// keep the value within a manageable range.
	hashPrefix := binary.LittleEndian.Uint64(h[:])
	m.RefsHash.WithLabelValues(layer, name).Set(float64(hashPrefix))
}
// RecordL1Ref records the number, time and hash of ref under the given label
// name on the "l1" layer.
func (m *Metrics) RecordL1Ref(name string, ref eth.L1BlockRef) {
	const layer = "l1"
	m.recordRef(layer, name, ref.Number, ref.Time, ref.Hash)
}
// RecordL2Ref records ref under the given label name on the "l2" layer, its L1
// origin on the "l1_origin" layer, and its sequence number.
func (m *Metrics) RecordL2Ref(name string, ref eth.L2BlockRef) {
	m.recordRef("l2", name, ref.Number, ref.Time, ref.Hash)

	// The origin is passed with timestamp 0, so recordRef writes only its
	// number and hash and leaves the time and latency gauges alone.
	origin := ref.L1Origin
	m.recordRef("l1_origin", name, origin.Number, 0, origin.Hash)

	seqNr := float64(ref.SequenceNumber)
	m.RefsSeqNr.WithLabelValues(name).Set(seqNr)
}
func (m *Metrics) CountSequencedTxs(count int) {
m.TransactionsSequencedTotal.Add(float64(count))
} }
func (m *Metrics) RecordL1ReorgDepth(d uint64) { func (m *Metrics) RecordL1ReorgDepth(d uint64) {
......
...@@ -4,8 +4,6 @@ import ( ...@@ -4,8 +4,6 @@ import (
"context" "context"
"math/big" "math/big"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/l1" "github.com/ethereum-optimism/optimism/op-node/l1"
"github.com/ethereum-optimism/optimism/op-node/l2" "github.com/ethereum-optimism/optimism/op-node/l2"
...@@ -20,6 +18,23 @@ type Driver struct { ...@@ -20,6 +18,23 @@ type Driver struct {
s *state s *state
} }
// Metrics is the set of metric-recording calls the driver uses. NewDriver and
// NewState accept any value implementing it rather than a concrete metrics type.
type Metrics interface {
// Argument-less event recorders.
RecordPipelineReset()
RecordSequencingError()
RecordPublishingError()
RecordDerivationError()
// RecordReceivedUnsafePayload is called by the event loop after an unsafe payload has been queued.
RecordReceivedUnsafePayload(payload *eth.ExecutionPayload)
// RecordL1Ref and RecordL2Ref report a block reference under a label name
// (the driver passes names such as "l1_head", "l2_unsafe", "l2_safe").
RecordL1Ref(name string, ref eth.L1BlockRef)
RecordL2Ref(name string, ref eth.L2BlockRef)
SetDerivationIdle(idle bool)
RecordL1ReorgDepth(d uint64)
// CountSequencedTxs is called with the number of transactions in a newly sequenced block.
CountSequencedTxs(count int)
}
type Downloader interface { type Downloader interface {
InfoByHash(ctx context.Context, hash common.Hash) (eth.L1Info, error) InfoByHash(ctx context.Context, hash common.Hash) (eth.L1Info, error)
Fetch(ctx context.Context, blockHash common.Hash) (eth.L1Info, types.Transactions, types.Receipts, error) Fetch(ctx context.Context, blockHash common.Hash) (eth.L1Info, types.Transactions, types.Receipts, error)
...@@ -58,7 +73,7 @@ type Network interface { ...@@ -58,7 +73,7 @@ type Network interface {
PublishL2Payload(ctx context.Context, payload *eth.ExecutionPayload) error PublishL2Payload(ctx context.Context, payload *eth.ExecutionPayload) error
} }
func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 *l2.Source, l1 *l1.Source, network Network, log log.Logger, snapshotLog log.Logger, metrics *metrics.Metrics) *Driver { func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 *l2.Source, l1 *l1.Source, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics) *Driver {
output := &outputImpl{ output := &outputImpl{
Config: cfg, Config: cfg,
dl: l1, dl: l1,
......
...@@ -11,7 +11,6 @@ import ( ...@@ -11,7 +11,6 @@ import (
"github.com/ethereum-optimism/optimism/op-node/backoff" "github.com/ethereum-optimism/optimism/op-node/backoff"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -75,7 +74,7 @@ type state struct { ...@@ -75,7 +74,7 @@ type state struct {
output outputInterface output outputInterface
network Network // may be nil, network for is optional network Network // may be nil, network for is optional
metrics *metrics.Metrics metrics Metrics
log log.Logger log log.Logger
snapshotLog log.Logger snapshotLog log.Logger
done chan struct{} done chan struct{}
...@@ -86,7 +85,7 @@ type state struct { ...@@ -86,7 +85,7 @@ type state struct {
// NewState creates a new driver state. State changes take effect though // NewState creates a new driver state. State changes take effect though
// the given output, derivation pipeline and network interfaces. // the given output, derivation pipeline and network interfaces.
func NewState(driverCfg *Config, log log.Logger, snapshotLog log.Logger, config *rollup.Config, l1Chain L1Chain, l2Chain L2Chain, func NewState(driverCfg *Config, log log.Logger, snapshotLog log.Logger, config *rollup.Config, l1Chain L1Chain, l2Chain L2Chain,
output outputInterface, derivationPipeline DerivationPipeline, network Network, metrics *metrics.Metrics) *state { output outputInterface, derivationPipeline DerivationPipeline, network Network, metrics Metrics) *state {
return &state{ return &state{
derivation: derivationPipeline, derivation: derivationPipeline,
idleDerivation: false, idleDerivation: false,
...@@ -116,8 +115,8 @@ func (s *state) Start(ctx context.Context) error { ...@@ -116,8 +115,8 @@ func (s *state) Start(ctx context.Context) error {
} }
s.l1Head = l1Head s.l1Head = l1Head
s.l2Head, _ = s.l2.L2BlockRefByNumber(ctx, nil) s.l2Head, _ = s.l2.L2BlockRefByNumber(ctx, nil)
s.metrics.SetHead("l1", s.l1Head.Number) s.metrics.RecordL1Ref("l1_head", s.l1Head)
s.metrics.SetHead("l2_unsafe", s.l2Head.Number) s.metrics.RecordL2Ref("l2_unsafe", s.l2Head)
s.derivation.Reset() s.derivation.Reset()
...@@ -167,7 +166,7 @@ func (s *state) handleNewL1Block(newL1Head eth.L1BlockRef) { ...@@ -167,7 +166,7 @@ func (s *state) handleNewL1Block(newL1Head eth.L1BlockRef) {
// This could either be a long L1 extension, or a reorg. Both can be handled the same way. // This could either be a long L1 extension, or a reorg. Both can be handled the same way.
s.log.Warn("L1 Head signal indicates an L1 re-org", "old_l1_head", s.l1Head, "new_l1_head_parent", newL1Head.ParentHash, "new_l1_head", newL1Head) s.log.Warn("L1 Head signal indicates an L1 re-org", "old_l1_head", s.l1Head, "new_l1_head_parent", newL1Head.ParentHash, "new_l1_head", newL1Head)
} }
s.metrics.SetHead("l1", newL1Head.Number) s.metrics.RecordL1Ref("l1_head", newL1Head)
s.l1Head = newL1Head s.l1Head = newL1Head
} }
...@@ -255,12 +254,12 @@ func (s *state) createNewL2Block(ctx context.Context) error { ...@@ -255,12 +254,12 @@ func (s *state) createNewL2Block(ctx context.Context) error {
s.l2Head = newUnsafeL2Head s.l2Head = newUnsafeL2Head
s.log.Info("Sequenced new l2 block", "l2Head", s.l2Head, "l1Origin", s.l2Head.L1Origin, "txs", len(payload.Transactions), "time", s.l2Head.Time) s.log.Info("Sequenced new l2 block", "l2Head", s.l2Head, "l1Origin", s.l2Head.L1Origin, "txs", len(payload.Transactions), "time", s.l2Head.Time)
s.metrics.TransactionsSequencedTotal.Add(float64(len(payload.Transactions))) s.metrics.CountSequencedTxs(len(payload.Transactions))
if s.network != nil { if s.network != nil {
if err := s.network.PublishL2Payload(ctx, payload); err != nil { if err := s.network.PublishL2Payload(ctx, payload); err != nil {
s.log.Warn("failed to publish newly created block", "id", payload.ID(), "err", err) s.log.Warn("failed to publish newly created block", "id", payload.ID(), "err", err)
s.metrics.PublishingErrorsTotal.Inc() s.metrics.RecordPublishingError()
// publishing of unsafe data via p2p is optional. Errors are not severe enough to change/halt sequencing but should be logged and metered. // publishing of unsafe data via p2p is optional. Errors are not severe enough to change/halt sequencing but should be logged and metered.
} }
} }
...@@ -357,7 +356,7 @@ func (s *state) eventLoop() { ...@@ -357,7 +356,7 @@ func (s *state) eventLoop() {
cancel() cancel()
if err != nil { if err != nil {
s.log.Error("Error creating new L2 block", "err", err) s.log.Error("Error creating new L2 block", "err", err)
s.metrics.SequencingErrorsTotal.Inc() s.metrics.RecordSequencingError()
break // if we fail, we wait for the next block creation trigger. break // if we fail, we wait for the next block creation trigger.
} }
...@@ -374,7 +373,7 @@ func (s *state) eventLoop() { ...@@ -374,7 +373,7 @@ func (s *state) eventLoop() {
s.snapshot("New unsafe payload") s.snapshot("New unsafe payload")
s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", payload.ID()) s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", payload.ID())
s.derivation.AddUnsafePayload(payload) s.derivation.AddUnsafePayload(payload)
s.metrics.UnsafePayloadsTotal.Inc() s.metrics.RecordReceivedUnsafePayload(payload)
reqStep() reqStep()
case newL1Head := <-s.l1Heads: case newL1Head := <-s.l1Heads:
...@@ -422,10 +421,11 @@ func (s *state) eventLoop() { ...@@ -422,10 +421,11 @@ func (s *state) eventLoop() {
// log sync progress when it changes // log sync progress when it changes
if s.l2Finalized != finalized || s.l2SafeHead != safe || s.l2Head != unsafe { if s.l2Finalized != finalized || s.l2SafeHead != safe || s.l2Head != unsafe {
s.log.Info("Sync progress", "finalized", finalized, "safe", safe, "unsafe", unsafe) s.log.Info("Sync progress", "finalized", finalized, "safe", safe, "unsafe", unsafe)
s.metrics.SetHead("l2_finalized", finalized.Number) s.metrics.RecordL2Ref("l2_finalized", finalized)
s.metrics.SetHead("l2_safe", safe.Number) s.metrics.RecordL2Ref("l2_safe", safe)
s.metrics.SetHead("l2_unsafe", unsafe.Number) s.metrics.RecordL2Ref("l2_unsafe", unsafe)
} }
s.metrics.RecordL1Ref("l1_derived", s.derivation.Progress().Origin)
// update the heads // update the heads
s.l2Finalized = finalized s.l2Finalized = finalized
s.l2SafeHead = safe s.l2SafeHead = safe
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment