Commit 5f9b8919 authored by Joshua Gutow, committed by GitHub

op-node: Cleanup channel definition (#3229)

* op-node: Cleanup channel definition

This moves some files around and then expands upon the Channel struct.
It is better built for adding frames + then reading data when it is
ready.

Some tests are still failing and I don't know why.

* ReadBatch in channel.go

* op-node: Fix channel.go + start of new interface

* fix lint

* op-node: Use simple channel decoding

* revert switch to reader

This was due to the difficulty in making the mocks work with the reader.

* fix lint

* fix bug around trying to use an empty channel
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
parent de38b6f6
package derive
import (
"bytes"
"compress/zlib"
"fmt"
"io"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum/go-ethereum/rlp"
)
// Channel is a set of batches that are split into at least one, but possibly multiple, frames.
// Frames are allowed to be ingested out of order, and each frame is ingested one by one.
// Once a frame marked as last has been added the channel is closed; it becomes ready for
// reading once every frame from 0 up to that last frame has been buffered (see IsReady).
//
// TODO: Full state machine for channel:
// Open, Closed, Ready (todo - when to construct RLP reader)
type Channel struct {
// id of the channel; AddFrame rejects frames whose ID differs from it
id ChannelID
// estimated memory size, used to drop the channel if we have too much data.
// AddFrame grows it by the frame's data length plus frameOverhead.
size uint64
// true if we have buffered the last frame (a frame with IsLast set)
closed bool
// TODO: implement this check
// highestFrameNumber is the highest frame number yet seen.
// This must be equal to `endFrameNumber`
// highestFrameNumber uint16
// endFrameNumber is the frame number of the frame where `isLast` is true.
// It is only meaningful once closed is true.
// No other frame number must be larger than this.
endFrameNumber uint16
// Store a map of frame number -> frame data for constant time ordering
inputs map[uint64][]byte
// highestL1InclusionBlock is the L1 block with the largest Number among
// the inclusion blocks passed to AddFrame so far.
highestL1InclusionBlock eth.L1BlockRef
}
// NewChannel creates an empty, open channel for the given channel ID.
// The frame buffer is allocated up front so frames can be added immediately.
func NewChannel(id ChannelID) *Channel {
	ch := new(Channel)
	ch.id = id
	ch.inputs = map[uint64][]byte{}
	return ch
}
// AddFrame adds a frame to the channel.
// If the frame is not valid for the channel it returns an error and the
// channel is left unchanged. Otherwise the frame is buffered.
//
// A frame is rejected when:
//   - its ID does not match the channel ID,
//   - it is marked as last but the channel already has a last frame,
//   - a frame with the same frame number is already buffered (DuplicateErr),
//   - the channel is closed and the frame number lies beyond the last frame.
//
// When the closing frame arrives, any buffered frames with a higher frame
// number are dropped, so that no frame number exceeds endFrameNumber and
// IsReady can eventually succeed.
//
// l1InclusionBlock is recorded as the channel's highest L1 inclusion block
// if its Number is larger than the one recorded so far.
func (ch *Channel) AddFrame(frame Frame, l1InclusionBlock eth.L1BlockRef) error {
	if frame.ID != ch.id {
		return fmt.Errorf("frame id does not match channel id. Expected %v, got %v", ch.id, frame.ID)
	}
	if frame.IsLast && ch.closed {
		return fmt.Errorf("cannot add ending frame to a closed channel. id %v", ch.id)
	}
	frameNumber := uint64(frame.FrameNumber)
	if _, ok := ch.inputs[frameNumber]; ok {
		return DuplicateErr
	}
	// A closed channel ends at endFrameNumber. Buffering anything past it
	// would make the frame count disagree with endFrameNumber forever.
	if ch.closed && frame.FrameNumber > ch.endFrameNumber {
		return fmt.Errorf("frame number (%d) is greater than the end frame number (%d) of a closed channel. id %v",
			frame.FrameNumber, ch.endFrameNumber, ch.id)
	}

	// Guaranteed to succeed. Now update internal state
	if frame.IsLast {
		ch.endFrameNumber = frame.FrameNumber
		ch.closed = true
		// Drop frames that were buffered out of order beyond the closing
		// frame, and give back the size they were accounted for.
		// Deleting entries while ranging over a map is safe in Go.
		for n, data := range ch.inputs {
			if n > frameNumber {
				delete(ch.inputs, n)
				ch.size -= uint64(len(data)) + frameOverhead
			}
		}
	}
	if ch.highestL1InclusionBlock.Number < l1InclusionBlock.Number {
		ch.highestL1InclusionBlock = l1InclusionBlock
	}
	ch.inputs[frameNumber] = frame.Data
	ch.size += uint64(len(frame.Data)) + frameOverhead
	// todo use `IsReady` + state to create final output reader
	return nil
}
// Size reports the channel's accumulated size, frame overhead included.
// The figure is measured over the compressed frame data, so reading from
// the channel (which yields uncompressed data) never shrinks it.
func (ch *Channel) Size() uint64 {
	total := ch.size
	return total
}
// IsReady reports whether the channel can be read: the last frame has been
// seen and every frame number from 0 through endFrameNumber is buffered.
func (ch *Channel) IsReady() bool {
	// Without the closing frame the end of the channel is unknown.
	if !ch.closed {
		return false
	}
	// Cheap pre-check: the number of buffered frames must match exactly.
	want := int(ch.endFrameNumber) + 1
	if len(ch.inputs) != want {
		return false
	}
	// Same count does not imply contiguity, so verify every frame number.
	for n := 0; n < want; n++ {
		if _, present := ch.inputs[uint64(n)]; !present {
			return false
		}
	}
	return true
}
// Reader returns an io.Reader that yields the data of frames 0 through
// endFrameNumber, concatenated in frame order.
// It panics if any of those frames is missing, so it must only be called
// once `IsReady` is true.
// Every call builds fresh readers, so it can be called multiple times.
func (ch *Channel) Reader() io.Reader {
	var parts []io.Reader
	for n, last := uint64(0), uint64(ch.endFrameNumber); n <= last; n++ {
		frame, found := ch.inputs[n]
		if found {
			parts = append(parts, bytes.NewBuffer(frame))
			continue
		}
		panic("dev error in channel.Reader. Must be called after the channel is ready.")
	}
	return io.MultiReader(parts...)
}
// BatchReader wraps r in a zlib decompressor followed by an RLP stream and
// returns a function that decodes one batch per call.
// Each returned value is tagged with the l1InclusionBlock supplied here.
// An error is returned up front if the zlib reader cannot be created;
// afterwards, decoding errors are returned by the batch function itself.
func BatchReader(r io.Reader, l1InclusionBlock eth.L1BlockRef) (func() (BatchWithL1InclusionBlock, error), error) {
	// Decompression stage; creating it already reads from r and may fail.
	decompressed, err := zlib.NewReader(r)
	if err != nil {
		return nil, err
	}
	// RLP decoding stage on top of the decompressed data.
	stream := rlp.NewStream(decompressed, 10_000_000)

	// Each invocation decodes the next batch from the stream.
	next := func() (BatchWithL1InclusionBlock, error) {
		var out BatchWithL1InclusionBlock
		out.L1InclusionBlock = l1InclusionBlock
		decodeErr := stream.Decode(&out.Batch)
		return out, decodeErr
	}
	return next, nil
}
...@@ -32,8 +32,8 @@ type ChannelBank struct { ...@@ -32,8 +32,8 @@ type ChannelBank struct {
log log.Logger log log.Logger
cfg *rollup.Config cfg *rollup.Config
channels map[ChannelID]*ChannelIn // channels by ID channels map[ChannelID]*Channel // channels by ID
channelQueue []ChannelID // channels in FIFO order channelQueue []ChannelID // channels in FIFO order
resetting bool resetting bool
...@@ -49,7 +49,7 @@ func NewChannelBank(log log.Logger, cfg *rollup.Config, next ChannelBankOutput) ...@@ -49,7 +49,7 @@ func NewChannelBank(log log.Logger, cfg *rollup.Config, next ChannelBankOutput)
return &ChannelBank{ return &ChannelBank{
log: log, log: log,
cfg: cfg, cfg: cfg,
channels: make(map[ChannelID]*ChannelIn), channels: make(map[ChannelID]*Channel),
channelQueue: make([]ChannelID, 0, 10), channelQueue: make([]ChannelID, 0, 10),
next: next, next: next,
} }
...@@ -106,14 +106,15 @@ func (ib *ChannelBank) IngestData(data []byte) { ...@@ -106,14 +106,15 @@ func (ib *ChannelBank) IngestData(data []byte) {
} }
currentCh, ok := ib.channels[f.ID] currentCh, ok := ib.channels[f.ID]
if !ok { // create new channel if it doesn't exist yet if !ok {
currentCh = &ChannelIn{id: f.ID} // create new channel if it doesn't exist yet
currentCh = NewChannel(f.ID)
ib.channels[f.ID] = currentCh ib.channels[f.ID] = currentCh
ib.channelQueue = append(ib.channelQueue, f.ID) ib.channelQueue = append(ib.channelQueue, f.ID)
} }
ib.log.Trace("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data)) ib.log.Trace("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data))
if err := currentCh.IngestData(uint64(f.FrameNumber), f.IsLast, f.Data); err != nil { if err := currentCh.AddFrame(f, ib.progress.Origin); err != nil {
ib.log.Warn("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err) ib.log.Warn("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err)
continue continue
} }
...@@ -132,15 +133,17 @@ func (ib *ChannelBank) Read() (data []byte, err error) { ...@@ -132,15 +133,17 @@ func (ib *ChannelBank) Read() (data []byte, err error) {
if timedOut { if timedOut {
ib.log.Debug("channel timed out", "channel", first, "frames", len(ch.inputs)) ib.log.Debug("channel timed out", "channel", first, "frames", len(ch.inputs))
} }
if ch.closed { if ch.IsReady() {
ib.log.Debug("channel closed", "channel", first) ib.log.Debug("channel ready", "channel", first)
} }
if !timedOut && !ch.closed { // check if channel is done (can then be read) if !timedOut && !ch.IsReady() { // check if channel is ready (can then be read)
return nil, io.EOF return nil, io.EOF
} }
delete(ib.channels, first) delete(ib.channels, first)
ib.channelQueue = ib.channelQueue[1:] ib.channelQueue = ib.channelQueue[1:]
data = ch.Read() r := ch.Reader()
// Suppress the error here. Note that io.ReadAll returns nil instead of io.EOF on success.
data, _ = io.ReadAll(r)
return data, nil return data, nil
} }
......
package derive
import (
"fmt"
)
// ChannelIn buffers the frames of a single channel, keyed by frame number,
// until the channel is read with Read.
type ChannelIn struct {
// id of the channel
id ChannelID
// estimated memory size, used to drop the channel if we have too much data.
// IngestData grows it by the frame's data length plus frameOverhead.
size uint64
// true if we have buffered the last frame
closed bool
// buffered frame data by frame number; allocated lazily by IngestData
inputs map[uint64][]byte
}
// IngestData buffers a single frame in the channel.
// It returns an error if a closing frame was already received, and
// DuplicateErr if a frame with the same frame number is already buffered.
// On success the frame is stored, the size estimate grows by the frame
// length plus frameOverhead, and the channel is closed when isLast is set.
func (ch *ChannelIn) IngestData(frameNum uint64, isLast bool, frameData []byte) error {
	if ch.closed {
		return fmt.Errorf("already received a closing frame")
	}
	if ch.inputs == nil {
		// first frame for this channel: allocate the buffer lazily
		ch.inputs = map[uint64][]byte{}
	} else if _, seen := ch.inputs[frameNum]; seen {
		// a frame with this number was buffered before
		return DuplicateErr
	}
	ch.size += frameOverhead + uint64(len(frameData))
	ch.closed = isLast
	ch.inputs[frameNum] = frameData
	return nil
}
// Read concatenates the buffered frames in frame-number order, starting at
// frame 0 and stopping at the first missing frame number.
// The result may therefore be incomplete if the channel is not closed or
// has gaps; it is nil when frame 0 is absent.
func (ch *ChannelIn) Read() (out []byte) {
	chunk, found := ch.inputs[0]
	for n := uint64(1); found; n++ {
		out = append(out, chunk...)
		chunk, found = ch.inputs[n]
	}
	return out
}
...@@ -2,13 +2,10 @@ package derive ...@@ -2,13 +2,10 @@ package derive
import ( import (
"bytes" "bytes"
"compress/zlib"
"context" "context"
"io" "io"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
) )
// Channel In Reader reads a batch from the channel // Channel In Reader reads a batch from the channel
...@@ -16,12 +13,6 @@ import ( ...@@ -16,12 +13,6 @@ import (
// This is a pure function from the channel, but each channel (or channel fragment) // This is a pure function from the channel, but each channel (or channel fragment)
// must be tagged with an L1 inclusion block to be passed to the batch queue. // must be tagged with an L1 inclusion block to be passed to the batch queue.
// zlib returns an io.ReadCloser but explicitly documents it is also a zlib.Resetter, and we want to use it as such.
type zlibReader interface {
io.ReadCloser
zlib.Resetter
}
type BatchQueueStage interface { type BatchQueueStage interface {
StageProgress StageProgress
AddBatch(batch *BatchData) AddBatch(batch *BatchData)
...@@ -30,12 +21,7 @@ type BatchQueueStage interface { ...@@ -30,12 +21,7 @@ type BatchQueueStage interface {
type ChannelInReader struct { type ChannelInReader struct {
log log.Logger log log.Logger
ready bool nextBatchFn func() (BatchWithL1InclusionBlock, error)
r *bytes.Reader
readZlib zlibReader
readRLP *rlp.Stream
data []byte
progress Progress progress Progress
...@@ -53,58 +39,22 @@ func (cr *ChannelInReader) Progress() Progress { ...@@ -53,58 +39,22 @@ func (cr *ChannelInReader) Progress() Progress {
return cr.progress return cr.progress
} }
// TODO: Take full channel for better logging
func (cr *ChannelInReader) WriteChannel(data []byte) { func (cr *ChannelInReader) WriteChannel(data []byte) {
if cr.progress.Closed { if cr.progress.Closed {
panic("write channel while closed") panic("write channel while closed")
} }
cr.data = data if f, err := BatchReader(bytes.NewBuffer(data), cr.progress.Origin); err == nil {
cr.ready = false cr.nextBatchFn = f
} } else {
cr.log.Error("Error creating batch reader from channel data", "err", err)
// ReadBatch returns a decoded rollup batch, or an error:
// - io.EOF, if the ChannelInReader source needs more data, to be provided with WriteChannel()/
// - any other error (e.g. invalid compression or batch data):
// the caller should ChannelInReader.NextChannel() before continuing reading the next batch.
func (cr *ChannelInReader) ReadBatch(dest *BatchData) error {
// The channel reader may not be initialized yet,
// and initializing involves reading (zlib header data), so we do that now.
if !cr.ready {
if cr.data == nil {
return io.EOF
}
if cr.r == nil {
cr.r = bytes.NewReader(cr.data)
} else {
cr.r.Reset(cr.data)
}
if cr.readZlib == nil {
// creating a new zlib reader involves resetting it, which reads data, which may error
zr, err := zlib.NewReader(cr.r)
if err != nil {
return err
}
cr.readZlib = zr.(zlibReader)
} else {
err := cr.readZlib.Reset(cr.r, nil)
if err != nil {
return err
}
}
if cr.readRLP == nil {
cr.readRLP = rlp.NewStream(cr.readZlib, 10_000_000)
} else {
cr.readRLP.Reset(cr.readZlib, 10_000_000)
}
cr.ready = true
} }
return cr.readRLP.Decode(dest)
} }
// NextChannel forces the next read to continue with the next channel, // NextChannel forces the next read to continue with the next channel,
// resetting any decoding/decompression state to a fresh start. // resetting any decoding/decompression state to a fresh start.
func (cr *ChannelInReader) NextChannel() { func (cr *ChannelInReader) NextChannel() {
cr.ready = false cr.nextBatchFn = nil
cr.data = nil
} }
func (cr *ChannelInReader) Step(ctx context.Context, outer Progress) error { func (cr *ChannelInReader) Step(ctx context.Context, outer Progress) error {
...@@ -112,21 +62,27 @@ func (cr *ChannelInReader) Step(ctx context.Context, outer Progress) error { ...@@ -112,21 +62,27 @@ func (cr *ChannelInReader) Step(ctx context.Context, outer Progress) error {
return err return err
} }
var batch BatchData if cr.nextBatchFn == nil {
if err := cr.ReadBatch(&batch); err == io.EOF { return io.EOF
}
// TODO: can batch be non nil while err == io.EOF
// This depends on the behavior of rlp.Stream
batch, err := cr.nextBatchFn()
if err == io.EOF {
return io.EOF return io.EOF
} else if err != nil { } else if err != nil {
cr.log.Warn("failed to read batch from channel reader, skipping to next channel now", "err", err) cr.log.Warn("failed to read batch from channel reader, skipping to next channel now", "err", err)
cr.NextChannel() cr.NextChannel()
return nil return nil
} }
cr.next.AddBatch(&batch) cr.next.AddBatch(batch.Batch)
return nil return nil
} }
func (cr *ChannelInReader) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { func (cr *ChannelInReader) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
cr.ready = false cr.nextBatchFn = nil
cr.data = nil
cr.progress = cr.next.Progress() cr.progress = cr.next.Progress()
return io.EOF return io.EOF
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment