Commit a53411cb authored by Sebastian Stammler's avatar Sebastian Stammler

op-service: Refactor EthClient with ReceiptsProvider abstraction

parent 83b3f43b
......@@ -63,7 +63,9 @@ type EthClientConfig struct {
// If this is 0 then the client does not fall back to less optimal but available methods.
MethodResetDuration time.Duration
// [OPTIONAL] The reth DB path to fetch receipts from
// [OPTIONAL] The reth DB path to fetch receipts from.
// If it is specified, the rethdb receipts fetcher will be used
// and the RPC configuration parameters don't need to be set.
RethDBPath string
}
......@@ -80,6 +82,15 @@ func (c *EthClientConfig) Check() error {
if c.PayloadsCacheSize < 0 {
return fmt.Errorf("invalid payloads cache size: %d", c.PayloadsCacheSize)
}
if c.RethDBPath != "" {
if buildRethdb {
// If the rethdb path is set, we use the rethdb receipts fetcher and skip creating
// an RCP receipts fetcher, so below rpc config parameters don't need to be checked.
return nil
} else {
return fmt.Errorf("rethdb path specified, but built without rethdb support")
}
}
if c.MaxConcurrentRequests < 1 {
return fmt.Errorf("expected at least 1 concurrent request, but max is %d", c.MaxConcurrentRequests)
}
......@@ -96,21 +107,14 @@ func (c *EthClientConfig) Check() error {
type EthClient struct {
client client.RPC
maxBatchSize int
recProvider ReceiptsProvider
trustRPC bool
mustBePostMerge bool
provKind RPCProviderKind
log log.Logger
// cache receipts in bundles per block hash
// We cache the receipts fetching job to not lose progress when we have to retry the `Fetch` call
// common.Hash -> *receiptsFetchingJob
receiptsCache *caching.LRUCache[common.Hash, *receiptsFetchingJob]
// cache transactions in bundles per block hash
// common.Hash -> types.Transactions
transactionsCache *caching.LRUCache[common.Hash, types.Transactions]
......@@ -122,46 +126,6 @@ type EthClient struct {
// cache payloads by hash
// common.Hash -> *eth.ExecutionPayload
payloadsCache *caching.LRUCache[common.Hash, *eth.ExecutionPayload]
// availableReceiptMethods tracks which receipt methods can be used for fetching receipts
// This may be modified concurrently, but we don't lock since it's a single
// uint64 that's not critical (fine to miss or mix up a modification)
availableReceiptMethods ReceiptsFetchingMethod
// lastMethodsReset tracks when availableReceiptMethods was last reset.
// When receipt-fetching fails it falls back to available methods,
// but periodically it will try to reset to the preferred optimal methods.
lastMethodsReset time.Time
// methodResetDuration defines how long we take till we reset lastMethodsReset
methodResetDuration time.Duration
// [OPTIONAL] The reth DB path to fetch receipts from
rethDbPath string
}
func (s *EthClient) PickReceiptsMethod(txCount uint64) ReceiptsFetchingMethod {
if now := time.Now(); now.Sub(s.lastMethodsReset) > s.methodResetDuration {
m := AvailableReceiptsFetchingMethods(s.provKind)
if s.availableReceiptMethods != m {
s.log.Warn("resetting back RPC preferences, please review RPC provider kind setting", "kind", s.provKind.String())
}
s.availableReceiptMethods = m
s.lastMethodsReset = now
}
return PickBestReceiptsFetchingMethod(s.provKind, s.availableReceiptMethods, txCount)
}
func (s *EthClient) OnReceiptsMethodErr(m ReceiptsFetchingMethod, err error) {
if unusableMethod(err) {
// clear the bit of the method that errored
s.availableReceiptMethods &^= m
s.log.Warn("failed to use selected RPC method for receipt fetching, temporarily falling back to alternatives",
"provider_kind", s.provKind, "failed_method", m, "fallback", s.availableReceiptMethods, "err", err)
} else {
s.log.Debug("failed to use selected RPC method for receipt fetching, but method does appear to be available, so we continue to use it",
"provider_kind", s.provKind, "failed_method", m, "fallback", s.availableReceiptMethods&^m, "err", err)
}
}
// NewEthClient returns an [EthClient], wrapping an RPC with bindings to fetch ethereum data with added error logging,
......@@ -170,22 +134,18 @@ func NewEthClient(client client.RPC, log log.Logger, metrics caching.Metrics, co
if err := config.Check(); err != nil {
return nil, fmt.Errorf("bad config, cannot create L1 source: %w", err)
}
client = LimitRPC(client, config.MaxConcurrentRequests)
recProvider := newRecProviderFromConfig(client, log, metrics, config)
return &EthClient{
client: client,
maxBatchSize: config.MaxRequestsPerBatch,
trustRPC: config.TrustRPC,
mustBePostMerge: config.MustBePostMerge,
provKind: config.RPCProviderKind,
log: log,
receiptsCache: caching.NewLRUCache[common.Hash, *receiptsFetchingJob](metrics, "receipts", config.ReceiptsCacheSize),
transactionsCache: caching.NewLRUCache[common.Hash, types.Transactions](metrics, "txs", config.TransactionsCacheSize),
headersCache: caching.NewLRUCache[common.Hash, eth.BlockInfo](metrics, "headers", config.HeadersCacheSize),
payloadsCache: caching.NewLRUCache[common.Hash, *eth.ExecutionPayload](metrics, "payloads", config.PayloadsCacheSize),
availableReceiptMethods: AvailableReceiptsFetchingMethods(config.RPCProviderKind),
lastMethodsReset: time.Now(),
methodResetDuration: config.MethodResetDuration,
rethDbPath: config.RethDBPath,
client: client,
recProvider: recProvider,
trustRPC: config.TrustRPC,
mustBePostMerge: config.MustBePostMerge,
log: log,
transactionsCache: caching.NewLRUCache[common.Hash, types.Transactions](metrics, "txs", config.TransactionsCacheSize),
headersCache: caching.NewLRUCache[common.Hash, eth.BlockInfo](metrics, "headers", config.HeadersCacheSize),
payloadsCache: caching.NewLRUCache[common.Hash, *eth.ExecutionPayload](metrics, "payloads", config.PayloadsCacheSize),
}, nil
}
......@@ -354,24 +314,21 @@ func (s *EthClient) PayloadByLabel(ctx context.Context, label eth.BlockLabel) (*
func (s *EthClient) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) {
info, txs, err := s.InfoAndTxsByHash(ctx, blockHash)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("querying block: %w", err)
}
// Try to reuse the receipts fetcher because is caches the results of intermediate calls. This means
// that if just one of many calls fail, we only retry the failed call rather than all of the calls.
// The underlying fetcher uses the receipts hash to verify receipt integrity.
var job *receiptsFetchingJob
if v, ok := s.receiptsCache.Get(blockHash); ok {
job = v
} else {
txHashes := eth.TransactionsToHashes(txs)
job = NewReceiptsFetchingJob(s, s.client, s.maxBatchSize, eth.ToBlockID(info), info.ReceiptHash(), txHashes, s.rethDbPath)
s.receiptsCache.Add(blockHash, job)
}
receipts, err := job.Fetch(ctx)
txHashes, block := eth.TransactionsToHashes(txs), eth.ToBlockID(info)
receipts, err := s.recProvider.FetchReceipts(ctx, block, txHashes)
if err != nil {
return nil, nil, err
}
if !s.trustRPC {
if err := validateReceipts(block, info.ReceiptHash(), txHashes, receipts); err != nil {
return info, nil, fmt.Errorf("invalid receipts: %w", err)
}
}
return info, receipts, nil
}
......
This diff is collapsed.
package sources
import (
"context"
"io"
"sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources/batching"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
)
type receiptsBatchCall = batching.IterativeBatchCall[common.Hash, *types.Receipt]
type BasicRPCReceiptsFetcher struct {
client rpcClient
maxBatchSize int
// calls caches uncompleted batch calls
calls map[common.Hash]*receiptsBatchCall
callsMu sync.Mutex
}
func NewBasicRPCReceiptsFetcher(client rpcClient, maxBatchSize int) *BasicRPCReceiptsFetcher {
return &BasicRPCReceiptsFetcher{
client: client,
maxBatchSize: maxBatchSize,
calls: make(map[common.Hash]*receiptsBatchCall),
}
}
func (f *BasicRPCReceiptsFetcher) FetchReceipts(ctx context.Context, block eth.BlockID, txHashes []common.Hash) (types.Receipts, error) {
call := f.getOrCreateBatchCall(block.Hash, txHashes)
// Fetch all receipts
for {
if err := call.Fetch(ctx); err == io.EOF {
break
} else if err != nil {
return nil, err
}
}
res, err := call.Result()
if err != nil {
return nil, err
}
// call successful, remove from cache
f.deleteBatchCall(block.Hash)
return res, nil
}
func (f *BasicRPCReceiptsFetcher) getOrCreateBatchCall(blockHash common.Hash, txHashes []common.Hash) *receiptsBatchCall {
f.callsMu.Lock()
defer f.callsMu.Unlock()
if call, ok := f.calls[blockHash]; ok {
return call
}
call := batching.NewIterativeBatchCall[common.Hash, *types.Receipt](
txHashes,
makeReceiptRequest,
f.client.BatchCallContext,
f.client.CallContext,
f.maxBatchSize,
)
f.calls[blockHash] = call
return call
}
func (f *BasicRPCReceiptsFetcher) deleteBatchCall(blockHash common.Hash) {
f.callsMu.Lock()
defer f.callsMu.Unlock()
delete(f.calls, blockHash)
}
func makeReceiptRequest(txHash common.Hash) (*types.Receipt, rpc.BatchElem) {
out := new(types.Receipt)
return out, rpc.BatchElem{
Method: "eth_getTransactionReceipt",
Args: []any{txHash},
Result: &out, // receipt may become nil, double pointer is intentional
}
}
package sources
import (
"context"
"sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources/caching"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
// A CachingReceiptsProvider caches successful receipt fetches from the inner
// ReceiptsProvider. It also avoids duplicate in-flight requests per block hash.
type CachingReceiptsProvider struct {
inner ReceiptsProvider
cache *caching.LRUCache[common.Hash, types.Receipts]
// lock fetching process for each block hash to avoid duplicate requests
fetching map[common.Hash]*sync.Mutex
fetchingMu sync.Mutex // only protects map
}
func NewCachingReceiptsProvider(inner ReceiptsProvider, m caching.Metrics, cacheSize int) *CachingReceiptsProvider {
return &CachingReceiptsProvider{
inner: inner,
cache: caching.NewLRUCache[common.Hash, types.Receipts](m, "receipts", cacheSize),
fetching: make(map[common.Hash]*sync.Mutex),
}
}
func NewCachingRPCReceiptsProvider(client rpcClient, log log.Logger, config RPCReceiptsConfig, m caching.Metrics, cacheSize int) *CachingReceiptsProvider {
return NewCachingReceiptsProvider(NewRPCReceiptsFetcher(client, log, config), m, cacheSize)
}
func (p *CachingReceiptsProvider) getOrCreateFetchingLock(blockHash common.Hash) *sync.Mutex {
p.fetchingMu.Lock()
defer p.fetchingMu.Unlock()
if mu, ok := p.fetching[blockHash]; ok {
return mu
}
mu := new(sync.Mutex)
p.fetching[blockHash] = mu
return mu
}
func (p *CachingReceiptsProvider) deleteFetchingLock(blockHash common.Hash) {
p.fetchingMu.Lock()
defer p.fetchingMu.Unlock()
delete(p.fetching, blockHash)
}
func (p *CachingReceiptsProvider) FetchReceipts(ctx context.Context, block eth.BlockID, txHashes []common.Hash) (types.Receipts, error) {
if r, ok := p.cache.Get(block.Hash); ok {
return r, nil
}
mu := p.getOrCreateFetchingLock(block.Hash)
mu.Lock()
defer mu.Unlock()
// Other routine might have fetched in the meantime
if r, ok := p.cache.Get(block.Hash); ok {
// we might have created a new lock above while the old
// fetching job completed.
p.deleteFetchingLock(block.Hash)
return r, nil
}
r, err := p.inner.FetchReceipts(ctx, block, txHashes)
if err != nil {
return nil, err
}
p.cache.Add(block.Hash, r)
// result now in cache, can delete fetching lock
p.deleteFetchingLock(block.Hash)
return r, nil
}
This diff is collapsed.
......@@ -3,13 +3,17 @@
package sources
import (
"context"
"encoding/json"
"fmt"
"unsafe"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources/caching"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
/*
......@@ -63,3 +67,35 @@ func FetchRethReceipts(dbPath string, blockHash *common.Hash) (types.Receipts, e
return receipts, nil
}
type RethDBReceiptsFetcher struct {
dbPath string
// TODO(8225): Now that we have reading from a Reth DB encapsulated here,
// We could store a reference to the RethDB here instead of just a db path,
// which would be more optimal.
// We could move the opening of the RethDB and creation of the db reference
// into NewRethDBReceiptsFetcher.
}
func NewRethDBReceiptsFetcher(dbPath string) *RethDBReceiptsFetcher {
return &RethDBReceiptsFetcher{
dbPath: dbPath,
}
}
func (f *RethDBReceiptsFetcher) FetchReceipts(ctx context.Context, block eth.BlockID, txHashes []common.Hash) (types.Receipts, error) {
return FetchRethReceipts(f.dbPath, &block.Hash)
}
func NewCachingRethDBReceiptsFetcher(dbPath string, m caching.Metrics, cacheSize int) *CachingReceiptsProvider {
return NewCachingReceiptsProvider(NewRethDBReceiptsFetcher(dbPath), m, cacheSize)
}
const buildRethdb = true
func newRecProviderFromConfig(client client.RPC, log log.Logger, metrics caching.Metrics, config *EthClientConfig) *CachingReceiptsProvider {
if dbPath := config.RethDBPath; dbPath != "" {
return NewCachingRethDBReceiptsFetcher(dbPath, metrics, config.ReceiptsCacheSize)
}
return newRPCRecProviderFromConfig(client, log, metrics, config)
}
......@@ -3,11 +3,13 @@
package sources
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/sources/caching"
"github.com/ethereum/go-ethereum/log"
)
// FetchRethReceipts stub; Not available without `rethdb` build tag.
func FetchRethReceipts(dbPath string, blockHash *common.Hash) (types.Receipts, error) {
panic("unimplemented! Did you forget to enable the `rethdb` build tag?")
const buildRethdb = false
func newRecProviderFromConfig(client client.RPC, log log.Logger, metrics caching.Metrics, config *EthClientConfig) *CachingReceiptsProvider {
return newRPCRecProviderFromConfig(client, log, metrics, config)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment