Commit cd3713e3 authored by Hamdi Allam's avatar Hamdi Allam Committed by GitHub

Merge pull request #7066 from ethereum-optimism/indexer.conf.depth

feat(indexer): configurable confirmation depth when traversing headers
parents 2f3fc92d b29eb9fc
...@@ -2,7 +2,6 @@ package config ...@@ -2,7 +2,6 @@ package config
import ( import (
"fmt" "fmt"
"math/big"
"os" "os"
"reflect" "reflect"
...@@ -66,10 +65,15 @@ func (c *L1Contracts) AsSlice() ([]common.Address, error) { ...@@ -66,10 +65,15 @@ func (c *L1Contracts) AsSlice() ([]common.Address, error) {
type ChainConfig struct { type ChainConfig struct {
// Configure known chains with the l2 chain id // Configure known chains with the l2 chain id
Preset int Preset int
L1Contracts L1Contracts `toml:"l1-contracts"` L1Contracts L1Contracts `toml:"l1-contracts"`
// L1StartingHeight is the block height to start indexing from
L1StartingHeight uint `toml:"l1-starting-height"` L1StartingHeight uint `toml:"l1-starting-height"`
// These configuration options will be removed once
// native reorg handling is implemented
L1ConfirmationDepth uint `toml:"l1-confirmation-depth"`
L2ConfirmationDepth uint `toml:"l2-confirmation-depth"`
L1PollingInterval uint `toml:"l1-polling-interval"` L1PollingInterval uint `toml:"l1-polling-interval"`
L2PollingInterval uint `toml:"l2-polling-interval"` L2PollingInterval uint `toml:"l2-polling-interval"`
...@@ -77,11 +81,6 @@ type ChainConfig struct { ...@@ -77,11 +81,6 @@ type ChainConfig struct {
L2HeaderBufferSize uint `toml:"l2-header-buffer-size"` L2HeaderBufferSize uint `toml:"l2-header-buffer-size"`
} }
// L1StartHeight returns the block height to start indexing from
func (cc *ChainConfig) L1StartHeight() *big.Int {
return big.NewInt(int64(cc.L1StartingHeight))
}
// RPCsConfig configures the RPC urls // RPCsConfig configures the RPC urls
type RPCsConfig struct { type RPCsConfig struct {
L1RPC string `toml:"l1-rpc"` L1RPC string `toml:"l1-rpc"`
......
...@@ -71,8 +71,10 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite { ...@@ -71,8 +71,10 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
L2RPC: opSys.EthInstances["sequencer"].HTTPEndpoint(), L2RPC: opSys.EthInstances["sequencer"].HTTPEndpoint(),
}, },
Chain: config.ChainConfig{ Chain: config.ChainConfig{
L1PollingInterval: 1000, L1PollingInterval: uint(opCfg.DeployConfig.L1BlockTime) * 1000,
L2PollingInterval: 1000, L1ConfirmationDepth: 0,
L2PollingInterval: uint(opCfg.DeployConfig.L2BlockTime) * 1000,
L2ConfirmationDepth: 0,
L1Contracts: config.L1Contracts{ L1Contracts: config.L1Contracts{
OptimismPortalProxy: opCfg.L1Deployments.OptimismPortalProxy, OptimismPortalProxy: opCfg.L1Deployments.OptimismPortalProxy,
L2OutputOracleProxy: opCfg.L1Deployments.L2OutputOracleProxy, L2OutputOracleProxy: opCfg.L1Deployments.L2OutputOracleProxy,
......
...@@ -16,7 +16,9 @@ import ( ...@@ -16,7 +16,9 @@ import (
type Config struct { type Config struct {
LoopIntervalMsec uint LoopIntervalMsec uint
HeaderBufferSize uint HeaderBufferSize uint
StartHeight *big.Int StartHeight *big.Int
ConfirmationDepth *big.Int
} }
type ETL struct { type ETL struct {
......
...@@ -65,7 +65,7 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli ...@@ -65,7 +65,7 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
log: log, log: log,
metrics: metrics, metrics: metrics,
headerTraversal: node.NewHeaderTraversal(client, fromHeader), headerTraversal: node.NewHeaderTraversal(client, fromHeader, cfg.ConfirmationDepth),
ethClient: client, ethClient: client,
contracts: cSlice, contracts: cSlice,
etlBatches: etlBatches, etlBatches: etlBatches,
......
...@@ -49,7 +49,7 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli ...@@ -49,7 +49,7 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
log: log, log: log,
metrics: metrics, metrics: metrics,
headerTraversal: node.NewHeaderTraversal(client, fromHeader), headerTraversal: node.NewHeaderTraversal(client, fromHeader, cfg.ConfirmationDepth),
ethClient: client, ethClient: client,
contracts: l2Contracts, contracts: l2Contracts,
etlBatches: etlBatches, etlBatches: etlBatches,
......
...@@ -3,6 +3,7 @@ package indexer ...@@ -3,6 +3,7 @@ package indexer
import ( import (
"context" "context"
"fmt" "fmt"
"math/big"
"runtime/debug" "runtime/debug"
"sync" "sync"
...@@ -28,7 +29,6 @@ type Indexer struct { ...@@ -28,7 +29,6 @@ type Indexer struct {
L1ETL *etl.L1ETL L1ETL *etl.L1ETL
L2ETL *etl.L2ETL L2ETL *etl.L2ETL
BridgeProcessor *processors.BridgeProcessor BridgeProcessor *processors.BridgeProcessor
} }
...@@ -41,7 +41,12 @@ func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConf ...@@ -41,7 +41,12 @@ func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConf
if err != nil { if err != nil {
return nil, err return nil, err
} }
l1Cfg := etl.Config{LoopIntervalMsec: chainConfig.L1PollingInterval, HeaderBufferSize: chainConfig.L1HeaderBufferSize, StartHeight: chainConfig.L1StartHeight()} l1Cfg := etl.Config{
LoopIntervalMsec: chainConfig.L1PollingInterval,
HeaderBufferSize: chainConfig.L1HeaderBufferSize,
ConfirmationDepth: big.NewInt(int64(chainConfig.L1ConfirmationDepth)),
StartHeight: big.NewInt(int64(chainConfig.L1StartingHeight)),
}
l1Etl, err := etl.NewL1ETL(l1Cfg, logger, db, etl.NewMetrics(metricsRegistry, "l1"), l1EthClient, chainConfig.L1Contracts) l1Etl, err := etl.NewL1ETL(l1Cfg, logger, db, etl.NewMetrics(metricsRegistry, "l1"), l1EthClient, chainConfig.L1Contracts)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -52,7 +57,11 @@ func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConf ...@@ -52,7 +57,11 @@ func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConf
if err != nil { if err != nil {
return nil, err return nil, err
} }
l2Cfg := etl.Config{LoopIntervalMsec: chainConfig.L2PollingInterval, HeaderBufferSize: chainConfig.L2HeaderBufferSize} l2Cfg := etl.Config{
LoopIntervalMsec: chainConfig.L2PollingInterval,
HeaderBufferSize: chainConfig.L2HeaderBufferSize,
ConfirmationDepth: big.NewInt(int64(chainConfig.L2ConfirmationDepth)),
}
l2Etl, err := etl.NewL2ETL(l2Cfg, logger, db, etl.NewMetrics(metricsRegistry, "l2"), l2EthClient) l2Etl, err := etl.NewL2ETL(l2Cfg, logger, db, etl.NewMetrics(metricsRegistry, "l2"), l2EthClient)
if err != nil { if err != nil {
return nil, err return nil, err
......
# Chain configures l1 chain addresses # Chain configures l1 chain addresses
# Can configure them manually or use a preset l2 ChainId for known chains including OP Mainnet, OP Goerli, Base, Base Goerli, Zora, and Zora goerli # Can configure them manually or use a preset l2 ChainId for known chains including OP Mainnet, OP Goerli, Base, Base Goerli, Zora, and Zora goerli
[chain] [chain]
# OP Goerli
preset = 420
# L1 Config
l1-polling-interval = 0 l1-polling-interval = 0
l1-header-buffer-size = 0 l1-header-buffer-size = 0
l1-confirmation-depth = 0
l1-starting-height = 0
# L2 Config
l2-polling-interval = 0 l2-polling-interval = 0
l2-header-buffer-size = 0 l2-header-buffer-size = 0
l2-confirmation-depth = 0
# OP Goerli
preset = 420
l1-starting-height = 0
[rpcs] [rpcs]
l1-rpc = "${INDEXER_RPC_URL_L1}" l1-rpc = "${INDEXER_RPC_URL_L1}"
......
...@@ -25,8 +25,6 @@ const ( ...@@ -25,8 +25,6 @@ const (
) )
type EthClient interface { type EthClient interface {
FinalizedBlockHeight() (*big.Int, error)
BlockHeaderByNumber(*big.Int) (*types.Header, error) BlockHeaderByNumber(*big.Int) (*types.Header, error)
BlockHeaderByHash(common.Hash) (*types.Header, error) BlockHeaderByHash(common.Hash) (*types.Header, error)
BlockHeadersByRange(*big.Int, *big.Int) ([]types.Header, error) BlockHeadersByRange(*big.Int, *big.Int) ([]types.Header, error)
...@@ -52,24 +50,6 @@ func DialEthClient(rpcUrl string, metrics Metricer) (EthClient, error) { ...@@ -52,24 +50,6 @@ func DialEthClient(rpcUrl string, metrics Metricer) (EthClient, error) {
return client, nil return client, nil
} }
// FinalizedBlockHeight retrieves the latest block height in a finalized state
func (c *client) FinalizedBlockHeight() (*big.Int, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
// **NOTE** Local devnet is having issues with the "finalized" block tag. Temp switch
// to "latest" to iterate faster locally but this needs to be updated
var header *types.Header
err := c.rpc.CallContext(ctxwt, &header, "eth_getBlockByNumber", "latest", false)
if err != nil {
return nil, err
} else if header == nil {
return nil, ethereum.NotFound
}
return header.Number, nil
}
// BlockHeaderByHash retrieves the block header attributed to the supplied hash // BlockHeaderByHash retrieves the block header attributed to the supplied hash
func (c *client) BlockHeaderByHash(hash common.Hash) (*types.Header, error) { func (c *client) BlockHeaderByHash(hash common.Hash) (*types.Header, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout) ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
......
...@@ -15,14 +15,15 @@ var ( ...@@ -15,14 +15,15 @@ var (
type HeaderTraversal struct { type HeaderTraversal struct {
ethClient EthClient ethClient EthClient
lastHeader *types.Header lastHeader *types.Header
blockConfirmationDepth *big.Int
} }
// NewHeaderTraversal instantiates a new instance of HeaderTraversal against the supplied rpc client. // NewHeaderTraversal instantiates a new instance of HeaderTraversal against the supplied rpc client.
// The HeaderTraversal will start fetching blocks starting from the supplied header unless // The HeaderTraversal will start fetching blocks starting from the supplied header unless nil, indicating genesis.
// nil, indicating genesis. func NewHeaderTraversal(ethClient EthClient, fromHeader *types.Header, confDepth *big.Int) *HeaderTraversal {
func NewHeaderTraversal(ethClient EthClient, fromHeader *types.Header) *HeaderTraversal { return &HeaderTraversal{ethClient: ethClient, lastHeader: fromHeader, blockConfirmationDepth: confDepth}
return &HeaderTraversal{ethClient: ethClient, lastHeader: fromHeader}
} }
// LastHeader returns the last header that was fetched by the HeaderTraversal // LastHeader returns the last header that was fetched by the HeaderTraversal
...@@ -34,13 +35,19 @@ func (f *HeaderTraversal) LastHeader() *types.Header { ...@@ -34,13 +35,19 @@ func (f *HeaderTraversal) LastHeader() *types.Header {
// NextFinalizedHeaders retrives the next set of headers that have been // NextFinalizedHeaders retrives the next set of headers that have been
// marked as finalized by the connected client, bounded by the supplied size // marked as finalized by the connected client, bounded by the supplied size
func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header, error) { func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header, error) {
finalizedBlockHeight, err := f.ethClient.FinalizedBlockHeight() latestBlockHeader, err := f.ethClient.BlockHeaderByNumber(nil)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to query latest finalized height: %w", err) return nil, fmt.Errorf("unable to query latest block: %w", err)
}
endHeight := new(big.Int).Sub(latestBlockHeader.Number, f.blockConfirmationDepth)
if endHeight.Sign() < 0 {
// No blocks with the provided confirmation depth available
return nil, nil
} }
if f.lastHeader != nil { if f.lastHeader != nil {
cmp := f.lastHeader.Number.Cmp(finalizedBlockHeight) cmp := f.lastHeader.Number.Cmp(endHeight)
if cmp == 0 { if cmp == 0 {
return nil, nil return nil, nil
} else if cmp > 0 { } else if cmp > 0 {
...@@ -53,7 +60,7 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header, ...@@ -53,7 +60,7 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header,
nextHeight = new(big.Int).Add(f.lastHeader.Number, bigOne) nextHeight = new(big.Int).Add(f.lastHeader.Number, bigOne)
} }
endHeight := clampBigInt(nextHeight, finalizedBlockHeight, maxSize) endHeight = clampBigInt(nextHeight, endHeight, maxSize)
headers, err := f.ethClient.BlockHeadersByRange(nextHeight, endHeight) headers, err := f.ethClient.BlockHeadersByRange(nextHeight, endHeight)
if err != nil { if err != nil {
return nil, fmt.Errorf("error querying blocks by range: %w", err) return nil, fmt.Errorf("error querying blocks by range: %w", err)
......
...@@ -37,10 +37,10 @@ func TestHeaderTraversalNextFinalizedHeadersNoOp(t *testing.T) { ...@@ -37,10 +37,10 @@ func TestHeaderTraversalNextFinalizedHeadersNoOp(t *testing.T) {
// start from block 10 as the latest fetched block // start from block 10 as the latest fetched block
lastHeader := &types.Header{Number: big.NewInt(10)} lastHeader := &types.Header{Number: big.NewInt(10)}
headerTraversal := NewHeaderTraversal(client, lastHeader) headerTraversal := NewHeaderTraversal(client, lastHeader, bigZero)
// no new headers when matched with head // no new headers when matched with head
client.On("FinalizedBlockHeight").Return(big.NewInt(10), nil) client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(lastHeader, nil)
headers, err := headerTraversal.NextFinalizedHeaders(100) headers, err := headerTraversal.NextFinalizedHeaders(100)
require.NoError(t, err) require.NoError(t, err)
require.Empty(t, headers) require.Empty(t, headers)
...@@ -50,11 +50,11 @@ func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) { ...@@ -50,11 +50,11 @@ func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) {
client := new(MockEthClient) client := new(MockEthClient)
// start from genesis // start from genesis
headerTraversal := NewHeaderTraversal(client, nil) headerTraversal := NewHeaderTraversal(client, nil, bigZero)
// blocks [0..4] // blocks [0..4]
headers := makeHeaders(5, nil) headers := makeHeaders(5, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(4), nil).Times(1) // Times so that we can override next client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(0)), mock.MatchedBy(BigIntMatcher(4))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(0)), mock.MatchedBy(BigIntMatcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5) headers, err := headerTraversal.NextFinalizedHeaders(5)
require.NoError(t, err) require.NoError(t, err)
...@@ -62,7 +62,7 @@ func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) { ...@@ -62,7 +62,7 @@ func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) {
// blocks [5..9] // blocks [5..9]
headers = makeHeaders(5, &headers[len(headers)-1]) headers = makeHeaders(5, &headers[len(headers)-1])
client.On("FinalizedBlockHeight").Return(big.NewInt(9), nil) client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil)
client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(5)), mock.MatchedBy(BigIntMatcher(9))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(5)), mock.MatchedBy(BigIntMatcher(9))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(5) headers, err = headerTraversal.NextFinalizedHeaders(5)
require.NoError(t, err) require.NoError(t, err)
...@@ -73,10 +73,10 @@ func TestHeaderTraversalNextFinalizedHeadersMaxSize(t *testing.T) { ...@@ -73,10 +73,10 @@ func TestHeaderTraversalNextFinalizedHeadersMaxSize(t *testing.T) {
client := new(MockEthClient) client := new(MockEthClient)
// start from genesis // start from genesis
headerTraversal := NewHeaderTraversal(client, nil) headerTraversal := NewHeaderTraversal(client, nil, bigZero)
// 100 "available" headers // 100 "available" headers
client.On("FinalizedBlockHeight").Return(big.NewInt(100), nil) client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&types.Header{Number: big.NewInt(100)}, nil)
// clamped by the supplied size // clamped by the supplied size
headers := makeHeaders(5, nil) headers := makeHeaders(5, nil)
...@@ -97,11 +97,11 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) { ...@@ -97,11 +97,11 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
client := new(MockEthClient) client := new(MockEthClient)
// start from genesis // start from genesis
headerTraversal := NewHeaderTraversal(client, nil) headerTraversal := NewHeaderTraversal(client, nil, bigZero)
// blocks [0..4] // blocks [0..4]
headers := makeHeaders(5, nil) headers := makeHeaders(5, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(4), nil).Times(1) // Times so that we can override next client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(0)), mock.MatchedBy(BigIntMatcher(4))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(0)), mock.MatchedBy(BigIntMatcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5) headers, err := headerTraversal.NextFinalizedHeaders(5)
require.NoError(t, err) require.NoError(t, err)
...@@ -109,7 +109,7 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) { ...@@ -109,7 +109,7 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
// blocks [5..9]. Next batch is not chained correctly (starts again from genesis) // blocks [5..9]. Next batch is not chained correctly (starts again from genesis)
headers = makeHeaders(5, nil) headers = makeHeaders(5, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(9), nil) client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&types.Header{Number: big.NewInt(9)}, nil)
client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(5)), mock.MatchedBy(BigIntMatcher(9))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(5)), mock.MatchedBy(BigIntMatcher(9))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(5) headers, err = headerTraversal.NextFinalizedHeaders(5)
require.Nil(t, headers) require.Nil(t, headers)
......
...@@ -20,9 +20,9 @@ func (m *MockEthClient) BlockHeaderByNumber(number *big.Int) (*types.Header, err ...@@ -20,9 +20,9 @@ func (m *MockEthClient) BlockHeaderByNumber(number *big.Int) (*types.Header, err
return args.Get(0).(*types.Header), args.Error(1) return args.Get(0).(*types.Header), args.Error(1)
} }
func (m *MockEthClient) FinalizedBlockHeight() (*big.Int, error) { func (m *MockEthClient) BlockHeaderByHash(hash common.Hash) (*types.Header, error) {
args := m.Called() args := m.Called(hash)
return args.Get(0).(*big.Int), args.Error(1) return args.Get(0).(*types.Header), args.Error(1)
} }
func (m *MockEthClient) BlockHeadersByRange(from, to *big.Int) ([]types.Header, error) { func (m *MockEthClient) BlockHeadersByRange(from, to *big.Int) ([]types.Header, error) {
...@@ -30,11 +30,6 @@ func (m *MockEthClient) BlockHeadersByRange(from, to *big.Int) ([]types.Header, ...@@ -30,11 +30,6 @@ func (m *MockEthClient) BlockHeadersByRange(from, to *big.Int) ([]types.Header,
return args.Get(0).([]types.Header), args.Error(1) return args.Get(0).([]types.Header), args.Error(1)
} }
func (m *MockEthClient) BlockHeaderByHash(hash common.Hash) (*types.Header, error) {
args := m.Called(hash)
return args.Get(0).(*types.Header), args.Error(1)
}
func (m *MockEthClient) StorageHash(address common.Address, blockNumber *big.Int) (common.Hash, error) { func (m *MockEthClient) StorageHash(address common.Address, blockNumber *big.Int) (common.Hash, error) {
args := m.Called(address, blockNumber) args := m.Called(address, blockNumber)
return args.Get(0).(common.Hash), args.Error(1) return args.Get(0).(common.Hash), args.Error(1)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment