Commit 4fa47249 authored by OptimismBot's avatar OptimismBot Committed by GitHub

Merge pull request #6305 from ethereum-optimism/jg/backoff_when_dial_rpc_proposer_batcher

op-service,batcher,proposer: Retry when dialing RPC URLs
parents 1e46da0f 416289e8
......@@ -28,11 +28,8 @@ func (e *EthBridge) GetDepositsByBlockRange(ctx context.Context, start, end uint
End: &end,
}
var iter *bindings.L1StandardBridgeETHDepositInitiatedIterator
err := backoff.Do(3, backoff.Exponential(), func() error {
var err error
iter, err = e.contract.FilterETHDepositInitiated(opts, nil, nil)
return err
iter, err := backoff.Do(ctx, 3, backoff.Exponential(), func() (*bindings.L1StandardBridgeETHDepositInitiatedIterator, error) {
return e.contract.FilterETHDepositInitiated(opts, nil, nil)
})
if err != nil {
return nil, err
......
......@@ -37,11 +37,8 @@ func (p *Portal) GetProvenWithdrawalsByBlockRange(ctx context.Context, start, en
End: &end,
}
var iter *bindings.OptimismPortalWithdrawalProvenIterator
err := backoff.Do(3, backoff.Exponential(), func() error {
var err error
iter, err = p.contract.FilterWithdrawalProven(opts, nil, nil, nil)
return err
iter, err := backoff.Do(ctx, 3, backoff.Exponential(), func() (*bindings.OptimismPortalWithdrawalProvenIterator, error) {
return p.contract.FilterWithdrawalProven(opts, nil, nil, nil)
})
if err != nil {
return nil, err
......@@ -71,11 +68,8 @@ func (p *Portal) GetFinalizedWithdrawalsByBlockRange(ctx context.Context, start,
End: &end,
}
var iter *bindings.OptimismPortalWithdrawalFinalizedIterator
err := backoff.Do(3, backoff.Exponential(), func() error {
var err error
iter, err = p.contract.FilterWithdrawalFinalized(opts, nil)
return err
iter, err := backoff.Do(ctx, 3, backoff.Exponential(), func() (*bindings.OptimismPortalWithdrawalFinalizedIterator, error) {
return p.contract.FilterWithdrawalFinalized(opts, nil)
})
if err != nil {
return nil, err
......
......@@ -28,11 +28,8 @@ func (s *StandardBridge) GetDepositsByBlockRange(ctx context.Context, start, end
End: &end,
}
var iter *bindings.L1StandardBridgeERC20DepositInitiatedIterator
err := backoff.Do(3, backoff.Exponential(), func() error {
var err error
iter, err = s.contract.FilterERC20DepositInitiated(opts, nil, nil, nil)
return err
iter, err := backoff.Do(ctx, 3, backoff.Exponential(), func() (*bindings.L1StandardBridgeERC20DepositInitiatedIterator, error) {
return s.contract.FilterERC20DepositInitiated(opts, nil, nil, nil)
})
if err != nil {
return nil, err
......
......@@ -35,11 +35,8 @@ func (s *StandardBridge) GetWithdrawalsByBlockRange(ctx context.Context, start,
End: &end,
}
var iter *bindings.L2StandardBridgeWithdrawalInitiatedIterator
err := backoff.Do(3, backoff.Exponential(), func() error {
var err error
iter, err = s.l2SB.FilterWithdrawalInitiated(opts, nil, nil, nil)
return err
iter, err := backoff.Do(ctx, 3, backoff.Exponential(), func() (*bindings.L2StandardBridgeWithdrawalInitiatedIterator, error) {
return s.l2SB.FilterWithdrawalInitiated(opts, nil, nil, nil)
})
if err != nil {
return nil, err
......
......@@ -10,11 +10,7 @@ import (
// HeaderByNumberWithRetry retries getting headers.
func HeaderByNumberWithRetry(ctx context.Context, client *ethclient.Client) (*types.Header, error) {
var res *types.Header
err := backoff.DoCtx(ctx, 3, backoff.Exponential(), func() error {
var err error
res, err = client.HeaderByNumber(ctx, nil)
return err
return backoff.Do(ctx, 3, backoff.Exponential(), func() (*types.Header, error) {
return client.HeaderByNumber(ctx, nil)
})
return res, err
}
......@@ -50,17 +50,17 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
// Connect to L1 and L2 providers. Perform these last since they are the
// most expensive.
l1Client, err := opclient.DialEthClientWithTimeout(ctx, cfg.L1EthRpc, opclient.DefaultDialTimeout)
l1Client, err := opclient.DialEthClientWithTimeout(opclient.DefaultDialTimeout, l, cfg.L1EthRpc)
if err != nil {
return nil, err
}
l2Client, err := opclient.DialEthClientWithTimeout(ctx, cfg.L2EthRpc, opclient.DefaultDialTimeout)
l2Client, err := opclient.DialEthClientWithTimeout(opclient.DefaultDialTimeout, l, cfg.L2EthRpc)
if err != nil {
return nil, err
}
rollupClient, err := opclient.DialRollupClientWithTimeout(ctx, cfg.RollupRpc, opclient.DefaultDialTimeout)
rollupClient, err := opclient.DialRollupClientWithTimeout(opclient.DefaultDialTimeout, l, cfg.RollupRpc)
if err != nil {
return nil, err
}
......
......@@ -102,22 +102,16 @@ func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...RPCOption)
// Dials a JSON-RPC endpoint repeatedly, with a backoff, until a client connection is established. Auth is optional.
func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, attempts int, opts ...rpc.ClientOption) (*rpc.Client, error) {
bOff := backoff.Exponential()
var ret *rpc.Client
err := backoff.DoCtx(ctx, attempts, bOff, func() error {
return backoff.Do(ctx, attempts, bOff, func() (*rpc.Client, error) {
client, err := rpc.DialOptions(ctx, addr, opts...)
if err != nil {
if client == nil {
return fmt.Errorf("failed to dial address (%s): %w", addr, err)
return nil, fmt.Errorf("failed to dial address (%s): %w", addr, err)
}
log.Warn("failed to dial address, but may connect later", "addr", addr, "err", err)
}
ret = client
return nil
return client, nil
})
if err != nil {
return nil, err
}
return ret, nil
}
// BaseRPCClient is a wrapper around a concrete *rpc.Client instance to make it compliant
......
......@@ -142,12 +142,12 @@ func (s *SyncClient) eventLoop() {
s.log.Debug("Shutting down RPC sync worker")
return
case reqNum := <-s.requests:
err := backoff.DoCtx(s.resCtx, 5, backoffStrategy, func() error {
_, err := backoff.Do(s.resCtx, 5, backoffStrategy, func() (interface{}, error) {
// Limit the maximum time for fetching payloads
ctx, cancel := context.WithTimeout(s.resCtx, time.Second*10)
defer cancel()
// We are only fetching one block at a time here.
return s.fetchUnsafeBlockFromRpc(ctx, reqNum)
return nil, s.fetchUnsafeBlockFromRpc(ctx, reqNum)
})
if err != nil {
if err == s.resCtx.Err() {
......
......@@ -28,49 +28,33 @@ func NewRetryingL1Source(logger log.Logger, source L1Source) *RetryingL1Source {
}
func (s *RetryingL1Source) InfoByHash(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, error) {
var info eth.BlockInfo
err := backoff.DoCtx(ctx, maxAttempts, s.strategy, func() error {
return backoff.Do(ctx, maxAttempts, s.strategy, func() (eth.BlockInfo, error) {
res, err := s.source.InfoByHash(ctx, blockHash)
if err != nil {
s.logger.Warn("Failed to retrieve info", "hash", blockHash, "err", err)
return err
}
info = res
return nil
return res, err
})
return info, err
}
func (s *RetryingL1Source) InfoAndTxsByHash(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Transactions, error) {
var info eth.BlockInfo
var txs types.Transactions
err := backoff.DoCtx(ctx, maxAttempts, s.strategy, func() error {
return backoff.Do2(ctx, maxAttempts, s.strategy, func() (eth.BlockInfo, types.Transactions, error) {
i, t, err := s.source.InfoAndTxsByHash(ctx, blockHash)
if err != nil {
s.logger.Warn("Failed to retrieve l1 info and txs", "hash", blockHash, "err", err)
return err
}
info = i
txs = t
return nil
return i, t, err
})
return info, txs, err
}
func (s *RetryingL1Source) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) {
var info eth.BlockInfo
var rcpts types.Receipts
err := backoff.DoCtx(ctx, maxAttempts, s.strategy, func() error {
return backoff.Do2(ctx, maxAttempts, s.strategy, func() (eth.BlockInfo, types.Receipts, error) {
i, r, err := s.source.FetchReceipts(ctx, blockHash)
if err != nil {
s.logger.Warn("Failed to fetch receipts", "hash", blockHash, "err", err)
return err
}
info = i
rcpts = r
return nil
return i, r, err
})
return info, rcpts, err
}
var _ L1Source = (*RetryingL1Source)(nil)
......@@ -82,61 +66,44 @@ type RetryingL2Source struct {
}
func (s *RetryingL2Source) InfoAndTxsByHash(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Transactions, error) {
var info eth.BlockInfo
var txs types.Transactions
err := backoff.DoCtx(ctx, maxAttempts, s.strategy, func() error {
return backoff.Do2(ctx, maxAttempts, s.strategy, func() (eth.BlockInfo, types.Transactions, error) {
i, t, err := s.source.InfoAndTxsByHash(ctx, blockHash)
if err != nil {
s.logger.Warn("Failed to retrieve l2 info and txs", "hash", blockHash, "err", err)
return err
}
info = i
txs = t
return nil
return i, t, err
})
return info, txs, err
}
func (s *RetryingL2Source) NodeByHash(ctx context.Context, hash common.Hash) ([]byte, error) {
var node []byte
err := backoff.DoCtx(ctx, maxAttempts, s.strategy, func() error {
return backoff.Do(ctx, maxAttempts, s.strategy, func() ([]byte, error) {
n, err := s.source.NodeByHash(ctx, hash)
if err != nil {
s.logger.Warn("Failed to retrieve node", "hash", hash, "err", err)
return err
}
node = n
return nil
return n, err
})
return node, err
}
func (s *RetryingL2Source) CodeByHash(ctx context.Context, hash common.Hash) ([]byte, error) {
var code []byte
err := backoff.DoCtx(ctx, maxAttempts, s.strategy, func() error {
return backoff.Do(ctx, maxAttempts, s.strategy, func() ([]byte, error) {
c, err := s.source.CodeByHash(ctx, hash)
if err != nil {
s.logger.Warn("Failed to retrieve code", "hash", hash, "err", err)
return err
}
code = c
return nil
return c, err
})
return code, err
}
func (s *RetryingL2Source) OutputByRoot(ctx context.Context, root common.Hash) (eth.Output, error) {
var output eth.Output
err := backoff.DoCtx(ctx, maxAttempts, s.strategy, func() error {
return backoff.Do(ctx, maxAttempts, s.strategy, func() (eth.Output, error) {
o, err := s.source.OutputByRoot(ctx, root)
if err != nil {
s.logger.Warn("Failed to fetch l2 output", "root", root, "err", err)
return err
return o, err
}
output = o
return nil
return o, nil
})
return output, err
}
func NewRetryingL2Source(logger log.Logger, source L2Source) *RetryingL2Source {
......
......@@ -157,13 +157,12 @@ func NewL2OutputSubmitterConfigFromCLIConfig(cfg CLIConfig, l log.Logger, m metr
}
// Connect to L1 and L2 providers. Perform these last since they are the most expensive.
ctx := context.Background()
l1Client, err := opclient.DialEthClientWithTimeout(ctx, cfg.L1EthRpc, opclient.DefaultDialTimeout)
l1Client, err := opclient.DialEthClientWithTimeout(opclient.DefaultDialTimeout, l, cfg.L1EthRpc)
if err != nil {
return nil, err
}
rollupClient, err := opclient.DialRollupClientWithTimeout(ctx, cfg.RollupRpc, opclient.DefaultDialTimeout)
rollupClient, err := opclient.DialRollupClientWithTimeout(opclient.DefaultDialTimeout, l, cfg.RollupRpc)
if err != nil {
return nil, err
}
......
......@@ -6,10 +6,6 @@ import (
"time"
)
// Operation represents an operation that will be retried
// based on some backoff strategy if it fails.
type Operation func() error
// ErrFailedPermanently is an error raised by Do when the
// underlying Operation has been retried maxAttempts times.
type ErrFailedPermanently struct {
......@@ -21,16 +17,27 @@ func (e *ErrFailedPermanently) Error() string {
return fmt.Sprintf("operation failed permanently after %d attempts: %v", e.attempts, e.LastErr)
}
type pair[T, U any] struct {
a T
b U
}
func Do2[T, U any](ctx context.Context, maxAttempts int, strategy Strategy, op func() (T, U, error)) (T, U, error) {
f := func() (pair[T, U], error) {
a, b, err := op()
return pair[T, U]{a, b}, err
}
res, err := Do(ctx, maxAttempts, strategy, f)
return res.a, res.b, err
}
// Do performs the provided Operation up to maxAttempts times
// with delays in between each retry according to the provided
// Strategy.
func Do(maxAttempts int, strategy Strategy, op Operation) error {
return DoCtx(context.Background(), maxAttempts, strategy, op)
}
func DoCtx(ctx context.Context, maxAttempts int, strategy Strategy, op Operation) error {
func Do[T any](ctx context.Context, maxAttempts int, strategy Strategy, op func() (T, error)) (T, error) {
var empty T
if maxAttempts < 1 {
return fmt.Errorf("need at least 1 attempt to run op, but have %d max attempts", maxAttempts)
return empty, fmt.Errorf("need at least 1 attempt to run op, but have %d max attempts", maxAttempts)
}
var attempt int
......@@ -43,16 +50,16 @@ func DoCtx(ctx context.Context, maxAttempts int, strategy Strategy, op Operation
for {
select {
case <-ctx.Done():
return ctx.Err()
return empty, ctx.Err()
case <-reattemptCh:
attempt++
err := op()
ret, err := op()
if err == nil {
return nil
return ret, nil
}
if attempt == maxAttempts {
return &ErrFailedPermanently{
return empty, &ErrFailedPermanently{
attempts: maxAttempts,
LastErr: err,
}
......
package backoff
import (
"context"
"errors"
"testing"
"time"
......@@ -14,20 +15,19 @@ func TestDo(t *testing.T) {
start := time.Now()
var i int
require.NoError(t, Do(2, strategy, func() error {
_, err := Do(context.Background(), 2, strategy, func() (int, error) {
if i == 1 {
return nil
return 0, nil
}
i++
return dummyErr
}))
return 0, dummyErr
})
require.NoError(t, err)
require.True(t, time.Since(start) > 10*time.Millisecond)
start = time.Now()
// add one because the first attempt counts
err := Do(3, strategy, func() error {
return dummyErr
_, err = Do(context.Background(), 3, strategy, func() (int, error) {
return 0, dummyErr
})
require.Equal(t, dummyErr, err.(*ErrFailedPermanently).LastErr)
require.True(t, time.Since(start) > 20*time.Millisecond)
......
package client
import (
"context"
"fmt"
"net"
"net/url"
"time"
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-service/backoff"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
)
// DefaultDialTimeout is a default timeout for dialing a client.
const DefaultDialTimeout = 1 * time.Minute
const defaultRetryCount = 30
const defaultRetryTime = 2 * time.Second
// DialEthClientWithTimeout attempts to dial the L1 provider using the provided
// URL. If the dial doesn't complete within defaultDialTimeout seconds, this
// method will return an error.
func DialEthClientWithTimeout(timeout time.Duration, log log.Logger, url string) (*ethclient.Client, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
c, err := dialRPCClientWithBackoff(ctx, log, url)
if err != nil {
return nil, err
}
return ethclient.NewClient(c), nil
}
// DialRollupClientWithTimeout attempts to dial the RPC provider using the provided URL.
// If the dial doesn't complete within timeout seconds, this method will return an error.
func DialRollupClientWithTimeout(timeout time.Duration, log log.Logger, url string) (*sources.RollupClient, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
rpcCl, err := dialRPCClientWithBackoff(ctx, log, url)
if err != nil {
return nil, err
}
return sources.NewRollupClient(client.NewBaseRPCClient(rpcCl)), nil
}
// Dials a JSON-RPC endpoint repeatedly, with a backoff, until a client connection is established. Auth is optional.
func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string) (*rpc.Client, error) {
bOff := backoff.Fixed(defaultRetryTime)
return backoff.Do(ctx, defaultRetryCount, bOff, func() (*rpc.Client, error) {
if !IsURLAvailable(addr) {
log.Warn("failed to dial address, but may connect later", "addr", addr)
return nil, fmt.Errorf("address unavailable (%s)", addr)
}
client, err := rpc.DialOptions(ctx, addr)
if err != nil {
return nil, fmt.Errorf("failed to dial address (%s): %w", addr, err)
}
return client, nil
})
}
func IsURLAvailable(address string) bool {
u, err := url.Parse(address)
if err != nil {
return false
}
conn, err := net.DialTimeout("tcp", u.Host, 5*time.Second)
if err != nil {
return false
}
conn.Close()
return true
}
package client
import (
"fmt"
"net"
"strings"
"testing"
"github.com/stretchr/testify/require"
)
func TestIsURLAvailable(t *testing.T) {
listener, err := net.Listen("tcp4", ":0")
require.NoError(t, err)
defer listener.Close()
a := listener.Addr().String()
parts := strings.Split(a, ":")
addr := fmt.Sprintf("http://localhost:%s", parts[1])
require.True(t, IsURLAvailable(addr))
require.False(t, IsURLAvailable("http://localhost:0"))
}
package client
import (
"context"
"time"
"github.com/ethereum/go-ethereum/ethclient"
)
// DialEthClientWithTimeout attempts to dial the L1 provider using the provided
// URL. If the dial doesn't complete within defaultDialTimeout seconds, this
// method will return an error.
func DialEthClientWithTimeout(ctx context.Context, url string, timeout time.Duration) (*ethclient.Client, error) {
ctxt, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return ethclient.DialContext(ctxt, url)
}
package client
import (
"context"
"time"
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum/go-ethereum/rpc"
)
// DialRollupClientWithTimeout attempts to dial the RPC provider using the provided
// URL. If the dial doesn't complete within defaultDialTimeout seconds, this
// method will return an error.
func DialRollupClientWithTimeout(ctx context.Context, url string, timeout time.Duration) (*sources.RollupClient, error) {
ctxt, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
rpcCl, err := rpc.DialContext(ctxt, url)
if err != nil {
return nil, err
}
return sources.NewRollupClient(client.NewBaseRPCClient(rpcCl)), nil
}
package client
import (
"time"
)
// DefaultDialTimeout is a default timeout for dialing a client.
const DefaultDialTimeout = 5 * time.Second
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment