Commit 8478fa6f authored by Sebastian Stammler's avatar Sebastian Stammler Committed by GitHub

Revert "Add `PrefetchingEthClient`, which builds a cache of ethclient data for callers."

parent d9e4363d
......@@ -124,18 +124,6 @@ var (
EnvVars: prefixEnvVars("L1_HTTP_POLL_INTERVAL"),
Value: time.Second * 12,
}
L1PrefetchingWindow = &cli.Uint64Flag{
Name: "l1.prefetching-window",
Usage: "Number of L1 blocks to prefetch in the background. Disabled if 0.",
EnvVars: prefixEnvVars("L1_PREFETCHING_WINDOW"),
Value: 0,
}
L1PrefetchingTimeout = &cli.DurationFlag{
Name: "l1.prefetching-timeout",
Usage: "Timeout for L1 prefetching. Disabled if 0.",
EnvVars: prefixEnvVars("L1_PREFETCHING_TIMEOUT"),
Value: time.Second * 30,
}
L2EngineJWTSecret = &cli.StringFlag{
Name: "l2.jwt-secret",
Usage: "Path to JWT secret key. Keys are 32 bytes, hex encoded in a file. A new key will be generated if left empty.",
......@@ -321,8 +309,6 @@ var optionalFlags = []cli.Flag{
L1RPCMaxBatchSize,
L1RPCMaxConcurrency,
L1HTTPPollInterval,
L1PrefetchingWindow,
L1PrefetchingTimeout,
L2EngineJWTSecret,
VerifierL1Confs,
SequencerEnabledFlag,
......
......@@ -159,14 +159,6 @@ type L1EndpointConfig struct {
// It is recommended to use websockets or IPC for efficient following of the changing block.
// Setting this to 0 disables polling.
HttpPollInterval time.Duration
// PrefetchingWindow specifies the number of blocks to prefetch from the L1 RPC.
// Setting this to 0 disables prefetching.
PrefetchingWindow uint64
// PrefetchingTimeout specifies the timeout for prefetching from the L1 RPC.
// Setting this to 0 disables prefetching.
PrefetchingTimeout time.Duration
}
var _ L1EndpointSetup = (*L1EndpointConfig)(nil)
......@@ -200,8 +192,6 @@ func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCf
rpcCfg := sources.L1ClientDefaultConfig(rollupCfg, cfg.L1TrustRPC, cfg.L1RPCKind)
rpcCfg.MaxRequestsPerBatch = cfg.BatchSize
rpcCfg.MaxConcurrentRequests = cfg.MaxConcurrency
rpcCfg.PrefetchingWindow = cfg.PrefetchingWindow
rpcCfg.PrefetchingTimeout = cfg.PrefetchingTimeout
return l1Node, rpcCfg, nil
}
......
......@@ -123,15 +123,13 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
func NewL1EndpointConfig(ctx *cli.Context) *node.L1EndpointConfig {
return &node.L1EndpointConfig{
L1NodeAddr: ctx.String(flags.L1NodeAddr.Name),
L1TrustRPC: ctx.Bool(flags.L1TrustRPC.Name),
L1RPCKind: sources.RPCProviderKind(strings.ToLower(ctx.String(flags.L1RPCProviderKind.Name))),
PrefetchingWindow: ctx.Uint64(flags.L1PrefetchingWindow.Name),
PrefetchingTimeout: ctx.Duration(flags.L1PrefetchingTimeout.Name),
RateLimit: ctx.Float64(flags.L1RPCRateLimit.Name),
BatchSize: ctx.Int(flags.L1RPCMaxBatchSize.Name),
HttpPollInterval: ctx.Duration(flags.L1HTTPPollInterval.Name),
MaxConcurrency: ctx.Int(flags.L1RPCMaxConcurrency.Name),
L1NodeAddr: ctx.String(flags.L1NodeAddr.Name),
L1TrustRPC: ctx.Bool(flags.L1TrustRPC.Name),
L1RPCKind: sources.RPCProviderKind(strings.ToLower(ctx.String(flags.L1RPCProviderKind.Name))),
RateLimit: ctx.Float64(flags.L1RPCRateLimit.Name),
BatchSize: ctx.Int(flags.L1RPCMaxBatchSize.Name),
HttpPollInterval: ctx.Duration(flags.L1HTTPPollInterval.Name),
MaxConcurrency: ctx.Int(flags.L1RPCMaxConcurrency.Name),
}
}
......
......@@ -20,8 +20,6 @@ type L1ClientConfig struct {
EthClientConfig
L1BlockRefsCacheSize int
PrefetchingWindow uint64
PrefetchingTimeout time.Duration
}
func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool, kind RPCProviderKind) *L1ClientConfig {
......@@ -47,8 +45,6 @@ func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool, kind RPCProvide
},
// Not bounded by span, to cover find-sync-start range fully for speedy recovery after errors.
L1BlockRefsCacheSize: fullSpan,
PrefetchingWindow: 0, // no prefetching by default
PrefetchingTimeout: 0, // no prefetching by default
}
}
......@@ -56,7 +52,7 @@ func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool, kind RPCProvide
// with optimized batch requests, cached results, and flag to not trust the RPC
// (i.e. to verify all returned contents against corresponding block hashes).
type L1Client struct {
EthClientInterface
*EthClient
// cache L1BlockRef by hash
// common.Hash -> eth.L1BlockRef
......@@ -70,23 +66,10 @@ func NewL1Client(client client.RPC, log log.Logger, metrics caching.Metrics, con
return nil, err
}
var clientToUse EthClientInterface
if config.PrefetchingTimeout > 0 && config.PrefetchingWindow > 0 {
prefetchingEthClient, err := NewPrefetchingEthClient(ethClient, config.PrefetchingWindow, config.PrefetchingTimeout)
if err != nil {
return nil, err
}
clientToUse = prefetchingEthClient
} else {
clientToUse = ethClient
}
return &L1Client{
EthClientInterface: clientToUse,
l1BlockRefsCache: caching.NewLRUCache[common.Hash, eth.L1BlockRef](metrics, "blockrefs", config.L1BlockRefsCacheSize),
EthClient: ethClient,
l1BlockRefsCache: caching.NewLRUCache[common.Hash, eth.L1BlockRef](metrics, "blockrefs", config.L1BlockRefsCacheSize),
}, nil
}
// L1BlockRefByLabel returns the [eth.L1BlockRef] for the given block label.
......
package sources
import (
"context"
"math/big"
"sync"
"time"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
type EthClientInterface interface {
SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error)
ChainID(ctx context.Context) (*big.Int, error)
InfoByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, error)
InfoByNumber(ctx context.Context, number uint64) (eth.BlockInfo, error)
InfoByLabel(ctx context.Context, label eth.BlockLabel) (eth.BlockInfo, error)
InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error)
InfoAndTxsByNumber(ctx context.Context, number uint64) (eth.BlockInfo, types.Transactions, error)
InfoAndTxsByLabel(ctx context.Context, label eth.BlockLabel) (eth.BlockInfo, types.Transactions, error)
PayloadByHash(ctx context.Context, hash common.Hash) (*eth.ExecutionPayload, error)
PayloadByNumber(ctx context.Context, number uint64) (*eth.ExecutionPayload, error)
PayloadByLabel(ctx context.Context, label eth.BlockLabel) (*eth.ExecutionPayload, error)
FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error)
GetProof(ctx context.Context, address common.Address, storage []common.Hash, blockTag string) (*eth.AccountResult, error)
GetStorageAt(ctx context.Context, address common.Address, storageSlot common.Hash, blockTag string) (common.Hash, error)
ReadStorageAt(ctx context.Context, address common.Address, storageSlot common.Hash, blockHash common.Hash) (common.Hash, error)
Close()
}
type PrefetchingEthClient struct {
inner EthClientInterface
PrefetchingRange uint64
PrefetchingTimeout time.Duration
runningCtx context.Context
runningCancel context.CancelFunc
highestHeadRequesting uint64
highestHeadLock sync.Mutex
wg *sync.WaitGroup // used for testing
}
// NewPrefetchingEthClient creates a new [PrefetchingEthClient] with the given underlying [EthClient]
// and a prefetching range.
func NewPrefetchingEthClient(inner EthClientInterface, prefetchingRange uint64, timeout time.Duration) (*PrefetchingEthClient, error) {
// Create a new context for the prefetching goroutines
runningCtx, runningCancel := context.WithCancel(context.Background())
return &PrefetchingEthClient{
inner: inner,
PrefetchingRange: prefetchingRange,
PrefetchingTimeout: timeout,
runningCtx: runningCtx,
runningCancel: runningCancel,
highestHeadRequesting: 0,
}, nil
}
func (p *PrefetchingEthClient) updateRequestingHead(start, end uint64) (newStart uint64, shouldFetch bool) {
// Acquire lock before reading/updating highestHeadRequesting
p.highestHeadLock.Lock()
defer p.highestHeadLock.Unlock()
if start <= p.highestHeadRequesting {
start = p.highestHeadRequesting + 1
}
if p.highestHeadRequesting < end {
p.highestHeadRequesting = end
}
return start, start <= end
}
func (p *PrefetchingEthClient) FetchWindow(start, end uint64) {
if p.wg != nil {
defer p.wg.Done()
}
start, shouldFetch := p.updateRequestingHead(start, end)
if !shouldFetch {
return
}
ctx, cancel := context.WithTimeout(p.runningCtx, p.PrefetchingTimeout)
defer cancel()
for i := start; i <= end; i++ {
p.FetchBlockAndReceipts(ctx, i)
}
}
func (p *PrefetchingEthClient) FetchBlockAndReceipts(ctx context.Context, number uint64) {
blockInfo, _, err := p.inner.InfoAndTxsByNumber(ctx, number)
if err != nil {
return
}
_, _, _ = p.inner.FetchReceipts(ctx, blockInfo.Hash())
}
func (p *PrefetchingEthClient) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
return p.inner.SubscribeNewHead(ctx, ch)
}
func (p *PrefetchingEthClient) ChainID(ctx context.Context) (*big.Int, error) {
return p.inner.ChainID(ctx)
}
func (p *PrefetchingEthClient) InfoByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, error) {
// Fetch the block information for the given hash
blockInfo, err := p.inner.InfoByHash(ctx, hash)
if err != nil {
return blockInfo, err
}
if p.wg != nil {
p.wg.Add(1)
}
// Prefetch the next n blocks and their receipts starting from the block number of the fetched block
go p.FetchWindow(blockInfo.NumberU64()+1, blockInfo.NumberU64()+p.PrefetchingRange)
return blockInfo, nil
}
func (p *PrefetchingEthClient) InfoByNumber(ctx context.Context, number uint64) (eth.BlockInfo, error) {
if p.wg != nil {
p.wg.Add(1)
}
// Trigger prefetching in the background
go p.FetchWindow(number+1, number+p.PrefetchingRange)
// Fetch the requested block
return p.inner.InfoByNumber(ctx, number)
}
func (p *PrefetchingEthClient) InfoByLabel(ctx context.Context, label eth.BlockLabel) (eth.BlockInfo, error) {
// Fetch the block information for the given label
blockInfo, err := p.inner.InfoByLabel(ctx, label)
if err != nil {
return blockInfo, err
}
if p.wg != nil {
p.wg.Add(1)
}
// Prefetch the next n blocks and their receipts starting from the block number of the fetched block
go p.FetchWindow(blockInfo.NumberU64()+1, blockInfo.NumberU64()+p.PrefetchingRange)
return blockInfo, nil
}
func (p *PrefetchingEthClient) InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error) {
// Fetch the block info and transactions for the requested hash
blockInfo, txs, err := p.inner.InfoAndTxsByHash(ctx, hash)
if err != nil {
return blockInfo, txs, err
}
if p.wg != nil {
p.wg.Add(1)
}
// Prefetch the next n blocks and their receipts
go p.FetchWindow(blockInfo.NumberU64()+1, blockInfo.NumberU64()+p.PrefetchingRange)
return blockInfo, txs, nil
}
func (p *PrefetchingEthClient) InfoAndTxsByNumber(ctx context.Context, number uint64) (eth.BlockInfo, types.Transactions, error) {
// Fetch the block info and transactions for the requested number
blockInfo, txs, err := p.inner.InfoAndTxsByNumber(ctx, number)
if err != nil {
return blockInfo, txs, err
}
if p.wg != nil {
p.wg.Add(1)
}
// Prefetch the next n blocks and their receipts
go p.FetchWindow(number+1, number+p.PrefetchingRange)
return blockInfo, txs, nil
}
func (p *PrefetchingEthClient) InfoAndTxsByLabel(ctx context.Context, label eth.BlockLabel) (eth.BlockInfo, types.Transactions, error) {
// Fetch the block info and transactions for the requested label
blockInfo, txs, err := p.inner.InfoAndTxsByLabel(ctx, label)
if err != nil {
return blockInfo, txs, err
}
if p.wg != nil {
p.wg.Add(1)
}
// Prefetch the next n blocks and their receipts
go p.FetchWindow(blockInfo.NumberU64()+1, blockInfo.NumberU64()+p.PrefetchingRange)
return blockInfo, txs, nil
}
func (p *PrefetchingEthClient) PayloadByHash(ctx context.Context, hash common.Hash) (*eth.ExecutionPayload, error) {
// Fetch the payload for the requested hash
payload, err := p.inner.PayloadByHash(ctx, hash)
if err != nil {
return payload, err
}
if p.wg != nil {
p.wg.Add(1)
}
// Prefetch the next n blocks and their receipts
go p.FetchWindow(uint64(payload.BlockNumber)+1, uint64(payload.BlockNumber)+p.PrefetchingRange)
return payload, nil
}
func (p *PrefetchingEthClient) PayloadByNumber(ctx context.Context, number uint64) (*eth.ExecutionPayload, error) {
// Fetch the payload for the requested number
payload, err := p.inner.PayloadByNumber(ctx, number)
if err != nil {
return payload, err
}
if p.wg != nil {
p.wg.Add(1)
}
// Prefetch the next n blocks and their receipts
go p.FetchWindow(number+1, number+p.PrefetchingRange)
return payload, nil
}
func (p *PrefetchingEthClient) PayloadByLabel(ctx context.Context, label eth.BlockLabel) (*eth.ExecutionPayload, error) {
// Fetch the payload for the requested label
payload, err := p.inner.PayloadByLabel(ctx, label)
if err != nil {
return payload, err
}
if p.wg != nil {
p.wg.Add(1)
}
// Prefetch the next n blocks and their receipts
go p.FetchWindow(uint64(payload.BlockNumber)+1, uint64(payload.BlockNumber)+p.PrefetchingRange)
return payload, nil
}
func (p *PrefetchingEthClient) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) {
// Fetch the block info and receipts for the requested hash
blockInfo, receipts, err := p.inner.FetchReceipts(ctx, blockHash)
if err != nil {
return blockInfo, receipts, err
}
if p.wg != nil {
p.wg.Add(1)
}
// Prefetch the next n blocks and their receipts
go p.FetchWindow(blockInfo.NumberU64(), blockInfo.NumberU64()+p.PrefetchingRange)
return blockInfo, receipts, nil
}
func (p *PrefetchingEthClient) GetProof(ctx context.Context, address common.Address, storage []common.Hash, blockTag string) (*eth.AccountResult, error) {
return p.inner.GetProof(ctx, address, storage, blockTag)
}
func (p *PrefetchingEthClient) GetStorageAt(ctx context.Context, address common.Address, storageSlot common.Hash, blockTag string) (common.Hash, error) {
return p.inner.GetStorageAt(ctx, address, storageSlot, blockTag)
}
func (p *PrefetchingEthClient) ReadStorageAt(ctx context.Context, address common.Address, storageSlot common.Hash, blockHash common.Hash) (common.Hash, error) {
return p.inner.ReadStorageAt(ctx, address, storageSlot, blockHash)
}
func (p *PrefetchingEthClient) Close() {
p.runningCancel()
p.inner.Close()
}
package sources
import (
"context"
"fmt"
"math/rand"
"sync"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/require"
)
// TestPrefetchingEthClient runs all test cases for each prefetching range.
func TestPrefetchingEthClient(t *testing.T) {
prefetchingRanges := []uint64{0, 1, 5}
for _, prefetchingRange := range prefetchingRanges {
testName := fmt.Sprintf("range-%d", prefetchingRange)
t.Run(testName, func(t *testing.T) {
ctx := context.Background()
mockEthClient := new(testutils.MockEthClient)
client, err := NewPrefetchingEthClient(mockEthClient, prefetchingRange, 30*time.Second)
require.NoError(t, err)
defer client.Close()
client.wg = new(sync.WaitGroup) // Initialize the WaitGroup for testing
// set up a random block to get from the client
randomness := rand.New(rand.NewSource(123))
block, _ := randomRpcBlockAndReceipts(randomness, 2)
rhdr := block.rpcHeader
expectedTxs := block.Transactions
expectedInfo, err := rhdr.Info(true, false)
require.NoError(t, err)
mockEthClient.ExpectInfoAndTxsByNumber(uint64(rhdr.Number), expectedInfo, expectedTxs, nil)
// also set up a window of random blocks and receipts to prefetch
windowEnd := (uint64(rhdr.Number) + client.PrefetchingRange)
for i := uint64(rhdr.Number) + 1; i <= windowEnd; i++ {
// set up different info per iteration
fillerBlock, fillerReceipts := randomRpcBlockAndReceipts(randomness, 2)
fillerBlock.rpcHeader.Number = hexutil.Uint64(i)
fillerInfo, err := fillerBlock.rpcHeader.Info(true, false)
require.NoError(t, err)
mockEthClient.ExpectInfoAndTxsByNumber(i, fillerInfo, fillerBlock.Transactions, nil)
mockEthClient.ExpectFetchReceipts(fillerBlock.Hash, fillerInfo, fillerReceipts, nil)
}
info, txs, err := client.InfoAndTxsByNumber(ctx, uint64(rhdr.Number))
require.NoError(t, err)
require.Equal(t, info, expectedInfo)
require.Equal(t, txs, types.Transactions(expectedTxs))
client.wg.Wait() // Wait for all goroutines to complete before asserting expectations
mockEthClient.AssertExpectations(t)
})
}
}
func TestUpdateRequestingHead_NormalRange(t *testing.T) {
client := &PrefetchingEthClient{
highestHeadRequesting: 10,
PrefetchingTimeout: 30 * time.Second,
}
start, end := uint64(11), uint64(15)
newStart, shouldFetch := client.updateRequestingHead(start, end)
require.Equal(t, newStart, start)
require.True(t, shouldFetch)
require.Equal(t, client.highestHeadRequesting, end)
}
func TestUpdateRequestingHead_OverlappingRange(t *testing.T) {
highestHeadBeforeUpdate := uint64(10)
client := &PrefetchingEthClient{
highestHeadRequesting: highestHeadBeforeUpdate,
PrefetchingTimeout: 30 * time.Second,
}
start, end := uint64(8), uint64(12)
newStart, shouldFetch := client.updateRequestingHead(start, end)
require.Equal(t, newStart, highestHeadBeforeUpdate+1)
require.True(t, shouldFetch)
require.Equal(t, client.highestHeadRequesting, end)
}
......@@ -2,11 +2,9 @@ package testutils
import (
"context"
"math/big"
"github.com/stretchr/testify/mock"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
......@@ -17,15 +15,6 @@ type MockEthClient struct {
mock.Mock
}
func (m *MockEthClient) ChainID(ctx context.Context) (*big.Int, error) {
out := m.Mock.MethodCalled("ChainID")
return out[0].(*big.Int), *out[1].(*error)
}
func (m *MockEthClient) ExpectChainID(chainID *big.Int, err error) {
m.Mock.On("ChainID").Once().Return(chainID, &err)
}
func (m *MockEthClient) InfoByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, error) {
out := m.Mock.MethodCalled("InfoByHash", hash)
return *out[0].(*eth.BlockInfo), *out[1].(*error)
......@@ -139,9 +128,3 @@ func (m *MockEthClient) ReadStorageAt(ctx context.Context, address common.Addres
func (m *MockEthClient) ExpectReadStorageAt(ctx context.Context, address common.Address, storageSlot common.Hash, blockHash common.Hash, result common.Hash, err error) {
m.Mock.On("ReadStorageAt", address, storageSlot, blockHash).Once().Return(result, &err)
}
func (m *MockEthClient) Close() {}
func (m *MockEthClient) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
return nil, nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment