Commit 5eaaee10 authored by OptimismBot's avatar OptimismBot Committed by GitHub

Merge pull request #7159 from ethereum-optimism/indexer.bridge.l2.start

fix(indexer): bridge processor fixes
parents 3ec079da c30891a4
package database
import (
"context"
"errors"
"math/big"
......@@ -211,7 +210,6 @@ func (db *blocksDB) L2LatestBlockHeader() (*L2BlockHeader, error) {
return nil, result.Error
}
result.Logger.Info(context.Background(), "number ", l2Header.Number)
return &l2Header, nil
}
......@@ -229,13 +227,32 @@ type Epoch struct {
// For more, see the protocol spec:
// - https://github.com/ethereum-optimism/optimism/blob/develop/specs/derivation.md
func (db *blocksDB) LatestEpoch() (*Epoch, error) {
// Since L1 blocks occur less frequently than L2, we do a INNER JOIN from L1 on
// L2 for a faster query. Per the protocol, the L2 block that starts a new epoch
// will have a matching timestamp with the L1 origin.
query := db.gorm.Table("l1_block_headers").Order("l1_block_headers.timestamp DESC")
query = query.Joins("INNER JOIN l2_block_headers ON l2_block_headers.timestamp = l1_block_headers.timestamp")
query = query.Order("l2_block_headers.number DESC")
query = query.Select("*")
latestL1Header, err := db.L1LatestBlockHeader()
if err != nil {
return nil, err
} else if latestL1Header == nil {
return nil, nil
}
latestL2Header, err := db.L2LatestBlockHeader()
if err != nil {
return nil, err
} else if latestL2Header == nil {
return nil, nil
}
minTime := latestL1Header.Timestamp
if latestL2Header.Timestamp < minTime {
minTime = latestL2Header.Timestamp
}
// This is a faster query than doing an INNER JOIN between l1_block_headers and l2_block_headers
// which requires a full table scan to compute the resulting table.
l1Query := db.gorm.Table("l1_block_headers").Where("timestamp <= ?", minTime)
l2Query := db.gorm.Table("l2_block_headers").Where("timestamp <= ?", minTime)
query := db.gorm.Raw(`SELECT * FROM (?) AS l1_block_headers, (?) AS l2_block_headers
WHERE l1_block_headers.timestamp = l2_block_headers.timestamp
ORDER BY l2_block_headers.number DESC LIMIT 1`, l1Query, l2Query)
var epoch Epoch
result := query.Take(&epoch)
......
......@@ -114,7 +114,7 @@ func (db *bridgeTransactionsDB) L1LatestBlockHeader() (*L1BlockHeader, error) {
l1Query := db.gorm.Table("((?) UNION (?) UNION (?)) AS latest_bridge_events", l1DepositQuery.Limit(1), l1ProvenQuery, l1FinalizedQuery)
l1Query = l1Query.Joins("INNER JOIN l1_block_headers ON l1_block_headers.hash = latest_bridge_events.block_hash")
l1Query = l1Query.Order("l1_block_headers.number DESC").Select("l1_block_headers.*")
l1Query = l1Query.Order("latest_bridge_events.timestamp DESC").Select("l1_block_headers.*")
var l1Header L1BlockHeader
result := l1Query.Take(&l1Header)
......@@ -185,23 +185,47 @@ func (db *bridgeTransactionsDB) MarkL2TransactionWithdrawalFinalizedEvent(withdr
}
func (db *bridgeTransactionsDB) L2LatestBlockHeader() (*L2BlockHeader, error) {
// L2: Inclusion of the latest deposit
l1DepositQuery := db.gorm.Table("l1_transaction_deposits").Order("l1_transaction_deposits.timestamp DESC")
l1DepositQuery = l1DepositQuery.Joins("INNER JOIN l1_contract_events ON l1_contract_events.guid = l1_transaction_deposits.initiated_l1_event_guid")
l1DepositQuery = l1DepositQuery.Select("l1_contract_events.*")
l2Query := db.gorm.Table("(?) AS l1_deposit_events", l1DepositQuery)
l2Query = l2Query.Joins("INNER JOIN l2_block_headers ON l2_block_headers.timestamp = l1_deposit_events.timestamp")
l2Query = l2Query.Order("l2_block_headers.timestamp DESC").Select("l2_block_headers.*")
// L2: Latest Withdrawal, Latest L2 Header of indexed deposit epoch
var latestWithdrawalHeader, latestL2DepositHeader *L2BlockHeader
var withdrawHeader L2BlockHeader
withdrawalQuery := db.gorm.Table("l2_transaction_withdrawals").Order("timestamp DESC").Limit(1)
withdrawalQuery = withdrawalQuery.Joins("INNER JOIN l2_contract_events ON l2_contract_events.guid = l2_transaction_withdrawals.initiated_l2_event_guid")
withdrawalQuery = withdrawalQuery.Joins("INNER JOIN l2_block_headers ON l2_block_headers.hash = l2_contract_events.block_hash")
result := withdrawalQuery.Select("l2_block_headers.*").Take(&withdrawHeader)
if result.Error != nil && !errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, result.Error
} else if !errors.Is(result.Error, gorm.ErrRecordNotFound) {
latestWithdrawalHeader = &withdrawHeader
}
var l2Header L2BlockHeader
result := l2Query.Take(&l2Header)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
// Check for any deposits that may have been included after the latest withdrawal. However, since the bridge
// processor only inserts entries when the corresponding epoch has been indexed on both L1 and L2, we can
// simply look for the latest L2 block with at <= time of the latest L1 deposit.
var l1Deposit L1TransactionDeposit
result = db.gorm.Table("l1_transaction_deposits").Order("timestamp DESC").Limit(1).Take(&l1Deposit)
if result.Error != nil && !errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, result.Error
} else if !errors.Is(result.Error, gorm.ErrRecordNotFound) {
var l2DepositHeader L2BlockHeader
result := db.gorm.Table("l2_block_headers").Order("timestamp DESC").Limit(1).Where("timestamp <= ?", l1Deposit.Tx.Timestamp).Take(&l2DepositHeader)
if result.Error != nil && !errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, result.Error
} else if !errors.Is(result.Error, gorm.ErrRecordNotFound) {
latestL2DepositHeader = &l2DepositHeader
}
}
return &l2Header, nil
// compare
if latestWithdrawalHeader == nil {
return latestL2DepositHeader, nil
} else if latestL2DepositHeader == nil {
return latestWithdrawalHeader, nil
}
if latestWithdrawalHeader.Timestamp >= latestL2DepositHeader.Timestamp {
return latestWithdrawalHeader, nil
} else {
return latestL2DepositHeader, nil
}
}
......@@ -23,6 +23,9 @@ func L2ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, fromHeight
if err != nil {
return err
}
if len(l2ToL1MPMessagesPassed) > 0 {
log.Info("detected transaction withdrawals", "size", len(l2ToL1MPMessagesPassed))
}
messagesPassed := make(map[logKey]*contracts.L2ToL1MessagePasserMessagePassed, len(l2ToL1MPMessagesPassed))
transactionWithdrawals := make([]database.L2TransactionWithdrawal, len(l2ToL1MPMessagesPassed))
......@@ -37,9 +40,7 @@ func L2ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, fromHeight
Tx: messagePassed.Tx,
}
}
if len(messagesPassed) > 0 {
log.Info("detected transaction withdrawals", "size", len(transactionWithdrawals))
if err := db.BridgeTransactions.StoreL2TransactionWithdrawals(transactionWithdrawals); err != nil {
return err
}
......@@ -50,8 +51,8 @@ func L2ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, fromHeight
if err != nil {
return err
}
if len(crossDomainSentMessages) > len(messagesPassed) {
return fmt.Errorf("missing L2ToL1MP withdrawal for each cross-domain message. withdrawals: %d, messages: %d", len(messagesPassed), len(crossDomainSentMessages))
if len(crossDomainSentMessages) > 0 {
log.Info("detected sent messages", "size", len(crossDomainSentMessages))
}
sentMessages := make(map[logKey]*contracts.CrossDomainMessengerSentMessageEvent, len(crossDomainSentMessages))
......@@ -63,14 +64,14 @@ func L2ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, fromHeight
// extract the withdrawal hash from the previous MessagePassed event
messagePassed, ok := messagesPassed[logKey{sentMessage.Event.BlockHash, sentMessage.Event.LogIndex - 1}]
if !ok {
return fmt.Errorf("expected MessagePassedEvent preceding SentMessage. tx_hash = %s", sentMessage.Event.TransactionHash)
log.Error("expected MessagePassedEvent preceding SentMessage", "tx_hash", sentMessage.Event.TransactionHash.String())
return fmt.Errorf("expected MessagePassedEvent preceding SentMessage. tx_hash = %s", sentMessage.Event.TransactionHash.String())
}
l2BridgeMessages[i] = database.L2BridgeMessage{TransactionWithdrawalHash: messagePassed.WithdrawalHash, BridgeMessage: sentMessage.BridgeMessage}
}
if len(l2BridgeMessages) > 0 {
log.Info("detected L2CrossDomainMessenger messages", "size", len(l2BridgeMessages))
if err := db.BridgeMessages.StoreL2BridgeMessages(l2BridgeMessages); err != nil {
return err
}
......@@ -81,8 +82,8 @@ func L2ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, fromHeight
if err != nil {
return err
}
if len(initiatedBridges) > len(crossDomainSentMessages) {
return fmt.Errorf("missing cross-domain message for each initiated bridge event. messages: %d, bridges: %d", len(crossDomainSentMessages), len(initiatedBridges))
if len(initiatedBridges) > 0 {
log.Info("detected bridge withdrawals", "size", len(initiatedBridges))
}
l2BridgeWithdrawals := make([]database.L2BridgeWithdrawal, len(initiatedBridges))
......@@ -92,11 +93,13 @@ func L2ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, fromHeight
// extract the cross domain message hash & deposit source hash from the following events
messagePassed, ok := messagesPassed[logKey{initiatedBridge.Event.BlockHash, initiatedBridge.Event.LogIndex + 1}]
if !ok {
return fmt.Errorf("expected MessagePassed following BridgeInitiated event. tx_hash = %s", initiatedBridge.Event.TransactionHash)
log.Error("expected MessagePassed following BridgeInitiated event", "tx_hash", initiatedBridge.Event.TransactionHash.String())
return fmt.Errorf("expected MessagePassed following BridgeInitiated event. tx_hash = %s", initiatedBridge.Event.TransactionHash.String())
}
sentMessage, ok := sentMessages[logKey{initiatedBridge.Event.BlockHash, initiatedBridge.Event.LogIndex + 2}]
if !ok {
return fmt.Errorf("expected SentMessage following MessagePassed event. tx_hash = %s", initiatedBridge.Event.TransactionHash)
log.Error("expected SentMessage following MessagePassed event", "tx_hash", initiatedBridge.Event.TransactionHash.String())
return fmt.Errorf("expected SentMessage following MessagePassed event. tx_hash = %s", initiatedBridge.Event.TransactionHash.String())
}
initiatedBridge.BridgeTransfer.CrossDomainMessageHash = &sentMessage.BridgeMessage.MessageHash
......@@ -104,7 +107,6 @@ func L2ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, fromHeight
}
if len(l2BridgeWithdrawals) > 0 {
log.Info("detected L2StandardBridge withdrawals", "size", len(l2BridgeWithdrawals))
if err := db.BridgeTransfers.StoreL2BridgeWithdrawals(l2BridgeWithdrawals); err != nil {
return err
}
......@@ -121,11 +123,14 @@ func L2ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, fromHeight
//
// NOTE: Unlike L1, there's no L2ToL1MessagePasser stage since transaction deposits are apart of the block derivation process.
func L2ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, fromHeight *big.Int, toHeight *big.Int) error {
// (1) L2CrossDomainMessenger relayedMessage
// (1) L2CrossDomainMessenger
crossDomainRelayedMessages, err := contracts.CrossDomainMessengerRelayedMessageEvents("l2", predeploys.L2CrossDomainMessengerAddr, db, fromHeight, toHeight)
if err != nil {
return err
}
if len(crossDomainRelayedMessages) > 0 {
log.Info("detected relayed messages", "size", len(crossDomainRelayedMessages))
}
relayedMessages := make(map[logKey]*contracts.CrossDomainMessengerRelayedMessageEvent, len(crossDomainRelayedMessages))
for i := range crossDomainRelayedMessages {
......@@ -135,26 +140,23 @@ func L2ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, fromHeight
if err != nil {
return err
} else if message == nil {
log.Error("missing indexed L1CrossDomainMessenger message", "message_hash", relayed.MessageHash, "tx_hash", relayed.Event.TransactionHash)
return fmt.Errorf("missing indexed L1CrossDomainMessager message")
log.Error("missing indexed L1CrossDomainMessenger message", "tx_hash", relayed.Event.TransactionHash.String())
return fmt.Errorf("missing indexed L1CrossDomainMessager message. tx_hash = %s", relayed.Event.TransactionHash.String())
}
if err := db.BridgeMessages.MarkRelayedL1BridgeMessage(relayed.MessageHash, relayed.Event.GUID); err != nil {
log.Error("failed to relay cross domain message", "err", err, "tx_hash", relayed.Event.TransactionHash.String())
return err
}
}
if len(crossDomainRelayedMessages) > 0 {
log.Info("relayed L1CrossDomainMessenger messages", "size", len(crossDomainRelayedMessages))
}
// (2) L2StandardBridge BridgeFinalized
// (2) L2StandardBridge
finalizedBridges, err := contracts.StandardBridgeFinalizedEvents("l2", predeploys.L2StandardBridgeAddr, db, fromHeight, toHeight)
if err != nil {
return err
}
if len(finalizedBridges) > len(crossDomainRelayedMessages) {
return fmt.Errorf("missing cross-domain message for each finalized bridge event. messages: %d, bridges: %d", len(crossDomainRelayedMessages), len(finalizedBridges))
if len(finalizedBridges) > 0 {
log.Info("detected finalized bridge deposits", "size", len(finalizedBridges))
}
for i := range finalizedBridges {
......@@ -163,7 +165,8 @@ func L2ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, fromHeight
finalizedBridge := finalizedBridges[i]
relayedMessage, ok := relayedMessages[logKey{finalizedBridge.Event.BlockHash, finalizedBridge.Event.LogIndex + 1}]
if !ok {
return fmt.Errorf("expected RelayedMessage following BridgeFinalized event. tx_hash = %s", finalizedBridge.Event.TransactionHash)
log.Error("expected RelayedMessage following BridgeFinalized event", "tx_hash", finalizedBridge.Event.TransactionHash.String())
return fmt.Errorf("expected RelayedMessage following BridgeFinalized event. tx_hash = %s", finalizedBridge.Event.TransactionHash.String())
}
// Since the message hash is computed from the relayed message, this ensures the withdrawal fields must match. For good measure,
......@@ -172,7 +175,7 @@ func L2ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, fromHeight
if err != nil {
return err
} else if deposit == nil {
log.Error("missing L1StandardBridge deposit on L2 finalization", "tx_hash", finalizedBridge.Event.TransactionHash)
log.Error("missing L1StandardBridge deposit on L2 finalization", "tx_hash", finalizedBridge.Event.TransactionHash.String())
return errors.New("missing L1StandardBridge deposit on L2 finalization")
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment