Commit d7bb9625 authored by Javed Khan's avatar Javed Khan Committed by GitHub

indexer: fix context reuse in bridge filter query (#2496)

* fix (indexer): bridge filter query - fix ctx reuse

* changeset: indexer patch - fix context reuse
parent 5397d2ea
---
'@eth-optimism/indexer': patch
---
fix context reuse
...@@ -24,10 +24,9 @@ func (e *EthBridge) Address() common.Address { ...@@ -24,10 +24,9 @@ func (e *EthBridge) Address() common.Address {
func (e *EthBridge) GetDepositsByBlockRange(start, end uint64) (DepositsMap, error) { func (e *EthBridge) GetDepositsByBlockRange(start, end uint64) (DepositsMap, error) {
depositsByBlockhash := make(DepositsMap) depositsByBlockhash := make(DepositsMap)
iter, err := FilterETHDepositInitiatedWithRetry(e.filterer, &bind.FilterOpts{ iter, err := FilterETHDepositInitiatedWithRetry(e.ctx, e.filterer, &bind.FilterOpts{
Start: start, Start: start,
End: &end, End: &end,
Context: e.ctx,
}) })
if err != nil { if err != nil {
logger.Error("Error fetching filter", "err", err) logger.Error("Error fetching filter", "err", err)
......
...@@ -15,54 +15,48 @@ var clientRetryInterval = 5 * time.Second ...@@ -15,54 +15,48 @@ var clientRetryInterval = 5 * time.Second
// FilterStateBatchAppendedWithRetry retries the given func until it succeeds, // FilterStateBatchAppendedWithRetry retries the given func until it succeeds,
// waiting for clientRetryInterval duration after every call. // waiting for clientRetryInterval duration after every call.
func FilterStateBatchAppendedWithRetry(filterer *scc.StateCommitmentChainFilterer, opts *bind.FilterOpts) (*scc.StateCommitmentChainStateBatchAppendedIterator, error) { func FilterStateBatchAppendedWithRetry(ctx context.Context, filterer *scc.StateCommitmentChainFilterer, opts *bind.FilterOpts) (*scc.StateCommitmentChainStateBatchAppendedIterator, error) {
for { for {
ctxt, cancel := context.WithTimeout(opts.Context, DefaultConnectionTimeout) ctxt, cancel := context.WithTimeout(ctx, DefaultConnectionTimeout)
opts.Context = ctxt opts.Context = ctxt
res, err := filterer.FilterStateBatchAppended(opts, nil) res, err := filterer.FilterStateBatchAppended(opts, nil)
switch err { cancel()
case nil: if err == nil {
cancel() return res, nil
return res, err
default:
logger.Error("Error fetching filter", "err", err)
} }
logger.Error("Error fetching filter", "err", err)
time.Sleep(clientRetryInterval) time.Sleep(clientRetryInterval)
} }
} }
// FilterETHDepositInitiatedWithRetry retries the given func until it succeeds, // FilterETHDepositInitiatedWithRetry retries the given func until it succeeds,
// waiting for clientRetryInterval duration after every call. // waiting for clientRetryInterval duration after every call.
func FilterETHDepositInitiatedWithRetry(filterer *l1bridge.L1StandardBridgeFilterer, opts *bind.FilterOpts) (*l1bridge.L1StandardBridgeETHDepositInitiatedIterator, error) { func FilterETHDepositInitiatedWithRetry(ctx context.Context, filterer *l1bridge.L1StandardBridgeFilterer, opts *bind.FilterOpts) (*l1bridge.L1StandardBridgeETHDepositInitiatedIterator, error) {
for { for {
ctxt, cancel := context.WithTimeout(opts.Context, DefaultConnectionTimeout) ctxt, cancel := context.WithTimeout(ctx, DefaultConnectionTimeout)
opts.Context = ctxt opts.Context = ctxt
res, err := filterer.FilterETHDepositInitiated(opts, nil, nil) res, err := filterer.FilterETHDepositInitiated(opts, nil, nil)
switch err { cancel()
case nil: if err == nil {
cancel() return res, nil
return res, err
default:
logger.Error("Error fetching filter", "err", err)
} }
logger.Error("Error fetching filter", "err", err)
time.Sleep(clientRetryInterval) time.Sleep(clientRetryInterval)
} }
} }
// FilterERC20DepositInitiatedWithRetry retries the given func until it succeeds, // FilterERC20DepositInitiatedWithRetry retries the given func until it succeeds,
// waiting for clientRetryInterval duration after every call. // waiting for clientRetryInterval duration after every call.
func FilterERC20DepositInitiatedWithRetry(filterer *l1bridge.L1StandardBridgeFilterer, opts *bind.FilterOpts) (*l1bridge.L1StandardBridgeERC20DepositInitiatedIterator, error) { func FilterERC20DepositInitiatedWithRetry(ctx context.Context, filterer *l1bridge.L1StandardBridgeFilterer, opts *bind.FilterOpts) (*l1bridge.L1StandardBridgeERC20DepositInitiatedIterator, error) {
for { for {
ctxt, cancel := context.WithTimeout(opts.Context, DefaultConnectionTimeout) ctxt, cancel := context.WithTimeout(ctx, DefaultConnectionTimeout)
opts.Context = ctxt opts.Context = ctxt
res, err := filterer.FilterERC20DepositInitiated(opts, nil, nil, nil) res, err := filterer.FilterERC20DepositInitiated(opts, nil, nil, nil)
switch err { cancel()
case nil: if err == nil {
cancel() return res, nil
return res, err
default:
logger.Error("Error fetching filter", "err", err)
} }
logger.Error("Error fetching filter", "err", err)
time.Sleep(clientRetryInterval) time.Sleep(clientRetryInterval)
} }
} }
...@@ -24,10 +24,9 @@ func (s *StandardBridge) Address() common.Address { ...@@ -24,10 +24,9 @@ func (s *StandardBridge) Address() common.Address {
func (s *StandardBridge) GetDepositsByBlockRange(start, end uint64) (DepositsMap, error) { func (s *StandardBridge) GetDepositsByBlockRange(start, end uint64) (DepositsMap, error) {
depositsByBlockhash := make(DepositsMap) depositsByBlockhash := make(DepositsMap)
iter, err := FilterERC20DepositInitiatedWithRetry(s.filterer, &bind.FilterOpts{ iter, err := FilterERC20DepositInitiatedWithRetry(s.ctx, s.filterer, &bind.FilterOpts{
Start: start, Start: start,
End: &end, End: &end,
Context: s.ctx,
}) })
if err != nil { if err != nil {
logger.Error("Error fetching filter", "err", err) logger.Error("Error fetching filter", "err", err)
......
...@@ -44,10 +44,9 @@ func QueryERC20(address common.Address, client *ethclient.Client) (*db.Token, er ...@@ -44,10 +44,9 @@ func QueryERC20(address common.Address, client *ethclient.Client) (*db.Token, er
func QueryStateBatches(filterer *scc.StateCommitmentChainFilterer, startHeight, endHeight uint64, ctx context.Context) (map[common.Hash][]db.StateBatch, error) { func QueryStateBatches(filterer *scc.StateCommitmentChainFilterer, startHeight, endHeight uint64, ctx context.Context) (map[common.Hash][]db.StateBatch, error) {
batches := make(map[common.Hash][]db.StateBatch) batches := make(map[common.Hash][]db.StateBatch)
iter, err := bridge.FilterStateBatchAppendedWithRetry(filterer, &bind.FilterOpts{ iter, err := bridge.FilterStateBatchAppendedWithRetry(ctx, filterer, &bind.FilterOpts{
Start: startHeight, Start: startHeight,
End: &endHeight, End: &endHeight,
Context: ctx,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
......
...@@ -14,18 +14,16 @@ var clientRetryInterval = 5 * time.Second ...@@ -14,18 +14,16 @@ var clientRetryInterval = 5 * time.Second
// FilterWithdrawalInitiatedWithRetry retries the given func until it succeeds, // FilterWithdrawalInitiatedWithRetry retries the given func until it succeeds,
// waiting for clientRetryInterval duration after every call. // waiting for clientRetryInterval duration after every call.
func FilterWithdrawalInitiatedWithRetry(filterer *l2bridge.L2StandardBridgeFilterer, opts *bind.FilterOpts) (*l2bridge.L2StandardBridgeWithdrawalInitiatedIterator, error) { func FilterWithdrawalInitiatedWithRetry(ctx context.Context, filterer *l2bridge.L2StandardBridgeFilterer, opts *bind.FilterOpts) (*l2bridge.L2StandardBridgeWithdrawalInitiatedIterator, error) {
for { for {
ctxt, cancel := context.WithTimeout(opts.Context, DefaultConnectionTimeout) ctxt, cancel := context.WithTimeout(ctx, DefaultConnectionTimeout)
opts.Context = ctxt opts.Context = ctxt
res, err := filterer.FilterWithdrawalInitiated(opts, nil, nil, nil) res, err := filterer.FilterWithdrawalInitiated(opts, nil, nil, nil)
switch err { cancel()
case nil: if err == nil {
cancel() return res, nil
return res, err
default:
logger.Error("Error fetching filter", "err", err)
} }
logger.Error("Error fetching filter", "err", err)
time.Sleep(clientRetryInterval) time.Sleep(clientRetryInterval)
} }
} }
...@@ -25,10 +25,9 @@ func (s *StandardBridge) Address() common.Address { ...@@ -25,10 +25,9 @@ func (s *StandardBridge) Address() common.Address {
func (s *StandardBridge) GetWithdrawalsByBlockRange(start, end uint64) (WithdrawalsMap, error) { func (s *StandardBridge) GetWithdrawalsByBlockRange(start, end uint64) (WithdrawalsMap, error) {
withdrawalsByBlockhash := make(map[common.Hash][]db.Withdrawal) withdrawalsByBlockhash := make(map[common.Hash][]db.Withdrawal)
iter, err := FilterWithdrawalInitiatedWithRetry(s.filterer, &bind.FilterOpts{ iter, err := FilterWithdrawalInitiatedWithRetry(s.ctx, s.filterer, &bind.FilterOpts{
Start: start, Start: start,
End: &end, End: &end,
Context: s.ctx,
}) })
if err != nil { if err != nil {
logger.Error("Error fetching filter", "err", err) logger.Error("Error fetching filter", "err", err)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment