Commit 710d8ba4 authored by Michael de Hoog's avatar Michael de Hoog

Create an abstraction for channel compression

parent c1b6240b
...@@ -13,7 +13,6 @@ import ( ...@@ -13,7 +13,6 @@ import (
var ( var (
ErrInvalidChannelTimeout = errors.New("channel timeout is less than the safety margin") ErrInvalidChannelTimeout = errors.New("channel timeout is less than the safety margin")
ErrInputTargetReached = errors.New("target amount of input data reached")
ErrMaxFrameIndex = errors.New("max frame index reached (uint16)") ErrMaxFrameIndex = errors.New("max frame index reached (uint16)")
ErrMaxDurationReached = errors.New("max channel duration reached") ErrMaxDurationReached = errors.New("max channel duration reached")
ErrChannelTimeoutClose = errors.New("close to channel timeout") ErrChannelTimeoutClose = errors.New("close to channel timeout")
...@@ -55,19 +54,8 @@ type ChannelConfig struct { ...@@ -55,19 +54,8 @@ type ChannelConfig struct {
SubSafetyMargin uint64 SubSafetyMargin uint64
// The maximum byte-size a frame can have. // The maximum byte-size a frame can have.
MaxFrameSize uint64 MaxFrameSize uint64
// The target number of frames to create per channel. Note that if the // Compressor to use to compress frame data.
// realized compression ratio is worse than the approximate, more frames may Compressor derive.Compressor
// actually be created. This also depends on how close TargetFrameSize is to
// MaxFrameSize.
TargetFrameSize uint64
// The target number of frames to create in this channel. If the realized
// compression ratio is worse than approxComprRatio, additional leftover
// frame(s) might get created.
TargetNumFrames int
// Approximated compression ratio to assume. Should be slightly smaller than
// average from experiments to avoid the chances of creating a small
// additional leftover frame.
ApproxComprRatio float64
} }
// Check validates the [ChannelConfig] parameters. // Check validates the [ChannelConfig] parameters.
...@@ -93,13 +81,12 @@ func (cc *ChannelConfig) Check() error { ...@@ -93,13 +81,12 @@ func (cc *ChannelConfig) Check() error {
return fmt.Errorf("max frame size %d is less than the minimum 23", cc.MaxFrameSize) return fmt.Errorf("max frame size %d is less than the minimum 23", cc.MaxFrameSize)
} }
return nil // Compressor must be set
} if cc.Compressor == nil {
return errors.New("compressor cannot be nil")
}
// InputThreshold calculates the input data threshold in bytes from the given return nil
// parameters.
func (c ChannelConfig) InputThreshold() uint64 {
return uint64(float64(c.TargetNumFrames) * float64(c.TargetFrameSize) / c.ApproxComprRatio)
} }
type frameID struct { type frameID struct {
...@@ -142,7 +129,7 @@ type channelBuilder struct { ...@@ -142,7 +129,7 @@ type channelBuilder struct {
// newChannelBuilder creates a new channel builder or returns an error if the // newChannelBuilder creates a new channel builder or returns an error if the
// channel out could not be created. // channel out could not be created.
func newChannelBuilder(cfg ChannelConfig) (*channelBuilder, error) { func newChannelBuilder(cfg ChannelConfig) (*channelBuilder, error) {
co, err := derive.NewChannelOut() co, err := derive.NewChannelOut(cfg.Compressor)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -218,8 +205,8 @@ func (c *channelBuilder) AddBlock(block *types.Block) (derive.L1BlockInfo, error ...@@ -218,8 +205,8 @@ func (c *channelBuilder) AddBlock(block *types.Block) (derive.L1BlockInfo, error
c.blocks = append(c.blocks, block) c.blocks = append(c.blocks, block)
c.updateSwTimeout(batch) c.updateSwTimeout(batch)
if c.inputTargetReached() { if err = c.cfg.Compressor.FullErr(); err != nil {
c.setFullErr(ErrInputTargetReached) c.setFullErr(err)
// Adding this block still worked, so don't return error, just mark as full // Adding this block still worked, so don't return error, just mark as full
} }
...@@ -293,12 +280,6 @@ func (c *channelBuilder) TimedOut(blockNum uint64) bool { ...@@ -293,12 +280,6 @@ func (c *channelBuilder) TimedOut(blockNum uint64) bool {
return c.timeout != 0 && blockNum >= c.timeout return c.timeout != 0 && blockNum >= c.timeout
} }
// inputTargetReached says whether the target amount of input data has been
// reached in this channel builder. No more blocks can be added afterwards.
func (c *channelBuilder) inputTargetReached() bool {
return uint64(c.co.InputBytes()) >= c.cfg.InputThreshold()
}
// IsFull returns whether the channel is full. // IsFull returns whether the channel is full.
// FullErr returns the reason for the channel being full. // FullErr returns the reason for the channel being full.
func (c *channelBuilder) IsFull() bool { func (c *channelBuilder) IsFull() bool {
...@@ -310,7 +291,7 @@ func (c *channelBuilder) IsFull() bool { ...@@ -310,7 +291,7 @@ func (c *channelBuilder) IsFull() bool {
// //
// It returns a ChannelFullError wrapping one of the following possible reasons // It returns a ChannelFullError wrapping one of the following possible reasons
// for the channel being full: // for the channel being full:
// - ErrInputTargetReached if the target amount of input data has been reached, // - derive.CompressorFullErr if the compressor target has been reached.
// - derive.MaxRLPBytesPerChannel if the general maximum amount of input data // - derive.MaxRLPBytesPerChannel if the general maximum amount of input data
// would have been exceeded by the latest AddBlock call, // would have been exceeded by the latest AddBlock call,
// - ErrMaxFrameIndex if the maximum number of frames has been generated // - ErrMaxFrameIndex if the maximum number of frames has been generated
......
...@@ -22,15 +22,21 @@ import ( ...@@ -22,15 +22,21 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
var defaultTestChannelConfig = ChannelConfig{ func defaultTestChannelConfig(t *testing.T) ChannelConfig {
SeqWindowSize: 15, return ChannelConfig{
ChannelTimeout: 40, SeqWindowSize: 15,
MaxChannelDuration: 1, ChannelTimeout: 40,
SubSafetyMargin: 4, MaxChannelDuration: 1,
MaxFrameSize: 120000, SubSafetyMargin: 4,
TargetFrameSize: 100000, MaxFrameSize: 120000,
TargetNumFrames: 1, Compressor: newCompressor(t, 100000, 1, 0.4),
ApproxComprRatio: 0.4, }
}
func newCompressor(t *testing.T, targetFrameSize uint64, targetNumFrames int, approxCompRatio float64) derive.Compressor {
c, err := NewTargetSizeCompressor(targetFrameSize, targetNumFrames, approxCompRatio)
require.NoError(t, err)
return c
} }
// TestChannelConfig_Check tests the [ChannelConfig] [Check] function. // TestChannelConfig_Check tests the [ChannelConfig] [Check] function.
...@@ -41,14 +47,14 @@ func TestChannelConfig_Check(t *testing.T) { ...@@ -41,14 +47,14 @@ func TestChannelConfig_Check(t *testing.T) {
} }
// Construct test cases that test the boundary conditions // Construct test cases that test the boundary conditions
zeroChannelConfig := defaultTestChannelConfig zeroChannelConfig := defaultTestChannelConfig(t)
zeroChannelConfig.MaxFrameSize = 0 zeroChannelConfig.MaxFrameSize = 0
timeoutChannelConfig := defaultTestChannelConfig timeoutChannelConfig := defaultTestChannelConfig(t)
timeoutChannelConfig.ChannelTimeout = 0 timeoutChannelConfig.ChannelTimeout = 0
timeoutChannelConfig.SubSafetyMargin = 1 timeoutChannelConfig.SubSafetyMargin = 1
tests := []test{ tests := []test{
{ {
input: defaultTestChannelConfig, input: defaultTestChannelConfig(t),
assertion: func(output error) { assertion: func(output error) {
require.NoError(t, output) require.NoError(t, output)
}, },
...@@ -67,7 +73,7 @@ func TestChannelConfig_Check(t *testing.T) { ...@@ -67,7 +73,7 @@ func TestChannelConfig_Check(t *testing.T) {
}, },
} }
for i := 1; i < derive.FrameV0OverHeadSize; i++ { for i := 1; i < derive.FrameV0OverHeadSize; i++ {
smallChannelConfig := defaultTestChannelConfig smallChannelConfig := defaultTestChannelConfig(t)
smallChannelConfig.MaxFrameSize = uint64(i) smallChannelConfig.MaxFrameSize = uint64(i)
expectedErr := fmt.Sprintf("max frame size %d is less than the minimum 23", i) expectedErr := fmt.Sprintf("max frame size %d is less than the minimum 23", i)
tests = append(tests, test{ tests = append(tests, test{
...@@ -101,7 +107,7 @@ func FuzzChannelConfig_CheckTimeout(f *testing.F) { ...@@ -101,7 +107,7 @@ func FuzzChannelConfig_CheckTimeout(f *testing.F) {
subSafetyMargin = channelTimeout + 1 subSafetyMargin = channelTimeout + 1
} }
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig(t)
channelConfig.ChannelTimeout = channelTimeout channelConfig.ChannelTimeout = channelTimeout
channelConfig.SubSafetyMargin = subSafetyMargin channelConfig.SubSafetyMargin = subSafetyMargin
require.ErrorIs(t, channelConfig.Check(), ErrInvalidChannelTimeout) require.ErrorIs(t, channelConfig.Check(), ErrInvalidChannelTimeout)
...@@ -175,7 +181,7 @@ func FuzzDurationTimeoutZeroMaxChannelDuration(f *testing.F) { ...@@ -175,7 +181,7 @@ func FuzzDurationTimeoutZeroMaxChannelDuration(f *testing.F) {
f.Add(uint64(i)) f.Add(uint64(i))
} }
f.Fuzz(func(t *testing.T, l1BlockNum uint64) { f.Fuzz(func(t *testing.T, l1BlockNum uint64) {
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig(t)
channelConfig.MaxChannelDuration = 0 channelConfig.MaxChannelDuration = 0
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig)
require.NoError(t, err) require.NoError(t, err)
...@@ -198,7 +204,7 @@ func FuzzChannelBuilder_DurationZero(f *testing.F) { ...@@ -198,7 +204,7 @@ func FuzzChannelBuilder_DurationZero(f *testing.F) {
} }
// Create the channel builder // Create the channel builder
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig(t)
channelConfig.MaxChannelDuration = maxChannelDuration channelConfig.MaxChannelDuration = maxChannelDuration
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig)
require.NoError(t, err) require.NoError(t, err)
...@@ -225,7 +231,7 @@ func FuzzDurationTimeoutMaxChannelDuration(f *testing.F) { ...@@ -225,7 +231,7 @@ func FuzzDurationTimeoutMaxChannelDuration(f *testing.F) {
} }
// Create the channel builder // Create the channel builder
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig(t)
channelConfig.MaxChannelDuration = maxChannelDuration channelConfig.MaxChannelDuration = maxChannelDuration
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig)
require.NoError(t, err) require.NoError(t, err)
...@@ -258,7 +264,7 @@ func FuzzChannelCloseTimeout(f *testing.F) { ...@@ -258,7 +264,7 @@ func FuzzChannelCloseTimeout(f *testing.F) {
} }
f.Fuzz(func(t *testing.T, l1BlockNum uint64, channelTimeout uint64, subSafetyMargin uint64, timeout uint64) { f.Fuzz(func(t *testing.T, l1BlockNum uint64, channelTimeout uint64, subSafetyMargin uint64, timeout uint64) {
// Create the channel builder // Create the channel builder
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig(t)
channelConfig.ChannelTimeout = channelTimeout channelConfig.ChannelTimeout = channelTimeout
channelConfig.SubSafetyMargin = subSafetyMargin channelConfig.SubSafetyMargin = subSafetyMargin
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig)
...@@ -286,7 +292,7 @@ func FuzzChannelZeroCloseTimeout(f *testing.F) { ...@@ -286,7 +292,7 @@ func FuzzChannelZeroCloseTimeout(f *testing.F) {
} }
f.Fuzz(func(t *testing.T, l1BlockNum uint64, channelTimeout uint64, subSafetyMargin uint64) { f.Fuzz(func(t *testing.T, l1BlockNum uint64, channelTimeout uint64, subSafetyMargin uint64) {
// Create the channel builder // Create the channel builder
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig(t)
channelConfig.ChannelTimeout = channelTimeout channelConfig.ChannelTimeout = channelTimeout
channelConfig.SubSafetyMargin = subSafetyMargin channelConfig.SubSafetyMargin = subSafetyMargin
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig)
...@@ -313,7 +319,7 @@ func FuzzSeqWindowClose(f *testing.F) { ...@@ -313,7 +319,7 @@ func FuzzSeqWindowClose(f *testing.F) {
} }
f.Fuzz(func(t *testing.T, epochNum uint64, seqWindowSize uint64, subSafetyMargin uint64, timeout uint64) { f.Fuzz(func(t *testing.T, epochNum uint64, seqWindowSize uint64, subSafetyMargin uint64, timeout uint64) {
// Create the channel builder // Create the channel builder
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig(t)
channelConfig.SeqWindowSize = seqWindowSize channelConfig.SeqWindowSize = seqWindowSize
channelConfig.SubSafetyMargin = subSafetyMargin channelConfig.SubSafetyMargin = subSafetyMargin
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig)
...@@ -345,7 +351,7 @@ func FuzzSeqWindowZeroTimeoutClose(f *testing.F) { ...@@ -345,7 +351,7 @@ func FuzzSeqWindowZeroTimeoutClose(f *testing.F) {
} }
f.Fuzz(func(t *testing.T, epochNum uint64, seqWindowSize uint64, subSafetyMargin uint64) { f.Fuzz(func(t *testing.T, epochNum uint64, seqWindowSize uint64, subSafetyMargin uint64) {
// Create the channel builder // Create the channel builder
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig(t)
channelConfig.SeqWindowSize = seqWindowSize channelConfig.SeqWindowSize = seqWindowSize
channelConfig.SubSafetyMargin = subSafetyMargin channelConfig.SubSafetyMargin = subSafetyMargin
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig)
...@@ -368,7 +374,7 @@ func FuzzSeqWindowZeroTimeoutClose(f *testing.F) { ...@@ -368,7 +374,7 @@ func FuzzSeqWindowZeroTimeoutClose(f *testing.F) {
// TestChannelBuilder_NextFrame tests calling NextFrame on a ChannelBuilder with only one frame // TestChannelBuilder_NextFrame tests calling NextFrame on a ChannelBuilder with only one frame
func TestChannelBuilder_NextFrame(t *testing.T) { func TestChannelBuilder_NextFrame(t *testing.T) {
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig(t)
// Create a new channel builder // Create a new channel builder
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig)
...@@ -408,7 +414,7 @@ func TestChannelBuilder_NextFrame(t *testing.T) { ...@@ -408,7 +414,7 @@ func TestChannelBuilder_NextFrame(t *testing.T) {
// TestChannelBuilder_OutputWrongFramePanic tests that a panic is thrown when a frame is pushed with an invalid frame id // TestChannelBuilder_OutputWrongFramePanic tests that a panic is thrown when a frame is pushed with an invalid frame id
func TestChannelBuilder_OutputWrongFramePanic(t *testing.T) { func TestChannelBuilder_OutputWrongFramePanic(t *testing.T) {
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig(t)
// Construct a channel builder // Construct a channel builder
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig)
...@@ -416,7 +422,7 @@ func TestChannelBuilder_OutputWrongFramePanic(t *testing.T) { ...@@ -416,7 +422,7 @@ func TestChannelBuilder_OutputWrongFramePanic(t *testing.T) {
// Mock the internals of `channelBuilder.outputFrame` // Mock the internals of `channelBuilder.outputFrame`
// to construct a single frame // to construct a single frame
co, err := derive.NewChannelOut() co, err := derive.NewChannelOut(channelConfig.Compressor)
require.NoError(t, err) require.NoError(t, err)
var buf bytes.Buffer var buf bytes.Buffer
fn, err := co.OutputFrame(&buf, channelConfig.MaxFrameSize) fn, err := co.OutputFrame(&buf, channelConfig.MaxFrameSize)
...@@ -438,7 +444,7 @@ func TestChannelBuilder_OutputWrongFramePanic(t *testing.T) { ...@@ -438,7 +444,7 @@ func TestChannelBuilder_OutputWrongFramePanic(t *testing.T) {
// TestChannelBuilder_OutputFramesWorks tests the [ChannelBuilder] OutputFrames is successful. // TestChannelBuilder_OutputFramesWorks tests the [ChannelBuilder] OutputFrames is successful.
func TestChannelBuilder_OutputFramesWorks(t *testing.T) { func TestChannelBuilder_OutputFramesWorks(t *testing.T) {
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig(t)
channelConfig.MaxFrameSize = 24 channelConfig.MaxFrameSize = 24
// Construct the channel builder // Construct the channel builder
...@@ -481,10 +487,8 @@ func TestChannelBuilder_OutputFramesWorks(t *testing.T) { ...@@ -481,10 +487,8 @@ func TestChannelBuilder_OutputFramesWorks(t *testing.T) {
// function errors when the max RLP bytes per channel is reached. // function errors when the max RLP bytes per channel is reached.
func TestChannelBuilder_MaxRLPBytesPerChannel(t *testing.T) { func TestChannelBuilder_MaxRLPBytesPerChannel(t *testing.T) {
t.Parallel() t.Parallel()
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig(t)
channelConfig.MaxFrameSize = derive.MaxRLPBytesPerChannel * 2 channelConfig.Compressor = newCompressor(t, derive.MaxRLPBytesPerChannel*2, derive.MaxRLPBytesPerChannel*2, 1)
channelConfig.TargetFrameSize = derive.MaxRLPBytesPerChannel * 2
channelConfig.ApproxComprRatio = 1
// Construct the channel builder // Construct the channel builder
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig)
...@@ -498,11 +502,9 @@ func TestChannelBuilder_MaxRLPBytesPerChannel(t *testing.T) { ...@@ -498,11 +502,9 @@ func TestChannelBuilder_MaxRLPBytesPerChannel(t *testing.T) {
// TestChannelBuilder_OutputFramesMaxFrameIndex tests the [ChannelBuilder.OutputFrames] // TestChannelBuilder_OutputFramesMaxFrameIndex tests the [ChannelBuilder.OutputFrames]
// function errors when the max frame index is reached. // function errors when the max frame index is reached.
func TestChannelBuilder_OutputFramesMaxFrameIndex(t *testing.T) { func TestChannelBuilder_OutputFramesMaxFrameIndex(t *testing.T) {
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig(t)
channelConfig.MaxFrameSize = 24 channelConfig.MaxFrameSize = 24
channelConfig.TargetNumFrames = math.MaxInt channelConfig.Compressor = newCompressor(t, 24, math.MaxInt, 0)
channelConfig.TargetFrameSize = 24
channelConfig.ApproxComprRatio = 0
// Continuously add blocks until the max frame index is reached // Continuously add blocks until the max frame index is reached
// This should cause the [channelBuilder.OutputFrames] function // This should cause the [channelBuilder.OutputFrames] function
...@@ -538,15 +540,13 @@ func TestChannelBuilder_OutputFramesMaxFrameIndex(t *testing.T) { ...@@ -538,15 +540,13 @@ func TestChannelBuilder_OutputFramesMaxFrameIndex(t *testing.T) {
// TestChannelBuilder_AddBlock tests the AddBlock function // TestChannelBuilder_AddBlock tests the AddBlock function
func TestChannelBuilder_AddBlock(t *testing.T) { func TestChannelBuilder_AddBlock(t *testing.T) {
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig(t)
// Lower the max frame size so that we can batch // Lower the max frame size so that we can batch
channelConfig.MaxFrameSize = 30 channelConfig.MaxFrameSize = 30
// Configure the Input Threshold params so we observe a full channel // Configure the Input Threshold params so we observe a full channel
channelConfig.TargetFrameSize = 30 channelConfig.Compressor = newCompressor(t, 30, 2, 1)
channelConfig.TargetNumFrames = 2
channelConfig.ApproxComprRatio = 1
// Construct the channel builder // Construct the channel builder
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig)
...@@ -564,12 +564,12 @@ func TestChannelBuilder_AddBlock(t *testing.T) { ...@@ -564,12 +564,12 @@ func TestChannelBuilder_AddBlock(t *testing.T) {
// Since the channel output is full, the next call to AddBlock // Since the channel output is full, the next call to AddBlock
// should return the channel out full error // should return the channel out full error
require.ErrorIs(t, addMiniBlock(cb), ErrInputTargetReached) require.ErrorIs(t, addMiniBlock(cb), derive.CompressorFullErr)
} }
// TestChannelBuilder_Reset tests the [Reset] function // TestChannelBuilder_Reset tests the [Reset] function
func TestChannelBuilder_Reset(t *testing.T) { func TestChannelBuilder_Reset(t *testing.T) {
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig(t)
// Lower the max frame size so that we can batch // Lower the max frame size so that we can batch
channelConfig.MaxFrameSize = 24 channelConfig.MaxFrameSize = 24
...@@ -616,7 +616,7 @@ func TestChannelBuilder_Reset(t *testing.T) { ...@@ -616,7 +616,7 @@ func TestChannelBuilder_Reset(t *testing.T) {
// TestBuilderRegisterL1Block tests the RegisterL1Block function // TestBuilderRegisterL1Block tests the RegisterL1Block function
func TestBuilderRegisterL1Block(t *testing.T) { func TestBuilderRegisterL1Block(t *testing.T) {
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig(t)
// Construct the channel builder // Construct the channel builder
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig)
...@@ -636,7 +636,7 @@ func TestBuilderRegisterL1Block(t *testing.T) { ...@@ -636,7 +636,7 @@ func TestBuilderRegisterL1Block(t *testing.T) {
// TestBuilderRegisterL1BlockZeroMaxChannelDuration tests the RegisterL1Block function // TestBuilderRegisterL1BlockZeroMaxChannelDuration tests the RegisterL1Block function
func TestBuilderRegisterL1BlockZeroMaxChannelDuration(t *testing.T) { func TestBuilderRegisterL1BlockZeroMaxChannelDuration(t *testing.T) {
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig(t)
// Set the max channel duration to 0 // Set the max channel duration to 0
channelConfig.MaxChannelDuration = 0 channelConfig.MaxChannelDuration = 0
...@@ -660,7 +660,7 @@ func TestBuilderRegisterL1BlockZeroMaxChannelDuration(t *testing.T) { ...@@ -660,7 +660,7 @@ func TestBuilderRegisterL1BlockZeroMaxChannelDuration(t *testing.T) {
// TestFramePublished tests the FramePublished function // TestFramePublished tests the FramePublished function
func TestFramePublished(t *testing.T) { func TestFramePublished(t *testing.T) {
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig(t)
// Construct the channel builder // Construct the channel builder
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig)
...@@ -700,11 +700,9 @@ func TestChannelBuilder_InputBytes(t *testing.T) { ...@@ -700,11 +700,9 @@ func TestChannelBuilder_InputBytes(t *testing.T) {
func TestChannelBuilder_OutputBytes(t *testing.T) { func TestChannelBuilder_OutputBytes(t *testing.T) {
require := require.New(t) require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano())) rng := rand.New(rand.NewSource(time.Now().UnixNano()))
cfg := defaultTestChannelConfig cfg := defaultTestChannelConfig(t)
cfg.TargetFrameSize = 1000
cfg.MaxFrameSize = 1000 cfg.MaxFrameSize = 1000
cfg.TargetNumFrames = 16 cfg.Compressor = newCompressor(t, 1000, 16, 1)
cfg.ApproxComprRatio = 1.0
cb, err := newChannelBuilder(cfg) cb, err := newChannelBuilder(cfg)
require.NoError(err, "newChannelBuilder") require.NoError(err, "newChannelBuilder")
...@@ -713,7 +711,7 @@ func TestChannelBuilder_OutputBytes(t *testing.T) { ...@@ -713,7 +711,7 @@ func TestChannelBuilder_OutputBytes(t *testing.T) {
for { for {
block, _ := dtest.RandomL2Block(rng, rng.Intn(32)) block, _ := dtest.RandomL2Block(rng, rng.Intn(32))
_, err := cb.AddBlock(block) _, err := cb.AddBlock(block)
if errors.Is(err, ErrInputTargetReached) { if errors.Is(err, derive.CompressorFullErr) {
break break
} }
require.NoError(err) require.NoError(err)
...@@ -734,7 +732,7 @@ func TestChannelBuilder_OutputBytes(t *testing.T) { ...@@ -734,7 +732,7 @@ func TestChannelBuilder_OutputBytes(t *testing.T) {
func defaultChannelBuilderSetup(t *testing.T) (*channelBuilder, ChannelConfig) { func defaultChannelBuilderSetup(t *testing.T) (*channelBuilder, ChannelConfig) {
t.Helper() t.Helper()
cfg := defaultTestChannelConfig cfg := defaultTestChannelConfig(t)
cb, err := newChannelBuilder(cfg) cb, err := newChannelBuilder(cfg)
require.NoError(t, err, "newChannelBuilder") require.NoError(t, err, "newChannelBuilder")
return cb, cfg return cb, cfg
......
...@@ -98,9 +98,8 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) { ...@@ -98,9 +98,8 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
TargetFrameSize: 0, MaxFrameSize: 120_000,
MaxFrameSize: 120_000, Compressor: newCompressor(t, 1, 1, 1),
ApproxComprRatio: 1.0,
}) })
a := newMiniL2Block(0) a := newMiniL2Block(0)
...@@ -170,9 +169,8 @@ func TestChannelManager_Clear(t *testing.T) { ...@@ -170,9 +169,8 @@ func TestChannelManager_Clear(t *testing.T) {
ChannelTimeout: 10, ChannelTimeout: 10,
// Have to set the max frame size here otherwise the channel builder would not // Have to set the max frame size here otherwise the channel builder would not
// be able to output any frames // be able to output any frames
MaxFrameSize: 24, MaxFrameSize: 24,
TargetFrameSize: 24, Compressor: newCompressor(t, 24, 1, 1),
ApproxComprRatio: 1.0,
}) })
// Channel Manager state should be empty by default // Channel Manager state should be empty by default
...@@ -331,9 +329,8 @@ func TestChannelManager_TxResend(t *testing.T) { ...@@ -331,9 +329,8 @@ func TestChannelManager_TxResend(t *testing.T) {
log := testlog.Logger(t, log.LvlError) log := testlog.Logger(t, log.LvlError)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
TargetFrameSize: 0, MaxFrameSize: 120_000,
MaxFrameSize: 120_000, Compressor: newCompressor(t, 1, 1, 1),
ApproxComprRatio: 1.0,
}) })
a, _ := derivetest.RandomL2Block(rng, 4) a, _ := derivetest.RandomL2Block(rng, 4)
...@@ -372,10 +369,9 @@ func TestChannelManagerCloseBeforeFirstUse(t *testing.T) { ...@@ -372,10 +369,9 @@ func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
TargetFrameSize: 0, MaxFrameSize: 100,
MaxFrameSize: 100, Compressor: newCompressor(t, 0, 1, 1),
ApproxComprRatio: 1.0, ChannelTimeout: 1000,
ChannelTimeout: 1000,
}) })
a, _ := derivetest.RandomL2Block(rng, 4) a, _ := derivetest.RandomL2Block(rng, 4)
...@@ -397,10 +393,9 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) { ...@@ -397,10 +393,9 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
TargetFrameSize: 0, MaxFrameSize: 1000,
MaxFrameSize: 100, Compressor: newCompressor(t, 1, 1, 1),
ApproxComprRatio: 1.0, ChannelTimeout: 1000,
ChannelTimeout: 1000,
}) })
a := newMiniL2Block(0) a := newMiniL2Block(0)
b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash()) b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash())
...@@ -433,11 +428,9 @@ func TestChannelManagerClosePendingChannel(t *testing.T) { ...@@ -433,11 +428,9 @@ func TestChannelManagerClosePendingChannel(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
TargetNumFrames: 100, MaxFrameSize: 1000,
TargetFrameSize: 1000, Compressor: newCompressor(t, 1000, 100, 1),
MaxFrameSize: 1000, ChannelTimeout: 1000,
ApproxComprRatio: 1.0,
ChannelTimeout: 1000,
}) })
a := newMiniL2Block(50_000) a := newMiniL2Block(50_000)
...@@ -476,11 +469,9 @@ func TestChannelManagerCloseAllTxsFailed(t *testing.T) { ...@@ -476,11 +469,9 @@ func TestChannelManagerCloseAllTxsFailed(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
TargetNumFrames: 100, MaxFrameSize: 1000,
TargetFrameSize: 1000, Compressor: newCompressor(t, 1000, 100, 1),
MaxFrameSize: 1000, ChannelTimeout: 1000,
ApproxComprRatio: 1.0,
ChannelTimeout: 1000,
}) })
a := newMiniL2Block(50_000) a := newMiniL2Block(50_000)
......
...@@ -75,6 +75,15 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri ...@@ -75,6 +75,15 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
return nil, err return nil, err
} }
compressor, err := NewTargetSizeCompressor(
cfg.TargetL1TxSize-1, // subtract 1 byte for version
cfg.TargetNumFrames,
cfg.ApproxComprRatio,
)
if err != nil {
return nil, err
}
batcherCfg := Config{ batcherCfg := Config{
L1Client: l1Client, L1Client: l1Client,
L2Client: l2Client, L2Client: l2Client,
...@@ -89,10 +98,8 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri ...@@ -89,10 +98,8 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
ChannelTimeout: rcfg.ChannelTimeout, ChannelTimeout: rcfg.ChannelTimeout,
MaxChannelDuration: cfg.MaxChannelDuration, MaxChannelDuration: cfg.MaxChannelDuration,
SubSafetyMargin: cfg.SubSafetyMargin, SubSafetyMargin: cfg.SubSafetyMargin,
MaxFrameSize: cfg.MaxL1TxSize - 1, // subtract 1 byte for version MaxFrameSize: cfg.MaxL1TxSize - 1, // subtract 1 byte for version
TargetFrameSize: cfg.TargetL1TxSize - 1, // subtract 1 byte for version Compressor: compressor,
TargetNumFrames: cfg.TargetNumFrames,
ApproxComprRatio: cfg.ApproxComprRatio,
}, },
} }
......
package batcher
import (
"bytes"
"compress/zlib"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)
type TargetSizeCompressor struct {
// The target number of frames to create per channel. Note that if the
// realized compression ratio is worse than the approximate, more frames may
// actually be created. This also depends on how close TargetFrameSize is to
// MaxFrameSize.
TargetFrameSize uint64
// The target number of frames to create in this channel. If the realized
// compression ratio is worse than approxComprRatio, additional leftover
// frame(s) might get created.
TargetNumFrames int
// Approximated compression ratio to assume. Should be slightly smaller than
// average from experiments to avoid the chances of creating a small
// additional leftover frame.
ApproxComprRatio float64
inputBytes int
buf bytes.Buffer
compress *zlib.Writer
}
func NewTargetSizeCompressor(targetFrameSize uint64, targetNumFrames int, approxCompRatio float64) (derive.Compressor, error) {
c := &TargetSizeCompressor{
TargetFrameSize: targetFrameSize,
TargetNumFrames: targetNumFrames,
ApproxComprRatio: approxCompRatio,
}
compress, err := zlib.NewWriterLevel(&c.buf, zlib.BestCompression)
if err != nil {
return nil, err
}
c.compress = compress
return c, nil
}
func (t *TargetSizeCompressor) Write(p []byte) (int, error) {
if err := t.FullErr(); err != nil {
return 0, err
}
t.inputBytes += len(p)
return t.compress.Write(p)
}
func (t *TargetSizeCompressor) Close() error {
return t.compress.Close()
}
func (t *TargetSizeCompressor) Read(p []byte) (int, error) {
return t.buf.Read(p)
}
func (t *TargetSizeCompressor) Reset() {
t.buf.Reset()
t.compress.Reset(&t.buf)
t.inputBytes = 0
}
func (t *TargetSizeCompressor) Len() int {
return t.buf.Len()
}
func (t *TargetSizeCompressor) Flush() error {
return t.compress.Flush()
}
func (t *TargetSizeCompressor) FullErr() error {
if t.inputTargetReached() {
return derive.CompressorFullErr
}
return nil
}
// InputThreshold calculates the input data threshold in bytes from the given
// parameters.
func (t *TargetSizeCompressor) InputThreshold() uint64 {
return uint64(float64(t.TargetNumFrames) * float64(t.TargetFrameSize) / t.ApproxComprRatio)
}
// inputTargetReached says whether the target amount of input data has been
// reached in this channel builder. No more blocks can be added afterwards.
func (t *TargetSizeCompressor) inputTargetReached() bool {
return uint64(t.inputBytes) >= t.InputThreshold()
}
...@@ -118,12 +118,13 @@ func TestInputThreshold(t *testing.T) { ...@@ -118,12 +118,13 @@ func TestInputThreshold(t *testing.T) {
// Validate each test case // Validate each test case
for _, tt := range tests { for _, tt := range tests {
config := batcher.ChannelConfig{ compressor, err := batcher.NewTargetSizeCompressor(
TargetFrameSize: tt.input.TargetFrameSize, tt.input.TargetFrameSize,
TargetNumFrames: tt.input.TargetNumFrames, tt.input.TargetNumFrames,
ApproxComprRatio: tt.input.ApproxComprRatio, tt.input.ApproxComprRatio,
} )
got := config.InputThreshold() require.NoError(t, err)
got := compressor.(*batcher.TargetSizeCompressor).InputThreshold()
tt.assertion(got) tt.assertion(got)
} }
} }
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-batcher/batcher"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
...@@ -133,7 +134,9 @@ func (s *L2Batcher) Buffer(t Testing) error { ...@@ -133,7 +134,9 @@ func (s *L2Batcher) Buffer(t Testing) error {
if s.l2BatcherCfg.GarbageCfg != nil { if s.l2BatcherCfg.GarbageCfg != nil {
ch, err = NewGarbageChannelOut(s.l2BatcherCfg.GarbageCfg) ch, err = NewGarbageChannelOut(s.l2BatcherCfg.GarbageCfg)
} else { } else {
ch, err = derive.NewChannelOut() c, e := batcher.NewTargetSizeCompressor(s.l2BatcherCfg.MaxL1TxSize, 1, 1)
require.NoError(t, e, "failed to create compressor")
ch, err = derive.NewChannelOut(c)
} }
require.NoError(t, err, "failed to create channel") require.NoError(t, err, "failed to create channel")
s.l2ChannelOut = ch s.l2ChannelOut = ch
......
...@@ -2,7 +2,6 @@ package derive ...@@ -2,7 +2,6 @@ package derive
import ( import (
"bytes" "bytes"
"compress/zlib"
"crypto/rand" "crypto/rand"
"errors" "errors"
"fmt" "fmt"
...@@ -25,6 +24,30 @@ var ErrTooManyRLPBytes = errors.New("batch would cause RLP bytes to go over limi ...@@ -25,6 +24,30 @@ var ErrTooManyRLPBytes = errors.New("batch would cause RLP bytes to go over limi
// [Frame Format]: https://github.com/ethereum-optimism/optimism/blob/develop/specs/derivation.md#frame-format // [Frame Format]: https://github.com/ethereum-optimism/optimism/blob/develop/specs/derivation.md#frame-format
const FrameV0OverHeadSize = 23 const FrameV0OverHeadSize = 23
var CompressorFullErr = errors.New("compressor is full")
type Compressor interface {
// Writer is used to write uncompressed data which will be compressed. Should return
// CompressorFullErr if the compressor is full and no more data should be written.
io.Writer
// Closer Close function should be called before reading any data.
io.Closer
// Reader is used to Read compressed data; should only be called after Close.
io.Reader
// Reset will reset all written data
Reset()
// Len returns an estimate of the current length of the compressed data; calling Flush will
// increase the accuracy at the expense of a poorer compression ratio.
Len() int
// Flush flushes any uncompressed data to the compression buffer. This will result in a
// non-optimal compression ratio.
Flush() error
// FullErr returns CompressorFullErr if the compressor is known to be full. Note that
// calls to Write will fail if an error is returned from this method, but calls to Write
// can still return CompressorFullErr even if this does not.
FullErr() error
}
type ChannelOut struct { type ChannelOut struct {
id ChannelID id ChannelID
// Frame ID of the next frame to emit. Increment after emitting // Frame ID of the next frame to emit. Increment after emitting
...@@ -33,9 +56,7 @@ type ChannelOut struct { ...@@ -33,9 +56,7 @@ type ChannelOut struct {
rlpLength int rlpLength int
// Compressor stage. Write input data to it // Compressor stage. Write input data to it
compress *zlib.Writer compress Compressor
// post compression buffer
buf bytes.Buffer
closed bool closed bool
} }
...@@ -44,23 +65,18 @@ func (co *ChannelOut) ID() ChannelID { ...@@ -44,23 +65,18 @@ func (co *ChannelOut) ID() ChannelID {
return co.id return co.id
} }
func NewChannelOut() (*ChannelOut, error) { func NewChannelOut(compress Compressor) (*ChannelOut, error) {
c := &ChannelOut{ c := &ChannelOut{
id: ChannelID{}, // TODO: use GUID here instead of fully random data id: ChannelID{}, // TODO: use GUID here instead of fully random data
frame: 0, frame: 0,
rlpLength: 0, rlpLength: 0,
compress: compress,
} }
_, err := rand.Read(c.id[:]) _, err := rand.Read(c.id[:])
if err != nil { if err != nil {
return nil, err return nil, err
} }
compress, err := zlib.NewWriterLevel(&c.buf, zlib.BestCompression)
if err != nil {
return nil, err
}
c.compress = compress
return c, nil return c, nil
} }
...@@ -68,8 +84,7 @@ func NewChannelOut() (*ChannelOut, error) { ...@@ -68,8 +84,7 @@ func NewChannelOut() (*ChannelOut, error) {
func (co *ChannelOut) Reset() error { func (co *ChannelOut) Reset() error {
co.frame = 0 co.frame = 0
co.rlpLength = 0 co.rlpLength = 0
co.buf.Reset() co.compress.Reset()
co.compress.Reset(&co.buf)
co.closed = false co.closed = false
_, err := rand.Read(co.id[:]) _, err := rand.Read(co.id[:])
return err return err
...@@ -116,7 +131,8 @@ func (co *ChannelOut) AddBatch(batch *BatchData) (uint64, error) { ...@@ -116,7 +131,8 @@ func (co *ChannelOut) AddBatch(batch *BatchData) (uint64, error) {
} }
co.rlpLength += buf.Len() co.rlpLength += buf.Len()
written, err := io.Copy(co.compress, &buf) // avoid using io.Copy here, because we need all or nothing
written, err := co.compress.Write(buf.Bytes())
return uint64(written), err return uint64(written), err
} }
...@@ -129,7 +145,7 @@ func (co *ChannelOut) InputBytes() int { ...@@ -129,7 +145,7 @@ func (co *ChannelOut) InputBytes() int {
// Use `Flush` or `Close` to move data from the compression buffer into the ready buffer if more bytes // Use `Flush` or `Close` to move data from the compression buffer into the ready buffer if more bytes
// are needed. Add blocks may add to the ready buffer, but it is not guaranteed due to the compression stage. // are needed. Add blocks may add to the ready buffer, but it is not guaranteed due to the compression stage.
func (co *ChannelOut) ReadyBytes() int { func (co *ChannelOut) ReadyBytes() int {
return co.buf.Len() return co.compress.Len()
} }
// Flush flushes the internal compression stage to the ready buffer. It enables pulling a larger & more // Flush flushes the internal compression stage to the ready buffer. It enables pulling a larger & more
...@@ -166,8 +182,8 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, erro ...@@ -166,8 +182,8 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, erro
// Copy data from the local buffer into the frame data buffer // Copy data from the local buffer into the frame data buffer
maxDataSize := maxSize - FrameV0OverHeadSize maxDataSize := maxSize - FrameV0OverHeadSize
if maxDataSize > uint64(co.buf.Len()) { if maxDataSize > uint64(co.compress.Len()) {
maxDataSize = uint64(co.buf.Len()) maxDataSize = uint64(co.compress.Len())
// If we are closed & will not spill past the current frame // If we are closed & will not spill past the current frame
// mark it is the final frame of the channel. // mark it is the final frame of the channel.
if co.closed { if co.closed {
...@@ -176,7 +192,7 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, erro ...@@ -176,7 +192,7 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, erro
} }
f.Data = make([]byte, maxDataSize) f.Data = make([]byte, maxDataSize)
if _, err := io.ReadFull(&co.buf, f.Data); err != nil { if _, err := io.ReadFull(co.compress, f.Data); err != nil {
return 0, err return 0, err
} }
......
...@@ -11,8 +11,21 @@ import ( ...@@ -11,8 +11,21 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
// basic implementation of the Compressor interface that does no compression
type nonCompressor struct {
bytes.Buffer
}
func (s *nonCompressor) Flush() error {
return nil
}
func (s *nonCompressor) Close() error {
return nil
}
func TestChannelOutAddBlock(t *testing.T) { func TestChannelOutAddBlock(t *testing.T) {
cout, err := NewChannelOut() cout, err := NewChannelOut(&nonCompressor{})
require.NoError(t, err) require.NoError(t, err)
t.Run("returns err if first tx is not an l1info tx", func(t *testing.T) { t.Run("returns err if first tx is not an l1info tx", func(t *testing.T) {
...@@ -33,7 +46,7 @@ func TestChannelOutAddBlock(t *testing.T) { ...@@ -33,7 +46,7 @@ func TestChannelOutAddBlock(t *testing.T) {
// max size that is below the fixed frame size overhead of 23, will return // max size that is below the fixed frame size overhead of 23, will return
// an error. // an error.
func TestOutputFrameSmallMaxSize(t *testing.T) { func TestOutputFrameSmallMaxSize(t *testing.T) {
cout, err := NewChannelOut() cout, err := NewChannelOut(&nonCompressor{})
require.NoError(t, err) require.NoError(t, err)
// Call OutputFrame with the range of small max size values that err // Call OutputFrame with the range of small max size values that err
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment