Commit 6191eea7 authored by Tei Im's avatar Tei Im Committed by protolambda

Split ChannelOut to singular and span channel out

parent 460bb3f3
......@@ -121,7 +121,7 @@ type channelBuilder struct {
// guaranteed to be a ChannelFullError wrapping the specific reason.
fullErr error
// current channel
co *derive.ChannelOut
co derive.ChannelOut
// list of blocks in the channel. Saved in case the channel must be rebuilt
blocks []*types.Block
// frames data queue, to be send as txs
......@@ -139,7 +139,7 @@ func newChannelBuilder(cfg ChannelConfig, spanBatchBuilder *derive.SpanBatchBuil
if err != nil {
return nil, err
}
co, err := derive.NewChannelOut(c, cfg.BatchType, spanBatchBuilder)
co, err := derive.NewChannelOut(cfg.BatchType, c, spanBatchBuilder)
if err != nil {
return nil, err
}
......
......@@ -449,7 +449,7 @@ func TestChannelBuilder_OutputWrongFramePanic(t *testing.T) {
// to construct a single frame
c, err := channelConfig.CompressorConfig.NewCompressor()
require.NoError(t, err)
co, err := derive.NewChannelOut(c, derive.SingularBatchType, nil)
co, err := derive.NewChannelOut(derive.SingularBatchType, c, nil)
require.NoError(t, err)
var buf bytes.Buffer
fn, err := co.OutputFrame(&buf, channelConfig.MaxFrameSize)
......
......@@ -61,8 +61,11 @@ type ChannelOutIface interface {
OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, error)
}
// Compile-time check for ChannelOutIface interface implementation for the ChannelOut type.
var _ ChannelOutIface = (*derive.ChannelOut)(nil)
// Compile-time check for ChannelOutIface interface implementation for the SingularChannelOut type.
var _ ChannelOutIface = (*derive.SingularChannelOut)(nil)
// Compile-time check for ChannelOutIface interface implementation for the SpanChannelOut type.
var _ ChannelOutIface = (*derive.SpanChannelOut)(nil)
// Compile-time check for ChannelOutIface interface implementation for the GarbageChannelOut type.
var _ ChannelOutIface = (*GarbageChannelOut)(nil)
......
......@@ -140,7 +140,7 @@ func (s *L2Batcher) Buffer(t Testing) error {
ApproxComprRatio: 1,
})
require.NoError(t, e, "failed to create compressor")
ch, err = derive.NewChannelOut(c, derive.SingularBatchType, nil)
ch, err = derive.NewChannelOut(derive.SingularBatchType, c, nil)
}
require.NoError(t, err, "failed to create channel")
s.l2ChannelOut = ch
......
......@@ -48,14 +48,31 @@ type Compressor interface {
FullErr() error
}
type ChannelOutReader interface {
io.Writer
io.Reader
Reset()
Len() int
type ChannelOut interface {
ID() ChannelID
Reset() error
AddBlock(*types.Block) (uint64, error)
AddSingularBatch(*SingularBatch) (uint64, error)
InputBytes() int
ReadyBytes() int
Flush() error
FullErr() error
Close() error
OutputFrame(*bytes.Buffer, uint64) (uint16, error)
}
func NewChannelOut(batchType uint, compress Compressor, spanBatchBuilder *SpanBatchBuilder) (ChannelOut, error) {
switch batchType {
case SingularBatchType:
return NewSingularChannelOut(compress)
case SpanBatchType:
return NewSpanChannelOut(compress, spanBatchBuilder)
default:
return nil, fmt.Errorf("unrecognized batch type: %d", batchType)
}
}
type ChannelOut struct {
type SingularChannelOut struct {
id ChannelID
// Frame ID of the next frame to emit. Increment after emitting
frame uint64
......@@ -64,35 +81,20 @@ type ChannelOut struct {
// Compressor stage. Write input data to it
compress Compressor
// closed indicates if the channel is closed
closed bool
// batchType indicates whether this channel uses SingularBatch or SpanBatch
batchType uint
// spanBatchBuilder contains information requires to build SpanBatch
spanBatchBuilder *SpanBatchBuilder
// reader contains compressed data for making output frames
reader ChannelOutReader
}
func (co *ChannelOut) ID() ChannelID {
func (co *SingularChannelOut) ID() ChannelID {
return co.id
}
func NewChannelOut(compress Compressor, batchType uint, spanBatchBuilder *SpanBatchBuilder) (*ChannelOut, error) {
// If the channel uses SingularBatch, use compressor directly as its reader
var reader ChannelOutReader = compress
if batchType == SpanBatchType {
// If the channel uses SpanBatch, create empty buffer for reader
reader = &bytes.Buffer{}
}
c := &ChannelOut{
id: ChannelID{}, // TODO: use GUID here instead of fully random data
frame: 0,
rlpLength: 0,
compress: compress,
batchType: batchType,
spanBatchBuilder: spanBatchBuilder,
reader: reader,
func NewSingularChannelOut(compress Compressor) (*SingularChannelOut, error) {
c := &SingularChannelOut{
id: ChannelID{}, // TODO: use GUID here instead of fully random data
frame: 0,
rlpLength: 0,
compress: compress,
}
_, err := rand.Read(c.id[:])
if err != nil {
......@@ -102,16 +104,12 @@ func NewChannelOut(compress Compressor, batchType uint, spanBatchBuilder *SpanBa
return c, nil
}
// TODO: reuse ChannelOut for performance
func (co *ChannelOut) Reset() error {
// TODO: reuse SingularChannelOut for performance
func (co *SingularChannelOut) Reset() error {
co.frame = 0
co.rlpLength = 0
co.compress.Reset()
co.reader.Reset()
co.closed = false
if co.spanBatchBuilder != nil {
co.spanBatchBuilder.Reset()
}
_, err := rand.Read(co.id[:])
return err
}
......@@ -120,7 +118,7 @@ func (co *ChannelOut) Reset() error {
// and an error if there is a problem adding the block. The only sentinel error
// that it returns is ErrTooManyRLPBytes. If this error is returned, the channel
// should be closed and a new one should be made.
func (co *ChannelOut) AddBlock(block *types.Block) (uint64, error) {
func (co *SingularChannelOut) AddBlock(block *types.Block) (uint64, error) {
if co.closed {
return 0, errors.New("already closed")
}
......@@ -137,28 +135,17 @@ func (co *ChannelOut) AddBlock(block *types.Block) (uint64, error) {
// that it returns is ErrTooManyRLPBytes. If this error is returned, the channel
// should be closed and a new one should be made.
//
// AddSingularBatch should be used together with BlockToSingularBatch if you need to access the
// AddSingularBatch should be used together with BlockToBatch if you need to access the
// BatchData before adding a block to the channel. It isn't possible to access
// the batch data with AddBlock.
func (co *ChannelOut) AddSingularBatch(batch *SingularBatch) (uint64, error) {
func (co *SingularChannelOut) AddSingularBatch(batch *SingularBatch) (uint64, error) {
if co.closed {
return 0, errors.New("already closed")
}
switch co.batchType {
case SingularBatchType:
return co.writeSingularBatch(batch)
case SpanBatchType:
return co.writeSpanBatch(batch)
default:
return 0, fmt.Errorf("unrecognized batch type: %d", co.batchType)
}
}
func (co *ChannelOut) writeSingularBatch(batch *SingularBatch) (uint64, error) {
var buf bytes.Buffer
// We encode to a temporary buffer to determine the encoded length to
// ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL
var buf bytes.Buffer
if err := rlp.Encode(&buf, NewSingularBatchData(*batch)); err != nil {
return 0, err
}
......@@ -173,113 +160,33 @@ func (co *ChannelOut) writeSingularBatch(batch *SingularBatch) (uint64, error) {
return uint64(written), err
}
// writeSpanBatch appends a SingularBatch to the channel's SpanBatch.
// A channel can have only one SpanBatch. And compressed results should not be accessible until the channel is closed, since the prefix and payload can be changed.
// So it resets channel contents and rewrites the entire SpanBatch each time, and compressed results are copied to reader after the channel is closed.
// It makes we can only get frames once the channel is full or closed, in the case of SpanBatch.
func (co *ChannelOut) writeSpanBatch(batch *SingularBatch) (uint64, error) {
if co.FullErr() != nil {
// channel is already full
return 0, co.FullErr()
}
var buf bytes.Buffer
// Append Singular batch to its span batch builder
co.spanBatchBuilder.AppendSingularBatch(batch)
// Convert Span batch to RawSpanBatch
rawSpanBatch, err := co.spanBatchBuilder.GetRawSpanBatch()
if err != nil {
return 0, fmt.Errorf("failed to convert SpanBatch into RawSpanBatch: %w", err)
}
// Encode RawSpanBatch into bytes
if err = rlp.Encode(&buf, NewSpanBatchData(*rawSpanBatch)); err != nil {
return 0, fmt.Errorf("failed to encode RawSpanBatch into bytes: %w", err)
}
co.rlpLength = 0
// Ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL
if co.rlpLength+buf.Len() > MaxRLPBytesPerChannel {
return 0, fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w",
buf.Len(), co.rlpLength, MaxRLPBytesPerChannel, ErrTooManyRLPBytes)
}
co.rlpLength = buf.Len()
if co.spanBatchBuilder.GetBlockCount() > 1 {
// Flush compressed data into reader to preserve current result.
// If the channel is full after this block is appended, we should use preserved data.
if err := co.compress.Flush(); err != nil {
return 0, fmt.Errorf("failed to flush compressor: %w", err)
}
_, err = io.Copy(co.reader, co.compress)
if err != nil {
// Must reset reader to avoid partial output
co.reader.Reset()
return 0, fmt.Errorf("failed to copy compressed data to reader: %w", err)
}
}
// Reset compressor to rewrite the entire span batch
co.compress.Reset()
// Avoid using io.Copy here, because we need all or nothing
written, err := co.compress.Write(buf.Bytes())
if co.compress.FullErr() != nil {
err = co.compress.FullErr()
if co.spanBatchBuilder.GetBlockCount() == 1 {
// Do not return CompressorFullErr for the first block in the batch
// In this case, reader must be empty. then the contents of compressor will be copied to reader when the channel is closed.
err = nil
}
// If there are more than one blocks in the channel, reader should have data that preserves previous compression result before adding this block.
// So, as a result, this block is not added to the channel and the channel will be closed.
return uint64(written), err
}
// If compressor is not full yet, reader must be reset to avoid submitting invalid frames
co.reader.Reset()
return uint64(written), err
}
// InputBytes returns the total amount of RLP-encoded input bytes.
func (co *ChannelOut) InputBytes() int {
func (co *SingularChannelOut) InputBytes() int {
return co.rlpLength
}
// ReadyBytes returns the number of bytes that the channel out can immediately output into a frame.
// Use `Flush` or `Close` to move data from the compression buffer into the ready buffer if more bytes
// are needed. Add blocks may add to the ready buffer, but it is not guaranteed due to the compression stage.
func (co *ChannelOut) ReadyBytes() int {
return co.reader.Len()
func (co *SingularChannelOut) ReadyBytes() int {
return co.compress.Len()
}
// Flush flushes the internal compression stage to the ready buffer. It enables pulling a larger & more
// complete frame. It reduces the compression efficiency.
func (co *ChannelOut) Flush() error {
if err := co.compress.Flush(); err != nil {
return err
}
if co.batchType == SpanBatchType && co.closed && co.ReadyBytes() == 0 && co.compress.Len() > 0 {
_, err := io.Copy(co.reader, co.compress)
if err != nil {
// Must reset reader to avoid partial output
co.reader.Reset()
return fmt.Errorf("failed to flush compressed data to reader: %w", err)
}
}
return nil
func (co *SingularChannelOut) Flush() error {
return co.compress.Flush()
}
func (co *ChannelOut) FullErr() error {
func (co *SingularChannelOut) FullErr() error {
return co.compress.FullErr()
}
func (co *ChannelOut) Close() error {
func (co *SingularChannelOut) Close() error {
if co.closed {
return errors.New("already closed")
}
co.closed = true
if co.batchType == SpanBatchType {
if err := co.Flush(); err != nil {
return err
}
}
return co.compress.Close()
}
......@@ -290,30 +197,15 @@ func (co *ChannelOut) Close() error {
// Returns io.EOF when the channel is closed & there are no more frames.
// Returns nil if there is still more buffered data.
// Returns an error if it ran into an error during processing.
func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, error) {
f := Frame{
ID: co.id,
FrameNumber: uint16(co.frame),
}
func (co *SingularChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, error) {
// Check that the maxSize is large enough for the frame overhead size.
if maxSize < FrameV0OverHeadSize {
return 0, ErrMaxFrameSizeTooSmall
}
// Copy data from the local buffer into the frame data buffer
maxDataSize := maxSize - FrameV0OverHeadSize
if maxDataSize > uint64(co.ReadyBytes()) {
maxDataSize = uint64(co.ReadyBytes())
// If we are closed & will not spill past the current frame
// mark it is the final frame of the channel.
if co.closed {
f.IsLast = true
}
}
f.Data = make([]byte, maxDataSize)
f := createEmptyFrame(co.id, co.frame, co.ReadyBytes(), co.closed, maxSize)
if _, err := io.ReadFull(co.reader, f.Data); err != nil {
if _, err := io.ReadFull(co.compress, f.Data); err != nil {
return 0, err
}
......@@ -418,3 +310,24 @@ func ForceCloseTxData(frames []Frame) ([]byte, error) {
return out.Bytes(), nil
}
// createEmptyFrame creates new empty Frame with given information. Frame data must be copied from ChannelOut.
func createEmptyFrame(id ChannelID, frame uint64, readyBytes int, closed bool, maxSize uint64) *Frame {
f := Frame{
ID: id,
FrameNumber: uint16(frame),
}
// Copy data from the local buffer into the frame data buffer
maxDataSize := maxSize - FrameV0OverHeadSize
if maxDataSize > uint64(readyBytes) {
maxDataSize = uint64(readyBytes)
// If we are closed & will not spill past the current frame
// mark it is the final frame of the channel.
if closed {
f.IsLast = true
}
}
f.Data = make([]byte, maxDataSize)
return &f
}
......@@ -29,7 +29,7 @@ func (s *nonCompressor) FullErr() error {
}
func TestChannelOutAddBlock(t *testing.T) {
cout, err := NewChannelOut(&nonCompressor{}, SingularBatchType, nil)
cout, err := NewChannelOut(SingularBatchType, &nonCompressor{}, nil)
require.NoError(t, err)
t.Run("returns err if first tx is not an l1info tx", func(t *testing.T) {
......@@ -50,7 +50,7 @@ func TestChannelOutAddBlock(t *testing.T) {
// max size that is below the fixed frame size overhead of 23, will return
// an error.
func TestOutputFrameSmallMaxSize(t *testing.T) {
cout, err := NewChannelOut(&nonCompressor{}, SingularBatchType, nil)
cout, err := NewChannelOut(SingularBatchType, &nonCompressor{}, nil)
require.NoError(t, err)
// Call OutputFrame with the range of small max size values that err
......
package derive
import (
"bytes"
"crypto/rand"
"errors"
"fmt"
"io"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
)
type SpanChannelOut struct {
id ChannelID
// Frame ID of the next frame to emit. Increment after emitting
frame uint64
// rlpLength is the uncompressed size of the channel. Must be less than MAX_RLP_BYTES_PER_CHANNEL
rlpLength int
// Compressor stage. Write input data to it
compress Compressor
// closed indicates if the channel is closed
closed bool
// spanBatchBuilder contains information requires to build SpanBatch
spanBatchBuilder *SpanBatchBuilder
// reader contains compressed data for making output frames
reader *bytes.Buffer
}
func (co *SpanChannelOut) ID() ChannelID {
return co.id
}
func NewSpanChannelOut(compress Compressor, spanBatchBuilder *SpanBatchBuilder) (*SpanChannelOut, error) {
c := &SpanChannelOut{
id: ChannelID{}, // TODO: use GUID here instead of fully random data
frame: 0,
rlpLength: 0,
compress: compress,
spanBatchBuilder: spanBatchBuilder,
reader: &bytes.Buffer{},
}
_, err := rand.Read(c.id[:])
if err != nil {
return nil, err
}
return c, nil
}
// TODO: reuse ChannelOut for performance
func (co *SpanChannelOut) Reset() error {
co.frame = 0
co.rlpLength = 0
co.compress.Reset()
co.reader.Reset()
co.closed = false
co.spanBatchBuilder.Reset()
_, err := rand.Read(co.id[:])
return err
}
// AddBlock adds a block to the channel. It returns the RLP encoded byte size
// and an error if there is a problem adding the block. The only sentinel error
// that it returns is ErrTooManyRLPBytes. If this error is returned, the channel
// should be closed and a new one should be made.
func (co *SpanChannelOut) AddBlock(block *types.Block) (uint64, error) {
if co.closed {
return 0, errors.New("already closed")
}
batch, _, err := BlockToSingularBatch(block)
if err != nil {
return 0, err
}
return co.AddSingularBatch(batch)
}
// AddSingularBatch adds a batch to the channel. It returns the RLP encoded byte size
// and an error if there is a problem adding the batch. The only sentinel error
// that it returns is ErrTooManyRLPBytes. If this error is returned, the channel
// should be closed and a new one should be made.
//
// AddSingularBatch should be used together with BlockToSingularBatch if you need to access the
// BatchData before adding a block to the channel. It isn't possible to access
// the batch data with AddBlock.
//
// SingularBatch is appended to the channel's SpanBatch.
// A channel can have only one SpanBatch. And compressed results should not be accessible until the channel is closed, since the prefix and payload can be changed.
// So it resets channel contents and rewrites the entire SpanBatch each time, and compressed results are copied to reader after the channel is closed.
// It makes we can only get frames once the channel is full or closed, in the case of SpanBatch.
func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch) (uint64, error) {
if co.closed {
return 0, errors.New("already closed")
}
if co.FullErr() != nil {
// channel is already full
return 0, co.FullErr()
}
var buf bytes.Buffer
// Append Singular batch to its span batch builder
co.spanBatchBuilder.AppendSingularBatch(batch)
// Convert Span batch to RawSpanBatch
rawSpanBatch, err := co.spanBatchBuilder.GetRawSpanBatch()
if err != nil {
return 0, fmt.Errorf("failed to convert SpanBatch into RawSpanBatch: %w", err)
}
// Encode RawSpanBatch into bytes
if err = rlp.Encode(&buf, NewSpanBatchData(*rawSpanBatch)); err != nil {
return 0, fmt.Errorf("failed to encode RawSpanBatch into bytes: %w", err)
}
// Ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL
if buf.Len() > MaxRLPBytesPerChannel {
return 0, fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w",
buf.Len(), co.rlpLength, MaxRLPBytesPerChannel, ErrTooManyRLPBytes)
}
co.rlpLength = buf.Len()
if co.spanBatchBuilder.GetBlockCount() > 1 {
// Flush compressed data into reader to preserve current result.
// If the channel is full after this block is appended, we should use preserved data.
if err := co.compress.Flush(); err != nil {
return 0, fmt.Errorf("failed to flush compressor: %w", err)
}
_, err = io.Copy(co.reader, co.compress)
if err != nil {
// Must reset reader to avoid partial output
co.reader.Reset()
return 0, fmt.Errorf("failed to copy compressed data to reader: %w", err)
}
}
// Reset compressor to rewrite the entire span batch
co.compress.Reset()
// Avoid using io.Copy here, because we need all or nothing
written, err := co.compress.Write(buf.Bytes())
if co.compress.FullErr() != nil {
err = co.compress.FullErr()
if co.spanBatchBuilder.GetBlockCount() == 1 {
// Do not return CompressorFullErr for the first block in the batch
// In this case, reader must be empty. then the contents of compressor will be copied to reader when the channel is closed.
err = nil
}
// If there are more than one blocks in the channel, reader should have data that preserves previous compression result before adding this block.
// So, as a result, this block is not added to the channel and the channel will be closed.
return uint64(written), err
}
// If compressor is not full yet, reader must be reset to avoid submitting invalid frames
co.reader.Reset()
return uint64(written), err
}
// InputBytes returns the total amount of RLP-encoded input bytes.
func (co *SpanChannelOut) InputBytes() int {
return co.rlpLength
}
// ReadyBytes returns the number of bytes that the channel out can immediately output into a frame.
// Use `Flush` or `Close` to move data from the compression buffer into the ready buffer if more bytes
// are needed. Add blocks may add to the ready buffer, but it is not guaranteed due to the compression stage.
func (co *SpanChannelOut) ReadyBytes() int {
return co.reader.Len()
}
// Flush flushes the internal compression stage to the ready buffer. It enables pulling a larger & more
// complete frame. It reduces the compression efficiency.
func (co *SpanChannelOut) Flush() error {
if err := co.compress.Flush(); err != nil {
return err
}
if co.closed && co.ReadyBytes() == 0 && co.compress.Len() > 0 {
_, err := io.Copy(co.reader, co.compress)
if err != nil {
// Must reset reader to avoid partial output
co.reader.Reset()
return fmt.Errorf("failed to flush compressed data to reader: %w", err)
}
}
return nil
}
func (co *SpanChannelOut) FullErr() error {
return co.compress.FullErr()
}
func (co *SpanChannelOut) Close() error {
if co.closed {
return errors.New("already closed")
}
co.closed = true
if err := co.Flush(); err != nil {
return err
}
return co.compress.Close()
}
// OutputFrame writes a frame to w with a given max size and returns the frame
// number.
// Use `ReadyBytes`, `Flush`, and `Close` to modify the ready buffer.
// Returns an error if the `maxSize` < FrameV0OverHeadSize.
// Returns io.EOF when the channel is closed & there are no more frames.
// Returns nil if there is still more buffered data.
// Returns an error if it ran into an error during processing.
func (co *SpanChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, error) {
// Check that the maxSize is large enough for the frame overhead size.
if maxSize < FrameV0OverHeadSize {
return 0, ErrMaxFrameSizeTooSmall
}
f := createEmptyFrame(co.id, co.frame, co.ReadyBytes(), co.closed, maxSize)
if _, err := io.ReadFull(co.reader, f.Data); err != nil {
return 0, err
}
if err := f.MarshalBinary(w); err != nil {
return 0, err
}
co.frame += 1
fn := f.FrameNumber
if f.IsLast {
return fn, io.EOF
} else {
return fn, nil
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment