Commit 37f3fb86 authored by inphi's avatar inphi

bidirectional hints channel

parent 5da18810
...@@ -32,7 +32,7 @@ func ClientProgram( ...@@ -32,7 +32,7 @@ func ClientProgram(
l2Claim common.Hash, l2Claim common.Hash,
l2ClaimBlockNumber uint64, l2ClaimBlockNumber uint64,
preimageOracle io.ReadWriter, preimageOracle io.ReadWriter,
preimageHinter io.Writer, preimageHinter io.ReadWriter,
) error { ) error {
pClient := preimage.NewOracleClient(preimageOracle) pClient := preimage.NewOracleClient(preimageOracle)
hClient := preimage.NewHintWriter(preimageHinter) hClient := preimage.NewHintWriter(preimageHinter)
......
...@@ -6,7 +6,6 @@ import ( ...@@ -6,7 +6,6 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"time"
"github.com/ethereum-optimism/optimism/op-node/chaincfg" "github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/client" "github.com/ethereum-optimism/optimism/op-node/client"
...@@ -89,16 +88,16 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error { ...@@ -89,16 +88,16 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error {
pClientRW, pHostRW := bidirectionalPipe() pClientRW, pHostRW := bidirectionalPipe()
oracleServer := preimage.NewOracleServer(pHostRW) oracleServer := preimage.NewOracleServer(pHostRW)
// Setup pipe for hint comms // Setup pipe for hint comms
hHostR, hClientW := io.Pipe() hClientRW, hHostRW := bidirectionalPipe()
hHost := preimage.NewHintReader(hHostR) hHost := preimage.NewHintReader(hHostRW)
defer pHostRW.Close() defer pHostRW.Close()
defer hHostR.Close() defer hHostRW.Close()
routeHints(logger, hHost, hinter) routeHints(logger, hHost, hinter)
launchOracleServer(logger, oracleServer, getPreimage) launchOracleServer(logger, oracleServer, getPreimage)
// TODO(CLI-XXX): This is a hack to wait for the oracle server and hint router to begin polling for requests // TODO(CLI-XXX): This is a hack to wait for the oracle server and hint router to begin polling for requests
// before the program starts. This should be replaced with a more robust solution. // before the program starts. This should be replaced with a more robust solution.
time.Sleep(time.Second * 1) //time.Sleep(time.Second * 1)
return cl.ClientProgram( return cl.ClientProgram(
logger, logger,
...@@ -109,7 +108,7 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error { ...@@ -109,7 +108,7 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error {
cfg.L2Claim, cfg.L2Claim,
cfg.L2ClaimBlockNumber, cfg.L2ClaimBlockNumber,
pClientRW, pClientRW,
hClientW, hClientRW,
) )
} }
......
...@@ -9,13 +9,13 @@ import ( ...@@ -9,13 +9,13 @@ import (
// HintWriter writes hints to an io.Writer (e.g. a special file descriptor, or a debug log), // HintWriter writes hints to an io.Writer (e.g. a special file descriptor, or a debug log),
// for a pre-image oracle service to prepare specific pre-images. // for a pre-image oracle service to prepare specific pre-images.
type HintWriter struct { type HintWriter struct {
w io.Writer rw io.ReadWriter
} }
var _ Hinter = (*HintWriter)(nil) var _ Hinter = (*HintWriter)(nil)
func NewHintWriter(w io.Writer) *HintWriter { func NewHintWriter(rw io.ReadWriter) *HintWriter {
return &HintWriter{w: w} return &HintWriter{rw: rw}
} }
func (hw *HintWriter) Hint(v Hint) { func (hw *HintWriter) Hint(v Hint) {
...@@ -23,26 +23,29 @@ func (hw *HintWriter) Hint(v Hint) { ...@@ -23,26 +23,29 @@ func (hw *HintWriter) Hint(v Hint) {
var hintBytes []byte var hintBytes []byte
hintBytes = binary.BigEndian.AppendUint32(hintBytes, uint32(len(hint))) hintBytes = binary.BigEndian.AppendUint32(hintBytes, uint32(len(hint)))
hintBytes = append(hintBytes, []byte(hint)...) hintBytes = append(hintBytes, []byte(hint)...)
hintBytes = append(hintBytes, 0) // to block writing on _, err := hw.rw.Write(hintBytes)
_, err := hw.w.Write(hintBytes)
if err != nil { if err != nil {
panic(fmt.Errorf("failed to write pre-image hint: %w", err)) panic(fmt.Errorf("failed to write pre-image hint: %w", err))
} }
_, err = hw.rw.Read([]byte{0})
if err != nil {
panic(fmt.Errorf("failed to read pre-image hint ack: %w", err))
}
} }
// HintReader reads the hints of HintWriter and passes them to a router for preparation of the requested pre-images. // HintReader reads the hints of HintWriter and passes them to a router for preparation of the requested pre-images.
// Onchain the written hints are no-op. // Onchain the written hints are no-op.
type HintReader struct { type HintReader struct {
r io.Reader rw io.ReadWriter
} }
func NewHintReader(r io.Reader) *HintReader { func NewHintReader(rw io.ReadWriter) *HintReader {
return &HintReader{r: r} return &HintReader{rw: rw}
} }
func (hr *HintReader) NextHint(router func(hint string) error) error { func (hr *HintReader) NextHint(router func(hint string) error) error {
var length uint32 var length uint32
if err := binary.Read(hr.r, binary.BigEndian, &length); err != nil { if err := binary.Read(hr.rw, binary.BigEndian, &length); err != nil {
if err == io.EOF { if err == io.EOF {
return io.EOF return io.EOF
} }
...@@ -50,17 +53,17 @@ func (hr *HintReader) NextHint(router func(hint string) error) error { ...@@ -50,17 +53,17 @@ func (hr *HintReader) NextHint(router func(hint string) error) error {
} }
payload := make([]byte, length) payload := make([]byte, length)
if length > 0 { if length > 0 {
if _, err := io.ReadFull(hr.r, payload); err != nil { if _, err := io.ReadFull(hr.rw, payload); err != nil {
return fmt.Errorf("failed to read hint payload (length %d): %w", length, err) return fmt.Errorf("failed to read hint payload (length %d): %w", length, err)
} }
} }
if err := router(string(payload)); err != nil { if err := router(string(payload)); err != nil {
// stream recovery // write back on error to unblock the HintWriter
_, _ = hr.r.Read([]byte{0}) _, _ = hr.rw.Write([]byte{0})
return fmt.Errorf("failed to handle hint: %w", err) return fmt.Errorf("failed to handle hint: %w", err)
} }
if _, err := hr.r.Read([]byte{0}); err != nil { if _, err := hr.rw.Write([]byte{0}); err != nil {
return fmt.Errorf("failed to read trailing no-op byte to unblock hint writer: %w", err) return fmt.Errorf("failed to write trailing no-op byte to unblock hint writer: %w", err)
} }
return nil return nil
} }
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"crypto/rand" "crypto/rand"
"errors" "errors"
"io" "io"
"sync"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
...@@ -20,26 +21,39 @@ func TestHints(t *testing.T) { ...@@ -20,26 +21,39 @@ func TestHints(t *testing.T) {
// Note: pretty much every string is valid communication: // Note: pretty much every string is valid communication:
// length, payload, 0. Worst case you run out of data, or allocate too much. // length, payload, 0. Worst case you run out of data, or allocate too much.
testHint := func(hints ...string) { testHint := func(hints ...string) {
var buf bytes.Buffer a, b := bidirectionalPipe()
hw := NewHintWriter(&buf) var wg sync.WaitGroup
for _, h := range hints { wg.Add(2)
hw.Hint(rawHint(h))
} go func() {
hr := NewHintReader(&buf) hw := NewHintWriter(a)
var got []string for _, h := range hints {
for i := 0; i < 100; i++ { // sanity limit hw.Hint(rawHint(h))
err := hr.NextHint(func(hint string) error {
got = append(got, hint)
return nil
})
if err == io.EOF {
break
} }
require.NoError(t, err) wg.Done()
} }()
got := make(chan string, len(hints))
go func() {
println("MENA")
defer wg.Done()
hr := NewHintReader(b)
for i := 0; i < len(hints); i++ {
err := hr.NextHint(func(hint string) error {
got <- hint
return nil
})
if err == io.EOF {
break
}
require.NoError(t, err)
}
}()
wg.Wait()
require.Equal(t, len(hints), len(got), "got all hints") require.Equal(t, len(hints), len(got), "got all hints")
for i, h := range hints { for _, h := range hints {
require.Equal(t, h, got[i], "hints match") require.Equal(t, h, <-got, "hints match")
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment