Commit 576ee49d authored by OptimismBot, committed by GitHub

Merge pull request #7042 from ethereum-optimism/indexer.etl.metrics

feat(indexer): wire up ETL metrics
parents 1f27655e ce253950
......@@ -22,13 +22,14 @@ type Config struct {
// ETL is the shared extraction-loop state embedded by the L1/L2 ETLs. On an
// interval it pulls finalized headers, filters logs emitted by the configured
// contracts, and publishes the result as an ETLBatch on etlBatches.
type ETL struct {
	log     log.Logger
	metrics Metricer
	// loopInterval is the tick period of the extraction loop.
	loopInterval time.Duration
	// headerBufferSize caps how many headers are fetched per interval
	// (passed to headerTraversal.NextFinalizedHeaders).
	headerBufferSize uint64
	// headerTraversal tracks the next range of finalized headers to fetch.
	headerTraversal *node.HeaderTraversal
	ethClient       *ethclient.Client
	// contracts is the set of addresses whose logs are extracted each batch.
	contracts []common.Address
	// etlBatches receives every successfully extracted batch for indexing downstream.
	etlBatches chan ETLBatch
}
......@@ -47,8 +48,11 @@ func (etl *ETL) Start(ctx context.Context) error {
pollTicker := time.NewTicker(etl.loopInterval)
defer pollTicker.Stop()
etl.log.Info("starting etl...")
// A reference that'll stay populated between intervals
// in the event of failures in order to retry.
var headers []types.Header
etl.log.Info("starting etl...")
for {
select {
case <-done:
......@@ -56,61 +60,74 @@ func (etl *ETL) Start(ctx context.Context) error {
return nil
case <-pollTicker.C:
if len(headers) == 0 {
done := etl.metrics.RecordInterval()
if len(headers) > 0 {
etl.log.Info("retrying previous batch")
} else {
newHeaders, err := etl.headerTraversal.NextFinalizedHeaders(etl.headerBufferSize)
if err != nil {
etl.log.Error("error querying for headers", "err", err)
continue
}
if len(newHeaders) == 0 {
// Logged as an error since this loop should be operating at a longer interval than the provider
etl.log.Error("no new headers. processor unexpectedly at head...")
continue
} else if len(newHeaders) == 0 {
etl.log.Warn("no new headers. processor unexpectedly at head...")
}
headers = newHeaders
} else {
etl.log.Info("retrying previous batch")
etl.metrics.RecordBatchHeaders(len(newHeaders))
}
firstHeader := headers[0]
lastHeader := headers[len(headers)-1]
// only clear the reference if we were able to process this batch
err := etl.processBatch(headers)
if err == nil {
headers = nil
}
done(err)
}
}
}
func (etl *ETL) processBatch(headers []types.Header) error {
if len(headers) == 0 {
return nil
}
firstHeader, lastHeader := headers[0], headers[len(headers)-1]
batchLog := etl.log.New("batch_start_block_number", firstHeader.Number, "batch_end_block_number", lastHeader.Number)
batchLog.Info("extracting batch", "size", len(headers))
etl.metrics.RecordBatchLatestHeight(lastHeader.Number)
headerMap := make(map[common.Hash]*types.Header, len(headers))
for i := range headers {
headerMap[headers[i].Hash()] = &headers[i]
header := headers[i]
headerMap[header.Hash()] = &header
}
headersWithLog := make(map[common.Hash]bool, len(headers))
logFilter := ethereum.FilterQuery{FromBlock: firstHeader.Number, ToBlock: lastHeader.Number, Addresses: etl.contracts}
logs, err := etl.ethClient.FilterLogs(context.Background(), logFilter)
if err != nil {
batchLog.Info("unable to extract logs within batch", "err", err)
continue // spin and try again
batchLog.Info("unable to extract logs", "err", err)
return err
}
if len(logs) > 0 {
batchLog.Info("detected logs", "size", len(logs))
}
for i := range logs {
if _, ok := headerMap[logs[i].BlockHash]; !ok {
log := logs[i]
if _, ok := headerMap[log.BlockHash]; !ok {
// NOTE. Definitely an error state if the none of the headers were re-orged out in between
// the blocks and logs retrieval operations. However, we need to gracefully handle reorgs
batchLog.Error("log found with block hash not in the batch", "block_hash", logs[i].BlockHash, "log_index", logs[i].Index)
return errors.New("parsed log with a block hash not in the fetched batch")
}
headersWithLog[logs[i].BlockHash] = true
return errors.New("parsed log with a block hash not in the batch")
}
if len(logs) > 0 {
batchLog.Info("detected logs", "size", len(logs))
etl.metrics.RecordBatchLog(log.Address)
headersWithLog[log.BlockHash] = true
}
// create a new reference such that subsequent changes to `headers` does not affect the reference
// ensure we use unique downstream references for the etl batch
headersRef := headers
batch := ETLBatch{Logger: batchLog, Headers: headersRef, HeaderMap: headerMap, Logs: logs, HeadersWithLog: headersWithLog}
headers = nil
etl.etlBatches <- batch
}
}
etl.etlBatches <- ETLBatch{Logger: batchLog, Headers: headersRef, HeaderMap: headerMap, Logs: logs, HeadersWithLog: headersWithLog}
return nil
}
......@@ -21,7 +21,7 @@ type L1ETL struct {
// NewL1ETL creates a new L1ETL instance that will start indexing from different starting points
// depending on the state of the database and the supplied start height.
func NewL1ETL(cfg Config, log log.Logger, db *database.DB, client node.EthClient, contracts config.L1Contracts) (*L1ETL, error) {
func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metrics, client node.EthClient, contracts config.L1Contracts) (*L1ETL, error) {
log = log.New("etl", "l1")
latestHeader, err := db.Blocks.L1LatestBlockHeader()
......@@ -61,6 +61,7 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, client node.EthClient
headerBufferSize: uint64(cfg.HeaderBufferSize),
log: log,
metrics: metrics.newMetricer("l1"),
headerTraversal: node.NewHeaderTraversal(client, fromHeader),
ethClient: client.GethEthClient(),
contracts: cSlice,
......@@ -81,16 +82,14 @@ func (l1Etl *L1ETL) Start(ctx context.Context) error {
case err := <-errCh:
return err
// Index incoming batches
// Index incoming batches (only L1 blocks that have an emitted log)
case batch := <-l1Etl.etlBatches:
// Pull out only L1 blocks that have emitted a log ( <= batch.Headers )
l1BlockHeaders := make([]database.L1BlockHeader, 0, len(batch.Headers))
for i := range batch.Headers {
if _, ok := batch.HeadersWithLog[batch.Headers[i].Hash()]; ok {
l1BlockHeaders = append(l1BlockHeaders, database.L1BlockHeader{BlockHeader: database.BlockHeaderFromHeader(&batch.Headers[i])})
}
}
if len(l1BlockHeaders) == 0 {
batch.Logger.Info("no l1 blocks with logs in batch")
continue
......@@ -104,29 +103,28 @@ func (l1Etl *L1ETL) Start(ctx context.Context) error {
// Continually try to persist this batch. If it fails after 10 attempts, we simply error out
retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
_, err := retry.Do[interface{}](ctx, 10, retryStrategy, func() (interface{}, error) {
err := l1Etl.db.Transaction(func(tx *database.DB) error {
if _, err := retry.Do[interface{}](ctx, 10, retryStrategy, func() (interface{}, error) {
if err := l1Etl.db.Transaction(func(tx *database.DB) error {
if err := tx.Blocks.StoreL1BlockHeaders(l1BlockHeaders); err != nil {
return err
}
// we must have logs if we have l1 blocks
if err := tx.ContractEvents.StoreL1ContractEvents(l1ContractEvents); err != nil {
return err
}
return nil
})
if err != nil {
}); err != nil {
batch.Logger.Error("unable to persist batch", "err", err)
return nil, err
}
// a-ok! Can merge with the above block but being explicit
return nil, nil
})
l1Etl.ETL.metrics.RecordIndexedHeaders(len(l1BlockHeaders))
l1Etl.ETL.metrics.RecordIndexedLatestHeight(l1BlockHeaders[len(l1BlockHeaders)-1].Number)
l1Etl.ETL.metrics.RecordIndexedLogs(len(l1ContractEvents))
if err != nil {
// a-ok!
return nil, nil
}); err != nil {
return err
}
......
......@@ -4,6 +4,7 @@ import (
"math/big"
"github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/mock"
......@@ -17,6 +18,8 @@ import (
)
func Test_L1ETL_Construction(t *testing.T) {
etlMetrics := NewMetrics(metrics.NewRegistry())
type testSuite struct {
db *database.MockDB
client *node.MockEthClient
......@@ -100,7 +103,7 @@ func Test_L1ETL_Construction(t *testing.T) {
logger := log.NewLogger(log.DefaultCLIConfig())
cfg := Config{StartHeight: ts.start}
etl, err := NewL1ETL(cfg, logger, ts.db.DB, ts.client, ts.contracts)
etl, err := NewL1ETL(cfg, logger, ts.db.DB, etlMetrics, ts.client, ts.contracts)
test.assertion(etl, err)
})
}
......
......@@ -19,7 +19,7 @@ type L2ETL struct {
db *database.DB
}
func NewL2ETL(cfg Config, log log.Logger, db *database.DB, client node.EthClient) (*L2ETL, error) {
func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metrics, client node.EthClient) (*L2ETL, error) {
log = log.New("etl", "l2")
// allow predeploys to be overridable
......@@ -48,6 +48,7 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, client node.EthClient
headerBufferSize: uint64(cfg.HeaderBufferSize),
log: log,
metrics: metrics.newMetricer("l2"),
headerTraversal: node.NewHeaderTraversal(client, fromHeader),
ethClient: client.GethEthClient(),
contracts: l2Contracts,
......@@ -68,9 +69,8 @@ func (l2Etl *L2ETL) Start(ctx context.Context) error {
case err := <-errCh:
return err
// Index incoming batches
// Index incoming batches (all L2 Blocks)
case batch := <-l2Etl.etlBatches:
// We're indexing every L2 block.
l2BlockHeaders := make([]database.L2BlockHeader, len(batch.Headers))
for i := range batch.Headers {
l2BlockHeaders[i] = database.L2BlockHeader{BlockHeader: database.BlockHeaderFromHeader(&batch.Headers[i])}
......@@ -82,10 +82,10 @@ func (l2Etl *L2ETL) Start(ctx context.Context) error {
l2ContractEvents[i] = database.L2ContractEvent{ContractEvent: database.ContractEventFromLog(&batch.Logs[i], timestamp)}
}
// Continually try to persist this batch. If it fails after 5 attempts, we simply error out
// Continually try to persist this batch. If it fails after 10 attempts, we simply error out
retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
_, err := retry.Do[interface{}](ctx, 10, retryStrategy, func() (interface{}, error) {
err := l2Etl.db.Transaction(func(tx *database.DB) error {
if _, err := retry.Do[interface{}](ctx, 10, retryStrategy, func() (interface{}, error) {
if err := l2Etl.db.Transaction(func(tx *database.DB) error {
if err := tx.Blocks.StoreL2BlockHeaders(l2BlockHeaders); err != nil {
return err
}
......@@ -95,18 +95,20 @@ func (l2Etl *L2ETL) Start(ctx context.Context) error {
}
}
return nil
})
if err != nil {
}); err != nil {
batch.Logger.Error("unable to persist batch", "err", err)
return nil, err
}
// a-ok! Can merge with the above block but being explicit
return nil, nil
})
l2Etl.ETL.metrics.RecordIndexedHeaders(len(l2BlockHeaders))
l2Etl.ETL.metrics.RecordIndexedLatestHeight(l2BlockHeaders[len(l2BlockHeaders)-1].Number)
if len(l2ContractEvents) > 0 {
l2Etl.ETL.metrics.RecordIndexedLogs(len(l2ContractEvents))
}
if err != nil {
// a-ok!
return nil, nil
}); err != nil {
return err
}
......
package etl
import (
"math/big"
"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum/go-ethereum/common"
"github.com/prometheus/client_golang/prometheus"
)
var (
	// MetricsNamespace is the prometheus namespace shared by all ETL metrics.
	MetricsNamespace string = "etl"
	// compile-time assertion that metricer satisfies the Metricer interface
	_ Metricer = &metricer{}
)
// Metrics is a factory for per-ETL Metricer instances that share a single
// underlying set of prometheus collectors, differentiated by the "etl" label.
type Metrics interface {
	// newMetricer returns a Metricer whose observations carry the given etl label.
	newMetricer(etl string) Metricer
}
// Metricer records metrics for a single ETL instance (e.g. "l1" or "l2").
type Metricer interface {
	// RecordInterval marks the start of an extraction interval and returns a
	// callback that must be invoked with the interval's resulting error
	// (nil on success) to finalize the interval's metrics.
	RecordInterval() (done func(err error))

	// Batch Extraction
	RecordBatchFailure()
	RecordBatchLatestHeight(height *big.Int)
	RecordBatchHeaders(size int)
	RecordBatchLog(contractAddress common.Address)

	// Indexed Batches
	RecordIndexedLatestHeight(height *big.Int)
	RecordIndexedHeaders(size int)
	RecordIndexedLogs(size int)
}
// etlMetrics holds the prometheus collectors shared by every ETL instance;
// each collector is partitioned by the "etl" label (batchLogs additionally
// by the emitting contract address).
type etlMetrics struct {
	intervalTick     *prometheus.CounterVec
	intervalDuration *prometheus.HistogramVec

	batchFailures     *prometheus.CounterVec
	batchLatestHeight *prometheus.GaugeVec
	batchHeaders      *prometheus.CounterVec
	batchLogs         *prometheus.CounterVec

	indexedLatestHeight *prometheus.GaugeVec
	indexedHeaders      *prometheus.CounterVec
	indexedLogs         *prometheus.CounterVec
}
// metricerFactory implements Metrics, handing out Metricers that all share
// the same underlying collectors.
type metricerFactory struct {
	metrics *etlMetrics
}
// metricer implements Metricer for one ETL, stamping every observation with
// its configured "etl" label value.
type metricer struct {
	etl     string
	metrics *etlMetrics
}
// NewMetrics registers the shared ETL collectors on the supplied registry and
// returns a factory that produces per-ETL metric recorders backed by them.
func NewMetrics(registry *prometheus.Registry) Metrics {
	shared := newMetrics(registry)
	return &metricerFactory{metrics: shared}
}
// newMetricer returns a Metricer bound to the given etl label value,
// sharing the factory's underlying collectors.
func (factory *metricerFactory) newMetricer(etl string) Metricer {
	return &metricer{etl: etl, metrics: factory.metrics}
}
// newMetrics builds and registers every ETL collector under MetricsNamespace.
// All collectors are labeled by "etl"; batchLogs additionally carries the
// address of the contract that emitted the log.
func newMetrics(registry *prometheus.Registry) *etlMetrics {
	factory := metrics.With(registry)
	// every collector is partitioned by the etl name
	etlLabel := []string{"etl"}

	return &etlMetrics{
		intervalTick: factory.NewCounterVec(prometheus.CounterOpts{
			Namespace: MetricsNamespace,
			Name:      "intervals_total",
			Help:      "number of times the etl has run its extraction loop",
		}, etlLabel),
		intervalDuration: factory.NewHistogramVec(prometheus.HistogramOpts{
			Namespace: MetricsNamespace,
			Name:      "interval_seconds",
			Help:      "duration elapsed for during the processing loop",
		}, etlLabel),
		batchFailures: factory.NewCounterVec(prometheus.CounterOpts{
			Namespace: MetricsNamespace,
			Name:      "failures_total",
			Help:      "number of times the etl encountered a failure to extract a batch",
		}, etlLabel),
		batchLatestHeight: factory.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: MetricsNamespace,
			Name:      "height",
			Help:      "the latest block height observed by an etl interval",
		}, etlLabel),
		batchHeaders: factory.NewCounterVec(prometheus.CounterOpts{
			Namespace: MetricsNamespace,
			Name:      "headers_total",
			Help:      "number of headers observed by the etl",
		}, etlLabel),
		batchLogs: factory.NewCounterVec(prometheus.CounterOpts{
			Namespace: MetricsNamespace,
			Name:      "logs_total",
			Help:      "number of logs observed by the etl",
		}, []string{"etl", "contract"}),
		indexedLatestHeight: factory.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: MetricsNamespace,
			Name:      "indexed_height",
			Help:      "the latest block height indexed into the database",
		}, etlLabel),
		indexedHeaders: factory.NewCounterVec(prometheus.CounterOpts{
			Namespace: MetricsNamespace,
			Name:      "indexed_headers_total",
			Help:      "number of headers indexed by the etl",
		}, etlLabel),
		indexedLogs: factory.NewCounterVec(prometheus.CounterOpts{
			Namespace: MetricsNamespace,
			Name:      "indexed_logs_total",
			Help:      "number of logs indexed by the etl",
		}, etlLabel),
	}
}
// RecordInterval counts an extraction tick and starts an interval timer.
// The returned callback observes the elapsed duration; when handed a non-nil
// error it also bumps the batch-failure counter.
func (m *metricer) RecordInterval() func(error) {
	m.metrics.intervalTick.WithLabelValues(m.etl).Inc()
	intervalTimer := prometheus.NewTimer(m.metrics.intervalDuration.WithLabelValues(m.etl))
	return func(err error) {
		if err != nil {
			m.RecordBatchFailure()
		}
		intervalTimer.ObserveDuration()
	}
}
// RecordBatchFailure increments the count of failed extraction intervals.
func (m *metricer) RecordBatchFailure() {
	m.metrics.batchFailures.WithLabelValues(m.etl).Inc()
}
// RecordBatchLatestHeight sets the observed-height gauge to the given block
// height (converted via Uint64, so precision is lost for heights beyond 2^53).
func (m *metricer) RecordBatchLatestHeight(height *big.Int) {
	m.metrics.batchLatestHeight.WithLabelValues(m.etl).Set(float64(height.Uint64()))
}
// RecordBatchHeaders adds size to the running count of headers observed.
func (m *metricer) RecordBatchHeaders(size int) {
	m.metrics.batchHeaders.WithLabelValues(m.etl).Add(float64(size))
}
// RecordBatchLog counts one observed log, labeled by the emitting contract.
func (m *metricer) RecordBatchLog(contractAddress common.Address) {
	m.metrics.batchLogs.WithLabelValues(m.etl, contractAddress.String()).Inc()
}
// RecordIndexedLatestHeight sets the indexed-height gauge to the given block
// height (converted via Uint64).
func (m *metricer) RecordIndexedLatestHeight(height *big.Int) {
	m.metrics.indexedLatestHeight.WithLabelValues(m.etl).Set(float64(height.Uint64()))
}
// RecordIndexedHeaders adds size to the running count of headers indexed.
func (m *metricer) RecordIndexedHeaders(size int) {
	m.metrics.indexedHeaders.WithLabelValues(m.etl).Add(float64(size))
}
// RecordIndexedLogs adds size to the running count of logs indexed.
func (m *metricer) RecordIndexedLogs(size int) {
	m.metrics.indexedLogs.WithLabelValues(m.etl).Add(float64(size))
}
......@@ -35,6 +35,7 @@ type Indexer struct {
// NewIndexer initializes an instance of the Indexer
func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConfig, rpcsConfig config.RPCsConfig, metricsConfig config.MetricsConfig) (*Indexer, error) {
metricsRegistry := metrics.NewRegistry()
etlMetrics := etl.NewMetrics(metricsRegistry)
// L1
l1EthClient, err := node.DialEthClient(rpcsConfig.L1RPC)
......@@ -42,7 +43,7 @@ func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConf
return nil, err
}
l1Cfg := etl.Config{LoopIntervalMsec: chainConfig.L1PollingInterval, HeaderBufferSize: chainConfig.L1HeaderBufferSize, StartHeight: chainConfig.L1StartHeight()}
l1Etl, err := etl.NewL1ETL(l1Cfg, logger, db, l1EthClient, chainConfig.L1Contracts)
l1Etl, err := etl.NewL1ETL(l1Cfg, logger, db, etlMetrics, l1EthClient, chainConfig.L1Contracts)
if err != nil {
return nil, err
}
......@@ -53,7 +54,7 @@ func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConf
return nil, err
}
l2Cfg := etl.Config{LoopIntervalMsec: chainConfig.L2PollingInterval, HeaderBufferSize: chainConfig.L2HeaderBufferSize}
l2Etl, err := etl.NewL2ETL(l2Cfg, logger, db, l2EthClient)
l2Etl, err := etl.NewL2ETL(l2Cfg, logger, db, etlMetrics, l2EthClient)
if err != nil {
return nil, err
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment