Commit 475b2bfa authored by Matthew Slipper's avatar Matthew Slipper Committed by GitHub

indexer: Upgrade L1 services (#3667)

parent dc6d4ca3
......@@ -22,7 +22,7 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
sentry "github.com/getsentry/sentry-go"
"github.com/getsentry/sentry-go"
"github.com/gorilla/mux"
"github.com/urfave/cli"
)
......@@ -169,7 +169,6 @@ func NewIndexer(cfg Config, gitVersion string) (*Indexer, error) {
ConfDepth: cfg.ConfDepth,
MaxHeaderBatchSize: cfg.MaxHeaderBatchSize,
StartBlockNumber: cfg.StartBlockNumber,
StartBlockHash: cfg.StartBlockHash,
})
if err != nil {
return nil, err
......
......@@ -21,6 +21,8 @@ type Metrics struct {
WithdrawalsCount *prometheus.CounterVec
StateBatchesCount prometheus.Counter
L1CatchingUp prometheus.Gauge
L2CatchingUp prometheus.Gauge
......@@ -72,6 +74,12 @@ func NewMetrics(monitoredTokens map[string]string) *Metrics {
"symbol",
}),
StateBatchesCount: promauto.NewCounter(prometheus.CounterOpts{
Name: "state_batches_count",
Help: "The number of state batches indexed.",
Namespace: metricsNamespace,
}),
L1CatchingUp: promauto.NewGauge(prometheus.GaugeOpts{
Name: "l1_catching_up",
Help: "Whether or not L1 is far behind the chain tip.",
......@@ -160,6 +168,10 @@ func (m *Metrics) RecordWithdrawal(addr common.Address) {
m.WithdrawalsCount.WithLabelValues(sym).Inc()
}
func (m *Metrics) RecordStateBatches(count int) {
m.StateBatchesCount.Add(float64(count))
}
func (m *Metrics) SetL1CatchingUp(state bool) {
var catchingUp float64
if state {
......
......@@ -5,10 +5,10 @@ import (
"errors"
"math/big"
"github.com/ethereum-optimism/optimism/indexer/bindings/legacy/scc"
"github.com/ethereum-optimism/optimism/indexer/db"
"github.com/ethereum-optimism/optimism/indexer/services"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-bindings/predeploys"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
)
......@@ -19,7 +19,7 @@ type DepositsMap map[common.Hash][]db.Deposit
// WithdrawalsMap is a collection of withdrawal objects keyed
// on block hashes.
type WithdrawalsMap map[common.Hash][]db.Withdrawal
type InitiatedWithdrawalMap map[common.Hash][]db.Withdrawal
// FinalizedWithdrawalsMap is a collection of finalized withdrawal
// objected keyed on block hashes.
......@@ -27,80 +27,63 @@ type FinalizedWithdrawalsMap map[common.Hash][]db.FinalizedWithdrawal
type Bridge interface {
Address() common.Address
GetDepositsByBlockRange(uint64, uint64) (DepositsMap, error)
GetWithdrawalsByBlockRange(uint64, uint64) (WithdrawalsMap, error)
GetDepositsByBlockRange(context.Context, uint64, uint64) (DepositsMap, error)
String() string
}
type implConfig struct {
name string
impl string
addr string
}
var defaultBridgeCfgs = map[uint64][]*implConfig{
// Devnet
900: {
{"Standard", "StandardBridge", predeploys.DevL1StandardBridge},
{"ETH", "ETHBridge", predeploys.DevL1StandardBridge},
},
// Goerli
5: {
{"Standard", "StandardBridge", "0xFf94B6C486350aD92561Ba09bad3a59df764Da92"},
{"ETH", "ETHBridge", "0xFf94B6C486350aD92561Ba09bad3a59df764Da92"},
},
addr common.Address
}
var customBridgeCfgs = map[uint64][]*implConfig{
// Mainnet
1: {
{"BitBTC", "StandardBridge", "0xaBA2c5F108F7E820C049D5Af70B16ac266c8f128"},
{"DAI", "StandardBridge", "0x10E6593CDda8c58a1d0f14C5164B376352a55f2F"},
{"BitBTC", "StandardBridge", common.HexToAddress("0xaBA2c5F108F7E820C049D5Af70B16ac266c8f128")},
{"DAI", "StandardBridge", common.HexToAddress("0x10E6593CDda8c58a1d0f14C5164B376352a55f2F")},
{"wstETH", "StandardBridge", common.HexToAddress("0x76943C0D61395d8F2edF9060e1533529cAe05dE6")},
},
// Kovan
42: {
{"BitBTC", "StandardBridge", "0x0b651A42F32069d62d5ECf4f2a7e5Bd3E9438746"},
{"USX", "StandardBridge", "0x40E862341b2416345F02c41Ac70df08525150dC7"},
{"DAI", "StandardBridge", "0xb415e822C4983ecD6B1c1596e8a5f976cf6CD9e3"},
{"BitBTC", "StandardBridge", common.HexToAddress("0x0b651A42F32069d62d5ECf4f2a7e5Bd3E9438746")},
{"USX", "StandardBridge", common.HexToAddress("0x40E862341b2416345F02c41Ac70df08525150dC7")},
{"DAI", "StandardBridge", common.HexToAddress("0xb415e822C4983ecD6B1c1596e8a5f976cf6CD9e3")},
{"wstETH", "StandardBridge", common.HexToAddress("0x65321bf24210b81500230dCEce14Faa70a9f50a7")},
},
}
func BridgesByChainID(chainID *big.Int, client bind.ContractBackend, ctx context.Context) (map[string]Bridge, error) {
allCfgs := make([]*implConfig, 0)
allCfgs = append(allCfgs, defaultBridgeCfgs[chainID.Uint64()]...)
func BridgesByChainID(chainID *big.Int, client bind.ContractBackend, addrs services.AddressManager) (map[string]Bridge, error) {
l1SBAddr, _ := addrs.L1StandardBridge()
allCfgs := []*implConfig{
{"Standard", "StandardBridge", l1SBAddr},
{"ETH", "ETHBridge", l1SBAddr},
}
allCfgs = append(allCfgs, customBridgeCfgs[chainID.Uint64()]...)
bridges := make(map[string]Bridge)
for _, bridge := range allCfgs {
switch bridge.impl {
case "StandardBridge":
l1StandardBridgeAddress := common.HexToAddress(bridge.addr)
l1StandardBridgeFilter, err := bindings.NewL1StandardBridgeFilterer(l1StandardBridgeAddress, client)
l1SB, err := bindings.NewL1StandardBridge(bridge.addr, client)
if err != nil {
return nil, err
}
standardBridge := &StandardBridge{
name: bridge.name,
ctx: ctx,
address: l1StandardBridgeAddress,
client: client,
filterer: l1StandardBridgeFilter,
address: bridge.addr,
contract: l1SB,
}
bridges[bridge.name] = standardBridge
case "ETHBridge":
l1StandardBridgeAddress := common.HexToAddress(bridge.addr)
l1EthBridgeFilter, err := bindings.NewL1StandardBridgeFilterer(l1StandardBridgeAddress, client)
l1SB, err := bindings.NewL1StandardBridge(bridge.addr, client)
if err != nil {
return nil, err
}
ethBridge := &EthBridge{
name: bridge.name,
ctx: ctx,
address: l1StandardBridgeAddress,
client: client,
filterer: l1EthBridgeFilter,
address: bridge.addr,
contract: l1SB,
}
bridges[bridge.name] = ethBridge
default:
......@@ -109,3 +92,12 @@ func BridgesByChainID(chainID *big.Int, client bind.ContractBackend, ctx context
}
return bridges, nil
}
func StateCommitmentChainScanner(client bind.ContractFilterer, addrs services.AddressManager) (*scc.StateCommitmentChainFilterer, error) {
sccAddr, _ := addrs.StateCommitmentChain()
filter, err := scc.NewStateCommitmentChainFilterer(sccAddr, client)
if err != nil {
return nil, err
}
return filter, nil
}
package bridge
import (
"context"
"errors"
"math/big"
"github.com/ethereum-optimism/optimism/indexer/bindings/legacy/scc"
"github.com/ethereum-optimism/optimism/indexer/db"
"github.com/ethereum-optimism/optimism/indexer/services"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
)
// DepositsMap is a collection of deposit objects keyed
// on block hashes.
type DepositsMap map[common.Hash][]db.Deposit
<<<<<<< HEAD
// WithdrawalsMap is a collection of withdrawal objects keyed
// on block hashes.
type WithdrawalsMap map[common.Hash][]db.Withdrawal
// FinalizedWithdrawalsMap is a collection of finalized withdrawal
// objected keyed on block hashes.
=======
type InitiatedWithdrawalMap map[common.Hash][]db.Withdrawal
>>>>>>> 22c039efc (indexer: Upgrade L1 services)
type FinalizedWithdrawalsMap map[common.Hash][]db.FinalizedWithdrawal
type Bridge interface {
Address() common.Address
GetDepositsByBlockRange(context.Context, uint64, uint64) (DepositsMap, error)
String() string
}
type implConfig struct {
name string
impl string
addr common.Address
}
var customBridgeCfgs = map[uint64][]*implConfig{
// Mainnet
1: {
{"BitBTC", "StandardBridge", common.HexToAddress("0xaBA2c5F108F7E820C049D5Af70B16ac266c8f128")},
{"DAI", "StandardBridge", common.HexToAddress("0x10E6593CDda8c58a1d0f14C5164B376352a55f2F")},
{"wstETH", "StandardBridge", common.HexToAddress("0x76943C0D61395d8F2edF9060e1533529cAe05dE6")},
},
// Kovan
42: {
{"BitBTC", "StandardBridge", common.HexToAddress("0x0b651A42F32069d62d5ECf4f2a7e5Bd3E9438746")},
{"USX", "StandardBridge", common.HexToAddress("0x40E862341b2416345F02c41Ac70df08525150dC7")},
{"DAI", "StandardBridge", common.HexToAddress("0xb415e822C4983ecD6B1c1596e8a5f976cf6CD9e3")},
{"wstETH", "StandardBridge", common.HexToAddress("0x65321bf24210b81500230dCEce14Faa70a9f50a7")},
},
}
func BridgesByChainID(chainID *big.Int, client bind.ContractBackend, addrs services.AddressManager) (map[string]Bridge, error) {
l1SBAddr, _ := addrs.L1StandardBridge()
allCfgs := []*implConfig{
{"Standard", "StandardBridge", l1SBAddr},
{"ETH", "ETHBridge", l1SBAddr},
}
allCfgs = append(allCfgs, customBridgeCfgs[chainID.Uint64()]...)
bridges := make(map[string]Bridge)
for _, bridge := range allCfgs {
switch bridge.impl {
case "StandardBridge":
l1SB, err := bindings.NewL1StandardBridge(bridge.addr, client)
if err != nil {
return nil, err
}
standardBridge := &StandardBridge{
name: bridge.name,
address: bridge.addr,
contract: l1SB,
}
bridges[bridge.name] = standardBridge
case "ETHBridge":
l1SB, err := bindings.NewL1StandardBridge(bridge.addr, client)
if err != nil {
return nil, err
}
ethBridge := &EthBridge{
name: bridge.name,
address: bridge.addr,
contract: l1SB,
}
bridges[bridge.name] = ethBridge
default:
return nil, errors.New("unsupported bridge")
}
}
return bridges, nil
}
func StateCommitmentChainScanner(client bind.ContractFilterer, addrs services.AddressManager) (*scc.StateCommitmentChainFilterer, error) {
sccAddr, _ := addrs.StateCommitmentChain()
filter, err := scc.NewStateCommitmentChainFilterer(sccAddr, client)
if err != nil {
return nil, err
}
return filter, nil
}
......@@ -5,66 +5,43 @@ import (
"github.com/ethereum-optimism/optimism/indexer/db"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-service/backoff"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
)
type EthBridge struct {
name string
ctx context.Context
address common.Address
client bind.ContractFilterer
filterer *bindings.L1StandardBridgeFilterer
contract *bindings.L1StandardBridge
}
func (e *EthBridge) Address() common.Address {
return e.address
}
func (e *EthBridge) GetDepositsByBlockRange(start, end uint64) (DepositsMap, error) {
func (e *EthBridge) GetDepositsByBlockRange(ctx context.Context, start, end uint64) (DepositsMap, error) {
depositsByBlockhash := make(DepositsMap)
iter, err := FilterETHDepositInitiatedWithRetry(e.ctx, e.filterer, &bind.FilterOpts{
opts := &bind.FilterOpts{
Context: ctx,
Start: start,
End: &end,
})
if err != nil {
logger.Error("Error fetching filter", "err", err)
}
for iter.Next() {
depositsByBlockhash[iter.Event.Raw.BlockHash] = append(
depositsByBlockhash[iter.Event.Raw.BlockHash], db.Deposit{
TxHash: iter.Event.Raw.TxHash,
FromAddress: iter.Event.From,
ToAddress: iter.Event.To,
Amount: iter.Event.Amount,
Data: iter.Event.ExtraData,
LogIndex: iter.Event.Raw.Index,
})
}
if err := iter.Error(); err != nil {
return nil, err
}
return depositsByBlockhash, nil
}
func (s *EthBridge) GetWithdrawalsByBlockRange(start, end uint64) (WithdrawalsMap, error) {
withdrawalsByBlockHash := make(WithdrawalsMap)
iter, err := FilterETHWithdrawalFinalizedWithRetry(s.ctx, s.filterer, &bind.FilterOpts{
Start: start,
End: &end,
var iter *bindings.L1StandardBridgeETHDepositInitiatedIterator
err := backoff.Do(3, backoff.Exponential(), func() error {
var err error
iter, err = e.contract.FilterETHDepositInitiated(opts, nil, nil)
return err
})
if err != nil {
logger.Error("Error fetching filter", "err", err)
return nil, err
}
defer iter.Close()
for iter.Next() {
withdrawalsByBlockHash[iter.Event.Raw.BlockHash] = append(
withdrawalsByBlockHash[iter.Event.Raw.BlockHash], db.Withdrawal{
depositsByBlockhash[iter.Event.Raw.BlockHash] = append(
depositsByBlockhash[iter.Event.Raw.BlockHash], db.Deposit{
TxHash: iter.Event.Raw.TxHash,
FromAddress: iter.Event.From,
ToAddress: iter.Event.To,
......@@ -73,11 +50,8 @@ func (s *EthBridge) GetWithdrawalsByBlockRange(start, end uint64) (WithdrawalsMa
LogIndex: iter.Event.Raw.Index,
})
}
if err := iter.Error(); err != nil {
return nil, err
}
return withdrawalsByBlockHash, nil
return depositsByBlockhash, iter.Error()
}
func (e *EthBridge) String() string {
......
......@@ -4,7 +4,7 @@ import (
"context"
"time"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/indexer/bindings/legacy/scc"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
)
......@@ -12,61 +12,13 @@ import (
// calls.
var clientRetryInterval = 5 * time.Second
// FilterETHDepositInitiatedWithRetry retries the given func until it succeeds,
// FilterStateBatchAppendedWithRetry retries the given func until it succeeds,
// waiting for clientRetryInterval duration after every call.
func FilterETHDepositInitiatedWithRetry(ctx context.Context, filterer *bindings.L1StandardBridgeFilterer, opts *bind.FilterOpts) (*bindings.L1StandardBridgeETHDepositInitiatedIterator, error) {
func FilterStateBatchAppendedWithRetry(ctx context.Context, filterer *scc.StateCommitmentChainFilterer, opts *bind.FilterOpts) (*scc.StateCommitmentChainStateBatchAppendedIterator, error) {
for {
ctxt, cancel := context.WithTimeout(ctx, DefaultConnectionTimeout)
opts.Context = ctxt
res, err := filterer.FilterETHDepositInitiated(opts, nil, nil)
cancel()
if err == nil {
return res, nil
}
logger.Error("Error fetching filter", "err", err)
time.Sleep(clientRetryInterval)
}
}
// FilterERC20DepositInitiatedWithRetry retries the given func until it succeeds,
// waiting for clientRetryInterval duration after every call.
func FilterERC20DepositInitiatedWithRetry(ctx context.Context, filterer *bindings.L1StandardBridgeFilterer, opts *bind.FilterOpts) (*bindings.L1StandardBridgeERC20DepositInitiatedIterator, error) {
for {
ctxt, cancel := context.WithTimeout(ctx, DefaultConnectionTimeout)
opts.Context = ctxt
res, err := filterer.FilterERC20DepositInitiated(opts, nil, nil, nil)
cancel()
if err == nil {
return res, nil
}
logger.Error("Error fetching filter", "err", err)
time.Sleep(clientRetryInterval)
}
}
// FilterETHWithdrawalFinalizedWithRetry retries the given func until it succeeds,
// waiting for clientRetryInterval duration after every call.
func FilterETHWithdrawalFinalizedWithRetry(ctx context.Context, filterer *bindings.L1StandardBridgeFilterer, opts *bind.FilterOpts) (*bindings.L1StandardBridgeETHWithdrawalFinalizedIterator, error) {
for {
ctxt, cancel := context.WithTimeout(ctx, DefaultConnectionTimeout)
opts.Context = ctxt
res, err := filterer.FilterETHWithdrawalFinalized(opts, nil, nil)
cancel()
if err == nil {
return res, nil
}
logger.Error("Error fetching filter", "err", err)
time.Sleep(clientRetryInterval)
}
}
// FilterERC20WithdrawalFinalizedWithRetry retries the given func until it succeeds,
// waiting for clientRetryInterval duration after every call.
func FilterERC20WithdrawalFinalizedWithRetry(ctx context.Context, filterer *bindings.L1StandardBridgeFilterer, opts *bind.FilterOpts) (*bindings.L1StandardBridgeERC20WithdrawalFinalizedIterator, error) {
for {
ctxt, cancel := context.WithTimeout(ctx, DefaultConnectionTimeout)
opts.Context = ctxt
res, err := filterer.FilterERC20WithdrawalFinalized(opts, nil, nil, nil)
res, err := filterer.FilterStateBatchAppended(opts, nil)
cancel()
if err == nil {
return res, nil
......
......@@ -5,68 +5,43 @@ import (
"github.com/ethereum-optimism/optimism/indexer/db"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-service/backoff"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
)
type StandardBridge struct {
name string
ctx context.Context
address common.Address
client bind.ContractFilterer
filterer *bindings.L1StandardBridgeFilterer
contract *bindings.L1StandardBridge
}
func (s *StandardBridge) Address() common.Address {
return s.address
}
func (s *StandardBridge) GetDepositsByBlockRange(start, end uint64) (DepositsMap, error) {
func (s *StandardBridge) GetDepositsByBlockRange(ctx context.Context, start, end uint64) (DepositsMap, error) {
depositsByBlockhash := make(DepositsMap)
iter, err := FilterERC20DepositInitiatedWithRetry(s.ctx, s.filterer, &bind.FilterOpts{
opts := &bind.FilterOpts{
Context: ctx,
Start: start,
End: &end,
})
if err != nil {
logger.Error("Error fetching filter", "err", err)
}
for iter.Next() {
depositsByBlockhash[iter.Event.Raw.BlockHash] = append(
depositsByBlockhash[iter.Event.Raw.BlockHash], db.Deposit{
TxHash: iter.Event.Raw.TxHash,
L1Token: iter.Event.L1Token,
L2Token: iter.Event.L2Token,
FromAddress: iter.Event.From,
ToAddress: iter.Event.To,
Amount: iter.Event.Amount,
Data: iter.Event.ExtraData,
LogIndex: iter.Event.Raw.Index,
})
}
if err := iter.Error(); err != nil {
return nil, err
}
return depositsByBlockhash, nil
}
func (s *StandardBridge) GetWithdrawalsByBlockRange(start, end uint64) (WithdrawalsMap, error) {
withdrawalsByBlockHash := make(WithdrawalsMap)
iter, err := FilterERC20WithdrawalFinalizedWithRetry(s.ctx, s.filterer, &bind.FilterOpts{
Start: start,
End: &end,
var iter *bindings.L1StandardBridgeERC20DepositInitiatedIterator
err := backoff.Do(3, backoff.Exponential(), func() error {
var err error
iter, err = s.contract.FilterERC20DepositInitiated(opts, nil, nil, nil)
return err
})
if err != nil {
logger.Error("Error fetching filter", "err", err)
return nil, err
}
defer iter.Close()
for iter.Next() {
withdrawalsByBlockHash[iter.Event.Raw.BlockHash] = append(
withdrawalsByBlockHash[iter.Event.Raw.BlockHash], db.Withdrawal{
depositsByBlockhash[iter.Event.Raw.BlockHash] = append(
depositsByBlockhash[iter.Event.Raw.BlockHash], db.Deposit{
TxHash: iter.Event.Raw.TxHash,
L1Token: iter.Event.L1Token,
L2Token: iter.Event.L2Token,
......@@ -77,11 +52,8 @@ func (s *StandardBridge) GetWithdrawalsByBlockRange(start, end uint64) (Withdraw
LogIndex: iter.Event.Raw.Index,
})
}
if err := iter.Error(); err != nil {
return nil, err
}
return withdrawalsByBlockHash, nil
return depositsByBlockhash, iter.Error()
}
func (s *StandardBridge) String() string {
......
......@@ -18,7 +18,6 @@ import (
const (
DefaultConnectionTimeout = 30 * time.Second
DefaultConfDepth uint64 = 20
DefaultMaxBatchSize = 100
)
......
package l1
import (
"github.com/ethereum/go-ethereum/ethclient"
"context"
"github.com/ethereum-optimism/optimism/indexer/bindings/legacy/scc"
"github.com/ethereum-optimism/optimism/indexer/db"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/indexer/services/l1/bridge"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
)
func QueryERC20(address common.Address, client *ethclient.Client) (*db.Token, error) {
contract, err := bindings.NewERC20(address, client)
if err != nil {
return nil, err
}
name, err := contract.Name(&bind.CallOpts{})
if err != nil {
return nil, err
}
func QueryStateBatches(filterer *scc.StateCommitmentChainFilterer, startHeight, endHeight uint64, ctx context.Context) (map[common.Hash][]db.StateBatch, error) {
batches := make(map[common.Hash][]db.StateBatch)
symbol, err := contract.Symbol(&bind.CallOpts{})
iter, err := bridge.FilterStateBatchAppendedWithRetry(ctx, filterer, &bind.FilterOpts{
Start: startHeight,
End: &endHeight,
})
if err != nil {
return nil, err
}
decimals, err := contract.Decimals(&bind.CallOpts{})
if err != nil {
return nil, err
defer iter.Close()
for iter.Next() {
batches[iter.Event.Raw.BlockHash] = append(
batches[iter.Event.Raw.BlockHash], db.StateBatch{
Index: iter.Event.BatchIndex,
Root: iter.Event.BatchRoot,
Size: iter.Event.BatchSize,
PrevTotal: iter.Event.PrevTotalElements,
ExtraData: iter.Event.ExtraData,
BlockHash: iter.Event.Raw.BlockHash,
})
}
return &db.Token{
Name: name,
Symbol: symbol,
Decimals: decimals,
}, nil
return batches, iter.Error()
}
......@@ -11,7 +11,10 @@ import (
"sync/atomic"
"time"
"github.com/ethereum-optimism/optimism/indexer/bindings/legacy/scc"
"github.com/ethereum-optimism/optimism/indexer/metrics"
"github.com/ethereum-optimism/optimism/indexer/services"
"github.com/ethereum-optimism/optimism/indexer/services/query"
"github.com/prometheus/client_golang/prometheus"
"github.com/ethereum-optimism/optimism/indexer/server"
......@@ -36,28 +39,8 @@ var errNoChainID = errors.New("no chain id provided")
var errNoNewBlocks = errors.New("no new blocks")
// clientRetryInterval is the interval to wait between retrying client API
// calls.
var clientRetryInterval = 5 * time.Second
var ZeroAddress common.Address
// HeaderByNumberWithRetry retries the given func until it succeeds, waiting
// for clientRetryInterval duration after every call.
func HeaderByNumberWithRetry(ctx context.Context,
client *ethclient.Client) (*types.Header, error) {
for {
res, err := client.HeaderByNumber(ctx, nil)
switch err {
case nil:
return res, err
default:
log.Error("Error fetching header", "err", err)
}
time.Sleep(clientRetryInterval)
}
}
// Driver is an interface for indexing deposits from l1.
type Driver interface {
// Name is an identifier used to prefix logs for a particular service.
......@@ -70,11 +53,12 @@ type ServiceConfig struct {
L1Client *ethclient.Client
RawL1Client *rpc.Client
ChainID *big.Int
AddressManager services.AddressManager
ConfDepth uint64
MaxHeaderBatchSize uint64
StartBlockNumber uint64
StartBlockHash string
DB *db.Database
Bedrock bool
}
type Service struct {
......@@ -83,11 +67,14 @@ type Service struct {
cancel func()
bridges map[string]bridge.Bridge
portal *bridge.Portal
batchScanner *scc.StateCommitmentChainFilterer
latestHeader uint64
headerSelector *ConfirmedHeaderSelector
metrics *metrics.Metrics
tokenCache map[common.Address]*db.Token
isBedrock bool
wg sync.WaitGroup
}
......@@ -113,11 +100,24 @@ func NewService(cfg ServiceConfig) (*Service, error) {
return nil, fmt.Errorf("chain ID configured with %d but got %d", cfg.ChainID, chainID)
}
bridges, err := bridge.BridgesByChainID(cfg.ChainID, cfg.L1Client, ctx)
bridges, err := bridge.BridgesByChainID(cfg.ChainID, cfg.L1Client, cfg.AddressManager)
if err != nil {
cancel()
return nil, err
}
var portal *bridge.Portal
var batchScanner *scc.StateCommitmentChainFilterer
if cfg.Bedrock {
portal = bridge.NewPortal(cfg.AddressManager)
} else {
batchScanner, err = bridge.StateCommitmentChainScanner(cfg.L1Client, cfg.AddressManager)
if err != nil {
cancel()
return nil, err
}
}
logger.Info("Scanning bridges for deposits", "bridges", bridges)
confirmedHeaderSelector, err := NewConfirmedHeaderSelector(HeaderSelectorConfig{
......@@ -130,22 +130,29 @@ func NewService(cfg ServiceConfig) (*Service, error) {
return nil, err
}
return &Service{
service := &Service{
cfg: cfg,
ctx: ctx,
cancel: cancel,
portal: portal,
bridges: bridges,
batchScanner: batchScanner,
headerSelector: confirmedHeaderSelector,
metrics: cfg.Metrics,
tokenCache: map[common.Address]*db.Token{
ZeroAddress: db.ETHL1Token,
},
}, nil
isBedrock: cfg.Bedrock,
}
service.wg.Add(1)
return service, nil
}
func (s *Service) Loop(ctx context.Context) {
func (s *Service) loop() {
defer s.wg.Done()
for {
err := s.catchUp(ctx)
err := s.catchUp()
if err == nil {
break
}
......@@ -159,10 +166,18 @@ func (s *Service) Loop(ctx context.Context) {
}
newHeads := make(chan *types.Header, 1000)
go s.subscribeNewHeads(ctx, newHeads)
tick := time.NewTicker(5 * time.Second)
defer tick.Stop()
for {
select {
case <-tick.C:
header, err := query.HeaderByNumberWithRetry(s.ctx, s.cfg.L1Client)
if err != nil {
logger.Error("error fetching header by number", "err", err)
continue
}
newHeads <- header
case header := <-newHeads:
if header == nil {
break
......@@ -180,6 +195,7 @@ func (s *Service) Loop(ctx context.Context) {
}
}
case <-s.ctx.Done():
logger.Info("service stopped")
return
}
}
......@@ -188,7 +204,6 @@ func (s *Service) Loop(ctx context.Context) {
func (s *Service) Update(newHeader *types.Header) error {
var lowest = db.BlockLocator{
Number: s.cfg.StartBlockNumber,
Hash: common.HexToHash(s.cfg.StartBlockHash),
}
highestConfirmed, err := s.cfg.DB.GetHighestL1Block()
if err != nil {
......@@ -213,7 +228,7 @@ func (s *Service) Update(newHeader *types.Header) error {
return nil
}
if lowest.Hash != headers[0].ParentHash {
if lowest.Number > 0 && lowest.Hash != headers[0].ParentHash {
logger.Error("Parent hash does not connect to ",
"block", headers[0].Number.Uint64(), "hash", headers[0].Hash,
"lowest_block", lowest.Number, "hash", lowest.Hash)
......@@ -223,7 +238,6 @@ func (s *Service) Update(newHeader *types.Header) error {
startHeight := headers[0].Number.Uint64()
endHeight := headers[len(headers)-1].Number.Uint64()
depositsByBlockHash := make(map[common.Hash][]db.Deposit)
withdrawalsByBlockHash := make(map[common.Hash][]db.Withdrawal)
start := prometheus.NewTimer(s.metrics.UpdateDuration.WithLabelValues("l1"))
defer func() {
......@@ -232,27 +246,27 @@ func (s *Service) Update(newHeader *types.Header) error {
}()
bridgeDepositsCh := make(chan bridge.DepositsMap, len(s.bridges))
bridgeWdsCh := make(chan bridge.WithdrawalsMap, len(s.bridges))
errCh := make(chan error, len(s.bridges))
finalizedWithdrawalsCh := make(chan bridge.FinalizedWithdrawalsMap, 1)
errCh := make(chan error, len(s.bridges)+1)
for _, bridgeImpl := range s.bridges {
go func(b bridge.Bridge) {
deposits, err := b.GetDepositsByBlockRange(startHeight, endHeight)
deposits, err := b.GetDepositsByBlockRange(s.ctx, startHeight, endHeight)
if err != nil {
errCh <- err
return
}
bridgeDepositsCh <- deposits
}(bridgeImpl)
go func(b bridge.Bridge) {
withdrawals, err := b.GetWithdrawalsByBlockRange(startHeight, endHeight)
}
go func() {
finalizedWithdrawals, err := s.portal.GetFinalizedWithdrawalsByBlockRange(s.ctx, startHeight, endHeight)
if err != nil {
errCh <- err
return
}
bridgeWdsCh <- withdrawals
}(bridgeImpl)
}
finalizedWithdrawalsCh <- finalizedWithdrawals
}()
var receives int
for {
......@@ -260,40 +274,44 @@ func (s *Service) Update(newHeader *types.Header) error {
case bridgeDeposits := <-bridgeDepositsCh:
for blockHash, deposits := range bridgeDeposits {
for _, deposit := range deposits {
if err := s.cacheToken(deposit.L1Token); err != nil {
if err := s.cacheToken(deposit); err != nil {
logger.Warn("error caching token", "err", err)
}
}
depositsByBlockHash[blockHash] = append(depositsByBlockHash[blockHash], deposits...)
}
case bridgeWithdrawals := <-bridgeWdsCh:
for blockHash, withdrawals := range bridgeWithdrawals {
for _, withdrawal := range withdrawals {
if err := s.cacheToken(withdrawal.L1Token); err != nil {
logger.Warn("error caching token", "err", err)
}
}
withdrawalsByBlockHash[blockHash] = append(withdrawalsByBlockHash[blockHash], withdrawals...)
}
case err := <-errCh:
return err
}
receives++
if receives == 2*len(s.bridges) {
if receives == len(s.bridges) {
break
}
}
finalizedWithdrawalsByBlockHash := <-finalizedWithdrawalsCh
var stateBatches map[common.Hash][]db.StateBatch
if !s.isBedrock {
stateBatches, err = QueryStateBatches(s.batchScanner, startHeight, endHeight, s.ctx)
if err != nil {
logger.Error("Error querying state batches", "err", err)
return err
}
}
for i, header := range headers {
blockHash := header.Hash
number := header.Number.Uint64()
deposits := depositsByBlockHash[blockHash]
withdrawals := withdrawalsByBlockHash[blockHash]
batches := stateBatches[blockHash]
finalizedWds := finalizedWithdrawalsByBlockHash[blockHash]
if len(deposits) == 0 && len(withdrawals) == 0 && i != len(headers)-1 {
// Always record block data in the last block
// in the list of headers
if len(deposits) == 0 && len(batches) == 0 && len(finalizedWds) == 0 && i != len(headers)-1 {
continue
}
......@@ -303,6 +321,7 @@ func (s *Service) Update(newHeader *types.Header) error {
Number: number,
Timestamp: header.Time,
Deposits: deposits,
FinalizedWithdrawals: finalizedWds,
}
err := s.cfg.DB.AddIndexedL1Block(block)
......@@ -310,24 +329,35 @@ func (s *Service) Update(newHeader *types.Header) error {
logger.Error(
"Unable to import ",
"block", number,
"hash", blockHash,
"err", err,
"hash", blockHash, "err", err,
"block", block,
)
return err
}
err = s.cfg.DB.AddStateBatch(batches)
if err != nil {
logger.Error(
"Unable to import state append batch",
"block", number,
"hash", blockHash, "err", err,
"block", block,
)
return err
}
s.metrics.RecordStateBatches(len(batches))
logger.Debug("Imported ",
"block", number, "hash", blockHash, "deposits", len(block.Deposits))
for _, deposit := range block.Deposits {
token := s.tokenCache[deposit.L2Token]
token := s.tokenCache[deposit.L1Token]
logger.Info(
"indexed deposit ",
"indexed deposit",
"tx_hash", deposit.TxHash,
"symbol", token.Symbol,
"amount", deposit.Amount,
)
s.metrics.RecordDeposit(deposit.L2Token)
s.metrics.RecordDeposit(deposit.L1Token)
}
}
......@@ -382,8 +412,8 @@ func (s *Service) GetDeposits(w http.ResponseWriter, r *http.Request) {
}
page := db.PaginationParam{
Limit: uint64(limit),
Offset: uint64(offset),
Limit: limit,
Offset: offset,
}
deposits, err := s.cfg.DB.GetDepositsByAddress(common.HexToAddress(vars["address"]), page)
......@@ -395,25 +425,8 @@ func (s *Service) GetDeposits(w http.ResponseWriter, r *http.Request) {
server.RespondWithJSON(w, http.StatusOK, deposits)
}
func (s *Service) subscribeNewHeads(ctx context.Context, heads chan *types.Header) {
tick := time.NewTicker(5 * time.Second)
for {
select {
case <-tick.C:
header, err := HeaderByNumberWithRetry(ctx, s.cfg.L1Client)
if err != nil {
logger.Error("error fetching header by number", "err", err)
}
heads <- header
case <-ctx.Done():
return
}
}
}
func (s *Service) catchUp(ctx context.Context) error {
realHead, err := HeaderByNumberWithRetry(ctx, s.cfg.L1Client)
func (s *Service) catchUp() error {
realHead, err := query.HeaderByNumberWithRetry(s.ctx, s.cfg.L1Client)
if err != nil {
return err
}
......@@ -437,8 +450,8 @@ func (s *Service) catchUp(ctx context.Context) error {
for realHeadNum-s.cfg.ConfDepth > currHeadNum+s.cfg.MaxHeaderBatchSize {
select {
case <-ctx.Done():
return context.Canceled
case <-s.ctx.Done():
return s.ctx.Err()
default:
if err := s.Update(realHead); err != nil && err != errNoNewBlocks {
return err
......@@ -456,33 +469,33 @@ func (s *Service) catchUp(ctx context.Context) error {
return nil
}
func (s *Service) cacheToken(address common.Address) error {
if s.tokenCache[address] != nil {
func (s *Service) cacheToken(deposit db.Deposit) error {
if s.tokenCache[deposit.L1Token] != nil {
return nil
}
token, err := s.cfg.DB.GetL1TokenByAddress(address.String())
token, err := s.cfg.DB.GetL1TokenByAddress(deposit.L1Token.String())
if err != nil {
return err
}
if token != nil {
s.metrics.IncL1CachedTokensCount()
s.tokenCache[address] = token
s.tokenCache[deposit.L1Token] = token
return nil
}
token, err = QueryERC20(address, s.cfg.L1Client)
token, err = query.NewERC20(deposit.L1Token, s.cfg.L1Client)
if err != nil {
logger.Error("Error querying ERC20 token details",
"l1_token", address.String(), "err", err)
"l1_token", deposit.L1Token.String(), "err", err)
token = &db.Token{
Address: address.String(),
Address: deposit.L1Token.String(),
}
}
if err := s.cfg.DB.AddL1Token(address.String(), token); err != nil {
if err := s.cfg.DB.AddL1Token(deposit.L1Token.String(), token); err != nil {
return err
}
s.tokenCache[address] = token
s.tokenCache[deposit.L1Token] = token
s.metrics.IncL1CachedTokensCount()
return nil
}
......@@ -491,16 +504,11 @@ func (s *Service) Start() error {
if s.cfg.ChainID == nil {
return errNoChainID
}
s.wg.Add(1)
go s.Loop(s.ctx)
go s.loop()
return nil
}
func (s *Service) Stop() {
s.cancel()
s.wg.Wait()
err := s.cfg.DB.Close()
if err != nil {
logger.Error("Error closing db", "err", err)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment