Commit 999c473d authored by Hamdi Allam's avatar Hamdi Allam

fix outputproposed event decoding bug utilizing generic UnpackLog

parent 61e99b00
...@@ -2,9 +2,7 @@ package processor ...@@ -2,9 +2,7 @@ package processor
import ( import (
"context" "context"
"encoding/hex"
"errors" "errors"
"math/big"
"reflect" "reflect"
"github.com/google/uuid" "github.com/google/uuid"
...@@ -112,11 +110,13 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1 ...@@ -112,11 +110,13 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1
contractAddrs := l1Contracts.toSlice() contractAddrs := l1Contracts.toSlice()
processLog.Info("processor configured with contracts", "contracts", l1Contracts) processLog.Info("processor configured with contracts", "contracts", l1Contracts)
outputProposedEventSig := checkpointAbi.l2OutputOracle.Events["OutputProposed"].ID outputProposedEventName := "OutputProposed"
legacyStateBatchAppendedEventSig := checkpointAbi.legacyStateCommitmentChain.Events["StateBatchAppended"].ID outputProposedEventSig := checkpointAbi.l2OutputOracle.Events[outputProposedEventName].ID
legacyStateBatchAppendedEventName := "StateBatchAppended"
legacyStateBatchAppendedEventSig := checkpointAbi.legacyStateCommitmentChain.Events[legacyStateBatchAppendedEventName].ID
return func(db *database.DB, headers []*types.Header) error { return func(db *database.DB, headers []*types.Header) error {
numHeaders := len(headers)
headerMap := make(map[common.Hash]*types.Header) headerMap := make(map[common.Hash]*types.Header)
for _, header := range headers { for _, header := range headers {
headerMap[header.Hash()] = header headerMap[header.Hash()] = header
...@@ -124,7 +124,7 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1 ...@@ -124,7 +124,7 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1
/** Watch for all Optimism Contract Events **/ /** Watch for all Optimism Contract Events **/
logFilter := ethereum.FilterQuery{FromBlock: headers[0].Number, ToBlock: headers[numHeaders-1].Number, Addresses: contractAddrs} logFilter := ethereum.FilterQuery{FromBlock: headers[0].Number, ToBlock: headers[len(headers)-1].Number, Addresses: contractAddrs}
logs, err := rawEthClient.FilterLogs(context.Background(), logFilter) // []types.Log logs, err := rawEthClient.FilterLogs(context.Background(), logFilter) // []types.Log
if err != nil { if err != nil {
return err return err
...@@ -138,41 +138,43 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1 ...@@ -138,41 +138,43 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1
l1ContractEvents := make([]*database.L1ContractEvent, len(logs)) l1ContractEvents := make([]*database.L1ContractEvent, len(logs))
processedContractEvents := NewProcessedContractEvents() processedContractEvents := NewProcessedContractEvents()
for i, log := range logs { for i := range logs {
log := &logs[i]
header, ok := headerMap[log.BlockHash] header, ok := headerMap[log.BlockHash]
if !ok { if !ok {
processLog.Error("contract event found with associated header not in the batch", "header", log.BlockHash, "log_index", log.Index) processLog.Error("contract event found with associated header not in the batch", "header", log.BlockHash, "log_index", log.Index)
return errors.New("parsed log with a block hash not in this batch") return errors.New("parsed log with a block hash not in this batch")
} }
contractEvent := processedContractEvents.AddLog(&logs[i], header.Time) contractEvent := processedContractEvents.AddLog(log, header.Time)
l1HeadersOfInterest[log.BlockHash] = true l1HeadersOfInterest[log.BlockHash] = true
l1ContractEvents[i] = &database.L1ContractEvent{ContractEvent: *contractEvent} l1ContractEvents[i] = &database.L1ContractEvent{ContractEvent: *contractEvent}
// Track Checkpoint Events for L2 // Track Checkpoint Events for L2
switch contractEvent.EventSignature { switch contractEvent.EventSignature {
case outputProposedEventSig: case outputProposedEventSig:
if len(log.Topics) != 4 { var outputProposed bindings.L2OutputOracleOutputProposed
processLog.Error("parsed unexpected number of L2OutputOracle#OutputProposed log topics", "log_topics", log.Topics) err := UnpackLog(&outputProposed, log, outputProposedEventName, checkpointAbi.l2OutputOracle)
return errors.New("parsed unexpected OutputProposed event") if err != nil {
return err
} }
outputProposals = append(outputProposals, &database.OutputProposal{ outputProposals = append(outputProposals, &database.OutputProposal{
OutputRoot: log.Topics[1], OutputRoot: outputProposed.OutputRoot,
L2BlockNumber: database.U256{Int: new(big.Int).SetBytes(log.Topics[2].Bytes())}, L2OutputIndex: database.U256{Int: outputProposed.L2OutputIndex},
L2BlockNumber: database.U256{Int: outputProposed.L2BlockNumber},
L1ContractEventGUID: contractEvent.GUID, L1ContractEventGUID: contractEvent.GUID,
}) })
case legacyStateBatchAppendedEventSig: case legacyStateBatchAppendedEventSig:
var stateBatchAppended legacy_bindings.StateCommitmentChainStateBatchAppended var stateBatchAppended legacy_bindings.StateCommitmentChainStateBatchAppended
err := checkpointAbi.l2OutputOracle.UnpackIntoInterface(&stateBatchAppended, "StateBatchAppended", log.Data) err := UnpackLog(&stateBatchAppended, log, legacyStateBatchAppendedEventName, checkpointAbi.legacyStateCommitmentChain)
if err != nil || len(log.Topics) != 2 { if err != nil {
processLog.Error("unexpected StateCommitmentChain#StateBatchAppended log data or log topics", "log_topics", log.Topics, "log_data", hex.EncodeToString(log.Data), "err", err)
return err return err
} }
legacyStateBatches = append(legacyStateBatches, &database.LegacyStateBatch{ legacyStateBatches = append(legacyStateBatches, &database.LegacyStateBatch{
Index: new(big.Int).SetBytes(log.Topics[1].Bytes()).Uint64(), Index: stateBatchAppended.BatchIndex.Uint64(),
Root: stateBatchAppended.BatchRoot, Root: stateBatchAppended.BatchRoot,
Size: stateBatchAppended.BatchSize.Uint64(), Size: stateBatchAppended.BatchSize.Uint64(),
PrevTotal: stateBatchAppended.PrevTotalElements.Uint64(), PrevTotal: stateBatchAppended.PrevTotalElements.Uint64(),
...@@ -199,7 +201,7 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1 ...@@ -199,7 +201,7 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1
numIndexedL1Headers := len(indexedL1Headers) numIndexedL1Headers := len(indexedL1Headers)
if numIndexedL1Headers > 0 { if numIndexedL1Headers > 0 {
processLog.Info("saving l1 blocks with optimism logs", "size", numIndexedL1Headers, "batch_size", numHeaders) processLog.Info("saving l1 blocks with optimism logs", "size", numIndexedL1Headers, "batch_size", len(headers))
err = db.Blocks.StoreL1BlockHeaders(indexedL1Headers) err = db.Blocks.StoreL1BlockHeaders(indexedL1Headers)
if err != nil { if err != nil {
return err return err
......
...@@ -127,14 +127,15 @@ func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2 ...@@ -127,14 +127,15 @@ func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2
l2ContractEvents := make([]*database.L2ContractEvent, len(logs)) l2ContractEvents := make([]*database.L2ContractEvent, len(logs))
processedContractEvents := NewProcessedContractEvents() processedContractEvents := NewProcessedContractEvents()
for i, log := range logs { for i := range logs {
log := &logs[i]
header, ok := l2HeaderMap[log.BlockHash] header, ok := l2HeaderMap[log.BlockHash]
if !ok { if !ok {
processLog.Error("contract event found with associated header not in the batch", "header", header, "log_index", log.Index) processLog.Error("contract event found with associated header not in the batch", "header", header, "log_index", log.Index)
return errors.New("parsed log with a block hash not in this batch") return errors.New("parsed log with a block hash not in this batch")
} }
contractEvent := processedContractEvents.AddLog(&logs[i], header.Time) contractEvent := processedContractEvents.AddLog(log, header.Time)
l2ContractEvents[i] = &database.L2ContractEvent{ContractEvent: *contractEvent} l2ContractEvents[i] = &database.L2ContractEvent{ContractEvent: *contractEvent}
} }
......
...@@ -26,12 +26,14 @@ type processor struct { ...@@ -26,12 +26,14 @@ type processor struct {
db *database.DB db *database.DB
processFn ProcessFn processFn ProcessFn
processLog log.Logger processLog log.Logger
latestProcessedHeader *types.Header
} }
// Start kicks off the processing loop. This is a block operation // Start kicks off the processing loop. This is a block operation
// unless the processor encountering an error, abrupting the loop, // unless the processor encountering an error, abrupting the loop,
// or the supplied context is cancelled. // or the supplied context is cancelled.
func (p processor) Start(ctx context.Context) error { func (p *processor) Start(ctx context.Context) error {
done := ctx.Done() done := ctx.Done()
pollTicker := time.NewTicker(defaultLoopInterval) pollTicker := time.NewTicker(defaultLoopInterval)
defer pollTicker.Stop() defer pollTicker.Stop()
...@@ -75,8 +77,14 @@ func (p processor) Start(ctx context.Context) error { ...@@ -75,8 +77,14 @@ func (p processor) Start(ctx context.Context) error {
batchLog.Warn("error processing batch. no operations committed", "err", err) batchLog.Warn("error processing batch. no operations committed", "err", err)
} else { } else {
batchLog.Info("fully committed batch") batchLog.Info("fully committed batch")
unprocessedHeaders = nil unprocessedHeaders = nil
p.latestProcessedHeader = lastHeader
} }
} }
} }
} }
func (p processor) LatestProcessedHeader() *types.Header {
return p.latestProcessedHeader
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment