Commit fafc3b37 authored by protolambda's avatar protolambda Committed by GitHub

op-node,op-e2e: optimize L1 receipts fetching by supporting different methods (#4494)

* op-node,op-e2e: optimize receipts fetching by supporting different kinds of rpc providers and receipts-fetching method alternatives

* op-node: rpc provider kinds / fetching methods review fixes

* op-e2e: add missing arg - fix lint
parent a1ced6ad
...@@ -169,7 +169,7 @@ func (s *L1Replica) RPCClient() client.RPC { ...@@ -169,7 +169,7 @@ func (s *L1Replica) RPCClient() client.RPC {
} }
func (s *L1Replica) L1Client(t Testing, cfg *rollup.Config) *sources.L1Client { func (s *L1Replica) L1Client(t Testing, cfg *rollup.Config) *sources.L1Client {
l1F, err := sources.NewL1Client(s.RPCClient(), s.log, nil, sources.L1ClientDefaultConfig(cfg, false)) l1F, err := sources.NewL1Client(s.RPCClient(), s.log, nil, sources.L1ClientDefaultConfig(cfg, false, sources.RPCKindBasic))
require.NoError(t, err) require.NoError(t, err)
return l1F return l1F
} }
......
...@@ -39,7 +39,7 @@ func TestL1Replica_ActL1RPCFail(gt *testing.T) { ...@@ -39,7 +39,7 @@ func TestL1Replica_ActL1RPCFail(gt *testing.T) {
// mock an RPC failure // mock an RPC failure
replica.ActL1RPCFail(t) replica.ActL1RPCFail(t)
// check RPC failure // check RPC failure
l1Cl, err := sources.NewL1Client(replica.RPCClient(), log, nil, sources.L1ClientDefaultConfig(sd.RollupCfg, false)) l1Cl, err := sources.NewL1Client(replica.RPCClient(), log, nil, sources.L1ClientDefaultConfig(sd.RollupCfg, false, sources.RPCKindBasic))
require.NoError(t, err) require.NoError(t, err)
_, err = l1Cl.InfoByLabel(t.Ctx(), eth.Unsafe) _, err = l1Cl.InfoByLabel(t.Ctx(), eth.Unsafe)
require.ErrorContains(t, err, "mock") require.ErrorContains(t, err, "mock")
......
...@@ -21,7 +21,7 @@ func setupSequencerTest(t Testing, sd *e2eutils.SetupData, log log.Logger) (*L1M ...@@ -21,7 +21,7 @@ func setupSequencerTest(t Testing, sd *e2eutils.SetupData, log log.Logger) (*L1M
miner := NewL1Miner(t, log, sd.L1Cfg) miner := NewL1Miner(t, log, sd.L1Cfg)
l1F, err := sources.NewL1Client(miner.RPCClient(), log, nil, sources.L1ClientDefaultConfig(sd.RollupCfg, false)) l1F, err := sources.NewL1Client(miner.RPCClient(), log, nil, sources.L1ClientDefaultConfig(sd.RollupCfg, false, sources.RPCKindBasic))
require.NoError(t, err) require.NoError(t, err)
engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath) engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath)
l2Cl, err := sources.NewEngineClient(engine.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg)) l2Cl, err := sources.NewEngineClient(engine.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg))
......
...@@ -285,7 +285,7 @@ func TestRestartOpGeth(gt *testing.T) { ...@@ -285,7 +285,7 @@ func TestRestartOpGeth(gt *testing.T) {
jwtPath := e2eutils.WriteDefaultJWT(t) jwtPath := e2eutils.WriteDefaultJWT(t)
// L1 // L1
miner := NewL1Miner(t, log, sd.L1Cfg) miner := NewL1Miner(t, log, sd.L1Cfg)
l1F, err := sources.NewL1Client(miner.RPCClient(), log, nil, sources.L1ClientDefaultConfig(sd.RollupCfg, false)) l1F, err := sources.NewL1Client(miner.RPCClient(), log, nil, sources.L1ClientDefaultConfig(sd.RollupCfg, false, sources.RPCKindBasic))
require.NoError(t, err) require.NoError(t, err)
// Sequencer // Sequencer
seqEng := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath, dbOption) seqEng := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath, dbOption)
...@@ -380,7 +380,7 @@ func TestConflictingL2Blocks(gt *testing.T) { ...@@ -380,7 +380,7 @@ func TestConflictingL2Blocks(gt *testing.T) {
altSeqEng := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath) altSeqEng := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath)
altSeqEngCl, err := sources.NewEngineClient(altSeqEng.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg)) altSeqEngCl, err := sources.NewEngineClient(altSeqEng.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg))
require.NoError(t, err) require.NoError(t, err)
l1F, err := sources.NewL1Client(miner.RPCClient(), log, nil, sources.L1ClientDefaultConfig(sd.RollupCfg, false)) l1F, err := sources.NewL1Client(miner.RPCClient(), log, nil, sources.L1ClientDefaultConfig(sd.RollupCfg, false, sources.RPCKindBasic))
require.NoError(t, err) require.NoError(t, err)
altSequencer := NewL2Sequencer(t, log, l1F, altSeqEngCl, sd.RollupCfg, 0) altSequencer := NewL2Sequencer(t, log, l1F, altSeqEngCl, sd.RollupCfg, 0)
altBatcher := NewL2Batcher(log, sd.RollupCfg, &BatcherCfg{ altBatcher := NewL2Batcher(log, sd.RollupCfg, &BatcherCfg{
......
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
"time" "time"
bss "github.com/ethereum-optimism/optimism/op-batcher/batcher" bss "github.com/ethereum-optimism/optimism/op-batcher/batcher"
"github.com/ethereum-optimism/optimism/op-node/sources"
l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer" l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer"
oplog "github.com/ethereum-optimism/optimism/op-service/log" oplog "github.com/ethereum-optimism/optimism/op-service/log"
...@@ -268,6 +269,7 @@ func TestMigration(t *testing.T) { ...@@ -268,6 +269,7 @@ func TestMigration(t *testing.T) {
L1: &node.L1EndpointConfig{ L1: &node.L1EndpointConfig{
L1NodeAddr: forkedL1URL, L1NodeAddr: forkedL1URL,
L1TrustRPC: false, L1TrustRPC: false,
L1RPCKind: sources.RPCKindBasic,
}, },
L2: &node.L2EndpointConfig{ L2: &node.L2EndpointConfig{
L2EngineAddr: gethNode.HTTPAuthEndpoint(), L2EngineAddr: gethNode.HTTPAuthEndpoint(),
......
...@@ -32,6 +32,7 @@ import ( ...@@ -32,6 +32,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/p2p" "github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testlog"
l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer" l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer"
oplog "github.com/ethereum-optimism/optimism/op-service/log" oplog "github.com/ethereum-optimism/optimism/op-service/log"
...@@ -368,6 +369,7 @@ func (cfg SystemConfig) Start() (*System, error) { ...@@ -368,6 +369,7 @@ func (cfg SystemConfig) Start() (*System, error) {
rollupCfg.L1 = &rollupNode.L1EndpointConfig{ rollupCfg.L1 = &rollupNode.L1EndpointConfig{
L1NodeAddr: l1EndpointConfig, L1NodeAddr: l1EndpointConfig,
L1TrustRPC: false, L1TrustRPC: false,
L1RPCKind: sources.RPCKindBasic,
} }
rollupCfg.L2 = &rollupNode.L2EndpointConfig{ rollupCfg.L2 = &rollupNode.L2EndpointConfig{
L2EngineAddr: l2EndpointConfig, L2EngineAddr: l2EndpointConfig,
......
...@@ -2,9 +2,7 @@ package eth ...@@ -2,9 +2,7 @@ package eth
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"io"
"math/big" "math/big"
"reflect" "reflect"
...@@ -281,42 +279,6 @@ type ForkchoiceUpdatedResult struct { ...@@ -281,42 +279,6 @@ type ForkchoiceUpdatedResult struct {
PayloadID *PayloadID `json:"payloadId"` PayloadID *PayloadID `json:"payloadId"`
} }
// ReceiptsFetcher fetches receipts of a block,
// and enables the caller to parallelize fetching and backoff on fetching errors as needed.
type ReceiptsFetcher interface {
// Reset clears the previously fetched results for a fresh re-attempt.
Reset()
// Fetch retrieves receipts in batches, until it returns io.EOF to indicate completion.
Fetch(ctx context.Context) error
// Complete indicates when all data has been fetched.
Complete() bool
// Result returns the receipts, or an error if the Fetch-ing is not Complete,
// or an error if the results are invalid.
// If an error is returned, the fetcher is Reset automatically.
Result() (types.Receipts, error)
}
// FetchedReceipts is a simple util to implement the ReceiptsFetcher with readily available receipts.
type FetchedReceipts types.Receipts
func (f FetchedReceipts) Reset() {
// nothing to reset
}
func (f FetchedReceipts) Fetch(ctx context.Context) error {
return io.EOF
}
func (f FetchedReceipts) Complete() bool {
return true
}
func (f FetchedReceipts) Result() (types.Receipts, error) {
return types.Receipts(f), nil
}
var _ ReceiptsFetcher = (FetchedReceipts)(nil)
// SystemConfig represents the rollup system configuration that carries over in every L2 block, // SystemConfig represents the rollup system configuration that carries over in every L2 block,
// and may be changed through L1 system config events. // and may be changed through L1 system config events.
// The initial SystemConfig at rollup genesis is embedded in the rollup configuration. // The initial SystemConfig at rollup genesis is embedded in the rollup configuration.
......
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"time" "time"
"github.com/ethereum-optimism/optimism/op-node/chaincfg" "github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/urfave/cli" "github.com/urfave/cli"
) )
...@@ -63,6 +64,16 @@ var ( ...@@ -63,6 +64,16 @@ var (
Usage: "Trust the L1 RPC, sync faster at risk of malicious/buggy RPC providing bad or inconsistent L1 data", Usage: "Trust the L1 RPC, sync faster at risk of malicious/buggy RPC providing bad or inconsistent L1 data",
EnvVar: prefixEnvVar("L1_TRUST_RPC"), EnvVar: prefixEnvVar("L1_TRUST_RPC"),
} }
L1RPCProviderKind = cli.GenericFlag{
Name: "l1.rpckind",
Usage: "The kind of RPC provider, used to inform optimal transactions receipts fetching, and thus reduce costs. Valid options: " +
EnumString[sources.RPCProviderKind](sources.RPCProviderKinds),
EnvVar: prefixEnvVar("L1_RPC_KIND"),
Value: func() *sources.RPCProviderKind {
out := sources.RPCKindBasic
return &out
}(),
}
L2EngineJWTSecret = cli.StringFlag{ L2EngineJWTSecret = cli.StringFlag{
Name: "l2.jwt-secret", Name: "l2.jwt-secret",
Usage: "Path to JWT secret key. Keys are 32 bytes, hex encoded in a file. A new key will be generated if left empty.", Usage: "Path to JWT secret key. Keys are 32 bytes, hex encoded in a file. A new key will be generated if left empty.",
...@@ -182,6 +193,7 @@ var optionalFlags = append([]cli.Flag{ ...@@ -182,6 +193,7 @@ var optionalFlags = append([]cli.Flag{
RollupConfig, RollupConfig,
Network, Network,
L1TrustRPC, L1TrustRPC,
L1RPCProviderKind,
L2EngineJWTSecret, L2EngineJWTSecret,
VerifierL1Confs, VerifierL1Confs,
SequencerEnabledFlag, SequencerEnabledFlag,
......
package flags
import (
"fmt"
"strings"
)
func EnumString[T fmt.Stringer](values []T) string {
var out strings.Builder
for i, v := range values {
out.WriteString(v.String())
if i+1 < len(values) {
out.WriteString(", ")
}
}
return out.String()
}
...@@ -6,6 +6,8 @@ import ( ...@@ -6,6 +6,8 @@ import (
"fmt" "fmt"
"github.com/ethereum-optimism/optimism/op-node/client" "github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
gn "github.com/ethereum/go-ethereum/node" gn "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
...@@ -19,7 +21,9 @@ type L2EndpointSetup interface { ...@@ -19,7 +21,9 @@ type L2EndpointSetup interface {
type L1EndpointSetup interface { type L1EndpointSetup interface {
// Setup a RPC client to a L1 node to pull rollup input-data from. // Setup a RPC client to a L1 node to pull rollup input-data from.
Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error) // The results of the RPC client may be trusted for faster processing, or strictly validated.
// The kind of the RPC may be non-basic, to optimize RPC usage.
Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, kind sources.RPCProviderKind, err error)
} }
type L2EndpointConfig struct { type L2EndpointConfig struct {
...@@ -78,26 +82,31 @@ type L1EndpointConfig struct { ...@@ -78,26 +82,31 @@ type L1EndpointConfig struct {
// against block hashes, or cached transaction sender addresses. // against block hashes, or cached transaction sender addresses.
// Thus we can sync faster at the risk of the source RPC being wrong. // Thus we can sync faster at the risk of the source RPC being wrong.
L1TrustRPC bool L1TrustRPC bool
// L1RPCKind identifies the RPC provider kind that serves the RPC,
// to inform the optimal usage of the RPC for transaction receipts fetching.
L1RPCKind sources.RPCProviderKind
} }
var _ L1EndpointSetup = (*L1EndpointConfig)(nil) var _ L1EndpointSetup = (*L1EndpointConfig)(nil)
func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error) { func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, kind sources.RPCProviderKind, err error) {
l1Node, err := client.NewRPC(ctx, log, cfg.L1NodeAddr) l1Node, err := client.NewRPC(ctx, log, cfg.L1NodeAddr)
if err != nil { if err != nil {
return nil, false, fmt.Errorf("failed to dial L1 address (%s): %w", cfg.L1NodeAddr, err) return nil, false, sources.RPCKindBasic, fmt.Errorf("failed to dial L1 address (%s): %w", cfg.L1NodeAddr, err)
} }
return l1Node, cfg.L1TrustRPC, nil return l1Node, cfg.L1TrustRPC, cfg.L1RPCKind, nil
} }
// PreparedL1Endpoint enables testing with an in-process pre-setup RPC connection to L1 // PreparedL1Endpoint enables testing with an in-process pre-setup RPC connection to L1
type PreparedL1Endpoint struct { type PreparedL1Endpoint struct {
Client client.RPC Client client.RPC
TrustRPC bool TrustRPC bool
RPCProviderKind sources.RPCProviderKind
} }
var _ L1EndpointSetup = (*PreparedL1Endpoint)(nil) var _ L1EndpointSetup = (*PreparedL1Endpoint)(nil)
func (p *PreparedL1Endpoint) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error) { func (p *PreparedL1Endpoint) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, kind sources.RPCProviderKind, err error) {
return p.Client, p.TrustRPC, nil return p.Client, p.TrustRPC, p.RPCProviderKind, nil
} }
...@@ -112,14 +112,14 @@ func (n *OpNode) initTracer(ctx context.Context, cfg *Config) error { ...@@ -112,14 +112,14 @@ func (n *OpNode) initTracer(ctx context.Context, cfg *Config) error {
} }
func (n *OpNode) initL1(ctx context.Context, cfg *Config) error { func (n *OpNode) initL1(ctx context.Context, cfg *Config) error {
l1Node, trustRPC, err := cfg.L1.Setup(ctx, n.log) l1Node, trustRPC, rpcProvKind, err := cfg.L1.Setup(ctx, n.log)
if err != nil { if err != nil {
return fmt.Errorf("failed to get L1 RPC client: %w", err) return fmt.Errorf("failed to get L1 RPC client: %w", err)
} }
n.l1Source, err = sources.NewL1Client( n.l1Source, err = sources.NewL1Client(
client.NewInstrumentedRPC(l1Node, n.metrics), n.log, n.metrics.L1SourceCache, client.NewInstrumentedRPC(l1Node, n.metrics), n.log, n.metrics.L1SourceCache,
sources.L1ClientDefaultConfig(&cfg.Rollup, trustRPC)) sources.L1ClientDefaultConfig(&cfg.Rollup, trustRPC, rpcProvKind))
if err != nil { if err != nil {
return fmt.Errorf("failed to create L1 source: %w", err) return fmt.Errorf("failed to create L1 source: %w", err)
} }
......
...@@ -9,17 +9,19 @@ import ( ...@@ -9,17 +9,19 @@ import (
"strings" "strings"
"github.com/ethereum-optimism/optimism/op-node/chaincfg" "github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/urfave/cli" "github.com/urfave/cli"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/flags" "github.com/ethereum-optimism/optimism/op-node/flags"
"github.com/ethereum-optimism/optimism/op-node/node" "github.com/ethereum-optimism/optimism/op-node/node"
"github.com/ethereum-optimism/optimism/op-node/p2p" "github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
) )
// NewConfig creates a Config from the provided flags or environment variables. // NewConfig creates a Config from the provided flags or environment variables.
...@@ -97,6 +99,7 @@ func NewL1EndpointConfig(ctx *cli.Context) (*node.L1EndpointConfig, error) { ...@@ -97,6 +99,7 @@ func NewL1EndpointConfig(ctx *cli.Context) (*node.L1EndpointConfig, error) {
return &node.L1EndpointConfig{ return &node.L1EndpointConfig{
L1NodeAddr: ctx.GlobalString(flags.L1NodeAddr.Name), L1NodeAddr: ctx.GlobalString(flags.L1NodeAddr.Name),
L1TrustRPC: ctx.GlobalBool(flags.L1TrustRPC.Name), L1TrustRPC: ctx.GlobalBool(flags.L1TrustRPC.Name),
L1RPCKind: sources.RPCProviderKind(strings.ToLower(ctx.GlobalString(flags.L1RPCProviderKind.Name))),
}, nil }, nil
} }
......
...@@ -15,7 +15,7 @@ import ( ...@@ -15,7 +15,7 @@ import (
// IterativeBatchCall is an util to create a job to fetch many RPC requests in batches, // IterativeBatchCall is an util to create a job to fetch many RPC requests in batches,
// and enable the caller to parallelize easily and safely, handle and re-try errors, // and enable the caller to parallelize easily and safely, handle and re-try errors,
// and pick a batch size all by simply calling Fetch again and again until it returns io.EOF. // and pick a batch size all by simply calling Fetch again and again until it returns io.EOF.
type IterativeBatchCall[K any, V any, O any] struct { type IterativeBatchCall[K any, V any] struct {
completed uint32 // tracks how far to completing all requests we are completed uint32 // tracks how far to completing all requests we are
resetLock sync.RWMutex // ensures we do not concurrently read (incl. fetch) / reset resetLock sync.RWMutex // ensures we do not concurrently read (incl. fetch) / reset
...@@ -23,23 +23,19 @@ type IterativeBatchCall[K any, V any, O any] struct { ...@@ -23,23 +23,19 @@ type IterativeBatchCall[K any, V any, O any] struct {
batchSize int batchSize int
makeRequest func(K) (V, rpc.BatchElem) makeRequest func(K) (V, rpc.BatchElem)
makeResults func([]K, []V) (O, error)
getBatch BatchCallContextFn getBatch BatchCallContextFn
requestsValues []V requestsValues []V
scheduled chan rpc.BatchElem scheduled chan rpc.BatchElem
results *O
} }
// NewIterativeBatchCall constructs a batch call, fetching the values with the given keys, // NewIterativeBatchCall constructs a batch call, fetching the values with the given keys,
// and transforms them into a verified final result. // and transforms them into a verified final result.
func NewIterativeBatchCall[K any, V any, O any]( func NewIterativeBatchCall[K any, V any](
requestsKeys []K, requestsKeys []K,
makeRequest func(K) (V, rpc.BatchElem), makeRequest func(K) (V, rpc.BatchElem),
makeResults func([]K, []V) (O, error),
getBatch BatchCallContextFn, getBatch BatchCallContextFn,
batchSize int) *IterativeBatchCall[K, V, O] { batchSize int) *IterativeBatchCall[K, V] {
if len(requestsKeys) < batchSize { if len(requestsKeys) < batchSize {
batchSize = len(requestsKeys) batchSize = len(requestsKeys)
...@@ -48,20 +44,19 @@ func NewIterativeBatchCall[K any, V any, O any]( ...@@ -48,20 +44,19 @@ func NewIterativeBatchCall[K any, V any, O any](
batchSize = 1 batchSize = 1
} }
out := &IterativeBatchCall[K, V, O]{ out := &IterativeBatchCall[K, V]{
completed: 0, completed: 0,
getBatch: getBatch, getBatch: getBatch,
requestsKeys: requestsKeys, requestsKeys: requestsKeys,
batchSize: batchSize, batchSize: batchSize,
makeRequest: makeRequest, makeRequest: makeRequest,
makeResults: makeResults,
} }
out.Reset() out.Reset()
return out return out
} }
// Reset will clear the batch call, to start fetching all contents from scratch. // Reset will clear the batch call, to start fetching all contents from scratch.
func (ibc *IterativeBatchCall[K, V, O]) Reset() { func (ibc *IterativeBatchCall[K, V]) Reset() {
ibc.resetLock.Lock() ibc.resetLock.Lock()
defer ibc.resetLock.Unlock() defer ibc.resetLock.Unlock()
...@@ -85,7 +80,7 @@ func (ibc *IterativeBatchCall[K, V, O]) Reset() { ...@@ -85,7 +80,7 @@ func (ibc *IterativeBatchCall[K, V, O]) Reset() {
// This method is safe to call concurrently: it will parallelize the fetching work. // This method is safe to call concurrently: it will parallelize the fetching work.
// If no work is available, but the fetching is not done yet, // If no work is available, but the fetching is not done yet,
// then Fetch will block until the next thing can be fetched, or until the context expires. // then Fetch will block until the next thing can be fetched, or until the context expires.
func (ibc *IterativeBatchCall[K, V, O]) Fetch(ctx context.Context) error { func (ibc *IterativeBatchCall[K, V]) Fetch(ctx context.Context) error {
ibc.resetLock.RLock() ibc.resetLock.RLock()
defer ibc.resetLock.RUnlock() defer ibc.resetLock.RUnlock()
...@@ -150,7 +145,7 @@ func (ibc *IterativeBatchCall[K, V, O]) Fetch(ctx context.Context) error { ...@@ -150,7 +145,7 @@ func (ibc *IterativeBatchCall[K, V, O]) Fetch(ctx context.Context) error {
} }
// Complete indicates if the batch call is done. // Complete indicates if the batch call is done.
func (ibc *IterativeBatchCall[K, V, O]) Complete() bool { func (ibc *IterativeBatchCall[K, V]) Complete() bool {
ibc.resetLock.RLock() ibc.resetLock.RLock()
defer ibc.resetLock.RUnlock() defer ibc.resetLock.RUnlock()
return atomic.LoadUint32(&ibc.completed) >= uint32(len(ibc.requestsKeys)) return atomic.LoadUint32(&ibc.completed) >= uint32(len(ibc.requestsKeys))
...@@ -158,27 +153,12 @@ func (ibc *IterativeBatchCall[K, V, O]) Complete() bool { ...@@ -158,27 +153,12 @@ func (ibc *IterativeBatchCall[K, V, O]) Complete() bool {
// Result returns the fetched values, checked and transformed to the final output type, if available. // Result returns the fetched values, checked and transformed to the final output type, if available.
// If the check fails, the IterativeBatchCall will Reset itself, to be ready for a re-attempt in fetching new data. // If the check fails, the IterativeBatchCall will Reset itself, to be ready for a re-attempt in fetching new data.
func (ibc *IterativeBatchCall[K, V, O]) Result() (O, error) { func (ibc *IterativeBatchCall[K, V]) Result() ([]V, error) {
ibc.resetLock.RLock() ibc.resetLock.RLock()
if atomic.LoadUint32(&ibc.completed) < uint32(len(ibc.requestsKeys)) { if atomic.LoadUint32(&ibc.completed) < uint32(len(ibc.requestsKeys)) {
ibc.resetLock.RUnlock() ibc.resetLock.RUnlock()
return *new(O), fmt.Errorf("results not available yet, Fetch more first") return nil, fmt.Errorf("results not available yet, Fetch more first")
}
if ibc.results != nil {
ibc.resetLock.RUnlock()
return *ibc.results, nil
} }
out, err := ibc.makeResults(ibc.requestsKeys, ibc.requestsValues)
ibc.resetLock.RUnlock() ibc.resetLock.RUnlock()
if err != nil { return ibc.requestsValues, nil
// start over
ibc.Reset()
} else {
// cache the valid results
ibc.resetLock.Lock()
ibc.results = &out
ibc.resetLock.Unlock()
}
return out, err
} }
...@@ -49,12 +49,6 @@ func makeTestRequest(i int) (*string, rpc.BatchElem) { ...@@ -49,12 +49,6 @@ func makeTestRequest(i int) (*string, rpc.BatchElem) {
} }
} }
func makeTestResults() func(keys []int, values []*string) ([]*string, error) {
return func(keys []int, values []*string) ([]*string, error) {
return values, nil
}
}
func (tc *batchTestCase) GetBatch(ctx context.Context, b []rpc.BatchElem) error { func (tc *batchTestCase) GetBatch(ctx context.Context, b []rpc.BatchElem) error {
if ctx.Err() != nil { if ctx.Err() != nil {
return ctx.Err() return ctx.Err()
...@@ -103,7 +97,7 @@ func (tc *batchTestCase) Run(t *testing.T) { ...@@ -103,7 +97,7 @@ func (tc *batchTestCase) Run(t *testing.T) {
tc.On("get", batch).Once().Run(makeMock(bci, bc)).Return([]error{bc.rpcErr}) // wrap to preserve nil as type of error tc.On("get", batch).Once().Run(makeMock(bci, bc)).Return([]error{bc.rpcErr}) // wrap to preserve nil as type of error
} }
} }
iter := NewIterativeBatchCall[int, *string, []*string](keys, makeTestRequest, makeTestResults(), tc.GetBatch, tc.batchSize) iter := NewIterativeBatchCall[int, *string](keys, makeTestRequest, tc.GetBatch, tc.batchSize)
for i, bc := range tc.batchCalls { for i, bc := range tc.batchCalls {
ctx := context.Background() ctx := context.Background()
if bc.makeCtx != nil { if bc.makeCtx != nil {
......
...@@ -3,7 +3,6 @@ package sources ...@@ -3,7 +3,6 @@ package sources
import ( import (
"context" "context"
"fmt" "fmt"
"io"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -44,6 +43,9 @@ type EthClientConfig struct { ...@@ -44,6 +43,9 @@ type EthClientConfig struct {
// If this is not checked, disabled header fields like the nonce or difficulty // If this is not checked, disabled header fields like the nonce or difficulty
// may be used to get a different block-hash. // may be used to get a different block-hash.
MustBePostMerge bool MustBePostMerge bool
// RPCProviderKind is a hint at what type of RPC provider we are dealing with
RPCProviderKind RPCProviderKind
} }
func (c *EthClientConfig) Check() error { func (c *EthClientConfig) Check() error {
...@@ -65,6 +67,9 @@ func (c *EthClientConfig) Check() error { ...@@ -65,6 +67,9 @@ func (c *EthClientConfig) Check() error {
if c.MaxRequestsPerBatch < 1 { if c.MaxRequestsPerBatch < 1 {
return fmt.Errorf("expected at least 1 request per batch, but max is: %d", c.MaxRequestsPerBatch) return fmt.Errorf("expected at least 1 request per batch, but max is: %d", c.MaxRequestsPerBatch)
} }
if !ValidRPCProviderKind(c.RPCProviderKind) {
return fmt.Errorf("unknown rpc provider kind: %s", c.RPCProviderKind)
}
return nil return nil
} }
...@@ -78,11 +83,13 @@ type EthClient struct { ...@@ -78,11 +83,13 @@ type EthClient struct {
mustBePostMerge bool mustBePostMerge bool
provKind RPCProviderKind
log log.Logger log log.Logger
// cache receipts in bundles per block hash // cache receipts in bundles per block hash
// We cache the receipts fetcher to not lose progress when we have to retry the `Fetch` call // We cache the receipts fetching job to not lose progress when we have to retry the `Fetch` call
// common.Hash -> eth.ReceiptsFetcher // common.Hash -> *receiptsFetchingJob
receiptsCache *caching.LRUCache receiptsCache *caching.LRUCache
// cache transactions in bundles per block hash // cache transactions in bundles per block hash
...@@ -96,6 +103,27 @@ type EthClient struct { ...@@ -96,6 +103,27 @@ type EthClient struct {
// cache payloads by hash // cache payloads by hash
// common.Hash -> *eth.ExecutionPayload // common.Hash -> *eth.ExecutionPayload
payloadsCache *caching.LRUCache payloadsCache *caching.LRUCache
// availableReceiptMethods tracks which receipt methods can be used for fetching receipts
// This may be modified concurrently, but we don't lock since it's a single
// uint64 that's not critical (fine to miss or mix up a modification)
availableReceiptMethods ReceiptsFetchingMethod
}
func (s *EthClient) PickReceiptsMethod(txCount uint64) ReceiptsFetchingMethod {
return PickBestReceiptsFetchingMethod(s.provKind, s.availableReceiptMethods, txCount)
}
func (s *EthClient) OnReceiptsMethodErr(m ReceiptsFetchingMethod, err error) {
if unusableMethod(err) {
// clear the bit of the method that errored
s.availableReceiptMethods &^= m
s.log.Warn("failed to use selected RPC method for receipt fetching, falling back to alternatives",
"provider_kind", s.provKind, "failed_method", m, "fallback", s.availableReceiptMethods, "err", err)
} else {
s.log.Debug("failed to use selected RPC method for receipt fetching, but method does appear to be available, so we continue to use it",
"provider_kind", s.provKind, "failed_method", m, "fallback", s.availableReceiptMethods&^m, "err", err)
}
} }
// NewEthClient wraps a RPC with bindings to fetch ethereum data, // NewEthClient wraps a RPC with bindings to fetch ethereum data,
...@@ -106,14 +134,17 @@ func NewEthClient(client client.RPC, log log.Logger, metrics caching.Metrics, co ...@@ -106,14 +134,17 @@ func NewEthClient(client client.RPC, log log.Logger, metrics caching.Metrics, co
} }
client = LimitRPC(client, config.MaxConcurrentRequests) client = LimitRPC(client, config.MaxConcurrentRequests)
return &EthClient{ return &EthClient{
client: client, client: client,
maxBatchSize: config.MaxRequestsPerBatch, maxBatchSize: config.MaxRequestsPerBatch,
trustRPC: config.TrustRPC, trustRPC: config.TrustRPC,
log: log, mustBePostMerge: config.MustBePostMerge,
receiptsCache: caching.NewLRUCache(metrics, "receipts", config.ReceiptsCacheSize), provKind: config.RPCProviderKind,
transactionsCache: caching.NewLRUCache(metrics, "txs", config.TransactionsCacheSize), log: log,
headersCache: caching.NewLRUCache(metrics, "headers", config.HeadersCacheSize), receiptsCache: caching.NewLRUCache(metrics, "receipts", config.ReceiptsCacheSize),
payloadsCache: caching.NewLRUCache(metrics, "payloads", config.PayloadsCacheSize), transactionsCache: caching.NewLRUCache(metrics, "txs", config.TransactionsCacheSize),
headersCache: caching.NewLRUCache(metrics, "headers", config.HeadersCacheSize),
payloadsCache: caching.NewLRUCache(metrics, "payloads", config.PayloadsCacheSize),
availableReceiptMethods: AvailableReceiptsFetchingMethods(config.RPCProviderKind),
}, nil }, nil
} }
...@@ -238,26 +269,18 @@ func (s *EthClient) FetchReceipts(ctx context.Context, blockHash common.Hash) (e ...@@ -238,26 +269,18 @@ func (s *EthClient) FetchReceipts(ctx context.Context, blockHash common.Hash) (e
// Try to reuse the receipts fetcher because is caches the results of intermediate calls. This means // Try to reuse the receipts fetcher because is caches the results of intermediate calls. This means
// that if just one of many calls fail, we only retry the failed call rather than all of the calls. // that if just one of many calls fail, we only retry the failed call rather than all of the calls.
// The underlying fetcher uses the receipts hash to verify receipt integrity. // The underlying fetcher uses the receipts hash to verify receipt integrity.
var fetcher eth.ReceiptsFetcher var job *receiptsFetchingJob
if v, ok := s.receiptsCache.Get(blockHash); ok { if v, ok := s.receiptsCache.Get(blockHash); ok {
fetcher = v.(eth.ReceiptsFetcher) job = v.(*receiptsFetchingJob)
} else { } else {
txHashes := make([]common.Hash, len(txs)) txHashes := make([]common.Hash, len(txs))
for i := 0; i < len(txs); i++ { for i := 0; i < len(txs); i++ {
txHashes[i] = txs[i].Hash() txHashes[i] = txs[i].Hash()
} }
fetcher = NewReceiptsFetcher(eth.ToBlockID(info), info.ReceiptHash(), txHashes, s.client.BatchCallContext, s.maxBatchSize) job = NewReceiptsFetchingJob(s, s.client, s.maxBatchSize, eth.ToBlockID(info), info.ReceiptHash(), txHashes)
s.receiptsCache.Add(blockHash, fetcher) s.receiptsCache.Add(blockHash, job)
}
// Fetch all receipts
for {
if err := fetcher.Fetch(ctx); err == io.EOF {
break
} else if err != nil {
return nil, nil, err
}
} }
receipts, err := fetcher.Result() receipts, err := job.Fetch(ctx)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
......
...@@ -52,6 +52,7 @@ var testEthClientConfig = &EthClientConfig{ ...@@ -52,6 +52,7 @@ var testEthClientConfig = &EthClientConfig{
MaxConcurrentRequests: 10, MaxConcurrentRequests: 10,
TrustRPC: false, TrustRPC: false,
MustBePostMerge: false, MustBePostMerge: false,
RPCProviderKind: RPCKindBasic,
} }
func randHash() (out common.Hash) { func randHash() (out common.Hash) {
...@@ -132,7 +133,7 @@ func TestEthClient_InfoByNumber(t *testing.T) { ...@@ -132,7 +133,7 @@ func TestEthClient_InfoByNumber(t *testing.T) {
"eth_getBlockByNumber", []any{n.String(), false}).Run(func(args mock.Arguments) { "eth_getBlockByNumber", []any{n.String(), false}).Run(func(args mock.Arguments) {
*args[1].(**rpcHeader) = rhdr *args[1].(**rpcHeader) = rhdr
}).Return([]error{nil}) }).Return([]error{nil})
s, err := NewL1Client(m, nil, nil, L1ClientDefaultConfig(&rollup.Config{SeqWindowSize: 10}, true)) s, err := NewL1Client(m, nil, nil, L1ClientDefaultConfig(&rollup.Config{SeqWindowSize: 10}, true, RPCKindBasic))
require.NoError(t, err) require.NoError(t, err)
info, err := s.InfoByNumber(ctx, uint64(n)) info, err := s.InfoByNumber(ctx, uint64(n))
require.NoError(t, err) require.NoError(t, err)
......
...@@ -5,13 +5,14 @@ import ( ...@@ -5,13 +5,14 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/client" "github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/sources/caching" "github.com/ethereum-optimism/optimism/op-node/sources/caching"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
) )
type L1ClientConfig struct { type L1ClientConfig struct {
...@@ -20,7 +21,7 @@ type L1ClientConfig struct { ...@@ -20,7 +21,7 @@ type L1ClientConfig struct {
L1BlockRefsCacheSize int L1BlockRefsCacheSize int
} }
func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool) *L1ClientConfig { func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool, kind RPCProviderKind) *L1ClientConfig {
// Cache 3/2 worth of sequencing window of receipts and txs // Cache 3/2 worth of sequencing window of receipts and txs
span := int(config.SeqWindowSize) * 3 / 2 span := int(config.SeqWindowSize) * 3 / 2
if span > 1000 { // sanity cap. If a large sequencing window is configured, do not make the cache too large if span > 1000 { // sanity cap. If a large sequencing window is configured, do not make the cache too large
...@@ -37,6 +38,7 @@ func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool) *L1ClientConfig ...@@ -37,6 +38,7 @@ func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool) *L1ClientConfig
MaxConcurrentRequests: 10, MaxConcurrentRequests: 10,
TrustRPC: trustRPC, TrustRPC: trustRPC,
MustBePostMerge: false, MustBePostMerge: false,
RPCProviderKind: kind,
}, },
L1BlockRefsCacheSize: span, L1BlockRefsCacheSize: span,
} }
......
...@@ -48,6 +48,7 @@ func L2ClientDefaultConfig(config *rollup.Config, trustRPC bool) *L2ClientConfig ...@@ -48,6 +48,7 @@ func L2ClientDefaultConfig(config *rollup.Config, trustRPC bool) *L2ClientConfig
MaxConcurrentRequests: 10, MaxConcurrentRequests: 10,
TrustRPC: trustRPC, TrustRPC: trustRPC,
MustBePostMerge: true, MustBePostMerge: true,
RPCProviderKind: RPCKindBasic,
}, },
L2BlockRefsCacheSize: span, L2BlockRefsCacheSize: span,
L1ConfigsCacheSize: span, L1ConfigsCacheSize: span,
......
This diff is collapsed.
...@@ -4,19 +4,23 @@ import ( ...@@ -4,19 +4,23 @@ import (
"context" "context"
"fmt" "fmt"
"math/big" "math/big"
"strings"
"github.com/holiman/uint256" "github.com/holiman/uint256"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
"github.com/ethereum-optimism/optimism/op-node/eth"
) )
type BatchCallContextFn func(ctx context.Context, b []rpc.BatchElem) error type BatchCallContextFn func(ctx context.Context, b []rpc.BatchElem) error
type CallContextFn func(ctx context.Context, result any, method string, args ...any) error
// Note: these types are used, instead of the geth types, to enable: // Note: these types are used, instead of the geth types, to enable:
// - batched calls of many block requests (standard bindings do extra uncle-header fetches, cannot be batched nicely) // - batched calls of many block requests (standard bindings do extra uncle-header fetches, cannot be batched nicely)
// - ignore uncle data (does not even exist anymore post-Merge) // - ignore uncle data (does not even exist anymore post-Merge)
...@@ -258,3 +262,27 @@ func (block *rpcBlock) ExecutionPayload(trustCache bool) (*eth.ExecutionPayload, ...@@ -258,3 +262,27 @@ func (block *rpcBlock) ExecutionPayload(trustCache bool) (*eth.ExecutionPayload,
Transactions: opaqueTxs, Transactions: opaqueTxs,
}, nil }, nil
} }
// blockHashParameter is used as "block parameter":
// Some Nethermind and Alchemy RPC endpoints require an object to identify a block, instead of a string.
type blockHashParameter struct {
BlockHash common.Hash `json:"blockHash"`
}
// unusableMethod identifies if an error indicates that the RPC method cannot be used as expected:
// if it's an unknown method, or if parameters were invalid.
func unusableMethod(err error) bool {
if rpcErr, ok := err.(rpc.Error); ok {
code := rpcErr.ErrorCode()
// method not found, or invalid params
if code == -32601 || code == -32602 {
return true
}
} else {
errText := strings.ToLower(err.Error())
if strings.Contains(errText, "unknown method") || strings.Contains(errText, "invalid param") || strings.Contains(errText, "is not available") {
return true
}
}
return false
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment