Commit 6f831fdc authored by Michael de Hoog's avatar Michael de Hoog

Move compressor and CLI configuration to separate package

parent d3d92148
...@@ -7,7 +7,7 @@ import ( ...@@ -7,7 +7,7 @@ import (
"io" "io"
"math" "math"
"github.com/ethereum-optimism/optimism/op-batcher/flags" "github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
) )
...@@ -55,21 +55,11 @@ type ChannelConfig struct { ...@@ -55,21 +55,11 @@ type ChannelConfig struct {
SubSafetyMargin uint64 SubSafetyMargin uint64
// The maximum byte-size a frame can have. // The maximum byte-size a frame can have.
MaxFrameSize uint64 MaxFrameSize uint64
// The target number of frames to create per channel. Note that if the
// realized compression ratio is worse than the approximate, more frames may // CompressorConfig contains the configuration for creating new compressors.
// actually be created. This also depends on how close TargetFrameSize is to CompressorConfig compressor.Config
// MaxFrameSize. // CompressorFactory creates new compressors.
TargetFrameSize uint64 CompressorFactory compressor.FactoryFunc
// The target number of frames to create in this channel. If the realized
// compression ratio is worse than approxComprRatio, additional leftover
// frame(s) might get created.
TargetNumFrames int
// Approximated compression ratio to assume. Should be slightly smaller than
// average from experiments to avoid the chances of creating a small
// additional leftover frame.
ApproxComprRatio float64
// CompressorKind is the compressor implementation to use.
CompressorKind flags.CompressorKind
} }
// Check validates the [ChannelConfig] parameters. // Check validates the [ChannelConfig] parameters.
...@@ -98,22 +88,6 @@ func (cc *ChannelConfig) Check() error { ...@@ -98,22 +88,6 @@ func (cc *ChannelConfig) Check() error {
return nil return nil
} }
func (cc *ChannelConfig) NewCompressor() (derive.Compressor, error) {
switch cc.CompressorKind {
case flags.ShadowCompressorKind:
return NewShadowCompressor(
cc.TargetFrameSize,
cc.TargetNumFrames,
)
default:
return NewRatioCompressor(
cc.TargetFrameSize,
cc.TargetNumFrames,
cc.ApproxComprRatio,
)
}
}
type frameID struct { type frameID struct {
chID derive.ChannelID chID derive.ChannelID
frameNumber uint16 frameNumber uint16
...@@ -154,7 +128,7 @@ type channelBuilder struct { ...@@ -154,7 +128,7 @@ type channelBuilder struct {
// newChannelBuilder creates a new channel builder or returns an error if the // newChannelBuilder creates a new channel builder or returns an error if the
// channel out could not be created. // channel out could not be created.
func newChannelBuilder(cfg ChannelConfig) (*channelBuilder, error) { func newChannelBuilder(cfg ChannelConfig) (*channelBuilder, error) {
c, err := cfg.NewCompressor() c, err := cfg.CompressorFactory(cfg.CompressorConfig)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
...@@ -28,9 +29,12 @@ var defaultTestChannelConfig = ChannelConfig{ ...@@ -28,9 +29,12 @@ var defaultTestChannelConfig = ChannelConfig{
MaxChannelDuration: 1, MaxChannelDuration: 1,
SubSafetyMargin: 4, SubSafetyMargin: 4,
MaxFrameSize: 120000, MaxFrameSize: 120000,
CompressorConfig: compressor.Config{
TargetFrameSize: 100000, TargetFrameSize: 100000,
TargetNumFrames: 1, TargetNumFrames: 1,
ApproxComprRatio: 0.4, ApproxComprRatio: 0.4,
},
CompressorFactory: compressor.NewRatioCompressor,
} }
// TestChannelConfig_Check tests the [ChannelConfig] [Check] function. // TestChannelConfig_Check tests the [ChannelConfig] [Check] function.
...@@ -416,7 +420,7 @@ func TestChannelBuilder_OutputWrongFramePanic(t *testing.T) { ...@@ -416,7 +420,7 @@ func TestChannelBuilder_OutputWrongFramePanic(t *testing.T) {
// Mock the internals of `channelBuilder.outputFrame` // Mock the internals of `channelBuilder.outputFrame`
// to construct a single frame // to construct a single frame
c, err := channelConfig.NewCompressor() c, err := channelConfig.CompressorFactory(channelConfig.CompressorConfig)
require.NoError(t, err) require.NoError(t, err)
co, err := derive.NewChannelOut(c) co, err := derive.NewChannelOut(c)
require.NoError(t, err) require.NoError(t, err)
...@@ -485,8 +489,8 @@ func TestChannelBuilder_MaxRLPBytesPerChannel(t *testing.T) { ...@@ -485,8 +489,8 @@ func TestChannelBuilder_MaxRLPBytesPerChannel(t *testing.T) {
t.Parallel() t.Parallel()
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig
channelConfig.MaxFrameSize = derive.MaxRLPBytesPerChannel * 2 channelConfig.MaxFrameSize = derive.MaxRLPBytesPerChannel * 2
channelConfig.TargetFrameSize = derive.MaxRLPBytesPerChannel * 2 channelConfig.CompressorConfig.TargetFrameSize = derive.MaxRLPBytesPerChannel * 2
channelConfig.ApproxComprRatio = 1 channelConfig.CompressorConfig.ApproxComprRatio = 1
// Construct the channel builder // Construct the channel builder
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig)
...@@ -502,9 +506,9 @@ func TestChannelBuilder_MaxRLPBytesPerChannel(t *testing.T) { ...@@ -502,9 +506,9 @@ func TestChannelBuilder_MaxRLPBytesPerChannel(t *testing.T) {
func TestChannelBuilder_OutputFramesMaxFrameIndex(t *testing.T) { func TestChannelBuilder_OutputFramesMaxFrameIndex(t *testing.T) {
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig
channelConfig.MaxFrameSize = 24 channelConfig.MaxFrameSize = 24
channelConfig.TargetNumFrames = math.MaxInt channelConfig.CompressorConfig.TargetNumFrames = math.MaxInt
channelConfig.TargetFrameSize = 24 channelConfig.CompressorConfig.TargetFrameSize = 24
channelConfig.ApproxComprRatio = 0 channelConfig.CompressorConfig.ApproxComprRatio = 0
// Continuously add blocks until the max frame index is reached // Continuously add blocks until the max frame index is reached
// This should cause the [channelBuilder.OutputFrames] function // This should cause the [channelBuilder.OutputFrames] function
...@@ -546,9 +550,9 @@ func TestChannelBuilder_AddBlock(t *testing.T) { ...@@ -546,9 +550,9 @@ func TestChannelBuilder_AddBlock(t *testing.T) {
channelConfig.MaxFrameSize = 30 channelConfig.MaxFrameSize = 30
// Configure the Input Threshold params so we observe a full channel // Configure the Input Threshold params so we observe a full channel
channelConfig.TargetFrameSize = 30 channelConfig.CompressorConfig.TargetFrameSize = 30
channelConfig.TargetNumFrames = 2 channelConfig.CompressorConfig.TargetNumFrames = 2
channelConfig.ApproxComprRatio = 1 channelConfig.CompressorConfig.ApproxComprRatio = 1
// Construct the channel builder // Construct the channel builder
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig)
...@@ -703,10 +707,10 @@ func TestChannelBuilder_OutputBytes(t *testing.T) { ...@@ -703,10 +707,10 @@ func TestChannelBuilder_OutputBytes(t *testing.T) {
require := require.New(t) require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano())) rng := rand.New(rand.NewSource(time.Now().UnixNano()))
cfg := defaultTestChannelConfig cfg := defaultTestChannelConfig
cfg.TargetFrameSize = 1000 cfg.CompressorConfig.TargetFrameSize = 1000
cfg.MaxFrameSize = 1000 cfg.MaxFrameSize = 1000
cfg.TargetNumFrames = 16 cfg.CompressorConfig.TargetNumFrames = 16
cfg.ApproxComprRatio = 1.0 cfg.CompressorConfig.ApproxComprRatio = 1.0
cb, err := newChannelBuilder(cfg) cb, err := newChannelBuilder(cfg)
require.NoError(err, "newChannelBuilder") require.NoError(err, "newChannelBuilder")
......
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
...@@ -98,9 +99,12 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) { ...@@ -98,9 +99,12 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
TargetFrameSize: 0,
MaxFrameSize: 120_000, MaxFrameSize: 120_000,
CompressorConfig: compressor.Config{
TargetFrameSize: 0,
ApproxComprRatio: 1.0, ApproxComprRatio: 1.0,
},
CompressorFactory: compressor.NewRatioCompressor,
}) })
a := newMiniL2Block(0) a := newMiniL2Block(0)
...@@ -171,9 +175,12 @@ func TestChannelManager_Clear(t *testing.T) { ...@@ -171,9 +175,12 @@ func TestChannelManager_Clear(t *testing.T) {
// Have to set the max frame size here otherwise the channel builder would not // Have to set the max frame size here otherwise the channel builder would not
// be able to output any frames // be able to output any frames
MaxFrameSize: 24, MaxFrameSize: 24,
CompressorConfig: compressor.Config{
TargetFrameSize: 24, TargetFrameSize: 24,
TargetNumFrames: 1, TargetNumFrames: 1,
ApproxComprRatio: 1.0, ApproxComprRatio: 1.0,
},
CompressorFactory: compressor.NewRatioCompressor,
}) })
// Channel Manager state should be empty by default // Channel Manager state should be empty by default
...@@ -332,9 +339,12 @@ func TestChannelManager_TxResend(t *testing.T) { ...@@ -332,9 +339,12 @@ func TestChannelManager_TxResend(t *testing.T) {
log := testlog.Logger(t, log.LvlError) log := testlog.Logger(t, log.LvlError)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
TargetFrameSize: 0,
MaxFrameSize: 120_000, MaxFrameSize: 120_000,
CompressorConfig: compressor.Config{
TargetFrameSize: 0,
ApproxComprRatio: 1.0, ApproxComprRatio: 1.0,
},
CompressorFactory: compressor.NewRatioCompressor,
}) })
a, _ := derivetest.RandomL2Block(rng, 4) a, _ := derivetest.RandomL2Block(rng, 4)
...@@ -373,10 +383,13 @@ func TestChannelManagerCloseBeforeFirstUse(t *testing.T) { ...@@ -373,10 +383,13 @@ func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
TargetFrameSize: 0,
MaxFrameSize: 100, MaxFrameSize: 100,
ApproxComprRatio: 1.0,
ChannelTimeout: 1000, ChannelTimeout: 1000,
CompressorConfig: compressor.Config{
TargetFrameSize: 0,
ApproxComprRatio: 1.0,
},
CompressorFactory: compressor.NewRatioCompressor,
}) })
a, _ := derivetest.RandomL2Block(rng, 4) a, _ := derivetest.RandomL2Block(rng, 4)
...@@ -398,11 +411,14 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) { ...@@ -398,11 +411,14 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
MaxFrameSize: 100,
ChannelTimeout: 1000,
CompressorConfig: compressor.Config{
TargetFrameSize: 1, TargetFrameSize: 1,
TargetNumFrames: 1, TargetNumFrames: 1,
MaxFrameSize: 100,
ApproxComprRatio: 1.0, ApproxComprRatio: 1.0,
ChannelTimeout: 1000, },
CompressorFactory: compressor.NewRatioCompressor,
}) })
a := newMiniL2Block(0) a := newMiniL2Block(0)
b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash()) b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash())
...@@ -435,11 +451,14 @@ func TestChannelManagerClosePendingChannel(t *testing.T) { ...@@ -435,11 +451,14 @@ func TestChannelManagerClosePendingChannel(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
MaxFrameSize: 1000,
ChannelTimeout: 1000,
CompressorConfig: compressor.Config{
TargetNumFrames: 100, TargetNumFrames: 100,
TargetFrameSize: 1000, TargetFrameSize: 1000,
MaxFrameSize: 1000,
ApproxComprRatio: 1.0, ApproxComprRatio: 1.0,
ChannelTimeout: 1000, },
CompressorFactory: compressor.NewRatioCompressor,
}) })
a := newMiniL2Block(50_000) a := newMiniL2Block(50_000)
...@@ -478,11 +497,14 @@ func TestChannelManagerCloseAllTxsFailed(t *testing.T) { ...@@ -478,11 +497,14 @@ func TestChannelManagerCloseAllTxsFailed(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
MaxFrameSize: 1000,
ChannelTimeout: 1000,
CompressorConfig: compressor.Config{
TargetNumFrames: 100, TargetNumFrames: 100,
TargetFrameSize: 1000, TargetFrameSize: 1000,
MaxFrameSize: 1000,
ApproxComprRatio: 1.0, ApproxComprRatio: 1.0,
ChannelTimeout: 1000, },
CompressorFactory: compressor.NewRatioCompressor,
}) })
a := newMiniL2Block(50_000) a := newMiniL2Block(50_000)
......
package batcher package batcher
import ( import (
"strings"
"time" "time"
"github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli" "github.com/urfave/cli"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/flags" "github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-batcher/rpc" "github.com/ethereum-optimism/optimism/op-batcher/rpc"
...@@ -85,19 +85,6 @@ type CLIConfig struct { ...@@ -85,19 +85,6 @@ type CLIConfig struct {
// MaxL1TxSize is the maximum size of a batch tx submitted to L1. // MaxL1TxSize is the maximum size of a batch tx submitted to L1.
MaxL1TxSize uint64 MaxL1TxSize uint64
// TargetL1TxSize is the target size of a batch tx submitted to L1.
TargetL1TxSize uint64
// TargetNumFrames is the target number of frames per channel.
TargetNumFrames int
// ApproxComprRatio is the approximate compression ratio (<= 1.0) of the used
// compression algorithm.
ApproxComprRatio float64
// CompressorKind is the compressor implementation to use.
CompressorKind flags.CompressorKind
Stopped bool Stopped bool
TxMgrConfig txmgr.CLIConfig TxMgrConfig txmgr.CLIConfig
...@@ -105,6 +92,7 @@ type CLIConfig struct { ...@@ -105,6 +92,7 @@ type CLIConfig struct {
LogConfig oplog.CLIConfig LogConfig oplog.CLIConfig
MetricsConfig opmetrics.CLIConfig MetricsConfig opmetrics.CLIConfig
PprofConfig oppprof.CLIConfig PprofConfig oppprof.CLIConfig
CompressorConfig compressor.CLIConfig
} }
func (c CLIConfig) Check() error { func (c CLIConfig) Check() error {
...@@ -123,6 +111,9 @@ func (c CLIConfig) Check() error { ...@@ -123,6 +111,9 @@ func (c CLIConfig) Check() error {
if err := c.TxMgrConfig.Check(); err != nil { if err := c.TxMgrConfig.Check(); err != nil {
return err return err
} }
if err := c.CompressorConfig.Check(); err != nil {
return err
}
return nil return nil
} }
...@@ -140,15 +131,12 @@ func NewConfig(ctx *cli.Context) CLIConfig { ...@@ -140,15 +131,12 @@ func NewConfig(ctx *cli.Context) CLIConfig {
MaxPendingTransactions: ctx.GlobalUint64(flags.MaxPendingTransactionsFlag.Name), MaxPendingTransactions: ctx.GlobalUint64(flags.MaxPendingTransactionsFlag.Name),
MaxChannelDuration: ctx.GlobalUint64(flags.MaxChannelDurationFlag.Name), MaxChannelDuration: ctx.GlobalUint64(flags.MaxChannelDurationFlag.Name),
MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name), MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name),
TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name),
TargetNumFrames: ctx.GlobalInt(flags.TargetNumFramesFlag.Name),
ApproxComprRatio: ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name),
CompressorKind: flags.CompressorKind(strings.ToLower(ctx.GlobalString(flags.CompressorFlag.Name))),
Stopped: ctx.GlobalBool(flags.StoppedFlag.Name), Stopped: ctx.GlobalBool(flags.StoppedFlag.Name),
TxMgrConfig: txmgr.ReadCLIConfig(ctx), TxMgrConfig: txmgr.ReadCLIConfig(ctx),
RPCConfig: rpc.ReadCLIConfig(ctx), RPCConfig: rpc.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx), LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx), MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx), PprofConfig: oppprof.ReadCLIConfig(ctx),
CompressorConfig: compressor.ReadCLIConfig(ctx),
} }
} }
...@@ -75,6 +75,11 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri ...@@ -75,6 +75,11 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
return nil, err return nil, err
} }
compressorFactory, err := cfg.CompressorConfig.Factory()
if err != nil {
return nil, err
}
batcherCfg := Config{ batcherCfg := Config{
L1Client: l1Client, L1Client: l1Client,
L2Client: l2Client, L2Client: l2Client,
...@@ -90,10 +95,8 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri ...@@ -90,10 +95,8 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
MaxChannelDuration: cfg.MaxChannelDuration, MaxChannelDuration: cfg.MaxChannelDuration,
SubSafetyMargin: cfg.SubSafetyMargin, SubSafetyMargin: cfg.SubSafetyMargin,
MaxFrameSize: cfg.MaxL1TxSize - 1, // subtract 1 byte for version MaxFrameSize: cfg.MaxL1TxSize - 1, // subtract 1 byte for version
TargetFrameSize: cfg.TargetL1TxSize - 1, // subtract 1 byte for version CompressorConfig: cfg.CompressorConfig.Config,
TargetNumFrames: cfg.TargetNumFrames, CompressorFactory: compressorFactory,
ApproxComprRatio: cfg.ApproxComprRatio,
CompressorKind: cfg.CompressorKind,
}, },
} }
......
package compressor
import (
"fmt"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/urfave/cli"
)
const (
TargetL1TxSizeBytesFlagName = "target-l1-tx-size-bytes"
TargetNumFramesFlagName = "target-num-frames"
ApproxComprRatioFlagName = "approx-compr-ratio"
TypeFlagName = "compressor"
)
func CLIFlags(envPrefix string) []cli.Flag {
return []cli.Flag{
cli.Uint64Flag{
Name: TargetL1TxSizeBytesFlagName,
Usage: "The target size of a batch tx submitted to L1.",
Value: 100_000,
EnvVar: opservice.PrefixEnvVar(envPrefix, "TARGET_L1_TX_SIZE_BYTES"),
},
cli.IntFlag{
Name: TargetNumFramesFlagName,
Usage: "The target number of frames to create per channel",
Value: 1,
EnvVar: opservice.PrefixEnvVar(envPrefix, "TARGET_NUM_FRAMES"),
},
cli.Float64Flag{
Name: ApproxComprRatioFlagName,
Usage: "The approximate compression ratio (<= 1.0)",
Value: 0.4,
EnvVar: opservice.PrefixEnvVar(envPrefix, "APPROX_COMPR_RATIO"),
},
cli.StringFlag{
Name: TypeFlagName,
Usage: "The type of compressor. Valid options: " + FactoryFlags(),
EnvVar: opservice.PrefixEnvVar(envPrefix, "COMPRESSOR"),
Value: Ratio.FlagValue,
},
}
}
type CLIConfig struct {
Type string
Config Config
}
func (c *CLIConfig) Check() error {
_, err := c.Factory()
return err
}
func (c *CLIConfig) Factory() (FactoryFunc, error) {
for _, f := range Factories {
if f.FlagValue == c.Type {
return f.FactoryFunc, nil
}
}
return nil, fmt.Errorf("unknown compressor kind: %q", c.Type)
}
func ReadCLIConfig(ctx *cli.Context) CLIConfig {
return CLIConfig{
Type: ctx.GlobalString(TypeFlagName),
Config: Config{
TargetFrameSize: ctx.GlobalUint64(TargetL1TxSizeBytesFlagName) - 1, // subtract 1 byte for version,
TargetNumFrames: ctx.GlobalInt(TargetNumFramesFlagName),
ApproxComprRatio: ctx.GlobalFloat64(ApproxComprRatioFlagName),
},
}
}
package compressor
type Config struct {
// FrameSizeTarget to target when creating channel frames. Note that if the
// realized compression ratio is worse than the approximate, more frames may
// actually be created. This also depends on how close the target is to the
// max frame size.
TargetFrameSize uint64
// NumFramesTarget to create in this channel. If the realized compression ratio
// is worse than approxComprRatio, additional leftover frame(s) might get created.
TargetNumFrames int
// ApproxCompRatio to assume. Should be slightly smaller than average from
// experiments to avoid the chances of creating a small additional leftover frame.
ApproxComprRatio float64
}
package compressor
import (
"strings"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)
type FactoryFunc func(Config) (derive.Compressor, error)
type FactoryFlag struct {
FlagValue string
FactoryFunc FactoryFunc
}
var (
// The Ratio Factory creates new RatioCompressor's (see NewRatioCompressor
// for a description).
Ratio = FactoryFlag{
FlagValue: "ratio",
FactoryFunc: NewRatioCompressor,
}
// The Shadow Factory creates new ShadowCompressor's (see NewShadowCompressor
// for a description).
Shadow = FactoryFlag{
FlagValue: "shadow",
FactoryFunc: NewShadowCompressor,
}
)
var Factories = []FactoryFlag{
Ratio,
Shadow,
}
func FactoryFlags() string {
var out strings.Builder
for i, v := range Factories {
out.WriteString(v.FlagValue)
if i+1 < len(Factories) {
out.WriteString(", ")
}
}
return out.String()
}
package batcher package compressor
import ( import (
"bytes" "bytes"
...@@ -8,19 +8,7 @@ import ( ...@@ -8,19 +8,7 @@ import (
) )
type RatioCompressor struct { type RatioCompressor struct {
// The frame size to target when creating channel frames. Note that if the config Config
// realized compression ratio is worse than the approximate, more frames may
// actually be created. This also depends on how close TargetFrameSize is to
// MaxFrameSize.
TargetFrameSize uint64
// The target number of frames to create in this channel. If the realized
// compression ratio is worse than approxComprRatio, additional leftover
// frame(s) might get created.
TargetNumFrames int
// Approximated compression ratio to assume. Should be slightly smaller than
// average from experiments to avoid the chances of creating a small
// additional leftover frame.
ApproxComprRatio float64
inputBytes int inputBytes int
buf bytes.Buffer buf bytes.Buffer
...@@ -32,11 +20,9 @@ type RatioCompressor struct { ...@@ -32,11 +20,9 @@ type RatioCompressor struct {
// the compressor before it's considered full. The full calculation is as follows: // the compressor before it's considered full. The full calculation is as follows:
// //
// full = uncompressedLength * approxCompRatio >= targetFrameSize * targetNumFrames // full = uncompressedLength * approxCompRatio >= targetFrameSize * targetNumFrames
func NewRatioCompressor(targetFrameSize uint64, targetNumFrames int, approxCompRatio float64) (derive.Compressor, error) { func NewRatioCompressor(config Config) (derive.Compressor, error) {
c := &RatioCompressor{ c := &RatioCompressor{
TargetFrameSize: targetFrameSize, config: config,
TargetNumFrames: targetNumFrames,
ApproxComprRatio: approxCompRatio,
} }
compress, err := zlib.NewWriterLevel(&c.buf, zlib.BestCompression) compress, err := zlib.NewWriterLevel(&c.buf, zlib.BestCompression)
...@@ -88,7 +74,7 @@ func (t *RatioCompressor) FullErr() error { ...@@ -88,7 +74,7 @@ func (t *RatioCompressor) FullErr() error {
// InputThreshold calculates the input data threshold in bytes from the given // InputThreshold calculates the input data threshold in bytes from the given
// parameters. // parameters.
func (t *RatioCompressor) InputThreshold() uint64 { func (t *RatioCompressor) InputThreshold() uint64 {
return uint64(float64(t.TargetNumFrames) * float64(t.TargetFrameSize) / t.ApproxComprRatio) return uint64(float64(t.config.TargetNumFrames) * float64(t.config.TargetFrameSize) / t.config.ApproxComprRatio)
} }
// inputTargetReached says whether the target amount of input data has been // inputTargetReached says whether the target amount of input data has been
......
package batcher_test package compressor_test
import ( import (
"math" "math"
"testing" "testing"
"github.com/ethereum-optimism/optimism/op-batcher/batcher" "github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
...@@ -119,15 +118,13 @@ func TestInputThreshold(t *testing.T) { ...@@ -119,15 +118,13 @@ func TestInputThreshold(t *testing.T) {
// Validate each test case // Validate each test case
for _, tt := range tests { for _, tt := range tests {
config := batcher.ChannelConfig{ comp, err := compressor.NewRatioCompressor(compressor.Config{
TargetFrameSize: tt.input.TargetFrameSize, TargetFrameSize: tt.input.TargetFrameSize,
TargetNumFrames: tt.input.TargetNumFrames, TargetNumFrames: tt.input.TargetNumFrames,
ApproxComprRatio: tt.input.ApproxComprRatio, ApproxComprRatio: tt.input.ApproxComprRatio,
CompressorKind: flags.RatioCompressorKind, })
}
comp, err := config.NewCompressor()
require.NoError(t, err) require.NoError(t, err)
got := comp.(*batcher.RatioCompressor).InputThreshold() got := comp.(*compressor.RatioCompressor).InputThreshold()
tt.assertion(got) tt.assertion(got)
} }
} }
package batcher package compressor
import ( import (
"bytes" "bytes"
...@@ -8,14 +8,7 @@ import ( ...@@ -8,14 +8,7 @@ import (
) )
type ShadowCompressor struct { type ShadowCompressor struct {
// The frame size to target when creating channel frames. When adding new config Config
// data to the shadow compressor causes the buffer size to be greater than
// the target size, the compressor is marked as full.
TargetFrameSize uint64
// The target number of frames to create in this channel. If the realized
// compression ratio is worse than approxComprRatio, additional leftover
// frame(s) might get created.
TargetNumFrames int
buf bytes.Buffer buf bytes.Buffer
compress *zlib.Writer compress *zlib.Writer
...@@ -33,10 +26,9 @@ type ShadowCompressor struct { ...@@ -33,10 +26,9 @@ type ShadowCompressor struct {
// exception to this rule: the first write to the buffer is not checked against the // exception to this rule: the first write to the buffer is not checked against the
// target, which allows individual blocks larger than the target to be included (and will // target, which allows individual blocks larger than the target to be included (and will
// be split across multiple channel frames). // be split across multiple channel frames).
func NewShadowCompressor(targetFrameSize uint64, targetNumFrames int) (derive.Compressor, error) { func NewShadowCompressor(config Config) (derive.Compressor, error) {
c := &ShadowCompressor{ c := &ShadowCompressor{
TargetFrameSize: targetFrameSize, config: config,
TargetNumFrames: targetNumFrames,
} }
var err error var err error
...@@ -61,7 +53,7 @@ func (t *ShadowCompressor) Write(p []byte) (int, error) { ...@@ -61,7 +53,7 @@ func (t *ShadowCompressor) Write(p []byte) (int, error) {
if err != nil { if err != nil {
return 0, err return 0, err
} }
if uint64(t.shadowBuf.Len()) > t.TargetFrameSize*uint64(t.TargetNumFrames) { if uint64(t.shadowBuf.Len()) > t.config.TargetFrameSize*uint64(t.config.TargetNumFrames) {
t.fullErr = derive.CompressorFullErr t.fullErr = derive.CompressorFullErr
if t.Len() > 0 { if t.Len() > 0 {
// only return an error if we've already written data to this compressor before // only return an error if we've already written data to this compressor before
......
package batcher_test package compressor_test
import ( import (
"bytes" "bytes"
...@@ -7,7 +7,7 @@ import ( ...@@ -7,7 +7,7 @@ import (
"math/rand" "math/rand"
"testing" "testing"
"github.com/ethereum-optimism/optimism/op-batcher/batcher" "github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
...@@ -64,7 +64,10 @@ func TestShadowCompressor(t *testing.T) { ...@@ -64,7 +64,10 @@ func TestShadowCompressor(t *testing.T) {
t.Parallel() t.Parallel()
require.Equal(t, len(test.errs), len(test.data), "invalid test case: len(data) != len(errs)") require.Equal(t, len(test.errs), len(test.data), "invalid test case: len(data) != len(errs)")
sc, err := batcher.NewShadowCompressor(test.targetFrameSize, test.targetNumFrames) sc, err := compressor.NewShadowCompressor(compressor.Config{
TargetFrameSize: test.targetFrameSize,
TargetNumFrames: test.targetNumFrames,
})
require.NoError(t, err) require.NoError(t, err)
for i, d := range test.data { for i, d := range test.data {
......
...@@ -6,8 +6,8 @@ import ( ...@@ -6,8 +6,8 @@ import (
"github.com/urfave/cli" "github.com/urfave/cli"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/rpc" "github.com/ethereum-optimism/optimism/op-batcher/rpc"
"github.com/ethereum-optimism/optimism/op-node/flags"
opservice "github.com/ethereum-optimism/optimism/op-service" opservice "github.com/ethereum-optimism/optimism/op-service"
oplog "github.com/ethereum-optimism/optimism/op-service/log" oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
...@@ -68,34 +68,6 @@ var ( ...@@ -68,34 +68,6 @@ var (
Value: 120_000, Value: 120_000,
EnvVar: opservice.PrefixEnvVar(EnvVarPrefix, "MAX_L1_TX_SIZE_BYTES"), EnvVar: opservice.PrefixEnvVar(EnvVarPrefix, "MAX_L1_TX_SIZE_BYTES"),
} }
TargetL1TxSizeBytesFlag = cli.Uint64Flag{
Name: "target-l1-tx-size-bytes",
Usage: "The target size of a batch tx submitted to L1.",
Value: 100_000,
EnvVar: opservice.PrefixEnvVar(EnvVarPrefix, "TARGET_L1_TX_SIZE_BYTES"),
}
TargetNumFramesFlag = cli.IntFlag{
Name: "target-num-frames",
Usage: "The target number of frames to create per channel",
Value: 1,
EnvVar: opservice.PrefixEnvVar(EnvVarPrefix, "TARGET_NUM_FRAMES"),
}
ApproxComprRatioFlag = cli.Float64Flag{
Name: "approx-compr-ratio",
Usage: "The approximate compression ratio (<= 1.0)",
Value: 0.4,
EnvVar: opservice.PrefixEnvVar(EnvVarPrefix, "APPROX_COMPR_RATIO"),
}
CompressorFlag = cli.GenericFlag{
Name: "compressor",
Usage: "The type of compressor. Valid options: " +
flags.EnumString[CompressorKind](CompressorKinds),
EnvVar: opservice.PrefixEnvVar(EnvVarPrefix, "COMPRESSOR"),
Value: func() *CompressorKind {
out := RatioCompressorKind
return &out
}(),
}
StoppedFlag = cli.BoolFlag{ StoppedFlag = cli.BoolFlag{
Name: "stopped", Name: "stopped",
Usage: "Initialize the batcher in a stopped state. The batcher can be started using the admin_startBatcher RPC", Usage: "Initialize the batcher in a stopped state. The batcher can be started using the admin_startBatcher RPC",
...@@ -117,10 +89,6 @@ var optionalFlags = []cli.Flag{ ...@@ -117,10 +89,6 @@ var optionalFlags = []cli.Flag{
MaxPendingTransactionsFlag, MaxPendingTransactionsFlag,
MaxChannelDurationFlag, MaxChannelDurationFlag,
MaxL1TxSizeBytesFlag, MaxL1TxSizeBytesFlag,
TargetL1TxSizeBytesFlag,
TargetNumFramesFlag,
ApproxComprRatioFlag,
CompressorFlag,
StoppedFlag, StoppedFlag,
SequencerHDPathFlag, SequencerHDPathFlag,
} }
...@@ -132,6 +100,7 @@ func init() { ...@@ -132,6 +100,7 @@ func init() {
optionalFlags = append(optionalFlags, oppprof.CLIFlags(EnvVarPrefix)...) optionalFlags = append(optionalFlags, oppprof.CLIFlags(EnvVarPrefix)...)
optionalFlags = append(optionalFlags, rpc.CLIFlags(EnvVarPrefix)...) optionalFlags = append(optionalFlags, rpc.CLIFlags(EnvVarPrefix)...)
optionalFlags = append(optionalFlags, txmgr.CLIFlags(EnvVarPrefix)...) optionalFlags = append(optionalFlags, txmgr.CLIFlags(EnvVarPrefix)...)
optionalFlags = append(optionalFlags, compressor.CLIFlags(EnvVarPrefix)...)
Flags = append(requiredFlags, optionalFlags...) Flags = append(requiredFlags, optionalFlags...)
} }
...@@ -147,41 +116,3 @@ func CheckRequired(ctx *cli.Context) error { ...@@ -147,41 +116,3 @@ func CheckRequired(ctx *cli.Context) error {
} }
return nil return nil
} }
// CompressorKind identifies a compressor implementation.
type CompressorKind string
const (
// The RatioCompressorKind kind selects the batcher.RatioCompressor (see
// batcher.NewRatioCompressor for a description).
RatioCompressorKind CompressorKind = "ratio"
// The ShadowCompressorKind kind selects the batcher.ShadowCompressor (see
// batcher.NewShadowCompressor for a description).
ShadowCompressorKind CompressorKind = "shadow"
)
var CompressorKinds = []CompressorKind{
RatioCompressorKind,
ShadowCompressorKind,
}
func (kind CompressorKind) String() string {
return string(kind)
}
func (kind *CompressorKind) Set(value string) error {
if !ValidCompressorKind(CompressorKind(value)) {
return fmt.Errorf("unknown compressor kind: %q", value)
}
*kind = CompressorKind(value)
return nil
}
func ValidCompressorKind(value CompressorKind) bool {
for _, k := range CompressorKinds {
if k == value {
return true
}
}
return false
}
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
"time" "time"
bss "github.com/ethereum-optimism/optimism/op-batcher/batcher" bss "github.com/ethereum-optimism/optimism/op-batcher/batcher"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
batchermetrics "github.com/ethereum-optimism/optimism/op-batcher/metrics" batchermetrics "github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/chaincfg" "github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/sources"
...@@ -335,9 +336,14 @@ func TestMigration(t *testing.T) { ...@@ -335,9 +336,14 @@ func TestMigration(t *testing.T) {
RollupRpc: rollupNode.HTTPEndpoint(), RollupRpc: rollupNode.HTTPEndpoint(),
MaxChannelDuration: 1, MaxChannelDuration: 1,
MaxL1TxSize: 120_000, MaxL1TxSize: 120_000,
TargetL1TxSize: 100_000, CompressorConfig: compressor.CLIConfig{
Type: compressor.Ratio.FlagValue,
Config: compressor.Config{
TargetFrameSize: 100_000 - 1,
TargetNumFrames: 1, TargetNumFrames: 1,
ApproxComprRatio: 0.4, ApproxComprRatio: 0.4,
},
},
SubSafetyMargin: 4, SubSafetyMargin: 4,
PollInterval: 50 * time.Millisecond, PollInterval: 50 * time.Millisecond,
TxMgrConfig: newTxMgrConfig(forkedL1URL, secrets.Batcher), TxMgrConfig: newTxMgrConfig(forkedL1URL, secrets.Batcher),
......
...@@ -24,6 +24,7 @@ import ( ...@@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
bss "github.com/ethereum-optimism/optimism/op-batcher/batcher" bss "github.com/ethereum-optimism/optimism/op-batcher/batcher"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
batchermetrics "github.com/ethereum-optimism/optimism/op-batcher/metrics" batchermetrics "github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-bindings/predeploys" "github.com/ethereum-optimism/optimism/op-bindings/predeploys"
"github.com/ethereum-optimism/optimism/op-chain-ops/genesis" "github.com/ethereum-optimism/optimism/op-chain-ops/genesis"
...@@ -599,9 +600,14 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { ...@@ -599,9 +600,14 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
MaxPendingTransactions: 1, MaxPendingTransactions: 1,
MaxChannelDuration: 1, MaxChannelDuration: 1,
MaxL1TxSize: 120_000, MaxL1TxSize: 120_000,
TargetL1TxSize: 100_000, CompressorConfig: compressor.CLIConfig{
Type: compressor.Ratio.FlagValue,
Config: compressor.Config{
TargetFrameSize: 100_000 - 1,
TargetNumFrames: 1, TargetNumFrames: 1,
ApproxComprRatio: 0.4, ApproxComprRatio: 0.4,
},
},
SubSafetyMargin: 4, SubSafetyMargin: 4,
PollInterval: 50 * time.Millisecond, PollInterval: 50 * time.Millisecond,
TxMgrConfig: newTxMgrConfig(sys.Nodes["l1"].WSEndpoint(), cfg.Secrets.Batcher), TxMgrConfig: newTxMgrConfig(sys.Nodes["l1"].WSEndpoint(), cfg.Secrets.Batcher),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment