Commit f37a108c authored by OptimismBot's avatar OptimismBot Committed by GitHub

Merge pull request #7015 from epociask/indexer.configurable_polling

Configurable Polling Variables for Indexer
parents 96562692 4667823d
...@@ -6,6 +6,9 @@ ...@@ -6,6 +6,9 @@
### Setup env ### Setup env
The `indexer.toml` stores a set of preset environmental variables that can be used to run the indexer with the exception of the network specific `l1-rpc` and `l2-rpc` variables. The `indexer.toml` file can be ran as a default config, otherwise a custom `.toml` config can provided via the `--config` flag when running the application. An optional `l1-starting-height` value can be provided to the indexer to specify the L1 starting block height to begin indexing from. This should be ideally be an L1 block that holds a correlated L2 genesis commitment. Furthermore, this value must be less than the current L1 block height to pass validation. If no starting height value is provided and the database is empty, the indexer will begin sequentially processing from L1 genesis. The `indexer.toml` stores a set of preset environmental variables that can be used to run the indexer with the exception of the network specific `l1-rpc` and `l2-rpc` variables. The `indexer.toml` file can be ran as a default config, otherwise a custom `.toml` config can provided via the `--config` flag when running the application. An optional `l1-starting-height` value can be provided to the indexer to specify the L1 starting block height to begin indexing from. This should be ideally be an L1 block that holds a correlated L2 genesis commitment. Furthermore, this value must be less than the current L1 block height to pass validation. If no starting height value is provided and the database is empty, the indexer will begin sequentially processing from L1 genesis.
### Setup polling intervals
The indexer polls and processes batches from the L1 and L2 chains on a set interval/size. The default polling interval is 5 seconds for both chains with a default batch header size of 500. The polling frequency can be changed by setting the `l1-polling-interval` and `l2-polling-interval` values in the `indexer.toml` file. The batch header size can be changed by setting the `l1-batch-size` and `l2-batch-size` values in the `indexer.toml` file.
### Testing ### Testing
All tests can be ran by running `make test` from the `/indexer` directory. This will run all unit and e2e tests. All tests can be ran by running `make test` from the `/indexer` directory. This will run all unit and e2e tests.
......
...@@ -11,6 +11,12 @@ import ( ...@@ -11,6 +11,12 @@ import (
geth_log "github.com/ethereum/go-ethereum/log" geth_log "github.com/ethereum/go-ethereum/log"
) )
const (
// default to 5 seconds
defaultLoopInterval = 5000
defaultHeaderBufferSize = 500
)
// in future presets can just be onchain config and fetched on initialization // in future presets can just be onchain config and fetched on initialization
// Config represents the `indexer.toml` file used to configure the indexer // Config represents the `indexer.toml` file used to configure the indexer
...@@ -59,13 +65,19 @@ func (c *L1Contracts) AsSlice() ([]common.Address, error) { ...@@ -59,13 +65,19 @@ func (c *L1Contracts) AsSlice() ([]common.Address, error) {
// ChainConfig configures of the chain being indexed // ChainConfig configures of the chain being indexed
type ChainConfig struct { type ChainConfig struct {
// Configure known chains with the l2 chain id // Configure known chains with the l2 chain id
// NOTE - This currently performs no lookups to extract known L1 contracts by l2 chain id
Preset int Preset int
L1Contracts L1Contracts `toml:"l1-contracts"` L1Contracts L1Contracts `toml:"l1-contracts"`
// L1StartingHeight is the block height to start indexing from // L1StartingHeight is the block height to start indexing from
L1StartingHeight uint `toml:"l1-starting-height"` L1StartingHeight uint `toml:"l1-starting-height"`
L1PollingInterval uint `toml:"l1-polling-interval"`
L2PollingInterval uint `toml:"l2-polling-interval"`
L1HeaderBufferSize uint `toml:"l1-header-buffer-size"`
L2HeaderBufferSize uint `toml:"l2-header-buffer-size"`
} }
// L1StartHeight returns the block height to start indexing from
func (cc *ChainConfig) L1StartHeight() *big.Int { func (cc *ChainConfig) L1StartHeight() *big.Int {
return big.NewInt(int64(cc.L1StartingHeight)) return big.NewInt(int64(cc.L1StartingHeight))
} }
...@@ -123,6 +135,27 @@ func LoadConfig(logger geth_log.Logger, path string) (Config, error) { ...@@ -123,6 +135,27 @@ func LoadConfig(logger geth_log.Logger, path string) (Config, error) {
} }
} }
// Set polling defaults if not set
if conf.Chain.L1PollingInterval == 0 {
logger.Info("setting default L1 polling interval", "interval", defaultLoopInterval)
conf.Chain.L1PollingInterval = defaultLoopInterval
}
if conf.Chain.L2PollingInterval == 0 {
logger.Info("setting default L2 polling interval", "interval", defaultLoopInterval)
conf.Chain.L2PollingInterval = defaultLoopInterval
}
if conf.Chain.L1HeaderBufferSize == 0 {
logger.Info("setting default L1 header buffer", "size", defaultHeaderBufferSize)
conf.Chain.L1HeaderBufferSize = defaultHeaderBufferSize
}
if conf.Chain.L2HeaderBufferSize == 0 {
logger.Info("setting default L2 header buffer", "size", defaultHeaderBufferSize)
conf.Chain.L2HeaderBufferSize = defaultHeaderBufferSize
}
logger.Info("loaded config") logger.Info("loaded config")
return conf, nil return conf, nil
} }
...@@ -79,6 +79,7 @@ func TestLoadConfig_WithoutPreset(t *testing.T) { ...@@ -79,6 +79,7 @@ func TestLoadConfig_WithoutPreset(t *testing.T) {
testData := ` testData := `
[chain] [chain]
[chain.l1-contracts] [chain.l1-contracts]
optimism-portal = "0x4205Fc579115071764c7423A4f12eDde41f106Ed" optimism-portal = "0x4205Fc579115071764c7423A4f12eDde41f106Ed"
l2-output-oracle = "0x42097868233d1aa22e815a266982f2cf17685a27" l2-output-oracle = "0x42097868233d1aa22e815a266982f2cf17685a27"
...@@ -102,11 +103,18 @@ func TestLoadConfig_WithoutPreset(t *testing.T) { ...@@ -102,11 +103,18 @@ func TestLoadConfig_WithoutPreset(t *testing.T) {
conf, err := LoadConfig(logger, tmpfile.Name()) conf, err := LoadConfig(logger, tmpfile.Name())
require.NoError(t, err) require.NoError(t, err)
// Enforce default values
require.Equal(t, conf.Chain.L1Contracts.OptimismPortalProxy.String(), common.HexToAddress("0x4205Fc579115071764c7423A4f12eDde41f106Ed").String()) require.Equal(t, conf.Chain.L1Contracts.OptimismPortalProxy.String(), common.HexToAddress("0x4205Fc579115071764c7423A4f12eDde41f106Ed").String())
require.Equal(t, conf.Chain.L1Contracts.L2OutputOracleProxy.String(), common.HexToAddress("0x42097868233d1aa22e815a266982f2cf17685a27").String()) require.Equal(t, conf.Chain.L1Contracts.L2OutputOracleProxy.String(), common.HexToAddress("0x42097868233d1aa22e815a266982f2cf17685a27").String())
require.Equal(t, conf.Chain.L1Contracts.L1CrossDomainMessengerProxy.String(), common.HexToAddress("0x420ce71c97B33Cc4729CF772ae268934F7ab5fA1").String()) require.Equal(t, conf.Chain.L1Contracts.L1CrossDomainMessengerProxy.String(), common.HexToAddress("0x420ce71c97B33Cc4729CF772ae268934F7ab5fA1").String())
require.Equal(t, conf.Chain.L1Contracts.L1StandardBridgeProxy.String(), common.HexToAddress("0x4209fc46f92E8a1c0deC1b1747d010903E884bE1").String()) require.Equal(t, conf.Chain.L1Contracts.L1StandardBridgeProxy.String(), common.HexToAddress("0x4209fc46f92E8a1c0deC1b1747d010903E884bE1").String())
require.Equal(t, conf.Chain.Preset, 0) require.Equal(t, conf.Chain.Preset, 0)
// Enforce polling default values
require.Equal(t, conf.Chain.L1PollingInterval, uint(5000))
require.Equal(t, conf.Chain.L2PollingInterval, uint(5000))
require.Equal(t, conf.Chain.L1HeaderBufferSize, uint(500))
require.Equal(t, conf.Chain.L2HeaderBufferSize, uint(500))
} }
func TestLoadConfig_WithUnknownPreset(t *testing.T) { func TestLoadConfig_WithUnknownPreset(t *testing.T) {
...@@ -140,6 +148,37 @@ func TestLoadConfig_WithUnknownPreset(t *testing.T) { ...@@ -140,6 +148,37 @@ func TestLoadConfig_WithUnknownPreset(t *testing.T) {
require.Equal(t, fmt.Sprintf("unknown preset: %d", faultyPreset), err.Error()) require.Equal(t, fmt.Sprintf("unknown preset: %d", faultyPreset), err.Error())
} }
func Test_LoadConfig_PollingValues(t *testing.T) {
tmpfile, err := os.CreateTemp("", "test_user_values.toml")
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
defer tmpfile.Close()
testData := `
[chain]
l1-polling-interval = 1000
l2-polling-interval = 1005
l1-header-buffer-size = 100
l2-header-buffer-size = 105`
data := []byte(testData)
err = os.WriteFile(tmpfile.Name(), data, 0644)
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
err = tmpfile.Close()
require.NoError(t, err)
logger := testlog.Logger(t, log.LvlInfo)
conf, err := LoadConfig(logger, tmpfile.Name())
require.NoError(t, err)
require.Equal(t, conf.Chain.L1PollingInterval, uint(1000))
require.Equal(t, conf.Chain.L2PollingInterval, uint(1005))
require.Equal(t, conf.Chain.L1HeaderBufferSize, uint(100))
require.Equal(t, conf.Chain.L2HeaderBufferSize, uint(105))
}
func Test_AsSliceSuccess(t *testing.T) { func Test_AsSliceSuccess(t *testing.T) {
// error cases are intentionally ignored for testing since they can only be // error cases are intentionally ignored for testing since they can only be
// generated when the L1Contracts struct is developer modified to hold a non-address var field // generated when the L1Contracts struct is developer modified to hold a non-address var field
......
...@@ -71,6 +71,8 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite { ...@@ -71,6 +71,8 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
L2RPC: opSys.EthInstances["sequencer"].HTTPEndpoint(), L2RPC: opSys.EthInstances["sequencer"].HTTPEndpoint(),
}, },
Chain: config.ChainConfig{ Chain: config.ChainConfig{
L1PollingInterval: 1000,
L2PollingInterval: 1000,
L1Contracts: config.L1Contracts{ L1Contracts: config.L1Contracts{
OptimismPortalProxy: opCfg.L1Deployments.OptimismPortalProxy, OptimismPortalProxy: opCfg.L1Deployments.OptimismPortalProxy,
L2OutputOracleProxy: opCfg.L1Deployments.L2OutputOracleProxy, L2OutputOracleProxy: opCfg.L1Deployments.L2OutputOracleProxy,
......
...@@ -3,6 +3,7 @@ package etl ...@@ -3,6 +3,7 @@ package etl
import ( import (
"context" "context"
"errors" "errors"
"math/big"
"time" "time"
"github.com/ethereum-optimism/optimism/indexer/node" "github.com/ethereum-optimism/optimism/indexer/node"
...@@ -13,16 +14,16 @@ import ( ...@@ -13,16 +14,16 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
const ( type Config struct {
// NOTE - These values can be made configurable to allow for more fine grained control LoopInterval time.Duration
// Additionally a default interval of 5 seconds may be too slow for reading L2 blocks provided HeaderBufferSize uint64
// the current rate of L2 block production on OP Stack chains (2 seconds per block) StartHeight *big.Int
defaultLoopInterval = 5 * time.Second }
defaultHeaderBufferSize = 500
)
type ETL struct { type ETL struct {
log log.Logger log log.Logger
loopInterval time.Duration
headerBufferSize uint64
headerTraversal *node.HeaderTraversal headerTraversal *node.HeaderTraversal
ethClient *ethclient.Client ethClient *ethclient.Client
...@@ -43,7 +44,7 @@ type ETLBatch struct { ...@@ -43,7 +44,7 @@ type ETLBatch struct {
func (etl *ETL) Start(ctx context.Context) error { func (etl *ETL) Start(ctx context.Context) error {
done := ctx.Done() done := ctx.Done()
pollTicker := time.NewTicker(defaultLoopInterval) pollTicker := time.NewTicker(etl.loopInterval)
defer pollTicker.Stop() defer pollTicker.Stop()
etl.log.Info("starting etl...") etl.log.Info("starting etl...")
...@@ -56,7 +57,7 @@ func (etl *ETL) Start(ctx context.Context) error { ...@@ -56,7 +57,7 @@ func (etl *ETL) Start(ctx context.Context) error {
case <-pollTicker.C: case <-pollTicker.C:
if len(headers) == 0 { if len(headers) == 0 {
newHeaders, err := etl.headerTraversal.NextFinalizedHeaders(defaultHeaderBufferSize) newHeaders, err := etl.headerTraversal.NextFinalizedHeaders(etl.headerBufferSize)
if err != nil { if err != nil {
etl.log.Error("error querying for headers", "err", err) etl.log.Error("error querying for headers", "err", err)
continue continue
......
...@@ -3,7 +3,6 @@ package etl ...@@ -3,7 +3,6 @@ package etl
import ( import (
"context" "context"
"fmt" "fmt"
"math/big"
"github.com/ethereum-optimism/optimism/indexer/config" "github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
...@@ -21,8 +20,7 @@ type L1ETL struct { ...@@ -21,8 +20,7 @@ type L1ETL struct {
// NewL1ETL creates a new L1ETL instance that will start indexing from different starting points // NewL1ETL creates a new L1ETL instance that will start indexing from different starting points
// depending on the state of the database and the supplied start height. // depending on the state of the database and the supplied start height.
func NewL1ETL(log log.Logger, db *database.DB, client node.EthClient, startHeight *big.Int, func NewL1ETL(cfg *Config, log log.Logger, db *database.DB, client node.EthClient, contracts config.L1Contracts) (*L1ETL, error) {
contracts config.L1Contracts) (*L1ETL, error) {
log = log.New("etl", "l1") log = log.New("etl", "l1")
latestHeader, err := db.Blocks.L1LatestBlockHeader() latestHeader, err := db.Blocks.L1LatestBlockHeader()
...@@ -41,9 +39,9 @@ func NewL1ETL(log log.Logger, db *database.DB, client node.EthClient, startHeigh ...@@ -41,9 +39,9 @@ func NewL1ETL(log log.Logger, db *database.DB, client node.EthClient, startHeigh
log.Info("detected last indexed block", "number", latestHeader.Number, "hash", latestHeader.Hash) log.Info("detected last indexed block", "number", latestHeader.Number, "hash", latestHeader.Hash)
fromHeader = latestHeader.RLPHeader.Header() fromHeader = latestHeader.RLPHeader.Header()
} else if startHeight.BitLen() > 0 { } else if cfg.StartHeight.BitLen() > 0 {
log.Info("no indexed state in storage, starting from supplied L1 height", "height", startHeight.String()) log.Info("no indexed state in storage, starting from supplied L1 height", "height", cfg.StartHeight.String())
header, err := client.BlockHeaderByNumber(startHeight) header, err := client.BlockHeaderByNumber(cfg.StartHeight)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not fetch starting block header: %w", err) return nil, fmt.Errorf("could not fetch starting block header: %w", err)
} }
...@@ -58,6 +56,9 @@ func NewL1ETL(log log.Logger, db *database.DB, client node.EthClient, startHeigh ...@@ -58,6 +56,9 @@ func NewL1ETL(log log.Logger, db *database.DB, client node.EthClient, startHeigh
// will be able to keep up with the rate of incoming batches // will be able to keep up with the rate of incoming batches
etlBatches := make(chan ETLBatch) etlBatches := make(chan ETLBatch)
etl := ETL{ etl := ETL{
loopInterval: cfg.LoopInterval,
headerBufferSize: cfg.HeaderBufferSize,
log: log, log: log,
headerTraversal: node.NewHeaderTraversal(client, fromHeader), headerTraversal: node.NewHeaderTraversal(client, fromHeader),
ethClient: client.GethEthClient(), ethClient: client.GethEthClient(),
......
...@@ -98,8 +98,11 @@ func Test_L1ETL_Construction(t *testing.T) { ...@@ -98,8 +98,11 @@ func Test_L1ETL_Construction(t *testing.T) {
ts := test.construction() ts := test.construction()
logger := log.NewLogger(log.DefaultCLIConfig()) logger := log.NewLogger(log.DefaultCLIConfig())
cfg := &Config{
StartHeight: ts.start,
}
etl, err := NewL1ETL(logger, ts.db.DB, ts.client, ts.start, ts.contracts) etl, err := NewL1ETL(cfg, logger, ts.db.DB, ts.client, ts.contracts)
test.assertion(etl, err) test.assertion(etl, err)
}) })
} }
......
...@@ -18,7 +18,7 @@ type L2ETL struct { ...@@ -18,7 +18,7 @@ type L2ETL struct {
db *database.DB db *database.DB
} }
func NewL2ETL(log log.Logger, db *database.DB, client node.EthClient) (*L2ETL, error) { func NewL2ETL(cfg *Config, log log.Logger, db *database.DB, client node.EthClient) (*L2ETL, error) {
log = log.New("etl", "l2") log = log.New("etl", "l2")
// allow predeploys to be overridable // allow predeploys to be overridable
...@@ -43,6 +43,9 @@ func NewL2ETL(log log.Logger, db *database.DB, client node.EthClient) (*L2ETL, e ...@@ -43,6 +43,9 @@ func NewL2ETL(log log.Logger, db *database.DB, client node.EthClient) (*L2ETL, e
etlBatches := make(chan ETLBatch) etlBatches := make(chan ETLBatch)
etl := ETL{ etl := ETL{
loopInterval: cfg.LoopInterval,
headerBufferSize: cfg.HeaderBufferSize,
log: log, log: log,
headerTraversal: node.NewHeaderTraversal(client, fromHeader), headerTraversal: node.NewHeaderTraversal(client, fromHeader),
ethClient: client.GethEthClient(), ethClient: client.GethEthClient(),
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"runtime/debug" "runtime/debug"
"sync" "sync"
"time"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -34,7 +35,13 @@ func NewIndexer(logger log.Logger, chainConfig config.ChainConfig, rpcsConfig co ...@@ -34,7 +35,13 @@ func NewIndexer(logger log.Logger, chainConfig config.ChainConfig, rpcsConfig co
return nil, err return nil, err
} }
l1Etl, err := etl.NewL1ETL(logger, db, l1EthClient, chainConfig.L1StartHeight(), chainConfig.L1Contracts) l1Cfg := &etl.Config{
LoopInterval: time.Duration(chainConfig.L1PollingInterval) * time.Millisecond,
HeaderBufferSize: uint64(chainConfig.L1HeaderBufferSize),
StartHeight: chainConfig.L1StartHeight(),
}
l1Etl, err := etl.NewL1ETL(l1Cfg, logger, db, l1EthClient, chainConfig.L1Contracts)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -44,8 +51,13 @@ func NewIndexer(logger log.Logger, chainConfig config.ChainConfig, rpcsConfig co ...@@ -44,8 +51,13 @@ func NewIndexer(logger log.Logger, chainConfig config.ChainConfig, rpcsConfig co
return nil, err return nil, err
} }
l2Cfg := &etl.Config{
LoopInterval: time.Duration(chainConfig.L2PollingInterval) * time.Millisecond,
HeaderBufferSize: uint64(chainConfig.L2HeaderBufferSize),
}
// Currently defaults to the predeploys // Currently defaults to the predeploys
l2Etl, err := etl.NewL2ETL(logger, db, l2EthClient) l2Etl, err := etl.NewL2ETL(l2Cfg, logger, db, l2EthClient)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
# Chain configures l1 chain addresses # Chain configures l1 chain addresses
# Can configure them manually or use a preset l2 ChainId for known chains including OP Mainnet, OP Goerli, Base, Base Goerli, Zora, and Zora goerli # Can configure them manually or use a preset l2 ChainId for known chains including OP Mainnet, OP Goerli, Base, Base Goerli, Zora, and Zora goerli
[chain] [chain]
l1-polling-interval = 0
l1-header-buffer-size = 0
l2-polling-interval = 0
l2-header-buffer-size = 0
# OP Goerli # OP Goerli
preset = 420 preset = 420
l1-starting-height = 0 l1-starting-height = 0
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment