Commit 02303023 authored by Javed Khan's avatar Javed Khan Committed by GitHub

indexer: use bedrock events for finalization (#3324)

parent 0308fe4e
...@@ -193,10 +193,20 @@ func (d *Database) AddIndexedL1Block(block *IndexedL1Block) error { ...@@ -193,10 +193,20 @@ func (d *Database) AddIndexedL1Block(block *IndexedL1Block) error {
const insertDepositStatement = ` const insertDepositStatement = `
INSERT INTO deposits INSERT INTO deposits
(guid, from_address, to_address, l1_token, l2_token, amount, tx_hash, log_index, block_hash, data) (guid, from_address, to_address, l1_token, l2_token, amount, tx_hash, log_index, l1_block_hash, data)
VALUES VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
` `
const insertWithdrawalStatement = `
INSERT INTO withdrawals
(guid, from_address, to_address, l1_token, l2_token, amount, tx_hash, log_index, l1_block_hash, data)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (tx_hash)
DO UPDATE SET l1_block_hash = $9;
`
return txn(d.db, func(tx *sql.Tx) error { return txn(d.db, func(tx *sql.Tx) error {
_, err := tx.Exec( _, err := tx.Exec(
insertBlockStatement, insertBlockStatement,
...@@ -232,6 +242,29 @@ func (d *Database) AddIndexedL1Block(block *IndexedL1Block) error { ...@@ -232,6 +242,29 @@ func (d *Database) AddIndexedL1Block(block *IndexedL1Block) error {
} }
} }
if len(block.Withdrawals) == 0 {
return nil
}
for _, withdrawal := range block.Withdrawals {
_, err = tx.Exec(
insertWithdrawalStatement,
NewGUID(),
withdrawal.FromAddress.String(),
withdrawal.ToAddress.String(),
withdrawal.L1Token.String(),
withdrawal.L2Token.String(),
withdrawal.Amount.String(),
withdrawal.TxHash.String(),
withdrawal.LogIndex,
block.Hash.String(),
withdrawal.Data,
)
if err != nil {
return err
}
}
return nil return nil
}) })
} }
...@@ -249,7 +282,7 @@ func (d *Database) AddIndexedL2Block(block *IndexedL2Block) error { ...@@ -249,7 +282,7 @@ func (d *Database) AddIndexedL2Block(block *IndexedL2Block) error {
const insertWithdrawalStatement = ` const insertWithdrawalStatement = `
INSERT INTO withdrawals INSERT INTO withdrawals
(guid, from_address, to_address, l1_token, l2_token, amount, tx_hash, log_index, block_hash, data) (guid, from_address, to_address, l1_token, l2_token, amount, tx_hash, log_index, l2_block_hash, data)
VALUES VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
` `
...@@ -303,7 +336,7 @@ func (d *Database) GetDepositsByAddress(address common.Address, page PaginationP ...@@ -303,7 +336,7 @@ func (d *Database) GetDepositsByAddress(address common.Address, page PaginationP
l1_tokens.name, l1_tokens.symbol, l1_tokens.decimals, l1_tokens.name, l1_tokens.symbol, l1_tokens.decimals,
l1_blocks.number, l1_blocks.timestamp l1_blocks.number, l1_blocks.timestamp
FROM deposits FROM deposits
INNER JOIN l1_blocks ON deposits.block_hash=l1_blocks.hash INNER JOIN l1_blocks ON deposits.l1_block_hash=l1_blocks.hash
INNER JOIN l1_tokens ON deposits.l1_token=l1_tokens.address INNER JOIN l1_tokens ON deposits.l1_token=l1_tokens.address
WHERE deposits.from_address = $1 ORDER BY l1_blocks.timestamp LIMIT $2 OFFSET $3; WHERE deposits.from_address = $1 ORDER BY l1_blocks.timestamp LIMIT $2 OFFSET $3;
` `
...@@ -342,7 +375,7 @@ func (d *Database) GetDepositsByAddress(address common.Address, page PaginationP ...@@ -342,7 +375,7 @@ func (d *Database) GetDepositsByAddress(address common.Address, page PaginationP
SELECT SELECT
count(*) count(*)
FROM deposits FROM deposits
INNER JOIN l1_blocks ON deposits.block_hash=l1_blocks.hash INNER JOIN l1_blocks ON deposits.l1_block_hash=l1_blocks.hash
INNER JOIN l1_tokens ON deposits.l1_token=l1_tokens.address INNER JOIN l1_tokens ON deposits.l1_token=l1_tokens.address
WHERE deposits.from_address = $1; WHERE deposits.from_address = $1;
` `
...@@ -368,6 +401,53 @@ func (d *Database) GetDepositsByAddress(address common.Address, page PaginationP ...@@ -368,6 +401,53 @@ func (d *Database) GetDepositsByAddress(address common.Address, page PaginationP
}, nil }, nil
} }
// GetWithdrawalStatus returns the finalization status corresponding to the
// given withdrawal transaction hash.
func (d *Database) GetWithdrawalStatus(hash common.Hash) (*WithdrawalJSON, error) {
const selectWithdrawalStatement = `
SELECT
withdrawals.guid, withdrawals.from_address, withdrawals.to_address,
withdrawals.amount, withdrawals.tx_hash, withdrawals.data,
withdrawals.l1_token, withdrawals.l2_token,
l2_tokens.name, l2_tokens.symbol, l2_tokens.decimals,
l1_blocks.number, l1_blocks.timestamp,
l2_blocks.number, l2_blocks.timestamp
FROM withdrawals
INNER JOIN l1_blocks ON withdrawals.l1_block_hash=l1_blocks.hash
INNER JOIN l2_blocks ON withdrawals.l2_block_hash=l2_blocks.hash
INNER JOIN l2_tokens ON withdrawals.l2_token=l2_tokens.address
WHERE withdrawals.tx_hash = $1;
`
var withdrawal *WithdrawalJSON
err := txn(d.db, func(tx *sql.Tx) error {
row := tx.QueryRow(selectWithdrawalStatement, hash.String())
if row.Err() != nil {
return row.Err()
}
var l2Token Token
if err := row.Scan(
&withdrawal.GUID, &withdrawal.FromAddress, &withdrawal.ToAddress,
&withdrawal.Amount, &withdrawal.TxHash, &withdrawal.Data,
&withdrawal.L1Token, &l2Token.Address,
&l2Token.Name, &l2Token.Symbol, &l2Token.Decimals,
&withdrawal.L1BlockNumber, &withdrawal.L1BlockTimestamp,
&withdrawal.L2BlockNumber, &withdrawal.L2BlockTimestamp,
); err != nil {
return err
}
withdrawal.L2Token = &l2Token
return nil
})
if err != nil {
return nil, err
}
return withdrawal, nil
}
// GetWithdrawalsByAddress returns the list of Withdrawals indexed for the given // GetWithdrawalsByAddress returns the list of Withdrawals indexed for the given
// address paginated by the given params. // address paginated by the given params.
func (d *Database) GetWithdrawalsByAddress(address common.Address, page PaginationParam) (*PaginatedWithdrawals, error) { func (d *Database) GetWithdrawalsByAddress(address common.Address, page PaginationParam) (*PaginatedWithdrawals, error) {
...@@ -379,7 +459,7 @@ func (d *Database) GetWithdrawalsByAddress(address common.Address, page Paginati ...@@ -379,7 +459,7 @@ func (d *Database) GetWithdrawalsByAddress(address common.Address, page Paginati
l2_tokens.name, l2_tokens.symbol, l2_tokens.decimals, l2_tokens.name, l2_tokens.symbol, l2_tokens.decimals,
l2_blocks.number, l2_blocks.timestamp l2_blocks.number, l2_blocks.timestamp
FROM withdrawals FROM withdrawals
INNER JOIN l2_blocks ON withdrawals.block_hash=l2_blocks.hash INNER JOIN l2_blocks ON withdrawals.l2_block_hash=l2_blocks.hash
INNER JOIN l2_tokens ON withdrawals.l2_token=l2_tokens.address INNER JOIN l2_tokens ON withdrawals.l2_token=l2_tokens.address
WHERE withdrawals.from_address = $1 ORDER BY l2_blocks.timestamp LIMIT $2 OFFSET $3; WHERE withdrawals.from_address = $1 ORDER BY l2_blocks.timestamp LIMIT $2 OFFSET $3;
` `
...@@ -400,7 +480,7 @@ func (d *Database) GetWithdrawalsByAddress(address common.Address, page Paginati ...@@ -400,7 +480,7 @@ func (d *Database) GetWithdrawalsByAddress(address common.Address, page Paginati
&withdrawal.Amount, &withdrawal.TxHash, &withdrawal.Data, &withdrawal.Amount, &withdrawal.TxHash, &withdrawal.Data,
&withdrawal.L1Token, &l2Token.Address, &withdrawal.L1Token, &l2Token.Address,
&l2Token.Name, &l2Token.Symbol, &l2Token.Decimals, &l2Token.Name, &l2Token.Symbol, &l2Token.Decimals,
&withdrawal.BlockNumber, &withdrawal.BlockTimestamp, &withdrawal.L2BlockNumber, &withdrawal.L2BlockTimestamp,
); err != nil { ); err != nil {
return err return err
} }
...@@ -419,7 +499,7 @@ func (d *Database) GetWithdrawalsByAddress(address common.Address, page Paginati ...@@ -419,7 +499,7 @@ func (d *Database) GetWithdrawalsByAddress(address common.Address, page Paginati
SELECT SELECT
count(*) count(*)
FROM withdrawals FROM withdrawals
INNER JOIN l2_blocks ON withdrawals.block_hash=l2_blocks.hash INNER JOIN l2_blocks ON withdrawals.l2_block_hash=l2_blocks.hash
INNER JOIN l2_tokens ON withdrawals.l2_token=l2_tokens.address INNER JOIN l2_tokens ON withdrawals.l2_token=l2_tokens.address
WHERE withdrawals.from_address = $1; WHERE withdrawals.from_address = $1;
` `
...@@ -567,9 +647,9 @@ func (d *Database) GetIndexedL1BlockByHash(hash common.Hash) (*IndexedL1Block, e ...@@ -567,9 +647,9 @@ func (d *Database) GetIndexedL1BlockByHash(hash common.Hash) (*IndexedL1Block, e
} }
const getAirdropQuery = ` const getAirdropQuery = `
SELECT SELECT
address, voter_amount, multisig_signer_amount, gitcoin_amount, address, voter_amount, multisig_signer_amount, gitcoin_amount,
active_bridged_amount, op_user_amount, op_repeat_user_amount, active_bridged_amount, op_user_amount, op_repeat_user_amount,
bonus_amount, total_amount bonus_amount, total_amount
FROM airdrops FROM airdrops
WHERE address = $1 WHERE address = $1
......
...@@ -6,11 +6,12 @@ import ( ...@@ -6,11 +6,12 @@ import (
// IndexedL1Block contains the L1 block including the deposits in it. // IndexedL1Block contains the L1 block including the deposits in it.
type IndexedL1Block struct { type IndexedL1Block struct {
Hash common.Hash Hash common.Hash
ParentHash common.Hash ParentHash common.Hash
Number uint64 Number uint64
Timestamp uint64 Timestamp uint64
Deposits []Deposit Deposits []Deposit
Withdrawals []Withdrawal
} }
// String returns the block hash for the indexed l1 block. // String returns the block hash for the indexed l1 block.
...@@ -24,6 +25,7 @@ type IndexedL2Block struct { ...@@ -24,6 +25,7 @@ type IndexedL2Block struct {
ParentHash common.Hash ParentHash common.Hash
Number uint64 Number uint64
Timestamp uint64 Timestamp uint64
Deposits []Deposit
Withdrawals []Withdrawal Withdrawals []Withdrawal
} }
......
...@@ -28,8 +28,10 @@ CREATE TABLE IF NOT EXISTS deposits ( ...@@ -28,8 +28,10 @@ CREATE TABLE IF NOT EXISTS deposits (
amount VARCHAR NOT NULL, amount VARCHAR NOT NULL,
data BYTEA NOT NULL, data BYTEA NOT NULL,
log_index INTEGER NOT NULL, log_index INTEGER NOT NULL,
block_hash VARCHAR NOT NULL REFERENCES l1_blocks(hash), l1_block_hash VARCHAR NOT NULL REFERENCES l1_blocks(hash),
tx_hash VARCHAR NOT NULL l2_block_hash VARCHAR REFERENCES l2_blocks(hash),
tx_hash VARCHAR NOT NULL,
failed BOOLEAN NOT NULL DEFAULT false
) )
` `
...@@ -61,7 +63,8 @@ CREATE TABLE IF NOT EXISTS withdrawals ( ...@@ -61,7 +63,8 @@ CREATE TABLE IF NOT EXISTS withdrawals (
amount VARCHAR NOT NULL, amount VARCHAR NOT NULL,
data BYTEA NOT NULL, data BYTEA NOT NULL,
log_index INTEGER NOT NULL, log_index INTEGER NOT NULL,
block_hash VARCHAR NOT NULL REFERENCES l2_blocks(hash), l1_block_hash VARCHAR REFERENCES l1_blocks(hash),
l2_block_hash VARCHAR NOT NULL REFERENCES l2_blocks(hash),
tx_hash VARCHAR NOT NULL, tx_hash VARCHAR NOT NULL,
) )
` `
......
...@@ -26,15 +26,17 @@ func (w Withdrawal) String() string { ...@@ -26,15 +26,17 @@ func (w Withdrawal) String() string {
// WithdrawalJSON contains Withdrawal data suitable for JSON serialization. // WithdrawalJSON contains Withdrawal data suitable for JSON serialization.
type WithdrawalJSON struct { type WithdrawalJSON struct {
GUID string `json:"guid"` GUID string `json:"guid"`
FromAddress string `json:"from"` FromAddress string `json:"from"`
ToAddress string `json:"to"` ToAddress string `json:"to"`
L1Token string `json:"l1Token"` L1Token string `json:"l1Token"`
L2Token *Token `json:"l2Token"` L2Token *Token `json:"l2Token"`
Amount string `json:"amount"` Amount string `json:"amount"`
Data []byte `json:"data"` Data []byte `json:"data"`
LogIndex uint64 `json:"logIndex"` LogIndex uint64 `json:"logIndex"`
BlockNumber uint64 `json:"blockNumber"` L1BlockNumber uint64 `json:"l1BlockNumber"`
BlockTimestamp string `json:"blockTimestamp"` L1BlockTimestamp string `json:"l1BlockTimestamp"`
TxHash string `json:"transactionHash"` L2BlockNumber uint64 `json:"l2BlockNumber"`
L2BlockTimestamp string `json:"l2BlockTimestamp"`
TxHash string `json:"transactionHash"`
} }
...@@ -212,6 +212,7 @@ func (b *Indexer) Serve() error { ...@@ -212,6 +212,7 @@ func (b *Indexer) Serve() error {
b.router.HandleFunc("/v1/l1/status", b.l1IndexingService.GetIndexerStatus).Methods("GET") b.router.HandleFunc("/v1/l1/status", b.l1IndexingService.GetIndexerStatus).Methods("GET")
b.router.HandleFunc("/v1/l2/status", b.l2IndexingService.GetIndexerStatus).Methods("GET") b.router.HandleFunc("/v1/l2/status", b.l2IndexingService.GetIndexerStatus).Methods("GET")
b.router.HandleFunc("/v1/deposits/0x{address:[a-fA-F0-9]{40}}", b.l1IndexingService.GetDeposits).Methods("GET") b.router.HandleFunc("/v1/deposits/0x{address:[a-fA-F0-9]{40}}", b.l1IndexingService.GetDeposits).Methods("GET")
b.router.HandleFunc("/v1/withdrawal/0x{hash:[a-fA-F0-9]{64}}", b.l2IndexingService.GetWithdrawalStatus).Methods("GET")
b.router.HandleFunc("/v1/withdrawals/0x{address:[a-fA-F0-9]{40}}", b.l2IndexingService.GetWithdrawals).Methods("GET") b.router.HandleFunc("/v1/withdrawals/0x{address:[a-fA-F0-9]{40}}", b.l2IndexingService.GetWithdrawals).Methods("GET")
b.router.HandleFunc("/v1/airdrops/0x{address:[a-fA-F0-9]{40}}", b.airdropService.GetAirdrop) b.router.HandleFunc("/v1/airdrops/0x{address:[a-fA-F0-9]{40}}", b.airdropService.GetAirdrop)
b.router.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { b.router.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
......
...@@ -13,10 +13,12 @@ import ( ...@@ -13,10 +13,12 @@ import (
) )
type DepositsMap map[common.Hash][]db.Deposit type DepositsMap map[common.Hash][]db.Deposit
type WithdrawalsMap map[common.Hash][]db.Withdrawal // Finalizations
type Bridge interface { type Bridge interface {
Address() common.Address Address() common.Address
GetDepositsByBlockRange(uint64, uint64) (DepositsMap, error) GetDepositsByBlockRange(uint64, uint64) (DepositsMap, error)
GetWithdrawalsByBlockRange(uint64, uint64) (WithdrawalsMap, error)
String() string String() string
} }
......
...@@ -50,6 +50,36 @@ func (e *EthBridge) GetDepositsByBlockRange(start, end uint64) (DepositsMap, err ...@@ -50,6 +50,36 @@ func (e *EthBridge) GetDepositsByBlockRange(start, end uint64) (DepositsMap, err
return depositsByBlockhash, nil return depositsByBlockhash, nil
} }
func (s *EthBridge) GetWithdrawalsByBlockRange(start, end uint64) (WithdrawalsMap, error) {
withdrawalsByBlockHash := make(WithdrawalsMap)
iter, err := FilterETHWithdrawalFinalizedWithRetry(s.ctx, s.filterer, &bind.FilterOpts{
Start: start,
End: &end,
})
if err != nil {
logger.Error("Error fetching filter", "err", err)
}
for iter.Next() {
withdrawalsByBlockHash[iter.Event.Raw.BlockHash] = append(
withdrawalsByBlockHash[iter.Event.Raw.BlockHash], db.Withdrawal{
TxHash: iter.Event.Raw.TxHash,
FromAddress: iter.Event.From,
ToAddress: iter.Event.To,
Amount: iter.Event.Amount,
Data: iter.Event.ExtraData,
LogIndex: iter.Event.Raw.Index,
})
}
if err := iter.Error(); err != nil {
return nil, err
}
return withdrawalsByBlockHash, nil
}
func (e *EthBridge) String() string { func (e *EthBridge) String() string {
return e.name return e.name
} }
...@@ -52,6 +52,38 @@ func (s *StandardBridge) GetDepositsByBlockRange(start, end uint64) (DepositsMap ...@@ -52,6 +52,38 @@ func (s *StandardBridge) GetDepositsByBlockRange(start, end uint64) (DepositsMap
return depositsByBlockhash, nil return depositsByBlockhash, nil
} }
func (s *StandardBridge) GetWithdrawalsByBlockRange(start, end uint64) (WithdrawalsMap, error) {
withdrawalsByBlockHash := make(WithdrawalsMap)
iter, err := FilterERC20WithdrawalFinalizedWithRetry(s.ctx, s.filterer, &bind.FilterOpts{
Start: start,
End: &end,
})
if err != nil {
logger.Error("Error fetching filter", "err", err)
}
for iter.Next() {
withdrawalsByBlockHash[iter.Event.Raw.BlockHash] = append(
withdrawalsByBlockHash[iter.Event.Raw.BlockHash], db.Withdrawal{
TxHash: iter.Event.Raw.TxHash,
L1Token: iter.Event.L1Token,
L2Token: iter.Event.L2Token,
FromAddress: iter.Event.From,
ToAddress: iter.Event.To,
Amount: iter.Event.Amount,
Data: iter.Event.ExtraData,
LogIndex: iter.Event.Raw.Index,
})
}
if err := iter.Error(); err != nil {
return nil, err
}
return withdrawalsByBlockHash, nil
}
func (s *StandardBridge) String() string { func (s *StandardBridge) String() string {
return s.name return s.name
} }
...@@ -219,6 +219,7 @@ func (s *Service) Update(newHeader *types.Header) error { ...@@ -219,6 +219,7 @@ func (s *Service) Update(newHeader *types.Header) error {
startHeight := headers[0].Number.Uint64() startHeight := headers[0].Number.Uint64()
endHeight := headers[len(headers)-1].Number.Uint64() endHeight := headers[len(headers)-1].Number.Uint64()
depositsByBlockHash := make(map[common.Hash][]db.Deposit) depositsByBlockHash := make(map[common.Hash][]db.Deposit)
withdrawalsByBlockHash := make(map[common.Hash][]db.Withdrawal)
start := prometheus.NewTimer(s.metrics.UpdateDuration.WithLabelValues("l1")) start := prometheus.NewTimer(s.metrics.UpdateDuration.WithLabelValues("l1"))
defer func() { defer func() {
...@@ -227,6 +228,7 @@ func (s *Service) Update(newHeader *types.Header) error { ...@@ -227,6 +228,7 @@ func (s *Service) Update(newHeader *types.Header) error {
}() }()
bridgeDepositsCh := make(chan bridge.DepositsMap, len(s.bridges)) bridgeDepositsCh := make(chan bridge.DepositsMap, len(s.bridges))
bridgeWdsCh := make(chan bridge.WithdrawalsMap, len(s.bridges))
errCh := make(chan error, len(s.bridges)) errCh := make(chan error, len(s.bridges))
for _, bridgeImpl := range s.bridges { for _, bridgeImpl := range s.bridges {
...@@ -238,6 +240,14 @@ func (s *Service) Update(newHeader *types.Header) error { ...@@ -238,6 +240,14 @@ func (s *Service) Update(newHeader *types.Header) error {
} }
bridgeDepositsCh <- deposits bridgeDepositsCh <- deposits
}(bridgeImpl) }(bridgeImpl)
go func(b bridge.Bridge) {
withdrawals, err := b.GetWithdrawalsByBlockRange(startHeight, endHeight)
if err != nil {
errCh <- err
return
}
bridgeWdsCh <- withdrawals
}(bridgeImpl)
} }
var receives int var receives int
...@@ -246,13 +256,23 @@ func (s *Service) Update(newHeader *types.Header) error { ...@@ -246,13 +256,23 @@ func (s *Service) Update(newHeader *types.Header) error {
case bridgeDeposits := <-bridgeDepositsCh: case bridgeDeposits := <-bridgeDepositsCh:
for blockHash, deposits := range bridgeDeposits { for blockHash, deposits := range bridgeDeposits {
for _, deposit := range deposits { for _, deposit := range deposits {
if err := s.cacheToken(deposit); err != nil { if err := s.cacheToken(deposit.L1Token); err != nil {
logger.Warn("error caching token", "err", err) logger.Warn("error caching token", "err", err)
} }
} }
depositsByBlockHash[blockHash] = append(depositsByBlockHash[blockHash], deposits...) depositsByBlockHash[blockHash] = append(depositsByBlockHash[blockHash], deposits...)
} }
case bridgeWithdrawals := <-bridgeWdsCh:
for blockHash, withdrawals := range bridgeWithdrawals {
for _, withdrawal := range withdrawals {
if err := s.cacheToken(withdrawal.L1Token); err != nil {
logger.Warn("error caching token", "err", err)
}
}
withdrawalsByBlockHash[blockHash] = append(withdrawalsByBlockHash[blockHash], withdrawals...)
}
case err := <-errCh: case err := <-errCh:
return err return err
} }
...@@ -263,6 +283,51 @@ func (s *Service) Update(newHeader *types.Header) error { ...@@ -263,6 +283,51 @@ func (s *Service) Update(newHeader *types.Header) error {
} }
} }
for i, header := range headers {
blockHash := header.Hash
number := header.Number.Uint64()
deposits := depositsByBlockHash[blockHash]
withdrawals := withdrawalsByBlockHash[blockHash]
if len(deposits) == 0 && len(withdrawals) == 0 && i != len(headers)-1 {
continue
}
block := &db.IndexedL1Block{
Hash: blockHash,
ParentHash: header.ParentHash,
Number: number,
Timestamp: header.Time,
Deposits: deposits,
Withdrawals: withdrawals,
}
err := s.cfg.DB.AddIndexedL1Block(block)
if err != nil {
logger.Error(
"Unable to import ",
"block", number,
"hash", blockHash,
"err", err,
"block", block,
)
return err
}
logger.Debug("Imported ",
"block", number, "hash", blockHash, "deposits", len(block.Deposits))
for _, deposit := range block.Deposits {
token := s.tokenCache[deposit.L2Token]
logger.Info(
"indexed deposit ",
"tx_hash", deposit.TxHash,
"symbol", token.Symbol,
"amount", deposit.Amount,
)
s.metrics.RecordDeposit(deposit.L2Token)
}
}
newHeaderNumber := newHeader.Number.Uint64() newHeaderNumber := newHeader.Number.Uint64()
s.metrics.SetL1SyncHeight(endHeight) s.metrics.SetL1SyncHeight(endHeight)
s.metrics.SetL1SyncPercent(endHeight, newHeaderNumber) s.metrics.SetL1SyncPercent(endHeight, newHeaderNumber)
...@@ -388,33 +453,33 @@ func (s *Service) catchUp(ctx context.Context) error { ...@@ -388,33 +453,33 @@ func (s *Service) catchUp(ctx context.Context) error {
return nil return nil
} }
func (s *Service) cacheToken(deposit db.Deposit) error { func (s *Service) cacheToken(address common.Address) error {
if s.tokenCache[deposit.L1Token] != nil { if s.tokenCache[address] != nil {
return nil return nil
} }
token, err := s.cfg.DB.GetL1TokenByAddress(deposit.L1Token.String()) token, err := s.cfg.DB.GetL1TokenByAddress(address.String())
if err != nil { if err != nil {
return err return err
} }
if token != nil { if token != nil {
s.metrics.IncL1CachedTokensCount() s.metrics.IncL1CachedTokensCount()
s.tokenCache[deposit.L1Token] = token s.tokenCache[address] = token
return nil return nil
} }
token, err = QueryERC20(deposit.L1Token, s.cfg.L1Client) token, err = QueryERC20(address, s.cfg.L1Client)
if err != nil { if err != nil {
logger.Error("Error querying ERC20 token details", logger.Error("Error querying ERC20 token details",
"l1_token", deposit.L1Token.String(), "err", err) "l1_token", address.String(), "err", err)
token = &db.Token{ token = &db.Token{
Address: deposit.L1Token.String(), Address: address.String(),
} }
} }
if err := s.cfg.DB.AddL1Token(deposit.L1Token.String(), token); err != nil { if err := s.cfg.DB.AddL1Token(address.String(), token); err != nil {
return err return err
} }
s.tokenCache[deposit.L1Token] = token s.tokenCache[address] = token
s.metrics.IncL1CachedTokensCount() s.metrics.IncL1CachedTokensCount()
return nil return nil
} }
......
...@@ -12,10 +12,12 @@ import ( ...@@ -12,10 +12,12 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
) )
type DepositsMap map[common.Hash][]db.Deposit // Finalizations
type WithdrawalsMap map[common.Hash][]db.Withdrawal type WithdrawalsMap map[common.Hash][]db.Withdrawal
type Bridge interface { type Bridge interface {
Address() common.Address Address() common.Address
GetDepositsByBlockRange(uint64, uint64) (DepositsMap, error)
GetWithdrawalsByBlockRange(uint64, uint64) (WithdrawalsMap, error) GetWithdrawalsByBlockRange(uint64, uint64) (WithdrawalsMap, error)
String() string String() string
} }
......
...@@ -22,6 +22,37 @@ func (s *StandardBridge) Address() common.Address { ...@@ -22,6 +22,37 @@ func (s *StandardBridge) Address() common.Address {
return s.address return s.address
} }
func (s *StandardBridge) GetDepositsByBlockRange(start, end uint64) (DepositsMap, error) {
depositsByBlockhash := make(DepositsMap)
iter, err := FilterDepositFinalizedWithRetry(s.ctx, s.filterer, &bind.FilterOpts{
Start: start,
End: &end,
})
if err != nil {
logger.Error("Error fetching filter", "err", err)
}
for iter.Next() {
depositsByBlockhash[iter.Event.Raw.BlockHash] = append(
depositsByBlockhash[iter.Event.Raw.BlockHash], db.Deposit{
TxHash: iter.Event.Raw.TxHash,
L1Token: iter.Event.L1Token,
L2Token: iter.Event.L2Token,
FromAddress: iter.Event.From,
ToAddress: iter.Event.To,
Amount: iter.Event.Amount,
Data: iter.Event.ExtraData,
LogIndex: iter.Event.Raw.Index,
})
}
if err := iter.Error(); err != nil {
return nil, err
}
return depositsByBlockhash, nil
}
func (s *StandardBridge) GetWithdrawalsByBlockRange(start, end uint64) (WithdrawalsMap, error) { func (s *StandardBridge) GetWithdrawalsByBlockRange(start, end uint64) (WithdrawalsMap, error) {
withdrawalsByBlockhash := make(map[common.Hash][]db.Withdrawal) withdrawalsByBlockhash := make(map[common.Hash][]db.Withdrawal)
......
...@@ -218,6 +218,7 @@ func (s *Service) Update(newHeader *types.Header) error { ...@@ -218,6 +218,7 @@ func (s *Service) Update(newHeader *types.Header) error {
startHeight := headers[0].Number.Uint64() startHeight := headers[0].Number.Uint64()
endHeight := headers[len(headers)-1].Number.Uint64() endHeight := headers[len(headers)-1].Number.Uint64()
depositsByBlockHash := make(map[common.Hash][]db.Deposit)
withdrawalsByBlockHash := make(map[common.Hash][]db.Withdrawal) withdrawalsByBlockHash := make(map[common.Hash][]db.Withdrawal)
start := prometheus.NewTimer(s.metrics.UpdateDuration.WithLabelValues("l2")) start := prometheus.NewTimer(s.metrics.UpdateDuration.WithLabelValues("l2"))
...@@ -226,10 +227,19 @@ func (s *Service) Update(newHeader *types.Header) error { ...@@ -226,10 +227,19 @@ func (s *Service) Update(newHeader *types.Header) error {
logger.Info("updated index", "start_height", startHeight, "end_height", endHeight, "duration", dur) logger.Info("updated index", "start_height", startHeight, "end_height", endHeight, "duration", dur)
}() }()
bridgeDepositsCh := make(chan bridge.DepositsMap, len(s.bridges))
bridgeWdsCh := make(chan bridge.WithdrawalsMap) bridgeWdsCh := make(chan bridge.WithdrawalsMap)
errCh := make(chan error, len(s.bridges)) errCh := make(chan error, len(s.bridges))
for _, bridgeImpl := range s.bridges { for _, bridgeImpl := range s.bridges {
go func(b bridge.Bridge) {
deposits, err := b.GetDepositsByBlockRange(startHeight, endHeight)
if err != nil {
errCh <- err
return
}
bridgeDepositsCh <- deposits
}(bridgeImpl)
go func(b bridge.Bridge) { go func(b bridge.Bridge) {
wds, err := b.GetWithdrawalsByBlockRange(startHeight, endHeight) wds, err := b.GetWithdrawalsByBlockRange(startHeight, endHeight)
if err != nil { if err != nil {
...@@ -246,13 +256,23 @@ func (s *Service) Update(newHeader *types.Header) error { ...@@ -246,13 +256,23 @@ func (s *Service) Update(newHeader *types.Header) error {
case bridgeWds := <-bridgeWdsCh: case bridgeWds := <-bridgeWdsCh:
for blockHash, withdrawals := range bridgeWds { for blockHash, withdrawals := range bridgeWds {
for _, wd := range withdrawals { for _, wd := range withdrawals {
if err := s.cacheToken(wd); err != nil { if err := s.cacheToken(wd.L2Token); err != nil {
logger.Warn("error caching token", "err", err) logger.Warn("error caching token", "err", err)
} }
} }
withdrawalsByBlockHash[blockHash] = append(withdrawalsByBlockHash[blockHash], withdrawals...) withdrawalsByBlockHash[blockHash] = append(withdrawalsByBlockHash[blockHash], withdrawals...)
} }
case bridgeDeposits := <-bridgeDepositsCh:
for blockHash, deposits := range bridgeDeposits {
for _, deposit := range deposits {
if err := s.cacheToken(deposit.L2Token); err != nil {
logger.Warn("error caching token", "err", err)
}
}
depositsByBlockHash[blockHash] = append(depositsByBlockHash[blockHash], deposits...)
}
case err := <-errCh: case err := <-errCh:
return err return err
} }
...@@ -266,6 +286,7 @@ func (s *Service) Update(newHeader *types.Header) error { ...@@ -266,6 +286,7 @@ func (s *Service) Update(newHeader *types.Header) error {
for i, header := range headers { for i, header := range headers {
blockHash := header.Hash() blockHash := header.Hash()
number := header.Number.Uint64() number := header.Number.Uint64()
deposits := depositsByBlockHash[blockHash]
withdrawals := withdrawalsByBlockHash[blockHash] withdrawals := withdrawalsByBlockHash[blockHash]
if len(withdrawals) == 0 && i != len(headers)-1 { if len(withdrawals) == 0 && i != len(headers)-1 {
...@@ -277,6 +298,7 @@ func (s *Service) Update(newHeader *types.Header) error { ...@@ -277,6 +298,7 @@ func (s *Service) Update(newHeader *types.Header) error {
ParentHash: header.ParentHash, ParentHash: header.ParentHash,
Number: number, Number: number,
Timestamp: header.Time, Timestamp: header.Time,
Deposits: deposits,
Withdrawals: withdrawals, Withdrawals: withdrawals,
} }
...@@ -336,6 +358,18 @@ func (s *Service) GetIndexerStatus(w http.ResponseWriter, r *http.Request) { ...@@ -336,6 +358,18 @@ func (s *Service) GetIndexerStatus(w http.ResponseWriter, r *http.Request) {
server.RespondWithJSON(w, http.StatusOK, status) server.RespondWithJSON(w, http.StatusOK, status)
} }
func (s *Service) GetWithdrawalStatus(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
withdrawal, err := s.cfg.DB.GetWithdrawalStatus(common.HexToHash(vars["hash"]))
if err != nil {
server.RespondWithError(w, http.StatusInternalServerError, err.Error())
return
}
server.RespondWithJSON(w, http.StatusOK, withdrawal)
}
func (s *Service) GetWithdrawals(w http.ResponseWriter, r *http.Request) { func (s *Service) GetWithdrawals(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r) vars := mux.Vars(r)
...@@ -431,32 +465,32 @@ func (s *Service) catchUp(ctx context.Context) error { ...@@ -431,32 +465,32 @@ func (s *Service) catchUp(ctx context.Context) error {
return nil return nil
} }
func (s *Service) cacheToken(withdrawal db.Withdrawal) error { func (s *Service) cacheToken(address common.Address) error {
if s.tokenCache[withdrawal.L2Token] != nil { if s.tokenCache[address] != nil {
return nil return nil
} }
token, err := s.cfg.DB.GetL2TokenByAddress(withdrawal.L2Token.String()) token, err := s.cfg.DB.GetL2TokenByAddress(address.String())
if err != nil { if err != nil {
return err return err
} }
if token != nil { if token != nil {
s.metrics.IncL2CachedTokensCount() s.metrics.IncL2CachedTokensCount()
s.tokenCache[withdrawal.L2Token] = token s.tokenCache[address] = token
return nil return nil
} }
token, err = QueryERC20(withdrawal.L2Token, s.cfg.L2Client) token, err = QueryERC20(address, s.cfg.L2Client)
if err != nil { if err != nil {
logger.Error("Error querying ERC20 token details", logger.Error("Error querying ERC20 token details",
"l2_token", withdrawal.L2Token.String(), "err", err) "l2_token", address.String(), "err", err)
token = &db.Token{ token = &db.Token{
Address: withdrawal.L2Token.String(), Address: address.String(),
} }
} }
if err := s.cfg.DB.AddL2Token(withdrawal.L2Token.String(), token); err != nil { if err := s.cfg.DB.AddL2Token(address.String(), token); err != nil {
return err return err
} }
s.tokenCache[withdrawal.L2Token] = token s.tokenCache[address] = token
s.metrics.IncL2CachedTokensCount() s.metrics.IncL2CachedTokensCount()
return nil return nil
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment