Commit 5da18810 authored by inphi's avatar inphi

op-program: host-client interaction via I/O pipes

parent bd493973
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/client" "github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testlog"
oppcl "github.com/ethereum-optimism/optimism/op-program/client"
opp "github.com/ethereum-optimism/optimism/op-program/host" opp "github.com/ethereum-optimism/optimism/op-program/host"
oppconf "github.com/ethereum-optimism/optimism/op-program/host/config" oppconf "github.com/ethereum-optimism/optimism/op-program/host/config"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -88,7 +89,7 @@ func TestVerifyL2OutputRoot(t *testing.T) { ...@@ -88,7 +89,7 @@ func TestVerifyL2OutputRoot(t *testing.T) {
t.Log("Running fault proof with invalid claim") t.Log("Running fault proof with invalid claim")
fppConfig.L2Claim = common.Hash{0xaa} fppConfig.L2Claim = common.Hash{0xaa}
err = opp.FaultProofProgram(log, fppConfig) err = opp.FaultProofProgram(log, fppConfig)
require.ErrorIs(t, err, opp.ErrClaimNotValid) require.ErrorIs(t, err, oppcl.ErrClaimNotValid)
} }
func waitForSafeHead(ctx context.Context, safeBlockNum uint64, rollupClient *sources.RollupClient) error { func waitForSafeHead(ctx context.Context, safeBlockNum uint64, rollupClient *sources.RollupClient) error {
......
package client
import (
"context"
"errors"
"fmt"
"io"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
cldr "github.com/ethereum-optimism/optimism/op-program/client/driver"
"github.com/ethereum-optimism/optimism/op-program/client/l1"
"github.com/ethereum-optimism/optimism/op-program/client/l2"
"github.com/ethereum-optimism/optimism/op-program/preimage"
)
var (
ErrClaimNotValid = errors.New("invalid claim")
)
// ClientProgram executes the Program, while attached to an IO based pre-image oracle, to be served by a host.
func ClientProgram(
logger log.Logger,
cfg *rollup.Config,
l2Cfg *params.ChainConfig,
l1Head common.Hash,
l2Head common.Hash,
l2Claim common.Hash,
l2ClaimBlockNumber uint64,
preimageOracle io.ReadWriter,
preimageHinter io.Writer,
) error {
pClient := preimage.NewOracleClient(preimageOracle)
hClient := preimage.NewHintWriter(preimageHinter)
l1PreimageOracle := l1.NewPreimageOracle(pClient, hClient)
l2PreimageOracle := l2.NewPreimageOracle(pClient, hClient)
return Program(logger, cfg, l2Cfg, l1Head, l2Head, l2Claim, l2ClaimBlockNumber, l1PreimageOracle, l2PreimageOracle)
}
// Program executes the L2 state transition, given a minimal interface to retrieve data.
func Program(logger log.Logger, cfg *rollup.Config, l2Cfg *params.ChainConfig, l1Head common.Hash, l2Head common.Hash, l2Claim common.Hash, l2ClaimBlockNum uint64, l1Oracle l1.Oracle, l2Oracle l2.Oracle) error {
l1Source := l1.NewOracleL1Client(logger, l1Oracle, l1Head)
engineBackend, err := l2.NewOracleBackedL2Chain(logger, l2Oracle, l2Cfg, l2Head)
if err != nil {
return fmt.Errorf("failed to create oracle-backed L2 chain: %w", err)
}
l2Source := l2.NewOracleEngine(cfg, logger, engineBackend)
logger.Info("Starting derivation")
d := cldr.NewDriver(logger, cfg, l1Source, l2Source, l2ClaimBlockNum)
for {
if err = d.Step(context.Background()); errors.Is(err, io.EOF) {
break
} else if err != nil {
return err
}
}
if !d.ValidateClaim(eth.Bytes32(l2Claim)) {
return ErrClaimNotValid
}
logger.Info("Derivation complete", "head", d.SafeHead())
return nil
}
...@@ -6,26 +6,20 @@ import ( ...@@ -6,26 +6,20 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"time"
"github.com/ethereum-optimism/optimism/op-node/chaincfg" "github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/client" "github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/sources"
cldr "github.com/ethereum-optimism/optimism/op-program/client/driver" cl "github.com/ethereum-optimism/optimism/op-program/client"
"github.com/ethereum-optimism/optimism/op-program/host/config" "github.com/ethereum-optimism/optimism/op-program/host/config"
"github.com/ethereum-optimism/optimism/op-program/host/kvstore" "github.com/ethereum-optimism/optimism/op-program/host/kvstore"
"github.com/ethereum-optimism/optimism/op-program/host/l1"
"github.com/ethereum-optimism/optimism/op-program/host/l2"
"github.com/ethereum-optimism/optimism/op-program/host/prefetcher" "github.com/ethereum-optimism/optimism/op-program/host/prefetcher"
"github.com/ethereum-optimism/optimism/op-program/preimage" "github.com/ethereum-optimism/optimism/op-program/preimage"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
var (
ErrClaimNotValid = errors.New("invalid claim")
)
type L2Source struct { type L2Source struct {
*sources.L2Client *sources.L2Client
*sources.DebugClient *sources.DebugClient
...@@ -51,8 +45,8 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error { ...@@ -51,8 +45,8 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error {
kv = kvstore.NewDiskKV(cfg.DataDir) kv = kvstore.NewDiskKV(cfg.DataDir)
} }
var preimageOracle preimage.OracleFn var getPreimage func(key common.Hash) ([]byte, error)
var hinter preimage.HinterFn var hinter func(hint string) error
if cfg.FetchingEnabled() { if cfg.FetchingEnabled() {
logger.Info("Connecting to L1 node", "l1", cfg.L1URL) logger.Info("Connecting to L1 node", "l1", cfg.L1URL)
l1RPC, err := client.NewRPC(ctx, logger, cfg.L1URL) l1RPC, err := client.NewRPC(ctx, logger, cfg.L1URL)
...@@ -80,54 +74,89 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error { ...@@ -80,54 +74,89 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error {
logger.Info("Setting up pre-fetcher") logger.Info("Setting up pre-fetcher")
prefetch := prefetcher.NewPrefetcher(logger, l1Cl, l2DebugCl, kv) prefetch := prefetcher.NewPrefetcher(logger, l1Cl, l2DebugCl, kv)
preimageOracle = asOracleFn(func(key common.Hash) ([]byte, error) { getPreimage = func(key common.Hash) ([]byte, error) { return prefetch.GetPreimage(ctx, key) }
return prefetch.GetPreimage(ctx, key) hinter = prefetch.Hint
})
hinter = asHinter(prefetch.Hint)
} else { } else {
logger.Info("Using offline mode. All required pre-images must be pre-populated.") logger.Info("Using offline mode. All required pre-images must be pre-populated.")
preimageOracle = asOracleFn(kv.Get) getPreimage = kv.Get
hinter = func(v preimage.Hint) { hinter = func(hint string) error {
logger.Debug("ignoring prefetch hint", "hint", v) logger.Debug("ignoring prefetch hint", "hint", hint)
return nil
} }
} }
l1Source := l1.NewSource(logger, preimageOracle, hinter, cfg.L1Head)
l2Source, err := l2.NewEngine(logger, preimageOracle, hinter, cfg) // Setup pipe for preimage oracle interaction
if err != nil { pClientRW, pHostRW := bidirectionalPipe()
return fmt.Errorf("connect l2 oracle: %w", err) oracleServer := preimage.NewOracleServer(pHostRW)
} // Setup pipe for hint comms
hHostR, hClientW := io.Pipe()
hHost := preimage.NewHintReader(hHostR)
defer pHostRW.Close()
defer hHostR.Close()
routeHints(logger, hHost, hinter)
launchOracleServer(logger, oracleServer, getPreimage)
logger.Info("Starting derivation") // TODO(CLI-XXX): This is a hack to wait for the oracle server and hint router to begin polling for requests
d := cldr.NewDriver(logger, cfg.Rollup, l1Source, l2Source, cfg.L2ClaimBlockNumber) // before the program starts. This should be replaced with a more robust solution.
for { time.Sleep(time.Second * 1)
if err = d.Step(ctx); errors.Is(err, io.EOF) {
break return cl.ClientProgram(
} else if err != nil { logger,
return err cfg.Rollup,
} cfg.L2ChainConfig,
} cfg.L1Head,
if !d.ValidateClaim(eth.Bytes32(cfg.L2Claim)) { cfg.L2Head,
return ErrClaimNotValid cfg.L2Claim,
cfg.L2ClaimBlockNumber,
pClientRW,
hClientW,
)
}
type readWritePair struct {
io.ReadCloser
io.WriteCloser
}
func (rw *readWritePair) Close() error {
if err := rw.ReadCloser.Close(); err != nil {
return err
} }
return nil return rw.WriteCloser.Close()
} }
func asOracleFn(getter func(key common.Hash) ([]byte, error)) preimage.OracleFn { func bidirectionalPipe() (a, b io.ReadWriteCloser) {
return func(key preimage.Key) []byte { ar, bw := io.Pipe()
pre, err := getter(key.PreimageKey()) br, aw := io.Pipe()
if err != nil { return &readWritePair{ReadCloser: ar, WriteCloser: aw}, &readWritePair{ReadCloser: br, WriteCloser: bw}
panic(fmt.Errorf("preimage unavailable for key %v: %w", key, err)) }
func routeHints(logger log.Logger, hintReader *preimage.HintReader, hinter func(hint string) error) {
go func() {
for {
if err := hintReader.NextHint(hinter); err != nil {
if err == io.EOF || errors.Is(err, io.ErrClosedPipe) {
logger.Info("closing pre-image hint handler")
return
}
logger.Error("pre-image hint router error", "err", err)
return
}
} }
return pre }()
}
} }
func asHinter(hint func(hint string) error) preimage.HinterFn { func launchOracleServer(logger log.Logger, server *preimage.OracleServer, getter func(key common.Hash) ([]byte, error)) {
return func(v preimage.Hint) { go func() {
err := hint(v.Hint()) for {
if err != nil { if err := server.NextPreimageRequest(getter); err != nil {
panic(fmt.Errorf("hint rejected %v: %w", v, err)) if err == io.EOF || errors.Is(err, io.ErrClosedPipe) {
logger.Info("closing pre-image server")
return
}
logger.Error("pre-image server error", "error", err)
return
}
} }
} }()
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment