Commit 20b115fd authored by Will Cory's avatar Will Cory

chore(indexer): Nuke legacy indexer

parent 92df7779
docker-compose.dev.yml
.env
indexer
indexer-refresh
......@@ -8,11 +8,8 @@ LDFLAGS := -ldflags "$(LDFLAGSSTRING)"
indexer:
env GO111MODULE=on go build -v $(LDFLAGS) ./cmd/indexer
indexer-refresh:
env GO111MODULE=on go build -v $(LDFLAGS) ./cmd/indexer-refresh
clean:
rm indexer && rm indexer-refresh
rm indexer
test:
go test -v ./...
......@@ -22,7 +19,6 @@ lint:
.PHONY: \
indexer \
indexer-refresh \
bindings \
bindings-scc \
clean \
......
# indexer/cmd/indexer-refresh
Entrypoint for the new WIP indexer. After the project is deployed and stable we will delete the [indexer/main.go](../indexer/main.go) file and move [indexer-refresh/main.go](./main.go) in its place.
package main
import (
"os"
"github.com/ethereum-optimism/optimism/indexer/cli"
"github.com/ethereum/go-ethereum/log"
)
var (
	// Build metadata, presumably injected at link time via the Makefile's
	// -ldflags (see LDFLAGS) — empty strings when built without them.
	GitVersion = ""
	GitCommit  = ""
	GitDate    = ""
)
// main builds the indexer CLI app with the build-time version metadata and
// runs it against the process arguments.
func main() {
	app := cli.NewCli(GitVersion, GitCommit, GitDate)

	// NOTE(review): go-ethereum's log.Crit terminates the process after
	// logging, so a failed run is fatal — confirm that is intended.
	if err := app.Run(os.Args); err != nil {
		log.Crit("Application failed", "message", err)
	}
}
package main
import (
"fmt"
"os"
"github.com/ethereum-optimism/optimism/indexer/cli"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/urfave/cli"
"github.com/ethereum-optimism/optimism/indexer/flags"
"github.com/ethereum-optimism/optimism/indexer/legacy"
)
var (
......@@ -19,26 +14,9 @@ var (
)
func main() {
// Set up logger with a default INFO level in case we fail to parse flags.
// Otherwise the final critical log won't show what the parsing error was.
log.Root().SetHandler(
log.LvlFilterHandler(
log.LvlInfo,
log.StreamHandler(os.Stdout, log.TerminalFormat(true)),
),
)
app := cli.NewApp()
app.Flags = flags.Flags
app.Version = fmt.Sprintf("%s-%s", GitVersion, params.VersionWithCommit(GitCommit, GitDate))
app.Name = "indexer"
app.Usage = "Indexer Service"
app.Description = "Service for indexing deposits and withdrawals " +
"by account on L1 and L2"
app := cli.NewCli(GitVersion, GitCommit, GitDate)
app.Action = legacy.Main(GitVersion)
err := app.Run(os.Args)
if err != nil {
if err := app.Run(os.Args); err != nil {
log.Crit("Application failed", "message", err)
}
}
package indexer
import (
"fmt"
"github.com/ethereum/go-ethereum/common"
)
// ParseAddress parses an ETH address from a hex string (with or without the
// 0x prefix). It returns an error when the input is not a valid hexadecimal
// address.
func ParseAddress(address string) (common.Address, error) {
	if !common.IsHexAddress(address) {
		return common.Address{}, fmt.Errorf("invalid address: %v", address)
	}
	return common.HexToAddress(address), nil
}
package indexer_test
import (
"bytes"
"errors"
"testing"
indexer "github.com/ethereum-optimism/optimism/indexer"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
)
// TestParseAddress asserts that ParseAddress correctly parses
// 40-character hexadecimal strings with optional 0x prefix into valid 20-byte
// addresses
func TestParseAddress(t *testing.T) {
	tests := []struct {
		name    string
		addr    string
		expErr  error
		expAddr common.Address
	}{
		{
			name:   "empty address",
			addr:   "",
			expErr: errors.New("invalid address: "),
		},
		{
			name:   "only 0x",
			addr:   "0x",
			expErr: errors.New("invalid address: 0x"),
		},
		{
			name:   "non hex character",
			addr:   "0xaaaaaazaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
			expErr: errors.New("invalid address: 0xaaaaaazaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
		},
		{
			name:    "valid address",
			addr:    "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
			expErr:  nil,
			expAddr: common.BytesToAddress(bytes.Repeat([]byte{170}, 20)),
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			addr, err := indexer.ParseAddress(test.addr)
			// BUGFIX: require.Equal takes (t, expected, actual); the original
			// passed them swapped, producing misleading failure messages.
			require.Equal(t, test.expErr, err)
			if test.expErr != nil {
				return
			}
			require.Equal(t, test.expAddr, addr)
		})
	}
}
package db
// Airdrop is one row of the airdrops table: the per-category OP airdrop
// allocation for a single address. All amounts are decimal integer strings
// (see the CHECK constraints in the schema) rather than numeric types.
type Airdrop struct {
	Address              string `json:"address"`
	VoterAmount          string `json:"voterAmount"`
	MultisigSignerAmount string `json:"multisigSignerAmount"`
	GitcoinAmount        string `json:"gitcoinAmount"`
	ActiveBridgedAmount  string `json:"activeBridgedAmount"`
	OpUserAmount         string `json:"opUserAmount"`
	OpRepeatUserAmount   string `json:"opRepeatUserAmount"`
	BonusAmount          string `json:"bonusAmount"`
	TotalAmount          string `json:"totalAmount"`
}
package db
import (
"database/sql"
"errors"
"fmt"
"strings"
"github.com/ethereum/go-ethereum/common"
// NOTE: Only postgresql backend is supported at the moment.
_ "github.com/lib/pq"
)
// Database contains the database instance and the connection string.
type Database struct {
	db *sql.DB
	// config is the raw connection string used to open the pool,
	// exposed read-only via Config().
	config string
}
// NewDatabase returns the database for the given connection string. The
// connection is verified with a ping and all schema migrations are applied
// before the database is returned. On any failure after a successful Open,
// the underlying pool is closed so no connection is leaked.
func NewDatabase(config string) (*Database, error) {
	db, err := sql.Open("postgres", config)
	if err != nil {
		return nil, err
	}

	if err := db.Ping(); err != nil {
		// BUGFIX: close the pool on failure instead of leaking it.
		_ = db.Close()
		return nil, err
	}

	// Migrations are idempotent (IF NOT EXISTS / ON CONFLICT DO NOTHING),
	// so re-running them on every start is safe.
	for _, migration := range schema {
		if _, err := db.Exec(migration); err != nil {
			_ = db.Close()
			return nil, err
		}
	}

	return &Database{
		db:     db,
		config: config,
	}, nil
}
// Close closes the database.
// NOTE: "It is rarely necessary to close a DB."
// See: https://pkg.go.dev/database/sql#Open
// The *sql.DB is a long-lived pool; call this only on shutdown.
func (d *Database) Close() error {
	return d.db.Close()
}

// Config returns the db connection string.
func (d *Database) Config() string {
	return d.config
}
// GetL1TokenByAddress returns the ERC20 Token corresponding to the given
// address on L1. It returns (nil, nil) when no token with that address is
// known. The returned Token's Address field is left empty — the caller
// already has the address.
func (d *Database) GetL1TokenByAddress(address string) (*Token, error) {
	const selectL1TokenStatement = `
	SELECT name, symbol, decimals FROM l1_tokens WHERE address = $1;
	`

	var token *Token
	err := txn(d.db, func(tx *sql.Tx) error {
		row := tx.QueryRow(selectL1TokenStatement, address)
		if row.Err() != nil {
			return row.Err()
		}

		var name string
		var symbol string
		var decimals uint8
		err := row.Scan(&name, &symbol, &decimals)
		// A missing row is not an error: leave token nil.
		if errors.Is(err, sql.ErrNoRows) {
			return nil
		}
		if err != nil {
			return err
		}

		token = &Token{
			Name:     name,
			Symbol:   symbol,
			Decimals: decimals,
		}
		return nil
	})
	if err != nil {
		return nil, err
	}

	return token, nil
}
// GetL2TokenByAddress returns the ERC20 Token corresponding to the given
// address on L2. It returns (nil, nil) when no token with that address is
// known. Mirrors GetL1TokenByAddress but reads the l2_tokens table.
func (d *Database) GetL2TokenByAddress(address string) (*Token, error) {
	const selectL2TokenStatement = `
	SELECT name, symbol, decimals FROM l2_tokens WHERE address = $1;
	`

	var token *Token
	err := txn(d.db, func(tx *sql.Tx) error {
		row := tx.QueryRow(selectL2TokenStatement, address)
		if row.Err() != nil {
			return row.Err()
		}

		var name string
		var symbol string
		var decimals uint8
		err := row.Scan(&name, &symbol, &decimals)
		// A missing row is not an error: leave token nil.
		if errors.Is(err, sql.ErrNoRows) {
			return nil
		}
		if err != nil {
			return err
		}

		token = &Token{
			Name:     name,
			Symbol:   symbol,
			Decimals: decimals,
		}
		return nil
	})
	if err != nil {
		return nil, err
	}

	return token, nil
}
// AddL1Token inserts the Token details for the given address into the known L1
// tokens database.
// NOTE: a Token MUST have a unique address
func (d *Database) AddL1Token(address string, token *Token) error {
	const insertTokenStatement = `
	INSERT INTO l1_tokens
		(address, name, symbol, decimals)
	VALUES
		($1, $2, $3, $4)
	`

	return txn(d.db, func(tx *sql.Tx) error {
		_, err := tx.Exec(
			insertTokenStatement,
			address,
			token.Name,
			token.Symbol,
			token.Decimals,
		)
		return err
	})
}

// AddL2Token inserts the Token details for the given address into the known L2
// tokens database.
// NOTE: a Token MUST have a unique address
func (d *Database) AddL2Token(address string, token *Token) error {
	const insertTokenStatement = `
	INSERT INTO l2_tokens
		(address, name, symbol, decimals)
	VALUES
		($1, $2, $3, $4)
	`

	return txn(d.db, func(tx *sql.Tx) error {
		_, err := tx.Exec(
			insertTokenStatement,
			address,
			token.Name,
			token.Symbol,
			token.Decimals,
		)
		return err
	})
}
// AddIndexedL1Block inserts the indexed block i.e. the L1 block containing all
// scanned Deposits into the known deposits database. Proven and finalized
// withdrawal events observed in the block update the matching withdrawal rows
// by their bedrock withdrawal hash. Everything runs in a single transaction,
// so a block is recorded either completely or not at all.
// NOTE: the block hash MUST be unique
func (d *Database) AddIndexedL1Block(block *IndexedL1Block) error {
	const insertBlockStatement = `
	INSERT INTO l1_blocks
		(hash, parent_hash, number, timestamp)
	VALUES
		($1, $2, $3, $4)
	`

	const insertDepositStatement = `
	INSERT INTO deposits
		(guid, from_address, to_address, l1_token, l2_token, amount, tx_hash, log_index, block_hash, data)
	VALUES
		($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
	`

	const updateProvenWithdrawalStatement = `
	UPDATE withdrawals SET (br_withdrawal_proven_tx_hash, br_withdrawal_proven_log_index) = ($1, $2)
	WHERE br_withdrawal_hash = $3
	`

	const updateFinalizedWithdrawalStatement = `
	UPDATE withdrawals SET (br_withdrawal_finalized_tx_hash, br_withdrawal_finalized_log_index, br_withdrawal_finalized_success) = ($1, $2, $3)
	WHERE br_withdrawal_hash = $4
	`

	return txn(d.db, func(tx *sql.Tx) error {
		if _, err := tx.Exec(
			insertBlockStatement,
			block.Hash.String(),
			block.ParentHash.String(),
			block.Number,
			block.Timestamp,
		); err != nil {
			return err
		}

		// Ranging over a nil/empty slice is a no-op, so the original
		// `len(...) > 0` guards around these loops were redundant.
		for _, deposit := range block.Deposits {
			if _, err := tx.Exec(
				insertDepositStatement,
				NewGUID(),
				deposit.FromAddress.String(),
				deposit.ToAddress.String(),
				deposit.L1Token.String(),
				deposit.L2Token.String(),
				deposit.Amount.String(),
				deposit.TxHash.String(),
				deposit.LogIndex,
				block.Hash.String(),
				deposit.Data,
			); err != nil {
				return err
			}
		}

		for _, wd := range block.ProvenWithdrawals {
			if _, err := tx.Exec(
				updateProvenWithdrawalStatement,
				wd.TxHash.String(),
				wd.LogIndex,
				wd.WithdrawalHash.String(),
			); err != nil {
				return err
			}
		}

		for _, wd := range block.FinalizedWithdrawals {
			if _, err := tx.Exec(
				updateFinalizedWithdrawalStatement,
				wd.TxHash.String(),
				wd.LogIndex,
				wd.Success,
				wd.WithdrawalHash.String(),
			); err != nil {
				return err
			}
		}

		return nil
	})
}
// AddIndexedL2Block inserts the indexed block i.e. the L2 block containing all
// scanned Withdrawals into the known withdrawals database. The block and all
// of its withdrawals are written in a single transaction.
// NOTE: the block hash MUST be unique
func (d *Database) AddIndexedL2Block(block *IndexedL2Block) error {
	const insertBlockStatement = `
	INSERT INTO l2_blocks
		(hash, parent_hash, number, timestamp)
	VALUES
		($1, $2, $3, $4)
	`

	const insertWithdrawalStatement = `
	INSERT INTO withdrawals
		(guid, from_address, to_address, l1_token, l2_token, amount, tx_hash, log_index, block_hash, data, br_withdrawal_hash)
	VALUES
		($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
	`

	return txn(d.db, func(tx *sql.Tx) error {
		_, err := tx.Exec(
			insertBlockStatement,
			block.Hash.String(),
			block.ParentHash.String(),
			block.Number,
			block.Timestamp,
		)
		if err != nil {
			return err
		}

		// Nothing more to do for a block without withdrawals.
		if len(block.Withdrawals) == 0 {
			return nil
		}

		for _, withdrawal := range block.Withdrawals {
			_, err = tx.Exec(
				insertWithdrawalStatement,
				NewGUID(),
				withdrawal.FromAddress.String(),
				withdrawal.ToAddress.String(),
				withdrawal.L1Token.String(),
				withdrawal.L2Token.String(),
				withdrawal.Amount.String(),
				withdrawal.TxHash.String(),
				withdrawal.LogIndex,
				block.Hash.String(),
				withdrawal.Data,
				// br_withdrawal_hash is a nullable column; only bedrock
				// withdrawals carry a hash (nil maps to SQL NULL).
				nullableHash(withdrawal.BedrockHash),
			)
			if err != nil {
				return err
			}
		}

		return nil
	})
}
// AddStateBatch inserts the state batches into the known state batches
// database. All batches are written in one transaction; any failure rolls
// back the entire set.
func (d *Database) AddStateBatch(batches []StateBatch) error {
	const insertStateBatchStatement = `
	INSERT INTO state_batches
		(index, root, size, prev_total, extra_data, block_hash)
	VALUES
		($1, $2, $3, $4, $5, $6)
	`

	return txn(d.db, func(tx *sql.Tx) error {
		for _, sb := range batches {
			_, err := tx.Exec(
				insertStateBatchStatement,
				sb.Index.Uint64(),
				sb.Root.String(),
				sb.Size.Uint64(),
				sb.PrevTotal.Uint64(),
				sb.ExtraData,
				sb.BlockHash.String(),
			)
			if err != nil {
				return err
			}
		}
		return nil
	})
}
// GetDepositsByAddress returns the list of Deposits indexed for the given
// address paginated by the given params. The returned page's Total is the
// unpaginated row count for the address.
func (d *Database) GetDepositsByAddress(address common.Address, page PaginationParam) (*PaginatedDeposits, error) {
	const selectDepositsStatement = `
	SELECT
		deposits.guid, deposits.from_address, deposits.to_address,
		deposits.amount, deposits.tx_hash, deposits.data,
		deposits.l1_token, deposits.l2_token,
		l1_tokens.name, l1_tokens.symbol, l1_tokens.decimals,
		l1_blocks.number, l1_blocks.timestamp
	FROM deposits
		INNER JOIN l1_blocks ON deposits.block_hash=l1_blocks.hash
		INNER JOIN l1_tokens ON deposits.l1_token=l1_tokens.address
	WHERE deposits.from_address = $1 ORDER BY l1_blocks.timestamp LIMIT $2 OFFSET $3;
	`

	var deposits []DepositJSON
	err := txn(d.db, func(tx *sql.Tx) error {
		rows, err := tx.Query(selectDepositsStatement, address.String(), page.Limit, page.Offset)
		if err != nil {
			return err
		}
		defer rows.Close()

		for rows.Next() {
			var deposit DepositJSON
			var l1Token Token
			if err := rows.Scan(
				&deposit.GUID, &deposit.FromAddress, &deposit.ToAddress,
				&deposit.Amount, &deposit.TxHash, &deposit.Data,
				&l1Token.Address, &deposit.L2Token,
				&l1Token.Name, &l1Token.Symbol, &l1Token.Decimals,
				&deposit.BlockNumber, &deposit.BlockTimestamp,
			); err != nil {
				return err
			}
			deposit.L1Token = &l1Token
			deposits = append(deposits, deposit)
		}
		return rows.Err()
	})
	if err != nil {
		return nil, err
	}

	const selectDepositCountStatement = `
	SELECT
		count(*)
	FROM deposits
		INNER JOIN l1_blocks ON deposits.block_hash=l1_blocks.hash
		INNER JOIN l1_tokens ON deposits.l1_token=l1_tokens.address
	WHERE deposits.from_address = $1;
	`

	var count uint64
	err = txn(d.db, func(tx *sql.Tx) error {
		row := tx.QueryRow(selectDepositCountStatement, address.String())
		// BUGFIX: the original checked the stale outer `err` (always nil at
		// this point) instead of the row's own error; check row.Err() so a
		// failed query surfaces before Scan.
		if err := row.Err(); err != nil {
			return err
		}
		return row.Scan(&count)
	})
	if err != nil {
		return nil, err
	}

	page.Total = count

	return &PaginatedDeposits{
		&page,
		deposits,
	}, nil
}
// GetWithdrawalBatch returns the StateBatch corresponding to the given
// withdrawal transaction hash, or (nil, nil) when no covering batch exists.
func (d *Database) GetWithdrawalBatch(hash common.Hash) (*StateBatchJSON, error) {
	// The inner SELECT resolves the withdrawal tx to its L2 block number;
	// the outer query then picks the first batch whose cumulative element
	// count (size + prev_total) reaches that block number.
	const selectWithdrawalBatchStatement = `
	SELECT
		state_batches.index, state_batches.root, state_batches.size, state_batches.prev_total, state_batches.extra_data, state_batches.block_hash,
		l1_blocks.number, l1_blocks.timestamp
	FROM state_batches
		INNER JOIN l1_blocks ON state_batches.block_hash = l1_blocks.hash
	WHERE size + prev_total >= (
		SELECT
			number
		FROM
			withdrawals
			INNER JOIN l2_blocks ON withdrawals.block_hash = l2_blocks.hash where tx_hash=$1
	) ORDER BY "index" LIMIT 1;
	`

	var batch *StateBatchJSON
	err := txn(d.db, func(tx *sql.Tx) error {
		row := tx.QueryRow(selectWithdrawalBatchStatement, hash.String())
		if row.Err() != nil {
			return row.Err()
		}

		var index, size, prevTotal, blockNumber, blockTimestamp uint64
		var root, blockHash string
		var extraData []byte
		err := row.Scan(&index, &root, &size, &prevTotal, &extraData, &blockHash,
			&blockNumber, &blockTimestamp)
		if err != nil {
			// No covering batch: not an error, return a nil batch.
			if errors.Is(err, sql.ErrNoRows) {
				batch = nil
				return nil
			}
			return err
		}

		batch = &StateBatchJSON{
			Index:          index,
			Root:           root,
			Size:           size,
			PrevTotal:      prevTotal,
			ExtraData:      extraData,
			BlockHash:      blockHash,
			BlockNumber:    blockNumber,
			BlockTimestamp: blockTimestamp,
		}
		return nil
	})
	if err != nil {
		return nil, err
	}

	return batch, nil
}
// GetWithdrawalsByAddress returns the list of Withdrawals indexed for the
// given address paginated by the given params. The state filter restricts
// results to finalized/unfinalized withdrawals (FinalizationStateAny matches
// everything).
func (d *Database) GetWithdrawalsByAddress(address common.Address, page PaginationParam, state FinalizationState) (*PaginatedWithdrawals, error) {
	// state.SQL() expands to one of a fixed set of clauses (never user
	// input), so this Sprintf cannot inject into the query.
	selectWithdrawalsStatement := fmt.Sprintf(`
	SELECT
		withdrawals.guid, withdrawals.from_address, withdrawals.to_address,
		withdrawals.amount, withdrawals.tx_hash, withdrawals.data,
		withdrawals.l1_token, withdrawals.l2_token,
		l2_tokens.name, l2_tokens.symbol, l2_tokens.decimals,
		l2_blocks.number, l2_blocks.timestamp, withdrawals.br_withdrawal_hash,
		withdrawals.br_withdrawal_proven_tx_hash, withdrawals.br_withdrawal_proven_log_index,
		withdrawals.br_withdrawal_finalized_tx_hash, withdrawals.br_withdrawal_finalized_log_index,
		withdrawals.br_withdrawal_finalized_success
	FROM withdrawals
		INNER JOIN l2_blocks ON withdrawals.block_hash=l2_blocks.hash
		INNER JOIN l2_tokens ON withdrawals.l2_token=l2_tokens.address
	WHERE withdrawals.from_address = $1 %s ORDER BY l2_blocks.timestamp LIMIT $2 OFFSET $3;
	`, state.SQL())

	var withdrawals []WithdrawalJSON
	err := txn(d.db, func(tx *sql.Tx) error {
		rows, err := tx.Query(selectWithdrawalsStatement, address.String(), page.Limit, page.Offset)
		if err != nil {
			return err
		}
		defer rows.Close()

		for rows.Next() {
			var withdrawal WithdrawalJSON
			var l2Token Token
			// Bedrock columns are nullable: pre-bedrock rows carry no
			// withdrawal hash or proof/finalization metadata.
			var wdHash sql.NullString
			var proveTxHash sql.NullString
			var proveLogIndex sql.NullInt32
			var finTxHash sql.NullString
			var finLogIndex sql.NullInt32
			var finSuccess sql.NullBool
			if err := rows.Scan(
				&withdrawal.GUID, &withdrawal.FromAddress, &withdrawal.ToAddress,
				&withdrawal.Amount, &withdrawal.TxHash, &withdrawal.Data,
				&withdrawal.L1Token, &l2Token.Address,
				&l2Token.Name, &l2Token.Symbol, &l2Token.Decimals,
				&withdrawal.BlockNumber, &withdrawal.BlockTimestamp,
				&wdHash, &proveTxHash, &proveLogIndex,
				&finTxHash, &finLogIndex, &finSuccess,
			); err != nil {
				return err
			}
			withdrawal.L2Token = &l2Token
			if wdHash.Valid {
				withdrawal.BedrockWithdrawalHash = &wdHash.String
			}
			if proveTxHash.Valid {
				withdrawal.BedrockProvenTxHash = &proveTxHash.String
			}
			if proveLogIndex.Valid {
				idx := int(proveLogIndex.Int32)
				withdrawal.BedrockProvenLogIndex = &idx
			}
			if finTxHash.Valid {
				withdrawal.BedrockFinalizedTxHash = &finTxHash.String
			}
			if finLogIndex.Valid {
				idx := int(finLogIndex.Int32)
				withdrawal.BedrockFinalizedLogIndex = &idx
			}
			if finSuccess.Valid {
				withdrawal.BedrockFinalizedSuccess = &finSuccess.Bool
			}
			withdrawals = append(withdrawals, withdrawal)
		}
		return rows.Err()
	})
	if err != nil {
		return nil, err
	}

	// Best-effort enrichment: attach the covering state batch for each
	// withdrawal; lookup errors are deliberately ignored (Batch stays nil).
	for i := range withdrawals {
		batch, _ := d.GetWithdrawalBatch(common.HexToHash(withdrawals[i].TxHash))
		withdrawals[i].Batch = batch
	}

	// NOTE(review): the count deliberately omits the finalization filter,
	// so page.Total reflects ALL withdrawals for the address even when the
	// items are filtered — confirm this is the intended API semantics.
	const selectWithdrawalCountStatement = `
	SELECT
		count(*)
	FROM withdrawals
		INNER JOIN l2_blocks ON withdrawals.block_hash=l2_blocks.hash
		INNER JOIN l2_tokens ON withdrawals.l2_token=l2_tokens.address
	WHERE withdrawals.from_address = $1;
	`

	var count uint64
	err = txn(d.db, func(tx *sql.Tx) error {
		row := tx.QueryRow(selectWithdrawalCountStatement, address.String())
		// BUGFIX: the original checked the stale outer `err` (always nil at
		// this point) instead of the row's own error; check row.Err().
		if err := row.Err(); err != nil {
			return err
		}
		return row.Scan(&count)
	})
	if err != nil {
		return nil, err
	}

	page.Total = count

	return &PaginatedWithdrawals{
		&page,
		withdrawals,
	}, nil
}
// GetHighestL1Block returns the highest known L1 block, or (nil, nil) when
// no L1 blocks have been indexed yet.
func (d *Database) GetHighestL1Block() (*BlockLocator, error) {
	const selectHighestBlockStatement = `
	SELECT number, hash FROM l1_blocks ORDER BY number DESC LIMIT 1
	`

	var highestBlock *BlockLocator
	err := txn(d.db, func(tx *sql.Tx) error {
		row := tx.QueryRow(selectHighestBlockStatement)
		if row.Err() != nil {
			return row.Err()
		}

		var number uint64
		var hash string
		err := row.Scan(&number, &hash)
		if err != nil {
			// An empty table is not an error: return a nil locator.
			if errors.Is(err, sql.ErrNoRows) {
				highestBlock = nil
				return nil
			}
			return err
		}

		highestBlock = &BlockLocator{
			Number: number,
			Hash:   common.HexToHash(hash),
		}
		return nil
	})
	if err != nil {
		return nil, err
	}

	return highestBlock, nil
}
// GetHighestL2Block returns the highest known L2 block, or (nil, nil) when
// no L2 blocks have been indexed yet. Mirrors GetHighestL1Block.
func (d *Database) GetHighestL2Block() (*BlockLocator, error) {
	const selectHighestBlockStatement = `
	SELECT number, hash FROM l2_blocks ORDER BY number DESC LIMIT 1
	`

	var highestBlock *BlockLocator
	err := txn(d.db, func(tx *sql.Tx) error {
		row := tx.QueryRow(selectHighestBlockStatement)
		if row.Err() != nil {
			return row.Err()
		}

		var number uint64
		var hash string
		err := row.Scan(&number, &hash)
		if err != nil {
			// An empty table is not an error: return a nil locator.
			if errors.Is(err, sql.ErrNoRows) {
				highestBlock = nil
				return nil
			}
			return err
		}

		highestBlock = &BlockLocator{
			Number: number,
			Hash:   common.HexToHash(hash),
		}
		return nil
	})
	if err != nil {
		return nil, err
	}

	return highestBlock, nil
}
// GetIndexedL1BlockByHash returns the L1 block by its hash, or (nil, nil)
// when the block is not in the database. The Deposits (and withdrawal event)
// fields are not populated by this lookup.
func (d *Database) GetIndexedL1BlockByHash(hash common.Hash) (*IndexedL1Block, error) {
	const selectBlockByHashStatement = `
	SELECT
		hash, parent_hash, number, timestamp
	FROM l1_blocks
	WHERE hash = $1
	`

	var block *IndexedL1Block
	err := txn(d.db, func(tx *sql.Tx) error {
		row := tx.QueryRow(selectBlockByHashStatement, hash.String())
		if row.Err() != nil {
			return row.Err()
		}

		// BUGFIX(lint): the scanned column was previously named `hash`,
		// shadowing the function parameter; renamed to blockHash.
		var blockHash string
		var parentHash string
		var number uint64
		var timestamp uint64
		err := row.Scan(&blockHash, &parentHash, &number, &timestamp)
		if err != nil {
			// A missing block is not an error: leave block nil.
			if errors.Is(err, sql.ErrNoRows) {
				return nil
			}
			return err
		}

		block = &IndexedL1Block{
			Hash:       common.HexToHash(blockHash),
			ParentHash: common.HexToHash(parentHash),
			Number:     number,
			Timestamp:  timestamp,
			Deposits:   nil,
		}
		return nil
	})
	if err != nil {
		return nil, err
	}

	return block, nil
}
// getAirdropQuery fetches a single airdrop row by address.
const getAirdropQuery = `
SELECT
	address, voter_amount, multisig_signer_amount, gitcoin_amount,
	active_bridged_amount, op_user_amount, op_repeat_user_amount,
	bonus_amount, total_amount
FROM airdrops
WHERE address = $1
`

// GetAirdrop returns the airdrop allocation for the given address, or
// (nil, nil) when the address has no allocation. The lookup key is
// lowercased — presumably addresses are stored lowercased; verify against
// the importer that populates the airdrops table.
func (d *Database) GetAirdrop(address common.Address) (*Airdrop, error) {
	// Runs outside txn(): a single read needs no transaction.
	row := d.db.QueryRow(getAirdropQuery, strings.ToLower(address.String()))
	if row.Err() != nil {
		return nil, fmt.Errorf("error getting airdrop: %w", row.Err())
	}

	airdrop := new(Airdrop)
	err := row.Scan(
		&airdrop.Address,
		&airdrop.VoterAmount,
		&airdrop.MultisigSignerAmount,
		&airdrop.GitcoinAmount,
		&airdrop.ActiveBridgedAmount,
		&airdrop.OpUserAmount,
		&airdrop.OpRepeatUserAmount,
		&airdrop.BonusAmount,
		&airdrop.TotalAmount,
	)
	// No row means no allocation — not an error.
	if errors.Is(err, sql.ErrNoRows) {
		return nil, nil
	}
	if err != nil {
		return nil, fmt.Errorf("error scanning airdrop: %w", err)
	}
	return airdrop, nil
}
// nullableHash converts an optional hash into an optional string so it can be
// bound to a nullable SQL column. A nil input yields nil (SQL NULL).
func nullableHash(in *common.Hash) *string {
	if in != nil {
		s := in.String()
		return &s
	}
	return nil
}
package db
import (
"math/big"
"github.com/ethereum/go-ethereum/common"
)
// Deposit contains transaction data for deposits made via the L1 to L2 bridge.
type Deposit struct {
	GUID        string
	TxHash      common.Hash
	L1Token     common.Address
	L2Token     common.Address
	FromAddress common.Address
	ToAddress   common.Address
	Amount      *big.Int
	Data        []byte
	LogIndex    uint
}

// String returns the tx hash for the deposit.
func (d Deposit) String() string {
	return d.TxHash.String()
}

// DepositJSON contains Deposit data suitable for JSON serialization.
// Amount is a decimal string to avoid precision loss; L1Token carries the
// joined token metadata while L2Token is just the address string.
type DepositJSON struct {
	GUID           string `json:"guid"`
	FromAddress    string `json:"from"`
	ToAddress      string `json:"to"`
	L1Token        *Token `json:"l1Token"`
	L2Token        string `json:"l2Token"`
	Amount         string `json:"amount"`
	Data           []byte `json:"data"`
	LogIndex       uint64 `json:"logIndex"`
	BlockNumber    uint64 `json:"blockNumber"`
	BlockTimestamp string `json:"blockTimestamp"`
	TxHash         string `json:"transactionHash"`
}
package db
import (
"github.com/ethereum-optimism/optimism/op-bindings/predeploys"
"github.com/ethereum/go-ethereum/common"
)
// ETHL1Address is left at its zero value, i.e. the all-zero address.
var ETHL1Address common.Address

// ETHL1Token is a placeholder token for differentiating ETH transactions from
// ERC20 transactions on L1.
var ETHL1Token = &Token{
	Address:  ETHL1Address.String(),
	Name:     "Ethereum",
	Symbol:   "ETH",
	Decimals: 18,
}

// ETHL2Token is a placeholder token for differentiating ETH transactions from
// ERC20 transactions on L2.
var ETHL2Token = &Token{
	Address:  predeploys.LegacyERC20ETH,
	Name:     "Ethereum",
	Symbol:   "ETH",
	Decimals: 18,
}
package db
import "github.com/google/uuid"
// NewGUID returns a new guid, formatted as a canonical UUID string. Used as
// the primary key for deposit and withdrawal rows.
func NewGUID() string {
	return uuid.New().String()
}
package db
import (
"github.com/ethereum/go-ethereum/common"
)
// IndexedL1Block contains the L1 block including the deposits in it. It also
// carries the proven/finalized withdrawal events observed in that block.
type IndexedL1Block struct {
	Hash                 common.Hash
	ParentHash           common.Hash
	Number               uint64
	Timestamp            uint64
	Deposits             []Deposit
	ProvenWithdrawals    []ProvenWithdrawal
	FinalizedWithdrawals []FinalizedWithdrawal
}

// String returns the block hash for the indexed l1 block.
func (b IndexedL1Block) String() string {
	return b.Hash.String()
}

// IndexedL2Block contains the L2 block including the withdrawals in it.
type IndexedL2Block struct {
	Hash        common.Hash
	ParentHash  common.Hash
	Number      uint64
	Timestamp   uint64
	Withdrawals []Withdrawal
}

// String returns the block hash for the indexed l2 block.
func (b IndexedL2Block) String() string {
	return b.Hash.String()
}
package db
import (
"github.com/ethereum/go-ethereum/common"
)
// BlockLocator contains the block number and hash. It can
// uniquely identify an Ethereum block.
type BlockLocator struct {
	Number uint64      `json:"number"`
	Hash   common.Hash `json:"hash"`
}
package db
// PaginationParam holds the pagination fields passed through by the REST
// middleware and queried by the database to page through deposits and
// withdrawals. Total is filled in by the database layer with the unpaginated
// row count.
type PaginationParam struct {
	Limit  uint64 `json:"limit"`
	Offset uint64 `json:"offset"`
	Total  uint64 `json:"total"`
}

// PaginatedDeposits is one page of deposits plus its pagination metadata.
type PaginatedDeposits struct {
	Param    *PaginationParam `json:"pagination"`
	Deposits []DepositJSON    `json:"items"`
}

// PaginatedWithdrawals is one page of withdrawals plus its pagination metadata.
type PaginatedWithdrawals struct {
	Param       *PaginationParam `json:"pagination"`
	Withdrawals []WithdrawalJSON `json:"items"`
}
package db
// createL1BlocksTable records every indexed L1 block, keyed by hash.
const createL1BlocksTable = `
CREATE TABLE IF NOT EXISTS l1_blocks (
	hash VARCHAR NOT NULL PRIMARY KEY,
	parent_hash VARCHAR NOT NULL,
	number INTEGER NOT NULL,
	timestamp INTEGER NOT NULL
)
`

// createL2BlocksTable records every indexed L2 block, keyed by hash.
const createL2BlocksTable = `
CREATE TABLE IF NOT EXISTS l2_blocks (
	hash VARCHAR NOT NULL PRIMARY KEY,
	parent_hash VARCHAR NOT NULL,
	number INTEGER NOT NULL,
	timestamp INTEGER NOT NULL
)
`

// createDepositsTable stores indexed L1→L2 deposits; amounts are decimal
// strings, and rows reference their L1 block and token.
const createDepositsTable = `
CREATE TABLE IF NOT EXISTS deposits (
	guid VARCHAR PRIMARY KEY NOT NULL,
	from_address VARCHAR NOT NULL,
	to_address VARCHAR NOT NULL,
	l1_token VARCHAR NOT NULL REFERENCES l1_tokens(address),
	l2_token VARCHAR NOT NULL,
	amount VARCHAR NOT NULL,
	data BYTEA NOT NULL,
	log_index INTEGER NOT NULL,
	block_hash VARCHAR NOT NULL REFERENCES l1_blocks(hash),
	tx_hash VARCHAR NOT NULL
)
`

// createL1TokensTable stores ERC20 metadata for known L1 tokens.
const createL1TokensTable = `
CREATE TABLE IF NOT EXISTS l1_tokens (
	address VARCHAR NOT NULL PRIMARY KEY,
	name VARCHAR NOT NULL,
	symbol VARCHAR NOT NULL,
	decimals INTEGER NOT NULL
)
`

// createL2TokensTable stores ERC20 metadata for known L2 tokens.
// (Uses TEXT where l1_tokens uses VARCHAR; equivalent in PostgreSQL.)
const createL2TokensTable = `
CREATE TABLE IF NOT EXISTS l2_tokens (
	address TEXT NOT NULL PRIMARY KEY,
	name TEXT NOT NULL,
	symbol TEXT NOT NULL,
	decimals INTEGER NOT NULL
)
`

// createStateBatchesTable stores state batches with lookup indexes on
// block_hash, size and prev_total (queried by GetWithdrawalBatch).
const createStateBatchesTable = `
CREATE TABLE IF NOT EXISTS state_batches (
	index INTEGER NOT NULL PRIMARY KEY,
	root VARCHAR NOT NULL,
	size INTEGER NOT NULL,
	prev_total INTEGER NOT NULL,
	extra_data BYTEA NOT NULL,
	block_hash VARCHAR NOT NULL REFERENCES l1_blocks(hash)
);
CREATE INDEX IF NOT EXISTS state_batches_block_hash ON state_batches(block_hash);
CREATE INDEX IF NOT EXISTS state_batches_size ON state_batches(size);
CREATE INDEX IF NOT EXISTS state_batches_prev_total ON state_batches(prev_total);
`

// createWithdrawalsTable stores indexed L2→L1 withdrawals; rows reference
// their L2 block, L2 token and (optionally) covering state batch.
const createWithdrawalsTable = `
CREATE TABLE IF NOT EXISTS withdrawals (
	guid VARCHAR PRIMARY KEY NOT NULL,
	from_address VARCHAR NOT NULL,
	to_address VARCHAR NOT NULL,
	l1_token VARCHAR NOT NULL,
	l2_token VARCHAR NOT NULL REFERENCES l2_tokens(address),
	amount VARCHAR NOT NULL,
	data BYTEA NOT NULL,
	log_index INTEGER NOT NULL,
	block_hash VARCHAR NOT NULL REFERENCES l2_blocks(hash),
	tx_hash VARCHAR NOT NULL,
	state_batch INTEGER REFERENCES state_batches(index)
)
`

// insertETHL1Token seeds the ETH placeholder row on L1 (zero address).
const insertETHL1Token = `
INSERT INTO l1_tokens
	(address, name, symbol, decimals)
VALUES ('0x0000000000000000000000000000000000000000', 'Ethereum', 'ETH', 18)
ON CONFLICT (address) DO NOTHING;
`

// earlier transactions used 0x0000000000000000000000000000000000000000 as
// address of ETH so insert both that and
// 0xDeadDeAddeAddEAddeadDEaDDEAdDeaDDeAD0000
const insertETHL2Token = `
INSERT INTO l2_tokens
	(address, name, symbol, decimals)
VALUES ('0xDeadDeAddeAddEAddeadDEaDDEAdDeaDDeAD0000', 'Ethereum', 'ETH', 18)
ON CONFLICT (address) DO NOTHING;
INSERT INTO l2_tokens
	(address, name, symbol, decimals)
VALUES ('0x0000000000000000000000000000000000000000', 'Ethereum', 'ETH', 18)
ON CONFLICT (address) DO NOTHING;
`

// createL1L2NumberIndex enforces unique block numbers on both chains and
// speeds up highest-block queries.
const createL1L2NumberIndex = `
CREATE UNIQUE INDEX IF NOT EXISTS l1_blocks_number ON l1_blocks(number);
CREATE UNIQUE INDEX IF NOT EXISTS l2_blocks_number ON l2_blocks(number);
`
// createAirdropsTable stores per-address OP airdrop allocations. All amounts
// are decimal strings validated as non-negative integers by CHECK
// constraints.
//
// BUGFIX: the CHECK constraints for op_repeat_user_amount and total_amount
// previously validated the wrong columns (op_user_amount and voter_amount,
// copy-paste errors); each column now validates itself. Because the table is
// created with IF NOT EXISTS, existing deployments keep their original
// constraints and are unaffected.
const createAirdropsTable = `
CREATE TABLE IF NOT EXISTS airdrops (
	address VARCHAR(42) PRIMARY KEY,
	voter_amount VARCHAR NOT NULL DEFAULT '0' CHECK(voter_amount ~ '^\d+$') ,
	multisig_signer_amount VARCHAR NOT NULL DEFAULT '0' CHECK(multisig_signer_amount ~ '^\d+$'),
	gitcoin_amount VARCHAR NOT NULL DEFAULT '0' CHECK(gitcoin_amount ~ '^\d+$'),
	active_bridged_amount VARCHAR NOT NULL DEFAULT '0' CHECK(active_bridged_amount ~ '^\d+$'),
	op_user_amount VARCHAR NOT NULL DEFAULT '0' CHECK(op_user_amount ~ '^\d+$'),
	op_repeat_user_amount VARCHAR NOT NULL DEFAULT '0' CHECK(op_repeat_user_amount ~ '^\d+$'),
	op_og_amount VARCHAR NOT NULL DEFAULT '0' CHECK(op_og_amount ~ '^\d+$'),
	bonus_amount VARCHAR NOT NULL DEFAULT '0' CHECK(bonus_amount ~ '^\d+$'),
	total_amount VARCHAR NOT NULL CHECK(total_amount ~ '^\d+$')
)
`
// updateWithdrawalsTable adds the nullable bedrock withdrawal columns to an
// existing withdrawals table (idempotent via IF NOT EXISTS).
const updateWithdrawalsTable = `
ALTER TABLE withdrawals ADD COLUMN IF NOT EXISTS br_withdrawal_hash VARCHAR NULL;
ALTER TABLE withdrawals ADD COLUMN IF NOT EXISTS br_withdrawal_proven_tx_hash VARCHAR NULL;
ALTER TABLE withdrawals ADD COLUMN IF NOT EXISTS br_withdrawal_proven_log_index INTEGER NULL;
ALTER TABLE withdrawals ADD COLUMN IF NOT EXISTS br_withdrawal_finalized_tx_hash VARCHAR NULL;
ALTER TABLE withdrawals ADD COLUMN IF NOT EXISTS br_withdrawal_finalized_log_index INTEGER NULL;
ALTER TABLE withdrawals ADD COLUMN IF NOT EXISTS br_withdrawal_finalized_success BOOLEAN NULL;
CREATE INDEX IF NOT EXISTS withdrawals_br_withdrawal_hash ON withdrawals(br_withdrawal_hash);
`

// schema lists all migrations in dependency order (referenced tables before
// the tables that reference them); NewDatabase executes them sequentially.
var schema = []string{
	createL1BlocksTable,
	createL2BlocksTable,
	createL1TokensTable,
	createL2TokensTable,
	createStateBatchesTable,
	insertETHL1Token,
	insertETHL2Token,
	createDepositsTable,
	createWithdrawalsTable,
	createL1L2NumberIndex,
	createAirdropsTable,
	updateWithdrawalsTable,
}
package db
import (
"math/big"
"github.com/ethereum/go-ethereum/common"
)
// StateBatch is the state batch containing merkle root of the withdrawals
// periodically written to L1.
type StateBatch struct {
	Index     *big.Int
	Root      common.Hash
	Size      *big.Int
	PrevTotal *big.Int
	ExtraData []byte
	BlockHash common.Hash
}

// StateBatchJSON contains StateBatch data suitable for JSON serialization.
// Big integers are narrowed to uint64 and hashes rendered as hex strings.
type StateBatchJSON struct {
	Index          uint64 `json:"index"`
	Root           string `json:"root"`
	Size           uint64 `json:"size"`
	PrevTotal      uint64 `json:"prevTotal"`
	ExtraData      []byte `json:"extraData"`
	BlockHash      string `json:"blockHash"`
	BlockNumber    uint64 `json:"blockNumber"`
	BlockTimestamp uint64 `json:"blockTimestamp"`
}
package db
// Token contains the token details of the ERC20 contract at the given address.
// NOTE: The Token address will almost definitely be different on L1 and L2, so
// we need to track it on both chains when handling transactions.
type Token struct {
	Address  string `json:"address"`
	Name     string `json:"name"`
	Symbol   string `json:"symbol"`
	Decimals uint8  `json:"decimals"`
}
package db
import "database/sql"
// txn runs apply inside a database transaction: it commits when apply
// returns nil, rolls back when apply returns an error, and rolls back then
// re-panics when apply panics.
func txn(db *sql.DB, apply func(*sql.Tx) error) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}

	defer func() {
		// Roll back and re-raise the original panic so callers see it.
		if p := recover(); p != nil {
			// Ignore since we're panicking anyway
			_ = tx.Rollback()
			panic(p)
		}
	}()

	err = apply(tx)
	if err != nil {
		// Don't swallow application error
		_ = tx.Rollback()
		return err
	}

	return tx.Commit()
}
package db
import (
"math/big"
"github.com/ethereum/go-ethereum/common"
)
// Withdrawal contains transaction data for withdrawals made via the L2 to L1 bridge.
type Withdrawal struct {
	GUID        string
	TxHash      common.Hash
	L1Token     common.Address
	L2Token     common.Address
	FromAddress common.Address
	ToAddress   common.Address
	Amount      *big.Int
	Data        []byte
	LogIndex    uint
	// BedrockHash is nil for pre-bedrock withdrawals; stored in the
	// nullable br_withdrawal_hash column.
	BedrockHash *common.Hash
}

// String returns the tx hash for the withdrawal.
func (w Withdrawal) String() string {
	return w.TxHash.String()
}

// WithdrawalJSON contains Withdrawal data suitable for JSON serialization.
// The Bedrock* pointer fields and Batch are nil when the corresponding
// nullable database columns are NULL.
type WithdrawalJSON struct {
	GUID                     string          `json:"guid"`
	FromAddress              string          `json:"from"`
	ToAddress                string          `json:"to"`
	L1Token                  string          `json:"l1Token"`
	L2Token                  *Token          `json:"l2Token"`
	Amount                   string          `json:"amount"`
	Data                     []byte          `json:"data"`
	LogIndex                 uint64          `json:"logIndex"`
	BlockNumber              uint64          `json:"blockNumber"`
	BlockTimestamp           string          `json:"blockTimestamp"`
	TxHash                   string          `json:"transactionHash"`
	Batch                    *StateBatchJSON `json:"batch"`
	BedrockWithdrawalHash    *string         `json:"bedrockWithdrawalHash"`
	BedrockProvenTxHash      *string         `json:"bedrockProvenTxHash"`
	BedrockProvenLogIndex    *int            `json:"bedrockProvenLogIndex"`
	BedrockFinalizedTxHash   *string         `json:"bedrockFinalizedTxHash"`
	BedrockFinalizedLogIndex *int            `json:"bedrockFinalizedLogIndex"`
	BedrockFinalizedSuccess  *bool           `json:"bedrockFinalizedSuccess"`
}
// FinalizationState is a query filter over a withdrawal's finalization status.
type FinalizationState int

const (
	// FinalizationStateAny matches withdrawals regardless of finalization.
	FinalizationStateAny FinalizationState = iota
	// FinalizationStateFinalized matches only finalized withdrawals.
	FinalizationStateFinalized
	// FinalizationStateUnfinalized matches only unfinalized withdrawals.
	FinalizationStateUnfinalized
)

// ParseFinalizationState maps the query-string values "true" and "false" to
// the finalized/unfinalized filters; anything else means no filter.
func ParseFinalizationState(in string) FinalizationState {
	if in == "true" {
		return FinalizationStateFinalized
	}
	if in == "false" {
		return FinalizationStateUnfinalized
	}
	return FinalizationStateAny
}

// SQL renders the filter as a WHERE-clause fragment; the empty string means
// no additional constraint.
func (f FinalizationState) SQL() string {
	if f == FinalizationStateFinalized {
		return "AND withdrawals.br_withdrawal_finalized_tx_hash IS NOT NULL"
	}
	if f == FinalizationStateUnfinalized {
		return "AND withdrawals.br_withdrawal_finalized_tx_hash IS NULL"
	}
	return ""
}
// ProvenWithdrawal holds the data recorded when a withdrawal is proven on L1
// — presumably decoded from the OptimismPortal WithdrawalProven event; confirm.
type ProvenWithdrawal struct {
	From common.Address
	To common.Address
	WithdrawalHash common.Hash
	TxHash common.Hash // L1 transaction that contained the proof
	LogIndex uint
}
// FinalizedWithdrawal holds the data recorded when a withdrawal is finalized
// on L1, including whether the finalizing call succeeded.
type FinalizedWithdrawal struct {
	WithdrawalHash common.Hash
	TxHash common.Hash // L1 transaction that finalized the withdrawal
	Success bool
	LogIndex uint
}
package flags
import (
"time"
"github.com/urfave/cli"
)
// envVarPrefix namespaces every environment variable read by the indexer.
const envVarPrefix = "INDEXER_"

// prefixEnvVar prepends the indexer namespace to an environment variable name.
func prefixEnvVar(name string) string {
	prefixed := envVarPrefix + name
	return prefixed
}
// CLI flag definitions for the indexer binary. Fixes in this revision:
// the disable-indexer usage text said "enable" for a flag that disables,
// the l2-conf-depth usage said "on L1" (copy-paste from the L1 flag), and
// a grammar typo in start-block-number.
var (
	/* Required Flags */

	BuildEnvFlag = cli.StringFlag{
		Name: "build-env",
		Usage: "Build environment for which the binary is produced, " +
			"e.g. production or development",
		Required: true,
		EnvVar:   prefixEnvVar("BUILD_ENV"),
	}
	ChainIDFlag = cli.StringFlag{
		Name:     "chain-id",
		Usage:    "Ethereum chain ID",
		Required: true,
		EnvVar:   prefixEnvVar("CHAIN_ID"),
	}
	L1EthRPCFlag = cli.StringFlag{
		Name:     "l1-eth-rpc",
		Usage:    "HTTP provider URL for L1",
		Required: true,
		EnvVar:   prefixEnvVar("L1_ETH_RPC"),
	}
	L2EthRPCFlag = cli.StringFlag{
		Name:     "l2-eth-rpc",
		Usage:    "HTTP provider URL for L2",
		Required: true,
		EnvVar:   prefixEnvVar("L2_ETH_RPC"),
	}
	L1AddressManagerAddressFlag = cli.StringFlag{
		Name:     "l1-address-manager-address",
		Usage:    "Address of the L1 address manager",
		Required: true,
		EnvVar:   prefixEnvVar("L1_ADDRESS_MANAGER_ADDRESS"),
	}
	DBHostFlag = cli.StringFlag{
		Name:     "db-host",
		Usage:    "Hostname of the database connection",
		Required: true,
		EnvVar:   prefixEnvVar("DB_HOST"),
	}
	DBPortFlag = cli.Uint64Flag{
		Name:     "db-port",
		Usage:    "Port of the database connection",
		Required: true,
		EnvVar:   prefixEnvVar("DB_PORT"),
	}
	DBUserFlag = cli.StringFlag{
		Name:     "db-user",
		Usage:    "Username of the database connection",
		Required: true,
		EnvVar:   prefixEnvVar("DB_USER"),
	}
	DBPasswordFlag = cli.StringFlag{
		Name:     "db-password",
		Usage:    "Password of the database connection",
		Required: true,
		EnvVar:   prefixEnvVar("DB_PASSWORD"),
	}
	DBNameFlag = cli.StringFlag{
		Name:     "db-name",
		Usage:    "Database name of the database connection",
		Required: true,
		EnvVar:   prefixEnvVar("DB_NAME"),
	}

	/* Bedrock Flags */

	BedrockFlag = cli.BoolFlag{
		Name:   "bedrock",
		Usage:  "Whether or not this indexer should operate in Bedrock mode",
		EnvVar: prefixEnvVar("BEDROCK"),
	}
	BedrockL1StandardBridgeAddress = cli.StringFlag{
		Name:   "bedrock.l1-standard-bridge-address",
		Usage:  "Address of the L1 standard bridge",
		EnvVar: prefixEnvVar("BEDROCK_L1_STANDARD_BRIDGE"),
	}
	BedrockOptimismPortalAddress = cli.StringFlag{
		Name:   "bedrock.portal-address",
		Usage:  "Address of the portal",
		EnvVar: prefixEnvVar("BEDROCK_OPTIMISM_PORTAL"),
	}

	/* Optional Flags */

	DisableIndexer = cli.BoolFlag{
		Name:     "disable-indexer",
		Usage:    "Whether or not to disable the indexer on this instance",
		Required: false,
		EnvVar:   prefixEnvVar("DISABLE_INDEXER"),
	}
	LogLevelFlag = cli.StringFlag{
		Name:   "log-level",
		Usage:  "The lowest log level that will be output",
		Value:  "info",
		EnvVar: prefixEnvVar("LOG_LEVEL"),
	}
	LogTerminalFlag = cli.BoolFlag{
		Name: "log-terminal",
		Usage: "If true, outputs logs in terminal format, otherwise prints " +
			"in JSON format. If SENTRY_ENABLE is set to true, this flag is " +
			"ignored and logs are printed using JSON",
		EnvVar: prefixEnvVar("LOG_TERMINAL"),
	}
	SentryEnableFlag = cli.BoolFlag{
		Name:   "sentry-enable",
		Usage:  "Whether or not to enable Sentry. If true, sentry-dsn must also be set",
		EnvVar: prefixEnvVar("SENTRY_ENABLE"),
	}
	SentryDsnFlag = cli.StringFlag{
		Name:   "sentry-dsn",
		Usage:  "Sentry data source name",
		EnvVar: prefixEnvVar("SENTRY_DSN"),
	}
	SentryTraceRateFlag = cli.DurationFlag{
		Name:   "sentry-trace-rate",
		Usage:  "Sentry trace rate",
		Value:  50 * time.Millisecond,
		EnvVar: prefixEnvVar("SENTRY_TRACE_RATE"),
	}
	L1StartBlockNumberFlag = cli.Uint64Flag{
		Name:   "start-block-number",
		Usage:  "The block number to start indexing from. Must be used together with the start block hash",
		Value:  0,
		EnvVar: prefixEnvVar("START_BLOCK_NUMBER"),
	}
	L1ConfDepthFlag = cli.Uint64Flag{
		Name:   "l1-conf-depth",
		Usage:  "The number of confirmations after which headers are considered confirmed on L1",
		Value:  20,
		EnvVar: prefixEnvVar("L1_CONF_DEPTH"),
	}
	L2ConfDepthFlag = cli.Uint64Flag{
		Name:   "l2-conf-depth",
		Usage:  "The number of confirmations after which headers are considered confirmed on L2",
		Value:  24,
		EnvVar: prefixEnvVar("L2_CONF_DEPTH"),
	}
	MaxHeaderBatchSizeFlag = cli.Uint64Flag{
		Name:   "max-header-batch-size",
		Usage:  "The maximum number of headers to request as a batch",
		Value:  2000,
		EnvVar: prefixEnvVar("MAX_HEADER_BATCH_SIZE"),
	}
	RESTHostnameFlag = cli.StringFlag{
		Name:   "rest-hostname",
		Usage:  "The hostname of the REST server",
		Value:  "127.0.0.1",
		EnvVar: prefixEnvVar("REST_HOSTNAME"),
	}
	RESTPortFlag = cli.Uint64Flag{
		Name:   "rest-port",
		Usage:  "The port of the REST server",
		Value:  8080,
		EnvVar: prefixEnvVar("REST_PORT"),
	}
	MetricsServerEnableFlag = cli.BoolFlag{
		Name:   "metrics-server-enable",
		Usage:  "Whether or not to run the embedded metrics server",
		EnvVar: prefixEnvVar("METRICS_SERVER_ENABLE"),
	}
	MetricsHostnameFlag = cli.StringFlag{
		Name:   "metrics-hostname",
		Usage:  "The hostname of the metrics server",
		Value:  "127.0.0.1",
		EnvVar: prefixEnvVar("METRICS_HOSTNAME"),
	}
	MetricsPortFlag = cli.Uint64Flag{
		Name:   "metrics-port",
		Usage:  "The port of the metrics server",
		Value:  7300,
		EnvVar: prefixEnvVar("METRICS_PORT"),
	}
)
// requiredFlags are flags that must be provided for the binary to start.
var requiredFlags = []cli.Flag{
	BuildEnvFlag,
	ChainIDFlag,
	L1EthRPCFlag,
	L2EthRPCFlag,
	L1AddressManagerAddressFlag,
	DBHostFlag,
	DBPortFlag,
	DBUserFlag,
	DBPasswordFlag,
	DBNameFlag,
}

// optionalFlags are flags that have defaults and may be omitted.
var optionalFlags = []cli.Flag{
	BedrockFlag,
	BedrockL1StandardBridgeAddress,
	BedrockOptimismPortalAddress,
	DisableIndexer,
	LogLevelFlag,
	LogTerminalFlag,
	SentryEnableFlag,
	SentryDsnFlag,
	SentryTraceRateFlag,
	L1ConfDepthFlag,
	L2ConfDepthFlag,
	MaxHeaderBatchSizeFlag,
	L1StartBlockNumberFlag,
	RESTHostnameFlag,
	RESTPortFlag,
	MetricsServerEnableFlag,
	MetricsHostnameFlag,
	MetricsPortFlag,
}

// Flags contains the list of configuration options available to the binary.
var Flags = append(requiredFlags, optionalFlags...)
package flags
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/urfave/cli"
)
// TestRequiredFlagsSetRequired asserts that all flags deemed required properly
// have the Required field set to true.
func TestRequiredFlagsSetRequired(t *testing.T) {
	for _, flag := range requiredFlags {
		// Every flag struct used here implements cli.RequiredFlag.
		reqFlag, ok := flag.(cli.RequiredFlag)
		require.True(t, ok)
		require.True(t, reqFlag.IsRequired())
	}
}

// TestOptionalFlagsDontSetRequired asserts that all flags deemed optional set
// the Required field to false.
func TestOptionalFlagsDontSetRequired(t *testing.T) {
	for _, flag := range optionalFlags {
		reqFlag, ok := flag.(cli.RequiredFlag)
		require.True(t, ok)
		require.False(t, reqFlag.IsRequired())
	}
}
package integration_tests
import (
"database/sql"
"encoding/json"
"fmt"
"math/big"
"net/http"
"os"
"testing"
"time"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/params"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/indexer/db"
"github.com/ethereum-optimism/optimism/indexer/legacy"
"github.com/ethereum-optimism/optimism/indexer/services/l1"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-bindings/predeploys"
op_e2e "github.com/ethereum-optimism/optimism/op-e2e"
"github.com/ethereum-optimism/optimism/op-e2e/config"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/withdrawals"
"github.com/ethereum-optimism/optimism/op-service/client/utils"
_ "github.com/lib/pq"
)
// TestBedrockIndexer is an end-to-end test of the indexer in Bedrock mode: it
// boots an op-e2e system and a throwaway Postgres database, runs the indexer
// against them, then performs an ETH deposit and a full withdrawal lifecycle
// (initiate -> prove -> finalize), asserting the indexer's REST API after
// each step.
func TestBedrockIndexer(t *testing.T) {
	dbParams := createTestDB(t)
	cfg := op_e2e.DefaultSystemConfig(t)
	// Short finalization period so the withdrawal can be finalized within
	// the test's timeouts.
	cfg.DeployConfig.FinalizationPeriodSeconds = 2
	sys, err := cfg.Start()
	require.NoError(t, err)
	defer sys.Close()
	l1Client := sys.Clients["l1"]
	l2Client := sys.Clients["sequencer"]
	fromAddr := cfg.Secrets.Addresses().Alice
	// wait a couple of blocks
	require.NoError(t, utils.WaitBlock(e2eutils.TimeoutCtx(t, 30*time.Second), l2Client, 10))
	l1SB, err := bindings.NewL1StandardBridge(cfg.L1Deployments.L1StandardBridgeProxy, l1Client)
	require.NoError(t, err)
	l2SB, err := bindings.NewL2StandardBridge(predeploys.L2StandardBridgeAddr, l2Client)
	require.NoError(t, err)
	l1Opts, err := bind.NewKeyedTransactorWithChainID(cfg.Secrets.Alice, cfg.L1ChainIDBig())
	require.NoError(t, err)
	l2Opts, err := bind.NewKeyedTransactorWithChainID(cfg.Secrets.Alice, cfg.L2ChainIDBig())
	require.NoError(t, err)
	// Point the indexer at the freshly started system and test database.
	idxrCfg := legacy.Config{
		ChainID: cfg.DeployConfig.L1ChainID,
		L1EthRpc: sys.Nodes["l1"].HTTPEndpoint(),
		L2EthRpc: sys.Nodes["sequencer"].HTTPEndpoint(),
		PollInterval: time.Second,
		DBHost: dbParams.Host,
		DBPort: dbParams.Port,
		DBUser: dbParams.User,
		DBPassword: dbParams.Password,
		DBName: dbParams.Name,
		LogLevel: "info",
		LogTerminal: true,
		L1StartBlockNumber: 0,
		L1ConfDepth: 1,
		L2ConfDepth: 1,
		MaxHeaderBatchSize: 2,
		RESTHostname: "127.0.0.1",
		RESTPort: 7980,
		DisableIndexer: false,
		Bedrock: true,
		BedrockL1StandardBridgeAddress: cfg.DeployConfig.L1StandardBridgeProxy,
		BedrockOptimismPortalAddress: cfg.DeployConfig.OptimismPortalProxy,
	}
	idxr, err := legacy.NewIndexer(idxrCfg)
	require.NoError(t, err)
	errCh := make(chan error, 1)
	go func() {
		errCh <- idxr.Start()
	}()
	t.Cleanup(func() {
		idxr.Stop()
		// Start returns once the indexer shuts down; assert it exited cleanly.
		require.NoError(t, <-errCh)
	})
	// makeURL builds a URL pointing at the indexer's REST server.
	makeURL := func(path string) string {
		return fmt.Sprintf("http://%s:%d/%s", idxrCfg.RESTHostname, idxrCfg.RESTPort, path)
	}
	t.Run("deposit ETH", func(t *testing.T) {
		// Deposit 1 ETH via the L1 standard bridge.
		l1Opts.Value = big.NewInt(params.Ether)
		depTx, err := l1SB.DepositETH(l1Opts, 200_000, nil)
		require.NoError(t, err)
		depReceipt, err := utils.WaitReceiptOK(e2eutils.TimeoutCtx(t, 10*time.Second), l1Client, depTx.Hash())
		require.NoError(t, err)
		require.Greaterf(t, len(depReceipt.Logs), 0, "must have logs")
		// Reconstruct the L2 deposit transaction hash from the deposit event.
		var l2Hash common.Hash
		for _, eLog := range depReceipt.Logs {
			if len(eLog.Topics) == 0 || eLog.Topics[0] != derive.DepositEventABIHash {
				continue
			}
			depLog, err := derive.UnmarshalDepositLogEvent(eLog)
			require.NoError(t, err)
			tx := types.NewTx(depLog)
			l2Hash = tx.Hash()
		}
		require.NotEqual(t, common.Hash{}, l2Hash)
		_, err = utils.WaitReceiptOK(e2eutils.TimeoutCtx(t, 15*time.Second), l2Client, l2Hash)
		require.NoError(t, err)
		// Poll for indexer deposit
		var depPage *db.PaginatedDeposits
		require.NoError(t, utils.WaitFor(e2eutils.TimeoutCtx(t, 30*time.Second), 100*time.Millisecond, func() (bool, error) {
			res := new(db.PaginatedDeposits)
			err := getJSON(makeURL(fmt.Sprintf("v1/deposits/%s", fromAddr)), res)
			if err != nil {
				return false, err
			}
			if len(res.Deposits) == 0 {
				return false, nil
			}
			depPage = res
			return true, nil
		}))
		// Make sure deposit is what we expect
		require.Equal(t, 1, len(depPage.Deposits))
		deposit := depPage.Deposits[0]
		require.Equal(t, big.NewInt(params.Ether).String(), deposit.Amount)
		require.Equal(t, depTx.Hash().String(), deposit.TxHash)
		require.Equal(t, depReceipt.BlockNumber.Uint64(), deposit.BlockNumber)
		require.Equal(t, fromAddr.String(), deposit.FromAddress)
		require.Equal(t, fromAddr.String(), deposit.ToAddress)
		require.EqualValues(t, db.ETHL1Token, deposit.L1Token)
		require.Equal(t, l1.ZeroAddress.String(), deposit.L2Token)
		require.NotEmpty(t, deposit.GUID)
		// Perform withdrawal through bridge
		l2Opts.Value = big.NewInt(0.5 * params.Ether)
		wdTx, err := l2SB.Withdraw(l2Opts, predeploys.LegacyERC20ETHAddr, big.NewInt(0.5*params.Ether), 0, nil)
		require.NoError(t, err)
		wdReceipt, err := utils.WaitReceiptOK(e2eutils.TimeoutCtx(t, 30*time.Second), l2Client, wdTx.Hash())
		require.NoError(t, err)
		// Poll until the indexer has picked up the withdrawal.
		var wdPage *db.PaginatedWithdrawals
		require.NoError(t, utils.WaitFor(e2eutils.TimeoutCtx(t, 30*time.Second), 100*time.Millisecond, func() (bool, error) {
			res := new(db.PaginatedWithdrawals)
			err := getJSON(makeURL(fmt.Sprintf("v1/withdrawals/%s", fromAddr)), res)
			if err != nil {
				return false, err
			}
			if len(res.Withdrawals) == 0 {
				return false, nil
			}
			wdPage = res
			return true, nil
		}))
		require.Equal(t, 1, len(wdPage.Withdrawals))
		withdrawal := wdPage.Withdrawals[0]
		// Neither proven nor finalized yet.
		require.Nil(t, withdrawal.BedrockProvenTxHash)
		require.Nil(t, withdrawal.BedrockFinalizedTxHash)
		require.Equal(t, big.NewInt(0.5*params.Ether).String(), withdrawal.Amount)
		require.Equal(t, wdTx.Hash().String(), withdrawal.TxHash)
		require.Equal(t, wdReceipt.BlockNumber.Uint64(), withdrawal.BlockNumber)
		// use fromaddr twice here because the user is withdrawing
		// to themselves
		require.Equal(t, fromAddr.String(), withdrawal.FromAddress)
		require.Equal(t, fromAddr.String(), withdrawal.ToAddress)
		require.EqualValues(t, l1.ZeroAddress.String(), withdrawal.L1Token)
		require.Equal(t, db.ETHL2Token, withdrawal.L2Token)
		require.NotEmpty(t, withdrawal.GUID)
		// Prove our withdrawal
		wdParams, proveReceipt := op_e2e.ProveWithdrawal(t, cfg, l1Client, sys.Nodes["sequencer"], cfg.Secrets.Alice, wdReceipt)
		// Poll until the indexer records the proof.
		wdPage = nil
		require.NoError(t, utils.WaitFor(e2eutils.TimeoutCtx(t, 30*time.Second), 100*time.Millisecond, func() (bool, error) {
			res := new(db.PaginatedWithdrawals)
			err := getJSON(makeURL(fmt.Sprintf("v1/withdrawals/%s", fromAddr)), res)
			if err != nil {
				return false, err
			}
			if res.Withdrawals[0].BedrockProvenTxHash == nil {
				return false, nil
			}
			wdPage = res
			return true, nil
		}))
		wd := wdPage.Withdrawals[0]
		require.Equal(t, proveReceipt.TxHash.String(), *wd.BedrockProvenTxHash)
		require.Nil(t, wd.BedrockFinalizedTxHash)
		// Finalize withdrawal
		err = withdrawals.WaitForFinalizationPeriod(e2eutils.TimeoutCtx(t, 30*time.Second), l1Client, proveReceipt.BlockNumber, config.L1Deployments.L2OutputOracleProxy)
		require.Nil(t, err)
		finReceipt := op_e2e.FinalizeWithdrawal(t, cfg, l1Client, cfg.Secrets.Alice, wdReceipt, wdParams)
		// Poll until the indexer records finalization.
		wdPage = nil
		require.NoError(t, utils.WaitFor(e2eutils.TimeoutCtx(t, 30*time.Second), 100*time.Millisecond, func() (bool, error) {
			res := new(db.PaginatedWithdrawals)
			err := getJSON(makeURL(fmt.Sprintf("v1/withdrawals/%s", fromAddr)), res)
			if err != nil {
				return false, err
			}
			if res.Withdrawals[0].BedrockFinalizedTxHash == nil {
				return false, nil
			}
			wdPage = res
			return true, nil
		}))
		wd = wdPage.Withdrawals[0]
		require.Equal(t, proveReceipt.TxHash.String(), *wd.BedrockProvenTxHash)
		require.Equal(t, finReceipt.TxHash.String(), *wd.BedrockFinalizedTxHash)
		require.True(t, *wd.BedrockFinalizedSuccess)
		// The finalized withdrawal must not show up when filtering for
		// unfinalized withdrawals.
		wdPage = new(db.PaginatedWithdrawals)
		err = getJSON(makeURL(fmt.Sprintf("v1/withdrawals/%s?finalized=false", fromAddr)), wdPage)
		require.NoError(t, err)
		require.Equal(t, 0, len(wdPage.Withdrawals))
	})
}
// testDBParams holds the connection parameters of a database created by
// createTestDB. Password is left empty since the test DB is created without
// one.
type testDBParams struct {
	Host string
	Port uint64
	User string
	Password string
	Name string
}
// createTestDB creates a uniquely named database on the local Postgres
// instance and registers a cleanup that drops it when the test finishes.
// The connection user is taken from the DB_USER environment variable.
func createTestDB(t *testing.T) *testDBParams {
	user := os.Getenv("DB_USER")
	name := fmt.Sprintf("indexer_test_%d", time.Now().Unix())

	// Build the admin DSN; omit credentials entirely when DB_USER is unset.
	dsn := "postgres://"
	if user != "" {
		dsn += user + "@"
	}
	dsn += "localhost:5432?sslmode=disable"

	pg, err := sql.Open("postgres", dsn)
	require.NoError(t, err)

	_, err = pg.Exec("CREATE DATABASE " + name)
	require.NoError(t, err)

	t.Cleanup(func() {
		_, err = pg.Exec("DROP DATABASE " + name)
		require.NoError(t, err)
		pg.Close()
	})

	return &testDBParams{
		Host: "localhost",
		Port: 5432,
		Name: name,
		User: user,
	}
}
// getJSON performs an HTTP GET against url and JSON-decodes the response body
// into out. A non-200 response is reported as an error.
func getJSON(url string, out interface{}) error {
	res, err := http.Get(url)
	if err != nil {
		return err
	}
	// Close the body on every path — the original leaked it on non-200
	// responses, preventing connection reuse.
	defer res.Body.Close()
	if res.StatusCode != 200 {
		return fmt.Errorf("non-200 status code %d", res.StatusCode)
	}
	dec := json.NewDecoder(res.Body)
	return dec.Decode(out)
}
package legacy
import (
"errors"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli"
"github.com/ethereum-optimism/optimism/indexer/flags"
)
var (
	// ErrSentryDSNNotSet signals that no Data Source Name was provided
	// with which to configure Sentry logging.
	ErrSentryDSNNotSet = errors.New("sentry-dsn must be set if use-sentry " +
		"is true")
)
// Config holds the parsed runtime configuration for the legacy indexer.
type Config struct {
	/* Required Params */
	// ChainID identifies the chain being indexed.
	ChainID uint64
	// L1EthRpc is the HTTP provider URL for L1.
	L1EthRpc string
	// L2EthRpc is the HTTP provider URL for L2.
	L2EthRpc string
	// L1AddressManagerAddress is the address of the address manager for L1.
	L1AddressManagerAddress string
	// PollInterval is the delay between querying L2 for more transaction
	// and creating a new batch.
	// NOTE(review): no CLI flag populates this field in NewConfig — it is
	// only set directly (e.g. by tests); confirm that is intended.
	PollInterval time.Duration
	// Hostname of the database connection.
	DBHost string
	// Port of the database connection.
	DBPort uint64
	// Username of the database connection.
	DBUser string
	// Password of the database connection.
	DBPassword string
	// Database name of the database connection.
	DBName string
	/* Optional Params */
	// LogLevel is the lowest log level that will be output.
	LogLevel string
	// LogTerminal if true, prints to stdout in terminal format, otherwise
	// prints using JSON. If SentryEnable is true this flag is ignored, and logs
	// are printed using JSON.
	LogTerminal bool
	// L1StartBlockNumber is the block number to start indexing L1 from.
	L1StartBlockNumber uint64
	// L1ConfDepth is the number of confirmations after which headers are
	// considered confirmed on L1.
	L1ConfDepth uint64
	// L2ConfDepth is the number of confirmations after which headers are
	// considered confirmed on L2.
	L2ConfDepth uint64
	// MaxHeaderBatchSize is the maximum number of headers to request as a
	// batch.
	MaxHeaderBatchSize uint64
	// RESTHostname is the hostname at which the REST server is running.
	RESTHostname string
	// RESTPort is the port at which the REST server is running.
	RESTPort uint64
	// MetricsServerEnable if true, will create a metrics client and log to
	// Prometheus.
	MetricsServerEnable bool
	// MetricsHostname is the hostname at which the metrics server is running.
	MetricsHostname string
	// MetricsPort is the port at which the metrics server is running.
	MetricsPort uint64
	// DisableIndexer enables/disables the indexer.
	DisableIndexer bool
	// Bedrock enabled Bedrock indexing.
	Bedrock bool
	// BedrockL1StandardBridgeAddress is the L1 standard bridge contract
	// address; required when Bedrock is true.
	BedrockL1StandardBridgeAddress common.Address
	// BedrockOptimismPortalAddress is the OptimismPortal contract address;
	// required when Bedrock is true.
	BedrockOptimismPortalAddress common.Address
}
// NewConfig parses the Config from the provided flags or environment variables.
// This method fails if ValidateConfig deems the configuration to be malformed.
func NewConfig(ctx *cli.Context) (Config, error) {
	cfg := Config{
		/* Required Flags */
		ChainID: ctx.GlobalUint64(flags.ChainIDFlag.Name),
		L1EthRpc: ctx.GlobalString(flags.L1EthRPCFlag.Name),
		L2EthRpc: ctx.GlobalString(flags.L2EthRPCFlag.Name),
		L1AddressManagerAddress: ctx.GlobalString(flags.L1AddressManagerAddressFlag.Name),
		DBHost: ctx.GlobalString(flags.DBHostFlag.Name),
		DBPort: ctx.GlobalUint64(flags.DBPortFlag.Name),
		DBUser: ctx.GlobalString(flags.DBUserFlag.Name),
		DBPassword: ctx.GlobalString(flags.DBPasswordFlag.Name),
		DBName: ctx.GlobalString(flags.DBNameFlag.Name),
		/* Optional Flags */
		// Note: HexToAddress of an empty string yields the zero address,
		// which ValidateConfig rejects in Bedrock mode.
		Bedrock: ctx.GlobalBool(flags.BedrockFlag.Name),
		BedrockL1StandardBridgeAddress: common.HexToAddress(ctx.GlobalString(flags.BedrockL1StandardBridgeAddress.Name)),
		BedrockOptimismPortalAddress: common.HexToAddress(ctx.GlobalString(flags.BedrockOptimismPortalAddress.Name)),
		DisableIndexer: ctx.GlobalBool(flags.DisableIndexer.Name),
		LogLevel: ctx.GlobalString(flags.LogLevelFlag.Name),
		LogTerminal: ctx.GlobalBool(flags.LogTerminalFlag.Name),
		L1StartBlockNumber: ctx.GlobalUint64(flags.L1StartBlockNumberFlag.Name),
		L1ConfDepth: ctx.GlobalUint64(flags.L1ConfDepthFlag.Name),
		L2ConfDepth: ctx.GlobalUint64(flags.L2ConfDepthFlag.Name),
		MaxHeaderBatchSize: ctx.GlobalUint64(flags.MaxHeaderBatchSizeFlag.Name),
		MetricsServerEnable: ctx.GlobalBool(flags.MetricsServerEnableFlag.Name),
		RESTHostname: ctx.GlobalString(flags.RESTHostnameFlag.Name),
		RESTPort: ctx.GlobalUint64(flags.RESTPortFlag.Name),
		MetricsHostname: ctx.GlobalString(flags.MetricsHostnameFlag.Name),
		MetricsPort: ctx.GlobalUint64(flags.MetricsPortFlag.Name),
	}
	err := ValidateConfig(&cfg)
	if err != nil {
		return Config{}, err
	}
	return cfg, nil
}
// ValidateConfig applies additional constraints to the parsed configuration:
// the log level must parse (defaulting to "debug" when empty), and Bedrock
// mode requires both the L1 standard bridge and OptimismPortal addresses.
// It mutates cfg to fill in the log-level default.
func ValidateConfig(cfg *Config) error {
	// Sanity check log level.
	if cfg.LogLevel == "" {
		cfg.LogLevel = "debug"
	}
	if _, err := log.LvlFromString(cfg.LogLevel); err != nil {
		return err
	}

	missingBridge := cfg.BedrockL1StandardBridgeAddress == common.Address{}
	missingPortal := cfg.BedrockOptimismPortalAddress == common.Address{}
	if cfg.Bedrock && (missingBridge || missingPortal) {
		return errors.New("must specify l1 standard bridge and optimism portal addresses in bedrock mode")
	}
	return nil
}
package legacy_test
import (
"fmt"
"testing"
legacy "github.com/ethereum-optimism/optimism/indexer/legacy"
"github.com/stretchr/testify/require"
)
// validateConfigTests is the table for TestValidateConfig: each entry pairs a
// Config with the exact error ValidateConfig is expected to return.
var validateConfigTests = []struct {
	name string
	cfg legacy.Config
	expErr error
}{
	{
		name: "bad log level",
		cfg: legacy.Config{
			LogLevel: "unknown",
		},
		expErr: fmt.Errorf("unknown level: unknown"),
	},
}
// TestValidateConfig asserts the behavior of ValidateConfig by testing expected
// error and success configurations.
func TestValidateConfig(t *testing.T) {
	for _, test := range validateConfigTests {
		test := test // capture range variable for the subtest closure
		t.Run(test.name, func(t *testing.T) {
			err := legacy.ValidateConfig(&test.cfg)
			// require.Equal takes (t, expected, actual); the original had
			// the arguments swapped, garbling failure messages.
			require.Equal(t, test.expErr, err)
		})
	}
}
package legacy
import (
"context"
"fmt"
"math/big"
"net"
"net/http"
"os"
"strconv"
"time"
"github.com/ethereum-optimism/optimism/indexer/services"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum-optimism/optimism/indexer/metrics"
"github.com/ethereum-optimism/optimism/indexer/server"
"github.com/rs/cors"
database "github.com/ethereum-optimism/optimism/indexer/db"
"github.com/ethereum-optimism/optimism/indexer/services/l1"
"github.com/ethereum-optimism/optimism/indexer/services/l2"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/gorilla/mux"
"github.com/urfave/cli"
)
const (
	// defaultDialTimeout is default duration the service will wait on
	// startup to make a connection to either the L1 or L2 backends.
	// Used by dialEthClientWithTimeout for both dials in NewIndexer.
	defaultDialTimeout = 5 * time.Second
)
// Main is the entrypoint into the indexer service. This method returns
// a closure that executes the service and blocks until the service exits. The
// use of a closure allows the parameters bound to the top-level main package,
// e.g. GitVersion, to be captured and used once the function is executed.
func Main(gitVersion string) func(ctx *cli.Context) error {
	return func(ctx *cli.Context) error {
		cfg, err := NewConfig(ctx)
		if err != nil {
			return err
		}
		log.Info("Initializing indexer")
		indexer, err := NewIndexer(cfg)
		if err != nil {
			log.Error("Unable to create indexer", "error", err)
			return err
		}
		log.Info("Starting indexer")
		if err := indexer.Start(); err != nil {
			return err
		}
		defer indexer.Stop()
		log.Info("Indexer started")
		// Block forever: receiving from a nil channel never proceeds, so
		// the service runs until the process is terminated.
		<-(chan struct{})(nil)
		return nil
	}
}
// Indexer is a service that configures the necessary resources for
// running the Sync and BlockHandler sub-services.
type Indexer struct {
	ctx context.Context
	cfg Config
	l1Client *ethclient.Client
	l2Client *ethclient.Client
	l1IndexingService *l1.Service
	l2IndexingService *l2.Service
	airdropService *services.Airdrop
	router *mux.Router // REST route multiplexer, populated in Serve
	metrics *metrics.Metrics
	db *database.Database
	server *http.Server // REST server; nil until Serve has run
}
// NewIndexer initializes the Indexer, gathering any resources
// that will be needed by the TxIndexer and StateIndexer
// sub-services: the root logger, L1/L2 RPC clients, the metrics server
// (optional), the Postgres database, and the L1/L2 indexing services.
func NewIndexer(cfg Config) (*Indexer, error) {
	ctx := context.Background()
	// Configure the process-wide root logger from the parsed config.
	var logHandler log.Handler
	if cfg.LogTerminal {
		logHandler = log.StreamHandler(os.Stdout, log.TerminalFormat(true))
	} else {
		logHandler = log.StreamHandler(os.Stdout, log.JSONFormat())
	}
	logLevel, err := log.LvlFromString(cfg.LogLevel)
	if err != nil {
		return nil, err
	}
	log.Root().SetHandler(log.LvlFilterHandler(logLevel, logHandler))
	// Connect to L1 and L2 providers. Perform these last since they are the
	// most expensive.
	l1Client, rawl1Client, err := dialEthClientWithTimeout(ctx, cfg.L1EthRpc)
	if err != nil {
		return nil, err
	}
	l2Client, l2RPC, err := dialEthClientWithTimeout(ctx, cfg.L2EthRpc)
	if err != nil {
		return nil, err
	}
	// Metrics server failures are logged but do not abort startup.
	m := metrics.NewMetrics(nil)
	if cfg.MetricsServerEnable {
		go func() {
			_, err := m.Serve(cfg.MetricsHostname, cfg.MetricsPort)
			if err != nil {
				log.Error("metrics server failed to start", "err", err)
			}
		}()
		log.Info("metrics server enabled", "host", cfg.MetricsHostname, "port", cfg.MetricsPort)
	}
	// Build the Postgres DSN, omitting credentials that were not supplied.
	dsn := fmt.Sprintf("host=%s port=%d dbname=%s sslmode=disable",
		cfg.DBHost, cfg.DBPort, cfg.DBName)
	if cfg.DBUser != "" {
		dsn += fmt.Sprintf(" user=%s", cfg.DBUser)
	}
	if cfg.DBPassword != "" {
		dsn += fmt.Sprintf(" password=%s", cfg.DBPassword)
	}
	db, err := database.NewDatabase(dsn)
	if err != nil {
		return nil, err
	}
	// Bedrock and legacy deployments resolve contract addresses differently.
	var addrManager services.AddressManager
	if cfg.Bedrock {
		addrManager, err = services.NewBedrockAddresses(
			l1Client,
			cfg.BedrockL1StandardBridgeAddress,
			cfg.BedrockOptimismPortalAddress,
		)
	} else {
		addrManager, err = services.NewLegacyAddresses(l1Client, common.HexToAddress(cfg.L1AddressManagerAddress))
	}
	if err != nil {
		return nil, err
	}
	l1IndexingService, err := l1.NewService(l1.ServiceConfig{
		Context: ctx,
		Metrics: m,
		L1Client: l1Client,
		RawL1Client: rawl1Client,
		ChainID: new(big.Int).SetUint64(cfg.ChainID),
		AddressManager: addrManager,
		DB: db,
		ConfDepth: cfg.L1ConfDepth,
		MaxHeaderBatchSize: cfg.MaxHeaderBatchSize,
		StartBlockNumber: cfg.L1StartBlockNumber,
		Bedrock: cfg.Bedrock,
	})
	if err != nil {
		return nil, err
	}
	// NOTE(review): the L2 service always starts from block 0 regardless of
	// config — confirm this is intended.
	l2IndexingService, err := l2.NewService(l2.ServiceConfig{
		Context: ctx,
		Metrics: m,
		L2RPC: l2RPC,
		L2Client: l2Client,
		DB: db,
		ConfDepth: cfg.L2ConfDepth,
		MaxHeaderBatchSize: cfg.MaxHeaderBatchSize,
		StartBlockNumber: uint64(0),
		Bedrock: cfg.Bedrock,
	})
	if err != nil {
		return nil, err
	}
	return &Indexer{
		ctx: ctx,
		cfg: cfg,
		l1Client: l1Client,
		l2Client: l2Client,
		l1IndexingService: l1IndexingService,
		l2IndexingService: l2IndexingService,
		airdropService: services.NewAirdrop(db, m),
		router: mux.NewRouter(),
		metrics: m,
		db: db,
	}, nil
}
// Serve spins up a REST API server at the given hostname and port.
// It returns nil once the listener appears to have started; startup errors
// are only caught if they surface within a short grace window.
func (b *Indexer) Serve() error {
	// Allow cross-origin requests from any domain.
	c := cors.New(cors.Options{
		AllowedOrigins: []string{"*"},
	})
	// Route patterns: addresses are 20-byte hex, withdrawal hashes 32-byte hex.
	b.router.HandleFunc("/v1/l1/status", b.l1IndexingService.GetIndexerStatus).Methods("GET")
	b.router.HandleFunc("/v1/l2/status", b.l2IndexingService.GetIndexerStatus).Methods("GET")
	b.router.HandleFunc("/v1/deposits/0x{address:[a-fA-F0-9]{40}}", b.l1IndexingService.GetDeposits).Methods("GET")
	b.router.HandleFunc("/v1/withdrawal/0x{hash:[a-fA-F0-9]{64}}", b.l2IndexingService.GetWithdrawalBatch).Methods("GET")
	b.router.HandleFunc("/v1/withdrawals/0x{address:[a-fA-F0-9]{40}}", b.l2IndexingService.GetWithdrawals).Methods("GET")
	b.router.HandleFunc("/v1/airdrops/0x{address:[a-fA-F0-9]{40}}", b.airdropService.GetAirdrop)
	b.router.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(200)
		_, err := w.Write([]byte("OK"))
		if err != nil {
			log.Error("Error handling /healthz", "error", err)
		}
	})
	middleware := server.LoggingMiddleware(b.metrics, log.New("service", "server"))
	port := strconv.FormatUint(b.cfg.RESTPort, 10)
	addr := net.JoinHostPort(b.cfg.RESTHostname, port)
	b.server = &http.Server{
		Addr: addr,
		Handler: middleware(c.Handler(b.router)),
	}
	errCh := make(chan error, 1)
	go func() {
		errCh <- b.server.ListenAndServe()
	}()
	// Capture server startup errors
	// NOTE(review): best-effort only — failures (e.g. a busy port) are
	// caught only if they occur within the 10ms window.
	<-time.After(10 * time.Millisecond)
	select {
	case err := <-errCh:
		return err
	default:
		log.Info("indexer REST server listening on", "addr", addr)
		return nil
	}
}
// Start launches the L1 and L2 indexing services (unless disabled via the
// config, in which case the instance only serves previously indexed data)
// and then brings up the REST server.
func (b *Indexer) Start() error {
	if b.cfg.DisableIndexer {
		log.Info("indexer disabled, only serving data")
		return b.Serve()
	}
	if err := b.l1IndexingService.Start(); err != nil {
		return err
	}
	if err := b.l2IndexingService.Start(); err != nil {
		return err
	}
	return b.Serve()
}
// Stop stops the indexing service on L1 and L2 chains, and gracefully shuts
// down the REST server if it was started.
func (b *Indexer) Stop() {
	// NOTE(review): the database is closed before the indexing services are
	// stopped, so in-flight writes may fail during shutdown — confirm intended.
	b.db.Close()
	if b.server != nil {
		// background context here so it waits for
		// conns to close
		_ = b.server.Shutdown(context.Background())
	}
	if !b.cfg.DisableIndexer {
		b.l1IndexingService.Stop()
		b.l2IndexingService.Stop()
	}
}
// dialEthClientWithTimeout dials the provider at url, giving up if the
// connection is not established within defaultDialTimeout. On success it
// returns both the ethclient wrapper and the underlying RPC client.
func dialEthClientWithTimeout(ctx context.Context, url string) (
	*ethclient.Client, *rpc.Client, error) {

	dialCtx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
	defer cancel()

	rpcClient, err := rpc.DialContext(dialCtx, url)
	if err != nil {
		return nil, nil, err
	}
	return ethclient.NewClient(rpcClient), rpcClient, nil
}
package metrics
import (
"net"
"net/http"
"strconv"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// metricsNamespace prefixes every metric exported by the indexer.
const metricsNamespace = "indexer"

// Metrics bundles all Prometheus collectors used by the indexer, plus the
// token address -> symbol map used to label per-token counters.
type Metrics struct {
	SyncHeight *prometheus.GaugeVec // per-chain last indexed height
	DepositsCount *prometheus.CounterVec // per-symbol deposit counter
	WithdrawalsCount *prometheus.CounterVec // per-symbol withdrawal counter
	StateBatchesCount prometheus.Counter
	L1CatchingUp prometheus.Gauge
	L2CatchingUp prometheus.Gauge
	SyncPercent *prometheus.GaugeVec
	UpdateDuration *prometheus.SummaryVec
	CachedTokensCount *prometheus.CounterVec
	HTTPRequestsCount prometheus.Counter
	HTTPResponsesCount *prometheus.CounterVec
	HTTPRequestDurationSecs prometheus.Summary
	tokenAddrs map[string]string // token address -> symbol; unknown addresses count as "UNKNOWN"
}
// NewMetrics creates the indexer metric set. monitoredTokens maps token
// addresses to the symbol used as the label on deposit/withdrawal counters;
// ETH (the zero address) is always tracked. Fix in this revision: the
// sync_height help text read "L1/L1" instead of "L1/L2".
func NewMetrics(monitoredTokens map[string]string) *Metrics {
	mts := make(map[string]string)
	mts["0x0000000000000000000000000000000000000000"] = "ETH"
	for addr, symbol := range monitoredTokens {
		mts[addr] = symbol
	}
	return &Metrics{
		SyncHeight: promauto.NewGaugeVec(prometheus.GaugeOpts{
			Name:      "sync_height",
			Help:      "The max height of the indexer's last batch of L1/L2 blocks.",
			Namespace: metricsNamespace,
		}, []string{
			"chain",
		}),
		DepositsCount: promauto.NewCounterVec(prometheus.CounterOpts{
			Name:      "deposits_count",
			Help:      "The number of deposits indexed.",
			Namespace: metricsNamespace,
		}, []string{
			"symbol",
		}),
		WithdrawalsCount: promauto.NewCounterVec(prometheus.CounterOpts{
			Name:      "withdrawals_count",
			Help:      "The number of withdrawals indexed.",
			Namespace: metricsNamespace,
		}, []string{
			"symbol",
		}),
		StateBatchesCount: promauto.NewCounter(prometheus.CounterOpts{
			Name:      "state_batches_count",
			Help:      "The number of state batches indexed.",
			Namespace: metricsNamespace,
		}),
		L1CatchingUp: promauto.NewGauge(prometheus.GaugeOpts{
			Name:      "l1_catching_up",
			Help:      "Whether or not L1 is far behind the chain tip.",
			Namespace: metricsNamespace,
		}),
		L2CatchingUp: promauto.NewGauge(prometheus.GaugeOpts{
			Name:      "l2_catching_up",
			Help:      "Whether or not L2 is far behind the chain tip.",
			Namespace: metricsNamespace,
		}),
		SyncPercent: promauto.NewGaugeVec(prometheus.GaugeOpts{
			Name:      "sync_percent",
			Help:      "Sync percentage for each chain.",
			Namespace: metricsNamespace,
		}, []string{
			"chain",
		}),
		UpdateDuration: promauto.NewSummaryVec(prometheus.SummaryOpts{
			Name:       "update_duration_seconds",
			Help:       "How long each update took.",
			Namespace:  metricsNamespace,
			Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
		}, []string{
			"chain",
		}),
		CachedTokensCount: promauto.NewCounterVec(prometheus.CounterOpts{
			Name:      "cached_tokens_count",
			Help:      "How many tokens are in the cache",
			Namespace: metricsNamespace,
		}, []string{
			"chain",
		}),
		HTTPRequestsCount: promauto.NewCounter(prometheus.CounterOpts{
			Name:      "http_requests_count",
			Help:      "How many HTTP requests this instance has seen",
			Namespace: metricsNamespace,
		}),
		HTTPResponsesCount: promauto.NewCounterVec(prometheus.CounterOpts{
			Name:      "http_responses_count",
			Help:      "How many HTTP responses this instance has served",
			Namespace: metricsNamespace,
		}, []string{
			"status_code",
		}),
		HTTPRequestDurationSecs: promauto.NewSummary(prometheus.SummaryOpts{
			Name:       "http_request_duration_secs",
			Help:       "How long each HTTP request took",
			Namespace:  metricsNamespace,
			Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
		}),
		tokenAddrs: mts,
	}
}
// SetL1SyncHeight records the most recently indexed L1 block height.
func (m *Metrics) SetL1SyncHeight(height uint64) {
	gauge := m.SyncHeight.WithLabelValues("l1")
	gauge.Set(float64(height))
}

// SetL2SyncHeight records the most recently indexed L2 block height.
func (m *Metrics) SetL2SyncHeight(height uint64) {
	gauge := m.SyncHeight.WithLabelValues("l2")
	gauge.Set(float64(height))
}

// RecordDeposit increments the deposit counter for the token at addr.
// Tokens without a cached symbol are counted under "UNKNOWN".
func (m *Metrics) RecordDeposit(addr common.Address) {
	m.DepositsCount.WithLabelValues(m.symbolFor(addr)).Inc()
}

// RecordWithdrawal increments the withdrawal counter for the token at addr.
// Tokens without a cached symbol are counted under "UNKNOWN".
func (m *Metrics) RecordWithdrawal(addr common.Address) {
	m.WithdrawalsCount.WithLabelValues(m.symbolFor(addr)).Inc()
}

// symbolFor resolves a token address to its cached symbol, falling back to
// "UNKNOWN" for addresses with no (or an empty) cached symbol.
func (m *Metrics) symbolFor(addr common.Address) string {
	if sym := m.tokenAddrs[addr.String()]; sym != "" {
		return sym
	}
	return "UNKNOWN"
}

// RecordStateBatches adds count to the running total of indexed state batches.
func (m *Metrics) RecordStateBatches(count int) {
	m.StateBatchesCount.Add(float64(count))
}
// SetL1CatchingUp sets the L1 catching-up gauge: 1 while the indexer is far
// behind the L1 chain tip, 0 otherwise.
func (m *Metrics) SetL1CatchingUp(state bool) {
	m.L1CatchingUp.Set(boolToGauge(state))
}

// SetL2CatchingUp sets the L2 catching-up gauge: 1 while the indexer is far
// behind the L2 chain tip, 0 otherwise.
func (m *Metrics) SetL2CatchingUp(state bool) {
	m.L2CatchingUp.Set(boolToGauge(state))
}

// boolToGauge converts a bool to the 0/1 convention used for boolean
// Prometheus gauges.
func boolToGauge(b bool) float64 {
	if b {
		return 1
	}
	return 0
}
// SetL1SyncPercent publishes L1 indexing progress as height/head.
// A zero head would produce NaN or +Inf and poison the gauge, so the
// update is skipped in that case.
func (m *Metrics) SetL1SyncPercent(height uint64, head uint64) {
	if head == 0 {
		return
	}
	m.SyncPercent.WithLabelValues("l1").Set(float64(height) / float64(head))
}

// SetL2SyncPercent publishes L2 indexing progress as height/head.
// A zero head would produce NaN or +Inf and poison the gauge, so the
// update is skipped in that case.
func (m *Metrics) SetL2SyncPercent(height uint64, head uint64) {
	if head == 0 {
		return
	}
	m.SyncPercent.WithLabelValues("l2").Set(float64(height) / float64(head))
}
// IncL1CachedTokensCount bumps the count of L1 tokens held in the token cache.
func (m *Metrics) IncL1CachedTokensCount() {
	m.CachedTokensCount.WithLabelValues("l1").Inc()
}

// IncL2CachedTokensCount bumps the count of L2 tokens held in the token cache.
func (m *Metrics) IncL2CachedTokensCount() {
	m.CachedTokensCount.WithLabelValues("l2").Inc()
}

// RecordHTTPRequest counts one incoming HTTP request.
func (m *Metrics) RecordHTTPRequest() {
	m.HTTPRequestsCount.Inc()
}

// RecordHTTPResponse counts one served HTTP response, labeled by status
// code, and observes how long the request took in seconds.
func (m *Metrics) RecordHTTPResponse(code int, dur time.Duration) {
	label := strconv.Itoa(code)
	m.HTTPResponsesCount.WithLabelValues(label).Inc()
	seconds := float64(dur) / float64(time.Second)
	m.HTTPRequestDurationSecs.Observe(seconds)
}
// Serve exposes the Prometheus metrics handler on /metrics at
// hostname:port.
//
// NOTE(review): ListenAndServe blocks until the server stops, so the
// returned *http.Server only reaches the caller after shutdown, and the
// returned error is always non-nil (http.ErrServerClosed on a clean
// shutdown) — confirm callers run this on its own goroutine.
func (m *Metrics) Serve(hostname string, port uint64) (*http.Server, error) {
	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.Handler())
	srv := new(http.Server)
	srv.Addr = net.JoinHostPort(hostname, strconv.FormatUint(port, 10))
	srv.Handler = mux
	err := srv.ListenAndServe()
	return srv, err
}
package server
import (
"encoding/json"
"net/http"
"runtime/debug"
"time"
"github.com/ethereum-optimism/optimism/indexer/metrics"
"github.com/ethereum/go-ethereum/log"
)
// RespondWithError writes the given error code and message to the writer as
// a JSON object of the form {"error": message}.
func RespondWithError(w http.ResponseWriter, code int, message string) {
	body := map[string]string{"error": message}
	RespondWithJSON(w, code, body)
}
// RespondWithJSON writes the given payload marshalled as JSON to the writer.
//
// Headers must be set before WriteHeader is called — the previous ordering
// set Content-Type after WriteHeader, so it was silently dropped. The
// marshal error is also no longer ignored: an unmarshalable payload yields
// a 500 instead of an empty body with a misleading success code.
func RespondWithJSON(w http.ResponseWriter, code int, payload interface{}) {
	response, err := json.Marshal(payload)
	if err != nil {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusInternalServerError)
		_, _ = w.Write([]byte(`{"error":"failed to marshal response"}`))
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(code)
	_, _ = w.Write(response)
}
// responseWriter is a minimal wrapper for http.ResponseWriter that allows the
// written HTTP status code to be captured for logging.
type responseWriter struct {
	http.ResponseWriter
	status      int
	wroteHeader bool
}

// wrapResponseWriter wraps w for status capture. The status defaults to
// http.StatusOK because that is what net/http sends when a handler writes a
// body without ever calling WriteHeader; previously such requests were
// logged and counted with status 0.
func wrapResponseWriter(w http.ResponseWriter) *responseWriter {
	return &responseWriter{ResponseWriter: w, status: http.StatusOK}
}

// Status returns the captured HTTP status code.
func (rw *responseWriter) Status() int {
	return rw.status
}

// WriteHeader records the first status code written and forwards it to the
// underlying writer; subsequent calls are ignored, which also avoids
// net/http's "superfluous WriteHeader" warning.
func (rw *responseWriter) WriteHeader(code int) {
	if rw.wroteHeader {
		return
	}
	rw.status = code
	rw.ResponseWriter.WriteHeader(code)
	rw.wroteHeader = true
}
// LoggingMiddleware logs each incoming HTTP request with its status, method,
// path, and duration, and records request/response metrics.
//
// The panic handler now writes through the wrapped writer (previously it
// wrote to the raw writer), so the 500 is captured for status logging and a
// duplicate WriteHeader is suppressed when the handler wrote before
// panicking.
func LoggingMiddleware(metrics *metrics.Metrics, logger log.Logger) func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		fn := func(w http.ResponseWriter, r *http.Request) {
			metrics.RecordHTTPRequest()
			start := time.Now()
			wrapped := wrapResponseWriter(w)
			defer func() {
				if err := recover(); err != nil {
					wrapped.WriteHeader(http.StatusInternalServerError)
					logger.Error(
						"server panicked",
						"err", err,
						"trace", debug.Stack(),
					)
				}
			}()
			next.ServeHTTP(wrapped, r)
			dur := time.Since(start)
			logger.Info(
				"served request",
				"status", wrapped.status,
				"method", r.Method,
				"path", r.URL.EscapedPath(),
				"duration", dur,
			)
			metrics.RecordHTTPResponse(wrapped.status, dur)
		}
		return http.HandlerFunc(fn)
	}
}
package services
import (
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
legacy_bindings "github.com/ethereum-optimism/optimism/op-bindings/legacy-bindings"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
)
// AddressManager provides access to the L1 contracts the indexer reads,
// abstracting over legacy (pre-bedrock) and bedrock deployments. Methods
// that do not apply to the held variant panic (see LegacyAddresses and
// BedrockAddresses), so callers must know which deployment they target.
type AddressManager interface {
	// L1StandardBridge returns the bridge address and its contract binding.
	L1StandardBridge() (common.Address, *bindings.L1StandardBridge)
	// StateCommitmentChain returns the SCC address and binding (legacy only).
	StateCommitmentChain() (common.Address, *legacy_bindings.StateCommitmentChain)
	// OptimismPortal returns the portal address and binding (bedrock only).
	OptimismPortal() (common.Address, *bindings.OptimismPortal)
}

// LegacyAddresses resolves contract addresses for pre-bedrock networks via
// the on-chain AddressManager contract (see NewLegacyAddresses).
type LegacyAddresses struct {
	l1SB *bindings.L1StandardBridge
	l1SBAddr common.Address
	scc *legacy_bindings.StateCommitmentChain
	sccAddr common.Address
}

// Compile-time check that LegacyAddresses satisfies AddressManager.
var _ AddressManager = (*LegacyAddresses)(nil)
// NewLegacyAddresses resolves the L1 standard bridge proxy and the state
// commitment chain through the legacy AddressManager contract at
// addrMgrAddr, and returns an AddressManager backed by those bindings.
func NewLegacyAddresses(client bind.ContractBackend, addrMgrAddr common.Address) (AddressManager, error) {
	mgr, err := bindings.NewAddressManager(addrMgrAddr, client)
	if err != nil {
		return nil, err
	}

	// Resolve the addresses registered with the on-chain address manager.
	bridgeAddr, err := mgr.GetAddress(nil, "Proxy__OVM_L1StandardBridge")
	if err != nil {
		return nil, err
	}
	chainAddr, err := mgr.GetAddress(nil, "StateCommitmentChain")
	if err != nil {
		return nil, err
	}

	// Bind contract wrappers to the resolved addresses.
	bridgeContract, err := bindings.NewL1StandardBridge(bridgeAddr, client)
	if err != nil {
		return nil, err
	}
	chainContract, err := legacy_bindings.NewStateCommitmentChain(chainAddr, client)
	if err != nil {
		return nil, err
	}

	addrs := &LegacyAddresses{
		l1SB:     bridgeContract,
		l1SBAddr: bridgeAddr,
		scc:      chainContract,
		sccAddr:  chainAddr,
	}
	return addrs, nil
}
// L1StandardBridge returns the resolved L1 standard bridge proxy address and
// its contract binding.
func (a *LegacyAddresses) L1StandardBridge() (common.Address, *bindings.L1StandardBridge) {
	return a.l1SBAddr, a.l1SB
}

// StateCommitmentChain returns the resolved SCC address and its binding.
func (a *LegacyAddresses) StateCommitmentChain() (common.Address, *legacy_bindings.StateCommitmentChain) {
	return a.sccAddr, a.scc
}

// OptimismPortal panics: the portal only exists on bedrock networks, so
// calling this on a legacy deployment is a programmer error.
func (a *LegacyAddresses) OptimismPortal() (common.Address, *bindings.OptimismPortal) {
	panic("OptimismPortal not configured on legacy networks - this is a programmer error")
}
// BedrockAddresses holds statically-configured bedrock contract bindings:
// the L1 standard bridge and the OptimismPortal (there is no SCC on
// bedrock networks).
type BedrockAddresses struct {
	l1SB *bindings.L1StandardBridge
	l1SBAddr common.Address
	portal *bindings.OptimismPortal
	portalAddr common.Address
}

// Compile-time check that BedrockAddresses satisfies AddressManager.
var _ AddressManager = (*BedrockAddresses)(nil)
// NewBedrockAddresses builds an AddressManager for bedrock networks from
// the already-known L1 standard bridge and OptimismPortal addresses.
func NewBedrockAddresses(client bind.ContractBackend, l1SBAddr, portalAddr common.Address) (AddressManager, error) {
	bridgeContract, err := bindings.NewL1StandardBridge(l1SBAddr, client)
	if err != nil {
		return nil, err
	}
	portalContract, err := bindings.NewOptimismPortal(portalAddr, client)
	if err != nil {
		return nil, err
	}

	addrs := &BedrockAddresses{
		l1SB:       bridgeContract,
		l1SBAddr:   l1SBAddr,
		portal:     portalContract,
		portalAddr: portalAddr,
	}
	return addrs, nil
}
// L1StandardBridge returns the configured bridge address and its binding.
func (b *BedrockAddresses) L1StandardBridge() (common.Address, *bindings.L1StandardBridge) {
	return b.l1SBAddr, b.l1SB
}

// StateCommitmentChain panics: the SCC was removed in bedrock, so calling
// this on a bedrock deployment is a programmer error. The panic message
// previously said "legacy networks", which named the wrong variant.
func (b *BedrockAddresses) StateCommitmentChain() (common.Address, *legacy_bindings.StateCommitmentChain) {
	panic("SCC not configured on bedrock networks - this is a programmer error")
}

// OptimismPortal returns the configured portal address and its binding.
func (b *BedrockAddresses) OptimismPortal() (common.Address, *bindings.OptimismPortal) {
	return b.portalAddr, b.portal
}
package services
import (
"net/http"
"github.com/ethereum-optimism/optimism/indexer/db"
"github.com/ethereum-optimism/optimism/indexer/metrics"
"github.com/ethereum-optimism/optimism/indexer/server"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/gorilla/mux"
)
// airdropLogger is the logger for the airdrop HTTP service.
var airdropLogger = log.New("service", "airdrop")

// Airdrop serves airdrop lookups backed by the indexer database.
type Airdrop struct {
	db *db.Database
	metrics *metrics.Metrics
}

// NewAirdrop constructs an Airdrop service around the given database and
// metrics handles.
func NewAirdrop(db *db.Database, metrics *metrics.Metrics) *Airdrop {
	return &Airdrop{
		db: db,
		metrics: metrics,
	}
}
// GetAirdrop is an HTTP handler that returns the airdrop record for the
// address in the request path. It responds 500 on database errors and 404
// when no record exists.
func (a *Airdrop) GetAirdrop(w http.ResponseWriter, r *http.Request) {
	addr := common.HexToAddress(mux.Vars(r)["address"])
	airdrop, err := a.db.GetAirdrop(addr)
	switch {
	case err != nil:
		airdropLogger.Error("db error getting airdrop", "err", err)
		server.RespondWithError(w, http.StatusInternalServerError, "database error")
	case airdrop == nil:
		server.RespondWithError(w, http.StatusNotFound, "airdrop not found")
	default:
		server.RespondWithJSON(w, http.StatusOK, airdrop)
	}
}
package bridge
import (
"context"
"errors"
"math/big"
"github.com/ethereum-optimism/optimism/indexer/db"
"github.com/ethereum-optimism/optimism/indexer/services"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
legacy_bindings "github.com/ethereum-optimism/optimism/op-bindings/legacy-bindings"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
)
// DepositsMap is a collection of deposit objects keyed
// on block hashes.
type DepositsMap map[common.Hash][]db.Deposit

// InitiatedWithdrawalMap is a collection of initiated withdrawal objects
// keyed on block hashes.
type InitiatedWithdrawalMap map[common.Hash][]db.Withdrawal

// ProvenWithdrawalsMap is a collection of proven withdrawal
// objects keyed on block hashes.
type ProvenWithdrawalsMap map[common.Hash][]db.ProvenWithdrawal

// FinalizedWithdrawalsMap is a collection of finalized withdrawal
// objects keyed on block hashes.
type FinalizedWithdrawalsMap map[common.Hash][]db.FinalizedWithdrawal

// Bridge is an L1 bridge contract whose deposit events can be indexed.
type Bridge interface {
	// Address returns the L1 address of the bridge contract.
	Address() common.Address
	// GetDepositsByBlockRange returns the deposits initiated in the given
	// inclusive block range, keyed by block hash.
	GetDepositsByBlockRange(context.Context, uint64, uint64) (DepositsMap, error)
	// String returns the bridge's human-readable name.
	String() string
}

// implConfig pairs a bridge's display name with the scanner implementation
// used to index it ("StandardBridge" or "ETHBridge") and its L1 address.
type implConfig struct {
	name string
	impl string
	addr common.Address
}

// customBridgeCfgs lists non-standard third-party bridge deployments to
// index in addition to the canonical bridges, keyed by L1 chain ID.
var customBridgeCfgs = map[uint64][]*implConfig{
	// Mainnet
	1: {
		{"BitBTC", "StandardBridge", common.HexToAddress("0xaBA2c5F108F7E820C049D5Af70B16ac266c8f128")},
		{"DAI", "StandardBridge", common.HexToAddress("0x10E6593CDda8c58a1d0f14C5164B376352a55f2F")},
		{"wstETH", "StandardBridge", common.HexToAddress("0x76943C0D61395d8F2edF9060e1533529cAe05dE6")},
	},
}
// BridgesByChainID constructs the set of L1 bridges to index for the given
// chain: the canonical standard and ETH bridges plus any chain-specific
// third-party deployments listed in customBridgeCfgs.
func BridgesByChainID(chainID *big.Int, client bind.ContractBackend, addrs services.AddressManager) (map[string]Bridge, error) {
	l1SBAddr, _ := addrs.L1StandardBridge()
	cfgs := []*implConfig{
		{"Standard", "StandardBridge", l1SBAddr},
		{"ETH", "ETHBridge", l1SBAddr},
	}
	cfgs = append(cfgs, customBridgeCfgs[chainID.Uint64()]...)

	bridges := make(map[string]Bridge, len(cfgs))
	for _, cfg := range cfgs {
		if cfg.impl != "StandardBridge" && cfg.impl != "ETHBridge" {
			return nil, errors.New("unsupported bridge")
		}
		// Both scanner implementations filter events through the
		// L1StandardBridge ABI, so the contract binding is shared.
		contract, err := bindings.NewL1StandardBridge(cfg.addr, client)
		if err != nil {
			return nil, err
		}
		if cfg.impl == "StandardBridge" {
			bridges[cfg.name] = &StandardBridge{
				name:     cfg.name,
				address:  cfg.addr,
				contract: contract,
			}
		} else {
			bridges[cfg.name] = &EthBridge{
				name:     cfg.name,
				address:  cfg.addr,
				contract: contract,
			}
		}
	}
	return bridges, nil
}
// StateCommitmentChainScanner returns an event filterer bound to the state
// commitment chain contract resolved through the given address manager.
func StateCommitmentChainScanner(client bind.ContractFilterer, addrs services.AddressManager) (*legacy_bindings.StateCommitmentChainFilterer, error) {
	sccAddr, _ := addrs.StateCommitmentChain()
	return legacy_bindings.NewStateCommitmentChainFilterer(sccAddr, client)
}
package bridge
import "time"
const (
	// DefaultConnectionTimeout bounds each RPC filter call made by the
	// bridge scanners in this package.
	DefaultConnectionTimeout = 60 * time.Second
)
package bridge
import (
"context"
"github.com/ethereum-optimism/optimism/indexer/db"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-service/backoff"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
)
// EthBridge indexes ETH deposits made through an L1 standard bridge
// contract's ETHDepositInitiated events.
type EthBridge struct {
	name string
	address common.Address
	contract *bindings.L1StandardBridge
}

// Address returns the L1 address of the bridge contract.
func (e *EthBridge) Address() common.Address {
	return e.address
}
// GetDepositsByBlockRange returns the ETH deposits initiated in the
// inclusive block range [start, end], grouped by the hash of the block they
// were included in. The event filter call is retried up to 3 times with
// exponential backoff.
func (e *EthBridge) GetDepositsByBlockRange(ctx context.Context, start, end uint64) (DepositsMap, error) {
	filterOpts := &bind.FilterOpts{
		Context: ctx,
		Start:   start,
		End:     &end,
	}
	iter, err := backoff.Do(ctx, 3, backoff.Exponential(), func() (*bindings.L1StandardBridgeETHDepositInitiatedIterator, error) {
		return e.contract.FilterETHDepositInitiated(filterOpts, nil, nil)
	})
	if err != nil {
		return nil, err
	}
	defer iter.Close()

	deposits := make(DepositsMap)
	for iter.Next() {
		ev := iter.Event
		blockHash := ev.Raw.BlockHash
		deposits[blockHash] = append(deposits[blockHash], db.Deposit{
			TxHash:      ev.Raw.TxHash,
			FromAddress: ev.From,
			ToAddress:   ev.To,
			Amount:      ev.Amount,
			Data:        ev.ExtraData,
			LogIndex:    ev.Raw.Index,
		})
	}
	return deposits, iter.Error()
}
// String returns the bridge's human-readable name.
func (e *EthBridge) String() string {
	return e.name
}
package bridge
import (
"context"
"time"
legacy_bindings "github.com/ethereum-optimism/optimism/op-bindings/legacy-bindings"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
)
// clientRetryInterval is the interval to wait between retrying client API
// calls.
var clientRetryInterval = 5 * time.Second

// FilterStateBatchAppendedWithRetry retries the StateBatchAppended filter
// call until it succeeds, waiting clientRetryInterval between attempts.
// Each attempt is bounded by DefaultConnectionTimeout. Unlike before, the
// loop stops and returns ctx.Err() once the parent context is cancelled
// instead of retrying forever.
func FilterStateBatchAppendedWithRetry(ctx context.Context, filterer *legacy_bindings.StateCommitmentChainFilterer, opts *bind.FilterOpts) (*legacy_bindings.StateCommitmentChainStateBatchAppendedIterator, error) {
	for {
		ctxt, cancel := context.WithTimeout(ctx, DefaultConnectionTimeout)
		opts.Context = ctxt
		res, err := filterer.FilterStateBatchAppended(opts, nil)
		cancel()
		if err == nil {
			return res, nil
		}
		logger.Error("Error fetching filter", "err", err)
		// Sleep between attempts, but bail out promptly on cancellation.
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(clientRetryInterval):
		}
	}
}
package bridge
import "github.com/ethereum/go-ethereum/log"
// logger is the shared logger for the L1 bridge scanners in this package.
var logger = log.New("service", "l1-bridge")
package bridge
import (
"context"
"github.com/ethereum-optimism/optimism/indexer/db"
"github.com/ethereum-optimism/optimism/indexer/services"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-service/backoff"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
)
// Portal indexes withdrawal events emitted by the bedrock OptimismPortal
// contract.
type Portal struct {
	address common.Address
	contract *bindings.OptimismPortal
}

// NewPortal binds a Portal to the OptimismPortal resolved through addrs.
// NOTE: addrs.OptimismPortal() panics if addrs is a legacy deployment, so
// only call this with bedrock addresses.
func NewPortal(addrs services.AddressManager) *Portal {
	address, contract := addrs.OptimismPortal()
	return &Portal{
		address: address,
		contract: contract,
	}
}

// Address returns the L1 address of the portal contract.
func (p *Portal) Address() common.Address {
	return p.address
}
// GetProvenWithdrawalsByBlockRange returns the WithdrawalProven events
// emitted in the inclusive block range [start, end], grouped by block hash.
// The event filter call is retried up to 3 times with exponential backoff.
func (p *Portal) GetProvenWithdrawalsByBlockRange(ctx context.Context, start, end uint64) (ProvenWithdrawalsMap, error) {
	filterOpts := &bind.FilterOpts{
		Context: ctx,
		Start:   start,
		End:     &end,
	}
	iter, err := backoff.Do(ctx, 3, backoff.Exponential(), func() (*bindings.OptimismPortalWithdrawalProvenIterator, error) {
		return p.contract.FilterWithdrawalProven(filterOpts, nil, nil, nil)
	})
	if err != nil {
		return nil, err
	}
	defer iter.Close()

	withdrawals := make(ProvenWithdrawalsMap)
	for iter.Next() {
		ev := iter.Event
		blockHash := ev.Raw.BlockHash
		withdrawals[blockHash] = append(withdrawals[blockHash], db.ProvenWithdrawal{
			WithdrawalHash: ev.WithdrawalHash,
			From:           ev.From,
			To:             ev.To,
			TxHash:         ev.Raw.TxHash,
			LogIndex:       ev.Raw.Index,
		})
	}
	return withdrawals, iter.Error()
}
// GetFinalizedWithdrawalsByBlockRange returns the WithdrawalFinalized events
// emitted in the inclusive block range [start, end], grouped by block hash.
// The event filter call is retried up to 3 times with exponential backoff.
func (p *Portal) GetFinalizedWithdrawalsByBlockRange(ctx context.Context, start, end uint64) (FinalizedWithdrawalsMap, error) {
	filterOpts := &bind.FilterOpts{
		Context: ctx,
		Start:   start,
		End:     &end,
	}
	iter, err := backoff.Do(ctx, 3, backoff.Exponential(), func() (*bindings.OptimismPortalWithdrawalFinalizedIterator, error) {
		return p.contract.FilterWithdrawalFinalized(filterOpts, nil)
	})
	if err != nil {
		return nil, err
	}
	defer iter.Close()

	withdrawals := make(FinalizedWithdrawalsMap)
	for iter.Next() {
		ev := iter.Event
		blockHash := ev.Raw.BlockHash
		withdrawals[blockHash] = append(withdrawals[blockHash], db.FinalizedWithdrawal{
			TxHash:         ev.Raw.TxHash,
			WithdrawalHash: ev.WithdrawalHash,
			Success:        ev.Success,
			LogIndex:       ev.Raw.Index,
		})
	}
	return withdrawals, iter.Error()
}
package bridge
import (
"context"
"github.com/ethereum-optimism/optimism/indexer/db"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-service/backoff"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
)
// StandardBridge indexes ERC20 deposits made through an L1 standard bridge
// contract's ERC20DepositInitiated events.
type StandardBridge struct {
	name string
	address common.Address
	contract *bindings.L1StandardBridge
}

// Address returns the L1 address of the bridge contract.
func (s *StandardBridge) Address() common.Address {
	return s.address
}
// GetDepositsByBlockRange returns the ERC20 deposits initiated in the
// inclusive block range [start, end], grouped by the hash of the block they
// were included in. The event filter call is retried up to 3 times with
// exponential backoff.
func (s *StandardBridge) GetDepositsByBlockRange(ctx context.Context, start, end uint64) (DepositsMap, error) {
	filterOpts := &bind.FilterOpts{
		Context: ctx,
		Start:   start,
		End:     &end,
	}
	iter, err := backoff.Do(ctx, 3, backoff.Exponential(), func() (*bindings.L1StandardBridgeERC20DepositInitiatedIterator, error) {
		return s.contract.FilterERC20DepositInitiated(filterOpts, nil, nil, nil)
	})
	if err != nil {
		return nil, err
	}
	defer iter.Close()

	deposits := make(DepositsMap)
	for iter.Next() {
		ev := iter.Event
		blockHash := ev.Raw.BlockHash
		deposits[blockHash] = append(deposits[blockHash], db.Deposit{
			TxHash:      ev.Raw.TxHash,
			L1Token:     ev.L1Token,
			L2Token:     ev.L2Token,
			FromAddress: ev.From,
			ToAddress:   ev.To,
			Amount:      ev.Amount,
			Data:        ev.ExtraData,
			LogIndex:    ev.Raw.Index,
		})
	}
	return deposits, iter.Error()
}

// String returns the bridge's human-readable name.
func (s *StandardBridge) String() string {
	return s.name
}
package l1
import (
"context"
"encoding/json"
"errors"
"math/big"
"time"
"github.com/ethereum-optimism/optimism/indexer/services/util"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
)
const (
	// DefaultConnectionTimeout bounds each batched header fetch.
	DefaultConnectionTimeout = 30 * time.Second
	// DefaultMaxBatchSize is the largest number of headers requested in a
	// single batched RPC call (see NewHead / HeadersByRange).
	DefaultMaxBatchSize = 100
)

// NewHeader embeds a geth header and additionally captures the "hash" field
// reported by the RPC node (populated by UnmarshalJSON below).
type NewHeader struct {
	types.Header
	Hash common.Hash
}
// UnmarshalJSON decodes an RPC block header. It mirrors the generated
// unmarshalling for types.Header, but additionally requires the "hash"
// field and stores it in h.Hash.
func (h *NewHeader) UnmarshalJSON(input []byte) error {
	// Local mirror struct: pointer fields distinguish absent required
	// fields from present-but-zero values.
	type NewHeader struct {
		Hash *common.Hash `json:"hash" gencodec:"required"`
		ParentHash *common.Hash `json:"parentHash" gencodec:"required"`
		UncleHash *common.Hash `json:"sha3Uncles" gencodec:"required"`
		Coinbase *common.Address `json:"miner" gencodec:"required"`
		Root *common.Hash `json:"stateRoot" gencodec:"required"`
		TxHash *common.Hash `json:"transactionsRoot" gencodec:"required"`
		ReceiptHash *common.Hash `json:"receiptsRoot" gencodec:"required"`
		Bloom *types.Bloom `json:"logsBloom" gencodec:"required"`
		Difficulty *hexutil.Big `json:"difficulty" gencodec:"required"`
		Number *hexutil.Big `json:"number" gencodec:"required"`
		GasLimit *hexutil.Uint64 `json:"gasLimit" gencodec:"required"`
		GasUsed *hexutil.Uint64 `json:"gasUsed" gencodec:"required"`
		Time *hexutil.Uint64 `json:"timestamp" gencodec:"required"`
		Extra *hexutil.Bytes `json:"extraData" gencodec:"required"`
		MixDigest *common.Hash `json:"mixHash"`
		Nonce *types.BlockNonce `json:"nonce"`
		BaseFee *hexutil.Big `json:"baseFeePerGas" rlp:"optional"`
	}
	var dec NewHeader
	if err := json.Unmarshal(input, &dec); err != nil {
		return err
	}
	// Required fields: reject the payload if any of them is missing.
	if dec.Hash == nil {
		return errors.New("missing required field 'hash' for Header")
	}
	h.Hash = *dec.Hash
	if dec.ParentHash == nil {
		return errors.New("missing required field 'parentHash' for Header")
	}
	h.ParentHash = *dec.ParentHash
	if dec.UncleHash == nil {
		return errors.New("missing required field 'sha3Uncles' for Header")
	}
	h.UncleHash = *dec.UncleHash
	if dec.Coinbase == nil {
		return errors.New("missing required field 'miner' for Header")
	}
	h.Coinbase = *dec.Coinbase
	if dec.Root == nil {
		return errors.New("missing required field 'stateRoot' for Header")
	}
	h.Root = *dec.Root
	if dec.TxHash == nil {
		return errors.New("missing required field 'transactionsRoot' for Header")
	}
	h.TxHash = *dec.TxHash
	if dec.ReceiptHash == nil {
		return errors.New("missing required field 'receiptsRoot' for Header")
	}
	h.ReceiptHash = *dec.ReceiptHash
	if dec.Bloom == nil {
		return errors.New("missing required field 'logsBloom' for Header")
	}
	h.Bloom = *dec.Bloom
	if dec.Difficulty == nil {
		return errors.New("missing required field 'difficulty' for Header")
	}
	h.Difficulty = (*big.Int)(dec.Difficulty)
	if dec.Number == nil {
		return errors.New("missing required field 'number' for Header")
	}
	h.Number = (*big.Int)(dec.Number)
	if dec.GasLimit == nil {
		return errors.New("missing required field 'gasLimit' for Header")
	}
	h.GasLimit = uint64(*dec.GasLimit)
	if dec.GasUsed == nil {
		return errors.New("missing required field 'gasUsed' for Header")
	}
	h.GasUsed = uint64(*dec.GasUsed)
	if dec.Time == nil {
		return errors.New("missing required field 'timestamp' for Header")
	}
	h.Time = uint64(*dec.Time)
	if dec.Extra == nil {
		return errors.New("missing required field 'extraData' for Header")
	}
	h.Extra = *dec.Extra
	// Optional fields: copy only when present.
	if dec.MixDigest != nil {
		h.MixDigest = *dec.MixDigest
	}
	if dec.Nonce != nil {
		h.Nonce = *dec.Nonce
	}
	if dec.BaseFee != nil {
		h.BaseFee = (*big.Int)(dec.BaseFee)
	}
	return nil
}
// HeaderSelectorConfig configures confirmation-depth-based header selection.
type HeaderSelectorConfig struct {
	// ConfDepth is the number of confirmations a block needs before it is
	// considered safe to index.
	ConfDepth uint64
	// MaxBatchSize caps how many headers are selected per update.
	MaxBatchSize uint64
}

// ConfirmedHeaderSelector selects ranges of sufficiently-confirmed headers
// to index (see NewHead).
type ConfirmedHeaderSelector struct {
	cfg HeaderSelectorConfig
}
// HeadersByRange fetches count consecutive block headers starting at
// startHeight using a single batched eth_getBlockByNumber call.
func HeadersByRange(ctx context.Context, client *rpc.Client, startHeight uint64, count int) ([]*NewHeader, error) {
	batch := make([]rpc.BatchElem, count)
	for i := range batch {
		blockNum := new(big.Int).SetUint64(startHeight + uint64(i))
		batch[i] = rpc.BatchElem{
			Method: "eth_getBlockByNumber",
			Args:   []interface{}{util.ToBlockNumArg(blockNum), false},
			Result: new(NewHeader),
		}
	}
	if err := client.BatchCallContext(ctx, batch); err != nil {
		return nil, err
	}

	headers := make([]*NewHeader, count)
	for i, elem := range batch {
		if elem.Error != nil {
			return nil, elem.Error
		}
		headers[i] = elem.Result.(*NewHeader)
	}
	return headers, nil
}
// NewHead returns the next batch of confirmed headers to index, given the
// lowest block already indexed and the current chain tip header. It returns
// nil (with no error) when nothing is confirmed yet or when a fork is
// detected, and trims the returned batch at the first failed lookup or
// parent-hash discontinuity.
func (f *ConfirmedHeaderSelector) NewHead(
	ctx context.Context,
	lowest uint64,
	header *types.Header,
	client *rpc.Client,
) ([]*NewHeader, error) {
	number := header.Number.Uint64()
	// BUG FIX: Hash is a method on types.Header; the previous code assigned
	// the method value (header.Hash, never called) and logged a function
	// instead of the block hash.
	blockHash := header.Hash()
	logger.Info("New block", "block", number, "hash", blockHash)

	if number < f.cfg.ConfDepth {
		return nil, nil
	}
	endHeight := number - f.cfg.ConfDepth + 1

	minNextHeight := lowest + f.cfg.ConfDepth
	if minNextHeight > number {
		// Uses the package logger consistently (was the global log package).
		logger.Info("Fork block ", "block", number, "hash", blockHash)
		return nil, nil
	}
	startHeight := lowest + 1

	// Clamp to max batch size
	if startHeight+f.cfg.MaxBatchSize < endHeight+1 {
		endHeight = startHeight + f.cfg.MaxBatchSize - 1
	}

	nHeaders := int(endHeight - startHeight + 1)
	if nHeaders > 1 {
		logger.Info("Loading blocks",
			"startHeight", startHeight, "endHeight", endHeight)
	}

	headers := make([]*NewHeader, 0)
	height := startHeight
	left := nHeaders - len(headers)
	for left > 0 {
		count := DefaultMaxBatchSize
		if count > left {
			count = left
		}
		logger.Info("Loading block batch",
			"height", height, "count", count)
		ctxt, cancel := context.WithTimeout(ctx, DefaultConnectionTimeout)
		fetched, err := HeadersByRange(ctxt, client, height, count)
		cancel()
		if err != nil {
			return nil, err
		}
		headers = append(headers, fetched...)
		left = nHeaders - len(headers)
		height += uint64(count)
	}

	logger.Debug("Verifying block range ",
		"startHeight", startHeight, "endHeight", endHeight)

	for i, header := range headers {
		// Trim the returned headers if any of the lookups failed.
		if header == nil {
			headers = headers[:i]
			break
		}
		// Assert that each header builds on the parent before it, trim if there
		// are any discontinuities.
		if i > 0 {
			prevHeader := headers[i-1]
			if prevHeader.Hash != header.ParentHash {
				logger.Error("Parent hash does not connect to ",
					"block", header.Number.Uint64(), "hash", header.Hash,
					"prev", prevHeader.Number.Uint64(), "hash", prevHeader.Hash)
				headers = headers[:i]
				break
			}
		}
		logger.Debug("Confirmed block ",
			"block", header.Number.Uint64(), "hash", header.Hash)
	}

	return headers, nil
}
// NewConfirmedHeaderSelector validates cfg and returns a selector. Both
// ConfDepth and MaxBatchSize must be non-zero.
func NewConfirmedHeaderSelector(cfg HeaderSelectorConfig) (*ConfirmedHeaderSelector,
	error) {
	switch {
	case cfg.ConfDepth == 0:
		return nil, errors.New("ConfDepth must be greater than zero")
	case cfg.MaxBatchSize == 0:
		return nil, errors.New("MaxBatchSize must be greater than zero")
	}
	return &ConfirmedHeaderSelector{cfg: cfg}, nil
}
package l1
import (
"context"
"github.com/ethereum-optimism/optimism/indexer/db"
"github.com/ethereum-optimism/optimism/indexer/services/l1/bridge"
legacy_bindings "github.com/ethereum-optimism/optimism/op-bindings/legacy-bindings"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
)
// QueryStateBatches returns the StateBatchAppended events emitted in the
// inclusive block range [startHeight, endHeight], grouped by block hash.
func QueryStateBatches(filterer *legacy_bindings.StateCommitmentChainFilterer, startHeight, endHeight uint64, ctx context.Context) (map[common.Hash][]db.StateBatch, error) {
	opts := &bind.FilterOpts{
		Start: startHeight,
		End:   &endHeight,
	}
	iter, err := bridge.FilterStateBatchAppendedWithRetry(ctx, filterer, opts)
	if err != nil {
		return nil, err
	}
	defer iter.Close()

	batches := make(map[common.Hash][]db.StateBatch)
	for iter.Next() {
		ev := iter.Event
		blockHash := ev.Raw.BlockHash
		batches[blockHash] = append(batches[blockHash], db.StateBatch{
			Index:     ev.BatchIndex,
			Root:      ev.BatchRoot,
			Size:      ev.BatchSize,
			PrevTotal: ev.PrevTotalElements,
			ExtraData: ev.ExtraData,
			BlockHash: ev.Raw.BlockHash,
		})
	}
	return batches, iter.Error()
}
package l1
import (
"context"
"errors"
"fmt"
"math/big"
"net/http"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/ethereum-optimism/optimism/indexer/metrics"
"github.com/ethereum-optimism/optimism/indexer/services"
"github.com/ethereum-optimism/optimism/indexer/services/query"
legacy_bindings "github.com/ethereum-optimism/optimism/op-bindings/legacy-bindings"
"github.com/prometheus/client_golang/prometheus"
"github.com/ethereum-optimism/optimism/indexer/server"
"github.com/ethereum-optimism/optimism/indexer/services/l1/bridge"
_ "github.com/lib/pq"
"github.com/ethereum-optimism/optimism/indexer/db"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/gorilla/mux"
)
// logger is the shared logger for the L1 indexing service.
var logger = log.New("service", "l1")

// errNoChainID represents the error when the chain id is not provided
// and it cannot be remotely fetched
var errNoChainID = errors.New("no chain id provided")

// errNoNewBlocks signals that indexing has caught up to the confirmed tip.
var errNoNewBlocks = errors.New("no new blocks")

// ZeroAddress is the all-zero token address used to represent ETH in the
// token cache.
var ZeroAddress common.Address

// Driver is an interface for indexing deposits from l1.
type Driver interface {
	// Name is an identifier used to prefix logs for a particular service.
	Name() string
}

// ServiceConfig carries the dependencies and tuning knobs for the L1
// indexing Service.
type ServiceConfig struct {
	Context context.Context
	Metrics *metrics.Metrics
	L1Client *ethclient.Client
	// RawL1Client is used for batched header fetches.
	RawL1Client *rpc.Client
	// ChainID is the expected L1 chain ID; NewService verifies it against
	// the connected node.
	ChainID *big.Int
	AddressManager services.AddressManager
	// ConfDepth is the number of confirmations required before a block is
	// indexed.
	ConfDepth uint64
	MaxHeaderBatchSize uint64
	// StartBlockNumber is where indexing begins on a fresh database.
	StartBlockNumber uint64
	DB *db.Database
	// Bedrock selects portal-based withdrawal indexing instead of the
	// legacy state commitment chain.
	Bedrock bool
}

// Service indexes L1 deposits, withdrawals, and state batches into the
// database, driven by a polling loop.
type Service struct {
	cfg ServiceConfig
	ctx context.Context
	cancel func()
	bridges map[string]bridge.Bridge
	portal *bridge.Portal
	batchScanner *legacy_bindings.StateCommitmentChainFilterer
	// latestHeader is read/written atomically from multiple goroutines.
	latestHeader uint64
	headerSelector *ConfirmedHeaderSelector
	l1Client *ethclient.Client
	metrics *metrics.Metrics
	// tokenCache maps L1 token addresses to their metadata; pre-seeded
	// with ETH under ZeroAddress.
	tokenCache map[common.Address]*db.Token
	isBedrock bool
	wg sync.WaitGroup
}

// IndexerStatus is the JSON payload reported by the status endpoint.
type IndexerStatus struct {
	Synced float64 `json:"synced"`
	Highest db.BlockLocator `json:"highest_block"`
}
// NewService wires up an L1 indexing service from cfg: it verifies the
// configured chain ID against the connected node, constructs the bridge
// scanners and, depending on cfg.Bedrock, either the OptimismPortal scanner
// or the legacy state commitment chain filterer, and prepares the
// confirmed-header selector. The returned service is idle until its loop
// goroutine is started (the wait group is pre-armed for it).
func NewService(cfg ServiceConfig) (*Service, error) {
	ctx, cancel := context.WithCancel(cfg.Context)

	// Handle restart logic
	logger.Info("Creating L1 Indexer")

	// Use the service context (previously context.Background()) so this RPC
	// call is cancelled together with everything else.
	chainID, err := cfg.L1Client.ChainID(ctx)
	if err != nil {
		cancel()
		return nil, err
	}
	if cfg.ChainID.Cmp(chainID) != 0 {
		cancel()
		return nil, fmt.Errorf("chain ID configured with %d but got %d", cfg.ChainID, chainID)
	}

	bridges, err := bridge.BridgesByChainID(cfg.ChainID, cfg.L1Client, cfg.AddressManager)
	if err != nil {
		cancel()
		return nil, err
	}

	var portal *bridge.Portal
	var batchScanner *legacy_bindings.StateCommitmentChainFilterer
	if cfg.Bedrock {
		portal = bridge.NewPortal(cfg.AddressManager)
	} else {
		batchScanner, err = bridge.StateCommitmentChainScanner(cfg.L1Client, cfg.AddressManager)
		if err != nil {
			cancel()
			return nil, err
		}
	}

	logger.Info("Scanning bridges for deposits", "bridges", bridges)

	confirmedHeaderSelector, err := NewConfirmedHeaderSelector(HeaderSelectorConfig{
		ConfDepth:    cfg.ConfDepth,
		MaxBatchSize: cfg.MaxHeaderBatchSize,
	})
	if err != nil {
		cancel()
		return nil, err
	}

	service := &Service{
		cfg:            cfg,
		ctx:            ctx,
		cancel:         cancel,
		portal:         portal,
		bridges:        bridges,
		batchScanner:   batchScanner,
		headerSelector: confirmedHeaderSelector,
		metrics:        cfg.Metrics,
		tokenCache: map[common.Address]*db.Token{
			ZeroAddress: db.ETHL1Token,
		},
		isBedrock: cfg.Bedrock,
		l1Client:  cfg.L1Client,
	}
	// Reserved for the indexing loop goroutine; loop() calls wg.Done.
	service.wg.Add(1)
	return service, nil
}
// loop is the main indexing goroutine: it first catches up to the chain tip
// (retrying on errors every 10 seconds), then polls the node for a new head
// every 5 seconds and runs Update repeatedly until there are no new
// confirmed blocks. It exits when the service context is cancelled, and
// signals completion via the wait group armed in NewService.
func (s *Service) loop() {
	defer s.wg.Done()

	for {
		err := s.catchUp()
		if err == nil {
			break
		}
		if err == context.Canceled {
			return
		}
		logger.Error("error catching up to tip, trying again in a bit", "err", err)
		time.Sleep(10 * time.Second)
		continue
	}

	newHeads := make(chan *types.Header, 1000)
	tick := time.NewTicker(5 * time.Second)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			header, err := query.HeaderByNumberWithRetry(s.ctx, s.cfg.L1Client)
			if err != nil {
				logger.Error("error fetching header by number", "err", err)
				continue
			}
			newHeads <- header
		case header := <-newHeads:
			if header == nil {
				break
			}
			// BUG FIX: Hash is a method on types.Header; previously the
			// method value itself (header.Hash, never called) was logged
			// instead of the block hash.
			logger.Info("Received new header", "header", header.Hash())
			atomic.StoreUint64(&s.latestHeader, header.Number.Uint64())
			// Drain all confirmed blocks up to this head; Update returns
			// errNoNewBlocks once caught up.
			for {
				err := s.Update(header)
				if err != nil {
					if err != errNoNewBlocks {
						logger.Error("Unable to update indexer ", "err", err)
					}
					break
				}
			}
		case <-s.ctx.Done():
			logger.Info("service stopped")
			return
		}
	}
}
// Update indexes all confirmed L1 blocks between the highest block already
// recorded in the database and newHeader (less the confirmation depth). It
// scans the bridges for deposits concurrently, plus — on bedrock — the
// portal for proven/finalized withdrawals, or — on legacy — the SCC for
// state batches, then writes the results block by block. Returns
// errNoNewBlocks once indexing has caught up to the confirmed tip.
//
// Two concurrency bugs are fixed here: errCh was buffered one slot too
// small for the bedrock case (len(bridges)+1 for up to len(bridges)+2
// failing senders, leaking a goroutine), and the unconditional receives
// from the withdrawal result channels deadlocked forever when a portal
// goroutine failed (its error went to errCh and the result channel was
// never fed).
func (s *Service) Update(newHeader *types.Header) error {
	var lowest db.BlockLocator
	highestConfirmed, err := s.cfg.DB.GetHighestL1Block()
	if err != nil {
		return err
	}
	if highestConfirmed == nil {
		// Fresh database: anchor indexing at the configured start block.
		startHeader, err := s.l1Client.HeaderByNumber(s.ctx, new(big.Int).SetUint64(s.cfg.StartBlockNumber))
		if err != nil {
			return fmt.Errorf("error fetching header by number: %w", err)
		}
		highestConfirmed = &db.BlockLocator{
			Number: s.cfg.StartBlockNumber,
			Hash:   startHeader.Hash(),
		}
	}
	lowest = *highestConfirmed

	headers, err := s.headerSelector.NewHead(s.ctx, lowest.Number, newHeader, s.cfg.RawL1Client)
	if err != nil {
		return err
	}
	if len(headers) == 0 {
		return errNoNewBlocks
	}

	// Sanity-check continuity with what is already indexed; bail out without
	// error on gaps or reorgs and let the next tick retry.
	if lowest.Number+1 != headers[0].Number.Uint64() {
		logger.Error("Block number does not immediately follow ",
			"block", headers[0].Number.Uint64(), "hash", headers[0].Hash,
			"lowest_block", lowest.Number, "hash", lowest.Hash)
		return nil
	}
	if lowest.Number > 0 && lowest.Hash != headers[0].ParentHash {
		logger.Error("Parent hash does not connect to ",
			"block", headers[0].Number.Uint64(), "hash", headers[0].Hash,
			"lowest_block", lowest.Number, "hash", lowest.Hash)
		return nil
	}

	startHeight := headers[0].Number.Uint64()
	endHeight := headers[len(headers)-1].Number.Uint64()
	depositsByBlockHash := make(map[common.Hash][]db.Deposit)

	start := prometheus.NewTimer(s.metrics.UpdateDuration.WithLabelValues("l1"))
	defer func() {
		dur := start.ObserveDuration()
		logger.Info("updated index", "start_height", startHeight, "end_height", endHeight, "duration", dur)
	}()

	bridgeDepositsCh := make(chan bridge.DepositsMap, len(s.bridges))
	provenWithdrawalsCh := make(chan bridge.ProvenWithdrawalsMap, 1)
	finalizedWithdrawalsCh := make(chan bridge.FinalizedWithdrawalsMap, 1)
	// BUG FIX: in the bedrock case up to len(s.bridges)+2 goroutines can
	// fail; the buffer was previously len(s.bridges)+1, which could leave
	// one sender blocked forever after this function returned early.
	errCh := make(chan error, len(s.bridges)+2)

	for _, bridgeImpl := range s.bridges {
		go func(b bridge.Bridge) {
			deposits, err := b.GetDepositsByBlockRange(s.ctx, startHeight, endHeight)
			if err != nil {
				errCh <- err
				return
			}
			bridgeDepositsCh <- deposits
		}(bridgeImpl)
	}
	if s.isBedrock {
		go func() {
			provenWithdrawals, err := s.portal.GetProvenWithdrawalsByBlockRange(s.ctx, startHeight, endHeight)
			if err != nil {
				errCh <- err
				return
			}
			provenWithdrawalsCh <- provenWithdrawals
		}()
		go func() {
			finalizedWithdrawals, err := s.portal.GetFinalizedWithdrawalsByBlockRange(s.ctx, startHeight, endHeight)
			if err != nil {
				errCh <- err
				return
			}
			finalizedWithdrawalsCh <- finalizedWithdrawals
		}()
	} else {
		provenWithdrawalsCh <- make(bridge.ProvenWithdrawalsMap)
		finalizedWithdrawalsCh <- make(bridge.FinalizedWithdrawalsMap)
	}

	var receives int
	for {
		select {
		case bridgeDeposits := <-bridgeDepositsCh:
			for blockHash, deposits := range bridgeDeposits {
				for _, deposit := range deposits {
					if err := s.cacheToken(deposit); err != nil {
						logger.Warn("error caching token", "err", err)
					}
				}
				depositsByBlockHash[blockHash] = append(depositsByBlockHash[blockHash], deposits...)
			}
		case err := <-errCh:
			return err
		}

		receives++
		if receives == len(s.bridges) {
			break
		}
	}

	// BUG FIX: watch errCh while waiting for the withdrawal results;
	// previously a failed portal goroutine deadlocked these receives.
	var provenWithdrawalsByBlockHash bridge.ProvenWithdrawalsMap
	select {
	case provenWithdrawalsByBlockHash = <-provenWithdrawalsCh:
	case err := <-errCh:
		return err
	}
	var finalizedWithdrawalsByBlockHash bridge.FinalizedWithdrawalsMap
	select {
	case finalizedWithdrawalsByBlockHash = <-finalizedWithdrawalsCh:
	case err := <-errCh:
		return err
	}

	var stateBatches map[common.Hash][]db.StateBatch
	if !s.isBedrock {
		stateBatches, err = QueryStateBatches(s.batchScanner, startHeight, endHeight, s.ctx)
		if err != nil {
			logger.Error("Error querying state batches", "err", err)
			return err
		}
	}

	for i, header := range headers {
		blockHash := header.Hash
		number := header.Number.Uint64()
		deposits := depositsByBlockHash[blockHash]
		batches := stateBatches[blockHash]
		provenWds := provenWithdrawalsByBlockHash[blockHash]
		finalizedWds := finalizedWithdrawalsByBlockHash[blockHash]
		// Always record block data in the last block
		// in the list of headers
		if len(deposits) == 0 && len(batches) == 0 && len(provenWds) == 0 && len(finalizedWds) == 0 && i != len(headers)-1 {
			continue
		}

		block := &db.IndexedL1Block{
			Hash:                 blockHash,
			ParentHash:           header.ParentHash,
			Number:               number,
			Timestamp:            header.Time,
			Deposits:             deposits,
			ProvenWithdrawals:    provenWds,
			FinalizedWithdrawals: finalizedWds,
		}

		err := s.cfg.DB.AddIndexedL1Block(block)
		if err != nil {
			logger.Error(
				"Unable to import ",
				"block", number,
				"hash", blockHash, "err", err,
				"block", block,
			)
			return err
		}

		err = s.cfg.DB.AddStateBatch(batches)
		if err != nil {
			logger.Error(
				"Unable to import state append batch",
				"block", number,
				"hash", blockHash, "err", err,
				"block", block,
			)
			return err
		}
		s.metrics.RecordStateBatches(len(batches))

		logger.Debug("Imported ",
			"block", number, "hash", blockHash, "deposits", len(block.Deposits))
		for _, deposit := range block.Deposits {
			// ROBUSTNESS FIX: cacheToken may have failed above (only a
			// warning is logged), so the token can be missing; a nil entry
			// previously caused a panic on token.Symbol.
			symbol := "UNKNOWN"
			if token := s.tokenCache[deposit.L1Token]; token != nil {
				symbol = token.Symbol
			}
			logger.Info(
				"indexed deposit",
				"tx_hash", deposit.TxHash,
				"symbol", symbol,
				"amount", deposit.Amount,
			)
			s.metrics.RecordDeposit(deposit.L1Token)
		}
	}

	newHeaderNumber := newHeader.Number.Uint64()
	s.metrics.SetL1SyncHeight(endHeight)
	s.metrics.SetL1SyncPercent(endHeight, newHeaderNumber)

	latestHeaderNumber := headers[len(headers)-1].Number.Uint64()
	if latestHeaderNumber+s.cfg.ConfDepth-1 == newHeaderNumber {
		return errNoNewBlocks
	}
	return nil
}
// GetIndexerStatus is an HTTP handler reporting how synced the L1 indexer
// is relative to the latest observed header, as an IndexerStatus JSON
// object containing the sync fraction and the highest indexed block.
func (s *Service) GetIndexerStatus(w http.ResponseWriter, r *http.Request) {
	highestBlock, err := s.cfg.DB.GetHighestL1Block()
	if err != nil {
		server.RespondWithError(w, http.StatusInternalServerError, err.Error())
		return
	}
	// Bug fix: GetHighestL1Block can return a nil locator before the first
	// block is indexed (see the nil check in catchUp); dereferencing it
	// below would panic the handler.
	if highestBlock == nil {
		server.RespondWithError(w, http.StatusNotFound, "no blocks indexed yet")
		return
	}

	var synced float64
	if s.latestHeader != 0 {
		synced = float64(highestBlock.Number) / float64(s.latestHeader)
	}

	status := &IndexerStatus{
		Synced:  synced,
		Highest: *highestBlock,
	}

	server.RespondWithJSON(w, http.StatusOK, status)
}
// GetDeposits is an HTTP handler returning the deposits made by the
// address in the request path, paginated by the optional "limit" and
// "offset" query parameters (limit defaults to 10).
func (s *Service) GetDeposits(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)

	limitStr := r.URL.Query().Get("limit")
	limit, err := strconv.ParseUint(limitStr, 10, 64)
	if err != nil && limitStr != "" {
		// Bug fix: a malformed query parameter is a client error, not a
		// server error — respond 400, not 500.
		server.RespondWithError(w, http.StatusBadRequest, err.Error())
		return
	}
	if limit == 0 {
		limit = 10
	}

	offsetStr := r.URL.Query().Get("offset")
	offset, err := strconv.ParseUint(offsetStr, 10, 64)
	if err != nil && offsetStr != "" {
		server.RespondWithError(w, http.StatusBadRequest, err.Error())
		return
	}

	page := db.PaginationParam{
		Limit:  limit,
		Offset: offset,
	}

	deposits, err := s.cfg.DB.GetDepositsByAddress(common.HexToAddress(vars["address"]), page)
	if err != nil {
		server.RespondWithError(w, http.StatusInternalServerError, err.Error())
		return
	}

	server.RespondWithJSON(w, http.StatusOK, deposits)
}
// catchUp fast-forwards the index toward the chain tip. It repeatedly runs
// Update until the highest indexed block is within one batch (plus the
// confirmation depth) of the real head, then returns so the regular
// polling loop can take over.
func (s *Service) catchUp() error {
	realHead, err := query.HeaderByNumberWithRetry(s.ctx, s.cfg.L1Client)
	if err != nil {
		return err
	}
	realHeadNum := realHead.Number.Uint64()

	currHead, err := s.cfg.DB.GetHighestL1Block()
	if err != nil {
		return err
	}
	// A nil locator means nothing has been indexed yet; treat it as height 0.
	var currHeadNum uint64
	if currHead != nil {
		currHeadNum = currHead.Number
	}

	// NOTE(review): realHeadNum-s.cfg.ConfDepth underflows if the chain is
	// shorter than the confirmation depth — confirm this cannot happen in
	// practice.
	if realHeadNum-s.cfg.ConfDepth <= currHeadNum+s.cfg.MaxHeaderBatchSize {
		return nil
	}

	logger.Info("chain is far behind head, resyncing")
	s.metrics.SetL1CatchingUp(true)

	for realHeadNum-s.cfg.ConfDepth > currHeadNum+s.cfg.MaxHeaderBatchSize {
		select {
		case <-s.ctx.Done():
			return s.ctx.Err()
		default:
			// errNoNewBlocks is benign here: it only means this pass indexed
			// up to the confirmation frontier.
			if err := s.Update(realHead); err != nil && err != errNoNewBlocks {
				return err
			}
			// Re-read the highest block to measure progress.
			// NOTE(review): currHead is dereferenced without a nil check
			// here, unlike above — confirm Update always persists at least
			// one block before this point.
			currHead, err := s.cfg.DB.GetHighestL1Block()
			if err != nil {
				return err
			}
			currHeadNum = currHead.Number
		}
	}

	logger.Info("indexer is close enough to tip, starting regular loop")
	s.metrics.SetL1CatchingUp(false)
	return nil
}
// cacheToken ensures metadata for the deposit's L1 token is present in the
// in-memory cache, loading it from the database or, failing that, from the
// ERC-20 contract on chain.
func (s *Service) cacheToken(deposit db.Deposit) error {
	addr := deposit.L1Token
	if s.tokenCache[addr] != nil {
		// Already cached; nothing to do.
		return nil
	}

	// First try the database.
	token, err := s.cfg.DB.GetL1TokenByAddress(addr.String())
	if err != nil {
		return err
	}
	if token != nil {
		s.metrics.IncL1CachedTokensCount()
		s.tokenCache[addr] = token
		return nil
	}

	// Fall back to querying the contract directly. On failure, persist a
	// placeholder token carrying only the address.
	token, err = query.NewERC20(addr, s.cfg.L1Client)
	if err != nil {
		logger.Error("Error querying ERC20 token details",
			"l1_token", addr.String(), "err", err)
		token = &db.Token{
			Address: addr.String(),
		}
	}
	if err := s.cfg.DB.AddL1Token(addr.String(), token); err != nil {
		return err
	}
	s.tokenCache[addr] = token
	s.metrics.IncL1CachedTokensCount()
	return nil
}
// Start launches the indexing loop in a background goroutine. It fails
// fast when no chain ID was configured or discovered.
func (s *Service) Start() error {
	if s.cfg.ChainID != nil {
		go s.loop()
		return nil
	}
	return errNoChainID
}
// Stop cancels the service context and blocks until the main loop has
// exited.
func (s *Service) Stop() {
	s.cancel()
	s.wg.Wait()
}
package bridge
import (
"context"
"errors"
"math/big"
"github.com/ethereum-optimism/optimism/indexer/db"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-bindings/predeploys"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/common"
)
// WithdrawalsMap groups withdrawals by the hash of the L2 block in which
// they were initiated.
type WithdrawalsMap map[common.Hash][]db.Withdrawal

// Bridge abstracts over the different L2 bridge contracts the indexer can
// scan for withdrawals.
type Bridge interface {
	// Address returns the contract address of the bridge.
	Address() common.Address
	// GetWithdrawalsByBlockRange returns the withdrawals initiated through
	// this bridge within the given inclusive block-height range.
	GetWithdrawalsByBlockRange(context.Context, uint64, uint64) (WithdrawalsMap, error)
	// String returns a human-readable name for the bridge.
	String() string
}
// implConfig ties a human-readable bridge name to the scanner
// implementation used for it and the on-chain contract address.
type implConfig struct {
	name string
	impl string
	addr common.Address
}

// defaultBridgeCfgs are the bridges scanned on every chain.
var defaultBridgeCfgs = []*implConfig{
	// Consistency fix: use the shared StandardBridgeImpl constant like
	// customBridgeCfgs below, instead of a duplicated string literal.
	{"Standard", StandardBridgeImpl, predeploys.L2StandardBridgeAddr},
}

// customBridgeCfgs lists additional, chain-specific bridge deployments
// keyed by chain ID.
var customBridgeCfgs = map[uint64][]*implConfig{
	// Mainnet
	10: {
		{"BitBTC", StandardBridgeImpl, common.HexToAddress("0x158F513096923fF2d3aab2BcF4478536de6725e2")},
		{"DAI", StandardBridgeImpl, common.HexToAddress("0x467194771dAe2967Aef3ECbEDD3Bf9a310C76C65")},
		{"wstETH", StandardBridgeImpl, common.HexToAddress("0x8E01013243a96601a86eb3153F0d9Fa4fbFb6957")},
	},
}
// BridgesByChainID instantiates the set of bridges to scan for the given
// chain: the defaults plus any chain-specific custom bridges. When
// isBedrock is set, a single L2ToL1MessagePasser binding is created and
// shared by all bridges so withdrawal hashes can be computed.
func BridgesByChainID(chainID *big.Int, client *ethclient.Client, isBedrock bool) (map[string]Bridge, error) {
	allCfgs := make([]*implConfig, 0)
	allCfgs = append(allCfgs, defaultBridgeCfgs...)
	allCfgs = append(allCfgs, customBridgeCfgs[chainID.Uint64()]...)

	var l2L1MP *bindings.L2ToL1MessagePasser
	var err error
	if isBedrock {
		l2L1MP, err = bindings.NewL2ToL1MessagePasser(predeploys.L2ToL1MessagePasserAddr, client)
		if err != nil {
			return nil, err
		}
	}

	bridges := make(map[string]Bridge)
	for _, bridge := range allCfgs {
		switch bridge.impl {
		// Consistency fix: match on the shared StandardBridgeImpl constant
		// rather than a duplicated string literal, so the implementation
		// name has a single source of truth.
		case StandardBridgeImpl:
			l2SB, err := bindings.NewL2StandardBridge(bridge.addr, client)
			if err != nil {
				return nil, err
			}
			bridges[bridge.name] = &StandardBridge{
				name:      bridge.name,
				address:   bridge.addr,
				client:    client,
				l2SB:      l2SB,
				l2L1MP:    l2L1MP,
				isBedrock: isBedrock,
			}
		default:
			return nil, errors.New("unsupported bridge")
		}
	}
	return bridges, nil
}
package bridge
import "time"
const (
	// DefaultConnectionTimeout bounds individual RPC calls made by the
	// bridge scanners.
	DefaultConnectionTimeout = 60 * time.Second
	// L2StandardBridgeAddr is the predeploy address of the L2 standard
	// bridge contract.
	L2StandardBridgeAddr = "0x4200000000000000000000000000000000000010"
	// StandardBridgeImpl names the standard-bridge scanner implementation
	// in bridge configuration entries.
	StandardBridgeImpl = "StandardBridge"
)
package bridge
import "github.com/ethereum/go-ethereum/log"
// logger is the package-wide logger for the L2 bridge scanners.
var logger = log.New("service", "l2-bridge")
package bridge
import (
"context"
"github.com/ethereum-optimism/optimism/indexer/db"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-node/withdrawals"
"github.com/ethereum-optimism/optimism/op-service/backoff"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
)
// StandardBridge scans a standard-bridge deployment on L2 for withdrawals.
type StandardBridge struct {
	name      string                        // human-readable bridge name
	address   common.Address                // bridge contract address
	client    *ethclient.Client             // client used for receipt lookups
	l2SB      *bindings.L2StandardBridge    // binding used to filter WithdrawalInitiated events
	l2L1MP    *bindings.L2ToL1MessagePasser // binding used to parse MessagePassed events (Bedrock only)
	isBedrock bool                          // whether Bedrock withdrawal hashes should be computed
}
// Address returns the bridge contract address being scanned.
func (s *StandardBridge) Address() common.Address {
	return s.address
}
// GetWithdrawalsByBlockRange scans WithdrawalInitiated events emitted by
// the L2 standard bridge in the inclusive block range [start, end] and
// groups them by the block hash they occurred in. In Bedrock mode it also
// resolves the matching MessagePassed event from the transaction receipt
// so the Bedrock withdrawal hash can be recorded alongside the withdrawal.
func (s *StandardBridge) GetWithdrawalsByBlockRange(ctx context.Context, start, end uint64) (WithdrawalsMap, error) {
	withdrawalsByBlockhash := make(map[common.Hash][]db.Withdrawal)

	opts := &bind.FilterOpts{
		Context: ctx,
		Start:   start,
		End:     &end,
	}

	// Retry the log filter up to 3 times with exponential backoff.
	iter, err := backoff.Do(ctx, 3, backoff.Exponential(), func() (*bindings.L2StandardBridgeWithdrawalInitiatedIterator, error) {
		return s.l2SB.FilterWithdrawalInitiated(opts, nil, nil, nil)
	})
	if err != nil {
		return nil, err
	}

	// Cache receipts by tx hash so multiple events in one transaction only
	// trigger a single receipt lookup.
	receipts := make(map[common.Hash]*types.Receipt)
	defer iter.Close()
	for iter.Next() {
		ev := iter.Event
		if s.isBedrock {
			receipt := receipts[ev.Raw.TxHash]
			if receipt == nil {
				receipt, err = s.client.TransactionReceipt(ctx, ev.Raw.TxHash)
				if err != nil {
					return nil, err
				}
				receipts[ev.Raw.TxHash] = receipt
			}

			// Locate the MessagePassed log in the same receipt; if several
			// are present, warn and keep only the first one parsed.
			var withdrawalInitiated *bindings.L2ToL1MessagePasserMessagePassed
			for _, eLog := range receipt.Logs {
				if len(eLog.Topics) == 0 || eLog.Topics[0] != withdrawals.MessagePassedTopic {
					continue
				}

				if withdrawalInitiated != nil {
					logger.Warn("detected multiple withdrawal initiated events! ignoring", "tx_hash", ev.Raw.TxHash)
					continue
				}

				withdrawalInitiated, err = s.l2L1MP.ParseMessagePassed(*eLog)
				if err != nil {
					return nil, err
				}
			}

			// NOTE(review): if no MessagePassed log was found in the receipt,
			// withdrawalInitiated is nil here — confirm WithdrawalHash
			// returns an error for a nil event rather than panicking.
			hash, err := withdrawals.WithdrawalHash(withdrawalInitiated)
			if err != nil {
				return nil, err
			}

			withdrawalsByBlockhash[ev.Raw.BlockHash] = append(
				withdrawalsByBlockhash[ev.Raw.BlockHash], db.Withdrawal{
					TxHash:      ev.Raw.TxHash,
					L1Token:     ev.L1Token,
					L2Token:     ev.L2Token,
					FromAddress: ev.From,
					ToAddress:   ev.To,
					Amount:      ev.Amount,
					Data:        ev.ExtraData,
					LogIndex:    ev.Raw.Index,
					BedrockHash: &hash,
				},
			)
		} else {
			// Pre-Bedrock withdrawals carry no Bedrock withdrawal hash.
			withdrawalsByBlockhash[ev.Raw.BlockHash] = append(
				withdrawalsByBlockhash[ev.Raw.BlockHash], db.Withdrawal{
					TxHash:      ev.Raw.TxHash,
					L1Token:     ev.L1Token,
					L2Token:     ev.L2Token,
					FromAddress: ev.From,
					ToAddress:   ev.To,
					Amount:      ev.Amount,
					Data:        ev.ExtraData,
					LogIndex:    ev.Raw.Index,
				},
			)
		}
	}
	return withdrawalsByBlockhash, iter.Error()
}
// String returns the configured human-readable name of the bridge.
func (s *StandardBridge) String() string {
	return s.name
}
package l2
import (
"context"
"errors"
"math/big"
"time"
"github.com/ethereum-optimism/optimism/indexer/services/util"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
)
const (
	// DefaultConnectionTimeout bounds each batched header fetch.
	DefaultConnectionTimeout = 20 * time.Second
	// DefaultConfDepth is the default confirmation depth.
	DefaultConfDepth uint64 = 20
	// DefaultMaxBatchSize caps the number of headers requested in a single
	// batched RPC call.
	DefaultMaxBatchSize = 50
)

// HeaderSelectorConfig tunes how confirmed headers are selected.
type HeaderSelectorConfig struct {
	ConfDepth    uint64 // confirmations required before a header is considered final
	MaxBatchSize uint64 // maximum headers returned by a single NewHead call
}

// ConfirmedHeaderSelector selects ranges of confirmed headers to index.
type ConfirmedHeaderSelector struct {
	cfg HeaderSelectorConfig
}
// HeadersByRange fetches count consecutive headers starting at startHeight
// using a single batched eth_getBlockByNumber RPC call.
func HeadersByRange(ctx context.Context, client *rpc.Client, startHeight uint64, count int) ([]*types.Header, error) {
	batch := make([]rpc.BatchElem, count)
	for i := range batch {
		batch[i] = rpc.BatchElem{
			Method: "eth_getBlockByNumber",
			Args: []interface{}{
				util.ToBlockNumArg(new(big.Int).SetUint64(startHeight + uint64(i))),
				false,
			},
			Result: new(types.Header),
			Error:  nil,
		}
	}

	if err := client.BatchCallContext(ctx, batch); err != nil {
		return nil, err
	}

	headers := make([]*types.Header, count)
	for i, elem := range batch {
		if elem.Error != nil {
			return nil, elem.Error
		}
		headers[i] = elem.Result.(*types.Header)
	}
	return headers, nil
}
// NewHead computes the batch of confirmed headers to index next, given the
// lowest already-indexed height and the current chain head. It returns nil
// (with no error) when nothing is confirmed yet or a fork is detected, and
// trims the returned batch at any failed lookup or chain discontinuity.
func (f *ConfirmedHeaderSelector) NewHead(
	ctx context.Context,
	lowest uint64,
	header *types.Header,
	client *rpc.Client,
) ([]*types.Header, error) {
	number := header.Number.Uint64()
	blockHash := header.Hash()

	logger.Info("New block", "block", number, "hash", blockHash)

	// Too early in the chain for anything to be confirmed yet.
	if number < f.cfg.ConfDepth {
		return nil, nil
	}
	endHeight := number - f.cfg.ConfDepth + 1

	minNextHeight := lowest + f.cfg.ConfDepth
	if minNextHeight > number {
		// Bug fix: the previous call passed a printf-style format string
		// ("Fork block=%d hash=%s") to the structured logger, producing
		// malformed key/value pairs; log proper key/value pairs instead.
		log.Info("Fork", "block", number, "hash", blockHash)
		return nil, nil
	}
	startHeight := lowest + 1

	// Clamp to max batch size
	if startHeight+f.cfg.MaxBatchSize < endHeight+1 {
		endHeight = startHeight + f.cfg.MaxBatchSize - 1
	}

	nHeaders := int(endHeight - startHeight + 1)
	if nHeaders > 1 {
		logger.Info("Loading blocks",
			"startHeight", startHeight, "endHeight", endHeight)
	}

	headers := make([]*types.Header, 0)
	height := startHeight
	left := nHeaders - len(headers)
	for left > 0 {
		// Fetch in chunks of at most DefaultMaxBatchSize headers.
		count := DefaultMaxBatchSize
		if count > left {
			count = left
		}

		logger.Info("Loading block batch",
			"height", height, "count", count)

		ctxt, cancel := context.WithTimeout(ctx, DefaultConnectionTimeout)
		fetched, err := HeadersByRange(ctxt, client, height, count)
		cancel()
		if err != nil {
			return nil, err
		}

		headers = append(headers, fetched...)
		left = nHeaders - len(headers)
		height += uint64(count)
	}

	logger.Debug("Verifying block range ",
		"startHeight", startHeight, "endHeight", endHeight)

	for i, header := range headers {
		// Trim the returned headers if any of the lookups failed.
		if header == nil {
			headers = headers[:i]
			break
		}

		// Assert that each header builds on the parent before it, trim if there
		// are any discontinuities.
		if i > 0 {
			prevHeader := headers[i-1]
			if prevHeader.Hash() != header.ParentHash {
				// Bug fix: the previous call logged two values under the
				// same "hash" key; give the parent its own "prev_hash" key.
				log.Error("Parent hash does not connect to ",
					"block", header.Number.Uint64(), "hash", header.Hash(),
					"prev", prevHeader.Number.Uint64(), "prev_hash", prevHeader.Hash())
				headers = headers[:i]
				break
			}
		}

		log.Debug("Confirmed block ",
			"block", header.Number.Uint64(), "hash", header.Hash())
	}

	return headers, nil
}
// NewConfirmedHeaderSelector validates cfg and builds a selector; both
// ConfDepth and MaxBatchSize must be non-zero.
func NewConfirmedHeaderSelector(cfg HeaderSelectorConfig) (*ConfirmedHeaderSelector, error) {
	switch {
	case cfg.ConfDepth == 0:
		return nil, errors.New("ConfDepth must be greater than zero")
	case cfg.MaxBatchSize == 0:
		return nil, errors.New("MaxBatchSize must be greater than zero")
	}
	return &ConfirmedHeaderSelector{cfg: cfg}, nil
}
package l2
import (
"context"
"errors"
"fmt"
"math/big"
"net/http"
"strconv"
"sync"
"time"
"github.com/ethereum-optimism/optimism/indexer/metrics"
"github.com/ethereum-optimism/optimism/indexer/server"
"github.com/ethereum-optimism/optimism/indexer/services/query"
"github.com/ethereum-optimism/optimism/op-bindings/predeploys"
"github.com/prometheus/client_golang/prometheus"
"github.com/ethereum-optimism/optimism/indexer/db"
"github.com/ethereum-optimism/optimism/indexer/services/l2/bridge"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/gorilla/mux"
)
// logger is the package-wide logger for the L2 indexer service.
var logger = log.New("service", "l2")

// errNoChainID represents the error when the chain id is not provided
// and it cannot be remotely fetched
var errNoChainID = errors.New("no chain id provided")

// errWrongChainID represents the error when the configured chain id is not
// correct
var errWrongChainID = errors.New("wrong chain id provided")

// errNoNewBlocks signals that no confirmed blocks were available to index.
// Callers treat it as a benign condition rather than a failure.
var errNoNewBlocks = errors.New("no new blocks")
// ServiceConfig holds the dependencies and tuning parameters for the L2
// indexer service.
type ServiceConfig struct {
	Context            context.Context   // parent context bounding the service lifetime
	Metrics            *metrics.Metrics  // indexer metrics sink
	L2RPC              *rpc.Client       // raw RPC client used for batched header fetches
	L2Client           *ethclient.Client // typed client for chain queries
	ChainID            *big.Int          // expected chain ID; fetched remotely when nil
	ConfDepth          uint64            // confirmations required before a block is indexed
	MaxHeaderBatchSize uint64            // maximum headers fetched per update
	StartBlockNumber   uint64            // height to begin indexing from on a fresh DB
	DB                 *db.Database      // persistent storage
	Bedrock            bool              // whether the chain is post-Bedrock
}

// Service indexes L2 blocks and the withdrawals they contain.
type Service struct {
	cfg            ServiceConfig
	ctx            context.Context // derived from cfg.Context; cancelled by Stop
	cancel         func()
	bridges        map[string]bridge.Bridge // bridges scanned for withdrawals
	latestHeader   uint64
	headerSelector *ConfirmedHeaderSelector
	metrics        *metrics.Metrics
	tokenCache     map[common.Address]*db.Token // token metadata keyed by L2 address
	wg             sync.WaitGroup
}

// IndexerStatus is the JSON payload served by GetIndexerStatus.
type IndexerStatus struct {
	Synced  float64         `json:"synced"`
	Highest db.BlockLocator `json:"highest_block"`
}
// NewService constructs the L2 indexer service: it validates the chain ID
// against the connected client, instantiates the bridges to scan, and
// prepares the confirmed-header selector. Call Start to begin indexing.
func NewService(cfg ServiceConfig) (*Service, error) {
	ctx, cancel := context.WithCancel(cfg.Context)

	// Handle restart logic
	logger.Info("Creating L2 Indexer")

	// Fix: use the service context rather than context.Background so the
	// chain-ID lookup is aborted when the caller's context is cancelled.
	chainID, err := cfg.L2Client.ChainID(ctx)
	if err != nil {
		cancel()
		return nil, err
	}

	if cfg.ChainID != nil {
		if cfg.ChainID.Cmp(chainID) != 0 {
			cancel()
			return nil, fmt.Errorf("%w: configured with %d and got %d",
				errWrongChainID, cfg.ChainID, chainID)
		}
	} else {
		cfg.ChainID = chainID
	}

	bridges, err := bridge.BridgesByChainID(cfg.ChainID, cfg.L2Client, cfg.Bedrock)
	if err != nil {
		cancel()
		return nil, err
	}

	logger.Info("Scanning bridges for withdrawals", "bridges", bridges)

	confirmedHeaderSelector, err := NewConfirmedHeaderSelector(HeaderSelectorConfig{
		ConfDepth:    cfg.ConfDepth,
		MaxBatchSize: cfg.MaxHeaderBatchSize,
	})
	if err != nil {
		cancel()
		return nil, err
	}

	service := &Service{
		cfg:            cfg,
		ctx:            ctx,
		cancel:         cancel,
		bridges:        bridges,
		headerSelector: confirmedHeaderSelector,
		metrics:        cfg.Metrics,
		tokenCache: map[common.Address]*db.Token{
			predeploys.LegacyERC20ETHAddr: db.ETHL1Token,
		},
	}
	// NOTE(review): this Add(1) is balanced by wg.Done in loop; if Start is
	// never called, Stop blocks forever on wg.Wait — confirm callers always
	// Start before Stop.
	service.wg.Add(1)
	return service, nil
}
// loop is the service's main processing loop. It first catches the index
// up to within one batch of the chain tip, then polls for a new header
// every five seconds and indexes confirmed blocks until the context is
// cancelled.
func (s *Service) loop() {
	defer s.wg.Done()

	// Retry catch-up until it succeeds or the service is cancelled.
	for {
		err := s.catchUp()
		if err == nil {
			break
		}
		if err == context.Canceled {
			return
		}

		logger.Error("error catching up to tip, trying again in a bit", "err", err)
		time.Sleep(10 * time.Second)
		continue
	}

	newHeads := make(chan *types.Header, 1000)
	tick := time.NewTicker(5 * time.Second)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			header, err := query.HeaderByNumberWithRetry(s.ctx, s.cfg.L2Client)
			if err != nil {
				logger.Error("error fetching header by number", "err", err)
				continue
			}
			newHeads <- header
		case header := <-newHeads:
			// Bug fix: header.Hash is a method; the old code logged the
			// method value itself instead of calling it.
			logger.Info("Received new header", "header", header.Hash())
			// Drain all available confirmed blocks before waiting again.
			for {
				err := s.Update(header)
				if err != nil {
					if err != errNoNewBlocks {
						logger.Error("Unable to update indexer ", "err", err)
					}
					break
				}
			}
		case <-s.ctx.Done():
			logger.Info("service stopped")
			return
		}
	}
}
// Update indexes all confirmed L2 blocks between the highest block already
// stored and newHeader, collecting withdrawals from every configured
// bridge. It returns errNoNewBlocks when the chain tip has not advanced
// far enough to confirm anything new.
func (s *Service) Update(newHeader *types.Header) error {
	// Resume from the highest indexed block, falling back to the configured
	// start block on a fresh database.
	var lowest = db.BlockLocator{
		Number: s.cfg.StartBlockNumber,
	}
	highestConfirmed, err := s.cfg.DB.GetHighestL2Block()
	if err != nil {
		return err
	}
	if highestConfirmed != nil {
		lowest = *highestConfirmed
	}

	headers, err := s.headerSelector.NewHead(s.ctx, lowest.Number, newHeader, s.cfg.L2RPC)
	if err != nil {
		return err
	}
	if len(headers) == 0 {
		return errNoNewBlocks
	}

	// Sanity-check that the fetched range extends the indexed chain without
	// gaps or a reorg; bail out (without error) otherwise.
	if lowest.Number+1 != headers[0].Number.Uint64() {
		logger.Error("Block number does not immediately follow ",
			"block", headers[0].Number.Uint64(), "hash", headers[0].Hash(),
			"lowest_block", lowest.Number, "hash", lowest.Hash)
		return nil
	}
	if lowest.Number > 0 && lowest.Hash != headers[0].ParentHash {
		logger.Error("Parent hash does not connect to ",
			"block", headers[0].Number.Uint64(), "hash", headers[0].Hash(),
			"lowest_block", lowest.Number, "hash", lowest.Hash)
		return nil
	}

	startHeight := headers[0].Number.Uint64()
	endHeight := headers[len(headers)-1].Number.Uint64()
	withdrawalsByBlockHash := make(map[common.Hash][]db.Withdrawal)

	// Time the whole update for the metrics dashboard.
	start := prometheus.NewTimer(s.metrics.UpdateDuration.WithLabelValues("l2"))
	defer func() {
		dur := start.ObserveDuration()
		logger.Info("updated index", "start_height", startHeight, "end_height", endHeight, "duration", dur)
	}()

	// Scan every bridge concurrently for withdrawals in the block range.
	// NOTE(review): bridgeWdsCh is unbuffered, so if one bridge errors and
	// we return early below, the remaining worker goroutines block forever
	// on their send — confirm whether this leak is acceptable here.
	bridgeWdsCh := make(chan bridge.WithdrawalsMap)
	errCh := make(chan error, len(s.bridges))
	for _, bridgeImpl := range s.bridges {
		go func(b bridge.Bridge) {
			wds, err := b.GetWithdrawalsByBlockRange(s.ctx, startHeight, endHeight)
			if err != nil {
				errCh <- err
				return
			}
			bridgeWdsCh <- wds
		}(bridgeImpl)
	}

	// Collect one result per bridge, merging withdrawals by block hash and
	// warming the token-metadata cache; the first error aborts the update.
	var receives int
	for {
		select {
		case bridgeWds := <-bridgeWdsCh:
			for blockHash, withdrawals := range bridgeWds {
				for _, wd := range withdrawals {
					if err := s.cacheToken(wd); err != nil {
						logger.Warn("error caching token", "err", err)
					}
				}

				withdrawalsByBlockHash[blockHash] = append(withdrawalsByBlockHash[blockHash], withdrawals...)
			}
		case err := <-errCh:
			return err
		}

		receives++
		if receives == len(s.bridges) {
			break
		}
	}

	for i, header := range headers {
		blockHash := header.Hash()
		number := header.Number.Uint64()
		withdrawals := withdrawalsByBlockHash[blockHash]

		// Skip empty blocks, except the last header in the batch, which is
		// always persisted so the recorded sync height advances.
		if len(withdrawals) == 0 && i != len(headers)-1 {
			continue
		}

		block := &db.IndexedL2Block{
			Hash:        blockHash,
			ParentHash:  header.ParentHash,
			Number:      number,
			Timestamp:   header.Time,
			Withdrawals: withdrawals,
		}

		err := s.cfg.DB.AddIndexedL2Block(block)
		if err != nil {
			logger.Error(
				"Unable to import ",
				"block", number,
				"hash", blockHash,
				"err", err,
				"block", block,
			)
			return err
		}

		logger.Debug("Imported ",
			"block", number, "hash", blockHash, "withdrawals", len(block.Withdrawals))
		for _, withdrawal := range block.Withdrawals {
			token := s.tokenCache[withdrawal.L2Token]
			logger.Info(
				"indexed withdrawal ",
				"tx_hash", withdrawal.TxHash,
				"symbol", token.Symbol,
				"amount", withdrawal.Amount,
			)
			s.metrics.RecordWithdrawal(withdrawal.L2Token)
		}
	}

	newHeaderNumber := newHeader.Number.Uint64()
	s.metrics.SetL2SyncHeight(endHeight)
	s.metrics.SetL2SyncPercent(endHeight, newHeaderNumber)

	// If the batch reached the confirmation frontier there is nothing newer
	// to index yet.
	latestHeaderNumber := headers[len(headers)-1].Number.Uint64()
	if latestHeaderNumber+s.cfg.ConfDepth-1 == newHeaderNumber {
		return errNoNewBlocks
	}
	return nil
}
// GetIndexerStatus is an HTTP handler reporting how synced the L2 indexer
// is relative to the latest observed header, as an IndexerStatus JSON
// object containing the sync fraction and the highest indexed block.
func (s *Service) GetIndexerStatus(w http.ResponseWriter, r *http.Request) {
	highestBlock, err := s.cfg.DB.GetHighestL2Block()
	if err != nil {
		server.RespondWithError(w, http.StatusInternalServerError, err.Error())
		return
	}
	// Bug fix: GetHighestL2Block can return a nil locator before the first
	// block is indexed (see the nil check in catchUp); dereferencing it
	// below would panic the handler.
	if highestBlock == nil {
		server.RespondWithError(w, http.StatusNotFound, "no blocks indexed yet")
		return
	}

	var synced float64
	if s.latestHeader != 0 {
		synced = float64(highestBlock.Number) / float64(s.latestHeader)
	}

	status := &IndexerStatus{
		Synced:  synced,
		Highest: *highestBlock,
	}

	server.RespondWithJSON(w, http.StatusOK, status)
}
// GetWithdrawalBatch is an HTTP handler returning the state batch for the
// withdrawal identified by the "hash" path variable.
func (s *Service) GetWithdrawalBatch(w http.ResponseWriter, r *http.Request) {
	hash := mux.Vars(r)["hash"]
	if hash == "" {
		server.RespondWithError(w, http.StatusBadRequest, "must specify a hash")
		return
	}

	batch, err := s.cfg.DB.GetWithdrawalBatch(common.HexToHash(hash))
	if err != nil {
		server.RespondWithError(w, http.StatusInternalServerError, err.Error())
		return
	}

	server.RespondWithJSON(w, http.StatusOK, batch)
}
// GetWithdrawals is an HTTP handler returning withdrawals made by the
// address in the request path, paginated by the optional "limit" and
// "offset" query parameters (limit defaults to 10) and filtered by the
// optional "finalized" state.
func (s *Service) GetWithdrawals(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)

	limitStr := r.URL.Query().Get("limit")
	limit, err := strconv.ParseUint(limitStr, 10, 64)
	if err != nil && limitStr != "" {
		// Bug fix: a malformed query parameter is a client error, not a
		// server error — respond 400, not 500.
		server.RespondWithError(w, http.StatusBadRequest, err.Error())
		return
	}
	if limit == 0 {
		limit = 10
	}

	offsetStr := r.URL.Query().Get("offset")
	offset, err := strconv.ParseUint(offsetStr, 10, 64)
	if err != nil && offsetStr != "" {
		server.RespondWithError(w, http.StatusBadRequest, err.Error())
		return
	}

	finalizationState := db.ParseFinalizationState(r.URL.Query().Get("finalized"))

	page := db.PaginationParam{
		Limit:  limit,
		Offset: offset,
	}

	withdrawals, err := s.cfg.DB.GetWithdrawalsByAddress(common.HexToAddress(vars["address"]), page, finalizationState)
	if err != nil {
		server.RespondWithError(w, http.StatusInternalServerError, err.Error())
		return
	}

	server.RespondWithJSON(w, http.StatusOK, withdrawals)
}
// catchUp fast-forwards the index toward the chain tip. It repeatedly runs
// Update until the highest indexed block is within one batch (plus the
// confirmation depth) of the real head, then returns so the regular
// polling loop can take over.
func (s *Service) catchUp() error {
	realHead, err := query.HeaderByNumberWithRetry(s.ctx, s.cfg.L2Client)
	if err != nil {
		return err
	}
	realHeadNum := realHead.Number.Uint64()

	currHead, err := s.cfg.DB.GetHighestL2Block()
	if err != nil {
		return err
	}
	// A nil locator means nothing has been indexed yet; treat it as height 0.
	var currHeadNum uint64
	if currHead != nil {
		currHeadNum = currHead.Number
	}

	// NOTE(review): realHeadNum-s.cfg.ConfDepth underflows if the chain is
	// shorter than the confirmation depth — confirm this cannot happen in
	// practice.
	if realHeadNum-s.cfg.ConfDepth <= currHeadNum+s.cfg.MaxHeaderBatchSize {
		return nil
	}

	logger.Info("chain is far behind head, resyncing")
	s.metrics.SetL2CatchingUp(true)

	for realHeadNum-s.cfg.ConfDepth > currHeadNum+s.cfg.MaxHeaderBatchSize {
		select {
		case <-s.ctx.Done():
			return s.ctx.Err()
		default:
			// errNoNewBlocks is benign here: it only means this pass indexed
			// up to the confirmation frontier.
			if err := s.Update(realHead); err != nil && err != errNoNewBlocks {
				return err
			}
			// Re-read the highest block to measure progress.
			// NOTE(review): currHead is dereferenced without a nil check
			// here, unlike above — confirm Update always persists at least
			// one block before this point.
			currHead, err := s.cfg.DB.GetHighestL2Block()
			if err != nil {
				return err
			}
			currHeadNum = currHead.Number
		}
	}

	logger.Info("indexer is close enough to tip, starting regular loop")
	s.metrics.SetL2CatchingUp(false)
	return nil
}
// cacheToken ensures metadata for the withdrawal's L2 token is present in
// the in-memory cache, loading it from the database or, failing that, from
// the ERC-20 contract on chain.
func (s *Service) cacheToken(withdrawal db.Withdrawal) error {
	addr := withdrawal.L2Token
	if s.tokenCache[addr] != nil {
		// Already cached; nothing to do.
		return nil
	}

	// First try the database.
	token, err := s.cfg.DB.GetL2TokenByAddress(addr.String())
	if err != nil {
		return err
	}
	if token != nil {
		s.metrics.IncL2CachedTokensCount()
		s.tokenCache[addr] = token
		return nil
	}

	// Fall back to querying the contract directly. On failure, persist a
	// placeholder token carrying only the address.
	token, err = query.NewERC20(addr, s.cfg.L2Client)
	if err != nil {
		logger.Error("Error querying ERC20 token details",
			"l2_token", addr.String(), "err", err)
		token = &db.Token{
			Address: addr.String(),
		}
	}
	if err := s.cfg.DB.AddL2Token(addr.String(), token); err != nil {
		return err
	}
	s.tokenCache[addr] = token
	s.metrics.IncL2CachedTokensCount()
	return nil
}
// Start launches the indexing loop in a background goroutine. It fails
// fast when no chain ID was configured or discovered.
func (s *Service) Start() error {
	if s.cfg.ChainID != nil {
		go s.loop()
		return nil
	}
	return errNoChainID
}
// Stop cancels the service context and blocks until the main loop has
// exited.
func (s *Service) Stop() {
	s.cancel()
	s.wg.Wait()
}
package query
import (
"github.com/ethereum-optimism/optimism/indexer/db"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
)
// NewERC20 reads the name, symbol, and decimals of the ERC-20 contract at
// address and packages them into a db.Token.
func NewERC20(address common.Address, client *ethclient.Client) (*db.Token, error) {
	contract, err := bindings.NewERC20(address, client)
	if err != nil {
		return nil, err
	}

	opts := &bind.CallOpts{}
	name, err := contract.Name(opts)
	if err != nil {
		return nil, err
	}
	symbol, err := contract.Symbol(opts)
	if err != nil {
		return nil, err
	}
	decimals, err := contract.Decimals(opts)
	if err != nil {
		return nil, err
	}

	token := &db.Token{
		Name:     name,
		Symbol:   symbol,
		Decimals: decimals,
	}
	return token, nil
}
package query
import (
"context"
"github.com/ethereum-optimism/optimism/op-service/backoff"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
)
// HeaderByNumberWithRetry retries getting headers.
// It fetches the latest header (block number nil), retrying up to 3 times
// with exponential backoff on failure.
func HeaderByNumberWithRetry(ctx context.Context, client *ethclient.Client) (*types.Header, error) {
	return backoff.Do(ctx, 3, backoff.Exponential(), func() (*types.Header, error) {
		return client.HeaderByNumber(ctx, nil)
	})
}
package util
import (
"math/big"
"github.com/ethereum/go-ethereum/common/hexutil"
)
// ToBlockNumArg converts a block number into the string form expected by
// the JSON-RPC API: "latest" for nil, "pending" for -1, and a hex-encoded
// quantity otherwise.
func ToBlockNumArg(number *big.Int) string {
	switch {
	case number == nil:
		return "latest"
	case number.Cmp(big.NewInt(-1)) == 0:
		return "pending"
	default:
		return hexutil.EncodeBig(number)
	}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment