Commit bed7459a authored by Michael de Hoog's avatar Michael de Hoog

Don't reuse compressor

parent ba075423
...@@ -32,6 +32,8 @@ func (e *ChannelFullError) Unwrap() error { ...@@ -32,6 +32,8 @@ func (e *ChannelFullError) Unwrap() error {
return e.Err return e.Err
} }
type CompressorFactory func() (derive.Compressor, error)
type ChannelConfig struct { type ChannelConfig struct {
// Number of epochs (L1 blocks) per sequencing window, including the epoch // Number of epochs (L1 blocks) per sequencing window, including the epoch
// L1 origin block itself // L1 origin block itself
...@@ -54,8 +56,8 @@ type ChannelConfig struct { ...@@ -54,8 +56,8 @@ type ChannelConfig struct {
SubSafetyMargin uint64 SubSafetyMargin uint64
// The maximum byte-size a frame can have. // The maximum byte-size a frame can have.
MaxFrameSize uint64 MaxFrameSize uint64
// Compressor to use to compress frame data. // CompressorFactory creates Compressors to use to compress frame data.
Compressor derive.Compressor CompressorFactory CompressorFactory
} }
// Check validates the [ChannelConfig] parameters. // Check validates the [ChannelConfig] parameters.
...@@ -82,7 +84,7 @@ func (cc *ChannelConfig) Check() error { ...@@ -82,7 +84,7 @@ func (cc *ChannelConfig) Check() error {
} }
// Compressor must be set // Compressor must be set
if cc.Compressor == nil { if cc.CompressorFactory == nil {
return errors.New("compressor cannot be nil") return errors.New("compressor cannot be nil")
} }
...@@ -129,7 +131,11 @@ type channelBuilder struct { ...@@ -129,7 +131,11 @@ type channelBuilder struct {
// newChannelBuilder creates a new channel builder or returns an error if the // newChannelBuilder creates a new channel builder or returns an error if the
// channel out could not be created. // channel out could not be created.
func newChannelBuilder(cfg ChannelConfig) (*channelBuilder, error) { func newChannelBuilder(cfg ChannelConfig) (*channelBuilder, error) {
co, err := derive.NewChannelOut(cfg.Compressor) c, err := cfg.CompressorFactory()
if err != nil {
return nil, err
}
co, err := derive.NewChannelOut(c)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -205,7 +211,7 @@ func (c *channelBuilder) AddBlock(block *types.Block) (derive.L1BlockInfo, error ...@@ -205,7 +211,7 @@ func (c *channelBuilder) AddBlock(block *types.Block) (derive.L1BlockInfo, error
c.blocks = append(c.blocks, block) c.blocks = append(c.blocks, block)
c.updateSwTimeout(batch) c.updateSwTimeout(batch)
if err = c.cfg.Compressor.FullErr(); err != nil { if err = c.co.FullErr(); err != nil {
c.setFullErr(err) c.setFullErr(err)
// Adding this block still worked, so don't return error, just mark as full // Adding this block still worked, so don't return error, just mark as full
} }
......
This diff is collapsed.
...@@ -99,7 +99,7 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) { ...@@ -99,7 +99,7 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
MaxFrameSize: 120_000, MaxFrameSize: 120_000,
Compressor: newCompressor(t, 1, 1, 1), CompressorFactory: newCompressorFactory(1, 1, 1),
}) })
a := newMiniL2Block(0) a := newMiniL2Block(0)
...@@ -170,7 +170,7 @@ func TestChannelManager_Clear(t *testing.T) { ...@@ -170,7 +170,7 @@ func TestChannelManager_Clear(t *testing.T) {
// Have to set the max frame size here otherwise the channel builder would not // Have to set the max frame size here otherwise the channel builder would not
// be able to output any frames // be able to output any frames
MaxFrameSize: 24, MaxFrameSize: 24,
Compressor: newCompressor(t, 24, 1, 1), CompressorFactory: newCompressorFactory(24, 1, 1),
}) })
// Channel Manager state should be empty by default // Channel Manager state should be empty by default
...@@ -330,7 +330,7 @@ func TestChannelManager_TxResend(t *testing.T) { ...@@ -330,7 +330,7 @@ func TestChannelManager_TxResend(t *testing.T) {
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
MaxFrameSize: 120_000, MaxFrameSize: 120_000,
Compressor: newCompressor(t, 1, 1, 1), CompressorFactory: newCompressorFactory(1, 1, 1),
}) })
a, _ := derivetest.RandomL2Block(rng, 4) a, _ := derivetest.RandomL2Block(rng, 4)
...@@ -370,7 +370,7 @@ func TestChannelManagerCloseBeforeFirstUse(t *testing.T) { ...@@ -370,7 +370,7 @@ func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
MaxFrameSize: 100, MaxFrameSize: 100,
Compressor: newCompressor(t, 0, 1, 1), CompressorFactory: newCompressorFactory(0, 1, 1),
ChannelTimeout: 1000, ChannelTimeout: 1000,
}) })
...@@ -394,7 +394,7 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) { ...@@ -394,7 +394,7 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
MaxFrameSize: 1000, MaxFrameSize: 1000,
Compressor: newCompressor(t, 1, 1, 1), CompressorFactory: newCompressorFactory(1, 1, 1),
ChannelTimeout: 1000, ChannelTimeout: 1000,
}) })
a := newMiniL2Block(0) a := newMiniL2Block(0)
...@@ -429,7 +429,7 @@ func TestChannelManagerClosePendingChannel(t *testing.T) { ...@@ -429,7 +429,7 @@ func TestChannelManagerClosePendingChannel(t *testing.T) {
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
MaxFrameSize: 1000, MaxFrameSize: 1000,
Compressor: newCompressor(t, 1000, 100, 1), CompressorFactory: newCompressorFactory(1000, 100, 1),
ChannelTimeout: 1000, ChannelTimeout: 1000,
}) })
...@@ -470,7 +470,7 @@ func TestChannelManagerCloseAllTxsFailed(t *testing.T) { ...@@ -470,7 +470,7 @@ func TestChannelManagerCloseAllTxsFailed(t *testing.T) {
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
MaxFrameSize: 1000, MaxFrameSize: 1000,
Compressor: newCompressor(t, 1000, 100, 1), CompressorFactory: newCompressorFactory(1000, 100, 1),
ChannelTimeout: 1000, ChannelTimeout: 1000,
}) })
......
...@@ -154,7 +154,8 @@ func NewConfig(ctx *cli.Context) CLIConfig { ...@@ -154,7 +154,8 @@ func NewConfig(ctx *cli.Context) CLIConfig {
} }
} }
func (c CLIConfig) NewCompressor() (derive.Compressor, error) { func (c CLIConfig) NewCompressorFactory() CompressorFactory {
return func() (derive.Compressor, error) {
switch c.CompressorKind { switch c.CompressorKind {
case flags.CompressorShadow: case flags.CompressorShadow:
return NewShadowCompressor( return NewShadowCompressor(
...@@ -167,4 +168,5 @@ func (c CLIConfig) NewCompressor() (derive.Compressor, error) { ...@@ -167,4 +168,5 @@ func (c CLIConfig) NewCompressor() (derive.Compressor, error) {
c.ApproxComprRatio, c.ApproxComprRatio,
) )
} }
}
} }
...@@ -75,11 +75,6 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri ...@@ -75,11 +75,6 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
return nil, err return nil, err
} }
compressor, err := cfg.NewCompressor()
if err != nil {
return nil, err
}
batcherCfg := Config{ batcherCfg := Config{
L1Client: l1Client, L1Client: l1Client,
L2Client: l2Client, L2Client: l2Client,
...@@ -95,7 +90,7 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri ...@@ -95,7 +90,7 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
MaxChannelDuration: cfg.MaxChannelDuration, MaxChannelDuration: cfg.MaxChannelDuration,
SubSafetyMargin: cfg.SubSafetyMargin, SubSafetyMargin: cfg.SubSafetyMargin,
MaxFrameSize: cfg.MaxL1TxSize - 1, // subtract 1 byte for version MaxFrameSize: cfg.MaxL1TxSize - 1, // subtract 1 byte for version
Compressor: compressor, CompressorFactory: cfg.NewCompressorFactory(),
}, },
} }
......
...@@ -154,6 +154,10 @@ func (co *ChannelOut) Flush() error { ...@@ -154,6 +154,10 @@ func (co *ChannelOut) Flush() error {
return co.compress.Flush() return co.compress.Flush()
} }
func (co *ChannelOut) FullErr() error {
return co.compress.FullErr()
}
func (co *ChannelOut) Close() error { func (co *ChannelOut) Close() error {
if co.closed { if co.closed {
return errors.New("already closed") return errors.New("already closed")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment