Commit b563765a authored by Ethen Pociask's avatar Ethen Pociask

[indexer.configurable_polling] Configurable Polling Variables for Indexer

parent 16b00d32
...@@ -11,6 +11,12 @@ import ( ...@@ -11,6 +11,12 @@ import (
geth_log "github.com/ethereum/go-ethereum/log" geth_log "github.com/ethereum/go-ethereum/log"
) )
const (
// default to 5 seconds
defaultLoopInterval = 5000
defaultHeaderBufferSize = 500
)
// in future presets can just be onchain config and fetched on initialization // in future presets can just be onchain config and fetched on initialization
// Config represents the `indexer.toml` file used to configure the indexer // Config represents the `indexer.toml` file used to configure the indexer
...@@ -59,13 +65,19 @@ func (c *L1Contracts) AsSlice() ([]common.Address, error) { ...@@ -59,13 +65,19 @@ func (c *L1Contracts) AsSlice() ([]common.Address, error) {
// ChainConfig configures of the chain being indexed // ChainConfig configures of the chain being indexed
type ChainConfig struct { type ChainConfig struct {
// Configure known chains with the l2 chain id // Configure known chains with the l2 chain id
// NOTE - This currently performs no lookups to extract known L1 contracts by l2 chain id
Preset int Preset int
L1Contracts L1Contracts `toml:"l1-contracts"` L1Contracts L1Contracts `toml:"l1-contracts"`
// L1StartingHeight is the block height to start indexing from // L1StartingHeight is the block height to start indexing from
L1StartingHeight uint `toml:"l1-starting-height"` L1StartingHeight uint `toml:"l1-starting-height"`
L1PollingInterval uint `toml:"l1-polling-interval"`
L2PollingInterval uint `toml:"l2-polling-interval"`
L1HeaderBufferSize uint `toml:"l1-header-buffer-size"`
L2HeaderBufferSize uint `toml:"l2-header-buffer-size"`
} }
// L1StartHeight returns the block height to start indexing from
func (cc *ChainConfig) L1StartHeight() *big.Int { func (cc *ChainConfig) L1StartHeight() *big.Int {
return big.NewInt(int64(cc.L1StartingHeight)) return big.NewInt(int64(cc.L1StartingHeight))
} }
...@@ -123,6 +135,27 @@ func LoadConfig(logger geth_log.Logger, path string) (Config, error) { ...@@ -123,6 +135,27 @@ func LoadConfig(logger geth_log.Logger, path string) (Config, error) {
} }
} }
// Set polling defaults if not set
if conf.Chain.L1PollingInterval == 0 {
logger.Info("setting default L1 polling interval", "interval", defaultLoopInterval)
conf.Chain.L1PollingInterval = defaultLoopInterval
}
if conf.Chain.L2PollingInterval == 0 {
logger.Info("setting default L2 polling interval", "interval", defaultLoopInterval)
conf.Chain.L2PollingInterval = defaultLoopInterval
}
if conf.Chain.L1HeaderBufferSize == 0 {
logger.Info("setting default L1 header buffer", "size", defaultHeaderBufferSize)
conf.Chain.L1HeaderBufferSize = defaultHeaderBufferSize
}
if conf.Chain.L2HeaderBufferSize == 0 {
logger.Info("setting default L2 header buffer", "size", defaultHeaderBufferSize)
conf.Chain.L2HeaderBufferSize = defaultHeaderBufferSize
}
logger.Info("loaded config") logger.Info("loaded config")
return conf, nil return conf, nil
} }
...@@ -79,6 +79,7 @@ func TestLoadConfig_WithoutPreset(t *testing.T) { ...@@ -79,6 +79,7 @@ func TestLoadConfig_WithoutPreset(t *testing.T) {
testData := ` testData := `
[chain] [chain]
[chain.l1-contracts] [chain.l1-contracts]
optimism-portal = "0x4205Fc579115071764c7423A4f12eDde41f106Ed" optimism-portal = "0x4205Fc579115071764c7423A4f12eDde41f106Ed"
l2-output-oracle = "0x42097868233d1aa22e815a266982f2cf17685a27" l2-output-oracle = "0x42097868233d1aa22e815a266982f2cf17685a27"
...@@ -102,11 +103,18 @@ func TestLoadConfig_WithoutPreset(t *testing.T) { ...@@ -102,11 +103,18 @@ func TestLoadConfig_WithoutPreset(t *testing.T) {
conf, err := LoadConfig(logger, tmpfile.Name()) conf, err := LoadConfig(logger, tmpfile.Name())
require.NoError(t, err) require.NoError(t, err)
// Enforce default values
require.Equal(t, conf.Chain.L1Contracts.OptimismPortalProxy.String(), common.HexToAddress("0x4205Fc579115071764c7423A4f12eDde41f106Ed").String()) require.Equal(t, conf.Chain.L1Contracts.OptimismPortalProxy.String(), common.HexToAddress("0x4205Fc579115071764c7423A4f12eDde41f106Ed").String())
require.Equal(t, conf.Chain.L1Contracts.L2OutputOracleProxy.String(), common.HexToAddress("0x42097868233d1aa22e815a266982f2cf17685a27").String()) require.Equal(t, conf.Chain.L1Contracts.L2OutputOracleProxy.String(), common.HexToAddress("0x42097868233d1aa22e815a266982f2cf17685a27").String())
require.Equal(t, conf.Chain.L1Contracts.L1CrossDomainMessengerProxy.String(), common.HexToAddress("0x420ce71c97B33Cc4729CF772ae268934F7ab5fA1").String()) require.Equal(t, conf.Chain.L1Contracts.L1CrossDomainMessengerProxy.String(), common.HexToAddress("0x420ce71c97B33Cc4729CF772ae268934F7ab5fA1").String())
require.Equal(t, conf.Chain.L1Contracts.L1StandardBridgeProxy.String(), common.HexToAddress("0x4209fc46f92E8a1c0deC1b1747d010903E884bE1").String()) require.Equal(t, conf.Chain.L1Contracts.L1StandardBridgeProxy.String(), common.HexToAddress("0x4209fc46f92E8a1c0deC1b1747d010903E884bE1").String())
require.Equal(t, conf.Chain.Preset, 0) require.Equal(t, conf.Chain.Preset, 0)
// Enforce polling default values
require.Equal(t, conf.Chain.L1PollingInterval, uint(5000))
require.Equal(t, conf.Chain.L2PollingInterval, uint(5000))
require.Equal(t, conf.Chain.L1HeaderBufferSize, uint(500))
require.Equal(t, conf.Chain.L2HeaderBufferSize, uint(500))
} }
func TestLoadConfig_WithUnknownPreset(t *testing.T) { func TestLoadConfig_WithUnknownPreset(t *testing.T) {
...@@ -140,6 +148,37 @@ func TestLoadConfig_WithUnknownPreset(t *testing.T) { ...@@ -140,6 +148,37 @@ func TestLoadConfig_WithUnknownPreset(t *testing.T) {
require.Equal(t, fmt.Sprintf("unknown preset: %d", faultyPreset), err.Error()) require.Equal(t, fmt.Sprintf("unknown preset: %d", faultyPreset), err.Error())
} }
func Test_LoadConfig_PollingValues(t *testing.T) {
tmpfile, err := os.CreateTemp("", "test_user_values.toml")
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
defer tmpfile.Close()
testData := `
[chain]
l1-polling-interval = 1000
l2-polling-interval = 1005
l1-header-buffer-size = 100
l2-header-buffer-size = 105`
data := []byte(testData)
err = os.WriteFile(tmpfile.Name(), data, 0644)
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
err = tmpfile.Close()
require.NoError(t, err)
logger := testlog.Logger(t, log.LvlInfo)
conf, err := LoadConfig(logger, tmpfile.Name())
require.NoError(t, err)
require.Equal(t, conf.Chain.L1PollingInterval, uint(1000))
require.Equal(t, conf.Chain.L2PollingInterval, uint(1005))
require.Equal(t, conf.Chain.L1HeaderBufferSize, uint(100))
require.Equal(t, conf.Chain.L2HeaderBufferSize, uint(105))
}
func Test_AsSliceSuccess(t *testing.T) { func Test_AsSliceSuccess(t *testing.T) {
// error cases are intentionally ignored for testing since they can only be // error cases are intentionally ignored for testing since they can only be
// generated when the L1Contracts struct is developer modified to hold a non-address var field // generated when the L1Contracts struct is developer modified to hold a non-address var field
......
...@@ -3,6 +3,7 @@ package etl ...@@ -3,6 +3,7 @@ package etl
import ( import (
"context" "context"
"errors" "errors"
"math/big"
"time" "time"
"github.com/ethereum-optimism/optimism/indexer/node" "github.com/ethereum-optimism/optimism/indexer/node"
...@@ -13,16 +14,16 @@ import ( ...@@ -13,16 +14,16 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
const ( type Config struct {
// NOTE - These values can be made configurable to allow for more fine grained control LoopInterval time.Duration
// Additionally a default interval of 5 seconds may be too slow for reading L2 blocks provided HeaderBufferSize uint64
// the current rate of L2 block production on OP Stack chains (2 seconds per block) StartHeight *big.Int
defaultLoopInterval = 5 * time.Second }
defaultHeaderBufferSize = 500
)
type ETL struct { type ETL struct {
log log.Logger log log.Logger
loopInterval time.Duration
headerBufferSize uint64
headerTraversal *node.HeaderTraversal headerTraversal *node.HeaderTraversal
ethClient *ethclient.Client ethClient *ethclient.Client
...@@ -43,7 +44,7 @@ type ETLBatch struct { ...@@ -43,7 +44,7 @@ type ETLBatch struct {
func (etl *ETL) Start(ctx context.Context) error { func (etl *ETL) Start(ctx context.Context) error {
done := ctx.Done() done := ctx.Done()
pollTicker := time.NewTicker(defaultLoopInterval) pollTicker := time.NewTicker(etl.loopInterval)
defer pollTicker.Stop() defer pollTicker.Stop()
etl.log.Info("starting etl...") etl.log.Info("starting etl...")
...@@ -56,7 +57,7 @@ func (etl *ETL) Start(ctx context.Context) error { ...@@ -56,7 +57,7 @@ func (etl *ETL) Start(ctx context.Context) error {
case <-pollTicker.C: case <-pollTicker.C:
if len(headers) == 0 { if len(headers) == 0 {
newHeaders, err := etl.headerTraversal.NextFinalizedHeaders(defaultHeaderBufferSize) newHeaders, err := etl.headerTraversal.NextFinalizedHeaders(etl.headerBufferSize)
if err != nil { if err != nil {
etl.log.Error("error querying for headers", "err", err) etl.log.Error("error querying for headers", "err", err)
continue continue
......
...@@ -3,7 +3,6 @@ package etl ...@@ -3,7 +3,6 @@ package etl
import ( import (
"context" "context"
"fmt" "fmt"
"math/big"
"github.com/ethereum-optimism/optimism/indexer/config" "github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
...@@ -21,8 +20,7 @@ type L1ETL struct { ...@@ -21,8 +20,7 @@ type L1ETL struct {
// NewL1ETL creates a new L1ETL instance that will start indexing from different starting points // NewL1ETL creates a new L1ETL instance that will start indexing from different starting points
// depending on the state of the database and the supplied start height. // depending on the state of the database and the supplied start height.
func NewL1ETL(log log.Logger, db *database.DB, client node.EthClient, startHeight *big.Int, func NewL1ETL(cfg *Config, log log.Logger, db *database.DB, client node.EthClient, contracts config.L1Contracts) (*L1ETL, error) {
contracts config.L1Contracts) (*L1ETL, error) {
log = log.New("etl", "l1") log = log.New("etl", "l1")
latestHeader, err := db.Blocks.L1LatestBlockHeader() latestHeader, err := db.Blocks.L1LatestBlockHeader()
...@@ -41,9 +39,9 @@ func NewL1ETL(log log.Logger, db *database.DB, client node.EthClient, startHeigh ...@@ -41,9 +39,9 @@ func NewL1ETL(log log.Logger, db *database.DB, client node.EthClient, startHeigh
log.Info("detected last indexed block", "number", latestHeader.Number, "hash", latestHeader.Hash) log.Info("detected last indexed block", "number", latestHeader.Number, "hash", latestHeader.Hash)
fromHeader = latestHeader.RLPHeader.Header() fromHeader = latestHeader.RLPHeader.Header()
} else if startHeight.BitLen() > 0 { } else if cfg.StartHeight.BitLen() > 0 {
log.Info("no indexed state in storage, starting from supplied L1 height", "height", startHeight.String()) log.Info("no indexed state in storage, starting from supplied L1 height", "height", cfg.StartHeight.String())
header, err := client.BlockHeaderByNumber(startHeight) header, err := client.BlockHeaderByNumber(cfg.StartHeight)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not fetch starting block header: %w", err) return nil, fmt.Errorf("could not fetch starting block header: %w", err)
} }
...@@ -58,6 +56,9 @@ func NewL1ETL(log log.Logger, db *database.DB, client node.EthClient, startHeigh ...@@ -58,6 +56,9 @@ func NewL1ETL(log log.Logger, db *database.DB, client node.EthClient, startHeigh
// will be able to keep up with the rate of incoming batches // will be able to keep up with the rate of incoming batches
etlBatches := make(chan ETLBatch) etlBatches := make(chan ETLBatch)
etl := ETL{ etl := ETL{
loopInterval: cfg.LoopInterval,
headerBufferSize: cfg.HeaderBufferSize,
log: log, log: log,
headerTraversal: node.NewHeaderTraversal(client, fromHeader), headerTraversal: node.NewHeaderTraversal(client, fromHeader),
ethClient: client.GethEthClient(), ethClient: client.GethEthClient(),
......
...@@ -98,8 +98,11 @@ func Test_L1ETL_Construction(t *testing.T) { ...@@ -98,8 +98,11 @@ func Test_L1ETL_Construction(t *testing.T) {
ts := test.construction() ts := test.construction()
logger := log.NewLogger(log.DefaultCLIConfig()) logger := log.NewLogger(log.DefaultCLIConfig())
cfg := &Config{
StartHeight: ts.start,
}
etl, err := NewL1ETL(logger, ts.db.DB, ts.client, ts.start, ts.contracts) etl, err := NewL1ETL(cfg, logger, ts.db.DB, ts.client, ts.contracts)
test.assertion(etl, err) test.assertion(etl, err)
}) })
} }
......
...@@ -18,7 +18,7 @@ type L2ETL struct { ...@@ -18,7 +18,7 @@ type L2ETL struct {
db *database.DB db *database.DB
} }
func NewL2ETL(log log.Logger, db *database.DB, client node.EthClient) (*L2ETL, error) { func NewL2ETL(cfg *Config, log log.Logger, db *database.DB, client node.EthClient) (*L2ETL, error) {
log = log.New("etl", "l2") log = log.New("etl", "l2")
// allow predeploys to be overridable // allow predeploys to be overridable
...@@ -43,6 +43,9 @@ func NewL2ETL(log log.Logger, db *database.DB, client node.EthClient) (*L2ETL, e ...@@ -43,6 +43,9 @@ func NewL2ETL(log log.Logger, db *database.DB, client node.EthClient) (*L2ETL, e
etlBatches := make(chan ETLBatch) etlBatches := make(chan ETLBatch)
etl := ETL{ etl := ETL{
loopInterval: cfg.LoopInterval,
headerBufferSize: cfg.HeaderBufferSize,
log: log, log: log,
headerTraversal: node.NewHeaderTraversal(client, fromHeader), headerTraversal: node.NewHeaderTraversal(client, fromHeader),
ethClient: client.GethEthClient(), ethClient: client.GethEthClient(),
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"runtime/debug" "runtime/debug"
"sync" "sync"
"time"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -34,7 +35,13 @@ func NewIndexer(logger log.Logger, chainConfig config.ChainConfig, rpcsConfig co ...@@ -34,7 +35,13 @@ func NewIndexer(logger log.Logger, chainConfig config.ChainConfig, rpcsConfig co
return nil, err return nil, err
} }
l1Etl, err := etl.NewL1ETL(logger, db, l1EthClient, chainConfig.L1StartHeight(), chainConfig.L1Contracts) l1Cfg := &etl.Config{
LoopInterval: time.Duration(chainConfig.L1PollingInterval) * time.Millisecond,
HeaderBufferSize: uint64(chainConfig.L1HeaderBufferSize),
StartHeight: chainConfig.L1StartHeight(),
}
l1Etl, err := etl.NewL1ETL(l1Cfg, logger, db, l1EthClient, chainConfig.L1Contracts)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -44,8 +51,13 @@ func NewIndexer(logger log.Logger, chainConfig config.ChainConfig, rpcsConfig co ...@@ -44,8 +51,13 @@ func NewIndexer(logger log.Logger, chainConfig config.ChainConfig, rpcsConfig co
return nil, err return nil, err
} }
l2Cfg := &etl.Config{
LoopInterval: time.Duration(chainConfig.L2PollingInterval) * time.Millisecond,
HeaderBufferSize: uint64(chainConfig.L2HeaderBufferSize),
}
// Currently defaults to the predeploys // Currently defaults to the predeploys
l2Etl, err := etl.NewL2ETL(logger, db, l2EthClient) l2Etl, err := etl.NewL2ETL(l2Cfg, logger, db, l2EthClient)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment