Commit 0fcbe0b2 authored by Hamdi Allam's avatar Hamdi Allam Committed by GitHub

crr(indexer): clearer etl metrics (#8762)

* update metrics

* etl metrics

* update etls

* bridge changes

* keep metric name the same

* bugfix
parent 2aedac82
...@@ -161,11 +161,6 @@ func (l1Etl *L1ETL) handleBatch(batch *ETLBatch) error { ...@@ -161,11 +161,6 @@ func (l1Etl *L1ETL) handleBatch(batch *ETLBatch) error {
} }
} }
if len(l1BlockHeaders) == 0 {
batch.Logger.Info("no l1 blocks with logs in batch")
return nil
}
l1ContractEvents := make([]database.L1ContractEvent, len(batch.Logs)) l1ContractEvents := make([]database.L1ContractEvent, len(batch.Logs))
for i := range batch.Logs { for i := range batch.Logs {
timestamp := batch.HeaderMap[batch.Logs[i].BlockHash].Time timestamp := batch.HeaderMap[batch.Logs[i].BlockHash].Time
...@@ -176,6 +171,10 @@ func (l1Etl *L1ETL) handleBatch(batch *ETLBatch) error { ...@@ -176,6 +171,10 @@ func (l1Etl *L1ETL) handleBatch(batch *ETLBatch) error {
// Continually try to persist this batch. If it fails after 10 attempts, we simply error out // Continually try to persist this batch. If it fails after 10 attempts, we simply error out
retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250} retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
if _, err := retry.Do[interface{}](l1Etl.resourceCtx, 10, retryStrategy, func() (interface{}, error) { if _, err := retry.Do[interface{}](l1Etl.resourceCtx, 10, retryStrategy, func() (interface{}, error) {
if len(l1BlockHeaders) == 0 {
return nil, nil // skip
}
if err := l1Etl.db.Transaction(func(tx *database.DB) error { if err := l1Etl.db.Transaction(func(tx *database.DB) error {
if err := tx.Blocks.StoreL1BlockHeaders(l1BlockHeaders); err != nil { if err := tx.Blocks.StoreL1BlockHeaders(l1BlockHeaders); err != nil {
return err return err
...@@ -190,17 +189,23 @@ func (l1Etl *L1ETL) handleBatch(batch *ETLBatch) error { ...@@ -190,17 +189,23 @@ func (l1Etl *L1ETL) handleBatch(batch *ETLBatch) error {
return nil, fmt.Errorf("unable to persist batch: %w", err) return nil, fmt.Errorf("unable to persist batch: %w", err)
} }
l1Etl.ETL.metrics.RecordIndexedHeaders(len(l1BlockHeaders))
l1Etl.ETL.metrics.RecordIndexedLatestHeight(l1BlockHeaders[len(l1BlockHeaders)-1].Number)
// a-ok! // a-ok!
return nil, nil return nil, nil
}); err != nil { }); err != nil {
return err return err
} }
batch.Logger.Info("indexed batch") if len(l1BlockHeaders) == 0 {
batch.Logger.Info("skipped batch. no logs found")
} else {
batch.Logger.Info("indexed batch")
}
// Since not every L1 block is indexed, we still want our metrics to cover L1 blocks
// that have been observed so that a false stall alert isn't triggered on low activity
l1Etl.LatestHeader = &batch.Headers[len(batch.Headers)-1] l1Etl.LatestHeader = &batch.Headers[len(batch.Headers)-1]
l1Etl.ETL.metrics.RecordIndexedHeaders(len(l1BlockHeaders))
l1Etl.ETL.metrics.RecordEtlLatestHeight(l1Etl.LatestHeader.Number)
// Notify Listeners // Notify Listeners
l1Etl.mu.Lock() l1Etl.mu.Lock()
......
...@@ -168,9 +168,6 @@ func (l2Etl *L2ETL) handleBatch(batch *ETLBatch) error { ...@@ -168,9 +168,6 @@ func (l2Etl *L2ETL) handleBatch(batch *ETLBatch) error {
return nil, err return nil, err
} }
l2Etl.ETL.metrics.RecordIndexedHeaders(len(l2BlockHeaders))
l2Etl.ETL.metrics.RecordIndexedLatestHeight(l2BlockHeaders[len(l2BlockHeaders)-1].Number)
// a-ok! // a-ok!
return nil, nil return nil, nil
}); err != nil { }); err != nil {
...@@ -178,7 +175,11 @@ func (l2Etl *L2ETL) handleBatch(batch *ETLBatch) error { ...@@ -178,7 +175,11 @@ func (l2Etl *L2ETL) handleBatch(batch *ETLBatch) error {
} }
batch.Logger.Info("indexed batch") batch.Logger.Info("indexed batch")
// All L2 blocks are indexed so len(batch.Headers) == len(l2BlockHeaders)
l2Etl.LatestHeader = &batch.Headers[len(batch.Headers)-1] l2Etl.LatestHeader = &batch.Headers[len(batch.Headers)-1]
l2Etl.ETL.metrics.RecordIndexedHeaders(len(l2BlockHeaders))
l2Etl.ETL.metrics.RecordEtlLatestHeight(l2Etl.LatestHeader.Number)
// Notify Listeners // Notify Listeners
l2Etl.mu.Lock() l2Etl.mu.Lock()
......
...@@ -16,8 +16,7 @@ type Metricer interface { ...@@ -16,8 +16,7 @@ type Metricer interface {
RecordInterval() (done func(err error)) RecordInterval() (done func(err error))
RecordLatestHeight(height *big.Int) RecordLatestHeight(height *big.Int)
// Indexed Batches RecordEtlLatestHeight(height *big.Int)
RecordIndexedLatestHeight(height *big.Int)
RecordIndexedHeaders(size int) RecordIndexedHeaders(size int)
RecordIndexedLog(contractAddress common.Address) RecordIndexedLog(contractAddress common.Address)
} }
...@@ -28,9 +27,9 @@ type etlMetrics struct { ...@@ -28,9 +27,9 @@ type etlMetrics struct {
intervalFailures prometheus.Counter intervalFailures prometheus.Counter
latestHeight prometheus.Gauge latestHeight prometheus.Gauge
indexedLatestHeight prometheus.Gauge etlLatestHeight prometheus.Gauge
indexedHeaders prometheus.Counter indexedHeaders prometheus.Counter
indexedLogs *prometheus.CounterVec indexedLogs *prometheus.CounterVec
} }
func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer { func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer {
...@@ -60,11 +59,11 @@ func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer { ...@@ -60,11 +59,11 @@ func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer {
Name: "latest_height", Name: "latest_height",
Help: "the latest height reported by the connected client", Help: "the latest height reported by the connected client",
}), }),
indexedLatestHeight: factory.NewGauge(prometheus.GaugeOpts{ etlLatestHeight: factory.NewGauge(prometheus.GaugeOpts{
Namespace: MetricsNamespace, Namespace: MetricsNamespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "indexed_height", Name: "indexed_height",
Help: "the latest block height indexed into the database", Help: "the latest block height after a processing interval by the etl",
}), }),
indexedHeaders: factory.NewCounter(prometheus.CounterOpts{ indexedHeaders: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace, Namespace: MetricsNamespace,
...@@ -98,8 +97,8 @@ func (m *etlMetrics) RecordLatestHeight(height *big.Int) { ...@@ -98,8 +97,8 @@ func (m *etlMetrics) RecordLatestHeight(height *big.Int) {
m.latestHeight.Set(float64(height.Uint64())) m.latestHeight.Set(float64(height.Uint64()))
} }
func (m *etlMetrics) RecordIndexedLatestHeight(height *big.Int) { func (m *etlMetrics) RecordEtlLatestHeight(height *big.Int) {
m.indexedLatestHeight.Set(float64(height.Uint64())) m.etlLatestHeight.Set(float64(height.Uint64()))
} }
func (m *etlMetrics) RecordIndexedHeaders(size int) { func (m *etlMetrics) RecordIndexedHeaders(size int) {
......
...@@ -132,8 +132,9 @@ func (b *BridgeProcessor) onL1Data() error { ...@@ -132,8 +132,9 @@ func (b *BridgeProcessor) onL1Data() error {
} }
// `LastFinalizedL2Header` and `LastL1Header` are mutated by the same routine and can // `LastFinalizedL2Header` and `LastL1Header` are mutated by the same routine and can
// safely be read without needing any sync primitives // safely be read without needing any sync primitives. Not every L1 block is indexed
if b.LastFinalizedL2Header == nil || b.LastFinalizedL2Header.Timestamp < b.LastL1Header.Timestamp { // so check against a false interval on start.
if b.LastL1Header != nil && (b.LastFinalizedL2Header == nil || b.LastFinalizedL2Header.Timestamp < b.LastL1Header.Timestamp) {
if err := b.processFinalizedL2Events(); err != nil { if err := b.processFinalizedL2Events(); err != nil {
b.log.Error("failed to process finalized L2 events", "err", err) b.log.Error("failed to process finalized L2 events", "err", err)
errs = errors.Join(errs, err) errs = errors.Join(errs, err)
...@@ -178,8 +179,8 @@ func (b *BridgeProcessor) processInitiatedL1Events() error { ...@@ -178,8 +179,8 @@ func (b *BridgeProcessor) processInitiatedL1Events() error {
lastL1BlockNumber = b.LastL1Header.Number lastL1BlockNumber = b.LastL1Header.Number
} }
// Latest unobserved L1 state bounded by `blockLimits` blocks. Since this process is driven on new L1 data, // Latest unobserved L1 state bounded by `blockLimits` blocks. Since
// we always expect this query to return a new result // not every L1 block is indexed, we may have nothing to process.
latestL1HeaderScope := func(db *gorm.DB) *gorm.DB { latestL1HeaderScope := func(db *gorm.DB) *gorm.DB {
newQuery := db.Session(&gorm.Session{NewDB: true}) // fresh subquery newQuery := db.Session(&gorm.Session{NewDB: true}) // fresh subquery
headers := newQuery.Model(database.L1BlockHeader{}).Where("number > ?", lastL1BlockNumber) headers := newQuery.Model(database.L1BlockHeader{}).Where("number > ?", lastL1BlockNumber)
...@@ -189,7 +190,8 @@ func (b *BridgeProcessor) processInitiatedL1Events() error { ...@@ -189,7 +190,8 @@ func (b *BridgeProcessor) processInitiatedL1Events() error {
if err != nil { if err != nil {
return fmt.Errorf("failed to query new L1 state: %w", err) return fmt.Errorf("failed to query new L1 state: %w", err)
} else if latestL1Header == nil { } else if latestL1Header == nil {
return fmt.Errorf("no new L1 state found") l1BridgeLog.Debug("no new L1 state found")
return nil
} }
fromL1Height, toL1Height := new(big.Int).Add(lastL1BlockNumber, bigint.One), latestL1Header.Number fromL1Height, toL1Height := new(big.Int).Add(lastL1BlockNumber, bigint.One), latestL1Header.Number
...@@ -231,8 +233,8 @@ func (b *BridgeProcessor) processInitiatedL2Events() error { ...@@ -231,8 +233,8 @@ func (b *BridgeProcessor) processInitiatedL2Events() error {
lastL2BlockNumber = b.LastL2Header.Number lastL2BlockNumber = b.LastL2Header.Number
} }
// Latest unobserved L2 state bounded by `blockLimits` blocks. Since this process is driven by new L2 data, // Latest unobserved L2 state bounded by `blockLimits` blocks.
// we always expect this query to return a new result // Since every L2 block is indexed, we always expect new state.
latestL2HeaderScope := func(db *gorm.DB) *gorm.DB { latestL2HeaderScope := func(db *gorm.DB) *gorm.DB {
newQuery := db.Session(&gorm.Session{NewDB: true}) // fresh subquery newQuery := db.Session(&gorm.Session{NewDB: true}) // fresh subquery
headers := newQuery.Model(database.L2BlockHeader{}).Where("number > ?", lastL2BlockNumber) headers := newQuery.Model(database.L2BlockHeader{}).Where("number > ?", lastL2BlockNumber)
...@@ -286,8 +288,8 @@ func (b *BridgeProcessor) processFinalizedL1Events() error { ...@@ -286,8 +288,8 @@ func (b *BridgeProcessor) processFinalizedL1Events() error {
lastFinalizedL1BlockNumber = b.LastFinalizedL1Header.Number lastFinalizedL1BlockNumber = b.LastFinalizedL1Header.Number
} }
// Latest unfinalized L1 state bounded by `blockLimit` blocks that have had L2 bridge events indexed. Since L1 data // Latest unfinalized L1 state bounded by `blockLimit` blocks that have had L2 bridge events
// is indexed independently of L2, there may not be new L1 state to finalized // indexed. Since L1 data is indexed independently, there may not be new L1 state to finalize
latestL1HeaderScope := func(db *gorm.DB) *gorm.DB { latestL1HeaderScope := func(db *gorm.DB) *gorm.DB {
newQuery := db.Session(&gorm.Session{NewDB: true}) // fresh subquery newQuery := db.Session(&gorm.Session{NewDB: true}) // fresh subquery
headers := newQuery.Model(database.L1BlockHeader{}).Where("number > ? AND timestamp <= ?", lastFinalizedL1BlockNumber, b.LastL2Header.Timestamp) headers := newQuery.Model(database.L1BlockHeader{}).Where("number > ? AND timestamp <= ?", lastFinalizedL1BlockNumber, b.LastL2Header.Timestamp)
...@@ -340,8 +342,8 @@ func (b *BridgeProcessor) processFinalizedL2Events() error { ...@@ -340,8 +342,8 @@ func (b *BridgeProcessor) processFinalizedL2Events() error {
lastFinalizedL2BlockNumber = b.LastFinalizedL2Header.Number lastFinalizedL2BlockNumber = b.LastFinalizedL2Header.Number
} }
// Latest unfinalized L2 state bounded by `blockLimit` blocks that have had L1 bridge events indexed. Since L2 data // Latest unfinalized L2 state bounded by `blockLimit` blocks that have had L1 bridge events
// is indexed independently of L1, there may not be new L2 state to finalized // indexed. Since L2 data is indexed independently, there may not be new L2 state to finalize
latestL2HeaderScope := func(db *gorm.DB) *gorm.DB { latestL2HeaderScope := func(db *gorm.DB) *gorm.DB {
newQuery := db.Session(&gorm.Session{NewDB: true}) // fresh subquery newQuery := db.Session(&gorm.Session{NewDB: true}) // fresh subquery
headers := newQuery.Model(database.L2BlockHeader{}).Where("number > ? AND timestamp <= ?", lastFinalizedL2BlockNumber, b.LastL1Header.Timestamp) headers := newQuery.Model(database.L2BlockHeader{}).Where("number > ? AND timestamp <= ?", lastFinalizedL2BlockNumber, b.LastL1Header.Timestamp)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment