Commit 6adc4546 authored by OptimismBot, committed by GitHub

Merge pull request #6708 from ethereum-optimism/jg/channel_bank_metric

op-node: Metrics in the channel bank
parents d3f4ec81 5f128658
......@@ -66,6 +66,9 @@ type Metricer interface {
RecordSequencerSealingTime(duration time.Duration)
Document() []metrics.DocumentedMetric
RecordChannelInputBytes(num int)
RecordHeadChannelOpened()
RecordChannelTimedOut()
RecordFrame()
// P2P Metrics
SetPeerScores(allScores []store.PeerScores)
ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration)
......@@ -132,6 +135,11 @@ type Metrics struct {
TransactionsSequencedTotal prometheus.Counter
// Channel Bank Metrics
headChannelOpenedEvent *EventMetrics
channelTimedOutEvent *EventMetrics
frameAddedEvent *EventMetrics
// P2P Metrics
PeerCount prometheus.Gauge
StreamCount prometheus.Gauge
......@@ -363,6 +371,10 @@ func NewMetrics(procName string) *Metrics {
Help: "Count of incoming dial attempts to accept, with label to filter to allowed attempts",
}, []string{"allow"}),
headChannelOpenedEvent: NewEventMetrics(factory, ns, "head_channel", "New channel at the front of the channel bank"),
channelTimedOutEvent: NewEventMetrics(factory, ns, "channel_timeout", "Channel has timed out"),
frameAddedEvent: NewEventMetrics(factory, ns, "frame_added", "New frame ingested in the channel bank"),
ChannelInputBytes: factory.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "channel_input_bytes",
......@@ -700,6 +712,18 @@ func (m *Metrics) RecordChannelInputBytes(inputCompressedBytes int) {
m.ChannelInputBytes.Add(float64(inputCompressedBytes))
}
// RecordHeadChannelOpened records one occurrence on headChannelOpenedEvent,
// the event metric that NewMetrics registers under the name "head_channel"
// with the description "New channel at the front of the channel bank".
func (m *Metrics) RecordHeadChannelOpened() {
m.headChannelOpenedEvent.RecordEvent()
}
// RecordChannelTimedOut records one occurrence on channelTimedOutEvent,
// the event metric that NewMetrics registers under the name "channel_timeout"
// with the description "Channel has timed out".
func (m *Metrics) RecordChannelTimedOut() {
m.channelTimedOutEvent.RecordEvent()
}
// RecordFrame records one occurrence on frameAddedEvent, the event metric
// that NewMetrics registers under the name "frame_added" with the description
// "New frame ingested in the channel bank".
func (m *Metrics) RecordFrame() {
m.frameAddedEvent.RecordEvent()
}
// RecordPeerUnban calls Inc on the PeerUnbans metric, counting one peer unban.
func (m *Metrics) RecordPeerUnban() {
m.PeerUnbans.Inc()
}
......@@ -830,6 +854,15 @@ func (n *noopMetricer) PayloadsQuarantineSize(int) {
// RecordChannelInputBytes is a no-op: the byte count argument is ignored.
func (n *noopMetricer) RecordChannelInputBytes(int) {
}
// RecordHeadChannelOpened is a no-op: nothing is recorded.
func (n *noopMetricer) RecordHeadChannelOpened() {
}
// RecordChannelTimedOut is a no-op: nothing is recorded.
func (n *noopMetricer) RecordChannelTimedOut() {
}
// RecordFrame is a no-op: nothing is recorded.
func (n *noopMetricer) RecordFrame() {
}
// RecordPeerUnban is a no-op: nothing is recorded.
func (n *noopMetricer) RecordPeerUnban() {
}
......
......@@ -29,8 +29,9 @@ type NextFrameProvider interface {
// ChannelBank buffers channel frames, and emits full channel data
type ChannelBank struct {
log log.Logger
cfg *rollup.Config
log log.Logger
cfg *rollup.Config
metrics Metrics
channels map[ChannelID]*Channel // channels by ID
channelQueue []ChannelID // channels in FIFO order
......@@ -42,10 +43,11 @@ type ChannelBank struct {
var _ ResettableStage = (*ChannelBank)(nil)
// NewChannelBank creates a ChannelBank, which should be Reset(origin) before use.
func NewChannelBank(log log.Logger, cfg *rollup.Config, prev NextFrameProvider, fetcher L1Fetcher) *ChannelBank {
func NewChannelBank(log log.Logger, cfg *rollup.Config, prev NextFrameProvider, fetcher L1Fetcher, m Metrics) *ChannelBank {
return &ChannelBank{
log: log,
cfg: cfg,
metrics: m,
channels: make(map[ChannelID]*Channel),
channelQueue: make([]ChannelID, 0, 10),
prev: prev,
......@@ -83,6 +85,10 @@ func (cb *ChannelBank) IngestFrame(f Frame) {
currentCh, ok := cb.channels[f.ID]
if !ok {
// Only record a head channel if it can immediately be active.
if len(cb.channelQueue) == 0 {
cb.metrics.RecordHeadChannelOpened()
}
// create new channel if it doesn't exist yet
currentCh = NewChannel(f.ID, origin)
cb.channels[f.ID] = currentCh
......@@ -101,6 +107,7 @@ func (cb *ChannelBank) IngestFrame(f Frame) {
log.Warn("failed to ingest frame into channel", "err", err)
return
}
cb.metrics.RecordFrame()
// Prune after the frame is loaded.
cb.prune()
......@@ -117,8 +124,13 @@ func (cb *ChannelBank) Read() (data []byte, err error) {
timedOut := ch.OpenBlockNumber()+cb.cfg.ChannelTimeout < cb.Origin().Number
if timedOut {
cb.log.Info("channel timed out", "channel", first, "frames", len(ch.inputs))
cb.metrics.RecordChannelTimedOut()
delete(cb.channels, first)
cb.channelQueue = cb.channelQueue[1:]
// There is a new head channel if there is a channel after we have removed the first channel
if len(cb.channelQueue) > 0 {
cb.metrics.RecordHeadChannelOpened()
}
return nil, nil // multiple different channels may all be timed out
}
if !ch.IsReady() {
......@@ -128,6 +140,10 @@ func (cb *ChannelBank) Read() (data []byte, err error) {
delete(cb.channels, first)
cb.channelQueue = cb.channelQueue[1:]
// There is a new head channel if there is a channel after we have removed the first channel
if len(cb.channelQueue) > 0 {
cb.metrics.RecordHeadChannelOpened()
}
r := ch.Reader()
// Suppress error here. io.ReadAll does return nil instead of io.EOF though.
data, _ = io.ReadAll(r)
......
......@@ -8,6 +8,7 @@ import (
"strings"
"testing"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/testutils"
......@@ -101,7 +102,7 @@ func TestChannelBankSimple(t *testing.T) {
cfg := &rollup.Config{ChannelTimeout: 10}
cb := NewChannelBank(testlog.Logger(t, log.LvlCrit), cfg, input, nil)
cb := NewChannelBank(testlog.Logger(t, log.LvlCrit), cfg, input, nil, metrics.NoopMetrics)
// Load the first frame
out, err := cb.NextData(context.Background())
......@@ -145,7 +146,7 @@ func TestChannelBankInterleaved(t *testing.T) {
cfg := &rollup.Config{ChannelTimeout: 10}
cb := NewChannelBank(testlog.Logger(t, log.LvlCrit), cfg, input, nil)
cb := NewChannelBank(testlog.Logger(t, log.LvlCrit), cfg, input, nil, metrics.NoopMetrics)
// Load a:0
out, err := cb.NextData(context.Background())
......@@ -205,7 +206,7 @@ func TestChannelBankDuplicates(t *testing.T) {
cfg := &rollup.Config{ChannelTimeout: 10}
cb := NewChannelBank(testlog.Logger(t, log.LvlCrit), cfg, input, nil)
cb := NewChannelBank(testlog.Logger(t, log.LvlCrit), cfg, input, nil, metrics.NoopMetrics)
// Load the first frame
out, err := cb.NextData(context.Background())
......
......@@ -18,6 +18,9 @@ type Metrics interface {
RecordL2Ref(name string, ref eth.L2BlockRef)
RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID)
RecordChannelInputBytes(inputCompressedBytes int)
RecordHeadChannelOpened()
RecordChannelTimedOut()
RecordFrame()
}
type L1Fetcher interface {
......@@ -85,7 +88,7 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch
dataSrc := NewDataSourceFactory(log, cfg, l1Fetcher) // auxiliary stage for L1Retrieval
l1Src := NewL1Retrieval(log, dataSrc, l1Traversal)
frameQueue := NewFrameQueue(log, l1Src)
bank := NewChannelBank(log, cfg, frameQueue, l1Fetcher)
bank := NewChannelBank(log, cfg, frameQueue, l1Fetcher, metrics)
chInReader := NewChannelInReader(log, bank, metrics)
batchQueue := NewBatchQueue(log, cfg, chInReader)
attrBuilder := NewFetchingAttributesBuilder(cfg, l1Fetcher, engine)
......
......@@ -23,6 +23,9 @@ type Metrics interface {
RecordL1Ref(name string, ref eth.L1BlockRef)
RecordL2Ref(name string, ref eth.L2BlockRef)
RecordChannelInputBytes(inputCompressedBytes int)
RecordHeadChannelOpened()
RecordChannelTimedOut()
RecordFrame()
RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID)
......
......@@ -44,6 +44,15 @@ func (t *TestDerivationMetrics) RecordChannelInputBytes(inputCompressedBytes int
}
}
// RecordHeadChannelOpened is a no-op on the test metrics: nothing is recorded.
func (t *TestDerivationMetrics) RecordHeadChannelOpened() {
}
// RecordChannelTimedOut is a no-op on the test metrics: nothing is recorded.
func (t *TestDerivationMetrics) RecordChannelTimedOut() {
}
// RecordFrame is a no-op on the test metrics: nothing is recorded.
func (t *TestDerivationMetrics) RecordFrame() {
}
// TestRPCMetrics is a field-less type that carries RPC metrics methods
// such as RecordRPCServerRequest.
type TestRPCMetrics struct{}
func (n *TestRPCMetrics) RecordRPCServerRequest(method string) func() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment