Commit 01356153 authored by protolambda's avatar protolambda Committed by GitHub

op-node: re-enable and improve event metrics (#11137)

parent d5109103
...@@ -39,8 +39,8 @@ type Metricer interface { ...@@ -39,8 +39,8 @@ type Metricer interface {
RecordSequencingError() RecordSequencingError()
RecordPublishingError() RecordPublishingError()
RecordDerivationError() RecordDerivationError()
RecordEmittedEvent(name string) RecordEmittedEvent(eventName string, emitter string)
RecordProcessedEvent(name string) RecordProcessedEvent(eventName string, deriver string, duration time.Duration)
RecordEventsRateLimited() RecordEventsRateLimited()
RecordReceivedUnsafePayload(payload *eth.ExecutionPayloadEnvelope) RecordReceivedUnsafePayload(payload *eth.ExecutionPayloadEnvelope)
RecordRef(layer string, name string, num uint64, timestamp uint64, h common.Hash) RecordRef(layer string, name string, num uint64, timestamp uint64, h common.Hash)
...@@ -98,6 +98,13 @@ type Metrics struct { ...@@ -98,6 +98,13 @@ type Metrics struct {
EmittedEvents *prometheus.CounterVec EmittedEvents *prometheus.CounterVec
ProcessedEvents *prometheus.CounterVec ProcessedEvents *prometheus.CounterVec
// We don't use a histogram for observing time durations,
// as each vec entry (event-type, deriver type) is synchronous with other occurrences of the same entry key,
// so we can get a reasonably good understanding of execution by looking at the rate.
// Bucketing to detect outliers would be nice, but also increases the overhead by a lot,
// where we already track many event-type/deriver combinations.
EventsProcessTime *prometheus.CounterVec
EventsRateLimited *metrics.Event EventsRateLimited *metrics.Event
DerivedBatches metrics.EventVec DerivedBatches metrics.EventVec
...@@ -209,7 +216,7 @@ func NewMetrics(procName string) *Metrics { ...@@ -209,7 +216,7 @@ func NewMetrics(procName string) *Metrics {
Subsystem: "events", Subsystem: "events",
Name: "emitted", Name: "emitted",
Help: "number of emitted events", Help: "number of emitted events",
}, []string{"event_type"}), }, []string{"event_type", "emitter"}),
ProcessedEvents: factory.NewCounterVec( ProcessedEvents: factory.NewCounterVec(
prometheus.CounterOpts{ prometheus.CounterOpts{
...@@ -217,7 +224,15 @@ func NewMetrics(procName string) *Metrics { ...@@ -217,7 +224,15 @@ func NewMetrics(procName string) *Metrics {
Subsystem: "events", Subsystem: "events",
Name: "processed", Name: "processed",
Help: "number of processed events", Help: "number of processed events",
}, []string{"event_type"}), }, []string{"event_type", "deriver"}),
EventsProcessTime: factory.NewCounterVec(
prometheus.CounterOpts{
Namespace: ns,
Subsystem: "events",
Name: "process_time",
Help: "total duration in seconds of processed events",
}, []string{"event_type", "deriver"}),
EventsRateLimited: metrics.NewEvent(factory, ns, "events", "rate_limited", "events rate limiter hits"), EventsRateLimited: metrics.NewEvent(factory, ns, "events", "rate_limited", "events rate limiter hits"),
...@@ -467,12 +482,15 @@ func (m *Metrics) RecordPublishingError() { ...@@ -467,12 +482,15 @@ func (m *Metrics) RecordPublishingError() {
m.PublishingErrors.Record() m.PublishingErrors.Record()
} }
func (m *Metrics) RecordEmittedEvent(name string) { func (m *Metrics) RecordEmittedEvent(eventName string, emitter string) {
m.EmittedEvents.WithLabelValues(name).Inc() m.EmittedEvents.WithLabelValues(eventName, emitter).Inc()
} }
func (m *Metrics) RecordProcessedEvent(name string) { func (m *Metrics) RecordProcessedEvent(eventName string, deriver string, duration time.Duration) {
m.ProcessedEvents.WithLabelValues(name).Inc() m.ProcessedEvents.WithLabelValues(eventName, deriver).Inc()
// We take the absolute value; if the clock was not monotonically increased between start and top,
// there still was a duration gap. And the Counter metrics-type would panic if the duration is negative.
m.EventsProcessTime.WithLabelValues(eventName, deriver).Add(float64(duration.Abs()) / float64(time.Second))
} }
func (m *Metrics) RecordEventsRateLimited() { func (m *Metrics) RecordEventsRateLimited() {
...@@ -680,10 +698,10 @@ func (n *noopMetricer) RecordPublishingError() { ...@@ -680,10 +698,10 @@ func (n *noopMetricer) RecordPublishingError() {
func (n *noopMetricer) RecordDerivationError() { func (n *noopMetricer) RecordDerivationError() {
} }
func (n *noopMetricer) RecordEmittedEvent(name string) { func (n *noopMetricer) RecordEmittedEvent(eventName string, emitter string) {
} }
func (n *noopMetricer) RecordProcessedEvent(name string) { func (n *noopMetricer) RecordProcessedEvent(eventName string, deriver string, duration time.Duration) {
} }
func (n *noopMetricer) RecordEventsRateLimited() { func (n *noopMetricer) RecordEventsRateLimited() {
......
...@@ -179,6 +179,7 @@ func NewDriver( ...@@ -179,6 +179,7 @@ func NewDriver(
drain = s.Drain drain = s.Drain
} }
sys := event.NewSystem(log, executor) sys := event.NewSystem(log, executor)
sys.AddTracer(event.NewMetricsTracer(metrics))
opts := event.DefaultRegisterOpts() opts := event.DefaultRegisterOpts()
......
package event package event
import "time"
type Metrics interface { type Metrics interface {
RecordEmittedEvent(name string) RecordEmittedEvent(eventName string, emitter string)
RecordProcessedEvent(name string) RecordProcessedEvent(eventName string, deriver string, duration time.Duration)
RecordEventsRateLimited() RecordEventsRateLimited()
} }
type NoopMetrics struct { type NoopMetrics struct {
} }
func (n NoopMetrics) RecordEmittedEvent(name string) {} func (n NoopMetrics) RecordEmittedEvent(eventName string, emitter string) {}
func (n NoopMetrics) RecordProcessedEvent(name string) {} func (n NoopMetrics) RecordProcessedEvent(eventName string, deriver string, duration time.Duration) {}
func (n NoopMetrics) RecordEventsRateLimited() {} func (n NoopMetrics) RecordEventsRateLimited() {}
......
...@@ -8,6 +8,10 @@ type MetricsTracer struct { ...@@ -8,6 +8,10 @@ type MetricsTracer struct {
var _ Tracer = (*MetricsTracer)(nil) var _ Tracer = (*MetricsTracer)(nil)
func NewMetricsTracer(m Metrics) *MetricsTracer {
return &MetricsTracer{metrics: m}
}
func (mt *MetricsTracer) OnDeriveStart(name string, ev AnnotatedEvent, derivContext uint64, startTime time.Time) { func (mt *MetricsTracer) OnDeriveStart(name string, ev AnnotatedEvent, derivContext uint64, startTime time.Time) {
} }
...@@ -15,7 +19,7 @@ func (mt *MetricsTracer) OnDeriveEnd(name string, ev AnnotatedEvent, derivContex ...@@ -15,7 +19,7 @@ func (mt *MetricsTracer) OnDeriveEnd(name string, ev AnnotatedEvent, derivContex
if !effect { // don't count events that were just pass-through and not of any effect if !effect { // don't count events that were just pass-through and not of any effect
return return
} }
mt.metrics.RecordProcessedEvent(ev.Event.String()) mt.metrics.RecordProcessedEvent(ev.Event.String(), name, duration)
} }
func (mt *MetricsTracer) OnRateLimited(name string, derivContext uint64) { func (mt *MetricsTracer) OnRateLimited(name string, derivContext uint64) {
...@@ -23,5 +27,5 @@ func (mt *MetricsTracer) OnRateLimited(name string, derivContext uint64) { ...@@ -23,5 +27,5 @@ func (mt *MetricsTracer) OnRateLimited(name string, derivContext uint64) {
} }
func (mt *MetricsTracer) OnEmit(name string, ev AnnotatedEvent, derivContext uint64, emitTime time.Time) { func (mt *MetricsTracer) OnEmit(name string, ev AnnotatedEvent, derivContext uint64, emitTime time.Time) {
mt.metrics.RecordEmittedEvent(ev.Event.String()) mt.metrics.RecordEmittedEvent(ev.Event.String(), name)
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment