Commit 1c013805 authored by Joshua Gutow's avatar Joshua Gutow Committed by GitHub

Merge pull request #8130 from ethereum-optimism/seb/receipts-provider

op-service: Refactor `EthClient` with `ReceiptsProvider` abstraction
parents 658d2a68 0745c50e
...@@ -63,7 +63,9 @@ type EthClientConfig struct { ...@@ -63,7 +63,9 @@ type EthClientConfig struct {
// If this is 0 then the client does not fall back to less optimal but available methods. // If this is 0 then the client does not fall back to less optimal but available methods.
MethodResetDuration time.Duration MethodResetDuration time.Duration
// [OPTIONAL] The reth DB path to fetch receipts from // [OPTIONAL] The reth DB path to fetch receipts from.
// If it is specified, the rethdb receipts fetcher will be used
// and the RPC configuration parameters don't need to be set.
RethDBPath string RethDBPath string
} }
...@@ -80,6 +82,15 @@ func (c *EthClientConfig) Check() error { ...@@ -80,6 +82,15 @@ func (c *EthClientConfig) Check() error {
if c.PayloadsCacheSize < 0 { if c.PayloadsCacheSize < 0 {
return fmt.Errorf("invalid payloads cache size: %d", c.PayloadsCacheSize) return fmt.Errorf("invalid payloads cache size: %d", c.PayloadsCacheSize)
} }
if c.RethDBPath != "" {
if buildRethdb {
// If the rethdb path is set, we use the rethdb receipts fetcher and skip creating
// an RCP receipts fetcher, so below rpc config parameters don't need to be checked.
return nil
} else {
return fmt.Errorf("rethdb path specified, but built without rethdb support")
}
}
if c.MaxConcurrentRequests < 1 { if c.MaxConcurrentRequests < 1 {
return fmt.Errorf("expected at least 1 concurrent request, but max is %d", c.MaxConcurrentRequests) return fmt.Errorf("expected at least 1 concurrent request, but max is %d", c.MaxConcurrentRequests)
} }
...@@ -96,21 +107,14 @@ func (c *EthClientConfig) Check() error { ...@@ -96,21 +107,14 @@ func (c *EthClientConfig) Check() error {
type EthClient struct { type EthClient struct {
client client.RPC client client.RPC
maxBatchSize int recProvider ReceiptsProvider
trustRPC bool trustRPC bool
mustBePostMerge bool mustBePostMerge bool
provKind RPCProviderKind
log log.Logger log log.Logger
// cache receipts in bundles per block hash
// We cache the receipts fetching job to not lose progress when we have to retry the `Fetch` call
// common.Hash -> *receiptsFetchingJob
receiptsCache *caching.LRUCache[common.Hash, *receiptsFetchingJob]
// cache transactions in bundles per block hash // cache transactions in bundles per block hash
// common.Hash -> types.Transactions // common.Hash -> types.Transactions
transactionsCache *caching.LRUCache[common.Hash, types.Transactions] transactionsCache *caching.LRUCache[common.Hash, types.Transactions]
...@@ -122,46 +126,6 @@ type EthClient struct { ...@@ -122,46 +126,6 @@ type EthClient struct {
// cache payloads by hash // cache payloads by hash
// common.Hash -> *eth.ExecutionPayload // common.Hash -> *eth.ExecutionPayload
payloadsCache *caching.LRUCache[common.Hash, *eth.ExecutionPayload] payloadsCache *caching.LRUCache[common.Hash, *eth.ExecutionPayload]
// availableReceiptMethods tracks which receipt methods can be used for fetching receipts
// This may be modified concurrently, but we don't lock since it's a single
// uint64 that's not critical (fine to miss or mix up a modification)
availableReceiptMethods ReceiptsFetchingMethod
// lastMethodsReset tracks when availableReceiptMethods was last reset.
// When receipt-fetching fails it falls back to available methods,
// but periodically it will try to reset to the preferred optimal methods.
lastMethodsReset time.Time
// methodResetDuration defines how long we take till we reset lastMethodsReset
methodResetDuration time.Duration
// [OPTIONAL] The reth DB path to fetch receipts from
rethDbPath string
}
func (s *EthClient) PickReceiptsMethod(txCount uint64) ReceiptsFetchingMethod {
if now := time.Now(); now.Sub(s.lastMethodsReset) > s.methodResetDuration {
m := AvailableReceiptsFetchingMethods(s.provKind)
if s.availableReceiptMethods != m {
s.log.Warn("resetting back RPC preferences, please review RPC provider kind setting", "kind", s.provKind.String())
}
s.availableReceiptMethods = m
s.lastMethodsReset = now
}
return PickBestReceiptsFetchingMethod(s.provKind, s.availableReceiptMethods, txCount)
}
func (s *EthClient) OnReceiptsMethodErr(m ReceiptsFetchingMethod, err error) {
if unusableMethod(err) {
// clear the bit of the method that errored
s.availableReceiptMethods &^= m
s.log.Warn("failed to use selected RPC method for receipt fetching, temporarily falling back to alternatives",
"provider_kind", s.provKind, "failed_method", m, "fallback", s.availableReceiptMethods, "err", err)
} else {
s.log.Debug("failed to use selected RPC method for receipt fetching, but method does appear to be available, so we continue to use it",
"provider_kind", s.provKind, "failed_method", m, "fallback", s.availableReceiptMethods&^m, "err", err)
}
} }
// NewEthClient returns an [EthClient], wrapping an RPC with bindings to fetch ethereum data with added error logging, // NewEthClient returns an [EthClient], wrapping an RPC with bindings to fetch ethereum data with added error logging,
...@@ -170,22 +134,18 @@ func NewEthClient(client client.RPC, log log.Logger, metrics caching.Metrics, co ...@@ -170,22 +134,18 @@ func NewEthClient(client client.RPC, log log.Logger, metrics caching.Metrics, co
if err := config.Check(); err != nil { if err := config.Check(); err != nil {
return nil, fmt.Errorf("bad config, cannot create L1 source: %w", err) return nil, fmt.Errorf("bad config, cannot create L1 source: %w", err)
} }
client = LimitRPC(client, config.MaxConcurrentRequests) client = LimitRPC(client, config.MaxConcurrentRequests)
recProvider := newRecProviderFromConfig(client, log, metrics, config)
return &EthClient{ return &EthClient{
client: client, client: client,
maxBatchSize: config.MaxRequestsPerBatch, recProvider: recProvider,
trustRPC: config.TrustRPC, trustRPC: config.TrustRPC,
mustBePostMerge: config.MustBePostMerge, mustBePostMerge: config.MustBePostMerge,
provKind: config.RPCProviderKind, log: log,
log: log, transactionsCache: caching.NewLRUCache[common.Hash, types.Transactions](metrics, "txs", config.TransactionsCacheSize),
receiptsCache: caching.NewLRUCache[common.Hash, *receiptsFetchingJob](metrics, "receipts", config.ReceiptsCacheSize), headersCache: caching.NewLRUCache[common.Hash, eth.BlockInfo](metrics, "headers", config.HeadersCacheSize),
transactionsCache: caching.NewLRUCache[common.Hash, types.Transactions](metrics, "txs", config.TransactionsCacheSize), payloadsCache: caching.NewLRUCache[common.Hash, *eth.ExecutionPayload](metrics, "payloads", config.PayloadsCacheSize),
headersCache: caching.NewLRUCache[common.Hash, eth.BlockInfo](metrics, "headers", config.HeadersCacheSize),
payloadsCache: caching.NewLRUCache[common.Hash, *eth.ExecutionPayload](metrics, "payloads", config.PayloadsCacheSize),
availableReceiptMethods: AvailableReceiptsFetchingMethods(config.RPCProviderKind),
lastMethodsReset: time.Now(),
methodResetDuration: config.MethodResetDuration,
rethDbPath: config.RethDBPath,
}, nil }, nil
} }
...@@ -354,24 +314,21 @@ func (s *EthClient) PayloadByLabel(ctx context.Context, label eth.BlockLabel) (* ...@@ -354,24 +314,21 @@ func (s *EthClient) PayloadByLabel(ctx context.Context, label eth.BlockLabel) (*
func (s *EthClient) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) { func (s *EthClient) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) {
info, txs, err := s.InfoAndTxsByHash(ctx, blockHash) info, txs, err := s.InfoAndTxsByHash(ctx, blockHash)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, fmt.Errorf("querying block: %w", err)
} }
// Try to reuse the receipts fetcher because is caches the results of intermediate calls. This means
// that if just one of many calls fail, we only retry the failed call rather than all of the calls. txHashes, block := eth.TransactionsToHashes(txs), eth.ToBlockID(info)
// The underlying fetcher uses the receipts hash to verify receipt integrity. receipts, err := s.recProvider.FetchReceipts(ctx, block, txHashes)
var job *receiptsFetchingJob
if v, ok := s.receiptsCache.Get(blockHash); ok {
job = v
} else {
txHashes := eth.TransactionsToHashes(txs)
job = NewReceiptsFetchingJob(s, s.client, s.maxBatchSize, eth.ToBlockID(info), info.ReceiptHash(), txHashes, s.rethDbPath)
s.receiptsCache.Add(blockHash, job)
}
receipts, err := job.Fetch(ctx)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
if !s.trustRPC {
if err := validateReceipts(block, info.ReceiptHash(), txHashes, receipts); err != nil {
return info, nil, fmt.Errorf("invalid receipts: %w", err)
}
}
return info, receipts, nil return info, receipts, nil
} }
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"context" "context"
crand "crypto/rand" crand "crypto/rand"
"math/big" "math/big"
"math/rand"
"testing" "testing"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
...@@ -18,6 +19,7 @@ import ( ...@@ -18,6 +19,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources/caching"
) )
type mockRPC struct { type mockRPC struct {
...@@ -177,3 +179,45 @@ func TestEthClient_WrongInfoByHash(t *testing.T) { ...@@ -177,3 +179,45 @@ func TestEthClient_WrongInfoByHash(t *testing.T) {
require.Error(t, err, "cannot accept the wrong block") require.Error(t, err, "cannot accept the wrong block")
m.Mock.AssertExpectations(t) m.Mock.AssertExpectations(t)
} }
func TestEthClient_validateReceipts(t *testing.T) {
require := require.New(t)
mrpc := new(mockRPC)
mrp := new(mockReceiptsProvider)
const numTxs = 4
block, receipts := randomRpcBlockAndReceipts(rand.New(rand.NewSource(420)), numTxs)
txHashes := receiptTxHashes(receipts)
ctx := context.Background()
// mutate a field to make validation fail.
receipts[2].Bloom[0] = 1
mrpc.On("CallContext", ctx, mock.AnythingOfType("**sources.rpcBlock"),
"eth_getBlockByHash", []any{block.Hash, true}).
Run(func(args mock.Arguments) {
*(args[1].(**rpcBlock)) = block
}).
Return([]error{nil}).Once()
mrp.On("FetchReceipts", ctx, block.BlockID(), txHashes).
Return(types.Receipts(receipts), error(nil)).Once()
ethcl := newEthClientWithCaches(nil, numTxs)
ethcl.client = mrpc
ethcl.recProvider = mrp
ethcl.trustRPC = false
_, _, err := ethcl.FetchReceipts(ctx, block.Hash)
require.ErrorContains(err, "invalid receipts")
mrpc.AssertExpectations(t)
mrp.AssertExpectations(t)
}
func newEthClientWithCaches(metrics caching.Metrics, cacheSize int) *EthClient {
return &EthClient{
transactionsCache: caching.NewLRUCache[common.Hash, types.Transactions](metrics, "txs", cacheSize),
headersCache: caching.NewLRUCache[common.Hash, eth.BlockInfo](metrics, "headers", cacheSize),
payloadsCache: caching.NewLRUCache[common.Hash, *eth.ExecutionPayload](metrics, "payloads", cacheSize),
}
}
...@@ -3,19 +3,20 @@ package sources ...@@ -3,19 +3,20 @@ package sources
import ( import (
"context" "context"
"fmt" "fmt"
"io"
"sync"
"github.com/ethereum-optimism/optimism/op-service/sources/batching" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
"github.com/ethereum-optimism/optimism/op-service/eth"
) )
type ReceiptsProvider interface {
// FetchReceipts returns a block info and all of the receipts associated with transactions in the block.
// It verifies the receipt hash in the block header against the receipt hash of the fetched receipts
// to ensure that the execution engine did not fail to return any receipts.
FetchReceipts(ctx context.Context, block eth.BlockID, txHashes []common.Hash) (types.Receipts, error)
}
// validateReceipts validates that the receipt contents are valid. // validateReceipts validates that the receipt contents are valid.
// Warning: contractAddress is not verified, since it is a more expensive operation for data we do not use. // Warning: contractAddress is not verified, since it is a more expensive operation for data we do not use.
// See go-ethereum/crypto.CreateAddress to verify contract deployment address data based on sender and tx nonce. // See go-ethereum/crypto.CreateAddress to verify contract deployment address data based on sender and tx nonce.
...@@ -88,453 +89,3 @@ func validateReceipts(block eth.BlockID, receiptHash common.Hash, txHashes []com ...@@ -88,453 +89,3 @@ func validateReceipts(block eth.BlockID, receiptHash common.Hash, txHashes []com
} }
return nil return nil
} }
func makeReceiptRequest(txHash common.Hash) (*types.Receipt, rpc.BatchElem) {
out := new(types.Receipt)
return out, rpc.BatchElem{
Method: "eth_getTransactionReceipt",
Args: []any{txHash},
Result: &out, // receipt may become nil, double pointer is intentional
}
}
// Cost break-down sources:
// Alchemy: https://docs.alchemy.com/reference/compute-units
// QuickNode: https://www.quicknode.com/docs/ethereum/api_credits
// Infura: no pricing table available.
//
// Receipts are encoded the same everywhere:
//
// blockHash, blockNumber, transactionIndex, transactionHash, from, to, cumulativeGasUsed, gasUsed,
// contractAddress, logs, logsBloom, status, effectiveGasPrice, type.
//
// Note that Alchemy/Geth still have a "root" field for legacy reasons,
// but ethereum does not compute state-roots per tx anymore, so quicknode and others do not serve this data.
// RPCProviderKind identifies an RPC provider, used to hint at the optimal receipt fetching approach.
type RPCProviderKind string
const (
RPCKindAlchemy RPCProviderKind = "alchemy"
RPCKindQuickNode RPCProviderKind = "quicknode"
RPCKindInfura RPCProviderKind = "infura"
RPCKindParity RPCProviderKind = "parity"
RPCKindNethermind RPCProviderKind = "nethermind"
RPCKindDebugGeth RPCProviderKind = "debug_geth"
RPCKindErigon RPCProviderKind = "erigon"
RPCKindBasic RPCProviderKind = "basic" // try only the standard most basic receipt fetching
RPCKindAny RPCProviderKind = "any" // try any method available
RPCKindStandard RPCProviderKind = "standard" // try standard methods, including newer optimized standard RPC methods
RPCKindRethDB RPCProviderKind = "reth_db" // read data directly from reth's database
)
var RPCProviderKinds = []RPCProviderKind{
RPCKindAlchemy,
RPCKindQuickNode,
RPCKindInfura,
RPCKindParity,
RPCKindNethermind,
RPCKindDebugGeth,
RPCKindErigon,
RPCKindBasic,
RPCKindAny,
RPCKindStandard,
}
// Copy of RPCProviderKinds with RethDB added to all RethDB to be used but to hide it from the flags
var validRPCProviderKinds = func() []RPCProviderKind {
return append(RPCProviderKinds, RPCKindRethDB)
}()
func (kind RPCProviderKind) String() string {
return string(kind)
}
func (kind *RPCProviderKind) Set(value string) error {
if !ValidRPCProviderKind(RPCProviderKind(value)) {
return fmt.Errorf("unknown rpc kind: %q", value)
}
*kind = RPCProviderKind(value)
return nil
}
func (kind *RPCProviderKind) Clone() any {
cpy := *kind
return &cpy
}
func ValidRPCProviderKind(value RPCProviderKind) bool {
for _, k := range validRPCProviderKinds {
if k == value {
return true
}
}
return false
}
// ReceiptsFetchingMethod is a bitfield with 1 bit for each receipts fetching type.
// Depending on errors, tx counts and preferences the code may select different sets of fetching methods.
type ReceiptsFetchingMethod uint64
func (r ReceiptsFetchingMethod) String() string {
out := ""
x := r
addMaybe := func(m ReceiptsFetchingMethod, v string) {
if x&m != 0 {
out += v
x ^= x & m
}
if x != 0 { // add separator if there are entries left
out += ", "
}
}
addMaybe(EthGetTransactionReceiptBatch, "eth_getTransactionReceipt (batched)")
addMaybe(AlchemyGetTransactionReceipts, "alchemy_getTransactionReceipts")
addMaybe(DebugGetRawReceipts, "debug_getRawReceipts")
addMaybe(ParityGetBlockReceipts, "parity_getBlockReceipts")
addMaybe(EthGetBlockReceipts, "eth_getBlockReceipts")
addMaybe(ErigonGetBlockReceiptsByBlockHash, "erigon_getBlockReceiptsByBlockHash")
addMaybe(^ReceiptsFetchingMethod(0), "unknown") // if anything is left, describe it as unknown
return out
}
const (
// EthGetTransactionReceiptBatch is standard per-tx receipt fetching with JSON-RPC batches.
// Available in: standard, everywhere.
// - Alchemy: 15 CU / tx
// - Quicknode: 2 credits / tx
// Method: eth_getTransactionReceipt
// See: https://ethereum.github.io/execution-apis/api-documentation/
EthGetTransactionReceiptBatch ReceiptsFetchingMethod = 1 << iota
// AlchemyGetTransactionReceipts is a special receipt fetching method provided by Alchemy.
// Available in:
// - Alchemy: 250 CU total
// Method: alchemy_getTransactionReceipts
// Params:
// - object with "blockNumber" or "blockHash" field
// Returns: "array of receipts" - docs lie, array is wrapped in a struct with single "receipts" field
// See: https://docs.alchemy.com/reference/alchemy-gettransactionreceipts#alchemy_gettransactionreceipts
AlchemyGetTransactionReceipts
// DebugGetRawReceipts is a debug method from Geth, faster by avoiding serialization and metadata overhead.
// Ideal for fast syncing from a local geth node.
// Available in:
// - Geth: free
// - QuickNode: 22 credits maybe? Unknown price, undocumented ("debug_getblockreceipts" exists in table though?)
// Method: debug_getRawReceipts
// Params:
// - string presenting a block number or hash
// Returns: list of strings, hex encoded RLP of receipts data. "consensus-encoding of all receipts in a single block"
// See: https://geth.ethereum.org/docs/rpc/ns-debug#debug_getrawreceipts
DebugGetRawReceipts
// ParityGetBlockReceipts is an old parity method, which has been adopted by Nethermind and some RPC providers.
// Available in:
// - Alchemy: 500 CU total
// - QuickNode: 59 credits - docs are wrong, not actually available anymore.
// - Any open-ethereum/parity legacy: free
// - Nethermind: free
// Method: parity_getBlockReceipts
// Params:
// Parity: "quantity or tag"
// Alchemy: string with block hash, number in hex, or block tag.
// Nethermind: very flexible: tag, number, hex or object with "requireCanonical"/"blockHash" fields.
// Returns: array of receipts
// See:
// - Parity: https://openethereum.github.io/JSONRPC-parity-module#parity_getblockreceipts
// - QuickNode: undocumented.
// - Alchemy: https://docs.alchemy.com/reference/eth-getblockreceipts
// - Nethermind: https://docs.nethermind.io/nethermind/ethereum-client/json-rpc/parity#parity_getblockreceipts
ParityGetBlockReceipts
// EthGetBlockReceipts is a previously non-standard receipt fetching method in the eth namespace,
// supported by some RPC platforms.
// This since has been standardized in https://github.com/ethereum/execution-apis/pull/438 and adopted in Geth:
// https://github.com/ethereum/go-ethereum/pull/27702
// Available in:
// - Alchemy: 500 CU total (and deprecated)
// - QuickNode: 59 credits total (does not seem to work with block hash arg, inaccurate docs)
// - Standard, incl. Geth, Besu and Reth, and Nethermind has a PR in review.
// Method: eth_getBlockReceipts
// Params:
// - QuickNode: string, "quantity or tag", docs say incl. block hash, but API does not actually accept it.
// - Alchemy: string, block hash / num (hex) / block tag
// Returns: array of receipts
// See:
// - QuickNode: https://www.quicknode.com/docs/ethereum/eth_getBlockReceipts
// - Alchemy: https://docs.alchemy.com/reference/eth-getblockreceipts
// Erigon has this available, but does not support block-hash argument to the method:
// https://github.com/ledgerwatch/erigon/blob/287a3d1d6c90fc6a7a088b5ae320f93600d5a167/cmd/rpcdaemon/commands/eth_receipts.go#L571
EthGetBlockReceipts
// ErigonGetBlockReceiptsByBlockHash is an Erigon-specific receipt fetching method,
// the same as EthGetBlockReceipts but supporting a block-hash argument.
// Available in:
// - Erigon
// Method: erigon_getBlockReceiptsByBlockHash
// Params:
// - Erigon: string, hex-encoded block hash
// Returns:
// - Erigon: array of json-ified receipts
// See:
// https://github.com/ledgerwatch/erigon/blob/287a3d1d6c90fc6a7a088b5ae320f93600d5a167/cmd/rpcdaemon/commands/erigon_receipts.go#LL391C24-L391C51
ErigonGetBlockReceiptsByBlockHash
// RethGetBlockReceiptsMDBX is a Reth-specific receipt fetching method. It reads the data directly from reth's database, using their
// generic DB abstractions, rather than requesting it from the RPC provider.
// Available in:
// - Reth
// Method: n/a - does not use RPC.
// Params:
// - Reth: string, hex-encoded block hash
// Returns:
// - Reth: string, json-ified receipts
// See:
// - reth's DB crate documentation: https://github.com/paradigmxyz/reth/blob/main/docs/crates/db.md
RethGetBlockReceipts
// Other:
// - 250 credits, not supported, strictly worse than other options. In quicknode price-table.
// qn_getBlockWithReceipts - in price table, ? undocumented, but in quicknode "Single Flight RPC" description
// qn_getReceipts - in price table, ? undocumented, but in quicknode "Single Flight RPC" description
// debug_getBlockReceipts - ? undocumented, shows up in quicknode price table, not available.
)
// AvailableReceiptsFetchingMethods selects receipt fetching methods based on the RPC provider kind.
func AvailableReceiptsFetchingMethods(kind RPCProviderKind) ReceiptsFetchingMethod {
switch kind {
case RPCKindAlchemy:
return AlchemyGetTransactionReceipts | EthGetBlockReceipts | EthGetTransactionReceiptBatch
case RPCKindQuickNode:
return DebugGetRawReceipts | EthGetBlockReceipts | EthGetTransactionReceiptBatch
case RPCKindInfura:
// Infura is big, but sadly does not support more optimized receipts fetching methods (yet?)
return EthGetTransactionReceiptBatch
case RPCKindParity:
return ParityGetBlockReceipts | EthGetTransactionReceiptBatch
case RPCKindNethermind:
return ParityGetBlockReceipts | EthGetTransactionReceiptBatch
case RPCKindDebugGeth:
return DebugGetRawReceipts | EthGetTransactionReceiptBatch
case RPCKindErigon:
return ErigonGetBlockReceiptsByBlockHash | EthGetTransactionReceiptBatch
case RPCKindBasic:
return EthGetTransactionReceiptBatch
case RPCKindAny:
// if it's any kind of RPC provider, then try all methods (except for RethGetBlockReceipts)
return AlchemyGetTransactionReceipts | EthGetBlockReceipts |
DebugGetRawReceipts | ErigonGetBlockReceiptsByBlockHash |
ParityGetBlockReceipts | EthGetTransactionReceiptBatch
case RPCKindStandard:
return EthGetBlockReceipts | EthGetTransactionReceiptBatch
case RPCKindRethDB:
return RethGetBlockReceipts
default:
return EthGetTransactionReceiptBatch
}
}
// PickBestReceiptsFetchingMethod selects an RPC method that is still available,
// and optimal for fetching the given number of tx receipts from the specified provider kind.
func PickBestReceiptsFetchingMethod(kind RPCProviderKind, available ReceiptsFetchingMethod, txCount uint64) ReceiptsFetchingMethod {
// If we have optimized methods available, it makes sense to use them, but only if the cost is
// lower than fetching transactions one by one with the standard receipts RPC method.
if kind == RPCKindRethDB {
return RethGetBlockReceipts
} else if kind == RPCKindAlchemy {
if available&AlchemyGetTransactionReceipts != 0 && txCount > 250/15 {
return AlchemyGetTransactionReceipts
}
if available&EthGetBlockReceipts != 0 && txCount > 500/15 {
return EthGetBlockReceipts
}
return EthGetTransactionReceiptBatch
} else if kind == RPCKindQuickNode {
if available&DebugGetRawReceipts != 0 {
return DebugGetRawReceipts
}
if available&EthGetBlockReceipts != 0 && txCount > 59/2 {
return EthGetBlockReceipts
}
return EthGetTransactionReceiptBatch
}
// in order of preference (based on cost): check available methods
if available&AlchemyGetTransactionReceipts != 0 {
return AlchemyGetTransactionReceipts
}
if available&DebugGetRawReceipts != 0 {
return DebugGetRawReceipts
}
if available&ErigonGetBlockReceiptsByBlockHash != 0 {
return ErigonGetBlockReceiptsByBlockHash
}
if available&EthGetBlockReceipts != 0 {
return EthGetBlockReceipts
}
if available&ParityGetBlockReceipts != 0 {
return ParityGetBlockReceipts
}
// otherwise fall back on per-tx fetching
return EthGetTransactionReceiptBatch
}
type rpcClient interface {
CallContext(ctx context.Context, result any, method string, args ...any) error
BatchCallContext(ctx context.Context, b []rpc.BatchElem) error
}
// receiptsFetchingJob runs the receipt fetching for a specific block,
// and can re-run and adapt based on the fetching method preferences and errors communicated with the requester.
type receiptsFetchingJob struct {
m sync.Mutex
requester ReceiptsRequester
client rpcClient
maxBatchSize int
block eth.BlockID
receiptHash common.Hash
txHashes []common.Hash
fetcher *batching.IterativeBatchCall[common.Hash, *types.Receipt]
// [OPTIONAL] RethDB path to fetch receipts from
rethDbPath string
result types.Receipts
}
func NewReceiptsFetchingJob(requester ReceiptsRequester, client rpcClient, maxBatchSize int, block eth.BlockID,
receiptHash common.Hash, txHashes []common.Hash, rethDb string) *receiptsFetchingJob {
return &receiptsFetchingJob{
requester: requester,
client: client,
maxBatchSize: maxBatchSize,
block: block,
receiptHash: receiptHash,
txHashes: txHashes,
rethDbPath: rethDb,
}
}
// ReceiptsRequester helps determine which receipts fetching method can be used,
// and is given feedback upon receipt fetching errors to adapt the choice of method.
type ReceiptsRequester interface {
PickReceiptsMethod(txCount uint64) ReceiptsFetchingMethod
OnReceiptsMethodErr(m ReceiptsFetchingMethod, err error)
}
// runFetcher retrieves the result by continuing previous batched receipt fetching work,
// and starting this work if necessary.
func (job *receiptsFetchingJob) runFetcher(ctx context.Context) error {
if job.fetcher == nil {
// start new work
job.fetcher = batching.NewIterativeBatchCall[common.Hash, *types.Receipt](
job.txHashes,
makeReceiptRequest,
job.client.BatchCallContext,
job.client.CallContext,
job.maxBatchSize,
)
}
// Fetch all receipts
for {
if err := job.fetcher.Fetch(ctx); err == io.EOF {
break
} else if err != nil {
return err
}
}
result, err := job.fetcher.Result()
if err != nil { // errors if results are not available yet, should never happen.
return err
}
if err := validateReceipts(job.block, job.receiptHash, job.txHashes, result); err != nil {
job.fetcher.Reset() // if results are fetched but invalid, try restart all the fetching to try and get valid data.
return err
}
// Remember the result, and don't keep the fetcher and tx hashes around for longer than needed
job.result = result
job.fetcher = nil
job.txHashes = nil
return nil
}
// receiptsWrapper is a decoding type util. Alchemy in particular wraps the receipts array result.
type receiptsWrapper struct {
Receipts []*types.Receipt `json:"receipts"`
}
// runAltMethod retrieves the result by fetching all receipts at once,
// using the given non-standard receipt fetching method.
func (job *receiptsFetchingJob) runAltMethod(ctx context.Context, m ReceiptsFetchingMethod) error {
var result []*types.Receipt
var err error
switch m {
case AlchemyGetTransactionReceipts:
var tmp receiptsWrapper
err = job.client.CallContext(ctx, &tmp, "alchemy_getTransactionReceipts", blockHashParameter{BlockHash: job.block.Hash})
result = tmp.Receipts
case DebugGetRawReceipts:
var rawReceipts []hexutil.Bytes
err = job.client.CallContext(ctx, &rawReceipts, "debug_getRawReceipts", job.block.Hash)
if err == nil {
if len(rawReceipts) == len(job.txHashes) {
result, err = eth.DecodeRawReceipts(job.block, rawReceipts, job.txHashes)
} else {
err = fmt.Errorf("got %d raw receipts, but expected %d", len(rawReceipts), len(job.txHashes))
}
}
case ParityGetBlockReceipts:
err = job.client.CallContext(ctx, &result, "parity_getBlockReceipts", job.block.Hash)
case EthGetBlockReceipts:
err = job.client.CallContext(ctx, &result, "eth_getBlockReceipts", job.block.Hash)
case ErigonGetBlockReceiptsByBlockHash:
err = job.client.CallContext(ctx, &result, "erigon_getBlockReceiptsByBlockHash", job.block.Hash)
case RethGetBlockReceipts:
if job.rethDbPath == "" {
return fmt.Errorf("reth_db path not set")
}
res, err := FetchRethReceipts(job.rethDbPath, &job.block.Hash)
if err != nil {
return err
}
result = res
default:
err = fmt.Errorf("unknown receipt fetching method: %d", uint64(m))
}
if err != nil {
job.requester.OnReceiptsMethodErr(m, err)
return err
} else {
if err := validateReceipts(job.block, job.receiptHash, job.txHashes, result); err != nil {
return err
}
job.result = result
return nil
}
}
// Fetch makes the job fetch the receipts, and returns the results, if any.
// An error may be returned if the fetching is not successfully completed,
// and fetching may be continued/re-attempted by calling Fetch again.
// The job caches the result, so repeated Fetches add no additional cost.
// Fetch is safe to be called concurrently, and will lock to avoid duplicate work or internal inconsistency.
func (job *receiptsFetchingJob) Fetch(ctx context.Context) (types.Receipts, error) {
job.m.Lock()
defer job.m.Unlock()
if job.result != nil {
return job.result, nil
}
m := job.requester.PickReceiptsMethod(uint64(len(job.txHashes)))
if m == EthGetTransactionReceiptBatch {
if err := job.runFetcher(ctx); err != nil {
return nil, err
}
} else {
if err := job.runAltMethod(ctx, m); err != nil {
return nil, err
}
}
return job.result, nil
}
package sources
import (
"context"
"io"
"sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources/batching"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
)
type receiptsBatchCall = batching.IterativeBatchCall[common.Hash, *types.Receipt]
type BasicRPCReceiptsFetcher struct {
client rpcClient
maxBatchSize int
// calls caches uncompleted batch calls
calls map[common.Hash]*receiptsBatchCall
callsMu sync.Mutex
}
func NewBasicRPCReceiptsFetcher(client rpcClient, maxBatchSize int) *BasicRPCReceiptsFetcher {
return &BasicRPCReceiptsFetcher{
client: client,
maxBatchSize: maxBatchSize,
calls: make(map[common.Hash]*receiptsBatchCall),
}
}
func (f *BasicRPCReceiptsFetcher) FetchReceipts(ctx context.Context, block eth.BlockID, txHashes []common.Hash) (types.Receipts, error) {
call := f.getOrCreateBatchCall(block.Hash, txHashes)
// Fetch all receipts
for {
if err := call.Fetch(ctx); err == io.EOF {
break
} else if err != nil {
return nil, err
}
}
res, err := call.Result()
if err != nil {
return nil, err
}
// call successful, remove from cache
f.deleteBatchCall(block.Hash)
return res, nil
}
func (f *BasicRPCReceiptsFetcher) getOrCreateBatchCall(blockHash common.Hash, txHashes []common.Hash) *receiptsBatchCall {
f.callsMu.Lock()
defer f.callsMu.Unlock()
if call, ok := f.calls[blockHash]; ok {
return call
}
call := batching.NewIterativeBatchCall[common.Hash, *types.Receipt](
txHashes,
makeReceiptRequest,
f.client.BatchCallContext,
f.client.CallContext,
f.maxBatchSize,
)
f.calls[blockHash] = call
return call
}
func (f *BasicRPCReceiptsFetcher) deleteBatchCall(blockHash common.Hash) {
f.callsMu.Lock()
defer f.callsMu.Unlock()
delete(f.calls, blockHash)
}
func makeReceiptRequest(txHash common.Hash) (*types.Receipt, rpc.BatchElem) {
out := new(types.Receipt)
return out, rpc.BatchElem{
Method: "eth_getTransactionReceipt",
Args: []any{txHash},
Result: &out, // receipt may become nil, double pointer is intentional
}
}
package sources
import (
"context"
"errors"
"fmt"
"math/rand"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
// simpleMockRPC is needed for some tests where the return value dynamically
// depends on the input, so that the test can set the function.
type simpleMockRPC struct {
callFn func(ctx context.Context, result any, method string, args ...any) error
batchCallFn func(ctx context.Context, b []rpc.BatchElem) error
}
func (m *simpleMockRPC) CallContext(ctx context.Context, result any, method string, args ...any) error {
return m.callFn(ctx, result, method, args...)
}
func (m *simpleMockRPC) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
return m.batchCallFn(ctx, b)
}
func TestBasicRPCReceiptsFetcher_Reuse(t *testing.T) {
require := require.New(t)
batchSize, txCount := 2, uint64(4)
block, receipts := randomRpcBlockAndReceipts(rand.New(rand.NewSource(123)), txCount)
blockid := block.BlockID()
txHashes := make([]common.Hash, 0, len(receipts))
recMap := make(map[common.Hash]*types.Receipt, len(receipts))
for _, rec := range receipts {
txHashes = append(txHashes, rec.TxHash)
recMap[rec.TxHash] = rec
}
mrpc := new(simpleMockRPC)
rp := NewBasicRPCReceiptsFetcher(mrpc, batchSize)
// prepare mock
ctx, done := context.WithTimeout(context.Background(), 10*time.Second)
defer done()
// 1st fetching
response := map[common.Hash]bool{
txHashes[0]: true,
txHashes[1]: true,
txHashes[2]: false,
txHashes[3]: false,
}
var numCalls int
mrpc.batchCallFn = func(_ context.Context, b []rpc.BatchElem) (err error) {
numCalls++
for i, el := range b {
if el.Method == "eth_getTransactionReceipt" {
txHash := el.Args[0].(common.Hash)
if response[txHash] {
// The IterativeBatchCall expects that the values are written
// to the fields of the allocated *types.Receipt.
**(el.Result.(**types.Receipt)) = *recMap[txHash]
} else {
err = errors.Join(err, fmt.Errorf("receipt[%d] error, hash %x", i, txHash))
}
} else {
err = errors.Join(err, fmt.Errorf("unknown method %s", el.Method))
}
}
return err
}
// 1st fetching should result in errors
recs, err := rp.FetchReceipts(ctx, blockid, txHashes)
require.Error(err)
require.Nil(recs)
require.Equal(2, numCalls)
// prepare 2nd fetching - all should succeed now
response[txHashes[2]] = true
response[txHashes[3]] = true
recs, err = rp.FetchReceipts(ctx, blockid, txHashes)
require.NoError(err)
require.NotNil(recs)
for i, rec := range recs {
requireEqualReceipt(t, receipts[i], rec)
}
require.Equal(3, numCalls)
}
func TestBasicRPCReceiptsFetcher_Concurrency(t *testing.T) {
require := require.New(t)
const numFetchers = 32
batchSize, txCount := 4, uint64(18) // 4.5 * 4
block, receipts := randomRpcBlockAndReceipts(rand.New(rand.NewSource(123)), txCount)
recMap := make(map[common.Hash]*types.Receipt, len(receipts))
for _, rec := range receipts {
recMap[rec.TxHash] = rec
}
mrpc := new(mockRPC)
rp := NewBasicRPCReceiptsFetcher(mrpc, batchSize)
// prepare mock
var numCalls int
mrpc.On("BatchCallContext", mock.Anything, mock.AnythingOfType("[]rpc.BatchElem")).
Run(func(args mock.Arguments) {
numCalls++
els := args.Get(1).([]rpc.BatchElem)
for _, el := range els {
if el.Method == "eth_getTransactionReceipt" {
txHash := el.Args[0].(common.Hash)
// The IterativeBatchCall expects that the values are written
// to the fields of the allocated *types.Receipt.
**(el.Result.(**types.Receipt)) = *recMap[txHash]
}
}
}).
Return([]error{nil})
runConcurrentFetchingTest(t, rp, numFetchers, receipts, block)
mrpc.AssertExpectations(t)
require.NotZero(numCalls, "BatchCallContext should have been called.")
require.Less(numCalls, numFetchers, "Some IterativeBatchCalls should have been shared.")
}
func runConcurrentFetchingTest(t *testing.T, rp ReceiptsProvider, numFetchers int, receipts types.Receipts, block *rpcBlock) {
require := require.New(t)
txHashes := receiptTxHashes(receipts)
// start n fetchers
type fetchResult struct {
rs types.Receipts
err error
}
fetchResults := make(chan fetchResult, numFetchers)
barrier := make(chan struct{})
ctx, done := context.WithTimeout(context.Background(), 10*time.Second)
defer done()
for i := 0; i < numFetchers; i++ {
go func() {
<-barrier
recs, err := rp.FetchReceipts(ctx, block.BlockID(), txHashes)
fetchResults <- fetchResult{rs: recs, err: err}
}()
}
close(barrier) // Go!
// assert results
for i := 0; i < numFetchers; i++ {
select {
case f := <-fetchResults:
require.NoError(f.err)
require.Len(f.rs, len(receipts))
for j, r := range receipts {
requireEqualReceipt(t, r, f.rs[j])
}
case <-ctx.Done():
t.Fatal("Test timeout")
}
}
}
func receiptTxHashes(receipts types.Receipts) []common.Hash {
txHashes := make([]common.Hash, 0, len(receipts))
for _, rec := range receipts {
txHashes = append(txHashes, rec.TxHash)
}
return txHashes
}
package sources
import (
"context"
"sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources/caching"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
// A CachingReceiptsProvider caches successful receipt fetches from the inner
// ReceiptsProvider. It also avoids duplicate in-flight requests per block hash.
type CachingReceiptsProvider struct {
inner ReceiptsProvider
cache *caching.LRUCache[common.Hash, types.Receipts]
// lock fetching process for each block hash to avoid duplicate requests
fetching map[common.Hash]*sync.Mutex
fetchingMu sync.Mutex // only protects map
}
func NewCachingReceiptsProvider(inner ReceiptsProvider, m caching.Metrics, cacheSize int) *CachingReceiptsProvider {
return &CachingReceiptsProvider{
inner: inner,
cache: caching.NewLRUCache[common.Hash, types.Receipts](m, "receipts", cacheSize),
fetching: make(map[common.Hash]*sync.Mutex),
}
}
func NewCachingRPCReceiptsProvider(client rpcClient, log log.Logger, config RPCReceiptsConfig, m caching.Metrics, cacheSize int) *CachingReceiptsProvider {
return NewCachingReceiptsProvider(NewRPCReceiptsFetcher(client, log, config), m, cacheSize)
}
func (p *CachingReceiptsProvider) getOrCreateFetchingLock(blockHash common.Hash) *sync.Mutex {
p.fetchingMu.Lock()
defer p.fetchingMu.Unlock()
if mu, ok := p.fetching[blockHash]; ok {
return mu
}
mu := new(sync.Mutex)
p.fetching[blockHash] = mu
return mu
}
func (p *CachingReceiptsProvider) deleteFetchingLock(blockHash common.Hash) {
p.fetchingMu.Lock()
defer p.fetchingMu.Unlock()
delete(p.fetching, blockHash)
}
func (p *CachingReceiptsProvider) FetchReceipts(ctx context.Context, block eth.BlockID, txHashes []common.Hash) (types.Receipts, error) {
if r, ok := p.cache.Get(block.Hash); ok {
return r, nil
}
mu := p.getOrCreateFetchingLock(block.Hash)
mu.Lock()
defer mu.Unlock()
// Other routine might have fetched in the meantime
if r, ok := p.cache.Get(block.Hash); ok {
// we might have created a new lock above while the old
// fetching job completed.
p.deleteFetchingLock(block.Hash)
return r, nil
}
r, err := p.inner.FetchReceipts(ctx, block, txHashes)
if err != nil {
return nil, err
}
p.cache.Add(block.Hash, r)
// result now in cache, can delete fetching lock
p.deleteFetchingLock(block.Hash)
return r, nil
}
package sources
import (
"context"
"math/rand"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
type mockReceiptsProvider struct {
mock.Mock
}
func (m *mockReceiptsProvider) FetchReceipts(ctx context.Context, block eth.BlockID, txHashes []common.Hash) (types.Receipts, error) {
args := m.Called(ctx, block, txHashes)
return args.Get(0).(types.Receipts), args.Error(1)
}
func TestCachingReceiptsProvider_Caching(t *testing.T) {
block, receipts := randomRpcBlockAndReceipts(rand.New(rand.NewSource(69)), 4)
txHashes := receiptTxHashes(receipts)
blockid := block.BlockID()
mrp := new(mockReceiptsProvider)
rp := NewCachingReceiptsProvider(mrp, nil, 1)
ctx, done := context.WithTimeout(context.Background(), 10*time.Second)
defer done()
mrp.On("FetchReceipts", ctx, blockid, txHashes).
Return(types.Receipts(receipts), error(nil)).
Once() // receipts should be cached after first fetch
for i := 0; i < 4; i++ {
gotRecs, err := rp.FetchReceipts(ctx, blockid, txHashes)
require.NoError(t, err)
for i, gotRec := range gotRecs {
requireEqualReceipt(t, receipts[i], gotRec)
}
}
mrp.AssertExpectations(t)
}
func TestCachingReceiptsProvider_Concurrency(t *testing.T) {
block, receipts := randomRpcBlockAndReceipts(rand.New(rand.NewSource(69)), 4)
txHashes := receiptTxHashes(receipts)
blockid := block.BlockID()
mrp := new(mockReceiptsProvider)
rp := NewCachingReceiptsProvider(mrp, nil, 1)
mrp.On("FetchReceipts", mock.Anything, blockid, txHashes).
Return(types.Receipts(receipts), error(nil)).
Once() // receipts should be cached after first fetch
runConcurrentFetchingTest(t, rp, 32, receipts, block)
mrp.AssertExpectations(t)
}
package sources
import (
"context"
"fmt"
"time"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/sources/caching"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
func newRPCRecProviderFromConfig(client client.RPC, log log.Logger, metrics caching.Metrics, config *EthClientConfig) *CachingReceiptsProvider {
recCfg := RPCReceiptsConfig{
MaxBatchSize: config.MaxRequestsPerBatch,
ProviderKind: config.RPCProviderKind,
MethodResetDuration: config.MethodResetDuration,
}
return NewCachingRPCReceiptsProvider(client, log, recCfg, metrics, config.ReceiptsCacheSize)
}
type rpcClient interface {
CallContext(ctx context.Context, result any, method string, args ...any) error
BatchCallContext(ctx context.Context, b []rpc.BatchElem) error
}
type RPCReceiptsFetcher struct {
client rpcClient
basic *BasicRPCReceiptsFetcher
log log.Logger
provKind RPCProviderKind
// availableReceiptMethods tracks which receipt methods can be used for fetching receipts
// This may be modified concurrently, but we don't lock since it's a single
// uint64 that's not critical (fine to miss or mix up a modification)
availableReceiptMethods ReceiptsFetchingMethod
// lastMethodsReset tracks when availableReceiptMethods was last reset.
// When receipt-fetching fails it falls back to available methods,
// but periodically it will try to reset to the preferred optimal methods.
lastMethodsReset time.Time
// methodResetDuration defines how long we take till we reset lastMethodsReset
methodResetDuration time.Duration
}
type RPCReceiptsConfig struct {
MaxBatchSize int
ProviderKind RPCProviderKind
MethodResetDuration time.Duration
}
func NewRPCReceiptsFetcher(client rpcClient, log log.Logger, config RPCReceiptsConfig) *RPCReceiptsFetcher {
return &RPCReceiptsFetcher{
client: client,
basic: NewBasicRPCReceiptsFetcher(client, config.MaxBatchSize),
log: log,
provKind: config.ProviderKind,
availableReceiptMethods: AvailableReceiptsFetchingMethods(config.ProviderKind),
lastMethodsReset: time.Now(),
methodResetDuration: config.MethodResetDuration,
}
}
func (f *RPCReceiptsFetcher) FetchReceipts(ctx context.Context, block eth.BlockID, txHashes []common.Hash) (result types.Receipts, err error) {
m := f.PickReceiptsMethod(len(txHashes))
switch m {
case EthGetTransactionReceiptBatch:
result, err = f.basic.FetchReceipts(ctx, block, txHashes)
case AlchemyGetTransactionReceipts:
var tmp receiptsWrapper
err = f.client.CallContext(ctx, &tmp, "alchemy_getTransactionReceipts", blockHashParameter{BlockHash: block.Hash})
result = tmp.Receipts
case DebugGetRawReceipts:
var rawReceipts []hexutil.Bytes
err = f.client.CallContext(ctx, &rawReceipts, "debug_getRawReceipts", block.Hash)
if err == nil {
if len(rawReceipts) == len(txHashes) {
result, err = eth.DecodeRawReceipts(block, rawReceipts, txHashes)
} else {
err = fmt.Errorf("got %d raw receipts, but expected %d", len(rawReceipts), len(txHashes))
}
}
case ParityGetBlockReceipts:
err = f.client.CallContext(ctx, &result, "parity_getBlockReceipts", block.Hash)
case EthGetBlockReceipts:
err = f.client.CallContext(ctx, &result, "eth_getBlockReceipts", block.Hash)
case ErigonGetBlockReceiptsByBlockHash:
err = f.client.CallContext(ctx, &result, "erigon_getBlockReceiptsByBlockHash", block.Hash)
default:
err = fmt.Errorf("unknown receipt fetching method: %d", uint64(m))
}
if err != nil {
f.OnReceiptsMethodErr(m, err)
return nil, err
}
return
}
// receiptsWrapper is a decoding type util. Alchemy in particular wraps the receipts array result.
type receiptsWrapper struct {
Receipts []*types.Receipt `json:"receipts"`
}
func (f *RPCReceiptsFetcher) PickReceiptsMethod(txCount int) ReceiptsFetchingMethod {
txc := uint64(txCount)
if now := time.Now(); now.Sub(f.lastMethodsReset) > f.methodResetDuration {
m := AvailableReceiptsFetchingMethods(f.provKind)
if f.availableReceiptMethods != m {
f.log.Warn("resetting back RPC preferences, please review RPC provider kind setting", "kind", f.provKind.String())
}
f.availableReceiptMethods = m
f.lastMethodsReset = now
}
return PickBestReceiptsFetchingMethod(f.provKind, f.availableReceiptMethods, txc)
}
func (f *RPCReceiptsFetcher) OnReceiptsMethodErr(m ReceiptsFetchingMethod, err error) {
if unusableMethod(err) {
// clear the bit of the method that errored
f.availableReceiptMethods &^= m
f.log.Warn("failed to use selected RPC method for receipt fetching, temporarily falling back to alternatives",
"provider_kind", f.provKind, "failed_method", m, "fallback", f.availableReceiptMethods, "err", err)
} else {
f.log.Debug("failed to use selected RPC method for receipt fetching, but method does appear to be available, so we continue to use it",
"provider_kind", f.provKind, "failed_method", m, "fallback", f.availableReceiptMethods&^m, "err", err)
}
}
// Cost break-down sources:
// Alchemy: https://docs.alchemy.com/reference/compute-units
// QuickNode: https://www.quicknode.com/docs/ethereum/api_credits
// Infura: no pricing table available.
//
// Receipts are encoded the same everywhere:
//
// blockHash, blockNumber, transactionIndex, transactionHash, from, to, cumulativeGasUsed, gasUsed,
// contractAddress, logs, logsBloom, status, effectiveGasPrice, type.
//
// Note that Alchemy/Geth still have a "root" field for legacy reasons,
// but ethereum does not compute state-roots per tx anymore, so quicknode and others do not serve this data.
// RPCProviderKind identifies an RPC provider, used to hint at the optimal receipt fetching approach.
type RPCProviderKind string
const (
RPCKindAlchemy RPCProviderKind = "alchemy"
RPCKindQuickNode RPCProviderKind = "quicknode"
RPCKindInfura RPCProviderKind = "infura"
RPCKindParity RPCProviderKind = "parity"
RPCKindNethermind RPCProviderKind = "nethermind"
RPCKindDebugGeth RPCProviderKind = "debug_geth"
RPCKindErigon RPCProviderKind = "erigon"
RPCKindBasic RPCProviderKind = "basic" // try only the standard most basic receipt fetching
RPCKindAny RPCProviderKind = "any" // try any method available
RPCKindStandard RPCProviderKind = "standard" // try standard methods, including newer optimized standard RPC methods
)
var RPCProviderKinds = []RPCProviderKind{
RPCKindAlchemy,
RPCKindQuickNode,
RPCKindInfura,
RPCKindParity,
RPCKindNethermind,
RPCKindDebugGeth,
RPCKindErigon,
RPCKindBasic,
RPCKindAny,
RPCKindStandard,
}
func (kind RPCProviderKind) String() string {
return string(kind)
}
func (kind *RPCProviderKind) Set(value string) error {
if !ValidRPCProviderKind(RPCProviderKind(value)) {
return fmt.Errorf("unknown rpc kind: %q", value)
}
*kind = RPCProviderKind(value)
return nil
}
func (kind *RPCProviderKind) Clone() any {
cpy := *kind
return &cpy
}
func ValidRPCProviderKind(value RPCProviderKind) bool {
for _, k := range RPCProviderKinds {
if k == value {
return true
}
}
return false
}
// ReceiptsFetchingMethod is a bitfield with 1 bit for each receipts fetching type.
// Depending on errors, tx counts and preferences the code may select different sets of fetching methods.
type ReceiptsFetchingMethod uint64
func (r ReceiptsFetchingMethod) String() string {
out := ""
x := r
addMaybe := func(m ReceiptsFetchingMethod, v string) {
if x&m != 0 {
out += v
x ^= x & m
}
if x != 0 { // add separator if there are entries left
out += ", "
}
}
addMaybe(EthGetTransactionReceiptBatch, "eth_getTransactionReceipt (batched)")
addMaybe(AlchemyGetTransactionReceipts, "alchemy_getTransactionReceipts")
addMaybe(DebugGetRawReceipts, "debug_getRawReceipts")
addMaybe(ParityGetBlockReceipts, "parity_getBlockReceipts")
addMaybe(EthGetBlockReceipts, "eth_getBlockReceipts")
addMaybe(ErigonGetBlockReceiptsByBlockHash, "erigon_getBlockReceiptsByBlockHash")
addMaybe(^ReceiptsFetchingMethod(0), "unknown") // if anything is left, describe it as unknown
return out
}
const (
// EthGetTransactionReceiptBatch is standard per-tx receipt fetching with JSON-RPC batches.
// Available in: standard, everywhere.
// - Alchemy: 15 CU / tx
// - Quicknode: 2 credits / tx
// Method: eth_getTransactionReceipt
// See: https://ethereum.github.io/execution-apis/api-documentation/
EthGetTransactionReceiptBatch ReceiptsFetchingMethod = 1 << iota
// AlchemyGetTransactionReceipts is a special receipt fetching method provided by Alchemy.
// Available in:
// - Alchemy: 250 CU total
// Method: alchemy_getTransactionReceipts
// Params:
// - object with "blockNumber" or "blockHash" field
// Returns: "array of receipts" - docs lie, array is wrapped in a struct with single "receipts" field
// See: https://docs.alchemy.com/reference/alchemy-gettransactionreceipts#alchemy_gettransactionreceipts
AlchemyGetTransactionReceipts
// DebugGetRawReceipts is a debug method from Geth, faster by avoiding serialization and metadata overhead.
// Ideal for fast syncing from a local geth node.
// Available in:
// - Geth: free
// - QuickNode: 22 credits maybe? Unknown price, undocumented ("debug_getblockreceipts" exists in table though?)
// Method: debug_getRawReceipts
// Params:
// - string presenting a block number or hash
// Returns: list of strings, hex encoded RLP of receipts data. "consensus-encoding of all receipts in a single block"
// See: https://geth.ethereum.org/docs/rpc/ns-debug#debug_getrawreceipts
DebugGetRawReceipts
// ParityGetBlockReceipts is an old parity method, which has been adopted by Nethermind and some RPC providers.
// Available in:
// - Alchemy: 500 CU total
// - QuickNode: 59 credits - docs are wrong, not actually available anymore.
// - Any open-ethereum/parity legacy: free
// - Nethermind: free
// Method: parity_getBlockReceipts
// Params:
// Parity: "quantity or tag"
// Alchemy: string with block hash, number in hex, or block tag.
// Nethermind: very flexible: tag, number, hex or object with "requireCanonical"/"blockHash" fields.
// Returns: array of receipts
// See:
// - Parity: https://openethereum.github.io/JSONRPC-parity-module#parity_getblockreceipts
// - QuickNode: undocumented.
// - Alchemy: https://docs.alchemy.com/reference/eth-getblockreceipts
// - Nethermind: https://docs.nethermind.io/nethermind/ethereum-client/json-rpc/parity#parity_getblockreceipts
ParityGetBlockReceipts
// EthGetBlockReceipts is a previously non-standard receipt fetching method in the eth namespace,
// supported by some RPC platforms.
// This since has been standardized in https://github.com/ethereum/execution-apis/pull/438 and adopted in Geth:
// https://github.com/ethereum/go-ethereum/pull/27702
// Available in:
// - Alchemy: 500 CU total (and deprecated)
// - QuickNode: 59 credits total (does not seem to work with block hash arg, inaccurate docs)
// - Standard, incl. Geth, Besu and Reth, and Nethermind has a PR in review.
// Method: eth_getBlockReceipts
// Params:
// - QuickNode: string, "quantity or tag", docs say incl. block hash, but API does not actually accept it.
// - Alchemy: string, block hash / num (hex) / block tag
// Returns: array of receipts
// See:
// - QuickNode: https://www.quicknode.com/docs/ethereum/eth_getBlockReceipts
// - Alchemy: https://docs.alchemy.com/reference/eth-getblockreceipts
// Erigon has this available, but does not support block-hash argument to the method:
// https://github.com/ledgerwatch/erigon/blob/287a3d1d6c90fc6a7a088b5ae320f93600d5a167/cmd/rpcdaemon/commands/eth_receipts.go#L571
EthGetBlockReceipts
// ErigonGetBlockReceiptsByBlockHash is an Erigon-specific receipt fetching method,
// the same as EthGetBlockReceipts but supporting a block-hash argument.
// Available in:
// - Erigon
// Method: erigon_getBlockReceiptsByBlockHash
// Params:
// - Erigon: string, hex-encoded block hash
// Returns:
// - Erigon: array of json-ified receipts
// See:
// https://github.com/ledgerwatch/erigon/blob/287a3d1d6c90fc6a7a088b5ae320f93600d5a167/cmd/rpcdaemon/commands/erigon_receipts.go#LL391C24-L391C51
ErigonGetBlockReceiptsByBlockHash
// Other:
// - 250 credits, not supported, strictly worse than other options. In quicknode price-table.
// qn_getBlockWithReceipts - in price table, ? undocumented, but in quicknode "Single Flight RPC" description
// qn_getReceipts - in price table, ? undocumented, but in quicknode "Single Flight RPC" description
// debug_getBlockReceipts - ? undocumented, shows up in quicknode price table, not available.
)
// AvailableReceiptsFetchingMethods selects receipt fetching methods based on the RPC provider kind.
func AvailableReceiptsFetchingMethods(kind RPCProviderKind) ReceiptsFetchingMethod {
switch kind {
case RPCKindAlchemy:
return AlchemyGetTransactionReceipts | EthGetBlockReceipts | EthGetTransactionReceiptBatch
case RPCKindQuickNode:
return DebugGetRawReceipts | EthGetBlockReceipts | EthGetTransactionReceiptBatch
case RPCKindInfura:
// Infura is big, but sadly does not support more optimized receipts fetching methods (yet?)
return EthGetTransactionReceiptBatch
case RPCKindParity:
return ParityGetBlockReceipts | EthGetTransactionReceiptBatch
case RPCKindNethermind:
return ParityGetBlockReceipts | EthGetTransactionReceiptBatch
case RPCKindDebugGeth:
return DebugGetRawReceipts | EthGetTransactionReceiptBatch
case RPCKindErigon:
return ErigonGetBlockReceiptsByBlockHash | EthGetTransactionReceiptBatch
case RPCKindBasic:
return EthGetTransactionReceiptBatch
case RPCKindAny:
// if it's any kind of RPC provider, then try all methods
return AlchemyGetTransactionReceipts | EthGetBlockReceipts |
DebugGetRawReceipts | ErigonGetBlockReceiptsByBlockHash |
ParityGetBlockReceipts | EthGetTransactionReceiptBatch
case RPCKindStandard:
return EthGetBlockReceipts | EthGetTransactionReceiptBatch
default:
return EthGetTransactionReceiptBatch
}
}
// PickBestReceiptsFetchingMethod selects an RPC method that is still available,
// and optimal for fetching the given number of tx receipts from the specified provider kind.
func PickBestReceiptsFetchingMethod(kind RPCProviderKind, available ReceiptsFetchingMethod, txCount uint64) ReceiptsFetchingMethod {
// If we have optimized methods available, it makes sense to use them, but only if the cost is
// lower than fetching transactions one by one with the standard receipts RPC method.
if kind == RPCKindAlchemy {
if available&AlchemyGetTransactionReceipts != 0 && txCount > 250/15 {
return AlchemyGetTransactionReceipts
}
if available&EthGetBlockReceipts != 0 && txCount > 500/15 {
return EthGetBlockReceipts
}
return EthGetTransactionReceiptBatch
} else if kind == RPCKindQuickNode {
if available&DebugGetRawReceipts != 0 {
return DebugGetRawReceipts
}
if available&EthGetBlockReceipts != 0 && txCount > 59/2 {
return EthGetBlockReceipts
}
return EthGetTransactionReceiptBatch
}
// in order of preference (based on cost): check available methods
if available&AlchemyGetTransactionReceipts != 0 {
return AlchemyGetTransactionReceipts
}
if available&DebugGetRawReceipts != 0 {
return DebugGetRawReceipts
}
if available&ErigonGetBlockReceiptsByBlockHash != 0 {
return ErigonGetBlockReceiptsByBlockHash
}
if available&EthGetBlockReceipts != 0 {
return EthGetBlockReceipts
}
if available&ParityGetBlockReceipts != 0 {
return ParityGetBlockReceipts
}
// otherwise fall back on per-tx fetching
return EthGetTransactionReceiptBatch
}
...@@ -171,15 +171,13 @@ func (tc *ReceiptsTestCase) Run(t *testing.T) { ...@@ -171,15 +171,13 @@ func (tc *ReceiptsTestCase) Run(t *testing.T) {
for i, req := range requests { for i, req := range requests {
info, result, err := ethCl.FetchReceipts(context.Background(), block.Hash) info, result, err := ethCl.FetchReceipts(context.Background(), block.Hash)
if err == nil { if err == nil {
require.Nil(t, req.err, "error") require.NoError(t, req.err, "error")
require.Equal(t, block.Hash, info.Hash(), fmt.Sprintf("req %d blockhash", i)) require.Equal(t, block.Hash, info.Hash(), fmt.Sprintf("req %d blockhash", i))
expectedJson, err := json.MarshalIndent(req.result, "", " ") for j, rec := range req.result {
require.NoError(t, err) requireEqualReceipt(t, rec, result[j], "req %d result %d", i, j)
gotJson, err := json.MarshalIndent(result, "", " ") }
require.NoError(t, err)
require.Equal(t, string(expectedJson), string(gotJson), fmt.Sprintf("req %d result", i))
} else { } else {
require.NotNil(t, req.err, "error") require.Error(t, req.err, "error")
require.Equal(t, req.err.Error(), err.Error(), fmt.Sprintf("req %d err", i)) require.Equal(t, req.err.Error(), err.Error(), fmt.Sprintf("req %d err", i))
} }
} }
...@@ -570,3 +568,12 @@ func TestVerifyReceipts(t *testing.T) { ...@@ -570,3 +568,12 @@ func TestVerifyReceipts(t *testing.T) {
require.ErrorContains(t, err, "must never be removed due to reorg") require.ErrorContains(t, err, "must never be removed due to reorg")
}) })
} }
func requireEqualReceipt(t *testing.T, exp, act *types.Receipt, msgAndArgs ...any) {
t.Helper()
expJson, err := json.MarshalIndent(exp, "", " ")
require.NoError(t, err, msgAndArgs...)
actJson, err := json.MarshalIndent(act, "", " ")
require.NoError(t, err, msgAndArgs...)
require.Equal(t, string(expJson), string(actJson), msgAndArgs...)
}
...@@ -3,13 +3,17 @@ ...@@ -3,13 +3,17 @@
package sources package sources
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"unsafe" "unsafe"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources/caching"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
) )
/* /*
...@@ -63,3 +67,35 @@ func FetchRethReceipts(dbPath string, blockHash *common.Hash) (types.Receipts, e ...@@ -63,3 +67,35 @@ func FetchRethReceipts(dbPath string, blockHash *common.Hash) (types.Receipts, e
return receipts, nil return receipts, nil
} }
type RethDBReceiptsFetcher struct {
dbPath string
// TODO(8225): Now that we have reading from a Reth DB encapsulated here,
// We could store a reference to the RethDB here instead of just a db path,
// which would be more optimal.
// We could move the opening of the RethDB and creation of the db reference
// into NewRethDBReceiptsFetcher.
}
func NewRethDBReceiptsFetcher(dbPath string) *RethDBReceiptsFetcher {
return &RethDBReceiptsFetcher{
dbPath: dbPath,
}
}
func (f *RethDBReceiptsFetcher) FetchReceipts(ctx context.Context, block eth.BlockID, txHashes []common.Hash) (types.Receipts, error) {
return FetchRethReceipts(f.dbPath, &block.Hash)
}
func NewCachingRethDBReceiptsFetcher(dbPath string, m caching.Metrics, cacheSize int) *CachingReceiptsProvider {
return NewCachingReceiptsProvider(NewRethDBReceiptsFetcher(dbPath), m, cacheSize)
}
const buildRethdb = true
func newRecProviderFromConfig(client client.RPC, log log.Logger, metrics caching.Metrics, config *EthClientConfig) *CachingReceiptsProvider {
if dbPath := config.RethDBPath; dbPath != "" {
return NewCachingRethDBReceiptsFetcher(dbPath, metrics, config.ReceiptsCacheSize)
}
return newRPCRecProviderFromConfig(client, log, metrics, config)
}
...@@ -3,11 +3,13 @@ ...@@ -3,11 +3,13 @@
package sources package sources
import ( import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum-optimism/optimism/op-service/sources/caching"
"github.com/ethereum/go-ethereum/log"
) )
// FetchRethReceipts stub; Not available without `rethdb` build tag. const buildRethdb = false
func FetchRethReceipts(dbPath string, blockHash *common.Hash) (types.Receipts, error) {
panic("unimplemented! Did you forget to enable the `rethdb` build tag?") func newRecProviderFromConfig(client client.RPC, log log.Logger, metrics caching.Metrics, config *EthClientConfig) *CachingReceiptsProvider {
return newRPCRecProviderFromConfig(client, log, metrics, config)
} }
...@@ -190,6 +190,13 @@ func (hdr *rpcHeader) Info(trustCache bool, mustBePostMerge bool) (eth.BlockInfo ...@@ -190,6 +190,13 @@ func (hdr *rpcHeader) Info(trustCache bool, mustBePostMerge bool) (eth.BlockInfo
return &headerInfo{hdr.Hash, hdr.createGethHeader()}, nil return &headerInfo{hdr.Hash, hdr.createGethHeader()}, nil
} }
func (hdr *rpcHeader) BlockID() eth.BlockID {
return eth.BlockID{
Hash: hdr.Hash,
Number: uint64(hdr.Number),
}
}
type rpcBlock struct { type rpcBlock struct {
rpcHeader rpcHeader
Transactions []*types.Transaction `json:"transactions"` Transactions []*types.Transaction `json:"transactions"`
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment