Commit 9dac9d66 authored by Adrian Sutton's avatar Adrian Sutton

Merge remote-tracking branch 'origin/develop' into aj/remove-todo

parents 3633de4f cd3713e3
...@@ -1224,6 +1224,7 @@ workflows: ...@@ -1224,6 +1224,7 @@ workflows:
target: test-external-geth target: test-external-geth
- bedrock-go-tests: - bedrock-go-tests:
requires: requires:
- cannon-go-lint-and-test
- op-batcher-lint - op-batcher-lint
- op-bootnode-lint - op-bootnode-lint
- op-bindings-lint - op-bindings-lint
......
...@@ -86,6 +86,9 @@ nuke: clean devnet-clean ...@@ -86,6 +86,9 @@ nuke: clean devnet-clean
.PHONY: nuke .PHONY: nuke
devnet-up: devnet-up:
@if [ ! -e op-program/bin ]; then \
make cannon-prestate; \
fi
$(shell ./ops/scripts/newer-file.sh .devnet/allocs-l1.json ./packages/contracts-bedrock) $(shell ./ops/scripts/newer-file.sh .devnet/allocs-l1.json ./packages/contracts-bedrock)
if [ $(.SHELLSTATUS) -ne 0 ]; then \ if [ $(.SHELLSTATUS) -ne 0 ]; then \
make devnet-allocs; \ make devnet-allocs; \
......
...@@ -33,3 +33,28 @@ TODO add indexer to the optimism devnet compose file (previously removed for bre ...@@ -33,3 +33,28 @@ TODO add indexer to the optimism devnet compose file (previously removed for bre
`docker-compose.dev.yml` is git ignored. Fill in your own docker-compose file here. `docker-compose.dev.yml` is git ignored. Fill in your own docker-compose file here.
## Architecture
![Architectural Diagram](./assets/architecture.png)
The indexer application consists of two separate services that operate together:
**Indexer API** - Provides a lightweight API service that supports paginated lookups for bridge events.
**Indexer Service** - A polling-based service that constantly reads and persists OP Stack chain data (i.e., block meta, system contract events, synchronized bridge events) from an L1 and L2 chain.
### Indexer API
TBD
### Indexer Service
![Service Component Diagram](./assets/indexer-service.png)
The indexer service is responsible for polling and processing real-time batches of L1 and L2 chain data. The indexer service is currently composed of the following key components:
- **Poller Routines** - Individually polls the L1/L2 chain for new blocks and OP Stack system contract events.
- **Insertion Routines** - Awaits new batches from the poller routines and inserts them into the database upon retrieval.
- **Bridge Routine** - Polls the database directly for new L1 blocks and bridge events. Upon retrieval, the bridge routine will:
* Process and persist new bridge events
* Synchronize L1 proven/finalized withdrawals with their L2 initialization counterparts
### Database
The indexer service currently supports a Postgres database for storing L1/L2 OP Stack chain data. The most up-to-date database schemas can be found in the `./migrations` directory.
**NOTE:** The indexer service implementation currently does not natively support database migration. Because of this, a database must be manually updated to ensure forward compatibility with the latest indexer service implementation.
\ No newline at end of file
...@@ -2,7 +2,6 @@ package config ...@@ -2,7 +2,6 @@ package config
import ( import (
"fmt" "fmt"
"math/big"
"os" "os"
"reflect" "reflect"
...@@ -65,10 +64,15 @@ func (c *L1Contracts) AsSlice() ([]common.Address, error) { ...@@ -65,10 +64,15 @@ func (c *L1Contracts) AsSlice() ([]common.Address, error) {
// ChainConfig configures of the chain being indexed // ChainConfig configures of the chain being indexed
type ChainConfig struct { type ChainConfig struct {
// Configure known chains with the l2 chain id // Configure known chains with the l2 chain id
Preset int Preset int
L1Contracts L1Contracts `toml:"l1-contracts"`
// L1StartingHeight is the block height to start indexing from L1Contracts L1Contracts `toml:"l1-contracts"`
L1StartingHeight uint `toml:"l1-starting-height"` L1StartingHeight uint `toml:"l1-starting-height"`
// These configuration options will be removed once
// native reorg handling is implemented
L1ConfirmationDepth uint `toml:"l1-confirmation-depth"`
L2ConfirmationDepth uint `toml:"l2-confirmation-depth"`
L1PollingInterval uint `toml:"l1-polling-interval"` L1PollingInterval uint `toml:"l1-polling-interval"`
L2PollingInterval uint `toml:"l2-polling-interval"` L2PollingInterval uint `toml:"l2-polling-interval"`
...@@ -77,11 +81,6 @@ type ChainConfig struct { ...@@ -77,11 +81,6 @@ type ChainConfig struct {
L2HeaderBufferSize uint `toml:"l2-header-buffer-size"` L2HeaderBufferSize uint `toml:"l2-header-buffer-size"`
} }
// L1StartHeight returns the block height to start indexing from
func (cc *ChainConfig) L1StartHeight() *big.Int {
return big.NewInt(int64(cc.L1StartingHeight))
}
// RPCsConfig configures the RPC urls // RPCsConfig configures the RPC urls
type RPCsConfig struct { type RPCsConfig struct {
L1RPC string `toml:"l1-rpc"` L1RPC string `toml:"l1-rpc"`
......
...@@ -2,6 +2,7 @@ package database ...@@ -2,6 +2,7 @@ package database
import ( import (
"errors" "errors"
"fmt"
"gorm.io/gorm" "gorm.io/gorm"
...@@ -133,21 +134,31 @@ func (db *bridgeTransfersDB) L1BridgeDepositsByAddress(address common.Address, c ...@@ -133,21 +134,31 @@ func (db *bridgeTransfersDB) L1BridgeDepositsByAddress(address common.Address, c
limit = defaultLimit limit = defaultLimit
} }
cursorClause := ""
if cursor != "" {
sourceHash := common.HexToHash(cursor)
txDeposit := new(L1TransactionDeposit)
result := db.gorm.Model(&L1TransactionDeposit{}).Where(&L1TransactionDeposit{SourceHash: sourceHash}).Take(txDeposit)
if result.Error != nil || errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, fmt.Errorf("unable to find transaction with supplied cursor source hash %s: %w", sourceHash, result.Error)
}
cursorClause = fmt.Sprintf("l1_transaction_deposits.timestamp <= %d", txDeposit.Tx.Timestamp)
}
// TODO join with l1_bridged_tokens and l2_bridged_tokens // TODO join with l1_bridged_tokens and l2_bridged_tokens
ethAddressString := predeploys.LegacyERC20ETHAddr.String() ethAddressString := predeploys.LegacyERC20ETHAddr.String()
// Coalesce l1 transaction deposits that are simply ETH sends // Coalesce l1 transaction deposits that are simply ETH sends
ethTransactionDeposits := db.gorm.Model(&L1TransactionDeposit{}) ethTransactionDeposits := db.gorm.Model(&L1TransactionDeposit{})
ethTransactionDeposits = ethTransactionDeposits.Where(Transaction{FromAddress: address}).Where(`data = '0x' AND amount > 0`) ethTransactionDeposits = ethTransactionDeposits.Where(Transaction{FromAddress: address}).Where("data = '0x' AND amount > 0")
ethTransactionDeposits = ethTransactionDeposits.Joins("INNER JOIN l1_contract_events ON l1_contract_events.guid = initiated_l1_event_guid") ethTransactionDeposits = ethTransactionDeposits.Joins("INNER JOIN l1_contract_events ON l1_contract_events.guid = initiated_l1_event_guid")
ethTransactionDeposits = ethTransactionDeposits.Select(` ethTransactionDeposits = ethTransactionDeposits.Select(`
from_address, to_address, amount, data, source_hash AS transaction_source_hash, from_address, to_address, amount, data, source_hash AS transaction_source_hash,
l2_transaction_hash, l1_contract_events.transaction_hash AS l1_transaction_hash, l2_transaction_hash, l1_contract_events.transaction_hash AS l1_transaction_hash,
l1_transaction_deposits.timestamp, NULL AS cross_domain_message_hash, ? AS local_token_address, ? AS remote_token_address`, ethAddressString, ethAddressString) l1_transaction_deposits.timestamp, NULL AS cross_domain_message_hash, ? AS local_token_address, ? AS remote_token_address`, ethAddressString, ethAddressString)
ethTransactionDeposits = ethTransactionDeposits.Order("timestamp DESC").Limit(limit + 1)
if cursor != "" { if cursorClause != "" {
// Probably need to fix this and compare timestamps ethTransactionDeposits = ethTransactionDeposits.Where(cursorClause)
ethTransactionDeposits = ethTransactionDeposits.Where("source_hash < ?", cursor)
} }
depositsQuery := db.gorm.Model(&L1BridgeDeposit{}) depositsQuery := db.gorm.Model(&L1BridgeDeposit{})
...@@ -157,17 +168,16 @@ l1_transaction_deposits.timestamp, NULL AS cross_domain_message_hash, ? AS local ...@@ -157,17 +168,16 @@ l1_transaction_deposits.timestamp, NULL AS cross_domain_message_hash, ? AS local
l1_bridge_deposits.from_address, l1_bridge_deposits.to_address, l1_bridge_deposits.amount, l1_bridge_deposits.data, transaction_source_hash, l1_bridge_deposits.from_address, l1_bridge_deposits.to_address, l1_bridge_deposits.amount, l1_bridge_deposits.data, transaction_source_hash,
l2_transaction_hash, l1_contract_events.transaction_hash AS l1_transaction_hash, l2_transaction_hash, l1_contract_events.transaction_hash AS l1_transaction_hash,
l1_bridge_deposits.timestamp, cross_domain_message_hash, local_token_address, remote_token_address`) l1_bridge_deposits.timestamp, cross_domain_message_hash, local_token_address, remote_token_address`)
depositsQuery = depositsQuery.Order("timestamp DESC").Limit(limit + 1)
if cursor != "" { if cursorClause != "" {
// Probably need to fix this and compare timestamps depositsQuery = depositsQuery.Where(cursorClause)
depositsQuery = depositsQuery.Where("source_hash < ?", cursor)
} }
query := db.gorm.Table("(?) AS deposits", depositsQuery) query := db.gorm.Table("(?) AS deposits", depositsQuery)
query = query.Joins("UNION (?)", ethTransactionDeposits) query = query.Joins("UNION (?)", ethTransactionDeposits)
query = query.Select("*").Order("timestamp DESC").Limit(limit + 1) query = query.Select("*").Order("timestamp DESC").Limit(limit + 1)
deposits := []L1BridgeDepositWithTransactionHashes{} deposits := []L1BridgeDepositWithTransactionHashes{}
result := query.Debug().Find(&deposits) result := query.Find(&deposits)
if result.Error != nil { if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) { if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil return nil, nil
...@@ -179,16 +189,11 @@ l1_bridge_deposits.timestamp, cross_domain_message_hash, local_token_address, re ...@@ -179,16 +189,11 @@ l1_bridge_deposits.timestamp, cross_domain_message_hash, local_token_address, re
hasNextPage := false hasNextPage := false
if len(deposits) > limit { if len(deposits) > limit {
hasNextPage = true hasNextPage = true
nextCursor = deposits[limit].L1BridgeDeposit.TransactionSourceHash.String()
deposits = deposits[:limit] deposits = deposits[:limit]
nextCursor = deposits[limit].L1TransactionHash.String()
}
response := &L1BridgeDepositsResponse{
Deposits: deposits,
Cursor: nextCursor,
HasNextPage: hasNextPage,
} }
response := &L1BridgeDepositsResponse{Deposits: deposits, Cursor: nextCursor, HasNextPage: hasNextPage}
return response, nil return response, nil
} }
...@@ -242,6 +247,17 @@ func (db *bridgeTransfersDB) L2BridgeWithdrawalsByAddress(address common.Address ...@@ -242,6 +247,17 @@ func (db *bridgeTransfersDB) L2BridgeWithdrawalsByAddress(address common.Address
limit = defaultLimit limit = defaultLimit
} }
cursorClause := ""
if cursor != "" {
withdrawalHash := common.HexToHash(cursor)
var txWithdrawal L2TransactionWithdrawal
result := db.gorm.Model(&L2TransactionWithdrawal{}).Where(&L2TransactionWithdrawal{WithdrawalHash: withdrawalHash}).Take(&txWithdrawal)
if result.Error != nil || errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, fmt.Errorf("unable to find transaction with supplied cursor withdrawal hash %s: %w", withdrawalHash, result.Error)
}
cursorClause = fmt.Sprintf("l2_transaction_withdrawals.timestamp <= %d", txWithdrawal.Tx.Timestamp)
}
// TODO join with l1_bridged_tokens and l2_bridged_tokens // TODO join with l1_bridged_tokens and l2_bridged_tokens
ethAddressString := predeploys.LegacyERC20ETHAddr.String() ethAddressString := predeploys.LegacyERC20ETHAddr.String()
...@@ -255,10 +271,9 @@ func (db *bridgeTransfersDB) L2BridgeWithdrawalsByAddress(address common.Address ...@@ -255,10 +271,9 @@ func (db *bridgeTransfersDB) L2BridgeWithdrawalsByAddress(address common.Address
from_address, to_address, amount, data, withdrawal_hash AS transaction_withdrawal_hash, from_address, to_address, amount, data, withdrawal_hash AS transaction_withdrawal_hash,
l2_contract_events.transaction_hash AS l2_transaction_hash, proven_l1_events.transaction_hash AS proven_l1_transaction_hash, finalized_l1_events.transaction_hash AS finalized_l1_transaction_hash, l2_contract_events.transaction_hash AS l2_transaction_hash, proven_l1_events.transaction_hash AS proven_l1_transaction_hash, finalized_l1_events.transaction_hash AS finalized_l1_transaction_hash,
l2_transaction_withdrawals.timestamp, NULL AS cross_domain_message_hash, ? AS local_token_address, ? AS remote_token_address`, ethAddressString, ethAddressString) l2_transaction_withdrawals.timestamp, NULL AS cross_domain_message_hash, ? AS local_token_address, ? AS remote_token_address`, ethAddressString, ethAddressString)
ethTransactionWithdrawals = ethTransactionWithdrawals.Order("timestamp DESC").Limit(limit + 1)
if cursor != "" { if cursorClause != "" {
// Probably need to fix this and compare timestamps ethTransactionWithdrawals = ethTransactionWithdrawals.Where(cursorClause)
ethTransactionWithdrawals = ethTransactionWithdrawals.Where("withdrawal_hash < ?", cursor)
} }
withdrawalsQuery := db.gorm.Model(&L2BridgeWithdrawal{}) withdrawalsQuery := db.gorm.Model(&L2BridgeWithdrawal{})
...@@ -270,17 +285,16 @@ l2_transaction_withdrawals.timestamp, NULL AS cross_domain_message_hash, ? AS lo ...@@ -270,17 +285,16 @@ l2_transaction_withdrawals.timestamp, NULL AS cross_domain_message_hash, ? AS lo
l2_bridge_withdrawals.from_address, l2_bridge_withdrawals.to_address, l2_bridge_withdrawals.amount, l2_bridge_withdrawals.data, transaction_withdrawal_hash, l2_bridge_withdrawals.from_address, l2_bridge_withdrawals.to_address, l2_bridge_withdrawals.amount, l2_bridge_withdrawals.data, transaction_withdrawal_hash,
l2_contract_events.transaction_hash AS l2_transaction_hash, proven_l1_events.transaction_hash AS proven_l1_transaction_hash, finalized_l1_events.transaction_hash AS finalized_l1_transaction_hash, l2_contract_events.transaction_hash AS l2_transaction_hash, proven_l1_events.transaction_hash AS proven_l1_transaction_hash, finalized_l1_events.transaction_hash AS finalized_l1_transaction_hash,
l2_bridge_withdrawals.timestamp, cross_domain_message_hash, local_token_address, remote_token_address`) l2_bridge_withdrawals.timestamp, cross_domain_message_hash, local_token_address, remote_token_address`)
withdrawalsQuery = withdrawalsQuery.Order("timestamp DESC").Limit(limit + 1)
if cursor != "" { if cursorClause != "" {
// Probably need to fix this and compare timestamps withdrawalsQuery = withdrawalsQuery.Where(cursorClause)
withdrawalsQuery = withdrawalsQuery.Where("withdrawal_hash < ?", cursor)
} }
query := db.gorm.Table("(?) AS withdrawals", withdrawalsQuery) query := db.gorm.Table("(?) AS withdrawals", withdrawalsQuery)
query = query.Joins("UNION (?)", ethTransactionWithdrawals) query = query.Joins("UNION (?)", ethTransactionWithdrawals)
query = query.Select("*").Order("timestamp DESC").Limit(limit + 1) query = query.Select("*").Order("timestamp DESC").Limit(limit + 1)
withdrawals := []L2BridgeWithdrawalWithTransactionHashes{} withdrawals := []L2BridgeWithdrawalWithTransactionHashes{}
result := query.Scan(&withdrawals) result := query.Find(&withdrawals)
if result.Error != nil { if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) { if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil return nil, nil
...@@ -292,21 +306,10 @@ l2_bridge_withdrawals.timestamp, cross_domain_message_hash, local_token_address, ...@@ -292,21 +306,10 @@ l2_bridge_withdrawals.timestamp, cross_domain_message_hash, local_token_address,
hasNextPage := false hasNextPage := false
if len(withdrawals) > limit { if len(withdrawals) > limit {
hasNextPage = true hasNextPage = true
nextCursor = withdrawals[limit].L2BridgeWithdrawal.TransactionWithdrawalHash.String()
withdrawals = withdrawals[:limit] withdrawals = withdrawals[:limit]
nextCursor = withdrawals[limit].L2TransactionHash.String()
}
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, result.Error
}
response := &L2BridgeWithdrawalsResponse{
Withdrawals: withdrawals,
Cursor: nextCursor,
HasNextPage: hasNextPage,
} }
response := &L2BridgeWithdrawalsResponse{Withdrawals: withdrawals, Cursor: nextCursor, HasNextPage: hasNextPage}
return response, nil return response, nil
} }
...@@ -43,9 +43,9 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite { ...@@ -43,9 +43,9 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
dbUser := os.Getenv("DB_USER") dbUser := os.Getenv("DB_USER")
dbName := setupTestDatabase(t) dbName := setupTestDatabase(t)
// Replace the handler of the global logger with the testlog // Discard the Global Logger as each component
logger := testlog.Logger(t, log.LvlInfo) // has its own configured logger
log.Root().SetHandler(logger.GetHandler()) log.Root().SetHandler(log.DiscardHandler())
// Rollup System Configuration and Start // Rollup System Configuration and Start
opCfg := op_e2e.DefaultSystemConfig(t) opCfg := op_e2e.DefaultSystemConfig(t)
...@@ -71,8 +71,10 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite { ...@@ -71,8 +71,10 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
L2RPC: opSys.EthInstances["sequencer"].HTTPEndpoint(), L2RPC: opSys.EthInstances["sequencer"].HTTPEndpoint(),
}, },
Chain: config.ChainConfig{ Chain: config.ChainConfig{
L1PollingInterval: 1000, L1PollingInterval: uint(opCfg.DeployConfig.L1BlockTime) * 1000,
L2PollingInterval: 1000, L1ConfirmationDepth: 0,
L2PollingInterval: uint(opCfg.DeployConfig.L2BlockTime) * 1000,
L2ConfirmationDepth: 0,
L1Contracts: config.L1Contracts{ L1Contracts: config.L1Contracts{
OptimismPortalProxy: opCfg.L1Deployments.OptimismPortalProxy, OptimismPortalProxy: opCfg.L1Deployments.OptimismPortalProxy,
L2OutputOracleProxy: opCfg.L1Deployments.L2OutputOracleProxy, L2OutputOracleProxy: opCfg.L1Deployments.L2OutputOracleProxy,
...@@ -90,7 +92,8 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite { ...@@ -90,7 +92,8 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { db.Close() }) t.Cleanup(func() { db.Close() })
indexer, err := indexer.NewIndexer(logger, db, indexerCfg.Chain, indexerCfg.RPCs, indexerCfg.Metrics) indexerLog := testlog.Logger(t, log.LvlInfo).New("role", "indexer")
indexer, err := indexer.NewIndexer(indexerLog, db, indexerCfg.Chain, indexerCfg.RPCs, indexerCfg.Metrics)
require.NoError(t, err) require.NoError(t, err)
indexerCtx, indexerStop := context.WithCancel(context.Background()) indexerCtx, indexerStop := context.WithCancel(context.Background())
......
...@@ -10,14 +10,15 @@ import ( ...@@ -10,14 +10,15 @@ import (
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
type Config struct { type Config struct {
LoopIntervalMsec uint LoopIntervalMsec uint
HeaderBufferSize uint HeaderBufferSize uint
StartHeight *big.Int
StartHeight *big.Int
ConfirmationDepth *big.Int
} }
type ETL struct { type ETL struct {
...@@ -28,7 +29,7 @@ type ETL struct { ...@@ -28,7 +29,7 @@ type ETL struct {
headerBufferSize uint64 headerBufferSize uint64
headerTraversal *node.HeaderTraversal headerTraversal *node.HeaderTraversal
ethClient *ethclient.Client ethClient node.EthClient
contracts []common.Address contracts []common.Address
etlBatches chan ETLBatch etlBatches chan ETLBatch
} }
...@@ -103,8 +104,7 @@ func (etl *ETL) processBatch(headers []types.Header) error { ...@@ -103,8 +104,7 @@ func (etl *ETL) processBatch(headers []types.Header) error {
} }
headersWithLog := make(map[common.Hash]bool, len(headers)) headersWithLog := make(map[common.Hash]bool, len(headers))
logFilter := ethereum.FilterQuery{FromBlock: firstHeader.Number, ToBlock: lastHeader.Number, Addresses: etl.contracts} logs, err := etl.ethClient.FilterLogs(ethereum.FilterQuery{FromBlock: firstHeader.Number, ToBlock: lastHeader.Number, Addresses: etl.contracts})
logs, err := etl.ethClient.FilterLogs(context.Background(), logFilter)
if err != nil { if err != nil {
batchLog.Info("unable to extract logs", "err", err) batchLog.Info("unable to extract logs", "err", err)
return err return err
......
...@@ -3,6 +3,7 @@ package etl ...@@ -3,6 +3,7 @@ package etl
import ( import (
"context" "context"
"fmt" "fmt"
"sync"
"time" "time"
"github.com/ethereum-optimism/optimism/indexer/config" "github.com/ethereum-optimism/optimism/indexer/config"
...@@ -16,12 +17,14 @@ import ( ...@@ -16,12 +17,14 @@ import (
type L1ETL struct { type L1ETL struct {
ETL ETL
db *database.DB db *database.DB
mu *sync.Mutex
listeners []chan interface{}
} }
// NewL1ETL creates a new L1ETL instance that will start indexing from different starting points // NewL1ETL creates a new L1ETL instance that will start indexing from different starting points
// depending on the state of the database and the supplied start height. // depending on the state of the database and the supplied start height.
func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metrics, client node.EthClient, contracts config.L1Contracts) (*L1ETL, error) { func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient, contracts config.L1Contracts) (*L1ETL, error) {
log = log.New("etl", "l1") log = log.New("etl", "l1")
latestHeader, err := db.Blocks.L1LatestBlockHeader() latestHeader, err := db.Blocks.L1LatestBlockHeader()
...@@ -61,14 +64,14 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metrics, clie ...@@ -61,14 +64,14 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metrics, clie
headerBufferSize: uint64(cfg.HeaderBufferSize), headerBufferSize: uint64(cfg.HeaderBufferSize),
log: log, log: log,
metrics: metrics.newMetricer("l1"), metrics: metrics,
headerTraversal: node.NewHeaderTraversal(client, fromHeader), headerTraversal: node.NewHeaderTraversal(client, fromHeader, cfg.ConfirmationDepth),
ethClient: client.GethEthClient(), ethClient: client,
contracts: cSlice, contracts: cSlice,
etlBatches: etlBatches, etlBatches: etlBatches,
} }
return &L1ETL{ETL: etl, db: db}, nil return &L1ETL{ETL: etl, db: db, mu: new(sync.Mutex)}, nil
} }
func (l1Etl *L1ETL) Start(ctx context.Context) error { func (l1Etl *L1ETL) Start(ctx context.Context) error {
...@@ -129,6 +132,29 @@ func (l1Etl *L1ETL) Start(ctx context.Context) error { ...@@ -129,6 +132,29 @@ func (l1Etl *L1ETL) Start(ctx context.Context) error {
} }
batch.Logger.Info("indexed batch") batch.Logger.Info("indexed batch")
// Notify Listeners
l1Etl.mu.Lock()
for i := range l1Etl.listeners {
select {
case l1Etl.listeners[i] <- struct{}{}:
default:
// do nothing if the listener hasn't picked
// up the previous notif
}
}
l1Etl.mu.Unlock()
} }
} }
} }
// Notify returns a channel that'll receive a value every time new data has
// been persisted by the L1ETL
func (l1Etl *L1ETL) Notify() <-chan interface{} {
receiver := make(chan interface{})
l1Etl.mu.Lock()
defer l1Etl.mu.Unlock()
l1Etl.listeners = append(l1Etl.listeners, receiver)
return receiver
}
...@@ -18,7 +18,7 @@ import ( ...@@ -18,7 +18,7 @@ import (
) )
func Test_L1ETL_Construction(t *testing.T) { func Test_L1ETL_Construction(t *testing.T) {
etlMetrics := NewMetrics(metrics.NewRegistry()) etlMetrics := NewMetrics(metrics.NewRegistry(), "l1")
type testSuite struct { type testSuite struct {
db *database.MockDB db *database.MockDB
......
...@@ -19,7 +19,7 @@ type L2ETL struct { ...@@ -19,7 +19,7 @@ type L2ETL struct {
db *database.DB db *database.DB
} }
func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metrics, client node.EthClient) (*L2ETL, error) { func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient) (*L2ETL, error) {
log = log.New("etl", "l2") log = log.New("etl", "l2")
// allow predeploys to be overridable // allow predeploys to be overridable
...@@ -48,9 +48,9 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metrics, clie ...@@ -48,9 +48,9 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metrics, clie
headerBufferSize: uint64(cfg.HeaderBufferSize), headerBufferSize: uint64(cfg.HeaderBufferSize),
log: log, log: log,
metrics: metrics.newMetricer("l2"), metrics: metrics,
headerTraversal: node.NewHeaderTraversal(client, fromHeader), headerTraversal: node.NewHeaderTraversal(client, fromHeader, cfg.ConfirmationDepth),
ethClient: client.GethEthClient(), ethClient: client,
contracts: l2Contracts, contracts: l2Contracts,
etlBatches: etlBatches, etlBatches: etlBatches,
} }
......
...@@ -10,14 +10,8 @@ import ( ...@@ -10,14 +10,8 @@ import (
var ( var (
MetricsNamespace string = "etl" MetricsNamespace string = "etl"
_ Metricer = &metricer{}
) )
type Metrics interface {
newMetricer(etl string) Metricer
}
type Metricer interface { type Metricer interface {
RecordInterval() (done func(err error)) RecordInterval() (done func(err error))
...@@ -34,109 +28,84 @@ type Metricer interface { ...@@ -34,109 +28,84 @@ type Metricer interface {
} }
type etlMetrics struct { type etlMetrics struct {
intervalTick *prometheus.CounterVec intervalTick prometheus.Counter
intervalDuration *prometheus.HistogramVec intervalDuration prometheus.Histogram
batchFailures *prometheus.CounterVec batchFailures prometheus.Counter
batchLatestHeight *prometheus.GaugeVec batchLatestHeight prometheus.Gauge
batchHeaders *prometheus.CounterVec batchHeaders prometheus.Counter
batchLogs *prometheus.CounterVec batchLogs *prometheus.CounterVec
indexedLatestHeight *prometheus.GaugeVec indexedLatestHeight prometheus.Gauge
indexedHeaders *prometheus.CounterVec indexedHeaders prometheus.Counter
indexedLogs *prometheus.CounterVec indexedLogs prometheus.Counter
}
type metricerFactory struct {
metrics *etlMetrics
} }
type metricer struct { func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer {
etl string
metrics *etlMetrics
}
func NewMetrics(registry *prometheus.Registry) Metrics {
return &metricerFactory{metrics: newMetrics(registry)}
}
func (factory *metricerFactory) newMetricer(etl string) Metricer {
return &metricer{etl, factory.metrics}
}
func newMetrics(registry *prometheus.Registry) *etlMetrics {
factory := metrics.With(registry) factory := metrics.With(registry)
return &etlMetrics{ return &etlMetrics{
intervalTick: factory.NewCounterVec(prometheus.CounterOpts{ intervalTick: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace, Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "intervals_total", Name: "intervals_total",
Help: "number of times the etl has run its extraction loop", Help: "number of times the etl has run its extraction loop",
}, []string{
"etl",
}), }),
intervalDuration: factory.NewHistogramVec(prometheus.HistogramOpts{ intervalDuration: factory.NewHistogram(prometheus.HistogramOpts{
Namespace: MetricsNamespace, Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "interval_seconds", Name: "interval_seconds",
Help: "duration elapsed for during the processing loop", Help: "duration elapsed for during the processing loop",
}, []string{
"etl",
}), }),
batchFailures: factory.NewCounterVec(prometheus.CounterOpts{ batchFailures: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace, Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "failures_total", Name: "failures_total",
Help: "number of times the etl encountered a failure to extract a batch", Help: "number of times the etl encountered a failure to extract a batch",
}, []string{
"etl",
}), }),
batchLatestHeight: factory.NewGaugeVec(prometheus.GaugeOpts{ batchLatestHeight: factory.NewGauge(prometheus.GaugeOpts{
Namespace: MetricsNamespace, Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "height", Name: "height",
Help: "the latest block height observed by an etl interval", Help: "the latest block height observed by an etl interval",
}, []string{
"etl",
}), }),
batchHeaders: factory.NewCounterVec(prometheus.CounterOpts{ batchHeaders: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace, Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "headers_total", Name: "headers_total",
Help: "number of headers observed by the etl", Help: "number of headers observed by the etl",
}, []string{
"etl",
}), }),
batchLogs: factory.NewCounterVec(prometheus.CounterOpts{ batchLogs: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace, Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "logs_total", Name: "logs_total",
Help: "number of logs observed by the etl", Help: "number of logs observed by the etl",
}, []string{ }, []string{
"etl",
"contract", "contract",
}), }),
indexedLatestHeight: factory.NewGaugeVec(prometheus.GaugeOpts{ indexedLatestHeight: factory.NewGauge(prometheus.GaugeOpts{
Namespace: MetricsNamespace, Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "indexed_height", Name: "indexed_height",
Help: "the latest block height indexed into the database", Help: "the latest block height indexed into the database",
}, []string{
"etl",
}), }),
indexedHeaders: factory.NewCounterVec(prometheus.CounterOpts{ indexedHeaders: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace, Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "indexed_headers_total", Name: "indexed_headers_total",
Help: "number of headers indexed by the etl", Help: "number of headers indexed by the etl",
}, []string{
"etl",
}), }),
indexedLogs: factory.NewCounterVec(prometheus.CounterOpts{ indexedLogs: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace, Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "indexed_logs_total", Name: "indexed_logs_total",
Help: "number of logs indexed by the etl", Help: "number of logs indexed by the etl",
}, []string{
"etl",
}), }),
} }
} }
func (m *metricer) RecordInterval() func(error) { func (m *etlMetrics) RecordInterval() func(error) {
m.metrics.intervalTick.WithLabelValues(m.etl).Inc() m.intervalTick.Inc()
timer := prometheus.NewTimer(m.metrics.intervalDuration.WithLabelValues(m.etl)) timer := prometheus.NewTimer(m.intervalDuration)
return func(err error) { return func(err error) {
if err != nil { if err != nil {
m.RecordBatchFailure() m.RecordBatchFailure()
...@@ -146,30 +115,30 @@ func (m *metricer) RecordInterval() func(error) { ...@@ -146,30 +115,30 @@ func (m *metricer) RecordInterval() func(error) {
} }
} }
func (m *metricer) RecordBatchFailure() { func (m *etlMetrics) RecordBatchFailure() {
m.metrics.batchFailures.WithLabelValues(m.etl).Inc() m.batchFailures.Inc()
} }
func (m *metricer) RecordBatchLatestHeight(height *big.Int) { func (m *etlMetrics) RecordBatchLatestHeight(height *big.Int) {
m.metrics.batchLatestHeight.WithLabelValues(m.etl).Set(float64(height.Uint64())) m.batchLatestHeight.Set(float64(height.Uint64()))
} }
func (m *metricer) RecordBatchHeaders(size int) { func (m *etlMetrics) RecordBatchHeaders(size int) {
m.metrics.batchHeaders.WithLabelValues(m.etl).Add(float64(size)) m.batchHeaders.Add(float64(size))
} }
func (m *metricer) RecordBatchLog(contractAddress common.Address) { func (m *etlMetrics) RecordBatchLog(contractAddress common.Address) {
m.metrics.batchLogs.WithLabelValues(m.etl, contractAddress.String()).Inc() m.batchLogs.WithLabelValues(contractAddress.String()).Inc()
} }
func (m *metricer) RecordIndexedLatestHeight(height *big.Int) { func (m *etlMetrics) RecordIndexedLatestHeight(height *big.Int) {
m.metrics.indexedLatestHeight.WithLabelValues(m.etl).Set(float64(height.Uint64())) m.indexedLatestHeight.Set(float64(height.Uint64()))
} }
func (m *metricer) RecordIndexedHeaders(size int) { func (m *etlMetrics) RecordIndexedHeaders(size int) {
m.metrics.indexedHeaders.WithLabelValues(m.etl).Add(float64(size)) m.indexedHeaders.Add(float64(size))
} }
func (m *metricer) RecordIndexedLogs(size int) { func (m *etlMetrics) RecordIndexedLogs(size int) {
m.metrics.indexedLogs.WithLabelValues(m.etl).Add(float64(size)) m.indexedLogs.Add(float64(size))
} }
...@@ -3,6 +3,7 @@ package indexer ...@@ -3,6 +3,7 @@ package indexer
import ( import (
"context" "context"
"fmt" "fmt"
"math/big"
"runtime/debug" "runtime/debug"
"sync" "sync"
...@@ -26,41 +27,48 @@ type Indexer struct { ...@@ -26,41 +27,48 @@ type Indexer struct {
metricsConfig config.MetricsConfig metricsConfig config.MetricsConfig
metricsRegistry *prometheus.Registry metricsRegistry *prometheus.Registry
L1ETL *etl.L1ETL L1ETL *etl.L1ETL
L2ETL *etl.L2ETL L2ETL *etl.L2ETL
BridgeProcessor *processors.BridgeProcessor BridgeProcessor *processors.BridgeProcessor
} }
// NewIndexer initializes an instance of the Indexer // NewIndexer initializes an instance of the Indexer
func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConfig, rpcsConfig config.RPCsConfig, metricsConfig config.MetricsConfig) (*Indexer, error) { func NewIndexer(logger log.Logger, db *database.DB, chainConfig config.ChainConfig, rpcsConfig config.RPCsConfig, metricsConfig config.MetricsConfig) (*Indexer, error) {
metricsRegistry := metrics.NewRegistry() metricsRegistry := metrics.NewRegistry()
etlMetrics := etl.NewMetrics(metricsRegistry)
// L1 // L1
l1EthClient, err := node.DialEthClient(rpcsConfig.L1RPC) l1EthClient, err := node.DialEthClient(rpcsConfig.L1RPC, node.NewMetrics(metricsRegistry, "l1"))
if err != nil { if err != nil {
return nil, err return nil, err
} }
l1Cfg := etl.Config{LoopIntervalMsec: chainConfig.L1PollingInterval, HeaderBufferSize: chainConfig.L1HeaderBufferSize, StartHeight: chainConfig.L1StartHeight()} l1Cfg := etl.Config{
l1Etl, err := etl.NewL1ETL(l1Cfg, logger, db, etlMetrics, l1EthClient, chainConfig.L1Contracts) LoopIntervalMsec: chainConfig.L1PollingInterval,
HeaderBufferSize: chainConfig.L1HeaderBufferSize,
ConfirmationDepth: big.NewInt(int64(chainConfig.L1ConfirmationDepth)),
StartHeight: big.NewInt(int64(chainConfig.L1StartingHeight)),
}
l1Etl, err := etl.NewL1ETL(l1Cfg, logger, db, etl.NewMetrics(metricsRegistry, "l1"), l1EthClient, chainConfig.L1Contracts)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// L2 (defaults to predeploy contracts) // L2 (defaults to predeploy contracts)
l2EthClient, err := node.DialEthClient(rpcsConfig.L2RPC) l2EthClient, err := node.DialEthClient(rpcsConfig.L2RPC, node.NewMetrics(metricsRegistry, "l2"))
if err != nil { if err != nil {
return nil, err return nil, err
} }
l2Cfg := etl.Config{LoopIntervalMsec: chainConfig.L2PollingInterval, HeaderBufferSize: chainConfig.L2HeaderBufferSize} l2Cfg := etl.Config{
l2Etl, err := etl.NewL2ETL(l2Cfg, logger, db, etlMetrics, l2EthClient) LoopIntervalMsec: chainConfig.L2PollingInterval,
HeaderBufferSize: chainConfig.L2HeaderBufferSize,
ConfirmationDepth: big.NewInt(int64(chainConfig.L2ConfirmationDepth)),
}
l2Etl, err := etl.NewL2ETL(l2Cfg, logger, db, etl.NewMetrics(metricsRegistry, "l2"), l2EthClient)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Bridge // Bridge
bridgeProcessor, err := processors.NewBridgeProcessor(logger, db, chainConfig) bridgeProcessor, err := processors.NewBridgeProcessor(logger, db, l1Etl, chainConfig)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
# Chain configures l1 chain addresses # Chain configures l1 chain addresses
# Can configure them manually or use a preset l2 ChainId for known chains including OP Mainnet, OP Goerli, Base, Base Goerli, Zora, and Zora goerli # Can configure them manually or use a preset l2 ChainId for known chains including OP Mainnet, OP Goerli, Base, Base Goerli, Zora, and Zora goerli
[chain] [chain]
# OP Goerli
preset = 420
# L1 Config
l1-polling-interval = 0 l1-polling-interval = 0
l1-header-buffer-size = 0 l1-header-buffer-size = 0
l1-confirmation-depth = 0
l1-starting-height = 0
# L2 Config
l2-polling-interval = 0 l2-polling-interval = 0
l2-header-buffer-size = 0 l2-header-buffer-size = 0
l2-confirmation-depth = 0
# OP Goerli
preset = 420
l1-starting-height = 0
[rpcs] [rpcs]
l1-rpc = "${INDEXER_RPC_URL_L1}" l1-rpc = "${INDEXER_RPC_URL_L1}"
......
...@@ -7,10 +7,10 @@ import ( ...@@ -7,10 +7,10 @@ import (
"math/big" "math/big"
"time" "time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
) )
...@@ -25,23 +25,19 @@ const ( ...@@ -25,23 +25,19 @@ const (
) )
type EthClient interface { type EthClient interface {
FinalizedBlockHeight() (*big.Int, error)
BlockHeaderByNumber(*big.Int) (*types.Header, error) BlockHeaderByNumber(*big.Int) (*types.Header, error)
BlockHeaderByHash(common.Hash) (*types.Header, error) BlockHeaderByHash(common.Hash) (*types.Header, error)
BlockHeadersByRange(*big.Int, *big.Int) ([]types.Header, error) BlockHeadersByRange(*big.Int, *big.Int) ([]types.Header, error)
StorageHash(common.Address, *big.Int) (common.Hash, error) StorageHash(common.Address, *big.Int) (common.Hash, error)
FilterLogs(ethereum.FilterQuery) ([]types.Log, error)
GethRpcClient() *rpc.Client
GethEthClient() *ethclient.Client
} }
type client struct { type client struct {
rpcClient *rpc.Client rpc RPC
} }
func DialEthClient(rpcUrl string) (EthClient, error) { func DialEthClient(rpcUrl string, metrics Metricer) (EthClient, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultDialTimeout) ctxwt, cancel := context.WithTimeout(context.Background(), defaultDialTimeout)
defer cancel() defer cancel()
...@@ -50,46 +46,21 @@ func DialEthClient(rpcUrl string) (EthClient, error) { ...@@ -50,46 +46,21 @@ func DialEthClient(rpcUrl string) (EthClient, error) {
return nil, err return nil, err
} }
client := &client{rpcClient: rpcClient} client := &client{rpc: newRPC(rpcClient, metrics)}
return client, nil return client, nil
} }
func NewEthClient(rpcClient *rpc.Client) EthClient {
return &client{rpcClient}
}
func (c *client) GethRpcClient() *rpc.Client {
return c.rpcClient
}
func (c *client) GethEthClient() *ethclient.Client {
return ethclient.NewClient(c.GethRpcClient())
}
// FinalizedBlockHeight retrieves the latest block height in a finalized state
func (c *client) FinalizedBlockHeight() (*big.Int, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
// Local devnet is having issues with the "finalized" block tag. Switch to "latest"
// to iterate faster locally but this needs to be updated
header := new(types.Header)
err := c.rpcClient.CallContext(ctxwt, header, "eth_getBlockByNumber", "latest", false)
if err != nil {
return nil, err
}
return header.Number, nil
}
// BlockHeaderByHash retrieves the block header attributed to the supplied hash // BlockHeaderByHash retrieves the block header attributed to the supplied hash
func (c *client) BlockHeaderByHash(hash common.Hash) (*types.Header, error) { func (c *client) BlockHeaderByHash(hash common.Hash) (*types.Header, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout) ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel() defer cancel()
header, err := ethclient.NewClient(c.rpcClient).HeaderByHash(ctxwt, hash) var header *types.Header
err := c.rpc.CallContext(ctxwt, &header, "eth_getBlockByHash", hash, false)
if err != nil { if err != nil {
return nil, err return nil, err
} else if header == nil {
return nil, ethereum.NotFound
} }
// sanity check on the data returned // sanity check on the data returned
...@@ -105,9 +76,12 @@ func (c *client) BlockHeaderByNumber(number *big.Int) (*types.Header, error) { ...@@ -105,9 +76,12 @@ func (c *client) BlockHeaderByNumber(number *big.Int) (*types.Header, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout) ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel() defer cancel()
header, err := ethclient.NewClient(c.rpcClient).HeaderByNumber(ctxwt, number) var header *types.Header
err := c.rpc.CallContext(ctxwt, &header, "eth_getBlockByNumber", toBlockNumArg(number), false)
if err != nil { if err != nil {
return nil, err return nil, err
} else if header == nil {
return nil, ethereum.NotFound
} }
return header, nil return header, nil
...@@ -117,6 +91,15 @@ func (c *client) BlockHeaderByNumber(number *big.Int) (*types.Header, error) { ...@@ -117,6 +91,15 @@ func (c *client) BlockHeaderByNumber(number *big.Int) (*types.Header, error) {
// are placed on the range such as blocks in the "latest", "safe" or "finalized" states. If the specified // are placed on the range such as blocks in the "latest", "safe" or "finalized" states. If the specified
// range is too large, `endHeight > latest`, the resulting list is truncated to the available headers // range is too large, `endHeight > latest`, the resulting list is truncated to the available headers
func (c *client) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]types.Header, error) { func (c *client) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]types.Header, error) {
// avoid the batch call if there's no range
if startHeight.Cmp(endHeight) == 0 {
header, err := c.BlockHeaderByNumber(startHeight)
if err != nil {
return nil, err
}
return []types.Header{*header}, nil
}
count := new(big.Int).Sub(endHeight, startHeight).Uint64() + 1 count := new(big.Int).Sub(endHeight, startHeight).Uint64() + 1
batchElems := make([]rpc.BatchElem, count) batchElems := make([]rpc.BatchElem, count)
for i := uint64(0); i < count; i++ { for i := uint64(0); i < count; i++ {
...@@ -131,7 +114,7 @@ func (c *client) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]types.H ...@@ -131,7 +114,7 @@ func (c *client) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]types.H
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout) ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel() defer cancel()
err := c.rpcClient.BatchCallContext(ctxwt, batchElems) err := c.rpc.BatchCallContext(ctxwt, batchElems)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -170,7 +153,7 @@ func (c *client) StorageHash(address common.Address, blockNumber *big.Int) (comm ...@@ -170,7 +153,7 @@ func (c *client) StorageHash(address common.Address, blockNumber *big.Int) (comm
defer cancel() defer cancel()
proof := struct{ StorageHash common.Hash }{} proof := struct{ StorageHash common.Hash }{}
err := c.rpcClient.CallContext(ctxwt, &proof, "eth_getProof", address, nil, toBlockNumArg(blockNumber)) err := c.rpc.CallContext(ctxwt, &proof, "eth_getProof", address, nil, toBlockNumArg(blockNumber))
if err != nil { if err != nil {
return common.Hash{}, err return common.Hash{}, err
} }
...@@ -178,19 +161,87 @@ func (c *client) StorageHash(address common.Address, blockNumber *big.Int) (comm ...@@ -178,19 +161,87 @@ func (c *client) StorageHash(address common.Address, blockNumber *big.Int) (comm
return proof.StorageHash, nil return proof.StorageHash, nil
} }
// FilterLogs returns the logs matching the supplied filter query, issued as a
// single `eth_getLogs` request against the underlying RPC client.
func (c *client) FilterLogs(query ethereum.FilterQuery) ([]types.Log, error) {
	arg, err := toFilterArg(query)
	if err != nil {
		return nil, err
	}

	ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
	defer cancel()

	var logs []types.Log
	err = c.rpc.CallContext(ctxwt, &logs, "eth_getLogs", arg)
	return logs, err
}
// Modeled off op-node/client.go. We can refactor this once the client/metrics portion
// of op-node/client has been generalized
//
// RPC is the minimal JSON-RPC client surface used by this package: single calls,
// batched calls, and shutdown. It exists so an instrumented wrapper can stand in
// for a raw *rpc.Client.
type RPC interface {
	// Close releases the underlying client connection.
	Close()
	// CallContext performs a single JSON-RPC call, decoding the response into result.
	CallContext(ctx context.Context, result any, method string, args ...any) error
	// BatchCallContext performs a batched JSON-RPC request; per-element results and
	// errors are recorded on each rpc.BatchElem.
	BatchCallContext(ctx context.Context, b []rpc.BatchElem) error
}
// rpcClient wraps a raw geth *rpc.Client, recording metrics for every
// request issued through it.
type rpcClient struct {
	rpc     *rpc.Client // underlying JSON-RPC client
	metrics Metricer    // sink for request/duration/response metrics
}
// newRPC wraps the supplied geth RPC client so that every request made
// through it is recorded against the given Metricer.
func newRPC(client *rpc.Client, metrics Metricer) RPC {
	return &rpcClient{rpc: client, metrics: metrics}
}
// Close shuts down the underlying RPC client connection.
func (c *rpcClient) Close() {
	c.rpc.Close()
}
// CallContext issues a single JSON-RPC request, recording the request count,
// response outcome, and observed duration for the given method.
func (c *rpcClient) CallContext(ctx context.Context, result any, method string, args ...any) error {
	done := c.metrics.RecordRPCClientRequest(method)
	err := c.rpc.CallContext(ctx, result, method, args...)
	done(err)
	return err
}
// BatchCallContext issues a batched JSON-RPC request, recording metrics for the
// batch as a whole as well as for each element once the batch completes.
func (c *rpcClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
	done := c.metrics.RecordRPCClientBatchRequest(b)
	err := c.rpc.BatchCallContext(ctx, b)
	done(err)
	return err
}
// Needed private utils from geth
func toBlockNumArg(number *big.Int) string { func toBlockNumArg(number *big.Int) string {
if number == nil { if number == nil {
return "latest" return "latest"
} else if number.Sign() >= 0 { }
if number.Sign() >= 0 {
return hexutil.EncodeBig(number) return hexutil.EncodeBig(number)
} }
// It's negative. // It's negative.
if number.IsInt64() { return rpc.BlockNumber(number.Int64()).String()
tag, _ := rpc.BlockNumber(number.Int64()).MarshalText() }
return string(tag)
}
// It's negative and large, which is invalid. func toFilterArg(q ethereum.FilterQuery) (interface{}, error) {
return fmt.Sprintf("<invalid %d>", number) arg := map[string]interface{}{
"address": q.Addresses,
"topics": q.Topics,
}
if q.BlockHash != nil {
arg["blockHash"] = *q.BlockHash
if q.FromBlock != nil || q.ToBlock != nil {
return nil, errors.New("cannot specify both BlockHash and FromBlock/ToBlock")
}
} else {
if q.FromBlock == nil {
arg["fromBlock"] = "0x0"
} else {
arg["fromBlock"] = toBlockNumArg(q.FromBlock)
}
arg["toBlock"] = toBlockNumArg(q.ToBlock)
}
return arg, nil
} }
...@@ -2,6 +2,7 @@ package node ...@@ -2,6 +2,7 @@ package node
import ( import (
"errors" "errors"
"fmt"
"math/big" "math/big"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
...@@ -13,15 +14,16 @@ var ( ...@@ -13,15 +14,16 @@ var (
) )
type HeaderTraversal struct { type HeaderTraversal struct {
ethClient EthClient ethClient EthClient
lastHeader *types.Header
lastHeader *types.Header
blockConfirmationDepth *big.Int
} }
// NewHeaderTraversal instantiates a new instance of HeaderTraversal against the supplied rpc client. // NewHeaderTraversal instantiates a new instance of HeaderTraversal against the supplied rpc client.
// The HeaderTraversal will start fetching blocks starting from the supplied header unless // The HeaderTraversal will start fetching blocks starting from the supplied header unless nil, indicating genesis.
// nil, indicating genesis. func NewHeaderTraversal(ethClient EthClient, fromHeader *types.Header, confDepth *big.Int) *HeaderTraversal {
func NewHeaderTraversal(ethClient EthClient, fromHeader *types.Header) *HeaderTraversal { return &HeaderTraversal{ethClient: ethClient, lastHeader: fromHeader, blockConfirmationDepth: confDepth}
return &HeaderTraversal{ethClient: ethClient, lastHeader: fromHeader}
} }
// LastHeader returns the last header that was fetched by the HeaderTraversal // LastHeader returns the last header that was fetched by the HeaderTraversal
...@@ -33,13 +35,19 @@ func (f *HeaderTraversal) LastHeader() *types.Header { ...@@ -33,13 +35,19 @@ func (f *HeaderTraversal) LastHeader() *types.Header {
// NextFinalizedHeaders retrieves the next set of headers that have been // marked as finalized by the connected client, bounded by the supplied size
// marked as finalized by the connected client, bounded by the supplied size // marked as finalized by the connected client, bounded by the supplied size
func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header, error) { func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header, error) {
finalizedBlockHeight, err := f.ethClient.FinalizedBlockHeight() latestBlockHeader, err := f.ethClient.BlockHeaderByNumber(nil)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("unable to query latest block: %w", err)
}
endHeight := new(big.Int).Sub(latestBlockHeader.Number, f.blockConfirmationDepth)
if endHeight.Sign() < 0 {
// No blocks with the provided confirmation depth available
return nil, nil
} }
if f.lastHeader != nil { if f.lastHeader != nil {
cmp := f.lastHeader.Number.Cmp(finalizedBlockHeight) cmp := f.lastHeader.Number.Cmp(endHeight)
if cmp == 0 { if cmp == 0 {
return nil, nil return nil, nil
} else if cmp > 0 { } else if cmp > 0 {
...@@ -52,10 +60,10 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header, ...@@ -52,10 +60,10 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header,
nextHeight = new(big.Int).Add(f.lastHeader.Number, bigOne) nextHeight = new(big.Int).Add(f.lastHeader.Number, bigOne)
} }
endHeight := clampBigInt(nextHeight, finalizedBlockHeight, maxSize) endHeight = clampBigInt(nextHeight, endHeight, maxSize)
headers, err := f.ethClient.BlockHeadersByRange(nextHeight, endHeight) headers, err := f.ethClient.BlockHeadersByRange(nextHeight, endHeight)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("error querying blocks by range: %w", err)
} }
numHeaders := len(headers) numHeaders := len(headers)
......
...@@ -37,10 +37,10 @@ func TestHeaderTraversalNextFinalizedHeadersNoOp(t *testing.T) { ...@@ -37,10 +37,10 @@ func TestHeaderTraversalNextFinalizedHeadersNoOp(t *testing.T) {
// start from block 10 as the latest fetched block // start from block 10 as the latest fetched block
lastHeader := &types.Header{Number: big.NewInt(10)} lastHeader := &types.Header{Number: big.NewInt(10)}
headerTraversal := NewHeaderTraversal(client, lastHeader) headerTraversal := NewHeaderTraversal(client, lastHeader, bigZero)
// no new headers when matched with head // no new headers when matched with head
client.On("FinalizedBlockHeight").Return(big.NewInt(10), nil) client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(lastHeader, nil)
headers, err := headerTraversal.NextFinalizedHeaders(100) headers, err := headerTraversal.NextFinalizedHeaders(100)
require.NoError(t, err) require.NoError(t, err)
require.Empty(t, headers) require.Empty(t, headers)
...@@ -50,11 +50,11 @@ func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) { ...@@ -50,11 +50,11 @@ func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) {
client := new(MockEthClient) client := new(MockEthClient)
// start from genesis // start from genesis
headerTraversal := NewHeaderTraversal(client, nil) headerTraversal := NewHeaderTraversal(client, nil, bigZero)
// blocks [0..4] // blocks [0..4]
headers := makeHeaders(5, nil) headers := makeHeaders(5, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(4), nil).Times(1) // Times so that we can override next client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(0)), mock.MatchedBy(BigIntMatcher(4))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(0)), mock.MatchedBy(BigIntMatcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5) headers, err := headerTraversal.NextFinalizedHeaders(5)
require.NoError(t, err) require.NoError(t, err)
...@@ -62,7 +62,7 @@ func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) { ...@@ -62,7 +62,7 @@ func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) {
// blocks [5..9] // blocks [5..9]
headers = makeHeaders(5, &headers[len(headers)-1]) headers = makeHeaders(5, &headers[len(headers)-1])
client.On("FinalizedBlockHeight").Return(big.NewInt(9), nil) client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil)
client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(5)), mock.MatchedBy(BigIntMatcher(9))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(5)), mock.MatchedBy(BigIntMatcher(9))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(5) headers, err = headerTraversal.NextFinalizedHeaders(5)
require.NoError(t, err) require.NoError(t, err)
...@@ -73,10 +73,10 @@ func TestHeaderTraversalNextFinalizedHeadersMaxSize(t *testing.T) { ...@@ -73,10 +73,10 @@ func TestHeaderTraversalNextFinalizedHeadersMaxSize(t *testing.T) {
client := new(MockEthClient) client := new(MockEthClient)
// start from genesis // start from genesis
headerTraversal := NewHeaderTraversal(client, nil) headerTraversal := NewHeaderTraversal(client, nil, bigZero)
// 100 "available" headers // 100 "available" headers
client.On("FinalizedBlockHeight").Return(big.NewInt(100), nil) client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&types.Header{Number: big.NewInt(100)}, nil)
// clamped by the supplied size // clamped by the supplied size
headers := makeHeaders(5, nil) headers := makeHeaders(5, nil)
...@@ -97,11 +97,11 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) { ...@@ -97,11 +97,11 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
client := new(MockEthClient) client := new(MockEthClient)
// start from genesis // start from genesis
headerTraversal := NewHeaderTraversal(client, nil) headerTraversal := NewHeaderTraversal(client, nil, bigZero)
// blocks [0..4] // blocks [0..4]
headers := makeHeaders(5, nil) headers := makeHeaders(5, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(4), nil).Times(1) // Times so that we can override next client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(0)), mock.MatchedBy(BigIntMatcher(4))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(0)), mock.MatchedBy(BigIntMatcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5) headers, err := headerTraversal.NextFinalizedHeaders(5)
require.NoError(t, err) require.NoError(t, err)
...@@ -109,7 +109,7 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) { ...@@ -109,7 +109,7 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
// blocks [5..9]. Next batch is not chained correctly (starts again from genesis) // blocks [5..9]. Next batch is not chained correctly (starts again from genesis)
headers = makeHeaders(5, nil) headers = makeHeaders(5, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(9), nil) client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&types.Header{Number: big.NewInt(9)}, nil)
client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(5)), mock.MatchedBy(BigIntMatcher(9))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(5)), mock.MatchedBy(BigIntMatcher(9))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(5) headers, err = headerTraversal.NextFinalizedHeaders(5)
require.Nil(t, headers) require.Nil(t, headers)
......
package node
import (
"errors"
"fmt"
"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/rpc"
"github.com/prometheus/client_golang/prometheus"
)
var (
	// MetricsNamespace prefixes all metric names emitted by this package.
	MetricsNamespace = "rpc"
	// batchMethod is the synthetic method label applied to batched requests.
	batchMethod = "<batch>"
)
// Metricer records client-side RPC metrics: request counts, durations, and
// response outcomes for both single and batched calls.
type Metricer interface {
	// RecordRPCClientRequest records a request for method and returns a callback
	// to be invoked with the call's resulting error once it completes.
	RecordRPCClientRequest(method string) func(err error)
	// RecordRPCClientBatchRequest records a batched request and returns a callback
	// to be invoked with the batch-level error once it completes.
	RecordRPCClientBatchRequest(b []rpc.BatchElem) func(err error)
}
// clientMetrics is the prometheus-backed implementation of Metricer.
type clientMetrics struct {
	rpcClientRequestsTotal          *prometheus.CounterVec   // requests initiated, labeled by method
	rpcClientRequestDurationSeconds *prometheus.HistogramVec // request latency, labeled by method
	rpcClientResponsesTotal         *prometheus.CounterVec   // responses received, labeled by method and error class
}
// NewMetrics constructs a Metricer whose collectors are registered on the
// supplied registry under the given subsystem (e.g. "l1" or "l2"), allowing
// multiple instrumented clients to coexist in one registry.
func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer {
	factory := metrics.With(registry)
	return &clientMetrics{
		// Counter of RPC requests initiated, per method.
		rpcClientRequestsTotal: factory.NewCounterVec(prometheus.CounterOpts{
			Namespace: MetricsNamespace,
			Subsystem: subsystem,
			Name:      "requests_total",
			Help:      "Total RPC requests initiated by the RPC client",
		}, []string{
			"method",
		}),
		// Latency histogram, per method; buckets span 5ms to 10s.
		rpcClientRequestDurationSeconds: factory.NewHistogramVec(prometheus.HistogramOpts{
			Namespace: MetricsNamespace,
			Subsystem: subsystem,
			Name:      "request_duration_seconds",
			Buckets:   []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
			Help:      "Histogram of RPC client request durations",
		}, []string{
			"method",
		}),
		// Counter of responses, per method and coarse error classification.
		rpcClientResponsesTotal: factory.NewCounterVec(prometheus.CounterOpts{
			Namespace: MetricsNamespace,
			Subsystem: subsystem,
			Name:      "responses_total",
			Help:      "Total RPC request responses received by the RPC client",
		}, []string{
			"method",
			"error",
		}),
	}
}
// RecordRPCClientRequest bumps the request counter for method and starts a
// latency timer. The returned callback must be invoked with the call's error
// to record the response outcome and the observed duration.
func (m *clientMetrics) RecordRPCClientRequest(method string) func(err error) {
	m.rpcClientRequestsTotal.WithLabelValues(method).Inc()
	durationTimer := prometheus.NewTimer(m.rpcClientRequestDurationSeconds.WithLabelValues(method))
	return func(err error) {
		m.recordRPCClientResponse(method, err)
		durationTimer.ObserveDuration()
	}
}
// RecordRPCClientBatchRequest records a batched request: the batch itself is
// counted under the synthetic batchMethod label (weighted by batch size), and
// each element is counted under its own method. The returned callback records
// the batch-level outcome plus per-element outcomes once the batch completes.
func (m *clientMetrics) RecordRPCClientBatchRequest(b []rpc.BatchElem) func(err error) {
	m.rpcClientRequestsTotal.WithLabelValues(batchMethod).Add(float64(len(b)))
	for i := range b {
		m.rpcClientRequestsTotal.WithLabelValues(b[i].Method).Inc()
	}

	durationTimer := prometheus.NewTimer(m.rpcClientRequestDurationSeconds.WithLabelValues(batchMethod))
	return func(err error) {
		m.recordRPCClientResponse(batchMethod, err)
		durationTimer.ObserveDuration()

		// Record outcomes for the individual requests in the batch.
		for i := range b {
			m.recordRPCClientResponse(b[i].Method, b[i].Error)
		}
	}
}
// recordRPCClientResponse classifies err into a coarse label ("<nil>",
// "rpc_<code>", "http_<status>", "<not found>", or "<unknown>") and bumps the
// response counter for the given method with that label.
func (m *clientMetrics) recordRPCClientResponse(method string, err error) {
	var (
		rpcErr  rpc.Error
		httpErr rpc.HTTPError
	)
	var errStr string
	switch {
	case err == nil:
		errStr = "<nil>"
	case errors.As(err, &rpcErr):
		errStr = fmt.Sprintf("rpc_%d", rpcErr.ErrorCode())
	case errors.As(err, &httpErr):
		errStr = fmt.Sprintf("http_%d", httpErr.StatusCode)
	case errors.Is(err, ethereum.NotFound):
		errStr = "<not found>"
	default:
		errStr = "<unknown>"
	}
	m.rpcClientResponsesTotal.WithLabelValues(method, errStr).Inc()
}
...@@ -3,13 +3,14 @@ package node ...@@ -3,13 +3,14 @@ package node
import ( import (
"math/big" "math/big"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
) )
var _ EthClient = &MockEthClient{}
type MockEthClient struct { type MockEthClient struct {
mock.Mock mock.Mock
} }
...@@ -19,9 +20,9 @@ func (m *MockEthClient) BlockHeaderByNumber(number *big.Int) (*types.Header, err ...@@ -19,9 +20,9 @@ func (m *MockEthClient) BlockHeaderByNumber(number *big.Int) (*types.Header, err
return args.Get(0).(*types.Header), args.Error(1) return args.Get(0).(*types.Header), args.Error(1)
} }
func (m *MockEthClient) FinalizedBlockHeight() (*big.Int, error) { func (m *MockEthClient) BlockHeaderByHash(hash common.Hash) (*types.Header, error) {
args := m.Called() args := m.Called(hash)
return args.Get(0).(*big.Int), args.Error(1) return args.Get(0).(*types.Header), args.Error(1)
} }
func (m *MockEthClient) BlockHeadersByRange(from, to *big.Int) ([]types.Header, error) { func (m *MockEthClient) BlockHeadersByRange(from, to *big.Int) ([]types.Header, error) {
...@@ -29,27 +30,12 @@ func (m *MockEthClient) BlockHeadersByRange(from, to *big.Int) ([]types.Header, ...@@ -29,27 +30,12 @@ func (m *MockEthClient) BlockHeadersByRange(from, to *big.Int) ([]types.Header,
return args.Get(0).([]types.Header), args.Error(1) return args.Get(0).([]types.Header), args.Error(1)
} }
func (m *MockEthClient) BlockHeaderByHash(hash common.Hash) (*types.Header, error) {
args := m.Called(hash)
return args.Get(0).(*types.Header), args.Error(1)
}
func (m *MockEthClient) StorageHash(address common.Address, blockNumber *big.Int) (common.Hash, error) { func (m *MockEthClient) StorageHash(address common.Address, blockNumber *big.Int) (common.Hash, error) {
args := m.Called(address, blockNumber) args := m.Called(address, blockNumber)
return args.Get(0).(common.Hash), args.Error(1) return args.Get(0).(common.Hash), args.Error(1)
} }
func (m *MockEthClient) GethRpcClient() *rpc.Client { func (m *MockEthClient) FilterLogs(query ethereum.FilterQuery) ([]types.Log, error) {
args := m.Called() args := m.Called(query)
return args.Get(0).(*rpc.Client) return args.Get(0).([]types.Log), args.Error(1)
}
func (m *MockEthClient) GethEthClient() *ethclient.Client {
args := m.Called()
client, ok := args.Get(0).(*ethclient.Client)
if !ok {
return nil
}
return client
} }
...@@ -4,10 +4,10 @@ import ( ...@@ -4,10 +4,10 @@ import (
"context" "context"
"errors" "errors"
"math/big" "math/big"
"time"
"github.com/ethereum-optimism/optimism/indexer/config" "github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/etl"
"github.com/ethereum-optimism/optimism/indexer/processors/bridge" "github.com/ethereum-optimism/optimism/indexer/processors/bridge"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
...@@ -17,6 +17,7 @@ import ( ...@@ -17,6 +17,7 @@ import (
type BridgeProcessor struct { type BridgeProcessor struct {
log log.Logger log log.Logger
db *database.DB db *database.DB
l1Etl *etl.L1ETL
chainConfig config.ChainConfig chainConfig config.ChainConfig
// NOTE: We'll need this processor to handle for reorgs events. // NOTE: We'll need this processor to handle for reorgs events.
...@@ -25,7 +26,7 @@ type BridgeProcessor struct { ...@@ -25,7 +26,7 @@ type BridgeProcessor struct {
LatestL2Header *types.Header LatestL2Header *types.Header
} }
func NewBridgeProcessor(log log.Logger, db *database.DB, chainConfig config.ChainConfig) (*BridgeProcessor, error) { func NewBridgeProcessor(log log.Logger, db *database.DB, l1Etl *etl.L1ETL, chainConfig config.ChainConfig) (*BridgeProcessor, error) {
log = log.New("processor", "bridge") log = log.New("processor", "bridge")
latestL1Header, err := bridge.L1LatestBridgeEventHeader(db, chainConfig) latestL1Header, err := bridge.L1LatestBridgeEventHeader(db, chainConfig)
...@@ -56,17 +57,12 @@ func NewBridgeProcessor(log log.Logger, db *database.DB, chainConfig config.Chai ...@@ -56,17 +57,12 @@ func NewBridgeProcessor(log log.Logger, db *database.DB, chainConfig config.Chai
log.Info("detected the latest indexed state", "l1_block_number", latestL1Header.Number, "l2_block_number", latestL2Header.Number) log.Info("detected the latest indexed state", "l1_block_number", latestL1Header.Number, "l2_block_number", latestL2Header.Number)
} }
return &BridgeProcessor{log, db, chainConfig, latestL1Header, latestL2Header}, nil return &BridgeProcessor{log, db, l1Etl, chainConfig, latestL1Header, latestL2Header}, nil
} }
func (b *BridgeProcessor) Start(ctx context.Context) error { func (b *BridgeProcessor) Start(ctx context.Context) error {
done := ctx.Done() done := ctx.Done()
// NOTE: This should run on same iterval as L1 ETL rather than as finding the
// lasted epoch is constrained to how much L1 data we've indexed.
pollTicker := time.NewTicker(5 * time.Second)
defer pollTicker.Stop()
// In order to ensure all seen bridge finalization events correspond with seen // In order to ensure all seen bridge finalization events correspond with seen
// bridge initiated events, we establish a shared marker between L1 and L2 when // bridge initiated events, we establish a shared marker between L1 and L2 when
// processing events. // processing events.
...@@ -75,9 +71,7 @@ func (b *BridgeProcessor) Start(ctx context.Context) error { ...@@ -75,9 +71,7 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
// sequencing epoch and corresponding L1 origin that has also been indexed // sequencing epoch and corresponding L1 origin that has also been indexed
// serves as this shared marker. // serves as this shared marker.
// TODOs: l1EtlUpdates := b.l1Etl.Notify()
// 1. Fix Logging. Should be clear if we're looking at L1 or L2 side of things
b.log.Info("starting bridge processor...") b.log.Info("starting bridge processor...")
for { for {
select { select {
...@@ -85,18 +79,18 @@ func (b *BridgeProcessor) Start(ctx context.Context) error { ...@@ -85,18 +79,18 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
b.log.Info("stopping bridge processor") b.log.Info("stopping bridge processor")
return nil return nil
case <-pollTicker.C: case <-l1EtlUpdates:
latestEpoch, err := b.db.Blocks.LatestEpoch() latestEpoch, err := b.db.Blocks.LatestEpoch()
if err != nil { if err != nil {
return err return err
} }
if latestEpoch == nil { if latestEpoch == nil {
if b.LatestL1Header != nil { if b.LatestL1Header != nil {
// Once we have some satte `latestEpoch` should never return nil. // Once we have some state `latestEpoch` should never return nil.
b.log.Error("started with indexed bridge state, but no blocks epochs returned", "latest_bridge_l1_block_number", b.LatestL1Header.Number) b.log.Error("started with indexed bridge state, but no latest epoch returned", "latest_bridge_l1_block_number", b.LatestL1Header.Number)
return errors.New("started with indexed bridge state, but no blocks epochs returned") return errors.New("started with indexed bridge state, but no blocks epochs returned")
} else { } else {
b.log.Warn("no indexed block state. waiting...") b.log.Warn("no indexed epochs. waiting...")
continue continue
} }
} }
...@@ -116,7 +110,7 @@ func (b *BridgeProcessor) Start(ctx context.Context) error { ...@@ -116,7 +110,7 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
} }
batchLog := b.log.New("epoch_start_number", fromL1Height, "epoch_end_number", toL1Height) batchLog := b.log.New("epoch_start_number", fromL1Height, "epoch_end_number", toL1Height)
batchLog.Info("scanning bridge events") batchLog.Info("scanning for new bridge events")
err = b.db.Transaction(func(tx *database.DB) error { err = b.db.Transaction(func(tx *database.DB) error {
l1BridgeLog := b.log.New("from_l1_block_number", fromL1Height, "to_l1_block_number", toL1Height) l1BridgeLog := b.log.New("from_l1_block_number", fromL1Height, "to_l1_block_number", toL1Height)
l2BridgeLog := b.log.New("from_l2_block_number", fromL2Height, "to_l2_block_number", toL2Height) l2BridgeLog := b.log.New("from_l2_block_number", fromL2Height, "to_l2_block_number", toL2Height)
......
...@@ -64,9 +64,15 @@ func (p *PreimageOracle) headerByBlockHash(blockHash common.Hash) *types.Header ...@@ -64,9 +64,15 @@ func (p *PreimageOracle) headerByBlockHash(blockHash common.Hash) *types.Header
func (p *PreimageOracle) BlockByHash(blockHash common.Hash) *types.Block { func (p *PreimageOracle) BlockByHash(blockHash common.Hash) *types.Block {
header := p.headerByBlockHash(blockHash) header := p.headerByBlockHash(blockHash)
txs := p.LoadTransactions(blockHash, header.TxHash)
return types.NewBlockWithHeader(header).WithBody(txs, nil)
}
func (p *PreimageOracle) LoadTransactions(blockHash common.Hash, txHash common.Hash) []*types.Transaction {
p.hint.Hint(TransactionsHint(blockHash)) p.hint.Hint(TransactionsHint(blockHash))
opaqueTxs := mpt.ReadTrie(header.TxHash, func(key common.Hash) []byte { opaqueTxs := mpt.ReadTrie(txHash, func(key common.Hash) []byte {
return p.oracle.Get(preimage.Keccak256Key(key)) return p.oracle.Get(preimage.Keccak256Key(key))
}) })
...@@ -74,8 +80,7 @@ func (p *PreimageOracle) BlockByHash(blockHash common.Hash) *types.Block { ...@@ -74,8 +80,7 @@ func (p *PreimageOracle) BlockByHash(blockHash common.Hash) *types.Block {
if err != nil { if err != nil {
panic(fmt.Errorf("failed to decode list of txs: %w", err)) panic(fmt.Errorf("failed to decode list of txs: %w", err))
} }
return txs
return types.NewBlockWithHeader(header).WithBody(txs, nil)
} }
func (p *PreimageOracle) NodeByHash(nodeHash common.Hash) []byte { func (p *PreimageOracle) NodeByHash(nodeHash common.Hash) []byte {
......
...@@ -17,8 +17,8 @@ const diskPermission = 0666 ...@@ -17,8 +17,8 @@ const diskPermission = 0666
// DiskKV is a disk-backed key-value store, every key-value pair is a hex-encoded .txt file, with the value as content. // DiskKV is a disk-backed key-value store, every key-value pair is a hex-encoded .txt file, with the value as content.
// DiskKV is safe for concurrent use with a single DiskKV instance. // DiskKV is safe for concurrent use with a single DiskKV instance.
// DiskKV is not safe for concurrent use between different DiskKV instances of the same disk directory: // DiskKV is safe for concurrent use between different DiskKV instances of the same disk directory as long as the
// a Put needs to be completed before another DiskKV Get retrieves the values. // file system supports atomic renames.
type DiskKV struct { type DiskKV struct {
sync.RWMutex sync.RWMutex
path string path string
...@@ -37,19 +37,22 @@ func (d *DiskKV) pathKey(k common.Hash) string { ...@@ -37,19 +37,22 @@ func (d *DiskKV) pathKey(k common.Hash) string {
func (d *DiskKV) Put(k common.Hash, v []byte) error { func (d *DiskKV) Put(k common.Hash, v []byte) error {
d.Lock() d.Lock()
defer d.Unlock() defer d.Unlock()
f, err := os.OpenFile(d.pathKey(k), os.O_WRONLY|os.O_CREATE|os.O_EXCL|os.O_TRUNC, diskPermission) f, err := os.CreateTemp(d.path, k.String()+".txt.*")
if err != nil { if err != nil {
if errors.Is(err, os.ErrExist) { return fmt.Errorf("failed to open temp file for pre-image %s: %w", k, err)
return ErrAlreadyExists
}
return fmt.Errorf("failed to open new pre-image file %s: %w", k, err)
} }
defer os.Remove(f.Name()) // Clean up the temp file if it doesn't actually get moved into place
if _, err := f.Write([]byte(hex.EncodeToString(v))); err != nil { if _, err := f.Write([]byte(hex.EncodeToString(v))); err != nil {
_ = f.Close() _ = f.Close()
return fmt.Errorf("failed to write pre-image %s to disk: %w", k, err) return fmt.Errorf("failed to write pre-image %s to disk: %w", k, err)
} }
if err := f.Close(); err != nil { if err := f.Close(); err != nil {
return fmt.Errorf("failed to close pre-image %s file: %w", k, err) return fmt.Errorf("failed to close temp pre-image %s file: %w", k, err)
}
targetFile := d.pathKey(k)
if err := os.Rename(f.Name(), targetFile); err != nil {
return fmt.Errorf("failed to move temp dir %v to final destination %v: %w", f.Name(), targetFile, err)
} }
return nil return nil
} }
......
...@@ -9,13 +9,9 @@ import ( ...@@ -9,13 +9,9 @@ import (
// ErrNotFound is returned when a pre-image cannot be found in the KV store. // ErrNotFound is returned when a pre-image cannot be found in the KV store.
var ErrNotFound = errors.New("not found") var ErrNotFound = errors.New("not found")
// ErrAlreadyExists is returned when a pre-image already exists in the KV store.
var ErrAlreadyExists = errors.New("already exists")
// KV is a Key-Value store interface for pre-image data. // KV is a Key-Value store interface for pre-image data.
type KV interface { type KV interface {
// Put puts the pre-image value v in the key-value store with key k. // Put puts the pre-image value v in the key-value store with key k.
// It returns ErrAlreadyExists when the key already exists.
// KV store implementations may return additional errors specific to the KV storage. // KV store implementations may return additional errors specific to the KV storage.
Put(k common.Hash, v []byte) error Put(k common.Hash, v []byte) error
......
...@@ -45,9 +45,9 @@ func kvTest(t *testing.T, kv KV) { ...@@ -45,9 +45,9 @@ func kvTest(t *testing.T, kv KV) {
require.Equal(t, []byte{4, 2}, dat, "pre-image must match") require.Equal(t, []byte{4, 2}, dat, "pre-image must match")
}) })
t.Run("not overwriting pre-image", func(t *testing.T) { t.Run("allowing multiple writes for same pre-image", func(t *testing.T) {
t.Parallel() t.Parallel()
require.NoError(t, kv.Put(common.Hash{0xdd}, []byte{4, 2})) require.NoError(t, kv.Put(common.Hash{0xdd}, []byte{4, 2}))
require.ErrorIs(t, kv.Put(common.Hash{0xdd}, []byte{4, 2}), ErrAlreadyExists) require.NoError(t, kv.Put(common.Hash{0xdd}, []byte{4, 2}))
}) })
} }
...@@ -23,9 +23,6 @@ func NewMemKV() *MemKV { ...@@ -23,9 +23,6 @@ func NewMemKV() *MemKV {
func (m *MemKV) Put(k common.Hash, v []byte) error { func (m *MemKV) Put(k common.Hash, v []byte) error {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
if _, ok := m.m[k]; ok {
return ErrAlreadyExists
}
m.m[k] = v m.m[k] = v
return nil return nil
} }
......
...@@ -99,7 +99,7 @@ func (p *Prefetcher) prefetch(ctx context.Context, hint string) error { ...@@ -99,7 +99,7 @@ func (p *Prefetcher) prefetch(ctx context.Context, hint string) error {
return fmt.Errorf("failed to fetch L1 block %s receipts: %w", hash, err) return fmt.Errorf("failed to fetch L1 block %s receipts: %w", hash, err)
} }
return p.storeReceipts(receipts) return p.storeReceipts(receipts)
case l2.HintL2BlockHeader: case l2.HintL2BlockHeader, l2.HintL2Transactions:
header, txs, err := p.l2Fetcher.InfoAndTxsByHash(ctx, hash) header, txs, err := p.l2Fetcher.InfoAndTxsByHash(ctx, hash)
if err != nil { if err != nil {
return fmt.Errorf("failed to fetch L2 block %s: %w", hash, err) return fmt.Errorf("failed to fetch L2 block %s: %w", hash, err)
...@@ -155,10 +155,7 @@ func (p *Prefetcher) storeTrieNodes(values []hexutil.Bytes) error { ...@@ -155,10 +155,7 @@ func (p *Prefetcher) storeTrieNodes(values []hexutil.Bytes) error {
_, nodes := mpt.WriteTrie(values) _, nodes := mpt.WriteTrie(values)
for _, node := range nodes { for _, node := range nodes {
key := preimage.Keccak256Key(crypto.Keccak256Hash(node)).PreimageKey() key := preimage.Keccak256Key(crypto.Keccak256Hash(node)).PreimageKey()
if err := p.kvStore.Put(key, node); errors.Is(err, kvstore.ErrAlreadyExists) { if err := p.kvStore.Put(key, node); err != nil {
// It's not uncommon for different tries to contain common nodes (esp for receipts)
continue
} else if err != nil {
return fmt.Errorf("failed to store node: %w", err) return fmt.Errorf("failed to store node: %w", err)
} }
} }
......
...@@ -180,6 +180,31 @@ func TestFetchL2Block(t *testing.T) { ...@@ -180,6 +180,31 @@ func TestFetchL2Block(t *testing.T) {
}) })
} }
// TestFetchL2Transactions checks that the preimage oracle can load the
// transactions of an L2 block, both when the block data is already present in
// the KV store and when it must be fetched from the L2 client.
func TestFetchL2Transactions(t *testing.T) {
	source := rand.New(rand.NewSource(123))
	block, receipts := testutils.RandomBlock(source, 10)
	blockHash := block.Hash()

	t.Run("AlreadyKnown", func(t *testing.T) {
		// Pre-populate the KV store so no client fetch is needed.
		prefetcher, _, _, kv := createPrefetcher(t)
		storeBlock(t, kv, block, receipts)

		oracle := l2.NewPreimageOracle(asOracleFn(t, prefetcher), asHinter(t, prefetcher))
		actual := oracle.LoadTransactions(blockHash, block.TxHash())
		assertTransactionsEqual(t, block.Transactions(), actual)
	})

	t.Run("Unknown", func(t *testing.T) {
		// Empty KV store: the prefetcher must hit the mocked L2 client.
		prefetcher, _, l2Cl, _ := createPrefetcher(t)
		l2Cl.ExpectInfoAndTxsByHash(blockHash, eth.BlockToInfo(block), block.Transactions(), nil)
		defer l2Cl.MockL2Client.AssertExpectations(t)

		oracle := l2.NewPreimageOracle(asOracleFn(t, prefetcher), asHinter(t, prefetcher))
		actual := oracle.LoadTransactions(blockHash, block.TxHash())
		assertTransactionsEqual(t, block.Transactions(), actual)
	})
}
func TestFetchL2Node(t *testing.T) { func TestFetchL2Node(t *testing.T) {
rng := rand.New(rand.NewSource(123)) rng := rand.New(rand.NewSource(123))
node := testutils.RandomData(rng, 30) node := testutils.RandomData(rng, 30)
......
...@@ -290,3 +290,34 @@ type Config struct { ...@@ -290,3 +290,34 @@ type Config struct {
Signer opcrypto.SignerFn Signer opcrypto.SignerFn
From common.Address From common.Address
} }
// Check validates the Config, returning an error describing the first
// required field that is unset. It returns nil when every required field
// (backend, signer, chain ID, and the various timeout/confirmation
// parameters) is populated.
func (m Config) Check() error {
	switch {
	case m.Backend == nil:
		return errors.New("must provide the Backend")
	case m.NumConfirmations == 0:
		return errors.New("NumConfirmations must not be 0")
	case m.NetworkTimeout == 0:
		return errors.New("must provide NetworkTimeout")
	case m.ResubmissionTimeout == 0:
		return errors.New("must provide ResubmissionTimeout")
	case m.ReceiptQueryInterval == 0:
		return errors.New("must provide ReceiptQueryInterval")
	case m.TxNotInMempoolTimeout == 0:
		return errors.New("must provide TxNotInMempoolTimeout")
	case m.SafeAbortNonceTooLowCount == 0:
		return errors.New("SafeAbortNonceTooLowCount must not be 0")
	case m.Signer == nil:
		return errors.New("must provide the Signer")
	case m.ChainID == nil:
		return errors.New("must provide the ChainID")
	}
	return nil
}
...@@ -112,7 +112,14 @@ func NewSimpleTxManager(name string, l log.Logger, m metrics.TxMetricer, cfg CLI ...@@ -112,7 +112,14 @@ func NewSimpleTxManager(name string, l log.Logger, m metrics.TxMetricer, cfg CLI
if err != nil { if err != nil {
return nil, err return nil, err
} }
return NewSimpleTxManagerFromConfig(name, l, m, conf)
}
// NewSimpleTxManager initializes a new SimpleTxManager with the passed Config.
func NewSimpleTxManagerFromConfig(name string, l log.Logger, m metrics.TxMetricer, conf Config) (*SimpleTxManager, error) {
if err := conf.Check(); err != nil {
return nil, fmt.Errorf("invalid config: %w", err)
}
return &SimpleTxManager{ return &SimpleTxManager{
chainID: conf.ChainID, chainID: conf.ChainID,
name: name, name: name,
...@@ -140,6 +147,8 @@ type TxCandidate struct { ...@@ -140,6 +147,8 @@ type TxCandidate struct {
To *common.Address To *common.Address
// GasLimit is the gas limit to be used in the constructed tx. // GasLimit is the gas limit to be used in the constructed tx.
GasLimit uint64 GasLimit uint64
// Value is the value to be used in the constructed tx.
Value *big.Int
} }
// Send is used to publish a transaction with incrementally higher gas prices // Send is used to publish a transaction with incrementally higher gas prices
...@@ -214,6 +223,7 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (* ...@@ -214,6 +223,7 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (*
GasTipCap: gasTipCap, GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap, GasFeeCap: gasFeeCap,
Data: candidate.TxData, Data: candidate.TxData,
Value: candidate.Value,
} }
m.l.Info("Creating tx", "to", rawTx.To, "from", m.cfg.From) m.l.Info("Creating tx", "to", rawTx.To, "from", m.cfg.From)
...@@ -229,6 +239,7 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (* ...@@ -229,6 +239,7 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (*
GasFeeCap: gasFeeCap, GasFeeCap: gasFeeCap,
GasTipCap: gasTipCap, GasTipCap: gasTipCap,
Data: rawTx.Data, Data: rawTx.Data,
Value: rawTx.Value,
}) })
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to estimate gas: %w", err) return nil, fmt.Errorf("failed to estimate gas: %w", err)
......
...@@ -603,12 +603,21 @@ func TestWaitMinedMultipleConfs(t *testing.T) { ...@@ -603,12 +603,21 @@ func TestWaitMinedMultipleConfs(t *testing.T) {
require.Equal(t, txHash, receipt.TxHash) require.Equal(t, txHash, receipt.TxHash)
} }
// TestManagerErrsOnZeroCLIConfs ensures that NewSimpleTxManager rejects an
// empty CLIConfig: with no fields set, configuration validation must fail
// before a manager is constructed.
func TestManagerErrsOnZeroCLIConfs(t *testing.T) {
	t.Parallel()
	_, err := NewSimpleTxManager("TEST", testlog.Logger(t, log.LvlCrit), &metrics.NoopTxMetrics{}, CLIConfig{})
	require.Error(t, err)
}
// TestManagerErrsOnZeroConfs ensures that the NewSimpleTxManager will error // TestManagerErrsOnZeroConfs ensures that the NewSimpleTxManager will error
// when attempting to configure with NumConfirmations set to zero. // when attempting to configure with NumConfirmations set to zero.
func TestManagerErrsOnZeroConfs(t *testing.T) { func TestManagerErrsOnZeroConfs(t *testing.T) {
t.Parallel() t.Parallel()
_, err := NewSimpleTxManager("TEST", testlog.Logger(t, log.LvlCrit), &metrics.NoopTxMetrics{}, CLIConfig{}) _, err := NewSimpleTxManagerFromConfig("TEST", testlog.Logger(t, log.LvlCrit), &metrics.NoopTxMetrics{}, Config{})
require.Error(t, err) require.Error(t, err)
} }
......
...@@ -454,6 +454,63 @@ var ( ...@@ -454,6 +454,63 @@ var (
return engine.Copy(context.Background(), source, dest) return engine.Copy(context.Background(), source, dest)
}), }),
} }
	// EngineSetForkchoiceCmd updates the engine's forkchoice state. The
	// unsafe (latest), safe, and finalized blocks are all specified by
	// block number via required flags and resolved to hashes by the action.
	EngineSetForkchoiceCmd = &cli.Command{
		Name:        "set-forkchoice",
		Description: "Set forkchoice, specify unsafe, safe and finalized blocks by number",
		Flags: []cli.Flag{
			EngineEndpoint, EngineJWTPath,
			&cli.Uint64Flag{
				Name:     "unsafe",
				Usage:    "Block number of block to set as latest block",
				Required: true,
				EnvVars:  prefixEnvVars("UNSAFE"),
			},
			&cli.Uint64Flag{
				Name:     "safe",
				Usage:    "Block number of block to set as safe block",
				Required: true,
				EnvVars:  prefixEnvVars("SAFE"),
			},
			&cli.Uint64Flag{
				Name:     "finalized",
				Usage:    "Block number of block to set as finalized block",
				Required: true,
				EnvVars:  prefixEnvVars("FINALIZED"),
			},
		},
		Action: EngineAction(func(ctx *cli.Context, client client.RPC) error {
			return engine.SetForkchoice(ctx.Context, client, ctx.Uint64("finalized"), ctx.Uint64("safe"), ctx.Uint64("unsafe"))
		}),
	}
	// EngineJSONCmd performs a raw JSON-RPC call against the engine
	// endpoint: the first positional arg is the method name, and params come
	// either from the remaining args or (with --stdin) from standard input.
	EngineJSONCmd = &cli.Command{
		Name:        "json",
		Description: "read json values from remaining args, or STDIN, and use them as RPC params to call the engine RPC method (first arg)",
		Flags: []cli.Flag{
			EngineEndpoint, EngineJWTPath,
			&cli.BoolFlag{
				Name:     "stdin",
				Usage:    "Read params from stdin instead",
				Required: false,
				EnvVars:  prefixEnvVars("STDIN"),
			},
		},
		ArgsUsage: "<rpc-method-name> [params...]",
		Action: EngineAction(func(ctx *cli.Context, client client.RPC) error {
			if ctx.NArg() == 0 {
				return fmt.Errorf("expected at least 1 argument: RPC method name")
			}
			var r io.Reader
			var args []string
			if ctx.Bool("stdin") {
				// Params will be streamed from stdin; args stays nil.
				r = ctx.App.Reader
			} else {
				args = ctx.Args().Tail()
			}
			return engine.RawJSONInteraction(ctx.Context, client, ctx.Args().Get(0), args, r, ctx.App.Writer)
		}),
	}
) )
var CheatCmd = &cli.Command{ var CheatCmd = &cli.Command{
...@@ -481,5 +538,7 @@ var EngineCmd = &cli.Command{ ...@@ -481,5 +538,7 @@ var EngineCmd = &cli.Command{
EngineAutoCmd, EngineAutoCmd,
EngineStatusCmd, EngineStatusCmd,
EngineCopyCmd, EngineCopyCmd,
EngineSetForkchoiceCmd,
EngineJSONCmd,
}, },
} }
...@@ -3,8 +3,11 @@ package engine ...@@ -3,8 +3,11 @@ package engine
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io"
"math/big" "math/big"
"strings"
"time" "time"
"github.com/ethereum/go-ethereum/beacon/engine" "github.com/ethereum/go-ethereum/beacon/engine"
...@@ -306,3 +309,88 @@ func Copy(ctx context.Context, copyFrom client.RPC, copyTo client.RPC) error { ...@@ -306,3 +309,88 @@ func Copy(ctx context.Context, copyFrom client.RPC, copyTo client.RPC) error {
} }
return nil return nil
} }
// SetForkchoice updates the execution engine's forkchoice state so that the
// blocks at the given heights become the unsafe (latest), safe, and finalized
// heads. It validates finalized <= safe <= unsafe <= current latest before
// resolving each height to a block hash and issuing the update.
func SetForkchoice(ctx context.Context, client client.RPC, finalizedNum, safeNum, unsafeNum uint64) error {
	if unsafeNum < safeNum {
		return fmt.Errorf("cannot set unsafe (%d) < safe (%d)", unsafeNum, safeNum)
	}
	if safeNum < finalizedNum {
		return fmt.Errorf("cannot set safe (%d) < finalized (%d)", safeNum, finalizedNum)
	}
	head, err := getHeader(ctx, client, "eth_getBlockByNumber", "latest")
	if err != nil {
		return fmt.Errorf("failed to get latest block: %w", err)
	}
	if unsafeNum > head.Number.Uint64() {
		return fmt.Errorf("cannot set unsafe (%d) > latest (%d)", unsafeNum, head.Number.Uint64())
	}
	finalizedHeader, err := getHeader(ctx, client, "eth_getBlockByNumber", hexutil.Uint64(finalizedNum).String())
	if err != nil {
		return fmt.Errorf("failed to get block %d to mark finalized: %w", finalizedNum, err)
	}
	safeHeader, err := getHeader(ctx, client, "eth_getBlockByNumber", hexutil.Uint64(safeNum).String())
	if err != nil {
		return fmt.Errorf("failed to get block %d to mark safe: %w", safeNum, err)
	}
	// Fix: previously the latest head's hash was always used as the unsafe
	// head, so a requested unsafeNum below the chain tip was validated but then
	// silently ignored. Resolve the requested block number instead.
	unsafeHeader, err := getHeader(ctx, client, "eth_getBlockByNumber", hexutil.Uint64(unsafeNum).String())
	if err != nil {
		return fmt.Errorf("failed to get block %d to mark unsafe: %w", unsafeNum, err)
	}
	if err := updateForkchoice(ctx, client, unsafeHeader.Hash(), safeHeader.Hash(), finalizedHeader.Hash()); err != nil {
		return fmt.Errorf("failed to update forkchoice: %w", err)
	}
	return nil
}
// RawJSONInteraction calls the given RPC method with raw JSON params and
// writes the raw JSON result to output. Params come from the input reader
// when it is non-nil (one JSON value per decode, until EOF); otherwise each
// arg is used as a param, with plain strings auto-quoted into JSON strings.
func RawJSONInteraction(ctx context.Context, client client.RPC, method string, args []string, input io.Reader, output io.Writer) error {
	var params []any
	if input != nil {
		// Stream JSON values until the reader is exhausted.
		dec := json.NewDecoder(input)
		for {
			var raw json.RawMessage
			err := dec.Decode(&raw)
			if errors.Is(err, io.EOF) {
				break
			}
			if err != nil {
				return fmt.Errorf("unexpected error while reading json params: %w", err)
			}
			params = append(params, raw)
		}
	} else {
		for _, arg := range args {
			// add quotes to unquoted strings, but not to other json data
			if isUnquotedJsonString(arg) {
				arg = fmt.Sprintf("%q", arg)
			}
			params = append(params, json.RawMessage(arg))
		}
	}
	var result json.RawMessage
	if err := client.CallContext(ctx, &result, method, params...); err != nil {
		return fmt.Errorf("failed RPC call: %w", err)
	}
	if _, err := output.Write(result); err != nil {
		return fmt.Errorf("failed to write RPC output: %w", err)
	}
	return nil
}
// isUnquotedJsonString reports whether v is a bare string that needs to be
// wrapped in quotes to become valid JSON. It returns false for values that
// are already valid JSON on their own: the literals null/true/false, data
// starting like an object/array/quoted string, and numbers.
func isUnquotedJsonString(v string) bool {
	trimmed := strings.TrimSpace(v)
	// An empty value must be quoted to be valid JSON at all.
	if trimmed == "" {
		return true
	}
	// JSON keyword literals stand on their own.
	switch trimmed {
	case "null", "true", "false":
		return false
	}
	// Anything shaped like an object, array, or quoted string is left as-is.
	if c := trimmed[0]; c == '[' || c == '{' || c == '"' {
		return false
	}
	// Valid JSON numbers also need no quoting.
	var num json.Number
	if json.Unmarshal([]byte(trimmed), &num) == nil {
		return false
	}
	return true
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment