Commit 70912c0d authored by Brian Bland's avatar Brian Bland Committed by GitHub

op-batcher/op-proposer: Only pass lifecycle contexts into RollupProvider.RollupClient (#10705)

* Only pass lifecycle contexts into RollupProvider.RollupClient

* Ensure that L2EndpointProvider.EthClient is called without a timeout
parent af1d9391
...@@ -186,13 +186,15 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) error { ...@@ -186,13 +186,15 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) error {
// loadBlockIntoState fetches & stores a single block into `state`. It returns the block it loaded. // loadBlockIntoState fetches & stores a single block into `state`. It returns the block it loaded.
func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uint64) (*types.Block, error) { func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uint64) (*types.Block, error) {
ctx, cancel := context.WithTimeout(ctx, l.Config.NetworkTimeout)
defer cancel()
l2Client, err := l.EndpointProvider.EthClient(ctx) l2Client, err := l.EndpointProvider.EthClient(ctx)
if err != nil { if err != nil {
return nil, fmt.Errorf("getting L2 client: %w", err) return nil, fmt.Errorf("getting L2 client: %w", err)
} }
block, err := l2Client.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumber))
cCtx, cancel := context.WithTimeout(ctx, l.Config.NetworkTimeout)
defer cancel()
block, err := l2Client.BlockByNumber(cCtx, new(big.Int).SetUint64(blockNumber))
if err != nil { if err != nil {
return nil, fmt.Errorf("getting L2 block: %w", err) return nil, fmt.Errorf("getting L2 block: %w", err)
} }
...@@ -208,13 +210,15 @@ func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uin ...@@ -208,13 +210,15 @@ func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uin
// calculateL2BlockRangeToStore determines the range (start,end] that should be loaded into the local state. // calculateL2BlockRangeToStore determines the range (start,end] that should be loaded into the local state.
// It also takes care of initializing some local state (i.e. will modify l.lastStoredBlock in certain conditions) // It also takes care of initializing some local state (i.e. will modify l.lastStoredBlock in certain conditions)
func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.BlockID, eth.BlockID, error) { func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.BlockID, eth.BlockID, error) {
ctx, cancel := context.WithTimeout(ctx, l.Config.NetworkTimeout)
defer cancel()
rollupClient, err := l.EndpointProvider.RollupClient(ctx) rollupClient, err := l.EndpointProvider.RollupClient(ctx)
if err != nil { if err != nil {
return eth.BlockID{}, eth.BlockID{}, fmt.Errorf("getting rollup client: %w", err) return eth.BlockID{}, eth.BlockID{}, fmt.Errorf("getting rollup client: %w", err)
} }
syncStatus, err := rollupClient.SyncStatus(ctx)
cCtx, cancel := context.WithTimeout(ctx, l.Config.NetworkTimeout)
defer cancel()
syncStatus, err := rollupClient.SyncStatus(cCtx)
// Ensure that we have the sync status // Ensure that we have the sync status
if err != nil { if err != nil {
return eth.BlockID{}, eth.BlockID{}, fmt.Errorf("failed to get sync status: %w", err) return eth.BlockID{}, eth.BlockID{}, fmt.Errorf("failed to get sync status: %w", err)
...@@ -337,15 +341,16 @@ func (l *BatchSubmitter) loop() { ...@@ -337,15 +341,16 @@ func (l *BatchSubmitter) loop() {
// waitNodeSync Check to see if there was a batcher tx sent recently that // waitNodeSync Check to see if there was a batcher tx sent recently that
// still needs more block confirmations before being considered finalized // still needs more block confirmations before being considered finalized
func (l *BatchSubmitter) waitNodeSync() error { func (l *BatchSubmitter) waitNodeSync() error {
ctx, cancel := context.WithTimeout(l.shutdownCtx, l.Config.NetworkTimeout) ctx := l.shutdownCtx
defer cancel()
rollupClient, err := l.EndpointProvider.RollupClient(ctx) rollupClient, err := l.EndpointProvider.RollupClient(ctx)
if err != nil { if err != nil {
return fmt.Errorf("failed to get rollup client: %w", err) return fmt.Errorf("failed to get rollup client: %w", err)
} }
l1Tip, err := l.l1Tip(ctx) cCtx, cancel := context.WithTimeout(ctx, l.Config.NetworkTimeout)
defer cancel()
l1Tip, err := l.l1Tip(cCtx)
if err != nil { if err != nil {
return fmt.Errorf("failed to retrieve l1 tip: %w", err) return fmt.Errorf("failed to retrieve l1 tip: %w", err)
} }
...@@ -353,7 +358,7 @@ func (l *BatchSubmitter) waitNodeSync() error { ...@@ -353,7 +358,7 @@ func (l *BatchSubmitter) waitNodeSync() error {
l1TargetBlock := l1Tip.Number l1TargetBlock := l1Tip.Number
if l.Config.CheckRecentTxsDepth != 0 { if l.Config.CheckRecentTxsDepth != 0 {
l.Log.Info("Checking for recently submitted batcher transactions on L1") l.Log.Info("Checking for recently submitted batcher transactions on L1")
recentBlock, found, err := eth.CheckRecentTxs(ctx, l.L1Client, l.Config.CheckRecentTxsDepth, l.Txmgr.From()) recentBlock, found, err := eth.CheckRecentTxs(cCtx, l.L1Client, l.Config.CheckRecentTxsDepth, l.Txmgr.From())
if err != nil { if err != nil {
return fmt.Errorf("failed checking recent batcher txs: %w", err) return fmt.Errorf("failed checking recent batcher txs: %w", err)
} }
...@@ -451,16 +456,16 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t ...@@ -451,16 +456,16 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
} }
func (l *BatchSubmitter) safeL1Origin(ctx context.Context) (eth.BlockID, error) { func (l *BatchSubmitter) safeL1Origin(ctx context.Context) (eth.BlockID, error) {
ctx, cancel := context.WithTimeout(ctx, l.Config.NetworkTimeout)
defer cancel()
c, err := l.EndpointProvider.RollupClient(ctx) c, err := l.EndpointProvider.RollupClient(ctx)
if err != nil { if err != nil {
log.Error("Failed to get rollup client", "err", err) log.Error("Failed to get rollup client", "err", err)
return eth.BlockID{}, fmt.Errorf("safe l1 origin: error getting rollup client: %w", err) return eth.BlockID{}, fmt.Errorf("safe l1 origin: error getting rollup client: %w", err)
} }
status, err := c.SyncStatus(ctx) cCtx, cancel := context.WithTimeout(ctx, l.Config.NetworkTimeout)
defer cancel()
status, err := c.SyncStatus(cCtx)
if err != nil { if err != nil {
log.Error("Failed to get sync status", "err", err) log.Error("Failed to get sync status", "err", err)
return eth.BlockID{}, fmt.Errorf("safe l1 origin: error getting sync status: %w", err) return eth.BlockID{}, fmt.Errorf("safe l1 origin: error getting sync status: %w", err)
......
...@@ -244,13 +244,15 @@ func (l *L2OutputSubmitter) FetchNextOutputInfo(ctx context.Context) (*eth.Outpu ...@@ -244,13 +244,15 @@ func (l *L2OutputSubmitter) FetchNextOutputInfo(ctx context.Context) (*eth.Outpu
// FetchCurrentBlockNumber gets the current block number from the [L2OutputSubmitter]'s [RollupClient]. If the `AllowNonFinalized` configuration // FetchCurrentBlockNumber gets the current block number from the [L2OutputSubmitter]'s [RollupClient]. If the `AllowNonFinalized` configuration
// option is set, it will return the safe head block number, and if not, it will return the finalized head block number. // option is set, it will return the safe head block number, and if not, it will return the finalized head block number.
func (l *L2OutputSubmitter) FetchCurrentBlockNumber(ctx context.Context) (*big.Int, error) { func (l *L2OutputSubmitter) FetchCurrentBlockNumber(ctx context.Context) (*big.Int, error) {
cCtx, cancel := context.WithTimeout(ctx, l.Cfg.NetworkTimeout) rollupClient, err := l.RollupProvider.RollupClient(ctx)
defer cancel()
rollupClient, err := l.RollupProvider.RollupClient(cCtx)
if err != nil { if err != nil {
l.Log.Error("proposer unable to get rollup client", "err", err) l.Log.Error("proposer unable to get rollup client", "err", err)
return nil, err return nil, err
} }
cCtx, cancel := context.WithTimeout(ctx, l.Cfg.NetworkTimeout)
defer cancel()
status, err := rollupClient.SyncStatus(cCtx) status, err := rollupClient.SyncStatus(cCtx)
if err != nil { if err != nil {
l.Log.Error("proposer unable to get sync status", "err", err) l.Log.Error("proposer unable to get sync status", "err", err)
...@@ -268,15 +270,16 @@ func (l *L2OutputSubmitter) FetchCurrentBlockNumber(ctx context.Context) (*big.I ...@@ -268,15 +270,16 @@ func (l *L2OutputSubmitter) FetchCurrentBlockNumber(ctx context.Context) (*big.I
} }
func (l *L2OutputSubmitter) FetchOutput(ctx context.Context, block *big.Int) (*eth.OutputResponse, bool, error) { func (l *L2OutputSubmitter) FetchOutput(ctx context.Context, block *big.Int) (*eth.OutputResponse, bool, error) {
ctx, cancel := context.WithTimeout(ctx, l.Cfg.NetworkTimeout)
defer cancel()
rollupClient, err := l.RollupProvider.RollupClient(ctx) rollupClient, err := l.RollupProvider.RollupClient(ctx)
if err != nil { if err != nil {
l.Log.Error("proposer unable to get rollup client", "err", err) l.Log.Error("proposer unable to get rollup client", "err", err)
return nil, false, err return nil, false, err
} }
output, err := rollupClient.OutputAtBlock(ctx, block.Uint64())
cCtx, cancel := context.WithTimeout(ctx, l.Cfg.NetworkTimeout)
defer cancel()
output, err := rollupClient.OutputAtBlock(cCtx, block.Uint64())
if err != nil { if err != nil {
l.Log.Error("failed to fetch output at block", "block", block, "err", err) l.Log.Error("failed to fetch output at block", "block", block, "err", err)
return nil, false, err return nil, false, err
...@@ -431,15 +434,15 @@ func (l *L2OutputSubmitter) loop() { ...@@ -431,15 +434,15 @@ func (l *L2OutputSubmitter) loop() {
} }
func (l *L2OutputSubmitter) waitNodeSync() error { func (l *L2OutputSubmitter) waitNodeSync() error {
ctx, cancel := context.WithTimeout(l.ctx, l.Cfg.NetworkTimeout) cCtx, cancel := context.WithTimeout(l.ctx, l.Cfg.NetworkTimeout)
defer cancel() defer cancel()
l1head, err := l.Txmgr.BlockNumber(ctx) l1head, err := l.Txmgr.BlockNumber(cCtx)
if err != nil { if err != nil {
return fmt.Errorf("failed to retrieve current L1 block number: %w", err) return fmt.Errorf("failed to retrieve current L1 block number: %w", err)
} }
rollupClient, err := l.RollupProvider.RollupClient(ctx) rollupClient, err := l.RollupProvider.RollupClient(l.ctx)
if err != nil { if err != nil {
return fmt.Errorf("failed to get rollup client: %w", err) return fmt.Errorf("failed to get rollup client: %w", err)
} }
......
...@@ -12,7 +12,9 @@ import ( ...@@ -12,7 +12,9 @@ import (
// It does this by extending the RollupProvider interface to add the ability to get an EthClient // It does this by extending the RollupProvider interface to add the ability to get an EthClient
type L2EndpointProvider interface { type L2EndpointProvider interface {
RollupProvider RollupProvider
// EthClient(ctx) returns the underlying ethclient pointing to the L2 execution node // EthClient(ctx) returns the underlying ethclient pointing to the L2 execution node.
// Note: ctx should be a lifecycle context without an attached timeout as client selection may involve
// multiple network operations, specifically in the case of failover.
EthClient(ctx context.Context) (EthClientInterface, error) EthClient(ctx context.Context) (EthClientInterface, error)
} }
......
...@@ -10,7 +10,9 @@ import ( ...@@ -10,7 +10,9 @@ import (
// RollupProvider is an interface for providing a RollupClient // RollupProvider is an interface for providing a RollupClient
// It manages the lifecycle of the RollupClient for callers // It manages the lifecycle of the RollupClient for callers
type RollupProvider interface { type RollupProvider interface {
// RollupClient(ctx) returns the underlying sources.RollupClient pointing to the L2 rollup consensus node // RollupClient(ctx) returns the underlying sources.RollupClient pointing to the L2 rollup consensus node.
// Note: ctx should be a lifecycle context without an attached timeout as client selection may involve
// multiple network operations, specifically in the case of failover.
RollupClient(ctx context.Context) (RollupClientInterface, error) RollupClient(ctx context.Context) (RollupClientInterface, error)
// Close() closes the underlying client or clients // Close() closes the underlying client or clients
Close() Close()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment