Commit e89248ad authored by Adrian Sutton's avatar Adrian Sutton Committed by GitHub

op-program: Introduce RetryingL2Sources and use it to make prefetcher somewhat...

op-program: Introduce RetryingL2Sources and use it to make prefetcher somewhat multi-L2 capable (#13718)

* op-program: Introduce L2Sources to combine info for multiple L2s.

* op-program: Plumb L2Sources in

* op-e2e: Use actual chain ID, not hte test place holder.

* op-program: Use correct chain ID for rexec and prevent duplicate RPC urls for same chain

* op-program: Fix test to use chain ID that is actually configured in the test
parent eea9572d
...@@ -5,11 +5,13 @@ import ( ...@@ -5,11 +5,13 @@ import (
"github.com/ethereum-optimism/optimism/op-e2e/actions/helpers" "github.com/ethereum-optimism/optimism/op-e2e/actions/helpers"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/fakebeacon" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils/fakebeacon"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-program/host" "github.com/ethereum-optimism/optimism/op-program/host"
hostcommon "github.com/ethereum-optimism/optimism/op-program/host/common" hostcommon "github.com/ethereum-optimism/optimism/op-program/host/common"
"github.com/ethereum-optimism/optimism/op-program/host/config" "github.com/ethereum-optimism/optimism/op-program/host/config"
"github.com/ethereum-optimism/optimism/op-program/host/kvstore" "github.com/ethereum-optimism/optimism/op-program/host/kvstore"
"github.com/ethereum-optimism/optimism/op-program/host/prefetcher" "github.com/ethereum-optimism/optimism/op-program/host/prefetcher"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
...@@ -76,14 +78,11 @@ func RunFaultProofProgram(t helpers.Testing, logger log.Logger, l1 *helpers.L1Mi ...@@ -76,14 +78,11 @@ func RunFaultProofProgram(t helpers.Testing, logger log.Logger, l1 *helpers.L1Mi
l1BlobFetcher := l1.BlobSource() l1BlobFetcher := l1.BlobSource()
// Set up in-process L2 source // Set up in-process L2 source
l2ClCfg := sources.L2ClientDefaultConfig(l2.RollupCfg, true) sources, err := prefetcher.NewRetryingL2Sources(ctx, logger, []*rollup.Config{l2.RollupCfg}, []client.RPC{l2Eng.RPCClient()}, nil)
l2RPC := l2Eng.RPCClient()
l2Client, err := hostcommon.NewL2Client(l2RPC, logger, nil, &hostcommon.L2ClientConfig{L2ClientConfig: l2ClCfg})
require.NoError(t, err, "failed to create L2 client") require.NoError(t, err, "failed to create L2 client")
l2DebugCl := hostcommon.NewL2SourceWithClient(logger, l2Client, sources.NewDebugClient(l2RPC.CallContext))
executor := host.MakeProgramExecutor(logger, programCfg) executor := host.MakeProgramExecutor(logger, programCfg)
return prefetcher.NewPrefetcher(logger, l1Cl, l1BlobFetcher, l2DebugCl, kv, executor, cfg.L2Head, cfg.AgreedPrestate), nil return prefetcher.NewPrefetcher(logger, l1Cl, l1BlobFetcher, l2.RollupCfg.L2ChainID.Uint64(), sources, kv, executor, cfg.L2Head, cfg.AgreedPrestate), nil
}) })
err = hostcommon.FaultProofProgram(t.Ctx(), logger, programCfg, withInProcessPrefetcher) err = hostcommon.FaultProofProgram(t.Ctx(), logger, programCfg, withInProcessPrefetcher)
checkResult(t, err) checkResult(t, err)
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"errors" "errors"
"time" "time"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-program/host/config" "github.com/ethereum-optimism/optimism/op-program/host/config"
hosttypes "github.com/ethereum-optimism/optimism/op-program/host/types" hosttypes "github.com/ethereum-optimism/optimism/op-program/host/types"
"github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum-optimism/optimism/op-service/client"
...@@ -50,15 +51,29 @@ func NewL2SourceWithClient(logger log.Logger, canonicalL2Client *L2Client, canon ...@@ -50,15 +51,29 @@ func NewL2SourceWithClient(logger log.Logger, canonicalL2Client *L2Client, canon
func NewL2Source(ctx context.Context, logger log.Logger, config *config.Config) (*L2Source, error) { func NewL2Source(ctx context.Context, logger log.Logger, config *config.Config) (*L2Source, error) {
logger.Info("Connecting to canonical L2 source", "url", config.L2URL) logger.Info("Connecting to canonical L2 source", "url", config.L2URL)
// eth_getProof calls are expensive and takes time, so we use a longer timeout // eth_getProof calls are expensive and takes time, so we use a longer timeout
canonicalL2RPC, err := client.NewRPC(ctx, logger, config.L2URL, client.WithDialAttempts(10), client.WithCallTimeout(5*time.Minute)) canonicalL2RPC, err := client.NewRPC(ctx, logger, config.L2URL, client.WithDialAttempts(10), client.WithCallTimeout(5*time.Minute))
if err != nil { if err != nil {
return nil, err return nil, err
} }
var experimentalRPC client.RPC
if len(config.L2ExperimentalURL) != 0 {
logger.Info("Connecting to experimental L2 source", "url", config.L2ExperimentalURL)
// debug_executionWitness calls are expensive and takes time, so we use a longer timeout
experimentalRPC, err = client.NewRPC(ctx, logger, config.L2ExperimentalURL, client.WithDialAttempts(10), client.WithCallTimeout(5*time.Minute))
if err != nil {
return nil, err
}
}
return NewL2SourceFromRPC(logger, config.Rollup, canonicalL2RPC, experimentalRPC)
}
func NewL2SourceFromRPC(logger log.Logger, rollupCfg *rollup.Config, canonicalL2RPC client.RPC, experimentalRPC client.RPC) (*L2Source, error) {
canonicalDebugClient := sources.NewDebugClient(canonicalL2RPC.CallContext) canonicalDebugClient := sources.NewDebugClient(canonicalL2RPC.CallContext)
canonicalL2ClientCfg := sources.L2ClientDefaultConfig(config.Rollup, true) canonicalL2ClientCfg := sources.L2ClientDefaultConfig(rollupCfg, true)
canonicalL2Client, err := NewL2Client(canonicalL2RPC, logger, nil, &L2ClientConfig{L2ClientConfig: canonicalL2ClientCfg}) canonicalL2Client, err := NewL2Client(canonicalL2RPC, logger, nil, &L2ClientConfig{L2ClientConfig: canonicalL2ClientCfg})
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -66,17 +81,11 @@ func NewL2Source(ctx context.Context, logger log.Logger, config *config.Config) ...@@ -66,17 +81,11 @@ func NewL2Source(ctx context.Context, logger log.Logger, config *config.Config)
source := NewL2SourceWithClient(logger, canonicalL2Client, canonicalDebugClient) source := NewL2SourceWithClient(logger, canonicalL2Client, canonicalDebugClient)
if len(config.L2ExperimentalURL) == 0 { if experimentalRPC == nil {
return source, nil return source, nil
} }
logger.Info("Connecting to experimental L2 source", "url", config.L2ExperimentalURL) experimentalL2ClientCfg := sources.L2ClientDefaultConfig(rollupCfg, true)
// debug_executionWitness calls are expensive and takes time, so we use a longer timeout
experimentalRPC, err := client.NewRPC(ctx, logger, config.L2ExperimentalURL, client.WithDialAttempts(10), client.WithCallTimeout(5*time.Minute))
if err != nil {
return nil, err
}
experimentalL2ClientCfg := sources.L2ClientDefaultConfig(config.Rollup, true)
experimentalL2Client, err := NewL2Client(experimentalRPC, logger, nil, &L2ClientConfig{L2ClientConfig: experimentalL2ClientCfg}) experimentalL2Client, err := NewL2Client(experimentalRPC, logger, nil, &L2ClientConfig{L2ClientConfig: experimentalL2ClientCfg})
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -87,6 +96,10 @@ func NewL2Source(ctx context.Context, logger log.Logger, config *config.Config) ...@@ -87,6 +96,10 @@ func NewL2Source(ctx context.Context, logger log.Logger, config *config.Config)
return source, nil return source, nil
} }
func (s *L2Source) RollupConfig() *rollup.Config {
return s.canonicalEthClient.RollupConfig()
}
func (l *L2Source) ExperimentalEnabled() bool { func (l *L2Source) ExperimentalEnabled() bool {
return l.experimentalClient != nil return l.experimentalClient != nil
} }
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"github.com/ethereum-optimism/optimism/op-node/chaincfg" "github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/rollup"
preimage "github.com/ethereum-optimism/optimism/op-preimage" preimage "github.com/ethereum-optimism/optimism/op-preimage"
hostcommon "github.com/ethereum-optimism/optimism/op-program/host/common" hostcommon "github.com/ethereum-optimism/optimism/op-program/host/common"
"github.com/ethereum-optimism/optimism/op-program/host/config" "github.com/ethereum-optimism/optimism/op-program/host/config"
...@@ -89,13 +90,17 @@ func makeDefaultPrefetcher(ctx context.Context, logger log.Logger, kv kvstore.KV ...@@ -89,13 +90,17 @@ func makeDefaultPrefetcher(ctx context.Context, logger log.Logger, kv kvstore.KV
l1BlobFetcher := sources.NewL1BeaconClient(l1Beacon, sources.L1BeaconClientConfig{FetchAllSidecars: false}) l1BlobFetcher := sources.NewL1BeaconClient(l1Beacon, sources.L1BeaconClientConfig{FetchAllSidecars: false})
logger.Info("Initializing L2 clients") logger.Info("Initializing L2 clients")
l2Client, err := hostcommon.NewL2Source(ctx, logger, cfg) var experimentalURLs []string
if cfg.L2ExperimentalURL != "" {
experimentalURLs = append(experimentalURLs, cfg.L2ExperimentalURL)
}
sources, err := prefetcher.NewRetryingL2SourcesFromURLs(ctx, logger, []*rollup.Config{cfg.Rollup}, []string{cfg.L2URL}, experimentalURLs)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create L2 source: %w", err) return nil, fmt.Errorf("failed to create L2 sources: %w", err)
} }
executor := MakeProgramExecutor(logger, cfg) executor := MakeProgramExecutor(logger, cfg)
return prefetcher.NewPrefetcher(logger, l1Cl, l1BlobFetcher, l2Client, kv, executor, cfg.L2Head, cfg.AgreedPrestate), nil return prefetcher.NewPrefetcher(logger, l1Cl, l1BlobFetcher, cfg.Rollup.L2ChainID.Uint64(), sources, kv, executor, cfg.L2Head, cfg.AgreedPrestate), nil
} }
type programExecutor struct { type programExecutor struct {
......
package prefetcher
import (
"context"
"errors"
"fmt"
"math/big"
"time"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-program/host/common"
"github.com/ethereum-optimism/optimism/op-program/host/types"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
)
var (
ErrNoSources = errors.New("no sources specified")
ErrNoL2ForRollup = errors.New("no L2 RPC available for rollup")
ErrNoRollupForL2 = errors.New("no rollup config available for L2 RPC")
ErrDuplicateL2URLs = errors.New("multiple L2 URLs provided for chain")
ErrNoRollupForExperimental = errors.New("no rollup config available for L2 experimental RPC")
ErrDuplicateExperimentsURLs = errors.New("multiple experimental URLs provided for chain")
)
type RetryingL2Sources struct {
Sources map[uint64]*RetryingL2Source
}
func NewRetryingL2SourcesFromURLs(ctx context.Context, logger log.Logger, configs []*rollup.Config, l2URLs []string, l2ExperimentalURLs []string) (*RetryingL2Sources, error) {
l2Clients, err := connectRPCs(ctx, logger, l2URLs)
if err != nil {
return nil, err
}
l2ExperimentalClients, err := connectRPCs(ctx, logger, l2ExperimentalURLs)
if err != nil {
return nil, err
}
return NewRetryingL2Sources(ctx, logger, configs, l2Clients, l2ExperimentalClients)
}
func connectRPCs(ctx context.Context, logger log.Logger, urls []string) ([]client.RPC, error) {
l2Clients := make([]client.RPC, len(urls))
for i, url := range urls {
logger.Info("Connecting to L2 source", "url", url)
// eth_getProof calls are expensive and takes time, so we use a longer timeout
rpc, err := client.NewRPC(ctx, logger, url, client.WithDialAttempts(10), client.WithCallTimeout(5*time.Minute))
if err != nil {
return nil, fmt.Errorf("failed to connect to rpc URL %s: %w", url, err)
}
l2Clients[i] = rpc
}
return l2Clients, nil
}
func NewRetryingL2Sources(ctx context.Context, logger log.Logger, configs []*rollup.Config, l2Clients []client.RPC, l2ExperimentalClients []client.RPC) (*RetryingL2Sources, error) {
if len(configs) == 0 {
return nil, ErrNoSources
}
rollupConfigs := make(map[uint64]*rollup.Config)
for _, rollupCfg := range configs {
rollupConfigs[rollupCfg.L2ChainID.Uint64()] = rollupCfg
}
l2RPCs := make(map[uint64]client.RPC)
for _, rpc := range l2Clients {
chainID, err := loadChainID(ctx, rpc)
if err != nil {
return nil, fmt.Errorf("failed to load chain ID: %w", err)
}
if _, ok := l2RPCs[chainID]; ok {
return nil, fmt.Errorf("%w %v", ErrDuplicateL2URLs, chainID)
}
l2RPCs[chainID] = rpc
if _, ok := rollupConfigs[chainID]; !ok {
return nil, fmt.Errorf("%w: %v", ErrNoRollupForL2, chainID)
}
}
l2ExperimentalRPCs := make(map[uint64]client.RPC)
for _, rpc := range l2ExperimentalClients {
chainID, err := loadChainID(ctx, rpc)
if err != nil {
return nil, fmt.Errorf("failed to load chain ID: %w", err)
}
if _, ok := l2ExperimentalRPCs[chainID]; ok {
return nil, fmt.Errorf("%w %v", ErrDuplicateExperimentsURLs, chainID)
}
l2ExperimentalRPCs[chainID] = rpc
if _, ok := rollupConfigs[chainID]; !ok {
return nil, fmt.Errorf("%w: %v", ErrNoRollupForExperimental, chainID)
}
}
sources := make(map[uint64]*RetryingL2Source, len(configs))
for _, rollupCfg := range rollupConfigs {
chainID := rollupCfg.L2ChainID.Uint64()
l2RPC, ok := l2RPCs[chainID]
if !ok {
return nil, fmt.Errorf("%w: %v", ErrNoL2ForRollup, chainID)
}
l2ExperimentalRPC := l2ExperimentalRPCs[chainID] // Allowed to be nil
source, err := common.NewL2SourceFromRPC(logger, rollupCfg, l2RPC, l2ExperimentalRPC)
if err != nil {
return nil, fmt.Errorf("failed to create l2 source for chain ID %v: %w", chainID, err)
}
sources[chainID] = NewRetryingL2Source(logger, source)
}
return &RetryingL2Sources{
Sources: sources,
}, nil
}
func (s *RetryingL2Sources) ForChainID(chainID uint64) (types.L2Source, error) {
source, ok := s.Sources[chainID]
if !ok {
return nil, fmt.Errorf("no source available for chain ID: %v", chainID)
}
return source, nil
}
func (s *RetryingL2Sources) ForChainIDWithoutRetries(chainID uint64) (types.L2Source, error) {
retrying, ok := s.Sources[chainID]
if !ok {
return nil, fmt.Errorf("no source available for chain ID: %v", chainID)
}
return retrying.source, nil
}
func loadChainID(ctx context.Context, rpc client.RPC) (uint64, error) {
var id hexutil.Big
err := rpc.CallContext(ctx, &id, "eth_chainId")
if err != nil {
return 0, err
}
return (*big.Int)(&id).Uint64(), nil
}
package prefetcher
import (
"context"
"fmt"
"math/big"
"testing"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/require"
)
func TestNewL2Sources(t *testing.T) {
t.Run("NoSources", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
_, err := NewRetryingL2Sources(context.Background(), logger, nil, nil, nil)
require.ErrorIs(t, err, ErrNoSources)
})
t.Run("SingleSource", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelDebug)
config, l2Rpc, experimentalRpc := chain(4)
src, err := NewRetryingL2Sources(context.Background(), logger,
[]*rollup.Config{config},
[]client.RPC{l2Rpc},
[]client.RPC{experimentalRpc})
require.NoError(t, err)
require.Len(t, src.Sources, 1)
require.True(t, src.Sources[uint64(4)].ExperimentalEnabled())
})
t.Run("MultipleSources", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelDebug)
config1, l2Rpc1, experimentalRpc1 := chain(1)
config2, l2Rpc2, experimentalRpc2 := chain(2)
src, err := NewRetryingL2Sources(context.Background(), logger,
[]*rollup.Config{config1, config2},
[]client.RPC{l2Rpc1, l2Rpc2},
[]client.RPC{experimentalRpc1, experimentalRpc2})
require.NoError(t, err)
require.Len(t, src.Sources, 2)
require.True(t, src.Sources[uint64(1)].ExperimentalEnabled())
require.True(t, src.Sources[uint64(2)].ExperimentalEnabled())
})
t.Run("ExperimentalRPCsAreOptional", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelDebug)
config1, l2Rpc1, _ := chain(1)
config2, l2Rpc2, experimentalRpc2 := chain(2)
src, err := NewRetryingL2Sources(context.Background(), logger,
[]*rollup.Config{config1, config2},
[]client.RPC{l2Rpc1, l2Rpc2},
[]client.RPC{experimentalRpc2})
require.NoError(t, err)
require.Len(t, src.Sources, 2)
require.Same(t, src.Sources[uint64(1)].RollupConfig(), config1)
require.False(t, src.Sources[uint64(1)].ExperimentalEnabled())
require.Same(t, src.Sources[uint64(2)].RollupConfig(), config2)
require.True(t, src.Sources[uint64(2)].ExperimentalEnabled())
})
t.Run("RollupMissingL2URL", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelDebug)
config1, _, _ := chain(1)
config2, l2Rpc2, experimentalRpc2 := chain(2)
_, err := NewRetryingL2Sources(context.Background(), logger,
[]*rollup.Config{config1, config2},
[]client.RPC{l2Rpc2},
[]client.RPC{experimentalRpc2})
require.ErrorIs(t, err, ErrNoL2ForRollup)
})
t.Run("L2URLWithoutConfig", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelDebug)
_, l2Rpc1, _ := chain(1)
config2, l2Rpc2, experimentalRpc2 := chain(2)
_, err := NewRetryingL2Sources(context.Background(), logger,
[]*rollup.Config{config2},
[]client.RPC{l2Rpc1, l2Rpc2},
[]client.RPC{experimentalRpc2})
require.ErrorIs(t, err, ErrNoRollupForL2)
})
t.Run("DuplicateL2URLsForSameChain", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelDebug)
config1, l2Rpc1, _ := chain(1)
_, l2Rpc2, _ := chain(1)
_, err := NewRetryingL2Sources(context.Background(), logger,
[]*rollup.Config{config1},
[]client.RPC{l2Rpc1, l2Rpc2},
nil)
require.ErrorIs(t, err, ErrDuplicateL2URLs)
})
t.Run("ExperimentalURLWithoutConfig", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelDebug)
_, _, experimentalRpc1 := chain(1)
config2, l2Rpc2, experimentalRpc2 := chain(2)
_, err := NewRetryingL2Sources(context.Background(), logger,
[]*rollup.Config{config2},
[]client.RPC{l2Rpc2},
[]client.RPC{experimentalRpc1, experimentalRpc2})
require.ErrorIs(t, err, ErrNoRollupForExperimental)
})
t.Run("DuplicateExperimentalURLsForSameChain", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelDebug)
config1, l2RPC, experimentalRpc1 := chain(1)
_, _, experimentalRpc2 := chain(1)
_, err := NewRetryingL2Sources(context.Background(), logger,
[]*rollup.Config{config1},
[]client.RPC{l2RPC},
[]client.RPC{experimentalRpc1, experimentalRpc2})
require.ErrorIs(t, err, ErrDuplicateExperimentsURLs)
})
}
func chain(id uint64) (*rollup.Config, client.RPC, client.RPC) {
chainID := new(big.Int).SetUint64(id)
return &rollup.Config{L2ChainID: chainID}, &chainIDRPC{id: chainID}, &chainIDRPC{id: chainID}
}
type chainIDRPC struct {
id *big.Int
}
func (c *chainIDRPC) Close() {
panic("implement me")
}
func (c *chainIDRPC) CallContext(ctx context.Context, result any, method string, args ...any) error {
if method != "eth_chainId" {
return fmt.Errorf("invalid method: %s", method)
}
resultOut := result.(*hexutil.Big)
*resultOut = (hexutil.Big)(*c.id)
return nil
}
func (c *chainIDRPC) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
panic("implement me")
}
func (c *chainIDRPC) Subscribe(ctx context.Context, namespace string, channel any, args ...any) (ethereum.Subscription, error) {
panic("implement me")
}
...@@ -52,7 +52,8 @@ type Prefetcher struct { ...@@ -52,7 +52,8 @@ type Prefetcher struct {
logger log.Logger logger log.Logger
l1Fetcher L1Source l1Fetcher L1Source
l1BlobFetcher L1BlobSource l1BlobFetcher L1BlobSource
l2Fetcher *RetryingL2Source defaultChainID uint64
l2Sources hosttypes.L2Sources
lastHint string lastHint string
kvStore kvstore.KV kvStore kvstore.KV
// l2Head is the L2 block hash to retrieve output root from if interop is disabled // l2Head is the L2 block hash to retrieve output root from if interop is disabled
...@@ -67,7 +68,8 @@ func NewPrefetcher( ...@@ -67,7 +68,8 @@ func NewPrefetcher(
logger log.Logger, logger log.Logger,
l1Fetcher L1Source, l1Fetcher L1Source,
l1BlobFetcher L1BlobSource, l1BlobFetcher L1BlobSource,
l2Fetcher hosttypes.L2Source, defaultChainID uint64,
l2Sources hosttypes.L2Sources,
kvStore kvstore.KV, kvStore kvstore.KV,
executor ProgramExecutor, executor ProgramExecutor,
l2Head common.Hash, l2Head common.Hash,
...@@ -77,7 +79,8 @@ func NewPrefetcher( ...@@ -77,7 +79,8 @@ func NewPrefetcher(
logger: logger, logger: logger,
l1Fetcher: NewRetryingL1Source(logger, l1Fetcher), l1Fetcher: NewRetryingL1Source(logger, l1Fetcher),
l1BlobFetcher: NewRetryingL1BlobSource(logger, l1BlobFetcher), l1BlobFetcher: NewRetryingL1BlobSource(logger, l1BlobFetcher),
l2Fetcher: NewRetryingL2Source(logger, l2Fetcher), defaultChainID: defaultChainID,
l2Sources: l2Sources,
kvStore: kvStore, kvStore: kvStore,
executor: executor, executor: executor,
l2Head: l2Head, l2Head: l2Head,
...@@ -259,7 +262,11 @@ func (p *Prefetcher) prefetch(ctx context.Context, hint string) error { ...@@ -259,7 +262,11 @@ func (p *Prefetcher) prefetch(ctx context.Context, hint string) error {
return fmt.Errorf("invalid L2 header/tx hint: %x", hint) return fmt.Errorf("invalid L2 header/tx hint: %x", hint)
} }
hash := common.Hash(hintBytes) hash := common.Hash(hintBytes)
header, txs, err := p.l2Fetcher.InfoAndTxsByHash(ctx, hash) source, err := p.l2Sources.ForChainID(p.defaultChainID)
if err != nil {
return err
}
header, txs, err := source.InfoAndTxsByHash(ctx, hash)
if err != nil { if err != nil {
return fmt.Errorf("failed to fetch L2 block %s: %w", hash, err) return fmt.Errorf("failed to fetch L2 block %s: %w", hash, err)
} }
...@@ -277,7 +284,11 @@ func (p *Prefetcher) prefetch(ctx context.Context, hint string) error { ...@@ -277,7 +284,11 @@ func (p *Prefetcher) prefetch(ctx context.Context, hint string) error {
return fmt.Errorf("invalid L2 state node hint: %x", hint) return fmt.Errorf("invalid L2 state node hint: %x", hint)
} }
hash := common.Hash(hintBytes) hash := common.Hash(hintBytes)
node, err := p.l2Fetcher.NodeByHash(ctx, hash) source, err := p.l2Sources.ForChainID(p.defaultChainID)
if err != nil {
return err
}
node, err := source.NodeByHash(ctx, hash)
if err != nil { if err != nil {
return fmt.Errorf("failed to fetch L2 state node %s: %w", hash, err) return fmt.Errorf("failed to fetch L2 state node %s: %w", hash, err)
} }
...@@ -287,7 +298,11 @@ func (p *Prefetcher) prefetch(ctx context.Context, hint string) error { ...@@ -287,7 +298,11 @@ func (p *Prefetcher) prefetch(ctx context.Context, hint string) error {
return fmt.Errorf("invalid L2 code hint: %x", hint) return fmt.Errorf("invalid L2 code hint: %x", hint)
} }
hash := common.Hash(hintBytes) hash := common.Hash(hintBytes)
code, err := p.l2Fetcher.CodeByHash(ctx, hash) source, err := p.l2Sources.ForChainID(p.defaultChainID)
if err != nil {
return err
}
code, err := source.CodeByHash(ctx, hash)
if err != nil { if err != nil {
return fmt.Errorf("failed to fetch L2 contract code %s: %w", hash, err) return fmt.Errorf("failed to fetch L2 contract code %s: %w", hash, err)
} }
...@@ -297,7 +312,11 @@ func (p *Prefetcher) prefetch(ctx context.Context, hint string) error { ...@@ -297,7 +312,11 @@ func (p *Prefetcher) prefetch(ctx context.Context, hint string) error {
return fmt.Errorf("invalid L2 output hint: %x", hint) return fmt.Errorf("invalid L2 output hint: %x", hint)
} }
requestedHash := common.Hash(hintBytes) requestedHash := common.Hash(hintBytes)
output, err := p.l2Fetcher.OutputByRoot(ctx, p.l2Head) source, err := p.l2Sources.ForChainID(p.defaultChainID)
if err != nil {
return err
}
output, err := source.OutputByRoot(ctx, p.l2Head)
if err != nil { if err != nil {
return fmt.Errorf("failed to fetch L2 output root for block %s: %w", p.l2Head, err) return fmt.Errorf("failed to fetch L2 output root for block %s: %w", p.l2Head, err)
} }
......
...@@ -9,6 +9,8 @@ import ( ...@@ -9,6 +9,8 @@ import (
"math/rand" "math/rand"
"testing" "testing"
"github.com/ethereum-optimism/optimism/op-node/rollup"
hostTypes "github.com/ethereum-optimism/optimism/op-program/host/types"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
...@@ -516,7 +518,7 @@ func TestFetchL2Code(t *testing.T) { ...@@ -516,7 +518,7 @@ func TestFetchL2Code(t *testing.T) {
} }
func TestFetchL2BlockData(t *testing.T) { func TestFetchL2BlockData(t *testing.T) {
chainID := uint64(0xdead) chainID := uint64(14)
testBlockExec := func(t *testing.T, err error) { testBlockExec := func(t *testing.T, err error) {
prefetcher, _, _, l2Client, _ := createPrefetcher(t) prefetcher, _, _, l2Client, _ := createPrefetcher(t)
...@@ -646,7 +648,8 @@ func TestRetryWhenNotAvailableAfterPrefetching(t *testing.T) { ...@@ -646,7 +648,8 @@ func TestRetryWhenNotAvailableAfterPrefetching(t *testing.T) {
_, l1Source, l1BlobSource, l2Cl, kv := createPrefetcher(t) _, l1Source, l1BlobSource, l2Cl, kv := createPrefetcher(t)
putsToIgnore := 2 putsToIgnore := 2
kv = &unreliableKvStore{KV: kv, putsToIgnore: putsToIgnore} kv = &unreliableKvStore{KV: kv, putsToIgnore: putsToIgnore}
prefetcher := NewPrefetcher(testlog.Logger(t, log.LevelInfo), l1Source, l1BlobSource, l2Cl, kv, nil, common.Hash{}, nil) sources := &l2Clients{sources: map[uint64]hostTypes.L2Source{6: l2Cl}}
prefetcher := NewPrefetcher(testlog.Logger(t, log.LevelInfo), l1Source, l1BlobSource, 6, sources, kv, nil, common.Hash{}, nil)
// Expect one call for each ignored put, plus one more request for when the put succeeds // Expect one call for each ignored put, plus one more request for when the put succeeds
for i := 0; i < putsToIgnore+1; i++ { for i := 0; i < putsToIgnore+1; i++ {
...@@ -673,11 +676,35 @@ func (s *unreliableKvStore) Put(k common.Hash, v []byte) error { ...@@ -673,11 +676,35 @@ func (s *unreliableKvStore) Put(k common.Hash, v []byte) error {
return s.KV.Put(k, v) return s.KV.Put(k, v)
} }
type l2Clients struct {
sources map[uint64]hostTypes.L2Source
}
func (l *l2Clients) ForChainID(id uint64) (hostTypes.L2Source, error) {
source, ok := l.sources[id]
if !ok {
return nil, fmt.Errorf("no such source for chain %d", id)
}
return source, nil
}
func (l *l2Clients) ForChainIDWithoutRetries(id uint64) (hostTypes.L2Source, error) {
return l.ForChainID(id)
}
type l2Client struct { type l2Client struct {
*testutils.MockL2Client *testutils.MockL2Client
*testutils.MockDebugClient *testutils.MockDebugClient
} }
func (m *l2Client) RollupConfig() *rollup.Config {
panic("implement me")
}
func (m *l2Client) ExperimentalEnabled() bool {
panic("implement me")
}
func (m *l2Client) OutputByRoot(ctx context.Context, blockHash common.Hash) (eth.Output, error) { func (m *l2Client) OutputByRoot(ctx context.Context, blockHash common.Hash) (eth.Output, error) {
out := m.Mock.MethodCalled("OutputByRoot", blockHash) out := m.Mock.MethodCalled("OutputByRoot", blockHash)
return out[0].(eth.Output), *out[1].(*error) return out[0].(eth.Output), *out[1].(*error)
...@@ -700,8 +727,11 @@ func createPrefetcherWithAgreedPrestate(t *testing.T, agreedPrestate []byte) (*P ...@@ -700,8 +727,11 @@ func createPrefetcherWithAgreedPrestate(t *testing.T, agreedPrestate []byte) (*P
MockL2Client: new(testutils.MockL2Client), MockL2Client: new(testutils.MockL2Client),
MockDebugClient: new(testutils.MockDebugClient), MockDebugClient: new(testutils.MockDebugClient),
} }
l2Sources := &l2Clients{
sources: map[uint64]hostTypes.L2Source{14: l2Source},
}
prefetcher := NewPrefetcher(logger, l1Source, l1BlobSource, l2Source, kv, nil, common.Hash{0xdd}, agreedPrestate) prefetcher := NewPrefetcher(logger, l1Source, l1BlobSource, 14, l2Sources, kv, nil, common.Hash{0xdd}, agreedPrestate)
return prefetcher, l1Source, l1BlobSource, l2Source, kv return prefetcher, l1Source, l1BlobSource, l2Source, kv
} }
......
...@@ -20,7 +20,12 @@ type ProgramExecutor interface { ...@@ -20,7 +20,12 @@ type ProgramExecutor interface {
func (p *Prefetcher) nativeReExecuteBlock( func (p *Prefetcher) nativeReExecuteBlock(
ctx context.Context, agreedBlockHash, blockHash common.Hash, chainID uint64) error { ctx context.Context, agreedBlockHash, blockHash common.Hash, chainID uint64) error {
// Avoid retries as the block may not be canonical and unavailable // Avoid retries as the block may not be canonical and unavailable
_, _, err := p.l2Fetcher.source.InfoAndTxsByHash(ctx, blockHash)
source, err := p.l2Sources.ForChainIDWithoutRetries(chainID)
if err != nil {
return err
}
_, _, err = source.InfoAndTxsByHash(ctx, blockHash)
if err == nil { if err == nil {
// we already have the data needed for the program to re-execute // we already have the data needed for the program to re-execute
return nil return nil
...@@ -29,7 +34,11 @@ func (p *Prefetcher) nativeReExecuteBlock( ...@@ -29,7 +34,11 @@ func (p *Prefetcher) nativeReExecuteBlock(
p.logger.Error("Failed to fetch block", "block_hash", blockHash, "err", err) p.logger.Error("Failed to fetch block", "block_hash", blockHash, "err", err)
} }
header, _, err := p.l2Fetcher.InfoAndTxsByHash(ctx, agreedBlockHash) retrying, err := p.l2Sources.ForChainID(chainID)
if err != nil {
return err
}
header, _, err := retrying.InfoAndTxsByHash(ctx, agreedBlockHash)
if err != nil { if err != nil {
return err return err
} }
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"context" "context"
"math" "math"
"github.com/ethereum-optimism/optimism/op-node/rollup"
hosttypes "github.com/ethereum-optimism/optimism/op-program/host/types" hosttypes "github.com/ethereum-optimism/optimism/op-program/host/types"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/retry" "github.com/ethereum-optimism/optimism/op-service/retry"
...@@ -102,6 +103,14 @@ type RetryingL2Source struct { ...@@ -102,6 +103,14 @@ type RetryingL2Source struct {
strategy retry.Strategy strategy retry.Strategy
} }
func (s *RetryingL2Source) RollupConfig() *rollup.Config {
return s.source.RollupConfig()
}
func (s *RetryingL2Source) ExperimentalEnabled() bool {
return s.source.ExperimentalEnabled()
}
func (s *RetryingL2Source) InfoAndTxsByHash(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Transactions, error) { func (s *RetryingL2Source) InfoAndTxsByHash(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Transactions, error) {
return retry.Do2(ctx, maxAttempts, s.strategy, func() (eth.BlockInfo, types.Transactions, error) { return retry.Do2(ctx, maxAttempts, s.strategy, func() (eth.BlockInfo, types.Transactions, error) {
i, t, err := s.source.InfoAndTxsByHash(ctx, blockHash) i, t, err := s.source.InfoAndTxsByHash(ctx, blockHash)
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"errors" "errors"
"testing" "testing"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
...@@ -339,6 +340,16 @@ type MockL2Source struct { ...@@ -339,6 +340,16 @@ type MockL2Source struct {
mock.Mock mock.Mock
} }
func (m *MockL2Source) ExperimentalEnabled() bool {
out := m.Mock.MethodCalled("ExperimentalEnabled")
return out[0].(bool)
}
func (m *MockL2Source) RollupConfig() *rollup.Config {
out := m.Mock.MethodCalled("RollupConfig")
return out[0].(*rollup.Config)
}
func (m *MockL2Source) InfoAndTxsByHash(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Transactions, error) { func (m *MockL2Source) InfoAndTxsByHash(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Transactions, error) {
out := m.Mock.MethodCalled("InfoAndTxsByHash", blockHash) out := m.Mock.MethodCalled("InfoAndTxsByHash", blockHash)
return out[0].(eth.BlockInfo), out[1].(types.Transactions), *out[2].(*error) return out[0].(eth.BlockInfo), out[1].(types.Transactions), *out[2].(*error)
......
...@@ -3,6 +3,7 @@ package types ...@@ -3,6 +3,7 @@ package types
import ( import (
"context" "context"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
...@@ -23,4 +24,11 @@ type L2Source interface { ...@@ -23,4 +24,11 @@ type L2Source interface {
NodeByHash(ctx context.Context, hash common.Hash) ([]byte, error) NodeByHash(ctx context.Context, hash common.Hash) ([]byte, error)
CodeByHash(ctx context.Context, hash common.Hash) ([]byte, error) CodeByHash(ctx context.Context, hash common.Hash) ([]byte, error)
OutputByRoot(ctx context.Context, blockRoot common.Hash) (eth.Output, error) OutputByRoot(ctx context.Context, blockRoot common.Hash) (eth.Output, error)
RollupConfig() *rollup.Config
ExperimentalEnabled() bool
}
type L2Sources interface {
ForChainID(chainID uint64) (L2Source, error)
ForChainIDWithoutRetries(chainID uint64) (L2Source, error)
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment