Commit 29008011 authored by Hamdi Allam, committed by GitHub

Merge pull request #8046 from ethereum-optimism/indexer.bridge.processor

feat(indexer): bridge processor independent L1 & L2 tasks
parents 314b0414 4d612f57
......@@ -37,6 +37,10 @@ func BlockHeaderFromHeader(header *types.Header) BlockHeader {
}
}
func (b BlockHeader) String() string {
return fmt.Sprintf("{Hash: %s, Number: %s}", b.Hash, b.Number)
}
type L1BlockHeader struct {
BlockHeader `gorm:"embedded"`
}
......@@ -48,13 +52,13 @@ type L2BlockHeader struct {
type BlocksView interface {
L1BlockHeader(common.Hash) (*L1BlockHeader, error)
L1BlockHeaderWithFilter(BlockHeader) (*L1BlockHeader, error)
L1BlockHeaderWithScope(func(db *gorm.DB) *gorm.DB) (*L1BlockHeader, error)
L1LatestBlockHeader() (*L1BlockHeader, error)
L2BlockHeader(common.Hash) (*L2BlockHeader, error)
L2BlockHeaderWithFilter(BlockHeader) (*L2BlockHeader, error)
L2BlockHeaderWithScope(func(db *gorm.DB) *gorm.DB) (*L2BlockHeader, error)
L2LatestBlockHeader() (*L2BlockHeader, error)
LatestObservedEpoch(*big.Int, uint64) (*Epoch, error)
}
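The new `WithScope` variants let callers push arbitrary gorm predicates into a query without widening the interface; `WithFilter` now becomes a scope that simply applies `Where(&filter)`. A minimal caller sketch (not part of this diff), assuming a `*database.DB` handle named `db` with a `Blocks` field and a `maxTimestamp uint64` already in scope:

	// Most recent indexed L1 header at or below a timestamp bound.
	header, err := db.Blocks.L1BlockHeaderWithScope(func(tx *gorm.DB) *gorm.DB {
		return tx.Where("timestamp <= ?", maxTimestamp).Order("timestamp DESC")
	})
	if err != nil {
		return err
	}
	if header == nil {
		// nothing indexed at or below maxTimestamp yet
	}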
type BlocksDB interface {
......@@ -94,8 +98,12 @@ func (db *blocksDB) L1BlockHeader(hash common.Hash) (*L1BlockHeader, error) {
}
func (db *blocksDB) L1BlockHeaderWithFilter(filter BlockHeader) (*L1BlockHeader, error) {
return db.L1BlockHeaderWithScope(func(gorm *gorm.DB) *gorm.DB { return gorm.Where(&filter) })
}
func (db *blocksDB) L1BlockHeaderWithScope(scope func(*gorm.DB) *gorm.DB) (*L1BlockHeader, error) {
var l1Header L1BlockHeader
result := db.gorm.Where(&filter).Take(&l1Header)
result := db.gorm.Scopes(scope).Take(&l1Header)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
......@@ -137,8 +145,12 @@ func (db *blocksDB) L2BlockHeader(hash common.Hash) (*L2BlockHeader, error) {
}
func (db *blocksDB) L2BlockHeaderWithFilter(filter BlockHeader) (*L2BlockHeader, error) {
return db.L2BlockHeaderWithScope(func(gorm *gorm.DB) *gorm.DB { return gorm.Where(&filter) })
}
func (db *blocksDB) L2BlockHeaderWithScope(scope func(*gorm.DB) *gorm.DB) (*L2BlockHeader, error) {
var l2Header L2BlockHeader
result := db.gorm.Where(&filter).Take(&l2Header)
result := db.gorm.Scopes(scope).Take(&l2Header)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
......@@ -161,104 +173,3 @@ func (db *blocksDB) L2LatestBlockHeader() (*L2BlockHeader, error) {
return &l2Header, nil
}
// Auxiliary Methods on both L1 & L2
type Epoch struct {
L1BlockHeader L1BlockHeader `gorm:"embedded"`
L2BlockHeader L2BlockHeader `gorm:"embedded"`
}
// LatestObservedEpoch returns the marker for the latest epoch, observed on both L1 & L2,
// within the specified bounds. In other words, this returns the latest indexed L1 block
// that has a corresponding indexed L2 block with a matching L1Origin (equal timestamps).
//
// If `fromL1Height` (inclusive) is not specified, the search starts from genesis; if
// `maxL1Range == 0`, it continues all the way to the latest indexed heights.
//
// For more, see the protocol spec:
// - https://github.com/ethereum-optimism/optimism/blob/develop/specs/derivation.md
func (db *blocksDB) LatestObservedEpoch(fromL1Height *big.Int, maxL1Range uint64) (*Epoch, error) {
// We use timestamps since that translates to both L1 & L2
var fromTimestamp, toTimestamp uint64
// Lower bound (the default `fromTimestamp = 0`, i.e. an unset `l1_starting_height`, suffices to represent genesis)
var header L1BlockHeader
if fromL1Height != nil {
result := db.gorm.Where("number = ?", fromL1Height).Take(&header)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, result.Error
}
fromTimestamp = header.Timestamp
} else {
// Take the lowest indexed L1 block to compute the lower bound
result := db.gorm.Order("number ASC").Take(&header)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, result.Error
}
fromL1Height = header.Number
fromTimestamp = header.Timestamp
}
// Upper bound (the lower of the latest timestamps indexed on L1 & L2, bounded by `maxL1Range`)
{
l1QueryFilter := fmt.Sprintf("timestamp >= %d", fromTimestamp)
if maxL1Range > 0 {
maxHeight := new(big.Int).Add(fromL1Height, big.NewInt(int64(maxL1Range)))
l1QueryFilter = fmt.Sprintf("%s AND number <= %d", l1QueryFilter, maxHeight)
}
// Fetch most recent header from l1_block_headers table
var l1Header L1BlockHeader
result := db.gorm.Where(l1QueryFilter).Order("timestamp DESC").Take(&l1Header)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, result.Error
}
toTimestamp = l1Header.Timestamp
// Fetch most recent header from l2_block_headers table
var l2Header L2BlockHeader
result = db.gorm.Where("timestamp <= ?", toTimestamp).Order("timestamp DESC").Take(&l2Header)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, result.Error
}
if l2Header.Timestamp < toTimestamp {
toTimestamp = l2Header.Timestamp
}
}
// Search for the latest indexed epoch within range. This is a faster query than an INNER JOIN
// between l1_block_headers and l2_block_headers, which would require a full table scan to compute the resulting table.
l1Query := db.gorm.Table("l1_block_headers").Where("timestamp >= ? AND timestamp <= ?", fromTimestamp, toTimestamp)
l2Query := db.gorm.Table("l2_block_headers").Where("timestamp >= ? AND timestamp <= ?", fromTimestamp, toTimestamp)
query := db.gorm.Raw(`SELECT * FROM (?) AS l1_block_headers, (?) AS l2_block_headers
WHERE l1_block_headers.timestamp = l2_block_headers.timestamp
ORDER BY l2_block_headers.number DESC LIMIT 1`, l1Query, l2Query)
var epoch Epoch
result := query.Take(&epoch)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, result.Error
}
return &epoch, nil
}
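A minimal consumption sketch (assumed caller shape, not taken from this diff): a bridge task can bound each pass by the last header it processed, e.g. scanning at most 10,000 L1 blocks ahead of a hypothetical `lastL1Header`:

	epoch, err := db.Blocks.LatestObservedEpoch(lastL1Header.Number, 10_000)
	if err != nil {
		return err
	} else if epoch == nil {
		return nil // nothing new observed on both chains yet
	}
	// Upper bounds for this pass on each chain.
	toL1Height, toL2Height := epoch.L1BlockHeader.Number, epoch.L2BlockHeader.Number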
......@@ -50,9 +50,11 @@ type L2TransactionWithdrawal struct {
type BridgeTransactionsView interface {
L1TransactionDeposit(common.Hash) (*L1TransactionDeposit, error)
L1LatestBlockHeader() (*L1BlockHeader, error)
L1LatestFinalizedBlockHeader() (*L1BlockHeader, error)
L2TransactionWithdrawal(common.Hash) (*L2TransactionWithdrawal, error)
L2LatestBlockHeader() (*L2BlockHeader, error)
L2LatestFinalizedBlockHeader() (*L2BlockHeader, error)
}
type BridgeTransactionsDB interface {
......@@ -106,23 +108,41 @@ func (db *bridgeTransactionsDB) L1TransactionDeposit(sourceHash common.Hash) (*L
}
func (db *bridgeTransactionsDB) L1LatestBlockHeader() (*L1BlockHeader, error) {
// Markers for an indexed bridge event
// L1: Latest Transaction Deposit, Latest Proven/Finalized Withdrawal
l1DepositQuery := db.gorm.Table("l1_transaction_deposits").Order("l1_transaction_deposits.timestamp DESC").Limit(1)
l1DepositQuery = l1DepositQuery.Joins("INNER JOIN l1_contract_events ON l1_contract_events.guid = l1_transaction_deposits.initiated_l1_event_guid")
l1DepositQuery = l1DepositQuery.Select("l1_contract_events.*")
// Latest Transaction Deposit
l1Query := db.gorm.Table("l1_transaction_deposits").Order("l1_transaction_deposits.timestamp DESC")
l1Query = l1Query.Joins("INNER JOIN l1_contract_events ON l1_contract_events.guid = l1_transaction_deposits.initiated_l1_event_guid")
l1Query = l1Query.Joins("INNER JOIN l1_block_headers ON l1_block_headers.hash = l1_contract_events.block_hash")
l1Query = l1Query.Select("l1_block_headers.*")
l1ProvenQuery := db.gorm.Table("l2_transaction_withdrawals")
l1ProvenQuery = l1ProvenQuery.Joins("INNER JOIN l1_contract_events ON l1_contract_events.guid = l2_transaction_withdrawals.proven_l1_event_guid")
l1ProvenQuery = l1ProvenQuery.Order("l1_contract_events.timestamp DESC").Select("l1_contract_events.*").Limit(1)
var l1Header L1BlockHeader
result := l1Query.Take(&l1Header)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, result.Error
}
return &l1Header, nil
}
func (db *bridgeTransactionsDB) L1LatestFinalizedBlockHeader() (*L1BlockHeader, error) {
// A proven withdrawal, finalized withdrawal, or relayed message event
provenQuery := db.gorm.Table("l2_transaction_withdrawals").Order("timestamp DESC").Limit(1)
provenQuery = provenQuery.Joins("INNER JOIN l1_contract_events ON l1_contract_events.guid = l2_transaction_withdrawals.proven_l1_event_guid")
provenQuery = provenQuery.Order("l1_contract_events.timestamp DESC").Select("l1_contract_events.*")
l1FinalizedQuery := db.gorm.Table("l2_transaction_withdrawals")
l1FinalizedQuery = l1FinalizedQuery.Joins("INNER JOIN l1_contract_events ON l1_contract_events.guid = l2_transaction_withdrawals.finalized_l1_event_guid")
l1FinalizedQuery = l1FinalizedQuery.Order("l1_contract_events.timestamp DESC").Select("l1_contract_events.*").Limit(1)
finalizedQuery := db.gorm.Table("l2_transaction_withdrawals").Order("timestamp DESC").Limit(1)
finalizedQuery = finalizedQuery.Joins("INNER JOIN l1_contract_events ON l1_contract_events.guid = l2_transaction_withdrawals.finalized_l1_event_guid")
finalizedQuery = finalizedQuery.Select("l1_contract_events.*")
l1Query := db.gorm.Table("((?) UNION (?) UNION (?)) AS latest_bridge_events", l1DepositQuery.Limit(1), l1ProvenQuery, l1FinalizedQuery)
l1Query = l1Query.Joins("INNER JOIN l1_block_headers ON l1_block_headers.hash = latest_bridge_events.block_hash")
l1Query = l1Query.Order("latest_bridge_events.timestamp DESC").Select("l1_block_headers.*")
relayedQuery := db.gorm.Table("l2_bridge_messages").Order("timestamp DESC")
relayedQuery = relayedQuery.Joins("INNER JOIN l1_contract_events ON l1_contract_events.guid = l2_bridge_messages.relayed_message_event_guid")
relayedQuery = relayedQuery.Select("l1_contract_events.*")
l1Query := db.gorm.Table("((?) UNION (?) UNION (?)) AS finalized_bridge_events", provenQuery, finalizedQuery, relayedQuery)
l1Query = l1Query.Joins("INNER JOIN l1_block_headers ON l1_block_headers.hash = finalized_bridge_events.block_hash")
l1Query = l1Query.Order("finalized_bridge_events.timestamp DESC").Select("l1_block_headers.*")
var l1Header L1BlockHeader
result := l1Query.Take(&l1Header)
......@@ -251,3 +271,22 @@ func (db *bridgeTransactionsDB) L2LatestBlockHeader() (*L2BlockHeader, error) {
return latestL2DepositHeader, nil
}
}
func (db *bridgeTransactionsDB) L2LatestFinalizedBlockHeader() (*L2BlockHeader, error) {
// Only a relayed message, since we don't track L1 deposit inclusion status.
relayedQuery := db.gorm.Table("l1_bridge_messages").Order("timestamp DESC").Limit(1)
relayedQuery = relayedQuery.Joins("INNER JOIN l2_contract_events ON l2_contract_events.guid = l1_bridge_messages.relayed_message_event_guid")
relayedQuery = relayedQuery.Joins("INNER JOIN l2_block_headers ON l2_block_headers.hash = l2_contract_events.block_hash")
relayedQuery = relayedQuery.Select("l2_block_headers.*")
var l2Header L2BlockHeader
result := relayedQuery.Take(&l2Header)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, result.Error
}
return &l2Header, nil
}
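A hedged sketch of how a finalization task might pick its resume point from this marker (assumed caller shape, not taken from this diff):

	latest, err := db.BridgeTransactions.L2LatestFinalizedBlockHeader()
	if err != nil {
		return err
	}
	// Replay from genesis if no finalized bridge event has been indexed yet.
	fromL2Height := big.NewInt(0)
	if latest != nil {
		fromL2Height = latest.Number
	}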
package database
import (
"math/big"
"github.com/ethereum/go-ethereum/common"
"gorm.io/gorm"
"github.com/stretchr/testify/mock"
)
......@@ -27,6 +26,11 @@ func (m *MockBlocksView) L1BlockHeaderWithFilter(BlockHeader) (*L1BlockHeader, e
return args.Get(0).(*L1BlockHeader), args.Error(1)
}
func (m *MockBlocksView) L1BlockHeaderWithScope(func(*gorm.DB) *gorm.DB) (*L1BlockHeader, error) {
args := m.Called()
return args.Get(0).(*L1BlockHeader), args.Error(1)
}
func (m *MockBlocksView) L1LatestBlockHeader() (*L1BlockHeader, error) {
args := m.Called()
......@@ -48,14 +52,14 @@ func (m *MockBlocksView) L2BlockHeaderWithFilter(BlockHeader) (*L2BlockHeader, e
return args.Get(0).(*L2BlockHeader), args.Error(1)
}
func (m *MockBlocksView) L2LatestBlockHeader() (*L2BlockHeader, error) {
func (m *MockBlocksView) L2BlockHeaderWithScope(func(*gorm.DB) *gorm.DB) (*L2BlockHeader, error) {
args := m.Called()
return args.Get(0).(*L2BlockHeader), args.Error(1)
return args.Get(0).(*L2BlockHeader), args.Error(1)
}
func (m *MockBlocksView) LatestObservedEpoch(*big.Int, uint64) (*Epoch, error) {
func (m *MockBlocksView) L2LatestBlockHeader() (*L2BlockHeader, error) {
args := m.Called()
return args.Get(0).(*Epoch), args.Error(1)
return args.Get(0).(*L2BlockHeader), args.Error(1)
}
type MockBlocksDB struct {
......
......@@ -43,7 +43,7 @@ func TestE2EBridgeL1CrossDomainMessenger(t *testing.T) {
// wait for processor catchup
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
l1Header := testSuite.Indexer.BridgeProcessor.LatestL1Header
l1Header := testSuite.Indexer.BridgeProcessor.LastL1Header
return l1Header != nil && l1Header.Number.Uint64() >= sentMsgReceipt.BlockNumber.Uint64(), nil
}))
......@@ -77,7 +77,7 @@ func TestE2EBridgeL1CrossDomainMessenger(t *testing.T) {
l2DepositReceipt, err := wait.ForReceiptOK(context.Background(), testSuite.L2Client, transaction.L2TransactionHash)
require.NoError(t, err)
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
l2Header := testSuite.Indexer.BridgeProcessor.LatestL2Header
l2Header := testSuite.Indexer.BridgeProcessor.LastFinalizedL2Header
return l2Header != nil && l2Header.Number.Uint64() >= l2DepositReceipt.BlockNumber.Uint64(), nil
}))
......@@ -130,7 +130,7 @@ func TestE2EBridgeL2CrossDomainMessenger(t *testing.T) {
// wait for processor catchup
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
l2Header := testSuite.Indexer.BridgeProcessor.LatestL2Header
l2Header := testSuite.Indexer.BridgeProcessor.LastL2Header
return l2Header != nil && l2Header.Number.Uint64() >= sentMsgReceipt.BlockNumber.Uint64(), nil
}))
......@@ -159,7 +159,7 @@ func TestE2EBridgeL2CrossDomainMessenger(t *testing.T) {
// wait for processor catchup
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
l1Header := testSuite.Indexer.BridgeProcessor.LatestL1Header
l1Header := testSuite.Indexer.BridgeProcessor.LastFinalizedL1Header
return l1Header != nil && l1Header.Number.Uint64() >= finalizedReceipt.BlockNumber.Uint64(), nil
}))
......
......@@ -49,7 +49,7 @@ func TestE2EBridgeTransactionsOptimismPortalDeposits(t *testing.T) {
// wait for processor catchup
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
l1Header := testSuite.Indexer.BridgeProcessor.LatestL1Header
l1Header := testSuite.Indexer.BridgeProcessor.LastL1Header
return l1Header != nil && l1Header.Number.Uint64() >= depositReceipt.BlockNumber.Uint64(), nil
}))
......@@ -104,7 +104,7 @@ func TestE2EBridgeTransactionsL2ToL1MessagePasserWithdrawal(t *testing.T) {
// wait for processor catchup
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
l2Header := testSuite.Indexer.BridgeProcessor.LatestL2Header
l2Header := testSuite.Indexer.BridgeProcessor.LastL2Header
return l2Header != nil && l2Header.Number.Uint64() >= withdrawReceipt.BlockNumber.Uint64(), nil
}))
......@@ -134,7 +134,7 @@ func TestE2EBridgeTransactionsL2ToL1MessagePasserWithdrawal(t *testing.T) {
withdrawParams, proveReceipt := op_e2e.ProveWithdrawal(t, *testSuite.OpCfg, testSuite.L1Client, testSuite.OpSys.EthInstances["sequencer"], testSuite.OpCfg.Secrets.Alice, withdrawReceipt)
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
l1Header := testSuite.Indexer.BridgeProcessor.LatestL1Header
l1Header := testSuite.Indexer.BridgeProcessor.LastFinalizedL1Header
return l1Header != nil && l1Header.Number.Uint64() >= proveReceipt.BlockNumber.Uint64(), nil
}))
......@@ -152,7 +152,7 @@ func TestE2EBridgeTransactionsL2ToL1MessagePasserWithdrawal(t *testing.T) {
finalizeReceipt := op_e2e.FinalizeWithdrawal(t, *testSuite.OpCfg, testSuite.L1Client, testSuite.OpCfg.Secrets.Alice, proveReceipt, withdrawParams)
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
l1Header := testSuite.Indexer.BridgeProcessor.LatestL1Header
l1Header := testSuite.Indexer.BridgeProcessor.LastFinalizedL1Header
return l1Header != nil && l1Header.Number.Uint64() >= finalizeReceipt.BlockNumber.Uint64(), nil
}))
......@@ -194,7 +194,7 @@ func TestE2EBridgeTransactionsL2ToL1MessagePasserFailedWithdrawal(t *testing.T)
// Prove&Finalize withdrawal
_, finalizeReceipt := op_e2e.ProveAndFinalizeWithdrawal(t, *testSuite.OpCfg, testSuite.L1Client, testSuite.OpSys.EthInstances["sequencer"], testSuite.OpCfg.Secrets.Alice, withdrawReceipt)
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
l1Header := testSuite.Indexer.BridgeProcessor.LatestL1Header
l1Header := testSuite.Indexer.BridgeProcessor.LastFinalizedL1Header
return l1Header != nil && l1Header.Number.Uint64() >= finalizeReceipt.BlockNumber.Uint64(), nil
}))
......
......@@ -48,7 +48,7 @@ func TestE2EBridgeTransfersStandardBridgeETHDeposit(t *testing.T) {
// wait for processor catchup
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
l1Header := testSuite.Indexer.BridgeProcessor.LatestL1Header
l1Header := testSuite.Indexer.BridgeProcessor.LastL1Header
return l1Header != nil && l1Header.Number.Uint64() >= depositReceipt.BlockNumber.Uint64(), nil
}))
......@@ -82,7 +82,7 @@ func TestE2EBridgeTransfersStandardBridgeETHDeposit(t *testing.T) {
l2DepositReceipt, err := wait.ForReceiptOK(context.Background(), testSuite.L2Client, types.NewTx(depositInfo.DepositTx).Hash())
require.NoError(t, err)
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
l2Header := testSuite.Indexer.BridgeProcessor.LatestL2Header
l2Header := testSuite.Indexer.BridgeProcessor.LastFinalizedL2Header
return l2Header != nil && l2Header.Number.Uint64() >= l2DepositReceipt.BlockNumber.Uint64(), nil
}))
......@@ -115,7 +115,7 @@ func TestE2EBridgeTransfersOptimismPortalETHReceive(t *testing.T) {
// wait for processor catchup
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
l1Header := testSuite.Indexer.BridgeProcessor.LatestL1Header
l1Header := testSuite.Indexer.BridgeProcessor.LastL1Header
return l1Header != nil && l1Header.Number.Uint64() >= portalDepositReceipt.BlockNumber.Uint64(), nil
}))
......@@ -141,7 +141,7 @@ func TestE2EBridgeTransfersOptimismPortalETHReceive(t *testing.T) {
l2DepositReceipt, err := wait.ForReceiptOK(context.Background(), testSuite.L2Client, types.NewTx(depositInfo.DepositTx).Hash())
require.NoError(t, err)
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
l2Header := testSuite.Indexer.BridgeProcessor.LatestL2Header
l2Header := testSuite.Indexer.BridgeProcessor.LastFinalizedL2Header
return l2Header != nil && l2Header.Number.Uint64() >= l2DepositReceipt.BlockNumber.Uint64(), nil
}))
......@@ -183,7 +183,7 @@ func TestE2EBridgeTransfersCursoredDeposits(t *testing.T) {
// wait for processor catchup of the latest tx
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
l1Header := testSuite.Indexer.BridgeProcessor.LatestL1Header
l1Header := testSuite.Indexer.BridgeProcessor.LastL1Header
return l1Header != nil && l1Header.Number.Uint64() >= depositReceipts[2].BlockNumber.Uint64(), nil
}))
......@@ -251,7 +251,7 @@ func TestE2EBridgeTransfersStandardBridgeETHWithdrawal(t *testing.T) {
// wait for processor catchup
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
l2Header := testSuite.Indexer.BridgeProcessor.LatestL2Header
l2Header := testSuite.Indexer.BridgeProcessor.LastL2Header
return l2Header != nil && l2Header.Number.Uint64() >= withdrawReceipt.BlockNumber.Uint64(), nil
}))
......@@ -289,7 +289,7 @@ func TestE2EBridgeTransfersStandardBridgeETHWithdrawal(t *testing.T) {
// wait for processor catchup
proveReceipt, finalizeReceipt := op_e2e.ProveAndFinalizeWithdrawal(t, *testSuite.OpCfg, testSuite.L1Client, testSuite.OpSys.EthInstances["sequencer"], testSuite.OpCfg.Secrets.Alice, withdrawReceipt)
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
l1Header := testSuite.Indexer.BridgeProcessor.LatestL1Header
l1Header := testSuite.Indexer.BridgeProcessor.LastFinalizedL1Header
return l1Header != nil && l1Header.Number.Uint64() >= finalizeReceipt.BlockNumber.Uint64(), nil
}))
......@@ -335,7 +335,7 @@ func TestE2EBridgeTransfersL2ToL1MessagePasserETHReceive(t *testing.T) {
// wait for processor catchup
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
l2Header := testSuite.Indexer.BridgeProcessor.LatestL2Header
l2Header := testSuite.Indexer.BridgeProcessor.LastL2Header
return l2Header != nil && l2Header.Number.Uint64() >= l2ToL1WithdrawReceipt.BlockNumber.Uint64(), nil
}))
......@@ -368,7 +368,7 @@ func TestE2EBridgeTransfersL2ToL1MessagePasserETHReceive(t *testing.T) {
// wait for processor catchup
proveReceipt, finalizeReceipt := op_e2e.ProveAndFinalizeWithdrawal(t, *testSuite.OpCfg, testSuite.L1Client, testSuite.OpSys.EthInstances["sequencer"], testSuite.OpCfg.Secrets.Alice, l2ToL1WithdrawReceipt)
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
l1Header := testSuite.Indexer.BridgeProcessor.LatestL1Header
l1Header := testSuite.Indexer.BridgeProcessor.LastFinalizedL1Header
return l1Header != nil && l1Header.Number.Uint64() >= finalizeReceipt.BlockNumber.Uint64(), nil
}))
......@@ -411,7 +411,7 @@ func TestE2EBridgeTransfersCursoredWithdrawals(t *testing.T) {
// wait for processor catchup of the latest tx
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
l2Header := testSuite.Indexer.BridgeProcessor.LatestL2Header
l2Header := testSuite.Indexer.BridgeProcessor.LastL2Header
return l2Header != nil && l2Header.Number.Uint64() >= withdrawReceipts[2].BlockNumber.Uint64(), nil
}))
......@@ -497,7 +497,7 @@ func TestClientBridgeFunctions(t *testing.T) {
l1Opts.Value = l2Opts.Value
depositTx, err := optimismPortal.Receive(l1Opts)
require.NoError(t, err)
_, err = wait.ForReceiptOK(context.Background(), testSuite.L1Client, depositTx.Hash())
depositReceipt, err := wait.ForReceiptOK(context.Background(), testSuite.L1Client, depositTx.Hash())
require.NoError(t, err)
mintSum = new(big.Int).Add(mintSum, depositTx.Value())
......@@ -508,10 +508,13 @@ func TestClientBridgeFunctions(t *testing.T) {
l2ToL1WithdrawReceipt, err := wait.ForReceiptOK(context.Background(), testSuite.L2Client, l2ToL1MessagePasserWithdrawTx.Hash())
require.NoError(t, err)
// (3.c) wait for indexer processor to catchup with the L2 block containing the withdrawal tx
// (3.c) wait for the indexer processor to catch up with the L1 & L2 blocks containing the deposit & withdrawal txs
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
l2Header := testSuite.Indexer.BridgeProcessor.LatestL2Header
return l2Header != nil && l2Header.Number.Uint64() >= l2ToL1WithdrawReceipt.BlockNumber.Uint64(), nil
l1Header := testSuite.Indexer.BridgeProcessor.LastL1Header
l2Header := testSuite.Indexer.BridgeProcessor.LastL2Header
seenL2 := l2Header != nil && l2Header.Number.Uint64() >= l2ToL1WithdrawReceipt.BlockNumber.Uint64()
seenL1 := l1Header != nil && l1Header.Number.Uint64() >= depositReceipt.BlockNumber.Uint64()
return seenL1 && seenL2, nil
}))
withdrawSum = new(big.Int).Add(withdrawSum, l2ToL1MessagePasserWithdrawTx.Value())
......
......@@ -59,10 +59,9 @@ func (etl *ETL) Start() error {
if etl.worker != nil {
return errors.New("already started")
}
etl.log.Info("starting etl...")
etl.worker = clock.NewLoopFn(clock.SystemClock, etl.tick, func() error {
etl.log.Info("shutting down batch producer")
close(etl.etlBatches) // can close the channel now, to signal to the consumer that we're done
etl.log.Info("stopped etl worker loop")
return nil
}, etl.loopInterval)
return nil
......
......@@ -21,6 +21,7 @@ import (
type L1ETL struct {
ETL
LatestHeader *types.Header
// the batch handler may do work that we can interrupt on shutdown
resourceCtx context.Context
......@@ -30,8 +31,7 @@ type L1ETL struct {
db *database.DB
mu sync.Mutex
mu sync.Mutex
listeners []chan interface{}
}
......@@ -101,7 +101,9 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
resCtx, resCancel := context.WithCancel(context.Background())
return &L1ETL{
ETL: etl,
ETL: etl,
LatestHeader: fromHeader,
db: db,
resourceCtx: resCtx,
resourceCancel: resCancel,
......@@ -123,32 +125,35 @@ func (l1Etl *L1ETL) Close() error {
if err := l1Etl.tasks.Wait(); err != nil {
result = errors.Join(result, fmt.Errorf("failed to await batch handler completion: %w", err))
}
// close listeners
for i := range l1Etl.listeners {
close(l1Etl.listeners[i])
}
return result
}
func (l1Etl *L1ETL) Start() error {
l1Etl.log.Info("starting etl...")
// start ETL batch producer
if err := l1Etl.ETL.Start(); err != nil {
return fmt.Errorf("failed to start internal ETL: %w", err)
}
// start ETL batch consumer
l1Etl.tasks.Go(func() error {
for {
// Index incoming batches (only L1 blocks that have an emitted log)
batch, ok := <-l1Etl.etlBatches
if !ok {
l1Etl.log.Info("No more batches, shutting down L1 batch handler")
return nil
}
for batch := range l1Etl.etlBatches {
if err := l1Etl.handleBatch(batch); err != nil {
return fmt.Errorf("failed to handle batch, stopping L2 ETL: %w", err)
}
}
l1Etl.log.Info("no more batches, shutting down batch handler")
return nil
})
return nil
}
func (l1Etl *L1ETL) handleBatch(batch *ETLBatch) error {
// Index incoming batches (only L1 blocks that have an emitted log)
l1BlockHeaders := make([]database.L1BlockHeader, 0, len(batch.Headers))
for i := range batch.Headers {
if _, ok := batch.HeadersWithLog[batch.Headers[i].Hash()]; ok {
......@@ -195,9 +200,11 @@ func (l1Etl *L1ETL) handleBatch(batch *ETLBatch) error {
}
batch.Logger.Info("indexed batch")
l1Etl.LatestHeader = &batch.Headers[len(batch.Headers)-1]
// Notify Listeners
l1Etl.mu.Lock()
defer l1Etl.mu.Unlock()
for i := range l1Etl.listeners {
select {
case l1Etl.listeners[i] <- struct{}{}:
......@@ -206,7 +213,7 @@ func (l1Etl *L1ETL) handleBatch(batch *ETLBatch) error {
// up the previous notif
}
}
l1Etl.mu.Unlock()
return nil
}
......
......@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
......@@ -19,6 +20,7 @@ import (
type L2ETL struct {
ETL
LatestHeader *types.Header
// the batch handler may do work that we can interrupt on shutdown
resourceCtx context.Context
......@@ -27,6 +29,9 @@ type L2ETL struct {
tasks tasks.Group
db *database.DB
mu sync.Mutex
listeners []chan interface{}
}
func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient,
......@@ -80,7 +85,9 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
resCtx, resCancel := context.WithCancel(context.Background())
return &L2ETL{
ETL: etl,
ETL: etl,
LatestHeader: fromHeader,
resourceCtx: resCtx,
resourceCancel: resCancel,
db: db,
......@@ -102,10 +109,16 @@ func (l2Etl *L2ETL) Close() error {
if err := l2Etl.tasks.Wait(); err != nil {
result = errors.Join(result, fmt.Errorf("failed to await batch handler completion: %w", err))
}
// close listeners
for i := range l2Etl.listeners {
close(l2Etl.listeners[i])
}
return result
}
func (l2Etl *L2ETL) Start() error {
l2Etl.log.Info("starting etl...")
// start ETL batch producer
if err := l2Etl.ETL.Start(); err != nil {
return fmt.Errorf("failed to start internal ETL: %w", err)
......@@ -113,17 +126,13 @@ func (l2Etl *L2ETL) Start() error {
// start ETL batch consumer
l2Etl.tasks.Go(func() error {
for {
// Index incoming batches (all L2 blocks)
batch, ok := <-l2Etl.etlBatches
if !ok {
l2Etl.log.Info("No more batches, shutting down L2 batch handler")
return nil
}
for batch := range l2Etl.etlBatches {
if err := l2Etl.handleBatch(batch); err != nil {
return fmt.Errorf("failed to handle batch, stopping L2 ETL: %w", err)
}
}
l2Etl.log.Info("no more batches, shutting down batch handler")
return nil
})
return nil
}
......@@ -169,5 +178,30 @@ func (l2Etl *L2ETL) handleBatch(batch *ETLBatch) error {
}
batch.Logger.Info("indexed batch")
l2Etl.LatestHeader = &batch.Headers[len(batch.Headers)-1]
// Notify Listeners
l2Etl.mu.Lock()
defer l2Etl.mu.Unlock()
for i := range l2Etl.listeners {
select {
case l2Etl.listeners[i] <- struct{}{}:
default:
// do nothing if the listener hasn't picked
// up the previous notif
}
}
return nil
}
// Notify returns a channel that'll receive a value every time new data has
// been persisted by the L2ETL
func (l2Etl *L2ETL) Notify() <-chan interface{} {
receiver := make(chan interface{})
l2Etl.mu.Lock()
defer l2Etl.mu.Unlock()
l2Etl.listeners = append(l2Etl.listeners, receiver)
return receiver
}
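A minimal consumer sketch (assumed caller shape, not taken from this diff): because `handleBatch` uses a non-blocking send, a slow consumer only coalesces wake-ups rather than stalling the ETL, and the channel is closed on shutdown:

	for range l2Etl.Notify() {
		// runL2Task is a hypothetical handler invoked whenever new L2 data is persisted.
		if err := runL2Task(); err != nil {
			return err
		}
	}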
......@@ -223,7 +223,7 @@ func (ix *Indexer) initL2ETL(chainConfig config.ChainConfig) error {
func (ix *Indexer) initBridgeProcessor(chainConfig config.ChainConfig) error {
bridgeProcessor, err := processors.NewBridgeProcessor(
ix.log, ix.DB, bridge.NewMetrics(ix.metricsRegistry), ix.L1ETL, chainConfig, ix.shutdown)
ix.log, ix.DB, bridge.NewMetrics(ix.metricsRegistry), ix.L1ETL, ix.L2ETL, chainConfig, ix.shutdown)
if err != nil {
return err
}
......
This diff is collapsed.
......@@ -14,7 +14,9 @@ var (
)
type L1Metricer interface {
RecordLatestIndexedL1Height(height *big.Int)
RecordL1Interval() (done func(err error))
RecordL1LatestHeight(height *big.Int)
RecordL1LatestFinalizedHeight(height *big.Int)
RecordL1TransactionDeposits(size int, mintedETH float64)
RecordL1ProvenWithdrawals(size int)
......@@ -28,7 +30,9 @@ type L1Metricer interface {
}
type L2Metricer interface {
RecordLatestIndexedL2Height(height *big.Int)
RecordL2Interval() (done func(err error))
RecordL2LatestHeight(height *big.Int)
RecordL2LatestFinalizedHeight(height *big.Int)
RecordL2TransactionWithdrawals(size int, withdrawnETH float64)
......@@ -42,17 +46,14 @@ type L2Metricer interface {
type Metricer interface {
L1Metricer
L2Metricer
RecordInterval() (done func(err error))
}
type bridgeMetrics struct {
intervalTick prometheus.Counter
intervalDuration prometheus.Histogram
intervalFailures prometheus.Counter
latestHeight *prometheus.GaugeVec
latestL1Height prometheus.Gauge
latestL2Height prometheus.Gauge
intervalTick *prometheus.CounterVec
intervalDuration *prometheus.HistogramVec
intervalFailures *prometheus.CounterVec
txDeposits prometheus.Counter
txMintedETH prometheus.Counter
......@@ -71,32 +72,35 @@ type bridgeMetrics struct {
func NewMetrics(registry *prometheus.Registry) Metricer {
factory := metrics.With(registry)
return &bridgeMetrics{
intervalTick: factory.NewCounter(prometheus.CounterOpts{
intervalTick: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "intervals_total",
Help: "number of times processing loop has run",
}, []string{
"chain",
}),
intervalDuration: factory.NewHistogram(prometheus.HistogramOpts{
intervalDuration: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: MetricsNamespace,
Name: "interval_seconds",
Help: "duration elapsed in the processing loop",
}, []string{
"chain",
}),
intervalFailures: factory.NewCounter(prometheus.CounterOpts{
intervalFailures: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "interval_failures_total",
Help: "number of failures encountered",
}, []string{
"chain",
}),
latestL1Height: factory.NewGauge(prometheus.GaugeOpts{
latestHeight: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: "l1",
Name: "height",
Help: "the latest processed l1 block height",
}),
latestL2Height: factory.NewGauge(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: "l2",
Name: "height",
Help: "the latest processed l2 block height",
}, []string{
"chain",
"kind",
}),
txDeposits: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
......@@ -161,21 +165,25 @@ func NewMetrics(registry *prometheus.Registry) Metricer {
}
}
func (m *bridgeMetrics) RecordInterval() func(error) {
m.intervalTick.Inc()
timer := prometheus.NewTimer(m.intervalDuration)
// L1Metricer
func (m *bridgeMetrics) RecordL1Interval() func(error) {
m.intervalTick.WithLabelValues("l1").Inc()
timer := prometheus.NewTimer(m.intervalDuration.WithLabelValues("l1"))
return func(err error) {
timer.ObserveDuration()
if err != nil {
m.intervalFailures.Inc()
m.intervalFailures.WithLabelValues("l1").Inc()
}
}
}
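A hedged usage sketch of the per-chain interval metrics (assumed caller shape, not taken from this diff): wrap each L1 processing pass so duration and failures are labeled by chain.

	done := metrics.RecordL1Interval()
	err := processL1Events() // processL1Events is a hypothetical task body
	done(err)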
// L1Metricer
func (m *bridgeMetrics) RecordL1LatestHeight(height *big.Int) {
m.latestHeight.WithLabelValues("l1", "initiated").Set(float64(height.Uint64()))
}
func (m *bridgeMetrics) RecordLatestIndexedL1Height(height *big.Int) {
m.latestL1Height.Set(float64(height.Uint64()))
func (m *bridgeMetrics) RecordL1LatestFinalizedHeight(height *big.Int) {
m.latestHeight.WithLabelValues("l1", "finalized").Set(float64(height.Uint64()))
}
func (m *bridgeMetrics) RecordL1TransactionDeposits(size int, mintedETH float64) {
......@@ -209,8 +217,23 @@ func (m *bridgeMetrics) RecordL1FinalizedBridgeTransfers(tokenAddr common.Addres
// L2Metricer
func (m *bridgeMetrics) RecordLatestIndexedL2Height(height *big.Int) {
m.latestL2Height.Set(float64(height.Uint64()))
func (m *bridgeMetrics) RecordL2Interval() func(error) {
m.intervalTick.WithLabelValues("l2").Inc()
timer := prometheus.NewTimer(m.intervalDuration.WithLabelValues("l2"))
return func(err error) {
timer.ObserveDuration()
if err != nil {
m.intervalFailures.WithLabelValues("l2").Inc()
}
}
}
func (m *bridgeMetrics) RecordL2LatestHeight(height *big.Int) {
m.latestHeight.WithLabelValues("l2", "initiated").Set(float64(height.Uint64()))
}
func (m *bridgeMetrics) RecordL2LatestFinalizedHeight(height *big.Int) {
m.latestHeight.WithLabelValues("l2", "finalized").Set(float64(height.Uint64()))
}
func (m *bridgeMetrics) RecordL2TransactionWithdrawals(size int, withdrawnETH float64) {
......