Commit 8f406ae5 authored by Joshua Gutow's avatar Joshua Gutow

Make backoff package generic

parent 23054ecb
...@@ -102,22 +102,16 @@ func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...RPCOption) ...@@ -102,22 +102,16 @@ func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...RPCOption)
// Dials a JSON-RPC endpoint repeatedly, with a backoff, until a client connection is established. Auth is optional. // Dials a JSON-RPC endpoint repeatedly, with a backoff, until a client connection is established. Auth is optional.
func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, attempts int, opts ...rpc.ClientOption) (*rpc.Client, error) { func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, attempts int, opts ...rpc.ClientOption) (*rpc.Client, error) {
bOff := backoff.Exponential() bOff := backoff.Exponential()
var ret *rpc.Client return backoff.Do(ctx, attempts, bOff, func() (*rpc.Client, error) {
err := backoff.DoCtx(ctx, attempts, bOff, func() error {
client, err := rpc.DialOptions(ctx, addr, opts...) client, err := rpc.DialOptions(ctx, addr, opts...)
if err != nil { if err != nil {
if client == nil { if client == nil {
return fmt.Errorf("failed to dial address (%s): %w", addr, err) return nil, fmt.Errorf("failed to dial address (%s): %w", addr, err)
} }
log.Warn("failed to dial address, but may connect later", "addr", addr, "err", err) log.Warn("failed to dial address, but may connect later", "addr", addr, "err", err)
} }
ret = client return client, nil
return nil
}) })
if err != nil {
return nil, err
}
return ret, nil
} }
// BaseRPCClient is a wrapper around a concrete *rpc.Client instance to make it compliant // BaseRPCClient is a wrapper around a concrete *rpc.Client instance to make it compliant
......
...@@ -142,12 +142,12 @@ func (s *SyncClient) eventLoop() { ...@@ -142,12 +142,12 @@ func (s *SyncClient) eventLoop() {
s.log.Debug("Shutting down RPC sync worker") s.log.Debug("Shutting down RPC sync worker")
return return
case reqNum := <-s.requests: case reqNum := <-s.requests:
err := backoff.DoCtx(s.resCtx, 5, backoffStrategy, func() error { _, err := backoff.Do(s.resCtx, 5, backoffStrategy, func() (interface{}, error) {
// Limit the maximum time for fetching payloads // Limit the maximum time for fetching payloads
ctx, cancel := context.WithTimeout(s.resCtx, time.Second*10) ctx, cancel := context.WithTimeout(s.resCtx, time.Second*10)
defer cancel() defer cancel()
// We are only fetching one block at a time here. // We are only fetching one block at a time here.
return s.fetchUnsafeBlockFromRpc(ctx, reqNum) return nil, s.fetchUnsafeBlockFromRpc(ctx, reqNum)
}) })
if err != nil { if err != nil {
if err == s.resCtx.Err() { if err == s.resCtx.Err() {
......
...@@ -28,49 +28,33 @@ func NewRetryingL1Source(logger log.Logger, source L1Source) *RetryingL1Source { ...@@ -28,49 +28,33 @@ func NewRetryingL1Source(logger log.Logger, source L1Source) *RetryingL1Source {
} }
func (s *RetryingL1Source) InfoByHash(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, error) { func (s *RetryingL1Source) InfoByHash(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, error) {
var info eth.BlockInfo return backoff.Do(ctx, maxAttempts, s.strategy, func() (eth.BlockInfo, error) {
err := backoff.DoCtx(ctx, maxAttempts, s.strategy, func() error {
res, err := s.source.InfoByHash(ctx, blockHash) res, err := s.source.InfoByHash(ctx, blockHash)
if err != nil { if err != nil {
s.logger.Warn("Failed to retrieve info", "hash", blockHash, "err", err) s.logger.Warn("Failed to retrieve info", "hash", blockHash, "err", err)
return err
} }
info = res return res, err
return nil
}) })
return info, err
} }
func (s *RetryingL1Source) InfoAndTxsByHash(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Transactions, error) { func (s *RetryingL1Source) InfoAndTxsByHash(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Transactions, error) {
var info eth.BlockInfo return backoff.Do2(ctx, maxAttempts, s.strategy, func() (eth.BlockInfo, types.Transactions, error) {
var txs types.Transactions
err := backoff.DoCtx(ctx, maxAttempts, s.strategy, func() error {
i, t, err := s.source.InfoAndTxsByHash(ctx, blockHash) i, t, err := s.source.InfoAndTxsByHash(ctx, blockHash)
if err != nil { if err != nil {
s.logger.Warn("Failed to retrieve l1 info and txs", "hash", blockHash, "err", err) s.logger.Warn("Failed to retrieve l1 info and txs", "hash", blockHash, "err", err)
return err
} }
info = i return i, t, err
txs = t
return nil
}) })
return info, txs, err
} }
func (s *RetryingL1Source) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) { func (s *RetryingL1Source) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) {
var info eth.BlockInfo return backoff.Do2(ctx, maxAttempts, s.strategy, func() (eth.BlockInfo, types.Receipts, error) {
var rcpts types.Receipts
err := backoff.DoCtx(ctx, maxAttempts, s.strategy, func() error {
i, r, err := s.source.FetchReceipts(ctx, blockHash) i, r, err := s.source.FetchReceipts(ctx, blockHash)
if err != nil { if err != nil {
s.logger.Warn("Failed to fetch receipts", "hash", blockHash, "err", err) s.logger.Warn("Failed to fetch receipts", "hash", blockHash, "err", err)
return err
} }
info = i return i, r, err
rcpts = r
return nil
}) })
return info, rcpts, err
} }
var _ L1Source = (*RetryingL1Source)(nil) var _ L1Source = (*RetryingL1Source)(nil)
...@@ -82,47 +66,33 @@ type RetryingL2Source struct { ...@@ -82,47 +66,33 @@ type RetryingL2Source struct {
} }
func (s *RetryingL2Source) InfoAndTxsByHash(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Transactions, error) { func (s *RetryingL2Source) InfoAndTxsByHash(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Transactions, error) {
var info eth.BlockInfo return backoff.Do2(ctx, maxAttempts, s.strategy, func() (eth.BlockInfo, types.Transactions, error) {
var txs types.Transactions
err := backoff.DoCtx(ctx, maxAttempts, s.strategy, func() error {
i, t, err := s.source.InfoAndTxsByHash(ctx, blockHash) i, t, err := s.source.InfoAndTxsByHash(ctx, blockHash)
if err != nil { if err != nil {
s.logger.Warn("Failed to retrieve l2 info and txs", "hash", blockHash, "err", err) s.logger.Warn("Failed to retrieve l2 info and txs", "hash", blockHash, "err", err)
return err
} }
info = i return i, t, err
txs = t
return nil
}) })
return info, txs, err
} }
func (s *RetryingL2Source) NodeByHash(ctx context.Context, hash common.Hash) ([]byte, error) { func (s *RetryingL2Source) NodeByHash(ctx context.Context, hash common.Hash) ([]byte, error) {
var node []byte return backoff.Do(ctx, maxAttempts, s.strategy, func() ([]byte, error) {
err := backoff.DoCtx(ctx, maxAttempts, s.strategy, func() error {
n, err := s.source.NodeByHash(ctx, hash) n, err := s.source.NodeByHash(ctx, hash)
if err != nil { if err != nil {
s.logger.Warn("Failed to retrieve node", "hash", hash, "err", err) s.logger.Warn("Failed to retrieve node", "hash", hash, "err", err)
return err
} }
node = n return n, err
return nil
}) })
return node, err
} }
func (s *RetryingL2Source) CodeByHash(ctx context.Context, hash common.Hash) ([]byte, error) { func (s *RetryingL2Source) CodeByHash(ctx context.Context, hash common.Hash) ([]byte, error) {
var code []byte return backoff.Do(ctx, maxAttempts, s.strategy, func() ([]byte, error) {
err := backoff.DoCtx(ctx, maxAttempts, s.strategy, func() error {
c, err := s.source.CodeByHash(ctx, hash) c, err := s.source.CodeByHash(ctx, hash)
if err != nil { if err != nil {
s.logger.Warn("Failed to retrieve code", "hash", hash, "err", err) s.logger.Warn("Failed to retrieve code", "hash", hash, "err", err)
return err
} }
code = c return c, err
return nil
}) })
return code, err
} }
func (s *RetryingL2Source) OutputByRoot(ctx context.Context, root common.Hash) (eth.Output, error) { func (s *RetryingL2Source) OutputByRoot(ctx context.Context, root common.Hash) (eth.Output, error) {
......
...@@ -6,10 +6,6 @@ import ( ...@@ -6,10 +6,6 @@ import (
"time" "time"
) )
// Operation represents an operation that will be retried
// based on some backoff strategy if it fails.
type Operation func() error
// ErrFailedPermanently is an error raised by Do when the // ErrFailedPermanently is an error raised by Do when the
// underlying Operation has been retried maxAttempts times. // underlying Operation has been retried maxAttempts times.
type ErrFailedPermanently struct { type ErrFailedPermanently struct {
...@@ -21,16 +17,27 @@ func (e *ErrFailedPermanently) Error() string { ...@@ -21,16 +17,27 @@ func (e *ErrFailedPermanently) Error() string {
return fmt.Sprintf("operation failed permanently after %d attempts: %v", e.attempts, e.LastErr) return fmt.Sprintf("operation failed permanently after %d attempts: %v", e.attempts, e.LastErr)
} }
type pair[T, U any] struct {
a T
b U
}
func Do2[T, U any](ctx context.Context, maxAttempts int, strategy Strategy, op func() (T, U, error)) (T, U, error) {
f := func() (pair[T, U], error) {
a, b, err := op()
return pair[T, U]{a, b}, err
}
res, err := Do(ctx, maxAttempts, strategy, f)
return res.a, res.b, err
}
// Do performs the provided Operation up to maxAttempts times // Do performs the provided Operation up to maxAttempts times
// with delays in between each retry according to the provided // with delays in between each retry according to the provided
// Strategy. // Strategy.
func Do(maxAttempts int, strategy Strategy, op Operation) error { func Do[T any](ctx context.Context, maxAttempts int, strategy Strategy, op func() (T, error)) (T, error) {
return DoCtx(context.Background(), maxAttempts, strategy, op) var empty T
}
func DoCtx(ctx context.Context, maxAttempts int, strategy Strategy, op Operation) error {
if maxAttempts < 1 { if maxAttempts < 1 {
return fmt.Errorf("need at least 1 attempt to run op, but have %d max attempts", maxAttempts) return empty, fmt.Errorf("need at least 1 attempt to run op, but have %d max attempts", maxAttempts)
} }
var attempt int var attempt int
...@@ -43,16 +50,16 @@ func DoCtx(ctx context.Context, maxAttempts int, strategy Strategy, op Operation ...@@ -43,16 +50,16 @@ func DoCtx(ctx context.Context, maxAttempts int, strategy Strategy, op Operation
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return empty, ctx.Err()
case <-reattemptCh: case <-reattemptCh:
attempt++ attempt++
err := op() ret, err := op()
if err == nil { if err == nil {
return nil return ret, nil
} }
if attempt == maxAttempts { if attempt == maxAttempts {
return &ErrFailedPermanently{ return empty, &ErrFailedPermanently{
attempts: maxAttempts, attempts: maxAttempts,
LastErr: err, LastErr: err,
} }
......
package backoff package backoff
import ( import (
"context"
"errors" "errors"
"testing" "testing"
"time" "time"
...@@ -14,20 +15,19 @@ func TestDo(t *testing.T) { ...@@ -14,20 +15,19 @@ func TestDo(t *testing.T) {
start := time.Now() start := time.Now()
var i int var i int
require.NoError(t, Do(2, strategy, func() error { _, err := Do(context.Background(), 2, strategy, func() (int, error) {
if i == 1 { if i == 1 {
return nil return 0, nil
} }
i++ i++
return dummyErr return 0, dummyErr
})) })
require.NoError(t, err)
require.True(t, time.Since(start) > 10*time.Millisecond) require.True(t, time.Since(start) > 10*time.Millisecond)
start = time.Now() start = time.Now()
// add one because the first attempt counts // add one because the first attempt counts
err := Do(3, strategy, func() error { _, err = Do(context.Background(), 3, strategy, func() (int, error) {
return dummyErr return 0, dummyErr
}) })
require.Equal(t, dummyErr, err.(*ErrFailedPermanently).LastErr) require.Equal(t, dummyErr, err.(*ErrFailedPermanently).LastErr)
require.True(t, time.Since(start) > 20*time.Millisecond) require.True(t, time.Since(start) > 20*time.Millisecond)
......
...@@ -52,20 +52,14 @@ func DialRollupClientWithTimeout(timeout time.Duration, log log.Logger, url stri ...@@ -52,20 +52,14 @@ func DialRollupClientWithTimeout(timeout time.Duration, log log.Logger, url stri
// Dials a JSON-RPC endpoint repeatedly, with a backoff, until a client connection is established. Auth is optional. // Dials a JSON-RPC endpoint repeatedly, with a backoff, until a client connection is established. Auth is optional.
func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string) (*rpc.Client, error) { func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string) (*rpc.Client, error) {
bOff := backoff.Fixed(defaultRetryTime) bOff := backoff.Fixed(defaultRetryTime)
var ret *rpc.Client return backoff.Do(ctx, defaultRetryCount, bOff, func() (*rpc.Client, error) {
err := backoff.DoCtx(ctx, defaultRetryCount, bOff, func() error {
client, err := rpc.DialOptions(ctx, addr) client, err := rpc.DialOptions(ctx, addr)
if err != nil { if err != nil {
return fmt.Errorf("failed to dial address (%s): %w", addr, err) return nil, fmt.Errorf("failed to dial address (%s): %w", addr, err)
} }
// log.Warn("failed to dial address, but may connect later", "addr", addr, "err", err) // log.Warn("failed to dial address, but may connect later", "addr", addr, "err", err)
ret = client return client, nil
return nil
}) })
if err != nil {
return nil, err
}
return ret, nil
} }
func IsURLAvailable(address string) bool { func IsURLAvailable(address string) bool {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment