Commit 7b5c3380 authored by Hamdi Allam's avatar Hamdi Allam Committed by GitHub

feat(indexer): index block after specified period of inactivity (#10021)

* address etl log inactivity

* Update indexer/README.md
Co-authored-by: default avatarWill Cory <willcory10@gmail.com>

---------
Co-authored-by: default avatarWill Cory <willcory10@gmail.com>
parent 51a661b9
...@@ -54,7 +54,8 @@ The indexer service is responsible for polling and processing real-time batches ...@@ -54,7 +54,8 @@ The indexer service is responsible for polling and processing real-time batches
#### L1 Poller #### L1 Poller
L1 blocks are only indexed if they contain L1 contract events. This is done to reduce the amount of unnecessary data that is indexed. Because of this, the `l1_block_headers` table will not contain every L1 block header unlike L2 blocks. L1 blocks are only indexed if they contain L1 contract events. This is done to reduce the amount of unnecessary data that is indexed. Because of this, the `l1_block_headers` table will not contain every L1 block header unlike L2 blocks.
An **exception** to this is if no log activity has been observed over the specified `ETLAllowedInactivityWindowSeconds` value in the [chain config](https://github.com/ethereum-optimism/optimism/blob/develop/indexer/config/config.go) -- disabled by default with a zero value. Past this duration, the L1 ETL will index the latest
observed L1 header.
#### Database #### Database
The indexer service currently supports a Postgres database for storing L1/L2 OP Stack chain data. The most up-to-date database schemas can be found in the `./migrations` directory. **Run the idempotent migrations prior to starting the indexer** The indexer service currently supports a Postgres database for storing L1/L2 OP Stack chain data. The most up-to-date database schemas can be found in the `./migrations` directory. **Run the idempotent migrations prior to starting the indexer**
......
...@@ -114,6 +114,9 @@ type ChainConfig struct { ...@@ -114,6 +114,9 @@ type ChainConfig struct {
L1HeaderBufferSize uint `toml:"l1-header-buffer-size"` L1HeaderBufferSize uint `toml:"l1-header-buffer-size"`
L2HeaderBufferSize uint `toml:"l2-header-buffer-size"` L2HeaderBufferSize uint `toml:"l2-header-buffer-size"`
// Inactivity allowed before a block is indexed by the ETL. Default 0 value disables this feature
ETLAllowedInactivityWindowSeconds uint `toml:"etl-allowed-inactivity-window-seconds"`
} }
// RPCsConfig configures the RPC urls // RPCsConfig configures the RPC urls
......
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/op-bindings/bindings" "github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-bindings/bindingspreview" "github.com/ethereum-optimism/optimism/op-bindings/bindingspreview"
...@@ -21,6 +22,33 @@ import ( ...@@ -21,6 +22,33 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestE2EL1ETLInactivityWindow(t *testing.T) {
withInactivityWindow := func(cfg *config.Config) *config.Config {
cfg.Chain.ETLAllowedInactivityWindowSeconds = 1
// Passing the inactivity window will index the latest header
// in the batch. Make the batch size 1 so all blocks are indexed
cfg.Chain.L1HeaderBufferSize = 1
return cfg
}
testSuite := createE2ETestSuite(t, withInactivityWindow)
// wait for 10 L1 blocks to be posted
require.NoError(t, wait.For(context.Background(), time.Second, func() (bool, error) {
l1Header := testSuite.Indexer.BridgeProcessor.LastL1Header
return l1Header != nil && l1Header.Number.Uint64() >= 10, nil
}))
// each block is indexed
for height := int64(0); height < int64(10); height++ {
header, err := testSuite.DB.Blocks.L1BlockHeaderWithFilter(database.BlockHeader{Number: big.NewInt(height)})
require.NoError(t, err)
require.NotNil(t, header)
require.Equal(t, header.Number.Uint64(), uint64(height))
}
}
func TestE2EETL(t *testing.T) { func TestE2EETL(t *testing.T) {
testSuite := createE2ETestSuite(t) testSuite := createE2ETestSuite(t)
......
...@@ -54,6 +54,8 @@ type E2ETestSuite struct { ...@@ -54,6 +54,8 @@ type E2ETestSuite struct {
L2Client *ethclient.Client L2Client *ethclient.Client
} }
type ConfigOpts func(*config.Config) *config.Config
func init() { func init() {
// Disable the global logger. Ideally we'd like to dump geth // Disable the global logger. Ideally we'd like to dump geth
// logs per-test but that's possible when running tests in // logs per-test but that's possible when running tests in
...@@ -62,10 +64,12 @@ func init() { ...@@ -62,10 +64,12 @@ func init() {
} }
// createE2ETestSuite ... Create a new E2E test suite // createE2ETestSuite ... Create a new E2E test suite
func createE2ETestSuite(t *testing.T) E2ETestSuite { func createE2ETestSuite(t *testing.T, cfgOpt ...ConfigOpts) E2ETestSuite {
dbUser := os.Getenv("DB_USER") dbUser := os.Getenv("DB_USER")
dbName := setupTestDatabase(t) dbName := setupTestDatabase(t)
require.LessOrEqual(t, len(cfgOpt), 1)
// E2E tests can run on the order of magnitude of minutes. // E2E tests can run on the order of magnitude of minutes.
// We mark the test as parallel before starting the devnet // We mark the test as parallel before starting the devnet
// to reduce that number of idle routines when paused. // to reduce that number of idle routines when paused.
...@@ -114,6 +118,11 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite { ...@@ -114,6 +118,11 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
MetricsServer: config.ServerConfig{Host: "127.0.0.1", Port: 0}, MetricsServer: config.ServerConfig{Host: "127.0.0.1", Port: 0},
} }
// apply any settings
for _, opt := range cfgOpt {
indexerCfg = opt(indexerCfg)
}
indexerLog := testlog.Logger(t, log.LevelInfo).New("role", "indexer") indexerLog := testlog.Logger(t, log.LevelInfo).New("role", "indexer")
ix, err := indexer.NewIndexer(context.Background(), indexerLog, indexerCfg, func(cause error) { ix, err := indexer.NewIndexer(context.Background(), indexerLog, indexerCfg, func(cause error) {
if cause != nil { if cause != nil {
......
...@@ -20,6 +20,9 @@ type Config struct { ...@@ -20,6 +20,9 @@ type Config struct {
LoopIntervalMsec uint LoopIntervalMsec uint
HeaderBufferSize uint HeaderBufferSize uint
// Applicable only to the L1 ETL (all L2 block are indexed)
AllowedInactivityWindowSeconds uint
StartHeight *big.Int StartHeight *big.Int
ConfirmationDepth *big.Int ConfirmationDepth *big.Int
} }
......
...@@ -21,7 +21,9 @@ import ( ...@@ -21,7 +21,9 @@ import (
type L1ETL struct { type L1ETL struct {
ETL ETL
LatestHeader *types.Header latestHeader *types.Header
cfg Config
// the batch handler may do work that we can interrupt on shutdown // the batch handler may do work that we can interrupt on shutdown
resourceCtx context.Context resourceCtx context.Context
...@@ -32,7 +34,7 @@ type L1ETL struct { ...@@ -32,7 +34,7 @@ type L1ETL struct {
db *database.DB db *database.DB
mu sync.Mutex mu sync.Mutex
listeners []chan interface{} listeners []chan *types.Header
} }
// NewL1ETL creates a new L1ETL instance that will start indexing from different starting points // NewL1ETL creates a new L1ETL instance that will start indexing from different starting points
...@@ -102,7 +104,9 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli ...@@ -102,7 +104,9 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
resCtx, resCancel := context.WithCancel(context.Background()) resCtx, resCancel := context.WithCancel(context.Background())
return &L1ETL{ return &L1ETL{
ETL: etl, ETL: etl,
LatestHeader: fromHeader, latestHeader: fromHeader,
cfg: cfg,
db: db, db: db,
resourceCtx: resCtx, resourceCtx: resCtx,
...@@ -161,6 +165,15 @@ func (l1Etl *L1ETL) handleBatch(batch *ETLBatch) error { ...@@ -161,6 +165,15 @@ func (l1Etl *L1ETL) handleBatch(batch *ETLBatch) error {
} }
} }
// If there has been no activity and the inactivity window has elapsed since the last header, index the latest
// block to remediate any false-stall alerts on downstream processors that rely on indexed state.
if l1Etl.cfg.AllowedInactivityWindowSeconds > 0 && len(l1BlockHeaders) == 0 {
latestHeader := batch.Headers[len(batch.Headers)-1]
if l1Etl.latestHeader == nil || latestHeader.Time-l1Etl.latestHeader.Time > uint64(l1Etl.cfg.AllowedInactivityWindowSeconds) {
l1BlockHeaders = append(l1BlockHeaders, database.L1BlockHeader{BlockHeader: database.BlockHeaderFromHeader(&latestHeader)})
}
}
l1ContractEvents := make([]database.L1ContractEvent, len(batch.Logs)) l1ContractEvents := make([]database.L1ContractEvent, len(batch.Logs))
for i := range batch.Logs { for i := range batch.Logs {
timestamp := batch.HeaderMap[batch.Logs[i].BlockHash].Time timestamp := batch.HeaderMap[batch.Logs[i].BlockHash].Time
...@@ -180,8 +193,10 @@ func (l1Etl *L1ETL) handleBatch(batch *ETLBatch) error { ...@@ -180,8 +193,10 @@ func (l1Etl *L1ETL) handleBatch(batch *ETLBatch) error {
return err return err
} }
// we must have logs if we have l1 blocks // we must have logs if we have l1 blocks
if err := tx.ContractEvents.StoreL1ContractEvents(l1ContractEvents); err != nil { if len(l1ContractEvents) > 0 {
return err if err := tx.ContractEvents.StoreL1ContractEvents(l1ContractEvents); err != nil {
return err
}
} }
return nil return nil
}); err != nil { }); err != nil {
...@@ -203,16 +218,16 @@ func (l1Etl *L1ETL) handleBatch(batch *ETLBatch) error { ...@@ -203,16 +218,16 @@ func (l1Etl *L1ETL) handleBatch(batch *ETLBatch) error {
// Since not every L1 block is indexed, we still want our metrics to cover L1 blocks // Since not every L1 block is indexed, we still want our metrics to cover L1 blocks
// that have been observed so that a false stall alert isn't triggered on low activity // that have been observed so that a false stall alert isn't triggered on low activity
l1Etl.LatestHeader = &batch.Headers[len(batch.Headers)-1] l1Etl.latestHeader = &batch.Headers[len(batch.Headers)-1]
l1Etl.ETL.metrics.RecordIndexedHeaders(len(l1BlockHeaders)) l1Etl.ETL.metrics.RecordIndexedHeaders(len(l1BlockHeaders))
l1Etl.ETL.metrics.RecordEtlLatestHeight(l1Etl.LatestHeader.Number) l1Etl.ETL.metrics.RecordEtlLatestHeight(l1Etl.latestHeader.Number)
// Notify Listeners // Notify Listeners
l1Etl.mu.Lock() l1Etl.mu.Lock()
defer l1Etl.mu.Unlock() defer l1Etl.mu.Unlock()
for i := range l1Etl.listeners { for i := range l1Etl.listeners {
select { select {
case l1Etl.listeners[i] <- struct{}{}: case l1Etl.listeners[i] <- l1Etl.latestHeader:
default: default:
// do nothing if the listener hasn't picked // do nothing if the listener hasn't picked
// up the previous notif // up the previous notif
...@@ -222,10 +237,9 @@ func (l1Etl *L1ETL) handleBatch(batch *ETLBatch) error { ...@@ -222,10 +237,9 @@ func (l1Etl *L1ETL) handleBatch(batch *ETLBatch) error {
return nil return nil
} }
// Notify returns a channel that'll receive a value every time new data has // Notify returns a channel that'll receive the latest header when new data has been persisted
// been persisted by the L1ETL func (l1Etl *L1ETL) Notify() <-chan *types.Header {
func (l1Etl *L1ETL) Notify() <-chan interface{} { receiver := make(chan *types.Header)
receiver := make(chan interface{})
l1Etl.mu.Lock() l1Etl.mu.Lock()
defer l1Etl.mu.Unlock() defer l1Etl.mu.Unlock()
......
...@@ -20,7 +20,7 @@ import ( ...@@ -20,7 +20,7 @@ import (
type L2ETL struct { type L2ETL struct {
ETL ETL
LatestHeader *types.Header latestHeader *types.Header
// the batch handler may do work that we can interrupt on shutdown // the batch handler may do work that we can interrupt on shutdown
resourceCtx context.Context resourceCtx context.Context
...@@ -31,7 +31,7 @@ type L2ETL struct { ...@@ -31,7 +31,7 @@ type L2ETL struct {
db *database.DB db *database.DB
mu sync.Mutex mu sync.Mutex
listeners []chan interface{} listeners []chan *types.Header
} }
func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient, func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient,
...@@ -86,7 +86,7 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli ...@@ -86,7 +86,7 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
resCtx, resCancel := context.WithCancel(context.Background()) resCtx, resCancel := context.WithCancel(context.Background())
return &L2ETL{ return &L2ETL{
ETL: etl, ETL: etl,
LatestHeader: fromHeader, latestHeader: fromHeader,
resourceCtx: resCtx, resourceCtx: resCtx,
resourceCancel: resCancel, resourceCancel: resCancel,
...@@ -150,6 +150,8 @@ func (l2Etl *L2ETL) handleBatch(batch *ETLBatch) error { ...@@ -150,6 +150,8 @@ func (l2Etl *L2ETL) handleBatch(batch *ETLBatch) error {
l2Etl.ETL.metrics.RecordIndexedLog(batch.Logs[i].Address) l2Etl.ETL.metrics.RecordIndexedLog(batch.Logs[i].Address)
} }
/** Every L2 block is indexed so the inactivity window does not apply here **/
// Continually try to persist this batch. If it fails after 10 attempts, we simply error out // Continually try to persist this batch. If it fails after 10 attempts, we simply error out
retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250} retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
if _, err := retry.Do[interface{}](l2Etl.resourceCtx, 10, retryStrategy, func() (interface{}, error) { if _, err := retry.Do[interface{}](l2Etl.resourceCtx, 10, retryStrategy, func() (interface{}, error) {
...@@ -177,16 +179,16 @@ func (l2Etl *L2ETL) handleBatch(batch *ETLBatch) error { ...@@ -177,16 +179,16 @@ func (l2Etl *L2ETL) handleBatch(batch *ETLBatch) error {
batch.Logger.Info("indexed batch") batch.Logger.Info("indexed batch")
// All L2 blocks are indexed so len(batch.Headers) == len(l2BlockHeaders) // All L2 blocks are indexed so len(batch.Headers) == len(l2BlockHeaders)
l2Etl.LatestHeader = &batch.Headers[len(batch.Headers)-1] l2Etl.latestHeader = &batch.Headers[len(batch.Headers)-1]
l2Etl.ETL.metrics.RecordIndexedHeaders(len(l2BlockHeaders)) l2Etl.ETL.metrics.RecordIndexedHeaders(len(l2BlockHeaders))
l2Etl.ETL.metrics.RecordEtlLatestHeight(l2Etl.LatestHeader.Number) l2Etl.ETL.metrics.RecordEtlLatestHeight(l2Etl.latestHeader.Number)
// Notify Listeners // Notify Listeners
l2Etl.mu.Lock() l2Etl.mu.Lock()
defer l2Etl.mu.Unlock() defer l2Etl.mu.Unlock()
for i := range l2Etl.listeners { for i := range l2Etl.listeners {
select { select {
case l2Etl.listeners[i] <- struct{}{}: case l2Etl.listeners[i] <- l2Etl.latestHeader:
default: default:
// do nothing if the listener hasn't picked // do nothing if the listener hasn't picked
// up the previous notif // up the previous notif
...@@ -196,10 +198,9 @@ func (l2Etl *L2ETL) handleBatch(batch *ETLBatch) error { ...@@ -196,10 +198,9 @@ func (l2Etl *L2ETL) handleBatch(batch *ETLBatch) error {
return nil return nil
} }
// Notify returns a channel that'll receive a value every time new data has // Notify returns a channel that'll receive the latest header when new data has been persisted
// been persisted by the L2ETL func (l2Etl *L2ETL) Notify() <-chan *types.Header {
func (l2Etl *L2ETL) Notify() <-chan interface{} { receiver := make(chan *types.Header)
receiver := make(chan interface{})
l2Etl.mu.Lock() l2Etl.mu.Lock()
defer l2Etl.mu.Unlock() defer l2Etl.mu.Unlock()
......
...@@ -191,10 +191,11 @@ func (ix *Indexer) initDB(ctx context.Context, cfg config.DBConfig) error { ...@@ -191,10 +191,11 @@ func (ix *Indexer) initDB(ctx context.Context, cfg config.DBConfig) error {
func (ix *Indexer) initL1ETL(chainConfig config.ChainConfig) error { func (ix *Indexer) initL1ETL(chainConfig config.ChainConfig) error {
l1Cfg := etl.Config{ l1Cfg := etl.Config{
LoopIntervalMsec: chainConfig.L1PollingInterval, LoopIntervalMsec: chainConfig.L1PollingInterval,
HeaderBufferSize: chainConfig.L1HeaderBufferSize, HeaderBufferSize: chainConfig.L1HeaderBufferSize,
ConfirmationDepth: big.NewInt(int64(chainConfig.L1ConfirmationDepth)), AllowedInactivityWindowSeconds: chainConfig.ETLAllowedInactivityWindowSeconds,
StartHeight: big.NewInt(int64(chainConfig.L1StartingHeight)), ConfirmationDepth: big.NewInt(int64(chainConfig.L1ConfirmationDepth)),
StartHeight: big.NewInt(int64(chainConfig.L1StartingHeight)),
} }
l1Etl, err := etl.NewL1ETL(l1Cfg, ix.log, ix.DB, etl.NewMetrics(ix.metricsRegistry, "l1"), l1Etl, err := etl.NewL1ETL(l1Cfg, ix.log, ix.DB, etl.NewMetrics(ix.metricsRegistry, "l1"),
ix.l1Client, chainConfig.L1Contracts, ix.shutdown) ix.l1Client, chainConfig.L1Contracts, ix.shutdown)
......
...@@ -92,9 +92,9 @@ func (b *BridgeProcessor) Start() error { ...@@ -92,9 +92,9 @@ func (b *BridgeProcessor) Start() error {
// start L1 worker // start L1 worker
b.tasks.Go(func() error { b.tasks.Go(func() error {
l1EtlUpdates := b.l1Etl.Notify() l1EtlUpdates := b.l1Etl.Notify()
for range l1EtlUpdates { for latestHeader := range l1EtlUpdates {
b.log.Info("notified of traversed L1 state", "l1_etl_block_number", b.l1Etl.LatestHeader.Number) b.log.Info("notified of traversed L1 state", "l1_etl_block_number", latestHeader.Number)
if err := b.onL1Data(b.l1Etl.LatestHeader); err != nil { if err := b.onL1Data(latestHeader); err != nil {
b.log.Error("failed l1 bridge processing interval", "err", err) b.log.Error("failed l1 bridge processing interval", "err", err)
} }
} }
...@@ -104,9 +104,9 @@ func (b *BridgeProcessor) Start() error { ...@@ -104,9 +104,9 @@ func (b *BridgeProcessor) Start() error {
// start L2 worker // start L2 worker
b.tasks.Go(func() error { b.tasks.Go(func() error {
l2EtlUpdates := b.l2Etl.Notify() l2EtlUpdates := b.l2Etl.Notify()
for range l2EtlUpdates { for latestHeader := range l2EtlUpdates {
b.log.Info("notified of traversed L2 state", "l2_etl_block_number", b.l2Etl.LatestHeader.Number) b.log.Info("notified of traversed L2 state", "l2_etl_block_number", latestHeader.Number)
if err := b.onL2Data(b.l2Etl.LatestHeader); err != nil { if err := b.onL2Data(latestHeader); err != nil {
b.log.Error("failed l2 bridge processing interval", "err", err) b.log.Error("failed l2 bridge processing interval", "err", err)
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment