Commit 6063f24d authored by Hamdi Allam's avatar Hamdi Allam

update l1 processor to index output proposals. l2 process only indexes checkpointed blocks

parent 69955784
......@@ -2,14 +2,20 @@ package processor
import (
"context"
"encoding/hex"
"errors"
"math/big"
"reflect"
"github.com/google/uuid"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/google/uuid"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
legacy_bindings "github.com/ethereum-optimism/optimism/op-bindings/legacy-bindings"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
......@@ -30,6 +36,11 @@ type L1Contracts struct {
// Remove afterwards?
}
type checkpointAbi struct {
l2OutputOracle *abi.ABI
legacyStateCommitmentChain *abi.ABI
}
func (c L1Contracts) toSlice() []common.Address {
fields := reflect.VisibleFields(reflect.TypeOf(c))
v := reflect.ValueOf(c)
......@@ -50,7 +61,19 @@ func NewL1Processor(ethClient node.EthClient, db *database.DB, l1Contracts L1Con
l1ProcessLog := log.New("processor", "l1")
l1ProcessLog.Info("initializing processor")
latestHeader, err := db.Blocks.FinalizedL1BlockHeader()
l2OutputOracleABI, err := bindings.L2OutputOracleMetaData.GetAbi()
if err != nil {
l1ProcessLog.Error("unable to generate L2OutputOracle ABI", "err", err)
return nil, err
}
legacyStateCommitmentChainABI, err := legacy_bindings.StateCommitmentChainMetaData.GetAbi()
if err != nil {
l1ProcessLog.Error("unable to generate legacy StateCommitmentChain ABI", "err", err)
return nil, err
}
checkpointAbi := checkpointAbi{l2OutputOracle: l2OutputOracleABI, legacyStateCommitmentChain: legacyStateCommitmentChainABI}
latestHeader, err := db.Blocks.LatestL1BlockHeader()
if err != nil {
return nil, err
}
......@@ -66,16 +89,16 @@ func NewL1Processor(ethClient node.EthClient, db *database.DB, l1Contracts L1Con
fromL1Header = l1Header
} else {
// we shouldn't start from genesis with l1. Need a "genesis" height to be defined here
// we shouldn't start from genesis with l1. Need a "genesis" L1 height provided for the rollup
l1ProcessLog.Info("no indexed state, starting from genesis")
fromL1Header = nil
}
l1Processor := &L1Processor{
processor: processor{
fetcher: node.NewFetcher(ethClient, fromL1Header),
headerTraversal: node.NewBufferedHeaderTraversal(ethClient, fromL1Header),
db: db,
processFn: l1ProcessFn(l1ProcessLog, ethClient, l1Contracts),
processFn: l1ProcessFn(l1ProcessLog, ethClient, l1Contracts, checkpointAbi),
processLog: l1ProcessLog,
},
}
......@@ -83,17 +106,20 @@ func NewL1Processor(ethClient node.EthClient, db *database.DB, l1Contracts L1Con
return l1Processor, nil
}
func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1Contracts) func(db *database.DB, headers []*types.Header) error {
func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1Contracts, checkpointAbi checkpointAbi) ProcessFn {
rawEthClient := ethclient.NewClient(ethClient.RawRpcClient())
contractAddrs := l1Contracts.toSlice()
processLog.Info("processor configured with contracts", "contracts", l1Contracts)
return func(db *database.DB, headers []*types.Header) error {
outputProposedEventSig := checkpointAbi.l2OutputOracle.Events["OutputProposed"].ID
legacyStateBatchAppendedEventSig := checkpointAbi.legacyStateCommitmentChain.Events["StateBatchAppended"].ID
return func(db *database.DB, headers []*types.Header) (*types.Header, error) {
numHeaders := len(headers)
l1HeaderMap := make(map[common.Hash]*types.Header)
headerMap := make(map[common.Hash]*types.Header)
for _, header := range headers {
l1HeaderMap[header.Hash()] = header
headerMap[header.Hash()] = header
}
/** Watch for Contract Events **/
......@@ -101,21 +127,24 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1
logFilter := ethereum.FilterQuery{FromBlock: headers[0].Number, ToBlock: headers[numHeaders-1].Number, Addresses: contractAddrs}
logs, err := rawEthClient.FilterLogs(context.Background(), logFilter)
if err != nil {
return err
return nil, err
}
// L2 blocks posted on L1
outputProposals := []*database.OutputProposal{}
legacyStateBatches := []*database.LegacyStateBatch{}
numLogs := len(logs)
l1ContractEvents := make([]*database.L1ContractEvent, numLogs)
l1HeadersOfInterest := make(map[common.Hash]bool)
for i, log := range logs {
header, ok := l1HeaderMap[log.BlockHash]
header, ok := headerMap[log.BlockHash]
if !ok {
processLog.Crit("contract event found with associated header not in the batch", "header", log.BlockHash, "log_index", log.Index)
return errors.New("parsed log with a block hash not in this batch")
processLog.Error("contract event found with associated header not in the batch", "header", log.BlockHash, "log_index", log.Index)
return nil, errors.New("parsed log with a block hash not in this batch")
}
l1HeadersOfInterest[log.BlockHash] = true
l1ContractEvents[i] = &database.L1ContractEvent{
contractEvent := &database.L1ContractEvent{
ContractEvent: database.ContractEvent{
GUID: uuid.New(),
BlockHash: log.BlockHash,
......@@ -125,21 +154,54 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1
Timestamp: header.Time,
},
}
l1ContractEvents[i] = contractEvent
l1HeadersOfInterest[log.BlockHash] = true
// Track Checkpoint Events for L2
switch contractEvent.EventSignature {
case outputProposedEventSig:
if len(log.Topics) != 4 {
processLog.Error("parsed unexpected number of L2OutputOracle#OutputProposed log topics", "log_topics", log.Topics)
return nil, errors.New("parsed unexpected OutputProposed event")
}
/** Index L1 Blocks that have an optimism event **/
outputProposals = append(outputProposals, &database.OutputProposal{
OutputRoot: log.Topics[1],
L2BlockNumber: database.U256{Int: new(big.Int).SetBytes(log.Topics[2].Bytes())},
L1ContractEventGUID: contractEvent.GUID,
})
case legacyStateBatchAppendedEventSig:
var stateBatchAppended legacy_bindings.StateCommitmentChainStateBatchAppended
err := checkpointAbi.l2OutputOracle.UnpackIntoInterface(&stateBatchAppended, "StateBatchAppended", log.Data)
if err != nil || len(log.Topics) != 2 {
processLog.Error("unexpected StateCommitmentChain#StateBatchAppended log data or log topics", "log_topics", log.Topics, "log_data", hex.EncodeToString(log.Data), "err", err)
return nil, err
}
legacyStateBatches = append(legacyStateBatches, &database.LegacyStateBatch{
Index: new(big.Int).SetBytes(log.Topics[1].Bytes()).Uint64(),
Root: stateBatchAppended.BatchRoot,
Size: stateBatchAppended.BatchSize.Uint64(),
PrevTotal: stateBatchAppended.PrevTotalElements.Uint64(),
L1ContractEventGUID: contractEvent.GUID,
})
}
}
/** Aggregate applicable L1 Blocks **/
// we iterate on the original array to maintain ordering. probably can find a more efficient
// way to iterate over the `l1HeadersOfInterest` map while maintaining ordering
indexedL1Header := []*database.L1BlockHeader{}
l1Headers := []*database.L1BlockHeader{}
for _, header := range headers {
blockHash := header.Hash()
_, hasLogs := l1HeadersOfInterest[blockHash]
if !hasLogs {
if _, hasLogs := l1HeadersOfInterest[blockHash]; !hasLogs {
continue
}
indexedL1Header = append(indexedL1Header, &database.L1BlockHeader{
l1Headers = append(l1Headers, &database.L1BlockHeader{
BlockHeader: database.BlockHeader{
Hash: blockHash,
ParentHash: header.ParentHash,
......@@ -151,25 +213,44 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1
/** Update Database **/
numIndexedL1Headers := len(indexedL1Header)
if numIndexedL1Headers > 0 {
processLog.Info("saved l1 blocks of interest within batch", "num", numIndexedL1Headers, "batchSize", numHeaders)
err = db.Blocks.StoreL1BlockHeaders(indexedL1Header)
numL1Headers := len(l1Headers)
if numL1Headers == 0 {
processLog.Info("no l1 blocks of interest")
return headers[numHeaders-1], nil
}
processLog.Info("saving l1 blocks of interest", "size", numL1Headers, "batch_size", numHeaders)
err = db.Blocks.StoreL1BlockHeaders(l1Headers)
if err != nil {
return err
return nil, err
}
// Since the headers to index are derived from the existence of logs, we know in this branch `numLogs > 0`
processLog.Info("saving contract logs", "size", numLogs)
err = db.ContractEvents.StoreL1ContractEvents(l1ContractEvents)
if err != nil {
return err
return nil, err
}
// Mark L2 checkpoints that have been recorded on L1 (L2OutputProposal & StateBatchAppended events)
numLegacyStateBatches := len(legacyStateBatches)
if numLegacyStateBatches > 0 {
latestBatch := legacyStateBatches[numLegacyStateBatches-1]
latestL2Height := latestBatch.PrevTotal + latestBatch.Size - 1
processLog.Info("detected legacy state batches", "size", numLegacyStateBatches, "latest_l2_block_number", latestL2Height)
}
numOutputProposals := len(outputProposals)
if numOutputProposals > 0 {
latestL2Height := outputProposals[numOutputProposals-1].L2BlockNumber.Int
processLog.Info("detected output proposals", "size", numOutputProposals, "latest_l2_block_number", latestL2Height)
err := db.Blocks.StoreOutputProposals(outputProposals)
if err != nil {
return nil, err
}
} else {
processLog.Info("no l1 blocks of interest within batch")
}
// a-ok!
return nil
return headers[numHeaders-1], nil
}
}
......@@ -3,6 +3,7 @@ package processor
import (
"context"
"errors"
"math/big"
"reflect"
"github.com/ethereum-optimism/optimism/indexer/database"
......@@ -58,7 +59,7 @@ func NewL2Processor(ethClient node.EthClient, db *database.DB, l2Contracts L2Con
l2ProcessLog := log.New("processor", "l2")
l2ProcessLog.Info("initializing processor")
latestHeader, err := db.Blocks.FinalizedL2BlockHeader()
latestHeader, err := db.Blocks.LatestL2BlockHeader()
if err != nil {
return nil, err
}
......@@ -80,7 +81,7 @@ func NewL2Processor(ethClient node.EthClient, db *database.DB, l2Contracts L2Con
l2Processor := &L2Processor{
processor: processor{
fetcher: node.NewFetcher(ethClient, fromL2Header),
headerTraversal: node.NewBufferedHeaderTraversal(ethClient, fromL2Header),
db: db,
processFn: l2ProcessFn(l2ProcessLog, ethClient, l2Contracts),
processLog: l2ProcessLog,
......@@ -90,15 +91,42 @@ func NewL2Processor(ethClient node.EthClient, db *database.DB, l2Contracts L2Con
return l2Processor, nil
}
func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2Contracts) func(db *database.DB, headers []*types.Header) error {
func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2Contracts) ProcessFn {
rawEthClient := ethclient.NewClient(ethClient.RawRpcClient())
contractAddrs := l2Contracts.toSlice()
processLog.Info("processor configured with contracts", "contracts", l2Contracts)
return func(db *database.DB, headers []*types.Header) error {
return func(db *database.DB, headers []*types.Header) (*types.Header, error) {
numHeaders := len(headers)
/** Index All L2 Blocks **/
latestOutput, err := db.Blocks.LatestOutputProposed()
if err != nil {
return nil, err
} else if latestOutput == nil {
processLog.Warn("no checkpointed outputs found. waiting...")
return nil, errors.New("no checkpointed l2 outputs")
}
// check if any of these blocks have been published to L1
latestOutputHeight := latestOutput.L2BlockNumber.Int
if headers[0].Number.Cmp(latestOutputHeight) > 0 {
processLog.Warn("entire batch exceeds the latest output", "latest_output_block_number", latestOutputHeight)
return nil, errors.New("entire batch exceeds latest output")
}
// check if we need to partially process this batch
if headers[numHeaders-1].Number.Cmp(latestOutputHeight) > 0 {
processLog.Info("reducing batch", "latest_output_block_number", latestOutputHeight)
// reduce the batch size
lastHeaderIndex := new(big.Int).Sub(latestOutputHeight, headers[0].Number).Uint64()
// update markers (including `lastHeaderIndex`)
headers = headers[:lastHeaderIndex+1]
numHeaders = len(headers)
}
/** Index all L2 blocks **/
l2Headers := make([]*database.L2BlockHeader, len(headers))
l2HeaderMap := make(map[common.Hash]*types.Header)
......@@ -121,7 +149,7 @@ func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2
logFilter := ethereum.FilterQuery{FromBlock: headers[0].Number, ToBlock: headers[numHeaders-1].Number, Addresses: contractAddrs}
logs, err := rawEthClient.FilterLogs(context.Background(), logFilter)
if err != nil {
return err
return nil, err
}
numLogs := len(logs)
......@@ -129,9 +157,8 @@ func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2
for i, log := range logs {
header, ok := l2HeaderMap[log.BlockHash]
if !ok {
// Log the individual headers in the batch?
processLog.Crit("contract event found with associated header not in the batch", "header", header, "log_index", log.Index)
return errors.New("parsed log with a block hash not in this batch")
processLog.Error("contract event found with associated header not in the batch", "header", header, "log_index", log.Index)
return nil, errors.New("parsed log with a block hash not in this batch")
}
l2ContractEvents[i] = &database.L2ContractEvent{
......@@ -148,20 +175,21 @@ func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2
/** Update Database **/
processLog.Info("saving l2 blocks", "size", numHeaders)
err = db.Blocks.StoreL2BlockHeaders(l2Headers)
if err != nil {
return err
return nil, err
}
if numLogs > 0 {
processLog.Info("detected new contract logs", "size", numLogs)
processLog.Info("detected contract logs", "size", numLogs)
err = db.ContractEvents.StoreL2ContractEvents(l2ContractEvents)
if err != nil {
return err
return nil, err
}
}
// a-ok!
return nil
return headers[numHeaders-1], nil
}
}
......@@ -12,53 +12,63 @@ import (
const defaultLoopInterval = 5 * time.Second
// processFn is the the function used to process unindexed headers. In
// the event of a failure, all database operations are not committed
type processFn func(*database.DB, []*types.Header) error
// ProcessFn is the the entrypoint for processing a batch of headers. To support
// partial batch processing, the function must return the last processed header
// in the batch. In the event of failure, database operations are rolled back
type ProcessFn func(*database.DB, []*types.Header) (*types.Header, error)
type processor struct {
fetcher *node.Fetcher
headerTraversal *node.BufferedHeaderTraversal
db *database.DB
processFn processFn
processFn ProcessFn
processLog log.Logger
}
// Start kicks off the processing loop
func (p processor) Start() {
pollTicker := time.NewTicker(defaultLoopInterval)
p.processLog.Info("starting processor...")
defer pollTicker.Stop()
// Make this loop stoppable
p.processLog.Info("starting processor...")
for range pollTicker.C {
p.processLog.Info("checking for new headers...")
headers, err := p.fetcher.NextFinalizedHeaders()
headers, err := p.headerTraversal.NextFinalizedHeaders(500)
if err != nil {
p.processLog.Error("unable to query for headers", "err", err)
p.processLog.Error("error querying for headers", "err", err)
continue
}
if len(headers) == 0 {
p.processLog.Info("no new headers. indexer must be at head...")
} else if len(headers) == 0 {
// Logged as an error since this loop should be operating at a longer interval than the provider
p.processLog.Error("no new headers. processor unexpectadly at head...")
continue
}
batchLog := p.processLog.New("startHeight", headers[0].Number, "endHeight", headers[len(headers)-1].Number)
batchLog.Info("indexing batch of headers")
batchLog := p.processLog.New("batch_start_block_number", headers[0].Number, "batch_end_block_number", headers[len(headers)-1].Number)
batchLog.Info("processing batch")
// wrap operations within a single transaction
var lastProcessedHeader *types.Header
err = p.db.Transaction(func(db *database.DB) error {
return p.processFn(db, headers)
})
lastProcessedHeader, err = p.processFn(db, headers)
if err != nil {
return err
}
// TODO(DX-79) if processFn failed, the next poll should retry starting from this same batch of headers
err = p.headerTraversal.Advance(lastProcessedHeader)
if err != nil {
batchLog.Error("unable to advance processor", "last_processed_block_number", lastProcessedHeader.Number)
return err
}
return nil
})
if err != nil {
batchLog.Info("unable to index batch", "err", err)
panic(err)
batchLog.Warn("error processing batch. no operations committed", "err", err)
} else {
if lastProcessedHeader.Number.Cmp(headers[len(headers)-1].Number) == 0 {
batchLog.Info("fully committed batch")
} else {
batchLog.Info("done indexing batch")
batchLog.Info("partially committed batch", "last_processed_block_number", lastProcessedHeader.Number)
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment