Commit 71a657e9 authored by mergify[bot]'s avatar mergify[bot] Committed by GitHub

Merge branch 'develop' into jg/configurable_txmgr_cli_default_values

parents 3206f4bb 20f9664c
......@@ -25,35 +25,3 @@ func Clamp(start, end *big.Int, size uint64) *big.Int {
func Matcher(num int64) func(*big.Int) bool {
return func(bi *big.Int) bool { return bi.Int64() == num }
}
type Range struct {
Start *big.Int
End *big.Int
}
// Grouped will return a slice of inclusive ranges from (start, end),
// capped to the supplied size from `(start, end)`.
func Grouped(start, end *big.Int, size uint64) []Range {
if end.Cmp(start) < 0 || size == 0 {
return nil
}
bigMaxDiff := big.NewInt(int64(size - 1))
groups := []Range{}
for start.Cmp(end) <= 0 {
diff := new(big.Int).Sub(end, start)
switch {
case diff.Uint64()+1 <= size:
// re-use allocated diff as the next start
groups = append(groups, Range{start, end})
start = diff.Add(end, One)
default:
// re-use allocated diff as the next start
end := new(big.Int).Add(start, bigMaxDiff)
groups = append(groups, Range{start, end})
start = diff.Add(end, One)
}
}
return groups
}
......@@ -27,46 +27,3 @@ func TestClamp(t *testing.T) {
require.False(t, end == result)
require.Equal(t, uint64(5), result.Uint64())
}
func TestGrouped(t *testing.T) {
// base cases
require.Nil(t, Grouped(One, Zero, 1))
require.Nil(t, Grouped(Zero, One, 0))
// Same Start/End
group := Grouped(One, One, 1)
require.Len(t, group, 1)
require.Equal(t, One, group[0].Start)
require.Equal(t, One, group[0].End)
Three, Five := big.NewInt(3), big.NewInt(5)
// One at a time
group = Grouped(One, Three, 1)
require.Equal(t, One, group[0].End)
require.Equal(t, int64(1), group[0].End.Int64())
require.Equal(t, int64(2), group[1].Start.Int64())
require.Equal(t, int64(2), group[1].End.Int64())
require.Equal(t, int64(3), group[2].Start.Int64())
require.Equal(t, int64(3), group[2].End.Int64())
// Split groups
group = Grouped(One, Five, 3)
require.Len(t, group, 2)
require.Equal(t, One, group[0].Start)
require.Equal(t, int64(3), group[0].End.Int64())
require.Equal(t, int64(4), group[1].Start.Int64())
require.Equal(t, Five, group[1].End)
// Encompasses the range
group = Grouped(One, Five, 5)
require.Len(t, group, 1)
require.Equal(t, One, group[0].Start, Zero)
require.Equal(t, Five, group[0].End)
// Size larger than the entire range
group = Grouped(One, Five, 100)
require.Len(t, group, 1)
require.Equal(t, One, group[0].Start, Zero)
require.Equal(t, Five, group[0].End)
}
......@@ -2,8 +2,10 @@ package database
import (
"errors"
"fmt"
"math/big"
"github.com/ethereum-optimism/optimism/indexer/bigint"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
......@@ -51,7 +53,7 @@ type BlocksView interface {
L2BlockHeaderWithFilter(BlockHeader) (*L2BlockHeader, error)
L2LatestBlockHeader() (*L2BlockHeader, error)
LatestEpoch() (*Epoch, error)
LatestObservedEpoch(*big.Int, uint64) (*Epoch, error)
}
type BlocksDB interface {
......@@ -155,36 +157,74 @@ type Epoch struct {
L2BlockHeader L2BlockHeader `gorm:"embedded"`
}
// LatestEpoch return the latest epoch, seen on L1 & L2. In other words
// this returns the latest indexed L1 block that has a corresponding
// indexed L2 block with a matching L1Origin (equal timestamps).
// LatestObservedEpoch return the marker for latest epoch, observed on L1 & L2, within
// the specified bounds. In other words this returns the latest indexed L1 block that has
// a corresponding indexed L2 block with a matching L1Origin (equal timestamps).
//
// If `fromL1Height` (inclusive) is not specified, the search will start from genesis and
// continue all the way to latest indexed heights if `maxL1Range == 0`.
//
// For more, see the protocol spec:
// - https://github.com/ethereum-optimism/optimism/blob/develop/specs/derivation.md
func (db *blocksDB) LatestEpoch() (*Epoch, error) {
latestL1Header, err := db.L1LatestBlockHeader()
if err != nil {
return nil, err
} else if latestL1Header == nil {
return nil, nil
func (db *blocksDB) LatestObservedEpoch(fromL1Height *big.Int, maxL1Range uint64) (*Epoch, error) {
// We use timestamps since that translates to both L1 & L2
var fromTimestamp, toTimestamp uint64
if fromL1Height == nil {
fromL1Height = bigint.Zero
}
latestL2Header, err := db.L2LatestBlockHeader()
if err != nil {
return nil, err
} else if latestL2Header == nil {
return nil, nil
// Lower Bound (the default `fromTimestamp = 0` suffices genesis representation)
if fromL1Height.BitLen() > 0 {
var header L1BlockHeader
result := db.gorm.Where("number = ?", fromL1Height).Take(&header)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, result.Error
}
fromTimestamp = header.Timestamp
}
minTime := latestL1Header.Timestamp
if latestL2Header.Timestamp < minTime {
minTime = latestL2Header.Timestamp
// Upper Bound (lowest timestamp indexed between L1/L2 bounded by `maxL1Range`)
{
l1QueryFilter := fmt.Sprintf("timestamp >= %d", fromTimestamp)
if maxL1Range > 0 {
maxHeight := new(big.Int).Add(fromL1Height, big.NewInt(int64(maxL1Range)))
l1QueryFilter = fmt.Sprintf("%s AND number <= %d", l1QueryFilter, maxHeight)
}
var l1Header L1BlockHeader
result := db.gorm.Where(l1QueryFilter).Order("timestamp DESC").Take(&l1Header)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, result.Error
}
toTimestamp = l1Header.Timestamp
var l2Header L2BlockHeader
result = db.gorm.Where("timestamp <= ?", toTimestamp).Order("timestamp DESC").Take(&l2Header)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, result.Error
}
if l2Header.Timestamp < toTimestamp {
toTimestamp = l2Header.Timestamp
}
}
// This is a faster query than doing an INNER JOIN between l1_block_headers and l2_block_headers
// which requires a full table scan to compute the resulting table.
l1Query := db.gorm.Table("l1_block_headers").Where("timestamp <= ?", minTime)
l2Query := db.gorm.Table("l2_block_headers").Where("timestamp <= ?", minTime)
// Search for the latest indexed epoch within range. This is a faster query than doing an INNER JOIN between
// l1_block_headers and l2_block_headers which requires a full table scan to compute the resulting table.
l1Query := db.gorm.Table("l1_block_headers").Where("timestamp >= ? AND timestamp <= ?", fromTimestamp, toTimestamp)
l2Query := db.gorm.Table("l2_block_headers").Where("timestamp >= ? AND timestamp <= ?", fromTimestamp, toTimestamp)
query := db.gorm.Raw(`SELECT * FROM (?) AS l1_block_headers, (?) AS l2_block_headers
WHERE l1_block_headers.timestamp = l2_block_headers.timestamp
ORDER BY l2_block_headers.number DESC LIMIT 1`, l1Query, l2Query)
......
package database
import (
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/mock"
......@@ -51,7 +53,7 @@ func (m *MockBlocksView) L2LatestBlockHeader() (*L2BlockHeader, error) {
return args.Get(0).(*L2BlockHeader), args.Error(1)
}
func (m *MockBlocksView) LatestEpoch() (*Epoch, error) {
func (m *MockBlocksView) LatestObservedEpoch(*big.Int, uint64) (*Epoch, error) {
args := m.Called()
return args.Get(0).(*Epoch), args.Error(1)
}
......
......@@ -41,21 +41,24 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
dbUser := os.Getenv("DB_USER")
dbName := setupTestDatabase(t)
// Discard the Global Logger as each component
// has its own configured logger
// Rollup System Configuration. Unless specified,
// omit logs emitted by the various components. Maybe
// we can eventually dump these logs to a temp file
log.Root().SetHandler(log.DiscardHandler())
// Rollup System Configuration and Start
opCfg := op_e2e.DefaultSystemConfig(t)
opCfg.DeployConfig.FinalizationPeriodSeconds = 2
if len(os.Getenv("ENABLE_ROLLUP_LOGS")) == 0 {
t.Log("set env 'ENABLE_ROLLUP_LOGS' to show rollup logs")
for name, logger := range opCfg.Loggers {
t.Logf("discarding logs for %s", name)
logger.SetHandler(log.DiscardHandler())
}
}
// Rollup Start
opSys, err := opCfg.Start(t)
require.NoError(t, err)
t.Cleanup(func() { opSys.Close() })
// E2E tests can run on the order of magnitude of minutes. Once
// the system is running, mark this test for Parallel execution
t.Parallel()
// Indexer Configuration and Start
indexerCfg := config.Config{
DB: config.DBConfig{
......@@ -86,8 +89,14 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
MetricsServer: config.ServerConfig{Host: "127.0.0.1", Port: 0},
}
// Emit debug log levels
db, err := database.NewDB(testlog.Logger(t, log.LvlDebug).New("role", "db"), indexerCfg.DB)
// E2E tests can run on the order of magnitude of minutes. Once
// the system is running, mark this test for Parallel execution
t.Parallel()
// provide a DB for the unit test. disable logging
silentLog := testlog.Logger(t, log.LvlInfo)
silentLog.SetHandler(log.DiscardHandler())
db, err := database.NewDB(silentLog, indexerCfg.DB)
require.NoError(t, err)
t.Cleanup(func() { db.Close() })
......@@ -138,7 +147,6 @@ func setupTestDatabase(t *testing.T) string {
User: user,
Password: "",
}
// NewDB will create the database schema
silentLog := log.New()
silentLog.SetHandler(log.DiscardHandler())
......
......@@ -71,10 +71,10 @@ func (etl *ETL) Start(ctx context.Context) error {
etl.log.Error("error querying for headers", "err", err)
} else if len(newHeaders) == 0 {
etl.log.Warn("no new headers. processor unexpectedly at head...")
} else {
headers = newHeaders
etl.metrics.RecordBatchHeaders(len(newHeaders))
}
headers = newHeaders
etl.metrics.RecordBatchHeaders(len(newHeaders))
}
// only clear the reference if we were able to process this batch
......@@ -107,7 +107,7 @@ func (etl *ETL) processBatch(headers []types.Header) error {
headersWithLog := make(map[common.Hash]bool, len(headers))
logs, err := etl.EthClient.FilterLogs(ethereum.FilterQuery{FromBlock: firstHeader.Number, ToBlock: lastHeader.Number, Addresses: etl.contracts})
if err != nil {
batchLog.Info("unable to extract logs", "err", err)
batchLog.Info("failed to extract logs", "err", err)
return err
}
if len(logs) > 0 {
......@@ -118,7 +118,8 @@ func (etl *ETL) processBatch(headers []types.Header) error {
log := logs[i]
if _, ok := headerMap[log.BlockHash]; !ok {
// NOTE. Definitely an error state if the none of the headers were re-orged out in between
// the blocks and logs retrieval operations. However, we need to gracefully handle reorgs
// the blocks and logs retrieval operations. Unlikely as long as the confirmation depth has
// been appropriately set or when we get to natively handling reorgs.
batchLog.Error("log found with block hash not in the batch", "block_hash", logs[i].BlockHash, "log_index", logs[i].Index)
return errors.New("parsed log with a block hash not in the batch")
}
......
......@@ -16,7 +16,6 @@ type Metricer interface {
RecordInterval() (done func(err error))
// Batch Extraction
RecordBatchFailure()
RecordBatchLatestHeight(height *big.Int)
RecordBatchHeaders(size int)
RecordBatchLog(contractAddress common.Address)
......@@ -108,17 +107,13 @@ func (m *etlMetrics) RecordInterval() func(error) {
timer := prometheus.NewTimer(m.intervalDuration)
return func(err error) {
if err != nil {
m.RecordBatchFailure()
m.batchFailures.Inc()
}
timer.ObserveDuration()
}
}
func (m *etlMetrics) RecordBatchFailure() {
m.batchFailures.Inc()
}
func (m *etlMetrics) RecordBatchLatestHeight(height *big.Int) {
m.batchLatestHeight.Set(float64(height.Uint64()))
}
......
......@@ -20,6 +20,7 @@ import (
"github.com/ethereum-optimism/optimism/indexer/etl"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/indexer/processors"
"github.com/ethereum-optimism/optimism/indexer/processors/bridge"
"github.com/ethereum-optimism/optimism/op-service/httputil"
"github.com/ethereum-optimism/optimism/op-service/metrics"
)
......@@ -82,7 +83,7 @@ func NewIndexer(
}
// Bridge
bridgeProcessor, err := processors.NewBridgeProcessor(log, db, l1Etl, chainConfig)
bridgeProcessor, err := processors.NewBridgeProcessor(log, db, bridge.NewMetrics(metricsRegistry), l1Etl, chainConfig)
if err != nil {
return nil, err
}
......
......@@ -7,6 +7,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/rpc"
"github.com/prometheus/client_golang/prometheus"
)
......
This diff is collapsed.
......@@ -8,6 +8,7 @@ import (
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/processors/contracts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
......@@ -17,7 +18,7 @@ import (
// 1. OptimismPortal
// 2. L1CrossDomainMessenger
// 3. L1StandardBridge
func L1ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, l1Contracts config.L1Contracts, fromHeight, toHeight *big.Int) error {
func L1ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, metrics L1Metricer, l1Contracts config.L1Contracts, fromHeight, toHeight *big.Int) error {
// (1) OptimismPortal
optimismPortalTxDeposits, err := contracts.OptimismPortalTransactionDepositEvents(l1Contracts.OptimismPortalProxy, db, fromHeight, toHeight)
if err != nil {
......@@ -44,6 +45,7 @@ func L1ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, l1Contracts
if err := db.BridgeTransactions.StoreL1TransactionDeposits(transactionDeposits); err != nil {
return err
}
metrics.RecordL1TransactionDeposits(len(transactionDeposits))
}
// (2) L1CrossDomainMessenger
......@@ -56,7 +58,7 @@ func L1ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, l1Contracts
}
sentMessages := make(map[logKey]*contracts.CrossDomainMessengerSentMessageEvent, len(crossDomainSentMessages))
l1BridgeMessages := make([]database.L1BridgeMessage, len(crossDomainSentMessages))
bridgeMessages := make([]database.L1BridgeMessage, len(crossDomainSentMessages))
for i := range crossDomainSentMessages {
sentMessage := crossDomainSentMessages[i]
sentMessages[logKey{sentMessage.Event.BlockHash, sentMessage.Event.LogIndex}] = &sentMessage
......@@ -68,12 +70,13 @@ func L1ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, l1Contracts
return fmt.Errorf("expected TransactionDeposit preceding SentMessage event. tx_hash = %s", sentMessage.Event.TransactionHash.String())
}
l1BridgeMessages[i] = database.L1BridgeMessage{TransactionSourceHash: portalDeposit.DepositTx.SourceHash, BridgeMessage: sentMessage.BridgeMessage}
bridgeMessages[i] = database.L1BridgeMessage{TransactionSourceHash: portalDeposit.DepositTx.SourceHash, BridgeMessage: sentMessage.BridgeMessage}
}
if len(l1BridgeMessages) > 0 {
if err := db.BridgeMessages.StoreL1BridgeMessages(l1BridgeMessages); err != nil {
if len(bridgeMessages) > 0 {
if err := db.BridgeMessages.StoreL1BridgeMessages(bridgeMessages); err != nil {
return err
}
metrics.RecordL1CrossDomainSentMessages(len(bridgeMessages))
}
// (3) L1StandardBridge
......@@ -85,7 +88,8 @@ func L1ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, l1Contracts
log.Info("detected bridge deposits", "size", len(initiatedBridges))
}
l1BridgeDeposits := make([]database.L1BridgeDeposit, len(initiatedBridges))
bridgedTokens := make(map[common.Address]int)
bridgeDeposits := make([]database.L1BridgeDeposit, len(initiatedBridges))
for i := range initiatedBridges {
initiatedBridge := initiatedBridges[i]
......@@ -102,15 +106,19 @@ func L1ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, l1Contracts
}
initiatedBridge.BridgeTransfer.CrossDomainMessageHash = &sentMessage.BridgeMessage.MessageHash
l1BridgeDeposits[i] = database.L1BridgeDeposit{
bridgedTokens[initiatedBridge.BridgeTransfer.TokenPair.LocalTokenAddress]++
bridgeDeposits[i] = database.L1BridgeDeposit{
TransactionSourceHash: portalDeposit.DepositTx.SourceHash,
BridgeTransfer: initiatedBridge.BridgeTransfer,
}
}
if len(l1BridgeDeposits) > 0 {
if err := db.BridgeTransfers.StoreL1BridgeDeposits(l1BridgeDeposits); err != nil {
if len(bridgeDeposits) > 0 {
if err := db.BridgeTransfers.StoreL1BridgeDeposits(bridgeDeposits); err != nil {
return err
}
for tokenAddr, size := range bridgedTokens {
metrics.RecordL1InitiatedBridgeTransfers(tokenAddr, size)
}
}
return nil
......@@ -121,7 +129,7 @@ func L1ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, l1Contracts
// 1. OptimismPortal (Bedrock prove & finalize steps)
// 2. L1CrossDomainMessenger (relayMessage marker)
// 3. L1StandardBridge (no-op, since this is simply a wrapper over the L1CrossDomainMessenger)
func L1ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, l1Contracts config.L1Contracts, fromHeight, toHeight *big.Int) error {
func L1ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, metrics L1Metricer, l1Contracts config.L1Contracts, fromHeight, toHeight *big.Int) error {
// (1) OptimismPortal (proven withdrawals)
provenWithdrawals, err := contracts.OptimismPortalWithdrawalProvenEvents(l1Contracts.OptimismPortalProxy, db, fromHeight, toHeight)
if err != nil {
......@@ -146,6 +154,9 @@ func L1ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, l1Contracts
return err
}
}
if len(provenWithdrawals) > 0 {
metrics.RecordL1ProvenWithdrawals(len(provenWithdrawals))
}
// (2) OptimismPortal (finalized withdrawals)
finalizedWithdrawals, err := contracts.OptimismPortalWithdrawalFinalizedEvents(l1Contracts.OptimismPortalProxy, db, fromHeight, toHeight)
......@@ -171,6 +182,9 @@ func L1ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, l1Contracts
return err
}
}
if len(finalizedWithdrawals) > 0 {
metrics.RecordL1FinalizedWithdrawals(len(finalizedWithdrawals))
}
// (3) L1CrossDomainMessenger
crossDomainRelayedMessages, err := contracts.CrossDomainMessengerRelayedMessageEvents("l1", l1Contracts.L1CrossDomainMessengerProxy, db, fromHeight, toHeight)
......@@ -198,6 +212,9 @@ func L1ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, l1Contracts
return err
}
}
if len(crossDomainRelayedMessages) > 0 {
metrics.RecordL1CrossDomainRelayedMessages(len(crossDomainRelayedMessages))
}
// (4) L1StandardBridge
finalizedBridges, err := contracts.StandardBridgeFinalizedEvents("l1", l1Contracts.L1StandardBridgeProxy, db, fromHeight, toHeight)
......@@ -208,6 +225,7 @@ func L1ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, l1Contracts
log.Info("detected finalized bridge withdrawals", "size", len(finalizedBridges))
}
finalizedTokens := make(map[common.Address]int)
for i := range finalizedBridges {
// Nothing actionable on the database. However, we can treat the relayed message
// as an invariant by ensuring we can query for a deposit by the same hash
......@@ -218,8 +236,7 @@ func L1ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, l1Contracts
return fmt.Errorf("expected RelayedMessage following BridgeFinalized event. tx_hash = %s", finalizedBridge.Event.TransactionHash.String())
}
// Since the message hash is computed from the relayed message, this ensures the deposit fields must match. For good measure,
// we may choose to make sure `withdrawal.BridgeTransfer` matches with the finalized bridge
// Since the message hash is computed from the relayed message, this ensures the deposit fields must match
withdrawal, err := db.BridgeTransfers.L2BridgeWithdrawalWithFilter(database.BridgeTransfer{CrossDomainMessageHash: &relayedMessage.MessageHash})
if err != nil {
return err
......@@ -227,6 +244,13 @@ func L1ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, l1Contracts
log.Error("missing L2StandardBridge withdrawal on L1 finalization", "tx_hash", finalizedBridge.Event.TransactionHash.String())
return fmt.Errorf("missing L2StandardBridge withdrawal on L1 finalization. tx_hash: %s", finalizedBridge.Event.TransactionHash.String())
}
finalizedTokens[finalizedBridge.BridgeTransfer.TokenPair.LocalTokenAddress]++
}
if len(finalizedBridges) > 0 {
for tokenAddr, size := range finalizedTokens {
metrics.RecordL1FinalizedBridgeTransfers(tokenAddr, size)
}
}
// a-ok!
......
......@@ -9,6 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/processors/contracts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
......@@ -17,7 +18,7 @@ import (
// 1. OptimismPortal
// 2. L2CrossDomainMessenger
// 3. L2StandardBridge
func L2ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, l2Contracts config.L2Contracts, fromHeight, toHeight *big.Int) error {
func L2ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, metrics L2Metricer, l2Contracts config.L2Contracts, fromHeight, toHeight *big.Int) error {
// (1) L2ToL1MessagePasser
l2ToL1MPMessagesPassed, err := contracts.L2ToL1MessagePasserMessagePassedEvents(l2Contracts.L2ToL1MessagePasser, db, fromHeight, toHeight)
if err != nil {
......@@ -44,6 +45,7 @@ func L2ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, l2Contracts
if err := db.BridgeTransactions.StoreL2TransactionWithdrawals(transactionWithdrawals); err != nil {
return err
}
metrics.RecordL2TransactionWithdrawals(len(transactionWithdrawals))
}
// (2) L2CrossDomainMessenger
......@@ -56,7 +58,7 @@ func L2ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, l2Contracts
}
sentMessages := make(map[logKey]*contracts.CrossDomainMessengerSentMessageEvent, len(crossDomainSentMessages))
l2BridgeMessages := make([]database.L2BridgeMessage, len(crossDomainSentMessages))
bridgeMessages := make([]database.L2BridgeMessage, len(crossDomainSentMessages))
for i := range crossDomainSentMessages {
sentMessage := crossDomainSentMessages[i]
sentMessages[logKey{sentMessage.Event.BlockHash, sentMessage.Event.LogIndex}] = &sentMessage
......@@ -68,13 +70,13 @@ func L2ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, l2Contracts
return fmt.Errorf("expected MessagePassedEvent preceding SentMessage. tx_hash = %s", sentMessage.Event.TransactionHash.String())
}
l2BridgeMessages[i] = database.L2BridgeMessage{TransactionWithdrawalHash: messagePassed.WithdrawalHash, BridgeMessage: sentMessage.BridgeMessage}
bridgeMessages[i] = database.L2BridgeMessage{TransactionWithdrawalHash: messagePassed.WithdrawalHash, BridgeMessage: sentMessage.BridgeMessage}
}
if len(l2BridgeMessages) > 0 {
if err := db.BridgeMessages.StoreL2BridgeMessages(l2BridgeMessages); err != nil {
if len(bridgeMessages) > 0 {
if err := db.BridgeMessages.StoreL2BridgeMessages(bridgeMessages); err != nil {
return err
}
metrics.RecordL2CrossDomainSentMessages(len(bridgeMessages))
}
// (3) L2StandardBridge
......@@ -86,7 +88,8 @@ func L2ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, l2Contracts
log.Info("detected bridge withdrawals", "size", len(initiatedBridges))
}
l2BridgeWithdrawals := make([]database.L2BridgeWithdrawal, len(initiatedBridges))
bridgedTokens := make(map[common.Address]int)
bridgeWithdrawals := make([]database.L2BridgeWithdrawal, len(initiatedBridges))
for i := range initiatedBridges {
initiatedBridge := initiatedBridges[i]
......@@ -103,13 +106,19 @@ func L2ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, l2Contracts
}
initiatedBridge.BridgeTransfer.CrossDomainMessageHash = &sentMessage.BridgeMessage.MessageHash
l2BridgeWithdrawals[i] = database.L2BridgeWithdrawal{TransactionWithdrawalHash: messagePassed.WithdrawalHash, BridgeTransfer: initiatedBridge.BridgeTransfer}
bridgedTokens[initiatedBridge.BridgeTransfer.TokenPair.LocalTokenAddress]++
bridgeWithdrawals[i] = database.L2BridgeWithdrawal{
TransactionWithdrawalHash: messagePassed.WithdrawalHash,
BridgeTransfer: initiatedBridge.BridgeTransfer,
}
}
if len(l2BridgeWithdrawals) > 0 {
if err := db.BridgeTransfers.StoreL2BridgeWithdrawals(l2BridgeWithdrawals); err != nil {
if len(bridgeWithdrawals) > 0 {
if err := db.BridgeTransfers.StoreL2BridgeWithdrawals(bridgeWithdrawals); err != nil {
return err
}
for tokenAddr, size := range bridgedTokens {
metrics.RecordL2InitiatedBridgeTransfers(tokenAddr, size)
}
}
// a-ok!
......@@ -122,7 +131,7 @@ func L2ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, l2Contracts
// 2. L2StandardBridge (no-op, since this is simply a wrapper over the L2CrossDomainMEssenger)
//
// NOTE: Unlike L1, there's no L2ToL1MessagePasser stage since transaction deposits are apart of the block derivation process.
func L2ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, l2Contracts config.L2Contracts, fromHeight, toHeight *big.Int) error {
func L2ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, metrics L2Metricer, l2Contracts config.L2Contracts, fromHeight, toHeight *big.Int) error {
// (1) L2CrossDomainMessenger
crossDomainRelayedMessages, err := contracts.CrossDomainMessengerRelayedMessageEvents("l2", l2Contracts.L2CrossDomainMessenger, db, fromHeight, toHeight)
if err != nil {
......@@ -149,6 +158,9 @@ func L2ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, l2Contracts
return err
}
}
if len(relayedMessages) > 0 {
metrics.RecordL2CrossDomainRelayedMessages(len(relayedMessages))
}
// (2) L2StandardBridge
finalizedBridges, err := contracts.StandardBridgeFinalizedEvents("l2", l2Contracts.L2StandardBridge, db, fromHeight, toHeight)
......@@ -159,6 +171,7 @@ func L2ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, l2Contracts
log.Info("detected finalized bridge deposits", "size", len(finalizedBridges))
}
finalizedTokens := make(map[common.Address]int)
for i := range finalizedBridges {
// Nothing actionable on the database. However, we can treat the relayed message
// as an invariant by ensuring we can query for a deposit by the same hash
......@@ -169,8 +182,7 @@ func L2ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, l2Contracts
return fmt.Errorf("expected RelayedMessage following BridgeFinalized event. tx_hash = %s", finalizedBridge.Event.TransactionHash.String())
}
// Since the message hash is computed from the relayed message, this ensures the withdrawal fields must match. For good measure,
// we may choose to make sure `deposit.BridgeTransfer` matches with the finalized bridge
// Since the message hash is computed from the relayed message, this ensures the withdrawal fields must match
deposit, err := db.BridgeTransfers.L1BridgeDepositWithFilter(database.BridgeTransfer{CrossDomainMessageHash: &relayedMessage.MessageHash})
if err != nil {
return err
......@@ -178,6 +190,13 @@ func L2ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, l2Contracts
log.Error("missing L1StandardBridge deposit on L2 finalization", "tx_hash", finalizedBridge.Event.TransactionHash.String())
return errors.New("missing L1StandardBridge deposit on L2 finalization")
}
finalizedTokens[finalizedBridge.BridgeTransfer.TokenPair.LocalTokenAddress]++
}
if len(finalizedBridges) > 0 {
for tokenAddr, size := range finalizedTokens {
metrics.RecordL2FinalizedBridgeTransfers(tokenAddr, size)
}
}
// a-ok!
......
package bridge
import (
"math/big"
"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum/go-ethereum/common"
"github.com/prometheus/client_golang/prometheus"
)
var (
MetricsNamespace string = "bridge"
)
type L1Metricer interface {
RecordLatestIndexedL1Height(height *big.Int)
RecordL1TransactionDeposits(size int)
RecordL1ProvenWithdrawals(size int)
RecordL1FinalizedWithdrawals(size int)
RecordL1CrossDomainSentMessages(size int)
RecordL1CrossDomainRelayedMessages(size int)
RecordL1InitiatedBridgeTransfers(token common.Address, size int)
RecordL1FinalizedBridgeTransfers(token common.Address, size int)
}
type L2Metricer interface {
RecordLatestIndexedL2Height(height *big.Int)
RecordL2TransactionWithdrawals(size int)
RecordL2CrossDomainSentMessages(size int)
RecordL2CrossDomainRelayedMessages(size int)
RecordL2InitiatedBridgeTransfers(token common.Address, size int)
RecordL2FinalizedBridgeTransfers(token common.Address, size int)
}
type Metricer interface {
L1Metricer
L2Metricer
RecordInterval() (done func(err error))
}
type bridgeMetrics struct {
intervalTick prometheus.Counter
intervalDuration prometheus.Histogram
intervalFailures prometheus.Counter
latestL1Height prometheus.Gauge
latestL2Height prometheus.Gauge
txDeposits prometheus.Counter
txWithdrawals prometheus.Counter
provenWithdrawals prometheus.Counter
finalizedWithdrawals prometheus.Counter
sentMessages *prometheus.CounterVec
relayedMessages *prometheus.CounterVec
initiatedBridgeTransfers *prometheus.CounterVec
finalizedBridgeTransfers *prometheus.CounterVec
}
func NewMetrics(registry *prometheus.Registry) Metricer {
factory := metrics.With(registry)
return &bridgeMetrics{
intervalTick: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "intervals_total",
Help: "number of times processing loop has run",
}),
intervalDuration: factory.NewHistogram(prometheus.HistogramOpts{
Namespace: MetricsNamespace,
Name: "interval_seconds",
Help: "duration elapsed in the processing loop",
}),
intervalFailures: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "failures_total",
Help: "number of failures encountered",
}),
latestL1Height: factory.NewGauge(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: "l1",
Name: "height",
Help: "the latest processed l1 block height",
}),
latestL2Height: factory.NewGauge(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: "l2",
Name: "height",
Help: "the latest processed l2 block height",
}),
txDeposits: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "tx_deposits",
Help: "number of processed transactions deposited from l1",
}),
txWithdrawals: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "tx_withdrawals",
Help: "number of processed transactions withdrawn from l2",
}),
provenWithdrawals: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "proven_withdrawals",
Help: "number of proven tx withdrawals on l1",
}),
finalizedWithdrawals: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "finalized_withdrawals",
Help: "number of finalized tx withdrawals on l1",
}),
sentMessages: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "sent_messages",
Help: "number of bridged messages between l1 and l2",
}, []string{
"chain",
}),
relayedMessages: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "relayed_messages",
Help: "number of relayed messages between l1 and l2",
}, []string{
"chain",
}),
initiatedBridgeTransfers: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "initiated_token_transfers",
Help: "number of bridged tokens between l1 and l2",
}, []string{
"chain",
"token_address",
}),
finalizedBridgeTransfers: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "finalized_token_transfers",
Help: "number of finalized token transfers between l1 and l2",
}, []string{
"chain",
"token_address",
}),
}
}
func (m *bridgeMetrics) RecordInterval() func(error) {
m.intervalTick.Inc()
timer := prometheus.NewTimer(m.intervalDuration)
return func(err error) {
timer.ObserveDuration()
if err != nil {
m.intervalFailures.Inc()
}
}
}
// L1Metricer
func (m *bridgeMetrics) RecordLatestIndexedL1Height(height *big.Int) {
m.latestL1Height.Set(float64(height.Uint64()))
}
func (m *bridgeMetrics) RecordL1TransactionDeposits(size int) {
m.txDeposits.Add(float64(size))
}
func (m *bridgeMetrics) RecordL1ProvenWithdrawals(size int) {
m.provenWithdrawals.Add(float64(size))
}
func (m *bridgeMetrics) RecordL1FinalizedWithdrawals(size int) {
m.finalizedWithdrawals.Add(float64(size))
}
func (m *bridgeMetrics) RecordL1CrossDomainSentMessages(size int) {
m.sentMessages.WithLabelValues("l1").Add(float64(size))
}
func (m *bridgeMetrics) RecordL1CrossDomainRelayedMessages(size int) {
m.relayedMessages.WithLabelValues("l1").Add(float64(size))
}
func (m *bridgeMetrics) RecordL1InitiatedBridgeTransfers(tokenAddr common.Address, size int) {
m.initiatedBridgeTransfers.WithLabelValues("l1", tokenAddr.String()).Add(float64(size))
}
func (m *bridgeMetrics) RecordL1FinalizedBridgeTransfers(tokenAddr common.Address, size int) {
m.finalizedBridgeTransfers.WithLabelValues("l1", tokenAddr.String()).Add(float64(size))
}
// L2Metricer
func (m *bridgeMetrics) RecordLatestIndexedL2Height(height *big.Int) {
m.latestL2Height.Set(float64(height.Uint64()))
}
func (m *bridgeMetrics) RecordL2TransactionWithdrawals(size int) {
m.txWithdrawals.Add(float64(size))
}
func (m *bridgeMetrics) RecordL2CrossDomainSentMessages(size int) {
m.sentMessages.WithLabelValues("l2").Add(float64(size))
}
func (m *bridgeMetrics) RecordL2CrossDomainRelayedMessages(size int) {
m.relayedMessages.WithLabelValues("l2").Add(float64(size))
}
func (m *bridgeMetrics) RecordL2InitiatedBridgeTransfers(tokenAddr common.Address, size int) {
m.initiatedBridgeTransfers.WithLabelValues("l2", tokenAddr.String()).Add(float64(size))
}
func (m *bridgeMetrics) RecordL2FinalizedBridgeTransfers(tokenAddr common.Address, size int) {
m.finalizedBridgeTransfers.WithLabelValues("l2", tokenAddr.String()).Add(float64(size))
}
......@@ -15,21 +15,23 @@ type LegacyBridgeEvent struct {
}
func L1StandardBridgeLegacyDepositInitiatedEvents(contractAddress common.Address, db *database.DB, fromHeight, toHeight *big.Int) ([]LegacyBridgeEvent, error) {
// The L1StandardBridge ABI contains the legacy events
l1StandardBridgeAbi, err := bindings.L1StandardBridgeMetaData.GetAbi()
if err != nil {
return nil, err
}
// The L1StandardBridge contains the legacy events
ethDepositEventAbi := l1StandardBridgeAbi.Events["ETHDepositInitiated"]
erc20DepositEventAbi := l1StandardBridgeAbi.Events["ERC20DepositInitiated"]
// Grab both ETH & ERC20 Events
ethDepositEvents, err := db.ContractEvents.L1ContractEventsWithFilter(database.ContractEvent{ContractAddress: contractAddress, EventSignature: ethDepositEventAbi.ID}, fromHeight, toHeight)
contractEventFilter := database.ContractEvent{ContractAddress: contractAddress, EventSignature: ethDepositEventAbi.ID}
ethDepositEvents, err := db.ContractEvents.L1ContractEventsWithFilter(contractEventFilter, fromHeight, toHeight)
if err != nil {
return nil, err
}
erc20DepositEvents, err := db.ContractEvents.L1ContractEventsWithFilter(database.ContractEvent{ContractAddress: contractAddress, EventSignature: erc20DepositEventAbi.ID}, fromHeight, toHeight)
contractEventFilter.EventSignature = erc20DepositEventAbi.ID
erc20DepositEvents, err := db.ContractEvents.L1ContractEventsWithFilter(contractEventFilter, fromHeight, toHeight)
if err != nil {
return nil, err
}
......@@ -81,13 +83,15 @@ func L1StandardBridgeLegacyDepositInitiatedEvents(contractAddress common.Address
}
func L2StandardBridgeLegacyWithdrawalInitiatedEvents(contractAddress common.Address, db *database.DB, fromHeight, toHeight *big.Int) ([]LegacyBridgeEvent, error) {
// The L2StandardBridge ABI contains the legacy events
l2StandardBridgeAbi, err := bindings.L2StandardBridgeMetaData.GetAbi()
if err != nil {
return nil, err
}
withdrawalInitiatedEventAbi := l2StandardBridgeAbi.Events["WithdrawalInitiated"]
withdrawalEvents, err := db.ContractEvents.L2ContractEventsWithFilter(database.ContractEvent{ContractAddress: contractAddress, EventSignature: withdrawalInitiatedEventAbi.ID}, fromHeight, toHeight)
contractEventFilter := database.ContractEvent{ContractAddress: contractAddress, EventSignature: withdrawalInitiatedEventAbi.ID}
withdrawalEvents, err := db.ContractEvents.L2ContractEventsWithFilter(contractEventFilter, fromHeight, toHeight)
if err != nil {
return nil, err
}
......
......@@ -152,10 +152,10 @@ func (a *Agent) tryResolveClaims(ctx context.Context) error {
resolvableClaims = append(resolvableClaims, int64(claim.ContractIndex))
}
}
a.log.Info("Resolving claims", "numClaims", len(resolvableClaims))
if len(resolvableClaims) == 0 {
return errNoResolvableClaims
}
a.log.Info("Resolving claims", "numClaims", len(resolvableClaims))
var wg sync.WaitGroup
wg.Add(len(resolvableClaims))
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment