Commit cbb0bb5d authored by OptimismBot's avatar OptimismBot Committed by GitHub

Merge pull request #5385 from ethereum-optimism/inphi/chan-in-metrics

op-node: Add channel_input_bytes metric
parents 3be8b488 06bea643
...@@ -64,6 +64,7 @@ type Metricer interface { ...@@ -64,6 +64,7 @@ type Metricer interface {
RecordSequencerBuildingDiffTime(duration time.Duration) RecordSequencerBuildingDiffTime(duration time.Duration)
RecordSequencerSealingTime(duration time.Duration) RecordSequencerSealingTime(duration time.Duration)
Document() []metrics.DocumentedMetric Document() []metrics.DocumentedMetric
RecordChannelInputBytes(num int)
// P2P Metrics // P2P Metrics
SetPeerScores(scores map[string]float64) SetPeerScores(scores map[string]float64)
ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration) ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration)
...@@ -131,6 +132,8 @@ type Metrics struct { ...@@ -131,6 +132,8 @@ type Metrics struct {
GossipEventsTotal *prometheus.CounterVec GossipEventsTotal *prometheus.CounterVec
BandwidthTotal *prometheus.GaugeVec BandwidthTotal *prometheus.GaugeVec
ChannelInputBytes prometheus.Counter
registry *prometheus.Registry registry *prometheus.Registry
factory metrics.Factory factory metrics.Factory
} }
...@@ -331,6 +334,12 @@ func NewMetrics(procName string) *Metrics { ...@@ -331,6 +334,12 @@ func NewMetrics(procName string) *Metrics {
"direction", "direction",
}), }),
ChannelInputBytes: factory.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "channel_input_bytes",
Help: "Number of compressed bytes added to the channel",
}),
P2PReqDurationSeconds: factory.NewHistogramVec(prometheus.HistogramOpts{ P2PReqDurationSeconds: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: ns, Namespace: ns,
Subsystem: "p2p", Subsystem: "p2p",
...@@ -635,6 +644,10 @@ func (m *Metrics) PayloadsQuarantineSize(n int) { ...@@ -635,6 +644,10 @@ func (m *Metrics) PayloadsQuarantineSize(n int) {
m.PayloadsQuarantineTotal.Set(float64(n)) m.PayloadsQuarantineTotal.Set(float64(n))
} }
func (m *Metrics) RecordChannelInputBytes(inputCompressedBytes int) {
m.ChannelInputBytes.Add(float64(inputCompressedBytes))
}
type noopMetricer struct{} type noopMetricer struct{}
var NoopMetrics Metricer = new(noopMetricer) var NoopMetrics Metricer = new(noopMetricer)
...@@ -737,3 +750,6 @@ func (n *noopMetricer) ServerPayloadByNumberEvent(num uint64, resultCode byte, d ...@@ -737,3 +750,6 @@ func (n *noopMetricer) ServerPayloadByNumberEvent(num uint64, resultCode byte, d
func (n *noopMetricer) PayloadsQuarantineSize(int) { func (n *noopMetricer) PayloadsQuarantineSize(int) {
} }
func (n *noopMetricer) RecordChannelInputBytes(int) {
}
...@@ -21,15 +21,18 @@ type ChannelInReader struct { ...@@ -21,15 +21,18 @@ type ChannelInReader struct {
nextBatchFn func() (BatchWithL1InclusionBlock, error) nextBatchFn func() (BatchWithL1InclusionBlock, error)
prev *ChannelBank prev *ChannelBank
metrics Metrics
} }
var _ ResetableStage = (*ChannelInReader)(nil) var _ ResetableStage = (*ChannelInReader)(nil)
// NewChannelInReader creates a ChannelInReader, which should be Reset(origin) before use. // NewChannelInReader creates a ChannelInReader, which should be Reset(origin) before use.
func NewChannelInReader(log log.Logger, prev *ChannelBank) *ChannelInReader { func NewChannelInReader(log log.Logger, prev *ChannelBank, metrics Metrics) *ChannelInReader {
return &ChannelInReader{ return &ChannelInReader{
log: log, log: log,
prev: prev, prev: prev,
metrics: metrics,
} }
} }
...@@ -41,6 +44,7 @@ func (cr *ChannelInReader) Origin() eth.L1BlockRef { ...@@ -41,6 +44,7 @@ func (cr *ChannelInReader) Origin() eth.L1BlockRef {
func (cr *ChannelInReader) WriteChannel(data []byte) error { func (cr *ChannelInReader) WriteChannel(data []byte) error {
if f, err := BatchReader(bytes.NewBuffer(data), cr.Origin()); err == nil { if f, err := BatchReader(bytes.NewBuffer(data), cr.Origin()); err == nil {
cr.nextBatchFn = f cr.nextBatchFn = f
cr.metrics.RecordChannelInputBytes(len(data))
return nil return nil
} else { } else {
cr.log.Error("Error creating batch reader from channel data", "err", err) cr.log.Error("Error creating batch reader from channel data", "err", err)
......
...@@ -15,6 +15,7 @@ type Metrics interface { ...@@ -15,6 +15,7 @@ type Metrics interface {
RecordL1Ref(name string, ref eth.L1BlockRef) RecordL1Ref(name string, ref eth.L1BlockRef)
RecordL2Ref(name string, ref eth.L2BlockRef) RecordL2Ref(name string, ref eth.L2BlockRef)
RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID) RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID)
RecordChannelInputBytes(inputCompresedBytes int)
} }
type L1Fetcher interface { type L1Fetcher interface {
...@@ -82,7 +83,7 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch ...@@ -82,7 +83,7 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch
l1Src := NewL1Retrieval(log, dataSrc, l1Traversal) l1Src := NewL1Retrieval(log, dataSrc, l1Traversal)
frameQueue := NewFrameQueue(log, l1Src) frameQueue := NewFrameQueue(log, l1Src)
bank := NewChannelBank(log, cfg, frameQueue, l1Fetcher) bank := NewChannelBank(log, cfg, frameQueue, l1Fetcher)
chInReader := NewChannelInReader(log, bank) chInReader := NewChannelInReader(log, bank, metrics)
batchQueue := NewBatchQueue(log, cfg, chInReader) batchQueue := NewBatchQueue(log, cfg, chInReader)
attrBuilder := NewFetchingAttributesBuilder(cfg, l1Fetcher, engine) attrBuilder := NewFetchingAttributesBuilder(cfg, l1Fetcher, engine)
attributesQueue := NewAttributesQueue(log, cfg, attrBuilder, batchQueue) attributesQueue := NewAttributesQueue(log, cfg, attrBuilder, batchQueue)
......
...@@ -21,6 +21,7 @@ type Metrics interface { ...@@ -21,6 +21,7 @@ type Metrics interface {
RecordL1Ref(name string, ref eth.L1BlockRef) RecordL1Ref(name string, ref eth.L1BlockRef)
RecordL2Ref(name string, ref eth.L2BlockRef) RecordL2Ref(name string, ref eth.L2BlockRef)
RecordChannelInputBytes(inputCompresedBytes int)
RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID) RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID)
......
package testutils package testutils
import "github.com/ethereum-optimism/optimism/op-node/eth" import (
"github.com/ethereum-optimism/optimism/op-node/eth"
)
// TestDerivationMetrics implements the metrics used in the derivation pipeline as no-op operations. // TestDerivationMetrics implements the metrics used in the derivation pipeline as no-op operations.
// Optionally a test may hook into the metrics // Optionally a test may hook into the metrics
...@@ -9,6 +11,7 @@ type TestDerivationMetrics struct { ...@@ -9,6 +11,7 @@ type TestDerivationMetrics struct {
FnRecordL1Ref func(name string, ref eth.L1BlockRef) FnRecordL1Ref func(name string, ref eth.L1BlockRef)
FnRecordL2Ref func(name string, ref eth.L2BlockRef) FnRecordL2Ref func(name string, ref eth.L2BlockRef)
FnRecordUnsafePayloads func(length uint64, memSize uint64, next eth.BlockID) FnRecordUnsafePayloads func(length uint64, memSize uint64, next eth.BlockID)
FnRecordChannelInputBytes func(inputCompresedBytes int)
} }
func (t *TestDerivationMetrics) RecordL1ReorgDepth(d uint64) { func (t *TestDerivationMetrics) RecordL1ReorgDepth(d uint64) {
...@@ -35,6 +38,12 @@ func (t *TestDerivationMetrics) RecordUnsafePayloadsBuffer(length uint64, memSiz ...@@ -35,6 +38,12 @@ func (t *TestDerivationMetrics) RecordUnsafePayloadsBuffer(length uint64, memSiz
} }
} }
func (t *TestDerivationMetrics) RecordChannelInputBytes(inputCompresedBytes int) {
if t.FnRecordChannelInputBytes != nil {
t.FnRecordChannelInputBytes(inputCompresedBytes)
}
}
type TestRPCMetrics struct{} type TestRPCMetrics struct{}
func (n *TestRPCMetrics) RecordRPCServerRequest(method string) func() { func (n *TestRPCMetrics) RecordRPCServerRequest(method string) func() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment