Commit 208de9ed authored by Adrian Sutton's avatar Adrian Sutton

op-service: Update retryingClient to handle individual elements in a batch failing

Only the individual elements that fail are retried to avoid double execution.
parent 129032f1
......@@ -2,7 +2,7 @@ test:
go test -v ./...
lint:
golangci-lint run -E asciicheck,goimports,misspell ./...
golangci-lint run -E goimports,sqlclosecheck,bodyclose,asciicheck,misspell,errorlint -e "errors.As" -e "errors.Is" ./...
generate-mocks:
go generate ./...
......
......@@ -6,6 +6,7 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/rpc"
"github.com/hashicorp/go-multierror"
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-service/backoff"
......@@ -53,12 +54,62 @@ func (b *retryingClient) CallContext(ctx context.Context, result any, method str
})
}
func (b *retryingClient) BatchCallContext(ctx context.Context, batch []rpc.BatchElem) error {
// pendingReq combines BatchElem information with the index of this request in the original []rpc.BatchElem
type pendingReq struct {
// req is a copy of the BatchElem individual request to make.
// It never has Result or Error set as it gets copied again as part of being passed to the underlying client.
req rpc.BatchElem
// idx tracks the index of the original BatchElem in the supplied input array
// This can then be used to set the result on the original input
idx int
}
func (b *retryingClient) BatchCallContext(ctx context.Context, input []rpc.BatchElem) error {
// Add all BatchElem to the initial pending set
// Each time we retry, we'll remove successful BatchElem for this list so we only retry ones that fail.
pending := make([]*pendingReq, len(input))
for i, req := range input {
pending[i] = &pendingReq{
req: req,
idx: i,
}
}
return backoff.DoCtx(ctx, b.retryAttempts, b.strategy, func() error {
cCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
batch := make([]rpc.BatchElem, len(pending))
for i, req := range pending {
batch[i] = req.req
}
err := b.c.BatchCallContext(cCtx, batch)
return err
if err != nil {
// Whole call failed, retry all pending elems again
return err
}
var failed []*pendingReq
var combinedErr error
for i, elem := range batch {
req := pending[i]
idx := req.idx // Index into input of the original BatchElem
// Set the result on the original batch to pass back to the caller in case we stop retrying
input[idx].Error = elem.Error
input[idx].Result = elem.Result
// If the individual request failed, add it to the list to retry
if elem.Error != nil {
// Need to retry this request
failed = append(failed, req)
combinedErr = multierror.Append(elem.Error, combinedErr)
}
}
if len(failed) > 0 {
pending = failed
return combinedErr
}
return nil
})
}
......
......@@ -32,7 +32,11 @@ func (m *MockRPC) CallContext(ctx context.Context, result any, method string, ar
func (m *MockRPC) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
out := m.Mock.MethodCalled("BatchCallContext", ctx, b)
return *out[0].(*error)
err, ok := out[0].(*error)
if ok {
return *err
}
return nil
}
func (m *MockRPC) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) {
......@@ -48,6 +52,12 @@ func (m *MockRPC) ExpectBatchCallContext(err error, b []rpc.BatchElem) {
m.On("BatchCallContext", mock.Anything, b).Return(&err)
}
func (m *MockRPC) OnBatchCallContext(err error, b []rpc.BatchElem, action func(callBatches []rpc.BatchElem)) {
m.On("BatchCallContext", mock.Anything, b).Return(err).Run(func(args mock.Arguments) {
action(args[1].([]rpc.BatchElem))
})
}
func (m *MockRPC) ExpectEthSubscribe(sub ethereum.Subscription, err error, channel any, args ...any) {
m.On("EthSubscribe", mock.Anything, channel, args).Return(&sub, &err)
}
......@@ -84,7 +94,7 @@ func TestClient_BackoffClient_CallContext(t *testing.T) {
func TestClient_BackoffClient_CallContext_WithRetries(t *testing.T) {
mockRpc := &MockRPC{}
mockRpc.ExpectCallContext(errors.New("foo"), nil, "foo", "bar")
backoffClient := client.NewRetryingClient(mockRpc, 2)
backoffClient := client.NewRetryingClient(mockRpc, 2, backoff.Fixed(0))
err := backoffClient.CallContext(context.Background(), nil, "foo", "bar")
require.Error(t, err)
require.True(t, mockRpc.AssertNumberOfCalls(t, "CallContext", 2))
......@@ -92,22 +102,79 @@ func TestClient_BackoffClient_CallContext_WithRetries(t *testing.T) {
func TestClient_BackoffClient_BatchCallContext(t *testing.T) {
mockRpc := &MockRPC{}
mockRpc.ExpectBatchCallContext(nil, []rpc.BatchElem(nil))
mockRpc.ExpectBatchCallContext(nil, []rpc.BatchElem{})
backoffClient := client.NewRetryingClient(mockRpc, 1)
err := backoffClient.BatchCallContext(context.Background(), nil)
require.NoError(t, err)
require.True(t, mockRpc.AssertCalled(t, "BatchCallContext", mock.Anything, []rpc.BatchElem(nil)))
require.True(t, mockRpc.AssertCalled(t, "BatchCallContext", mock.Anything, []rpc.BatchElem{}))
}
func TestClient_BackoffClient_BatchCallContext_WithRetries(t *testing.T) {
mockRpc := &MockRPC{}
mockRpc.ExpectBatchCallContext(errors.New("foo"), []rpc.BatchElem(nil))
backoffClient := client.NewRetryingClient(mockRpc, 2)
mockRpc.ExpectBatchCallContext(errors.New("foo"), []rpc.BatchElem{})
backoffClient := client.NewRetryingClient(mockRpc, 2, backoff.Fixed(0))
err := backoffClient.BatchCallContext(context.Background(), nil)
require.Error(t, err)
require.True(t, mockRpc.AssertNumberOfCalls(t, "BatchCallContext", 2))
}
func TestClient_BackoffClient_BatchCallContext_WithPartialRetries(t *testing.T) {
batches := []rpc.BatchElem{
{Method: "0"},
{Method: "1"},
{Method: "2"},
}
mockRpc := &MockRPC{}
mockRpc.OnBatchCallContext(nil, batches, func(batch []rpc.BatchElem) {
batch[0].Result = batch[0].Method
batch[1].Error = errors.New("boom")
batch[2].Error = errors.New("boom")
})
mockRpc.OnBatchCallContext(nil, []rpc.BatchElem{batches[1], batches[2]}, func(batch []rpc.BatchElem) {
batch[0].Error = errors.New("boom again")
batch[1].Result = batch[1].Method
})
backoffClient := client.NewRetryingClient(mockRpc, 2, backoff.Fixed(0))
err := backoffClient.BatchCallContext(context.Background(), batches)
require.Error(t, err)
require.True(t, mockRpc.AssertNumberOfCalls(t, "BatchCallContext", 2))
// Check our original batches got updated correctly
require.Equal(t, rpc.BatchElem{Method: "0", Result: "0"}, batches[0])
require.Equal(t, rpc.BatchElem{Method: "1", Result: nil, Error: errors.New("boom again")}, batches[1])
require.Equal(t, rpc.BatchElem{Method: "2", Result: "2"}, batches[2])
}
func TestClient_BackoffClient_BatchCallContext_WithPartialRetriesUntilSuccess(t *testing.T) {
batches := []rpc.BatchElem{
{Method: "0"},
{Method: "1"},
{Method: "2"},
}
mockRpc := &MockRPC{}
mockRpc.OnBatchCallContext(nil, batches, func(batch []rpc.BatchElem) {
batch[0].Result = batch[0].Method
batch[1].Error = errors.New("boom")
batch[2].Error = errors.New("boom")
})
mockRpc.OnBatchCallContext(nil, []rpc.BatchElem{batches[1], batches[2]}, func(batch []rpc.BatchElem) {
batch[0].Error = errors.New("boom again")
batch[1].Result = batch[1].Method
})
mockRpc.OnBatchCallContext(nil, []rpc.BatchElem{batches[1]}, func(batch []rpc.BatchElem) {
batch[0].Result = batch[0].Method
})
backoffClient := client.NewRetryingClient(mockRpc, 4, backoff.Fixed(0))
err := backoffClient.BatchCallContext(context.Background(), batches)
require.NoError(t, err)
require.True(t, mockRpc.AssertNumberOfCalls(t, "BatchCallContext", 3))
// Check our original batches got updated correctly
require.Equal(t, rpc.BatchElem{Method: "0", Result: "0"}, batches[0])
require.Equal(t, rpc.BatchElem{Method: "1", Result: "1"}, batches[1])
require.Equal(t, rpc.BatchElem{Method: "2", Result: "2"}, batches[2])
}
func TestClient_BackoffClient_EthSubscribe(t *testing.T) {
mockRpc := &MockRPC{}
mockRpc.ExpectEthSubscribe(ethereum.Subscription(nil), nil, nil, "foo", "bar")
......@@ -120,7 +187,7 @@ func TestClient_BackoffClient_EthSubscribe(t *testing.T) {
func TestClient_BackoffClient_EthSubscribe_WithRetries(t *testing.T) {
mockRpc := &MockRPC{}
mockRpc.ExpectEthSubscribe(ethereum.Subscription(nil), errors.New("foo"), nil, "foo", "bar")
backoffClient := client.NewRetryingClient(mockRpc, 2)
backoffClient := client.NewRetryingClient(mockRpc, 2, backoff.Fixed(0))
_, err := backoffClient.EthSubscribe(context.Background(), nil, "foo", "bar")
require.Error(t, err)
require.True(t, mockRpc.AssertNumberOfCalls(t, "EthSubscribe", 2))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment