Commit 13d2bccf authored by OptimismBot's avatar OptimismBot Committed by GitHub

Merge pull request #5497 from ethereum-optimism/inphi/fpp-detached

op-program: Support running program as a separate process
parents c6d3e78c d597f34e
......@@ -9,9 +9,11 @@ import (
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-node/testlog"
oppcl "github.com/ethereum-optimism/optimism/op-program/client"
"github.com/ethereum-optimism/optimism/op-program/client/driver"
opp "github.com/ethereum-optimism/optimism/op-program/host"
oppconf "github.com/ethereum-optimism/optimism/op-program/host/config"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
......@@ -19,7 +21,27 @@ import (
"github.com/stretchr/testify/require"
)
// bypass the test runnner if running client to execute the fpp directly
func init() {
if !opp.RunningProgramInClient() {
return
}
logger := oplog.NewLogger(oplog.CLIConfig{
Level: "debug",
Format: "text",
})
oppcl.Main(logger)
}
func TestVerifyL2OutputRoot(t *testing.T) {
testVerifyL2OutputRoot(t, false)
}
func TestVerifyL2OutputRootDetached(t *testing.T) {
testVerifyL2OutputRoot(t, true)
}
func testVerifyL2OutputRoot(t *testing.T, detached bool) {
InitParallel(t)
ctx := context.Background()
......@@ -93,6 +115,7 @@ func TestVerifyL2OutputRoot(t *testing.T) {
fppConfig.L1URL = sys.NodeEndpoint("l1")
fppConfig.L2URL = sys.NodeEndpoint("sequencer")
fppConfig.DataDir = preimageDir
fppConfig.Detached = detached
// Check the FPP confirms the expected output
t.Log("Running fault proof in fetching mode")
......@@ -119,7 +142,11 @@ func TestVerifyL2OutputRoot(t *testing.T) {
t.Log("Running fault proof with invalid claim")
fppConfig.L2Claim = common.Hash{0xaa}
err = opp.FaultProofProgram(log, fppConfig)
require.ErrorIs(t, err, driver.ErrClaimNotValid)
if detached {
require.Error(t, err, "exit status 1")
} else {
require.ErrorIs(t, err, driver.ErrClaimNotValid)
}
}
func waitForSafeHead(ctx context.Context, safeBlockNum uint64, rollupClient *sources.RollupClient) error {
......
package client
import (
"encoding/binary"
"encoding/json"
"fmt"
"io"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/params"
)
type BootInfo struct {
// TODO(CLI-XXX): The rollup config will be hardcoded. It's configurable for testing purposes.
Rollup *rollup.Config `json:"rollup"`
L2ChainConfig *params.ChainConfig `json:"l2_chain_config"`
L1Head common.Hash `json:"l1_head"`
L2Head common.Hash `json:"l2_head"`
L2Claim common.Hash `json:"l2_claim"`
L2ClaimBlockNumber uint64 `json:"l2_claim_block_number"`
}
type BootstrapOracleWriter struct {
w io.Writer
}
func NewBootstrapOracleWriter(w io.Writer) *BootstrapOracleWriter {
return &BootstrapOracleWriter{w: w}
}
func (bw *BootstrapOracleWriter) WriteBootInfo(info *BootInfo) error {
// TODO(CLI-3751): Bootstrap from local oracle
payload, err := json.Marshal(info)
if err != nil {
return err
}
var b []byte
b = binary.BigEndian.AppendUint32(b, uint32(len(payload)))
b = append(b, payload...)
_, err = bw.w.Write(b)
return err
}
type BootstrapOracleReader struct {
r io.Reader
}
func NewBootstrapOracleReader(r io.Reader) *BootstrapOracleReader {
return &BootstrapOracleReader{r: r}
}
func (br *BootstrapOracleReader) BootInfo() (*BootInfo, error) {
var length uint32
if err := binary.Read(br.r, binary.BigEndian, &length); err != nil {
if err == io.EOF {
return nil, io.EOF
}
return nil, fmt.Errorf("failed to read bootinfo length prefix: %w", err)
}
payload := make([]byte, length)
if length > 0 {
if _, err := io.ReadFull(br.r, payload); err != nil {
return nil, fmt.Errorf("failed to read bootinfo data (length %d): %w", length, err)
}
}
var bootInfo BootInfo
if err := json.Unmarshal(payload, &bootInfo); err != nil {
return nil, err
}
return &bootInfo, nil
}
package client
import (
"io"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/params"
"github.com/stretchr/testify/require"
)
func TestBootstrapOracle(t *testing.T) {
r, w := io.Pipe()
br := NewBootstrapOracleReader(r)
bw := NewBootstrapOracleWriter(w)
bootInfo := BootInfo{
Rollup: new(rollup.Config),
L2ChainConfig: new(params.ChainConfig),
L1Head: common.HexToHash("0xffffa"),
L2Head: common.HexToHash("0xffffb"),
L2Claim: common.HexToHash("0xffffc"),
L2ClaimBlockNumber: 1,
}
go func() {
err := bw.WriteBootInfo(&bootInfo)
require.NoError(t, err)
}()
type result struct {
bootInnfo *BootInfo
err error
}
read := make(chan result)
go func() {
readBootInfo, err := br.BootInfo()
read <- result{readBootInfo, err}
close(read)
}()
select {
case <-time.After(time.Second * 30):
t.Error("timeout waiting for bootstrap oracle")
case r := <-read:
require.NoError(t, r.err)
require.Equal(t, bootInfo, *r.bootInnfo)
}
}
package client
const (
// 0,1,2 used for stdin,stdout,stderr
HClientRFd = iota + 3
HClientWFd
PClientRFd
PClientWFd
BootRFd // TODO(CLI-3751): remove
MaxFd
)
......@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"os"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
......@@ -15,30 +16,54 @@ import (
cldr "github.com/ethereum-optimism/optimism/op-program/client/driver"
"github.com/ethereum-optimism/optimism/op-program/client/l1"
"github.com/ethereum-optimism/optimism/op-program/client/l2"
oppio "github.com/ethereum-optimism/optimism/op-program/io"
"github.com/ethereum-optimism/optimism/op-program/preimage"
)
// ClientProgram executes the Program, while attached to an IO based pre-image oracle, to be served by a host.
func ClientProgram(
logger log.Logger,
cfg *rollup.Config,
l2Cfg *params.ChainConfig,
l1Head common.Hash,
l2Head common.Hash,
l2Claim common.Hash,
l2ClaimBlockNumber uint64,
preimageOracle io.ReadWriter,
preimageHinter io.ReadWriter,
) error {
// Main executes the client program in a detached context and exits the current process.
// The client runtime environment must be preset before calling this function.
func Main(logger log.Logger) {
log.Info("Starting fault proof program client")
preimageOracle := CreatePreimageChannel()
preimageHinter := CreateHinterChannel()
bootOracle := os.NewFile(BootRFd, "bootR")
err := RunProgram(logger, bootOracle, preimageOracle, preimageHinter)
if err != nil {
log.Error("Program failed", "err", err)
os.Exit(1)
} else {
os.Exit(0)
}
}
// RunProgram executes the Program, while attached to an IO based pre-image oracle, to be served by a host.
func RunProgram(logger log.Logger, bootOracle io.Reader, preimageOracle io.ReadWriter, preimageHinter io.ReadWriter) error {
bootReader := NewBootstrapOracleReader(bootOracle)
bootInfo, err := bootReader.BootInfo()
if err != nil {
return fmt.Errorf("failed to read boot info: %w", err)
}
logger.Debug("Loaded boot info", "bootInfo", bootInfo)
pClient := preimage.NewOracleClient(preimageOracle)
hClient := preimage.NewHintWriter(preimageHinter)
l1PreimageOracle := l1.NewPreimageOracle(pClient, hClient)
l2PreimageOracle := l2.NewPreimageOracle(pClient, hClient)
return Program(logger, cfg, l2Cfg, l1Head, l2Head, l2Claim, l2ClaimBlockNumber, l1PreimageOracle, l2PreimageOracle)
return runDerivation(
logger,
bootInfo.Rollup,
bootInfo.L2ChainConfig,
bootInfo.L1Head,
bootInfo.L2Head,
bootInfo.L2Claim,
bootInfo.L2ClaimBlockNumber,
l1PreimageOracle,
l2PreimageOracle,
)
}
// Program executes the L2 state transition, given a minimal interface to retrieve data.
func Program(logger log.Logger, cfg *rollup.Config, l2Cfg *params.ChainConfig, l1Head common.Hash, l2Head common.Hash, l2Claim common.Hash, l2ClaimBlockNum uint64, l1Oracle l1.Oracle, l2Oracle l2.Oracle) error {
// runDerivation executes the L2 state transition, given a minimal interface to retrieve data.
func runDerivation(logger log.Logger, cfg *rollup.Config, l2Cfg *params.ChainConfig, l1Head common.Hash, l2Head common.Hash, l2Claim common.Hash, l2ClaimBlockNum uint64, l1Oracle l1.Oracle, l2Oracle l2.Oracle) error {
l1Source := l1.NewOracleL1Client(logger, l1Oracle, l1Head)
engineBackend, err := l2.NewOracleBackedL2Chain(logger, l2Oracle, l2Cfg, l2Head)
if err != nil {
......@@ -57,3 +82,16 @@ func Program(logger log.Logger, cfg *rollup.Config, l2Cfg *params.ChainConfig, l
}
return d.ValidateClaim(eth.Bytes32(l2Claim))
}
func CreateHinterChannel() oppio.FileChannel {
r := os.NewFile(HClientRFd, "preimage-hint-read")
w := os.NewFile(HClientWFd, "preimage-hint-write")
return oppio.NewReadWritePair(r, w)
}
// CreatePreimageChannel returns a FileChannel for the preimage oracle in a detached context
func CreatePreimageChannel() oppio.FileChannel {
r := os.NewFile(PClientRFd, "preimage-oracle-read")
w := os.NewFile(PClientWFd, "preimage-oracle-write")
return oppio.NewReadWritePair(r, w)
}
......@@ -214,6 +214,25 @@ func TestL2BlockNumber(t *testing.T) {
})
}
func TestDetached(t *testing.T) {
t.Run("DefaultFalse", func(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs(t))
require.False(t, cfg.Detached)
})
t.Run("Enabled", func(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs(t, "--detached"))
require.True(t, cfg.Detached)
})
t.Run("EnabledWithArg", func(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs(t, "--detached=true"))
require.True(t, cfg.Detached)
})
t.Run("Disabled", func(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs(t, "--detached=false"))
require.False(t, cfg.Detached)
})
}
func verifyArgsInvalid(t *testing.T, messageContains string, cliArgs []string) {
_, _, err := runWithArgs(cliArgs)
require.ErrorContains(t, err, messageContains)
......
......@@ -49,6 +49,8 @@ type Config struct {
L2ClaimBlockNumber uint64
// L2ChainConfig is the op-geth chain config for the L2 execution engine
L2ChainConfig *params.ChainConfig
// Detached indicates that the program runs as a separate process
Detached bool
}
func (c *Config) Check() error {
......@@ -137,6 +139,7 @@ func NewConfigFromCLI(ctx *cli.Context) (*Config, error) {
L1URL: ctx.GlobalString(flags.L1NodeAddr.Name),
L1TrustRPC: ctx.GlobalBool(flags.L1TrustRPC.Name),
L1RPCKind: sources.RPCProviderKind(ctx.GlobalString(flags.L1RPCProviderKind.Name)),
Detached: ctx.GlobalBool(flags.Detached.Name),
}, nil
}
......
......@@ -81,6 +81,11 @@ var (
return &out
}(),
}
Detached = cli.BoolFlag{
Name: "detached",
Usage: "Run the program as a separate process detached from the host",
EnvVar: service.PrefixEnvVar(envVarPrefix, "DETACHED"),
}
)
// Flags contains the list of configuration options available to the binary.
......@@ -101,6 +106,7 @@ var programFlags = []cli.Flag{
L1NodeAddr,
L1TrustRPC,
L1RPCProviderKind,
Detached,
}
func init() {
......
......@@ -5,7 +5,9 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"os"
"os/exec"
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/client"
......@@ -14,6 +16,7 @@ import (
"github.com/ethereum-optimism/optimism/op-program/host/config"
"github.com/ethereum-optimism/optimism/op-program/host/kvstore"
"github.com/ethereum-optimism/optimism/op-program/host/prefetcher"
oppio "github.com/ethereum-optimism/optimism/op-program/io"
"github.com/ethereum-optimism/optimism/op-program/preimage"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
......@@ -24,8 +27,20 @@ type L2Source struct {
*sources.DebugClient
}
const opProgramChildEnvName = "OP_PROGRAM_CHILD"
func RunningProgramInClient() bool {
value, _ := os.LookupEnv(opProgramChildEnvName)
return value == "true"
}
// FaultProofProgram is the programmatic entry-point for the fault proof program
func FaultProofProgram(logger log.Logger, cfg *config.Config) error {
if RunningProgramInClient() {
cl.Main(logger)
panic("Client main should have exited process")
}
if err := cfg.Check(); err != nil {
return fmt.Errorf("invalid config: %w", err)
}
......@@ -44,35 +59,15 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error {
kv = kvstore.NewDiskKV(cfg.DataDir)
}
var getPreimage func(key common.Hash) ([]byte, error)
var hinter func(hint string) error
var (
getPreimage func(key common.Hash) ([]byte, error)
hinter func(hint string) error
)
if cfg.FetchingEnabled() {
logger.Info("Connecting to L1 node", "l1", cfg.L1URL)
l1RPC, err := client.NewRPC(ctx, logger, cfg.L1URL)
prefetch, err := makePrefetcher(ctx, logger, kv, cfg)
if err != nil {
return fmt.Errorf("failed to setup L1 RPC: %w", err)
return fmt.Errorf("failed to create prefetcher: %w", err)
}
logger.Info("Connecting to L2 node", "l2", cfg.L2URL)
l2RPC, err := client.NewRPC(ctx, logger, cfg.L2URL)
if err != nil {
return fmt.Errorf("failed to setup L2 RPC: %w", err)
}
l1ClCfg := sources.L1ClientDefaultConfig(cfg.Rollup, cfg.L1TrustRPC, cfg.L1RPCKind)
l2ClCfg := sources.L2ClientDefaultConfig(cfg.Rollup, true)
l1Cl, err := sources.NewL1Client(l1RPC, logger, nil, l1ClCfg)
if err != nil {
return fmt.Errorf("failed to create L1 client: %w", err)
}
l2Cl, err := sources.NewL2Client(l2RPC, logger, nil, l2ClCfg)
if err != nil {
return fmt.Errorf("failed to create L2 client: %w", err)
}
l2DebugCl := &L2Source{L2Client: l2Cl, DebugClient: sources.NewDebugClient(l2RPC.CallContext)}
logger.Info("Setting up pre-fetcher")
prefetch := prefetcher.NewPrefetcher(logger, l1Cl, l2DebugCl, kv)
getPreimage = func(key common.Hash) ([]byte, error) { return prefetch.GetPreimage(ctx, key) }
hinter = prefetch.Hint
} else {
......@@ -84,56 +79,121 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error {
}
}
// TODO(CLI-3751: Load local preimages
localPreimageSource := kvstore.NewLocalPreimageSource(cfg)
splitter := kvstore.NewPreimageSourceSplitter(localPreimageSource.Get, getPreimage)
// Setup pipe for preimage oracle interaction
pClientRW, pHostRW := bidirectionalPipe()
// Setup client I/O for preimage oracle interaction
pClientRW, pHostRW, err := oppio.CreateBidirectionalChannel()
if err != nil {
return fmt.Errorf("failed to create preimage pipe: %w", err)
}
oracleServer := preimage.NewOracleServer(pHostRW)
// Setup pipe for hint comms
hClientRW, hHostRW := bidirectionalPipe()
hHost := preimage.NewHintReader(hHostRW)
launchOracleServer(logger, oracleServer, splitter.Get)
defer pHostRW.Close()
// Setup client I/O for hint comms
hClientRW, hHostRW, err := oppio.CreateBidirectionalChannel()
if err != nil {
return fmt.Errorf("failed to create hints pipe: %w", err)
}
defer hHostRW.Close()
hHost := preimage.NewHintReader(hHostRW)
routeHints(logger, hHost, hinter)
launchOracleServer(logger, oracleServer, splitter.Get)
return cl.ClientProgram(
logger,
cfg.Rollup,
cfg.L2ChainConfig,
cfg.L1Head,
cfg.L2Head,
cfg.L2Claim,
cfg.L2ClaimBlockNumber,
pClientRW,
hClientRW,
)
}
bootClientR, bootHostW, err := os.Pipe()
if err != nil {
return fmt.Errorf("failed to create boot info pipe: %w", err)
}
type readWritePair struct {
io.ReadCloser
io.WriteCloser
var cmd *exec.Cmd
if cfg.Detached {
cmd = exec.Command(os.Args[0], os.Args[1:]...)
cmd.ExtraFiles = make([]*os.File, cl.MaxFd-3) // not including stdin, stdout and stderr
cmd.ExtraFiles[cl.HClientRFd-3] = hClientRW.Reader()
cmd.ExtraFiles[cl.HClientWFd-3] = hClientRW.Writer()
cmd.ExtraFiles[cl.PClientRFd-3] = pClientRW.Reader()
cmd.ExtraFiles[cl.PClientWFd-3] = pClientRW.Writer()
cmd.ExtraFiles[cl.BootRFd-3] = bootClientR
cmd.Stdout = os.Stdout // for debugging
cmd.Stderr = os.Stderr // for debugging
cmd.Env = append(os.Environ(), fmt.Sprintf("%s=true", opProgramChildEnvName))
err := cmd.Start()
if err != nil {
return fmt.Errorf("program cmd failed to start: %w", err)
}
}
bootInfo := cl.BootInfo{
Rollup: cfg.Rollup,
L2ChainConfig: cfg.L2ChainConfig,
L1Head: cfg.L1Head,
L2Head: cfg.L2Head,
L2Claim: cfg.L2Claim,
L2ClaimBlockNumber: cfg.L2ClaimBlockNumber,
}
// Spawn a goroutine to write the boot info to avoid blocking this host's goroutine
// if we're running in detached mode
bootInitErrorCh := initializeBootInfoAsync(&bootInfo, bootHostW)
if !cfg.Detached {
return cl.RunProgram(logger, bootClientR, pClientRW, hClientRW)
}
if err := <-bootInitErrorCh; err != nil {
// return early as a detached client is blocked waiting for the boot info
return fmt.Errorf("failed to write boot info: %w", err)
}
if cfg.Detached {
err := cmd.Wait()
if err != nil {
return fmt.Errorf("failed to wait for child program: %w", err)
}
}
return nil
}
func (rw *readWritePair) Close() error {
if err := rw.ReadCloser.Close(); err != nil {
return err
func makePrefetcher(ctx context.Context, logger log.Logger, kv kvstore.KV, cfg *config.Config) (*prefetcher.Prefetcher, error) {
logger.Info("Connecting to L1 node", "l1", cfg.L1URL)
l1RPC, err := client.NewRPC(ctx, logger, cfg.L1URL)
if err != nil {
return nil, fmt.Errorf("failed to setup L1 RPC: %w", err)
}
logger.Info("Connecting to L2 node", "l2", cfg.L2URL)
l2RPC, err := client.NewRPC(ctx, logger, cfg.L2URL)
if err != nil {
return nil, fmt.Errorf("failed to setup L2 RPC: %w", err)
}
return rw.WriteCloser.Close()
l1ClCfg := sources.L1ClientDefaultConfig(cfg.Rollup, cfg.L1TrustRPC, cfg.L1RPCKind)
l2ClCfg := sources.L2ClientDefaultConfig(cfg.Rollup, true)
l1Cl, err := sources.NewL1Client(l1RPC, logger, nil, l1ClCfg)
if err != nil {
return nil, fmt.Errorf("failed to create L1 client: %w", err)
}
l2Cl, err := sources.NewL2Client(l2RPC, logger, nil, l2ClCfg)
if err != nil {
return nil, fmt.Errorf("failed to create L2 client: %w", err)
}
l2DebugCl := &L2Source{L2Client: l2Cl, DebugClient: sources.NewDebugClient(l2RPC.CallContext)}
return prefetcher.NewPrefetcher(logger, l1Cl, l2DebugCl, kv), nil
}
func bidirectionalPipe() (a, b io.ReadWriteCloser) {
ar, bw := io.Pipe()
br, aw := io.Pipe()
return &readWritePair{ReadCloser: ar, WriteCloser: aw}, &readWritePair{ReadCloser: br, WriteCloser: bw}
func initializeBootInfoAsync(bootInfo *cl.BootInfo, bootOracle *os.File) <-chan error {
bootWriteErr := make(chan error, 1)
go func() {
bootOracleWriter := cl.NewBootstrapOracleWriter(bootOracle)
bootWriteErr <- bootOracleWriter.WriteBootInfo(bootInfo)
close(bootWriteErr)
}()
return bootWriteErr
}
func routeHints(logger log.Logger, hintReader *preimage.HintReader, hinter func(hint string) error) {
go func() {
for {
if err := hintReader.NextHint(hinter); err != nil {
if err == io.EOF || errors.Is(err, io.ErrClosedPipe) {
if err == io.EOF || errors.Is(err, fs.ErrClosed) {
logger.Debug("closing pre-image hint handler")
return
}
......@@ -148,7 +208,7 @@ func launchOracleServer(logger log.Logger, server *preimage.OracleServer, getter
go func() {
for {
if err := server.NextPreimageRequest(getter); err != nil {
if err == io.EOF || errors.Is(err, io.ErrClosedPipe) {
if err == io.EOF || errors.Is(err, fs.ErrClosed) {
logger.Debug("closing pre-image server")
return
}
......
package io
import (
"io"
"os"
)
// FileChannel is a unidirectional channel for file I/O
type FileChannel interface {
io.ReadWriteCloser
// Reader returns the file that is used for reading.
Reader() *os.File
// Writer returns the file that is used for writing.
Writer() *os.File
}
type ReadWritePair struct {
r *os.File
w *os.File
}
// NewReadWritePair creates a new FileChannel that uses the given files
func NewReadWritePair(r *os.File, w *os.File) *ReadWritePair {
return &ReadWritePair{r: r, w: w}
}
func (rw *ReadWritePair) Read(p []byte) (int, error) {
return rw.r.Read(p)
}
func (rw *ReadWritePair) Write(p []byte) (int, error) {
return rw.w.Write(p)
}
func (rw *ReadWritePair) Reader() *os.File {
return rw.r
}
func (rw *ReadWritePair) Writer() *os.File {
return rw.w
}
func (rw *ReadWritePair) Close() error {
if err := rw.r.Close(); err != nil {
return err
}
return rw.w.Close()
}
// CreateBidirectionalChannel creates a pair of FileChannels that are connected to each other.
func CreateBidirectionalChannel() (FileChannel, FileChannel, error) {
ar, bw, err := os.Pipe()
if err != nil {
return nil, nil, err
}
br, aw, err := os.Pipe()
if err != nil {
return nil, nil, err
}
return NewReadWritePair(ar, aw), NewReadWritePair(br, bw), nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment