package metrics

import (
	"context"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
	"github.com/ethereum/go-ethereum/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/ethereum-optimism/optimism/op-node/eth"
	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
	opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
	txmetrics "github.com/ethereum-optimism/optimism/op-service/txmgr/metrics"
)

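// Namespace is the prefix for all op-batcher metric names.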
const Namespace = "op_batcher"

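// Metricer collects all metrics emitted by the op-batcher.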
type Metricer interface {
	RecordInfo(version string)
	RecordUp()

	// Records all L1 and L2 block events
	opmetrics.RefMetricer

	// Record Tx metrics
	txmetrics.TxMetricer

	RecordLatestL1Block(l1ref eth.L1BlockRef)
	RecordL2BlocksLoaded(l2ref eth.L2BlockRef)
	RecordChannelOpened(id derive.ChannelID, numPendingBlocks int)
	RecordL2BlocksAdded(l2ref eth.L2BlockRef, numBlocksAdded, numPendingBlocks, inputBytes, outputComprBytes int)
	RecordL2BlockInPendingQueue(block *types.Block)
	RecordL2BlockInChannel(block *types.Block)
	RecordChannelClosed(id derive.ChannelID, numPendingBlocks int, numFrames int, inputBytes int, outputComprBytes int, reason error)
	RecordChannelFullySubmitted(id derive.ChannelID)
	RecordChannelTimedOut(id derive.ChannelID)

	RecordBatchTxSubmitted()
	RecordBatchTxSuccess()
	RecordBatchTxFailed()

	Document() []opmetrics.DocumentedMetric
}

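// Metrics is the Prometheus-backed implementation of Metricer.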
type Metrics struct {
	ns       string
	registry *prometheus.Registry
	factory  opmetrics.Factory

	opmetrics.RefMetrics
	txmetrics.TxMetrics

	info prometheus.GaugeVec
	up   prometheus.Gauge

	// labeled by stage: opened, closed, fully_submitted, timed_out
	channelEvs opmetrics.EventVec

	pendingBlocksCount        prometheus.GaugeVec
	pendingBlocksBytesTotal   prometheus.Counter
	pendingBlocksBytesCurrent prometheus.Gauge
	blocksAddedCount          prometheus.Gauge

	channelInputBytes       prometheus.GaugeVec
	channelReadyBytes       prometheus.Gauge
	channelOutputBytes      prometheus.Gauge
	channelClosedReason     prometheus.Gauge
	channelNumFrames        prometheus.Gauge
	channelComprRatio       prometheus.Histogram
	channelInputBytesTotal  prometheus.Counter
	channelOutputBytesTotal prometheus.Counter

	batcherTxEvs opmetrics.EventVec
}

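// Compile-time assertion that Metrics implements Metricer.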
var _ Metricer = (*Metrics)(nil)

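// NewMetrics creates a new Metrics instance with a fresh registry. All metric
// names are prefixed with the op_batcher namespace and the given process name
// (defaulting to "default" when empty).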
func NewMetrics(procName string) *Metrics {
	if procName == "" {
		procName = "default"
	}
	ns := Namespace + "_" + procName

	registry := opmetrics.NewRegistry()
	factory := opmetrics.With(registry)

	return &Metrics{
		ns:       ns,
		registry: registry,
		factory:  factory,

		RefMetrics: opmetrics.MakeRefMetrics(ns, factory),
		TxMetrics:  txmetrics.MakeTxMetrics(ns, factory),

		info: *factory.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: ns,
			Name:      "info",
			Help:      "Pseudo-metric tracking version and config info",
		}, []string{
			"version",
		}),
		up: factory.NewGauge(prometheus.GaugeOpts{
			Namespace: ns,
			Name:      "up",
			Help:      "1 if the op-batcher has finished starting up",
		}),

		channelEvs: opmetrics.NewEventVec(factory, ns, "", "channel", "Channel", []string{"stage"}),

		pendingBlocksCount: *factory.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: ns,
			Name:      "pending_blocks_count",
			Help:      "Number of pending blocks, not added to a channel yet.",
		}, []string{"stage"}),
		pendingBlocksBytesTotal: factory.NewCounter(prometheus.CounterOpts{
			Namespace: ns,
			Name:      "pending_blocks_bytes_total",
			Help:      "Total size of transactions in pending blocks as they are fetched from L2",
		}),
		pendingBlocksBytesCurrent: factory.NewGauge(prometheus.GaugeOpts{
			Namespace: ns,
			Name:      "pending_blocks_bytes_current",
			Help:      "Current size of transactions in the pending (fetched from L2 but not in a channel) stage.",
		}),
		blocksAddedCount: factory.NewGauge(prometheus.GaugeOpts{
			Namespace: ns,
			Name:      "blocks_added_count",
			Help:      "Total number of blocks added to current channel.",
		}),

		channelInputBytes: *factory.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: ns,
			Name:      "input_bytes",
			Help:      "Number of input bytes to a channel.",
		}, []string{"stage"}),
		channelReadyBytes: factory.NewGauge(prometheus.GaugeOpts{
			Namespace: ns,
			Name:      "ready_bytes",
			Help:      "Number of bytes ready in the compression buffer.",
		}),
		channelOutputBytes: factory.NewGauge(prometheus.GaugeOpts{
			Namespace: ns,
			Name:      "output_bytes",
			Help:      "Number of compressed output bytes from a channel.",
		}),
		channelClosedReason: factory.NewGauge(prometheus.GaugeOpts{
			Namespace: ns,
			Name:      "channel_closed_reason",
			Help:      "Pseudo-metric to record the reason a channel got closed.",
		}),
		channelNumFrames: factory.NewGauge(prometheus.GaugeOpts{
			Namespace: ns,
			Name:      "channel_num_frames",
			Help:      "Total number of frames of closed channel.",
		}),
		channelComprRatio: factory.NewHistogram(prometheus.HistogramOpts{
			Namespace: ns,
			Name:      "channel_compr_ratio",
			Help:      "Compression ratios of closed channel.",
			Buckets:   append([]float64{0.1, 0.2}, prometheus.LinearBuckets(0.3, 0.05, 14)...),
		}),
		channelInputBytesTotal: factory.NewCounter(prometheus.CounterOpts{
			Namespace: ns,
			Name:      "input_bytes_total",
			Help:      "Total number of bytes to a channel.",
		}),
		channelOutputBytesTotal: factory.NewCounter(prometheus.CounterOpts{
			Namespace: ns,
			Name:      "output_bytes_total",
			Help:      "Total number of compressed output bytes from a channel.",
		}),

		batcherTxEvs: opmetrics.NewEventVec(factory, ns, "", "batcher_tx", "BatcherTx", []string{"stage"}),
	}
}

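// Serve starts a metrics HTTP server, exposing the registry on the given host
// and port until the context is canceled.
//
// A minimal usage sketch (the host and port here are illustrative, not
// defaults of this package):
//
//	m := NewMetrics("default")
//	go func() {
//		if err := m.Serve(ctx, "127.0.0.1", 7300); err != nil {
//			log.Error("error starting metrics server", "err", err)
//		}
//	}()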
func (m *Metrics) Serve(ctx context.Context, host string, port int) error {
	return opmetrics.ListenAndServe(ctx, m.registry, host, port)
}

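// Document returns documentation for all metrics created through this
// instance's factory.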
func (m *Metrics) Document() []opmetrics.DocumentedMetric {
	return m.factory.Document()
}

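// StartBalanceMetrics launches background tracking of the ETH balance of the
// given account.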
func (m *Metrics) StartBalanceMetrics(ctx context.Context,
	l log.Logger, client *ethclient.Client, account common.Address) {
	opmetrics.LaunchBalanceMetrics(ctx, l, m.registry, m.ns, client, account)
}

// RecordInfo sets a pseudo-metric that contains versioning and
// config info for the op-batcher.
func (m *Metrics) RecordInfo(version string) {
	m.info.WithLabelValues(version).Set(1)
}

// RecordUp sets the up metric to 1.
func (m *Metrics) RecordUp() {
	m.up.Set(1)
}

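// Stage label values for the channel, block, and batcher tx metrics.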
const (
	StageLoaded         = "loaded"
	StageOpened         = "opened"
	StageAdded          = "added"
	StageClosed         = "closed"
	StageFullySubmitted = "fully_submitted"
	StageTimedOut       = "timed_out"

	TxStageSubmitted = "submitted"
	TxStageSuccess   = "success"
	TxStageFailed    = "failed"
)

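// RecordLatestL1Block records the given L1 block as the latest L1 reference.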
func (m *Metrics) RecordLatestL1Block(l1ref eth.L1BlockRef) {
	m.RecordL1Ref("latest", l1ref)
}

// RecordL2BlocksLoaded should be called when a new L2 block was loaded into the
// channel manager (but not processed yet).
func (m *Metrics) RecordL2BlocksLoaded(l2ref eth.L2BlockRef) {
	m.RecordL2Ref(StageLoaded, l2ref)
}

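// RecordChannelOpened should be called when a new channel is opened. It also
// resets the per-channel blocks-added counter and records the number of
// still-pending blocks.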
func (m *Metrics) RecordChannelOpened(id derive.ChannelID, numPendingBlocks int) {
	m.channelEvs.Record(StageOpened)
	m.blocksAddedCount.Set(0) // reset
	m.pendingBlocksCount.WithLabelValues(StageOpened).Set(float64(numPendingBlocks))
}

// RecordL2BlocksAdded should be called when L2 blocks were added to the channel
// builder, with the latest added block.
func (m *Metrics) RecordL2BlocksAdded(l2ref eth.L2BlockRef, numBlocksAdded, numPendingBlocks, inputBytes, outputComprBytes int) {
	m.RecordL2Ref(StageAdded, l2ref)
	m.blocksAddedCount.Add(float64(numBlocksAdded))
	m.pendingBlocksCount.WithLabelValues(StageAdded).Set(float64(numPendingBlocks))
	m.channelInputBytes.WithLabelValues(StageAdded).Set(float64(inputBytes))
	m.channelReadyBytes.Set(float64(outputComprBytes))
}

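// RecordChannelClosed should be called when a channel is closed. It records
// the frame count, input and output bytes, compression ratio, and the reason
// the channel was closed.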
func (m *Metrics) RecordChannelClosed(id derive.ChannelID, numPendingBlocks int, numFrames int, inputBytes int, outputComprBytes int, reason error) {
	m.channelEvs.Record(StageClosed)
	m.pendingBlocksCount.WithLabelValues(StageClosed).Set(float64(numPendingBlocks))
	m.channelNumFrames.Set(float64(numFrames))
	m.channelInputBytes.WithLabelValues(StageClosed).Set(float64(inputBytes))
	m.channelOutputBytes.Set(float64(outputComprBytes))
	m.channelInputBytesTotal.Add(float64(inputBytes))
	m.channelOutputBytesTotal.Add(float64(outputComprBytes))

	var comprRatio float64
	if inputBytes > 0 {
		comprRatio = float64(outputComprBytes) / float64(inputBytes)
	}
	m.channelComprRatio.Observe(comprRatio)

	m.channelClosedReason.Set(float64(ClosedReasonToNum(reason)))
}

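// RecordL2BlockInPendingQueue should be called when a block is fetched from L2
// and enters the pending queue. It adds the block's estimated batch size to
// the total and current pending-bytes metrics.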
func (m *Metrics) RecordL2BlockInPendingQueue(block *types.Block) {
	size := float64(estimateBatchSize(block))
	m.pendingBlocksBytesTotal.Add(size)
	m.pendingBlocksBytesCurrent.Add(size)
}

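// RecordL2BlockInChannel should be called when a pending block is moved into a
// channel. It subtracts the block's estimated batch size from the current
// pending-bytes gauge.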
func (m *Metrics) RecordL2BlockInChannel(block *types.Block) {
	size := float64(estimateBatchSize(block))
	m.pendingBlocksBytesCurrent.Add(-1 * size)
	// Refer to RecordL2BlocksAdded for the current and total counts of bytes added to a channel.
}

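// ClosedReasonToNum maps a channel close reason to a number for the
// channel_closed_reason gauge. It currently always returns 0; see CLI-3640.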
func ClosedReasonToNum(reason error) int {
	// CLI-3640
	return 0
}

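// RecordChannelFullySubmitted should be called when all frames of a channel
// have been submitted.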
func (m *Metrics) RecordChannelFullySubmitted(id derive.ChannelID) {
	m.channelEvs.Record(StageFullySubmitted)
}

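// RecordChannelTimedOut should be called when a channel times out.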
func (m *Metrics) RecordChannelTimedOut(id derive.ChannelID) {
	m.channelEvs.Record(StageTimedOut)
}

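// RecordBatchTxSubmitted records the submission of a batch transaction.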
func (m *Metrics) RecordBatchTxSubmitted() {
	m.batcherTxEvs.Record(TxStageSubmitted)
}

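// RecordBatchTxSuccess records a successfully confirmed batch transaction.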
func (m *Metrics) RecordBatchTxSuccess() {
	m.batcherTxEvs.Record(TxStageSuccess)
}

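// RecordBatchTxFailed records a failed batch transaction.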
func (m *Metrics) RecordBatchTxFailed() {
	m.batcherTxEvs.Record(TxStageFailed)
}

// estimateBatchSize estimates the size of the batch
func estimateBatchSize(block *types.Block) uint64 {
	size := uint64(70) // estimated overhead of batch metadata
	for _, tx := range block.Transactions() {
		// Don't include deposit transactions in the batch.
		if tx.IsDepositTx() {
			continue
		}
		// Add 2 for the overhead of encoding the tx bytes in an RLP list
		size += tx.Size() + 2
	}
	return size
}