Commit e85caec3 authored by Hamdi Allam's avatar Hamdi Allam

bridge events

parent 256ab17b
package processor package processor
import ( import (
"bytes"
"context" "context"
"encoding/hex" "encoding/hex"
"errors" "errors"
...@@ -22,6 +23,10 @@ import ( ...@@ -22,6 +23,10 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
var (
ethAddress = common.HexToAddress("0xDeadDeAddeAddEAddeadDEaDDEAdDeaDDeAD0000")
)
type L1Contracts struct { type L1Contracts struct {
OptimismPortal common.Address OptimismPortal common.Address
L2OutputOracle common.Address L2OutputOracle common.Address
...@@ -122,10 +127,10 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1 ...@@ -122,10 +127,10 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1
headerMap[header.Hash()] = header headerMap[header.Hash()] = header
} }
/** Watch for Contract Events **/ /** Watch for all Optimism Contract Events **/
logFilter := ethereum.FilterQuery{FromBlock: headers[0].Number, ToBlock: headers[numHeaders-1].Number, Addresses: contractAddrs} logFilter := ethereum.FilterQuery{FromBlock: headers[0].Number, ToBlock: headers[numHeaders-1].Number, Addresses: contractAddrs}
logs, err := rawEthClient.FilterLogs(context.Background(), logFilter) logs, err := rawEthClient.FilterLogs(context.Background(), logFilter) // []types.Log
if err != nil { if err != nil {
return err return err
} }
...@@ -135,7 +140,10 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1 ...@@ -135,7 +140,10 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1
legacyStateBatches := []*database.LegacyStateBatch{} legacyStateBatches := []*database.LegacyStateBatch{}
numLogs := len(logs) numLogs := len(logs)
logsByIndex := make(map[uint]*types.Log, numLogs)
l1ContractEvents := make([]*database.L1ContractEvent, numLogs) l1ContractEvents := make([]*database.L1ContractEvent, numLogs)
l1ContractEventLogs := make(map[uuid.UUID]*types.Log)
l1HeadersOfInterest := make(map[common.Hash]bool) l1HeadersOfInterest := make(map[common.Hash]bool)
for i, log := range logs { for i, log := range logs {
header, ok := headerMap[log.BlockHash] header, ok := headerMap[log.BlockHash]
...@@ -144,16 +152,8 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1 ...@@ -144,16 +152,8 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1
return errors.New("parsed log with a block hash not in this batch") return errors.New("parsed log with a block hash not in this batch")
} }
contractEvent := &database.L1ContractEvent{ logsByIndex[log.Index] = &logs[i]
ContractEvent: database.ContractEvent{ contractEvent := &database.L1ContractEvent{ContractEvent: database.ContractEventFromLog(&log, header.Time)}
GUID: uuid.New(),
BlockHash: log.BlockHash,
TransactionHash: log.TxHash,
EventSignature: log.Topics[0],
LogIndex: uint64(log.Index),
Timestamp: header.Time,
},
}
l1ContractEvents[i] = contractEvent l1ContractEvents[i] = contractEvent
l1HeadersOfInterest[log.BlockHash] = true l1HeadersOfInterest[log.BlockHash] = true
...@@ -194,63 +194,164 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1 ...@@ -194,63 +194,164 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1
// we iterate on the original array to maintain ordering. probably can find a more efficient // we iterate on the original array to maintain ordering. probably can find a more efficient
// way to iterate over the `l1HeadersOfInterest` map while maintaining ordering // way to iterate over the `l1HeadersOfInterest` map while maintaining ordering
l1Headers := []*database.L1BlockHeader{} indexedL1Headers := []*database.L1BlockHeader{}
for _, header := range headers { for _, header := range headers {
blockHash := header.Hash() _, hasLogs := l1HeadersOfInterest[header.Hash()]
if _, hasLogs := l1HeadersOfInterest[blockHash]; !hasLogs { if !hasLogs {
continue continue
} }
l1Headers = append(l1Headers, &database.L1BlockHeader{ indexedL1Headers = append(
BlockHeader: database.BlockHeader{ indexedL1Headers,
Hash: blockHash, &database.L1BlockHeader{BlockHeader: database.BlockHeaderFromHeader(header)},
ParentHash: header.ParentHash, )
Number: database.U256{Int: header.Number},
Timestamp: header.Time,
},
})
} }
/** Update Database **/ /** Update Database **/
numL1Headers := len(l1Headers) numIndexedL1Headers := len(indexedL1Headers)
if numL1Headers == 0 { if numIndexedL1Headers > 0 {
processLog.Info("no l1 blocks of interest") processLog.Info("saved l1 blocks of interest within batch", "num", numIndexedL1Headers, "batchSize", numHeaders)
return nil err = db.Blocks.StoreL1BlockHeaders(indexedL1Headers)
} if err != nil {
return err
}
processLog.Info("saving l1 blocks of interest", "size", numL1Headers, "batch_size", numHeaders) // Since the headers to index are derived from the existence of logs, we know in this branch `numLogs > 0`
err = db.Blocks.StoreL1BlockHeaders(l1Headers) processLog.Info("saving contract logs", "size", numLogs)
if err != nil { err = db.ContractEvents.StoreL1ContractEvents(l1ContractEvents)
return err if err != nil {
} return err
}
// Since the headers to index are derived from the existence of logs, we know in this branch `numLogs > 0` // Mark L2 checkpoints that have been recorded on L1 (L2OutputProposal & StateBatchAppended events)
processLog.Info("saving contract logs", "size", numLogs) numLegacyStateBatches := len(legacyStateBatches)
err = db.ContractEvents.StoreL1ContractEvents(l1ContractEvents) if numLegacyStateBatches > 0 {
if err != nil { latestBatch := legacyStateBatches[numLegacyStateBatches-1]
return err latestL2Height := latestBatch.PrevTotal + latestBatch.Size - 1
} processLog.Info("detected legacy state batches", "size", numLegacyStateBatches, "latest_l2_block_number", latestL2Height)
}
// Mark L2 checkpoints that have been recorded on L1 (L2OutputProposal & StateBatchAppended events) numOutputProposals := len(outputProposals)
numLegacyStateBatches := len(legacyStateBatches) if numOutputProposals > 0 {
if numLegacyStateBatches > 0 { latestL2Height := outputProposals[numOutputProposals-1].L2BlockNumber.Int
latestBatch := legacyStateBatches[numLegacyStateBatches-1] processLog.Info("detected output proposals", "size", numOutputProposals, "latest_l2_block_number", latestL2Height)
latestL2Height := latestBatch.PrevTotal + latestBatch.Size - 1 err := db.Blocks.StoreOutputProposals(outputProposals)
processLog.Info("detected legacy state batches", "size", numLegacyStateBatches, "latest_l2_block_number", latestL2Height) if err != nil {
} return err
}
}
numOutputProposals := len(outputProposals) // forward along contract events to the bridge processor
if numOutputProposals > 0 { err = l1BridgeProcessContractEvents(processLog, db, l1ContractEvents, l1ContractEventLogs, logsByIndex)
latestL2Height := outputProposals[numOutputProposals-1].L2BlockNumber.Int
processLog.Info("detected output proposals", "size", numOutputProposals, "latest_l2_block_number", latestL2Height)
err := db.Blocks.StoreOutputProposals(outputProposals)
if err != nil { if err != nil {
return err return err
} }
} else {
processLog.Info("no l1 blocks of interest within batch")
} }
// a-ok! // a-ok!
return nil return nil
} }
} }
func l1BridgeProcessContractEvents(
processLog log.Logger,
db *database.DB,
events []*database.L1ContractEvent,
eventLogs map[uuid.UUID]*types.Log,
logsByIndex map[uint]*types.Log,
) error {
l1StandardBridgeABI, err := bindings.L1StandardBridgeMetaData.GetAbi()
if err != nil {
return err
}
l1CrossDomainMessengerABI, err := bindings.L1CrossDomainMessengerMetaData.GetAbi()
if err != nil {
return err
}
type MessageData struct {
Sender common.Address
Message []byte
MessageNonce *big.Int
GasLimit *big.Int
}
type BridgeData struct {
From common.Address
To common.Address
Amount *big.Int
ExtraData []byte
}
l1StandardBridgeDeposits := []*database.Deposit{}
ethBridgeInitiatedEventSig := l1StandardBridgeABI.Events["ETHBridgeInitiated"].ID
sentMessageEventSig := l1CrossDomainMessengerABI.Events["SentMessage"].ID
for _, contractEvent := range events {
eventSig := contractEvent.EventSignature
log := eventLogs[contractEvent.GUID]
if eventSig == ethBridgeInitiatedEventSig {
// (1) Deconstruct the bridge event
var bridgeData BridgeData
err = l1StandardBridgeABI.UnpackIntoInterface(&bridgeData, "ETHBridgeInitiated", log.Data)
if err != nil || len(log.Topics) != 3 {
processLog.Crit("unexpected ETHDepositInitiated log format", "tx", log.TxHash, "err", err)
return err
}
// from/to are indexed event fields (not present in the log data)
bridgeData.From = common.BytesToAddress(log.Topics[1].Bytes())
bridgeData.To = common.BytesToAddress(log.Topics[2].Bytes())
// (2) Look for the sent message event to extract the associated messager nonce
// - The `SentMessage` event is the second after the bridge initiated event. BridgeInitiated -> Portal#DepositTransaction -> SentMesage ...
sentMsgLog := logsByIndex[log.Index+2]
if sentMsgLog.Topics[0] != sentMessageEventSig {
processLog.Crit("expected CrossDomainMessenger#SentMessage to follow StandardBridge#EthBridgeInitiated event", "event_sig", sentMsgLog.Topics[0], "sent_message_sig", sentMessageEventSig)
return errors.New("unexpected bridge event ordering")
}
expectedMsg, err := l1StandardBridgeABI.Pack("finalizeBridgeETH", bridgeData.From, bridgeData.To, bridgeData.Amount, bridgeData.ExtraData)
if err != nil {
processLog.Crit("unable to create bridge message")
return err
}
var sentMsgData MessageData
err = l1CrossDomainMessengerABI.UnpackIntoInterface(&sentMsgData, "SentMessage", sentMsgLog.Data)
if err != nil {
processLog.Crit("unexpected SentMessage log format", "tx", log.TxHash, "err", err)
return err
} else if !bytes.Equal(sentMsgData.Message, expectedMsg) {
processLog.Crit("SentMessage message mismatch", "expected_bridge_msg", hex.EncodeToString(expectedMsg), "event_msg", hex.EncodeToString(sentMsgData.Message))
return errors.New("bridge message mismatch")
}
// (3) Record the deposit
l1StandardBridgeDeposits = append(l1StandardBridgeDeposits, &database.Deposit{
GUID: uuid.New(),
InitiatedL1EventGUID: contractEvent.GUID,
SentMessageNonce: database.U256{Int: sentMsgData.MessageNonce},
TokenPair: database.TokenPair{L1TokenAddress: ethAddress, L2TokenAddress: ethAddress},
Tx: database.Transaction{
FromAddress: common.BytesToAddress(log.Topics[1].Bytes()),
ToAddress: common.BytesToAddress(log.Topics[2].Bytes()),
Amount: database.U256{Int: bridgeData.Amount},
Data: bridgeData.ExtraData,
Timestamp: contractEvent.Timestamp,
},
})
}
}
if len(l1StandardBridgeDeposits) > 0 {
processLog.Info("detected L1StandardBridge deposits", "num", len(l1StandardBridgeDeposits))
return db.Bridge.StoreDeposits(l1StandardBridgeDeposits)
}
// no-op
return nil
}
package processor package processor
import ( import (
"bytes"
"context" "context"
"errors" "errors"
"math/big"
"reflect" "reflect"
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node" "github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
...@@ -125,7 +128,10 @@ func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2 ...@@ -125,7 +128,10 @@ func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2
} }
numLogs := len(logs) numLogs := len(logs)
logsByIndex := make(map[uint]*types.Log, numLogs)
l2ContractEvents := make([]*database.L2ContractEvent, numLogs) l2ContractEvents := make([]*database.L2ContractEvent, numLogs)
l2ContractEventLogs := make(map[uuid.UUID]*types.Log)
for i, log := range logs { for i, log := range logs {
header, ok := l2HeaderMap[log.BlockHash] header, ok := l2HeaderMap[log.BlockHash]
if !ok { if !ok {
...@@ -133,16 +139,11 @@ func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2 ...@@ -133,16 +139,11 @@ func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2
return errors.New("parsed log with a block hash not in this batch") return errors.New("parsed log with a block hash not in this batch")
} }
l2ContractEvents[i] = &database.L2ContractEvent{ logsByIndex[log.Index] = &logs[i]
ContractEvent: database.ContractEvent{ contractEvent := &database.L2ContractEvent{ContractEvent: database.ContractEventFromLog(&log, header.Time)}
GUID: uuid.New(),
BlockHash: log.BlockHash, l2ContractEvents[i] = contractEvent
TransactionHash: log.TxHash, l2ContractEventLogs[contractEvent.GUID] = &logs[i]
EventSignature: log.Topics[0],
LogIndex: uint64(log.Index),
Timestamp: header.Time,
},
}
} }
/** Update Database **/ /** Update Database **/
...@@ -159,9 +160,125 @@ func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2 ...@@ -159,9 +160,125 @@ func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2
if err != nil { if err != nil {
return err return err
} }
// forward along contract events to the bridge processor
err = l2BridgeProcessContractEvents(processLog, db, ethClient, l2ContractEvents, l2ContractEventLogs, logsByIndex)
if err != nil {
return err
}
} }
// a-ok! // a-ok!
return nil return nil
} }
} }
func l2BridgeProcessContractEvents(
processLog log.Logger,
db *database.DB,
ethClient node.EthClient,
events []*database.L2ContractEvent,
eventLogs map[uuid.UUID]*types.Log,
logsByIndex map[uint]*types.Log,
) error {
rawEthClient := ethclient.NewClient(ethClient.RawRpcClient())
l2StandardBridgeABI, err := bindings.L2StandardBridgeMetaData.GetAbi()
if err != nil {
return err
}
l2CrossDomainMessengerABI, err := bindings.L2CrossDomainMessengerMetaData.GetAbi()
if err != nil {
return err
}
type BridgeData struct {
From common.Address
To common.Address
Amount *big.Int
ExtraData []byte
}
numFinalizedDeposits := 0
ethBridgeFinalizedEventSig := l2StandardBridgeABI.Events["ETHBridgeFinalized"].ID
relayedMessageEventSig := l2CrossDomainMessengerABI.Events["RelayedMessage"].ID
relayMessageMethod := l2CrossDomainMessengerABI.Methods["relayMessage"]
for _, contractEvent := range events {
eventSig := contractEvent.EventSignature
log := eventLogs[contractEvent.GUID]
if eventSig == ethBridgeFinalizedEventSig {
// (1) Ensure the RelayedMessage follows the log right after the bridge event
relayedMsgLog := logsByIndex[log.Index+1]
if relayedMsgLog.Topics[0] != relayedMessageEventSig {
processLog.Crit("expected CrossDomainMessenger#RelayedMessage following StandardBridge#EthBridgeFinalized event", "event_sig", relayedMsgLog.Topics[0], "relayed_message_sig", relayedMessageEventSig)
return errors.New("unexpected bridge event ordering")
}
// unfortunately there's no way to extract the nonce on the relayed message event. we can
// extract the nonce by unpacking the transaction input for the `relayMessage` transaction
tx, isPending, err := rawEthClient.TransactionByHash(context.Background(), relayedMsgLog.TxHash)
if err != nil || isPending {
processLog.Crit("CrossDomainMessager#relayeMessage transaction query err or found pending", err, "err", "isPending", isPending)
return errors.New("unable to query relayMessage tx")
}
txData := tx.Data()
fnSelector := txData[:4]
if !bytes.Equal(fnSelector, relayMessageMethod.ID) {
processLog.Crit("expected relayMessage function selector")
return errors.New("RelayMessage log does not match relayMessage transaction")
}
fnData := txData[4:]
inputsMap := make(map[string]interface{})
err = relayMessageMethod.Inputs.UnpackIntoMap(inputsMap, fnData)
if err != nil {
processLog.Crit("unable to unpack CrossDomainMessenger#relayMessage function data", "err", err)
return err
}
nonce, ok := inputsMap["_nonce"].(*big.Int)
if !ok {
processLog.Crit("unable to extract _nonce from CrossDomainMessenger#relayMessage function call")
return errors.New("unable to extract relayMessage nonce")
}
// (2) Mark initiated L1 deposit as finalized
deposit, err := db.Bridge.DepositByMessageNonce(nonce)
if err != nil {
processLog.Error("error querying initiated deposit messsage using nonce", "nonce", nonce)
return err
} else if deposit == nil {
latestNonce, err := db.Bridge.LatestDepositMessageNonce()
if err != nil {
return err
}
// check if the the L1Processor is behind or really has missed an event
if latestNonce == nil || nonce.Cmp(latestNonce) > 0 {
processLog.Warn("behind on indexed L1 deposits", "deposit_message_nonce", nonce, "latest_deposit_message_nonce", latestNonce)
return errors.New("waiting for L1Processor to catch up")
} else {
processLog.Crit("missing indexed deposit for this finalization event", "deposit_message_nonce", nonce, "tx_hash", log.TxHash, "log_index", log.Index)
return errors.New("missing deposit message")
}
}
err = db.Bridge.MarkFinalizedDepositEvent(deposit.GUID, contractEvent.GUID)
if err != nil {
processLog.Error("error finalizing deposit", "err", err)
return err
}
numFinalizedDeposits++
}
}
// a-ok!
if numFinalizedDeposits > 0 {
processLog.Info("finalized deposits", "num", numFinalizedDeposits)
}
return nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment