Commit c3efa312 authored by Hamdi Allam's avatar Hamdi Allam Committed by GitHub

Merge pull request #7858 from ethereum-optimism/10-25-indexer.metric.prefix

fix(indexer) `op_indexer_` MetricsNamespace prefix + metric updates
parents babe5e7d 6600e979
......@@ -67,14 +67,18 @@ func (etl *ETL) Start(ctx context.Context) error {
if len(headers) > 0 {
etl.log.Info("retrying previous batch")
} else {
newHeaders, err := etl.headerTraversal.NextFinalizedHeaders(etl.headerBufferSize)
newHeaders, err := etl.headerTraversal.NextHeaders(etl.headerBufferSize)
if err != nil {
etl.log.Error("error querying for headers", "err", err)
} else if len(newHeaders) == 0 {
etl.log.Warn("no new headers. processor unexpectedly at head...")
etl.log.Warn("no new headers. etl at head?")
} else {
headers = newHeaders
etl.metrics.RecordBatchHeaders(len(newHeaders))
}
latestHeader := etl.headerTraversal.LatestHeader()
if latestHeader != nil {
etl.metrics.RecordLatestHeight(latestHeader.Number)
}
}
......@@ -98,7 +102,6 @@ func (etl *ETL) processBatch(headers []types.Header) error {
batchLog := etl.log.New("batch_start_block_number", firstHeader.Number, "batch_end_block_number", lastHeader.Number)
batchLog.Info("extracting batch", "size", len(headers))
etl.metrics.RecordBatchLatestHeight(lastHeader.Number)
headerMap := make(map[common.Hash]*types.Header, len(headers))
for i := range headers {
header := headers[i]
......@@ -128,6 +131,7 @@ func (etl *ETL) processBatch(headers []types.Header) error {
for i := range logs.Logs {
log := logs.Logs[i]
headersWithLog[log.BlockHash] = true
if _, ok := headerMap[log.BlockHash]; !ok {
// NOTE. Definitely an error state if none of the headers were re-orged out in between
// the blocks and logs retrieval operations. Unlikely as long as the confirmation depth has
......@@ -135,9 +139,6 @@ func (etl *ETL) processBatch(headers []types.Header) error {
batchLog.Error("log found with block hash not in the batch", "block_hash", logs.Logs[i].BlockHash, "log_index", logs.Logs[i].Index)
return errors.New("parsed log with a block hash not in the batch")
}
etl.metrics.RecordBatchLog(log.Address)
headersWithLog[log.BlockHash] = true
}
// ensure we use unique downstream references for the etl batch
......
......@@ -108,6 +108,7 @@ func (l1Etl *L1ETL) Start(ctx context.Context) error {
l1BlockHeaders = append(l1BlockHeaders, database.L1BlockHeader{BlockHeader: database.BlockHeaderFromHeader(&batch.Headers[i])})
}
}
if len(l1BlockHeaders) == 0 {
batch.Logger.Info("no l1 blocks with logs in batch")
continue
......@@ -117,6 +118,7 @@ func (l1Etl *L1ETL) Start(ctx context.Context) error {
for i := range batch.Logs {
timestamp := batch.HeaderMap[batch.Logs[i].BlockHash].Time
l1ContractEvents[i] = database.L1ContractEvent{ContractEvent: database.ContractEventFromLog(&batch.Logs[i], timestamp)}
l1Etl.ETL.metrics.RecordIndexedLog(batch.Logs[i].Address)
}
// Continually try to persist this batch. If it fails after 10 attempts, we simply error out
......@@ -138,7 +140,6 @@ func (l1Etl *L1ETL) Start(ctx context.Context) error {
l1Etl.ETL.metrics.RecordIndexedHeaders(len(l1BlockHeaders))
l1Etl.ETL.metrics.RecordIndexedLatestHeight(l1BlockHeaders[len(l1BlockHeaders)-1].Number)
l1Etl.ETL.metrics.RecordIndexedLogs(len(l1ContractEvents))
// a-ok!
return nil, nil
......
......@@ -62,7 +62,7 @@ func TestL1ETLConstruction(t *testing.T) {
},
assertion: func(etl *L1ETL, err error) {
require.NoError(t, err)
require.Equal(t, etl.headerTraversal.LastHeader().ParentHash, common.HexToHash("0x69"))
require.Equal(t, etl.headerTraversal.LastTraversedHeader().ParentHash, common.HexToHash("0x69"))
},
},
{
......@@ -94,7 +94,7 @@ func TestL1ETLConstruction(t *testing.T) {
},
assertion: func(etl *L1ETL, err error) {
require.NoError(t, err)
header := etl.headerTraversal.LastHeader()
header := etl.headerTraversal.LastTraversedHeader()
require.True(t, header.Number.Cmp(big.NewInt(69)) == 0)
},
......
......@@ -93,6 +93,7 @@ func (l2Etl *L2ETL) Start(ctx context.Context) error {
for i := range batch.Logs {
timestamp := batch.HeaderMap[batch.Logs[i].BlockHash].Time
l2ContractEvents[i] = database.L2ContractEvent{ContractEvent: database.ContractEventFromLog(&batch.Logs[i], timestamp)}
l2Etl.ETL.metrics.RecordIndexedLog(batch.Logs[i].Address)
}
// Continually try to persist this batch. If it fails after 10 attempts, we simply error out
......@@ -115,9 +116,6 @@ func (l2Etl *L2ETL) Start(ctx context.Context) error {
l2Etl.ETL.metrics.RecordIndexedHeaders(len(l2BlockHeaders))
l2Etl.ETL.metrics.RecordIndexedLatestHeight(l2BlockHeaders[len(l2BlockHeaders)-1].Number)
if len(l2ContractEvents) > 0 {
l2Etl.ETL.metrics.RecordIndexedLogs(len(l2ContractEvents))
}
// a-ok!
return nil, nil
......
......@@ -9,35 +9,28 @@ import (
)
var (
MetricsNamespace string = "etl"
MetricsNamespace string = "op_indexer_etl"
)
type Metricer interface {
RecordInterval() (done func(err error))
// Batch Extraction
RecordBatchLatestHeight(height *big.Int)
RecordBatchHeaders(size int)
RecordBatchLog(contractAddress common.Address)
RecordLatestHeight(height *big.Int)
// Indexed Batches
RecordIndexedLatestHeight(height *big.Int)
RecordIndexedHeaders(size int)
RecordIndexedLogs(size int)
RecordIndexedLog(contractAddress common.Address)
}
type etlMetrics struct {
intervalTick prometheus.Counter
intervalDuration prometheus.Histogram
batchFailures prometheus.Counter
batchLatestHeight prometheus.Gauge
batchHeaders prometheus.Counter
batchLogs *prometheus.CounterVec
intervalFailures prometheus.Counter
latestHeight prometheus.Gauge
indexedLatestHeight prometheus.Gauge
indexedHeaders prometheus.Counter
indexedLogs prometheus.Counter
indexedLogs *prometheus.CounterVec
}
func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer {
......@@ -55,31 +48,17 @@ func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer {
Name: "interval_seconds",
Help: "duration elapsed during the processing loop",
}),
batchFailures: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "failures_total",
Help: "number of times the etl encountered a failure to extract a batch",
}),
batchLatestHeight: factory.NewGauge(prometheus.GaugeOpts{
intervalFailures: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "height",
Help: "the latest block height observed by an etl interval",
Name: "interval_failures_total",
Help: "number of times the etl encountered a failure during the processing loop",
}),
batchHeaders: factory.NewCounter(prometheus.CounterOpts{
latestHeight: factory.NewGauge(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "headers_total",
Help: "number of headers observed by the etl",
}),
batchLogs: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "logs_total",
Help: "number of logs observed by the etl",
}, []string{
"contract",
Name: "latest_height",
Help: "the latest height reported by the connected client",
}),
indexedLatestHeight: factory.NewGauge(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
......@@ -93,11 +72,13 @@ func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer {
Name: "indexed_headers_total",
Help: "number of headers indexed by the etl",
}),
indexedLogs: factory.NewCounter(prometheus.CounterOpts{
indexedLogs: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "indexed_logs_total",
Help: "number of logs indexed by the etl",
}, []string{
"contract",
}),
}
}
......@@ -107,23 +88,14 @@ func (m *etlMetrics) RecordInterval() func(error) {
timer := prometheus.NewTimer(m.intervalDuration)
return func(err error) {
if err != nil {
m.batchFailures.Inc()
m.intervalFailures.Inc()
}
timer.ObserveDuration()
}
}
func (m *etlMetrics) RecordBatchLatestHeight(height *big.Int) {
m.batchLatestHeight.Set(float64(height.Uint64()))
}
func (m *etlMetrics) RecordBatchHeaders(size int) {
m.batchHeaders.Add(float64(size))
}
func (m *etlMetrics) RecordBatchLog(contractAddress common.Address) {
m.batchLogs.WithLabelValues(contractAddress.String()).Inc()
// RecordLatestHeight sets the latest_height gauge to the most recent block
// height reported by the connected client.
func (m *etlMetrics) RecordLatestHeight(height *big.Int) {
m.latestHeight.Set(float64(height.Uint64()))
}
func (m *etlMetrics) RecordIndexedLatestHeight(height *big.Int) {
......@@ -134,6 +106,6 @@ func (m *etlMetrics) RecordIndexedHeaders(size int) {
m.indexedHeaders.Add(float64(size))
}
func (m *etlMetrics) RecordIndexedLogs(size int) {
m.indexedLogs.Add(float64(size))
// RecordIndexedLog increments the indexed_logs_total counter, labeled by the
// address of the contract that emitted the log.
func (m *etlMetrics) RecordIndexedLog(addr common.Address) {
m.indexedLogs.WithLabelValues(addr.String()).Inc()
}
......@@ -17,38 +17,56 @@ var (
type HeaderTraversal struct {
ethClient EthClient
lastHeader *types.Header
latestHeader *types.Header
lastTraversedHeader *types.Header
blockConfirmationDepth *big.Int
}
// NewHeaderTraversal instantiates a new instance of HeaderTraversal against the supplied rpc client.
// The HeaderTraversal will start fetching blocks starting from the supplied header unless nil, indicating genesis.
func NewHeaderTraversal(ethClient EthClient, fromHeader *types.Header, confDepth *big.Int) *HeaderTraversal {
return &HeaderTraversal{ethClient: ethClient, lastHeader: fromHeader, blockConfirmationDepth: confDepth}
return &HeaderTraversal{
ethClient: ethClient,
lastTraversedHeader: fromHeader,
blockConfirmationDepth: confDepth,
}
}
// LatestHeader returns the latest header reported by the underlying eth client
// as headers are traversed via `NextHeaders`.
func (f *HeaderTraversal) LatestHeader() *types.Header {
return f.latestHeader
}
// LastHeader returns the last header that was fetched by the HeaderTraversal
// This is useful for testing the state of the HeaderTraversal
func (f *HeaderTraversal) LastHeader() *types.Header {
return f.lastHeader
// LastTraversedHeader returns the last header traversed.
//   - This is useful for testing the state of the HeaderTraversal
//   - LastTraversedHeader may be << LatestHeader depending on the number
//     of headers traversed via `NextHeaders`.
func (f *HeaderTraversal) LastTraversedHeader() *types.Header {
return f.lastTraversedHeader
}
// NextFinalizedHeaders retrieves the next set of headers that have been
// NextHeaders retrieves the next set of headers that have been
// marked as finalized by the connected client, bounded by the supplied size
func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header, error) {
latestBlockHeader, err := f.ethClient.BlockHeaderByNumber(nil)
func (f *HeaderTraversal) NextHeaders(maxSize uint64) ([]types.Header, error) {
latestHeader, err := f.ethClient.BlockHeaderByNumber(nil)
if err != nil {
return nil, fmt.Errorf("unable to query latest block: %w", err)
} else if latestHeader == nil {
return nil, fmt.Errorf("latest header unreported")
} else {
f.latestHeader = latestHeader
}
endHeight := new(big.Int).Sub(latestBlockHeader.Number, f.blockConfirmationDepth)
endHeight := new(big.Int).Sub(latestHeader.Number, f.blockConfirmationDepth)
if endHeight.Sign() < 0 {
// No blocks with the provided confirmation depth available
return nil, nil
}
if f.lastHeader != nil {
cmp := f.lastHeader.Number.Cmp(endHeight)
if f.lastTraversedHeader != nil {
cmp := f.lastTraversedHeader.Number.Cmp(endHeight)
if cmp == 0 { // We're synced to head and there are no new headers
return nil, nil
} else if cmp > 0 {
......@@ -57,8 +75,8 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header,
}
nextHeight := bigint.Zero
if f.lastHeader != nil {
nextHeight = new(big.Int).Add(f.lastHeader.Number, bigint.One)
if f.lastTraversedHeader != nil {
nextHeight = new(big.Int).Add(f.lastTraversedHeader.Number, bigint.One)
}
// endHeight = (nextHeight - endHeight) <= maxSize
......@@ -71,12 +89,12 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header,
numHeaders := len(headers)
if numHeaders == 0 {
return nil, nil
} else if f.lastHeader != nil && headers[0].ParentHash != f.lastHeader.Hash() {
} else if f.lastTraversedHeader != nil && headers[0].ParentHash != f.lastTraversedHeader.Hash() {
// The indexer's state is in an irrecoverable state relative to the provider. This
// should never happen since the indexer is dealing with only finalized blocks.
return nil, ErrHeaderTraversalAndProviderMismatchedState
}
f.lastHeader = &headers[numHeaders-1]
f.lastTraversedHeader = &headers[numHeaders-1]
return headers, nil
}
......@@ -33,44 +33,55 @@ func makeHeaders(numHeaders uint64, prevHeader *types.Header) []types.Header {
return headers
}
func TestHeaderTraversalNextFinalizedHeadersNoOp(t *testing.T) {
func TestHeaderTraversalNextHeadersNoOp(t *testing.T) {
client := new(MockEthClient)
// start from block 10 as the latest fetched block
lastHeader := &types.Header{Number: big.NewInt(10)}
headerTraversal := NewHeaderTraversal(client, lastHeader, bigint.Zero)
LastTraversedHeader := &types.Header{Number: big.NewInt(10)}
headerTraversal := NewHeaderTraversal(client, LastTraversedHeader, bigint.Zero)
require.Nil(t, headerTraversal.LatestHeader())
require.NotNil(t, headerTraversal.LastTraversedHeader())
// no new headers when matched with head
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(lastHeader, nil)
headers, err := headerTraversal.NextFinalizedHeaders(100)
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(LastTraversedHeader, nil)
headers, err := headerTraversal.NextHeaders(100)
require.NoError(t, err)
require.Empty(t, headers)
require.NotNil(t, headerTraversal.LatestHeader())
require.NotNil(t, headerTraversal.LastTraversedHeader())
require.Equal(t, LastTraversedHeader.Number.Uint64(), headerTraversal.LatestHeader().Number.Uint64())
}
func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) {
func TestHeaderTraversalNextHeadersCursored(t *testing.T) {
client := new(MockEthClient)
// start from genesis
headerTraversal := NewHeaderTraversal(client, nil, bigint.Zero)
// blocks [0..4]
headers := makeHeaders(5, nil)
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5)
headers := makeHeaders(10, nil)
// blocks [0..4]. Latest reported is 7
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[7], nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers[:5], nil)
_, err := headerTraversal.NextHeaders(5)
require.NoError(t, err)
require.Len(t, headers, 5)
// blocks [5..9]
headers = makeHeaders(5, &headers[len(headers)-1])
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(9))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(5)
require.Equal(t, uint64(7), headerTraversal.LatestHeader().Number.Uint64())
require.Equal(t, uint64(4), headerTraversal.LastTraversedHeader().Number.Uint64())
// blocks [5..9]. Latest Reported is 9
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[9], nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(9))).Return(headers[5:], nil)
_, err = headerTraversal.NextHeaders(5)
require.NoError(t, err)
require.Len(t, headers, 5)
require.Equal(t, uint64(9), headerTraversal.LatestHeader().Number.Uint64())
require.Equal(t, uint64(9), headerTraversal.LastTraversedHeader().Number.Uint64())
}
func TestHeaderTraversalNextFinalizedHeadersMaxSize(t *testing.T) {
func TestHeaderTraversalNextHeadersMaxSize(t *testing.T) {
client := new(MockEthClient)
// start from genesis
......@@ -82,16 +93,22 @@ func TestHeaderTraversalNextFinalizedHeadersMaxSize(t *testing.T) {
// clamped by the supplied size
headers := makeHeaders(5, nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5)
headers, err := headerTraversal.NextHeaders(5)
require.NoError(t, err)
require.Len(t, headers, 5)
require.Equal(t, uint64(100), headerTraversal.LatestHeader().Number.Uint64())
require.Equal(t, uint64(4), headerTraversal.LastTraversedHeader().Number.Uint64())
// clamped by the supplied size. FinalizedHeight == 100
headers = makeHeaders(10, &headers[len(headers)-1])
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(14))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(10)
headers, err = headerTraversal.NextHeaders(10)
require.NoError(t, err)
require.Len(t, headers, 10)
require.Equal(t, uint64(100), headerTraversal.LatestHeader().Number.Uint64())
require.Equal(t, uint64(14), headerTraversal.LastTraversedHeader().Number.Uint64())
}
func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
......@@ -104,7 +121,7 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
headers := makeHeaders(5, nil)
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5)
headers, err := headerTraversal.NextHeaders(5)
require.NoError(t, err)
require.Len(t, headers, 5)
......@@ -112,7 +129,7 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
headers = makeHeaders(5, nil)
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&types.Header{Number: big.NewInt(9)}, nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(9))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(5)
headers, err = headerTraversal.NextHeaders(5)
require.Nil(t, headers)
require.Equal(t, ErrHeaderTraversalAndProviderMismatchedState, err)
}
......@@ -12,7 +12,7 @@ import (
)
var (
MetricsNamespace = "rpc"
MetricsNamespace = "op_indexer_rpc"
batchMethod = "<batch>"
)
......
......@@ -231,6 +231,9 @@ func (b *BridgeProcessor) run() error {
batchLog.Info("indexed bridge events", "latest_l1_block_number", toL1Height, "latest_l2_block_number", toL2Height)
b.LatestL1Header = latestEpoch.L1BlockHeader.RLPHeader.Header()
b.metrics.RecordLatestIndexedL1Height(b.LatestL1Header.Number)
b.LatestL2Header = latestEpoch.L2BlockHeader.RLPHeader.Header()
b.metrics.RecordLatestIndexedL2Height(b.LatestL2Header.Number)
return nil
}
......@@ -10,7 +10,7 @@ import (
)
var (
MetricsNamespace string = "bridge"
MetricsNamespace string = "op_indexer_bridge"
)
type L1Metricer interface {
......@@ -83,7 +83,7 @@ func NewMetrics(registry *prometheus.Registry) Metricer {
}),
intervalFailures: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "failures_total",
Name: "interval_failures_total",
Help: "number of failures encountered",
}),
latestL1Height: factory.NewGauge(prometheus.GaugeOpts{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment