Commit 628357f3 authored by Hamdi Allam's avatar Hamdi Allam

confirmation depth

parent 73f2c2b3
...@@ -2,7 +2,6 @@ package config ...@@ -2,7 +2,6 @@ package config
import ( import (
"fmt" "fmt"
"math/big"
"os" "os"
"reflect" "reflect"
...@@ -65,10 +64,15 @@ func (c *L1Contracts) AsSlice() ([]common.Address, error) { ...@@ -65,10 +64,15 @@ func (c *L1Contracts) AsSlice() ([]common.Address, error) {
// ChainConfig configures of the chain being indexed // ChainConfig configures of the chain being indexed
type ChainConfig struct { type ChainConfig struct {
// Configure known chains with the l2 chain id // Configure known chains with the l2 chain id
Preset int Preset int
L1Contracts L1Contracts `toml:"l1-contracts"`
// L1StartingHeight is the block height to start indexing from L1Contracts L1Contracts `toml:"l1-contracts"`
L1StartingHeight uint `toml:"l1-starting-height"` L1StartingHeight uint `toml:"l1-starting-height"`
// These configuration options will be removed once
// native reorg handling is implemented
L1ConfirmationDepth uint `toml:"l1-confirmation-depth"`
L2ConfirmationDepth uint `toml:"l2-confirmation-depth"`
L1PollingInterval uint `toml:"l1-polling-interval"` L1PollingInterval uint `toml:"l1-polling-interval"`
L2PollingInterval uint `toml:"l2-polling-interval"` L2PollingInterval uint `toml:"l2-polling-interval"`
...@@ -77,11 +81,6 @@ type ChainConfig struct { ...@@ -77,11 +81,6 @@ type ChainConfig struct {
L2HeaderBufferSize uint `toml:"l2-header-buffer-size"` L2HeaderBufferSize uint `toml:"l2-header-buffer-size"`
} }
// L1StartHeight returns the block height to start indexing from
func (cc *ChainConfig) L1StartHeight() *big.Int {
return big.NewInt(int64(cc.L1StartingHeight))
}
// RPCsConfig configures the RPC urls // RPCsConfig configures the RPC urls
type RPCsConfig struct { type RPCsConfig struct {
L1RPC string `toml:"l1-rpc"` L1RPC string `toml:"l1-rpc"`
......
...@@ -71,8 +71,10 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite { ...@@ -71,8 +71,10 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
L2RPC: opSys.EthInstances["sequencer"].HTTPEndpoint(), L2RPC: opSys.EthInstances["sequencer"].HTTPEndpoint(),
}, },
Chain: config.ChainConfig{ Chain: config.ChainConfig{
L1PollingInterval: 1000, L1PollingInterval: uint(opCfg.DeployConfig.L1BlockTime),
L2PollingInterval: 1000, L1ConfirmationDepth: 0,
L2PollingInterval: uint(opCfg.DeployConfig.L2BlockTime),
L2ConfirmationDepth: 0,
L1Contracts: config.L1Contracts{ L1Contracts: config.L1Contracts{
OptimismPortalProxy: opCfg.L1Deployments.OptimismPortalProxy, OptimismPortalProxy: opCfg.L1Deployments.OptimismPortalProxy,
L2OutputOracleProxy: opCfg.L1Deployments.L2OutputOracleProxy, L2OutputOracleProxy: opCfg.L1Deployments.L2OutputOracleProxy,
......
...@@ -16,7 +16,9 @@ import ( ...@@ -16,7 +16,9 @@ import (
type Config struct { type Config struct {
LoopIntervalMsec uint LoopIntervalMsec uint
HeaderBufferSize uint HeaderBufferSize uint
StartHeight *big.Int
StartHeight *big.Int
ConfirmationDepth *big.Int
} }
type ETL struct { type ETL struct {
......
...@@ -62,7 +62,7 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli ...@@ -62,7 +62,7 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
log: log, log: log,
metrics: metrics, metrics: metrics,
headerTraversal: node.NewHeaderTraversal(client, fromHeader), headerTraversal: node.NewHeaderTraversal(client, fromHeader, cfg.ConfirmationDepth),
ethClient: client, ethClient: client,
contracts: cSlice, contracts: cSlice,
etlBatches: etlBatches, etlBatches: etlBatches,
......
...@@ -49,7 +49,7 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli ...@@ -49,7 +49,7 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
log: log, log: log,
metrics: metrics, metrics: metrics,
headerTraversal: node.NewHeaderTraversal(client, fromHeader), headerTraversal: node.NewHeaderTraversal(client, fromHeader, cfg.ConfirmationDepth),
ethClient: client, ethClient: client,
contracts: l2Contracts, contracts: l2Contracts,
etlBatches: etlBatches, etlBatches: etlBatches,
......
...@@ -3,6 +3,7 @@ package indexer ...@@ -3,6 +3,7 @@ package indexer
import ( import (
"context" "context"
"fmt" "fmt"
"math/big"
"runtime/debug" "runtime/debug"
"sync" "sync"
...@@ -26,9 +27,8 @@ type Indexer struct { ...@@ -26,9 +27,8 @@ type Indexer struct {
metricsConfig config.MetricsConfig metricsConfig config.MetricsConfig
metricsRegistry *prometheus.Registry metricsRegistry *prometheus.Registry
L1ETL *etl.L1ETL L1ETL *etl.L1ETL
L2ETL *etl.L2ETL L2ETL *etl.L2ETL
BridgeProcessor *processors.BridgeProcessor BridgeProcessor *processors.BridgeProcessor
} }
...@@ -41,7 +41,12 @@ func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConf ...@@ -41,7 +41,12 @@ func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConf
if err != nil { if err != nil {
return nil, err return nil, err
} }
l1Cfg := etl.Config{LoopIntervalMsec: chainConfig.L1PollingInterval, HeaderBufferSize: chainConfig.L1HeaderBufferSize, StartHeight: chainConfig.L1StartHeight()} l1Cfg := etl.Config{
LoopIntervalMsec: chainConfig.L1PollingInterval,
HeaderBufferSize: chainConfig.L1HeaderBufferSize,
ConfirmationDepth: big.NewInt(int64(chainConfig.L1ConfirmationDepth)),
StartHeight: big.NewInt(int64(chainConfig.L1StartingHeight)),
}
l1Etl, err := etl.NewL1ETL(l1Cfg, logger, db, etl.NewMetrics(metricsRegistry, "l1"), l1EthClient, chainConfig.L1Contracts) l1Etl, err := etl.NewL1ETL(l1Cfg, logger, db, etl.NewMetrics(metricsRegistry, "l1"), l1EthClient, chainConfig.L1Contracts)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -52,7 +57,11 @@ func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConf ...@@ -52,7 +57,11 @@ func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConf
if err != nil { if err != nil {
return nil, err return nil, err
} }
l2Cfg := etl.Config{LoopIntervalMsec: chainConfig.L2PollingInterval, HeaderBufferSize: chainConfig.L2HeaderBufferSize} l2Cfg := etl.Config{
LoopIntervalMsec: chainConfig.L2PollingInterval,
HeaderBufferSize: chainConfig.L2HeaderBufferSize,
ConfirmationDepth: big.NewInt(int64(chainConfig.L2ConfirmationDepth)),
}
l2Etl, err := etl.NewL2ETL(l2Cfg, logger, db, etl.NewMetrics(metricsRegistry, "l2"), l2EthClient) l2Etl, err := etl.NewL2ETL(l2Cfg, logger, db, etl.NewMetrics(metricsRegistry, "l2"), l2EthClient)
if err != nil { if err != nil {
return nil, err return nil, err
......
...@@ -25,8 +25,6 @@ const ( ...@@ -25,8 +25,6 @@ const (
) )
type EthClient interface { type EthClient interface {
FinalizedBlockHeight() (*big.Int, error)
BlockHeaderByNumber(*big.Int) (*types.Header, error) BlockHeaderByNumber(*big.Int) (*types.Header, error)
BlockHeaderByHash(common.Hash) (*types.Header, error) BlockHeaderByHash(common.Hash) (*types.Header, error)
BlockHeadersByRange(*big.Int, *big.Int) ([]types.Header, error) BlockHeadersByRange(*big.Int, *big.Int) ([]types.Header, error)
...@@ -52,24 +50,6 @@ func DialEthClient(rpcUrl string, metrics Metricer) (EthClient, error) { ...@@ -52,24 +50,6 @@ func DialEthClient(rpcUrl string, metrics Metricer) (EthClient, error) {
return client, nil return client, nil
} }
// FinalizedBlockHeight retrieves the latest block height in a finalized state
func (c *client) FinalizedBlockHeight() (*big.Int, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
// **NOTE** Local devnet is having issues with the "finalized" block tag. Temp switch
// to "latest" to iterate faster locally but this needs to be updated
var header *types.Header
err := c.rpc.CallContext(ctxwt, &header, "eth_getBlockByNumber", "latest", false)
if err != nil {
return nil, err
} else if header == nil {
return nil, ethereum.NotFound
}
return header.Number, nil
}
// BlockHeaderByHash retrieves the block header attributed to the supplied hash // BlockHeaderByHash retrieves the block header attributed to the supplied hash
func (c *client) BlockHeaderByHash(hash common.Hash) (*types.Header, error) { func (c *client) BlockHeaderByHash(hash common.Hash) (*types.Header, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout) ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
......
...@@ -14,15 +14,16 @@ var ( ...@@ -14,15 +14,16 @@ var (
) )
type HeaderTraversal struct { type HeaderTraversal struct {
ethClient EthClient ethClient EthClient
lastHeader *types.Header
lastHeader *types.Header
blockConfirmationDepth *big.Int
} }
// NewHeaderTraversal instantiates a new instance of HeaderTraversal against the supplied rpc client. // NewHeaderTraversal instantiates a new instance of HeaderTraversal against the supplied rpc client.
// The HeaderTraversal will start fetching blocks starting from the supplied header unless // The HeaderTraversal will start fetching blocks starting from the supplied header unless nil, indicating genesis.
// nil, indicating genesis. func NewHeaderTraversal(ethClient EthClient, fromHeader *types.Header, confDepth *big.Int) *HeaderTraversal {
func NewHeaderTraversal(ethClient EthClient, fromHeader *types.Header) *HeaderTraversal { return &HeaderTraversal{ethClient: ethClient, lastHeader: fromHeader, blockConfirmationDepth: confDepth}
return &HeaderTraversal{ethClient: ethClient, lastHeader: fromHeader}
} }
// LastHeader returns the last header that was fetched by the HeaderTraversal // LastHeader returns the last header that was fetched by the HeaderTraversal
...@@ -34,13 +35,19 @@ func (f *HeaderTraversal) LastHeader() *types.Header { ...@@ -34,13 +35,19 @@ func (f *HeaderTraversal) LastHeader() *types.Header {
// NextFinalizedHeaders retrives the next set of headers that have been // NextFinalizedHeaders retrives the next set of headers that have been
// marked as finalized by the connected client, bounded by the supplied size // marked as finalized by the connected client, bounded by the supplied size
func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header, error) { func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header, error) {
finalizedBlockHeight, err := f.ethClient.FinalizedBlockHeight() latestBlockHeader, err := f.ethClient.BlockHeaderByNumber(nil)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to query latest finalized height: %w", err) return nil, fmt.Errorf("unable to query latest block: %w", err)
}
endHeight := new(big.Int).Sub(latestBlockHeader.Number, f.blockConfirmationDepth)
if endHeight.Sign() < 0 {
// No blocks with the provided confirmation depth available
return nil, nil
} }
if f.lastHeader != nil { if f.lastHeader != nil {
cmp := f.lastHeader.Number.Cmp(finalizedBlockHeight) cmp := f.lastHeader.Number.Cmp(endHeight)
if cmp == 0 { if cmp == 0 {
return nil, nil return nil, nil
} else if cmp > 0 { } else if cmp > 0 {
...@@ -53,7 +60,7 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header, ...@@ -53,7 +60,7 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header,
nextHeight = new(big.Int).Add(f.lastHeader.Number, bigOne) nextHeight = new(big.Int).Add(f.lastHeader.Number, bigOne)
} }
endHeight := clampBigInt(nextHeight, finalizedBlockHeight, maxSize) endHeight = clampBigInt(nextHeight, endHeight, maxSize)
headers, err := f.ethClient.BlockHeadersByRange(nextHeight, endHeight) headers, err := f.ethClient.BlockHeadersByRange(nextHeight, endHeight)
if err != nil { if err != nil {
return nil, fmt.Errorf("error querying blocks by range: %w", err) return nil, fmt.Errorf("error querying blocks by range: %w", err)
......
...@@ -37,7 +37,7 @@ func TestHeaderTraversalNextFinalizedHeadersNoOp(t *testing.T) { ...@@ -37,7 +37,7 @@ func TestHeaderTraversalNextFinalizedHeadersNoOp(t *testing.T) {
// start from block 10 as the latest fetched block // start from block 10 as the latest fetched block
lastHeader := &types.Header{Number: big.NewInt(10)} lastHeader := &types.Header{Number: big.NewInt(10)}
headerTraversal := NewHeaderTraversal(client, lastHeader) headerTraversal := NewHeaderTraversal(client, lastHeader, bigZero)
// no new headers when matched with head // no new headers when matched with head
client.On("FinalizedBlockHeight").Return(big.NewInt(10), nil) client.On("FinalizedBlockHeight").Return(big.NewInt(10), nil)
...@@ -50,7 +50,7 @@ func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) { ...@@ -50,7 +50,7 @@ func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) {
client := new(MockEthClient) client := new(MockEthClient)
// start from genesis // start from genesis
headerTraversal := NewHeaderTraversal(client, nil) headerTraversal := NewHeaderTraversal(client, nil, bigZero)
// blocks [0..4] // blocks [0..4]
headers := makeHeaders(5, nil) headers := makeHeaders(5, nil)
...@@ -73,7 +73,7 @@ func TestHeaderTraversalNextFinalizedHeadersMaxSize(t *testing.T) { ...@@ -73,7 +73,7 @@ func TestHeaderTraversalNextFinalizedHeadersMaxSize(t *testing.T) {
client := new(MockEthClient) client := new(MockEthClient)
// start from genesis // start from genesis
headerTraversal := NewHeaderTraversal(client, nil) headerTraversal := NewHeaderTraversal(client, nil, bigZero)
// 100 "available" headers // 100 "available" headers
client.On("FinalizedBlockHeight").Return(big.NewInt(100), nil) client.On("FinalizedBlockHeight").Return(big.NewInt(100), nil)
...@@ -97,7 +97,7 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) { ...@@ -97,7 +97,7 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
client := new(MockEthClient) client := new(MockEthClient)
// start from genesis // start from genesis
headerTraversal := NewHeaderTraversal(client, nil) headerTraversal := NewHeaderTraversal(client, nil, bigZero)
// blocks [0..4] // blocks [0..4]
headers := makeHeaders(5, nil) headers := makeHeaders(5, nil)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment