Commit 26cc56f6 authored by mergify[bot]'s avatar mergify[bot] Committed by GitHub

Merge branch 'develop' into aj/atomic-put

parents de92c9f8 c9b69597
...@@ -86,6 +86,9 @@ nuke: clean devnet-clean ...@@ -86,6 +86,9 @@ nuke: clean devnet-clean
.PHONY: nuke .PHONY: nuke
devnet-up: devnet-up:
@if [ ! -e op-program/bin ]; then \
make cannon-prestate; \
fi
$(shell ./ops/scripts/newer-file.sh .devnet/allocs-l1.json ./packages/contracts-bedrock) $(shell ./ops/scripts/newer-file.sh .devnet/allocs-l1.json ./packages/contracts-bedrock)
if [ $(.SHELLSTATUS) -ne 0 ]; then \ if [ $(.SHELLSTATUS) -ne 0 ]; then \
make devnet-allocs; \ make devnet-allocs; \
......
...@@ -33,3 +33,28 @@ TODO add indexer to the optimism devnet compose file (previously removed for bre ...@@ -33,3 +33,28 @@ TODO add indexer to the optimism devnet compose file (previously removed for bre
`docker-compose.dev.yml` is git ignored. Fill in your own docker-compose file here. `docker-compose.dev.yml` is git ignored. Fill in your own docker-compose file here.
## Architecture
![Architectural Diagram](./assets/architecture.png)
The indexer application supports two separate services for collective operation:
**Indexer API** - Provides a lightweight API service that supports paginated lookups for bridge events.
**Indexer Service** - A polling based service that constantly reads and persists OP Stack chain data (i.e, block meta, system contract events, synchronized bridge events) from a L1 and L2 chain.
### Indexer API
TBD
### Indexer Service
![Service Component Diagram](./assets/indexer-service.png)
The indexer service is responsible for polling and processing real-time batches of L1 and L2 chain data. The indexer service is currently composed of the following key components:
- **Poller Routines** - Individually polls the L1/L2 chain for new blocks and OP Stack system contract events.
- **Insertion Routines** - Awaits new batches from the poller routines and inserts them into the database upon retrieval.
- **Bridge Routine** - Polls the database directly for new L1 blocks and bridge events. Upon retrieval, the bridge routine will:
* Process and persist new bridge events
* Synchronize L1 proven/finalized withdrawals with their L2 initialization counterparts
### Database
The indexer service currently supports a Postgres database for storing L1/L2 OP Stack chain data. The most up-to-date database schemas can be found in the `./migrations` directory.
**NOTE:** The indexer service implementation currently does not natively support database migration. Because of this a database must be manually updated to ensure forward compatibility with the latest indexer service implementation.
\ No newline at end of file
...@@ -2,6 +2,7 @@ package database ...@@ -2,6 +2,7 @@ package database
import ( import (
"errors" "errors"
"fmt"
"gorm.io/gorm" "gorm.io/gorm"
...@@ -133,21 +134,31 @@ func (db *bridgeTransfersDB) L1BridgeDepositsByAddress(address common.Address, c ...@@ -133,21 +134,31 @@ func (db *bridgeTransfersDB) L1BridgeDepositsByAddress(address common.Address, c
limit = defaultLimit limit = defaultLimit
} }
cursorClause := ""
if cursor != "" {
sourceHash := common.HexToHash(cursor)
txDeposit := new(L1TransactionDeposit)
result := db.gorm.Model(&L1TransactionDeposit{}).Where(&L1TransactionDeposit{SourceHash: sourceHash}).Take(txDeposit)
if result.Error != nil || errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, fmt.Errorf("unable to find transaction with supplied cursor source hash %s: %w", sourceHash, result.Error)
}
cursorClause = fmt.Sprintf("l1_transaction_deposits.timestamp <= %d", txDeposit.Tx.Timestamp)
}
// TODO join with l1_bridged_tokens and l2_bridged_tokens // TODO join with l1_bridged_tokens and l2_bridged_tokens
ethAddressString := predeploys.LegacyERC20ETHAddr.String() ethAddressString := predeploys.LegacyERC20ETHAddr.String()
// Coalesce l1 transaction deposits that are simply ETH sends // Coalesce l1 transaction deposits that are simply ETH sends
ethTransactionDeposits := db.gorm.Model(&L1TransactionDeposit{}) ethTransactionDeposits := db.gorm.Model(&L1TransactionDeposit{})
ethTransactionDeposits = ethTransactionDeposits.Where(Transaction{FromAddress: address}).Where(`data = '0x' AND amount > 0`) ethTransactionDeposits = ethTransactionDeposits.Where(Transaction{FromAddress: address}).Where("data = '0x' AND amount > 0")
ethTransactionDeposits = ethTransactionDeposits.Joins("INNER JOIN l1_contract_events ON l1_contract_events.guid = initiated_l1_event_guid") ethTransactionDeposits = ethTransactionDeposits.Joins("INNER JOIN l1_contract_events ON l1_contract_events.guid = initiated_l1_event_guid")
ethTransactionDeposits = ethTransactionDeposits.Select(` ethTransactionDeposits = ethTransactionDeposits.Select(`
from_address, to_address, amount, data, source_hash AS transaction_source_hash, from_address, to_address, amount, data, source_hash AS transaction_source_hash,
l2_transaction_hash, l1_contract_events.transaction_hash AS l1_transaction_hash, l2_transaction_hash, l1_contract_events.transaction_hash AS l1_transaction_hash,
l1_transaction_deposits.timestamp, NULL AS cross_domain_message_hash, ? AS local_token_address, ? AS remote_token_address`, ethAddressString, ethAddressString) l1_transaction_deposits.timestamp, NULL AS cross_domain_message_hash, ? AS local_token_address, ? AS remote_token_address`, ethAddressString, ethAddressString)
ethTransactionDeposits = ethTransactionDeposits.Order("timestamp DESC").Limit(limit + 1)
if cursor != "" { if cursorClause != "" {
// Probably need to fix this and compare timestamps ethTransactionDeposits = ethTransactionDeposits.Where(cursorClause)
ethTransactionDeposits = ethTransactionDeposits.Where("source_hash < ?", cursor)
} }
depositsQuery := db.gorm.Model(&L1BridgeDeposit{}) depositsQuery := db.gorm.Model(&L1BridgeDeposit{})
...@@ -157,10 +168,9 @@ l1_transaction_deposits.timestamp, NULL AS cross_domain_message_hash, ? AS local ...@@ -157,10 +168,9 @@ l1_transaction_deposits.timestamp, NULL AS cross_domain_message_hash, ? AS local
l1_bridge_deposits.from_address, l1_bridge_deposits.to_address, l1_bridge_deposits.amount, l1_bridge_deposits.data, transaction_source_hash, l1_bridge_deposits.from_address, l1_bridge_deposits.to_address, l1_bridge_deposits.amount, l1_bridge_deposits.data, transaction_source_hash,
l2_transaction_hash, l1_contract_events.transaction_hash AS l1_transaction_hash, l2_transaction_hash, l1_contract_events.transaction_hash AS l1_transaction_hash,
l1_bridge_deposits.timestamp, cross_domain_message_hash, local_token_address, remote_token_address`) l1_bridge_deposits.timestamp, cross_domain_message_hash, local_token_address, remote_token_address`)
depositsQuery = depositsQuery.Order("timestamp DESC").Limit(limit + 1)
if cursor != "" { if cursorClause != "" {
// Probably need to fix this and compare timestamps depositsQuery = depositsQuery.Where(cursorClause)
depositsQuery = depositsQuery.Where("source_hash < ?", cursor)
} }
query := db.gorm.Table("(?) AS deposits", depositsQuery) query := db.gorm.Table("(?) AS deposits", depositsQuery)
...@@ -179,16 +189,11 @@ l1_bridge_deposits.timestamp, cross_domain_message_hash, local_token_address, re ...@@ -179,16 +189,11 @@ l1_bridge_deposits.timestamp, cross_domain_message_hash, local_token_address, re
hasNextPage := false hasNextPage := false
if len(deposits) > limit { if len(deposits) > limit {
hasNextPage = true hasNextPage = true
nextCursor = deposits[limit].L1BridgeDeposit.TransactionSourceHash.String()
deposits = deposits[:limit] deposits = deposits[:limit]
nextCursor = deposits[limit].L1TransactionHash.String()
}
response := &L1BridgeDepositsResponse{
Deposits: deposits,
Cursor: nextCursor,
HasNextPage: hasNextPage,
} }
response := &L1BridgeDepositsResponse{Deposits: deposits, Cursor: nextCursor, HasNextPage: hasNextPage}
return response, nil return response, nil
} }
...@@ -242,6 +247,17 @@ func (db *bridgeTransfersDB) L2BridgeWithdrawalsByAddress(address common.Address ...@@ -242,6 +247,17 @@ func (db *bridgeTransfersDB) L2BridgeWithdrawalsByAddress(address common.Address
limit = defaultLimit limit = defaultLimit
} }
cursorClause := ""
if cursor != "" {
withdrawalHash := common.HexToHash(cursor)
var txWithdrawal L2TransactionWithdrawal
result := db.gorm.Model(&L2TransactionWithdrawal{}).Where(&L2TransactionWithdrawal{WithdrawalHash: withdrawalHash}).Take(&txWithdrawal)
if result.Error != nil || errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, fmt.Errorf("unable to find transaction with supplied cursor withdrawal hash %s: %w", withdrawalHash, result.Error)
}
cursorClause = fmt.Sprintf("l2_transaction_withdrawals.timestamp <= %d", txWithdrawal.Tx.Timestamp)
}
// TODO join with l1_bridged_tokens and l2_bridged_tokens // TODO join with l1_bridged_tokens and l2_bridged_tokens
ethAddressString := predeploys.LegacyERC20ETHAddr.String() ethAddressString := predeploys.LegacyERC20ETHAddr.String()
...@@ -255,10 +271,9 @@ func (db *bridgeTransfersDB) L2BridgeWithdrawalsByAddress(address common.Address ...@@ -255,10 +271,9 @@ func (db *bridgeTransfersDB) L2BridgeWithdrawalsByAddress(address common.Address
from_address, to_address, amount, data, withdrawal_hash AS transaction_withdrawal_hash, from_address, to_address, amount, data, withdrawal_hash AS transaction_withdrawal_hash,
l2_contract_events.transaction_hash AS l2_transaction_hash, proven_l1_events.transaction_hash AS proven_l1_transaction_hash, finalized_l1_events.transaction_hash AS finalized_l1_transaction_hash, l2_contract_events.transaction_hash AS l2_transaction_hash, proven_l1_events.transaction_hash AS proven_l1_transaction_hash, finalized_l1_events.transaction_hash AS finalized_l1_transaction_hash,
l2_transaction_withdrawals.timestamp, NULL AS cross_domain_message_hash, ? AS local_token_address, ? AS remote_token_address`, ethAddressString, ethAddressString) l2_transaction_withdrawals.timestamp, NULL AS cross_domain_message_hash, ? AS local_token_address, ? AS remote_token_address`, ethAddressString, ethAddressString)
ethTransactionWithdrawals = ethTransactionWithdrawals.Order("timestamp DESC").Limit(limit + 1)
if cursor != "" { if cursorClause != "" {
// Probably need to fix this and compare timestamps ethTransactionWithdrawals = ethTransactionWithdrawals.Where(cursorClause)
ethTransactionWithdrawals = ethTransactionWithdrawals.Where("withdrawal_hash < ?", cursor)
} }
withdrawalsQuery := db.gorm.Model(&L2BridgeWithdrawal{}) withdrawalsQuery := db.gorm.Model(&L2BridgeWithdrawal{})
...@@ -270,17 +285,16 @@ l2_transaction_withdrawals.timestamp, NULL AS cross_domain_message_hash, ? AS lo ...@@ -270,17 +285,16 @@ l2_transaction_withdrawals.timestamp, NULL AS cross_domain_message_hash, ? AS lo
l2_bridge_withdrawals.from_address, l2_bridge_withdrawals.to_address, l2_bridge_withdrawals.amount, l2_bridge_withdrawals.data, transaction_withdrawal_hash, l2_bridge_withdrawals.from_address, l2_bridge_withdrawals.to_address, l2_bridge_withdrawals.amount, l2_bridge_withdrawals.data, transaction_withdrawal_hash,
l2_contract_events.transaction_hash AS l2_transaction_hash, proven_l1_events.transaction_hash AS proven_l1_transaction_hash, finalized_l1_events.transaction_hash AS finalized_l1_transaction_hash, l2_contract_events.transaction_hash AS l2_transaction_hash, proven_l1_events.transaction_hash AS proven_l1_transaction_hash, finalized_l1_events.transaction_hash AS finalized_l1_transaction_hash,
l2_bridge_withdrawals.timestamp, cross_domain_message_hash, local_token_address, remote_token_address`) l2_bridge_withdrawals.timestamp, cross_domain_message_hash, local_token_address, remote_token_address`)
withdrawalsQuery = withdrawalsQuery.Order("timestamp DESC").Limit(limit + 1)
if cursor != "" { if cursorClause != "" {
// Probably need to fix this and compare timestamps withdrawalsQuery = withdrawalsQuery.Where(cursorClause)
withdrawalsQuery = withdrawalsQuery.Where("withdrawal_hash < ?", cursor)
} }
query := db.gorm.Table("(?) AS withdrawals", withdrawalsQuery) query := db.gorm.Table("(?) AS withdrawals", withdrawalsQuery)
query = query.Joins("UNION (?)", ethTransactionWithdrawals) query = query.Joins("UNION (?)", ethTransactionWithdrawals)
query = query.Select("*").Order("timestamp DESC").Limit(limit + 1) query = query.Select("*").Order("timestamp DESC").Limit(limit + 1)
withdrawals := []L2BridgeWithdrawalWithTransactionHashes{} withdrawals := []L2BridgeWithdrawalWithTransactionHashes{}
result := query.Scan(&withdrawals) result := query.Find(&withdrawals)
if result.Error != nil { if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) { if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil return nil, nil
...@@ -292,21 +306,10 @@ l2_bridge_withdrawals.timestamp, cross_domain_message_hash, local_token_address, ...@@ -292,21 +306,10 @@ l2_bridge_withdrawals.timestamp, cross_domain_message_hash, local_token_address,
hasNextPage := false hasNextPage := false
if len(withdrawals) > limit { if len(withdrawals) > limit {
hasNextPage = true hasNextPage = true
nextCursor = withdrawals[limit].L2BridgeWithdrawal.TransactionWithdrawalHash.String()
withdrawals = withdrawals[:limit] withdrawals = withdrawals[:limit]
nextCursor = withdrawals[limit].L2TransactionHash.String()
}
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, result.Error
}
response := &L2BridgeWithdrawalsResponse{
Withdrawals: withdrawals,
Cursor: nextCursor,
HasNextPage: hasNextPage,
} }
response := &L2BridgeWithdrawalsResponse{Withdrawals: withdrawals, Cursor: nextCursor, HasNextPage: hasNextPage}
return response, nil return response, nil
} }
...@@ -43,9 +43,9 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite { ...@@ -43,9 +43,9 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
dbUser := os.Getenv("DB_USER") dbUser := os.Getenv("DB_USER")
dbName := setupTestDatabase(t) dbName := setupTestDatabase(t)
// Replace the handler of the global logger with the testlog // Discard the Global Logger as each component
logger := testlog.Logger(t, log.LvlInfo) // has its own configured logger
log.Root().SetHandler(logger.GetHandler()) log.Root().SetHandler(log.DiscardHandler())
// Rollup System Configuration and Start // Rollup System Configuration and Start
opCfg := op_e2e.DefaultSystemConfig(t) opCfg := op_e2e.DefaultSystemConfig(t)
...@@ -90,7 +90,8 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite { ...@@ -90,7 +90,8 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { db.Close() }) t.Cleanup(func() { db.Close() })
indexer, err := indexer.NewIndexer(logger, db, indexerCfg.Chain, indexerCfg.RPCs, indexerCfg.Metrics) indexerLog := testlog.Logger(t, log.LvlInfo).New("role", "indexer")
indexer, err := indexer.NewIndexer(indexerLog, db, indexerCfg.Chain, indexerCfg.RPCs, indexerCfg.Metrics)
require.NoError(t, err) require.NoError(t, err)
indexerCtx, indexerStop := context.WithCancel(context.Background()) indexerCtx, indexerStop := context.WithCancel(context.Background())
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment