Commit 462cc316 authored by Hamdi Allam's avatar Hamdi Allam

fix bridge processor height detection on startup

parent cf040a8c
...@@ -233,7 +233,7 @@ func (db *blocksDB) LatestEpoch() (*Epoch, error) { ...@@ -233,7 +233,7 @@ func (db *blocksDB) LatestEpoch() (*Epoch, error) {
// L2 for a faster query. Per the protocol, the L2 block that starts a new epoch // L2 for a faster query. Per the protocol, the L2 block that starts a new epoch
// will have a matching timestamp with the L1 origin. // will have a matching timestamp with the L1 origin.
query := db.gorm.Table("l1_block_headers").Order("l1_block_headers.timestamp DESC") query := db.gorm.Table("l1_block_headers").Order("l1_block_headers.timestamp DESC")
query = query.Joins("INNER JOIN l2_block_headers ON l1_block_headers.timestamp = l2_block_headers.timestamp") query = query.Joins("INNER JOIN l2_block_headers ON l2_block_headers.timestamp = l1_block_headers.timestamp")
query = query.Select("*") query = query.Select("*")
var epoch Epoch var epoch Epoch
......
...@@ -47,7 +47,10 @@ type L2TransactionWithdrawal struct { ...@@ -47,7 +47,10 @@ type L2TransactionWithdrawal struct {
type BridgeTransactionsView interface { type BridgeTransactionsView interface {
L1TransactionDeposit(common.Hash) (*L1TransactionDeposit, error) L1TransactionDeposit(common.Hash) (*L1TransactionDeposit, error)
L1LatestBlockHeader() (*L1BlockHeader, error)
L2TransactionWithdrawal(common.Hash) (*L2TransactionWithdrawal, error) L2TransactionWithdrawal(common.Hash) (*L2TransactionWithdrawal, error)
L2LatestBlockHeader() (*L2BlockHeader, error)
} }
type BridgeTransactionsDB interface { type BridgeTransactionsDB interface {
...@@ -94,6 +97,37 @@ func (db *bridgeTransactionsDB) L1TransactionDeposit(sourceHash common.Hash) (*L ...@@ -94,6 +97,37 @@ func (db *bridgeTransactionsDB) L1TransactionDeposit(sourceHash common.Hash) (*L
return &deposit, nil return &deposit, nil
} }
func (db *bridgeTransactionsDB) L1LatestBlockHeader() (*L1BlockHeader, error) {
// Markers for an indexed bridge event
// L1: Latest Transaction Deposit, Latest Proven/Finalized Withdrawal
l1DepositQuery := db.gorm.Table("l1_transaction_deposits").Order("l1_transaction_deposits.timestamp DESC").Limit(1)
l1DepositQuery = l1DepositQuery.Joins("INNER JOIN l1_contract_events ON l1_contract_events.guid = l1_transaction_deposits.initiated_l1_event_guid")
l1DepositQuery = l1DepositQuery.Select("l1_contract_events.*")
l1ProvenQuery := db.gorm.Table("l2_transaction_withdrawals")
l1ProvenQuery = l1ProvenQuery.Joins("INNER JOIN l1_contract_events ON l1_contract_events.guid = l2_transaction_withdrawals.proven_l1_event_guid")
l1ProvenQuery = l1ProvenQuery.Order("l1_contract_events.timestamp DESC").Select("l1_contract_events.*").Limit(1)
l1FinalizedQuery := db.gorm.Table("l2_transaction_withdrawals")
l1FinalizedQuery = l1FinalizedQuery.Joins("INNER JOIN l1_contract_events ON l1_contract_events.guid = l2_transaction_withdrawals.proven_l1_event_guid")
l1FinalizedQuery = l1FinalizedQuery.Order("l1_contract_events.timestamp DESC").Select("l1_contract_events.*").Limit(1)
l1Query := db.gorm.Table("((?) UNION (?) UNION (?)) AS latest_bridge_events", l1DepositQuery.Limit(1), l1ProvenQuery, l1FinalizedQuery)
l1Query = l1Query.Joins("INNER JOIN l1_block_headers ON l1_block_headers.hash = latest_bridge_events.block_hash")
l1Query = l1Query.Order("l1_block_headers.number DESC").Select("l1_block_headers.*")
var l1Header L1BlockHeader
result := l1Query.Take(&l1Header)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, result.Error
}
return &l1Header, nil
}
/** /**
* Transactions withdrawn from L2 * Transactions withdrawn from L2
*/ */
...@@ -149,3 +183,25 @@ func (db *bridgeTransactionsDB) MarkL2TransactionWithdrawalFinalizedEvent(withdr ...@@ -149,3 +183,25 @@ func (db *bridgeTransactionsDB) MarkL2TransactionWithdrawalFinalizedEvent(withdr
result := db.gorm.Save(&withdrawal) result := db.gorm.Save(&withdrawal)
return result.Error return result.Error
} }
func (db *bridgeTransactionsDB) L2LatestBlockHeader() (*L2BlockHeader, error) {
// L2: Inclusion of the latest deposit
l1DepositQuery := db.gorm.Table("l1_transaction_deposits").Order("l1_transaction_deposits.timestamp DESC")
l1DepositQuery = l1DepositQuery.Joins("INNER JOIN l1_contract_events ON l1_contract_events.guid = l1_transaction_deposits.initiated_l1_event_guid")
l1DepositQuery = l1DepositQuery.Select("l1_contract_events.*")
l2Query := db.gorm.Table("(?) AS l1_deposit_events", l1DepositQuery)
l2Query = l2Query.Joins("INNER JOIN l2_block_headers ON l2_block_headers.timestamp = l1_deposit_events.timestamp")
l2Query = l2Query.Select("l2_block_headers.*")
var l2Header L2BlockHeader
result := l2Query.Take(&l2Header)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, result.Error
}
return &l2Header, nil
}
...@@ -44,7 +44,7 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli ...@@ -44,7 +44,7 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
fromHeader = latestHeader.RLPHeader.Header() fromHeader = latestHeader.RLPHeader.Header()
} else if cfg.StartHeight.BitLen() > 0 { } else if cfg.StartHeight.BitLen() > 0 {
log.Info("no indexed state in storage, starting from supplied L1 height", "height", cfg.StartHeight.String()) log.Info("no indexed state starting from supplied L1 height", "height", cfg.StartHeight.String())
header, err := client.BlockHeaderByNumber(cfg.StartHeight) header, err := client.BlockHeaderByNumber(cfg.StartHeight)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not fetch starting block header: %w", err) return nil, fmt.Errorf("could not fetch starting block header: %w", err)
...@@ -53,7 +53,7 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli ...@@ -53,7 +53,7 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
fromHeader = header fromHeader = header
} else { } else {
log.Info("no indexed state in storage, starting from L1 genesis") log.Info("no indexed state, starting from genesis")
} }
// NOTE - The use of un-buffered channel here assumes that downstream consumers // NOTE - The use of un-buffered channel here assumes that downstream consumers
......
...@@ -29,35 +29,32 @@ type BridgeProcessor struct { ...@@ -29,35 +29,32 @@ type BridgeProcessor struct {
func NewBridgeProcessor(log log.Logger, db *database.DB, l1Etl *etl.L1ETL, chainConfig config.ChainConfig) (*BridgeProcessor, error) { func NewBridgeProcessor(log log.Logger, db *database.DB, l1Etl *etl.L1ETL, chainConfig config.ChainConfig) (*BridgeProcessor, error) {
log = log.New("processor", "bridge") log = log.New("processor", "bridge")
latestL1Header, err := bridge.L1LatestBridgeEventHeader(db, chainConfig) latestL1Header, err := db.BridgeTransactions.L1LatestBlockHeader()
if err != nil { if err != nil {
return nil, err return nil, err
} }
latestL2Header, err := bridge.L2LatestBridgeEventHeader(db) latestL2Header, err := db.BridgeTransactions.L2LatestBlockHeader()
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Since the bridge processor indexes events based on epochs, there's var l1Header, l2Header *types.Header
// no scenario in which we have indexed L2 data with no L1 data.
//
// NOTE: Technically there is an exception if our bridging contracts are
// used to bridges native from L2 and an op-chain happens to launch where
// only L2 native bridge events have occurred. This is a rare situation now
// and it's worth the assertion as an integrity check. We can revisit this
// as more chains launch with primarily L2-native activity.
if latestL1Header == nil && latestL2Header != nil {
log.Error("detected indexed L2 bridge activity with no indexed L1 state", "l2_block_number", latestL2Header.Number)
return nil, errors.New("detected indexed L2 bridge activity with no indexed L1 state")
}
if latestL1Header == nil && latestL2Header == nil { if latestL1Header == nil && latestL2Header == nil {
log.Info("no indexed state, starting from genesis") log.Info("no indexed state, starting from rollup genesis")
} else { } else {
log.Info("detected the latest indexed state", "l1_block_number", latestL1Header.Number, "l2_block_number", latestL2Header.Number) l1Height, l2Height := big.NewInt(0), big.NewInt(0)
if latestL1Header != nil {
l1Height = latestL1Header.Number
l1Header = latestL1Header.RLPHeader.Header()
}
if latestL2Header != nil {
l2Height = latestL2Header.Number
l2Header = latestL2Header.RLPHeader.Header()
}
log.Info("detected latest indexed state", "l1_block_number", l1Height, "l2_block_number", l2Height)
} }
return &BridgeProcessor{log, db, l1Etl, chainConfig, latestL1Header, latestL2Header}, nil return &BridgeProcessor{log, db, l1Etl, chainConfig, l1Header, l2Header}, nil
} }
func (b *BridgeProcessor) Start(ctx context.Context) error { func (b *BridgeProcessor) Start(ctx context.Context) error {
...@@ -83,29 +80,40 @@ func (b *BridgeProcessor) Start(ctx context.Context) error { ...@@ -83,29 +80,40 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
latestEpoch, err := b.db.Blocks.LatestEpoch() latestEpoch, err := b.db.Blocks.LatestEpoch()
if err != nil { if err != nil {
return err return err
} } else if latestEpoch == nil {
if latestEpoch == nil { if b.LatestL1Header != nil || b.LatestL2Header != nil {
if b.LatestL1Header != nil { // Once we have some indexed state `latestEpoch` can never return nil
// Once we have some state `latestEpoch` should never return nil. b.log.Error("bridge events indexed, but no indexed epoch returned", "latest_bridge_l1_block_number", b.LatestL1Header.Number)
b.log.Error("started with indexed bridge state, but no latest epoch returned", "latest_bridge_l1_block_number", b.LatestL1Header.Number) return errors.New("bridge events indexed, but no indexed epoch returned")
return errors.New("started with indexed bridge state, but no blocks epochs returned")
} else {
b.log.Warn("no indexed epochs. waiting...")
continue
} }
b.log.Warn("no indexed epochs available. waiting...")
continue
} }
// Integrity Checks
if b.LatestL1Header != nil && latestEpoch.L1BlockHeader.Hash == b.LatestL1Header.Hash() { if b.LatestL1Header != nil && latestEpoch.L1BlockHeader.Hash == b.LatestL1Header.Hash() {
// Marked as a warning since the bridge should always be processing at least 1 new epoch b.log.Warn("all available epochs indexed", "latest_bridge_l1_block_number", b.LatestL1Header.Number)
b.log.Warn("all available epochs indexed by the bridge", "latest_epoch_number", b.LatestL1Header.Number)
continue continue
} }
if b.LatestL1Header != nil && latestEpoch.L1BlockHeader.Number.Cmp(b.LatestL1Header.Number) <= 0 {
b.log.Error("non-increasing l1 block height observed", "latest_bridge_l1_block_number", b.LatestL1Header.Number, "latest_epoch_number", latestEpoch.L1BlockHeader.Number)
return errors.New("non-increasing l1 block heght observed")
}
if b.LatestL2Header != nil && latestEpoch.L2BlockHeader.Number.Cmp(b.LatestL2Header.Number) <= 0 {
b.log.Error("non-increasing l2 block height observed", "latest_bridge_l2_block_number", b.LatestL2Header.Number, "latest_epoch_number", latestEpoch.L2BlockHeader.Number)
return errors.New("non-increasing l2 block heght observed")
}
// Process Bridge Events
toL1Height, toL2Height := latestEpoch.L1BlockHeader.Number, latestEpoch.L2BlockHeader.Number toL1Height, toL2Height := latestEpoch.L1BlockHeader.Number, latestEpoch.L2BlockHeader.Number
fromL1Height, fromL2Height := big.NewInt(0), big.NewInt(0) fromL1Height, fromL2Height := big.NewInt(0), big.NewInt(0)
if b.LatestL1Header != nil { if b.LatestL1Header != nil {
// `NewBridgeProcessor` ensures that LatestL2Header must not be nil if LatestL1Header is set
fromL1Height = new(big.Int).Add(b.LatestL1Header.Number, big.NewInt(1)) fromL1Height = new(big.Int).Add(b.LatestL1Header.Number, big.NewInt(1))
}
if b.LatestL2Header != nil {
fromL2Height = new(big.Int).Add(b.LatestL2Header.Number, big.NewInt(1)) fromL2Height = new(big.Int).Add(b.LatestL2Header.Number, big.NewInt(1))
} }
...@@ -139,7 +147,7 @@ func (b *BridgeProcessor) Start(ctx context.Context) error { ...@@ -139,7 +147,7 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
// Try again on a subsequent interval // Try again on a subsequent interval
batchLog.Error("unable to index new bridge events", "err", err) batchLog.Error("unable to index new bridge events", "err", err)
} else { } else {
batchLog.Info("done indexing new bridge events", "latest_l1_block_number", toL1Height, "latest_l2_block_number", toL2Height) batchLog.Info("done indexing bridge events", "latest_l1_block_number", toL1Height, "latest_l2_block_number", toL2Height)
b.LatestL1Header = latestEpoch.L1BlockHeader.RLPHeader.Header() b.LatestL1Header = latestEpoch.L1BlockHeader.RLPHeader.Header()
b.LatestL2Header = latestEpoch.L2BlockHeader.RLPHeader.Header() b.LatestL2Header = latestEpoch.L2BlockHeader.RLPHeader.Header()
} }
......
...@@ -8,7 +8,6 @@ import ( ...@@ -8,7 +8,6 @@ import (
"github.com/ethereum-optimism/optimism/indexer/config" "github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/processors/contracts" "github.com/ethereum-optimism/optimism/indexer/processors/contracts"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -233,80 +232,3 @@ func L1ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, chainConfig ...@@ -233,80 +232,3 @@ func L1ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, chainConfig
// a-ok! // a-ok!
return nil return nil
} }
// L1LatestBridgeEventHeader returns the latest header for which and on-chain event
// has been observed on L1 -- Both initiated L1 events and finalization markers on L2.
func L1LatestBridgeEventHeader(db *database.DB, chainConfig config.ChainConfig) (*types.Header, error) {
portalAbi, err := bindings.OptimismPortalMetaData.GetAbi()
if err != nil {
return nil, err
}
depositEventID := portalAbi.Events["TransactionDeposited"].ID
provenEventID := portalAbi.Events["WithdrawalProven"].ID
finalizedEventID := portalAbi.Events["WithdrawalFinalized"].ID
// (1) Initiated L1 Events
// Since all initaited bridge events eventually reach the OptimismPortal to
// conduct the deposit, we can simply look for the last deposited transaction
// event on L2.
var latestDepositHeader *types.Header
contractEventFilter := database.ContractEvent{ContractAddress: chainConfig.L1Contracts.OptimismPortalProxy, EventSignature: depositEventID}
depositEvent, err := db.ContractEvents.L1LatestContractEventWithFilter(contractEventFilter)
if err != nil {
return nil, err
}
if depositEvent != nil {
l1BlockHeader, err := db.Blocks.L1BlockHeader(depositEvent.BlockHash)
if err != nil {
return nil, err
}
if l1BlockHeader != nil {
latestDepositHeader = l1BlockHeader.RLPHeader.Header()
}
}
// (2) Finalization markers for L2
// Like initiated L1 events, all withdrawals must flow through the OptimismPortal
// contract. We must look for both proven and finalized withdrawal events.
var latestWithdrawHeader *types.Header
contractEventFilter.EventSignature = finalizedEventID
withdrawEvent, err := db.ContractEvents.L1LatestContractEventWithFilter(contractEventFilter)
if err != nil {
return nil, err
}
if withdrawEvent != nil {
// Check if a have a later detected proven event
contractEventFilter.EventSignature = provenEventID
provenEvent, err := db.ContractEvents.L1LatestContractEventWithFilter(contractEventFilter)
if err != nil {
return nil, err
}
if provenEvent != nil && provenEvent.Timestamp > withdrawEvent.Timestamp {
withdrawEvent = provenEvent
}
l1BlockHeader, err := db.Blocks.L1BlockHeader(withdrawEvent.BlockHash)
if err != nil {
return nil, err
}
latestWithdrawHeader = l1BlockHeader.RLPHeader.Header()
}
if latestDepositHeader == nil {
// If there has been no seen deposits yet, there could have been no seen withdrawals
if latestWithdrawHeader != nil {
return nil, errors.New("detected an indexed withdrawal without any deposits")
}
return nil, nil
} else if latestWithdrawHeader == nil {
return latestDepositHeader, nil
} else {
// both deposits & withdrawals have occurred
if latestDepositHeader.Time > latestWithdrawHeader.Time {
return latestDepositHeader, nil
} else {
return latestWithdrawHeader, nil
}
}
}
...@@ -7,10 +7,8 @@ import ( ...@@ -7,10 +7,8 @@ import (
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/processors/contracts" "github.com/ethereum-optimism/optimism/indexer/processors/contracts"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-bindings/predeploys" "github.com/ethereum-optimism/optimism/op-bindings/predeploys"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
...@@ -182,71 +180,3 @@ func L2ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, fromHeight ...@@ -182,71 +180,3 @@ func L2ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, fromHeight
// a-ok! // a-ok!
return nil return nil
} }
// L2LatestBridgeEventHeader returns the latest header for which and on-chain event
// has been observed on L2 -- Both initiated L2 events and finalization markers from L1.
func L2LatestBridgeEventHeader(db *database.DB) (*types.Header, error) {
l2ToL1MessagePasserAbi, err := bindings.L2ToL1MessagePasserMetaData.GetAbi()
if err != nil {
return nil, err
}
crossDomainMessengerAbi, err := bindings.CrossDomainMessengerMetaData.GetAbi()
if err != nil {
return nil, err
}
messagePassedID := l2ToL1MessagePasserAbi.Events["MessagePassed"].ID
relayedEventID := crossDomainMessengerAbi.Events["RelayedMessage"].ID
// (1) Initiated L2 Events
// Since all initiated bridge events eventually reach the L2ToL1MessagePasser to
// initiate the withdrawal, we can simply look for the last message passed from
// this cont
var latestWithdrawHeader *types.Header
contractEventFilter := database.ContractEvent{ContractAddress: predeploys.L2ToL1MessagePasserAddr, EventSignature: messagePassedID}
withdrawEvent, err := db.ContractEvents.L2LatestContractEventWithFilter(contractEventFilter)
if err != nil {
return nil, err
}
if withdrawEvent != nil {
l2BlockHeader, err := db.Blocks.L2BlockHeader(withdrawEvent.BlockHash)
if err != nil {
return nil, err
}
if l2BlockHeader != nil {
latestWithdrawHeader = l2BlockHeader.RLPHeader.Header()
}
}
// (2) Finalization markers for L1
// Since deposited transactions from L1 are apart of the block derivation process,
// there are no native finalization markers for OptimismPortal#TransactionDeposited.
// The lowest layer to check for here is the CrossDomainMessenger#RelayedMessage event.
// This also converts the StandardBridge which simply is an extension of the messenger.
var latestRelayedMessageHeader *types.Header
contractEventFilter = database.ContractEvent{ContractAddress: predeploys.L2CrossDomainMessengerAddr, EventSignature: relayedEventID}
relayedEvent, err := db.ContractEvents.L2LatestContractEventWithFilter(contractEventFilter)
if err != nil {
return nil, err
}
if relayedEvent != nil {
l2BlockHeader, err := db.Blocks.L2BlockHeader(relayedEvent.BlockHash)
if err != nil {
return nil, err
}
if l2BlockHeader != nil {
latestRelayedMessageHeader = l2BlockHeader.RLPHeader.Header()
}
}
// No causaal relationship between withdraw and relayed messages
if latestWithdrawHeader == nil || latestRelayedMessageHeader == nil {
return nil, nil
} else {
if latestWithdrawHeader.Time > latestRelayedMessageHeader.Time {
return latestWithdrawHeader, nil
} else {
return latestRelayedMessageHeader, nil
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment