Commit 877fcf88 authored by inphi's avatar inphi

cannon: Prevent deadlock on pre-image reads

Reading a FileChannel that does not have a writer blocks
indefinitely. This occurs in cannon whenever the pre-image oracle
server shuts down prematurely. We add a new io.ReadWriter that can
detect when a FileChannel would otherwise block indefinitely.
parent 7ec383b1
package cmd package cmd
import ( import (
"context"
"fmt" "fmt"
"os" "os"
"os/exec" "os/exec"
...@@ -112,11 +113,15 @@ func (rk rawKey) PreimageKey() [32]byte { ...@@ -112,11 +113,15 @@ func (rk rawKey) PreimageKey() [32]byte {
} }
type ProcessPreimageOracle struct { type ProcessPreimageOracle struct {
pCl *preimage.OracleClient pCl *preimage.OracleClient
hCl *preimage.HintWriter hCl *preimage.HintWriter
cmd *exec.Cmd cmd *exec.Cmd
waitErr chan error
cancelIO context.CancelCauseFunc
} }
const clientPollTimeout = time.Second * 15
func NewProcessPreimageOracle(name string, args []string) (*ProcessPreimageOracle, error) { func NewProcessPreimageOracle(name string, args []string) (*ProcessPreimageOracle, error) {
if name == "" { if name == "" {
return &ProcessPreimageOracle{}, nil return &ProcessPreimageOracle{}, nil
...@@ -140,10 +145,18 @@ func NewProcessPreimageOracle(name string, args []string) (*ProcessPreimageOracl ...@@ -140,10 +145,18 @@ func NewProcessPreimageOracle(name string, args []string) (*ProcessPreimageOracl
pOracleRW.Reader(), pOracleRW.Reader(),
pOracleRW.Writer(), pOracleRW.Writer(),
} }
// Note that the client file descriptors are not closed when the pre-image server exits.
// So we use the FilePoller to ensure that we don't get stuck in a blocking read/write.
ctx, cancelIO := context.WithCancelCause(context.Background())
preimageClientIO := preimage.NewFilePoller(ctx, pClientRW, clientPollTimeout)
hostClientIO := preimage.NewFilePoller(ctx, hClientRW, clientPollTimeout)
out := &ProcessPreimageOracle{ out := &ProcessPreimageOracle{
pCl: preimage.NewOracleClient(pClientRW), pCl: preimage.NewOracleClient(preimageClientIO),
hCl: preimage.NewHintWriter(hClientRW), hCl: preimage.NewHintWriter(hostClientIO),
cmd: cmd, cmd: cmd,
waitErr: make(chan error),
cancelIO: cancelIO,
} }
return out, nil return out, nil
} }
...@@ -166,23 +179,37 @@ func (p *ProcessPreimageOracle) Start() error { ...@@ -166,23 +179,37 @@ func (p *ProcessPreimageOracle) Start() error {
if p.cmd == nil { if p.cmd == nil {
return nil return nil
} }
return p.cmd.Start() err := p.cmd.Start()
go p.wait()
return err
} }
func (p *ProcessPreimageOracle) Close() error { func (p *ProcessPreimageOracle) Close() error {
if p.cmd == nil { if p.cmd == nil {
return nil return nil
} }
// We first check if the process has already exited before signaling.
// Note: This is a teeny bit racy since the process could exit after the check
// above and another process is assigned the same pid.
select {
case err := <-p.waitErr:
return err
case <-time.After(time.Second * 1):
// give the wait goroutine time to reap process
}
_ = p.cmd.Process.Signal(os.Interrupt) _ = p.cmd.Process.Signal(os.Interrupt)
// Go 1.20 feature, to introduce later return <-p.waitErr
//p.cmd.WaitDelay = time.Second * 10 }
func (p *ProcessPreimageOracle) wait() {
err := p.cmd.Wait() err := p.cmd.Wait()
if err, ok := err.(*exec.ExitError); ok { var waitErr error
if err.Success() { if err, ok := err.(*exec.ExitError); !ok || !err.Success() {
return nil waitErr = err
}
} }
return err p.cancelIO(fmt.Errorf("%w: pre-image server has exited", waitErr))
p.waitErr <- waitErr
close(p.waitErr)
} }
type StepFn func(proof bool) (*mipsevm.StepWitness, error) type StepFn func(proof bool) (*mipsevm.StepWitness, error)
......
package preimage
import (
"context"
"errors"
"os"
"time"
)
// FilePoller is a ReadWriteCloser that polls the underlying file channel for reads and writes
// until its context is done. This is useful in detecting when the other end of a
// blocking pre-image channel is no longer available.
type FilePoller struct {
File FileChannel
ctx context.Context
pollTimeout time.Duration
}
// NewFilePoller returns a FilePoller that polls the underlying file channel for reads and writes until
// the provided ctx is done. The poll timeout is the maximum amount of time to wait for I/O before
// the operation is halted and the context is checked for cancellation.
func NewFilePoller(ctx context.Context, f FileChannel, pollTimeout time.Duration) *FilePoller {
return &FilePoller{File: f, ctx: ctx, pollTimeout: pollTimeout}
}
func (f *FilePoller) Read(b []byte) (int, error) {
var read int
for {
if err := f.File.Reader().SetReadDeadline(time.Now().Add(f.pollTimeout)); err != nil {
panic(err)
}
n, err := f.File.Read(b[read:])
if errors.Is(err, os.ErrDeadlineExceeded) {
if cerr := f.ctx.Err(); cerr != nil {
return read, cerr
}
} else {
read += n
if read >= len(b) {
return read, err
}
}
}
}
func (f *FilePoller) Write(b []byte) (int, error) {
var written int
for {
if err := f.File.Writer().SetWriteDeadline(time.Now().Add(f.pollTimeout)); err != nil {
panic(err)
}
n, err := f.File.Write(b[written:])
if errors.Is(err, os.ErrDeadlineExceeded) {
if cerr := f.ctx.Err(); cerr != nil {
return written, cerr
}
} else {
written += n
if written >= len(b) {
return written, err
}
}
}
}
func (p *FilePoller) Close() error {
return p.File.Close()
}
package preimage
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestFilePoller_Read(t *testing.T) {
chanA, chanB, err := CreateBidirectionalChannel()
require.NoError(t, err)
ctx := context.Background()
chanAPoller := NewFilePoller(ctx, chanA, time.Millisecond*100)
go func() {
chanB.Write([]byte("hello"))
time.Sleep(time.Second * 1)
chanB.Write([]byte("world"))
}()
var buf [10]byte
n, err := chanAPoller.Read(buf[:])
require.Equal(t, 10, n)
require.NoError(t, err)
}
func TestFilePoller_Write(t *testing.T) {
chanA, chanB, err := CreateBidirectionalChannel()
require.NoError(t, err)
ctx := context.Background()
chanAPoller := NewFilePoller(ctx, chanA, time.Millisecond*100)
bufch := make(chan []byte, 1)
go func() {
var buf [10]byte
chanB.Read(buf[:5])
time.Sleep(time.Second * 1)
chanB.Read(buf[5:])
bufch <- buf[:]
close(bufch)
}()
buf := []byte("helloworld")
n, err := chanAPoller.Write(buf)
require.Equal(t, 10, n)
require.NoError(t, err)
select {
case <-time.After(time.Second * 60):
t.Fatal("timed out waiting for read")
case readbuf := <-bufch:
require.Equal(t, buf, readbuf)
}
}
func TestFilePoller_ReadCancel(t *testing.T) {
chanA, chanB, err := CreateBidirectionalChannel()
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
chanAPoller := NewFilePoller(ctx, chanA, time.Millisecond*100)
go func() {
chanB.Write([]byte("hello"))
cancel()
}()
var buf [10]byte
n, err := chanAPoller.Read(buf[:])
require.Equal(t, 5, n)
require.ErrorIs(t, err, context.Canceled)
}
func TestFilePoller_WriteCancel(t *testing.T) {
chanA, chanB, err := CreateBidirectionalChannel()
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
chanAPoller := NewFilePoller(ctx, chanA, time.Millisecond*100)
go func() {
var buf [5]byte
chanB.Read(buf[:])
cancel()
}()
// use a large buffer to overflow the kernel buffer provided to pipe(2) so the write actually blocks
buf := make([]byte, 1024*1024)
_, err = chanAPoller.Write(buf)
require.ErrorIs(t, err, context.Canceled)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment