Commit 2dddfdb9 authored by Inphi's avatar Inphi Committed by GitHub

op-program: Add hint route for block execution (#13631)

* op-program: Implement deposit-only block receipt hint

* op-program: Reflective program for block execution

* op-program: Hook new hint route to the L2 oracle

* op-program: Fix prefetch hint for L2BlockData

* op-program: Abstract program execution in prefetcher

* op-program: Add chainID to l2-block-data hint

* Derive block succeeding the agreed block

* fix validation on client.Main

* refactor L2Source interface

* uint8 chainID; fix TODOs

* add test for block exec cache hit

* remove useless comment
parent 0be1eb3e
......@@ -13,6 +13,7 @@ import (
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/fakebeacon"
"github.com/ethereum-optimism/optimism/op-program/host"
hostcommon "github.com/ethereum-optimism/optimism/op-program/host/common"
"github.com/ethereum-optimism/optimism/op-program/host/config"
"github.com/ethereum-optimism/optimism/op-program/host/kvstore"
"github.com/ethereum-optimism/optimism/op-program/host/prefetcher"
......@@ -205,7 +206,7 @@ func (env *L2FaultProofEnv) RunFaultProofProgram(t helpers.Testing, l2ClaimBlock
env,
fixtureInputs,
)
withInProcessPrefetcher := host.WithPrefetcher(func(ctx context.Context, logger log.Logger, kv kvstore.KV, cfg *config.Config) (host.Prefetcher, error) {
withInProcessPrefetcher := hostcommon.WithPrefetcher(func(ctx context.Context, logger log.Logger, kv kvstore.KV, cfg *config.Config) (hostcommon.Prefetcher, error) {
// Set up in-process L1 sources
l1Cl := env.Miner.L1Client(t, env.Sd.RollupCfg)
l1BlobFetcher := env.Miner.BlobSource()
......@@ -213,13 +214,14 @@ func (env *L2FaultProofEnv) RunFaultProofProgram(t helpers.Testing, l2ClaimBlock
// Set up in-process L2 source
l2ClCfg := sources.L2ClientDefaultConfig(env.Sd.RollupCfg, true)
l2RPC := env.Engine.RPCClient()
l2Client, err := host.NewL2Client(l2RPC, env.log, nil, &host.L2ClientConfig{L2ClientConfig: l2ClCfg, L2Head: cfg.L2Head})
l2Client, err := hostcommon.NewL2Client(l2RPC, env.log, nil, &hostcommon.L2ClientConfig{L2ClientConfig: l2ClCfg, L2Head: cfg.L2Head})
require.NoError(t, err, "failed to create L2 client")
l2DebugCl := host.NewL2SourceWithClient(logger, l2Client, sources.NewDebugClient(l2RPC.CallContext))
l2DebugCl := hostcommon.NewL2SourceWithClient(logger, l2Client, sources.NewDebugClient(l2RPC.CallContext))
return prefetcher.NewPrefetcher(logger, l1Cl, l1BlobFetcher, l2DebugCl, kv), nil
executor := host.MakeProgramExecutor(env.log, programCfg)
return prefetcher.NewPrefetcher(logger, l1Cl, l1BlobFetcher, l2DebugCl, kv, env.Sd.L2Cfg.Config, executor), nil
})
err = host.FaultProofProgram(t.Ctx(), env.log, programCfg, withInProcessPrefetcher)
err = hostcommon.FaultProofProgram(t.Ctx(), env.log, programCfg, withInProcessPrefetcher)
checkResult(t, err)
}
tryDumpTestFixture(t, err, t.Name(), env, *fixtureInputs, workDir)
......
......@@ -298,7 +298,7 @@ func testFaultProofProgramScenario(t *testing.T, ctx context.Context, sys *e2esy
// Check the FPP confirms the expected output
t.Log("Running fault proof in fetching mode")
log := testlog.Logger(t, log.LevelInfo)
err := opp.FaultProofProgram(ctx, log, fppConfig)
err := opp.FaultProofProgramWithDefaultPrefecher(ctx, log, fppConfig)
require.NoError(t, err)
t.Log("Shutting down network")
......@@ -315,13 +315,13 @@ func testFaultProofProgramScenario(t *testing.T, ctx context.Context, sys *e2esy
// Should be able to rerun in offline mode using the pre-fetched images
fppConfig.L1URL = ""
fppConfig.L2URL = ""
err = opp.FaultProofProgram(ctx, log, fppConfig)
err = opp.FaultProofProgramWithDefaultPrefecher(ctx, log, fppConfig)
require.NoError(t, err)
// Check that a fault is detected if we provide an incorrect claim
t.Log("Running fault proof with invalid claim")
fppConfig.L2Claim = common.Hash{0xaa}
err = opp.FaultProofProgram(ctx, log, fppConfig)
err = opp.FaultProofProgramWithDefaultPrefecher(ctx, log, fppConfig)
if s.Detached {
require.Error(t, err, "exit status 1")
} else {
......
......@@ -74,3 +74,10 @@ func (o *CachingOracle) OutputByRoot(root common.Hash) eth.Output {
o.outputs.Add(root, output)
return output
}
// BlockDataByHash retrieves the block identified by blockHash and refreshes the
// block cache with the result.
func (o *CachingOracle) BlockDataByHash(agreedBlockHash, blockHash common.Hash, chainID uint64) *types.Block {
	// Always request from the oracle, even on a cache hit, as we want the effects of the host oracle hinting
	block := o.oracle.BlockDataByHash(agreedBlockHash, blockHash, chainID)
	o.blocks.Add(blockHash, block)
	return block
}
package l2
import (
"encoding/binary"
"fmt"
"github.com/ethereum/go-ethereum/common"
preimage "github.com/ethereum-optimism/optimism/op-preimage"
......@@ -12,6 +15,7 @@ const (
HintL2Code = "l2-code"
HintL2StateNode = "l2-state-node"
HintL2Output = "l2-output"
HintL2BlockData = "l2-block-data"
)
type BlockHeaderHint common.Hash
......@@ -53,3 +57,19 @@ var _ preimage.Hint = L2OutputHint{}
func (l L2OutputHint) Hint() string {
return HintL2Output + " " + (common.Hash)(l).String()
}
// L2BlockDataHint hints that the host should make available all data required
// to construct the block identified by BlockHash, whose agreed-upon parent
// state is identified by AgreedBlockHash, on the chain with the given ChainID.
type L2BlockDataHint struct {
	AgreedBlockHash common.Hash
	BlockHash       common.Hash
	ChainID         uint64
}

var _ preimage.Hint = L2BlockDataHint{}

// Hint encodes the hint as "<type> 0x<payload>" where the payload is
// 32 bytes of agreed block hash, 32 bytes of block hash, and the chain ID
// as an 8-byte big-endian integer.
func (l L2BlockDataHint) Hint() string {
	hintBytes := make([]byte, 32+32+8)
	// The fields are already common.Hash values, so no conversion is needed.
	copy(hintBytes[:32], l.AgreedBlockHash.Bytes())
	copy(hintBytes[32:64], l.BlockHash.Bytes())
	binary.BigEndian.PutUint64(hintBytes[64:], l.ChainID)
	return fmt.Sprintf("%s 0x%s", HintL2BlockData, common.Bytes2Hex(hintBytes))
}
......@@ -34,6 +34,9 @@ type Oracle interface {
BlockByHash(blockHash common.Hash) *types.Block
OutputByRoot(root common.Hash) eth.Output
// BlockDataByHash retrieves the block, including all data used to construct it.
BlockDataByHash(agreedBlockHash, blockHash common.Hash, chainID uint64) *types.Block
}
// PreimageOracle implements Oracle using by interfacing with the pure preimage.Oracle
......@@ -102,3 +105,15 @@ func (p *PreimageOracle) OutputByRoot(l2OutputRoot common.Hash) eth.Output {
}
return output
}
// BlockDataByHash sends an l2-block-data hint so the host can prepare all the
// preimages needed to construct the requested block, then assembles the block
// from its header and transactions.
func (p *PreimageOracle) BlockDataByHash(agreedBlockHash, blockHash common.Hash, chainID uint64) *types.Block {
	p.hint.Hint(L2BlockDataHint{
		AgreedBlockHash: agreedBlockHash,
		BlockHash:       blockHash,
		ChainID:         chainID,
	})
	header := p.headerByBlockHash(blockHash)
	txs := p.LoadTransactions(blockHash, header.TxHash)
	return types.NewBlockWithHeader(header).WithBody(types.Body{Transactions: txs})
}
......@@ -69,6 +69,14 @@ func (o StubBlockOracle) OutputByRoot(root common.Hash) eth.Output {
return output
}
// BlockDataByHash returns the stubbed block for blockHash, failing the test if
// no such block was registered. agreedBlockHash and chainID are ignored.
func (o StubBlockOracle) BlockDataByHash(agreedBlockHash, blockHash common.Hash, chainID uint64) *types.Block {
	b, found := o.Blocks[blockHash]
	if !found {
		o.t.Fatalf("requested unknown block %s", blockHash)
	}
	return b
}
// KvStateOracle loads data from a source ethdb.KeyValueStore
type KvStateOracle struct {
t *testing.T
......
......@@ -14,13 +14,20 @@ import (
"github.com/ethereum/go-ethereum/log"
)
// RunProgramFlag controls whether RunProgram validates the claim against the
// derived output root after the program completes.
type RunProgramFlag bool

const (
	// RunProgramFlagSkipValidation runs the program without validating the claim.
	RunProgramFlagSkipValidation RunProgramFlag = false
	// RunProgramFlagValidate runs the program and validates the claim against the derived output root.
	RunProgramFlagValidate RunProgramFlag = true
)
// Main executes the client program in a detached context and exits the current process.
// The client runtime environment must be preset before calling this function.
func Main(logger log.Logger) {
log.Info("Starting fault proof program client")
preimageOracle := preimage.ClientPreimageChannel()
preimageHinter := preimage.ClientHinterChannel()
if err := RunProgram(logger, preimageOracle, preimageHinter); errors.Is(err, claim.ErrClaimNotValid) {
if err := RunProgram(logger, preimageOracle, preimageHinter, RunProgramFlagValidate); errors.Is(err, claim.ErrClaimNotValid) {
log.Error("Claim is invalid", "err", err)
os.Exit(1)
} else if err != nil {
......@@ -33,7 +40,7 @@ func Main(logger log.Logger) {
}
// RunProgram executes the Program, while attached to an IO based pre-image oracle, to be served by a host.
func RunProgram(logger log.Logger, preimageOracle io.ReadWriter, preimageHinter io.ReadWriter) error {
func RunProgram(logger log.Logger, preimageOracle io.ReadWriter, preimageHinter io.ReadWriter, flags RunProgramFlag) error {
pClient := preimage.NewOracleClient(preimageOracle)
hClient := preimage.NewHintWriter(preimageHinter)
l1PreimageOracle := l1.NewCachingOracle(l1.NewPreimageOracle(pClient, hClient))
......@@ -54,5 +61,8 @@ func RunProgram(logger log.Logger, preimageOracle io.ReadWriter, preimageHinter
if err != nil {
return err
}
if flags == RunProgramFlagValidate {
return claim.ValidateClaim(logger, safeHead, eth.Bytes32(bootInfo.L2Claim), outputRoot)
}
return nil
}
package common
import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"os"
"os/exec"
preimage "github.com/ethereum-optimism/optimism/op-preimage"
cl "github.com/ethereum-optimism/optimism/op-program/client"
"github.com/ethereum-optimism/optimism/op-program/host/config"
"github.com/ethereum-optimism/optimism/op-program/host/kvstore"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
// Prefetcher serves preimage requests and hints for the preimage server,
// fetching any data that is not already available.
type Prefetcher interface {
	Hint(hint string) error
	GetPreimage(ctx context.Context, key common.Hash) ([]byte, error)
}

// PrefetcherCreator constructs the Prefetcher used by the preimage server.
// It may return (nil, nil) to indicate that fetching is disabled, in which
// case only pre-populated preimages are served.
type PrefetcherCreator func(ctx context.Context, logger log.Logger, kv kvstore.KV, cfg *config.Config) (Prefetcher, error)

// programCfg collects the options applied to a FaultProofProgram run.
type programCfg struct {
	prefetcher     PrefetcherCreator
	skipValidation bool
}

// ProgramOpt configures a FaultProofProgram run.
type ProgramOpt func(c *programCfg)
// WithPrefetcher sets the creator used to build the prefetcher that backs the
// preimage server.
func WithPrefetcher(creator PrefetcherCreator) ProgramOpt {
	return func(cfg *programCfg) { cfg.prefetcher = creator }
}
// WithSkipValidation controls whether the client skips claim validation after
// deriving the output root.
func WithSkipValidation(skip bool) ProgramOpt {
	return func(cfg *programCfg) { cfg.skipValidation = skip }
}
// FaultProofProgram is the programmatic entry-point for the fault proof program.
// It starts an in-process preimage server backed by the configured prefetcher,
// then runs the client either as a sub-process (when cfg.ExecCmd is set) or
// in-process. A prefetcher creator must be supplied via WithPrefetcher.
func FaultProofProgram(ctx context.Context, logger log.Logger, cfg *config.Config, opts ...ProgramOpt) error {
	programConfig := &programCfg{}
	for _, opt := range opts {
		opt(programConfig)
	}
	if programConfig.prefetcher == nil {
		// Caller misconfiguration: report as an error rather than panicking in library code.
		return errors.New("prefetcher creator is not set")
	}

	var (
		serverErr chan error
		pClientRW preimage.FileChannel
		hClientRW preimage.FileChannel
	)
	// Close the client channels (unblocking the server) and then wait for the
	// preimage server goroutine to finish before returning.
	defer func() {
		if pClientRW != nil {
			_ = pClientRW.Close()
		}
		if hClientRW != nil {
			_ = hClientRW.Close()
		}
		if serverErr != nil {
			err := <-serverErr
			if err != nil {
				logger.Error("preimage server failed", "err", err)
			}
			logger.Debug("Preimage server stopped")
		}
	}()

	// Setup client I/O for preimage oracle interaction
	pClientRW, pHostRW, err := preimage.CreateBidirectionalChannel()
	if err != nil {
		return fmt.Errorf("failed to create preimage pipe: %w", err)
	}

	// Setup client I/O for hint comms
	hClientRW, hHostRW, err := preimage.CreateBidirectionalChannel()
	if err != nil {
		return fmt.Errorf("failed to create hints pipe: %w", err)
	}

	// Use a channel to receive the server result so we can wait for it to complete before returning
	serverErr = make(chan error)
	go func() {
		defer close(serverErr)
		serverErr <- PreimageServer(ctx, logger, cfg, pHostRW, hHostRW, programConfig.prefetcher)
	}()

	var cmd *exec.Cmd
	if cfg.ExecCmd != "" {
		// Run the client as a sub-process, passing the channel file descriptors
		// at the well-known fd numbers the client expects.
		cmd = exec.CommandContext(ctx, cfg.ExecCmd)
		cmd.ExtraFiles = make([]*os.File, cl.MaxFd-3) // not including stdin, stdout and stderr
		cmd.ExtraFiles[cl.HClientRFd-3] = hClientRW.Reader()
		cmd.ExtraFiles[cl.HClientWFd-3] = hClientRW.Writer()
		cmd.ExtraFiles[cl.PClientRFd-3] = pClientRW.Reader()
		cmd.ExtraFiles[cl.PClientWFd-3] = pClientRW.Writer()
		cmd.Stdout = os.Stdout // for debugging
		cmd.Stderr = os.Stderr // for debugging

		err := cmd.Start()
		if err != nil {
			return fmt.Errorf("program cmd failed to start: %w", err)
		}
		if err := cmd.Wait(); err != nil {
			return fmt.Errorf("failed to wait for child program: %w", err)
		}
		logger.Debug("Client program completed successfully")
		return nil
	} else {
		// Run the client in-process, translating the skipValidation option
		// into the client's RunProgramFlag.
		runFlag := cl.RunProgramFlagValidate
		if programConfig.skipValidation {
			runFlag = cl.RunProgramFlagSkipValidation
		}
		return cl.RunProgram(logger, pClientRW, hClientRW, runFlag)
	}
}
// PreimageServer reads hints and preimage requests from the provided channels and processes those requests.
// This method will block until both the hinter and preimage handlers complete.
// If either returns an error both handlers are stopped.
// The supplied preimageChannel and hintChannel will be closed before this function returns.
func PreimageServer(ctx context.Context, logger log.Logger, cfg *config.Config, preimageChannel preimage.FileChannel, hintChannel preimage.FileChannel, prefetcherCreator PrefetcherCreator) error {
	var serverDone chan error
	var hinterDone chan error
	logger.Info("Starting preimage server")
	var kv kvstore.KV
	// Close the preimage/hint channels, and then kv store once the server and hinter have exited.
	defer func() {
		preimageChannel.Close()
		hintChannel.Close()
		if serverDone != nil {
			// Wait for pre-image server to complete
			<-serverDone
		}
		if hinterDone != nil {
			// Wait for hinter to complete
			<-hinterDone
		}
		if kv != nil {
			kv.Close()
		}
	}()
	// Choose the backing store: in-memory when no data directory is configured,
	// otherwise a disk-backed store under cfg.DataDir.
	if cfg.DataDir == "" {
		logger.Info("Using in-memory storage")
		kv = kvstore.NewMemKV()
	} else {
		if err := os.MkdirAll(cfg.DataDir, 0755); err != nil {
			return fmt.Errorf("creating datadir: %w", err)
		}
		store, err := kvstore.NewDiskKV(logger, cfg.DataDir, cfg.DataFormat)
		if err != nil {
			return fmt.Errorf("creating kvstore: %w", err)
		}
		kv = store
	}
	var (
		getPreimage kvstore.PreimageSource
		hinter      preimage.HintHandler
	)
	prefetch, err := prefetcherCreator(ctx, logger, kv, cfg)
	if err != nil {
		return fmt.Errorf("failed to create prefetcher: %w", err)
	}
	// A nil prefetcher (with nil error) means fetching is disabled: serve only
	// pre-populated preimages from the kv store and ignore hints.
	if prefetch != nil {
		getPreimage = func(key common.Hash) ([]byte, error) { return prefetch.GetPreimage(ctx, key) }
		hinter = prefetch.Hint
	} else {
		logger.Info("Using offline mode. All required pre-images must be pre-populated.")
		getPreimage = kv.Get
		hinter = func(hint string) error {
			logger.Debug("ignoring prefetch hint", "hint", hint)
			return nil
		}
	}
	// Local (bootstrap) preimages are served from the config; all other keys are
	// routed to the kv/prefetcher source, with verification of returned values.
	localPreimageSource := kvstore.NewLocalPreimageSource(cfg)
	splitter := kvstore.NewPreimageSourceSplitter(localPreimageSource.Get, getPreimage)
	preimageGetter := preimage.WithVerification(splitter.Get)
	serverDone = launchOracleServer(logger, preimageChannel, preimageGetter)
	hinterDone = routeHints(logger, hintChannel, hinter)
	// Return on the first failure, or on context cancellation; the deferred
	// cleanup above waits for both goroutines before returning.
	select {
	case err := <-serverDone:
		return err
	case err := <-hinterDone:
		return err
	case <-ctx.Done():
		logger.Info("Shutting down")
		if errors.Is(ctx.Err(), context.Canceled) {
			// We were asked to shutdown by the context being cancelled so don't treat it as an error condition.
			return nil
		}
		return ctx.Err()
	}
}
// routeHints starts a goroutine that reads hints from hHostRW and dispatches
// each one to hinter. The returned channel is closed when the goroutine exits
// and carries at most one error; EOF or a closed pipe is treated as a normal
// shutdown rather than an error.
func routeHints(logger log.Logger, hHostRW io.ReadWriter, hinter preimage.HintHandler) chan error {
	result := make(chan error)
	reader := preimage.NewHintReader(hHostRW)
	go func() {
		defer close(result)
		for {
			err := reader.NextHint(hinter)
			if err == nil {
				continue
			}
			if err == io.EOF || errors.Is(err, fs.ErrClosed) {
				logger.Debug("closing pre-image hint handler")
				return
			}
			logger.Error("pre-image hint router error", "err", err)
			result <- err
			return
		}
	}()
	return result
}
// launchOracleServer starts a goroutine that serves preimage requests from
// pHostRW using getter. The returned channel is closed when the goroutine
// exits and carries at most one error; EOF or a closed pipe is treated as a
// normal shutdown rather than an error.
func launchOracleServer(logger log.Logger, pHostRW io.ReadWriteCloser, getter preimage.PreimageGetter) chan error {
	chErr := make(chan error)
	server := preimage.NewOracleServer(pHostRW)
	go func() {
		defer close(chErr)
		for {
			if err := server.NextPreimageRequest(getter); err != nil {
				if err == io.EOF || errors.Is(err, fs.ErrClosed) {
					logger.Debug("closing pre-image server")
					return
				}
				// Use the "err" attribute key for consistency with routeHints
				// and the rest of this file.
				logger.Error("pre-image server error", "err", err)
				chErr <- err
				return
			}
		}
	}()
	return chErr
}
package host
package common
import (
"context"
"errors"
"time"
"github.com/ethereum-optimism/optimism/op-program/host/config"
"github.com/ethereum-optimism/optimism/op-program/host/prefetcher"
hosttypes "github.com/ethereum-optimism/optimism/op-program/host/types"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources"
......@@ -14,6 +15,11 @@ import (
"github.com/ethereum/go-ethereum/log"
)
var (
	// ErrExperimentalPrefetchFailed indicates that a fetch via the experimental
	// source failed; callers may retry using the canonical source.
	ErrExperimentalPrefetchFailed = errors.New("experimental prefetch failed")
	// ErrExperimentalPrefetchDisabled indicates that the experimental source is
	// not enabled for this L2 source.
	ErrExperimentalPrefetchDisabled = errors.New("experimental prefetch disabled")
)
// L2Source is a source of L2 data, it abstracts away the details of how to fetch L2 data between canonical and experimental sources.
// It also tracks metrics for each of the apis. Once experimental sources are stable, this will only route to the "experimental" source.
type L2Source struct {
......@@ -28,7 +34,7 @@ type L2Source struct {
experimentalClient *L2Client
}
var _ prefetcher.L2Source = &L2Source{}
var _ hosttypes.L2Source = &L2Source{}
// NewL2SourceWithClient creates a new L2 source with the given client as the canonical client.
// This doesn't configure the experimental source, but is useful for testing.
......@@ -125,14 +131,14 @@ func (l *L2Source) OutputByRoot(ctx context.Context, root common.Hash) (eth.Outp
func (l *L2Source) ExecutionWitness(ctx context.Context, blockNum uint64) (*eth.ExecutionWitness, error) {
if !l.ExperimentalEnabled() {
l.logger.Error("Experimental source is not enabled, cannot fetch execution witness", "blockNum", blockNum)
return nil, prefetcher.ErrExperimentalPrefetchDisabled
return nil, ErrExperimentalPrefetchDisabled
}
// log errors, but return standard error so we know to retry with legacy source
witness, err := l.experimentalClient.ExecutionWitness(ctx, blockNum)
if err != nil {
l.logger.Error("Failed to fetch execution witness from experimental source", "blockNum", blockNum, "err", err)
return nil, prefetcher.ErrExperimentalPrefetchFailed
return nil, ErrExperimentalPrefetchFailed
}
return witness, nil
}
......@@ -145,7 +151,7 @@ func (l *L2Source) GetProof(ctx context.Context, address common.Address, storage
proof, err := l.canonicalEthClient.GetProof(ctx, address, storage, blockTag)
if err != nil {
l.logger.Error("Failed to fetch proof from canonical source", "address", address, "storage", storage, "blockTag", blockTag, "err", err)
return nil, prefetcher.ErrExperimentalPrefetchFailed
return nil, ErrExperimentalPrefetchFailed
}
return proof, nil
}
......@@ -2,16 +2,11 @@ package host
import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"os"
"os/exec"
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
preimage "github.com/ethereum-optimism/optimism/op-preimage"
cl "github.com/ethereum-optimism/optimism/op-program/client"
hostcommon "github.com/ethereum-optimism/optimism/op-program/host/common"
"github.com/ethereum-optimism/optimism/op-program/host/config"
"github.com/ethereum-optimism/optimism/op-program/host/flags"
"github.com/ethereum-optimism/optimism/op-program/host/kvstore"
......@@ -55,10 +50,10 @@ func Main(logger log.Logger, cfg *config.Config) error {
if cfg.ServerMode {
preimageChan := preimage.ClientPreimageChannel()
hinterChan := preimage.ClientHinterChannel()
return PreimageServer(ctx, logger, cfg, preimageChan, hinterChan, makeDefaultPrefetcher)
return hostcommon.PreimageServer(ctx, logger, cfg, preimageChan, hinterChan, makeDefaultPrefetcher)
}
if err := FaultProofProgram(ctx, logger, cfg); err != nil {
if err := FaultProofProgramWithDefaultPrefecher(ctx, logger, cfg); err != nil {
return err
}
log.Info("Claim successfully verified")
......@@ -66,161 +61,14 @@ func Main(logger log.Logger, cfg *config.Config) error {
}
// FaultProofProgram is the programmatic entry-point for the fault proof program
func FaultProofProgram(ctx context.Context, logger log.Logger, cfg *config.Config, opts ...ProgramOpt) error {
creators := &creatorsCfg{
prefetcher: makeDefaultPrefetcher,
}
for _, opt := range opts {
opt(creators)
}
var (
serverErr chan error
pClientRW preimage.FileChannel
hClientRW preimage.FileChannel
)
defer func() {
if pClientRW != nil {
_ = pClientRW.Close()
}
if hClientRW != nil {
_ = hClientRW.Close()
}
if serverErr != nil {
err := <-serverErr
if err != nil {
logger.Error("preimage server failed", "err", err)
}
logger.Debug("Preimage server stopped")
}
}()
// Setup client I/O for preimage oracle interaction
pClientRW, pHostRW, err := preimage.CreateBidirectionalChannel()
if err != nil {
return fmt.Errorf("failed to create preimage pipe: %w", err)
}
// Setup client I/O for hint comms
hClientRW, hHostRW, err := preimage.CreateBidirectionalChannel()
if err != nil {
return fmt.Errorf("failed to create hints pipe: %w", err)
}
// Use a channel to receive the server result so we can wait for it to complete before returning
serverErr = make(chan error)
go func() {
defer close(serverErr)
serverErr <- PreimageServer(ctx, logger, cfg, pHostRW, hHostRW, creators.prefetcher)
}()
var cmd *exec.Cmd
if cfg.ExecCmd != "" {
cmd = exec.CommandContext(ctx, cfg.ExecCmd)
cmd.ExtraFiles = make([]*os.File, cl.MaxFd-3) // not including stdin, stdout and stderr
cmd.ExtraFiles[cl.HClientRFd-3] = hClientRW.Reader()
cmd.ExtraFiles[cl.HClientWFd-3] = hClientRW.Writer()
cmd.ExtraFiles[cl.PClientRFd-3] = pClientRW.Reader()
cmd.ExtraFiles[cl.PClientWFd-3] = pClientRW.Writer()
cmd.Stdout = os.Stdout // for debugging
cmd.Stderr = os.Stderr // for debugging
err := cmd.Start()
if err != nil {
return fmt.Errorf("program cmd failed to start: %w", err)
}
if err := cmd.Wait(); err != nil {
return fmt.Errorf("failed to wait for child program: %w", err)
}
logger.Debug("Client program completed successfully")
return nil
} else {
return cl.RunProgram(logger, pClientRW, hClientRW)
}
}
// PreimageServer reads hints and preimage requests from the provided channels and processes those requests.
// This method will block until both the hinter and preimage handlers complete.
// If either returns an error both handlers are stopped.
// The supplied preimageChannel and hintChannel will be closed before this function returns.
func PreimageServer(ctx context.Context, logger log.Logger, cfg *config.Config, preimageChannel preimage.FileChannel, hintChannel preimage.FileChannel, prefetcherCreator PrefetcherCreator) error {
var serverDone chan error
var hinterDone chan error
logger.Info("Starting preimage server")
var kv kvstore.KV
// Close the preimage/hint channels, and then kv store once the server and hinter have exited.
defer func() {
preimageChannel.Close()
hintChannel.Close()
if serverDone != nil {
// Wait for pre-image server to complete
<-serverDone
}
if hinterDone != nil {
// Wait for hinter to complete
<-hinterDone
}
if kv != nil {
kv.Close()
}
}()
if cfg.DataDir == "" {
logger.Info("Using in-memory storage")
kv = kvstore.NewMemKV()
} else {
if err := os.MkdirAll(cfg.DataDir, 0755); err != nil {
return fmt.Errorf("creating datadir: %w", err)
}
store, err := kvstore.NewDiskKV(logger, cfg.DataDir, cfg.DataFormat)
if err != nil {
return fmt.Errorf("creating kvstore: %w", err)
}
kv = store
}
var (
getPreimage kvstore.PreimageSource
hinter preimage.HintHandler
)
prefetch, err := prefetcherCreator(ctx, logger, kv, cfg)
if err != nil {
return fmt.Errorf("failed to create prefetcher: %w", err)
}
if prefetch != nil {
getPreimage = func(key common.Hash) ([]byte, error) { return prefetch.GetPreimage(ctx, key) }
hinter = prefetch.Hint
} else {
logger.Info("Using offline mode. All required pre-images must be pre-populated.")
getPreimage = kv.Get
hinter = func(hint string) error {
logger.Debug("ignoring prefetch hint", "hint", hint)
return nil
}
}
localPreimageSource := kvstore.NewLocalPreimageSource(cfg)
splitter := kvstore.NewPreimageSourceSplitter(localPreimageSource.Get, getPreimage)
preimageGetter := preimage.WithVerification(splitter.Get)
serverDone = launchOracleServer(logger, preimageChannel, preimageGetter)
hinterDone = routeHints(logger, hintChannel, hinter)
select {
case err := <-serverDone:
return err
case err := <-hinterDone:
return err
case <-ctx.Done():
logger.Info("Shutting down")
if errors.Is(ctx.Err(), context.Canceled) {
// We were asked to shutdown by the context being cancelled so don't treat it as an error condition.
return nil
}
return ctx.Err()
}
// FaultProofProgramWithDefaultPrefecher runs the fault proof program with the
// default RPC-backed prefetcher, applying any additional opts afterwards so
// callers can still override it.
// Note: the name misspells "Prefetcher"; it is kept as-is for compatibility
// with existing callers.
func FaultProofProgramWithDefaultPrefecher(ctx context.Context, logger log.Logger, cfg *config.Config, opts ...hostcommon.ProgramOpt) error {
	combined := make([]hostcommon.ProgramOpt, 0, len(opts)+1)
	combined = append(combined, hostcommon.WithPrefetcher(makeDefaultPrefetcher))
	combined = append(combined, opts...)
	return hostcommon.FaultProofProgram(ctx, logger, cfg, combined...)
}
func makeDefaultPrefetcher(ctx context.Context, logger log.Logger, kv kvstore.KV, cfg *config.Config) (Prefetcher, error) {
func makeDefaultPrefetcher(ctx context.Context, logger log.Logger, kv kvstore.KV, cfg *config.Config) (hostcommon.Prefetcher, error) {
if !cfg.FetchingEnabled() {
return nil, nil
}
......@@ -241,50 +89,41 @@ func makeDefaultPrefetcher(ctx context.Context, logger log.Logger, kv kvstore.KV
l1BlobFetcher := sources.NewL1BeaconClient(l1Beacon, sources.L1BeaconClientConfig{FetchAllSidecars: false})
logger.Info("Initializing L2 clients")
l2Client, err := NewL2Source(ctx, logger, cfg)
l2Client, err := hostcommon.NewL2Source(ctx, logger, cfg)
if err != nil {
return nil, fmt.Errorf("failed to create L2 source: %w", err)
}
return prefetcher.NewPrefetcher(logger, l1Cl, l1BlobFetcher, l2Client, kv), nil
executor := MakeProgramExecutor(logger, cfg)
return prefetcher.NewPrefetcher(logger, l1Cl, l1BlobFetcher, l2Client, kv, cfg.L2ChainConfig, executor), nil
}
func routeHints(logger log.Logger, hHostRW io.ReadWriter, hinter preimage.HintHandler) chan error {
chErr := make(chan error)
hintReader := preimage.NewHintReader(hHostRW)
go func() {
defer close(chErr)
for {
if err := hintReader.NextHint(hinter); err != nil {
if err == io.EOF || errors.Is(err, fs.ErrClosed) {
logger.Debug("closing pre-image hint handler")
return
}
logger.Error("pre-image hint router error", "err", err)
chErr <- err
return
}
}
}()
return chErr
// programExecutor runs the fault proof program in-process in order to natively
// re-execute an L2 block; it is the implementation returned by MakeProgramExecutor.
type programExecutor struct {
	logger log.Logger
	cfg    *config.Config
}
func launchOracleServer(logger log.Logger, pHostRW io.ReadWriteCloser, getter preimage.PreimageGetter) chan error {
chErr := make(chan error)
server := preimage.NewOracleServer(pHostRW)
go func() {
defer close(chErr)
for {
if err := server.NextPreimageRequest(getter); err != nil {
if err == io.EOF || errors.Is(err, fs.ErrClosed) {
logger.Debug("closing pre-image server")
return
}
logger.Error("pre-image server error", "error", err)
chErr <- err
return
}
// RunProgram executes the fault proof program for the given block number and
// chain ID, reusing the supplied prefetcher and skipping claim validation.
func (p *programExecutor) RunProgram(
	ctx context.Context,
	prefetcher hostcommon.Prefetcher,
	blockNum uint64,
	chainID uint64,
) error {
	// Copy the config so the shared instance is not mutated.
	cfg := *p.cfg
	cfg.L2ChainID = chainID
	cfg.L2ClaimBlockNumber = blockNum
	prefetcherOpt := hostcommon.WithPrefetcher(
		func(context.Context, log.Logger, kvstore.KV, *config.Config) (hostcommon.Prefetcher, error) {
			// TODO(#13663): prevent recursive block execution
			return prefetcher, nil
		})
	return hostcommon.FaultProofProgram(ctx, p.logger, &cfg, prefetcherOpt, hostcommon.WithSkipValidation(true))
}
func MakeProgramExecutor(logger log.Logger, cfg *config.Config) prefetcher.ProgramExecutor {
return &programExecutor{
logger: logger,
cfg: cfg,
}
}()
return chErr
}
......@@ -11,6 +11,7 @@ import (
"github.com/ethereum-optimism/optimism/op-program/chainconfig"
"github.com/ethereum-optimism/optimism/op-program/client"
"github.com/ethereum-optimism/optimism/op-program/client/l1"
hostcommon "github.com/ethereum-optimism/optimism/op-program/host/common"
"github.com/ethereum-optimism/optimism/op-program/host/config"
"github.com/ethereum-optimism/optimism/op-program/host/kvstore"
"github.com/ethereum-optimism/optimism/op-service/testlog"
......@@ -37,7 +38,7 @@ func TestServerMode(t *testing.T) {
logger := testlog.Logger(t, log.LevelTrace)
result := make(chan error)
go func() {
result <- PreimageServer(context.Background(), logger, cfg, preimageServer, hintServer, makeDefaultPrefetcher)
result <- hostcommon.PreimageServer(context.Background(), logger, cfg, preimageServer, hintServer, makeDefaultPrefetcher)
}()
pClient := preimage.NewOracleClient(preimageClient)
......
......@@ -13,6 +13,7 @@ import (
"github.com/ethereum-optimism/optimism/op-program/client/l2"
"github.com/ethereum-optimism/optimism/op-program/client/mpt"
"github.com/ethereum-optimism/optimism/op-program/host/kvstore"
hosttypes "github.com/ethereum-optimism/optimism/op-program/host/types"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
......@@ -28,11 +29,6 @@ var (
precompileFailure = [1]byte{0}
)
var (
ErrExperimentalPrefetchFailed = errors.New("experimental prefetch failed")
ErrExperimentalPrefetchDisabled = errors.New("experimental prefetch disabled")
)
var acceleratedPrecompiles = []common.Address{
common.BytesToAddress([]byte{0x1}), // ecrecover
common.BytesToAddress([]byte{0x8}), // bn256Pairing
......@@ -50,35 +46,45 @@ type L1BlobSource interface {
GetBlobs(ctx context.Context, ref eth.L1BlockRef, hashes []eth.IndexedBlobHash) ([]*eth.Blob, error)
}
type L2Source interface {
InfoAndTxsByHash(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Transactions, error)
NodeByHash(ctx context.Context, hash common.Hash) ([]byte, error)
CodeByHash(ctx context.Context, hash common.Hash) ([]byte, error)
OutputByRoot(ctx context.Context, root common.Hash) (eth.Output, error)
}
type Prefetcher struct {
logger log.Logger
l1Fetcher L1Source
l1BlobFetcher L1BlobSource
l2Fetcher L2Source
l2Fetcher *RetryingL2Source
lastHint string
kvStore kvstore.KV
// Used to run the program for native block execution
executor ProgramExecutor
}
func NewPrefetcher(logger log.Logger, l1Fetcher L1Source, l1BlobFetcher L1BlobSource, l2Fetcher L2Source, kvStore kvstore.KV) *Prefetcher {
// NewPrefetcher creates a Prefetcher that fetches preimages from the given L1
// and L2 sources (wrapped in retrying clients) and stores them in kvStore.
// executor, when non-nil, enables native block re-execution for the
// l2-block-data hint.
func NewPrefetcher(
	logger log.Logger,
	l1Fetcher L1Source,
	l1BlobFetcher L1BlobSource,
	l2Fetcher hosttypes.L2Source,
	kvStore kvstore.KV,
	l2ChainConfig *params.ChainConfig,
	executor ProgramExecutor,
) *Prefetcher {
	// NOTE(review): l2ChainConfig is not referenced in this constructor body —
	// confirm whether it should be stored on the Prefetcher or can be dropped.
	return &Prefetcher{
		logger:        logger,
		l1Fetcher:     NewRetryingL1Source(logger, l1Fetcher),
		l1BlobFetcher: NewRetryingL1BlobSource(logger, l1BlobFetcher),
		l2Fetcher:     NewRetryingL2Source(logger, l2Fetcher),
		kvStore:       kvStore,
		executor:      executor,
	}
}
// Hint records the hint for use by a subsequent GetPreimage call. The
// l2-block-data hint is special-cased: it is prefetched immediately so that
// block execution populates the kv store as a side effect.
func (p *Prefetcher) Hint(hint string) error {
	p.logger.Trace("Received hint", "hint", hint)
	p.lastHint = hint
	// This is a special case to force block execution in order to populate the cache with preimage data
	if hintType, _, err := parseHint(hint); err == nil && hintType == l2.HintL2BlockData {
		// NOTE(review): uses context.Background() because Hint has no ctx
		// parameter, so this prefetch is not cancellable by the caller.
		return p.prefetch(context.Background(), hint)
	}
	return nil
}
......@@ -288,10 +294,34 @@ func (p *Prefetcher) prefetch(ctx context.Context, hint string) error {
return fmt.Errorf("failed to fetch L2 output root %s: %w", hash, err)
}
return p.kvStore.Put(preimage.Keccak256Key(hash).PreimageKey(), output.Marshal())
case l2.HintL2BlockData:
if p.executor == nil {
return fmt.Errorf("this prefetcher does not support native block execution")
}
if len(hintBytes) != 32+32+8 {
return fmt.Errorf("invalid L2 block data hint: %x", hint)
}
agreedBlockHash := common.Hash(hintBytes[:32])
blockHash := common.Hash(hintBytes[32:64])
chainID := binary.BigEndian.Uint64(hintBytes[64:72])
key := BlockDataKey(blockHash)
if _, err := p.kvStore.Get(key.Key()); err == nil {
return nil
}
if err := p.nativeReExecuteBlock(ctx, agreedBlockHash, blockHash, chainID); err != nil {
return fmt.Errorf("failed to re-execute block: %w", err)
}
return p.kvStore.Put(BlockDataKey(blockHash).Key(), []byte{1})
}
return fmt.Errorf("unknown hint type: %v", hintType)
}
// BlockDataKey identifies the kv-store marker recording that the block data
// for a given block hash has already been produced via native re-execution.
type BlockDataKey [32]byte

// Key derives the kv-store key by hashing a fixed prefix with the block hash.
func (k BlockDataKey) Key() [32]byte {
	return crypto.Keccak256Hash([]byte("block_data"), k[:])
}
func (p *Prefetcher) storeReceipts(receipts types.Receipts) error {
opaqueReceipts, err := eth.EncodeReceipts(receipts)
if err != nil {
......
......@@ -4,10 +4,12 @@ import (
"context"
"crypto/sha256"
"encoding/binary"
"errors"
"fmt"
"math/rand"
"testing"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
......@@ -20,6 +22,7 @@ import (
"github.com/ethereum-optimism/optimism/op-program/client/l1"
"github.com/ethereum-optimism/optimism/op-program/client/l2"
"github.com/ethereum-optimism/optimism/op-program/client/mpt"
hostcommon "github.com/ethereum-optimism/optimism/op-program/host/common"
"github.com/ethereum-optimism/optimism/op-program/host/kvstore"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
......@@ -512,6 +515,58 @@ func TestFetchL2Code(t *testing.T) {
})
}
// TestFetchL2BlockData exercises the l2-block-data hint route: when the
// disputed block is unavailable, the prefetcher must invoke the program
// executor to re-derive the block (starting from the agreed block), record a
// marker in the kv store, and skip execution on subsequent (cached) hints.
// A prefetcher constructed without an executor must reject the hint.
func TestFetchL2BlockData(t *testing.T) {
	chainID := uint64(0xdead)
	testBlockExec := func(t *testing.T, err error) {
		prefetcher, _, _, l2Client, _ := createPrefetcher(t)
		rng := rand.New(rand.NewSource(123))
		block, _ := testutils.RandomBlock(rng, 10)
		disputedBlockHash := common.Hash{0xab}
		// The agreed block is available; the disputed block fetch fails with
		// the provided error, forcing native re-execution.
		l2Client.ExpectInfoAndTxsByHash(block.Hash(), eth.BlockToInfo(block), block.Transactions(), nil)
		l2Client.ExpectInfoAndTxsByHash(disputedBlockHash, eth.BlockToInfo(nil), nil, err)
		defer l2Client.MockDebugClient.AssertExpectations(t)
		prefetcher.executor = &mockExecutor{}
		hint := l2.L2BlockDataHint{
			AgreedBlockHash: block.Hash(),
			BlockHash:       disputedBlockHash,
			ChainID:         chainID,
		}.Hint()
		require.NoError(t, prefetcher.Hint(hint))
		require.True(t, prefetcher.executor.(*mockExecutor).invoked)
		// require.Equal takes (expected, actual): the program must derive the
		// block immediately after the agreed block, on the hinted chain.
		require.Equal(t, block.NumberU64()+1, prefetcher.executor.(*mockExecutor).blockNumber)
		require.Equal(t, chainID, prefetcher.executor.(*mockExecutor).chainID)
		data, err := prefetcher.kvStore.Get(BlockDataKey(disputedBlockHash).Key())
		require.NoError(t, err)
		require.Equal(t, []byte{1}, data)

		// ensure executor isn't used on a cache hit
		prefetcher.executor.(*mockExecutor).invoked = false
		require.NoError(t, prefetcher.Hint(hint))
		require.False(t, prefetcher.executor.(*mockExecutor).invoked)
	}
	t.Run("exec block not found", func(t *testing.T) {
		testBlockExec(t, ethereum.NotFound)
	})
	t.Run("exec block fetch error", func(t *testing.T) {
		testBlockExec(t, errors.New("fetch error"))
	})
	t.Run("no exec", func(t *testing.T) {
		prefetcher, _, _, _, _ := createPrefetcher(t)
		hint := l2.L2BlockDataHint{
			AgreedBlockHash: common.Hash{0xaa},
			BlockHash:       common.Hash{0xab},
			ChainID:         chainID,
		}.Hint()
		err := prefetcher.Hint(hint)
		require.ErrorContains(t, err, "this prefetcher does not support native block execution")
	})
}
func TestBadHints(t *testing.T) {
prefetcher, _, _, _, kv := createPrefetcher(t)
hash := common.Hash{0xad}
......@@ -569,7 +624,7 @@ func TestRetryWhenNotAvailableAfterPrefetching(t *testing.T) {
_, l1Source, l1BlobSource, l2Cl, kv := createPrefetcher(t)
putsToIgnore := 2
kv = &unreliableKvStore{KV: kv, putsToIgnore: putsToIgnore}
prefetcher := NewPrefetcher(testlog.Logger(t, log.LevelInfo), l1Source, l1BlobSource, l2Cl, kv)
prefetcher := NewPrefetcher(testlog.Logger(t, log.LevelInfo), l1Source, l1BlobSource, l2Cl, kv, nil, nil)
// Expect one call for each ignored put, plus one more request for when the put succeeds
for i := 0; i < putsToIgnore+1; i++ {
......@@ -621,7 +676,7 @@ func createPrefetcher(t *testing.T) (*Prefetcher, *testutils.MockL1Source, *test
MockDebugClient: new(testutils.MockDebugClient),
}
prefetcher := NewPrefetcher(logger, l1Source, l1BlobSource, l2Source, kv)
prefetcher := NewPrefetcher(logger, l1Source, l1BlobSource, l2Source, kv, nil, nil)
return prefetcher, l1Source, l1BlobSource, l2Source, kv
}
......@@ -723,3 +778,17 @@ func (o *legacyPrecompileOracle) Precompile(address common.Address, input []byte
}
return result[1:], result[0] == 1
}
// mockExecutor is a test double for the ProgramExecutor interface. It records
// the arguments of the last RunProgram call instead of executing anything.
type mockExecutor struct {
	invoked     bool   // set once RunProgram has been called
	blockNumber uint64 // block number passed to the last RunProgram call
	chainID     uint64 // chain ID passed to the last RunProgram call
}
// RunProgram records the invocation and its parameters, and reports success
// without performing any real block derivation.
func (m *mockExecutor) RunProgram(ctx context.Context, prefetcher hostcommon.Prefetcher, blockNumber uint64, chainID uint64) error {
	m.invoked = true
	m.chainID = chainID
	m.blockNumber = blockNumber
	return nil
}
package prefetcher
import (
"context"
"errors"
hostcommon "github.com/ethereum-optimism/optimism/op-program/host/common"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
)
// ProgramExecutor runs the fault-proof program natively to derive a single
// L2 block, using a Prefetcher to serve (and capture) preimage requests.
type ProgramExecutor interface {
	// RunProgram derives the block at the specified blockNumber on the chain
	// identified by chainID. The given prefetcher supplies the oracle data
	// the program requests during execution.
	RunProgram(ctx context.Context, prefetcher hostcommon.Prefetcher, blockNumber uint64, chainID uint64) error
}
// nativeReExecuteBlock is a helper function that re-executes a block natively.
// It is used to populate the kv store with the data needed for the program to
// re-derive the block.
// nativeReExecuteBlock is a helper function that re-executes a block natively.
// It is used to populate the kv store with the data needed for the program to
// re-derive the block.
func (p *Prefetcher) nativeReExecuteBlock(
	ctx context.Context, agreedBlockHash, blockHash common.Hash, chainID uint64) error {
	// Query the underlying source directly, bypassing retries: the disputed
	// block may not be canonical and therefore never become available.
	_, _, fetchErr := p.l2Fetcher.source.InfoAndTxsByHash(ctx, blockHash)
	if fetchErr == nil {
		// we already have the data needed for the program to re-execute
		return nil
	}
	if !errors.Is(fetchErr, ethereum.NotFound) {
		p.logger.Error("Failed to fetch block", "block_hash", blockHash, "err", fetchErr)
	}

	// Fetch the agreed block (with retries) and derive its successor.
	agreedHeader, _, err := p.l2Fetcher.InfoAndTxsByHash(ctx, agreedBlockHash)
	if err != nil {
		return err
	}
	p.logger.Info("Re-executing block", "block_hash", blockHash, "block_number", agreedHeader.NumberU64())
	return p.executor.RunProgram(ctx, p, agreedHeader.NumberU64()+1, chainID)
}
......@@ -4,6 +4,7 @@ import (
"context"
"math"
hosttypes "github.com/ethereum-optimism/optimism/op-program/host/types"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum/go-ethereum/common"
......@@ -97,7 +98,7 @@ var _ L1BlobSource = (*RetryingL1BlobSource)(nil)
type RetryingL2Source struct {
logger log.Logger
source L2Source
source hosttypes.L2Source
strategy retry.Strategy
}
......@@ -142,7 +143,7 @@ func (s *RetryingL2Source) OutputByRoot(ctx context.Context, root common.Hash) (
})
}
func NewRetryingL2Source(logger log.Logger, source L2Source) *RetryingL2Source {
func NewRetryingL2Source(logger log.Logger, source hosttypes.L2Source) *RetryingL2Source {
return &RetryingL2Source{
logger: logger,
source: source,
......@@ -150,4 +151,4 @@ func NewRetryingL2Source(logger log.Logger, source L2Source) *RetryingL2Source {
}
}
var _ L2Source = (*RetryingL2Source)(nil)
var _ hosttypes.L2Source = (*RetryingL2Source)(nil)
......@@ -12,6 +12,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
hosttypes "github.com/ethereum-optimism/optimism/op-program/host/types"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum-optimism/optimism/op-service/testlog"
......@@ -374,4 +375,4 @@ func (m *MockL2Source) ExpectOutputByRoot(root common.Hash, output eth.Output, e
m.Mock.On("OutputByRoot", root).Once().Return(output, &err)
}
var _ L2Source = (*MockL2Source)(nil)
var _ hosttypes.L2Source = (*MockL2Source)(nil)
package types
import (
"context"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
type DataFormat string
const (
......@@ -9,3 +17,10 @@ const (
)
var SupportedDataFormats = []DataFormat{DataFormatFile, DataFormatDirectory, DataFormatPebble}
// L2Source is the set of L2 chain queries the host needs in order to serve
// preimage requests: block data, trie nodes, contract code, and output roots.
type L2Source interface {
	// InfoAndTxsByHash returns the block info and transactions for the block with the given hash.
	InfoAndTxsByHash(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Transactions, error)
	// NodeByHash returns the preimage of the given trie node hash.
	NodeByHash(ctx context.Context, hash common.Hash) ([]byte, error)
	// CodeByHash returns the contract code with the given code hash.
	CodeByHash(ctx context.Context, hash common.Hash) ([]byte, error)
	// OutputByRoot returns the L2 output whose root is the given hash.
	OutputByRoot(ctx context.Context, root common.Hash) (eth.Output, error)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment