Commit 81d0d336 authored by mergify[bot]'s avatar mergify[bot] Committed by GitHub

Merge branch 'develop' into aj/deploy-dispute-game-e2e

parents 9d51d103 b4921f3e
...@@ -23,6 +23,7 @@ require ( ...@@ -23,6 +23,7 @@ require (
github.com/ipfs/go-datastore v0.6.0 github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-ds-leveldb v0.5.0 github.com/ipfs/go-ds-leveldb v0.5.0
github.com/jackc/pgtype v1.14.0 github.com/jackc/pgtype v1.14.0
github.com/jackc/pgx/v5 v5.3.1
github.com/lib/pq v1.10.9 github.com/lib/pq v1.10.9
github.com/libp2p/go-libp2p v0.25.1 github.com/libp2p/go-libp2p v0.25.1
github.com/libp2p/go-libp2p-pubsub v0.9.3 github.com/libp2p/go-libp2p-pubsub v0.9.3
...@@ -105,7 +106,6 @@ require ( ...@@ -105,7 +106,6 @@ require (
github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx/v5 v5.3.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect github.com/jbenet/goprocess v0.1.4 // indirect
......
...@@ -21,8 +21,8 @@ const ( ...@@ -21,8 +21,8 @@ const (
) )
// DepositsByAddress mocks returning deposits by an address // DepositsByAddress mocks returning deposits by an address
func (mbv *MockBridgeView) DepositsByAddress(address common.Address) ([]*database.DepositWithTransactionHash, error) { func (mbv *MockBridgeView) DepositsByAddress(address common.Address) ([]*database.DepositWithTransactionHashes, error) {
return []*database.DepositWithTransactionHash{ return []*database.DepositWithTransactionHashes{
{ {
Deposit: database.Deposit{ Deposit: database.Deposit{
GUID: uuid.MustParse(guid1), GUID: uuid.MustParse(guid1),
......
package cli package cli
import ( import (
"context"
"fmt" "fmt"
"os"
"github.com/ethereum-optimism/optimism/indexer"
"github.com/ethereum-optimism/optimism/indexer/config" "github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/opio"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
) )
...@@ -20,16 +24,26 @@ type Cli struct { ...@@ -20,16 +24,26 @@ type Cli struct {
func runIndexer(ctx *cli.Context) error { func runIndexer(ctx *cli.Context) error {
configPath := ctx.String(ConfigFlag.Name) configPath := ctx.String(ConfigFlag.Name)
conf, err := config.LoadConfig(configPath) cfg, err := config.LoadConfig(configPath)
if err != nil {
return err
}
fmt.Println(conf) // setup logger
cfg.Logger = log.NewLogger(log.ReadCLIConfig(ctx))
indexer, err := indexer.NewIndexer(cfg)
if err != nil { if err != nil {
log.Crit("Failed to load config", "message", err) return err
} }
// finish me indexerCtx, indexerCancel := context.WithCancel(context.Background())
return nil go func() {
opio.BlockOnInterrupts()
indexerCancel()
}()
return indexer.Run(indexerCtx)
} }
func runApi(ctx *cli.Context) error { func runApi(ctx *cli.Context) error {
...@@ -39,8 +53,9 @@ func runApi(ctx *cli.Context) error { ...@@ -39,8 +53,9 @@ func runApi(ctx *cli.Context) error {
fmt.Println(conf) fmt.Println(conf)
if err != nil { if err != nil {
log.Crit("Failed to load config", "message", err) panic(err)
} }
// finish me // finish me
return nil return nil
} }
...@@ -71,17 +86,7 @@ func (c *Cli) Run(args []string) error { ...@@ -71,17 +86,7 @@ func (c *Cli) Run(args []string) error {
} }
func NewCli(GitVersion string, GitCommit string, GitDate string) *Cli { func NewCli(GitVersion string, GitCommit string, GitDate string) *Cli {
log.Root().SetHandler( flags := append([]cli.Flag{ConfigFlag}, log.CLIFlags("INDEXER")...)
log.LvlFilterHandler(
log.LvlInfo,
log.StreamHandler(os.Stdout, log.TerminalFormat(true)),
),
)
flags := []cli.Flag{
ConfigFlag,
}
app := &cli.App{ app := &cli.App{
Version: fmt.Sprintf("%s-%s", GitVersion, params.VersionWithCommit(GitCommit, GitDate)), Version: fmt.Sprintf("%s-%s", GitVersion, params.VersionWithCommit(GitCommit, GitDate)),
Description: "An indexer of all optimism events with a serving api layer", Description: "An indexer of all optimism events with a serving api layer",
......
...@@ -4,6 +4,8 @@ import ( ...@@ -4,6 +4,8 @@ import (
"os" "os"
"github.com/BurntSushi/toml" "github.com/BurntSushi/toml"
"github.com/ethereum/go-ethereum/log"
) )
// Config represents the `indexer.toml` file used to configure the indexer // Config represents the `indexer.toml` file used to configure the indexer
...@@ -13,6 +15,7 @@ type Config struct { ...@@ -13,6 +15,7 @@ type Config struct {
DB DBConfig DB DBConfig
API APIConfig API APIConfig
Metrics MetricsConfig Metrics MetricsConfig
Logger log.Logger `toml:"-"`
} }
// ChainConfig configures of the chain being indexed // ChainConfig configures of the chain being indexed
...@@ -31,6 +34,7 @@ type RPCsConfig struct { ...@@ -31,6 +34,7 @@ type RPCsConfig struct {
type DBConfig struct { type DBConfig struct {
Host string Host string
Port int Port int
Name string
User string User string
Password string Password string
} }
......
...@@ -3,6 +3,7 @@ package database ...@@ -3,6 +3,7 @@ package database
import ( import (
"context" "context"
"errors" "errors"
"math/big"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
...@@ -53,15 +54,20 @@ type LegacyStateBatch struct { ...@@ -53,15 +54,20 @@ type LegacyStateBatch struct {
type OutputProposal struct { type OutputProposal struct {
OutputRoot common.Hash `gorm:"primaryKey;serializer:json"` OutputRoot common.Hash `gorm:"primaryKey;serializer:json"`
L2OutputIndex U256
L2BlockNumber U256 L2BlockNumber U256
L1ContractEventGUID uuid.UUID L1ContractEventGUID uuid.UUID
} }
type BlocksView interface { type BlocksView interface {
L1BlockHeader(*big.Int) (*L1BlockHeader, error)
LatestL1BlockHeader() (*L1BlockHeader, error) LatestL1BlockHeader() (*L1BlockHeader, error)
LatestCheckpointedOutput() (*OutputProposal, error) LatestCheckpointedOutput() (*OutputProposal, error)
OutputProposal(index *big.Int) (*OutputProposal, error)
L2BlockHeader(*big.Int) (*L2BlockHeader, error)
LatestL2BlockHeader() (*L2BlockHeader, error) LatestL2BlockHeader() (*L2BlockHeader, error)
} }
...@@ -104,6 +110,20 @@ func (db *blocksDB) StoreOutputProposals(outputs []*OutputProposal) error { ...@@ -104,6 +110,20 @@ func (db *blocksDB) StoreOutputProposals(outputs []*OutputProposal) error {
return result.Error return result.Error
} }
func (db *blocksDB) L1BlockHeader(height *big.Int) (*L1BlockHeader, error) {
var l1Header L1BlockHeader
result := db.gorm.Where(&BlockHeader{Number: U256{Int: height}}).Take(&l1Header)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, result.Error
}
return &l1Header, nil
}
func (db *blocksDB) LatestL1BlockHeader() (*L1BlockHeader, error) { func (db *blocksDB) LatestL1BlockHeader() (*L1BlockHeader, error) {
var l1Header L1BlockHeader var l1Header L1BlockHeader
result := db.gorm.Order("number DESC").Take(&l1Header) result := db.gorm.Order("number DESC").Take(&l1Header)
...@@ -120,7 +140,21 @@ func (db *blocksDB) LatestL1BlockHeader() (*L1BlockHeader, error) { ...@@ -120,7 +140,21 @@ func (db *blocksDB) LatestL1BlockHeader() (*L1BlockHeader, error) {
func (db *blocksDB) LatestCheckpointedOutput() (*OutputProposal, error) { func (db *blocksDB) LatestCheckpointedOutput() (*OutputProposal, error) {
var outputProposal OutputProposal var outputProposal OutputProposal
result := db.gorm.Order("l2_block_number DESC").Take(&outputProposal) result := db.gorm.Order("l2_output_index DESC").Take(&outputProposal)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, result.Error
}
return &outputProposal, nil
}
func (db *blocksDB) OutputProposal(index *big.Int) (*OutputProposal, error) {
var outputProposal OutputProposal
result := db.gorm.Where(&OutputProposal{L2OutputIndex: U256{Int: index}}).Take(&outputProposal)
if result.Error != nil { if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) { if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil return nil, nil
...@@ -139,6 +173,20 @@ func (db *blocksDB) StoreL2BlockHeaders(headers []*L2BlockHeader) error { ...@@ -139,6 +173,20 @@ func (db *blocksDB) StoreL2BlockHeaders(headers []*L2BlockHeader) error {
return result.Error return result.Error
} }
func (db *blocksDB) L2BlockHeader(height *big.Int) (*L2BlockHeader, error) {
var l2Header L2BlockHeader
result := db.gorm.Where(&BlockHeader{Number: U256{Int: height}}).Take(&l2Header)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, result.Error
}
return &l2Header, nil
}
func (db *blocksDB) LatestL2BlockHeader() (*L2BlockHeader, error) { func (db *blocksDB) LatestL2BlockHeader() (*L2BlockHeader, error) {
var l2Header L2BlockHeader var l2Header L2BlockHeader
result := db.gorm.Order("number DESC").Take(&l2Header) result := db.gorm.Order("number DESC").Take(&l2Header)
......
...@@ -48,9 +48,11 @@ type Deposit struct { ...@@ -48,9 +48,11 @@ type Deposit struct {
TokenPair TokenPair `gorm:"embedded"` TokenPair TokenPair `gorm:"embedded"`
} }
type DepositWithTransactionHash struct { type DepositWithTransactionHashes struct {
Deposit Deposit `gorm:"embedded"` Deposit Deposit `gorm:"embedded"`
L1TransactionHash common.Hash `gorm:"serializer:json"` L1TransactionHash common.Hash `gorm:"serializer:json"`
FinalizedL2TransactionHash common.Hash `gorm:"serializer:json"`
} }
type Withdrawal struct { type Withdrawal struct {
...@@ -77,12 +79,12 @@ type WithdrawalWithTransactionHashes struct { ...@@ -77,12 +79,12 @@ type WithdrawalWithTransactionHashes struct {
Withdrawal Withdrawal `gorm:"embedded"` Withdrawal Withdrawal `gorm:"embedded"`
L2TransactionHash common.Hash `gorm:"serializer:json"` L2TransactionHash common.Hash `gorm:"serializer:json"`
ProvenL1TransactionHash *common.Hash `gorm:"serializer:json"` ProvenL1TransactionHash common.Hash `gorm:"serializer:json"`
FinalizedL1TransactionHash *common.Hash `gorm:"serializer:json"` FinalizedL1TransactionHash common.Hash `gorm:"serializer:json"`
} }
type BridgeView interface { type BridgeView interface {
DepositsByAddress(address common.Address) ([]*DepositWithTransactionHash, error) DepositsByAddress(address common.Address) ([]*DepositWithTransactionHashes, error)
DepositByMessageNonce(*big.Int) (*Deposit, error) DepositByMessageNonce(*big.Int) (*Deposit, error)
LatestDepositMessageNonce() (*big.Int, error) LatestDepositMessageNonce() (*big.Int, error)
...@@ -122,14 +124,16 @@ func (db *bridgeDB) StoreDeposits(deposits []*Deposit) error { ...@@ -122,14 +124,16 @@ func (db *bridgeDB) StoreDeposits(deposits []*Deposit) error {
return result.Error return result.Error
} }
func (db *bridgeDB) DepositsByAddress(address common.Address) ([]*DepositWithTransactionHash, error) { func (db *bridgeDB) DepositsByAddress(address common.Address) ([]*DepositWithTransactionHashes, error) {
depositsQuery := db.gorm.Table("deposits").Select("deposits.*, l1_contract_events.transaction_hash AS l1_transaction_hash") depositsQuery := db.gorm.Table("deposits").Select("deposits.*, l1_contract_events.transaction_hash AS l1_transaction_hash, l2_contract_events.transaction_hash AS finalized_l2_transaction_hash")
eventsJoinQuery := depositsQuery.Joins("LEFT JOIN l1_contract_events ON deposits.initiated_l1_event_guid = l1_contract_events.guid")
initiatedJoinQuery := depositsQuery.Joins("LEFT JOIN l1_contract_events ON deposits.initiated_l1_event_guid = l1_contract_events.guid")
finalizedJoinQuery := initiatedJoinQuery.Joins("LEFT JOIN l2_contract_events ON deposits.finalized_l2_event_guid = l2_contract_events.guid")
// add in cursoring options // add in cursoring options
filteredQuery := eventsJoinQuery.Where(&Transaction{FromAddress: address}).Order("deposits.timestamp DESC").Limit(100) filteredQuery := finalizedJoinQuery.Where(&Transaction{FromAddress: address}).Order("deposits.timestamp DESC").Limit(100)
deposits := make([]*DepositWithTransactionHash, 100) deposits := make([]*DepositWithTransactionHashes, 100)
result := filteredQuery.Scan(&deposits) result := filteredQuery.Scan(&deposits)
if result.Error != nil { if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) { if errors.Is(result.Error, gorm.ErrRecordNotFound) {
...@@ -144,7 +148,7 @@ func (db *bridgeDB) DepositsByAddress(address common.Address) ([]*DepositWithTra ...@@ -144,7 +148,7 @@ func (db *bridgeDB) DepositsByAddress(address common.Address) ([]*DepositWithTra
func (db *bridgeDB) DepositByMessageNonce(nonce *big.Int) (*Deposit, error) { func (db *bridgeDB) DepositByMessageNonce(nonce *big.Int) (*Deposit, error) {
var deposit Deposit var deposit Deposit
result := db.gorm.First(&deposit, "sent_message_nonce = ?", U256{Int: nonce}) result := db.gorm.Where(&Deposit{SentMessageNonce: U256{Int: nonce}}).Take(&deposit)
if result.Error != nil { if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) { if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil return nil, nil
...@@ -172,7 +176,7 @@ func (db *bridgeDB) LatestDepositMessageNonce() (*big.Int, error) { ...@@ -172,7 +176,7 @@ func (db *bridgeDB) LatestDepositMessageNonce() (*big.Int, error) {
func (db *bridgeDB) MarkFinalizedDepositEvent(guid, finalizationEventGUID uuid.UUID) error { func (db *bridgeDB) MarkFinalizedDepositEvent(guid, finalizationEventGUID uuid.UUID) error {
var deposit Deposit var deposit Deposit
result := db.gorm.First(&deposit, "guid = ?", guid) result := db.gorm.Where(&Deposit{GUID: guid}).Take(&deposit)
if result.Error != nil { if result.Error != nil {
return result.Error return result.Error
} }
...@@ -191,7 +195,7 @@ func (db *bridgeDB) StoreWithdrawals(withdrawals []*Withdrawal) error { ...@@ -191,7 +195,7 @@ func (db *bridgeDB) StoreWithdrawals(withdrawals []*Withdrawal) error {
func (db *bridgeDB) MarkProvenWithdrawalEvent(guid, provenL1EventGuid uuid.UUID) error { func (db *bridgeDB) MarkProvenWithdrawalEvent(guid, provenL1EventGuid uuid.UUID) error {
var withdrawal Withdrawal var withdrawal Withdrawal
result := db.gorm.First(&withdrawal, "guid = ?", guid) result := db.gorm.Where(&Withdrawal{GUID: guid}).Take(&withdrawal)
if result.Error != nil { if result.Error != nil {
return result.Error return result.Error
} }
...@@ -203,7 +207,7 @@ func (db *bridgeDB) MarkProvenWithdrawalEvent(guid, provenL1EventGuid uuid.UUID) ...@@ -203,7 +207,7 @@ func (db *bridgeDB) MarkProvenWithdrawalEvent(guid, provenL1EventGuid uuid.UUID)
func (db *bridgeDB) MarkFinalizedWithdrawalEvent(guid, finalizedL1EventGuid uuid.UUID) error { func (db *bridgeDB) MarkFinalizedWithdrawalEvent(guid, finalizedL1EventGuid uuid.UUID) error {
var withdrawal Withdrawal var withdrawal Withdrawal
result := db.gorm.First(&withdrawal, "guid = ?", guid) result := db.gorm.Where(&Withdrawal{GUID: guid}).Take(&withdrawal)
if result.Error != nil { if result.Error != nil {
return result.Error return result.Error
} }
...@@ -242,7 +246,7 @@ func (db *bridgeDB) WithdrawalsByAddress(address common.Address) ([]*WithdrawalW ...@@ -242,7 +246,7 @@ func (db *bridgeDB) WithdrawalsByAddress(address common.Address) ([]*WithdrawalW
func (db *bridgeDB) WithdrawalByMessageNonce(nonce *big.Int) (*Withdrawal, error) { func (db *bridgeDB) WithdrawalByMessageNonce(nonce *big.Int) (*Withdrawal, error) {
var withdrawal Withdrawal var withdrawal Withdrawal
result := db.gorm.First(&withdrawal, "sent_message_nonce = ?", U256{Int: nonce}) result := db.gorm.Where(&Withdrawal{SentMessageNonce: U256{Int: nonce}}).Take(&withdrawal)
if result.Error != nil { if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) { if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil return nil, nil
...@@ -256,7 +260,7 @@ func (db *bridgeDB) WithdrawalByMessageNonce(nonce *big.Int) (*Withdrawal, error ...@@ -256,7 +260,7 @@ func (db *bridgeDB) WithdrawalByMessageNonce(nonce *big.Int) (*Withdrawal, error
func (db *bridgeDB) WithdrawalByHash(hash common.Hash) (*Withdrawal, error) { func (db *bridgeDB) WithdrawalByHash(hash common.Hash) (*Withdrawal, error) {
var withdrawal Withdrawal var withdrawal Withdrawal
result := db.gorm.First(&withdrawal, "withdrawal_hash = ?", hash.String()) result := db.gorm.Where(&Withdrawal{WithdrawalHash: hash}).Take(&withdrawal)
if result.Error != nil { if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) { if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil return nil, nil
......
package database package database
import ( import (
"errors"
"gorm.io/gorm" "gorm.io/gorm"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -46,6 +48,11 @@ type L2ContractEvent struct { ...@@ -46,6 +48,11 @@ type L2ContractEvent struct {
} }
type ContractEventsView interface { type ContractEventsView interface {
L1ContractEvent(uuid.UUID) (*L1ContractEvent, error)
L1ContractEventByTxLogIndex(common.Hash, uint64) (*L1ContractEvent, error)
L2ContractEvent(uuid.UUID) (*L2ContractEvent, error)
L2ContractEventByTxLogIndex(common.Hash, uint64) (*L2ContractEvent, error)
} }
type ContractEventsDB interface { type ContractEventsDB interface {
...@@ -74,9 +81,65 @@ func (db *contractEventsDB) StoreL1ContractEvents(events []*L1ContractEvent) err ...@@ -74,9 +81,65 @@ func (db *contractEventsDB) StoreL1ContractEvents(events []*L1ContractEvent) err
return result.Error return result.Error
} }
func (db *contractEventsDB) L1ContractEvent(uuid uuid.UUID) (*L1ContractEvent, error) {
var l1ContractEvent L1ContractEvent
result := db.gorm.Where(&ContractEvent{GUID: uuid}).Take(&l1ContractEvent)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, result.Error
}
return &l1ContractEvent, nil
}
func (db *contractEventsDB) L1ContractEventByTxLogIndex(txHash common.Hash, logIndex uint64) (*L1ContractEvent, error) {
var l1ContractEvent L1ContractEvent
result := db.gorm.Where(&ContractEvent{TransactionHash: txHash, LogIndex: logIndex}).Take(&l1ContractEvent)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, result.Error
}
return &l1ContractEvent, nil
}
// L2 // L2
func (db *contractEventsDB) StoreL2ContractEvents(events []*L2ContractEvent) error { func (db *contractEventsDB) StoreL2ContractEvents(events []*L2ContractEvent) error {
result := db.gorm.Create(&events) result := db.gorm.Create(&events)
return result.Error return result.Error
} }
func (db *contractEventsDB) L2ContractEvent(uuid uuid.UUID) (*L2ContractEvent, error) {
var l2ContractEvent L2ContractEvent
result := db.gorm.Where(&ContractEvent{GUID: uuid}).Take(&l2ContractEvent)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, result.Error
}
return &l2ContractEvent, nil
}
func (db *contractEventsDB) L2ContractEventByTxLogIndex(txHash common.Hash, logIndex uint64) (*L2ContractEvent, error) {
var l2ContractEvent L2ContractEvent
result := db.gorm.Where(&ContractEvent{TransactionHash: txHash, LogIndex: logIndex}).Take(&l2ContractEvent)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, result.Error
}
return &l2ContractEvent, nil
}
...@@ -4,6 +4,7 @@ package database ...@@ -4,6 +4,7 @@ package database
import ( import (
"gorm.io/driver/postgres" "gorm.io/driver/postgres"
"gorm.io/gorm" "gorm.io/gorm"
"gorm.io/gorm/logger"
) )
type DB struct { type DB struct {
...@@ -19,6 +20,10 @@ func NewDB(dsn string) (*DB, error) { ...@@ -19,6 +20,10 @@ func NewDB(dsn string) (*DB, error) {
// The indexer will explicitly manage the transaction // The indexer will explicitly manage the transaction
// flow processing blocks // flow processing blocks
SkipDefaultTransaction: true, SkipDefaultTransaction: true,
// We may choose to create an adapter such that the
// logger emits to the geth logger when on DEBUG mode
Logger: logger.Default.LogMode(logger.Silent),
}) })
if err != nil { if err != nil {
...@@ -43,6 +48,15 @@ func (db *DB) Transaction(fn func(db *DB) error) error { ...@@ -43,6 +48,15 @@ func (db *DB) Transaction(fn func(db *DB) error) error {
}) })
} }
func (db *DB) Close() error {
sql, err := db.gorm.DB()
if err != nil {
return err
}
return sql.Close()
}
func dbFromGormTx(tx *gorm.DB) *DB { func dbFromGormTx(tx *gorm.DB) *DB {
return &DB{ return &DB{
gorm: tx, gorm: tx,
......
package e2e_tests
import (
"context"
"math/big"
"testing"
"time"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/indexer/processor"
"github.com/ethereum-optimism/optimism/op-service/client/utils"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-bindings/predeploys"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/crypto"
"github.com/stretchr/testify/require"
)
func TestE2EBlockHeaders(t *testing.T) {
testSuite := createE2ETestSuite(t)
l1Client := testSuite.OpSys.Clients["l1"]
l2Client := testSuite.OpSys.Clients["sequencer"]
l2OutputOracle, err := bindings.NewL2OutputOracleCaller(predeploys.DevL2OutputOracleAddr, l1Client)
require.NoError(t, err)
// a minute for total setup to finish
setupCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
// wait for at least 10 L2 blocks to be created & posted on L1
require.NoError(t, utils.WaitFor(setupCtx, time.Second, func() (bool, error) {
l2Height, err := l2OutputOracle.LatestBlockNumber(&bind.CallOpts{Context: setupCtx})
return l2Height != nil && l2Height.Uint64() >= 9, err
}))
// ensure the processors are caught up to this state
l1Height, err := l1Client.BlockNumber(setupCtx)
require.NoError(t, err)
require.NoError(t, utils.WaitFor(setupCtx, time.Second, func() (bool, error) {
l1Header := testSuite.Indexer.L1Processor.LatestProcessedHeader()
l2Header := testSuite.Indexer.L2Processor.LatestProcessedHeader()
return (l1Header != nil && l1Header.Number.Uint64() >= l1Height) && (l2Header != nil && l2Header.Number.Uint64() >= 9), nil
}))
t.Run("indexes L2 blocks", func(t *testing.T) {
latestL2Header, err := testSuite.DB.Blocks.LatestL2BlockHeader()
require.NoError(t, err)
require.NotNil(t, latestL2Header)
require.True(t, latestL2Header.Number.Int.Uint64() >= 9)
for i := int64(0); i < 10; i++ {
height := big.NewInt(i)
indexedHeader, err := testSuite.DB.Blocks.L2BlockHeader(height)
require.NoError(t, err)
require.NotNil(t, indexedHeader)
header, err := l2Client.HeaderByNumber(context.Background(), height)
require.NoError(t, err)
require.NotNil(t, indexedHeader)
require.Equal(t, header.Number.Int64(), indexedHeader.Number.Int.Int64())
require.Equal(t, header.Hash(), indexedHeader.Hash)
require.Equal(t, header.ParentHash, indexedHeader.ParentHash)
require.Equal(t, header.Time, indexedHeader.Timestamp)
}
})
t.Run("indexes L2 checkpoints", func(t *testing.T) {
latestOutput, err := testSuite.DB.Blocks.LatestCheckpointedOutput()
require.NoError(t, err)
require.NotNil(t, latestOutput)
require.GreaterOrEqual(t, latestOutput.L2BlockNumber.Int.Uint64(), uint64(9))
l2EthClient, err := node.DialEthClient(testSuite.OpSys.Nodes["sequencer"].HTTPEndpoint())
require.NoError(t, err)
submissionInterval := testSuite.OpCfg.DeployConfig.L2OutputOracleSubmissionInterval
numOutputs := latestOutput.L2BlockNumber.Int.Uint64() / submissionInterval
for i := int64(0); i < int64(numOutputs); i++ {
blockNumber := big.NewInt((i + 1) * int64(submissionInterval))
output, err := testSuite.DB.Blocks.OutputProposal(big.NewInt(i))
require.NoError(t, err)
require.NotNil(t, output)
require.Equal(t, i, output.L2OutputIndex.Int.Int64())
require.Equal(t, blockNumber, output.L2BlockNumber.Int)
require.NotEmpty(t, output.L1ContractEventGUID)
// we may as well check the integrity of the output root
l2Block, err := l2Client.BlockByNumber(context.Background(), blockNumber)
require.NoError(t, err)
messagePasserStorageHash, err := l2EthClient.StorageHash(predeploys.L2ToL1MessagePasserAddr, blockNumber)
require.NoError(t, err)
// construct and check output root
outputRootPreImage := [128]byte{} // 4 words (first 32 are zero for version 0)
copy(outputRootPreImage[32:64], l2Block.Root().Bytes()) // state root
copy(outputRootPreImage[64:96], messagePasserStorageHash.Bytes()) // message passer storage root
copy(outputRootPreImage[96:128], l2Block.Hash().Bytes()) // block hash
require.Equal(t, crypto.Keccak256Hash(outputRootPreImage[:]), output.OutputRoot)
}
})
t.Run("indexes L1 logs and associated blocks", func(t *testing.T) {
testCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
devContracts := processor.DevL1Contracts().ToSlice()
logFilter := ethereum.FilterQuery{FromBlock: big.NewInt(0), ToBlock: big.NewInt(int64(l1Height)), Addresses: devContracts}
logs, err := l1Client.FilterLogs(testCtx, logFilter) // []types.Log
require.NoError(t, err)
for _, log := range logs {
contractEvent, err := testSuite.DB.ContractEvents.L1ContractEventByTxLogIndex(log.TxHash, uint64(log.Index))
require.NoError(t, err)
require.Equal(t, log.Topics[0], contractEvent.EventSignature)
require.Equal(t, log.BlockHash, contractEvent.BlockHash)
require.Equal(t, log.TxHash, contractEvent.TransactionHash)
require.Equal(t, log.Index, uint(contractEvent.LogIndex))
// ensure the block is also indexed
block, err := l1Client.BlockByNumber(testCtx, big.NewInt(int64(log.BlockNumber)))
require.NoError(t, err)
require.Equal(t, block.Time(), contractEvent.Timestamp)
l1BlockHeader, err := testSuite.DB.Blocks.L1BlockHeader(block.Number())
require.NoError(t, err)
require.Equal(t, block.Hash(), l1BlockHeader.Hash)
require.Equal(t, block.ParentHash(), l1BlockHeader.ParentHash)
require.Equal(t, block.Number(), l1BlockHeader.Number.Int)
require.Equal(t, block.Time(), l1BlockHeader.Timestamp)
}
})
}
package e2e_tests
import (
"context"
"math/big"
"testing"
"time"
"github.com/ethereum-optimism/optimism/indexer/processor"
"github.com/ethereum-optimism/optimism/op-service/client/utils"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-bindings/predeploys"
op_e2e "github.com/ethereum-optimism/optimism/op-e2e"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/params"
"github.com/stretchr/testify/require"
)
func TestE2EBridge(t *testing.T) {
testSuite := createE2ETestSuite(t)
l1Client := testSuite.OpSys.Clients["l1"]
l2Client := testSuite.OpSys.Clients["sequencer"]
l1StandardBridge, err := bindings.NewL1StandardBridge(predeploys.DevL1StandardBridgeAddr, l1Client)
require.NoError(t, err)
l2StandardBridge, err := bindings.NewL2StandardBridge(predeploys.L2StandardBridgeAddr, l2Client)
require.NoError(t, err)
// pre-emptively conduct a deposit & withdrawal to speed up the test
setupCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
aliceAddr := testSuite.OpCfg.Secrets.Addresses().Alice
l1Opts, err := bind.NewKeyedTransactorWithChainID(testSuite.OpCfg.Secrets.Alice, testSuite.OpCfg.L1ChainIDBig())
require.NoError(t, err)
l2Opts, err := bind.NewKeyedTransactorWithChainID(testSuite.OpCfg.Secrets.Alice, testSuite.OpCfg.L2ChainIDBig())
require.NoError(t, err)
l1Opts.Value = big.NewInt(params.Ether)
l2Opts.Value = big.NewInt(params.Ether)
depositTx, err := l1StandardBridge.DepositETH(l1Opts, 200_000, []byte{byte(1)})
require.NoError(t, err)
withdrawTx, err := l2StandardBridge.Withdraw(l2Opts, processor.EthAddress, big.NewInt(params.Ether), 200_000, []byte{byte(1)})
require.NoError(t, err)
depositReceipt, err := utils.WaitReceiptOK(setupCtx, l1Client, depositTx.Hash())
require.NoError(t, err)
withdrawalReceipt, err := utils.WaitReceiptOK(setupCtx, l2Client, withdrawTx.Hash())
require.NoError(t, err)
t.Run("indexes ETH deposits", func(t *testing.T) {
testCtx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
// Pause the L2Processor so that we can test for finalization separately. A pause is
// required since deposit inclusion is apart of the L2 block derivation process
testSuite.Indexer.L2Processor.PauseForTest()
// (1) Test Deposit Initiation
// wait for processor catchup
require.NoError(t, utils.WaitFor(testCtx, 500*time.Millisecond, func() (bool, error) {
l1Header := testSuite.Indexer.L1Processor.LatestProcessedHeader()
return l1Header != nil && l1Header.Number.Uint64() >= depositReceipt.BlockNumber.Uint64(), nil
}))
aliceDeposits, err := testSuite.DB.Bridge.DepositsByAddress(aliceAddr)
require.NoError(t, err)
require.Len(t, aliceDeposits, 1)
require.Equal(t, depositTx.Hash(), aliceDeposits[0].L1TransactionHash)
require.Empty(t, aliceDeposits[0].FinalizedL2TransactionHash)
deposit := aliceDeposits[0].Deposit
require.Nil(t, deposit.FinalizedL2EventGUID)
require.Equal(t, processor.EthAddress, deposit.TokenPair.L1TokenAddress)
require.Equal(t, processor.EthAddress, deposit.TokenPair.L2TokenAddress)
require.Equal(t, big.NewInt(params.Ether), deposit.Tx.Amount.Int)
require.Equal(t, aliceAddr, deposit.Tx.FromAddress)
require.Equal(t, aliceAddr, deposit.Tx.ToAddress)
require.Equal(t, byte(1), deposit.Tx.Data[0])
// (2) Test Deposit Finalization
testSuite.Indexer.L2Processor.ResumeForTest()
// finalization hash can be deterministically derived from TransactionDeposited log
var depositTxHash common.Hash
for _, log := range depositReceipt.Logs {
if log.Topics[0] == derive.DepositEventABIHash {
deposit, err := derive.UnmarshalDepositLogEvent(log)
require.NoError(t, err)
depositTxHash = types.NewTx(deposit).Hash()
break
}
}
// wait for the l2 processor to catch this deposit in the derivation process
_, err = utils.WaitReceiptOK(testCtx, l2Client, depositTxHash)
require.NoError(t, err)
l2Height, err := l2Client.BlockNumber(testCtx)
require.NoError(t, err)
require.NoError(t, utils.WaitFor(testCtx, 500*time.Millisecond, func() (bool, error) {
l2Header := testSuite.Indexer.L2Processor.LatestProcessedHeader()
return l2Header != nil && l2Header.Number.Uint64() >= l2Height, nil
}))
aliceDeposits, err = testSuite.DB.Bridge.DepositsByAddress(aliceAddr)
require.NoError(t, err)
require.Equal(t, depositTxHash, aliceDeposits[0].FinalizedL2TransactionHash)
require.NotNil(t, aliceDeposits[0].Deposit.FinalizedL2EventGUID)
})
t.Run("indexes ETH withdrawals", func(t *testing.T) {
testCtx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
// (1) Test Withdrawal Initiation
// wait for processor catchup
require.NoError(t, utils.WaitFor(testCtx, 500*time.Millisecond, func() (bool, error) {
l2Header := testSuite.Indexer.L2Processor.LatestProcessedHeader()
return l2Header != nil && l2Header.Number.Uint64() >= withdrawalReceipt.BlockNumber.Uint64(), nil
}))
aliceWithdrawals, err := testSuite.DB.Bridge.WithdrawalsByAddress(aliceAddr)
require.NoError(t, err)
require.Len(t, aliceWithdrawals, 1)
require.Equal(t, withdrawTx.Hash(), aliceWithdrawals[0].L2TransactionHash)
require.Empty(t, aliceWithdrawals[0].ProvenL1TransactionHash)
require.Empty(t, aliceWithdrawals[0].FinalizedL1TransactionHash)
withdrawal := aliceWithdrawals[0].Withdrawal
require.Nil(t, withdrawal.ProvenL1EventGUID)
require.Nil(t, withdrawal.FinalizedL1EventGUID)
require.Equal(t, processor.EthAddress, withdrawal.TokenPair.L1TokenAddress)
require.Equal(t, processor.EthAddress, withdrawal.TokenPair.L2TokenAddress)
require.Equal(t, big.NewInt(params.Ether), withdrawal.Tx.Amount.Int)
require.Equal(t, aliceAddr, withdrawal.Tx.FromAddress)
require.Equal(t, aliceAddr, withdrawal.Tx.ToAddress)
require.Equal(t, byte(1), withdrawal.Tx.Data[0])
// (2) Test Withdrawal Proven
// prove & wait for processor catchup
withdrawParams, proveReceipt := op_e2e.ProveWithdrawal(t, *testSuite.OpCfg, l1Client, testSuite.OpSys.Nodes["sequencer"], testSuite.OpCfg.Secrets.Alice, withdrawalReceipt)
require.NoError(t, utils.WaitFor(testCtx, 500*time.Millisecond, func() (bool, error) {
l1Header := testSuite.Indexer.L1Processor.LatestProcessedHeader()
return l1Header != nil && l1Header.Number.Uint64() >= proveReceipt.BlockNumber.Uint64(), nil
}))
aliceWithdrawals, err = testSuite.DB.Bridge.WithdrawalsByAddress(aliceAddr)
require.NoError(t, err)
require.Empty(t, aliceWithdrawals[0].FinalizedL1TransactionHash)
require.Equal(t, proveReceipt.TxHash, aliceWithdrawals[0].ProvenL1TransactionHash)
// (3) Test Withdrawal Finalization
// finalize & wait for processor catchup
finalizeReceipt := op_e2e.FinalizeWithdrawal(t, *testSuite.OpCfg, l1Client, testSuite.OpCfg.Secrets.Alice, withdrawalReceipt, withdrawParams)
require.NoError(t, utils.WaitFor(testCtx, 500*time.Millisecond, func() (bool, error) {
l1Header := testSuite.Indexer.L1Processor.LatestProcessedHeader()
return l1Header != nil && l1Header.Number.Uint64() >= finalizeReceipt.BlockNumber.Uint64(), nil
}))
aliceWithdrawals, err = testSuite.DB.Bridge.WithdrawalsByAddress(aliceAddr)
require.NoError(t, err)
require.Equal(t, finalizeReceipt.TxHash, aliceWithdrawals[0].FinalizedL1TransactionHash)
})
}
package e2e_tests
import (
"context"
"database/sql"
"fmt"
"io/fs"
"os"
"path/filepath"
"testing"
"time"
"github.com/ethereum-optimism/optimism/indexer"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
op_e2e "github.com/ethereum-optimism/optimism/op-e2e"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum/go-ethereum/log"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/stretchr/testify/require"
)
type E2ETestSuite struct {
t *testing.T
// Indexer
DB *database.DB
Indexer *indexer.Indexer
// Rollup
OpCfg *op_e2e.SystemConfig
OpSys *op_e2e.System
}
func createE2ETestSuite(t *testing.T) E2ETestSuite {
dbUser := os.Getenv("DB_USER")
dbName := setupTestDatabase(t)
// Replace the handler of the global logger with the testlog
logger := testlog.Logger(t, log.LvlInfo)
log.Root().SetHandler(logger.GetHandler())
// Rollup System Configuration and Start
opCfg := op_e2e.DefaultSystemConfig(t)
opCfg.DeployConfig.FinalizationPeriodSeconds = 2
opSys, err := opCfg.Start()
require.NoError(t, err)
// Indexer Configuration and Start
indexerCfg := config.Config{
DB: config.DBConfig{
Host: "127.0.0.1",
Port: 5432,
Name: dbName,
User: dbUser,
},
RPCs: config.RPCsConfig{
L1RPC: opSys.Nodes["l1"].HTTPEndpoint(),
L2RPC: opSys.Nodes["sequencer"].HTTPEndpoint(),
},
Logger: logger,
}
db, err := database.NewDB(fmt.Sprintf("postgres://%s@localhost:5432/%s?sslmode=disable", dbUser, dbName))
require.NoError(t, err)
indexer, err := indexer.NewIndexer(indexerCfg)
require.NoError(t, err)
indexerCtx, indexerStop := context.WithCancel(context.Background())
go func() {
err := indexer.Run(indexerCtx)
require.NoError(t, err)
indexer.Cleanup()
}()
t.Cleanup(func() {
indexerStop()
// wait a second for the stop signal to be received
time.Sleep(1 * time.Second)
indexer.Cleanup()
db.Close()
opSys.Close()
})
return E2ETestSuite{
t: t,
DB: db,
Indexer: indexer,
OpCfg: &opCfg,
OpSys: opSys,
}
}
func setupTestDatabase(t *testing.T) string {
user := os.Getenv("DB_USER")
pg, err := sql.Open("pgx", fmt.Sprintf("postgres://%s@localhost:5432?sslmode=disable", user))
require.NoError(t, err)
require.NoError(t, pg.Ping())
// create database
dbName := fmt.Sprintf("indexer_test_%d", time.Now().UnixNano())
_, err = pg.Exec("CREATE DATABASE " + dbName)
require.NoError(t, err)
t.Cleanup(func() {
_, err := pg.Exec("DROP DATABASE " + dbName)
require.NoError(t, err)
pg.Close()
})
// setup schema, migration files ware walked in lexical order
t.Logf("created database %s", dbName)
db, err := sql.Open("pgx", fmt.Sprintf("postgres://%s@localhost:5432/%s?sslmode=disable", user, dbName))
require.NoError(t, err)
require.NoError(t, db.Ping())
defer db.Close()
t.Logf("running schema migrations...")
require.NoError(t, filepath.Walk("../migrations", func(path string, info fs.FileInfo, err error) error {
if err != nil {
return err
} else if info.IsDir() {
return nil
}
t.Logf("running schema migration: %s", path)
data, err := os.ReadFile(path)
if err != nil {
return err
}
_, err = db.Exec(string(data))
return err
}))
t.Logf("schema loaded")
return dbName
}
package indexer package indexer
import ( import (
"context"
"fmt" "fmt"
"os" "sync"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/flags"
"github.com/ethereum-optimism/optimism/indexer/node" "github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/indexer/processor" "github.com/ethereum-optimism/optimism/indexer/processor"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli"
) )
// Main is the entrypoint into the indexer service. This method returns // Indexer contains the necessary resources for
// a closure that executes the service and blocks until the service exits. The // indexing the configured L1 and L2 chains
// use of a closure allows the parameters bound to the top-level main package,
// e.g. GitVersion, to be captured and used once the function is executed.
func Main(gitVersion string) func(ctx *cli.Context) error {
return func(ctx *cli.Context) error {
log.Info("initializing indexer")
indexer, err := NewIndexer(ctx)
if err != nil {
log.Error("unable to initialize indexer", "err", err)
return err
}
log.Info("starting indexer")
if err := indexer.Start(); err != nil {
log.Error("unable to start indexer", "err", err)
}
defer indexer.Stop()
log.Info("indexer started")
// Never terminate
<-(chan struct{})(nil)
return nil
}
}
// Indexer is a service that configures the necessary resources for
// running the Sync and BlockHandler sub-services.
type Indexer struct { type Indexer struct {
db *database.DB db *database.DB
log log.Logger
l1Processor *processor.L1Processor L1Processor *processor.L1Processor
l2Processor *processor.L2Processor L2Processor *processor.L2Processor
} }
// NewIndexer initializes the Indexer, gathering any resources // NewIndexer initializes an instance of the Indexer
// that will be needed by the TxIndexer and StateIndexer func NewIndexer(cfg config.Config) (*Indexer, error) {
// sub-services. dsn := fmt.Sprintf("host=%s port=%d dbname=%s sslmode=disable", cfg.DB.Host, cfg.DB.Port, cfg.DB.Name)
func NewIndexer(ctx *cli.Context) (*Indexer, error) { if cfg.DB.User != "" {
// TODO https://linear.app/optimism/issue/DX-55/api-implement-rest-api-with-mocked-data dsn += fmt.Sprintf(" user=%s", cfg.DB.User)
// do json format too }
// TODO https://linear.app/optimism/issue/DX-55/api-implement-rest-api-with-mocked-data if cfg.DB.Password != "" {
dsn += fmt.Sprintf(" password=%s", cfg.DB.Password)
logLevel, err := log.LvlFromString(ctx.GlobalString(flags.LogLevelFlag.Name))
if err != nil {
return nil, err
} }
logHandler := log.StreamHandler(os.Stdout, log.TerminalFormat(true))
log.Root().SetHandler(log.LvlFilterHandler(logLevel, logHandler))
dsn := fmt.Sprintf("database=%s", ctx.GlobalString(flags.DBNameFlag.Name))
db, err := database.NewDB(dsn) db, err := database.NewDB(dsn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// L1 Processor (hardhat devnet contracts). Make this configurable // L1 Processor (hardhat devnet contracts). Make this configurable
l1Contracts := processor.L1Contracts{ l1Contracts := processor.DevL1Contracts()
OptimismPortal: common.HexToAddress("0x6900000000000000000000000000000000000000"), l1EthClient, err := node.DialEthClient(cfg.RPCs.L1RPC)
L2OutputOracle: common.HexToAddress("0x6900000000000000000000000000000000000001"),
L1CrossDomainMessenger: common.HexToAddress("0x6900000000000000000000000000000000000002"),
L1StandardBridge: common.HexToAddress("0x6900000000000000000000000000000000000003"),
L1ERC721Bridge: common.HexToAddress("0x6900000000000000000000000000000000000004"),
}
l1EthClient, err := node.NewEthClient(ctx.GlobalString(flags.L1EthRPCFlag.Name))
if err != nil { if err != nil {
return nil, err return nil, err
} }
l1Processor, err := processor.NewL1Processor(l1EthClient, db, l1Contracts) l1Processor, err := processor.NewL1Processor(cfg.Logger, l1EthClient, db, l1Contracts)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// L2Processor // L2Processor (predeploys). Although most likely the right setting, make this configurable?
l2Contracts := processor.L2ContractPredeploys() // Make this configurable l2Contracts := processor.L2ContractPredeploys()
l2EthClient, err := node.NewEthClient(ctx.GlobalString(flags.L2EthRPCFlag.Name)) l2EthClient, err := node.DialEthClient(cfg.RPCs.L2RPC)
if err != nil { if err != nil {
return nil, err return nil, err
} }
l2Processor, err := processor.NewL2Processor(l2EthClient, db, l2Contracts) l2Processor, err := processor.NewL2Processor(cfg.Logger, l2EthClient, db, l2Contracts)
if err != nil { if err != nil {
return nil, err return nil, err
} }
indexer := &Indexer{ indexer := &Indexer{
db: db, db: db,
l1Processor: l1Processor, log: cfg.Logger,
l2Processor: l2Processor, L1Processor: l1Processor,
L2Processor: l2Processor,
} }
return indexer, nil return indexer, nil
} }
// Serve spins up a REST API server at the given hostname and port. // Start starts the indexing service on L1 and L2 chains
func (b *Indexer) Serve() error { func (i *Indexer) Run(ctx context.Context) error {
return nil var wg sync.WaitGroup
} errCh := make(chan error)
// If either processor errors out, we stop
processorCtx, cancel := context.WithCancel(ctx)
run := func(start func(ctx context.Context) error) {
wg.Add(1)
defer wg.Done()
err := start(processorCtx)
if err != nil {
i.log.Error("halting indexer on error", "err", err)
cancel()
errCh <- err
}
}
// Start starts the starts the indexing service on L1 and L2 chains and also // Kick off the processors
// starts the REST server. go run(i.L1Processor.Start)
func (b *Indexer) Start() error { go run(i.L2Processor.Start)
go b.l1Processor.Start() err := <-errCh
go b.l2Processor.Start()
return nil // ensure both processors have halted before returning
wg.Wait()
return err
} }
// Stop stops the indexing service on L1 and L2 chains. // Cleanup releases any resources that might be currently held by the indexer
func (b *Indexer) Stop() { func (i *Indexer) Cleanup() {
i.db.Close()
} }
...@@ -56,6 +56,8 @@ CREATE TABLE IF NOT EXISTS legacy_state_batches ( ...@@ -56,6 +56,8 @@ CREATE TABLE IF NOT EXISTS legacy_state_batches (
CREATE TABLE IF NOT EXISTS output_proposals ( CREATE TABLE IF NOT EXISTS output_proposals (
output_root VARCHAR NOT NULL PRIMARY KEY, output_root VARCHAR NOT NULL PRIMARY KEY,
l2_output_index UINT256,
l2_block_number UINT256, l2_block_number UINT256,
l1_contract_event_guid VARCHAR REFERENCES l1_contract_events(guid) l1_contract_event_guid VARCHAR REFERENCES l1_contract_events(guid)
......
...@@ -3,6 +3,7 @@ package node ...@@ -3,6 +3,7 @@ package node
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"math/big" "math/big"
"time" "time"
...@@ -29,6 +30,8 @@ type EthClient interface { ...@@ -29,6 +30,8 @@ type EthClient interface {
BlockHeadersByRange(*big.Int, *big.Int) ([]*types.Header, error) BlockHeadersByRange(*big.Int, *big.Int) ([]*types.Header, error)
BlockHeaderByHash(common.Hash) (*types.Header, error) BlockHeaderByHash(common.Hash) (*types.Header, error)
StorageHash(common.Address, *big.Int) (common.Hash, error)
RawRpcClient() *rpc.Client RawRpcClient() *rpc.Client
} }
...@@ -36,7 +39,7 @@ type client struct { ...@@ -36,7 +39,7 @@ type client struct {
rpcClient *rpc.Client rpcClient *rpc.Client
} }
func NewEthClient(rpcUrl string) (EthClient, error) { func DialEthClient(rpcUrl string) (EthClient, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultDialTimeout) ctxwt, cancel := context.WithTimeout(context.Background(), defaultDialTimeout)
defer cancel() defer cancel()
...@@ -49,6 +52,10 @@ func NewEthClient(rpcUrl string) (EthClient, error) { ...@@ -49,6 +52,10 @@ func NewEthClient(rpcUrl string) (EthClient, error) {
return client, nil return client, nil
} }
func NewEthClient(rpcClient *rpc.Client) EthClient {
return &client{rpcClient}
}
func (c *client) RawRpcClient() *rpc.Client { func (c *client) RawRpcClient() *rpc.Client {
return c.rpcClient return c.rpcClient
} }
...@@ -136,15 +143,33 @@ func (c *client) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]*types. ...@@ -136,15 +143,33 @@ func (c *client) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]*types.
return headers, nil return headers, nil
} }
// StorageHash returns the sha3 of the storage root for the specified account
func (c *client) StorageHash(address common.Address, blockNumber *big.Int) (common.Hash, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
proof := struct{ StorageHash common.Hash }{}
err := c.rpcClient.CallContext(ctxwt, &proof, "eth_getProof", address, nil, toBlockNumArg(blockNumber))
if err != nil {
return common.Hash{}, err
}
return proof.StorageHash, nil
}
func toBlockNumArg(number *big.Int) string { func toBlockNumArg(number *big.Int) string {
if number == nil { if number == nil {
return "latest" return "latest"
} else if number.Sign() >= 0 {
return hexutil.EncodeBig(number)
} }
pending := big.NewInt(-1) // It's negative.
if number.Cmp(pending) == 0 { if number.IsInt64() {
return "pending" tag, _ := rpc.BlockNumber(number.Int64()).MarshalText()
return string(tag)
} }
return hexutil.EncodeBig(number) // It's negative and large, which is invalid.
return fmt.Sprintf("<invalid %d>", number)
} }
...@@ -31,6 +31,11 @@ func (m *MockEthClient) BlockHeaderByHash(hash common.Hash) (*types.Header, erro ...@@ -31,6 +31,11 @@ func (m *MockEthClient) BlockHeaderByHash(hash common.Hash) (*types.Header, erro
return args.Get(0).(*types.Header), args.Error(1) return args.Get(0).(*types.Header), args.Error(1)
} }
func (m *MockEthClient) StorageHash(address common.Address, blockNumber *big.Int) (common.Hash, error) {
args := m.Called(address, blockNumber)
return args.Get(0).(common.Hash), args.Error(1)
}
func (m *MockEthClient) RawRpcClient() *rpc.Client { func (m *MockEthClient) RawRpcClient() *rpc.Client {
args := m.Called() args := m.Called()
return args.Get(0).(*rpc.Client) return args.Get(0).(*rpc.Client)
......
...@@ -4,8 +4,8 @@ import ( ...@@ -4,8 +4,8 @@ import (
"math/big" "math/big"
"testing" "testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
) )
...@@ -43,8 +43,8 @@ func TestHeaderTraversalNextFinalizedHeadersNoOp(t *testing.T) { ...@@ -43,8 +43,8 @@ func TestHeaderTraversalNextFinalizedHeadersNoOp(t *testing.T) {
// no new headers when matched with head // no new headers when matched with head
client.On("FinalizedBlockHeight").Return(big.NewInt(10), nil) client.On("FinalizedBlockHeight").Return(big.NewInt(10), nil)
headers, err := headerTraversal.NextFinalizedHeaders(100) headers, err := headerTraversal.NextFinalizedHeaders(100)
assert.NoError(t, err) require.NoError(t, err)
assert.Empty(t, headers) require.Empty(t, headers)
} }
func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) { func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) {
...@@ -58,16 +58,16 @@ func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) { ...@@ -58,16 +58,16 @@ func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) {
client.On("FinalizedBlockHeight").Return(big.NewInt(4), nil).Times(1) // Times so that we can override next client.On("FinalizedBlockHeight").Return(big.NewInt(4), nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5) headers, err := headerTraversal.NextFinalizedHeaders(5)
assert.NoError(t, err) require.NoError(t, err)
assert.Len(t, headers, 5) require.Len(t, headers, 5)
// blocks [5..9] // blocks [5..9]
headers = makeHeaders(5, headers[len(headers)-1]) headers = makeHeaders(5, headers[len(headers)-1])
client.On("FinalizedBlockHeight").Return(big.NewInt(9), nil) client.On("FinalizedBlockHeight").Return(big.NewInt(9), nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(5)), mock.MatchedBy(bigIntMatcher(9))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(5)), mock.MatchedBy(bigIntMatcher(9))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(5) headers, err = headerTraversal.NextFinalizedHeaders(5)
assert.NoError(t, err) require.NoError(t, err)
assert.Len(t, headers, 5) require.Len(t, headers, 5)
} }
func TestHeaderTraversalNextFinalizedHeadersMaxSize(t *testing.T) { func TestHeaderTraversalNextFinalizedHeadersMaxSize(t *testing.T) {
...@@ -83,15 +83,15 @@ func TestHeaderTraversalNextFinalizedHeadersMaxSize(t *testing.T) { ...@@ -83,15 +83,15 @@ func TestHeaderTraversalNextFinalizedHeadersMaxSize(t *testing.T) {
headers := makeHeaders(5, nil) headers := makeHeaders(5, nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5) headers, err := headerTraversal.NextFinalizedHeaders(5)
assert.NoError(t, err) require.NoError(t, err)
assert.Len(t, headers, 5) require.Len(t, headers, 5)
// clamped by the supplied size. FinalizedHeight == 100 // clamped by the supplied size. FinalizedHeight == 100
headers = makeHeaders(10, headers[len(headers)-1]) headers = makeHeaders(10, headers[len(headers)-1])
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(5)), mock.MatchedBy(bigIntMatcher(14))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(5)), mock.MatchedBy(bigIntMatcher(14))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(10) headers, err = headerTraversal.NextFinalizedHeaders(10)
assert.NoError(t, err) require.NoError(t, err)
assert.Len(t, headers, 10) require.Len(t, headers, 10)
} }
func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) { func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
...@@ -105,14 +105,14 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) { ...@@ -105,14 +105,14 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
client.On("FinalizedBlockHeight").Return(big.NewInt(4), nil).Times(1) // Times so that we can override next client.On("FinalizedBlockHeight").Return(big.NewInt(4), nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5) headers, err := headerTraversal.NextFinalizedHeaders(5)
assert.NoError(t, err) require.NoError(t, err)
assert.Len(t, headers, 5) require.Len(t, headers, 5)
// blocks [5..9]. Next batch is not chained correctly (starts again from genesis) // blocks [5..9]. Next batch is not chained correctly (starts again from genesis)
headers = makeHeaders(5, nil) headers = makeHeaders(5, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(9), nil) client.On("FinalizedBlockHeight").Return(big.NewInt(9), nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(5)), mock.MatchedBy(bigIntMatcher(9))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(5)), mock.MatchedBy(bigIntMatcher(9))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(5) headers, err = headerTraversal.NextFinalizedHeaders(5)
assert.Nil(t, headers) require.Nil(t, headers)
assert.Equal(t, ErrHeaderTraversalAndProviderMismatchedState, err) require.Equal(t, ErrHeaderTraversalAndProviderMismatchedState, err)
} }
...@@ -2,9 +2,7 @@ package processor ...@@ -2,9 +2,7 @@ package processor
import ( import (
"context" "context"
"encoding/hex"
"errors" "errors"
"math/big"
"reflect" "reflect"
"github.com/google/uuid" "github.com/google/uuid"
...@@ -36,12 +34,17 @@ type L1Contracts struct { ...@@ -36,12 +34,17 @@ type L1Contracts struct {
// Remove afterwards? // Remove afterwards?
} }
type checkpointAbi struct { func DevL1Contracts() L1Contracts {
l2OutputOracle *abi.ABI return L1Contracts{
legacyStateCommitmentChain *abi.ABI OptimismPortal: common.HexToAddress("0x6900000000000000000000000000000000000000"),
L2OutputOracle: common.HexToAddress("0x6900000000000000000000000000000000000001"),
L1CrossDomainMessenger: common.HexToAddress("0x6900000000000000000000000000000000000002"),
L1StandardBridge: common.HexToAddress("0x6900000000000000000000000000000000000003"),
L1ERC721Bridge: common.HexToAddress("0x6900000000000000000000000000000000000004"),
}
} }
func (c L1Contracts) toSlice() []common.Address { func (c L1Contracts) ToSlice() []common.Address {
fields := reflect.VisibleFields(reflect.TypeOf(c)) fields := reflect.VisibleFields(reflect.TypeOf(c))
v := reflect.ValueOf(c) v := reflect.ValueOf(c)
...@@ -53,12 +56,17 @@ func (c L1Contracts) toSlice() []common.Address { ...@@ -53,12 +56,17 @@ func (c L1Contracts) toSlice() []common.Address {
return contracts return contracts
} }
type checkpointAbi struct {
l2OutputOracle *abi.ABI
legacyStateCommitmentChain *abi.ABI
}
type L1Processor struct { type L1Processor struct {
processor processor
} }
func NewL1Processor(ethClient node.EthClient, db *database.DB, l1Contracts L1Contracts) (*L1Processor, error) { func NewL1Processor(logger log.Logger, ethClient node.EthClient, db *database.DB, l1Contracts L1Contracts) (*L1Processor, error) {
l1ProcessLog := log.New("processor", "l1") l1ProcessLog := logger.New("processor", "l1")
l1ProcessLog.Info("initializing processor") l1ProcessLog.Info("initializing processor")
l2OutputOracleABI, err := bindings.L2OutputOracleMetaData.GetAbi() l2OutputOracleABI, err := bindings.L2OutputOracleMetaData.GetAbi()
...@@ -109,14 +117,16 @@ func NewL1Processor(ethClient node.EthClient, db *database.DB, l1Contracts L1Con ...@@ -109,14 +117,16 @@ func NewL1Processor(ethClient node.EthClient, db *database.DB, l1Contracts L1Con
func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1Contracts, checkpointAbi checkpointAbi) ProcessFn { func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1Contracts, checkpointAbi checkpointAbi) ProcessFn {
rawEthClient := ethclient.NewClient(ethClient.RawRpcClient()) rawEthClient := ethclient.NewClient(ethClient.RawRpcClient())
contractAddrs := l1Contracts.toSlice() contractAddrs := l1Contracts.ToSlice()
processLog.Info("processor configured with contracts", "contracts", l1Contracts) processLog.Info("processor configured with contracts", "contracts", l1Contracts)
outputProposedEventSig := checkpointAbi.l2OutputOracle.Events["OutputProposed"].ID outputProposedEventName := "OutputProposed"
legacyStateBatchAppendedEventSig := checkpointAbi.legacyStateCommitmentChain.Events["StateBatchAppended"].ID outputProposedEventSig := checkpointAbi.l2OutputOracle.Events[outputProposedEventName].ID
legacyStateBatchAppendedEventName := "StateBatchAppended"
legacyStateBatchAppendedEventSig := checkpointAbi.legacyStateCommitmentChain.Events[legacyStateBatchAppendedEventName].ID
return func(db *database.DB, headers []*types.Header) error { return func(db *database.DB, headers []*types.Header) error {
numHeaders := len(headers)
headerMap := make(map[common.Hash]*types.Header) headerMap := make(map[common.Hash]*types.Header)
for _, header := range headers { for _, header := range headers {
headerMap[header.Hash()] = header headerMap[header.Hash()] = header
...@@ -124,7 +134,7 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1 ...@@ -124,7 +134,7 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1
/** Watch for all Optimism Contract Events **/ /** Watch for all Optimism Contract Events **/
logFilter := ethereum.FilterQuery{FromBlock: headers[0].Number, ToBlock: headers[numHeaders-1].Number, Addresses: contractAddrs} logFilter := ethereum.FilterQuery{FromBlock: headers[0].Number, ToBlock: headers[len(headers)-1].Number, Addresses: contractAddrs}
logs, err := rawEthClient.FilterLogs(context.Background(), logFilter) // []types.Log logs, err := rawEthClient.FilterLogs(context.Background(), logFilter) // []types.Log
if err != nil { if err != nil {
return err return err
...@@ -138,41 +148,43 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1 ...@@ -138,41 +148,43 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1
l1ContractEvents := make([]*database.L1ContractEvent, len(logs)) l1ContractEvents := make([]*database.L1ContractEvent, len(logs))
processedContractEvents := NewProcessedContractEvents() processedContractEvents := NewProcessedContractEvents()
for i, log := range logs { for i := range logs {
log := &logs[i]
header, ok := headerMap[log.BlockHash] header, ok := headerMap[log.BlockHash]
if !ok { if !ok {
processLog.Error("contract event found with associated header not in the batch", "header", log.BlockHash, "log_index", log.Index) processLog.Error("contract event found with associated header not in the batch", "header", log.BlockHash, "log_index", log.Index)
return errors.New("parsed log with a block hash not in this batch") return errors.New("parsed log with a block hash not in this batch")
} }
contractEvent := processedContractEvents.AddLog(&logs[i], header.Time) contractEvent := processedContractEvents.AddLog(log, header.Time)
l1HeadersOfInterest[log.BlockHash] = true l1HeadersOfInterest[log.BlockHash] = true
l1ContractEvents[i] = &database.L1ContractEvent{ContractEvent: *contractEvent} l1ContractEvents[i] = &database.L1ContractEvent{ContractEvent: *contractEvent}
// Track Checkpoint Events for L2 // Track Checkpoint Events for L2
switch contractEvent.EventSignature { switch contractEvent.EventSignature {
case outputProposedEventSig: case outputProposedEventSig:
if len(log.Topics) != 4 { var outputProposed bindings.L2OutputOracleOutputProposed
processLog.Error("parsed unexpected number of L2OutputOracle#OutputProposed log topics", "log_topics", log.Topics) err := UnpackLog(&outputProposed, log, outputProposedEventName, checkpointAbi.l2OutputOracle)
return errors.New("parsed unexpected OutputProposed event") if err != nil {
return err
} }
outputProposals = append(outputProposals, &database.OutputProposal{ outputProposals = append(outputProposals, &database.OutputProposal{
OutputRoot: log.Topics[1], OutputRoot: outputProposed.OutputRoot,
L2BlockNumber: database.U256{Int: new(big.Int).SetBytes(log.Topics[2].Bytes())}, L2OutputIndex: database.U256{Int: outputProposed.L2OutputIndex},
L2BlockNumber: database.U256{Int: outputProposed.L2BlockNumber},
L1ContractEventGUID: contractEvent.GUID, L1ContractEventGUID: contractEvent.GUID,
}) })
case legacyStateBatchAppendedEventSig: case legacyStateBatchAppendedEventSig:
var stateBatchAppended legacy_bindings.StateCommitmentChainStateBatchAppended var stateBatchAppended legacy_bindings.StateCommitmentChainStateBatchAppended
err := checkpointAbi.l2OutputOracle.UnpackIntoInterface(&stateBatchAppended, "StateBatchAppended", log.Data) err := UnpackLog(&stateBatchAppended, log, legacyStateBatchAppendedEventName, checkpointAbi.legacyStateCommitmentChain)
if err != nil || len(log.Topics) != 2 { if err != nil {
processLog.Error("unexpected StateCommitmentChain#StateBatchAppended log data or log topics", "log_topics", log.Topics, "log_data", hex.EncodeToString(log.Data), "err", err)
return err return err
} }
legacyStateBatches = append(legacyStateBatches, &database.LegacyStateBatch{ legacyStateBatches = append(legacyStateBatches, &database.LegacyStateBatch{
Index: new(big.Int).SetBytes(log.Topics[1].Bytes()).Uint64(), Index: stateBatchAppended.BatchIndex.Uint64(),
Root: stateBatchAppended.BatchRoot, Root: stateBatchAppended.BatchRoot,
Size: stateBatchAppended.BatchSize.Uint64(), Size: stateBatchAppended.BatchSize.Uint64(),
PrevTotal: stateBatchAppended.PrevTotalElements.Uint64(), PrevTotal: stateBatchAppended.PrevTotalElements.Uint64(),
...@@ -199,7 +211,7 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1 ...@@ -199,7 +211,7 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1
numIndexedL1Headers := len(indexedL1Headers) numIndexedL1Headers := len(indexedL1Headers)
if numIndexedL1Headers > 0 { if numIndexedL1Headers > 0 {
processLog.Info("saving l1 blocks with optimism logs", "size", numIndexedL1Headers, "batch_size", numHeaders) processLog.Info("saving l1 blocks with optimism logs", "size", numIndexedL1Headers, "batch_size", len(headers))
err = db.Blocks.StoreL1BlockHeaders(indexedL1Headers) err = db.Blocks.StoreL1BlockHeaders(indexedL1Headers)
if err != nil { if err != nil {
return err return err
...@@ -296,15 +308,16 @@ func l1BridgeProcessContractEvents(processLog log.Logger, db *database.DB, ethCl ...@@ -296,15 +308,16 @@ func l1BridgeProcessContractEvents(processLog log.Logger, db *database.DB, ethCl
// Check if the L2Processor is behind or really has missed an event. We can compare against the // Check if the L2Processor is behind or really has missed an event. We can compare against the
// OptimismPortal#ProvenWithdrawal on-chain mapping relative to the latest indexed L2 height // OptimismPortal#ProvenWithdrawal on-chain mapping relative to the latest indexed L2 height
if withdrawal == nil { if withdrawal == nil {
bridgeAddress := l1Contracts.L1StandardBridge
portalAddress := l1Contracts.OptimismPortal // This needs to be updated to read from config as well as correctly identify if the CrossDomainMessenger message is a standard
if provenWithdrawalEvent.From != bridgeAddress || provenWithdrawalEvent.To != bridgeAddress { // bridge message. This will easier to do once we index passed messages separately which will include the right To/From fields
if provenWithdrawalEvent.From != common.HexToAddress("0x4200000000000000000000000000000000000007") || provenWithdrawalEvent.To != l1Contracts.L1CrossDomainMessenger {
// non-bridge withdrawal // non-bridge withdrawal
continue continue
} }
// Query for the the proven withdrawal on-chain // Query for the the proven withdrawal on-chain
provenWithdrawal, err := OptimismPortalQueryProvenWithdrawal(rawEthClient, portalAddress, withdrawalHash) provenWithdrawal, err := OptimismPortalQueryProvenWithdrawal(rawEthClient, l1Contracts.OptimismPortal, withdrawalHash)
if err != nil { if err != nil {
return err return err
} }
...@@ -349,8 +362,8 @@ func l1BridgeProcessContractEvents(processLog log.Logger, db *database.DB, ethCl ...@@ -349,8 +362,8 @@ func l1BridgeProcessContractEvents(processLog log.Logger, db *database.DB, ethCl
return err return err
} }
// Since we have to prove the event on-chain first, we don't need to check if the processor is // Since we have to prove the event on-chain first, we don't need to check if the processor is behind
// behind. we're definitely in an error state if we cannot find the withdrawal when parsing this even // We're definitely in an error state if we cannot find the withdrawal when parsing this event
if withdrawal == nil { if withdrawal == nil {
processLog.Crit("missing indexed withdrawal for this finalization event") processLog.Crit("missing indexed withdrawal for this finalization event")
return errors.New("missing withdrawal message") return errors.New("missing withdrawal message")
......
...@@ -39,7 +39,7 @@ func L2ContractPredeploys() L2Contracts { ...@@ -39,7 +39,7 @@ func L2ContractPredeploys() L2Contracts {
} }
} }
func (c L2Contracts) toSlice() []common.Address { func (c L2Contracts) ToSlice() []common.Address {
fields := reflect.VisibleFields(reflect.TypeOf(c)) fields := reflect.VisibleFields(reflect.TypeOf(c))
v := reflect.ValueOf(c) v := reflect.ValueOf(c)
...@@ -55,8 +55,8 @@ type L2Processor struct { ...@@ -55,8 +55,8 @@ type L2Processor struct {
processor processor
} }
func NewL2Processor(ethClient node.EthClient, db *database.DB, l2Contracts L2Contracts) (*L2Processor, error) { func NewL2Processor(logger log.Logger, ethClient node.EthClient, db *database.DB, l2Contracts L2Contracts) (*L2Processor, error) {
l2ProcessLog := log.New("processor", "l2") l2ProcessLog := logger.New("processor", "l2")
l2ProcessLog.Info("initializing processor") l2ProcessLog.Info("initializing processor")
latestHeader, err := db.Blocks.LatestL2BlockHeader() latestHeader, err := db.Blocks.LatestL2BlockHeader()
...@@ -94,7 +94,7 @@ func NewL2Processor(ethClient node.EthClient, db *database.DB, l2Contracts L2Con ...@@ -94,7 +94,7 @@ func NewL2Processor(ethClient node.EthClient, db *database.DB, l2Contracts L2Con
func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2Contracts) ProcessFn { func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2Contracts) ProcessFn {
rawEthClient := ethclient.NewClient(ethClient.RawRpcClient()) rawEthClient := ethclient.NewClient(ethClient.RawRpcClient())
contractAddrs := l2Contracts.toSlice() contractAddrs := l2Contracts.ToSlice()
processLog.Info("processor configured with contracts", "contracts", l2Contracts) processLog.Info("processor configured with contracts", "contracts", l2Contracts)
return func(db *database.DB, headers []*types.Header) error { return func(db *database.DB, headers []*types.Header) error {
numHeaders := len(headers) numHeaders := len(headers)
...@@ -127,14 +127,15 @@ func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2 ...@@ -127,14 +127,15 @@ func l2ProcessFn(processLog log.Logger, ethClient node.EthClient, l2Contracts L2
l2ContractEvents := make([]*database.L2ContractEvent, len(logs)) l2ContractEvents := make([]*database.L2ContractEvent, len(logs))
processedContractEvents := NewProcessedContractEvents() processedContractEvents := NewProcessedContractEvents()
for i, log := range logs { for i := range logs {
log := &logs[i]
header, ok := l2HeaderMap[log.BlockHash] header, ok := l2HeaderMap[log.BlockHash]
if !ok { if !ok {
processLog.Error("contract event found with associated header not in the batch", "header", header, "log_index", log.Index) processLog.Error("contract event found with associated header not in the batch", "header", header, "log_index", log.Index)
return errors.New("parsed log with a block hash not in this batch") return errors.New("parsed log with a block hash not in this batch")
} }
contractEvent := processedContractEvents.AddLog(&logs[i], header.Time) contractEvent := processedContractEvents.AddLog(log, header.Time)
l2ContractEvents[i] = &database.L2ContractEvent{ContractEvent: *contractEvent} l2ContractEvents[i] = &database.L2ContractEvent{ContractEvent: *contractEvent}
} }
......
...@@ -30,11 +30,20 @@ func OptimismPortalWithdrawalProvenEvents(events *ProcessedContractEvents) ([]Op ...@@ -30,11 +30,20 @@ func OptimismPortalWithdrawalProvenEvents(events *ProcessedContractEvents) ([]Op
return nil, err return nil, err
} }
processedWithdrawalProvenEvents := events.eventsBySignature[optimismPortalAbi.Events["WithdrawalProven"].ID] eventName := "WithdrawalProven"
processedWithdrawalProvenEvents := events.eventsBySignature[optimismPortalAbi.Events[eventName].ID]
provenEvents := make([]OptimismPortalWithdrawalProvenEvent, len(processedWithdrawalProvenEvents)) provenEvents := make([]OptimismPortalWithdrawalProvenEvent, len(processedWithdrawalProvenEvents))
for i, provenEvent := range processedWithdrawalProvenEvents { for i, provenEvent := range processedWithdrawalProvenEvents {
log := events.eventLog[provenEvent.GUID]
var withdrawalProven bindings.OptimismPortalWithdrawalProven
err := UnpackLog(&withdrawalProven, log, eventName, optimismPortalAbi)
if err != nil {
return nil, err
}
provenEvents[i] = OptimismPortalWithdrawalProvenEvent{nil, provenEvent} provenEvents[i] = OptimismPortalWithdrawalProvenEvent{&withdrawalProven, provenEvent}
} }
return provenEvents, nil return provenEvents, nil
......
package processor package processor
import ( import (
"context"
"time" "time"
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
...@@ -25,46 +26,81 @@ type processor struct { ...@@ -25,46 +26,81 @@ type processor struct {
db *database.DB db *database.DB
processFn ProcessFn processFn ProcessFn
processLog log.Logger processLog log.Logger
paused bool
latestProcessedHeader *types.Header
} }
// Start kicks off the processing loop // Start kicks off the processing loop. This is a block operation
func (p processor) Start() { // unless the processor encountering an error, abrupting the loop,
// or the supplied context is cancelled.
func (p *processor) Start(ctx context.Context) error {
done := ctx.Done()
pollTicker := time.NewTicker(defaultLoopInterval) pollTicker := time.NewTicker(defaultLoopInterval)
defer pollTicker.Stop() defer pollTicker.Stop()
p.processLog.Info("starting processor...") p.processLog.Info("starting processor...")
var unprocessedHeaders []*types.Header var unprocessedHeaders []*types.Header
for range pollTicker.C { for {
if len(unprocessedHeaders) == 0 { select {
newHeaders, err := p.headerTraversal.NextFinalizedHeaders(defaultHeaderBufferSize) case <-done:
if err != nil { p.processLog.Info("stopping processor")
p.processLog.Error("error querying for headers", "err", err) return nil
continue
} else if len(newHeaders) == 0 { case <-pollTicker.C:
// Logged as an error since this loop should be operating at a longer interval than the provider if p.paused {
p.processLog.Error("no new headers. processor unexpectedly at head...") p.processLog.Warn("processor is paused...")
continue continue
} }
unprocessedHeaders = newHeaders if len(unprocessedHeaders) == 0 {
} else { newHeaders, err := p.headerTraversal.NextFinalizedHeaders(defaultHeaderBufferSize)
p.processLog.Info("retrying previous batch") if err != nil {
} p.processLog.Error("error querying for headers", "err", err)
continue
} else if len(newHeaders) == 0 {
// Logged as an error since this loop should be operating at a longer interval than the provider
p.processLog.Error("no new headers. processor unexpectedly at head...")
continue
}
unprocessedHeaders = newHeaders
} else {
p.processLog.Info("retrying previous batch")
}
firstHeader := unprocessedHeaders[0] firstHeader := unprocessedHeaders[0]
lastHeader := unprocessedHeaders[len(unprocessedHeaders)-1] lastHeader := unprocessedHeaders[len(unprocessedHeaders)-1]
batchLog := p.processLog.New("batch_start_block_number", firstHeader.Number, "batch_end_block_number", lastHeader.Number) batchLog := p.processLog.New("batch_start_block_number", firstHeader.Number, "batch_end_block_number", lastHeader.Number)
err := p.db.Transaction(func(db *database.DB) error { err := p.db.Transaction(func(db *database.DB) error {
batchLog.Info("processing batch") batchLog.Info("processing batch")
return p.processFn(db, unprocessedHeaders) return p.processFn(db, unprocessedHeaders)
}) })
if err != nil { // Eventually, we want to halt the processor on any error rather than rely
batchLog.Warn("error processing batch. no operations committed", "err", err) // on this loop for retry functionality.
} else { if err != nil {
batchLog.Info("fully committed batch") batchLog.Warn("error processing batch. no operations committed", "err", err)
unprocessedHeaders = nil } else {
batchLog.Info("fully committed batch")
unprocessedHeaders = nil
p.latestProcessedHeader = lastHeader
}
} }
} }
} }
func (p processor) LatestProcessedHeader() *types.Header {
return p.latestProcessedHeader
}
// Useful ONLY for tests!
func (p *processor) PauseForTest() {
p.paused = true
}
func (p *processor) ResumeForTest() {
p.paused = false
}
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"context" "context"
"errors" "errors"
"fmt"
"math/big" "math/big"
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
...@@ -14,7 +15,7 @@ import ( ...@@ -14,7 +15,7 @@ import (
) )
var ( var (
ethAddress = common.HexToAddress("0xDeadDeAddeAddEAddeadDEaDDEAdDeaDDeAD0000") EthAddress = common.HexToAddress("0xDeadDeAddeAddEAddeadDEaDDEAdDeaDDeAD0000")
) )
type StandardBridgeInitiatedEvent struct { type StandardBridgeInitiatedEvent struct {
...@@ -131,7 +132,7 @@ func _standardBridgeInitiatedEvents[BridgeEvent bindings.L1StandardBridgeETHBrid ...@@ -131,7 +132,7 @@ func _standardBridgeInitiatedEvents[BridgeEvent bindings.L1StandardBridgeETHBrid
// represent eth bridge as an erc20 // represent eth bridge as an erc20
erc20BridgeData = &bindings.L1StandardBridgeERC20BridgeInitiated{ erc20BridgeData = &bindings.L1StandardBridgeERC20BridgeInitiated{
// Represent ETH using the hardcoded address // Represent ETH using the hardcoded address
LocalToken: ethAddress, RemoteToken: ethAddress, LocalToken: EthAddress, RemoteToken: EthAddress,
// Bridge data // Bridge data
From: ethBridgeData.From, To: ethBridgeData.To, Amount: ethBridgeData.Amount, ExtraData: ethBridgeData.ExtraData, From: ethBridgeData.From, To: ethBridgeData.To, Amount: ethBridgeData.Amount, ExtraData: ethBridgeData.ExtraData,
} }
...@@ -170,8 +171,14 @@ func _standardBridgeFinalizedEvents[BridgeEvent bindings.L1StandardBridgeETHBrid ...@@ -170,8 +171,14 @@ func _standardBridgeFinalizedEvents[BridgeEvent bindings.L1StandardBridgeETHBrid
return nil, err return nil, err
} }
optimismPortalAbi, err := bindings.OptimismPortalMetaData.GetAbi()
if err != nil {
return nil, err
}
relayedMessageEventAbi := l1CrossDomainMessengerABI.Events["RelayedMessage"] relayedMessageEventAbi := l1CrossDomainMessengerABI.Events["RelayedMessage"]
relayMessageMethodAbi := l1CrossDomainMessengerABI.Methods["relayMessage"] relayMessageMethodAbi := l1CrossDomainMessengerABI.Methods["relayMessage"]
finalizeWithdrawalTransactionMethodAbi := optimismPortalAbi.Methods["finalizeWithdrawalTransaction"]
var bridgeData BridgeEvent var bridgeData BridgeEvent
var eventName string var eventName string
...@@ -201,27 +208,52 @@ func _standardBridgeFinalizedEvents[BridgeEvent bindings.L1StandardBridgeETHBrid ...@@ -201,27 +208,52 @@ func _standardBridgeFinalizedEvents[BridgeEvent bindings.L1StandardBridgeETHBrid
return nil, errors.New("unexpected bridge event ordering") return nil, errors.New("unexpected bridge event ordering")
} }
// There's no way to extract the nonce on the relayed message event. we can extract // There's no way to extract the nonce on the relayed message event. we can extract the nonce by
// the nonce by unpacking the transaction input for the `relayMessage` transaction // by unpacking the transaction input for the `relayMessage` transaction. Since bedrock has OptimismPortal
// as on L1 as an intermediary for finalization, we have to check both scenarios
tx, isPending, err := rawEthClient.TransactionByHash(context.Background(), relayedMsgLog.TxHash) tx, isPending, err := rawEthClient.TransactionByHash(context.Background(), relayedMsgLog.TxHash)
if err != nil || isPending { if err != nil || isPending {
return nil, errors.New("unable to query relayMessage tx for bridge finalization event") return nil, errors.New("unable to query relayMessage tx for bridge finalization event")
} }
txData := tx.Data() // If this is a finalization step with the optimism portal, the calldata for relayMessage invocation can be
if !bytes.Equal(txData[:4], relayMessageMethodAbi.ID) { // extracted from the withdrawal transaction.
return nil, errors.New("bridge finalization event does not match relayMessage tx invocation")
// NOTE: the L2CrossDomainMessenger nonce may not match the L2ToL1MessagePasser nonce, hence the additional
// layer of decoding vs reading the nocne of the withdrawal transaction. Both nonces have a similar but
// different lifeycle that might not match (i.e L2ToL1MessagePasser can be invoced directly)
var relayMsgCallData []byte
switch {
case bytes.Equal(tx.Data()[:4], relayMessageMethodAbi.ID):
relayMsgCallData = tx.Data()[4:]
case bytes.Equal(tx.Data()[:4], finalizeWithdrawalTransactionMethodAbi.ID):
data, err := finalizeWithdrawalTransactionMethodAbi.Inputs.Unpack(tx.Data()[4:])
if err != nil {
return nil, err
}
finalizeWithdrawTransactionInput := new(struct {
Tx bindings.TypesWithdrawalTransaction
})
err = finalizeWithdrawalTransactionMethodAbi.Inputs.Copy(finalizeWithdrawTransactionInput, data)
if err != nil {
return nil, fmt.Errorf("unable extract withdrawal tx input from finalizeWithdrawalTransaction calldata: %w", err)
} else if !bytes.Equal(finalizeWithdrawTransactionInput.Tx.Data[:4], relayMessageMethodAbi.ID) {
return nil, errors.New("finalizeWithdrawalTransaction calldata does not match relayMessage invocation")
}
relayMsgCallData = finalizeWithdrawTransactionInput.Tx.Data[4:]
default:
return nil, errors.New("bridge finalization event does not correlate with a relayMessage tx invocation")
} }
inputsMap := make(map[string]interface{}) inputsMap := make(map[string]interface{})
err = relayMessageMethodAbi.Inputs.UnpackIntoMap(inputsMap, txData[4:]) err = relayMessageMethodAbi.Inputs.UnpackIntoMap(inputsMap, relayMsgCallData)
if err != nil { if err != nil {
return nil, err return nil, err
} }
nonce, ok := inputsMap["_nonce"].(*big.Int) nonce, ok := inputsMap["_nonce"].(*big.Int)
if !ok { if !ok {
return nil, errors.New("unable to extract `_nonce` parameter from relayMessage transaction") return nil, errors.New("unable to extract `_nonce` parameter from relayMessage calldata")
} }
var erc20BridgeData *bindings.L1StandardBridgeERC20BridgeFinalized var erc20BridgeData *bindings.L1StandardBridgeERC20BridgeFinalized
...@@ -230,7 +262,7 @@ func _standardBridgeFinalizedEvents[BridgeEvent bindings.L1StandardBridgeETHBrid ...@@ -230,7 +262,7 @@ func _standardBridgeFinalizedEvents[BridgeEvent bindings.L1StandardBridgeETHBrid
ethBridgeData := any(bridgeData).(bindings.L1StandardBridgeETHBridgeFinalized) ethBridgeData := any(bridgeData).(bindings.L1StandardBridgeETHBridgeFinalized)
erc20BridgeData = &bindings.L1StandardBridgeERC20BridgeFinalized{ erc20BridgeData = &bindings.L1StandardBridgeERC20BridgeFinalized{
// Represent ETH using the hardcoded address // Represent ETH using the hardcoded address
LocalToken: ethAddress, RemoteToken: ethAddress, LocalToken: EthAddress, RemoteToken: EthAddress,
// Bridge data // Bridge data
From: ethBridgeData.From, To: ethBridgeData.To, Amount: ethBridgeData.Amount, ExtraData: ethBridgeData.ExtraData, From: ethBridgeData.From, To: ethBridgeData.To, Amount: ethBridgeData.Amount, ExtraData: ethBridgeData.ExtraData,
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment