Commit 461b2134 authored by Michael de Hoog's avatar Michael de Hoog

Move compressor instantiation to ChannelConfig

parent 7a73979b
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"io" "io"
"math" "math"
"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
) )
...@@ -32,8 +33,6 @@ func (e *ChannelFullError) Unwrap() error { ...@@ -32,8 +33,6 @@ func (e *ChannelFullError) Unwrap() error {
return e.Err return e.Err
} }
type CompressorFactory func() (derive.Compressor, error)
type ChannelConfig struct { type ChannelConfig struct {
// Number of epochs (L1 blocks) per sequencing window, including the epoch // Number of epochs (L1 blocks) per sequencing window, including the epoch
// L1 origin block itself // L1 origin block itself
...@@ -56,8 +55,21 @@ type ChannelConfig struct { ...@@ -56,8 +55,21 @@ type ChannelConfig struct {
SubSafetyMargin uint64 SubSafetyMargin uint64
// The maximum byte-size a frame can have. // The maximum byte-size a frame can have.
MaxFrameSize uint64 MaxFrameSize uint64
// CompressorFactory creates Compressors used to compress frame data. // The target number of frames to create per channel. Note that if the
CompressorFactory CompressorFactory // realized compression ratio is worse than the approximate, more frames may
// actually be created. This also depends on how close TargetFrameSize is to
// MaxFrameSize.
TargetFrameSize uint64
// The target number of frames to create in this channel. If the realized
// compression ratio is worse than approxComprRatio, additional leftover
// frame(s) might get created.
TargetNumFrames int
// Approximated compression ratio to assume. Should be slightly smaller than
// average from experiments to avoid the chances of creating a small
// additional leftover frame.
ApproxComprRatio float64
// CompressorKind is the compressor implementation to use.
CompressorKind flags.CompressorKind
} }
// Check validates the [ChannelConfig] parameters. // Check validates the [ChannelConfig] parameters.
...@@ -83,14 +95,24 @@ func (cc *ChannelConfig) Check() error { ...@@ -83,14 +95,24 @@ func (cc *ChannelConfig) Check() error {
return fmt.Errorf("max frame size %d is less than the minimum 23", cc.MaxFrameSize) return fmt.Errorf("max frame size %d is less than the minimum 23", cc.MaxFrameSize)
} }
// Compressor must be set
if cc.CompressorFactory == nil {
return errors.New("compressor factory cannot be nil")
}
return nil return nil
} }
func (cc *ChannelConfig) NewCompressor() (derive.Compressor, error) {
switch cc.CompressorKind {
case flags.CompressorShadow:
return NewShadowCompressor(
cc.MaxFrameSize, // subtract 1 byte for version
)
default:
return NewTargetSizeCompressor(
cc.TargetFrameSize, // subtract 1 byte for version
cc.TargetNumFrames,
cc.ApproxComprRatio,
)
}
}
type frameID struct { type frameID struct {
chID derive.ChannelID chID derive.ChannelID
frameNumber uint16 frameNumber uint16
...@@ -131,7 +153,7 @@ type channelBuilder struct { ...@@ -131,7 +153,7 @@ type channelBuilder struct {
// newChannelBuilder creates a new channel builder or returns an error if the // newChannelBuilder creates a new channel builder or returns an error if the
// channel out could not be created. // channel out could not be created.
func newChannelBuilder(cfg ChannelConfig) (*channelBuilder, error) { func newChannelBuilder(cfg ChannelConfig) (*channelBuilder, error) {
c, err := cfg.CompressorFactory() c, err := cfg.NewCompressor()
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -28,13 +28,9 @@ var defaultTestChannelConfig = ChannelConfig{ ...@@ -28,13 +28,9 @@ var defaultTestChannelConfig = ChannelConfig{
MaxChannelDuration: 1, MaxChannelDuration: 1,
SubSafetyMargin: 4, SubSafetyMargin: 4,
MaxFrameSize: 120000, MaxFrameSize: 120000,
CompressorFactory: newCompressorFactory(100000, 1, 0.4), TargetFrameSize: 100000,
} TargetNumFrames: 1,
ApproxComprRatio: 0.4,
func newCompressorFactory(targetFrameSize uint64, targetNumFrames int, approxCompRatio float64) CompressorFactory {
return func() (derive.Compressor, error) {
return NewTargetSizeCompressor(targetFrameSize, targetNumFrames, approxCompRatio)
}
} }
// TestChannelConfig_Check tests the [ChannelConfig] [Check] function. // TestChannelConfig_Check tests the [ChannelConfig] [Check] function.
...@@ -420,7 +416,7 @@ func TestChannelBuilder_OutputWrongFramePanic(t *testing.T) { ...@@ -420,7 +416,7 @@ func TestChannelBuilder_OutputWrongFramePanic(t *testing.T) {
// Mock the internals of `channelBuilder.outputFrame` // Mock the internals of `channelBuilder.outputFrame`
// to construct a single frame // to construct a single frame
c, err := channelConfig.CompressorFactory() c, err := channelConfig.NewCompressor()
require.NoError(t, err) require.NoError(t, err)
co, err := derive.NewChannelOut(c) co, err := derive.NewChannelOut(c)
require.NoError(t, err) require.NoError(t, err)
...@@ -488,7 +484,9 @@ func TestChannelBuilder_OutputFramesWorks(t *testing.T) { ...@@ -488,7 +484,9 @@ func TestChannelBuilder_OutputFramesWorks(t *testing.T) {
func TestChannelBuilder_MaxRLPBytesPerChannel(t *testing.T) { func TestChannelBuilder_MaxRLPBytesPerChannel(t *testing.T) {
t.Parallel() t.Parallel()
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig
channelConfig.CompressorFactory = newCompressorFactory(derive.MaxRLPBytesPerChannel*2, derive.MaxRLPBytesPerChannel*2, 1) channelConfig.MaxFrameSize = derive.MaxRLPBytesPerChannel * 2
channelConfig.TargetFrameSize = derive.MaxRLPBytesPerChannel * 2
channelConfig.ApproxComprRatio = 1
// Construct the channel builder // Construct the channel builder
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig)
...@@ -504,7 +502,9 @@ func TestChannelBuilder_MaxRLPBytesPerChannel(t *testing.T) { ...@@ -504,7 +502,9 @@ func TestChannelBuilder_MaxRLPBytesPerChannel(t *testing.T) {
func TestChannelBuilder_OutputFramesMaxFrameIndex(t *testing.T) { func TestChannelBuilder_OutputFramesMaxFrameIndex(t *testing.T) {
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig
channelConfig.MaxFrameSize = 24 channelConfig.MaxFrameSize = 24
channelConfig.CompressorFactory = newCompressorFactory(24, math.MaxInt, 0) channelConfig.TargetNumFrames = math.MaxInt
channelConfig.TargetFrameSize = 24
channelConfig.ApproxComprRatio = 0
// Continuously add blocks until the max frame index is reached // Continuously add blocks until the max frame index is reached
// This should cause the [channelBuilder.OutputFrames] function // This should cause the [channelBuilder.OutputFrames] function
...@@ -546,7 +546,9 @@ func TestChannelBuilder_AddBlock(t *testing.T) { ...@@ -546,7 +546,9 @@ func TestChannelBuilder_AddBlock(t *testing.T) {
channelConfig.MaxFrameSize = 30 channelConfig.MaxFrameSize = 30
// Configure the Input Threshold params so we observe a full channel // Configure the Input Threshold params so we observe a full channel
channelConfig.CompressorFactory = newCompressorFactory(30, 2, 1) channelConfig.TargetFrameSize = 30
channelConfig.TargetNumFrames = 2
channelConfig.ApproxComprRatio = 1
// Construct the channel builder // Construct the channel builder
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig)
...@@ -701,8 +703,10 @@ func TestChannelBuilder_OutputBytes(t *testing.T) { ...@@ -701,8 +703,10 @@ func TestChannelBuilder_OutputBytes(t *testing.T) {
require := require.New(t) require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano())) rng := rand.New(rand.NewSource(time.Now().UnixNano()))
cfg := defaultTestChannelConfig cfg := defaultTestChannelConfig
cfg.TargetFrameSize = 1000
cfg.MaxFrameSize = 1000 cfg.MaxFrameSize = 1000
cfg.CompressorFactory = newCompressorFactory(1000, 16, 1) cfg.TargetNumFrames = 16
cfg.ApproxComprRatio = 1.0
cb, err := newChannelBuilder(cfg) cb, err := newChannelBuilder(cfg)
require.NoError(err, "newChannelBuilder") require.NoError(err, "newChannelBuilder")
......
...@@ -98,8 +98,9 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) { ...@@ -98,8 +98,9 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
MaxFrameSize: 120_000, TargetFrameSize: 1,
CompressorFactory: newCompressorFactory(1, 1, 1), MaxFrameSize: 120_000,
ApproxComprRatio: 1.0,
}) })
a := newMiniL2Block(0) a := newMiniL2Block(0)
...@@ -169,8 +170,9 @@ func TestChannelManager_Clear(t *testing.T) { ...@@ -169,8 +170,9 @@ func TestChannelManager_Clear(t *testing.T) {
ChannelTimeout: 10, ChannelTimeout: 10,
// Have to set the max frame size here otherwise the channel builder would not // Have to set the max frame size here otherwise the channel builder would not
// be able to output any frames // be able to output any frames
MaxFrameSize: 24, MaxFrameSize: 24,
CompressorFactory: newCompressorFactory(24, 1, 1), TargetFrameSize: 24,
ApproxComprRatio: 1.0,
}) })
// Channel Manager state should be empty by default // Channel Manager state should be empty by default
...@@ -329,8 +331,9 @@ func TestChannelManager_TxResend(t *testing.T) { ...@@ -329,8 +331,9 @@ func TestChannelManager_TxResend(t *testing.T) {
log := testlog.Logger(t, log.LvlError) log := testlog.Logger(t, log.LvlError)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
MaxFrameSize: 120_000, TargetFrameSize: 1,
CompressorFactory: newCompressorFactory(1, 1, 1), MaxFrameSize: 120_000,
ApproxComprRatio: 1.0,
}) })
a, _ := derivetest.RandomL2Block(rng, 4) a, _ := derivetest.RandomL2Block(rng, 4)
...@@ -369,9 +372,10 @@ func TestChannelManagerCloseBeforeFirstUse(t *testing.T) { ...@@ -369,9 +372,10 @@ func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
MaxFrameSize: 100, TargetFrameSize: 1,
CompressorFactory: newCompressorFactory(0, 1, 1), MaxFrameSize: 100,
ChannelTimeout: 1000, ApproxComprRatio: 1.0,
ChannelTimeout: 1000,
}) })
a, _ := derivetest.RandomL2Block(rng, 4) a, _ := derivetest.RandomL2Block(rng, 4)
...@@ -393,9 +397,10 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) { ...@@ -393,9 +397,10 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
MaxFrameSize: 1000, TargetFrameSize: 1,
CompressorFactory: newCompressorFactory(1, 1, 1), MaxFrameSize: 100,
ChannelTimeout: 1000, ApproxComprRatio: 1.0,
ChannelTimeout: 1000,
}) })
a := newMiniL2Block(0) a := newMiniL2Block(0)
b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash()) b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash())
...@@ -428,9 +433,11 @@ func TestChannelManagerClosePendingChannel(t *testing.T) { ...@@ -428,9 +433,11 @@ func TestChannelManagerClosePendingChannel(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
MaxFrameSize: 1000, TargetNumFrames: 100,
CompressorFactory: newCompressorFactory(1000, 100, 1), TargetFrameSize: 1000,
ChannelTimeout: 1000, MaxFrameSize: 1000,
ApproxComprRatio: 1.0,
ChannelTimeout: 1000,
}) })
a := newMiniL2Block(50_000) a := newMiniL2Block(50_000)
...@@ -469,9 +476,11 @@ func TestChannelManagerCloseAllTxsFailed(t *testing.T) { ...@@ -469,9 +476,11 @@ func TestChannelManagerCloseAllTxsFailed(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
MaxFrameSize: 1000, TargetNumFrames: 100,
CompressorFactory: newCompressorFactory(1000, 100, 1), TargetFrameSize: 1000,
ChannelTimeout: 1000, MaxFrameSize: 1000,
ApproxComprRatio: 1.0,
ChannelTimeout: 1000,
}) })
a := newMiniL2Block(50_000) a := newMiniL2Block(50_000)
......
...@@ -12,7 +12,6 @@ import ( ...@@ -12,7 +12,6 @@ import (
"github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-batcher/rpc" "github.com/ethereum-optimism/optimism/op-batcher/rpc"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/sources"
oplog "github.com/ethereum-optimism/optimism/op-service/log" oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
...@@ -153,20 +152,3 @@ func NewConfig(ctx *cli.Context) CLIConfig { ...@@ -153,20 +152,3 @@ func NewConfig(ctx *cli.Context) CLIConfig {
PprofConfig: oppprof.ReadCLIConfig(ctx), PprofConfig: oppprof.ReadCLIConfig(ctx),
} }
} }
func (c CLIConfig) NewCompressorFactory() CompressorFactory {
return func() (derive.Compressor, error) {
switch c.CompressorKind {
case flags.CompressorTarget:
return NewTargetSizeCompressor(
c.TargetL1TxSize-1, // subtract 1 byte for version
c.TargetNumFrames,
c.ApproxComprRatio,
)
default:
return NewShadowCompressor(
c.MaxL1TxSize - 1, // subtract 1 byte for version
)
}
}
}
...@@ -89,8 +89,11 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri ...@@ -89,8 +89,11 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
ChannelTimeout: rcfg.ChannelTimeout, ChannelTimeout: rcfg.ChannelTimeout,
MaxChannelDuration: cfg.MaxChannelDuration, MaxChannelDuration: cfg.MaxChannelDuration,
SubSafetyMargin: cfg.SubSafetyMargin, SubSafetyMargin: cfg.SubSafetyMargin,
MaxFrameSize: cfg.MaxL1TxSize - 1, // subtract 1 byte for version MaxFrameSize: cfg.MaxL1TxSize - 1, // subtract 1 byte for version
CompressorFactory: cfg.NewCompressorFactory(), TargetFrameSize: cfg.TargetL1TxSize - 1, // subtract 1 byte for version
TargetNumFrames: cfg.TargetNumFrames,
ApproxComprRatio: cfg.ApproxComprRatio,
CompressorKind: cfg.CompressorKind,
}, },
} }
......
package batcher_test package batcher_test
import ( import (
"github.com/ethereum-optimism/optimism/op-batcher/flags"
"math" "math"
"testing" "testing"
...@@ -118,13 +119,15 @@ func TestInputThreshold(t *testing.T) { ...@@ -118,13 +119,15 @@ func TestInputThreshold(t *testing.T) {
// Validate each test case // Validate each test case
for _, tt := range tests { for _, tt := range tests {
compressor, err := batcher.NewTargetSizeCompressor( config := batcher.ChannelConfig{
tt.input.TargetFrameSize, TargetFrameSize: tt.input.TargetFrameSize,
tt.input.TargetNumFrames, TargetNumFrames: tt.input.TargetNumFrames,
tt.input.ApproxComprRatio, ApproxComprRatio: tt.input.ApproxComprRatio,
) CompressorKind: flags.CompressorTarget,
}
comp, err := config.NewCompressor()
require.NoError(t, err) require.NoError(t, err)
got := compressor.(*batcher.TargetSizeCompressor).InputThreshold() got := comp.(*batcher.TargetSizeCompressor).InputThreshold()
tt.assertion(got) tt.assertion(got)
} }
} }
...@@ -92,7 +92,7 @@ var ( ...@@ -92,7 +92,7 @@ var (
flags.EnumString[CompressorKind](CompressorKinds), flags.EnumString[CompressorKind](CompressorKinds),
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "COMPRESSOR"), EnvVar: opservice.PrefixEnvVar(envVarPrefix, "COMPRESSOR"),
Value: func() *CompressorKind { Value: func() *CompressorKind {
out := CompressorShadow out := CompressorTarget
return &out return &out
}(), }(),
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment