Commit 102c475c authored by OptimismBot's avatar OptimismBot Committed by GitHub

Merge pull request #6215 from ethereum-optimism/indexer.processor.retry

feat: indexer retries batches on failure. l1_processor indexes l1 checkpoints
parents ab15393d 1fedc1ff
...@@ -3,9 +3,9 @@ package database ...@@ -3,9 +3,9 @@ package database
import ( import (
"context" "context"
"errors" "errors"
"math/big"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/google/uuid"
"gorm.io/gorm" "gorm.io/gorm"
) )
...@@ -27,11 +27,6 @@ type L1BlockHeader struct { ...@@ -27,11 +27,6 @@ type L1BlockHeader struct {
type L2BlockHeader struct { type L2BlockHeader struct {
BlockHeader BlockHeader
// Marked when the proposed output is finalized on L1.
// All bedrock blocks will have `LegacyStateBatchIndex ^== NULL`
L1BlockHash *common.Hash `gorm:"serializer:json"`
LegacyStateBatchIndex *uint64
} }
type LegacyStateBatch struct { type LegacyStateBatch struct {
...@@ -39,25 +34,33 @@ type LegacyStateBatch struct { ...@@ -39,25 +34,33 @@ type LegacyStateBatch struct {
// violating the primary key constraint. // violating the primary key constraint.
Index uint64 `gorm:"primaryKey;default:0"` Index uint64 `gorm:"primaryKey;default:0"`
Root common.Hash `gorm:"serializer:json"` Root common.Hash `gorm:"serializer:json"`
Size uint64 Size uint64
PrevTotal uint64 PrevTotal uint64
L1BlockHash common.Hash `gorm:"serializer:json"` L1ContractEventGUID uuid.UUID
}
type OutputProposal struct {
OutputRoot common.Hash `gorm:"primaryKey;serializer:json"`
L2BlockNumber U256
L1ContractEventGUID uuid.UUID
} }
type BlocksView interface { type BlocksView interface {
FinalizedL1BlockHeader() (*L1BlockHeader, error) LatestL1BlockHeader() (*L1BlockHeader, error)
FinalizedL2BlockHeader() (*L2BlockHeader, error) LatestCheckpointedOutput() (*OutputProposal, error)
LatestL2BlockHeader() (*L2BlockHeader, error)
} }
type BlocksDB interface { type BlocksDB interface {
BlocksView BlocksView
StoreL1BlockHeaders([]*L1BlockHeader) error StoreL1BlockHeaders([]*L1BlockHeader) error
StoreLegacyStateBatch(*LegacyStateBatch) error
StoreL2BlockHeaders([]*L2BlockHeader) error StoreL2BlockHeaders([]*L2BlockHeader) error
MarkFinalizedL1RootForL2Block(common.Hash, common.Hash) error
StoreLegacyStateBatches([]*LegacyStateBatch) error
StoreOutputProposals([]*OutputProposal) error
} }
/** /**
...@@ -79,39 +82,33 @@ func (db *blocksDB) StoreL1BlockHeaders(headers []*L1BlockHeader) error { ...@@ -79,39 +82,33 @@ func (db *blocksDB) StoreL1BlockHeaders(headers []*L1BlockHeader) error {
return result.Error return result.Error
} }
func (db *blocksDB) StoreLegacyStateBatch(stateBatch *LegacyStateBatch) error { func (db *blocksDB) StoreLegacyStateBatches(stateBatches []*LegacyStateBatch) error {
result := db.gorm.Create(stateBatch) result := db.gorm.Create(stateBatches)
if result.Error != nil { return result.Error
return result.Error }
}
// Mark this state batch index & l1 block hash for all applicable l2 blocks func (db *blocksDB) StoreOutputProposals(outputs []*OutputProposal) error {
l2Headers := make([]*L2BlockHeader, stateBatch.Size) result := db.gorm.Create(outputs)
return result.Error
}
// [start, end] range is inclusive. Since `PrevTotal` is the index of the prior batch, no func (db *blocksDB) LatestL1BlockHeader() (*L1BlockHeader, error) {
// need to subtract one when adding the size var l1Header L1BlockHeader
startHeight := U256{Int: big.NewInt(int64(stateBatch.PrevTotal + 1))} result := db.gorm.Order("number DESC").Take(&l1Header)
endHeight := U256{Int: big.NewInt(int64(stateBatch.PrevTotal + stateBatch.Size))}
result = db.gorm.Where("number BETWEEN ? AND ?", &startHeight, &endHeight).Find(&l2Headers)
if result.Error != nil { if result.Error != nil {
return result.Error if errors.Is(result.Error, gorm.ErrRecordNotFound) {
} else if result.RowsAffected != int64(stateBatch.Size) { return nil, nil
return errors.New("state batch size exceeds number of indexed l2 blocks") }
}
for _, header := range l2Headers { return nil, result.Error
header.LegacyStateBatchIndex = &stateBatch.Index
header.L1BlockHash = &stateBatch.L1BlockHash
} }
result = db.gorm.Save(&l2Headers) return &l1Header, nil
return result.Error
} }
// FinalizedL1BlockHeader returns the latest L1 block header stored in the database, nil otherwise func (db *blocksDB) LatestCheckpointedOutput() (*OutputProposal, error) {
func (db *blocksDB) FinalizedL1BlockHeader() (*L1BlockHeader, error) { var outputProposal OutputProposal
var l1Header L1BlockHeader result := db.gorm.Order("l2_block_number DESC").Take(&outputProposal)
result := db.gorm.Order("number DESC").Take(&l1Header)
if result.Error != nil { if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) { if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil return nil, nil
...@@ -120,7 +117,7 @@ func (db *blocksDB) FinalizedL1BlockHeader() (*L1BlockHeader, error) { ...@@ -120,7 +117,7 @@ func (db *blocksDB) FinalizedL1BlockHeader() (*L1BlockHeader, error) {
return nil, result.Error return nil, result.Error
} }
return &l1Header, nil return &outputProposal, nil
} }
// L2 // L2
...@@ -130,8 +127,7 @@ func (db *blocksDB) StoreL2BlockHeaders(headers []*L2BlockHeader) error { ...@@ -130,8 +127,7 @@ func (db *blocksDB) StoreL2BlockHeaders(headers []*L2BlockHeader) error {
return result.Error return result.Error
} }
// FinalizedL2BlockHeader returns the latest L2 block header stored in the database, nil otherwise func (db *blocksDB) LatestL2BlockHeader() (*L2BlockHeader, error) {
func (db *blocksDB) FinalizedL2BlockHeader() (*L2BlockHeader, error) {
var l2Header L2BlockHeader var l2Header L2BlockHeader
result := db.gorm.Order("number DESC").Take(&l2Header) result := db.gorm.Order("number DESC").Take(&l2Header)
if result.Error != nil { if result.Error != nil {
...@@ -145,19 +141,3 @@ func (db *blocksDB) FinalizedL2BlockHeader() (*L2BlockHeader, error) { ...@@ -145,19 +141,3 @@ func (db *blocksDB) FinalizedL2BlockHeader() (*L2BlockHeader, error) {
result.Logger.Info(context.Background(), "number ", l2Header.Number) result.Logger.Info(context.Background(), "number ", l2Header.Number)
return &l2Header, nil return &l2Header, nil
} }
// MarkFinalizedL1RootForL2Block updates the stored L2 block header with the L1 block
// that contains the output proposal for the L2 root.
func (db *blocksDB) MarkFinalizedL1RootForL2Block(l2Root, l1Root common.Hash) error {
var l2Header L2BlockHeader
l2Header.Hash = l2Root // set the primary key
result := db.gorm.First(&l2Header)
if result.Error != nil {
return result.Error
}
l2Header.L1BlockHash = &l1Root
result = db.gorm.Save(&l2Header)
return result.Error
}
...@@ -13,28 +13,11 @@ CREATE TABLE IF NOT EXISTS l1_block_headers ( ...@@ -13,28 +13,11 @@ CREATE TABLE IF NOT EXISTS l1_block_headers (
timestamp INTEGER NOT NULL timestamp INTEGER NOT NULL
); );
CREATE TABLE IF NOT EXISTS legacy_state_batches (
index INTEGER NOT NULL PRIMARY KEY,
root VARCHAR NOT NULL,
size INTEGER NOT NULL,
prev_total INTEGER NOT NULL,
-- Finalization information. Unlike `l2_block_headers` the NOT NULL
-- constraint is added since the l1 block hash will be known when
-- when reading the output event
l1_block_hash VARCHAR NOT NULL REFERENCES l1_block_headers(hash)
);
CREATE TABLE IF NOT EXISTS l2_block_headers ( CREATE TABLE IF NOT EXISTS l2_block_headers (
-- Block header hash VARCHAR NOT NULL PRIMARY KEY,
hash VARCHAR NOT NULL PRIMARY KEY, parent_hash VARCHAR NOT NULL,
parent_hash VARCHAR NOT NULL, number UINT256,
number UINT256, timestamp INTEGER NOT NULL
timestamp INTEGER NOT NULL,
-- Finalization information
l1_block_hash VARCHAR REFERENCES l1_block_headers(hash),
legacy_state_batch_index INTEGER REFERENCES legacy_state_batches(index)
); );
/** /**
...@@ -59,6 +42,24 @@ CREATE TABLE IF NOT EXISTS l2_contract_events ( ...@@ -59,6 +42,24 @@ CREATE TABLE IF NOT EXISTS l2_contract_events (
timestamp INTEGER NOT NULL timestamp INTEGER NOT NULL
); );
-- Tables that index finalization markers for L2 blocks.
CREATE TABLE IF NOT EXISTS legacy_state_batches (
index INTEGER NOT NULL PRIMARY KEY,
root VARCHAR NOT NULL,
size INTEGER NOT NULL,
prev_total INTEGER NOT NULL,
l1_contract_event_guid VARCHAR REFERENCES l1_contract_events(guid)
);
CREATE TABLE IF NOT EXISTS output_proposals (
output_root VARCHAR NOT NULL PRIMARY KEY,
l2_block_number UINT256,
l1_contract_event_guid VARCHAR REFERENCES l1_contract_events(guid)
);
/** /**
* BRIDGING DATA * BRIDGING DATA
*/ */
...@@ -71,6 +72,7 @@ CREATE TABLE IF NOT EXISTS deposits ( ...@@ -71,6 +72,7 @@ CREATE TABLE IF NOT EXISTS deposits (
-- Deposit information (do we need indexes on from/to?) -- Deposit information (do we need indexes on from/to?)
from_address VARCHAR NOT NULL, from_address VARCHAR NOT NULL,
to_address VARCHAR NOT NULL, to_address VARCHAR NOT NULL,
l1_token_address VARCHAR NOT NULL, l1_token_address VARCHAR NOT NULL,
l2_token_address VARCHAR NOT NULL, l2_token_address VARCHAR NOT NULL,
......
...@@ -91,7 +91,7 @@ func (c *client) BlockHeaderByHash(hash common.Hash) (*types.Header, error) { ...@@ -91,7 +91,7 @@ func (c *client) BlockHeaderByHash(hash common.Hash) (*types.Header, error) {
// are placed on the range such as blocks in the "latest", "safe" or "finalized" states. If the specified // are placed on the range such as blocks in the "latest", "safe" or "finalized" states. If the specified
// range is too large, `endHeight > latest`, the resulting list is truncated to the available headers // range is too large, `endHeight > latest`, the resulting list is truncated to the available headers
func (c *client) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]*types.Header, error) { func (c *client) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]*types.Header, error) {
count := new(big.Int).Sub(endHeight, startHeight).Uint64() count := new(big.Int).Sub(endHeight, startHeight).Uint64() + 1
batchElems := make([]rpc.BatchElem, count) batchElems := make([]rpc.BatchElem, count)
for i := uint64(0); i < count; i++ { for i := uint64(0); i < count; i++ {
height := new(big.Int).Add(startHeight, new(big.Int).SetUint64(i)) height := new(big.Int).Add(startHeight, new(big.Int).SetUint64(i))
......
...@@ -7,35 +7,38 @@ import ( ...@@ -7,35 +7,38 @@ import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
) )
// Max number of headers that's bee returned by the Fetcher at once. var (
const maxHeaderBatchSize = 50 ErrHeaderTraversalAheadOfProvider = errors.New("the HeaderTraversal's internal state is ahead of the provider")
ErrHeaderTraversalAndProviderMismatchedState = errors.New("the HeaderTraversal and provider have diverged in state")
var ErrFetcherAndProviderMismatchedState = errors.New("the fetcher and provider have diverged in finalized state") )
type Fetcher struct { type HeaderTraversal struct {
ethClient EthClient ethClient EthClient
lastHeader *types.Header lastHeader *types.Header
} }
// NewFetcher instantiates a new instance of Fetcher against the supplied rpc client. // NewHeaderTraversal instantiates a new instance of HeaderTraversal against the supplied rpc client.
// The Fetcher will start fetching blocks starting from the supplied header unless // The HeaderTraversal will start fetching blocks starting from the supplied header unless
// nil, indicating genesis. // nil, indicating genesis.
func NewFetcher(ethClient EthClient, fromHeader *types.Header) *Fetcher { func NewHeaderTraversal(ethClient EthClient, fromHeader *types.Header) *HeaderTraversal {
return &Fetcher{ethClient: ethClient, lastHeader: fromHeader} return &HeaderTraversal{ethClient: ethClient, lastHeader: fromHeader}
} }
// NextConfirmedHeaders retrives the next set of headers that have been // NextFinalizedHeaders retrives the next set of headers that have been
// marked as finalized by the connected client // marked as finalized by the connected client, bounded by the supplied size
func (f *Fetcher) NextFinalizedHeaders() ([]*types.Header, error) { func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]*types.Header, error) {
finalizedBlockHeight, err := f.ethClient.FinalizedBlockHeight() finalizedBlockHeight, err := f.ethClient.FinalizedBlockHeight()
if err != nil { if err != nil {
return nil, err return nil, err
} }
if f.lastHeader != nil && f.lastHeader.Number.Cmp(finalizedBlockHeight) >= 0 { if f.lastHeader != nil {
// Warn if our fetcher is ahead of the provider. The fetcher should always cmp := f.lastHeader.Number.Cmp(finalizedBlockHeight)
// be behind or at head with the provider. if cmp == 0 {
return nil, nil return nil, nil
} else if cmp > 0 {
return nil, ErrHeaderTraversalAheadOfProvider
}
} }
nextHeight := bigZero nextHeight := bigZero
...@@ -43,7 +46,7 @@ func (f *Fetcher) NextFinalizedHeaders() ([]*types.Header, error) { ...@@ -43,7 +46,7 @@ func (f *Fetcher) NextFinalizedHeaders() ([]*types.Header, error) {
nextHeight = new(big.Int).Add(f.lastHeader.Number, bigOne) nextHeight = new(big.Int).Add(f.lastHeader.Number, bigOne)
} }
endHeight := clampBigInt(nextHeight, finalizedBlockHeight, maxHeaderBatchSize) endHeight := clampBigInt(nextHeight, finalizedBlockHeight, maxSize)
headers, err := f.ethClient.BlockHeadersByRange(nextHeight, endHeight) headers, err := f.ethClient.BlockHeadersByRange(nextHeight, endHeight)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -55,7 +58,7 @@ func (f *Fetcher) NextFinalizedHeaders() ([]*types.Header, error) { ...@@ -55,7 +58,7 @@ func (f *Fetcher) NextFinalizedHeaders() ([]*types.Header, error) {
} else if f.lastHeader != nil && headers[0].ParentHash != f.lastHeader.Hash() { } else if f.lastHeader != nil && headers[0].ParentHash != f.lastHeader.Hash() {
// The indexer's state is in an irrecoverable state relative to the provider. This // The indexer's state is in an irrecoverable state relative to the provider. This
// should never happen since the indexer is dealing with only finalized blocks. // should never happen since the indexer is dealing with only finalized blocks.
return nil, ErrFetcherAndProviderMismatchedState return nil, ErrHeaderTraversalAndProviderMismatchedState
} }
f.lastHeader = headers[numHeaders-1] f.lastHeader = headers[numHeaders-1]
......
...@@ -33,31 +33,31 @@ func makeHeaders(numHeaders uint64, prevHeader *types.Header) []*types.Header { ...@@ -33,31 +33,31 @@ func makeHeaders(numHeaders uint64, prevHeader *types.Header) []*types.Header {
return headers return headers
} }
func TestFetcherNextFinalizedHeadersNoOp(t *testing.T) { func TestHeaderTraversalNextFinalizedHeadersNoOp(t *testing.T) {
client := new(MockEthClient) client := new(MockEthClient)
// start from block 0 as the latest fetched block // start from block 10 as the latest fetched block
lastHeader := &types.Header{Number: bigZero} lastHeader := &types.Header{Number: big.NewInt(10)}
fetcher := NewFetcher(client, lastHeader) headerTraversal := NewHeaderTraversal(client, lastHeader)
// no new headers when matched with head // no new headers when matched with head
client.On("FinalizedBlockHeight").Return(big.NewInt(0), nil) client.On("FinalizedBlockHeight").Return(big.NewInt(10), nil)
headers, err := fetcher.NextFinalizedHeaders() headers, err := headerTraversal.NextFinalizedHeaders(100)
assert.NoError(t, err) assert.NoError(t, err)
assert.Empty(t, headers) assert.Empty(t, headers)
} }
func TestFetcherNextFinalizedHeadersCursored(t *testing.T) { func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) {
client := new(MockEthClient) client := new(MockEthClient)
// start from genesis // start from genesis
fetcher := NewFetcher(client, nil) headerTraversal := NewHeaderTraversal(client, nil)
// blocks [0..4] // blocks [0..4]
headers := makeHeaders(5, nil) headers := makeHeaders(5, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(4), nil).Times(1) // Times so that we can override next client.On("FinalizedBlockHeight").Return(big.NewInt(4), nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil)
headers, err := fetcher.NextFinalizedHeaders() headers, err := headerTraversal.NextFinalizedHeaders(5)
assert.NoError(t, err) assert.NoError(t, err)
assert.Len(t, headers, 5) assert.Len(t, headers, 5)
...@@ -65,46 +65,46 @@ func TestFetcherNextFinalizedHeadersCursored(t *testing.T) { ...@@ -65,46 +65,46 @@ func TestFetcherNextFinalizedHeadersCursored(t *testing.T) {
headers = makeHeaders(5, headers[len(headers)-1]) headers = makeHeaders(5, headers[len(headers)-1])
client.On("FinalizedBlockHeight").Return(big.NewInt(9), nil) client.On("FinalizedBlockHeight").Return(big.NewInt(9), nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(5)), mock.MatchedBy(bigIntMatcher(9))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(5)), mock.MatchedBy(bigIntMatcher(9))).Return(headers, nil)
headers, err = fetcher.NextFinalizedHeaders() headers, err = headerTraversal.NextFinalizedHeaders(5)
assert.NoError(t, err) assert.NoError(t, err)
assert.Len(t, headers, 5) assert.Len(t, headers, 5)
} }
func TestFetcherNextFinalizedHeadersMaxHeaderBatch(t *testing.T) { func TestHeaderTraversalNextFinalizedHeadersMaxSize(t *testing.T) {
client := new(MockEthClient) client := new(MockEthClient)
// start from genesis // start from genesis
fetcher := NewFetcher(client, nil) headerTraversal := NewHeaderTraversal(client, nil)
// blocks [0..maxBatchSize] size == maxBatchSize = 1 // 100 "available" headers
headers := makeHeaders(maxHeaderBatchSize, nil) client.On("FinalizedBlockHeight").Return(big.NewInt(100), nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(maxHeaderBatchSize), nil)
// clamped by the max batch size // clamped by the supplied size
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(maxHeaderBatchSize-1))).Return(headers, nil) headers := makeHeaders(5, nil)
headers, err := fetcher.NextFinalizedHeaders() client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5)
assert.NoError(t, err) assert.NoError(t, err)
assert.Len(t, headers, maxHeaderBatchSize) assert.Len(t, headers, 5)
// blocks [maxBatchSize..maxBatchSize] // clamped by the supplied size. FinalizedHeight == 100
headers = makeHeaders(1, headers[len(headers)-1]) headers = makeHeaders(10, headers[len(headers)-1])
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(maxHeaderBatchSize)), mock.MatchedBy(bigIntMatcher(maxHeaderBatchSize))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(5)), mock.MatchedBy(bigIntMatcher(14))).Return(headers, nil)
headers, err = fetcher.NextFinalizedHeaders() headers, err = headerTraversal.NextFinalizedHeaders(10)
assert.NoError(t, err) assert.NoError(t, err)
assert.Len(t, headers, 1) assert.Len(t, headers, 10)
} }
func TestFetcherMismatchedProviderStateError(t *testing.T) { func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
client := new(MockEthClient) client := new(MockEthClient)
// start from genesis // start from genesis
fetcher := NewFetcher(client, nil) headerTraversal := NewHeaderTraversal(client, nil)
// blocks [0..4] // blocks [0..4]
headers := makeHeaders(5, nil) headers := makeHeaders(5, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(4), nil).Times(1) // Times so that we can override next client.On("FinalizedBlockHeight").Return(big.NewInt(4), nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil)
headers, err := fetcher.NextFinalizedHeaders() headers, err := headerTraversal.NextFinalizedHeaders(5)
assert.NoError(t, err) assert.NoError(t, err)
assert.Len(t, headers, 5) assert.Len(t, headers, 5)
...@@ -112,7 +112,7 @@ func TestFetcherMismatchedProviderStateError(t *testing.T) { ...@@ -112,7 +112,7 @@ func TestFetcherMismatchedProviderStateError(t *testing.T) {
headers = makeHeaders(5, nil) headers = makeHeaders(5, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(9), nil) client.On("FinalizedBlockHeight").Return(big.NewInt(9), nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(5)), mock.MatchedBy(bigIntMatcher(9))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(5)), mock.MatchedBy(bigIntMatcher(9))).Return(headers, nil)
headers, err = fetcher.NextFinalizedHeaders() headers, err = headerTraversal.NextFinalizedHeaders(5)
assert.Nil(t, headers) assert.Nil(t, headers)
assert.Equal(t, ErrFetcherAndProviderMismatchedState, err) assert.Equal(t, ErrHeaderTraversalAndProviderMismatchedState, err)
} }
...@@ -2,14 +2,20 @@ package processor ...@@ -2,14 +2,20 @@ package processor
import ( import (
"context" "context"
"encoding/hex"
"errors" "errors"
"math/big"
"reflect" "reflect"
"github.com/google/uuid"
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node" "github.com/ethereum-optimism/optimism/indexer/node"
"github.com/google/uuid" "github.com/ethereum-optimism/optimism/op-bindings/bindings"
legacy_bindings "github.com/ethereum-optimism/optimism/op-bindings/legacy-bindings"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethclient"
...@@ -30,6 +36,11 @@ type L1Contracts struct { ...@@ -30,6 +36,11 @@ type L1Contracts struct {
// Remove afterwards? // Remove afterwards?
} }
type checkpointAbi struct {
l2OutputOracle *abi.ABI
legacyStateCommitmentChain *abi.ABI
}
func (c L1Contracts) toSlice() []common.Address { func (c L1Contracts) toSlice() []common.Address {
fields := reflect.VisibleFields(reflect.TypeOf(c)) fields := reflect.VisibleFields(reflect.TypeOf(c))
v := reflect.ValueOf(c) v := reflect.ValueOf(c)
...@@ -50,7 +61,19 @@ func NewL1Processor(ethClient node.EthClient, db *database.DB, l1Contracts L1Con ...@@ -50,7 +61,19 @@ func NewL1Processor(ethClient node.EthClient, db *database.DB, l1Contracts L1Con
l1ProcessLog := log.New("processor", "l1") l1ProcessLog := log.New("processor", "l1")
l1ProcessLog.Info("initializing processor") l1ProcessLog.Info("initializing processor")
latestHeader, err := db.Blocks.FinalizedL1BlockHeader() l2OutputOracleABI, err := bindings.L2OutputOracleMetaData.GetAbi()
if err != nil {
l1ProcessLog.Error("unable to generate L2OutputOracle ABI", "err", err)
return nil, err
}
legacyStateCommitmentChainABI, err := legacy_bindings.StateCommitmentChainMetaData.GetAbi()
if err != nil {
l1ProcessLog.Error("unable to generate legacy StateCommitmentChain ABI", "err", err)
return nil, err
}
checkpointAbi := checkpointAbi{l2OutputOracle: l2OutputOracleABI, legacyStateCommitmentChain: legacyStateCommitmentChainABI}
latestHeader, err := db.Blocks.LatestL1BlockHeader()
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -66,34 +89,37 @@ func NewL1Processor(ethClient node.EthClient, db *database.DB, l1Contracts L1Con ...@@ -66,34 +89,37 @@ func NewL1Processor(ethClient node.EthClient, db *database.DB, l1Contracts L1Con
fromL1Header = l1Header fromL1Header = l1Header
} else { } else {
// we shouldn't start from genesis with l1. Need a "genesis" height to be defined here // we shouldn't start from genesis with l1. Need a "genesis" L1 height provided for the rollup
l1ProcessLog.Info("no indexed state, starting from genesis") l1ProcessLog.Info("no indexed state, starting from genesis")
fromL1Header = nil fromL1Header = nil
} }
l1Processor := &L1Processor{ l1Processor := &L1Processor{
processor: processor{ processor: processor{
fetcher: node.NewFetcher(ethClient, fromL1Header), headerTraversal: node.NewHeaderTraversal(ethClient, fromL1Header),
db: db, db: db,
processFn: l1ProcessFn(l1ProcessLog, ethClient, l1Contracts), processFn: l1ProcessFn(l1ProcessLog, ethClient, l1Contracts, checkpointAbi),
processLog: l1ProcessLog, processLog: l1ProcessLog,
}, },
} }
return l1Processor, nil return l1Processor, nil
} }
func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1Contracts) func(db *database.DB, headers []*types.Header) error { func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1Contracts, checkpointAbi checkpointAbi) ProcessFn {
rawEthClient := ethclient.NewClient(ethClient.RawRpcClient()) rawEthClient := ethclient.NewClient(ethClient.RawRpcClient())
contractAddrs := l1Contracts.toSlice() contractAddrs := l1Contracts.toSlice()
processLog.Info("processor configured with contracts", "contracts", l1Contracts) processLog.Info("processor configured with contracts", "contracts", l1Contracts)
outputProposedEventSig := checkpointAbi.l2OutputOracle.Events["OutputProposed"].ID
legacyStateBatchAppendedEventSig := checkpointAbi.legacyStateCommitmentChain.Events["StateBatchAppended"].ID
return func(db *database.DB, headers []*types.Header) error { return func(db *database.DB, headers []*types.Header) error {
numHeaders := len(headers) numHeaders := len(headers)
l1HeaderMap := make(map[common.Hash]*types.Header) headerMap := make(map[common.Hash]*types.Header)
for _, header := range headers { for _, header := range headers {
l1HeaderMap[header.Hash()] = header headerMap[header.Hash()] = header
} }
/** Watch for Contract Events **/ /** Watch for Contract Events **/
...@@ -104,18 +130,21 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1 ...@@ -104,18 +130,21 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1
return err return err
} }
// L2 checkpoitns posted on L1
outputProposals := []*database.OutputProposal{}
legacyStateBatches := []*database.LegacyStateBatch{}
numLogs := len(logs) numLogs := len(logs)
l1ContractEvents := make([]*database.L1ContractEvent, numLogs) l1ContractEvents := make([]*database.L1ContractEvent, numLogs)
l1HeadersOfInterest := make(map[common.Hash]bool) l1HeadersOfInterest := make(map[common.Hash]bool)
for i, log := range logs { for i, log := range logs {
header, ok := l1HeaderMap[log.BlockHash] header, ok := headerMap[log.BlockHash]
if !ok { if !ok {
processLog.Crit("contract event found with associated header not in the batch", "header", log.BlockHash, "log_index", log.Index) processLog.Error("contract event found with associated header not in the batch", "header", log.BlockHash, "log_index", log.Index)
return errors.New("parsed log with a block hash not in this batch") return errors.New("parsed log with a block hash not in this batch")
} }
l1HeadersOfInterest[log.BlockHash] = true contractEvent := &database.L1ContractEvent{
l1ContractEvents[i] = &database.L1ContractEvent{
ContractEvent: database.ContractEvent{ ContractEvent: database.ContractEvent{
GUID: uuid.New(), GUID: uuid.New(),
BlockHash: log.BlockHash, BlockHash: log.BlockHash,
...@@ -125,21 +154,54 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1 ...@@ -125,21 +154,54 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1
Timestamp: header.Time, Timestamp: header.Time,
}, },
} }
l1ContractEvents[i] = contractEvent
l1HeadersOfInterest[log.BlockHash] = true
// Track Checkpoint Events for L2
switch contractEvent.EventSignature {
case outputProposedEventSig:
if len(log.Topics) != 4 {
processLog.Error("parsed unexpected number of L2OutputOracle#OutputProposed log topics", "log_topics", log.Topics)
return errors.New("parsed unexpected OutputProposed event")
}
outputProposals = append(outputProposals, &database.OutputProposal{
OutputRoot: log.Topics[1],
L2BlockNumber: database.U256{Int: new(big.Int).SetBytes(log.Topics[2].Bytes())},
L1ContractEventGUID: contractEvent.GUID,
})
case legacyStateBatchAppendedEventSig:
var stateBatchAppended legacy_bindings.StateCommitmentChainStateBatchAppended
err := checkpointAbi.l2OutputOracle.UnpackIntoInterface(&stateBatchAppended, "StateBatchAppended", log.Data)
if err != nil || len(log.Topics) != 2 {
processLog.Error("unexpected StateCommitmentChain#StateBatchAppended log data or log topics", "log_topics", log.Topics, "log_data", hex.EncodeToString(log.Data), "err", err)
return err
}
legacyStateBatches = append(legacyStateBatches, &database.LegacyStateBatch{
Index: new(big.Int).SetBytes(log.Topics[1].Bytes()).Uint64(),
Root: stateBatchAppended.BatchRoot,
Size: stateBatchAppended.BatchSize.Uint64(),
PrevTotal: stateBatchAppended.PrevTotalElements.Uint64(),
L1ContractEventGUID: contractEvent.GUID,
})
}
} }
/** Index L1 Blocks that have an optimism event **/ /** Aggregate applicable L1 Blocks **/
// we iterate on the original array to maintain ordering. probably can find a more efficient // we iterate on the original array to maintain ordering. probably can find a more efficient
// way to iterate over the `l1HeadersOfInterest` map while maintaining ordering // way to iterate over the `l1HeadersOfInterest` map while maintaining ordering
indexedL1Header := []*database.L1BlockHeader{} l1Headers := []*database.L1BlockHeader{}
for _, header := range headers { for _, header := range headers {
blockHash := header.Hash() blockHash := header.Hash()
_, hasLogs := l1HeadersOfInterest[blockHash] if _, hasLogs := l1HeadersOfInterest[blockHash]; !hasLogs {
if !hasLogs {
continue continue
} }
indexedL1Header = append(indexedL1Header, &database.L1BlockHeader{ l1Headers = append(l1Headers, &database.L1BlockHeader{
BlockHeader: database.BlockHeader{ BlockHeader: database.BlockHeader{
Hash: blockHash, Hash: blockHash,
ParentHash: header.ParentHash, ParentHash: header.ParentHash,
...@@ -151,22 +213,41 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1 ...@@ -151,22 +213,41 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1
/** Update Database **/ /** Update Database **/
numIndexedL1Headers := len(indexedL1Header) numL1Headers := len(l1Headers)
if numIndexedL1Headers > 0 { if numL1Headers == 0 {
processLog.Info("saved l1 blocks of interest within batch", "num", numIndexedL1Headers, "batchSize", numHeaders) processLog.Info("no l1 blocks of interest")
err = db.Blocks.StoreL1BlockHeaders(indexedL1Header) return nil
if err != nil { }
return err
} processLog.Info("saving l1 blocks of interest", "size", numL1Headers, "batch_size", numHeaders)
err = db.Blocks.StoreL1BlockHeaders(l1Headers)
if err != nil {
return err
}
// Since the headers to index are derived from the existence of logs, we know in this branch `numLogs > 0`
processLog.Info("saving contract logs", "size", numLogs)
err = db.ContractEvents.StoreL1ContractEvents(l1ContractEvents)
if err != nil {
return err
}
// Mark L2 checkpoints that have been recorded on L1 (L2OutputProposal & StateBatchAppended events)
numLegacyStateBatches := len(legacyStateBatches)
if numLegacyStateBatches > 0 {
latestBatch := legacyStateBatches[numLegacyStateBatches-1]
latestL2Height := latestBatch.PrevTotal + latestBatch.Size - 1
processLog.Info("detected legacy state batches", "size", numLegacyStateBatches, "latest_l2_block_number", latestL2Height)
}
// Since the headers to index are derived from the existence of logs, we know in this branch `numLogs > 0` numOutputProposals := len(outputProposals)
processLog.Info("saving contract logs", "size", numLogs) if numOutputProposals > 0 {
err = db.ContractEvents.StoreL1ContractEvents(l1ContractEvents) latestL2Height := outputProposals[numOutputProposals-1].L2BlockNumber.Int
processLog.Info("detected output proposals", "size", numOutputProposals, "latest_l2_block_number", latestL2Height)
err := db.Blocks.StoreOutputProposals(outputProposals)
if err != nil { if err != nil {
return err return err
} }
} else {
processLog.Info("no l1 blocks of interest within batch")
} }
// a-ok! // a-ok!
......
...@@ -58,7 +58,7 @@ func NewL2Processor(ethClient node.EthClient, db *database.DB, l2Contracts L2Con ...@@ -58,7 +58,7 @@ func NewL2Processor(ethClient node.EthClient, db *database.DB, l2Contracts L2Con
l2ProcessLog := log.New("processor", "l2") l2ProcessLog := log.New("processor", "l2")
l2ProcessLog.Info("initializing processor") l2ProcessLog.Info("initializing processor")
latestHeader, err := db.Blocks.FinalizedL2BlockHeader() latestHeader, err := db.Blocks.LatestL2BlockHeader()
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -80,17 +80,17 @@ func NewL2Processor(ethClient node.EthClient, db *database.DB, l2Contracts L2Con ...@@ -80,17 +80,17 @@ func NewL2Processor(ethClient node.EthClient, db *database.DB, l2Contracts L2Con
l2Processor := &L2Processor{ l2Processor := &L2Processor{
processor: processor{ processor: processor{
fetcher: node.NewFetcher(ethClient, fromL2Header), headerTraversal: node.NewHeaderTraversal(ethClient, fromL2Header),
db: db, db: db,
processFn: l2ProcessFn(l2ProcessLog, ethClient, l2Contracts), processFn: l2ProcessFn(l2ProcessLog, ethClient, l2Contracts),
processLog: l2ProcessLog, processLog: l2ProcessLog,
}, },
} }
return l2Processor, nil return l2Processor, nil
} }
func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2Contracts) func(db *database.DB, headers []*types.Header) error { func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2Contracts) ProcessFn {
rawEthClient := ethclient.NewClient(ethClient.RawRpcClient()) rawEthClient := ethclient.NewClient(ethClient.RawRpcClient())
contractAddrs := l2Contracts.toSlice() contractAddrs := l2Contracts.toSlice()
...@@ -98,7 +98,7 @@ func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2 ...@@ -98,7 +98,7 @@ func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2
return func(db *database.DB, headers []*types.Header) error { return func(db *database.DB, headers []*types.Header) error {
numHeaders := len(headers) numHeaders := len(headers)
/** Index All L2 Blocks **/ /** Index all L2 blocks **/
l2Headers := make([]*database.L2BlockHeader, len(headers)) l2Headers := make([]*database.L2BlockHeader, len(headers))
l2HeaderMap := make(map[common.Hash]*types.Header) l2HeaderMap := make(map[common.Hash]*types.Header)
...@@ -129,8 +129,7 @@ func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2 ...@@ -129,8 +129,7 @@ func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2
for i, log := range logs { for i, log := range logs {
header, ok := l2HeaderMap[log.BlockHash] header, ok := l2HeaderMap[log.BlockHash]
if !ok { if !ok {
// Log the individual headers in the batch? processLog.Error("contract event found with associated header not in the batch", "header", header, "log_index", log.Index)
processLog.Crit("contract event found with associated header not in the batch", "header", header, "log_index", log.Index)
return errors.New("parsed log with a block hash not in this batch") return errors.New("parsed log with a block hash not in this batch")
} }
...@@ -148,13 +147,14 @@ func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2 ...@@ -148,13 +147,14 @@ func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2
/** Update Database **/ /** Update Database **/
processLog.Info("saving l2 blocks", "size", numHeaders)
err = db.Blocks.StoreL2BlockHeaders(l2Headers) err = db.Blocks.StoreL2BlockHeaders(l2Headers)
if err != nil { if err != nil {
return err return err
} }
if numLogs > 0 { if numLogs > 0 {
processLog.Info("detected new contract logs", "size", numLogs) processLog.Info("detected contract logs", "size", numLogs)
err = db.ContractEvents.StoreL2ContractEvents(l2ContractEvents) err = db.ContractEvents.StoreL2ContractEvents(l2ContractEvents)
if err != nil { if err != nil {
return err return err
......
...@@ -10,55 +10,61 @@ import ( ...@@ -10,55 +10,61 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
const defaultLoopInterval = 5 * time.Second const (
defaultLoopInterval = 5 * time.Second
defaultHeaderBufferSize = 500
)
// processFn is the the function used to process unindexed headers. In // ProcessFn is the the entrypoint for processing a batch of headers.
// the event of a failure, all database operations are not committed // In the event of failure, database operations are rolled back
type processFn func(*database.DB, []*types.Header) error type ProcessFn func(*database.DB, []*types.Header) error
type processor struct { type processor struct {
fetcher *node.Fetcher headerTraversal *node.HeaderTraversal
db *database.DB db *database.DB
processFn processFn processFn ProcessFn
processLog log.Logger processLog log.Logger
} }
// Start kicks off the processing loop // Start kicks off the processing loop
func (p processor) Start() { func (p processor) Start() {
pollTicker := time.NewTicker(defaultLoopInterval) pollTicker := time.NewTicker(defaultLoopInterval)
defer pollTicker.Stop()
p.processLog.Info("starting processor...") p.processLog.Info("starting processor...")
// Make this loop stoppable var unprocessedHeaders []*types.Header
for range pollTicker.C { for range pollTicker.C {
p.processLog.Info("checking for new headers...") if len(unprocessedHeaders) == 0 {
newHeaders, err := p.headerTraversal.NextFinalizedHeaders(defaultHeaderBufferSize)
headers, err := p.fetcher.NextFinalizedHeaders() if err != nil {
if err != nil { p.processLog.Error("error querying for headers", "err", err)
p.processLog.Error("unable to query for headers", "err", err) continue
continue } else if len(newHeaders) == 0 {
} // Logged as an error since this loop should be operating at a longer interval than the provider
p.processLog.Error("no new headers. processor unexpectedly at head...")
continue
}
if len(headers) == 0 { unprocessedHeaders = newHeaders
p.processLog.Info("no new headers. indexer must be at head...") } else {
continue p.processLog.Info("retrying previous batch")
} }
batchLog := p.processLog.New("startHeight", headers[0].Number, "endHeight", headers[len(headers)-1].Number) firstHeader := unprocessedHeaders[0]
batchLog.Info("indexing batch of headers") lastHeader := unprocessedHeaders[len(unprocessedHeaders)-1]
batchLog := p.processLog.New("batch_start_block_number", firstHeader.Number, "batch_end_block_number", lastHeader.Number)
// wrap operations within a single transaction batchLog.Info("processing batch")
err = p.db.Transaction(func(db *database.DB) error { err := p.db.Transaction(func(db *database.DB) error {
return p.processFn(db, headers) return p.processFn(db, unprocessedHeaders)
}) })
// TODO(DX-79) if processFn failed, the next poll should retry starting from this same batch of headers
if err != nil { if err != nil {
batchLog.Info("unable to index batch", "err", err) batchLog.Warn("error processing batch. no operations committed", "err", err)
panic(err)
} else { } else {
batchLog.Info("done indexing batch") batchLog.Info("fully committed batch")
unprocessedHeaders = nil
} }
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment