Commit b20164cd authored by Adrian Sutton's avatar Adrian Sutton

op-program: Add server mode

parent 34175ef7
...@@ -108,7 +108,7 @@ func testVerifyL2OutputRoot(t *testing.T, detached bool) { ...@@ -108,7 +108,7 @@ func testVerifyL2OutputRoot(t *testing.T, detached bool) {
// Check the FPP confirms the expected output // Check the FPP confirms the expected output
t.Log("Running fault proof in fetching mode") t.Log("Running fault proof in fetching mode")
err = opp.FaultProofProgram(log, fppConfig) err = opp.FaultProofProgram(ctx, log, fppConfig)
require.NoError(t, err) require.NoError(t, err)
t.Log("Shutting down network") t.Log("Shutting down network")
...@@ -124,13 +124,13 @@ func testVerifyL2OutputRoot(t *testing.T, detached bool) { ...@@ -124,13 +124,13 @@ func testVerifyL2OutputRoot(t *testing.T, detached bool) {
// Should be able to rerun in offline mode using the pre-fetched images // Should be able to rerun in offline mode using the pre-fetched images
fppConfig.L1URL = "" fppConfig.L1URL = ""
fppConfig.L2URL = "" fppConfig.L2URL = ""
err = opp.FaultProofProgram(log, fppConfig) err = opp.FaultProofProgram(ctx, log, fppConfig)
require.NoError(t, err) require.NoError(t, err)
// Check that a fault is detected if we provide an incorrect claim // Check that a fault is detected if we provide an incorrect claim
t.Log("Running fault proof with invalid claim") t.Log("Running fault proof with invalid claim")
fppConfig.L2Claim = common.Hash{0xaa} fppConfig.L2Claim = common.Hash{0xaa}
err = opp.FaultProofProgram(log, fppConfig) err = opp.FaultProofProgram(ctx, log, fppConfig)
if detached { if detached {
require.Error(t, err, "exit status 1") require.Error(t, err, "exit status 1")
} else { } else {
......
package main package main
import ( import (
"errors"
"fmt" "fmt"
"os" "os"
"github.com/ethereum-optimism/optimism/op-program/client/driver"
"github.com/ethereum-optimism/optimism/op-program/host" "github.com/ethereum-optimism/optimism/op-program/host"
"github.com/ethereum-optimism/optimism/op-program/host/config" "github.com/ethereum-optimism/optimism/op-program/host/config"
"github.com/ethereum-optimism/optimism/op-program/host/flags" "github.com/ethereum-optimism/optimism/op-program/host/flags"
...@@ -37,12 +35,8 @@ var VersionWithMeta = func() string { ...@@ -37,12 +35,8 @@ var VersionWithMeta = func() string {
func main() { func main() {
args := os.Args args := os.Args
if err := run(args, host.FaultProofProgram); errors.Is(err, driver.ErrClaimNotValid) { if err := run(args, host.Main); err != nil {
log.Crit("Claim is invalid", "err", err)
} else if err != nil {
log.Crit("Application failed", "err", err) log.Crit("Application failed", "err", err)
} else {
log.Info("Claim successfully verified")
} }
} }
......
...@@ -232,6 +232,28 @@ func TestExec(t *testing.T) { ...@@ -232,6 +232,28 @@ func TestExec(t *testing.T) {
}) })
} }
func TestServerMode(t *testing.T) {
t.Run("DefaultFalse", func(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs())
require.False(t, cfg.ServerMode)
})
t.Run("Enabled", func(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs("--server"))
require.True(t, cfg.ServerMode)
})
t.Run("EnabledWithArg", func(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs("--server=true"))
require.True(t, cfg.ServerMode)
})
t.Run("DisabledWithArg", func(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs("--server=false"))
require.False(t, cfg.ServerMode)
})
t.Run("InvalidArg", func(t *testing.T) {
verifyArgsInvalid(t, "invalid boolean value \"foo\" for -server", addRequiredArgs("--server=foo"))
})
}
func verifyArgsInvalid(t *testing.T, messageContains string, cliArgs []string) { func verifyArgsInvalid(t *testing.T, messageContains string, cliArgs []string) {
_, _, err := runWithArgs(cliArgs) _, _, err := runWithArgs(cliArgs)
require.ErrorContains(t, err, messageContains) require.ErrorContains(t, err, messageContains)
......
...@@ -25,6 +25,7 @@ var ( ...@@ -25,6 +25,7 @@ var (
ErrInvalidL2Claim = errors.New("invalid l2 claim") ErrInvalidL2Claim = errors.New("invalid l2 claim")
ErrInvalidL2ClaimBlock = errors.New("invalid l2 claim block number") ErrInvalidL2ClaimBlock = errors.New("invalid l2 claim block number")
ErrDataDirRequired = errors.New("datadir must be specified when in non-fetching mode") ErrDataDirRequired = errors.New("datadir must be specified when in non-fetching mode")
ErrNoExecInServerMode = errors.New("exec command must not be set when in server mode")
) )
type Config struct { type Config struct {
...@@ -52,6 +53,10 @@ type Config struct { ...@@ -52,6 +53,10 @@ type Config struct {
// ExecCmd specifies the client program to execute in a separate process. // ExecCmd specifies the client program to execute in a separate process.
// If unset, the fault proof client is run in the same process. // If unset, the fault proof client is run in the same process.
ExecCmd string ExecCmd string
// ServerMode indicates that the program should run in pre-image server mode and wait for requests.
// No client program is run.
ServerMode bool
} }
func (c *Config) Check() error { func (c *Config) Check() error {
...@@ -82,6 +87,9 @@ func (c *Config) Check() error { ...@@ -82,6 +87,9 @@ func (c *Config) Check() error {
if !c.FetchingEnabled() && c.DataDir == "" { if !c.FetchingEnabled() && c.DataDir == "" {
return ErrDataDirRequired return ErrDataDirRequired
} }
if c.ServerMode && c.ExecCmd != "" {
return ErrNoExecInServerMode
}
return nil return nil
} }
...@@ -149,7 +157,8 @@ func NewConfigFromCLI(ctx *cli.Context) (*Config, error) { ...@@ -149,7 +157,8 @@ func NewConfigFromCLI(ctx *cli.Context) (*Config, error) {
L1URL: ctx.GlobalString(flags.L1NodeAddr.Name), L1URL: ctx.GlobalString(flags.L1NodeAddr.Name),
L1TrustRPC: ctx.GlobalBool(flags.L1TrustRPC.Name), L1TrustRPC: ctx.GlobalBool(flags.L1TrustRPC.Name),
L1RPCKind: sources.RPCProviderKind(ctx.GlobalString(flags.L1RPCProviderKind.Name)), L1RPCKind: sources.RPCProviderKind(ctx.GlobalString(flags.L1RPCProviderKind.Name)),
ExecCmd: ctx.String(flags.Exec.Name), ExecCmd: ctx.GlobalString(flags.Exec.Name),
ServerMode: ctx.GlobalBool(flags.Server.Name),
}, nil }, nil
} }
......
...@@ -142,6 +142,14 @@ func TestRequireDataDirInNonFetchingMode(t *testing.T) { ...@@ -142,6 +142,14 @@ func TestRequireDataDirInNonFetchingMode(t *testing.T) {
require.ErrorIs(t, err, ErrDataDirRequired) require.ErrorIs(t, err, ErrDataDirRequired)
} }
// TestRejectExecAndServerMode verifies that Config.Check rejects a config
// that both enables server mode and specifies a client program to exec.
func TestRejectExecAndServerMode(t *testing.T) {
	cfg := validConfig()
	cfg.ExecCmd = "echo"
	cfg.ServerMode = true

	require.ErrorIs(t, cfg.Check(), ErrNoExecInServerMode)
}
func validConfig() *Config { func validConfig() *Config {
cfg := NewConfig(validRollupConfig, validL2Genesis, validL1Head, validL2Head, validL2Claim, validL2ClaimBlockNum) cfg := NewConfig(validRollupConfig, validL2Genesis, validL1Head, validL2Head, validL2Claim, validL2ClaimBlockNum)
cfg.DataDir = "/tmp/configTest" cfg.DataDir = "/tmp/configTest"
......
...@@ -86,6 +86,11 @@ var ( ...@@ -86,6 +86,11 @@ var (
Usage: "Run the specified client program as a separate process detached from the host. Default is to run the client program in the host process.", Usage: "Run the specified client program as a separate process detached from the host. Default is to run the client program in the host process.",
EnvVar: service.PrefixEnvVar(envVarPrefix, "EXEC"), EnvVar: service.PrefixEnvVar(envVarPrefix, "EXEC"),
} }
// Server enables pre-image server mode: the host answers pre-image and hint
// requests over the client channels but does not run a client program.
Server = cli.BoolFlag{
Name: "server",
Usage: "Run in pre-image server mode without executing any client program.",
EnvVar: service.PrefixEnvVar(envVarPrefix, "SERVER"),
}
) )
// Flags contains the list of configuration options available to the binary. // Flags contains the list of configuration options available to the binary.
...@@ -107,6 +112,7 @@ var programFlags = []cli.Flag{ ...@@ -107,6 +112,7 @@ var programFlags = []cli.Flag{
L1TrustRPC, L1TrustRPC,
L1RPCProviderKind, L1RPCProviderKind,
Exec, Exec,
Server,
} }
func init() { func init() {
......
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/client" "github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/sources"
cl "github.com/ethereum-optimism/optimism/op-program/client" cl "github.com/ethereum-optimism/optimism/op-program/client"
"github.com/ethereum-optimism/optimism/op-program/client/driver"
"github.com/ethereum-optimism/optimism/op-program/host/config" "github.com/ethereum-optimism/optimism/op-program/host/config"
"github.com/ethereum-optimism/optimism/op-program/host/kvstore" "github.com/ethereum-optimism/optimism/op-program/host/kvstore"
"github.com/ethereum-optimism/optimism/op-program/host/prefetcher" "github.com/ethereum-optimism/optimism/op-program/host/prefetcher"
...@@ -27,56 +28,36 @@ type L2Source struct { ...@@ -27,56 +28,36 @@ type L2Source struct {
*sources.DebugClient *sources.DebugClient
} }
// FaultProofProgram is the programmatic entry-point for the fault proof program func Main(logger log.Logger, cfg *config.Config) error {
func FaultProofProgram(logger log.Logger, cfg *config.Config) error {
if err := cfg.Check(); err != nil { if err := cfg.Check(); err != nil {
return fmt.Errorf("invalid config: %w", err) return fmt.Errorf("invalid config: %w", err)
} }
cfg.Rollup.LogDescription(logger, chaincfg.L2ChainIDToNetworkName) cfg.Rollup.LogDescription(logger, chaincfg.L2ChainIDToNetworkName)
ctx := context.Background() ctx := context.Background()
var kv kvstore.KV if cfg.ServerMode {
if cfg.DataDir == "" { preimageChan := cl.CreatePreimageChannel()
logger.Info("Using in-memory storage") hinterChan := cl.CreateHinterChannel()
kv = kvstore.NewMemKV() return PreimageServer(ctx, logger, cfg, preimageChan, hinterChan)
} else {
logger.Info("Creating disk storage", "datadir", cfg.DataDir)
if err := os.MkdirAll(cfg.DataDir, 0755); err != nil {
return fmt.Errorf("creating datadir: %w", err)
}
kv = kvstore.NewDiskKV(cfg.DataDir)
} }
var ( if err := FaultProofProgram(ctx, logger, cfg); errors.Is(err, driver.ErrClaimNotValid) {
getPreimage func(key common.Hash) ([]byte, error) log.Crit("Claim is invalid", "err", err)
hinter func(hint string) error } else if err != nil {
) return err
if cfg.FetchingEnabled() {
prefetch, err := makePrefetcher(ctx, logger, kv, cfg)
if err != nil {
return fmt.Errorf("failed to create prefetcher: %w", err)
}
getPreimage = func(key common.Hash) ([]byte, error) { return prefetch.GetPreimage(ctx, key) }
hinter = prefetch.Hint
} else { } else {
logger.Info("Using offline mode. All required pre-images must be pre-populated.") log.Info("Claim successfully verified")
getPreimage = kv.Get
hinter = func(hint string) error {
logger.Debug("ignoring prefetch hint", "hint", hint)
return nil
}
} }
return nil
}
localPreimageSource := kvstore.NewLocalPreimageSource(cfg) // FaultProofProgram is the programmatic entry-point for the fault proof program
splitter := kvstore.NewPreimageSourceSplitter(localPreimageSource.Get, getPreimage) func FaultProofProgram(ctx context.Context, logger log.Logger, cfg *config.Config) error {
// Setup client I/O for preimage oracle interaction // Setup client I/O for preimage oracle interaction
pClientRW, pHostRW, err := oppio.CreateBidirectionalChannel() pClientRW, pHostRW, err := oppio.CreateBidirectionalChannel()
if err != nil { if err != nil {
return fmt.Errorf("failed to create preimage pipe: %w", err) return fmt.Errorf("failed to create preimage pipe: %w", err)
} }
oracleServer := preimage.NewOracleServer(pHostRW)
launchOracleServer(logger, oracleServer, splitter.Get)
defer pHostRW.Close() defer pHostRW.Close()
// Setup client I/O for hint comms // Setup client I/O for hint comms
...@@ -84,9 +65,15 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error { ...@@ -84,9 +65,15 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error {
if err != nil { if err != nil {
return fmt.Errorf("failed to create hints pipe: %w", err) return fmt.Errorf("failed to create hints pipe: %w", err)
} }
defer hHostRW.Close()
hHost := preimage.NewHintReader(hHostRW) go func() {
routeHints(logger, hHost, hinter) defer hHostRW.Close()
err := PreimageServer(ctx, logger, cfg, pHostRW, hHostRW)
if err != nil {
logger.Error("preimage server failed", "err", err)
}
logger.Debug("Preimage server stopped")
}()
var cmd *exec.Cmd var cmd *exec.Cmd
if cfg.ExecCmd != "" { if cfg.ExecCmd != "" {
...@@ -106,12 +93,61 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error { ...@@ -106,12 +93,61 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error {
if err := cmd.Wait(); err != nil { if err := cmd.Wait(); err != nil {
return fmt.Errorf("failed to wait for child program: %w", err) return fmt.Errorf("failed to wait for child program: %w", err)
} }
logger.Debug("Client program completed successfully")
return nil return nil
} else { } else {
return cl.RunProgram(logger, pClientRW, hClientRW) return cl.RunProgram(logger, pClientRW, hClientRW)
} }
} }
// PreimageServer serves pre-image requests and hints read from the supplied
// channels until one of the serving routines stops (error or channel close)
// or ctx is cancelled. Pre-images come from the local sources in cfg plus
// either a prefetcher (when fetching is enabled) or the pre-populated kvstore
// (offline mode). The caller retains ownership of both channels.
func PreimageServer(ctx context.Context, logger log.Logger, cfg *config.Config, preimageChannel oppio.FileChannel, hintChannel oppio.FileChannel) error {
	logger.Info("Starting preimage server")
	var kv kvstore.KV
	if cfg.DataDir == "" {
		logger.Info("Using in-memory storage")
		kv = kvstore.NewMemKV()
	} else {
		logger.Info("Creating disk storage", "datadir", cfg.DataDir)
		if err := os.MkdirAll(cfg.DataDir, 0755); err != nil {
			return fmt.Errorf("creating datadir: %w", err)
		}
		kv = kvstore.NewDiskKV(cfg.DataDir)
	}

	var (
		getPreimage kvstore.PreimageSource
		hinter      preimage.HintHandler
	)
	if cfg.FetchingEnabled() {
		prefetch, err := makePrefetcher(ctx, logger, kv, cfg)
		if err != nil {
			return fmt.Errorf("failed to create prefetcher: %w", err)
		}
		getPreimage = func(key common.Hash) ([]byte, error) { return prefetch.GetPreimage(ctx, key) }
		hinter = prefetch.Hint
	} else {
		logger.Info("Using offline mode. All required pre-images must be pre-populated.")
		getPreimage = kv.Get
		// Hints only matter for populating the kvstore, so there is nothing to do offline.
		hinter = func(hint string) error {
			logger.Debug("ignoring prefetch hint", "hint", hint)
			return nil
		}
	}

	// Serve local inputs (bootstrap data from the config) ahead of the global source.
	localPreimageSource := kvstore.NewLocalPreimageSource(cfg)
	splitter := kvstore.NewPreimageSourceSplitter(localPreimageSource.Get, getPreimage)
	preimageGetter := splitter.Get

	serverDone := launchOracleServer(logger, preimageChannel, preimageGetter)
	hinterDone := routeHints(logger, hintChannel, hinter)
	// Return on the first routine to finish, or when the context is cancelled.
	// Previously a cancelled context was ignored and this select blocked forever.
	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-serverDone:
		return err
	case err := <-hinterDone:
		return err
	}
}
func makePrefetcher(ctx context.Context, logger log.Logger, kv kvstore.KV, cfg *config.Config) (*prefetcher.Prefetcher, error) { func makePrefetcher(ctx context.Context, logger log.Logger, kv kvstore.KV, cfg *config.Config) (*prefetcher.Prefetcher, error) {
logger.Info("Connecting to L1 node", "l1", cfg.L1URL) logger.Info("Connecting to L1 node", "l1", cfg.L1URL)
l1RPC, err := client.NewRPC(ctx, logger, cfg.L1URL) l1RPC, err := client.NewRPC(ctx, logger, cfg.L1URL)
...@@ -139,8 +175,11 @@ func makePrefetcher(ctx context.Context, logger log.Logger, kv kvstore.KV, cfg * ...@@ -139,8 +175,11 @@ func makePrefetcher(ctx context.Context, logger log.Logger, kv kvstore.KV, cfg *
return prefetcher.NewPrefetcher(logger, l1Cl, l2DebugCl, kv), nil return prefetcher.NewPrefetcher(logger, l1Cl, l2DebugCl, kv), nil
} }
func routeHints(logger log.Logger, hintReader *preimage.HintReader, hinter func(hint string) error) { func routeHints(logger log.Logger, hHostRW io.ReadWriter, hinter preimage.HintHandler) chan error {
chErr := make(chan error)
hintReader := preimage.NewHintReader(hHostRW)
go func() { go func() {
defer close(chErr)
for { for {
if err := hintReader.NextHint(hinter); err != nil { if err := hintReader.NextHint(hinter); err != nil {
if err == io.EOF || errors.Is(err, fs.ErrClosed) { if err == io.EOF || errors.Is(err, fs.ErrClosed) {
...@@ -148,14 +187,19 @@ func routeHints(logger log.Logger, hintReader *preimage.HintReader, hinter func( ...@@ -148,14 +187,19 @@ func routeHints(logger log.Logger, hintReader *preimage.HintReader, hinter func(
return return
} }
logger.Error("pre-image hint router error", "err", err) logger.Error("pre-image hint router error", "err", err)
chErr <- err
return return
} }
} }
}() }()
return chErr
} }
func launchOracleServer(logger log.Logger, server *preimage.OracleServer, getter func(key common.Hash) ([]byte, error)) { func launchOracleServer(logger log.Logger, pHostRW io.ReadWriteCloser, getter preimage.PreimageGetter) chan error {
chErr := make(chan error)
server := preimage.NewOracleServer(pHostRW)
go func() { go func() {
defer close(chErr)
for { for {
if err := server.NextPreimageRequest(getter); err != nil { if err := server.NextPreimageRequest(getter); err != nil {
if err == io.EOF || errors.Is(err, fs.ErrClosed) { if err == io.EOF || errors.Is(err, fs.ErrClosed) {
...@@ -163,8 +207,10 @@ func launchOracleServer(logger log.Logger, server *preimage.OracleServer, getter ...@@ -163,8 +207,10 @@ func launchOracleServer(logger log.Logger, server *preimage.OracleServer, getter
return return
} }
logger.Error("pre-image server error", "error", err) logger.Error("pre-image server error", "error", err)
chErr <- err
return return
} }
} }
}() }()
return chErr
} }
package host
import (
"context"
"errors"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-program/client"
"github.com/ethereum-optimism/optimism/op-program/client/l1"
"github.com/ethereum-optimism/optimism/op-program/host/config"
"github.com/ethereum-optimism/optimism/op-program/host/kvstore"
"github.com/ethereum-optimism/optimism/op-program/io"
"github.com/ethereum-optimism/optimism/op-program/preimage"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
// TestServerMode drives PreimageServer directly with real pre-image and hint
// clients, checking that locally-known pre-images are served and that the
// server exits with ErrNotFound when an unknown pre-image is requested.
func TestServerMode(t *testing.T) {
	l1Head := common.Hash{0x11}
	cfg := config.NewConfig(&chaincfg.Goerli, config.OPGoerliChainConfig, l1Head, common.Hash{0x22}, common.Hash{0x33}, 1000)
	cfg.DataDir = t.TempDir()
	cfg.ServerMode = true

	pHostRW, pClientRW, err := io.CreateBidirectionalChannel()
	require.NoError(t, err)
	defer pClientRW.Close()
	hHostRW, hClientRW, err := io.CreateBidirectionalChannel()
	require.NoError(t, err)
	defer hClientRW.Close()

	logger := testlog.Logger(t, log.LvlTrace)
	result := make(chan error)
	go func() {
		result <- PreimageServer(context.Background(), logger, cfg, pHostRW, hHostRW)
	}()

	oracle := preimage.NewOracleClient(pClientRW)
	hints := preimage.NewHintWriter(hClientRW)
	l1Oracle := l1.NewPreimageOracle(oracle, hints)

	// Local pre-images (such as the L1 head) are served from the config.
	require.Equal(t, l1Head.Bytes(), oracle.Get(client.L1HeadLocalIndex), "Should get preimages")

	// An unavailable pre-image makes the client panic and shuts the server down.
	require.Panics(t, func() {
		l1Oracle.HeaderByBlockHash(common.HexToHash("0x1234"))
	}, "Preimage should not be available")

	require.ErrorIs(t, waitFor(result), kvstore.ErrNotFound)
}
func waitFor(ch chan error) error {
timeout := time.After(30 * time.Second)
select {
case err := <-ch:
return err
case <-timeout:
return errors.New("timed out")
}
}
...@@ -43,7 +43,9 @@ func NewHintReader(rw io.ReadWriter) *HintReader { ...@@ -43,7 +43,9 @@ func NewHintReader(rw io.ReadWriter) *HintReader {
return &HintReader{rw: rw} return &HintReader{rw: rw}
} }
func (hr *HintReader) NextHint(router func(hint string) error) error { type HintHandler func(hint string) error
func (hr *HintReader) NextHint(router HintHandler) error {
var length uint32 var length uint32
if err := binary.Read(hr.rw, binary.BigEndian, &length); err != nil { if err := binary.Read(hr.rw, binary.BigEndian, &length); err != nil {
if err == io.EOF { if err == io.EOF {
......
...@@ -47,7 +47,9 @@ func NewOracleServer(rw io.ReadWriter) *OracleServer { ...@@ -47,7 +47,9 @@ func NewOracleServer(rw io.ReadWriter) *OracleServer {
return &OracleServer{rw: rw} return &OracleServer{rw: rw}
} }
func (o *OracleServer) NextPreimageRequest(getPreimage func(key common.Hash) ([]byte, error)) error { type PreimageGetter func(key common.Hash) ([]byte, error)
func (o *OracleServer) NextPreimageRequest(getPreimage PreimageGetter) error {
var key common.Hash var key common.Hash
if _, err := io.ReadFull(o.rw, key[:]); err != nil { if _, err := io.ReadFull(o.rw, key[:]); err != nil {
if err == io.EOF { if err == io.EOF {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment