Commit b3bb812c authored by Zach Howard, committed by GitHub

op-conductor: adds metrics for actions, healthchecks, execution time (#10495)

parent 60faf8b5
...@@ -18,6 +18,7 @@ import ( ...@@ -18,6 +18,7 @@ import (
"github.com/ethereum-optimism/optimism/op-conductor/client" "github.com/ethereum-optimism/optimism/op-conductor/client"
"github.com/ethereum-optimism/optimism/op-conductor/consensus" "github.com/ethereum-optimism/optimism/op-conductor/consensus"
"github.com/ethereum-optimism/optimism/op-conductor/health" "github.com/ethereum-optimism/optimism/op-conductor/health"
"github.com/ethereum-optimism/optimism/op-conductor/metrics"
conductorrpc "github.com/ethereum-optimism/optimism/op-conductor/rpc" conductorrpc "github.com/ethereum-optimism/optimism/op-conductor/rpc"
opp2p "github.com/ethereum-optimism/optimism/op-node/p2p" opp2p "github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
...@@ -25,6 +26,8 @@ import ( ...@@ -25,6 +26,8 @@ import (
opclient "github.com/ethereum-optimism/optimism/op-service/client" opclient "github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/dial" "github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/httputil"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc" oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum-optimism/optimism/op-service/sources"
) )
...@@ -38,7 +41,7 @@ var ( ...@@ -38,7 +41,7 @@ var (
// New creates a new OpConductor instance. // New creates a new OpConductor instance.
func New(ctx context.Context, cfg *Config, log log.Logger, version string) (*OpConductor, error) { func New(ctx context.Context, cfg *Config, log log.Logger, version string) (*OpConductor, error) {
return NewOpConductor(ctx, cfg, log, version, nil, nil, nil) return NewOpConductor(ctx, cfg, log, metrics.NewMetrics(), version, nil, nil, nil)
} }
// NewOpConductor creates a new OpConductor instance. // NewOpConductor creates a new OpConductor instance.
...@@ -46,6 +49,7 @@ func NewOpConductor( ...@@ -46,6 +49,7 @@ func NewOpConductor(
ctx context.Context, ctx context.Context,
cfg *Config, cfg *Config,
log log.Logger, log log.Logger,
m metrics.Metricer,
version string, version string,
ctrl client.SequencerControl, ctrl client.SequencerControl,
cons consensus.Consensus, cons consensus.Consensus,
...@@ -59,6 +63,7 @@ func NewOpConductor( ...@@ -59,6 +63,7 @@ func NewOpConductor(
log: log, log: log,
version: version, version: version,
cfg: cfg, cfg: cfg,
metrics: m,
pauseCh: make(chan struct{}), pauseCh: make(chan struct{}),
pauseDoneCh: make(chan struct{}), pauseDoneCh: make(chan struct{}),
resumeCh: make(chan struct{}), resumeCh: make(chan struct{}),
...@@ -172,6 +177,7 @@ func (c *OpConductor) initHealthMonitor(ctx context.Context) error { ...@@ -172,6 +177,7 @@ func (c *OpConductor) initHealthMonitor(ctx context.Context) error {
c.hmon = health.NewSequencerHealthMonitor( c.hmon = health.NewSequencerHealthMonitor(
c.log, c.log,
c.metrics,
c.cfg.HealthCheck.Interval, c.cfg.HealthCheck.Interval,
c.cfg.HealthCheck.UnsafeInterval, c.cfg.HealthCheck.UnsafeInterval,
c.cfg.HealthCheck.SafeInterval, c.cfg.HealthCheck.SafeInterval,
...@@ -245,6 +251,7 @@ type OpConductor struct { ...@@ -245,6 +251,7 @@ type OpConductor struct {
log log.Logger log log.Logger
version string version string
cfg *Config cfg *Config
metrics metrics.Metricer
ctrl client.SequencerControl ctrl client.SequencerControl
cons consensus.Consensus cons consensus.Consensus
...@@ -271,7 +278,8 @@ type OpConductor struct { ...@@ -271,7 +278,8 @@ type OpConductor struct {
shutdownCtx context.Context shutdownCtx context.Context
shutdownCancel context.CancelFunc shutdownCancel context.CancelFunc
rpcServer *oprpc.Server rpcServer *oprpc.Server
metricsServer *httputil.HTTPServer
} }
type state struct { type state struct {
...@@ -310,9 +318,25 @@ func (oc *OpConductor) Start(ctx context.Context) error { ...@@ -310,9 +318,25 @@ func (oc *OpConductor) Start(ctx context.Context) error {
return errors.Wrap(err, "failed to start JSON-RPC server") return errors.Wrap(err, "failed to start JSON-RPC server")
} }
if oc.cfg.MetricsConfig.Enabled {
oc.log.Info("starting metrics server")
m, ok := oc.metrics.(opmetrics.RegistryMetricer)
if !ok {
return fmt.Errorf("metrics were enabled, but metricer %T does not expose registry for metrics-server", oc.metrics)
}
metricsServer, err := opmetrics.StartServer(m.Registry(), oc.cfg.MetricsConfig.ListenAddr, oc.cfg.MetricsConfig.ListenPort)
if err != nil {
return errors.Wrap(err, "failed to start metrics server")
}
oc.metricsServer = metricsServer
}
oc.wg.Add(1) oc.wg.Add(1)
go oc.loop() go oc.loop()
oc.metrics.RecordInfo(oc.version)
oc.metrics.RecordUp()
oc.log.Info("OpConductor started") oc.log.Info("OpConductor started")
return nil return nil
} }
...@@ -350,6 +374,12 @@ func (oc *OpConductor) Stop(ctx context.Context) error { ...@@ -350,6 +374,12 @@ func (oc *OpConductor) Stop(ctx context.Context) error {
} }
} }
if oc.metricsServer != nil {
if err := oc.metricsServer.Shutdown(ctx); err != nil {
result = multierror.Append(result, errors.Wrap(err, "failed to stop metrics server"))
}
}
if result.ErrorOrNil() != nil { if result.ErrorOrNil() != nil {
oc.log.Error("failed to stop OpConductor", "err", result.ErrorOrNil()) oc.log.Error("failed to stop OpConductor", "err", result.ErrorOrNil())
return result.ErrorOrNil() return result.ErrorOrNil()
...@@ -465,12 +495,14 @@ func (oc *OpConductor) loop() { ...@@ -465,12 +495,14 @@ func (oc *OpConductor) loop() {
defer oc.wg.Done() defer oc.wg.Done()
for { for {
startTime := time.Now()
select { select {
case <-oc.shutdownCtx.Done(): case <-oc.shutdownCtx.Done():
return return
default: default:
oc.loopActionFn() oc.loopActionFn()
} }
oc.metrics.RecordLoopExecutionTime(time.Since(startTime).Seconds())
} }
} }
...@@ -619,6 +651,7 @@ func (oc *OpConductor) action() { ...@@ -619,6 +651,7 @@ func (oc *OpConductor) action() {
if !status.Equal(oc.prevState) { if !status.Equal(oc.prevState) {
oc.log.Info("state changed", "prev_state", oc.prevState, "new_state", status) oc.log.Info("state changed", "prev_state", oc.prevState, "new_state", status)
oc.prevState = status oc.prevState = status
oc.metrics.RecordStateChange(status.leader, status.healthy, status.active)
} }
} }
...@@ -627,6 +660,7 @@ func (oc *OpConductor) transferLeader() error { ...@@ -627,6 +660,7 @@ func (oc *OpConductor) transferLeader() error {
// TransferLeader here will do round robin to try to transfer leadership to the next healthy node. // TransferLeader here will do round robin to try to transfer leadership to the next healthy node.
oc.log.Info("transferring leadership", "server", oc.cons.ServerID()) oc.log.Info("transferring leadership", "server", oc.cons.ServerID())
err := oc.cons.TransferLeader() err := oc.cons.TransferLeader()
oc.metrics.RecordLeaderTransfer(err == nil)
if err == nil { if err == nil {
oc.leader.Store(false) oc.leader.Store(false)
return nil // success return nil // success
...@@ -654,6 +688,7 @@ func (oc *OpConductor) stopSequencer() error { ...@@ -654,6 +688,7 @@ func (oc *OpConductor) stopSequencer() error {
return errors.Wrap(err, "failed to stop sequencer") return errors.Wrap(err, "failed to stop sequencer")
} }
} }
oc.metrics.RecordStopSequencer(err == nil)
oc.seqActive.Store(false) oc.seqActive.Store(false)
return nil return nil
...@@ -684,7 +719,8 @@ func (oc *OpConductor) startSequencer() error { ...@@ -684,7 +719,8 @@ func (oc *OpConductor) startSequencer() error {
} }
oc.log.Info("starting sequencer", "server", oc.cons.ServerID(), "leader", oc.leader.Load(), "healthy", oc.healthy.Load(), "active", oc.seqActive.Load()) oc.log.Info("starting sequencer", "server", oc.cons.ServerID(), "leader", oc.leader.Load(), "healthy", oc.healthy.Load(), "active", oc.seqActive.Load())
if err = oc.ctrl.StartSequencer(ctx, unsafeInCons.ExecutionPayload.BlockHash); err != nil { err = oc.ctrl.StartSequencer(ctx, unsafeInCons.ExecutionPayload.BlockHash)
if err != nil {
// cannot directly compare using Errors.Is because the error is returned from an JSON RPC server which lost its type. // cannot directly compare using Errors.Is because the error is returned from an JSON RPC server which lost its type.
if !strings.Contains(err.Error(), driver.ErrSequencerAlreadyStarted.Error()) { if !strings.Contains(err.Error(), driver.ErrSequencerAlreadyStarted.Error()) {
return fmt.Errorf("failed to start sequencer: %w", err) return fmt.Errorf("failed to start sequencer: %w", err)
...@@ -692,6 +728,7 @@ func (oc *OpConductor) startSequencer() error { ...@@ -692,6 +728,7 @@ func (oc *OpConductor) startSequencer() error {
oc.log.Warn("sequencer already started.", "err", err) oc.log.Warn("sequencer already started.", "err", err)
} }
} }
oc.metrics.RecordStartSequencer(err == nil)
oc.seqActive.Store(true) oc.seqActive.Store(true)
return nil return nil
......
...@@ -19,6 +19,7 @@ import ( ...@@ -19,6 +19,7 @@ import (
consensusmocks "github.com/ethereum-optimism/optimism/op-conductor/consensus/mocks" consensusmocks "github.com/ethereum-optimism/optimism/op-conductor/consensus/mocks"
"github.com/ethereum-optimism/optimism/op-conductor/health" "github.com/ethereum-optimism/optimism/op-conductor/health"
healthmocks "github.com/ethereum-optimism/optimism/op-conductor/health/mocks" healthmocks "github.com/ethereum-optimism/optimism/op-conductor/health/mocks"
"github.com/ethereum-optimism/optimism/op-conductor/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
...@@ -88,6 +89,7 @@ type OpConductorTestSuite struct { ...@@ -88,6 +89,7 @@ type OpConductorTestSuite struct {
err error err error
log log.Logger log log.Logger
cfg Config cfg Config
metrics metrics.Metricer
version string version string
ctrl *clientmocks.SequencerControl ctrl *clientmocks.SequencerControl
cons *consensusmocks.Consensus cons *consensusmocks.Consensus
...@@ -101,6 +103,7 @@ type OpConductorTestSuite struct { ...@@ -101,6 +103,7 @@ type OpConductorTestSuite struct {
func (s *OpConductorTestSuite) SetupSuite() { func (s *OpConductorTestSuite) SetupSuite() {
s.ctx = context.Background() s.ctx = context.Background()
s.log = testlog.Logger(s.T(), log.LevelDebug) s.log = testlog.Logger(s.T(), log.LevelDebug)
s.metrics = &metrics.NoopMetricsImpl{}
s.cfg = mockConfig(s.T()) s.cfg = mockConfig(s.T())
s.version = "v0.0.1" s.version = "v0.0.1"
s.next = make(chan struct{}, 1) s.next = make(chan struct{}, 1)
...@@ -113,7 +116,7 @@ func (s *OpConductorTestSuite) SetupTest() { ...@@ -113,7 +116,7 @@ func (s *OpConductorTestSuite) SetupTest() {
s.hmon = &healthmocks.HealthMonitor{} s.hmon = &healthmocks.HealthMonitor{}
s.cons.EXPECT().ServerID().Return("SequencerA") s.cons.EXPECT().ServerID().Return("SequencerA")
conductor, err := NewOpConductor(s.ctx, &s.cfg, s.log, s.version, s.ctrl, s.cons, s.hmon) conductor, err := NewOpConductor(s.ctx, &s.cfg, s.log, s.metrics, s.version, s.ctrl, s.cons, s.hmon)
s.NoError(err) s.NoError(err)
s.conductor = conductor s.conductor = conductor
......
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-conductor/metrics"
"github.com/ethereum-optimism/optimism/op-node/p2p" "github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/dial" "github.com/ethereum-optimism/optimism/op-service/dial"
...@@ -34,9 +35,10 @@ type HealthMonitor interface { ...@@ -34,9 +35,10 @@ type HealthMonitor interface {
// interval is the interval between health checks measured in seconds. // interval is the interval between health checks measured in seconds.
// safeInterval is the interval between safe head progress measured in seconds. // safeInterval is the interval between safe head progress measured in seconds.
// minPeerCount is the minimum number of peers required for the sequencer to be healthy. // minPeerCount is the minimum number of peers required for the sequencer to be healthy.
func NewSequencerHealthMonitor(log log.Logger, interval, unsafeInterval, safeInterval, minPeerCount uint64, safeEnabled bool, rollupCfg *rollup.Config, node dial.RollupClientInterface, p2p p2p.API) HealthMonitor { func NewSequencerHealthMonitor(log log.Logger, metrics metrics.Metricer, interval, unsafeInterval, safeInterval, minPeerCount uint64, safeEnabled bool, rollupCfg *rollup.Config, node dial.RollupClientInterface, p2p p2p.API) HealthMonitor {
return &SequencerHealthMonitor{ return &SequencerHealthMonitor{
log: log, log: log,
metrics: metrics,
done: make(chan struct{}), done: make(chan struct{}),
interval: interval, interval: interval,
healthUpdateCh: make(chan error), healthUpdateCh: make(chan error),
...@@ -53,9 +55,10 @@ func NewSequencerHealthMonitor(log log.Logger, interval, unsafeInterval, safeInt ...@@ -53,9 +55,10 @@ func NewSequencerHealthMonitor(log log.Logger, interval, unsafeInterval, safeInt
// SequencerHealthMonitor monitors sequencer health. // SequencerHealthMonitor monitors sequencer health.
type SequencerHealthMonitor struct { type SequencerHealthMonitor struct {
log log.Logger log log.Logger
done chan struct{} metrics metrics.Metricer
wg sync.WaitGroup done chan struct{}
wg sync.WaitGroup
rollupCfg *rollup.Config rollupCfg *rollup.Config
unsafeInterval uint64 unsafeInterval uint64
...@@ -112,7 +115,9 @@ func (hm *SequencerHealthMonitor) loop() { ...@@ -112,7 +115,9 @@ func (hm *SequencerHealthMonitor) loop() {
case <-hm.done: case <-hm.done:
return return
case <-ticker.C: case <-ticker.C:
hm.healthUpdateCh <- hm.healthCheck() err := hm.healthCheck()
hm.metrics.RecordHealthCheck(err == nil, err)
hm.healthUpdateCh <- err
} }
} }
} }
......
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/ethereum-optimism/optimism/op-conductor/metrics"
"github.com/ethereum-optimism/optimism/op-node/p2p" "github.com/ethereum-optimism/optimism/op-node/p2p"
p2pMocks "github.com/ethereum-optimism/optimism/op-node/p2p/mocks" p2pMocks "github.com/ethereum-optimism/optimism/op-node/p2p/mocks"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
...@@ -58,6 +59,7 @@ func (s *HealthMonitorTestSuite) SetupMonitor( ...@@ -58,6 +59,7 @@ func (s *HealthMonitorTestSuite) SetupMonitor(
log: s.log, log: s.log,
done: make(chan struct{}), done: make(chan struct{}),
interval: s.interval, interval: s.interval,
metrics: &metrics.NoopMetricsImpl{},
healthUpdateCh: make(chan error), healthUpdateCh: make(chan error),
rollupCfg: s.rollupCfg, rollupCfg: s.rollupCfg,
unsafeInterval: unsafeInterval, unsafeInterval: unsafeInterval,
......
package metrics
import (
"strconv"
"github.com/ethereum-optimism/optimism/op-service/httputil"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/prometheus/client_golang/prometheus"
)
// Namespace is the value passed as the Prometheus Namespace option of every
// metric created in NewMetrics.
const Namespace = "op_conductor"
// Metricer is the set of recording hooks the conductor and its health monitor
// call. Implementations in this package are Metrics (Prometheus-backed) and
// NoopMetricsImpl (discards everything).
type Metricer interface {
	// RecordInfo publishes the running version.
	RecordInfo(version string)
	// RecordUp marks the conductor as started.
	RecordUp()
	// RecordStateChange records a transition to the given leader/healthy/active state.
	RecordStateChange(leader, healthy, active bool)
	// RecordLeaderTransfer records the outcome of a leadership transfer attempt.
	RecordLeaderTransfer(success bool)
	// RecordStartSequencer records the outcome of a sequencer start.
	RecordStartSequencer(success bool)
	// RecordStopSequencer records the outcome of a sequencer stop.
	RecordStopSequencer(success bool)
	// RecordHealthCheck records one health check result; err is the check's
	// error, or nil.
	RecordHealthCheck(success bool, err error)
	// RecordLoopExecutionTime records how long one conductor loop iteration
	// took, in seconds.
	RecordLoopExecutionTime(duration float64)
}
// Metrics implementation must implement RegistryMetricer to allow the metrics server to work.
var _ opmetrics.RegistryMetricer = (*Metrics)(nil)
// Metrics is the Prometheus-backed Metricer implementation. Every collector
// it holds is created in NewMetrics through factory.
type Metrics struct {
	ns       string
	registry *prometheus.Registry
	factory  opmetrics.Factory

	// info is kept as the pointer the factory returns, like the other vector
	// collectors below, rather than a dereferenced copy of the collector.
	info *prometheus.GaugeVec
	up   prometheus.Gauge

	healthChecks      *prometheus.CounterVec
	leaderTransfers   *prometheus.CounterVec
	sequencerStarts   *prometheus.CounterVec
	sequencerStops    *prometheus.CounterVec
	stateChanges      *prometheus.CounterVec
	loopExecutionTime prometheus.Histogram
}

// Registry returns the Prometheus registry created in NewMetrics. It is the
// method that lets *Metrics satisfy opmetrics.RegistryMetricer.
func (m *Metrics) Registry() *prometheus.Registry {
	return m.registry
}

var _ Metricer = (*Metrics)(nil)

// NewMetrics creates a new registry with opmetrics.NewRegistry, wraps it in a
// factory with opmetrics.With, and returns a Metrics whose collectors are all
// created through that factory under Namespace.
func NewMetrics() *Metrics {
	registry := opmetrics.NewRegistry()
	factory := opmetrics.With(registry)
	return &Metrics{
		ns:       Namespace,
		registry: registry,
		factory:  factory,
		info: factory.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: Namespace,
			Name:      "info",
			Help:      "Pseudo-metric tracking version and config info",
		}, []string{
			"version",
		}),
		up: factory.NewGauge(prometheus.GaugeOpts{
			Namespace: Namespace,
			Name:      "up",
			Help:      "1 if the op-conductor has finished starting up",
		}),
		healthChecks: factory.NewCounterVec(prometheus.CounterOpts{
			Namespace: Namespace,
			Name:      "healthchecks_count",
			Help:      "Number of healthchecks",
		}, []string{"success", "error"}),
		leaderTransfers: factory.NewCounterVec(prometheus.CounterOpts{
			Namespace: Namespace,
			Name:      "leader_transfers_count",
			Help:      "Number of leader transfers",
		}, []string{"success"}),
		sequencerStarts: factory.NewCounterVec(prometheus.CounterOpts{
			Namespace: Namespace,
			Name:      "sequencer_starts_count",
			Help:      "Number of sequencer starts",
		}, []string{"success"}),
		sequencerStops: factory.NewCounterVec(prometheus.CounterOpts{
			Namespace: Namespace,
			Name:      "sequencer_stops_count",
			Help:      "Number of sequencer stops",
		}, []string{"success"}),
		stateChanges: factory.NewCounterVec(prometheus.CounterOpts{
			Namespace: Namespace,
			Name:      "state_changes_count",
			Help:      "Number of state changes",
		}, []string{
			"leader",
			"healthy",
			"active",
		}),
		loopExecutionTime: factory.NewHistogram(prometheus.HistogramOpts{
			Namespace: Namespace,
			Name:      "loop_execution_time",
			Help:      "Time (in seconds) to execute conductor loop iteration",
			Buckets:   []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
		}),
	}
}
// Start hands m's registry, host and port to opmetrics.StartServer and
// returns the resulting server together with any error from starting it.
func (m *Metrics) Start(host string, port int) (*httputil.HTTPServer, error) {
	srv, err := opmetrics.StartServer(m.registry, host, port)
	return srv, err
}
// RecordInfo sets the info pseudo-metric to 1 for the given version label,
// exposing the op-conductor version.
func (m *Metrics) RecordInfo(version string) {
	gauge := m.info.WithLabelValues(version)
	gauge.Set(1)
}
// RecordUp sets the up gauge to 1, signalling that the op-conductor has
// finished starting up.
//
// The gauge was already created through the factory in NewMetrics, so nothing
// needs registering here; the previous argument-less prometheus.MustRegister()
// call registered no collectors and has been dropped.
func (m *Metrics) RecordUp() {
	m.up.Set(1)
}
// RecordHealthCheck adds one to the healthChecks counter. The "success" label
// is the formatted boolean and the "error" label is err's message, or the
// empty string when err is nil.
//
// NOTE(review): the raw error text becomes a label value; confirm the set of
// health-check errors is small and fixed so label cardinality stays bounded.
func (m *Metrics) RecordHealthCheck(success bool, err error) {
	var reason string
	if err != nil {
		reason = err.Error()
	}
	outcome := strconv.FormatBool(success)
	m.healthChecks.WithLabelValues(outcome, reason).Inc()
}
// RecordLeaderTransfer adds one to the leaderTransfers counter, labelled with
// whether the transfer succeeded.
func (m *Metrics) RecordLeaderTransfer(success bool) {
	outcome := strconv.FormatBool(success)
	m.leaderTransfers.WithLabelValues(outcome).Inc()
}
// RecordStateChange adds one to the stateChanges counter for the given
// combination of leader, healthy and active flags (in that label order).
func (m *Metrics) RecordStateChange(leader bool, healthy bool, active bool) {
	labels := []string{
		strconv.FormatBool(leader),
		strconv.FormatBool(healthy),
		strconv.FormatBool(active),
	}
	m.stateChanges.WithLabelValues(labels...).Inc()
}
// RecordStartSequencer adds one to the sequencerStarts counter, labelled with
// whether the start succeeded.
func (m *Metrics) RecordStartSequencer(success bool) {
	outcome := strconv.FormatBool(success)
	m.sequencerStarts.WithLabelValues(outcome).Inc()
}
// RecordStopSequencer adds one to the sequencerStops counter, labelled with
// whether the stop succeeded.
func (m *Metrics) RecordStopSequencer(success bool) {
	outcome := strconv.FormatBool(success)
	m.sequencerStops.WithLabelValues(outcome).Inc()
}
// RecordLoopExecutionTime observes one conductor loop iteration's duration,
// in seconds, on the loopExecutionTime histogram.
func (m *Metrics) RecordLoopExecutionTime(duration float64) {
	histogram := m.loopExecutionTime
	histogram.Observe(duration)
}
package metrics
// NoopMetricsImpl is a Metricer whose methods all do nothing.
type NoopMetricsImpl struct{}

// NoopMetrics is a ready-made no-op Metricer.
var NoopMetrics Metricer = &NoopMetricsImpl{}

// RecordInfo does nothing.
func (*NoopMetricsImpl) RecordInfo(version string) {}

// RecordUp does nothing.
func (*NoopMetricsImpl) RecordUp() {}

// RecordStateChange does nothing.
func (*NoopMetricsImpl) RecordStateChange(leader bool, healthy bool, active bool) {}

// RecordLeaderTransfer does nothing.
func (*NoopMetricsImpl) RecordLeaderTransfer(success bool) {}

// RecordStartSequencer does nothing.
func (*NoopMetricsImpl) RecordStartSequencer(success bool) {}

// RecordStopSequencer does nothing.
func (*NoopMetricsImpl) RecordStopSequencer(success bool) {}

// RecordHealthCheck does nothing.
func (*NoopMetricsImpl) RecordHealthCheck(success bool, err error) {}

// RecordLoopExecutionTime does nothing.
func (*NoopMetricsImpl) RecordLoopExecutionTime(duration float64) {}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment