Commit d7afde1d authored by Sebastian Stammler, committed by GitHub

op-batcher: rework channel & compressor config, fix overhead bug (#9887)

parent 06b48767
@@ -7,7 +7,6 @@ import (
    "io"
    "math"
-   "github.com/ethereum-optimism/optimism/op-batcher/compressor"
    "github.com/ethereum-optimism/optimism/op-node/rollup"
    "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
    "github.com/ethereum-optimism/optimism/op-service/eth"
@@ -35,81 +34,6 @@ func (e *ChannelFullError) Unwrap() error {
    return e.Err
}
type ChannelConfig struct {
// Number of epochs (L1 blocks) per sequencing window, including the epoch
// L1 origin block itself
SeqWindowSize uint64
// The maximum number of L1 blocks that the inclusion transactions of a
// channel's frames can span.
ChannelTimeout uint64
// Builder Config
// MaxChannelDuration is the maximum duration (in #L1-blocks) to keep the
// channel open. This allows control over how long a channel is kept open
// during times of low transaction volume.
//
// If 0, duration checks are disabled.
MaxChannelDuration uint64
// The batcher tx submission safety margin (in #L1-blocks) to subtract from
// a channel's timeout and sequencing window, to guarantee safe inclusion of
// a channel on L1.
SubSafetyMargin uint64
// The maximum byte-size a frame can have.
MaxFrameSize uint64
// CompressorConfig contains the configuration for creating new compressors.
CompressorConfig compressor.Config
// BatchType indicates whether the channel uses SingularBatch or SpanBatch.
BatchType uint
// Whether to put all frames of a channel inside a single tx.
// Should only be used for blob transactions.
MultiFrameTxs bool
}
func (cc *ChannelConfig) MaxFramesPerTx() int {
if !cc.MultiFrameTxs {
return 1
}
return cc.CompressorConfig.TargetNumFrames
}
// Check validates the [ChannelConfig] parameters.
func (cc *ChannelConfig) Check() error {
// The [ChannelTimeout] must be larger than the [SubSafetyMargin].
// Otherwise, new blocks would always be considered timed out.
if cc.ChannelTimeout < cc.SubSafetyMargin {
return ErrInvalidChannelTimeout
}
// If the [MaxFrameSize] is set to 0, the channel builder
// will infinitely loop when trying to create frames in the
// [ChannelBuilder.OutputFrames] function.
if cc.MaxFrameSize == 0 {
return errors.New("max frame size cannot be zero")
}
// If the [MaxFrameSize] is less than [FrameV0OverHeadSize], the channel
// out will underflow the maxSize variable in the [derive.ChannelOut].
// Since it is of type uint64, it will wrap around to a very large
// number, making the frame size extremely large.
if cc.MaxFrameSize < derive.FrameV0OverHeadSize {
return fmt.Errorf("max frame size %d is less than the minimum 23", cc.MaxFrameSize)
}
if cc.BatchType > derive.SpanBatchType {
return fmt.Errorf("unrecognized batch type: %d", cc.BatchType)
}
if nf := cc.CompressorConfig.TargetNumFrames; nf < 1 {
return fmt.Errorf("invalid number of frames %d", nf)
}
return nil
}
type frameID struct {
    chID        derive.ChannelID
    frameNumber uint16
@@ -371,9 +295,9 @@ func (c *ChannelBuilder) OutputFrames() error {
// This is part of an optimization to already generate frames and send them off
// as txs while still collecting blocks in the channel builder.
func (c *ChannelBuilder) outputReadyFrames() error {
-   // TODO: Decide whether we want to fill frames to max size and use target
-   // only for estimation, or use target size.
-   for c.co.ReadyBytes() >= int(c.cfg.MaxFrameSize) {
+   // When creating a frame from the ready compression data, the frame overhead
+   // will be added to the total output size, so we can add it in the condition.
+   for c.co.ReadyBytes()+derive.FrameV0OverHeadSize >= int(c.cfg.MaxFrameSize) {
        if err := c.outputFrame(); err == io.EOF {
            return nil
        } else if err != nil {
...
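The loop condition above is the heart of the overhead fix: a frame is emitted as soon as the ready compressor output plus the constant per-frame overhead would reach the configured maximum, rather than waiting for the raw output alone to reach it. A minimal standalone sketch of that check, with illustrative constants standing in for derive.FrameV0OverHeadSize and cfg.MaxFrameSize:

package main

import "fmt"

const (
    frameV0Overhead = 23      // illustrative stand-in for derive.FrameV0OverHeadSize
    maxFrameSize    = 120_000 // illustrative stand-in for cfg.MaxFrameSize
)

// shouldOutputFrame mirrors the fixed loop condition in outputReadyFrames: the
// encoded frame is readyBytes plus the frame overhead, and it must never
// exceed maxFrameSize.
func shouldOutputFrame(readyBytes int) bool {
    return readyBytes+frameV0Overhead >= maxFrameSize
}

func main() {
    fmt.Println(shouldOutputFrame(119_976)) // false: 119_976+23 = 119_999, not yet a full frame
    fmt.Println(shouldOutputFrame(119_977)) // true: 119_977+23 = 120_000 exactly fills a max-size frame
}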
package batcher
import (
"fmt"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)
type ChannelConfig struct {
// Number of epochs (L1 blocks) per sequencing window, including the epoch
// L1 origin block itself
SeqWindowSize uint64
// The maximum number of L1 blocks that the inclusion transactions of a
// channel's frames can span.
ChannelTimeout uint64
// Builder Config
// MaxChannelDuration is the maximum duration (in #L1-blocks) to keep the
// channel open. This allows control over how long a channel is kept open
// during times of low transaction volume.
//
// If 0, duration checks are disabled.
MaxChannelDuration uint64
// The batcher tx submission safety margin (in #L1-blocks) to subtract from
// a channel's timeout and sequencing window, to guarantee safe inclusion of
// a channel on L1.
SubSafetyMargin uint64
// The maximum byte-size a frame can have.
MaxFrameSize uint64
// Target number of frames to create per channel.
// For blob transactions, this controls the number of blobs to target adding
// to each blob tx.
TargetNumFrames int
// CompressorConfig contains the configuration for creating new compressors.
// It should not be set directly, but via the Init*Compressor methods after
// creating the ChannelConfig to guarantee a consistent configuration.
CompressorConfig compressor.Config
// BatchType indicates whether the channel uses SingularBatch or SpanBatch.
BatchType uint
// Whether to put all frames of a channel inside a single tx.
// Should only be used for blob transactions.
MultiFrameTxs bool
}
// InitCompressorConfig (re)initializes the channel configuration's compressor
// configuration using the given values. The TargetOutputSize will be set to a
// value consistent with cc.TargetNumFrames and cc.MaxFrameSize.
// comprKind can be the empty string, in which case the default compressor will
// be used.
func (cc *ChannelConfig) InitCompressorConfig(approxComprRatio float64, comprKind string) {
cc.CompressorConfig = compressor.Config{
// Compressor output size needs to account for frame encoding overhead
TargetOutputSize: MaxDataSize(cc.TargetNumFrames, cc.MaxFrameSize),
ApproxComprRatio: approxComprRatio,
Kind: comprKind,
}
}
func (cc *ChannelConfig) InitRatioCompressor(approxComprRatio float64) {
cc.InitCompressorConfig(approxComprRatio, compressor.RatioKind)
}
func (cc *ChannelConfig) InitShadowCompressor() {
cc.InitCompressorConfig(0, compressor.ShadowKind)
}
func (cc *ChannelConfig) InitNoneCompressor() {
cc.InitCompressorConfig(0, compressor.NoneKind)
}
func (cc *ChannelConfig) MaxFramesPerTx() int {
if !cc.MultiFrameTxs {
return 1
}
return cc.TargetNumFrames
}
// Check validates the [ChannelConfig] parameters.
func (cc *ChannelConfig) Check() error {
// The [ChannelTimeout] must be larger than the [SubSafetyMargin].
// Otherwise, new blocks would always be considered timed out.
if cc.ChannelTimeout < cc.SubSafetyMargin {
return ErrInvalidChannelTimeout
}
// The max frame size must at least be able to accommodate the constant
// frame overhead.
if cc.MaxFrameSize < derive.FrameV0OverHeadSize {
return fmt.Errorf("max frame size %d is less than the minimum %d",
cc.MaxFrameSize, derive.FrameV0OverHeadSize)
}
if cc.BatchType > derive.SpanBatchType {
return fmt.Errorf("unrecognized batch type: %d", cc.BatchType)
}
if nf := cc.TargetNumFrames; nf < 1 {
return fmt.Errorf("invalid number of frames %d", nf)
}
return nil
}
// MaxDataSize returns the maximum byte size of output data that can be packed
// into a channel with numFrames frames and frames of max size maxFrameSize.
// It accounts for the constant frame overhead. It panics if the maxFrameSize
// is smaller than [derive.FrameV0OverHeadSize].
func MaxDataSize(numFrames int, maxFrameSize uint64) uint64 {
if maxFrameSize < derive.FrameV0OverHeadSize {
panic("max frame size smaller than frame overhead")
}
return uint64(numFrames) * (maxFrameSize - derive.FrameV0OverHeadSize)
}
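To make the overhead accounting concrete, here is a small standalone sketch of the arithmetic behind MaxDataSize, with the helper re-implemented locally and purely illustrative numbers (the real code uses derive.FrameV0OverHeadSize and the configured MaxFrameSize and TargetNumFrames):

package main

import "fmt"

// frameV0Overhead mirrors derive.FrameV0OverHeadSize, the constant per-frame
// encoding overhead that MaxDataSize subtracts from every frame.
const frameV0Overhead = 23

// maxDataSize mirrors batcher.MaxDataSize: the maximum number of compressor
// output bytes that fit into numFrames frames of at most maxFrameSize bytes.
func maxDataSize(numFrames int, maxFrameSize uint64) uint64 {
    if maxFrameSize < frameV0Overhead {
        panic("max frame size smaller than frame overhead")
    }
    return uint64(numFrames) * (maxFrameSize - frameV0Overhead)
}

func main() {
    // Single-frame calldata-style channel: a 120_000 byte tx minus the version
    // byte leaves a 119_999 byte frame, of which 119_976 bytes are compressor output.
    fmt.Println(maxDataSize(1, 120_000-1)) // 119976
    // Multi-frame channel (e.g. one frame per blob): the target scales linearly.
    fmt.Println(maxDataSize(6, 120_000-1)) // 719856
}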
package batcher
import (
"fmt"
"math"
"testing"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/stretchr/testify/require"
)
func defaultTestChannelConfig() ChannelConfig {
c := ChannelConfig{
SeqWindowSize: 15,
ChannelTimeout: 40,
MaxChannelDuration: 1,
SubSafetyMargin: 4,
MaxFrameSize: 120_000,
TargetNumFrames: 1,
BatchType: derive.SingularBatchType,
}
c.InitRatioCompressor(0.4)
return c
}
func TestChannelConfig_Check(t *testing.T) {
type test struct {
input func() ChannelConfig
assertion func(error)
}
tests := []test{
{
input: defaultTestChannelConfig,
assertion: func(output error) {
require.NoError(t, output)
},
},
{
input: func() ChannelConfig {
cfg := defaultTestChannelConfig()
cfg.ChannelTimeout = 0
cfg.SubSafetyMargin = 1
return cfg
},
assertion: func(output error) {
require.ErrorIs(t, output, ErrInvalidChannelTimeout)
},
},
}
for i := 0; i < derive.FrameV0OverHeadSize; i++ {
expectedErr := fmt.Sprintf("max frame size %d is less than the minimum 23", i)
i := i // need to update Go version...
tests = append(tests, test{
input: func() ChannelConfig {
cfg := defaultTestChannelConfig()
cfg.MaxFrameSize = uint64(i)
return cfg
},
assertion: func(output error) {
require.EqualError(t, output, expectedErr)
},
})
}
// Run the table tests
for _, test := range tests {
cfg := test.input()
test.assertion(cfg.Check())
}
}
// FuzzChannelConfig_CheckTimeout tests the [ChannelConfig.Check] function
// with fuzzing to make sure that a [ErrInvalidChannelTimeout] is thrown when
// the ChannelTimeout is less than the SubSafetyMargin.
func FuzzChannelConfig_CheckTimeout(f *testing.F) {
for i := range [10]int{} {
f.Add(uint64(i+1), uint64(i))
}
f.Fuzz(func(t *testing.T, channelTimeout uint64, subSafetyMargin uint64) {
// We only test where [ChannelTimeout] is less than the [SubSafetyMargin]
// So we cannot have [ChannelTimeout] be [math.MaxUint64]
if channelTimeout == math.MaxUint64 {
channelTimeout = math.MaxUint64 - 1
}
if subSafetyMargin <= channelTimeout {
subSafetyMargin = channelTimeout + 1
}
channelConfig := defaultTestChannelConfig()
channelConfig.ChannelTimeout = channelTimeout
channelConfig.SubSafetyMargin = subSafetyMargin
require.ErrorIs(t, channelConfig.Check(), ErrInvalidChannelTimeout)
})
}
@@ -7,7 +7,6 @@ import (
    "testing"
    "time"
-   "github.com/ethereum-optimism/optimism/op-batcher/compressor"
    "github.com/ethereum-optimism/optimism/op-batcher/metrics"
    "github.com/ethereum-optimism/optimism/op-node/rollup"
    "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
@@ -20,6 +19,16 @@ import (
    "github.com/stretchr/testify/require"
)
func channelManagerTestConfig(maxFrameSize uint64, batchType uint) ChannelConfig {
cfg := ChannelConfig{
MaxFrameSize: maxFrameSize,
TargetNumFrames: 1,
BatchType: batchType,
}
cfg.InitRatioCompressor(1)
return cfg
}
func TestChannelManagerBatchType(t *testing.T) { func TestChannelManagerBatchType(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
...@@ -84,18 +93,9 @@ func ChannelManagerReturnsErrReorg(t *testing.T, batchType uint) { ...@@ -84,18 +93,9 @@ func ChannelManagerReturnsErrReorg(t *testing.T, batchType uint) {
// detects a reorg even if it does not have any blocks inside it. // detects a reorg even if it does not have any blocks inside it.
func ChannelManagerReturnsErrReorgWhenDrained(t *testing.T, batchType uint) { func ChannelManagerReturnsErrReorgWhenDrained(t *testing.T, batchType uint) {
log := testlog.Logger(t, log.LevelCrit) log := testlog.Logger(t, log.LevelCrit)
m := NewChannelManager(log, metrics.NoopMetrics, cfg := channelManagerTestConfig(120_000, batchType)
ChannelConfig{ cfg.CompressorConfig.TargetOutputSize = 1 // full on first block
MaxFrameSize: 120_000, m := NewChannelManager(log, metrics.NoopMetrics, cfg, &rollup.Config{})
CompressorConfig: compressor.Config{
TargetFrameSize: 1,
TargetNumFrames: 1,
ApproxComprRatio: 1.0,
},
BatchType: batchType,
},
&rollup.Config{},
)
m.Clear(eth.BlockID{}) m.Clear(eth.BlockID{})
a := newMiniL2Block(0) a := newMiniL2Block(0)
...@@ -118,23 +118,13 @@ func ChannelManager_Clear(t *testing.T, batchType uint) { ...@@ -118,23 +118,13 @@ func ChannelManager_Clear(t *testing.T, batchType uint) {
// Create a channel manager // Create a channel manager
log := testlog.Logger(t, log.LevelCrit) log := testlog.Logger(t, log.LevelCrit)
rng := rand.New(rand.NewSource(time.Now().UnixNano())) rng := rand.New(rand.NewSource(time.Now().UnixNano()))
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{ cfg := channelManagerTestConfig(derive.FrameV0OverHeadSize+1, batchType)
// Need to set the channel timeout here so we don't clear pending // Need to set the channel timeout here so we don't clear pending
// channels on confirmation. This would result in [TxConfirmed] // channels on confirmation. This would result in [TxConfirmed]
// clearing confirmed transactions, and resetting the pendingChannels map // clearing confirmed transactions, and resetting the pendingChannels map
ChannelTimeout: 10, cfg.ChannelTimeout = 10
// Have to set the max frame size here otherwise the channel builder would not cfg.InitRatioCompressor(1)
// be able to output any frames m := NewChannelManager(log, metrics.NoopMetrics, cfg, &defaultTestRollupConfig)
MaxFrameSize: 24,
CompressorConfig: compressor.Config{
TargetFrameSize: 24,
TargetNumFrames: 1,
ApproxComprRatio: 1.0,
},
BatchType: batchType,
},
&defaultTestRollupConfig,
)
// Channel Manager state should be empty by default // Channel Manager state should be empty by default
require.Empty(m.blocks) require.Empty(m.blocks)
...@@ -203,18 +193,9 @@ func ChannelManager_TxResend(t *testing.T, batchType uint) { ...@@ -203,18 +193,9 @@ func ChannelManager_TxResend(t *testing.T, batchType uint) {
require := require.New(t) require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano())) rng := rand.New(rand.NewSource(time.Now().UnixNano()))
log := testlog.Logger(t, log.LevelError) log := testlog.Logger(t, log.LevelError)
m := NewChannelManager(log, metrics.NoopMetrics, cfg := channelManagerTestConfig(120_000, batchType)
ChannelConfig{ cfg.CompressorConfig.TargetOutputSize = 1 // full on first block
MaxFrameSize: 120_000, m := NewChannelManager(log, metrics.NoopMetrics, cfg, &defaultTestRollupConfig)
CompressorConfig: compressor.Config{
TargetFrameSize: 1,
TargetNumFrames: 1,
ApproxComprRatio: 1.0,
},
BatchType: batchType,
},
&defaultTestRollupConfig,
)
m.Clear(eth.BlockID{}) m.Clear(eth.BlockID{})
a := derivetest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID) a := derivetest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID)
...@@ -252,15 +233,7 @@ func ChannelManagerCloseBeforeFirstUse(t *testing.T, batchType uint) { ...@@ -252,15 +233,7 @@ func ChannelManagerCloseBeforeFirstUse(t *testing.T, batchType uint) {
rng := rand.New(rand.NewSource(time.Now().UnixNano())) rng := rand.New(rand.NewSource(time.Now().UnixNano()))
log := testlog.Logger(t, log.LevelCrit) log := testlog.Logger(t, log.LevelCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ channelManagerTestConfig(10000, batchType),
MaxFrameSize: 100,
ChannelTimeout: 1000,
CompressorConfig: compressor.Config{
TargetFrameSize: 0,
ApproxComprRatio: 1.0,
},
BatchType: batchType,
},
&defaultTestRollupConfig, &defaultTestRollupConfig,
) )
m.Clear(eth.BlockID{}) m.Clear(eth.BlockID{})
...@@ -282,19 +255,10 @@ func ChannelManagerCloseBeforeFirstUse(t *testing.T, batchType uint) { ...@@ -282,19 +255,10 @@ func ChannelManagerCloseBeforeFirstUse(t *testing.T, batchType uint) {
func ChannelManagerCloseNoPendingChannel(t *testing.T, batchType uint) { func ChannelManagerCloseNoPendingChannel(t *testing.T, batchType uint) {
require := require.New(t) require := require.New(t)
log := testlog.Logger(t, log.LevelCrit) log := testlog.Logger(t, log.LevelCrit)
m := NewChannelManager(log, metrics.NoopMetrics, cfg := channelManagerTestConfig(10000, batchType)
ChannelConfig{ cfg.CompressorConfig.TargetOutputSize = 1 // full on first block
MaxFrameSize: 100, cfg.ChannelTimeout = 1000
ChannelTimeout: 1000, m := NewChannelManager(log, metrics.NoopMetrics, cfg, &defaultTestRollupConfig)
CompressorConfig: compressor.Config{
TargetFrameSize: 1,
TargetNumFrames: 1,
ApproxComprRatio: 1.0,
},
BatchType: batchType,
},
&defaultTestRollupConfig,
)
m.Clear(eth.BlockID{}) m.Clear(eth.BlockID{})
a := newMiniL2Block(0) a := newMiniL2Block(0)
b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash()) b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash())
...@@ -328,19 +292,9 @@ func ChannelManagerClosePendingChannel(t *testing.T, batchType uint) { ...@@ -328,19 +292,9 @@ func ChannelManagerClosePendingChannel(t *testing.T, batchType uint) {
// Example of different RNG seed that creates less than 2 frames: 1698700588902821588 // Example of different RNG seed that creates less than 2 frames: 1698700588902821588
rng := rand.New(rand.NewSource(123)) rng := rand.New(rand.NewSource(123))
log := testlog.Logger(t, log.LevelError) log := testlog.Logger(t, log.LevelError)
m := NewChannelManager(log, metrics.NoopMetrics, cfg := channelManagerTestConfig(10_000, batchType)
ChannelConfig{ cfg.ChannelTimeout = 1000
MaxFrameSize: 10_000, m := NewChannelManager(log, metrics.NoopMetrics, cfg, &defaultTestRollupConfig)
ChannelTimeout: 1_000,
CompressorConfig: compressor.Config{
TargetNumFrames: 1,
TargetFrameSize: 10_000,
ApproxComprRatio: 1.0,
},
BatchType: batchType,
},
&defaultTestRollupConfig,
)
m.Clear(eth.BlockID{}) m.Clear(eth.BlockID{})
numTx := 20 // Adjust number of txs to make 2 frames numTx := 20 // Adjust number of txs to make 2 frames
...@@ -386,20 +340,13 @@ func TestChannelManager_Close_PartiallyPendingChannel(t *testing.T) { ...@@ -386,20 +340,13 @@ func TestChannelManager_Close_PartiallyPendingChannel(t *testing.T) {
// Example of different RNG seed that creates less than 2 frames: 1698700588902821588 // Example of different RNG seed that creates less than 2 frames: 1698700588902821588
rng := rand.New(rand.NewSource(123)) rng := rand.New(rand.NewSource(123))
log := testlog.Logger(t, log.LevelError) log := testlog.Logger(t, log.LevelError)
const framesize = 2200 cfg := ChannelConfig{
m := NewChannelManager(log, metrics.NoopMetrics, MaxFrameSize: 2200,
ChannelConfig{ ChannelTimeout: 1000,
MaxFrameSize: framesize, TargetNumFrames: 100,
ChannelTimeout: 1000, }
CompressorConfig: compressor.Config{ cfg.InitNoneCompressor()
TargetNumFrames: 100, m := NewChannelManager(log, metrics.NoopMetrics, cfg, &defaultTestRollupConfig)
TargetFrameSize: framesize,
ApproxComprRatio: 1.0,
Kind: "none",
},
},
&defaultTestRollupConfig,
)
m.Clear(eth.BlockID{}) m.Clear(eth.BlockID{})
numTx := 3 // Adjust number of txs to make 2 frames numTx := 3 // Adjust number of txs to make 2 frames
...@@ -446,38 +393,46 @@ func TestChannelManager_Close_PartiallyPendingChannel(t *testing.T) { ...@@ -446,38 +393,46 @@ func TestChannelManager_Close_PartiallyPendingChannel(t *testing.T) {
// have successfully landed on chain. // have successfully landed on chain.
func ChannelManagerCloseAllTxsFailed(t *testing.T, batchType uint) { func ChannelManagerCloseAllTxsFailed(t *testing.T, batchType uint) {
require := require.New(t) require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano())) rng := rand.New(rand.NewSource(1357))
log := testlog.Logger(t, log.LevelCrit) log := testlog.Logger(t, log.LevelCrit)
m := NewChannelManager(log, metrics.NoopMetrics, cfg := channelManagerTestConfig(100, batchType)
ChannelConfig{ cfg.TargetNumFrames = 1000
MaxFrameSize: 1000, cfg.InitNoneCompressor()
ChannelTimeout: 1000, m := NewChannelManager(log, metrics.NoopMetrics, cfg, &defaultTestRollupConfig)
CompressorConfig: compressor.Config{
TargetNumFrames: 100,
TargetFrameSize: 1000,
ApproxComprRatio: 1.0,
},
BatchType: batchType,
}, &defaultTestRollupConfig,
)
m.Clear(eth.BlockID{}) m.Clear(eth.BlockID{})
a := derivetest.RandomL2BlockWithChainId(rng, 50000, defaultTestRollupConfig.L2ChainID) a := derivetest.RandomL2BlockWithChainId(rng, 1000, defaultTestRollupConfig.L2ChainID)
err := m.AddL2Block(a) err := m.AddL2Block(a)
require.NoError(err, "Failed to add L2 block") require.NoError(err, "Failed to add L2 block")
txdata, err := m.TxData(eth.BlockID{}) drainTxData := func() (txdatas []txData) {
require.NoError(err, "Expected channel manager to produce valid tx data") for {
txdata, err := m.TxData(eth.BlockID{})
if err == io.EOF {
return
}
require.NoError(err, "Expected channel manager to produce valid tx data")
txdatas = append(txdatas, txdata)
}
}
txdatas := drainTxData()
require.NotEmpty(txdatas)
m.TxFailed(txdata.ID()) for _, txdata := range txdatas {
m.TxFailed(txdata.ID())
}
// Show that this data will continue to be emitted as long as the transaction // Show that this data will continue to be emitted as long as the transaction
// fails and the channel manager is not closed // fails and the channel manager is not closed
txdata, err = m.TxData(eth.BlockID{}) txdatas1 := drainTxData()
require.NoError(err, "Expected channel manager to re-attempt the failed transaction") require.NotEmpty(txdatas)
require.ElementsMatch(txdatas, txdatas1, "expected same txdatas on re-attempt")
m.TxFailed(txdata.ID()) for _, txdata := range txdatas1 {
m.TxFailed(txdata.ID())
}
require.NoError(m.Close(), "Expected to close channel manager gracefully") require.NoError(m.Close(), "Expected to close channel manager gracefully")
...@@ -487,16 +442,10 @@ func ChannelManagerCloseAllTxsFailed(t *testing.T, batchType uint) { ...@@ -487,16 +442,10 @@ func ChannelManagerCloseAllTxsFailed(t *testing.T, batchType uint) {
func TestChannelManager_ChannelCreation(t *testing.T) { func TestChannelManager_ChannelCreation(t *testing.T) {
l := testlog.Logger(t, log.LevelCrit) l := testlog.Logger(t, log.LevelCrit)
maxChannelDuration := uint64(15) const maxChannelDuration = 15
cfg := ChannelConfig{ cfg := channelManagerTestConfig(1000, derive.SpanBatchType)
MaxChannelDuration: maxChannelDuration, cfg.MaxChannelDuration = maxChannelDuration
MaxFrameSize: 1000, cfg.InitNoneCompressor()
CompressorConfig: compressor.Config{
TargetNumFrames: 100,
TargetFrameSize: 1000,
ApproxComprRatio: 1.0,
},
}
for _, tt := range []struct { for _, tt := range []struct {
name string name string
......
@@ -4,7 +4,6 @@ import (
    "io"
    "testing"
-   "github.com/ethereum-optimism/optimism/op-batcher/compressor"
    "github.com/ethereum-optimism/optimism/op-batcher/metrics"
    "github.com/ethereum-optimism/optimism/op-node/rollup"
    "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
@@ -117,10 +116,8 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) {
    const n = 6
    lgr := testlog.Logger(t, log.LevelWarn)
    ch, err := newChannel(lgr, metrics.NoopMetrics, ChannelConfig{
        MultiFrameTxs: false,
-       CompressorConfig: compressor.Config{
-           TargetNumFrames: n,
-       },
+       TargetNumFrames: n,
    }, &rollup.Config{}, latestL1BlockOrigin)
    require.NoError(err)
    chID := ch.ID()
@@ -157,10 +154,8 @@ func TestChannel_NextTxData_multiFrameTx(t *testing.T) {
    const n = 6
    lgr := testlog.Logger(t, log.LevelWarn)
    ch, err := newChannel(lgr, metrics.NoopMetrics, ChannelConfig{
        MultiFrameTxs: true,
-       CompressorConfig: compressor.Config{
-           TargetNumFrames: n,
-       },
+       TargetNumFrames: n,
    }, &rollup.Config{}, latestL1BlockOrigin)
    require.NoError(err)
    chID := ch.ID()
...
@@ -55,6 +55,18 @@ type CLIConfig struct {
    // If using blobs, this setting is ignored and the max blob size is used.
    MaxL1TxSize uint64
+   // The target number of frames to create per channel. Controls number of blobs
+   // per blob tx, if using Blob DA.
+   TargetNumFrames int
+   // ApproxComprRatio to assume (only [compressor.RatioCompressor]).
+   // Should be slightly smaller than average from experiments to avoid the
+   // chances of creating a small additional leftover frame.
+   ApproxComprRatio float64
+   // Type of compressor to use. Must be one of [compressor.KindKeys].
+   Compressor string
    Stopped bool
    BatchType uint
@@ -70,13 +82,12 @@ type CLIConfig struct {
    // ActiveSequencerCheckDuration is the duration between checks to determine the active sequencer endpoint.
    ActiveSequencerCheckDuration time.Duration
    TxMgrConfig      txmgr.CLIConfig
    LogConfig        oplog.CLIConfig
    MetricsConfig    opmetrics.CLIConfig
    PprofConfig      oppprof.CLIConfig
-   CompressorConfig compressor.CLIConfig
    RPC              oprpc.CLIConfig
    PlasmaDA         plasma.CLIConfig
}
func (c *CLIConfig) Check() error {
@@ -98,13 +109,16 @@ func (c *CLIConfig) Check() error {
    if c.MaxL1TxSize <= 1 {
        return errors.New("MaxL1TxSize must be greater than 1")
    }
-   if target, max := c.CompressorConfig.TargetL1TxSizeBytes, c.MaxL1TxSize; target > max {
-       return fmt.Errorf("target tx size > max, %d > %d", target, max)
+   if c.TargetNumFrames < 1 {
+       return errors.New("TargetNumFrames must be at least 1")
+   }
+   if c.Compressor == compressor.RatioKind && (c.ApproxComprRatio <= 0 || c.ApproxComprRatio > 1) {
+       return fmt.Errorf("invalid ApproxComprRatio %v for ratio compressor", c.ApproxComprRatio)
    }
    if c.BatchType > 1 {
        return fmt.Errorf("unknown batch type: %v", c.BatchType)
    }
-   if c.DataAvailabilityType == flags.BlobsType && c.CompressorConfig.TargetNumFrames > 6 {
+   if c.DataAvailabilityType == flags.BlobsType && c.TargetNumFrames > 6 {
        return errors.New("too many frames for blob transactions, max 6")
    }
    if !flags.ValidDataAvailabilityType(c.DataAvailabilityType) {
@@ -139,6 +153,9 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
        MaxPendingTransactions: ctx.Uint64(flags.MaxPendingTransactionsFlag.Name),
        MaxChannelDuration:     ctx.Uint64(flags.MaxChannelDurationFlag.Name),
        MaxL1TxSize:            ctx.Uint64(flags.MaxL1TxSizeBytesFlag.Name),
+       TargetNumFrames:        ctx.Int(flags.TargetNumFramesFlag.Name),
+       ApproxComprRatio:       ctx.Float64(flags.ApproxComprRatioFlag.Name),
+       Compressor:             ctx.String(flags.CompressorFlag.Name),
        Stopped:                ctx.Bool(flags.StoppedFlag.Name),
        BatchType:              ctx.Uint(flags.BatchTypeFlag.Name),
        DataAvailabilityType:   flags.DataAvailabilityType(ctx.String(flags.DataAvailabilityTypeFlag.Name)),
@@ -147,7 +164,6 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
        LogConfig:        oplog.ReadCLIConfig(ctx),
        MetricsConfig:    opmetrics.ReadCLIConfig(ctx),
        PprofConfig:      oppprof.ReadCLIConfig(ctx),
-       CompressorConfig: compressor.ReadCLIConfig(ctx),
        RPC:              oprpc.ReadCLIConfig(ctx),
        PlasmaDA:         plasma.ReadCLIConfig(ctx),
    }
...
@@ -5,6 +5,7 @@ import (
    "time"
    "github.com/ethereum-optimism/optimism/op-batcher/batcher"
+   "github.com/ethereum-optimism/optimism/op-batcher/compressor"
    "github.com/ethereum-optimism/optimism/op-batcher/flags"
    "github.com/ethereum-optimism/optimism/op-service/log"
    "github.com/ethereum-optimism/optimism/op-service/metrics"
@@ -24,6 +25,8 @@ func validBatcherConfig() batcher.CLIConfig {
        PollInterval:           time.Second,
        MaxPendingTransactions: 0,
        MaxL1TxSize:            10,
+       TargetNumFrames:        1,
+       Compressor:             "shadow",
        Stopped:                false,
        BatchType:              0,
        DataAvailabilityType:   flags.CalldataType,
@@ -87,6 +90,27 @@ func TestBatcherConfig(t *testing.T) {
            override:  func(c *batcher.CLIConfig) { c.DataAvailabilityType = "foo" },
            errString: "unknown data availability type: \"foo\"",
        },
+       {
+           name:      "zero TargetNumFrames",
+           override:  func(c *batcher.CLIConfig) { c.TargetNumFrames = 0 },
+           errString: "TargetNumFrames must be at least 1",
+       },
+       {
+           name: "larger 6 TargetNumFrames for blobs",
+           override: func(c *batcher.CLIConfig) {
+               c.TargetNumFrames = 7
+               c.DataAvailabilityType = flags.BlobsType
+           },
+           errString: "too many frames for blob transactions, max 6",
+       },
+       {
+           name: "invalid compr ratio for ratio compressor",
+           override: func(c *batcher.CLIConfig) {
+               c.ApproxComprRatio = 4.2
+               c.Compressor = compressor.RatioKind
+           },
+           errString: "invalid ApproxComprRatio 4.2 for ratio compressor",
+       },
    }
    for _, test := range tests {
...
@@ -13,7 +13,6 @@ import (
    "github.com/ethereum/go-ethereum/ethclient"
    "github.com/ethereum/go-ethereum/log"
-   "github.com/ethereum-optimism/optimism/op-batcher/compressor"
    "github.com/ethereum-optimism/optimism/op-batcher/flags"
    "github.com/ethereum-optimism/optimism/op-batcher/metrics"
    "github.com/ethereum-optimism/optimism/op-batcher/rpc"
@@ -187,22 +186,23 @@ func (bs *BatcherService) initRollupConfig(ctx context.Context) error {
}
func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
-   bs.ChannelConfig = ChannelConfig{
+   cc := ChannelConfig{
        SeqWindowSize:      bs.RollupConfig.SeqWindowSize,
        ChannelTimeout:     bs.RollupConfig.ChannelTimeout,
        MaxChannelDuration: cfg.MaxChannelDuration,
-       MaxFrameSize:       cfg.MaxL1TxSize, // reset for blobs
+       MaxFrameSize:       cfg.MaxL1TxSize - 1, // account for version byte prefix; reset for blobs
+       TargetNumFrames:    cfg.TargetNumFrames,
        SubSafetyMargin:    cfg.SubSafetyMargin,
-       CompressorConfig:   cfg.CompressorConfig.Config(),
        BatchType:          cfg.BatchType,
    }
    switch cfg.DataAvailabilityType {
    case flags.BlobsType:
        if !cfg.TestUseMaxTxSizeForBlobs {
-           bs.ChannelConfig.MaxFrameSize = eth.MaxBlobDataSize
+           // account for version byte prefix
+           cc.MaxFrameSize = eth.MaxBlobDataSize - 1
        }
-       bs.ChannelConfig.MultiFrameTxs = true
+       cc.MultiFrameTxs = true
        bs.UseBlobs = true
    case flags.CalldataType:
        bs.UseBlobs = false
@@ -210,16 +210,11 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
        return fmt.Errorf("unknown data availability type: %v", cfg.DataAvailabilityType)
    }
-   if bs.UsePlasma && bs.ChannelConfig.MaxFrameSize > plasma.MaxInputSize {
-       return fmt.Errorf("max frame size %d exceeds plasma max input size %d", bs.ChannelConfig.MaxFrameSize, plasma.MaxInputSize)
+   if bs.UsePlasma && cc.MaxFrameSize > plasma.MaxInputSize {
+       return fmt.Errorf("max frame size %d exceeds plasma max input size %d", cc.MaxFrameSize, plasma.MaxInputSize)
    }
-   bs.ChannelConfig.MaxFrameSize-- // subtract 1 byte for version
-   if bs.ChannelConfig.CompressorConfig.Kind == compressor.ShadowKind {
-       // shadow compressor guarantees to not go over target size, so can use max size
-       bs.ChannelConfig.CompressorConfig.TargetFrameSize = bs.ChannelConfig.MaxFrameSize
-   }
+   cc.InitCompressorConfig(cfg.ApproxComprRatio, cfg.Compressor)
    if bs.UseBlobs && !bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) {
        bs.Log.Error("Cannot use Blob data before Ecotone!") // log only, the batcher may not be actively running.
@@ -228,16 +223,20 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
        bs.Log.Warn("Ecotone upgrade is active, but batcher is not configured to use Blobs!")
    }
-   if err := bs.ChannelConfig.Check(); err != nil {
+   if err := cc.Check(); err != nil {
        return fmt.Errorf("invalid channel configuration: %w", err)
    }
    bs.Log.Info("Initialized channel-config",
        "use_blobs", bs.UseBlobs,
-       "max_frame_size", bs.ChannelConfig.MaxFrameSize,
-       "max_channel_duration", bs.ChannelConfig.MaxChannelDuration,
-       "channel_timeout", bs.ChannelConfig.ChannelTimeout,
-       "batch_type", bs.ChannelConfig.BatchType,
-       "sub_safety_margin", bs.ChannelConfig.SubSafetyMargin)
+       "use_plasma", bs.UsePlasma,
+       "max_frame_size", cc.MaxFrameSize,
+       "target_num_frames", cc.TargetNumFrames,
+       "compressor", cc.CompressorConfig.Kind,
+       "max_channel_duration", cc.MaxChannelDuration,
+       "channel_timeout", cc.ChannelTimeout,
+       "batch_type", cc.BatchType,
+       "sub_safety_margin", cc.SubSafetyMargin)
+   bs.ChannelConfig = cc
    return nil
}
...
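Condensed, the new initialization order is: build the ChannelConfig with MaxFrameSize already reduced by the one-byte version prefix, derive the compressor target from it via InitCompressorConfig, then validate. A hypothetical sketch inside the batcher package, using default-ish flag values (SeqWindowSize, ChannelTimeout and SubSafetyMargin are made-up numbers; the real code reads everything from the CLI and rollup configs):

package batcher

import (
    "github.com/ethereum-optimism/optimism/op-batcher/compressor"
    "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)

// exampleCalldataChannelConfig is a hypothetical helper mirroring the order of
// operations in initChannelConfig for a calldata batcher; illustrative only.
func exampleCalldataChannelConfig() (ChannelConfig, error) {
    cc := ChannelConfig{
        SeqWindowSize:   3600,        // hypothetical rollup-config value
        ChannelTimeout:  300,         // hypothetical rollup-config value
        MaxFrameSize:    120_000 - 1, // cfg.MaxL1TxSize minus the version byte
        TargetNumFrames: 1,
        SubSafetyMargin: 10, // hypothetical
        BatchType:       derive.SingularBatchType,
    }
    // Sets CompressorConfig.TargetOutputSize = MaxDataSize(1, 119_999) = 119_976,
    // so a full compressor always fits into max-size frames.
    cc.InitCompressorConfig(0.6, compressor.ShadowKind)
    return cc, cc.Check()
}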
package compressor
import (
"strings"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/urfave/cli/v2"
)
const (
TargetL1TxSizeBytesFlagName = "target-l1-tx-size-bytes"
TargetNumFramesFlagName = "target-num-frames"
ApproxComprRatioFlagName = "approx-compr-ratio"
KindFlagName = "compressor"
)
func CLIFlags(envPrefix string) []cli.Flag {
return []cli.Flag{
&cli.Uint64Flag{
Name: TargetL1TxSizeBytesFlagName,
Usage: "The target size of a batch tx submitted to L1.",
Value: 100_000,
EnvVars: opservice.PrefixEnvVar(envPrefix, "TARGET_L1_TX_SIZE_BYTES"),
},
&cli.IntFlag{
Name: TargetNumFramesFlagName,
Usage: "The target number of frames to create per channel",
Value: 1,
EnvVars: opservice.PrefixEnvVar(envPrefix, "TARGET_NUM_FRAMES"),
},
&cli.Float64Flag{
Name: ApproxComprRatioFlagName,
Usage: "The approximate compression ratio (<= 1.0)",
Value: 0.4,
EnvVars: opservice.PrefixEnvVar(envPrefix, "APPROX_COMPR_RATIO"),
},
&cli.StringFlag{
Name: KindFlagName,
Usage: "The type of compressor. Valid options: " + strings.Join(KindKeys, ", "),
EnvVars: opservice.PrefixEnvVar(envPrefix, "COMPRESSOR"),
Value: ShadowKind,
},
}
}
type CLIConfig struct {
// TargetL1TxSizeBytes to target when creating channel frames. Note that if the
// realized compression ratio is worse than the approximate, more frames may
// actually be created. This also depends on how close the target is to the
// max frame size.
TargetL1TxSizeBytes uint64
// TargetNumFrames to create in this channel. If the realized compression ratio
// is worse than approxComprRatio, additional leftover frame(s) might get created.
TargetNumFrames int
// ApproxComprRatio to assume. Should be slightly smaller than average from
// experiments to avoid the chances of creating a small additional leftover frame.
ApproxComprRatio float64
// Type of compressor to use. Must be one of KindKeys.
Kind string
}
func (c *CLIConfig) Config() Config {
return Config{
TargetFrameSize: c.TargetL1TxSizeBytes - 1, // subtract 1 byte for version
TargetNumFrames: c.TargetNumFrames,
ApproxComprRatio: c.ApproxComprRatio,
Kind: c.Kind,
}
}
func ReadCLIConfig(ctx *cli.Context) CLIConfig {
return CLIConfig{
Kind: ctx.String(KindFlagName),
TargetL1TxSizeBytes: ctx.Uint64(TargetL1TxSizeBytesFlagName),
TargetNumFrames: ctx.Int(TargetNumFramesFlagName),
ApproxComprRatio: ctx.Float64(ApproxComprRatioFlagName),
}
}
@@ -10,6 +10,10 @@ const (
    RatioKind  = "ratio"
    ShadowKind = "shadow"
    NoneKind   = "none"
+   // CloseOverheadZlib is the number of final bytes a [zlib.Writer] call writes
+   // to the output buffer.
+   CloseOverheadZlib = 9
)
var Kinds = map[string]FactoryFunc{
...
package compressor
import (
"bytes"
"compress/zlib"
"io"
"math/rand"
"testing"
"github.com/stretchr/testify/require"
)
func TestCloseOverheadZlib(t *testing.T) {
var buf bytes.Buffer
z := zlib.NewWriter(&buf)
rng := rand.New(rand.NewSource(420))
_, err := io.CopyN(z, rng, 0xff)
require.NoError(t, err)
require.NoError(t, z.Flush())
fsize := buf.Len()
require.NoError(t, z.Close())
csize := buf.Len()
require.Equal(t, CloseOverheadZlib, csize-fsize)
}
@@ -5,14 +5,13 @@ import (
)
type Config struct {
-   // TargetFrameSize to target when creating channel frames.
-   // It is guaranteed that a frame will never be larger.
-   TargetFrameSize uint64
-   // TargetNumFrames to create in this channel. If the first block that is added
-   // doesn't fit within a single frame, more frames might be created.
-   TargetNumFrames int
-   // ApproxComprRatio to assume. Should be slightly smaller than average from
-   // experiments to avoid the chances of creating a small additional leftover frame.
+   // TargetOutputSize is the target size that the compressed data should reach.
+   // The shadow compressor guarantees that the compressed data stays below
+   // this bound. The ratio compressor might go over.
+   TargetOutputSize uint64
+   // ApproxComprRatio to assume (only ratio compressor). Should be slightly smaller
+   // than average from experiments to avoid the chances of creating a small
+   // additional leftover frame.
    ApproxComprRatio float64
    // Kind of compressor to use. Must be one of KindKeys. If unset, NewCompressor
    // will default to RatioKind.
...
@@ -44,7 +44,7 @@ func (t *NonCompressor) Write(p []byte) (int, error) {
    if err != nil {
        return 0, err
    }
-   if uint64(t.buf.Len()) > t.config.TargetFrameSize*uint64(t.config.TargetNumFrames) {
+   if uint64(t.buf.Len()) > t.config.TargetOutputSize {
        t.fullErr = derive.CompressorFullErr
    }
    return n, nil
...
@@ -10,8 +10,7 @@ import (
func TestNonCompressor(t *testing.T) {
    require := require.New(t)
    c, err := NewNonCompressor(Config{
-       TargetFrameSize: 1000,
-       TargetNumFrames: 100,
+       TargetOutputSize: 100000,
    })
    require.NoError(err)
...
@@ -74,7 +74,7 @@ func (t *RatioCompressor) FullErr() error {
// InputThreshold calculates the input data threshold in bytes from the given
// parameters.
func (t *RatioCompressor) InputThreshold() uint64 {
-   return uint64(float64(t.config.TargetNumFrames) * float64(t.config.TargetFrameSize) / t.config.ApproxComprRatio)
+   return uint64(float64(t.config.TargetOutputSize) / t.config.ApproxComprRatio)
}
// inputTargetReached says whether the target amount of input data has been
...
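For intuition on the simplified threshold: the ratio compressor now accepts roughly TargetOutputSize / ApproxComprRatio bytes of input before reporting itself full. A tiny standalone sketch, with values taken from the test table below:

package main

import "fmt"

// inputThreshold mirrors RatioCompressor.InputThreshold after the rework: the
// amount of uncompressed input to accept, assuming the configured ratio.
func inputThreshold(targetOutputSize uint64, approxComprRatio float64) uint64 {
    return uint64(float64(targetOutputSize) / approxComprRatio)
}

func main() {
    // A 100_000 byte output target at an assumed ratio of 0.4 admits about
    // 250_000 input bytes (cf. TestChannelConfig_InputThreshold).
    fmt.Println(inputThreshold(100_000, 0.4)) // 250000
}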
package compressor_test package compressor_test
import ( import (
"fmt"
"math" "math"
"testing" "testing"
...@@ -8,106 +9,46 @@ import ( ...@@ -8,106 +9,46 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
// TestInputThreshold tests the [ChannelConfig.InputThreshold] func TestChannelConfig_InputThreshold(t *testing.T) {
// function using a table-driven testing approach.
func TestInputThreshold(t *testing.T) {
type testInput struct {
TargetFrameSize uint64
TargetNumFrames int
ApproxComprRatio float64
}
// Construct test cases that test the boundary conditions
tests := []struct { tests := []struct {
input testInput targetOutputSize uint64
assertion func(uint64) approxComprRatio float64
expInputThreshold uint64
assertion func(uint64) // optional, for more complex assertion
}{ }{
{ {
input: testInput{ targetOutputSize: 1,
TargetFrameSize: 1, approxComprRatio: 0.4,
TargetNumFrames: 1, expInputThreshold: 2,
ApproxComprRatio: 0.4,
},
assertion: func(output uint64) {
require.Equal(t, uint64(2), output)
},
}, },
{ {
input: testInput{ targetOutputSize: 1,
TargetFrameSize: 1, approxComprRatio: 1,
TargetNumFrames: 100000, expInputThreshold: 1,
ApproxComprRatio: 0.4,
},
assertion: func(output uint64) {
require.Equal(t, uint64(250_000), output)
},
}, },
{ {
input: testInput{ targetOutputSize: 100_000,
TargetFrameSize: 1, approxComprRatio: 0.4,
TargetNumFrames: 1, expInputThreshold: 250_000,
ApproxComprRatio: 1,
},
assertion: func(output uint64) {
require.Equal(t, uint64(1), output)
},
}, },
{ {
input: testInput{ targetOutputSize: 1,
TargetFrameSize: 1, approxComprRatio: 0.4,
TargetNumFrames: 1, expInputThreshold: 2,
ApproxComprRatio: 2,
},
assertion: func(output uint64) {
require.Equal(t, uint64(0), output)
},
}, },
{ {
input: testInput{ targetOutputSize: 100_000,
TargetFrameSize: 100000, approxComprRatio: 0.4,
TargetNumFrames: 1, expInputThreshold: 250_000,
ApproxComprRatio: 0.4,
},
assertion: func(output uint64) {
require.Equal(t, uint64(250_000), output)
},
}, },
{ {
input: testInput{ targetOutputSize: 1,
TargetFrameSize: 1, approxComprRatio: 0.000001,
TargetNumFrames: 100000, expInputThreshold: 1_000_000,
ApproxComprRatio: 0.4,
},
assertion: func(output uint64) {
require.Equal(t, uint64(250_000), output)
},
}, },
{ {
input: testInput{ targetOutputSize: 0,
TargetFrameSize: 100000, approxComprRatio: 0,
TargetNumFrames: 100000,
ApproxComprRatio: 0.4,
},
assertion: func(output uint64) {
require.Equal(t, uint64(25_000_000_000), output)
},
},
{
input: testInput{
TargetFrameSize: 1,
TargetNumFrames: 1,
ApproxComprRatio: 0.000001,
},
assertion: func(output uint64) {
require.Equal(t, uint64(1_000_000), output)
},
},
{
input: testInput{
TargetFrameSize: 0,
TargetNumFrames: 0,
ApproxComprRatio: 0,
},
assertion: func(output uint64) { assertion: func(output uint64) {
// Need to allow for NaN depending on the machine architecture // Need to allow for NaN depending on the machine architecture
require.True(t, output == uint64(0) || output == uint64(math.NaN())) require.True(t, output == uint64(0) || output == uint64(math.NaN()))
...@@ -116,14 +57,19 @@ func TestInputThreshold(t *testing.T) { ...@@ -116,14 +57,19 @@ func TestInputThreshold(t *testing.T) {
} }
// Validate each test case // Validate each test case
for _, tt := range tests { for i, tt := range tests {
comp, err := compressor.NewRatioCompressor(compressor.Config{ t.Run(fmt.Sprintf("test-%d", i), func(t *testing.T) {
TargetFrameSize: tt.input.TargetFrameSize, comp, err := compressor.NewRatioCompressor(compressor.Config{
TargetNumFrames: tt.input.TargetNumFrames, TargetOutputSize: tt.targetOutputSize,
ApproxComprRatio: tt.input.ApproxComprRatio, ApproxComprRatio: tt.approxComprRatio,
})
require.NoError(t, err)
got := comp.(*compressor.RatioCompressor).InputThreshold()
if tt.assertion != nil {
tt.assertion(got)
} else {
require.Equal(t, tt.expInputThreshold, got)
}
}) })
require.NoError(t, err)
got := comp.(*compressor.RatioCompressor).InputThreshold()
tt.assertion(got)
} }
} }
@@ -67,21 +67,19 @@ func (t *ShadowCompressor) Write(p []byte) (int, error) {
        return 0, err
    }
    newBound := t.bound + uint64(len(p))
-   cap := t.config.TargetFrameSize * uint64(t.config.TargetNumFrames)
-   if newBound > cap {
+   if newBound > t.config.TargetOutputSize {
        // Do not flush the buffer unless there's some chance we will be over the size limit.
        // This reduces CPU but more importantly it makes the shadow compression ratio more
        // closely reflect the ultimate compression ratio.
-       err = t.shadowCompress.Flush()
-       if err != nil {
+       if err = t.shadowCompress.Flush(); err != nil {
            return 0, err
        }
-       newBound = uint64(t.shadowBuf.Len()) + 4 // + 4 is to account for the digest written on close()
-       if newBound > cap {
+       newBound = uint64(t.shadowBuf.Len()) + CloseOverheadZlib
+       if newBound > t.config.TargetOutputSize {
            t.fullErr = derive.CompressorFullErr
            if t.Len() > 0 {
                // only return an error if we've already written data to this compressor before
-               // (otherwise individual blocks over the target would never be written)
+               // (otherwise single blocks over the target would never be written)
                return 0, t.fullErr
            }
        }
    }
...
...@@ -11,55 +11,49 @@ import ( ...@@ -11,55 +11,49 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
var r *rand.Rand var r = rand.New(rand.NewSource(99))
func init() { func randomBytes(length int) []byte {
r = rand.New(rand.NewSource(99))
}
func randomBytes(t *testing.T, length int) []byte {
b := make([]byte, length) b := make([]byte, length)
_, err := r.Read(b) _, err := r.Read(b)
require.NoError(t, err) // Rand.Read always returns nil error
if err != nil {
panic(err)
}
return b return b
} }
func TestShadowCompressor(t *testing.T) { func TestShadowCompressor(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
targetFrameSize uint64 targetOutputSize uint64
targetNumFrames int data [][]byte
data [][]byte errs []error
errs []error fullErr error
fullErr error
}{{ }{{
name: "no data", name: "no data",
targetFrameSize: 1, targetOutputSize: 1 + derive.FrameV0OverHeadSize,
targetNumFrames: 1, data: [][]byte{},
data: [][]byte{}, errs: []error{},
errs: []error{}, fullErr: nil,
fullErr: nil,
}, { }, {
name: "large first block", name: "large first block",
targetFrameSize: 1, targetOutputSize: 1 + derive.FrameV0OverHeadSize,
targetNumFrames: 1, data: [][]byte{bytes.Repeat([]byte{0}, 1024)},
data: [][]byte{bytes.Repeat([]byte{0}, 1024)}, errs: []error{nil},
errs: []error{nil}, fullErr: derive.CompressorFullErr,
fullErr: derive.CompressorFullErr,
}, { }, {
name: "large second block", name: "large second block",
targetFrameSize: 1, targetOutputSize: 1 + derive.FrameV0OverHeadSize,
targetNumFrames: 1, data: [][]byte{bytes.Repeat([]byte{0}, 512), bytes.Repeat([]byte{0}, 1024)},
data: [][]byte{bytes.Repeat([]byte{0}, 512), bytes.Repeat([]byte{0}, 1024)}, errs: []error{nil, derive.CompressorFullErr},
errs: []error{nil, derive.CompressorFullErr}, fullErr: derive.CompressorFullErr,
fullErr: derive.CompressorFullErr,
}, { }, {
name: "random data", name: "random data",
targetFrameSize: 1 << 17, targetOutputSize: 1 << 17,
targetNumFrames: 1, data: [][]byte{randomBytes((1 << 17) - 1000), randomBytes(512), randomBytes(512)},
data: [][]byte{randomBytes(t, (1<<17)-1000), randomBytes(t, 512), randomBytes(t, 512)}, errs: []error{nil, nil, derive.CompressorFullErr},
errs: []error{nil, nil, derive.CompressorFullErr}, fullErr: derive.CompressorFullErr,
fullErr: derive.CompressorFullErr,
}} }}
for _, test := range tests { for _, test := range tests {
test := test test := test
...@@ -68,8 +62,7 @@ func TestShadowCompressor(t *testing.T) { ...@@ -68,8 +62,7 @@ func TestShadowCompressor(t *testing.T) {
require.Equal(t, len(test.errs), len(test.data), "invalid test case: len(data) != len(errs)") require.Equal(t, len(test.errs), len(test.data), "invalid test case: len(data) != len(errs)")
sc, err := NewShadowCompressor(Config{ sc, err := NewShadowCompressor(Config{
TargetFrameSize: test.targetFrameSize, TargetOutputSize: test.targetOutputSize,
TargetNumFrames: test.targetNumFrames,
}) })
require.NoError(t, err) require.NoError(t, err)
...@@ -118,22 +111,21 @@ func TestShadowCompressor(t *testing.T) { ...@@ -118,22 +111,21 @@ func TestShadowCompressor(t *testing.T) {
// TestBoundInaccruateForLargeRandomData documents where our bounding heuristic starts to fail // TestBoundInaccruateForLargeRandomData documents where our bounding heuristic starts to fail
// (writing at least 128k of random data) // (writing at least 128k of random data)
func TestBoundInaccurateForLargeRandomData(t *testing.T) { func TestBoundInaccurateForLargeRandomData(t *testing.T) {
var sizeLimit int = 1 << 17 const sizeLimit = 1 << 17
sc, err := NewShadowCompressor(Config{ sc, err := NewShadowCompressor(Config{
TargetFrameSize: uint64(sizeLimit + 100), TargetOutputSize: sizeLimit + 100,
TargetNumFrames: 1,
}) })
require.NoError(t, err) require.NoError(t, err)
_, err = sc.Write(randomBytes(t, sizeLimit+1)) _, err = sc.Write(randomBytes(sizeLimit + 1))
require.NoError(t, err) require.NoError(t, err)
err = sc.Close() err = sc.Close()
require.NoError(t, err) require.NoError(t, err)
require.Greater(t, uint64(sc.Len()), sc.(*ShadowCompressor).bound) require.Greater(t, uint64(sc.Len()), sc.(*ShadowCompressor).bound)
sc.Reset() sc.Reset()
_, err = sc.Write(randomBytes(t, sizeLimit)) _, err = sc.Write(randomBytes(sizeLimit))
require.NoError(t, err) require.NoError(t, err)
err = sc.Close() err = sc.Close()
require.NoError(t, err) require.NoError(t, err)
......
@@ -2,9 +2,11 @@ package flags
import (
    "fmt"
+   "strings"
    "time"
    "github.com/urfave/cli/v2"
+   "golang.org/x/exp/slices"
    "github.com/ethereum-optimism/optimism/op-batcher/compressor"
    plasma "github.com/ethereum-optimism/optimism/op-plasma"
@@ -69,20 +71,45 @@ var (
    }
    MaxL1TxSizeBytesFlag = &cli.Uint64Flag{
        Name:    "max-l1-tx-size-bytes",
-       Usage:   "The maximum size of a batch tx submitted to L1.",
-       Value:   120_000,
+       Usage:   "The maximum size of a batch tx submitted to L1. Ignored for blobs, where max blob size will be used.",
+       Value:   120_000, // will be overwritten to max for blob da-type
        EnvVars: prefixEnvVars("MAX_L1_TX_SIZE_BYTES"),
    }
TargetNumFramesFlag = &cli.IntFlag{
Name: "target-num-frames",
Usage: "The target number of frames to create per channel. Controls number of blobs per blob tx, if using Blob DA.",
Value: 1,
EnvVars: prefixEnvVars("TARGET_NUM_FRAMES"),
}
ApproxComprRatioFlag = &cli.Float64Flag{
Name: "approx-compr-ratio",
Usage: "The approximate compression ratio (<= 1.0). Only relevant for ratio compressor.",
Value: 0.6,
EnvVars: prefixEnvVars("APPROX_COMPR_RATIO"),
}
CompressorFlag = &cli.StringFlag{
Name: "compressor",
Usage: "The type of compressor. Valid options: " + strings.Join(compressor.KindKeys, ", "),
EnvVars: prefixEnvVars("COMPRESSOR"),
Value: compressor.ShadowKind,
Action: func(_ *cli.Context, s string) error {
if !slices.Contains(compressor.KindKeys, s) {
return fmt.Errorf("unsupported compressor: %s", s)
}
return nil
},
}
    StoppedFlag = &cli.BoolFlag{
        Name:    "stopped",
        Usage:   "Initialize the batcher in a stopped state. The batcher can be started using the admin_startBatcher RPC",
        EnvVars: prefixEnvVars("STOPPED"),
    }
    BatchTypeFlag = &cli.UintFlag{
        Name:    "batch-type",
        Usage:   "The batch type. 0 for SingularBatch and 1 for SpanBatch.",
        Value:   0,
        EnvVars: prefixEnvVars("BATCH_TYPE"),
+       DefaultText: "singular",
    }
    DataAvailabilityTypeFlag = &cli.GenericFlag{
        Name: "data-availability-type",
@@ -116,6 +143,9 @@ var optionalFlags = []cli.Flag{
    MaxPendingTransactionsFlag,
    MaxChannelDurationFlag,
    MaxL1TxSizeBytesFlag,
+   TargetNumFramesFlag,
+   ApproxComprRatioFlag,
+   CompressorFlag,
    StoppedFlag,
    SequencerHDPathFlag,
    BatchTypeFlag,
@@ -129,7 +159,6 @@ func init() {
    optionalFlags = append(optionalFlags, opmetrics.CLIFlags(EnvVarPrefix)...)
    optionalFlags = append(optionalFlags, oppprof.CLIFlags(EnvVarPrefix)...)
    optionalFlags = append(optionalFlags, txmgr.CLIFlags(EnvVarPrefix)...)
-   optionalFlags = append(optionalFlags, compressor.CLIFlags(EnvVarPrefix)...)
    optionalFlags = append(optionalFlags, plasma.CLIFlags(EnvVarPrefix, "")...)
    Flags = append(requiredFlags, optionalFlags...)
...
...@@ -19,6 +19,7 @@ import ( ...@@ -19,6 +19,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/ethereum-optimism/optimism/op-batcher/batcher"
"github.com/ethereum-optimism/optimism/op-batcher/compressor" "github.com/ethereum-optimism/optimism/op-batcher/compressor"
batcherFlags "github.com/ethereum-optimism/optimism/op-batcher/flags" batcherFlags "github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
...@@ -189,8 +190,7 @@ func (s *L2Batcher) Buffer(t Testing) error { ...@@ -189,8 +190,7 @@ func (s *L2Batcher) Buffer(t Testing) error {
ch, err = NewGarbageChannelOut(s.l2BatcherCfg.GarbageCfg) ch, err = NewGarbageChannelOut(s.l2BatcherCfg.GarbageCfg)
} else { } else {
c, e := compressor.NewRatioCompressor(compressor.Config{ c, e := compressor.NewRatioCompressor(compressor.Config{
TargetFrameSize: s.l2BatcherCfg.MaxL1TxSize, TargetOutputSize: batcher.MaxDataSize(1, s.l2BatcherCfg.MaxL1TxSize),
TargetNumFrames: 1,
ApproxComprRatio: 1, ApproxComprRatio: 1,
}) })
require.NoError(t, e, "failed to create compressor") require.NoError(t, e, "failed to create compressor")
......
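The act-test batcher now derives its TargetOutputSize from batcher.MaxDataSize(1, MaxL1TxSize) instead of targeting a raw frame size. A minimal sketch of what such a helper plausibly computes, assuming the constant FrameV0 overhead (23 bytes) is deducted once per frame, which is the overhead accounting this commit fixes, rather than once per channel; the real batcher.MaxDataSize may differ in detail:

// frameV0Overhead stands in for derive.FrameV0OverHeadSize (23 bytes).
const frameV0Overhead uint64 = 23

// maxDataSize returns how many bytes of channel output fit into numFrames
// frames of at most maxFrameSize bytes each (sketch only).
func maxDataSize(numFrames int, maxFrameSize uint64) uint64 {
	if maxFrameSize < frameV0Overhead {
		return 0 // invalid config; guarded elsewhere by ChannelConfig.Check
	}
	return uint64(numFrames) * (maxFrameSize - frameV0Overhead)
}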
...@@ -212,8 +212,7 @@ func TestBackupUnsafe(gt *testing.T) { ...@@ -212,8 +212,7 @@ func TestBackupUnsafe(gt *testing.T) {
require.Equal(t, verifier.L2Safe().Number, uint64(0)) require.Equal(t, verifier.L2Safe().Number, uint64(0))
c, e := compressor.NewRatioCompressor(compressor.Config{ c, e := compressor.NewRatioCompressor(compressor.Config{
TargetFrameSize: 128_000, TargetOutputSize: 128_000,
TargetNumFrames: 1,
ApproxComprRatio: 1, ApproxComprRatio: 1,
}) })
require.NoError(t, e) require.NoError(t, e)
...@@ -384,8 +383,7 @@ func TestBackupUnsafeReorgForkChoiceInputError(gt *testing.T) { ...@@ -384,8 +383,7 @@ func TestBackupUnsafeReorgForkChoiceInputError(gt *testing.T) {
require.Equal(t, verifier.L2Safe().Number, uint64(0)) require.Equal(t, verifier.L2Safe().Number, uint64(0))
c, e := compressor.NewRatioCompressor(compressor.Config{ c, e := compressor.NewRatioCompressor(compressor.Config{
TargetFrameSize: 128_000, TargetOutputSize: 128_000,
TargetNumFrames: 1,
ApproxComprRatio: 1, ApproxComprRatio: 1,
}) })
require.NoError(t, e) require.NoError(t, e)
...@@ -532,8 +530,7 @@ func TestBackupUnsafeReorgForkChoiceNotInputError(gt *testing.T) { ...@@ -532,8 +530,7 @@ func TestBackupUnsafeReorgForkChoiceNotInputError(gt *testing.T) {
require.Equal(t, verifier.L2Safe().Number, uint64(0)) require.Equal(t, verifier.L2Safe().Number, uint64(0))
c, e := compressor.NewRatioCompressor(compressor.Config{ c, e := compressor.NewRatioCompressor(compressor.Config{
TargetFrameSize: 128_000, TargetOutputSize: 128_000,
TargetNumFrames: 1,
ApproxComprRatio: 1, ApproxComprRatio: 1,
}) })
require.NoError(t, e) require.NoError(t, e)
...@@ -870,8 +867,7 @@ func TestInvalidPayloadInSpanBatch(gt *testing.T) { ...@@ -870,8 +867,7 @@ func TestInvalidPayloadInSpanBatch(gt *testing.T) {
verifier.ActL2PipelineFull(t) verifier.ActL2PipelineFull(t)
c, e := compressor.NewRatioCompressor(compressor.Config{ c, e := compressor.NewRatioCompressor(compressor.Config{
TargetFrameSize: 128_000, TargetOutputSize: 128_000,
TargetNumFrames: 1,
ApproxComprRatio: 1, ApproxComprRatio: 1,
}) })
require.NoError(t, e) require.NoError(t, e)
...@@ -919,8 +915,7 @@ func TestInvalidPayloadInSpanBatch(gt *testing.T) { ...@@ -919,8 +915,7 @@ func TestInvalidPayloadInSpanBatch(gt *testing.T) {
// Create new span batch channel // Create new span batch channel
c, e = compressor.NewRatioCompressor(compressor.Config{ c, e = compressor.NewRatioCompressor(compressor.Config{
TargetFrameSize: 128_000, TargetOutputSize: 128_000,
TargetNumFrames: 1,
ApproxComprRatio: 1, ApproxComprRatio: 1,
}) })
require.NoError(t, e) require.NoError(t, e)
......
...@@ -15,7 +15,6 @@ import ( ...@@ -15,7 +15,6 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
bss "github.com/ethereum-optimism/optimism/op-batcher/batcher" bss "github.com/ethereum-optimism/optimism/op-batcher/batcher"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
batcherFlags "github.com/ethereum-optimism/optimism/op-batcher/flags" batcherFlags "github.com/ethereum-optimism/optimism/op-batcher/flags"
con "github.com/ethereum-optimism/optimism/op-conductor/conductor" con "github.com/ethereum-optimism/optimism/op-conductor/conductor"
conrpc "github.com/ethereum-optimism/optimism/op-conductor/rpc" conrpc "github.com/ethereum-optimism/optimism/op-conductor/rpc"
...@@ -226,15 +225,12 @@ func setupBatcher(t *testing.T, sys *System, conductors map[string]*conductor) { ...@@ -226,15 +225,12 @@ func setupBatcher(t *testing.T, sys *System, conductors map[string]*conductor) {
RollupRpc: rollupRpc, RollupRpc: rollupRpc,
MaxPendingTransactions: 0, MaxPendingTransactions: 0,
MaxChannelDuration: 1, MaxChannelDuration: 1,
MaxL1TxSize: 240_000, MaxL1TxSize: 120_000,
CompressorConfig: compressor.CLIConfig{ TargetNumFrames: 1,
TargetL1TxSizeBytes: sys.Cfg.BatcherTargetL1TxSizeBytes, ApproxComprRatio: 0.4,
TargetNumFrames: 1, SubSafetyMargin: 4,
ApproxComprRatio: 0.4, PollInterval: 1 * time.Second,
}, TxMgrConfig: newTxMgrConfig(sys.EthInstances["l1"].WSEndpoint(), sys.Cfg.Secrets.Batcher),
SubSafetyMargin: 4,
PollInterval: 1 * time.Second,
TxMgrConfig: newTxMgrConfig(sys.EthInstances["l1"].WSEndpoint(), sys.Cfg.Secrets.Batcher),
LogConfig: oplog.CLIConfig{ LogConfig: oplog.CLIConfig{
Level: log.LevelDebug, Level: log.LevelDebug,
Format: oplog.FormatText, Format: oplog.FormatText,
......
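In the conductor test setup above, the nested CompressorConfig is gone and the batcher config carries TargetNumFrames and ApproxComprRatio directly. For intuition, the ratio compressor used in these tests treats a channel as full once its estimated compressed size reaches the target output; a hedged sketch of that estimate, not the actual compressor code:

// ratioFull sketches the ratio compressor's fullness check under the
// assumption that compressed size is estimated as rawInput * ratio.
func ratioFull(rawInputBytes uint64, approxComprRatio float64, targetOutputSize uint64) bool {
	estimated := uint64(float64(rawInputBytes) * approxComprRatio)
	return estimated >= targetOutputSize
}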
...@@ -39,7 +39,6 @@ import ( ...@@ -39,7 +39,6 @@ import (
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
bss "github.com/ethereum-optimism/optimism/op-batcher/batcher" bss "github.com/ethereum-optimism/optimism/op-batcher/batcher"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
batcherFlags "github.com/ethereum-optimism/optimism/op-batcher/flags" batcherFlags "github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-bindings/predeploys" "github.com/ethereum-optimism/optimism/op-bindings/predeploys"
"github.com/ethereum-optimism/optimism/op-chain-ops/genesis" "github.com/ethereum-optimism/optimism/op-chain-ops/genesis"
...@@ -151,13 +150,13 @@ func DefaultSystemConfig(t *testing.T) SystemConfig { ...@@ -151,13 +150,13 @@ func DefaultSystemConfig(t *testing.T) SystemConfig {
"batcher": testlog.Logger(t, log.LevelInfo).New("role", "batcher"), "batcher": testlog.Logger(t, log.LevelInfo).New("role", "batcher"),
"proposer": testlog.Logger(t, log.LevelCrit).New("role", "proposer"), "proposer": testlog.Logger(t, log.LevelCrit).New("role", "proposer"),
}, },
GethOptions: map[string][]geth.GethOption{}, GethOptions: map[string][]geth.GethOption{},
P2PTopology: nil, // no P2P connectivity by default P2PTopology: nil, // no P2P connectivity by default
NonFinalizedProposals: false, NonFinalizedProposals: false,
ExternalL2Shim: config.ExternalL2Shim, ExternalL2Shim: config.ExternalL2Shim,
BatcherTargetL1TxSizeBytes: 100_000, DataAvailabilityType: batcherFlags.CalldataType,
DataAvailabilityType: batcherFlags.CalldataType, MaxPendingTransactions: 1,
MaxPendingTransactions: 1, BatcherTargetNumFrames: 1,
} }
} }
...@@ -213,16 +212,13 @@ type SystemConfig struct { ...@@ -213,16 +212,13 @@ type SystemConfig struct {
// Configure data-availability type that is used by the batcher. // Configure data-availability type that is used by the batcher.
DataAvailabilityType batcherFlags.DataAvailabilityType DataAvailabilityType batcherFlags.DataAvailabilityType
// Target L1 tx size for the batcher transactions
BatcherTargetL1TxSizeBytes uint64
// Max L1 tx size for the batcher transactions // Max L1 tx size for the batcher transactions
BatcherMaxL1TxSizeBytes uint64 BatcherMaxL1TxSizeBytes uint64
// Target number of frames to create per channel. Can be used to create // Target number of frames to create per channel. Can be used to create
// multi-blob transactions. // multi-blob transactions.
// Default is 1 if unset. // Default is 1 if unset.
BatcherTargetNumFrames uint64 BatcherTargetNumFrames int
// whether to actually use BatcherMaxL1TxSizeBytes for blobs, instead of max blob size // whether to actually use BatcherMaxL1TxSizeBytes for blobs, instead of max blob size
BatcherUseMaxTxSizeForBlobs bool BatcherUseMaxTxSizeForBlobs bool
...@@ -815,7 +811,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste ...@@ -815,7 +811,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
// batcher defaults if unset // batcher defaults if unset
batcherMaxL1TxSizeBytes := cfg.BatcherMaxL1TxSizeBytes batcherMaxL1TxSizeBytes := cfg.BatcherMaxL1TxSizeBytes
if batcherMaxL1TxSizeBytes == 0 { if batcherMaxL1TxSizeBytes == 0 {
batcherMaxL1TxSizeBytes = 240_000 batcherMaxL1TxSizeBytes = 120_000
} }
batcherTargetNumFrames := cfg.BatcherTargetNumFrames batcherTargetNumFrames := cfg.BatcherTargetNumFrames
if batcherTargetNumFrames == 0 { if batcherTargetNumFrames == 0 {
...@@ -829,14 +825,11 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste ...@@ -829,14 +825,11 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
MaxChannelDuration: 1, MaxChannelDuration: 1,
MaxL1TxSize: batcherMaxL1TxSizeBytes, MaxL1TxSize: batcherMaxL1TxSizeBytes,
TestUseMaxTxSizeForBlobs: cfg.BatcherUseMaxTxSizeForBlobs, TestUseMaxTxSizeForBlobs: cfg.BatcherUseMaxTxSizeForBlobs,
CompressorConfig: compressor.CLIConfig{ TargetNumFrames: int(batcherTargetNumFrames),
TargetL1TxSizeBytes: cfg.BatcherTargetL1TxSizeBytes, ApproxComprRatio: 0.4,
TargetNumFrames: int(batcherTargetNumFrames), SubSafetyMargin: 4,
ApproxComprRatio: 0.4, PollInterval: 50 * time.Millisecond,
}, TxMgrConfig: newTxMgrConfig(sys.EthInstances["l1"].WSEndpoint(), cfg.Secrets.Batcher),
SubSafetyMargin: 4,
PollInterval: 50 * time.Millisecond,
TxMgrConfig: newTxMgrConfig(sys.EthInstances["l1"].WSEndpoint(), cfg.Secrets.Batcher),
LogConfig: oplog.CLIConfig{ LogConfig: oplog.CLIConfig{
Level: log.LevelInfo, Level: log.LevelInfo,
Format: oplog.FormatText, Format: oplog.FormatText,
......
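With the e2e defaults above (one target frame, a 120_000-byte max L1 tx size, and a 0.4 approximate compression ratio), the per-channel budget works out roughly as follows. This is back-of-the-envelope arithmetic under the same per-frame overhead assumption as before, not code from the repository:

package main

import "fmt"

func main() {
	const (
		maxL1TxSize      = 120_000
		frameV0Overhead  = 23 // derive.FrameV0OverHeadSize
		targetNumFrames  = 1
		approxComprRatio = 0.4
	)
	targetOutput := targetNumFrames * (maxL1TxSize - frameV0Overhead) // 119_977 compressed bytes
	rawBudget := float64(targetOutput) / approxComprRatio             // ~300_000 raw batch bytes
	fmt.Printf("target compressed output: %d bytes\n", targetOutput)
	fmt.Printf("approx raw batch budget:  %.0f bytes\n", rawBudget)
}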
...@@ -243,7 +243,7 @@ func TestSystemE2EDencunAtGenesisWithBlobs(t *testing.T) { ...@@ -243,7 +243,7 @@ func TestSystemE2EDencunAtGenesisWithBlobs(t *testing.T) {
InitParallel(t) InitParallel(t)
cfg := DefaultSystemConfig(t) cfg := DefaultSystemConfig(t)
//cancun is on from genesis: // cancun is on from genesis:
genesisActivation := hexutil.Uint64(0) genesisActivation := hexutil.Uint64(0)
cfg.DeployConfig.L1CancunTimeOffset = &genesisActivation // i.e. turn cancun on at genesis time + 0 cfg.DeployConfig.L1CancunTimeOffset = &genesisActivation // i.e. turn cancun on at genesis time + 0
...@@ -1468,27 +1468,28 @@ func TestBatcherMultiTx(t *testing.T) { ...@@ -1468,27 +1468,28 @@ func TestBatcherMultiTx(t *testing.T) {
InitParallel(t) InitParallel(t)
cfg := DefaultSystemConfig(t) cfg := DefaultSystemConfig(t)
cfg.MaxPendingTransactions = 0 // no limit on parallel txs cfg.MaxPendingTransactions = 0 // no limit on parallel txs
cfg.BatcherTargetL1TxSizeBytes = 2 // ensures that batcher txs are as small as possible // ensures that batcher txs are as small as possible
cfg.BatcherMaxL1TxSizeBytes = derive.FrameV0OverHeadSize + 1 /*version bytes*/ + 1
cfg.DisableBatcher = true cfg.DisableBatcher = true
sys, err := cfg.Start(t) sys, err := cfg.Start(t)
require.Nil(t, err, "Error starting up system") require.NoError(t, err, "Error starting up system")
defer sys.Close() defer sys.Close()
l1Client := sys.Clients["l1"] l1Client := sys.Clients["l1"]
l2Seq := sys.Clients["sequencer"] l2Seq := sys.Clients["sequencer"]
_, err = geth.WaitForBlock(big.NewInt(10), l2Seq, time.Duration(cfg.DeployConfig.L2BlockTime*15)*time.Second) _, err = geth.WaitForBlock(big.NewInt(10), l2Seq, time.Duration(cfg.DeployConfig.L2BlockTime*15)*time.Second)
require.Nil(t, err, "Waiting for L2 blocks") require.NoError(t, err, "Waiting for L2 blocks")
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel() defer cancel()
l1Number, err := l1Client.BlockNumber(ctx) l1Number, err := l1Client.BlockNumber(ctx)
require.Nil(t, err) require.NoError(t, err)
// start batch submission // start batch submission
err = sys.BatchSubmitter.Driver().StartBatchSubmitting() err = sys.BatchSubmitter.Driver().StartBatchSubmitting()
require.Nil(t, err) require.NoError(t, err)
totalTxCount := 0 totalTxCount := 0
// wait for up to 10 L1 blocks, usually only 3 is required, but it's // wait for up to 10 L1 blocks, usually only 3 is required, but it's
...@@ -1496,7 +1497,7 @@ func TestBatcherMultiTx(t *testing.T) { ...@@ -1496,7 +1497,7 @@ func TestBatcherMultiTx(t *testing.T) {
// so we wait additional blocks. // so we wait additional blocks.
for i := int64(0); i < 10; i++ { for i := int64(0); i < 10; i++ {
block, err := geth.WaitForBlock(big.NewInt(int64(l1Number)+i), l1Client, time.Duration(cfg.DeployConfig.L1BlockTime*5)*time.Second) block, err := geth.WaitForBlock(big.NewInt(int64(l1Number)+i), l1Client, time.Duration(cfg.DeployConfig.L1BlockTime*5)*time.Second)
require.Nil(t, err, "Waiting for l1 blocks") require.NoError(t, err, "Waiting for l1 blocks")
totalTxCount += len(block.Transactions()) totalTxCount += len(block.Transactions())
if totalTxCount >= 10 { if totalTxCount >= 10 {
......
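TestBatcherMultiTx now shrinks BatcherMaxL1TxSizeBytes to derive.FrameV0OverHeadSize + 1 + 1 rather than relying on a tiny target size. A quick worked check of why that forces many small batcher txs, assuming the FrameV0 layout of 16-byte channel ID + 2-byte frame number + 4-byte length + 1-byte is_last (23 bytes total):

const (
	frameV0Overhead = 23                                 // derive.FrameV0OverHeadSize
	versionByte     = 1                                  // channel version prefix in the tx data
	maxL1TxSize     = frameV0Overhead + versionByte + 1 // = 25 bytes
)
// Each batcher tx then carries only a single byte of channel data, so any
// realistically sized channel is split across many txs, which is what the
// test counts across L1 blocks.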