Commit a3cc8f27 authored by Axel Kingsley, committed by GitHub

op-batcher: Embed Zlib Compressor into Span Channel Out ; Compression Avoidance Strategy (#10002)

* Add iterative batch building benchmark

* Embed Compressor Logic directly to Span Channel Out

* PR Comments

* Tests

* fix error handling

* remove errant comment

* PR Comments
parent 99600241
...@@ -78,18 +78,20 @@ type ChannelBuilder struct { ...@@ -78,18 +78,20 @@ type ChannelBuilder struct {
// newChannelBuilder creates a new channel builder or returns an error if the // newChannelBuilder creates a new channel builder or returns an error if the
// channel out could not be created. // channel out could not be created.
// it acts as a factory for either a span or singular channel out
func NewChannelBuilder(cfg ChannelConfig, rollupCfg rollup.Config, latestL1OriginBlockNum uint64) (*ChannelBuilder, error) { func NewChannelBuilder(cfg ChannelConfig, rollupCfg rollup.Config, latestL1OriginBlockNum uint64) (*ChannelBuilder, error) {
c, err := cfg.CompressorConfig.NewCompressor() c, err := cfg.CompressorConfig.NewCompressor()
if err != nil { if err != nil {
return nil, err return nil, err
} }
var spanBatch *derive.SpanBatch var co derive.ChannelOut
if cfg.BatchType == derive.SpanBatchType { if cfg.BatchType == derive.SpanBatchType {
spanBatch = derive.NewSpanBatch(rollupCfg.Genesis.L2Time, rollupCfg.L2ChainID) co, err = derive.NewSpanChannelOut(rollupCfg.Genesis.L2Time, rollupCfg.L2ChainID, cfg.CompressorConfig.TargetOutputSize)
} else {
co, err = derive.NewSingularChannelOut(c)
} }
co, err := derive.NewChannelOut(cfg.BatchType, c, spanBatch)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("creating channel out: %w", err)
} }
cb := &ChannelBuilder{ cb := &ChannelBuilder{
...@@ -154,7 +156,7 @@ func (c *ChannelBuilder) AddBlock(block *types.Block) (*derive.L1BlockInfo, erro ...@@ -154,7 +156,7 @@ func (c *ChannelBuilder) AddBlock(block *types.Block) (*derive.L1BlockInfo, erro
return l1info, fmt.Errorf("converting block to batch: %w", err) return l1info, fmt.Errorf("converting block to batch: %w", err)
} }
if _, err = c.co.AddSingularBatch(batch, l1info.SequenceNumber); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.ErrCompressorFull) { if err = c.co.AddSingularBatch(batch, l1info.SequenceNumber); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.ErrCompressorFull) {
c.setFullErr(err) c.setFullErr(err)
return l1info, c.FullErr() return l1info, c.FullErr()
} else if err != nil { } else if err != nil {
......
...@@ -297,6 +297,7 @@ func TestChannelBuilderBatchType(t *testing.T) { ...@@ -297,6 +297,7 @@ func TestChannelBuilderBatchType(t *testing.T) {
{"ChannelBuilder_PendingFrames_TotalFrames", ChannelBuilder_PendingFrames_TotalFrames}, {"ChannelBuilder_PendingFrames_TotalFrames", ChannelBuilder_PendingFrames_TotalFrames},
{"ChannelBuilder_InputBytes", ChannelBuilder_InputBytes}, {"ChannelBuilder_InputBytes", ChannelBuilder_InputBytes},
{"ChannelBuilder_OutputBytes", ChannelBuilder_OutputBytes}, {"ChannelBuilder_OutputBytes", ChannelBuilder_OutputBytes},
{"ChannelBuilder_OutputWrongFramePanic", ChannelBuilder_OutputWrongFramePanic},
} }
for _, test := range tests { for _, test := range tests {
test := test test := test
...@@ -354,8 +355,9 @@ func TestChannelBuilder_NextFrame(t *testing.T) { ...@@ -354,8 +355,9 @@ func TestChannelBuilder_NextFrame(t *testing.T) {
} }
// TestChannelBuilder_OutputWrongFramePanic tests that a panic is thrown when a frame is pushed with an invalid frame id // TestChannelBuilder_OutputWrongFramePanic tests that a panic is thrown when a frame is pushed with an invalid frame id
func TestChannelBuilder_OutputWrongFramePanic(t *testing.T) { func ChannelBuilder_OutputWrongFramePanic(t *testing.T, batchType uint) {
channelConfig := defaultTestChannelConfig() channelConfig := defaultTestChannelConfig()
channelConfig.BatchType = batchType
// Construct a channel builder // Construct a channel builder
cb, err := NewChannelBuilder(channelConfig, defaultTestRollupConfig, latestL1BlockOrigin) cb, err := NewChannelBuilder(channelConfig, defaultTestRollupConfig, latestL1BlockOrigin)
...@@ -363,9 +365,10 @@ func TestChannelBuilder_OutputWrongFramePanic(t *testing.T) { ...@@ -363,9 +365,10 @@ func TestChannelBuilder_OutputWrongFramePanic(t *testing.T) {
// Mock the internals of `ChannelBuilder.outputFrame` // Mock the internals of `ChannelBuilder.outputFrame`
// to construct a single frame // to construct a single frame
// the type of batch does not matter here because we are using it to construct a broken frame
c, err := channelConfig.CompressorConfig.NewCompressor() c, err := channelConfig.CompressorConfig.NewCompressor()
require.NoError(t, err) require.NoError(t, err)
co, err := derive.NewChannelOut(derive.SingularBatchType, c, nil) co, err := derive.NewSingularChannelOut(c)
require.NoError(t, err) require.NoError(t, err)
var buf bytes.Buffer var buf bytes.Buffer
fn, err := co.OutputFrame(&buf, channelConfig.MaxFrameSize) fn, err := co.OutputFrame(&buf, channelConfig.MaxFrameSize)
......
package compressor
import (
"bytes"
"compress/zlib"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)
// BlindCompressor is a simple compressor that blindly compresses data.
// The only way to know if the target size has been reached is to first flush the buffer
// and then check the length of the compressed data.
type BlindCompressor struct {
// config supplies TargetOutputSize, the threshold FullErr compares against.
config Config
// inputBytes is the running total of bytes handed to Write; Reset zeroes it.
inputBytes int
// buf receives the compressed output; Read drains it and Len reports its size.
buf bytes.Buffer
// compress is the zlib writer that writes its output into buf.
compress *zlib.Writer
}
// NewBlindCompressor creates a new derive.Compressor implementation that compresses
// its input with zlib at the BestCompression level into an internal buffer.
// It returns an error if the zlib writer cannot be constructed.
func NewBlindCompressor(config Config) (derive.Compressor, error) {
	bc := &BlindCompressor{config: config}

	// The zlib writer targets the compressor's own buffer, so the buffer must
	// already live inside the struct before the writer is created.
	w, err := zlib.NewWriterLevel(&bc.buf, zlib.BestCompression)
	if err != nil {
		return nil, err
	}
	bc.compress = w

	return bc, nil
}
// Write feeds p to the zlib writer and adds len(p) to the input byte count.
// If FullErr already reports an error, nothing is written and that error is
// returned with a count of zero.
func (t *BlindCompressor) Write(p []byte) (int, error) {
	if fullErr := t.FullErr(); fullErr != nil {
		return 0, fullErr
	}

	// The input counter is bumped before the write, regardless of its outcome.
	t.inputBytes += len(p)

	n, err := t.compress.Write(p)
	return n, err
}
// Close closes the zlib writer and returns whatever error that produces.
func (t *BlindCompressor) Close() error {
	err := t.compress.Close()
	return err
}
// Read drains compressed bytes from the internal buffer into p.
func (t *BlindCompressor) Read(p []byte) (int, error) {
	n, err := t.buf.Read(p)
	return n, err
}
// Reset returns the compressor to its initial state: the input byte count is
// zeroed, the output buffer is emptied and the zlib writer is re-pointed at it.
func (t *BlindCompressor) Reset() {
	t.inputBytes = 0

	// Empty the buffer first, then restart the zlib stream on top of it.
	t.buf.Reset()
	t.compress.Reset(&t.buf)
}
// Len reports the number of unread bytes currently held in the output buffer.
func (t *BlindCompressor) Len() int {
	size := t.buf.Len()
	return size
}
// Flush flushes the zlib writer so that pending compressed data reaches the
// output buffer, and returns any error from doing so.
func (t *BlindCompressor) Flush() error {
	err := t.compress.Flush()
	return err
}
// FullErr returns derive.ErrCompressorFull once the output buffer holds at
// least config.TargetOutputSize bytes, and nil otherwise.
// Flush *must* be called before this method to ensure the buffer is up to date
func (t *BlindCompressor) FullErr() error {
	if uint64(t.Len()) < t.config.TargetOutputSize {
		return nil
	}
	return derive.ErrCompressorFull
}
package compressor_test
import (
"testing"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/stretchr/testify/require"
)
// TestBlindCompressorLimit checks that the blind compressor only notices it
// has exceeded its target output size after a flush, and that it rejects
// further writes once it is full.
func TestBlindCompressorLimit(t *testing.T) {
	bc, err := compressor.NewBlindCompressor(compressor.Config{
		TargetOutputSize: 10,
	})
	require.NoError(t, err)

	// write far too much data to the compressor, but never flush
	for i := 0; i < 100; i++ {
		_, err := bc.Write([]byte("hello"))
		require.NoError(t, err)
		require.NoError(t, bc.FullErr())
	}

	// finally flush the compressor and see that it is full.
	// The flush error is checked so that a failed flush is reported as such
	// instead of surfacing as a misleading "compressor is not full" failure.
	require.NoError(t, bc.Flush())
	require.Error(t, bc.FullErr())

	// write a little more data to the compressor and see that it is still full
	_, err = bc.Write([]byte("hello"))
	require.Error(t, err)
}
...@@ -10,7 +10,6 @@ const ( ...@@ -10,7 +10,6 @@ const (
RatioKind = "ratio" RatioKind = "ratio"
ShadowKind = "shadow" ShadowKind = "shadow"
NoneKind = "none" NoneKind = "none"
BlindKind = "blind"
// CloseOverheadZlib is the number of final bytes a [zlib.Writer] call writes // CloseOverheadZlib is the number of final bytes a [zlib.Writer] call writes
// to the output buffer. // to the output buffer.
...@@ -21,7 +20,6 @@ var Kinds = map[string]FactoryFunc{ ...@@ -21,7 +20,6 @@ var Kinds = map[string]FactoryFunc{
RatioKind: NewRatioCompressor, RatioKind: NewRatioCompressor,
ShadowKind: NewShadowCompressor, ShadowKind: NewShadowCompressor,
NoneKind: NewNonCompressor, NoneKind: NewNonCompressor,
BlindKind: NewBlindCompressor,
} }
var KindKeys []string var KindKeys []string
......
...@@ -54,7 +54,7 @@ type Writer interface { ...@@ -54,7 +54,7 @@ type Writer interface {
type ChannelOutIface interface { type ChannelOutIface interface {
ID() derive.ChannelID ID() derive.ChannelID
Reset() error Reset() error
AddBlock(rollupCfg *rollup.Config, block *types.Block) (uint64, error) AddBlock(rollupCfg *rollup.Config, block *types.Block) error
ReadyBytes() int ReadyBytes() int
Flush() error Flush() error
Close() error Close() error
...@@ -138,19 +138,19 @@ func (co *GarbageChannelOut) Reset() error { ...@@ -138,19 +138,19 @@ func (co *GarbageChannelOut) Reset() error {
// error that it returns is ErrTooManyRLPBytes. If this error // error that it returns is ErrTooManyRLPBytes. If this error
// is returned, the channel should be closed and a new one // is returned, the channel should be closed and a new one
// should be made. // should be made.
func (co *GarbageChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) (uint64, error) { func (co *GarbageChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) error {
if co.closed { if co.closed {
return 0, errors.New("already closed") return errors.New("already closed")
} }
batch, err := blockToBatch(rollupCfg, block) batch, err := blockToBatch(rollupCfg, block)
if err != nil { if err != nil {
return 0, err return err
} }
// We encode to a temporary buffer to determine the encoded length to // We encode to a temporary buffer to determine the encoded length to
// ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL // ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL
var buf bytes.Buffer var buf bytes.Buffer
if err := rlp.Encode(&buf, batch); err != nil { if err := rlp.Encode(&buf, batch); err != nil {
return 0, err return err
} }
if co.cfg.malformRLP { if co.cfg.malformRLP {
// Malform the RLP by incrementing the length prefix by 1. // Malform the RLP by incrementing the length prefix by 1.
...@@ -160,13 +160,13 @@ func (co *GarbageChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Blo ...@@ -160,13 +160,13 @@ func (co *GarbageChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Blo
buf.Write(bufBytes) buf.Write(bufBytes)
} }
if co.rlpLength+buf.Len() > derive.MaxRLPBytesPerChannel { if co.rlpLength+buf.Len() > derive.MaxRLPBytesPerChannel {
return 0, fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w", return fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w",
buf.Len(), co.rlpLength, derive.MaxRLPBytesPerChannel, derive.ErrTooManyRLPBytes) buf.Len(), co.rlpLength, derive.MaxRLPBytesPerChannel, derive.ErrTooManyRLPBytes)
} }
co.rlpLength += buf.Len() co.rlpLength += buf.Len()
written, err := io.Copy(co.compress, &buf) _, err = io.Copy(co.compress, &buf)
return uint64(written), err return err
} }
// ReadyBytes returns the number of bytes that the channel out can immediately output into a frame. // ReadyBytes returns the number of bytes that the channel out can immediately output into a frame.
......
...@@ -189,29 +189,28 @@ func (s *L2Batcher) Buffer(t Testing) error { ...@@ -189,29 +189,28 @@ func (s *L2Batcher) Buffer(t Testing) error {
if s.l2BatcherCfg.GarbageCfg != nil { if s.l2BatcherCfg.GarbageCfg != nil {
ch, err = NewGarbageChannelOut(s.l2BatcherCfg.GarbageCfg) ch, err = NewGarbageChannelOut(s.l2BatcherCfg.GarbageCfg)
} else { } else {
c, e := compressor.NewBlindCompressor(compressor.Config{ target := batcher.MaxDataSize(1, s.l2BatcherCfg.MaxL1TxSize)
TargetOutputSize: batcher.MaxDataSize(1, s.l2BatcherCfg.MaxL1TxSize), c, e := compressor.NewShadowCompressor(compressor.Config{
TargetOutputSize: target,
}) })
require.NoError(t, e, "failed to create compressor") require.NoError(t, e, "failed to create compressor")
var batchType uint = derive.SingularBatchType
var spanBatch *derive.SpanBatch
if s.l2BatcherCfg.ForceSubmitSingularBatch && s.l2BatcherCfg.ForceSubmitSpanBatch { if s.l2BatcherCfg.ForceSubmitSingularBatch && s.l2BatcherCfg.ForceSubmitSpanBatch {
t.Fatalf("ForceSubmitSingularBatch and ForceSubmitSpanBatch cannot be set to true at the same time") t.Fatalf("ForceSubmitSingularBatch and ForceSubmitSpanBatch cannot be set to true at the same time")
} else if s.l2BatcherCfg.ForceSubmitSingularBatch { } else {
// use SingularBatchType // use span batch if we're forcing it or if we're at/beyond delta
} else if s.l2BatcherCfg.ForceSubmitSpanBatch || s.rollupCfg.IsDelta(block.Time()) { if s.l2BatcherCfg.ForceSubmitSpanBatch || s.rollupCfg.IsDelta(block.Time()) {
// If both ForceSubmitSingularBatch and ForceSubmitSpanbatch are false, use SpanBatch automatically if Delta HF is activated. ch, err = derive.NewSpanChannelOut(s.rollupCfg.Genesis.L2Time, s.rollupCfg.L2ChainID, target)
batchType = derive.SpanBatchType // use singular batches in all other cases
spanBatch = derive.NewSpanBatch(s.rollupCfg.Genesis.L2Time, s.rollupCfg.L2ChainID) } else {
ch, err = derive.NewSingularChannelOut(c)
}
} }
ch, err = derive.NewChannelOut(batchType, c, spanBatch)
} }
require.NoError(t, err, "failed to create channel") require.NoError(t, err, "failed to create channel")
s.l2ChannelOut = ch s.l2ChannelOut = ch
} }
if _, err := s.l2ChannelOut.AddBlock(s.rollupCfg, block); err != nil { // should always succeed if err := s.l2ChannelOut.AddBlock(s.rollupCfg, block); err != nil {
return err return err
} }
ref, err := s.engCl.L2BlockRefByHash(t.Ctx(), block.Hash()) ref, err := s.engCl.L2BlockRefByHash(t.Ctx(), block.Hash())
......
...@@ -7,7 +7,6 @@ import ( ...@@ -7,7 +7,6 @@ import (
"testing" "testing"
"time" "time"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
...@@ -27,12 +26,7 @@ import ( ...@@ -27,12 +26,7 @@ import (
) )
func newSpanChannelOut(t StatefulTesting, e e2eutils.SetupData) derive.ChannelOut { func newSpanChannelOut(t StatefulTesting, e e2eutils.SetupData) derive.ChannelOut {
c, err := compressor.NewBlindCompressor(compressor.Config{ channelOut, err := derive.NewSpanChannelOut(e.RollupCfg.Genesis.L2Time, e.RollupCfg.L2ChainID, 128_000)
TargetOutputSize: 128_000,
})
require.NoError(t, err)
spanBatch := derive.NewSpanBatch(e.RollupCfg.Genesis.L2Time, e.RollupCfg.L2ChainID)
channelOut, err := derive.NewChannelOut(derive.SpanBatchType, c, spanBatch)
require.NoError(t, err) require.NoError(t, err)
return channelOut return channelOut
} }
...@@ -249,7 +243,7 @@ func TestBackupUnsafe(gt *testing.T) { ...@@ -249,7 +243,7 @@ func TestBackupUnsafe(gt *testing.T) {
block = block.WithBody([]*types.Transaction{block.Transactions()[0], invalidTx}, []*types.Header{}) block = block.WithBody([]*types.Transaction{block.Transactions()[0], invalidTx}, []*types.Header{})
} }
// Add A1, B2, B3, B4, B5 into the channel // Add A1, B2, B3, B4, B5 into the channel
_, err = channelOut.AddBlock(sd.RollupCfg, block) err = channelOut.AddBlock(sd.RollupCfg, block)
require.NoError(t, err) require.NoError(t, err)
} }
...@@ -412,7 +406,7 @@ func TestBackupUnsafeReorgForkChoiceInputError(gt *testing.T) { ...@@ -412,7 +406,7 @@ func TestBackupUnsafeReorgForkChoiceInputError(gt *testing.T) {
block = block.WithBody([]*types.Transaction{block.Transactions()[0], invalidTx}, []*types.Header{}) block = block.WithBody([]*types.Transaction{block.Transactions()[0], invalidTx}, []*types.Header{})
} }
// Add A1, B2, B3, B4, B5 into the channel // Add A1, B2, B3, B4, B5 into the channel
_, err = channelOut.AddBlock(sd.RollupCfg, block) err = channelOut.AddBlock(sd.RollupCfg, block)
require.NoError(t, err) require.NoError(t, err)
} }
...@@ -551,7 +545,7 @@ func TestBackupUnsafeReorgForkChoiceNotInputError(gt *testing.T) { ...@@ -551,7 +545,7 @@ func TestBackupUnsafeReorgForkChoiceNotInputError(gt *testing.T) {
block = block.WithBody([]*types.Transaction{block.Transactions()[0], invalidTx}, []*types.Header{}) block = block.WithBody([]*types.Transaction{block.Transactions()[0], invalidTx}, []*types.Header{})
} }
// Add A1, B2, B3, B4, B5 into the channel // Add A1, B2, B3, B4, B5 into the channel
_, err = channelOut.AddBlock(sd.RollupCfg, block) err = channelOut.AddBlock(sd.RollupCfg, block)
require.NoError(t, err) require.NoError(t, err)
} }
...@@ -870,7 +864,7 @@ func TestInvalidPayloadInSpanBatch(gt *testing.T) { ...@@ -870,7 +864,7 @@ func TestInvalidPayloadInSpanBatch(gt *testing.T) {
block = block.WithBody([]*types.Transaction{block.Transactions()[0], invalidTx}, []*types.Header{}) block = block.WithBody([]*types.Transaction{block.Transactions()[0], invalidTx}, []*types.Header{})
} }
// Add A1 ~ A12 into the channel // Add A1 ~ A12 into the channel
_, err = channelOut.AddBlock(sd.RollupCfg, block) err = channelOut.AddBlock(sd.RollupCfg, block)
require.NoError(t, err) require.NoError(t, err)
} }
...@@ -919,7 +913,7 @@ func TestInvalidPayloadInSpanBatch(gt *testing.T) { ...@@ -919,7 +913,7 @@ func TestInvalidPayloadInSpanBatch(gt *testing.T) {
block = block.WithBody([]*types.Transaction{block.Transactions()[0], tx}, []*types.Header{}) block = block.WithBody([]*types.Transaction{block.Transactions()[0], tx}, []*types.Header{})
} }
// Add B1, A2 ~ A12 into the channel // Add B1, A2 ~ A12 into the channel
_, err = channelOut.AddBlock(sd.RollupCfg, block) err = channelOut.AddBlock(sd.RollupCfg, block)
require.NoError(t, err) require.NoError(t, err)
} }
// Submit span batch(B1, A2, ... A12) // Submit span batch(B1, A2, ... A12)
......
...@@ -12,30 +12,37 @@ import ( ...@@ -12,30 +12,37 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
var ( const (
// a really large target output size to ensure that the compressors are never full
targetOutput_huge = uint64(100_000_000_000)
// this target size was determined by the devnet sepolia batcher's configuration
targetOuput_real = uint64(780120)
)
var (
// compressors used in the benchmark // compressors used in the benchmark
rc, _ = compressor.NewRatioCompressor(compressor.Config{ rc, _ = compressor.NewRatioCompressor(compressor.Config{
TargetOutputSize: 100_000_000_000, TargetOutputSize: targetOutput_huge,
ApproxComprRatio: 0.4, ApproxComprRatio: 0.4,
}) })
sc, _ = compressor.NewShadowCompressor(compressor.Config{ sc, _ = compressor.NewShadowCompressor(compressor.Config{
TargetOutputSize: 100_000_000_000, TargetOutputSize: targetOutput_huge,
}) })
nc, _ = compressor.NewNonCompressor(compressor.Config{ nc, _ = compressor.NewNonCompressor(compressor.Config{
TargetOutputSize: 100_000_000_000, TargetOutputSize: targetOutput_huge,
}) })
bc, _ = compressor.NewBlindCompressor(compressor.Config{ realsc, _ = compressor.NewShadowCompressor(compressor.Config{
TargetOutputSize: 100_000_000_000, TargetOutputSize: targetOuput_real,
}) })
compressors = map[string]derive.Compressor{ // compressors used in the benchmark mapped by their name
"BlindCompressor": bc, // they come paired with a target output size so span batches can use the target size directly
"NonCompressor": nc, compressors = map[string]compressorAndTarget{
"RatioCompressor": rc, "NonCompressor": {nc, targetOutput_huge},
"ShadowCompressor": sc, "RatioCompressor": {rc, targetOutput_huge},
"ShadowCompressor": {sc, targetOutput_huge},
"RealShadowCompressor": {realsc, targetOuput_real},
} }
// batch types used in the benchmark // batch types used in the benchmark
batchTypes = []uint{ batchTypes = []uint{
derive.SpanBatchType, derive.SpanBatchType,
...@@ -45,6 +52,23 @@ var ( ...@@ -45,6 +52,23 @@ var (
} }
) )
// compressorAndTarget pairs a compressor with the target output size it was
// configured with, so that channelOutByType can build either kind of channel out.
type compressorAndTarget struct {
// compressor is the instance handed to derive.NewSingularChannelOut.
compressor derive.Compressor
// targetOutput is the size passed to derive.NewSpanChannelOut.
targetOutput uint64
}
// channelOutByType is a benchmark helper that builds a channel out for the
// requested batch type, using the compressors entry stored under compKey.
// Singular channel outs receive the entry's compressor; span channel outs
// receive only its target output size. Any other batch type yields an error.
func channelOutByType(batchType uint, compKey string) (derive.ChannelOut, error) {
	switch batchType {
	case derive.SingularBatchType:
		return derive.NewSingularChannelOut(compressors[compKey].compressor)
	case derive.SpanBatchType:
		return derive.NewSpanChannelOut(0, big.NewInt(333), compressors[compKey].targetOutput)
	default:
		return nil, fmt.Errorf("unsupported batch type: %d", batchType)
	}
}
// a test case for the benchmark controls the number of batches and transactions per batch, // a test case for the benchmark controls the number of batches and transactions per batch,
// as well as the batch type and compressor used // as well as the batch type and compressor used
type BatchingBenchmarkTC struct { type BatchingBenchmarkTC struct {
...@@ -110,24 +134,70 @@ func BenchmarkFinalBatchChannelOut(b *testing.B) { ...@@ -110,24 +134,70 @@ func BenchmarkFinalBatchChannelOut(b *testing.B) {
for bn := 0; bn < b.N; bn++ { for bn := 0; bn < b.N; bn++ {
// don't measure the setup time // don't measure the setup time
b.StopTimer() b.StopTimer()
compressors[tc.compKey].Reset() compressors[tc.compKey].compressor.Reset()
spanBatch := derive.NewSpanBatch(uint64(0), chainID) cout, _ := channelOutByType(tc.BatchType, tc.compKey)
cout, _ := derive.NewChannelOut(tc.BatchType, compressors[tc.compKey], spanBatch)
// add all but the final batch to the channel out // add all but the final batch to the channel out
for i := 0; i < tc.BatchCount-1; i++ { for i := 0; i < tc.BatchCount-1; i++ {
_, err := cout.AddSingularBatch(batches[i], 0) err := cout.AddSingularBatch(batches[i], 0)
require.NoError(b, err) require.NoError(b, err)
} }
// measure the time to add the final batch // measure the time to add the final batch
b.StartTimer() b.StartTimer()
// add the final batch to the channel out // add the final batch to the channel out
_, err := cout.AddSingularBatch(batches[tc.BatchCount-1], 0) err := cout.AddSingularBatch(batches[tc.BatchCount-1], 0)
require.NoError(b, err) require.NoError(b, err)
} }
}) })
} }
} }
// BenchmarkIncremental fills a channel out incrementally with batches.
// Each increment of tc.BatchCount batches is reported as its own sub-benchmark,
// and increments keep being added until AddSingularBatch returns an error.
// Hint: use -benchtime=1x to run the benchmarks for a single iteration
// it is not currently designed to use b.N
func BenchmarkIncremental(b *testing.B) {
	chainID := big.NewInt(333)
	rng := rand.New(rand.NewSource(0x543331))
	// use the real compressor for this benchmark
	// use batchCount as the number of batches to add in each benchmark iteration
	// and use txPerBatch as the number of transactions per batch
	tcs := []BatchingBenchmarkTC{
		// The key must name an entry of the compressors map: a missing key
		// silently yields a zero-value entry, i.e. a target output size of 0.
		{derive.SpanBatchType, 5, 1, "RealShadowCompressor"},
		//{derive.SingularBatchType, 100, 1, "RealShadowCompressor"},
	}
	for _, tc := range tcs {
		cout, err := channelOutByType(tc.BatchType, tc.compKey)
		if err != nil {
			b.Fatal(err)
		}

		// A single clock is shared by every increment added to this channel
		// out, so batch timestamps keep increasing across sub-benchmarks too.
		ts := time.Now()

		done := false
		for base := 0; !done; base += tc.BatchCount {
			rangeName := fmt.Sprintf("Incremental %s: %d-%d", tc.String(), base, base+tc.BatchCount)
			b.Run(rangeName, func(b *testing.B) {
				// don't measure the batch preparation
				b.StopTimer()
				batches := make([]*derive.SingularBatch, tc.BatchCount)
				for i := 0; i < tc.BatchCount; i++ {
					// Advance the shared clock by assignment: a `:=` here would
					// shadow it and give every batch the same timestamp.
					ts = ts.Add(time.Second)
					batches[i] = derive.RandomSingularBatch(rng, tc.txPerBatch, chainID)
					// set the timestamp to increase with each batch
					// to leverage optimizations in the Batch Linked List
					batches[i].Timestamp = uint64(ts.Unix())
				}
				b.StartTimer()
				for i := 0; i < tc.BatchCount; i++ {
					if err := cout.AddSingularBatch(batches[i], 0); err != nil {
						// the channel out rejected the batch: stop adding increments
						done = true
						return
					}
				}
			})
		}
	}
}
// BenchmarkAllBatchesChannelOut benchmarks the performance of adding singular batches to a channel out // BenchmarkAllBatchesChannelOut benchmarks the performance of adding singular batches to a channel out
// this exercises the compression and batching logic, as well as any batch-building logic // this exercises the compression and batching logic, as well as any batch-building logic
// Every Compressor in the compressor map is benchmarked for each test case // Every Compressor in the compressor map is benchmarked for each test case
...@@ -173,13 +243,12 @@ func BenchmarkAllBatchesChannelOut(b *testing.B) { ...@@ -173,13 +243,12 @@ func BenchmarkAllBatchesChannelOut(b *testing.B) {
for bn := 0; bn < b.N; bn++ { for bn := 0; bn < b.N; bn++ {
// don't measure the setup time // don't measure the setup time
b.StopTimer() b.StopTimer()
compressors[tc.compKey].Reset() compressors[tc.compKey].compressor.Reset()
spanBatch := derive.NewSpanBatch(0, chainID) cout, _ := channelOutByType(tc.BatchType, tc.compKey)
cout, _ := derive.NewChannelOut(tc.BatchType, compressors[tc.compKey], spanBatch)
b.StartTimer() b.StartTimer()
// add all batches to the channel out // add all batches to the channel out
for i := 0; i < tc.BatchCount; i++ { for i := 0; i < tc.BatchCount; i++ {
_, err := cout.AddSingularBatch(batches[i], 0) err := cout.AddSingularBatch(batches[i], 0)
require.NoError(b, err) require.NoError(b, err)
} }
} }
......
...@@ -53,8 +53,8 @@ type Compressor interface { ...@@ -53,8 +53,8 @@ type Compressor interface {
type ChannelOut interface { type ChannelOut interface {
ID() ChannelID ID() ChannelID
Reset() error Reset() error
AddBlock(*rollup.Config, *types.Block) (uint64, error) AddBlock(*rollup.Config, *types.Block) error
AddSingularBatch(*SingularBatch, uint64) (uint64, error) AddSingularBatch(*SingularBatch, uint64) error
InputBytes() int InputBytes() int
ReadyBytes() int ReadyBytes() int
Flush() error Flush() error
...@@ -63,17 +63,6 @@ type ChannelOut interface { ...@@ -63,17 +63,6 @@ type ChannelOut interface {
OutputFrame(*bytes.Buffer, uint64) (uint16, error) OutputFrame(*bytes.Buffer, uint64) (uint16, error)
} }
func NewChannelOut(batchType uint, compress Compressor, spanBatch *SpanBatch) (ChannelOut, error) {
switch batchType {
case SingularBatchType:
return NewSingularChannelOut(compress)
case SpanBatchType:
return NewSpanChannelOut(compress, spanBatch)
default:
return nil, fmt.Errorf("unrecognized batch type: %d", batchType)
}
}
type SingularChannelOut struct { type SingularChannelOut struct {
id ChannelID id ChannelID
// Frame ID of the next frame to emit. Increment after emitting // Frame ID of the next frame to emit. Increment after emitting
...@@ -119,14 +108,14 @@ func (co *SingularChannelOut) Reset() error { ...@@ -119,14 +108,14 @@ func (co *SingularChannelOut) Reset() error {
// and an error if there is a problem adding the block. The only sentinel error // and an error if there is a problem adding the block. The only sentinel error
// that it returns is ErrTooManyRLPBytes. If this error is returned, the channel // that it returns is ErrTooManyRLPBytes. If this error is returned, the channel
// should be closed and a new one should be made. // should be closed and a new one should be made.
func (co *SingularChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) (uint64, error) { func (co *SingularChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) error {
if co.closed { if co.closed {
return 0, ErrChannelOutAlreadyClosed return ErrChannelOutAlreadyClosed
} }
batch, l1Info, err := BlockToSingularBatch(rollupCfg, block) batch, l1Info, err := BlockToSingularBatch(rollupCfg, block)
if err != nil { if err != nil {
return 0, err return err
} }
return co.AddSingularBatch(batch, l1Info.SequenceNumber) return co.AddSingularBatch(batch, l1Info.SequenceNumber)
} }
...@@ -139,26 +128,26 @@ func (co *SingularChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Bl ...@@ -139,26 +128,26 @@ func (co *SingularChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Bl
// AddSingularBatch should be used together with BlockToBatch if you need to access the // AddSingularBatch should be used together with BlockToBatch if you need to access the
// BatchData before adding a block to the channel. It isn't possible to access // BatchData before adding a block to the channel. It isn't possible to access
// the batch data with AddBlock. // the batch data with AddBlock.
func (co *SingularChannelOut) AddSingularBatch(batch *SingularBatch, _ uint64) (uint64, error) { func (co *SingularChannelOut) AddSingularBatch(batch *SingularBatch, _ uint64) error {
if co.closed { if co.closed {
return 0, ErrChannelOutAlreadyClosed return ErrChannelOutAlreadyClosed
} }
// We encode to a temporary buffer to determine the encoded length to // We encode to a temporary buffer to determine the encoded length to
// ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL // ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL
var buf bytes.Buffer var buf bytes.Buffer
if err := rlp.Encode(&buf, NewBatchData(batch)); err != nil { if err := rlp.Encode(&buf, NewBatchData(batch)); err != nil {
return 0, err return err
} }
if co.rlpLength+buf.Len() > MaxRLPBytesPerChannel { if co.rlpLength+buf.Len() > MaxRLPBytesPerChannel {
return 0, fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w", return fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w",
buf.Len(), co.rlpLength, MaxRLPBytesPerChannel, ErrTooManyRLPBytes) buf.Len(), co.rlpLength, MaxRLPBytesPerChannel, ErrTooManyRLPBytes)
} }
co.rlpLength += buf.Len() co.rlpLength += buf.Len()
// avoid using io.Copy here, because we need all or nothing // avoid using io.Copy here, because we need all or nothing
written, err := co.compress.Write(buf.Bytes()) _, err := co.compress.Write(buf.Bytes())
return uint64(written), err return err
} }
// InputBytes returns the total amount of RLP-encoded input bytes. // InputBytes returns the total amount of RLP-encoded input bytes.
......
...@@ -36,59 +36,94 @@ func (s *nonCompressor) FullErr() error { ...@@ -36,59 +36,94 @@ func (s *nonCompressor) FullErr() error {
return nil return nil
} }
func TestChannelOutAddBlock(t *testing.T) { // channelTypes allows tests to run against different channel types
cout, err := NewChannelOut(SingularBatchType, &nonCompressor{}, nil) var channelTypes = []struct {
require.NoError(t, err) ChannelOut func(t *testing.T) ChannelOut
Name string
}{
{
Name: "Singular",
ChannelOut: func(t *testing.T) ChannelOut {
cout, err := NewSingularChannelOut(&nonCompressor{})
require.NoError(t, err)
return cout
},
},
{
Name: "Span",
ChannelOut: func(t *testing.T) ChannelOut {
cout, err := NewSpanChannelOut(0, big.NewInt(0), 128_000)
require.NoError(t, err)
return cout
},
},
}
t.Run("returns err if first tx is not an l1info tx", func(t *testing.T) { func TestChannelOutAddBlock(t *testing.T) {
header := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} for _, tcase := range channelTypes {
block := types.NewBlockWithHeader(header).WithBody( t.Run(tcase.Name, func(t *testing.T) {
[]*types.Transaction{ cout := tcase.ChannelOut(t)
types.NewTx(&types.DynamicFeeTx{}), header := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)}
}, block := types.NewBlockWithHeader(header).WithBody(
nil, []*types.Transaction{
) types.NewTx(&types.DynamicFeeTx{}),
_, err := cout.AddBlock(&rollupCfg, block) },
require.Error(t, err) nil,
require.Equal(t, ErrNotDepositTx, err) )
}) err := cout.AddBlock(&rollupCfg, block)
require.Error(t, err)
require.Equal(t, ErrNotDepositTx, err)
})
}
} }
// TestOutputFrameSmallMaxSize tests that calling [OutputFrame] with a small // TestOutputFrameSmallMaxSize tests that calling [OutputFrame] with a small
// max size that is below the fixed frame size overhead of 23, will return // max size that is below the fixed frame size overhead of FrameV0OverHeadSize (23),
// an error. // will return an error.
func TestOutputFrameSmallMaxSize(t *testing.T) { func TestOutputFrameSmallMaxSize(t *testing.T) {
cout, err := NewChannelOut(SingularBatchType, &nonCompressor{}, nil) for _, tcase := range channelTypes {
require.NoError(t, err) t.Run(tcase.Name, func(t *testing.T) {
cout := tcase.ChannelOut(t)
// Call OutputFrame with the range of small max size values that err // Call OutputFrame with the range of small max size values that err
var w bytes.Buffer var w bytes.Buffer
for i := 0; i < 23; i++ { for i := 0; i < FrameV0OverHeadSize; i++ {
fid, err := cout.OutputFrame(&w, uint64(i)) fid, err := cout.OutputFrame(&w, uint64(i))
require.ErrorIs(t, err, ErrMaxFrameSizeTooSmall) require.ErrorIs(t, err, ErrMaxFrameSizeTooSmall)
require.Zero(t, fid) require.Zero(t, fid)
}
})
} }
} }
func TestOutputFrameNoEmptyLastFrame(t *testing.T) { func TestOutputFrameNoEmptyLastFrame(t *testing.T) {
cout, err := NewChannelOut(SingularBatchType, &nonCompressor{}, nil) for _, tcase := range channelTypes {
require.NoError(t, err) t.Run(tcase.Name, func(t *testing.T) {
cout := tcase.ChannelOut(t)
rng := rand.New(rand.NewSource(0x543331)) rng := rand.New(rand.NewSource(0x543331))
chainID := big.NewInt(rng.Int63n(1000)) chainID := big.NewInt(0)
txCount := 1 txCount := 1
singularBatch := RandomSingularBatch(rng, txCount, chainID) singularBatch := RandomSingularBatch(rng, txCount, chainID)
written, err := cout.AddSingularBatch(singularBatch, 0) err := cout.AddSingularBatch(singularBatch, 0)
require.NoError(t, err) var written uint64
require.NoError(t, err)
require.NoError(t, cout.Close()) require.NoError(t, cout.Close())
var buf bytes.Buffer // depending on the channel type, determine the size of the written data
// Output a frame which needs exactly `written` bytes. This frame is expected to be the last frame. if span, ok := cout.(*SpanChannelOut); ok {
_, err = cout.OutputFrame(&buf, written+FrameV0OverHeadSize) written = uint64(span.compressed.Len())
require.ErrorIs(t, err, io.EOF) } else if singular, ok := cout.(*SingularChannelOut); ok {
written = uint64(singular.compress.Len())
}
var buf bytes.Buffer
// Output a frame which needs exactly `written` bytes. This frame is expected to be the last frame.
_, err = cout.OutputFrame(&buf, written+FrameV0OverHeadSize)
require.ErrorIs(t, err, io.EOF)
})
}
} }
// TestRLPByteLimit ensures that stream encoder is properly limiting the length. // TestRLPByteLimit ensures that stream encoder is properly limiting the length.
...@@ -184,3 +219,82 @@ func TestBlockToBatchValidity(t *testing.T) { ...@@ -184,3 +219,82 @@ func TestBlockToBatchValidity(t *testing.T) {
_, _, err := BlockToSingularBatch(&rollupCfg, block) _, _, err := BlockToSingularBatch(&rollupCfg, block)
require.ErrorContains(t, err, "has no transactions") require.ErrorContains(t, err, "has no transactions")
} }
// SpanChannelAndBatches is a test helper that creates a SpanChannelOut with the given
// target output size, plus count random SingularBatches built with the same chain ID.
// The random source uses a fixed seed, so the chain ID and the batches are identical on
// every call. The batches are only returned, not added to the channel; each test decides
// which of them to add.
func SpanChannelAndBatches(t *testing.T, target uint64, count int) (*SpanChannelOut, []*SingularBatch) {
	// report failures at the calling test's line rather than inside this helper
	t.Helper()
	rng := rand.New(rand.NewSource(0x543331))
	// the chain ID is drawn before the batches, so the order of rng use must not change
	chainID := big.NewInt(rng.Int63n(1000))
	txCount := 1
	cout, err := NewSpanChannelOut(0, chainID, target)
	require.NoError(t, err)
	batches := make([]*SingularBatch, count)
	for i := range batches {
		batches[i] = RandomSingularBatch(rng, txCount, chainID)
	}
	return cout, batches
}
// TestSpanChannelOutCompressionOnlyOneBatch tests that the SpanChannelOut compression works as expected when there is only one batch
// and it is larger than the target size. The single batch should be compressed, and the channel should now be full
func TestSpanChannelOutCompressionOnlyOneBatch(t *testing.T) {
	cout, singularBatches := SpanChannelAndBatches(t, 300, 2)

	err := cout.AddSingularBatch(singularBatches[0], 0)
	// check the error before inspecting any state, so a failed add is reported as such
	// instead of showing up as an empty compressed buffer
	require.NoError(t, err)
	// confirm compression was not skipped
	require.Greater(t, cout.compressed.Len(), 0)

	// confirm the channel is full
	require.ErrorIs(t, cout.FullErr(), ErrCompressorFull)

	// confirm adding another batch would cause the same full error
	err = cout.AddSingularBatch(singularBatches[1], 0)
	require.ErrorIs(t, err, ErrCompressorFull)
}
// TestSpanChannelOutCompressionUndo checks that a batch which would overfill the channel is rejected,
// and that rejecting it leaves the active RLP at the size it had before the batch was offered.
func TestSpanChannelOutCompressionUndo(t *testing.T) {
	// the target is larger than one batch, but smaller than two batches
	channel, batches := SpanChannelAndBatches(t, 750, 2)

	// the first batch is accepted, and compression is skipped for it
	addErr := channel.AddSingularBatch(batches[0], 0)
	require.NoError(t, addErr)
	require.Equal(t, 0, channel.compressed.Len())

	// remember the RLP length so we can verify a rejected batch does not change it
	rlpLenBefore := channel.activeRLP().Len()

	// the second batch is rejected with the full error, and this time compression did run
	addErr = channel.AddSingularBatch(batches[1], 0)
	require.ErrorIs(t, addErr, ErrCompressorFull)
	require.Greater(t, channel.compressed.Len(), 0)

	// the active RLP is the same size as before, because the second batch was not added
	require.Equal(t, rlpLenBefore, channel.activeRLP().Len())
}
// TestSpanChannelOutClose checks the compression behavior of SpanChannelOut when the channel is closed:
// a batch smaller than the target size is left uncompressed when added, and is compressed by Close.
func TestSpanChannelOutClose(t *testing.T) {
	const target uint64 = 600
	channel, batches := SpanChannelAndBatches(t, target, 1)

	// adding the only batch succeeds without triggering compression
	require.NoError(t, channel.AddSingularBatch(batches[0], 0))
	require.Equal(t, 0, channel.compressed.Len())

	// the uncompressed RLP is below the target
	uncompressedLen := channel.activeRLP().Len()
	require.Less(t, uint64(uncompressedLen), target)

	// closing the channel compresses the batch
	require.NoError(t, channel.Close())
	require.Greater(t, channel.compressed.Len(), 0)
	// the RLP itself is the same length as before the close
	require.Equal(t, uncompressedLen, channel.activeRLP().Len())
}
...@@ -2,9 +2,11 @@ package derive ...@@ -2,9 +2,11 @@ package derive
import ( import (
"bytes" "bytes"
"compress/zlib"
"crypto/rand" "crypto/rand"
"fmt" "fmt"
"io" "io"
"math/big"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
...@@ -16,176 +18,220 @@ type SpanChannelOut struct { ...@@ -16,176 +18,220 @@ type SpanChannelOut struct {
id ChannelID id ChannelID
// Frame ID of the next frame to emit. Increment after emitting // Frame ID of the next frame to emit. Increment after emitting
frame uint64 frame uint64
// rlpLength is the uncompressed size of the channel. Must be less than MAX_RLP_BYTES_PER_CHANNEL // rlp is the encoded, uncompressed data of the channel. length must be less than MAX_RLP_BYTES_PER_CHANNEL
rlpLength int // it is a double buffer to allow us to "undo" the last change to the RLP structure when the target size is exceeded
rlp [2]*bytes.Buffer
// Compressor stage. Write input data to it // rlpIndex is the index of the current rlp buffer
compress Compressor rlpIndex int
// lastCompressedRLPSize tracks the *uncompressed* size of the last RLP buffer that was compressed
// it is used to measure the growth of the RLP buffer when adding a new batch to optimize compression
lastCompressedRLPSize int
// compressed contains compressed data for making output frames
compressed *bytes.Buffer
// compressor is the zlib writer for the channel
compressor *zlib.Writer
// target is the target size of the compressed data
target uint64
// closed indicates if the channel is closed // closed indicates if the channel is closed
closed bool closed bool
// spanBatch is the batch being built // full indicates if the channel is full
full error
// spanBatch is the batch being built, which immutably holds genesis timestamp and chain ID, but otherwise can be reset
spanBatch *SpanBatch spanBatch *SpanBatch
// reader contains compressed data for making output frames
reader *bytes.Buffer
} }
func (co *SpanChannelOut) ID() ChannelID { func (co *SpanChannelOut) ID() ChannelID {
return co.id return co.id
} }
func NewSpanChannelOut(compress Compressor, spanBatch *SpanBatch) (*SpanChannelOut, error) { func (co *SpanChannelOut) setRandomID() error {
_, err := rand.Read(co.id[:])
return err
}
func NewSpanChannelOut(genesisTimestamp uint64, chainID *big.Int, targetOutputSize uint64) (*SpanChannelOut, error) {
c := &SpanChannelOut{ c := &SpanChannelOut{
id: ChannelID{}, id: ChannelID{},
frame: 0, frame: 0,
rlpLength: 0, spanBatch: NewSpanBatch(genesisTimestamp, chainID),
compress: compress, rlp: [2]*bytes.Buffer{{}, {}},
spanBatch: spanBatch, compressed: &bytes.Buffer{},
reader: &bytes.Buffer{}, target: targetOutputSize,
} }
_, err := rand.Read(c.id[:]) var err error
if err != nil { if err = c.setRandomID(); err != nil {
return nil, err
}
if c.compressor, err = zlib.NewWriterLevel(c.compressed, zlib.BestCompression); err != nil {
return nil, err return nil, err
} }
return c, nil return c, nil
} }
func (co *SpanChannelOut) Reset() error { func (co *SpanChannelOut) Reset() error {
co.frame = 0
co.rlpLength = 0
co.compress.Reset()
co.reader.Reset()
co.closed = false co.closed = false
co.full = nil
co.frame = 0
co.rlp[0].Reset()
co.rlp[1].Reset()
co.lastCompressedRLPSize = 0
co.compressed.Reset()
co.compressor.Reset(co.compressed)
co.spanBatch = NewSpanBatch(co.spanBatch.GenesisTimestamp, co.spanBatch.ChainID) co.spanBatch = NewSpanBatch(co.spanBatch.GenesisTimestamp, co.spanBatch.ChainID)
_, err := rand.Read(co.id[:]) // setting the new randomID is the only part of the reset that can fail
return err return co.setRandomID()
} }
// AddBlock adds a block to the channel. It returns the RLP encoded byte size // activeRLP returns the active RLP buffer using the current rlpIndex
// and an error if there is a problem adding the block. The only sentinel error func (co *SpanChannelOut) activeRLP() *bytes.Buffer {
return co.rlp[co.rlpIndex]
}
// inactiveRLP returns the RLP buffer that rlpIndex does not currently select
func (co *SpanChannelOut) inactiveRLP() *bytes.Buffer {
	other := (co.rlpIndex + 1) % 2
	return co.rlp[other]
}
// swapRLP exchanges the roles of the active and inactive RLP buffers by moving rlpIndex to the other slot
func (co *SpanChannelOut) swapRLP() {
	next := co.rlpIndex + 1
	co.rlpIndex = next % 2
}
// AddBlock adds a block to the channel.
// returns an error if there is a problem adding the block. The only sentinel error
// that it returns is ErrTooManyRLPBytes. If this error is returned, the channel // that it returns is ErrTooManyRLPBytes. If this error is returned, the channel
// should be closed and a new one should be made. // should be closed and a new one should be made.
func (co *SpanChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) (uint64, error) { func (co *SpanChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) error {
if co.closed { if co.closed {
return 0, ErrChannelOutAlreadyClosed return ErrChannelOutAlreadyClosed
} }
batch, l1Info, err := BlockToSingularBatch(rollupCfg, block) batch, l1Info, err := BlockToSingularBatch(rollupCfg, block)
if err != nil { if err != nil {
return 0, err return err
} }
return co.AddSingularBatch(batch, l1Info.SequenceNumber) return co.AddSingularBatch(batch, l1Info.SequenceNumber)
} }
// AddSingularBatch adds a batch to the channel. It returns the RLP encoded byte size // AddSingularBatch adds a SingularBatch to the channel, compressing the data if necessary.
// and an error if there is a problem adding the batch. The only sentinel error // if the new batch would make the channel exceed the target size, the last batch is reverted,
// that it returns is ErrTooManyRLPBytes. If this error is returned, the channel // and the compression happens on the previous RLP buffer instead
// should be closed and a new one should be made. // if the input is too small to need compression, data is accumulated but not compressed
// func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64) error {
// AddSingularBatch should be used together with BlockToSingularBatch if you need to access the // sentinel error for closed or full channel
// BatchData before adding a block to the channel. It isn't possible to access
// the batch data with AddBlock.
//
// SingularBatch is appended to the channel's SpanBatch.
// A channel can have only one SpanBatch. And compressed results should not be accessible until the channel is closed, since the prefix and payload can be changed.
// So it resets channel contents and rewrites the entire SpanBatch each time, and compressed results are copied to reader after the channel is closed.
// It makes we can only get frames once the channel is full or closed, in the case of SpanBatch.
func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64) (uint64, error) {
if co.closed { if co.closed {
return 0, ErrChannelOutAlreadyClosed return ErrChannelOutAlreadyClosed
} }
if co.FullErr() != nil { if err := co.FullErr(); err != nil {
// channel is already full return err
return 0, co.FullErr()
} }
var buf bytes.Buffer
// Append Singular batch to its span batch builder // update the SpanBatch with the SingularBatch
if err := co.spanBatch.AppendSingularBatch(batch, seqNum); err != nil { if err := co.spanBatch.AppendSingularBatch(batch, seqNum); err != nil {
return 0, fmt.Errorf("failed to append SingularBatch to SpanBatch: %w", err) return fmt.Errorf("failed to append SingularBatch to SpanBatch: %w", err)
} }
// Convert Span batch to RawSpanBatch // convert Span batch to RawSpanBatch
rawSpanBatch, err := co.spanBatch.ToRawSpanBatch() rawSpanBatch, err := co.spanBatch.ToRawSpanBatch()
if err != nil { if err != nil {
return 0, fmt.Errorf("failed to convert SpanBatch into RawSpanBatch: %w", err) return fmt.Errorf("failed to convert SpanBatch into RawSpanBatch: %w", err)
} }
// Encode RawSpanBatch into bytes
if err = rlp.Encode(&buf, NewBatchData(rawSpanBatch)); err != nil { // switch to the other buffer and reset it for new use
return 0, fmt.Errorf("failed to encode RawSpanBatch into bytes: %w", err) // (the RLP buffer which is being made inactive holds the RLP encoded span batch just before the new batch was added)
} co.swapRLP()
// Ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL co.activeRLP().Reset()
if buf.Len() > MaxRLPBytesPerChannel { if err = rlp.Encode(co.activeRLP(), NewBatchData(rawSpanBatch)); err != nil {
return 0, fmt.Errorf("could not take %d bytes as replacement of channel of %d bytes, max is %d. err: %w", return fmt.Errorf("failed to encode RawSpanBatch into bytes: %w", err)
buf.Len(), co.rlpLength, MaxRLPBytesPerChannel, ErrTooManyRLPBytes) }
}
co.rlpLength = buf.Len() // check the RLP length against the max
if co.activeRLP().Len() > MaxRLPBytesPerChannel {
// If the channel is full after this block is appended, we should use preserved data. return fmt.Errorf("could not take %d bytes as replacement of channel of %d bytes, max is %d. err: %w",
// so copy the compressed data to reader co.activeRLP().Len(), co.inactiveRLP().Len(), MaxRLPBytesPerChannel, ErrTooManyRLPBytes)
if len(co.spanBatch.Batches) > 1 { }
_, err = io.Copy(co.reader, co.compress)
if err != nil { // if the compressed data *plus* the new rlp data is under the target size, return early
// Must reset reader to avoid partial output // this optimizes out cases where the compressor will obviously come in under the target size
co.reader.Reset() rlpGrowth := co.activeRLP().Len() - co.lastCompressedRLPSize
return 0, fmt.Errorf("failed to copy compressed data to reader: %w", err) if uint64(co.compressed.Len()+rlpGrowth) < co.target {
} return nil
} }
// Reset compressor to rewrite the entire span batch // we must compress the data to check if we've met or exceeded the target size
co.compress.Reset() if err = co.compress(); err != nil {
// Avoid using io.Copy here, because we need all or nothing return err
written, err := co.compress.Write(buf.Bytes())
// Always flush (for BlindCompressor to check if it's full)
if err := co.compress.Flush(); err != nil {
return 0, fmt.Errorf("failed to flush compressor: %w", err)
} }
if co.compress.FullErr() != nil { co.lastCompressedRLPSize = co.activeRLP().Len()
err = co.compress.FullErr()
// if the channel is now full, either return the compressed data, or the compressed previous data
if err := co.FullErr(); err != nil {
// if there is only one batch in the channel, it *must* be returned
if len(co.spanBatch.Batches) == 1 { if len(co.spanBatch.Batches) == 1 {
// Do not return ErrCompressorFull for the first block in the batch return nil
// In this case, reader must be empty. then the contents of compressor will be copied to reader when the channel is closed. }
err = nil
// if there is more than one batch in the channel, we revert the last batch
// by switching the RLP buffer and doing a fresh compression
co.swapRLP()
if err := co.compress(); err != nil {
return err
} }
// If there are more than one blocks in the channel, reader should have data that preserves previous compression result before adding this block. // return the full error
// So, as a result, this block is not added to the channel and the channel will be closed. return err
return uint64(written), err
} }
// If compressor is not full yet, reader must be reset to avoid submitting invalid frames return nil
co.reader.Reset() }
return uint64(written), err
// compress compresses the active RLP buffer and checks if the compressed data is over the target size.
// it resets all the compression buffers because Span Batches aren't meant to be compressed incrementally.
func (co *SpanChannelOut) compress() error {
co.compressed.Reset()
co.compressor.Reset(co.compressed)
if _, err := co.compressor.Write(co.activeRLP().Bytes()); err != nil {
return err
}
if err := co.compressor.Close(); err != nil {
return err
}
co.checkFull()
return nil
} }
// InputBytes returns the total amount of RLP-encoded input bytes. // InputBytes returns the total amount of RLP-encoded input bytes.
func (co *SpanChannelOut) InputBytes() int { func (co *SpanChannelOut) InputBytes() int {
return co.rlpLength return co.activeRLP().Len()
} }
// ReadyBytes returns the number of bytes that the channel out can immediately output into a frame. // ReadyBytes returns the total amount of compressed bytes that are ready to be output.
// Use `Flush` or `Close` to move data from the compression buffer into the ready buffer if more bytes // Span Channel Out does not provide early output, so this will always be 0 until the channel is closed or full
// are needed. Add blocks may add to the ready buffer, but it is not guaranteed due to the compression stage.
func (co *SpanChannelOut) ReadyBytes() int { func (co *SpanChannelOut) ReadyBytes() int {
return co.reader.Len() if co.closed || co.FullErr() != nil {
return co.compressed.Len()
}
return 0
} }
// Flush flushes the internal compression stage to the ready buffer. It enables pulling a larger & more // Flush implements the Channel Out
// complete frame. It reduces the compression efficiency. // Span Channel Out manages the flushing of the compressor internally, so this is a no-op
func (co *SpanChannelOut) Flush() error { func (co *SpanChannelOut) Flush() error {
if err := co.compress.Flush(); err != nil { return nil
return err }
// checkFull sets the full error if the compressed data is over the target size.
// the error is only set once, and the channel is considered full from that point on
func (co *SpanChannelOut) checkFull() {
// if the channel is already full, don't update further
if co.full != nil {
return
} }
if co.closed && co.ReadyBytes() == 0 && co.compress.Len() > 0 { if uint64(co.compressed.Len()) >= co.target {
_, err := io.Copy(co.reader, co.compress) co.full = ErrCompressorFull
if err != nil {
// Must reset reader to avoid partial output
co.reader.Reset()
return fmt.Errorf("failed to flush compressed data to reader: %w", err)
}
} }
return nil
} }
func (co *SpanChannelOut) FullErr() error { func (co *SpanChannelOut) FullErr() error {
return co.compress.FullErr() return co.full
} }
func (co *SpanChannelOut) Close() error { func (co *SpanChannelOut) Close() error {
...@@ -193,10 +239,14 @@ func (co *SpanChannelOut) Close() error { ...@@ -193,10 +239,14 @@ func (co *SpanChannelOut) Close() error {
return ErrChannelOutAlreadyClosed return ErrChannelOutAlreadyClosed
} }
co.closed = true co.closed = true
if err := co.Flush(); err != nil { // if the channel was already full,
return err // the compressor is already flushed and closed
if co.FullErr() != nil {
return nil
} }
return co.compress.Close() // if this channel is not full, we need to compress the last batch
// this also flushes/closes the compressor
return co.compress()
} }
// OutputFrame writes a frame to w with a given max size and returns the frame // OutputFrame writes a frame to w with a given max size and returns the frame
...@@ -214,7 +264,7 @@ func (co *SpanChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, ...@@ -214,7 +264,7 @@ func (co *SpanChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16,
f := createEmptyFrame(co.id, co.frame, co.ReadyBytes(), co.closed, maxSize) f := createEmptyFrame(co.id, co.frame, co.ReadyBytes(), co.closed, maxSize)
if _, err := io.ReadFull(co.reader, f.Data); err != nil { if _, err := io.ReadFull(co.compressed, f.Data); err != nil {
return 0, err return 0, err
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment