Commit b4243cc6 authored by OptimismBot's avatar OptimismBot Committed by GitHub

Merge pull request #6914 from epociask/indexer.l1height-param

Optional L1 Starting Height for Indexer Syncing
parents b97ff3dc 72da248c
......@@ -4,7 +4,7 @@
### Setup env
The `indexer.toml` stores a set of preset environmental variables that can be used to run the indexer with the exception of the network specific `l1-rpc` and `l2-rpc` variables. The `indexer.toml` file can be ran as a default config, otherwise a custom `.toml` config can provided via the `--config` flag when running the application. Additionally, L1 system contract addresses must provided for the specific OP Stack network actively being indexed. Currently the indexer has no way to infer L1 system config addresses provided a L2 chain ID or network enum.
The `indexer.toml` stores a set of preset environmental variables that can be used to run the indexer with the exception of the network specific `l1-rpc` and `l2-rpc` variables. The `indexer.toml` file can be ran as a default config, otherwise a custom `.toml` config can provided via the `--config` flag when running the application. An optional `l1-starting-height` value can be provided to the indexer to specify the L1 starting block height to begin indexing from. This should be ideally be an L1 block that holds a correlated L2 genesis commitment. Furthermore, this value must be less than the current L1 block height to pass validation. If no starting height value is provided and the database is empty, the indexer will begin sequentially processing from L1 genesis.
### Testing
All tests can be ran by running `make test` from the `/indexer` directory. This will run all unit and e2e tests.
......
......@@ -2,7 +2,9 @@ package config
import (
"fmt"
"math/big"
"os"
"reflect"
"github.com/BurntSushi/toml"
"github.com/ethereum/go-ethereum/common"
......@@ -13,11 +15,11 @@ import (
// Config represents the `indexer.toml` file used to configure the indexer
type Config struct {
Chain ChainConfig
RPCs RPCsConfig `toml:"rpcs"`
DB DBConfig
API APIConfig
Metrics MetricsConfig
Chain ChainConfig `toml:"chain"`
RPCs RPCsConfig `toml:"rpcs"`
DB DBConfig `toml:"db"`
API APIConfig `toml:"api"`
Metrics MetricsConfig `toml:"metrics"`
}
// fetch this via onchain config from RPCsConfig and remove from config in future
......@@ -34,6 +36,26 @@ type L1Contracts struct {
// Remove afterwards?
}
// converts struct of to a slice of addresses for easy iteration
// also validates that all fields are addresses
func (c *L1Contracts) AsSlice() ([]common.Address, error) {
clone := *c
contractValue := reflect.ValueOf(clone)
fields := reflect.VisibleFields(reflect.TypeOf(clone))
l1Contracts := make([]common.Address, len(fields))
for i, field := range fields {
// ruleid: unsafe-reflect-by-name
addr, ok := (contractValue.FieldByName(field.Name).Interface()).(common.Address)
if !ok {
return nil, fmt.Errorf("non-address found in L1Contracts: %s", field.Name)
}
l1Contracts[i] = addr
}
return l1Contracts, nil
}
// ChainConfig configures of the chain being indexed
type ChainConfig struct {
// Configure known chains with the l2 chain id
......@@ -41,8 +63,11 @@ type ChainConfig struct {
Preset int
L1Contracts L1Contracts `toml:"l1-contracts"`
// L1StartingHeight is the block height to start indexing from
// NOTE - This is currently unimplemented
L1StartingHeight int
L1StartingHeight uint `toml:"l1-starting-height"`
}
func (cc *ChainConfig) L1StartHeight() *big.Int {
return big.NewInt(int64(cc.L1StartingHeight))
}
// RPCsConfig configures the RPC urls
......@@ -53,23 +78,23 @@ type RPCsConfig struct {
// DBConfig configures the postgres database
type DBConfig struct {
Host string
Port int
Name string
User string
Password string
Host string `toml:"host"`
Port int `toml:"port"`
Name string `toml:"name"`
User string `toml:"user"`
Password string `toml:"password"`
}
// APIConfig configures the API server
type APIConfig struct {
Host string
Port int
Host string `toml:"host"`
Port int `toml:"port"`
}
// MetricsConfig configures the metrics server
type MetricsConfig struct {
Host string
Port int
Host string `toml:"host"`
Port int `toml:"port"`
}
// LoadConfig loads the `indexer.toml` config file from a given path
......
......@@ -139,3 +139,23 @@ func TestLoadConfig_WithUnknownPreset(t *testing.T) {
require.Error(t, err)
require.Equal(t, fmt.Sprintf("unknown preset: %d", faultyPreset), err.Error())
}
func Test_AsSliceSuccess(t *testing.T) {
// error cases are intentionally ignored for testing since they can only be
// generated when the L1Contracts struct is developer modified to hold a non-address var field
testCfg := &L1Contracts{
OptimismPortalProxy: common.HexToAddress("0x4205Fc579115071764c7423A4f12eDde41f106Ed"),
L2OutputOracleProxy: common.HexToAddress("0x42097868233d1aa22e815a266982f2cf17685a27"),
L1CrossDomainMessengerProxy: common.HexToAddress("0x420ce71c97B33Cc4729CF772ae268934F7ab5fA1"),
L1StandardBridgeProxy: common.HexToAddress("0x4209fc46f92E8a1c0deC1b1747d010903E884bE1"),
}
slice, err := testCfg.AsSlice()
require.NoError(t, err)
require.Equal(t, len(slice), 4)
require.Equal(t, slice[0].String(), testCfg.OptimismPortalProxy.String())
require.Equal(t, slice[1].String(), testCfg.L2OutputOracleProxy.String())
require.Equal(t, slice[2].String(), testCfg.L1CrossDomainMessengerProxy.String())
require.Equal(t, slice[3].String(), testCfg.L1StandardBridgeProxy.String())
}
package database
import (
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/mock"
)
type MockBlocksView struct {
mock.Mock
}
func (m *MockBlocksView) L1BlockHeader(common.Hash) (*L1BlockHeader, error) {
args := m.Called()
header, ok := args.Get(0).(*L1BlockHeader)
if !ok {
header = nil
}
return header, args.Error(1)
}
func (m *MockBlocksView) L1BlockHeaderWithFilter(BlockHeader) (*L1BlockHeader, error) {
args := m.Called()
return args.Get(0).(*L1BlockHeader), args.Error(1)
}
func (m *MockBlocksView) L1LatestBlockHeader() (*L1BlockHeader, error) {
args := m.Called()
header, ok := args.Get(0).(*L1BlockHeader)
if !ok {
header = nil
}
return header, args.Error(1)
}
func (m *MockBlocksView) L2BlockHeader(common.Hash) (*L2BlockHeader, error) {
args := m.Called()
return args.Get(0).(*L2BlockHeader), args.Error(1)
}
func (m *MockBlocksView) L2BlockHeaderWithFilter(BlockHeader) (*L2BlockHeader, error) {
args := m.Called()
return args.Get(0).(*L2BlockHeader), args.Error(1)
}
func (m *MockBlocksView) L2LatestBlockHeader() (*L2BlockHeader, error) {
args := m.Called()
return args.Get(0).(*L2BlockHeader), args.Error(1)
}
func (m *MockBlocksView) LatestCheckpointedOutput() (*OutputProposal, error) {
args := m.Called()
return args.Get(0).(*OutputProposal), args.Error(1)
}
func (m *MockBlocksView) OutputProposal(index *big.Int) (*OutputProposal, error) {
args := m.Called()
return args.Get(0).(*OutputProposal), args.Error(1)
}
func (m *MockBlocksView) LatestEpoch() (*Epoch, error) {
args := m.Called()
return args.Get(0).(*Epoch), args.Error(1)
}
type MockBlocksDB struct {
MockBlocksView
}
func (m *MockBlocksDB) StoreL1BlockHeaders(headers []L1BlockHeader) error {
args := m.Called(headers)
return args.Error(1)
}
func (m *MockBlocksDB) StoreL2BlockHeaders(headers []L2BlockHeader) error {
args := m.Called(headers)
return args.Error(1)
}
func (m *MockBlocksDB) StoreLegacyStateBatches(headers []LegacyStateBatch) error {
args := m.Called(headers)
return args.Error(1)
}
func (m *MockBlocksDB) StoreOutputProposals(headers []OutputProposal) error {
args := m.Called(headers)
return args.Error(1)
}
// MockDB is a mock database that can be used for testing
type MockDB struct {
MockBlocks *MockBlocksDB
DB *DB
}
func NewMockDB() *MockDB {
// This is currently just mocking the BlocksDB interface
// but can be expanded to mock other inner DB interfaces
// as well
mockBlocks := new(MockBlocksDB)
db := &DB{Blocks: mockBlocks}
return &MockDB{MockBlocks: mockBlocks, DB: db}
}
......@@ -14,6 +14,9 @@ import (
)
const (
// NOTE - These values can be made configurable to allow for more fine grained control
// Additionally a default interval of 5 seconds may be too slow for reading L2 blocks provided
// the current rate of L2 block production on OP Stack chains (2 seconds per block)
defaultLoopInterval = 5 * time.Second
defaultHeaderBufferSize = 500
)
......@@ -90,7 +93,7 @@ func (etl *ETL) Start(ctx context.Context) error {
for i := range logs {
if _, ok := headerMap[logs[i].BlockHash]; !ok {
// NOTE. Definitely an error state if the none of the headers were re-orged out in between
// the blocks and logs retreival operations. However, we need to gracefully handle reorgs
// the blocks and logs retrieval operations. However, we need to gracefully handle reorgs
batchLog.Error("log found with block hash not in the batch", "block_hash", logs[i].BlockHash, "log_index", logs[i].Index)
return errors.New("parsed log with a block hash not in the fetched batch")
}
......
......@@ -2,14 +2,13 @@ package etl
import (
"context"
"errors"
"reflect"
"fmt"
"math/big"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
......@@ -20,43 +19,49 @@ type L1ETL struct {
db *database.DB
}
func NewL1ETL(log log.Logger, db *database.DB, client node.EthClient, contracts config.L1Contracts) (*L1ETL, error) {
// NewL1ETL creates a new L1ETL instance that will start indexing from different starting points
// depending on the state of the database and the supplied start height.
func NewL1ETL(log log.Logger, db *database.DB, client node.EthClient, startHeight *big.Int,
contracts config.L1Contracts) (*L1ETL, error) {
log = log.New("etl", "l1")
contractValue := reflect.ValueOf(contracts)
fields := reflect.VisibleFields(reflect.TypeOf(contracts))
l1Contracts := make([]common.Address, len(fields))
for i, field := range fields {
// ruleid: unsafe-reflect-by-name
addr, ok := (contractValue.FieldByName(field.Name).Interface()).(common.Address)
if !ok {
log.Error("non-address found in L1Contracts", "name", field.Name)
return nil, errors.New("non-address found in L1Contracts")
}
log.Info("configured contract", "name", field.Name, "addr", addr)
l1Contracts[i] = addr
latestHeader, err := db.Blocks.L1LatestBlockHeader()
if err != nil {
return nil, err
}
latestHeader, err := db.Blocks.L1LatestBlockHeader()
cSlice, err := contracts.AsSlice()
if err != nil {
return nil, err
}
// Determine the starting height for traversal
var fromHeader *types.Header
if latestHeader != nil {
log.Info("detected last indexed block", "number", latestHeader.Number.Int, "hash", latestHeader.Hash)
fromHeader = latestHeader.RLPHeader.Header()
} else if startHeight.BitLen() > 0 {
log.Info("no indexed state in storage, starting from supplied L1 height", "height", startHeight.String())
header, err := client.BlockHeaderByNumber(startHeight)
if err != nil {
return nil, fmt.Errorf("could not fetch starting block header: %w", err)
}
fromHeader = header
} else {
log.Info("no indexed state, starting from genesis")
log.Info("no indexed state in storage, starting from L1 genesis")
}
// NOTE - The use of un-buffered channel here assumes that downstream consumers
// will be able to keep up with the rate of incoming batches
etlBatches := make(chan ETLBatch)
etl := ETL{
log: log,
headerTraversal: node.NewHeaderTraversal(client, fromHeader),
ethClient: client.GethEthClient(),
contracts: l1Contracts,
contracts: cSlice,
etlBatches: etlBatches,
}
......
package etl
import (
"math/big"
"github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node"
"testing"
)
func Test_L1ETL_Construction(t *testing.T) {
type testSuite struct {
db *database.MockDB
client *node.MockEthClient
start *big.Int
contracts config.L1Contracts
}
var tests = []struct {
name string
construction func() *testSuite
assertion func(*L1ETL, error)
}{
{
name: "Start from L1 config height",
construction: func() *testSuite {
client := new(node.MockEthClient)
db := database.NewMockDB()
testStart := big.NewInt(100)
db.MockBlocks.On("L1LatestBlockHeader").Return(nil, nil)
client.On("BlockHeaderByNumber", mock.MatchedBy(
node.BigIntMatcher(100))).Return(
&types.Header{
ParentHash: common.HexToHash("0x69"),
}, nil)
client.On("GethEthClient").Return(nil)
return &testSuite{
db: db,
client: client,
start: testStart,
contracts: config.L1Contracts{},
}
},
assertion: func(etl *L1ETL, err error) {
require.NoError(t, err)
require.Equal(t, etl.headerTraversal.LastHeader().ParentHash, common.HexToHash("0x69"))
},
},
{
name: "Start from recent height stored in DB",
construction: func() *testSuite {
client := new(node.MockEthClient)
db := database.NewMockDB()
testStart := big.NewInt(100)
db.MockBlocks.On("L1LatestBlockHeader").Return(
&database.L1BlockHeader{
BlockHeader: database.BlockHeader{
RLPHeader: &database.RLPHeader{
Number: big.NewInt(69),
},
}}, nil)
client.On("GethEthClient").Return(nil)
return &testSuite{
db: db,
client: client,
start: testStart,
contracts: config.L1Contracts{},
}
},
assertion: func(etl *L1ETL, err error) {
require.NoError(t, err)
header := etl.headerTraversal.LastHeader()
require.True(t, header.Number.Cmp(big.NewInt(69)) == 0)
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ts := test.construction()
logger := log.NewLogger(log.DefaultCLIConfig())
etl, err := NewL1ETL(logger, ts.db.DB, ts.client, ts.start, ts.contracts)
test.assertion(etl, err)
})
}
}
......@@ -34,7 +34,7 @@ func NewIndexer(logger log.Logger, chainConfig config.ChainConfig, rpcsConfig co
return nil, err
}
l1Etl, err := etl.NewL1ETL(logger, db, l1EthClient, chainConfig.L1Contracts)
l1Etl, err := etl.NewL1ETL(logger, db, l1EthClient, chainConfig.L1StartHeight(), chainConfig.L1Contracts)
if err != nil {
return nil, err
}
......
......@@ -3,6 +3,7 @@
[chain]
# OP Goerli
preset = 420
l1-starting-height = 0
[rpcs]
l1-rpc = "${INDEXER_RPC_URL_L1}"
......
......@@ -19,3 +19,8 @@ func clampBigInt(start, end *big.Int, size uint64) *big.Int {
temp.Add(start, big.NewInt(int64(size-1)))
return temp
}
// returns an inner comparison function result for a big.Int
func BigIntMatcher(num int64) func(*big.Int) bool {
return func(bi *big.Int) bool { return bi.Int64() == num }
}
......@@ -7,17 +7,13 @@ import (
"github.com/stretchr/testify/assert"
)
func bigIntMatcher(num int64) func(*big.Int) bool {
return func(bi *big.Int) bool { return bi.Int64() == num }
}
func TestClampBigInt(t *testing.T) {
assert.True(t, true)
start := big.NewInt(1)
end := big.NewInt(10)
// When the (start, end) boudnds are within range
// When the (start, end) bounds are within range
// the same end pointer should be returned
// larger range
......
......@@ -27,8 +27,9 @@ const (
type EthClient interface {
FinalizedBlockHeight() (*big.Int, error)
BlockHeadersByRange(*big.Int, *big.Int) ([]types.Header, error)
BlockHeaderByNumber(*big.Int) (*types.Header, error)
BlockHeaderByHash(common.Hash) (*types.Header, error)
BlockHeadersByRange(*big.Int, *big.Int) ([]types.Header, error)
StorageHash(common.Address, *big.Int) (common.Hash, error)
......@@ -99,7 +100,20 @@ func (c *client) BlockHeaderByHash(hash common.Hash) (*types.Header, error) {
return header, nil
}
// BlockHeadersByRange will retrieve block headers within the specified range -- includsive. No restrictions
// BlockHeaderByNumber retrieves the block header attributed to the supplied height
func (c *client) BlockHeaderByNumber(number *big.Int) (*types.Header, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
header, err := ethclient.NewClient(c.rpcClient).HeaderByNumber(ctxwt, number)
if err != nil {
return nil, err
}
return header, nil
}
// BlockHeadersByRange will retrieve block headers within the specified range -- inclusive. No restrictions
// are placed on the range such as blocks in the "latest", "safe" or "finalized" states. If the specified
// range is too large, `endHeight > latest`, the resulting list is truncated to the available headers
func (c *client) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]types.Header, error) {
......
......@@ -24,6 +24,12 @@ func NewHeaderTraversal(ethClient EthClient, fromHeader *types.Header) *HeaderTr
return &HeaderTraversal{ethClient: ethClient, lastHeader: fromHeader}
}
// LastHeader returns the last header that was fetched by the HeaderTraversal
// This is useful for testing the state of the HeaderTraversal
func (f *HeaderTraversal) LastHeader() *types.Header {
return f.lastHeader
}
// NextFinalizedHeaders retrives the next set of headers that have been
// marked as finalized by the connected client, bounded by the supplied size
func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header, error) {
......
......@@ -55,7 +55,7 @@ func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) {
// blocks [0..4]
headers := makeHeaders(5, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(4), nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil)
client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(0)), mock.MatchedBy(BigIntMatcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5)
require.NoError(t, err)
require.Len(t, headers, 5)
......@@ -63,7 +63,7 @@ func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) {
// blocks [5..9]
headers = makeHeaders(5, &headers[len(headers)-1])
client.On("FinalizedBlockHeight").Return(big.NewInt(9), nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(5)), mock.MatchedBy(bigIntMatcher(9))).Return(headers, nil)
client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(5)), mock.MatchedBy(BigIntMatcher(9))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(5)
require.NoError(t, err)
require.Len(t, headers, 5)
......@@ -80,14 +80,14 @@ func TestHeaderTraversalNextFinalizedHeadersMaxSize(t *testing.T) {
// clamped by the supplied size
headers := makeHeaders(5, nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil)
client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(0)), mock.MatchedBy(BigIntMatcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5)
require.NoError(t, err)
require.Len(t, headers, 5)
// clamped by the supplied size. FinalizedHeight == 100
headers = makeHeaders(10, &headers[len(headers)-1])
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(5)), mock.MatchedBy(bigIntMatcher(14))).Return(headers, nil)
client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(5)), mock.MatchedBy(BigIntMatcher(14))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(10)
require.NoError(t, err)
require.Len(t, headers, 10)
......@@ -102,7 +102,7 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
// blocks [0..4]
headers := makeHeaders(5, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(4), nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil)
client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(0)), mock.MatchedBy(BigIntMatcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5)
require.NoError(t, err)
require.Len(t, headers, 5)
......@@ -110,7 +110,7 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
// blocks [5..9]. Next batch is not chained correctly (starts again from genesis)
headers = makeHeaders(5, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(9), nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(5)), mock.MatchedBy(bigIntMatcher(9))).Return(headers, nil)
client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(5)), mock.MatchedBy(BigIntMatcher(9))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(5)
require.Nil(t, headers)
require.Equal(t, ErrHeaderTraversalAndProviderMismatchedState, err)
......
......@@ -3,20 +3,22 @@ package node
import (
"math/big"
"github.com/stretchr/testify/mock"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/mock"
)
var _ EthClient = &MockEthClient{}
type MockEthClient struct {
mock.Mock
}
func (m *MockEthClient) BlockHeaderByNumber(number *big.Int) (*types.Header, error) {
args := m.Called(number)
return args.Get(0).(*types.Header), args.Error(1)
}
func (m *MockEthClient) FinalizedBlockHeight() (*big.Int, error) {
args := m.Called()
return args.Get(0).(*big.Int), args.Error(1)
......@@ -44,5 +46,10 @@ func (m *MockEthClient) GethRpcClient() *rpc.Client {
func (m *MockEthClient) GethEthClient() *ethclient.Client {
args := m.Called()
return args.Get(0).(*ethclient.Client)
client, ok := args.Get(0).(*ethclient.Client)
if !ok {
return nil
}
return client
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment