Commit 2106bdb7 authored by Diederik Loerakker, committed by GitHub

op-node: remove parallel batch fetching / deadlock, introduce iterative batch fetching (#3217)

* op-node: remove parallel batch fetching, introduce iterative batch fetching and receipts fetcher

* op-node: fix receipts fetching cache issue - reset fetcher when encountering error in final results

* receipts fetching improvement

* op-node: verify receipts with pure function, generalize batch fetching code more
parent ed3a6b08
......@@ -2,7 +2,9 @@ package eth
import (
"bytes"
"context"
"fmt"
"io"
"math/big"
"reflect"
......@@ -252,3 +254,39 @@ type ForkchoiceUpdatedResult struct {
// the payload id if requested
PayloadID *PayloadID `json:"payloadId"`
}
+// ReceiptsFetcher fetches the receipts of a block,
+// and enables the caller to parallelize fetching and back off on fetching errors as needed.
+type ReceiptsFetcher interface {
+	// Reset clears the previously fetched results for a fresh re-attempt.
+	Reset()
+	// Fetch retrieves receipts in batches, until it returns io.EOF to indicate completion.
+	Fetch(ctx context.Context) error
+	// Complete indicates when all data has been fetched.
+	Complete() bool
+	// Result returns the receipts, or an error if fetching is not yet Complete,
+	// or if the results are invalid.
+	// If an error is returned, the fetcher is Reset automatically.
+	Result() (types.Receipts, error)
+}
+
+// FetchedReceipts is a simple util to implement the ReceiptsFetcher with readily available receipts.
+type FetchedReceipts types.Receipts
+
+func (f FetchedReceipts) Reset() {
+	// nothing to reset
+}
+
+func (f FetchedReceipts) Fetch(ctx context.Context) error {
+	return io.EOF
+}
+
+func (f FetchedReceipts) Complete() bool {
+	return true
+}
+
+func (f FetchedReceipts) Result() (types.Receipts, error) {
+	return types.Receipts(f), nil
+}
+
+var _ ReceiptsFetcher = (FetchedReceipts)(nil)
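
A minimal usage sketch (not part of this diff) of driving a ReceiptsFetcher to completion; the fixed 100ms backoff on transient errors is an arbitrary choice of the caller:

	// fetchAllReceipts calls Fetch until io.EOF, then reads the verified Result.
	func fetchAllReceipts(ctx context.Context, f ReceiptsFetcher) (types.Receipts, error) {
		for !f.Complete() {
			if err := f.Fetch(ctx); err == io.EOF {
				break // all batches fetched
			} else if err != nil {
				if ctx.Err() != nil {
					return nil, ctx.Err()
				}
				time.Sleep(100 * time.Millisecond) // simple illustrative backoff
			}
		}
		// Result verifies the data; on verification failure the fetcher is Reset automatically.
		return f.Result()
	}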
......@@ -2,134 +2,181 @@ package l1

import (
	"context"
-	"errors"
	"fmt"
-	"time"
+	"io"
+	"sync"
+	"sync/atomic"

-	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/rpc"
+	"github.com/hashicorp/go-multierror"
)

-var (
-	TooManyRetries = errors.New("too many retries")
-)
-
-// parallelBatchCall creates a drop-in replacement for the standard batchCallContextFn that splits requests into more batch requests, and will parallelize and retry as configured.
-func parallelBatchCall(log log.Logger, getBatch batchCallContextFn, maxRetry int, maxPerBatch int, maxParallel int) batchCallContextFn {
-	return func(ctx context.Context, requests []rpc.BatchElem) error {
-		return fetchBatched(ctx, log, requests, getBatch, maxRetry, maxPerBatch, maxParallel)
-	}
-}
-
-type batchResult struct {
-	failed  []rpc.BatchElem // if anything has to be retried
-	err     error           // if the batch as a whole failed
-	success int             // amount of items that completed successfully
-}
-
-// fetchBatched fetches the given requests in batches of at most maxPerBatch elements, and with at most maxRetry retries per batch.
-// Batch requests may be split into maxParallel go-routines.
-// Retries only apply to individual request errors, not to the outer batch-requests that combine them into batches.
-func fetchBatched(ctx context.Context, log log.Logger, requests []rpc.BatchElem, getBatch batchCallContextFn, maxRetry int, maxPerBatch int, maxParallel int) error {
-	batchRequest := func(ctx context.Context, missing []rpc.BatchElem) (failed []rpc.BatchElem, err error) {
-		if err := getBatch(ctx, missing); err != nil {
-			return nil, fmt.Errorf("failed batch-retrieval: %w", err)
-		}
-		for _, elem := range missing {
-			if elem.Error != nil {
-				log.Trace("batch request element failed", "err", elem.Error, "elem", elem.Args[0])
-				elem.Error = nil // reset, we'll try this element again
-				failed = append(failed, elem)
-				continue
-			}
-		}
-		return failed, nil
-	}
-
-	// limit capacity, don't write to underlying array on retries
-	requests = requests[:len(requests):len(requests)]
-
-	expectedBatches := (len(requests) + maxPerBatch - 1) / maxPerBatch
-
-	// don't need more go-routines than requests
-	if maxParallel > expectedBatches {
-		maxParallel = expectedBatches
-	}
-
-	// capacity is sufficient for no go-routine to get stuck on writing
-	completed := make(chan batchResult, maxParallel)
-
-	// queue of tasks for worker go-routines
-	batchRequests := make(chan []rpc.BatchElem, maxParallel)
-	defer close(batchRequests)
-
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-
-	// starts worker go-routines. Closed when task channel closes
-	for i := 0; i < maxParallel; i++ {
-		go func(ctx context.Context) {
-			for {
-				batch, ok := <-batchRequests
-				if !ok {
-					return // no more batches left
-				}
-				failed, err := batchRequest(ctx, batch)
-				completed <- batchResult{failed: failed, err: err, success: len(batch) - len(failed)}
-			}
-		}(ctx)
-	}
-
-	parallelRequests := func() int {
-		// we split the requests into parallel batch requests, and count how many
-		i := 0
-		for ; i < maxParallel && len(requests) > 0; i++ {
-			nextBatch := requests
-			if len(nextBatch) > maxPerBatch {
-				nextBatch = requests[:maxPerBatch]
-			}
-			// don't retry this batch of requests again, unless we add them back
-			requests = requests[len(nextBatch):]
-			// schedule the batch, this may block if all workers are busy and the queue is full
-			batchRequests <- nextBatch
-		}
-		return i
-	}
-
-	maxCount := expectedBatches * maxRetry
-	awaited := len(requests)
-	// start initial round of parallel requests
-	count := parallelRequests()
-
-	// We slow down additional batch requests to not spam the server.
-	retryTicker := time.NewTicker(time.Millisecond * 20)
-	defer retryTicker.Stop()
-
-	// The main requests slice is only ever mutated by the go-routine running this loop.
-	// Slices of this are sent to worker go-routines, and never overwritten with different requests.
-	for {
-		// check if we've all results back successfully
-		if awaited <= 0 {
-			return nil
-		}
-		if count > maxCount {
-			return TooManyRetries
-		}
-		select {
-		case <-retryTicker.C:
-			count += parallelRequests() // retry batch-requests on interval
-		case result := <-completed:
-			if result.err != nil {
-				// batch failed, RPC may be broken, abort
-				return fmt.Errorf("batch request failed: %w", result.err)
-			}
-			// if any element failed, add it to the requests for re-attempt
-			requests = append(requests, result.failed...)
-			awaited -= result.success
-		case <-ctx.Done():
-			return ctx.Err()
-		}
-	}
-}
+// IterativeBatchCall is a util to create a job to fetch many RPC requests in batches,
+// and enable the caller to parallelize easily and safely, handle and re-try errors,
+// and pick a batch size all by simply calling Fetch again and again until it returns io.EOF.
+type IterativeBatchCall[K any, V any, O any] struct {
+	completed uint32       // tracks how close we are to completing all requests
+	resetLock sync.RWMutex // ensures we do not concurrently read (incl. fetch) / reset
+
+	requestsKeys []K
+	batchSize    int
+
+	makeRequest func(K) (V, rpc.BatchElem)
+	makeResults func([]K, []V) (O, error)
+	getBatch    batchCallContextFn
+
+	requestsValues []V
+	scheduled      chan rpc.BatchElem
+
+	results *O
+}
+
+// NewIterativeBatchCall constructs a batch call, fetching the values with the given keys,
+// and transforms them into a verified final result.
+func NewIterativeBatchCall[K any, V any, O any](
+	requestsKeys []K,
+	makeRequest func(K) (V, rpc.BatchElem),
+	makeResults func([]K, []V) (O, error),
+	getBatch batchCallContextFn,
+	batchSize int) *IterativeBatchCall[K, V, O] {
+
+	if len(requestsKeys) < batchSize {
+		batchSize = len(requestsKeys)
+	}
+	if batchSize < 1 {
+		batchSize = 1
+	}
+
+	out := &IterativeBatchCall[K, V, O]{
+		completed:    0,
+		getBatch:     getBatch,
+		requestsKeys: requestsKeys,
+		batchSize:    batchSize,
+		makeRequest:  makeRequest,
+		makeResults:  makeResults,
+	}
+	out.Reset()
+	return out
+}
+
+// Reset will clear the batch call, to start fetching all contents from scratch.
+func (ibc *IterativeBatchCall[K, V, O]) Reset() {
+	ibc.resetLock.Lock()
+	defer ibc.resetLock.Unlock()
+
+	scheduled := make(chan rpc.BatchElem, len(ibc.requestsKeys))
+	requestsValues := make([]V, len(ibc.requestsKeys))
+	for i, k := range ibc.requestsKeys {
+		v, r := ibc.makeRequest(k)
+		requestsValues[i] = v
+		scheduled <- r
+	}
+
+	ibc.requestsValues = requestsValues
+	ibc.scheduled = scheduled
+	if len(ibc.requestsKeys) == 0 {
+		close(ibc.scheduled)
+	}
+}
+
+// Fetch fetches more of the data, and returns io.EOF when all data has been fetched.
+// This method is safe to call concurrently: it will parallelize the fetching work.
+// If no work is available, but the fetching is not done yet,
+// then Fetch will block until the next thing can be fetched, or until the context expires.
+func (ibc *IterativeBatchCall[K, V, O]) Fetch(ctx context.Context) error {
+	ibc.resetLock.RLock()
+	defer ibc.resetLock.RUnlock()
+
+	// collect a batch from the requests channel
+	batch := make([]rpc.BatchElem, 0, ibc.batchSize)
+	// wait for first element
+	select {
+	case reqElem, ok := <-ibc.scheduled:
+		if !ok { // no more requests to do
+			return io.EOF
+		}
+		batch = append(batch, reqElem)
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+
+	// collect more elements, if there are any.
+	for {
+		if len(batch) >= ibc.batchSize {
+			break
+		}
+		select {
+		case reqElem, ok := <-ibc.scheduled:
+			if !ok { // no more requests to do
+				return io.EOF
+			}
+			batch = append(batch, reqElem)
+			continue
+		case <-ctx.Done():
+			for _, r := range batch {
+				ibc.scheduled <- r
+			}
+			return ctx.Err()
+		default:
+			break
+		}
+		break
+	}
+
+	if err := ibc.getBatch(ctx, batch); err != nil {
+		for _, r := range batch {
+			ibc.scheduled <- r
+		}
+		return fmt.Errorf("failed batch-retrieval: %w", err)
+	}
+	var result error
+	for _, elem := range batch {
+		if elem.Error != nil {
+			result = multierror.Append(result, elem.Error)
+			elem.Error = nil // reset, we'll try this element again
+			ibc.scheduled <- elem
+			continue
+		} else {
+			atomic.AddUint32(&ibc.completed, 1)
+			if atomic.LoadUint32(&ibc.completed) >= uint32(len(ibc.requestsKeys)) {
+				close(ibc.scheduled)
+				return io.EOF
+			}
+		}
+	}
+	return result
+}
+
+// Complete indicates if the batch call is done.
+func (ibc *IterativeBatchCall[K, V, O]) Complete() bool {
+	ibc.resetLock.RLock()
+	defer ibc.resetLock.RUnlock()
+	return atomic.LoadUint32(&ibc.completed) >= uint32(len(ibc.requestsKeys))
+}
+
+// Result returns the fetched values, checked and transformed to the final output type, if available.
+// If the check fails, the IterativeBatchCall will Reset itself, to be ready for a re-attempt in fetching new data.
+func (ibc *IterativeBatchCall[K, V, O]) Result() (O, error) {
+	ibc.resetLock.RLock()
+	if atomic.LoadUint32(&ibc.completed) < uint32(len(ibc.requestsKeys)) {
+		ibc.resetLock.RUnlock()
+		return *new(O), fmt.Errorf("results not available yet, Fetch more first")
+	}
+	if ibc.results != nil {
+		ibc.resetLock.RUnlock()
+		return *ibc.results, nil
+	}
+	out, err := ibc.makeResults(ibc.requestsKeys, ibc.requestsValues)
+	ibc.resetLock.RUnlock()
+	if err != nil {
+		// start over
+		ibc.Reset()
+	} else {
+		// cache the valid results
+		ibc.resetLock.Lock()
+		ibc.results = &out
+		ibc.resetLock.Unlock()
+	}
+	return out, err
+}
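
A usage sketch of the generic helper (hypothetical: fetching headers by number with a batch size of 10; batchCallContextFn is the package's existing RPC batch-call type, and the header unmarshalling follows the usual go-ethereum RPC conventions):

	func fetchHeaders(ctx context.Context, blockNums []uint64, getBatch batchCallContextFn) ([]*types.Header, error) {
		makeRequest := func(num uint64) (*types.Header, rpc.BatchElem) {
			out := new(types.Header)
			return out, rpc.BatchElem{
				Method: "eth_getBlockByNumber",
				Args:   []interface{}{hexutil.EncodeUint64(num), false},
				Result: out,
			}
		}
		makeResults := func(keys []uint64, values []*types.Header) ([]*types.Header, error) {
			for i, h := range values {
				if h == nil {
					return nil, fmt.Errorf("header %d is missing", keys[i])
				}
			}
			return values, nil
		}
		call := NewIterativeBatchCall[uint64, *types.Header, []*types.Header](
			blockNums, makeRequest, makeResults, getBatch, 10)
		for {
			if err := call.Fetch(ctx); err == io.EOF {
				break // all requests completed
			} else if err != nil {
				// failed elements were re-scheduled; a real caller would back off and retry
				return nil, err
			}
		}
		return call.Result()
	}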
......@@ -4,13 +4,13 @@ import (
	"context"
	"errors"
	"fmt"
+	"io"
	"testing"
	"time"

-	"github.com/ethereum-optimism/optimism/op-node/testlog"
+	"github.com/stretchr/testify/require"

-	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/rpc"
-	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"
)
......@@ -21,66 +21,118 @@ type elemCall struct {
type batchCall struct {
	elems []elemCall
-	err   error
+	rpcErr error
+	err    string
+	// Artificial delay to add before returning the call
+	duration time.Duration
+	makeCtx  func() context.Context
}

type batchTestCase struct {
	name  string
	items int

+	batchSize int
+
	batchCalls []batchCall
-	err        error
-
-	maxRetry    int
-	maxPerBatch int
-	maxParallel int

	mock.Mock
}
-func (tc *batchTestCase) Inputs() []rpc.BatchElem {
-	out := make([]rpc.BatchElem, tc.items)
-	for i := 0; i < tc.items; i++ {
-		out[i] = rpc.BatchElem{
-			Method: "testing_foobar",
-			Args:   []interface{}{i},
-			Result: nil,
-			Error:  nil,
-		}
-	}
-	return out
-}
+func makeTestRequest(i int) (*string, rpc.BatchElem) {
+	out := new(string)
+	return out, rpc.BatchElem{
+		Method: "testing_foobar",
+		Args:   []interface{}{i},
+		Result: out,
+		Error:  nil,
+	}
+}
+
+func makeTestResults() func(keys []int, values []*string) ([]*string, error) {
+	return func(keys []int, values []*string) ([]*string, error) {
+		return values, nil
+	}
+}

func (tc *batchTestCase) GetBatch(ctx context.Context, b []rpc.BatchElem) error {
+	if ctx.Err() != nil {
+		return ctx.Err()
+	}
	return tc.Mock.MethodCalled("get", b).Get(0).([]error)[0]
}

-func (tc *batchTestCase) Run(t *testing.T) {
-	requests := tc.Inputs()
-	// mock all the results of the batch calls
-	for bci, b := range tc.batchCalls {
-		batchCall := b
-		var batch []rpc.BatchElem
-		for _, elem := range batchCall.elems {
-			batch = append(batch, requests[elem.id])
-		}
-		tc.On("get", batch).Run(func(args mock.Arguments) {
-			batch := args[0].([]rpc.BatchElem)
-			for i := range batch {
-				if batchCall.elems[i].err {
-					batch[i].Error = fmt.Errorf("mock err batch-call %d, elem call %d", bci, i)
-					batch[i].Result = nil
-				} else {
-					batch[i].Error = nil
-					batch[i].Result = fmt.Sprintf("mock result batch-call %d, elem call %d", bci, i)
-				}
-			}
-		}).Return([]error{batchCall.err}) // wrap to preserve nil as type of error
-	}
-	err := fetchBatched(context.Background(), testlog.Logger(t, log.LvlError), requests, tc.GetBatch, tc.maxRetry, tc.maxPerBatch, tc.maxParallel)
-	assert.Equal(t, err, tc.err)
-	tc.AssertExpectations(t)
-}
+var mockErr = errors.New("mockErr")
+
+func (tc *batchTestCase) Run(t *testing.T) {
+	keys := make([]int, tc.items)
+	for i := 0; i < tc.items; i++ {
+		keys[i] = i
+	}
+
+	makeMock := func(bci int, bc batchCall) func(args mock.Arguments) {
+		return func(args mock.Arguments) {
+			batch := args[0].([]rpc.BatchElem)
+			for i, elem := range batch {
+				id := elem.Args[0].(int)
+				expectedID := bc.elems[i].id
+				require.Equal(t, expectedID, id, "batch element should match expected batch element")
+				if bc.elems[i].err {
+					batch[i].Error = mockErr
+					*batch[i].Result.(*string) = ""
+				} else {
+					batch[i].Error = nil
+					*batch[i].Result.(*string) = fmt.Sprintf("mock result id %d", id)
+				}
+			}
+			time.Sleep(bc.duration)
+		}
+	}
+
+	// mock all the results of the batch calls
+	for bci, bc := range tc.batchCalls {
+		var batch []rpc.BatchElem
+		for _, elem := range bc.elems {
+			batch = append(batch, rpc.BatchElem{
+				Method: "testing_foobar",
+				Args:   []interface{}{elem.id},
+				Result: new(string),
+				Error:  nil,
+			})
+		}
+		if len(bc.elems) > 0 {
+			tc.On("get", batch).Once().Run(makeMock(bci, bc)).Return([]error{bc.rpcErr}) // wrap to preserve nil as type of error
+		}
+	}
+	iter := NewIterativeBatchCall[int, *string, []*string](keys, makeTestRequest, makeTestResults(), tc.GetBatch, tc.batchSize)
+	for i, bc := range tc.batchCalls {
+		ctx := context.Background()
+		if bc.makeCtx != nil {
+			ctx = bc.makeCtx()
+		}
+
+		err := iter.Fetch(ctx)
+		if err == io.EOF {
+			require.Equal(t, i, len(tc.batchCalls)-1, "EOF only on last call")
+		} else {
+			require.False(t, iter.Complete())
+			if bc.err == "" {
+				require.NoError(t, err)
+			} else {
+				require.ErrorContains(t, err, bc.err)
+			}
+		}
+	}
+	require.True(t, iter.Complete(), "batch iter should be complete after the expected calls")
+	out, err := iter.Result()
+	require.NoError(t, err)
+	for i, v := range out {
+		require.NotNil(t, v)
+		require.Equal(t, fmt.Sprintf("mock result id %d", i), *v)
+	}
+	out2, err := iter.Result()
+	require.NoError(t, err)
+	require.Equal(t, out, out2, "cached result should match")
+	require.Equal(t, io.EOF, iter.Fetch(context.Background()), "fetch after completion should EOF")
+	tc.AssertExpectations(t)
+}
......@@ -91,14 +143,11 @@ func TestFetchBatched(t *testing.T) {
			name:       "empty",
			items:      0,
			batchCalls: []batchCall{},
-			err:         nil,
-			maxRetry:    3,
-			maxPerBatch: 10,
-			maxParallel: 10,
		},
		{
			name:      "simple",
			items:     4,
+			batchSize: 4,
			batchCalls: []batchCall{
				{
					elems: []elemCall{
......@@ -107,17 +156,14 @@ func TestFetchBatched(t *testing.T) {
						{id: 2, err: false},
						{id: 3, err: false},
					},
-					err: nil,
+					err: "",
				},
			},
-			err:         nil,
-			maxRetry:    3,
-			maxPerBatch: 10,
-			maxParallel: 10,
		},
		{
			name:      "split",
			items:     5,
+			batchSize: 3,
			batchCalls: []batchCall{
				{
					elems: []elemCall{
......@@ -125,173 +171,103 @@ func TestFetchBatched(t *testing.T) {
						{id: 1, err: false},
						{id: 2, err: false},
					},
-					err: nil,
+					err: "",
				},
				{
					elems: []elemCall{
						{id: 3, err: false},
						{id: 4, err: false},
					},
-					err: nil,
+					err: "",
				},
			},
-			err:         nil,
-			maxRetry:    2,
-			maxPerBatch: 3,
-			maxParallel: 10,
		},
-		{
-			name:  "batch split and parallel constrain",
-			items: 3,
-			batchCalls: []batchCall{
-				{
-					elems: []elemCall{
-						{id: 0, err: false},
-					},
-					err: nil,
-				},
-				{
-					elems: []elemCall{
-						{id: 1, err: false},
-					},
-					err: nil,
-				},
-				{
-					elems: []elemCall{
-						{id: 2, err: false},
-					},
-					err: nil,
-				},
-			},
-			err:         nil,
-			maxRetry:    2,
-			maxPerBatch: 1,
-			maxParallel: 2,
-		},
		{
-			name:  "efficient retry",
-			items: 5,
+			name:      "efficient retry",
+			items:     7,
+			batchSize: 2,
			batchCalls: []batchCall{
				{
					elems: []elemCall{
						{id: 0, err: false},
						{id: 1, err: true},
					},
-					err: nil,
+					err: "1 error occurred:",
				},
				{
					elems: []elemCall{
						{id: 2, err: false},
						{id: 3, err: false},
					},
-					err: nil,
+					err: "",
				},
				{
-					elems: []elemCall{
-						{id: 4, err: false},
-						{id: 1, err: false},
-					},
-					err: nil,
+					elems: []elemCall{ // in-process before retry even happens
+						{id: 4, err: false},
+						{id: 5, err: false},
+					},
+					err: "",
+				},
+				{
+					elems: []elemCall{
+						{id: 6, err: false},
+						{id: 1, err: false}, // includes the element to retry
+					},
+					err: "",
				},
			},
-			err:         nil,
-			maxRetry:    2,
-			maxPerBatch: 2,
-			maxParallel: 2,
		},
		{
-			name:  "repeated sequential retries",
-			items: 3,
+			name:      "repeated sequential retries",
+			items:     2,
+			batchSize: 2,
			batchCalls: []batchCall{
				{
					elems: []elemCall{
-						{id: 0, err: false},
+						{id: 0, err: true},
						{id: 1, err: true},
					},
-					err: nil,
+					err: "2 errors occurred:",
				},
				{
					elems: []elemCall{
-						{id: 2, err: false},
+						{id: 0, err: false},
						{id: 1, err: true},
					},
-					err: nil,
+					err: "1 error occurred:",
				},
				{
					elems: []elemCall{
						{id: 1, err: false},
					},
-					err: nil,
+					err: "",
				},
			},
-			err:         nil,
-			maxRetry:    2,
-			maxPerBatch: 2,
-			maxParallel: 1,
		},
		{
-			name:  "too many retries",
-			items: 3,
+			name:      "context timeout",
+			items:     1,
+			batchSize: 3,
			batchCalls: []batchCall{
				{
-					elems: []elemCall{
-						{id: 0, err: false},
-						{id: 1, err: true},
-					},
-					err: nil,
-				},
-				{
-					elems: []elemCall{
-						{id: 2, err: false},
-						{id: 1, err: true},
-					},
-					err: nil,
+					elems: nil,
+					err:   context.Canceled.Error(),
+					makeCtx: func() context.Context {
+						ctx, cancel := context.WithCancel(context.Background())
+						cancel()
+						return ctx
+					},
				},
				{
					elems: []elemCall{
-						{id: 1, err: true},
+						{id: 0, err: false},
					},
-					err: nil,
+					err: "",
				},
			},
-			err:         TooManyRetries,
-			maxRetry:    2,
-			maxPerBatch: 2,
-			maxParallel: 1,
		},
	}
	for _, tc := range testCases {
		t.Run(tc.name, tc.Run)
	}
}
-
-type parentErrBatchTestCase struct {
-	mock.Mock
-}
-
-func (c *parentErrBatchTestCase) GetBatch(ctx context.Context, b []rpc.BatchElem) error {
-	return c.Mock.MethodCalled("get", b).Get(0).([]error)[0]
-}
-
-func (c *parentErrBatchTestCase) Run(t *testing.T) {
-	var requests []rpc.BatchElem
-	for i := 0; i < 2; i++ {
-		requests = append(requests, rpc.BatchElem{
-			Method: "testing",
-			Args:   []interface{}{i},
-		})
-	}
-	// shouldn't retry if it's an error on the actual request
-	expErr := errors.New("fail")
-	c.On("get", requests).Run(func(args mock.Arguments) {
-	}).Return([]error{expErr})
-	err := fetchBatched(context.Background(), testlog.Logger(t, log.LvlError), requests, c.GetBatch, 2, 2, 1)
-	assert.ErrorIs(t, err, expErr)
-	c.AssertExpectations(t)
-}
-
-func TestFetchBatchedContextTimeout(t *testing.T) {
-	var c parentErrBatchTestCase
-	c.Run(t)
-}
package l1

import (
-	"context"
	"fmt"

	"github.com/ethereum-optimism/optimism/op-node/eth"
......@@ -13,29 +12,16 @@ import (
	"github.com/ethereum/go-ethereum/core/types"
)

-// fetchReceipts fetches the receipts of the transactions using RPC batching, verifies if the receipts are complete and correct, and then returns results
-func fetchReceipts(ctx context.Context, block eth.BlockID, receiptHash common.Hash, txs types.Transactions, getBatch batchCallContextFn) (types.Receipts, error) {
-	if len(txs) == 0 {
+func makeReceiptsFn(block eth.BlockID, receiptHash common.Hash) func(txHashes []common.Hash, receipts []*types.Receipt) (types.Receipts, error) {
+	return func(txHashes []common.Hash, receipts []*types.Receipt) (types.Receipts, error) {
+		if len(receipts) != len(txHashes) {
+			return nil, fmt.Errorf("got %d receipts but expected %d", len(receipts), len(txHashes))
+		}
+		if len(txHashes) == 0 {
			if receiptHash != types.EmptyRootHash {
				return nil, fmt.Errorf("no transactions, but got non-empty receipt trie root: %s", receiptHash)
			}
			return nil, nil
		}
-	receipts := make([]*types.Receipt, len(txs))
-	receiptRequests := make([]rpc.BatchElem, len(txs))
-	for i := 0; i < len(txs); i++ {
-		receipts[i] = new(types.Receipt)
-		receiptRequests[i] = rpc.BatchElem{
-			Method: "eth_getTransactionReceipt",
-			Args:   []interface{}{txs[i].Hash()},
-			Result: &receipts[i], // receipt may become nil, double pointer is intentional
-		}
-	}
-	if err := getBatch(ctx, receiptRequests); err != nil {
-		return nil, fmt.Errorf("failed to fetch batch of receipts: %v", err)
-	}
		// We don't trust the RPC to provide consistent cached receipt info that we use for critical rollup derivation work.
		// Let's check everything quickly.
		logIndex := uint(0)
......@@ -65,8 +51,8 @@ func fetchReceipts(ctx context.Context, block eth.BlockID, receiptHash common.Ha
			if log.BlockNumber != block.Number {
				return nil, fmt.Errorf("log %d of block %d has unexpected block number %d", log.Index, block.Number, log.BlockNumber)
			}
-			if h := txs[i].Hash(); log.TxHash != h {
-				return nil, fmt.Errorf("log %d of tx %s has unexpected tx hash %s", log.Index, h, log.TxHash)
+			if log.TxHash != txHashes[i] {
+				return nil, fmt.Errorf("log %d of tx %s has unexpected tx hash %s", log.Index, txHashes[i], log.TxHash)
			}
			if log.Removed {
				return nil, fmt.Errorf("canonical log (%d) must never be removed due to reorg", log.Index)
......@@ -83,4 +69,25 @@ func fetchReceipts(ctx context.Context, block eth.BlockID, receiptHash common.Ha
			return nil, fmt.Errorf("failed to fetch list of receipts: expected receipt root %s but computed %s from retrieved receipts", receiptHash, computed)
		}
		return receipts, nil
+	}
}

+func makeReceiptRequest(txHash common.Hash) (*types.Receipt, rpc.BatchElem) {
+	out := new(types.Receipt)
+	return out, rpc.BatchElem{
+		Method: "eth_getTransactionReceipt",
+		Args:   []interface{}{txHash},
+		Result: &out, // receipt may become nil, double pointer is intentional
+	}
+}
+
+// NewReceiptsFetcher creates a receipt fetcher that can iteratively fetch the receipts matching the given txs.
+func NewReceiptsFetcher(block eth.BlockID, receiptHash common.Hash, txHashes []common.Hash, getBatch batchCallContextFn, batchSize int) eth.ReceiptsFetcher {
+	return NewIterativeBatchCall[common.Hash, *types.Receipt, types.Receipts](
+		txHashes,
+		makeReceiptRequest,
+		makeReceiptsFn(block, receiptHash),
+		getBatch,
+		batchSize,
+	)
+}
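
For reference, the `computed` root in the elided verification above is re-derived locally rather than trusted from the RPC; with go-ethereum this kind of check is typically written with types.DeriveSha and a stack trie (a sketch, assuming `receipts` is the complete list):

	// Re-derive the receipts root and compare it against the header's receipt hash.
	computed := types.DeriveSha(types.Receipts(receipts), trie.NewStackTrie(nil))
	if computed != receiptHash {
		// the RPC served inconsistent receipt data; reject and re-fetch
	}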
......@@ -13,14 +13,11 @@ import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/core/types"
-	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/rpc"
)

type SourceConfig struct {
-	// batching parameters
-	MaxParallelBatching int
-	MaxBatchRetry       int
+	// Maximum number of requests to make per batch
	MaxRequestsPerBatch int

	// limit concurrent requests, applies to the source as a whole
......@@ -55,12 +52,6 @@ func (c *SourceConfig) Check() error {
	if c.MaxConcurrentRequests < 1 {
		return fmt.Errorf("expected at least 1 concurrent request, but max is %d", c.MaxConcurrentRequests)
	}
-	if c.MaxParallelBatching < 1 {
-		return fmt.Errorf("expected at least 1 batch request to run at a time, but max is %d", c.MaxParallelBatching)
-	}
-	if c.MaxBatchRetry < 0 || c.MaxBatchRetry > 20 {
-		return fmt.Errorf("number of max batch retries is not reasonable: %d", c.MaxBatchRetry)
-	}
	if c.MaxRequestsPerBatch < 1 {
		return fmt.Errorf("expected at least 1 request per batch, but max is: %d", c.MaxRequestsPerBatch)
	}
......@@ -68,19 +59,18 @@ func (c *SourceConfig) Check() error {
}

func DefaultConfig(config *rollup.Config, trustRPC bool) *SourceConfig {
-	// Cache 3/2 worth of sequencing window of receipts and txs, up to 400 per block.
+	// Cache 3/2 worth of sequencing window of receipts and txs
	span := int(config.SeqWindowSize) * 3 / 2
	if span > 1000 { // sanity cap. If a large sequencing window is configured, do not make the cache too large
		span = 1000
	}
	return &SourceConfig{
-		ReceiptsCacheSize:     span * 400,
-		TransactionsCacheSize: span * 400,
+		// receipts and transactions are cached per block
+		ReceiptsCacheSize:     span,
+		TransactionsCacheSize: span,
		HeadersCacheSize:      span,
-		// TODO: tune batch params
-		MaxParallelBatching: 8,
-		MaxBatchRetry:       3,
+		// TODO: tune batch param
		MaxRequestsPerBatch:   20,
		MaxConcurrentRequests: 10,
......@@ -97,6 +87,7 @@ type Source struct {
	client    client.RPC
	batchCall batchCallContextFn
+	maxBatchSize int

	trustRPC bool
......@@ -114,19 +105,16 @@ type Source struct {
}

// NewSource wraps a RPC with bindings to fetch L1 data, while logging errors, tracking metrics (optional), and caching.
-func NewSource(client client.RPC, log log.Logger, metrics caching.Metrics, config *SourceConfig) (*Source, error) {
+func NewSource(client client.RPC, metrics caching.Metrics, config *SourceConfig) (*Source, error) {
	if err := config.Check(); err != nil {
		return nil, fmt.Errorf("bad config, cannot create L1 source: %w", err)
	}
	client = LimitRPC(client, config.MaxConcurrentRequests)
-	// Batch calls will be split up to handle max-batch size,
-	// and parallelized since the RPC server does not parallelize batch contents otherwise.
-	getBatch := parallelBatchCall(log, client.BatchCallContext,
-		config.MaxBatchRetry, config.MaxRequestsPerBatch, config.MaxParallelBatching)
	return &Source{
		client:            client,
-		batchCall:         getBatch,
+		batchCall:         client.BatchCallContext,
+		maxBatchSize:      config.MaxRequestsPerBatch,
		trustRPC:          config.TrustRPC,
		receiptsCache:     caching.NewLRUCache(metrics, "receipts", config.ReceiptsCacheSize),
		transactionsCache: caching.NewLRUCache(metrics, "txs", config.TransactionsCacheSize),
......@@ -212,21 +200,21 @@ func (s *Source) InfoAndTxsHead(ctx context.Context) (eth.L1Info, types.Transact
return s.blockCall(ctx, "eth_getBlockByNumber", "latest")
}
-func (s *Source) Fetch(ctx context.Context, blockHash common.Hash) (eth.L1Info, types.Transactions, types.Receipts, error) {
-	if blockHash == (common.Hash{}) {
-		return nil, nil, nil, ethereum.NotFound
-	}
-	info, txs, err := s.blockCall(ctx, "eth_getBlockByHash", blockHash)
+func (s *Source) Fetch(ctx context.Context, blockHash common.Hash) (eth.L1Info, types.Transactions, eth.ReceiptsFetcher, error) {
+	info, txs, err := s.InfoAndTxsByHash(ctx, blockHash)
	if err != nil {
		return nil, nil, nil, err
	}
-	receipts, err := fetchReceipts(ctx, info.ID(), info.receiptHash, txs, s.batchCall)
-	if err != nil {
-		return nil, nil, nil, err
-	}
-	s.receiptsCache.Add(info.hash, receipts)
-	return info, txs, receipts, nil
+	if v, ok := s.receiptsCache.Get(blockHash); ok {
+		return info, txs, v.(eth.ReceiptsFetcher), nil
+	}
+	txHashes := make([]common.Hash, len(txs))
+	for i := 0; i < len(txs); i++ {
+		txHashes[i] = txs[i].Hash()
+	}
+	r := NewReceiptsFetcher(info.ID(), info.ReceiptHash(), txHashes, s.batchCall, s.maxBatchSize)
+	s.receiptsCache.Add(blockHash, r)
+	return info, txs, r, nil
}
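
Because the returned fetcher is cached per block hash and its Fetch is documented as safe for concurrent use, a caller can spread the remaining batches over goroutines; an illustrative sketch (the worker count of 4 is arbitrary, not a config of this commit):

	_, _, fetcher, err := source.Fetch(ctx, blockHash)
	if err != nil {
		return err
	}
	var wg sync.WaitGroup
	for w := 0; w < 4; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// each worker pulls batches until io.EOF; on a batch error the
			// failed elements stay scheduled, so another pass can retry them
			for fetcher.Fetch(ctx) == nil {
			}
		}()
	}
	wg.Wait()
	receipts, err := fetcher.Result()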
// FetchAllTransactions fetches transaction lists of a window of blocks, and caches each block and the transactions
......
......@@ -9,11 +9,9 @@ import (
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
......@@ -88,7 +86,6 @@ func randTxs(offset uint64, count uint64) types.Transactions {
}
func TestSource_InfoByHash(t *testing.T) {
-	log := testlog.Logger(t, log.LvlError)
m := new(mockRPC)
hdr := randHeader()
rhdr := &rpcHeader{
......@@ -101,7 +98,7 @@ func TestSource_InfoByHash(t *testing.T) {
m.On("CallContext", ctx, new(*rpcHeader), "eth_getBlockByHash", []interface{}{h, false}).Run(func(args mock.Arguments) {
*args[1].(**rpcHeader) = rhdr
}).Return([]error{nil})
-	s, err := NewSource(m, log, nil, DefaultConfig(&rollup.Config{SeqWindowSize: 10}, true))
+	s, err := NewSource(m, nil, DefaultConfig(&rollup.Config{SeqWindowSize: 10}, true))
assert.NoError(t, err)
info, err := s.InfoByHash(ctx, h)
assert.NoError(t, err)
......@@ -115,7 +112,6 @@ func TestSource_InfoByHash(t *testing.T) {
}
func TestSource_InfoByNumber(t *testing.T) {
-	log := testlog.Logger(t, log.LvlError)
m := new(mockRPC)
hdr := randHeader()
rhdr := &rpcHeader{
......@@ -128,7 +124,7 @@ func TestSource_InfoByNumber(t *testing.T) {
m.On("CallContext", ctx, new(*rpcHeader), "eth_getBlockByNumber", []interface{}{hexutil.EncodeUint64(n), false}).Run(func(args mock.Arguments) {
*args[1].(**rpcHeader) = rhdr
}).Return([]error{nil})
-	s, err := NewSource(m, log, nil, DefaultConfig(&rollup.Config{SeqWindowSize: 10}, true))
+	s, err := NewSource(m, nil, DefaultConfig(&rollup.Config{SeqWindowSize: 10}, true))
assert.NoError(t, err)
info, err := s.InfoByNumber(ctx, n)
assert.NoError(t, err)
......@@ -137,7 +133,6 @@ func TestSource_InfoByNumber(t *testing.T) {
}
func TestSource_FetchAllTransactions(t *testing.T) {
-	log := testlog.Logger(t, log.LvlError)
m := new(mockRPC)
ctx := context.Background()
......@@ -180,7 +175,7 @@ func TestSource_FetchAllTransactions(t *testing.T) {
}
}).Return([]error{nil})
-	s, err := NewSource(m, log, nil, DefaultConfig(&rollup.Config{SeqWindowSize: 10}, true))
+	s, err := NewSource(m, nil, DefaultConfig(&rollup.Config{SeqWindowSize: 10}, true))
assert.NoError(t, err)
s.batchCall = m.batchCall // override the optimized batch call
......
......@@ -110,7 +110,7 @@ func (n *OpNode) initL1(ctx context.Context, cfg *Config) error {
return fmt.Errorf("failed to get L1 RPC client: %w", err)
}
-	n.l1Source, err = l1.NewSource(client.NewInstrumentedRPC(l1Node, n.metrics), n.log, n.metrics.L1SourceCache, l1.DefaultConfig(&cfg.Rollup, trustRPC))
+	n.l1Source, err = l1.NewSource(client.NewInstrumentedRPC(l1Node, n.metrics), n.metrics.L1SourceCache, l1.DefaultConfig(&cfg.Rollup, trustRPC))
if err != nil {
return fmt.Errorf("failed to create L1 source: %v", err)
}
......
......@@ -3,6 +3,7 @@ package derive
import (
"context"
"fmt"
"io"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
......@@ -14,7 +15,7 @@ import (
// L1ReceiptsFetcher fetches L1 header info and receipts for the payload attributes derivation (the info tx and deposits)
type L1ReceiptsFetcher interface {
InfoByHash(ctx context.Context, hash common.Hash) (eth.L1Info, error)
-	Fetch(ctx context.Context, blockHash common.Hash) (eth.L1Info, types.Transactions, types.Receipts, error)
+	Fetch(ctx context.Context, blockHash common.Hash) (eth.L1Info, types.Transactions, eth.ReceiptsFetcher, error)
}
// PreparePayloadAttributes prepares a PayloadAttributes template that is ready to build a L2 block with deposits only, on top of the given l2Parent, with the given epoch as L1 origin.
......@@ -31,7 +32,7 @@ func PreparePayloadAttributes(ctx context.Context, cfg *rollup.Config, dl L1Rece
// case we need to fetch all transaction receipts from the L1 origin block so we can scan for
// user deposits.
if l2Parent.L1Origin.Number != epoch.Number {
-		info, _, receipts, err := dl.Fetch(ctx, epoch.Hash)
+		info, _, receiptsFetcher, err := dl.Fetch(ctx, epoch.Hash)
if err != nil {
return nil, NewTemporaryError(fmt.Errorf("failed to fetch L1 block info and receipts: %w", err))
}
......@@ -40,6 +41,17 @@ func PreparePayloadAttributes(ctx context.Context, cfg *rollup.Config, dl L1Rece
fmt.Errorf("cannot create new block with L1 origin %s (parent %s) on top of L1 origin %s",
epoch, info.ParentHash(), l2Parent.L1Origin))
}
+		for {
+			if err := receiptsFetcher.Fetch(ctx); err == io.EOF {
+				break
+			} else if err != nil {
+				return nil, NewTemporaryError(fmt.Errorf("failed to fetch more receipts: %w", err))
+			}
+		}
+		receipts, err := receiptsFetcher.Result()
+		if err != nil {
+			return nil, NewResetError(fmt.Errorf("fetched bad receipt data: %w", err))
+		}
deposits, err := DeriveDeposits(receipts, cfg.DepositContractAddress)
if err != nil {
// deposits may never be ignored. Failing to process them is a critical error.
......
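
Note the two error paths in the hunk above: a Fetch error is wrapped as a temporary error, so derivation simply retries later against the same cached fetcher, while a Result error means the fetched receipts failed verification; in that case the fetcher has already Reset itself, so the next attempt re-fetches from scratch.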
......@@ -37,7 +37,7 @@ type Metrics interface {
type Downloader interface {
InfoByHash(ctx context.Context, hash common.Hash) (eth.L1Info, error)
-	Fetch(ctx context.Context, blockHash common.Hash) (eth.L1Info, types.Transactions, types.Receipts, error)
+	Fetch(ctx context.Context, blockHash common.Hash) (eth.L1Info, types.Transactions, eth.ReceiptsFetcher, error)
}
type L1Chain interface {
......
......@@ -45,13 +45,13 @@ func (m *MockL1Source) ExpectL1BlockRefByHash(hash common.Hash, ref eth.L1BlockR
m.Mock.On("L1BlockRefByHash", hash).Once().Return(ref, &err)
}
-func (m *MockL1Source) Fetch(ctx context.Context, blockHash common.Hash) (eth.L1Info, types.Transactions, types.Receipts, error) {
+func (m *MockL1Source) Fetch(ctx context.Context, blockHash common.Hash) (eth.L1Info, types.Transactions, eth.ReceiptsFetcher, error) {
out := m.Mock.MethodCalled("Fetch", blockHash)
-	return *out[0].(*eth.L1Info), out[1].(types.Transactions), out[2].(types.Receipts), *out[3].(*error)
+	return *out[0].(*eth.L1Info), out[1].(types.Transactions), out[2].(eth.ReceiptsFetcher), *out[3].(*error)
}
func (m *MockL1Source) ExpectFetch(hash common.Hash, info eth.L1Info, transactions types.Transactions, receipts types.Receipts, err error) {
m.Mock.On("Fetch", hash).Once().Return(&info, transactions, receipts, &err)
m.Mock.On("Fetch", hash).Once().Return(&info, transactions, eth.FetchedReceipts(receipts), &err)
}
func (m *MockL1Source) InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.L1Info, types.Transactions, error) {
......