Commit 828682cf authored by OptimismBot's avatar OptimismBot Committed by GitHub

Merge pull request #6089 from ethereum-optimism/aj/revert-retry-client

op-program/op-service: Revert retry client
parents 1c707578 eba66055
......@@ -6,7 +6,6 @@ import (
"fmt"
"io"
"io/fs"
"math"
"os"
"os/exec"
......@@ -22,13 +21,10 @@ import (
"github.com/ethereum-optimism/optimism/op-program/host/prefetcher"
oppio "github.com/ethereum-optimism/optimism/op-program/io"
opservice "github.com/ethereum-optimism/optimism/op-service"
opclient "github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
const maxRPCRetries = math.MaxInt
type L2Source struct {
*sources.L2Client
*sources.DebugClient
......@@ -192,41 +188,31 @@ func PreimageServer(ctx context.Context, logger log.Logger, cfg *config.Config,
func makePrefetcher(ctx context.Context, logger log.Logger, kv kvstore.KV, cfg *config.Config) (*prefetcher.Prefetcher, error) {
logger.Info("Connecting to L1 node", "l1", cfg.L1URL)
l1RPC, err := createRetryingRPC(ctx, logger, cfg.L1URL)
l1RPC, err := client.NewRPC(ctx, logger, cfg.L1URL)
if err != nil {
return nil, fmt.Errorf("failed to setup L1 RPC: %w", err)
}
logger.Info("Connecting to L2 node", "l2", cfg.L2URL)
l2RPC, err := createRetryingRPC(ctx, logger, cfg.L2URL)
l2RPC, err := client.NewRPC(ctx, logger, cfg.L2URL)
if err != nil {
return nil, fmt.Errorf("failed to setup L2 RPC: %w", err)
}
l1ClCfg := sources.L1ClientDefaultConfig(cfg.Rollup, cfg.L1TrustRPC, cfg.L1RPCKind)
l2ClCfg := sources.L2ClientDefaultConfig(cfg.Rollup, true)
l1Cl, err := sources.NewL1Client(l1RPC, logger, nil, l1ClCfg)
if err != nil {
return nil, fmt.Errorf("failed to create L1 client: %w", err)
}
l2ClCfg := sources.L2ClientDefaultConfig(cfg.Rollup, true)
l2Cl, err := sources.NewL2Client(l2RPC, logger, nil, l2ClCfg)
if err != nil {
return nil, fmt.Errorf("failed to create L2 client: %w", err)
}
l2DebugCl := &L2Source{L2Client: l2Cl, DebugClient: sources.NewDebugClient(l2RPC.CallContext)}
return prefetcher.NewPrefetcher(logger, l1Cl, l2DebugCl, kv), nil
}
func createRetryingRPC(ctx context.Context, logger log.Logger, url string) (client.RPC, error) {
rpc, err := client.NewRPC(ctx, logger, url)
if err != nil {
return nil, err
}
return opclient.NewRetryingClient(logger, rpc, maxRPCRetries), nil
}
func routeHints(logger log.Logger, hHostRW io.ReadWriter, hinter preimage.HintHandler) chan error {
chErr := make(chan error)
hintReader := preimage.NewHintReader(hHostRW)
......
......@@ -42,8 +42,8 @@ type Prefetcher struct {
func NewPrefetcher(logger log.Logger, l1Fetcher L1Source, l2Fetcher L2Source, kvStore kvstore.KV) *Prefetcher {
return &Prefetcher{
logger: logger,
l1Fetcher: l1Fetcher,
l2Fetcher: l2Fetcher,
l1Fetcher: NewRetryingL1Source(logger, l1Fetcher),
l2Fetcher: NewRetryingL2Source(logger, l2Fetcher),
kvStore: kvStore,
}
}
......
package prefetcher
import (
"context"
"math"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-service/backoff"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
const maxAttempts = math.MaxInt // Succeed or die trying
type RetryingL1Source struct {
logger log.Logger
source L1Source
strategy backoff.Strategy
}
func NewRetryingL1Source(logger log.Logger, source L1Source) *RetryingL1Source {
return &RetryingL1Source{
logger: logger,
source: source,
strategy: backoff.Exponential(),
}
}
func (s *RetryingL1Source) InfoByHash(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, error) {
var info eth.BlockInfo
err := backoff.DoCtx(ctx, maxAttempts, s.strategy, func() error {
res, err := s.source.InfoByHash(ctx, blockHash)
if err != nil {
s.logger.Warn("Failed to retrieve info", "hash", blockHash, "err", err)
return err
}
info = res
return nil
})
return info, err
}
func (s *RetryingL1Source) InfoAndTxsByHash(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Transactions, error) {
var info eth.BlockInfo
var txs types.Transactions
err := backoff.DoCtx(ctx, maxAttempts, s.strategy, func() error {
i, t, err := s.source.InfoAndTxsByHash(ctx, blockHash)
if err != nil {
s.logger.Warn("Failed to retrieve l1 info and txs", "hash", blockHash, "err", err)
return err
}
info = i
txs = t
return nil
})
return info, txs, err
}
func (s *RetryingL1Source) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) {
var info eth.BlockInfo
var rcpts types.Receipts
err := backoff.DoCtx(ctx, maxAttempts, s.strategy, func() error {
i, r, err := s.source.FetchReceipts(ctx, blockHash)
if err != nil {
s.logger.Warn("Failed to fetch receipts", "hash", blockHash, "err", err)
return err
}
info = i
rcpts = r
return nil
})
return info, rcpts, err
}
var _ L1Source = (*RetryingL1Source)(nil)
type RetryingL2Source struct {
logger log.Logger
source L2Source
strategy backoff.Strategy
}
func (s *RetryingL2Source) InfoAndTxsByHash(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Transactions, error) {
var info eth.BlockInfo
var txs types.Transactions
err := backoff.DoCtx(ctx, maxAttempts, s.strategy, func() error {
i, t, err := s.source.InfoAndTxsByHash(ctx, blockHash)
if err != nil {
s.logger.Warn("Failed to retrieve l2 info and txs", "hash", blockHash, "err", err)
return err
}
info = i
txs = t
return nil
})
return info, txs, err
}
func (s *RetryingL2Source) NodeByHash(ctx context.Context, hash common.Hash) ([]byte, error) {
var node []byte
err := backoff.DoCtx(ctx, maxAttempts, s.strategy, func() error {
n, err := s.source.NodeByHash(ctx, hash)
if err != nil {
s.logger.Warn("Failed to retrieve node", "hash", hash, "err", err)
return err
}
node = n
return nil
})
return node, err
}
func (s *RetryingL2Source) CodeByHash(ctx context.Context, hash common.Hash) ([]byte, error) {
var code []byte
err := backoff.DoCtx(ctx, maxAttempts, s.strategy, func() error {
c, err := s.source.CodeByHash(ctx, hash)
if err != nil {
s.logger.Warn("Failed to retrieve code", "hash", hash, "err", err)
return err
}
code = c
return nil
})
return code, err
}
func NewRetryingL2Source(logger log.Logger, source L2Source) *RetryingL2Source {
return &RetryingL2Source{
logger: logger,
source: source,
strategy: backoff.Exponential(),
}
}
var _ L2Source = (*RetryingL2Source)(nil)
package prefetcher
import (
"context"
"errors"
"testing"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/testutils"
"github.com/ethereum-optimism/optimism/op-service/backoff"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestRetryingL1Source(t *testing.T) {
ctx := context.Background()
hash := common.Hash{0xab}
info := &testutils.MockBlockInfo{InfoHash: hash}
// The mock really doesn't like returning nil for a eth.BlockInfo so return a value we expect to be ignored instead
wrongInfo := &testutils.MockBlockInfo{InfoHash: common.Hash{0x99}}
txs := types.Transactions{
&types.Transaction{},
}
rcpts := types.Receipts{
&types.Receipt{},
}
t.Run("InfoByHash Success", func(t *testing.T) {
source, mock := createL1Source(t)
defer mock.AssertExpectations(t)
mock.ExpectInfoByHash(hash, info, nil)
result, err := source.InfoByHash(ctx, hash)
require.NoError(t, err)
require.Equal(t, info, result)
})
t.Run("InfoByHash Error", func(t *testing.T) {
source, mock := createL1Source(t)
defer mock.AssertExpectations(t)
expectedErr := errors.New("boom")
mock.ExpectInfoByHash(hash, wrongInfo, expectedErr)
mock.ExpectInfoByHash(hash, info, nil)
result, err := source.InfoByHash(ctx, hash)
require.NoError(t, err)
require.Equal(t, info, result)
})
t.Run("InfoAndTxsByHash Success", func(t *testing.T) {
source, mock := createL1Source(t)
defer mock.AssertExpectations(t)
mock.ExpectInfoAndTxsByHash(hash, info, txs, nil)
actualInfo, actualTxs, err := source.InfoAndTxsByHash(ctx, hash)
require.NoError(t, err)
require.Equal(t, info, actualInfo)
require.Equal(t, txs, actualTxs)
})
t.Run("InfoAndTxsByHash Error", func(t *testing.T) {
source, mock := createL1Source(t)
defer mock.AssertExpectations(t)
expectedErr := errors.New("boom")
mock.ExpectInfoAndTxsByHash(hash, wrongInfo, nil, expectedErr)
mock.ExpectInfoAndTxsByHash(hash, info, txs, nil)
actualInfo, actualTxs, err := source.InfoAndTxsByHash(ctx, hash)
require.NoError(t, err)
require.Equal(t, info, actualInfo)
require.Equal(t, txs, actualTxs)
})
t.Run("FetchReceipts Success", func(t *testing.T) {
source, mock := createL1Source(t)
defer mock.AssertExpectations(t)
mock.ExpectFetchReceipts(hash, info, rcpts, nil)
actualInfo, actualRcpts, err := source.FetchReceipts(ctx, hash)
require.NoError(t, err)
require.Equal(t, info, actualInfo)
require.Equal(t, rcpts, actualRcpts)
})
t.Run("FetchReceipts Error", func(t *testing.T) {
source, mock := createL1Source(t)
defer mock.AssertExpectations(t)
expectedErr := errors.New("boom")
mock.ExpectFetchReceipts(hash, wrongInfo, nil, expectedErr)
mock.ExpectFetchReceipts(hash, info, rcpts, nil)
actualInfo, actualRcpts, err := source.FetchReceipts(ctx, hash)
require.NoError(t, err)
require.Equal(t, info, actualInfo)
require.Equal(t, rcpts, actualRcpts)
})
}
func createL1Source(t *testing.T) (*RetryingL1Source, *testutils.MockL1Source) {
logger := testlog.Logger(t, log.LvlDebug)
mock := &testutils.MockL1Source{}
source := NewRetryingL1Source(logger, mock)
// Avoid sleeping in tests by using a fixed backoff strategy with no delay
source.strategy = backoff.Fixed(0)
return source, mock
}
func TestRetryingL2Source(t *testing.T) {
ctx := context.Background()
hash := common.Hash{0xab}
info := &testutils.MockBlockInfo{InfoHash: hash}
// The mock really doesn't like returning nil for a eth.BlockInfo so return a value we expect to be ignored instead
wrongInfo := &testutils.MockBlockInfo{InfoHash: common.Hash{0x99}}
txs := types.Transactions{
&types.Transaction{},
}
data := []byte{1, 2, 3, 4, 5}
t.Run("InfoAndTxsByHash Success", func(t *testing.T) {
source, mock := createL2Source(t)
defer mock.AssertExpectations(t)
mock.ExpectInfoAndTxsByHash(hash, info, txs, nil)
actualInfo, actualTxs, err := source.InfoAndTxsByHash(ctx, hash)
require.NoError(t, err)
require.Equal(t, info, actualInfo)
require.Equal(t, txs, actualTxs)
})
t.Run("InfoAndTxsByHash Error", func(t *testing.T) {
source, mock := createL2Source(t)
defer mock.AssertExpectations(t)
expectedErr := errors.New("boom")
mock.ExpectInfoAndTxsByHash(hash, wrongInfo, nil, expectedErr)
mock.ExpectInfoAndTxsByHash(hash, info, txs, nil)
actualInfo, actualTxs, err := source.InfoAndTxsByHash(ctx, hash)
require.NoError(t, err)
require.Equal(t, info, actualInfo)
require.Equal(t, txs, actualTxs)
})
t.Run("NodeByHash Success", func(t *testing.T) {
source, mock := createL2Source(t)
defer mock.AssertExpectations(t)
mock.ExpectNodeByHash(hash, data, nil)
actual, err := source.NodeByHash(ctx, hash)
require.NoError(t, err)
require.Equal(t, data, actual)
})
t.Run("NodeByHash Error", func(t *testing.T) {
source, mock := createL2Source(t)
defer mock.AssertExpectations(t)
expectedErr := errors.New("boom")
mock.ExpectNodeByHash(hash, nil, expectedErr)
mock.ExpectNodeByHash(hash, data, nil)
actual, err := source.NodeByHash(ctx, hash)
require.NoError(t, err)
require.Equal(t, data, actual)
})
t.Run("CodeByHash Success", func(t *testing.T) {
source, mock := createL2Source(t)
defer mock.AssertExpectations(t)
mock.ExpectCodeByHash(hash, data, nil)
actual, err := source.CodeByHash(ctx, hash)
require.NoError(t, err)
require.Equal(t, data, actual)
})
t.Run("CodeByHash Error", func(t *testing.T) {
source, mock := createL2Source(t)
defer mock.AssertExpectations(t)
expectedErr := errors.New("boom")
mock.ExpectCodeByHash(hash, nil, expectedErr)
mock.ExpectCodeByHash(hash, data, nil)
actual, err := source.CodeByHash(ctx, hash)
require.NoError(t, err)
require.Equal(t, data, actual)
})
}
func createL2Source(t *testing.T) (*RetryingL2Source, *MockL2Source) {
logger := testlog.Logger(t, log.LvlDebug)
mock := &MockL2Source{}
source := NewRetryingL2Source(logger, mock)
// Avoid sleeping in tests by using a fixed backoff strategy with no delay
source.strategy = backoff.Fixed(0)
return source, mock
}
type MockL2Source struct {
mock.Mock
}
func (m *MockL2Source) InfoAndTxsByHash(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Transactions, error) {
out := m.Mock.MethodCalled("InfoAndTxsByHash", blockHash)
return out[0].(eth.BlockInfo), out[1].(types.Transactions), *out[2].(*error)
}
func (m *MockL2Source) NodeByHash(ctx context.Context, hash common.Hash) ([]byte, error) {
out := m.Mock.MethodCalled("NodeByHash", hash)
return out[0].([]byte), *out[1].(*error)
}
func (m *MockL2Source) CodeByHash(ctx context.Context, hash common.Hash) ([]byte, error) {
out := m.Mock.MethodCalled("CodeByHash", hash)
return out[0].([]byte), *out[1].(*error)
}
func (m *MockL2Source) ExpectInfoAndTxsByHash(blockHash common.Hash, info eth.BlockInfo, txs types.Transactions, err error) {
m.Mock.On("InfoAndTxsByHash", blockHash).Once().Return(info, txs, &err)
}
func (m *MockL2Source) ExpectNodeByHash(hash common.Hash, node []byte, err error) {
m.Mock.On("NodeByHash", hash).Once().Return(node, &err)
}
func (m *MockL2Source) ExpectCodeByHash(hash common.Hash, code []byte, err error) {
m.Mock.On("CodeByHash", hash).Once().Return(code, &err)
}
var _ L2Source = (*MockL2Source)(nil)
......@@ -2,7 +2,7 @@ test:
go test -v ./...
lint:
golangci-lint run -E goimports,sqlclosecheck,bodyclose,asciicheck,misspell,errorlint -e "errors.As" -e "errors.Is" ./...
golangci-lint run -E asciicheck,goimports,misspell ./...
generate-mocks:
go generate ./...
......
package client
import (
"context"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/hashicorp/go-multierror"
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-service/backoff"
)
var (
// ExponentialBackoff is the default backoff strategy.
ExponentialBackoff = backoff.Exponential()
)
// retryingClient wraps a [client.RPC] with a backoff strategy.
type retryingClient struct {
log log.Logger
c client.RPC
retryAttempts int
strategy backoff.Strategy
}
// NewRetryingClient creates a new retrying client.
// The backoff strategy is optional, if not provided, the default exponential backoff strategy is used.
func NewRetryingClient(logger log.Logger, c client.RPC, retries int, strategy ...backoff.Strategy) *retryingClient {
if len(strategy) == 0 {
strategy = []backoff.Strategy{ExponentialBackoff}
}
return &retryingClient{
log: logger,
c: c,
retryAttempts: retries,
strategy: strategy[0],
}
}
// BackoffStrategy returns the [backoff.Strategy] used by the client.
func (b *retryingClient) BackoffStrategy() backoff.Strategy {
return b.strategy
}
func (b *retryingClient) Close() {
b.c.Close()
}
func (b *retryingClient) CallContext(ctx context.Context, result any, method string, args ...any) error {
return backoff.DoCtx(ctx, b.retryAttempts, b.strategy, func() error {
cCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
err := b.c.CallContext(cCtx, result, method, args...)
if err != nil {
b.log.Warn("RPC request failed", "method", method, "err", err)
}
return err
})
}
// pendingReq combines BatchElem information with the index of this request in the original []rpc.BatchElem
type pendingReq struct {
// req is a copy of the BatchElem individual request to make.
// It never has Result or Error set as it gets copied again as part of being passed to the underlying client.
req rpc.BatchElem
// idx tracks the index of the original BatchElem in the supplied input array
// This can then be used to set the result on the original input
idx int
}
func (b *retryingClient) BatchCallContext(ctx context.Context, input []rpc.BatchElem) error {
// Add all BatchElem to the initial pending set
// Each time we retry, we'll remove successful BatchElem for this list so we only retry ones that fail.
pending := make([]*pendingReq, len(input))
for i, req := range input {
pending[i] = &pendingReq{
req: req,
idx: i,
}
}
return backoff.DoCtx(ctx, b.retryAttempts, b.strategy, func() error {
cCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
batch := make([]rpc.BatchElem, len(pending))
for i, req := range pending {
batch[i] = req.req
}
err := b.c.BatchCallContext(cCtx, batch)
if err != nil {
b.log.Warn("Batch request failed", "err", err)
// Whole call failed, retry all pending elems again
return err
}
var failed []*pendingReq
var combinedErr error
for i, elem := range batch {
req := pending[i]
idx := req.idx // Index into input of the original BatchElem
// Set the result on the original batch to pass back to the caller in case we stop retrying
input[idx].Error = elem.Error
input[idx].Result = elem.Result
// If the individual request failed, add it to the list to retry
if elem.Error != nil {
// Need to retry this request
failed = append(failed, req)
combinedErr = multierror.Append(elem.Error, combinedErr)
}
}
if len(failed) > 0 {
pending = failed
b.log.Warn("Batch request returned errors", "err", combinedErr)
return combinedErr
}
return nil
})
}
func (b *retryingClient) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) {
var sub ethereum.Subscription
err := backoff.DoCtx(ctx, b.retryAttempts, b.strategy, func() error {
var err error
sub, err = b.c.EthSubscribe(ctx, channel, args...)
if err != nil {
b.log.Warn("Subscription request failed", "err", err)
}
return err
})
return sub, err
}
package client_test
import (
"context"
"errors"
"testing"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-service/backoff"
"github.com/ethereum-optimism/optimism/op-service/client"
opclient "github.com/ethereum-optimism/optimism/op-node/client"
)
type MockRPC struct {
mock.Mock
}
func (m *MockRPC) Close() {
m.Called()
}
func (m *MockRPC) CallContext(ctx context.Context, result any, method string, args ...any) error {
out := m.Mock.MethodCalled("CallContext", ctx, result, method, args)
return *out[0].(*error)
}
func (m *MockRPC) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
out := m.Mock.MethodCalled("BatchCallContext", ctx, b)
err, ok := out[0].(*error)
if ok {
return *err
}
return nil
}
func (m *MockRPC) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) {
out := m.Mock.MethodCalled("EthSubscribe", ctx, channel, args)
return *out[0].(*ethereum.Subscription), *out[1].(*error)
}
func (m *MockRPC) ExpectCallContext(err error, result any, method string, arg string) {
m.On("CallContext", mock.Anything, result, method, []interface{}{arg}).Return(&err)
}
func (m *MockRPC) ExpectBatchCallContext(err error, b []rpc.BatchElem) {
m.On("BatchCallContext", mock.Anything, b).Return(&err)
}
func (m *MockRPC) OnBatchCallContext(err error, b []rpc.BatchElem, action func(callBatches []rpc.BatchElem)) {
m.On("BatchCallContext", mock.Anything, b).Return(err).Run(func(args mock.Arguments) {
action(args[1].([]rpc.BatchElem))
})
}
func (m *MockRPC) ExpectEthSubscribe(sub ethereum.Subscription, err error, channel any, args ...any) {
m.On("EthSubscribe", mock.Anything, channel, args).Return(&sub, &err)
}
var _ opclient.RPC = (*MockRPC)(nil)
func TestClient_BackoffClient_Strategy(t *testing.T) {
mockRpc := &MockRPC{}
backoffClient := client.NewRetryingClient(testlog.Logger(t, log.LvlInfo), mockRpc, 0)
require.Equal(t, backoffClient.BackoffStrategy(), client.ExponentialBackoff)
fixedStrategy := &backoff.FixedStrategy{}
backoffClient = client.NewRetryingClient(testlog.Logger(t, log.LvlInfo), mockRpc, 0, fixedStrategy)
require.Equal(t, backoffClient.BackoffStrategy(), fixedStrategy)
}
func TestClient_BackoffClient_Close(t *testing.T) {
mockRpc := &MockRPC{}
mockRpc.On("Close").Return()
backoffClient := client.NewRetryingClient(testlog.Logger(t, log.LvlInfo), mockRpc, 0)
backoffClient.Close()
require.True(t, mockRpc.AssertCalled(t, "Close"))
}
func TestClient_BackoffClient_CallContext(t *testing.T) {
mockRpc := &MockRPC{}
mockRpc.ExpectCallContext(nil, nil, "foo", "bar")
backoffClient := client.NewRetryingClient(testlog.Logger(t, log.LvlInfo), mockRpc, 1)
err := backoffClient.CallContext(context.Background(), nil, "foo", "bar")
require.NoError(t, err)
require.True(t, mockRpc.AssertCalled(t, "CallContext", mock.Anything, nil, "foo", []interface{}{"bar"}))
}
func TestClient_BackoffClient_CallContext_WithRetries(t *testing.T) {
mockRpc := &MockRPC{}
mockRpc.ExpectCallContext(errors.New("foo"), nil, "foo", "bar")
backoffClient := client.NewRetryingClient(testlog.Logger(t, log.LvlInfo), mockRpc, 2, backoff.Fixed(0))
err := backoffClient.CallContext(context.Background(), nil, "foo", "bar")
require.Error(t, err)
require.True(t, mockRpc.AssertNumberOfCalls(t, "CallContext", 2))
}
func TestClient_BackoffClient_BatchCallContext(t *testing.T) {
mockRpc := &MockRPC{}
mockRpc.ExpectBatchCallContext(nil, []rpc.BatchElem{})
backoffClient := client.NewRetryingClient(testlog.Logger(t, log.LvlInfo), mockRpc, 1)
err := backoffClient.BatchCallContext(context.Background(), nil)
require.NoError(t, err)
require.True(t, mockRpc.AssertCalled(t, "BatchCallContext", mock.Anything, []rpc.BatchElem{}))
}
func TestClient_BackoffClient_BatchCallContext_WithRetries(t *testing.T) {
mockRpc := &MockRPC{}
mockRpc.ExpectBatchCallContext(errors.New("foo"), []rpc.BatchElem{})
backoffClient := client.NewRetryingClient(testlog.Logger(t, log.LvlInfo), mockRpc, 2, backoff.Fixed(0))
err := backoffClient.BatchCallContext(context.Background(), nil)
require.Error(t, err)
require.True(t, mockRpc.AssertNumberOfCalls(t, "BatchCallContext", 2))
}
func TestClient_BackoffClient_BatchCallContext_WithPartialRetries(t *testing.T) {
batches := []rpc.BatchElem{
{Method: "0"},
{Method: "1"},
{Method: "2"},
}
mockRpc := &MockRPC{}
mockRpc.OnBatchCallContext(nil, batches, func(batch []rpc.BatchElem) {
batch[0].Result = batch[0].Method
batch[1].Error = errors.New("boom")
batch[2].Error = errors.New("boom")
})
mockRpc.OnBatchCallContext(nil, []rpc.BatchElem{batches[1], batches[2]}, func(batch []rpc.BatchElem) {
batch[0].Error = errors.New("boom again")
batch[1].Result = batch[1].Method
})
backoffClient := client.NewRetryingClient(testlog.Logger(t, log.LvlInfo), mockRpc, 2, backoff.Fixed(0))
err := backoffClient.BatchCallContext(context.Background(), batches)
require.Error(t, err)
require.True(t, mockRpc.AssertNumberOfCalls(t, "BatchCallContext", 2))
// Check our original batches got updated correctly
require.Equal(t, rpc.BatchElem{Method: "0", Result: "0"}, batches[0])
require.Equal(t, rpc.BatchElem{Method: "1", Result: nil, Error: errors.New("boom again")}, batches[1])
require.Equal(t, rpc.BatchElem{Method: "2", Result: "2"}, batches[2])
}
func TestClient_BackoffClient_BatchCallContext_WithPartialRetriesUntilSuccess(t *testing.T) {
batches := []rpc.BatchElem{
{Method: "0"},
{Method: "1"},
{Method: "2"},
}
mockRpc := &MockRPC{}
mockRpc.OnBatchCallContext(nil, batches, func(batch []rpc.BatchElem) {
batch[0].Result = batch[0].Method
batch[1].Error = errors.New("boom")
batch[2].Error = errors.New("boom")
})
mockRpc.OnBatchCallContext(nil, []rpc.BatchElem{batches[1], batches[2]}, func(batch []rpc.BatchElem) {
batch[0].Error = errors.New("boom again")
batch[1].Result = batch[1].Method
})
mockRpc.OnBatchCallContext(nil, []rpc.BatchElem{batches[1]}, func(batch []rpc.BatchElem) {
batch[0].Result = batch[0].Method
})
backoffClient := client.NewRetryingClient(testlog.Logger(t, log.LvlInfo), mockRpc, 4, backoff.Fixed(0))
err := backoffClient.BatchCallContext(context.Background(), batches)
require.NoError(t, err)
require.True(t, mockRpc.AssertNumberOfCalls(t, "BatchCallContext", 3))
// Check our original batches got updated correctly
require.Equal(t, rpc.BatchElem{Method: "0", Result: "0"}, batches[0])
require.Equal(t, rpc.BatchElem{Method: "1", Result: "1"}, batches[1])
require.Equal(t, rpc.BatchElem{Method: "2", Result: "2"}, batches[2])
}
func TestClient_BackoffClient_EthSubscribe(t *testing.T) {
mockRpc := &MockRPC{}
mockRpc.ExpectEthSubscribe(ethereum.Subscription(nil), nil, nil, "foo", "bar")
backoffClient := client.NewRetryingClient(testlog.Logger(t, log.LvlInfo), mockRpc, 1)
_, err := backoffClient.EthSubscribe(context.Background(), nil, "foo", "bar")
require.NoError(t, err)
require.True(t, mockRpc.AssertCalled(t, "EthSubscribe", mock.Anything, nil, []interface{}{"foo", "bar"}))
}
func TestClient_BackoffClient_EthSubscribe_WithRetries(t *testing.T) {
mockRpc := &MockRPC{}
mockRpc.ExpectEthSubscribe(ethereum.Subscription(nil), errors.New("foo"), nil, "foo", "bar")
backoffClient := client.NewRetryingClient(testlog.Logger(t, log.LvlInfo), mockRpc, 2, backoff.Fixed(0))
_, err := backoffClient.EthSubscribe(context.Background(), nil, "foo", "bar")
require.Error(t, err)
require.True(t, mockRpc.AssertNumberOfCalls(t, "EthSubscribe", 2))
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment