Commit 01d3a171 authored by Sebastian Stammler's avatar Sebastian Stammler Committed by GitHub

op-batcher,op-node,batch_decoder: add logging of compression algo (#10589)

* op-batcher: add logging of compression algo

* add node logging of algo, add brotli default algo

* fix typos

* only log compression algo if present

* add type conversion abstraction to Batch interface

Since we're dealing now with wrapped types around Batch implementations,
this let's us transparently unwrap the underlying batch types. It makes
sense to add this to the interface, because getting the underlying types
from the interface is done in several places, so it's part of the
interface's contract.

* adapt BatchReader test
parent 3b374c29
...@@ -430,7 +430,7 @@ func TestChannelBuilder_OutputFrames(t *testing.T) { ...@@ -430,7 +430,7 @@ func TestChannelBuilder_OutputFrames(t *testing.T) {
} }
func TestChannelBuilder_OutputFrames_SpanBatch(t *testing.T) { func TestChannelBuilder_OutputFrames_SpanBatch(t *testing.T) {
for _, algo := range derive.CompressionAlgoTypes { for _, algo := range derive.CompressionAlgos {
t.Run("ChannelBuilder_OutputFrames_SpanBatch_"+algo.String(), func(t *testing.T) { t.Run("ChannelBuilder_OutputFrames_SpanBatch_"+algo.String(), func(t *testing.T) {
if algo.IsBrotli() { if algo.IsBrotli() {
ChannelBuilder_OutputFrames_SpanBatch(t, algo) // to fill faster for brotli ChannelBuilder_OutputFrames_SpanBatch(t, algo) // to fill faster for brotli
......
...@@ -204,7 +204,6 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error { ...@@ -204,7 +204,6 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
} }
pc, err := newChannel(s.log, s.metr, s.cfg, s.rollupCfg, s.l1OriginLastClosedChannel.Number) pc, err := newChannel(s.log, s.metr, s.cfg, s.rollupCfg, s.l1OriginLastClosedChannel.Number)
if err != nil { if err != nil {
return fmt.Errorf("creating new channel: %w", err) return fmt.Errorf("creating new channel: %w", err)
} }
...@@ -218,6 +217,8 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error { ...@@ -218,6 +217,8 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
"l1OriginLastClosedChannel", s.l1OriginLastClosedChannel, "l1OriginLastClosedChannel", s.l1OriginLastClosedChannel,
"blocks_pending", len(s.blocks), "blocks_pending", len(s.blocks),
"batch_type", s.cfg.BatchType, "batch_type", s.cfg.BatchType,
"compression_algo", s.cfg.CompressorConfig.CompressionAlgo,
"target_num_frames", s.cfg.TargetNumFrames,
"max_frame_size", s.cfg.MaxFrameSize, "max_frame_size", s.cfg.MaxFrameSize,
) )
s.metr.RecordChannelOpened(pc.ID(), len(s.blocks)) s.metr.RecordChannelOpened(pc.ID(), len(s.blocks))
......
...@@ -128,7 +128,7 @@ func (c *CLIConfig) Check() error { ...@@ -128,7 +128,7 @@ func (c *CLIConfig) Check() error {
if c.Compressor == compressor.RatioKind && (c.ApproxComprRatio <= 0 || c.ApproxComprRatio > 1) { if c.Compressor == compressor.RatioKind && (c.ApproxComprRatio <= 0 || c.ApproxComprRatio > 1) {
return fmt.Errorf("invalid ApproxComprRatio %v for ratio compressor", c.ApproxComprRatio) return fmt.Errorf("invalid ApproxComprRatio %v for ratio compressor", c.ApproxComprRatio)
} }
if !derive.ValidCompressionAlgoType(c.CompressionAlgo) { if !derive.ValidCompressionAlgo(c.CompressionAlgo) {
return fmt.Errorf("invalid compression algo %v", c.CompressionAlgo) return fmt.Errorf("invalid compression algo %v", c.CompressionAlgo)
} }
if c.BatchType > 1 { if c.BatchType > 1 {
......
...@@ -242,9 +242,10 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error { ...@@ -242,9 +242,10 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
"max_frame_size", cc.MaxFrameSize, "max_frame_size", cc.MaxFrameSize,
"target_num_frames", cc.TargetNumFrames, "target_num_frames", cc.TargetNumFrames,
"compressor", cc.CompressorConfig.Kind, "compressor", cc.CompressorConfig.Kind,
"compression_algo", cc.CompressorConfig.CompressionAlgo,
"batch_type", cc.BatchType,
"max_channel_duration", cc.MaxChannelDuration, "max_channel_duration", cc.MaxChannelDuration,
"channel_timeout", cc.ChannelTimeout, "channel_timeout", cc.ChannelTimeout,
"batch_type", cc.BatchType,
"sub_safety_margin", cc.SubSafetyMargin) "sub_safety_margin", cc.SubSafetyMargin)
bs.ChannelConfig = cc bs.ChannelConfig = cc
return nil return nil
......
...@@ -102,7 +102,7 @@ var ( ...@@ -102,7 +102,7 @@ var (
} }
CompressionAlgoFlag = &cli.GenericFlag{ CompressionAlgoFlag = &cli.GenericFlag{
Name: "compression-algo", Name: "compression-algo",
Usage: "The compression algorithm to use. Valid options: " + openum.EnumString(derive.CompressionAlgoTypes), Usage: "The compression algorithm to use. Valid options: " + openum.EnumString(derive.CompressionAlgos),
EnvVars: prefixEnvVars("COMPRESSION_ALGO"), EnvVars: prefixEnvVars("COMPRESSION_ALGO"),
Value: func() *derive.CompressionAlgo { Value: func() *derive.CompressionAlgo {
out := derive.Zlib out := derive.Zlib
......
...@@ -48,7 +48,7 @@ var ( ...@@ -48,7 +48,7 @@ var (
derive.SpanBatchType, derive.SpanBatchType,
// uncomment to include singular batches in the benchmark // uncomment to include singular batches in the benchmark
// singular batches are not included by default because they are not the target of the benchmark // singular batches are not included by default because they are not the target of the benchmark
//derive.SingularBatchType, // derive.SingularBatchType,
} }
) )
...@@ -129,7 +129,7 @@ func BenchmarkFinalBatchChannelOut(b *testing.B) { ...@@ -129,7 +129,7 @@ func BenchmarkFinalBatchChannelOut(b *testing.B) {
// to leverage optimizations in the Batch Linked List // to leverage optimizations in the Batch Linked List
batches[i].Timestamp = uint64(t.Add(time.Duration(i) * time.Second).Unix()) batches[i].Timestamp = uint64(t.Add(time.Duration(i) * time.Second).Unix())
} }
for _, algo := range derive.CompressionAlgoTypes { for _, algo := range derive.CompressionAlgos {
b.Run(tc.String()+"_"+algo.String(), func(b *testing.B) { b.Run(tc.String()+"_"+algo.String(), func(b *testing.B) {
// reset the compressor used in the test case // reset the compressor used in the test case
for bn := 0; bn < b.N; bn++ { for bn := 0; bn < b.N; bn++ {
...@@ -168,7 +168,7 @@ func BenchmarkIncremental(b *testing.B) { ...@@ -168,7 +168,7 @@ func BenchmarkIncremental(b *testing.B) {
{derive.SpanBatchType, 5, 1, "RealBlindCompressor"}, {derive.SpanBatchType, 5, 1, "RealBlindCompressor"},
//{derive.SingularBatchType, 100, 1, "RealShadowCompressor"}, //{derive.SingularBatchType, 100, 1, "RealShadowCompressor"},
} }
for _, algo := range derive.CompressionAlgoTypes { for _, algo := range derive.CompressionAlgos {
for _, tc := range tcs { for _, tc := range tcs {
cout, err := channelOutByType(tc.BatchType, tc.compKey, algo) cout, err := channelOutByType(tc.BatchType, tc.compKey, algo)
if err != nil { if err != nil {
...@@ -231,7 +231,7 @@ func BenchmarkAllBatchesChannelOut(b *testing.B) { ...@@ -231,7 +231,7 @@ func BenchmarkAllBatchesChannelOut(b *testing.B) {
} }
} }
for _, algo := range derive.CompressionAlgoTypes { for _, algo := range derive.CompressionAlgos {
for _, tc := range tests { for _, tc := range tests {
chainID := big.NewInt(333) chainID := big.NewInt(333)
rng := rand.New(rand.NewSource(0x543331)) rng := rand.New(rand.NewSource(0x543331))
......
...@@ -18,13 +18,14 @@ import ( ...@@ -18,13 +18,14 @@ import (
) )
type ChannelWithMetadata struct { type ChannelWithMetadata struct {
ID derive.ChannelID `json:"id"` ID derive.ChannelID `json:"id"`
IsReady bool `json:"is_ready"` IsReady bool `json:"is_ready"`
InvalidFrames bool `json:"invalid_frames"` InvalidFrames bool `json:"invalid_frames"`
InvalidBatches bool `json:"invalid_batches"` InvalidBatches bool `json:"invalid_batches"`
Frames []FrameWithMetadata `json:"frames"` Frames []FrameWithMetadata `json:"frames"`
Batches []derive.Batch `json:"batches"` Batches []derive.Batch `json:"batches"`
BatchTypes []int `json:"batch_types"` BatchTypes []int `json:"batch_types"`
ComprAlgos []derive.CompressionAlgo `json:"compr_alogs"`
} }
type FrameWithMetadata struct { type FrameWithMetadata struct {
...@@ -54,7 +55,6 @@ func LoadFrames(directory string, inbox common.Address) []FrameWithMetadata { ...@@ -54,7 +55,6 @@ func LoadFrames(directory string, inbox common.Address) []FrameWithMetadata {
} else { } else {
return txns[i].BlockNumber < txns[j].BlockNumber return txns[i].BlockNumber < txns[j].BlockNumber
} }
}) })
return transactionsToFrames(txns) return transactionsToFrames(txns)
} }
...@@ -107,8 +107,12 @@ func processFrames(cfg Config, rollupCfg *rollup.Config, id derive.ChannelID, fr ...@@ -107,8 +107,12 @@ func processFrames(cfg Config, rollupCfg *rollup.Config, id derive.ChannelID, fr
} }
} }
var batches []derive.Batch var (
var batchTypes []int batches []derive.Batch
batchTypes []int
comprAlgos []derive.CompressionAlgo
)
invalidBatches := false invalidBatches := false
if ch.IsReady() { if ch.IsReady() {
br, err := derive.BatchReader(ch.Reader(), spec.MaxRLPBytesPerChannel(ch.HighestBlock().Time), rollupCfg.IsFjord(ch.HighestBlock().Time)) br, err := derive.BatchReader(ch.Reader(), spec.MaxRLPBytesPerChannel(ch.HighestBlock().Time), rollupCfg.IsFjord(ch.HighestBlock().Time))
...@@ -118,6 +122,7 @@ func processFrames(cfg Config, rollupCfg *rollup.Config, id derive.ChannelID, fr ...@@ -118,6 +122,7 @@ func processFrames(cfg Config, rollupCfg *rollup.Config, id derive.ChannelID, fr
fmt.Printf("Error reading batchData for channel %v. Err: %v\n", id.String(), err) fmt.Printf("Error reading batchData for channel %v. Err: %v\n", id.String(), err)
invalidBatches = true invalidBatches = true
} else { } else {
comprAlgos = append(comprAlgos, batchData.ComprAlgo)
batchType := batchData.GetBatchType() batchType := batchData.GetBatchType()
batchTypes = append(batchTypes, int(batchType)) batchTypes = append(batchTypes, int(batchType))
switch batchType { switch batchType {
...@@ -157,6 +162,7 @@ func processFrames(cfg Config, rollupCfg *rollup.Config, id derive.ChannelID, fr ...@@ -157,6 +162,7 @@ func processFrames(cfg Config, rollupCfg *rollup.Config, id derive.ChannelID, fr
InvalidBatches: invalidBatches, InvalidBatches: invalidBatches,
Batches: batches, Batches: batches,
BatchTypes: batchTypes, BatchTypes: batchTypes,
ComprAlgos: comprAlgos,
} }
} }
......
...@@ -37,6 +37,21 @@ type Batch interface { ...@@ -37,6 +37,21 @@ type Batch interface {
GetBatchType() int GetBatchType() int
GetTimestamp() uint64 GetTimestamp() uint64
LogContext(log.Logger) log.Logger LogContext(log.Logger) log.Logger
AsSingularBatch() (*SingularBatch, bool)
AsSpanBatch() (*SpanBatch, bool)
}
type batchWithMetadata struct {
Batch
comprAlgo CompressionAlgo
}
func (b batchWithMetadata) LogContext(l log.Logger) log.Logger {
lgr := b.Batch.LogContext(l)
if b.comprAlgo == "" {
return lgr
}
return lgr.With("compression_algo", b.comprAlgo)
} }
// BatchData is used to represent the typed encoding & decoding. // BatchData is used to represent the typed encoding & decoding.
...@@ -44,7 +59,8 @@ type Batch interface { ...@@ -44,7 +59,8 @@ type Batch interface {
// Further fields such as cache can be added in the future, without embedding each type of InnerBatchData. // Further fields such as cache can be added in the future, without embedding each type of InnerBatchData.
// Similar design with op-geth's types.Transaction struct. // Similar design with op-geth's types.Transaction struct.
type BatchData struct { type BatchData struct {
inner InnerBatchData inner InnerBatchData
ComprAlgo CompressionAlgo
} }
// InnerBatchData is the underlying data of a BatchData. // InnerBatchData is the underlying data of a BatchData.
......
...@@ -177,15 +177,15 @@ func (bq *BatchQueue) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*Si ...@@ -177,15 +177,15 @@ func (bq *BatchQueue) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*Si
} }
var nextBatch *SingularBatch var nextBatch *SingularBatch
switch batch.GetBatchType() { switch typ := batch.GetBatchType(); typ {
case SingularBatchType: case SingularBatchType:
singularBatch, ok := batch.(*SingularBatch) singularBatch, ok := batch.AsSingularBatch()
if !ok { if !ok {
return nil, false, NewCriticalError(errors.New("failed type assertion to SingularBatch")) return nil, false, NewCriticalError(errors.New("failed type assertion to SingularBatch"))
} }
nextBatch = singularBatch nextBatch = singularBatch
case SpanBatchType: case SpanBatchType:
spanBatch, ok := batch.(*SpanBatch) spanBatch, ok := batch.AsSpanBatch()
if !ok { if !ok {
return nil, false, NewCriticalError(errors.New("failed type assertion to SpanBatch")) return nil, false, NewCriticalError(errors.New("failed type assertion to SpanBatch"))
} }
...@@ -198,7 +198,7 @@ func (bq *BatchQueue) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*Si ...@@ -198,7 +198,7 @@ func (bq *BatchQueue) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*Si
// span-batches are non-empty, so the below pop is safe. // span-batches are non-empty, so the below pop is safe.
nextBatch = bq.popNextBatch(parent) nextBatch = bq.popNextBatch(parent)
default: default:
return nil, false, NewCriticalError(fmt.Errorf("unrecognized batch type: %d", batch.GetBatchType())) return nil, false, NewCriticalError(fmt.Errorf("unrecognized batch type: %d", typ))
} }
// If the nextBatch is derived from the span batch, len(bq.nextSpan) == 0 means it's the last batch of the span. // If the nextBatch is derived from the span batch, len(bq.nextSpan) == 0 means it's the last batch of the span.
......
...@@ -11,8 +11,8 @@ import ( ...@@ -11,8 +11,8 @@ import (
) )
type BatchWithL1InclusionBlock struct { type BatchWithL1InclusionBlock struct {
Batch
L1InclusionBlock eth.L1BlockRef L1InclusionBlock eth.L1BlockRef
Batch Batch
} }
type BatchValidity uint8 type BatchValidity uint8
...@@ -34,23 +34,23 @@ const ( ...@@ -34,23 +34,23 @@ const (
func CheckBatch(ctx context.Context, cfg *rollup.Config, log log.Logger, l1Blocks []eth.L1BlockRef, func CheckBatch(ctx context.Context, cfg *rollup.Config, log log.Logger, l1Blocks []eth.L1BlockRef,
l2SafeHead eth.L2BlockRef, batch *BatchWithL1InclusionBlock, l2Fetcher SafeBlockFetcher, l2SafeHead eth.L2BlockRef, batch *BatchWithL1InclusionBlock, l2Fetcher SafeBlockFetcher,
) BatchValidity { ) BatchValidity {
switch batch.Batch.GetBatchType() { switch typ := batch.GetBatchType(); typ {
case SingularBatchType: case SingularBatchType:
singularBatch, ok := batch.Batch.(*SingularBatch) singularBatch, ok := batch.AsSingularBatch()
if !ok { if !ok {
log.Error("failed type assertion to SingularBatch") log.Error("failed type assertion to SingularBatch")
return BatchDrop return BatchDrop
} }
return checkSingularBatch(cfg, log, l1Blocks, l2SafeHead, singularBatch, batch.L1InclusionBlock) return checkSingularBatch(cfg, log, l1Blocks, l2SafeHead, singularBatch, batch.L1InclusionBlock)
case SpanBatchType: case SpanBatchType:
spanBatch, ok := batch.Batch.(*SpanBatch) spanBatch, ok := batch.AsSpanBatch()
if !ok { if !ok {
log.Error("failed type assertion to SpanBatch") log.Error("failed type assertion to SpanBatch")
return BatchDrop return BatchDrop
} }
return checkSpanBatch(ctx, cfg, log, l1Blocks, l2SafeHead, spanBatch, batch.L1InclusionBlock, l2Fetcher) return checkSpanBatch(ctx, cfg, log, l1Blocks, l2SafeHead, spanBatch, batch.L1InclusionBlock, l2Fetcher)
default: default:
log.Warn("Unrecognized batch type: %d", batch.Batch.GetBatchType()) log.Warn("Unrecognized batch type: %d", typ)
return BatchDrop return BatchDrop
} }
} }
......
...@@ -167,6 +167,7 @@ func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func( ...@@ -167,6 +167,7 @@ func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func(
} }
var zr io.Reader var zr io.Reader
var comprAlgo CompressionAlgo
// For zlib, the last 4 bits must be either 8 or 15 (both are reserved value) // For zlib, the last 4 bits must be either 8 or 15 (both are reserved value)
if compressionType[0]&0x0F == ZlibCM8 || compressionType[0]&0x0F == ZlibCM15 { if compressionType[0]&0x0F == ZlibCM8 || compressionType[0]&0x0F == ZlibCM15 {
var err error var err error
...@@ -175,6 +176,7 @@ func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func( ...@@ -175,6 +176,7 @@ func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func(
return nil, err return nil, err
} }
// If the bits equal to 1, then it is a brotli reader // If the bits equal to 1, then it is a brotli reader
comprAlgo = Zlib
} else if compressionType[0] == ChannelVersionBrotli { } else if compressionType[0] == ChannelVersionBrotli {
// If before Fjord, we cannot accept brotli compressed batch // If before Fjord, we cannot accept brotli compressed batch
if !isFjord { if !isFjord {
...@@ -186,6 +188,7 @@ func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func( ...@@ -186,6 +188,7 @@ func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func(
return nil, err return nil, err
} }
zr = brotli.NewReader(bufReader) zr = brotli.NewReader(bufReader)
comprAlgo = Brotli
} else { } else {
return nil, fmt.Errorf("cannot distinguish the compression algo used given type byte %v", compressionType[0]) return nil, fmt.Errorf("cannot distinguish the compression algo used given type byte %v", compressionType[0])
} }
...@@ -194,7 +197,7 @@ func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func( ...@@ -194,7 +197,7 @@ func BatchReader(r io.Reader, maxRLPBytesPerChannel uint64, isFjord bool) (func(
rlpReader := rlp.NewStream(zr, maxRLPBytesPerChannel) rlpReader := rlp.NewStream(zr, maxRLPBytesPerChannel)
// Read each batch iteratively // Read each batch iteratively
return func() (*BatchData, error) { return func() (*BatchData, error) {
var batchData BatchData batchData := BatchData{ComprAlgo: comprAlgo}
if err := rlpReader.Decode(&batchData); err != nil { if err := rlpReader.Decode(&batchData); err != nil {
return nil, err return nil, err
} }
......
...@@ -87,15 +87,17 @@ func (cr *ChannelInReader) NextBatch(ctx context.Context) (Batch, error) { ...@@ -87,15 +87,17 @@ func (cr *ChannelInReader) NextBatch(ctx context.Context) (Batch, error) {
cr.NextChannel() cr.NextChannel()
return nil, NotEnoughData return nil, NotEnoughData
} }
batch := batchWithMetadata{comprAlgo: batchData.ComprAlgo}
switch batchData.GetBatchType() { switch batchData.GetBatchType() {
case SingularBatchType: case SingularBatchType:
singularBatch, err := GetSingularBatch(batchData) batch.Batch, err = GetSingularBatch(batchData)
if err != nil { if err != nil {
return nil, err return nil, err
} }
singularBatch.LogContext(cr.log).Debug("decoded singular batch from channel", "stage_origin", cr.Origin()) batch.LogContext(cr.log).Debug("decoded singular batch from channel", "stage_origin", cr.Origin())
cr.metrics.RecordDerivedBatches("singular") cr.metrics.RecordDerivedBatches("singular")
return singularBatch, nil return batch, nil
case SpanBatchType: case SpanBatchType:
if origin := cr.Origin(); !cr.cfg.IsDelta(origin.Time) { if origin := cr.Origin(); !cr.cfg.IsDelta(origin.Time) {
// Check hard fork activation with the L1 inclusion block time instead of the L1 origin block time. // Check hard fork activation with the L1 inclusion block time instead of the L1 origin block time.
...@@ -103,13 +105,13 @@ func (cr *ChannelInReader) NextBatch(ctx context.Context) (Batch, error) { ...@@ -103,13 +105,13 @@ func (cr *ChannelInReader) NextBatch(ctx context.Context) (Batch, error) {
// This is just for early dropping invalid batches as soon as possible. // This is just for early dropping invalid batches as soon as possible.
return nil, NewTemporaryError(fmt.Errorf("cannot accept span batch in L1 block %s at time %d", origin, origin.Time)) return nil, NewTemporaryError(fmt.Errorf("cannot accept span batch in L1 block %s at time %d", origin, origin.Time))
} }
spanBatch, err := DeriveSpanBatch(batchData, cr.cfg.BlockTime, cr.cfg.Genesis.L2Time, cr.cfg.L2ChainID) batch.Batch, err = DeriveSpanBatch(batchData, cr.cfg.BlockTime, cr.cfg.Genesis.L2Time, cr.cfg.L2ChainID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
spanBatch.LogContext(cr.log).Debug("decoded span batch from channel", "stage_origin", cr.Origin()) batch.LogContext(cr.log).Debug("decoded span batch from channel", "stage_origin", cr.Origin())
cr.metrics.RecordDerivedBatches("span") cr.metrics.RecordDerivedBatches("span")
return spanBatch, nil return batch, nil
default: default:
// error is bubbled up to user, but pipeline can skip the batch and continue after. // error is bubbled up to user, but pipeline can skip the batch and continue after.
return nil, NewTemporaryError(fmt.Errorf("unrecognized batch type: %d", batchData.GetBatchType())) return nil, NewTemporaryError(fmt.Errorf("unrecognized batch type: %d", batchData.GetBatchType()))
......
...@@ -15,9 +15,7 @@ import ( ...@@ -15,9 +15,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
) )
var ( var rollupCfg rollup.Config
rollupCfg rollup.Config
)
// basic implementation of the Compressor interface that does no compression // basic implementation of the Compressor interface that does no compression
type nonCompressor struct { type nonCompressor struct {
...@@ -248,7 +246,7 @@ func TestSpanChannelOut(t *testing.T) { ...@@ -248,7 +246,7 @@ func TestSpanChannelOut(t *testing.T) {
} }
for _, test := range tests { for _, test := range tests {
test := test test := test
for _, algo := range CompressionAlgoTypes { for _, algo := range CompressionAlgos {
t.Run(test.name+"_"+algo.String(), func(t *testing.T) { t.Run(test.name+"_"+algo.String(), func(t *testing.T) {
test.f(t, algo) test.f(t, algo)
}) })
......
...@@ -55,7 +55,8 @@ func TestFrameValidity(t *testing.T) { ...@@ -55,7 +55,8 @@ func TestFrameValidity(t *testing.T) {
name: "double close", name: "double close",
frames: []Frame{ frames: []Frame{
{ID: id, FrameNumber: 2, IsLast: true, Data: []byte("four")}, {ID: id, FrameNumber: 2, IsLast: true, Data: []byte("four")},
{ID: id, FrameNumber: 1, IsLast: true}}, {ID: id, FrameNumber: 1, IsLast: true},
},
shouldErr: []bool{false, true}, shouldErr: []bool{false, true},
sizes: []uint64{204, 204}, sizes: []uint64{204, 204},
}, },
...@@ -63,7 +64,8 @@ func TestFrameValidity(t *testing.T) { ...@@ -63,7 +64,8 @@ func TestFrameValidity(t *testing.T) {
name: "duplicate frame", name: "duplicate frame",
frames: []Frame{ frames: []Frame{
{ID: id, FrameNumber: 2, Data: []byte("four")}, {ID: id, FrameNumber: 2, Data: []byte("four")},
{ID: id, FrameNumber: 2, Data: []byte("seven__")}}, {ID: id, FrameNumber: 2, Data: []byte("seven__")},
},
shouldErr: []bool{false, true}, shouldErr: []bool{false, true},
sizes: []uint64{204, 204}, sizes: []uint64{204, 204},
}, },
...@@ -71,7 +73,8 @@ func TestFrameValidity(t *testing.T) { ...@@ -71,7 +73,8 @@ func TestFrameValidity(t *testing.T) {
name: "duplicate closing frames", name: "duplicate closing frames",
frames: []Frame{ frames: []Frame{
{ID: id, FrameNumber: 2, IsLast: true, Data: []byte("four")}, {ID: id, FrameNumber: 2, IsLast: true, Data: []byte("four")},
{ID: id, FrameNumber: 2, IsLast: true, Data: []byte("seven__")}}, {ID: id, FrameNumber: 2, IsLast: true, Data: []byte("seven__")},
},
shouldErr: []bool{false, true}, shouldErr: []bool{false, true},
sizes: []uint64{204, 204}, sizes: []uint64{204, 204},
}, },
...@@ -79,7 +82,8 @@ func TestFrameValidity(t *testing.T) { ...@@ -79,7 +82,8 @@ func TestFrameValidity(t *testing.T) {
name: "frame past closing", name: "frame past closing",
frames: []Frame{ frames: []Frame{
{ID: id, FrameNumber: 2, IsLast: true, Data: []byte("four")}, {ID: id, FrameNumber: 2, IsLast: true, Data: []byte("four")},
{ID: id, FrameNumber: 10, Data: []byte("seven__")}}, {ID: id, FrameNumber: 10, Data: []byte("seven__")},
},
shouldErr: []bool{false, true}, shouldErr: []bool{false, true},
sizes: []uint64{204, 204}, sizes: []uint64{204, 204},
}, },
...@@ -87,7 +91,8 @@ func TestFrameValidity(t *testing.T) { ...@@ -87,7 +91,8 @@ func TestFrameValidity(t *testing.T) {
name: "prune after close frame", name: "prune after close frame",
frames: []Frame{ frames: []Frame{
{ID: id, FrameNumber: 10, IsLast: false, Data: []byte("seven__")}, {ID: id, FrameNumber: 10, IsLast: false, Data: []byte("seven__")},
{ID: id, FrameNumber: 2, IsLast: true, Data: []byte("four")}}, {ID: id, FrameNumber: 2, IsLast: true, Data: []byte("four")},
},
shouldErr: []bool{false, false}, shouldErr: []bool{false, false},
sizes: []uint64{207, 204}, sizes: []uint64{207, 204},
}, },
...@@ -95,7 +100,8 @@ func TestFrameValidity(t *testing.T) { ...@@ -95,7 +100,8 @@ func TestFrameValidity(t *testing.T) {
name: "multiple valid frames", name: "multiple valid frames",
frames: []Frame{ frames: []Frame{
{ID: id, FrameNumber: 10, Data: []byte("seven__")}, {ID: id, FrameNumber: 10, Data: []byte("seven__")},
{ID: id, FrameNumber: 2, Data: []byte("four")}}, {ID: id, FrameNumber: 2, Data: []byte("four")},
},
shouldErr: []bool{false, false}, shouldErr: []bool{false, false},
sizes: []uint64{207, 411}, sizes: []uint64{207, 411},
}, },
...@@ -107,103 +113,107 @@ func TestFrameValidity(t *testing.T) { ...@@ -107,103 +113,107 @@ func TestFrameValidity(t *testing.T) {
} }
func TestBatchReader(t *testing.T) { func TestBatchReader(t *testing.T) {
// Get batch data
rng := rand.New(rand.NewSource(0x543331)) rng := rand.New(rand.NewSource(0x543331))
singularBatch := RandomSingularBatch(rng, 20, big.NewInt(333)) singularBatch := RandomSingularBatch(rng, 20, big.NewInt(333))
batchDataInput := NewBatchData(singularBatch) batchDataInput := NewBatchData(singularBatch)
encodedBatch := &bytes.Buffer{} encodedBatch := new(bytes.Buffer)
err := batchDataInput.EncodeRLP(encodedBatch) err := batchDataInput.EncodeRLP(encodedBatch)
require.NoError(t, err) require.NoError(t, err)
var testCases = []struct { const Zstd CompressionAlgo = "zstd" // invalid algo
compressor := func(ca CompressionAlgo) func(buf *bytes.Buffer, t *testing.T) {
switch {
case ca == Zlib:
return func(buf *bytes.Buffer, t *testing.T) {
writer := zlib.NewWriter(buf)
_, err := writer.Write(encodedBatch.Bytes())
require.NoError(t, err)
require.NoError(t, writer.Close())
}
case ca.IsBrotli():
return func(buf *bytes.Buffer, t *testing.T) {
buf.WriteByte(ChannelVersionBrotli)
lvl := GetBrotliLevel(ca)
writer := brotli.NewWriterLevel(buf, lvl)
_, err := writer.Write(encodedBatch.Bytes())
require.NoError(t, err)
require.NoError(t, writer.Close())
}
case ca == Zstd: // invalid algo
return func(buf *bytes.Buffer, t *testing.T) {
buf.WriteByte(0x02) // invalid channel version byte
writer := zstd.NewWriter(buf)
_, err := writer.Write(encodedBatch.Bytes())
require.NoError(t, err)
require.NoError(t, writer.Close())
}
default:
panic("unexpected test algo")
}
}
testCases := []struct {
name string name string
algo func(buf *bytes.Buffer, t *testing.T) algo CompressionAlgo
isFjord bool isFjord bool
expectErr bool expectErr bool
}{ }{
{ {
name: "zlib-post-fjord", name: "zlib-post-fjord",
algo: func(buf *bytes.Buffer, t *testing.T) { algo: Zlib,
writer := zlib.NewWriter(buf)
_, err := writer.Write(encodedBatch.Bytes())
require.NoError(t, err)
writer.Close()
},
isFjord: true, isFjord: true,
}, },
{ {
name: "zlib-pre-fjord", name: "zlib-pre-fjord",
algo: func(buf *bytes.Buffer, t *testing.T) { algo: Zlib,
writer := zlib.NewWriter(buf)
_, err := writer.Write(encodedBatch.Bytes())
require.NoError(t, err)
writer.Close()
},
isFjord: false, isFjord: false,
}, },
{ {
name: "brotli9-post-fjord", name: "brotli-post-fjord",
algo: func(buf *bytes.Buffer, t *testing.T) { algo: Brotli,
buf.WriteByte(ChannelVersionBrotli)
writer := brotli.NewWriterLevel(buf, 9)
_, err := writer.Write(encodedBatch.Bytes())
require.NoError(t, err)
writer.Close()
},
isFjord: true, isFjord: true,
}, },
{ {
name: "brotli9-pre-fjord", name: "brotli-pre-fjord",
algo: func(buf *bytes.Buffer, t *testing.T) { algo: Brotli,
buf.WriteByte(ChannelVersionBrotli)
writer := brotli.NewWriterLevel(buf, 9)
_, err := writer.Write(encodedBatch.Bytes())
require.NoError(t, err)
writer.Close()
},
isFjord: false, isFjord: false,
expectErr: true, // expect an error because brotli is not supported before Fjord expectErr: true, // expect an error because brotli is not supported before Fjord
}, },
{ {
name: "brotli10-post-fjord", name: "brotli9-post-fjord",
algo: func(buf *bytes.Buffer, t *testing.T) { algo: Brotli9,
buf.WriteByte(ChannelVersionBrotli)
writer := brotli.NewWriterLevel(buf, 10)
_, err := writer.Write(encodedBatch.Bytes())
require.NoError(t, err)
writer.Close()
},
isFjord: true, isFjord: true,
}, },
{ {
name: "brotli11-post-fjord", name: "brotli9-pre-fjord",
algo: func(buf *bytes.Buffer, t *testing.T) { algo: Brotli9,
buf.WriteByte(ChannelVersionBrotli) isFjord: false,
writer := brotli.NewWriterLevel(buf, 11) expectErr: true, // expect an error because brotli is not supported before Fjord
_, err := writer.Write(encodedBatch.Bytes()) },
require.NoError(t, err) {
writer.Close() name: "brotli10-post-fjord",
}, algo: Brotli10,
isFjord: true, isFjord: true,
}, },
{ {
name: "zstd-post-fjord", name: "brotli11-post-fjord",
algo: func(buf *bytes.Buffer, t *testing.T) { algo: Brotli11,
writer := zstd.NewWriter(buf) isFjord: true,
_, err := writer.Write(encodedBatch.Bytes()) },
require.NoError(t, err) {
writer.Close() name: "zstd-post-fjord",
}, algo: Zstd,
expectErr: true, expectErr: true,
isFjord: true, isFjord: true,
}} },
}
for _, tc := range testCases { for _, tc := range testCases {
compressed := new(bytes.Buffer) compressed := new(bytes.Buffer)
tc := tc tc := tc
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
tc.algo(compressed, t) compressor(tc.algo)(compressed, t)
reader, err := BatchReader(bytes.NewReader(compressed.Bytes()), 120000, tc.isFjord) reader, err := BatchReader(bytes.NewReader(compressed.Bytes()), 120000, tc.isFjord)
if tc.expectErr { if tc.expectErr {
require.Error(t, err) require.Error(t, err)
...@@ -215,6 +225,12 @@ func TestBatchReader(t *testing.T) { ...@@ -215,6 +225,12 @@ func TestBatchReader(t *testing.T) {
batchData, err := reader() batchData, err := reader()
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, batchData) require.NotNil(t, batchData)
if tc.algo.IsBrotli() {
// special case because reader doesn't decode level
batchDataInput.ComprAlgo = Brotli
} else {
batchDataInput.ComprAlgo = tc.algo
}
require.Equal(t, batchDataInput, batchData) require.Equal(t, batchDataInput, batchData)
}) })
} }
......
...@@ -27,6 +27,9 @@ type SingularBatch struct { ...@@ -27,6 +27,9 @@ type SingularBatch struct {
Transactions []hexutil.Bytes Transactions []hexutil.Bytes
} }
func (b *SingularBatch) AsSingularBatch() (*SingularBatch, bool) { return b, true }
func (b *SingularBatch) AsSpanBatch() (*SpanBatch, bool) { return nil, false }
// GetBatchType returns its batch type (batch_version) // GetBatchType returns its batch type (batch_version)
func (b *SingularBatch) GetBatchType() int { func (b *SingularBatch) GetBatchType() int {
return SingularBatchType return SingularBatchType
......
...@@ -422,6 +422,9 @@ type SpanBatch struct { ...@@ -422,6 +422,9 @@ type SpanBatch struct {
sbtxs *spanBatchTxs sbtxs *spanBatchTxs
} }
func (b *SpanBatch) AsSingularBatch() (*SingularBatch, bool) { return nil, false }
func (b *SpanBatch) AsSpanBatch() (*SpanBatch, bool) { return b, true }
// spanBatchMarshaling is a helper type used for JSON marshaling. // spanBatchMarshaling is a helper type used for JSON marshaling.
type spanBatchMarshaling struct { type spanBatchMarshaling struct {
ParentCheck []hexutil.Bytes `json:"parent_check"` ParentCheck []hexutil.Bytes `json:"parent_check"`
......
...@@ -10,26 +10,28 @@ type CompressionAlgo string ...@@ -10,26 +10,28 @@ type CompressionAlgo string
const ( const (
// compression algo types // compression algo types
Zlib CompressionAlgo = "zlib" Zlib CompressionAlgo = "zlib"
Brotli CompressionAlgo = "brotli" // default level
Brotli9 CompressionAlgo = "brotli-9" Brotli9 CompressionAlgo = "brotli-9"
Brotli10 CompressionAlgo = "brotli-10" Brotli10 CompressionAlgo = "brotli-10"
Brotli11 CompressionAlgo = "brotli-11" Brotli11 CompressionAlgo = "brotli-11"
) )
var CompressionAlgoTypes = []CompressionAlgo{ var CompressionAlgos = []CompressionAlgo{
Zlib, Zlib,
Brotli,
Brotli9, Brotli9,
Brotli10, Brotli10,
Brotli11, Brotli11,
} }
var brotliRegexp = regexp.MustCompile(`^brotli-(9|10|11)$`) var brotliRegexp = regexp.MustCompile(`^brotli(|-(9|10|11))$`)
func (algo CompressionAlgo) String() string { func (algo CompressionAlgo) String() string {
return string(algo) return string(algo)
} }
func (algo *CompressionAlgo) Set(value string) error { func (algo *CompressionAlgo) Set(value string) error {
if !ValidCompressionAlgoType(CompressionAlgo(value)) { if !ValidCompressionAlgo(CompressionAlgo(value)) {
return fmt.Errorf("unknown compression algo type: %q", value) return fmt.Errorf("unknown compression algo type: %q", value)
} }
*algo = CompressionAlgo(value) *algo = CompressionAlgo(value)
...@@ -49,7 +51,7 @@ func GetBrotliLevel(algo CompressionAlgo) int { ...@@ -49,7 +51,7 @@ func GetBrotliLevel(algo CompressionAlgo) int {
switch algo { switch algo {
case Brotli9: case Brotli9:
return 9 return 9
case Brotli10: case Brotli10, Brotli: // make level 10 the default
return 10 return 10
case Brotli11: case Brotli11:
return 11 return 11
...@@ -58,8 +60,8 @@ func GetBrotliLevel(algo CompressionAlgo) int { ...@@ -58,8 +60,8 @@ func GetBrotliLevel(algo CompressionAlgo) int {
} }
} }
func ValidCompressionAlgoType(value CompressionAlgo) bool { func ValidCompressionAlgo(value CompressionAlgo) bool {
for _, k := range CompressionAlgoTypes { for _, k := range CompressionAlgos {
if k == value { if k == value {
return true return true
} }
......
...@@ -10,49 +10,64 @@ func TestCompressionAlgo(t *testing.T) { ...@@ -10,49 +10,64 @@ func TestCompressionAlgo(t *testing.T) {
testCases := []struct { testCases := []struct {
name string name string
algo CompressionAlgo algo CompressionAlgo
isBrotli bool
isValidCompressionAlgoType bool isValidCompressionAlgoType bool
isBrotli bool
brotliLevel int
}{ }{
{ {
name: "zlib", name: "zlib",
algo: Zlib, algo: Zlib,
isValidCompressionAlgoType: true,
isBrotli: false, isBrotli: false,
},
{
name: "brotli",
algo: Brotli,
isValidCompressionAlgoType: true, isValidCompressionAlgoType: true,
isBrotli: true,
brotliLevel: 10,
}, },
{ {
name: "brotli-9", name: "brotli-9",
algo: Brotli9, algo: Brotli9,
isBrotli: true,
isValidCompressionAlgoType: true, isValidCompressionAlgoType: true,
isBrotli: true,
brotliLevel: 9,
}, },
{ {
name: "brotli-10", name: "brotli-10",
algo: Brotli10, algo: Brotli10,
isBrotli: true,
isValidCompressionAlgoType: true, isValidCompressionAlgoType: true,
isBrotli: true,
brotliLevel: 10,
}, },
{ {
name: "brotli-11", name: "brotli-11",
algo: Brotli11, algo: Brotli11,
isBrotli: true,
isValidCompressionAlgoType: true, isValidCompressionAlgoType: true,
isBrotli: true,
brotliLevel: 11,
}, },
{ {
name: "invalid", name: "invalid",
algo: CompressionAlgo("invalid"), algo: CompressionAlgo("invalid"),
isBrotli: false,
isValidCompressionAlgoType: false, isValidCompressionAlgoType: false,
}} isBrotli: false,
},
}
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.isBrotli, tc.algo.IsBrotli()) require.Equal(t, tc.isBrotli, tc.algo.IsBrotli())
if tc.isBrotli { if tc.isBrotli {
require.NotPanics(t, func() { GetBrotliLevel((tc.algo)) }) require.NotPanics(t, func() {
blvl := GetBrotliLevel((tc.algo))
require.Equal(t, tc.brotliLevel, blvl)
})
} else { } else {
require.Panics(t, func() { GetBrotliLevel(tc.algo) }) require.Panics(t, func() { GetBrotliLevel(tc.algo) })
} }
require.Equal(t, tc.isValidCompressionAlgoType, ValidCompressionAlgoType(tc.algo)) require.Equal(t, tc.isValidCompressionAlgoType, ValidCompressionAlgo(tc.algo))
}) })
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment