Commit 8ad6f38a authored by Hamdi Allam

no need to support partial batch processing

parent 6063f24d
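
For context, the net effect of this commit is that a processor batch becomes all-or-nothing: `ProcessFn` now returns only an `error`, and on success the header traversal always advances past the final header of the batch inside the same database transaction. The Go sketch below illustrates that flow under stated assumptions; `processBatch` and the `headerAdvancer` interface are hypothetical names introduced here for illustration (not part of the commit), and the `database.DB.Transaction` signature is assumed to match the one visible in the diff.

package processor

import (
	"github.com/ethereum-optimism/optimism/indexer/database"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/log"
)

// ProcessFn mirrors the new signature introduced by this commit: no
// "last processed header" is returned, only an error.
type ProcessFn func(*database.DB, []*types.Header) error

// headerAdvancer is a hypothetical stand-in for the single method of
// BufferedHeaderTraversal used by this sketch.
type headerAdvancer interface {
	Advance(header *types.Header) error
}

// processBatch (hypothetical helper) shows the simplified all-or-nothing flow:
// run ProcessFn inside one transaction and, on success, advance the traversal
// past the final header of the batch. Any error rolls everything back, so the
// same batch is simply retried on the next poll.
func processBatch(db *database.DB, traversal headerAdvancer, processFn ProcessFn,
	batchLog log.Logger, headers []*types.Header) error {
	// assumes the Transaction(func(*database.DB) error) error signature seen in the diff
	return db.Transaction(func(tx *database.DB) error {
		if err := processFn(tx, headers); err != nil {
			batchLog.Warn("error processing batch. no operations committed", "err", err)
			return err
		}
		// no partial commits anymore: always advance to the end of the batch
		return traversal.Advance(headers[len(headers)-1])
	})
}
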
@@ -48,7 +48,7 @@ type OutputProposal struct {
 type BlocksView interface {
     LatestL1BlockHeader() (*L1BlockHeader, error)
-    LatestOutputProposed() (*OutputProposal, error)
+    LatestCheckpointedOutput() (*OutputProposal, error)
     LatestL2BlockHeader() (*L2BlockHeader, error)
 }
@@ -106,7 +106,7 @@ func (db *blocksDB) LatestL1BlockHeader() (*L1BlockHeader, error) {
     return &l1Header, nil
 }

-func (db *blocksDB) LatestOutputProposed() (*OutputProposal, error) {
+func (db *blocksDB) LatestCheckpointedOutput() (*OutputProposal, error) {
     var outputProposal OutputProposal
     result := db.gorm.Order("l2_block_number DESC").Take(&outputProposal)
     if result.Error != nil {
...
@@ -36,7 +36,7 @@ func (bf *BufferedHeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]*type
             return nil, err
         }

-        // swallow the error and return existing buffered headers
+        // swallow the error and return existing buffered headers since we have some
         return bf.bufferedHeaders, nil
     }
...
@@ -115,7 +115,7 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1
     outputProposedEventSig := checkpointAbi.l2OutputOracle.Events["OutputProposed"].ID
     legacyStateBatchAppendedEventSig := checkpointAbi.legacyStateCommitmentChain.Events["StateBatchAppended"].ID

-    return func(db *database.DB, headers []*types.Header) (*types.Header, error) {
+    return func(db *database.DB, headers []*types.Header) error {
         numHeaders := len(headers)
         headerMap := make(map[common.Hash]*types.Header)
         for _, header := range headers {
@@ -127,7 +127,7 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1
         logFilter := ethereum.FilterQuery{FromBlock: headers[0].Number, ToBlock: headers[numHeaders-1].Number, Addresses: contractAddrs}
         logs, err := rawEthClient.FilterLogs(context.Background(), logFilter)
         if err != nil {
-            return nil, err
+            return err
         }

         // L2 blocks posted on L1
@@ -141,7 +141,7 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1
             header, ok := headerMap[log.BlockHash]
             if !ok {
                 processLog.Error("contract event found with associated header not in the batch", "header", log.BlockHash, "log_index", log.Index)
-                return nil, errors.New("parsed log with a block hash not in this batch")
+                return errors.New("parsed log with a block hash not in this batch")
             }

             contractEvent := &database.L1ContractEvent{
@@ -163,7 +163,7 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1
             case outputProposedEventSig:
                 if len(log.Topics) != 4 {
                     processLog.Error("parsed unexpected number of L2OutputOracle#OutputProposed log topics", "log_topics", log.Topics)
-                    return nil, errors.New("parsed unexpected OutputProposed event")
+                    return errors.New("parsed unexpected OutputProposed event")
                 }

                 outputProposals = append(outputProposals, &database.OutputProposal{
@@ -177,7 +177,7 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1
                 err := checkpointAbi.l2OutputOracle.UnpackIntoInterface(&stateBatchAppended, "StateBatchAppended", log.Data)
                 if err != nil || len(log.Topics) != 2 {
                     processLog.Error("unexpected StateCommitmentChain#StateBatchAppended log data or log topics", "log_topics", log.Topics, "log_data", hex.EncodeToString(log.Data), "err", err)
-                    return nil, err
+                    return err
                 }

                 legacyStateBatches = append(legacyStateBatches, &database.LegacyStateBatch{
@@ -216,20 +216,20 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1
         numL1Headers := len(l1Headers)
         if numL1Headers == 0 {
             processLog.Info("no l1 blocks of interest")
-            return headers[numHeaders-1], nil
+            return nil
         }

         processLog.Info("saving l1 blocks of interest", "size", numL1Headers, "batch_size", numHeaders)
         err = db.Blocks.StoreL1BlockHeaders(l1Headers)
         if err != nil {
-            return nil, err
+            return err
         }

         // Since the headers to index are derived from the existence of logs, we know in this branch `numLogs > 0`
         processLog.Info("saving contract logs", "size", numLogs)
         err = db.ContractEvents.StoreL1ContractEvents(l1ContractEvents)
         if err != nil {
-            return nil, err
+            return err
         }

         // Mark L2 checkpoints that have been recorded on L1 (L2OutputProposal & StateBatchAppended events)
@@ -246,11 +246,11 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1
             processLog.Info("detected output proposals", "size", numOutputProposals, "latest_l2_block_number", latestL2Height)
             err := db.Blocks.StoreOutputProposals(outputProposals)
             if err != nil {
-                return nil, err
+                return err
             }
         }

         // a-ok!
-        return headers[numHeaders-1], nil
+        return nil
     }
 }
@@ -3,7 +3,6 @@ package processor
 import (
     "context"
     "errors"
-    "math/big"
     "reflect"

     "github.com/ethereum-optimism/optimism/indexer/database"
@@ -96,36 +95,9 @@ func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2
     contractAddrs := l2Contracts.toSlice()
     processLog.Info("processor configured with contracts", "contracts", l2Contracts)

-    return func(db *database.DB, headers []*types.Header) (*types.Header, error) {
+    return func(db *database.DB, headers []*types.Header) error {
         numHeaders := len(headers)

-        latestOutput, err := db.Blocks.LatestOutputProposed()
-        if err != nil {
-            return nil, err
-        } else if latestOutput == nil {
-            processLog.Warn("no checkpointed outputs found. waiting...")
-            return nil, errors.New("no checkpointed l2 outputs")
-        }
-
-        // check if any of these blocks have been published to L1
-        latestOutputHeight := latestOutput.L2BlockNumber.Int
-        if headers[0].Number.Cmp(latestOutputHeight) > 0 {
-            processLog.Warn("entire batch exceeds the latest output", "latest_output_block_number", latestOutputHeight)
-            return nil, errors.New("entire batch exceeds latest output")
-        }
-
-        // check if we need to partially process this batch
-        if headers[numHeaders-1].Number.Cmp(latestOutputHeight) > 0 {
-            processLog.Info("reducing batch", "latest_output_block_number", latestOutputHeight)
-
-            // reduce the batch size
-            lastHeaderIndex := new(big.Int).Sub(latestOutputHeight, headers[0].Number).Uint64()
-
-            // update markers (including `lastHeaderIndex`)
-            headers = headers[:lastHeaderIndex+1]
-            numHeaders = len(headers)
-        }
-
         /** Index all L2 blocks **/

         l2Headers := make([]*database.L2BlockHeader, len(headers))
@@ -149,7 +121,7 @@ func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2
         logFilter := ethereum.FilterQuery{FromBlock: headers[0].Number, ToBlock: headers[numHeaders-1].Number, Addresses: contractAddrs}
         logs, err := rawEthClient.FilterLogs(context.Background(), logFilter)
         if err != nil {
-            return nil, err
+            return err
         }

         numLogs := len(logs)
@@ -158,7 +130,7 @@ func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2
             header, ok := l2HeaderMap[log.BlockHash]
             if !ok {
                 processLog.Error("contract event found with associated header not in the batch", "header", header, "log_index", log.Index)
-                return nil, errors.New("parsed log with a block hash not in this batch")
+                return errors.New("parsed log with a block hash not in this batch")
             }

             l2ContractEvents[i] = &database.L2ContractEvent{
@@ -178,18 +150,18 @@ func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2
         processLog.Info("saving l2 blocks", "size", numHeaders)
         err = db.Blocks.StoreL2BlockHeaders(l2Headers)
         if err != nil {
-            return nil, err
+            return err
         }

         if numLogs > 0 {
             processLog.Info("detected contract logs", "size", numLogs)
             err = db.ContractEvents.StoreL2ContractEvents(l2ContractEvents)
             if err != nil {
-                return nil, err
+                return err
             }
         }

         // a-ok!
-        return headers[numHeaders-1], nil
+        return nil
     }
 }
@@ -10,12 +10,14 @@ import (
     "github.com/ethereum/go-ethereum/log"
 )

-const defaultLoopInterval = 5 * time.Second
+const (
+    defaultLoopInterval     = 5 * time.Second
+    defaultHeaderBufferSize = 500
+)

-// ProcessFn is the the entrypoint for processing a batch of headers. To support
-// partial batch processing, the function must return the last processed header
-// in the batch. In the event of failure, database operations are rolled back
-type ProcessFn func(*database.DB, []*types.Header) (*types.Header, error)
+// ProcessFn is the the entrypoint for processing a batch of headers.
+// In the event of failure, database operations are rolled back
+type ProcessFn func(*database.DB, []*types.Header) error

 type processor struct {
     headerTraversal *node.BufferedHeaderTraversal
@@ -32,7 +34,7 @@ func (p processor) Start() {
     p.processLog.Info("starting processor...")

     for range pollTicker.C {
-        headers, err := p.headerTraversal.NextFinalizedHeaders(500)
+        headers, err := p.headerTraversal.NextFinalizedHeaders(defaultHeaderBufferSize)
         if err != nil {
             p.processLog.Error("error querying for headers", "err", err)
             continue
@@ -45,30 +47,18 @@ func (p processor) Start() {
         batchLog := p.processLog.New("batch_start_block_number", headers[0].Number, "batch_end_block_number", headers[len(headers)-1].Number)
         batchLog.Info("processing batch")

-        var lastProcessedHeader *types.Header
         err = p.db.Transaction(func(db *database.DB) error {
-            lastProcessedHeader, err = p.processFn(db, headers)
+            err := p.processFn(db, headers)
             if err != nil {
                 return err
             }
-
-            err = p.headerTraversal.Advance(lastProcessedHeader)
-            if err != nil {
-                batchLog.Error("unable to advance processor", "last_processed_block_number", lastProcessedHeader.Number)
-                return err
-            }
-
-            return nil
+            return p.headerTraversal.Advance(headers[len(headers)-1])
         })

         if err != nil {
             batchLog.Warn("error processing batch. no operations committed", "err", err)
         } else {
-            if lastProcessedHeader.Number.Cmp(headers[len(headers)-1].Number) == 0 {
-                batchLog.Info("fully committed batch")
-            } else {
-                batchLog.Info("partially committed batch", "last_processed_block_number", lastProcessedHeader.Number)
-            }
+            batchLog.Info("fully committed batch")
         }
     }
 }