Commit 4b8f6f4f authored by cody-wang-cb, committed by GitHub

Fjord: Add Brotli channel compression support (#10358)

* wip

* wip

* fix

* fix

* fix

* fix

* address some of the bots comments

* use version bit of 1

* fix lint

* adding compression type

* update batch reader

* abstract span channel compressor

* test and singular batch compressor

* fix

* lint

* move channel compressor as interface

* add base class

* fix go mod

* test fixes

* address comments

* fix

* fix

* revert channel builder test

* revert ratio compressor test

* add checks to accept brotli only post fjord

* remove unnecessary in test

* fix forge-std

* gofmt

* address comments

* remove methods in compressor

* fix error msg

* add compression algo flag to optional flags

* add Clone() function

---------
Co-authored-by: Roberto Bayardo <roberto.bayardo@coinbase.com>
parent ea523888
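For orientation before the diff: the change hinges on a one-byte discriminator at the start of the channel payload. Zlib channels keep their usual CMF header byte, while brotli channels are prefixed with the version byte 0x01, followed by the raw brotli stream. A minimal round-trip sketch of that wire format, for illustration only (the real encoder and decoder live in the diff below):

    package main

    import (
    	"bytes"
    	"fmt"
    	"io"

    	"github.com/andybalholm/brotli"
    )

    func main() {
    	// Encode: version byte 0x01, then the brotli stream.
    	var buf bytes.Buffer
    	buf.WriteByte(0x01) // ChannelVersionBrotli in the diff below
    	w := brotli.NewWriterLevel(&buf, 10)
    	if _, err := w.Write([]byte("some batch data")); err != nil {
    		panic(err)
    	}
    	if err := w.Close(); err != nil {
    		panic(err)
    	}

    	// Decode: strip the version byte, then read the brotli stream.
    	payload := buf.Bytes()
    	fmt.Printf("first byte: %#x\n", payload[0]) // 0x01 identifies brotli
    	out, err := io.ReadAll(brotli.NewReader(bytes.NewReader(payload[1:])))
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println(string(out)) // some batch data
    }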
@@ -4,6 +4,8 @@ go 1.21
 require (
 	github.com/BurntSushi/toml v1.3.2
+	github.com/DataDog/zstd v1.5.2
+	github.com/andybalholm/brotli v1.1.0
 	github.com/btcsuite/btcd v0.24.0
 	github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0
 	github.com/cockroachdb/pebble v0.0.0-20231018212520-f6cde3fc2fa4
@@ -55,7 +57,6 @@ require (
 )
 require (
-	github.com/DataDog/zstd v1.5.2 // indirect
 	github.com/Microsoft/go-winio v0.6.1 // indirect
 	github.com/VictoriaMetrics/fastcache v1.12.1 // indirect
 	github.com/allegro/bigcache v1.2.1 // indirect
......
@@ -30,6 +30,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
 github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
 github.com/allegro/bigcache v1.2.1 h1:hg1sY1raCwic3Vnsvje6TT7/pnZba83LeFck5NrFKSc=
 github.com/allegro/bigcache v1.2.1/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
+github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
+github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
 github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
 github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg=
 github.com/armon/go-metrics v0.3.8/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
......
@@ -86,7 +86,7 @@ func NewChannelBuilder(cfg ChannelConfig, rollupCfg rollup.Config, latestL1Origi
 	}
 	var co derive.ChannelOut
 	if cfg.BatchType == derive.SpanBatchType {
-		co, err = derive.NewSpanChannelOut(rollupCfg.Genesis.L2Time, rollupCfg.L2ChainID, cfg.CompressorConfig.TargetOutputSize)
+		co, err = derive.NewSpanChannelOut(rollupCfg.Genesis.L2Time, rollupCfg.L2ChainID, cfg.CompressorConfig.TargetOutputSize, cfg.CompressorConfig.CompressionAlgo)
 	} else {
 		co, err = derive.NewSingularChannelOut(c)
 	}
......
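With this change, span channels pick their compression algorithm at construction time. A usage sketch of the extended constructor (all values are invented for illustration; only the NewSpanChannelOut signature comes from this diff):

    package main

    import (
    	"math/big"

    	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
    )

    func main() {
    	// Hypothetical values for illustration.
    	genesisL2Time := uint64(0)
    	chainID := big.NewInt(10)
    	targetOutputSize := uint64(128_000)

    	// Post-Fjord, a brotli level can be selected; derive.Zlib preserves the old behavior.
    	co, err := derive.NewSpanChannelOut(genesisL2Time, chainID, targetOutputSize, derive.Brotli10)
    	if err != nil {
    		panic(err)
    	}
    	_ = co // add batches via co.AddSingularBatch(...), as the tests below do
    }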
@@ -297,7 +297,6 @@ func TestChannelBuilderBatchType(t *testing.T) {
 		{"ChannelBuilder_PendingFrames_TotalFrames", ChannelBuilder_PendingFrames_TotalFrames},
 		{"ChannelBuilder_InputBytes", ChannelBuilder_InputBytes},
 		{"ChannelBuilder_OutputBytes", ChannelBuilder_OutputBytes},
-		{"ChannelBuilder_OutputWrongFramePanic", ChannelBuilder_OutputWrongFramePanic},
 	}
 	for _, test := range tests {
 		test := test
@@ -413,7 +412,8 @@ func TestChannelBuilder_OutputFrames(t *testing.T) {
 	// Check how many ready bytes
 	require.Greater(t, uint64(cb.co.ReadyBytes()+derive.FrameV0OverHeadSize), channelConfig.MaxFrameSize)
-	require.Equal(t, 0, cb.PendingFrames())
+	require.Equal(t, 0, cb.PendingFrames()) // always 0 because of the NonCompressor

 	// The channel should not be full
 	// but we want to output the frames for testing anyways
@@ -430,11 +430,27 @@ func TestChannelBuilder_OutputFrames(t *testing.T) {
 }

 func TestChannelBuilder_OutputFrames_SpanBatch(t *testing.T) {
+	for _, algo := range derive.CompressionAlgoTypes {
+		t.Run("ChannelBuilder_OutputFrames_SpanBatch_"+algo.String(), func(t *testing.T) {
+			ChannelBuilder_OutputFrames_SpanBatch(t, algo)
+		})
+	}
+}
+
+func ChannelBuilder_OutputFrames_SpanBatch(t *testing.T, algo derive.CompressionAlgo) {
 	channelConfig := defaultTestChannelConfig()
 	channelConfig.MaxFrameSize = 20 + derive.FrameV0OverHeadSize
+	if algo.IsBrotli() {
+		channelConfig.TargetNumFrames = 3 // brotli compresses better, so fewer frames are needed to fill the channel
+	} else {
 		channelConfig.TargetNumFrames = 5
+	}
 	channelConfig.BatchType = derive.SpanBatchType
-	channelConfig.InitRatioCompressor(1)
+	channelConfig.InitRatioCompressor(1, algo)

 	// Construct the channel builder
 	cb, err := NewChannelBuilder(channelConfig, defaultTestRollupConfig, latestL1BlockOrigin)
@@ -453,6 +469,10 @@ func TestChannelBuilder_OutputFrames_SpanBatch(t *testing.T) {
 	for {
 		err = addMiniBlock(cb)
 		if err == nil {
+			if cb.IsFull() {
+				// this happens when the data exactly fills the channel
+				break
+			}
 			require.False(t, cb.IsFull())
 			// There should be no ready bytes until the channel is full
 			require.Equal(t, cb.co.ReadyBytes(), 0)
@@ -504,7 +524,7 @@ func ChannelBuilder_OutputFramesMaxFrameIndex(t *testing.T, batchType uint) {
 	channelConfig := defaultTestChannelConfig()
 	channelConfig.MaxFrameSize = derive.FrameV0OverHeadSize + 1
 	channelConfig.TargetNumFrames = math.MaxUint16 + 1
-	channelConfig.InitRatioCompressor(.1)
+	channelConfig.InitRatioCompressor(.1, derive.Zlib)
 	channelConfig.BatchType = batchType

 	rng := rand.New(rand.NewSource(123))
@@ -546,8 +566,8 @@ func TestChannelBuilder_FullShadowCompressor(t *testing.T) {
 		TargetNumFrames: 1,
 		BatchType:       derive.SpanBatchType,
 	}
-	cfg.InitShadowCompressor()
+	cfg.InitShadowCompressor(derive.Zlib)

 	cb, err := NewChannelBuilder(cfg, defaultTestRollupConfig, latestL1BlockOrigin)
 	require.NoError(err)
@@ -577,7 +597,7 @@ func ChannelBuilder_AddBlock(t *testing.T, batchType uint) {
 	channelConfig.MaxFrameSize = 20 + derive.FrameV0OverHeadSize
 	channelConfig.TargetNumFrames = 2
 	// Configure the Input Threshold params so we observe a full channel
-	channelConfig.InitRatioCompressor(1)
+	channelConfig.InitRatioCompressor(1, derive.Zlib)

 	// Construct the channel builder
 	cb, err := NewChannelBuilder(channelConfig, defaultTestRollupConfig, latestL1BlockOrigin)
@@ -700,7 +720,7 @@ func ChannelBuilder_PendingFrames_TotalFrames(t *testing.T, batchType uint) {
 		cfg.MaxFrameSize = 1000
 		cfg.TargetNumFrames = tnf
 		cfg.BatchType = batchType
-		cfg.InitShadowCompressor()
+		cfg.InitShadowCompressor(derive.Zlib)
 		cb, err := NewChannelBuilder(cfg, defaultTestRollupConfig, latestL1BlockOrigin)
 		require.NoError(err)
@@ -782,7 +802,7 @@ func ChannelBuilder_OutputBytes(t *testing.T, batchType uint) {
 	cfg.MaxFrameSize = 1000
 	cfg.TargetNumFrames = 16
 	cfg.BatchType = batchType
-	cfg.InitRatioCompressor(1.0)
+	cfg.InitRatioCompressor(1.0, derive.Zlib)
 	cb, err := NewChannelBuilder(cfg, defaultTestRollupConfig, latestL1BlockOrigin)
 	require.NoError(err, "NewChannelBuilder")
......
@@ -53,25 +53,26 @@ type ChannelConfig struct {
 // value consistent with cc.TargetNumFrames and cc.MaxFrameSize.
 // comprKind can be the empty string, in which case the default compressor will
 // be used.
-func (cc *ChannelConfig) InitCompressorConfig(approxComprRatio float64, comprKind string) {
+func (cc *ChannelConfig) InitCompressorConfig(approxComprRatio float64, comprKind string, compressionAlgo derive.CompressionAlgo) {
 	cc.CompressorConfig = compressor.Config{
 		// Compressor output size needs to account for frame encoding overhead
 		TargetOutputSize: MaxDataSize(cc.TargetNumFrames, cc.MaxFrameSize),
 		ApproxComprRatio: approxComprRatio,
 		Kind:             comprKind,
+		CompressionAlgo:  compressionAlgo,
 	}
 }

-func (cc *ChannelConfig) InitRatioCompressor(approxComprRatio float64) {
-	cc.InitCompressorConfig(approxComprRatio, compressor.RatioKind)
+func (cc *ChannelConfig) InitRatioCompressor(approxComprRatio float64, compressionAlgo derive.CompressionAlgo) {
+	cc.InitCompressorConfig(approxComprRatio, compressor.RatioKind, compressionAlgo)
 }

-func (cc *ChannelConfig) InitShadowCompressor() {
-	cc.InitCompressorConfig(0, compressor.ShadowKind)
+func (cc *ChannelConfig) InitShadowCompressor(compressionAlgo derive.CompressionAlgo) {
+	cc.InitCompressorConfig(0, compressor.ShadowKind, compressionAlgo)
 }

 func (cc *ChannelConfig) InitNoneCompressor() {
-	cc.InitCompressorConfig(0, compressor.NoneKind)
+	cc.InitCompressorConfig(0, compressor.NoneKind, derive.Zlib)
 }

 func (cc *ChannelConfig) MaxFramesPerTx() int {
......
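Every compressor-config initializer now takes the algorithm explicitly; InitNoneCompressor pins zlib, since nothing is compressed in that mode anyway. A sketch of how a caller might wire this up (field values are illustrative, not from this diff):

    package main

    import (
    	"github.com/ethereum-optimism/optimism/op-batcher/batcher"
    	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
    )

    func main() {
    	cc := batcher.ChannelConfig{
    		MaxFrameSize:    120_000, // illustrative
    		TargetNumFrames: 1,
    	}
    	// Same ratio-compressor setup as before, with the algorithm made explicit.
    	cc.InitRatioCompressor(0.4, derive.Zlib)
    	// Or opt into brotli, which is only valid once Fjord is active
    	// (see the Fjord check in the batcher service hunk below).
    	cc.InitShadowCompressor(derive.Brotli10)
    }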
@@ -20,7 +20,7 @@ func defaultTestChannelConfig() ChannelConfig {
 		TargetNumFrames: 1,
 		BatchType:       derive.SingularBatchType,
 	}
-	c.InitRatioCompressor(0.4)
+	c.InitRatioCompressor(0.4, derive.Zlib)
 	return c
 }
......
@@ -25,7 +25,7 @@ func channelManagerTestConfig(maxFrameSize uint64, batchType uint) ChannelConfig
 		TargetNumFrames: 1,
 		BatchType:       batchType,
 	}
-	cfg.InitRatioCompressor(1)
+	cfg.InitRatioCompressor(1, derive.Zlib)
 	return cfg
 }
@@ -123,7 +123,7 @@ func ChannelManager_Clear(t *testing.T, batchType uint) {
 	// channels on confirmation. This would result in [TxConfirmed]
 	// clearing confirmed transactions, and resetting the pendingChannels map
 	cfg.ChannelTimeout = 10
-	cfg.InitRatioCompressor(1)
+	cfg.InitRatioCompressor(1, derive.Zlib)
 	m := NewChannelManager(log, metrics.NoopMetrics, cfg, &defaultTestRollupConfig)

 	// Channel Manager state should be empty by default
......
@@ -4,6 +4,7 @@ import (
 	"io"
 	"testing"

+	"github.com/ethereum-optimism/optimism/op-batcher/compressor"
 	"github.com/ethereum-optimism/optimism/op-batcher/metrics"
 	"github.com/ethereum-optimism/optimism/op-node/rollup"
 	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
@@ -29,6 +30,9 @@ func TestChannelTimeout(t *testing.T) {
 	log := testlog.Logger(t, log.LevelCrit)
 	m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{
 		ChannelTimeout: 100,
+		CompressorConfig: compressor.Config{
+			CompressionAlgo: derive.Zlib,
+		},
 	}, &rollup.Config{})
 	m.Clear(eth.BlockID{})
@@ -71,7 +75,9 @@ func TestChannelTimeout(t *testing.T) {
 // TestChannelManager_NextTxData tests the nextTxData function.
 func TestChannelManager_NextTxData(t *testing.T) {
 	log := testlog.Logger(t, log.LevelCrit)
-	m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{})
+	m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{CompressorConfig: compressor.Config{
+		CompressionAlgo: derive.Zlib,
+	}}, &rollup.Config{})
 	m.Clear(eth.BlockID{})

 	// Nil pending channel should return EOF
@@ -118,6 +124,9 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) {
 	ch, err := newChannel(lgr, metrics.NoopMetrics, ChannelConfig{
 		MultiFrameTxs:   false,
 		TargetNumFrames: n,
+		CompressorConfig: compressor.Config{
+			CompressionAlgo: derive.Zlib,
+		},
 	}, &rollup.Config{}, latestL1BlockOrigin)
 	require.NoError(err)
 	chID := ch.ID()
@@ -156,6 +165,9 @@ func TestChannel_NextTxData_multiFrameTx(t *testing.T) {
 	ch, err := newChannel(lgr, metrics.NoopMetrics, ChannelConfig{
 		MultiFrameTxs:   true,
 		TargetNumFrames: n,
+		CompressorConfig: compressor.Config{
+			CompressionAlgo: derive.Zlib,
+		},
 	}, &rollup.Config{}, latestL1BlockOrigin)
 	require.NoError(err)
 	chID := ch.ID()
@@ -202,6 +214,9 @@ func TestChannelTxConfirmed(t *testing.T) {
 		// channels on confirmation. This would result in [TxConfirmed]
 		// clearing confirmed transactions, and resetting the pendingChannels map
 		ChannelTimeout: 10,
+		CompressorConfig: compressor.Config{
+			CompressionAlgo: derive.Zlib,
+		},
 	}, &rollup.Config{})
 	m.Clear(eth.BlockID{})
@@ -251,7 +266,9 @@ func TestChannelTxFailed(t *testing.T) {
 func TestChannelTxFailed(t *testing.T) {
 	// Create a channel manager
 	log := testlog.Logger(t, log.LevelCrit)
-	m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{})
+	m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{CompressorConfig: compressor.Config{
+		CompressionAlgo: derive.Zlib,
+	}}, &rollup.Config{})
 	m.Clear(eth.BlockID{})

 	// Let's add a valid pending transaction to the channel
......
@@ -10,6 +10,7 @@ import (
 	"github.com/ethereum-optimism/optimism/op-batcher/compressor"
 	"github.com/ethereum-optimism/optimism/op-batcher/flags"
+	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
 	plasma "github.com/ethereum-optimism/optimism/op-plasma"
 	oplog "github.com/ethereum-optimism/optimism/op-service/log"
 	opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
@@ -67,6 +68,9 @@ type CLIConfig struct {
 	// Type of compressor to use. Must be one of [compressor.KindKeys].
 	Compressor string

+	// Type of compression algorithm to use. Must be one of [zlib, brotli, brotli[9-11]]
+	CompressionAlgo derive.CompressionAlgo
+
 	// If Stopped is true, the batcher starts stopped and won't start batching right away.
 	// Batching needs to be started via an admin RPC.
 	Stopped bool
@@ -124,6 +128,9 @@ func (c *CLIConfig) Check() error {
 	if c.Compressor == compressor.RatioKind && (c.ApproxComprRatio <= 0 || c.ApproxComprRatio > 1) {
 		return fmt.Errorf("invalid ApproxComprRatio %v for ratio compressor", c.ApproxComprRatio)
 	}
+	if !derive.ValidCompressionAlgoType(c.CompressionAlgo) {
+		return fmt.Errorf("invalid compression algo %v", c.CompressionAlgo)
+	}
 	if c.BatchType > 1 {
 		return fmt.Errorf("unknown batch type: %v", c.BatchType)
 	}
@@ -168,6 +175,7 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
 		TargetNumFrames:     ctx.Int(flags.TargetNumFramesFlag.Name),
 		ApproxComprRatio:    ctx.Float64(flags.ApproxComprRatioFlag.Name),
 		Compressor:          ctx.String(flags.CompressorFlag.Name),
+		CompressionAlgo:     derive.CompressionAlgo(ctx.String(flags.CompressionAlgoFlag.Name)),
 		Stopped:             ctx.Bool(flags.StoppedFlag.Name),
 		WaitNodeSync:        ctx.Bool(flags.WaitNodeSyncFlag.Name),
 		CheckRecentTxsDepth: ctx.Int(flags.CheckRecentTxsDepthFlag.Name),
......
@@ -7,6 +7,7 @@ import (
 	"github.com/ethereum-optimism/optimism/op-batcher/batcher"
 	"github.com/ethereum-optimism/optimism/op-batcher/compressor"
 	"github.com/ethereum-optimism/optimism/op-batcher/flags"
+	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
 	"github.com/ethereum-optimism/optimism/op-service/log"
 	"github.com/ethereum-optimism/optimism/op-service/metrics"
 	"github.com/ethereum-optimism/optimism/op-service/oppprof"
@@ -36,6 +37,7 @@ func validBatcherConfig() batcher.CLIConfig {
 		PprofConfig: oppprof.DefaultCLIConfig(),
 		// The compressor config is not checked in config.Check()
 		RPC:             rpc.DefaultCLIConfig(),
+		CompressionAlgo: derive.Zlib,
 	}
 }
......
@@ -219,7 +219,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
 		return fmt.Errorf("max frame size %d exceeds plasma max input size %d", cc.MaxFrameSize, plasma.MaxInputSize)
 	}

-	cc.InitCompressorConfig(cfg.ApproxComprRatio, cfg.Compressor)
+	cc.InitCompressorConfig(cfg.ApproxComprRatio, cfg.Compressor, cfg.CompressionAlgo)

 	if bs.UseBlobs && !bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) {
 		bs.Log.Error("Cannot use Blob data before Ecotone!") // log only, the batcher may not be actively running.
@@ -228,6 +228,11 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
 		bs.Log.Warn("Ecotone upgrade is active, but batcher is not configured to use Blobs!")
 	}

+	// Brotli compression is only allowed post-Fjord
+	if bs.ChannelConfig.CompressorConfig.CompressionAlgo.IsBrotli() && !bs.RollupConfig.IsFjord(uint64(time.Now().Unix())) {
+		return fmt.Errorf("cannot use brotli compression before Fjord")
+	}
+
 	if err := cc.Check(); err != nil {
 		return fmt.Errorf("invalid channel configuration: %w", err)
 	}
......
@@ -16,6 +16,9 @@ type Config struct {
 	// Kind of compressor to use. Must be one of KindKeys. If unset, NewCompressor
 	// will default to RatioKind.
 	Kind string

+	// Type of compression algorithm to use. Must be one of [zlib, brotli-(9|10|11)]
+	CompressionAlgo derive.CompressionAlgo
 }

 func (c Config) NewCompressor() (derive.Compressor, error) {
......
 package compressor

 import (
-	"bytes"
-	"compress/zlib"
-
 	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
 )
@@ -11,8 +8,7 @@ type RatioCompressor struct {
 	config Config

 	inputBytes int
-	buf        bytes.Buffer
-	compress   *zlib.Writer
+	compressor derive.ChannelCompressor
 }

 // NewRatioCompressor creates a new derive.Compressor implementation that uses the target
@@ -25,11 +21,11 @@ func NewRatioCompressor(config Config) (derive.Compressor, error) {
 		config: config,
 	}

-	compress, err := zlib.NewWriterLevel(&c.buf, zlib.BestCompression)
+	compressor, err := derive.NewChannelCompressor(config.CompressionAlgo)
 	if err != nil {
 		return nil, err
 	}
-	c.compress = compress
+	c.compressor = compressor

 	return c, nil
 }
@@ -39,29 +35,28 @@ func (t *RatioCompressor) Write(p []byte) (int, error) {
 		return 0, err
 	}
 	t.inputBytes += len(p)
-	return t.compress.Write(p)
+	return t.compressor.Write(p)
 }

 func (t *RatioCompressor) Close() error {
-	return t.compress.Close()
+	return t.compressor.Close()
 }

 func (t *RatioCompressor) Read(p []byte) (int, error) {
-	return t.buf.Read(p)
+	return t.compressor.Read(p)
 }

 func (t *RatioCompressor) Reset() {
-	t.buf.Reset()
-	t.compress.Reset(&t.buf)
+	t.compressor.Reset()
 	t.inputBytes = 0
 }

 func (t *RatioCompressor) Len() int {
-	return t.buf.Len()
+	return t.compressor.Len()
 }

 func (t *RatioCompressor) Flush() error {
-	return t.compress.Flush()
+	return t.compressor.Flush()
 }

 func (t *RatioCompressor) FullErr() error {
......
@@ -6,6 +6,7 @@ import (
 	"testing"

 	"github.com/ethereum-optimism/optimism/op-batcher/compressor"
+	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
 	"github.com/stretchr/testify/require"
 )
@@ -62,6 +63,7 @@ func TestChannelConfig_InputThreshold(t *testing.T) {
 			comp, err := compressor.NewRatioCompressor(compressor.Config{
 				TargetOutputSize: tt.targetOutputSize,
 				ApproxComprRatio: tt.approxComprRatio,
+				CompressionAlgo:  derive.Zlib,
 			})
 			require.NoError(t, err)
 			got := comp.(*compressor.RatioCompressor).InputThreshold()
......
 package compressor

 import (
-	"bytes"
-	"compress/zlib"
-
 	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
 )
@@ -21,11 +18,8 @@ const (
 type ShadowCompressor struct {
 	config Config

-	buf      bytes.Buffer
-	compress *zlib.Writer
-
-	shadowBuf      bytes.Buffer
-	shadowCompress *zlib.Writer
+	compressor       derive.ChannelCompressor
+	shadowCompressor derive.ChannelCompressor

 	fullErr error
@@ -45,11 +39,11 @@ func NewShadowCompressor(config Config) (derive.Compressor, error) {
 	}

 	var err error
-	c.compress, err = zlib.NewWriterLevel(&c.buf, zlib.BestCompression)
+	c.compressor, err = derive.NewChannelCompressor(config.CompressionAlgo)
 	if err != nil {
 		return nil, err
 	}
-	c.shadowCompress, err = zlib.NewWriterLevel(&c.shadowBuf, zlib.BestCompression)
+	c.shadowCompressor, err = derive.NewChannelCompressor(config.CompressionAlgo)
 	if err != nil {
 		return nil, err
 	}
@@ -62,7 +56,7 @@ func (t *ShadowCompressor) Write(p []byte) (int, error) {
 	if t.fullErr != nil {
 		return 0, t.fullErr
 	}
-	_, err := t.shadowCompress.Write(p)
+	_, err := t.shadowCompressor.Write(p)
 	if err != nil {
 		return 0, err
 	}
@@ -71,10 +65,10 @@ func (t *ShadowCompressor) Write(p []byte) (int, error) {
 		// Do not flush the buffer unless there's some chance we will be over the size limit.
 		// This reduces CPU but more importantly it makes the shadow compression ratio more
 		// closely reflect the ultimate compression ratio.
-		if err = t.shadowCompress.Flush(); err != nil {
+		if err = t.shadowCompressor.Flush(); err != nil {
 			return 0, err
 		}
-		newBound = uint64(t.shadowBuf.Len()) + CloseOverheadZlib
+		newBound = uint64(t.shadowCompressor.Len()) + CloseOverheadZlib
 		if newBound > t.config.TargetOutputSize {
 			t.fullErr = derive.ErrCompressorFull
 			if t.Len() > 0 {
@@ -85,32 +79,30 @@ func (t *ShadowCompressor) Write(p []byte) (int, error) {
 		}
 	}
 	t.bound = newBound
-	return t.compress.Write(p)
+	return t.compressor.Write(p)
 }

 func (t *ShadowCompressor) Close() error {
-	return t.compress.Close()
+	return t.compressor.Close()
 }

 func (t *ShadowCompressor) Read(p []byte) (int, error) {
-	return t.buf.Read(p)
+	return t.compressor.Read(p)
 }

 func (t *ShadowCompressor) Reset() {
-	t.buf.Reset()
-	t.compress.Reset(&t.buf)
-	t.shadowBuf.Reset()
-	t.shadowCompress.Reset(&t.shadowBuf)
+	t.compressor.Reset()
+	t.shadowCompressor.Reset()
 	t.fullErr = nil
 	t.bound = safeCompressionOverhead
 }

 func (t *ShadowCompressor) Len() int {
-	return t.buf.Len()
+	return t.compressor.Len()
 }

 func (t *ShadowCompressor) Flush() error {
-	return t.compress.Flush()
+	return t.compressor.Flush()
 }

 func (t *ShadowCompressor) FullErr() error {
......
@@ -63,6 +63,7 @@ func TestShadowCompressor(t *testing.T) {
 			sc, err := NewShadowCompressor(Config{
 				TargetOutputSize: test.targetOutputSize,
+				CompressionAlgo:  derive.Zlib,
 			})
 			require.NoError(t, err)
@@ -115,6 +116,7 @@ func TestBoundInaccurateForLargeRandomData(t *testing.T) {
 	sc, err := NewShadowCompressor(Config{
 		TargetOutputSize: sizeLimit + 100,
+		CompressionAlgo:  derive.Zlib,
 	})
 	require.NoError(t, err)
......
@@ -9,6 +9,7 @@ import (
 	"github.com/urfave/cli/v2"

 	"github.com/ethereum-optimism/optimism/op-batcher/compressor"
+	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
 	plasma "github.com/ethereum-optimism/optimism/op-plasma"
 	opservice "github.com/ethereum-optimism/optimism/op-service"
 	openum "github.com/ethereum-optimism/optimism/op-service/enum"
@@ -99,6 +100,15 @@ var (
 			return nil
 		},
 	}
+	CompressionAlgoFlag = &cli.GenericFlag{
+		Name:    "compression-algo",
+		Usage:   "The compression algorithm to use. Valid options: " + openum.EnumString(derive.CompressionAlgoTypes),
+		EnvVars: prefixEnvVars("COMPRESSION_ALGO"),
+		Value: func() *derive.CompressionAlgo {
+			out := derive.Zlib
+			return &out
+		}(),
+	}
 	StoppedFlag = &cli.BoolFlag{
 		Name:  "stopped",
 		Usage: "Initialize the batcher in a stopped state. The batcher can be started using the admin_startBatcher RPC",
@@ -167,6 +177,7 @@ var optionalFlags = []cli.Flag{
 	BatchTypeFlag,
 	DataAvailabilityTypeFlag,
 	ActiveSequencerCheckDurationFlag,
+	CompressionAlgoFlag,
 }

 func init() {
......
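Since the flag is registered under optionalFlags with a zlib default, existing deployments keep their current behavior unless they opt in. Assuming the standard OP_BATCHER_ env-var prefix and that Brotli10 renders as "brotli-10" (both assumptions, not shown in this diff), opting in would look like:

    op-batcher --compression-algo=brotli-10 ...
    OP_BATCHER_COMPRESSION_ALGO=brotli-10 op-batcher ...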
@@ -192,6 +192,7 @@ func (s *L2Batcher) Buffer(t Testing) error {
 		target := batcher.MaxDataSize(1, s.l2BatcherCfg.MaxL1TxSize)
 		c, e := compressor.NewShadowCompressor(compressor.Config{
 			TargetOutputSize: target,
+			CompressionAlgo:  derive.Zlib,
 		})
 		require.NoError(t, e, "failed to create compressor")
@@ -200,7 +201,7 @@ func (s *L2Batcher) Buffer(t Testing) error {
 	} else {
 		// use span batch if we're forcing it or if we're at/beyond delta
 		if s.l2BatcherCfg.ForceSubmitSpanBatch || s.rollupCfg.IsDelta(block.Time()) {
-			ch, err = derive.NewSpanChannelOut(s.rollupCfg.Genesis.L2Time, s.rollupCfg.L2ChainID, target)
+			ch, err = derive.NewSpanChannelOut(s.rollupCfg.Genesis.L2Time, s.rollupCfg.L2ChainID, target, derive.Zlib)
 			// use singular batches in all other cases
 		} else {
 			ch, err = derive.NewSingularChannelOut(c)
......
@@ -26,7 +26,7 @@
 )

 func newSpanChannelOut(t StatefulTesting, e e2eutils.SetupData) derive.ChannelOut {
-	channelOut, err := derive.NewSpanChannelOut(e.RollupCfg.Genesis.L2Time, e.RollupCfg.L2ChainID, 128_000)
+	channelOut, err := derive.NewSpanChannelOut(e.RollupCfg.Genesis.L2Time, e.RollupCfg.L2ChainID, 128_000, derive.Zlib)
 	require.NoError(t, err)
 	return channelOut
 }
......
@@ -296,6 +296,7 @@ func setupBatcher(t *testing.T, sys *System, conductors map[string]*conductor) {
 		BatchType:                    derive.SpanBatchType,
 		DataAvailabilityType:         batcherFlags.CalldataType,
 		ActiveSequencerCheckDuration: 0,
+		CompressionAlgo:              derive.Zlib,
 	}
 	batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.Cfg.Loggers["batcher"])
......
@@ -845,6 +845,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
 		Stopped:              sys.Cfg.DisableBatcher, // Batch submitter may be enabled later
 		BatchType:            batchType,
 		DataAvailabilityType: sys.Cfg.DataAvailabilityType,
+		CompressionAlgo:      derive.Zlib,
 	}

 	// Batch Submitter
 	batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.Cfg.Loggers["batcher"])
......
@@ -58,13 +58,13 @@ type compressorAndTarget struct {
 }

 // channelOutByType returns a channel out of the given type as a helper for the benchmarks
-func channelOutByType(batchType uint, compKey string) (derive.ChannelOut, error) {
+func channelOutByType(batchType uint, compKey string, algo derive.CompressionAlgo) (derive.ChannelOut, error) {
 	chainID := big.NewInt(333)
 	if batchType == derive.SingularBatchType {
 		return derive.NewSingularChannelOut(compressors[compKey].compressor)
 	}
 	if batchType == derive.SpanBatchType {
-		return derive.NewSpanChannelOut(0, chainID, compressors[compKey].targetOutput)
+		return derive.NewSpanChannelOut(0, chainID, compressors[compKey].targetOutput, algo)
 	}
 	return nil, fmt.Errorf("unsupported batch type: %d", batchType)
 }
@@ -129,13 +129,14 @@ func BenchmarkFinalBatchChannelOut(b *testing.B) {
 			// to leverage optimizations in the Batch Linked List
 			batches[i].Timestamp = uint64(t.Add(time.Duration(i) * time.Second).Unix())
 		}
-		b.Run(tc.String(), func(b *testing.B) {
+		for _, algo := range derive.CompressionAlgoTypes {
+			b.Run(tc.String()+"_"+algo.String(), func(b *testing.B) {
 				// reset the compressor used in the test case
 				for bn := 0; bn < b.N; bn++ {
 					// don't measure the setup time
 					b.StopTimer()
 					compressors[tc.compKey].compressor.Reset()
-					cout, _ := channelOutByType(tc.BatchType, tc.compKey)
+					cout, _ := channelOutByType(tc.BatchType, tc.compKey, algo)
 					// add all but the final batch to the channel out
 					for i := 0; i < tc.BatchCount-1; i++ {
 						err := cout.AddSingularBatch(batches[i], 0)
@@ -149,6 +150,8 @@ func BenchmarkFinalBatchChannelOut(b *testing.B) {
 				}
 			})
+		}
 	}
 }

 // BenchmarkIncremental fills a channel out incrementally with batches
@@ -165,15 +168,16 @@ func BenchmarkIncremental(b *testing.B) {
 		{derive.SpanBatchType, 5, 1, "RealBlindCompressor"},
 		//{derive.SingularBatchType, 100, 1, "RealShadowCompressor"},
 	}
+	for _, algo := range derive.CompressionAlgoTypes {
 	for _, tc := range tcs {
-		cout, err := channelOutByType(tc.BatchType, tc.compKey)
+		cout, err := channelOutByType(tc.BatchType, tc.compKey, algo)
 		if err != nil {
 			b.Fatal(err)
 		}
 		done := false
 		for base := 0; !done; base += tc.BatchCount {
-			rangeName := fmt.Sprintf("Incremental %s: %d-%d", tc.String(), base, base+tc.BatchCount)
+			rangeName := fmt.Sprintf("Incremental %s-%s: %d-%d", algo, tc.String(), base, base+tc.BatchCount)
-			b.Run(rangeName, func(b *testing.B) {
+			b.Run(rangeName+"_"+algo.String(), func(b *testing.B) {
 				b.StopTimer()
 				// prepare the batches
 				t := time.Now()
@@ -196,6 +200,7 @@ func BenchmarkIncremental(b *testing.B) {
 			})
 		}
 	}
+	}
 }

 // BenchmarkAllBatchesChannelOut benchmarks the performance of adding singular batches to a channel out
@@ -226,6 +231,7 @@ func BenchmarkAllBatchesChannelOut(b *testing.B) {
 		}
 	}

+	for _, algo := range derive.CompressionAlgoTypes {
 	for _, tc := range tests {
 		chainID := big.NewInt(333)
 		rng := rand.New(rand.NewSource(0x543331))
@@ -238,13 +244,13 @@ func BenchmarkAllBatchesChannelOut(b *testing.B) {
 			// to leverage optimizations in the Batch Linked List
 			batches[i].Timestamp = uint64(t.Add(time.Duration(i) * time.Second).Unix())
 		}
-		b.Run(tc.String(), func(b *testing.B) {
+		b.Run(tc.String()+"_"+algo.String(), func(b *testing.B) {
 			// reset the compressor used in the test case
 			for bn := 0; bn < b.N; bn++ {
 				// don't measure the setup time
 				b.StopTimer()
 				compressors[tc.compKey].compressor.Reset()
-				cout, _ := channelOutByType(tc.BatchType, tc.compKey)
+				cout, _ := channelOutByType(tc.BatchType, tc.compKey, algo)
 				b.StartTimer()
 				// add all batches to the channel out
 				for i := 0; i < tc.BatchCount; i++ {
@@ -254,6 +260,7 @@ func BenchmarkAllBatchesChannelOut(b *testing.B) {
 			}
 		})
 	}
+	}
 }

 // BenchmarkGetRawSpanBatch benchmarks the performance of building a span batch from singular batches
......
@@ -111,7 +111,7 @@ func processFrames(cfg Config, rollupCfg *rollup.Config, id derive.ChannelID, fr
 	var batchTypes []int
 	invalidBatches := false
 	if ch.IsReady() {
-		br, err := derive.BatchReader(ch.Reader(), spec.MaxRLPBytesPerChannel(ch.HighestBlock().Time))
+		br, err := derive.BatchReader(ch.Reader(), spec.MaxRLPBytesPerChannel(ch.HighestBlock().Time), rollupCfg.IsFjord(ch.HighestBlock().Time))
 		if err == nil {
 			for batchData, err := br(); err != io.EOF; batchData, err = br() {
 				if err != nil {
......
 package derive

 import (
+	"bufio"
 	"bytes"
 	"compress/zlib"
 	"fmt"
 	"io"

+	"github.com/andybalholm/brotli"
 	"github.com/ethereum-optimism/optimism/op-service/eth"
 	"github.com/ethereum/go-ethereum/rlp"
 )

+const (
+	ZlibCM8  = 8
+	ZlibCM15 = 15
+)
+
 // A Channel is a set of batches that are split into at least one, but possibly multiple frames.
 // Frames are allowed to be ingested out of order.
 // Each frame is ingested one by one. Once a frame with `closed` is added to the channel, the
@@ -151,17 +158,44 @@ func (ch *Channel) Reader() io.Reader {
 // The L1Inclusion block is also provided at creation time.
 // Warning: the batch reader can read every batch-type.
 // The caller of the batch-reader should filter the results.
-func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64) (func() (*BatchData, error), error) {
-	// Setup decompressor stage + RLP reader
-	zr, err := zlib.NewReader(r)
-	if err != nil {
-		return nil, err
-	}
+func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func() (*BatchData, error), error) {
+	// Use a buffered reader so we can peek at the first byte.
+	bufReader := bufio.NewReader(r)
+	compressionType, err := bufReader.Peek(1)
+	if err != nil {
+		return nil, err
+	}
+	var zr io.Reader
+	// A zlib stream starts with the CMF byte, whose low nibble (the CM field)
+	// is 8 (deflate) or 15 (reserved), so either value identifies zlib.
+	if compressionType[0]&0x0F == ZlibCM8 || compressionType[0]&0x0F == ZlibCM15 {
+		var err error
+		zr, err = zlib.NewReader(bufReader)
+		if err != nil {
+			return nil, err
+		}
+		// A first byte equal to ChannelVersionBrotli (0x01) marks a brotli channel.
+	} else if compressionType[0] == ChannelVersionBrotli {
+		// Brotli-compressed batches cannot be accepted before Fjord.
+		if !isFjord {
+			return nil, fmt.Errorf("cannot accept brotli compressed batch before Fjord")
+		}
+		// Discard the version byte before handing the stream to the brotli reader.
+		_, err := bufReader.Discard(1)
+		if err != nil {
+			return nil, err
+		}
+		zr = brotli.NewReader(bufReader)
+	} else {
+		return nil, fmt.Errorf("cannot distinguish the compression algo used given type byte %v", compressionType[0])
+	}

 	// Setup the RLP reader
 	rlpReader := rlp.NewStream(zr, maxRLPBytesPerChannel)
 	// Read each batch iteratively
 	return func() (*BatchData, error) {
 		var batchData BatchData
-		if err = rlpReader.Decode(&batchData); err != nil {
+		if err := rlpReader.Decode(&batchData); err != nil {
 			return nil, err
 		}
 		return &batchData, nil
......
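The reader-side dispatch above works because the two formats cannot collide on the first byte: a zlib CMF byte has a CM nibble of 8 or 15, which the brotli version byte 0x01 never matches. A self-contained sketch of the same discrimination, separate from the real BatchReader:

    package main

    import "fmt"

    const ChannelVersionBrotli byte = 0x01

    // detectCompression mirrors the first-byte dispatch in BatchReader above.
    func detectCompression(first byte, isFjord bool) (string, error) {
    	switch {
    	case first&0x0F == 8 || first&0x0F == 15: // zlib CM field: 8 = deflate, 15 = reserved
    		return "zlib", nil
    	case first == ChannelVersionBrotli:
    		if !isFjord {
    			return "", fmt.Errorf("cannot accept brotli compressed batch before Fjord")
    		}
    		return "brotli", nil
    	default:
    		return "", fmt.Errorf("cannot distinguish the compression algo used given type byte %v", first)
    	}
    }

    func main() {
    	fmt.Println(detectCompression(0x78, false)) // typical zlib CMF byte -> zlib
    	fmt.Println(detectCompression(0x01, true))  // brotli version byte, post-Fjord -> brotli
    	fmt.Println(detectCompression(0x01, false)) // brotli pre-Fjord -> error
    }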
package derive

import (
	"bytes"
	"compress/zlib"
	"fmt"
	"io"

	"github.com/andybalholm/brotli"
)

const (
	ChannelVersionBrotli byte = 0x01
)

// ChannelCompressor is the interface shared by the zlib- and brotli-backed
// channel compression implementations.
type ChannelCompressor interface {
	Write([]byte) (int, error)
	Flush() error
	Close() error
	Reset()
	Len() int
	Read([]byte) (int, error)
	GetCompressed() *bytes.Buffer
}

// CompressorWriter is the subset of the underlying writer (zlib.Writer or
// brotli.Writer) that the channel compressors rely on.
type CompressorWriter interface {
	Write([]byte) (int, error)
	Flush() error
	Close() error
	Reset(io.Writer)
}

// BaseChannelCompressor embeds the algorithm-specific writer together with the
// shared output buffer; only Reset differs between the algorithms.
type BaseChannelCompressor struct {
	compressed *bytes.Buffer
	CompressorWriter
}

func (bcc *BaseChannelCompressor) Len() int {
	return bcc.compressed.Len()
}

func (bcc *BaseChannelCompressor) Read(p []byte) (int, error) {
	return bcc.compressed.Read(p)
}

func (bcc *BaseChannelCompressor) GetCompressed() *bytes.Buffer {
	return bcc.compressed
}

type ZlibCompressor struct {
	BaseChannelCompressor
}

func (zc *ZlibCompressor) Reset() {
	zc.compressed.Reset()
	zc.CompressorWriter.Reset(zc.compressed)
}

type BrotliCompressor struct {
	BaseChannelCompressor
}

// Reset re-arms the compressor and re-writes the brotli channel version byte,
// which must prefix every brotli-compressed channel.
func (bc *BrotliCompressor) Reset() {
	bc.compressed.Reset()
	bc.compressed.WriteByte(ChannelVersionBrotli)
	bc.CompressorWriter.Reset(bc.compressed)
}

// NewChannelCompressor returns a zlib- or brotli-backed ChannelCompressor for
// the given algorithm. Brotli output is prefixed with ChannelVersionBrotli.
func NewChannelCompressor(algo CompressionAlgo) (ChannelCompressor, error) {
	compressed := &bytes.Buffer{}
	if algo == Zlib {
		writer, err := zlib.NewWriterLevel(compressed, zlib.BestCompression)
		if err != nil {
			return nil, err
		}
		return &ZlibCompressor{
			BaseChannelCompressor{
				CompressorWriter: writer,
				compressed:       compressed,
			},
		}, nil
	} else if algo.IsBrotli() {
		compressed.WriteByte(ChannelVersionBrotli)
		writer := brotli.NewWriterLevel(compressed, GetBrotliLevel(algo))
		return &BrotliCompressor{
			BaseChannelCompressor{
				CompressorWriter: writer,
				compressed:       compressed,
			},
		}, nil
	}
	return nil, fmt.Errorf("unsupported compression algorithm: %s", algo)
}
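A usage sketch for the new constructor. Note that a freshly constructed (or Reset) brotli compressor already reports Len() == 1, because the channel version byte is pre-written into the output buffer; the tests below assert exactly that:

    package main

    import (
    	"fmt"

    	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
    )

    func main() {
    	cc, err := derive.NewChannelCompressor(derive.Brotli10)
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println(cc.Len()) // 1: the ChannelVersionBrotli byte is already buffered

    	if _, err := cc.Write([]byte("some batch data")); err != nil {
    		panic(err)
    	}
    	if err := cc.Close(); err != nil {
    		panic(err)
    	}
    	compressed := cc.GetCompressed()
    	fmt.Printf("%d bytes, first byte %#x\n", compressed.Len(), compressed.Bytes()[0])
    }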
package derive

import (
	"math/rand"
	"testing"

	"github.com/stretchr/testify/require"
)

var r = rand.New(rand.NewSource(99))

func randomBytes(length int) []byte {
	b := make([]byte, length)
	_, err := r.Read(b)
	// math/rand's Read always returns a nil error
	if err != nil {
		panic(err)
	}
	return b
}

func TestChannelCompressor_NewReset(t *testing.T) {
	testCases := []struct {
		name              string
		algo              CompressionAlgo
		expectedResetSize int
		expectErr         bool
	}{
		{
			name:              "zlib",
			algo:              Zlib,
			expectedResetSize: 0,
		},
		{
			name:              "brotli10",
			algo:              Brotli10,
			expectedResetSize: 1, // the brotli channel version byte
		},
		{
			name:              "zstd",
			algo:              CompressionAlgo("zstd"),
			expectedResetSize: 0,
			expectErr:         true,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			scc, err := NewChannelCompressor(tc.algo)
			if tc.expectErr {
				require.Error(t, err)
				return
			}
			require.NoError(t, err)
			require.Equal(t, tc.expectedResetSize, scc.Len())

			_, err = scc.Write(randomBytes(10))
			require.NoError(t, err)
			err = scc.Flush()
			require.NoError(t, err)
			require.Greater(t, scc.Len(), tc.expectedResetSize)

			scc.Reset()
			require.Equal(t, tc.expectedResetSize, scc.Len())
		})
	}
}
@@ -44,7 +44,7 @@ func (cr *ChannelInReader) Origin() eth.L1BlockRef {

 // TODO: Take full channel for better logging
 func (cr *ChannelInReader) WriteChannel(data []byte) error {
-	if f, err := BatchReader(bytes.NewBuffer(data), cr.spec.MaxRLPBytesPerChannel(cr.prev.Origin().Time)); err == nil {
+	if f, err := BatchReader(bytes.NewBuffer(data), cr.spec.MaxRLPBytesPerChannel(cr.prev.Origin().Time), cr.cfg.IsFjord(cr.prev.Origin().Time)); err == nil {
 		cr.nextBatchFn = f
 		cr.metrics.RecordChannelInputBytes(len(data))
 		return nil
......
...@@ -52,7 +52,7 @@ var channelTypes = []struct { ...@@ -52,7 +52,7 @@ var channelTypes = []struct {
{ {
Name: "Span", Name: "Span",
ChannelOut: func(t *testing.T) ChannelOut { ChannelOut: func(t *testing.T) ChannelOut {
cout, err := NewSpanChannelOut(0, big.NewInt(0), 128_000) cout, err := NewSpanChannelOut(0, big.NewInt(0), 128_000, Zlib)
require.NoError(t, err) require.NoError(t, err)
return cout return cout
}, },
...@@ -113,7 +113,7 @@ func TestOutputFrameNoEmptyLastFrame(t *testing.T) { ...@@ -113,7 +113,7 @@ func TestOutputFrameNoEmptyLastFrame(t *testing.T) {
// depending on the channel type, determine the size of the written data // depending on the channel type, determine the size of the written data
if span, ok := cout.(*SpanChannelOut); ok { if span, ok := cout.(*SpanChannelOut); ok {
written = uint64(span.compressed.Len()) written = uint64(span.compressor.Len())
} else if singular, ok := cout.(*SingularChannelOut); ok { } else if singular, ok := cout.(*SingularChannelOut); ok {
written = uint64(singular.compress.Len()) written = uint64(singular.compress.Len())
} }
...@@ -220,12 +220,12 @@ func TestBlockToBatchValidity(t *testing.T) { ...@@ -220,12 +220,12 @@ func TestBlockToBatchValidity(t *testing.T) {
require.ErrorContains(t, err, "has no transactions") require.ErrorContains(t, err, "has no transactions")
} }
func SpanChannelAndBatches(t *testing.T, target uint64, len int) (*SpanChannelOut, []*SingularBatch) { func SpanChannelAndBatches(t *testing.T, target uint64, len int, algo CompressionAlgo) (*SpanChannelOut, []*SingularBatch) {
// target is larger than one batch, but smaller than two batches // target is larger than one batch, but smaller than two batches
rng := rand.New(rand.NewSource(0x543331)) rng := rand.New(rand.NewSource(0x543331))
chainID := big.NewInt(rng.Int63n(1000)) chainID := big.NewInt(rng.Int63n(1000))
txCount := 1 txCount := 1
cout, err := NewSpanChannelOut(0, chainID, target) cout, err := NewSpanChannelOut(0, chainID, target, algo)
require.NoError(t, err) require.NoError(t, err)
batches := make([]*SingularBatch, len) batches := make([]*SingularBatch, len)
// adding the first batch should not cause an error // adding the first batch should not cause an error
...@@ -237,14 +237,33 @@ func SpanChannelAndBatches(t *testing.T, target uint64, len int) (*SpanChannelOu ...@@ -237,14 +237,33 @@ func SpanChannelAndBatches(t *testing.T, target uint64, len int) (*SpanChannelOu
return cout, batches return cout, batches
} }
func TestSpanChannelOut(t *testing.T) {
tests := []struct {
name string
f func(t *testing.T, algo CompressionAlgo)
}{
{"SpanChannelOutCompressionOnlyOneBatch", SpanChannelOutCompressionOnlyOneBatch},
{"SpanChannelOutCompressionUndo", SpanChannelOutCompressionUndo},
{"SpanChannelOutClose", SpanChannelOutClose},
}
for _, test := range tests {
test := test
for _, algo := range CompressionAlgoTypes {
t.Run(test.name+"_"+algo.String(), func(t *testing.T) {
test.f(t, algo)
})
}
}
}
// TestSpanChannelOutCompressionOnlyOneBatch tests that the SpanChannelOut compression works as expected when there is only one batch // TestSpanChannelOutCompressionOnlyOneBatch tests that the SpanChannelOut compression works as expected when there is only one batch
// and it is larger than the target size. The single batch should be compressed, and the channel should now be full // and it is larger than the target size. The single batch should be compressed, and the channel should now be full
func TestSpanChannelOutCompressionOnlyOneBatch(t *testing.T) { func SpanChannelOutCompressionOnlyOneBatch(t *testing.T, algo CompressionAlgo) {
cout, singularBatches := SpanChannelAndBatches(t, 300, 2) cout, singularBatches := SpanChannelAndBatches(t, 300, 2, algo)
err := cout.AddSingularBatch(singularBatches[0], 0) err := cout.AddSingularBatch(singularBatches[0], 0)
// confirm compression was not skipped // confirm compression was not skipped
require.Greater(t, cout.compressed.Len(), 0) require.Greater(t, cout.compressor.Len(), 0)
require.NoError(t, err) require.NoError(t, err)
// confirm the channel is full // confirm the channel is full
...@@ -256,21 +275,25 @@ func TestSpanChannelOutCompressionOnlyOneBatch(t *testing.T) { ...@@ -256,21 +275,25 @@ func TestSpanChannelOutCompressionOnlyOneBatch(t *testing.T) {
} }
// TestSpanChannelOutCompressionUndo tests that the SpanChannelOut compression rejects a batch that would cause the channel to be overfull // TestSpanChannelOutCompressionUndo tests that the SpanChannelOut compression rejects a batch that would cause the channel to be overfull
func TestSpanChannelOutCompressionUndo(t *testing.T) { func SpanChannelOutCompressionUndo(t *testing.T, algo CompressionAlgo) {
// target is larger than one batch, but smaller than two batches // target is larger than one batch, but smaller than two batches
cout, singularBatches := SpanChannelAndBatches(t, 750, 2) cout, singularBatches := SpanChannelAndBatches(t, 750, 2, algo)
err := cout.AddSingularBatch(singularBatches[0], 0) err := cout.AddSingularBatch(singularBatches[0], 0)
require.NoError(t, err) require.NoError(t, err)
// confirm that the first compression was skipped // confirm that the first compression was skipped
require.Equal(t, 0, cout.compressed.Len()) if algo == Zlib {
require.Equal(t, 0, cout.compressor.Len())
} else {
require.Equal(t, 1, cout.compressor.Len()) // 1 because the brotli channel version byte is already buffered
}
// record the RLP length to confirm it doesn't change when adding a rejected batch // record the RLP length to confirm it doesn't change when adding a rejected batch
rlp1 := cout.activeRLP().Len() rlp1 := cout.activeRLP().Len()
err = cout.AddSingularBatch(singularBatches[1], 0) err = cout.AddSingularBatch(singularBatches[1], 0)
require.ErrorIs(t, err, ErrCompressorFull) require.ErrorIs(t, err, ErrCompressorFull)
// confirm that the second compression was not skipped // confirm that the second compression was not skipped
require.Greater(t, cout.compressed.Len(), 0) require.Greater(t, cout.compressor.Len(), 0)
// confirm that the second RLP is the same size as the first (because the second batch was not added) // confirm that the second RLP is the same size as the first (because the second batch was not added)
require.Equal(t, rlp1, cout.activeRLP().Len()) require.Equal(t, rlp1, cout.activeRLP().Len())
...@@ -278,14 +301,19 @@ func TestSpanChannelOutCompressionUndo(t *testing.T) { ...@@ -278,14 +301,19 @@ func TestSpanChannelOutCompressionUndo(t *testing.T) {
// TestSpanChannelOutClose tests that the SpanChannelOut compression works as expected when the channel is closed. // TestSpanChannelOutClose tests that the SpanChannelOut compression works as expected when the channel is closed.
// It should compress the batch even if it is smaller than the target size, because the channel is closing. // It should compress the batch even if it is smaller than the target size, because the channel is closing.
func TestSpanChannelOutClose(t *testing.T) { func SpanChannelOutClose(t *testing.T, algo CompressionAlgo) {
target := uint64(600) target := uint64(600)
cout, singularBatches := SpanChannelAndBatches(t, target, 1) cout, singularBatches := SpanChannelAndBatches(t, target, 1, algo)
err := cout.AddSingularBatch(singularBatches[0], 0) err := cout.AddSingularBatch(singularBatches[0], 0)
require.NoError(t, err) require.NoError(t, err)
// confirm no compression has happened yet // confirm no compression has happened yet
require.Equal(t, 0, cout.compressed.Len())
if algo == Zlib {
require.Equal(t, 0, cout.compressor.Len())
} else {
require.Equal(t, 1, cout.compressor.Len()) // 1 because the brotli channel version byte is already buffered
}
// confirm the RLP length is less than the target // confirm the RLP length is less than the target
rlpLen := cout.activeRLP().Len() rlpLen := cout.activeRLP().Len()
...@@ -295,6 +323,6 @@ func TestSpanChannelOutClose(t *testing.T) { ...@@ -295,6 +323,6 @@ func TestSpanChannelOutClose(t *testing.T) {
require.NoError(t, cout.Close()) require.NoError(t, cout.Close())
// confirm that the only batch was compressed, and that the RLP did not change // confirm that the only batch was compressed, and that the RLP did not change
require.Greater(t, cout.compressed.Len(), 0) require.Greater(t, cout.compressor.Len(), 0)
require.Equal(t, rlpLen, cout.activeRLP().Len()) require.Equal(t, rlpLen, cout.activeRLP().Len())
} }
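
Taken together, the three tests above walk the producer-side lifecycle: batches accumulate as RLP, get compressed against a target size, are rejected once the compressor is full, and are flushed on close. A minimal sketch of that flow under the constructor signature from this diff — the chain ID, target size, and frame size are illustrative, and the io.EOF handling assumes OutputFrame signals the final frame that way:

```go
package sketch

import (
	"bytes"
	"errors"
	"io"
	"math/big"

	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)

// buildChannel sketches the flow the tests exercise: add batches until the
// compressor reports full, close to force a final compression pass, then
// emit a frame.
func buildChannel(batches []*derive.SingularBatch) (*bytes.Buffer, error) {
	cout, err := derive.NewSpanChannelOut(0, big.NewInt(901), 100_000, derive.Brotli10)
	if err != nil {
		return nil, err
	}
	for i, b := range batches {
		err := cout.AddSingularBatch(b, uint64(i))
		if errors.Is(err, derive.ErrCompressorFull) {
			break // channel is full; remaining batches go into the next channel
		}
		if err != nil {
			return nil, err
		}
	}
	if err := cout.Close(); err != nil {
		return nil, err
	}
	out := new(bytes.Buffer)
	if _, err := cout.OutputFrame(out, 120_000); err != nil && !errors.Is(err, io.EOF) {
		return nil, err
	}
	return out, nil
}
```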
package derive package derive
import ( import (
"bytes"
"compress/zlib"
"math/big"
"math/rand"
"testing" "testing"
"github.com/DataDog/zstd"
"github.com/andybalholm/brotli"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
...@@ -99,3 +105,117 @@ func TestFrameValidity(t *testing.T) { ...@@ -99,3 +105,117 @@ func TestFrameValidity(t *testing.T) {
t.Run(tc.name, tc.Run) t.Run(tc.name, tc.Run)
} }
} }
func TestBatchReader(t *testing.T) {
// Get batch data
rng := rand.New(rand.NewSource(0x543331))
singularBatch := RandomSingularBatch(rng, 20, big.NewInt(333))
batchDataInput := NewBatchData(singularBatch)
encodedBatch := &bytes.Buffer{}
err := batchDataInput.EncodeRLP(encodedBatch)
require.NoError(t, err)
var testCases = []struct {
name string
algo func(buf *bytes.Buffer, t *testing.T)
isFjord bool
expectErr bool
}{
{
name: "zlib-post-fjord",
algo: func(buf *bytes.Buffer, t *testing.T) {
writer := zlib.NewWriter(buf)
_, err := writer.Write(encodedBatch.Bytes())
require.NoError(t, err)
writer.Close()
},
isFjord: true,
},
{
name: "zlib-pre-fjord",
algo: func(buf *bytes.Buffer, t *testing.T) {
writer := zlib.NewWriter(buf)
_, err := writer.Write(encodedBatch.Bytes())
require.NoError(t, err)
writer.Close()
},
isFjord: false,
},
{
name: "brotli9-post-fjord",
algo: func(buf *bytes.Buffer, t *testing.T) {
buf.WriteByte(ChannelVersionBrotli)
writer := brotli.NewWriterLevel(buf, 9)
_, err := writer.Write(encodedBatch.Bytes())
require.NoError(t, err)
writer.Close()
},
isFjord: true,
},
{
name: "brotli9-pre-fjord",
algo: func(buf *bytes.Buffer, t *testing.T) {
buf.WriteByte(ChannelVersionBrotli)
writer := brotli.NewWriterLevel(buf, 9)
_, err := writer.Write(encodedBatch.Bytes())
require.NoError(t, err)
writer.Close()
},
isFjord: false,
expectErr: true, // expect an error because brotli is not supported before Fjord
},
{
name: "brotli10-post-fjord",
algo: func(buf *bytes.Buffer, t *testing.T) {
buf.WriteByte(ChannelVersionBrotli)
writer := brotli.NewWriterLevel(buf, 10)
_, err := writer.Write(encodedBatch.Bytes())
require.NoError(t, err)
writer.Close()
},
isFjord: true,
},
{
name: "brotli11-post-fjord",
algo: func(buf *bytes.Buffer, t *testing.T) {
buf.WriteByte(ChannelVersionBrotli)
writer := brotli.NewWriterLevel(buf, 11)
_, err := writer.Write(encodedBatch.Bytes())
require.NoError(t, err)
writer.Close()
},
isFjord: true,
},
{
name: "zstd-post-fjord",
algo: func(buf *bytes.Buffer, t *testing.T) {
writer := zstd.NewWriter(buf)
_, err := writer.Write(encodedBatch.Bytes())
require.NoError(t, err)
writer.Close()
},
expectErr: true,
isFjord: true,
}}
for _, tc := range testCases {
compressed := new(bytes.Buffer)
tc := tc
t.Run(tc.name, func(t *testing.T) {
tc.algo(compressed, t)
reader, err := BatchReader(bytes.NewReader(compressed.Bytes()), 120000, tc.isFjord)
if tc.expectErr {
require.Error(t, err)
return
}
require.NoError(t, err)
// read the batch data
batchData, err := reader()
require.NoError(t, err)
require.NotNil(t, batchData)
require.Equal(t, batchDataInput, batchData)
})
}
}
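
These cases pin down the wire rule: a zlib stream is recognized by its standard header byte, while brotli channel data carries a leading ChannelVersionBrotli byte and is only accepted post-Fjord. A rough sketch of that dispatch — not the actual BatchReader implementation; both the zlib CM=8 header check and the concrete value of ChannelVersionBrotli (1) are assumptions here:

```go
package derive

import (
	"bytes"
	"compress/zlib"
	"fmt"
	"io"

	"github.com/andybalholm/brotli"
)

// channelVersionBrotli mirrors ChannelVersionBrotli; the concrete value (1)
// is an assumption for this sketch.
const channelVersionBrotli = 0x01

// openChannelReader is a hypothetical stand-in for the dispatch inside
// BatchReader: peek at the first byte, then pick a decompressor.
func openChannelReader(r *bytes.Reader, isFjord bool) (io.Reader, error) {
	version, err := r.ReadByte()
	if err != nil {
		return nil, err
	}
	switch {
	case version&0x0F == 0x08: // zlib CMF byte with CM=8 (deflate), e.g. 0x78
		// rewind one byte so zlib sees its complete header
		if _, err := r.Seek(-1, io.SeekCurrent); err != nil {
			return nil, err
		}
		return zlib.NewReader(r)
	case version == channelVersionBrotli && isFjord:
		return brotli.NewReader(r), nil
	default:
		return nil, fmt.Errorf("unsupported channel compression byte: %#x", version)
	}
}
```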
...@@ -2,7 +2,7 @@ package derive ...@@ -2,7 +2,7 @@ package derive
import ( import (
"bytes" "bytes"
"compress/zlib"
"crypto/rand" "crypto/rand"
"fmt" "fmt"
"io" "io"
...@@ -26,10 +26,8 @@ type SpanChannelOut struct { ...@@ -26,10 +26,8 @@ type SpanChannelOut struct {
// lastCompressedRLPSize tracks the *uncompressed* size of the last RLP buffer that was compressed // lastCompressedRLPSize tracks the *uncompressed* size of the last RLP buffer that was compressed
// it is used to measure the growth of the RLP buffer when adding a new batch to optimize compression // it is used to measure the growth of the RLP buffer when adding a new batch to optimize compression
lastCompressedRLPSize int lastCompressedRLPSize int
// compressed contains compressed data for making output frames // the compressor for the channel
compressed *bytes.Buffer compressor ChannelCompressor
// compress is the zlib writer for the channel
compressor *zlib.Writer
// target is the target size of the compressed data // target is the target size of the compressed data
target uint64 target uint64
// closed indicates if the channel is closed // closed indicates if the channel is closed
...@@ -49,22 +47,23 @@ func (co *SpanChannelOut) setRandomID() error { ...@@ -49,22 +47,23 @@ func (co *SpanChannelOut) setRandomID() error {
return err return err
} }
func NewSpanChannelOut(genesisTimestamp uint64, chainID *big.Int, targetOutputSize uint64) (*SpanChannelOut, error) { func NewSpanChannelOut(genesisTimestamp uint64, chainID *big.Int, targetOutputSize uint64, compressionAlgo CompressionAlgo) (*SpanChannelOut, error) {
c := &SpanChannelOut{ c := &SpanChannelOut{
id: ChannelID{}, id: ChannelID{},
frame: 0, frame: 0,
spanBatch: NewSpanBatch(genesisTimestamp, chainID), spanBatch: NewSpanBatch(genesisTimestamp, chainID),
rlp: [2]*bytes.Buffer{{}, {}}, rlp: [2]*bytes.Buffer{{}, {}},
compressed: &bytes.Buffer{},
target: targetOutputSize, target: targetOutputSize,
} }
var err error var err error
if err = c.setRandomID(); err != nil { if err = c.setRandomID(); err != nil {
return nil, err return nil, err
} }
if c.compressor, err = zlib.NewWriterLevel(c.compressed, zlib.BestCompression); err != nil {
if c.compressor, err = NewChannelCompressor(compressionAlgo); err != nil {
return nil, err return nil, err
} }
return c, nil return c, nil
} }
...@@ -75,8 +74,7 @@ func (co *SpanChannelOut) Reset() error { ...@@ -75,8 +74,7 @@ func (co *SpanChannelOut) Reset() error {
co.rlp[0].Reset() co.rlp[0].Reset()
co.rlp[1].Reset() co.rlp[1].Reset()
co.lastCompressedRLPSize = 0 co.lastCompressedRLPSize = 0
co.compressed.Reset() co.compressor.Reset()
co.compressor.Reset(co.compressed)
co.spanBatch = NewSpanBatch(co.spanBatch.GenesisTimestamp, co.spanBatch.ChainID) co.spanBatch = NewSpanBatch(co.spanBatch.GenesisTimestamp, co.spanBatch.ChainID)
// setting the new randomID is the only part of the reset that can fail // setting the new randomID is the only part of the reset that can fail
return co.setRandomID() return co.setRandomID()
...@@ -153,7 +151,7 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64) ...@@ -153,7 +151,7 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64)
// if the compressed data *plus* the new rlp data is under the target size, return early // if the compressed data *plus* the new rlp data is under the target size, return early
// this optimizes out cases where the compressor will obviously come in under the target size // this optimizes out cases where the compressor will obviously come in under the target size
rlpGrowth := co.activeRLP().Len() - co.lastCompressedRLPSize rlpGrowth := co.activeRLP().Len() - co.lastCompressedRLPSize
if uint64(co.compressed.Len()+rlpGrowth) < co.target { if uint64(co.compressor.Len()+rlpGrowth) < co.target {
return nil return nil
} }
...@@ -186,8 +184,7 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64) ...@@ -186,8 +184,7 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64)
// compress compresses the active RLP buffer and checks if the compressed data is over the target size. // compress compresses the active RLP buffer and checks if the compressed data is over the target size.
// it resets all the compression buffers because Span Batches aren't meant to be compressed incrementally. // it resets all the compression buffers because Span Batches aren't meant to be compressed incrementally.
func (co *SpanChannelOut) compress() error { func (co *SpanChannelOut) compress() error {
co.compressed.Reset() co.compressor.Reset()
co.compressor.Reset(co.compressed)
if _, err := co.compressor.Write(co.activeRLP().Bytes()); err != nil { if _, err := co.compressor.Write(co.activeRLP().Bytes()); err != nil {
return err return err
} }
...@@ -207,7 +204,7 @@ func (co *SpanChannelOut) InputBytes() int { ...@@ -207,7 +204,7 @@ func (co *SpanChannelOut) InputBytes() int {
// Span Channel Out does not provide early output, so this will always be 0 until the channel is closed or full // Span Channel Out does not provide early output, so this will always be 0 until the channel is closed or full
func (co *SpanChannelOut) ReadyBytes() int { func (co *SpanChannelOut) ReadyBytes() int {
if co.closed || co.FullErr() != nil { if co.closed || co.FullErr() != nil {
return co.compressed.Len() return co.compressor.Len()
} }
return 0 return 0
} }
...@@ -225,7 +222,7 @@ func (co *SpanChannelOut) checkFull() { ...@@ -225,7 +222,7 @@ func (co *SpanChannelOut) checkFull() {
if co.full != nil { if co.full != nil {
return return
} }
if uint64(co.compressed.Len()) >= co.target { if uint64(co.compressor.Len()) >= co.target {
co.full = ErrCompressorFull co.full = ErrCompressorFull
} }
} }
...@@ -264,7 +261,7 @@ func (co *SpanChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, ...@@ -264,7 +261,7 @@ func (co *SpanChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16,
f := createEmptyFrame(co.id, co.frame, co.ReadyBytes(), co.closed, maxSize) f := createEmptyFrame(co.id, co.frame, co.ReadyBytes(), co.closed, maxSize)
if _, err := io.ReadFull(co.compressed, f.Data); err != nil { if _, err := io.ReadFull(co.compressor.GetCompressed(), f.Data); err != nil {
return 0, err return 0, err
} }
......
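
SpanChannelOut now programs against a ChannelCompressor interface instead of a concrete zlib writer, so the backend is chosen once in the constructor and everything downstream (compress, checkFull, OutputFrame) stays algorithm-agnostic. The interface itself is defined elsewhere in this PR; inferred purely from the call sites above (Write, Reset, Len, GetCompressed), it plausibly looks like the following sketch, with a toy zlib-backed implementation:

```go
package derive

import (
	"bytes"
	"compress/zlib"
	"io"
)

// channelCompressor is inferred from the call sites in span_channel_out.go;
// the real interface added by this PR may carry more methods (e.g. Close or Flush).
type channelCompressor interface {
	io.Writer
	Reset()                       // drop buffered output and restart the compression stream
	Len() int                     // number of compressed bytes buffered so far
	GetCompressed() *bytes.Buffer // the compressed output, ready for framing
}

// zlibCompressor is a toy implementation of the sketch above.
type zlibCompressor struct {
	buf    bytes.Buffer
	writer *zlib.Writer
}

func newZlibCompressor() *zlibCompressor {
	c := &zlibCompressor{}
	// zlib.BestCompression is a valid level, so the error is always nil here.
	c.writer, _ = zlib.NewWriterLevel(&c.buf, zlib.BestCompression)
	return c
}

// Write flushes after every call so Len reflects the data written so far;
// that is a sketch simplification trading compression ratio for accuracy.
func (c *zlibCompressor) Write(p []byte) (int, error) {
	n, err := c.writer.Write(p)
	if err != nil {
		return n, err
	}
	return n, c.writer.Flush()
}

func (c *zlibCompressor) Reset() {
	c.buf.Reset()
	c.writer.Reset(&c.buf)
}

func (c *zlibCompressor) Len() int { return c.buf.Len() }

func (c *zlibCompressor) GetCompressed() *bytes.Buffer { return &c.buf }
```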
package derive
import (
"fmt"
"regexp"
)
type CompressionAlgo string
const (
// compression algo types
Zlib CompressionAlgo = "zlib"
Brotli9 CompressionAlgo = "brotli-9"
Brotli10 CompressionAlgo = "brotli-10"
Brotli11 CompressionAlgo = "brotli-11"
)
var CompressionAlgoTypes = []CompressionAlgo{
Zlib,
Brotli9,
Brotli10,
Brotli11,
}
var brotliRegexp = regexp.MustCompile(`^brotli-(9|10|11)$`)
func (algo CompressionAlgo) String() string {
return string(algo)
}
func (algo *CompressionAlgo) Set(value string) error {
if !ValidCompressionAlgoType(CompressionAlgo(value)) {
return fmt.Errorf("unknown compression algo type: %q", value)
}
*algo = CompressionAlgo(value)
return nil
}
func (algo *CompressionAlgo) Clone() any {
cpy := *algo
return &cpy
}
func (algo *CompressionAlgo) IsBrotli() bool {
return brotliRegexp.MatchString(algo.String())
}
func GetBrotliLevel(algo CompressionAlgo) int {
switch algo {
case Brotli9:
return 9
case Brotli10:
return 10
case Brotli11:
return 11
default:
panic("Unsupported brotli level")
}
}
func ValidCompressionAlgoType(value CompressionAlgo) bool {
for _, k := range CompressionAlgoTypes {
if k == value {
return true
}
}
return false
}
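
These helpers are what a compressor factory needs to map an algo onto a concrete writer. A sketch of that mapping, assuming the brotli version-byte prefix convention from the batch reader tests above — this is illustrative, not the NewChannelCompressor added in this PR, and channelVersionBrotli = 1 is an assumption:

```go
package derive

import (
	"bytes"
	"compress/zlib"
	"fmt"
	"io"

	"github.com/andybalholm/brotli"
)

const channelVersionBrotli = 0x01 // assumed value of ChannelVersionBrotli

// newWriterFor picks a compressing writer for the given algo. Brotli output
// is prefixed with the channel version byte so a reader can tell it apart
// from a zlib stream.
func newWriterFor(algo CompressionAlgo, buf *bytes.Buffer) (io.WriteCloser, error) {
	switch {
	case algo == Zlib:
		return zlib.NewWriterLevel(buf, zlib.BestCompression)
	case algo.IsBrotli():
		buf.WriteByte(channelVersionBrotli)
		return brotli.NewWriterLevel(buf, GetBrotliLevel(algo)), nil
	default:
		return nil, fmt.Errorf("unsupported compression algo: %s", algo)
	}
}
```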
package derive
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestCompressionAlgo(t *testing.T) {
testCases := []struct {
name string
algo CompressionAlgo
isBrotli bool
isValidCompressionAlgoType bool
}{
{
name: "zlib",
algo: Zlib,
isBrotli: false,
isValidCompressionAlgoType: true,
},
{
name: "brotli-9",
algo: Brotli9,
isBrotli: true,
isValidCompressionAlgoType: true,
},
{
name: "brotli-10",
algo: Brotli10,
isBrotli: true,
isValidCompressionAlgoType: true,
},
{
name: "brotli-11",
algo: Brotli11,
isBrotli: true,
isValidCompressionAlgoType: true,
},
{
name: "invalid",
algo: CompressionAlgo("invalid"),
isBrotli: false,
isValidCompressionAlgoType: false,
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.isBrotli, tc.algo.IsBrotli())
if tc.isBrotli {
require.NotPanics(t, func() { GetBrotliLevel(tc.algo) })
} else {
require.Panics(t, func() { GetBrotliLevel(tc.algo) })
}
require.Equal(t, tc.isValidCompressionAlgoType, ValidCompressionAlgoType(tc.algo))
})
}
}
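
One more note on the String/Set/Clone trio: String and Set on *CompressionAlgo happen to satisfy the standard library's flag.Value interface (Clone serves CLI frameworks that copy flag values). A hedged sketch of wiring the new flag with stdlib flag — the actual op-batcher registration uses its own CLI framework, so the flag name below is an assumption:

```go
package main

import (
	"flag"
	"fmt"

	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)

func main() {
	algo := derive.Zlib // default algorithm
	// *CompressionAlgo provides String() and Set(string) error, so it is a
	// flag.Value; invalid values are rejected by Set with the error above.
	flag.Var(&algo, "compression-algo", "channel compression algorithm (zlib, brotli-9, brotli-10, brotli-11)")
	flag.Parse()
	fmt.Println("using compression algo:", algo)
}
```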