Commit 83e113fb authored by John Chase, committed by GitHub

op-node: Use RefMetrics and Event from op-service/metrics package (#7329)

* fix issue 6735

* delete NewEventLight

* deduplicate refmetrics, ensure only initialized metrics exist

---------
Co-authored-by: protolambda <proto@protolambda.com>
parent ef469a9a
package metrics
import (
"fmt"
"time"
"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/prometheus/client_golang/prometheus"
)
// EventMetrics bundles the two Prometheus collectors that together describe
// one kind of event: how many times it happened and when it last happened.
type EventMetrics struct {
	// Total is incremented by one on every RecordEvent call.
	Total prometheus.Counter
	// LastTime is set by RecordEvent to the current Unix time in whole seconds.
	LastTime prometheus.Gauge
}
// RecordEvent registers one occurrence of the event: it increments the Total
// counter and then stores the current wall-clock time, as whole Unix seconds,
// in the LastTime gauge.
func (em *EventMetrics) RecordEvent() {
	em.Total.Inc()

	// Unix() yields whole seconds; the gauge API takes a float64.
	nowUnix := time.Now().Unix()
	em.LastTime.Set(float64(nowUnix))
}
// NewEventMetrics creates the counter and gauge backing an EventMetrics by
// calling factory.NewCounter and factory.NewGauge, in that order.
//
// Both collectors are placed in namespace ns. The counter is named
// "<name>_total" and the gauge "last_<name>_unix"; displayName is only
// interpolated into the Help strings.
func NewEventMetrics(factory metrics.Factory, ns string, name string, displayName string) *EventMetrics {
	ev := new(EventMetrics)

	// Counter first, then gauge: same creation order as the struct fields.
	counterOpts := prometheus.CounterOpts{
		Namespace: ns,
		Name:      fmt.Sprintf("%s_total", name),
		Help:      fmt.Sprintf("Count of %s events", displayName),
	}
	ev.Total = factory.NewCounter(counterOpts)

	gaugeOpts := prometheus.GaugeOpts{
		Namespace: ns,
		Name:      fmt.Sprintf("last_%s_unix", name),
		Help:      fmt.Sprintf("Timestamp of last %s event", displayName),
	}
	ev.LastTime = factory.NewGauge(gaugeOpts)

	return ev
}
...@@ -3,7 +3,6 @@ package metrics ...@@ -3,7 +3,6 @@ package metrics
import ( import (
"context" "context"
"encoding/binary"
"errors" "errors"
"fmt" "fmt"
"net" "net"
...@@ -50,7 +49,7 @@ type Metricer interface { ...@@ -50,7 +49,7 @@ type Metricer interface {
RecordPublishingError() RecordPublishingError()
RecordDerivationError() RecordDerivationError()
RecordReceivedUnsafePayload(payload *eth.ExecutionPayload) RecordReceivedUnsafePayload(payload *eth.ExecutionPayload)
recordRef(layer string, name string, num uint64, timestamp uint64, h common.Hash) RecordRef(layer string, name string, num uint64, timestamp uint64, h common.Hash)
RecordL1Ref(name string, ref eth.L1BlockRef) RecordL1Ref(name string, ref eth.L1BlockRef)
RecordL2Ref(name string, ref eth.L2BlockRef) RecordL2Ref(name string, ref eth.L2BlockRef)
RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID) RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID)
...@@ -99,11 +98,11 @@ type Metrics struct { ...@@ -99,11 +98,11 @@ type Metrics struct {
DerivationIdle prometheus.Gauge DerivationIdle prometheus.Gauge
PipelineResets *EventMetrics PipelineResets *metrics.Event
UnsafePayloads *EventMetrics UnsafePayloads *metrics.Event
DerivationErrors *EventMetrics DerivationErrors *metrics.Event
SequencingErrors *EventMetrics SequencingErrors *metrics.Event
PublishingErrors *EventMetrics PublishingErrors *metrics.Event
P2PReqDurationSeconds *prometheus.HistogramVec P2PReqDurationSeconds *prometheus.HistogramVec
P2PReqTotal *prometheus.CounterVec P2PReqTotal *prometheus.CounterVec
...@@ -111,8 +110,8 @@ type Metrics struct { ...@@ -111,8 +110,8 @@ type Metrics struct {
PayloadsQuarantineTotal prometheus.Gauge PayloadsQuarantineTotal prometheus.Gauge
SequencerInconsistentL1Origin *EventMetrics SequencerInconsistentL1Origin *metrics.Event
SequencerResets *EventMetrics SequencerResets *metrics.Event
L1RequestDurationSeconds *prometheus.HistogramVec L1RequestDurationSeconds *prometheus.HistogramVec
...@@ -125,23 +124,16 @@ type Metrics struct { ...@@ -125,23 +124,16 @@ type Metrics struct {
UnsafePayloadsBufferLen prometheus.Gauge UnsafePayloadsBufferLen prometheus.Gauge
UnsafePayloadsBufferMemSize prometheus.Gauge UnsafePayloadsBufferMemSize prometheus.Gauge
RefsNumber *prometheus.GaugeVec metrics.RefMetrics
RefsTime *prometheus.GaugeVec
RefsHash *prometheus.GaugeVec
RefsSeqNr *prometheus.GaugeVec
RefsLatency *prometheus.GaugeVec
// hash of the last seen block per name, so we don't reduce/increase latency on updates of the same data,
// and only count the first occurrence
LatencySeen map[string]common.Hash
L1ReorgDepth prometheus.Histogram L1ReorgDepth prometheus.Histogram
TransactionsSequencedTotal prometheus.Counter TransactionsSequencedTotal prometheus.Counter
// Channel Bank Metrics // Channel Bank Metrics
headChannelOpenedEvent *EventMetrics headChannelOpenedEvent *metrics.Event
channelTimedOutEvent *EventMetrics channelTimedOutEvent *metrics.Event
frameAddedEvent *EventMetrics frameAddedEvent *metrics.Event
// P2P Metrics // P2P Metrics
PeerCount prometheus.Gauge PeerCount prometheus.Gauge
...@@ -247,14 +239,14 @@ func NewMetrics(procName string) *Metrics { ...@@ -247,14 +239,14 @@ func NewMetrics(procName string) *Metrics {
Help: "1 if the derivation pipeline is idle", Help: "1 if the derivation pipeline is idle",
}), }),
PipelineResets: NewEventMetrics(factory, ns, "pipeline_resets", "derivation pipeline resets"), PipelineResets: metrics.NewEvent(factory, ns, "", "pipeline_resets", "derivation pipeline resets"),
UnsafePayloads: NewEventMetrics(factory, ns, "unsafe_payloads", "unsafe payloads"), UnsafePayloads: metrics.NewEvent(factory, ns, "", "unsafe_payloads", "unsafe payloads"),
DerivationErrors: NewEventMetrics(factory, ns, "derivation_errors", "derivation errors"), DerivationErrors: metrics.NewEvent(factory, ns, "", "derivation_errors", "derivation errors"),
SequencingErrors: NewEventMetrics(factory, ns, "sequencing_errors", "sequencing errors"), SequencingErrors: metrics.NewEvent(factory, ns, "", "sequencing_errors", "sequencing errors"),
PublishingErrors: NewEventMetrics(factory, ns, "publishing_errors", "p2p publishing errors"), PublishingErrors: metrics.NewEvent(factory, ns, "", "publishing_errors", "p2p publishing errors"),
SequencerInconsistentL1Origin: NewEventMetrics(factory, ns, "sequencer_inconsistent_l1_origin", "events when the sequencer selects an inconsistent L1 origin"), SequencerInconsistentL1Origin: metrics.NewEvent(factory, ns, "", "sequencer_inconsistent_l1_origin", "events when the sequencer selects an inconsistent L1 origin"),
SequencerResets: NewEventMetrics(factory, ns, "sequencer_resets", "sequencer resets"), SequencerResets: metrics.NewEvent(factory, ns, "", "sequencer_resets", "sequencer resets"),
UnsafePayloadsBufferLen: factory.NewGauge(prometheus.GaugeOpts{ UnsafePayloadsBufferLen: factory.NewGauge(prometheus.GaugeOpts{
Namespace: ns, Namespace: ns,
...@@ -267,46 +259,7 @@ func NewMetrics(procName string) *Metrics { ...@@ -267,46 +259,7 @@ func NewMetrics(procName string) *Metrics {
Help: "Total estimated memory size of buffered L2 unsafe payloads", Help: "Total estimated memory size of buffered L2 unsafe payloads",
}), }),
RefsNumber: factory.NewGaugeVec(prometheus.GaugeOpts{ RefMetrics: metrics.MakeRefMetrics(ns, factory),
Namespace: ns,
Name: "refs_number",
Help: "Gauge representing the different L1/L2 reference block numbers",
}, []string{
"layer",
"type",
}),
RefsTime: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "refs_time",
Help: "Gauge representing the different L1/L2 reference block timestamps",
}, []string{
"layer",
"type",
}),
RefsHash: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "refs_hash",
Help: "Gauge representing the different L1/L2 reference block hashes truncated to float values",
}, []string{
"layer",
"type",
}),
RefsSeqNr: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "refs_seqnr",
Help: "Gauge representing the different L2 reference sequence numbers",
}, []string{
"type",
}),
RefsLatency: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "refs_latency",
Help: "Gauge representing the different L1/L2 reference block timestamps minus current time, in seconds",
}, []string{
"layer",
"type",
}),
LatencySeen: make(map[string]common.Hash),
L1ReorgDepth: factory.NewHistogram(prometheus.HistogramOpts{ L1ReorgDepth: factory.NewHistogram(prometheus.HistogramOpts{
Namespace: ns, Namespace: ns,
...@@ -380,9 +333,9 @@ func NewMetrics(procName string) *Metrics { ...@@ -380,9 +333,9 @@ func NewMetrics(procName string) *Metrics {
Help: "Count of incoming dial attempts to accept, with label to filter to allowed attempts", Help: "Count of incoming dial attempts to accept, with label to filter to allowed attempts",
}, []string{"allow"}), }, []string{"allow"}),
headChannelOpenedEvent: NewEventMetrics(factory, ns, "head_channel", "New channel at the front of the channel bank"), headChannelOpenedEvent: metrics.NewEvent(factory, ns, "", "head_channel", "New channel at the front of the channel bank"),
channelTimedOutEvent: NewEventMetrics(factory, ns, "channel_timeout", "Channel has timed out"), channelTimedOutEvent: metrics.NewEvent(factory, ns, "", "channel_timeout", "Channel has timed out"),
frameAddedEvent: NewEventMetrics(factory, ns, "frame_added", "New frame ingested in the channel bank"), frameAddedEvent: metrics.NewEvent(factory, ns, "", "frame_added", "New frame ingested in the channel bank"),
ChannelInputBytes: factory.NewCounter(prometheus.CounterOpts{ ChannelInputBytes: factory.NewCounter(prometheus.CounterOpts{
Namespace: ns, Namespace: ns,
...@@ -570,53 +523,28 @@ func (m *Metrics) SetDerivationIdle(status bool) { ...@@ -570,53 +523,28 @@ func (m *Metrics) SetDerivationIdle(status bool) {
} }
func (m *Metrics) RecordPipelineReset() { func (m *Metrics) RecordPipelineReset() {
m.PipelineResets.RecordEvent() m.PipelineResets.Record()
} }
func (m *Metrics) RecordSequencingError() { func (m *Metrics) RecordSequencingError() {
m.SequencingErrors.RecordEvent() m.SequencingErrors.Record()
} }
func (m *Metrics) RecordPublishingError() { func (m *Metrics) RecordPublishingError() {
m.PublishingErrors.RecordEvent() m.PublishingErrors.Record()
} }
func (m *Metrics) RecordDerivationError() { func (m *Metrics) RecordDerivationError() {
m.DerivationErrors.RecordEvent() m.DerivationErrors.Record()
} }
func (m *Metrics) RecordReceivedUnsafePayload(payload *eth.ExecutionPayload) { func (m *Metrics) RecordReceivedUnsafePayload(payload *eth.ExecutionPayload) {
m.UnsafePayloads.RecordEvent() m.UnsafePayloads.Record()
m.recordRef("l2", "received_payload", uint64(payload.BlockNumber), uint64(payload.Timestamp), payload.BlockHash) m.RecordRef("l2", "received_payload", uint64(payload.BlockNumber), uint64(payload.Timestamp), payload.BlockHash)
}
func (m *Metrics) recordRef(layer string, name string, num uint64, timestamp uint64, h common.Hash) {
m.RefsNumber.WithLabelValues(layer, name).Set(float64(num))
if timestamp != 0 {
m.RefsTime.WithLabelValues(layer, name).Set(float64(timestamp))
// only meter the latency when we first see this hash for the given label name
if m.LatencySeen[name] != h {
m.LatencySeen[name] = h
m.RefsLatency.WithLabelValues(layer, name).Set(float64(timestamp) - (float64(time.Now().UnixNano()) / 1e9))
}
}
// we map the first 8 bytes to a float64, so we can graph changes of the hash to find divergences visually.
// We don't do math.Float64frombits, just a regular conversion, to keep the value within a manageable range.
m.RefsHash.WithLabelValues(layer, name).Set(float64(binary.LittleEndian.Uint64(h[:])))
}
func (m *Metrics) RecordL1Ref(name string, ref eth.L1BlockRef) {
m.recordRef("l1", name, ref.Number, ref.Time, ref.Hash)
}
func (m *Metrics) RecordL2Ref(name string, ref eth.L2BlockRef) {
m.recordRef("l2", name, ref.Number, ref.Time, ref.Hash)
m.recordRef("l1_origin", name, ref.L1Origin.Number, 0, ref.L1Origin.Hash)
m.RefsSeqNr.WithLabelValues(name).Set(float64(ref.SequenceNumber))
} }
func (m *Metrics) RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID) { func (m *Metrics) RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID) {
m.recordRef("l2", "l2_buffer_unsafe", next.Number, 0, next.Hash) m.RecordRef("l2", "l2_buffer_unsafe", next.Number, 0, next.Hash)
m.UnsafePayloadsBufferLen.Set(float64(length)) m.UnsafePayloadsBufferLen.Set(float64(length))
m.UnsafePayloadsBufferMemSize.Set(float64(memSize)) m.UnsafePayloadsBufferMemSize.Set(float64(memSize))
} }
...@@ -630,13 +558,13 @@ func (m *Metrics) RecordL1ReorgDepth(d uint64) { ...@@ -630,13 +558,13 @@ func (m *Metrics) RecordL1ReorgDepth(d uint64) {
} }
func (m *Metrics) RecordSequencerInconsistentL1Origin(from eth.BlockID, to eth.BlockID) { func (m *Metrics) RecordSequencerInconsistentL1Origin(from eth.BlockID, to eth.BlockID) {
m.SequencerInconsistentL1Origin.RecordEvent() m.SequencerInconsistentL1Origin.Record()
m.recordRef("l1_origin", "inconsistent_from", from.Number, 0, from.Hash) m.RecordRef("l1_origin", "inconsistent_from", from.Number, 0, from.Hash)
m.recordRef("l1_origin", "inconsistent_to", to.Number, 0, to.Hash) m.RecordRef("l1_origin", "inconsistent_to", to.Number, 0, to.Hash)
} }
func (m *Metrics) RecordSequencerReset() { func (m *Metrics) RecordSequencerReset() {
m.SequencerResets.RecordEvent() m.SequencerResets.Record()
} }
func (m *Metrics) RecordGossipEvent(evType int32) { func (m *Metrics) RecordGossipEvent(evType int32) {
...@@ -740,15 +668,15 @@ func (m *Metrics) RecordChannelInputBytes(inputCompressedBytes int) { ...@@ -740,15 +668,15 @@ func (m *Metrics) RecordChannelInputBytes(inputCompressedBytes int) {
} }
func (m *Metrics) RecordHeadChannelOpened() { func (m *Metrics) RecordHeadChannelOpened() {
m.headChannelOpenedEvent.RecordEvent() m.headChannelOpenedEvent.Record()
} }
func (m *Metrics) RecordChannelTimedOut() { func (m *Metrics) RecordChannelTimedOut() {
m.channelTimedOutEvent.RecordEvent() m.channelTimedOutEvent.Record()
} }
func (m *Metrics) RecordFrame() { func (m *Metrics) RecordFrame() {
m.frameAddedEvent.RecordEvent() m.frameAddedEvent.Record()
} }
func (m *Metrics) RecordPeerUnban() { func (m *Metrics) RecordPeerUnban() {
...@@ -821,7 +749,7 @@ func (n *noopMetricer) RecordDerivationError() { ...@@ -821,7 +749,7 @@ func (n *noopMetricer) RecordDerivationError() {
func (n *noopMetricer) RecordReceivedUnsafePayload(payload *eth.ExecutionPayload) { func (n *noopMetricer) RecordReceivedUnsafePayload(payload *eth.ExecutionPayload) {
} }
func (n *noopMetricer) recordRef(layer string, name string, num uint64, timestamp uint64, h common.Hash) { func (n *noopMetricer) RecordRef(layer string, name string, num uint64, timestamp uint64, h common.Hash) {
} }
func (n *noopMetricer) RecordL1Ref(name string, ref eth.L1BlockRef) { func (n *noopMetricer) RecordL1Ref(name string, ref eth.L1BlockRef) {
......
...@@ -16,8 +16,8 @@ func (e *Event) Record() { ...@@ -16,8 +16,8 @@ func (e *Event) Record() {
e.LastTime.SetToCurrentTime() e.LastTime.SetToCurrentTime()
} }
func NewEvent(factory Factory, ns string, subsystem string, name string, displayName string) Event { func NewEvent(factory Factory, ns string, subsystem string, name string, displayName string) *Event {
return Event{ return &Event{
Total: factory.NewCounter(prometheus.CounterOpts{ Total: factory.NewCounter(prometheus.CounterOpts{
Namespace: ns, Namespace: ns,
Name: fmt.Sprintf("%s_total", name), Name: fmt.Sprintf("%s_total", name),
......
...@@ -27,7 +27,7 @@ type TxMetrics struct { ...@@ -27,7 +27,7 @@ type TxMetrics struct {
currentNonce prometheus.Gauge currentNonce prometheus.Gauge
pendingTxs prometheus.Gauge pendingTxs prometheus.Gauge
txPublishError *prometheus.CounterVec txPublishError *prometheus.CounterVec
publishEvent metrics.Event publishEvent *metrics.Event
confirmEvent metrics.EventVec confirmEvent metrics.EventVec
rpcError prometheus.Counter rpcError prometheus.Counter
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment