Commit d2a9455d authored by Sebastian Stammler's avatar Sebastian Stammler Committed by GitHub

op-node,op-batcher: implement span channel out block count limit (#11416)

* op-node: implement span channel out block count limit

* op-batcher: add max-blocks-per-span-batch flag

* op-e2e: test MaxBlocksPerSpanBatch in system test

* op-e2e: use span batches in 4844 test

* address Axel's review
parent dece4de2
...@@ -94,7 +94,10 @@ func NewChannelBuilder(cfg ChannelConfig, rollupCfg rollup.Config, latestL1Origi ...@@ -94,7 +94,10 @@ func NewChannelBuilder(cfg ChannelConfig, rollupCfg rollup.Config, latestL1Origi
chainSpec := rollup.NewChainSpec(&rollupCfg) chainSpec := rollup.NewChainSpec(&rollupCfg)
var co derive.ChannelOut var co derive.ChannelOut
if cfg.BatchType == derive.SpanBatchType { if cfg.BatchType == derive.SpanBatchType {
co, err = derive.NewSpanChannelOut(rollupCfg.Genesis.L2Time, rollupCfg.L2ChainID, cfg.CompressorConfig.TargetOutputSize, cfg.CompressorConfig.CompressionAlgo, chainSpec) co, err = derive.NewSpanChannelOut(
rollupCfg.Genesis.L2Time, rollupCfg.L2ChainID,
cfg.CompressorConfig.TargetOutputSize, cfg.CompressorConfig.CompressionAlgo,
chainSpec, derive.WithMaxBlocksPerSpanBatch(cfg.MaxBlocksPerSpanBatch))
} else { } else {
co, err = derive.NewSingularChannelOut(c, chainSpec) co, err = derive.NewSingularChannelOut(c, chainSpec)
} }
......
...@@ -29,6 +29,9 @@ type ChannelConfig struct { ...@@ -29,6 +29,9 @@ type ChannelConfig struct {
SubSafetyMargin uint64 SubSafetyMargin uint64
// The maximum byte-size a frame can have. // The maximum byte-size a frame can have.
MaxFrameSize uint64 MaxFrameSize uint64
// MaxBlocksPerSpanBatch is the maximum number of blocks to add to a span batch.
// A value of 0 disables a maximum.
MaxBlocksPerSpanBatch int
// Target number of frames to create per channel. // Target number of frames to create per channel.
// For blob transactions, this controls the number of blobs to target adding // For blob transactions, this controls the number of blobs to target adding
......
...@@ -56,6 +56,9 @@ type CLIConfig struct { ...@@ -56,6 +56,9 @@ type CLIConfig struct {
// If using blobs, this setting is ignored and the max blob size is used. // If using blobs, this setting is ignored and the max blob size is used.
MaxL1TxSize uint64 MaxL1TxSize uint64
// Maximum number of blocks to add to a span batch. Default is 0 - no maximum.
MaxBlocksPerSpanBatch int
// The target number of frames to create per channel. Controls number of blobs // The target number of frames to create per channel. Controls number of blobs
// per blob tx, if using Blob DA. // per blob tx, if using Blob DA.
TargetNumFrames int TargetNumFrames int
...@@ -173,6 +176,7 @@ func NewConfig(ctx *cli.Context) *CLIConfig { ...@@ -173,6 +176,7 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
MaxPendingTransactions: ctx.Uint64(flags.MaxPendingTransactionsFlag.Name), MaxPendingTransactions: ctx.Uint64(flags.MaxPendingTransactionsFlag.Name),
MaxChannelDuration: ctx.Uint64(flags.MaxChannelDurationFlag.Name), MaxChannelDuration: ctx.Uint64(flags.MaxChannelDurationFlag.Name),
MaxL1TxSize: ctx.Uint64(flags.MaxL1TxSizeBytesFlag.Name), MaxL1TxSize: ctx.Uint64(flags.MaxL1TxSizeBytesFlag.Name),
MaxBlocksPerSpanBatch: ctx.Int(flags.MaxBlocksPerSpanBatch.Name),
TargetNumFrames: ctx.Int(flags.TargetNumFramesFlag.Name), TargetNumFrames: ctx.Int(flags.TargetNumFramesFlag.Name),
ApproxComprRatio: ctx.Float64(flags.ApproxComprRatioFlag.Name), ApproxComprRatio: ctx.Float64(flags.ApproxComprRatioFlag.Name),
Compressor: ctx.String(flags.CompressorFlag.Name), Compressor: ctx.String(flags.CompressorFlag.Name),
......
...@@ -192,13 +192,14 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error { ...@@ -192,13 +192,14 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
channelTimeout = bs.RollupConfig.ChannelTimeoutGranite channelTimeout = bs.RollupConfig.ChannelTimeoutGranite
} }
cc := ChannelConfig{ cc := ChannelConfig{
SeqWindowSize: bs.RollupConfig.SeqWindowSize, SeqWindowSize: bs.RollupConfig.SeqWindowSize,
ChannelTimeout: channelTimeout, ChannelTimeout: channelTimeout,
MaxChannelDuration: cfg.MaxChannelDuration, MaxChannelDuration: cfg.MaxChannelDuration,
MaxFrameSize: cfg.MaxL1TxSize - 1, // account for version byte prefix; reset for blobs MaxFrameSize: cfg.MaxL1TxSize - 1, // account for version byte prefix; reset for blobs
TargetNumFrames: cfg.TargetNumFrames, MaxBlocksPerSpanBatch: cfg.MaxBlocksPerSpanBatch,
SubSafetyMargin: cfg.SubSafetyMargin, TargetNumFrames: cfg.TargetNumFrames,
BatchType: cfg.BatchType, SubSafetyMargin: cfg.SubSafetyMargin,
BatchType: cfg.BatchType,
} }
switch cfg.DataAvailabilityType { switch cfg.DataAvailabilityType {
......
...@@ -76,6 +76,11 @@ var ( ...@@ -76,6 +76,11 @@ var (
Value: 120_000, // will be overwritten to max for blob da-type Value: 120_000, // will be overwritten to max for blob da-type
EnvVars: prefixEnvVars("MAX_L1_TX_SIZE_BYTES"), EnvVars: prefixEnvVars("MAX_L1_TX_SIZE_BYTES"),
} }
MaxBlocksPerSpanBatch = &cli.IntFlag{
Name: "max-blocks-per-span-batch",
Usage: "Maximum number of blocks to add to a span batch. Default is 0 - no maximum.",
EnvVars: prefixEnvVars("MAX_BLOCKS_PER_SPAN_BATCH"),
}
TargetNumFramesFlag = &cli.IntFlag{ TargetNumFramesFlag = &cli.IntFlag{
Name: "target-num-frames", Name: "target-num-frames",
Usage: "The target number of frames to create per channel. Controls number of blobs per blob tx, if using Blob DA.", Usage: "The target number of frames to create per channel. Controls number of blobs per blob tx, if using Blob DA.",
...@@ -169,6 +174,7 @@ var optionalFlags = []cli.Flag{ ...@@ -169,6 +174,7 @@ var optionalFlags = []cli.Flag{
MaxPendingTransactionsFlag, MaxPendingTransactionsFlag,
MaxChannelDurationFlag, MaxChannelDurationFlag,
MaxL1TxSizeBytesFlag, MaxL1TxSizeBytesFlag,
MaxBlocksPerSpanBatch,
TargetNumFramesFlag, TargetNumFramesFlag,
ApproxComprRatioFlag, ApproxComprRatioFlag,
CompressorFlag, CompressorFlag,
......
...@@ -46,6 +46,7 @@ func testSystem4844E2E(t *testing.T, multiBlob bool, daType batcherFlags.DataAva ...@@ -46,6 +46,7 @@ func testSystem4844E2E(t *testing.T, multiBlob bool, daType batcherFlags.DataAva
cfg := EcotoneSystemConfig(t, &genesisTime) cfg := EcotoneSystemConfig(t, &genesisTime)
cfg.DataAvailabilityType = daType cfg.DataAvailabilityType = daType
cfg.BatcherBatchType = derive.SpanBatchType
cfg.DeployConfig.L1GenesisBlockBaseFeePerGas = (*hexutil.Big)(big.NewInt(7000)) cfg.DeployConfig.L1GenesisBlockBaseFeePerGas = (*hexutil.Big)(big.NewInt(7000))
const maxBlobs = 6 const maxBlobs = 6
......
...@@ -288,6 +288,12 @@ type SystemConfig struct { ...@@ -288,6 +288,12 @@ type SystemConfig struct {
// whether to actually use BatcherMaxL1TxSizeBytes for blobs, insteaf of max blob size // whether to actually use BatcherMaxL1TxSizeBytes for blobs, insteaf of max blob size
BatcherUseMaxTxSizeForBlobs bool BatcherUseMaxTxSizeForBlobs bool
// Singular (0) or span batches (1)
BatcherBatchType uint
// If >0, limits the number of blocks per span batch
BatcherMaxBlocksPerSpanBatch int
// SupportL1TimeTravel determines if the L1 node supports quickly skipping forward in time // SupportL1TimeTravel determines if the L1 node supports quickly skipping forward in time
SupportL1TimeTravel bool SupportL1TimeTravel bool
...@@ -884,10 +890,6 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste ...@@ -884,10 +890,6 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
} }
sys.L2OutputSubmitter = proposer sys.L2OutputSubmitter = proposer
var batchType uint = derive.SingularBatchType
if cfg.DeployConfig.L2GenesisDeltaTimeOffset != nil && *cfg.DeployConfig.L2GenesisDeltaTimeOffset == hexutil.Uint64(0) {
batchType = derive.SpanBatchType
}
// batcher defaults if unset // batcher defaults if unset
batcherMaxL1TxSizeBytes := cfg.BatcherMaxL1TxSizeBytes batcherMaxL1TxSizeBytes := cfg.BatcherMaxL1TxSizeBytes
if batcherMaxL1TxSizeBytes == 0 { if batcherMaxL1TxSizeBytes == 0 {
...@@ -921,10 +923,11 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste ...@@ -921,10 +923,11 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
Level: log.LevelInfo, Level: log.LevelInfo,
Format: oplog.FormatText, Format: oplog.FormatText,
}, },
Stopped: sys.Cfg.DisableBatcher, // Batch submitter may be enabled later Stopped: sys.Cfg.DisableBatcher, // Batch submitter may be enabled later
BatchType: batchType, BatchType: cfg.BatcherBatchType,
DataAvailabilityType: sys.Cfg.DataAvailabilityType, MaxBlocksPerSpanBatch: cfg.BatcherMaxBlocksPerSpanBatch,
CompressionAlgo: compressionAlgo, DataAvailabilityType: sys.Cfg.DataAvailabilityType,
CompressionAlgo: compressionAlgo,
} }
// Batch Submitter // Batch Submitter
batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.Cfg.Loggers["batcher"]) batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.Cfg.Loggers["batcher"])
......
...@@ -55,22 +55,27 @@ import ( ...@@ -55,22 +55,27 @@ import (
func TestSystemBatchType(t *testing.T) { func TestSystemBatchType(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
f func(gt *testing.T, deltaTimeOffset *hexutil.Uint64) f func(*testing.T, func(*SystemConfig))
}{ }{
{"StopStartBatcher", StopStartBatcher}, {"StopStartBatcher", StopStartBatcher},
} }
for _, test := range tests { for _, test := range tests {
test := test test := test
t.Run(test.name+"_SingularBatch", func(t *testing.T) { t.Run(test.name+"_SingularBatch", func(t *testing.T) {
test.f(t, nil) test.f(t, func(sc *SystemConfig) {
sc.BatcherBatchType = derive.SingularBatchType
})
}) })
}
deltaTimeOffset := hexutil.Uint64(0)
for _, test := range tests {
test := test
t.Run(test.name+"_SpanBatch", func(t *testing.T) { t.Run(test.name+"_SpanBatch", func(t *testing.T) {
test.f(t, &deltaTimeOffset) test.f(t, func(sc *SystemConfig) {
sc.BatcherBatchType = derive.SpanBatchType
})
})
t.Run(test.name+"_SpanBatchMaxBlocks", func(t *testing.T) {
test.f(t, func(sc *SystemConfig) {
sc.BatcherBatchType = derive.SpanBatchType
sc.BatcherMaxBlocksPerSpanBatch = 2
})
}) })
} }
} }
...@@ -1292,10 +1297,11 @@ func testFees(t *testing.T, cfg SystemConfig) { ...@@ -1292,10 +1297,11 @@ func testFees(t *testing.T, cfg SystemConfig) {
require.Equal(t, balanceDiff, totalFee, "balances should add up") require.Equal(t, balanceDiff, totalFee, "balances should add up")
} }
func StopStartBatcher(t *testing.T, deltaTimeOffset *hexutil.Uint64) { func StopStartBatcher(t *testing.T, cfgMod func(*SystemConfig)) {
InitParallel(t) InitParallel(t)
cfg := DeltaSystemConfig(t, deltaTimeOffset) cfg := DefaultSystemConfig(t)
cfgMod(&cfg)
sys, err := cfg.Start(t) sys, err := cfg.Start(t)
require.NoError(t, err, "Error starting up system") require.NoError(t, err, "Error starting up system")
defer sys.Close() defer sys.Close()
......
...@@ -5,6 +5,9 @@ import ( ...@@ -5,6 +5,9 @@ import (
"io" "io"
"math/big" "math/big"
"math/rand" "math/rand"
"reflect"
"runtime"
"strconv"
"testing" "testing"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -13,9 +16,17 @@ import ( ...@@ -13,9 +16,17 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
) )
var rollupCfg rollup.Config var rollupCfg = rollup.Config{
Genesis: rollup.Genesis{
L2Time: uint64(1723618465),
},
BlockTime: 2,
L2ChainID: big.NewInt(420),
L1ChainID: big.NewInt(161),
}
// basic implementation of the Compressor interface that does no compression // basic implementation of the Compressor interface that does no compression
type nonCompressor struct { type nonCompressor struct {
...@@ -204,7 +215,7 @@ func TestForceCloseTxData(t *testing.T) { ...@@ -204,7 +215,7 @@ func TestForceCloseTxData(t *testing.T) {
for i, test := range tests { for i, test := range tests {
out, err := ForceCloseTxData(test.frames) out, err := ForceCloseTxData(test.frames)
if test.errors { if test.errors {
require.NotNil(t, err, "Should error on tc %v", i) require.Error(t, err, "Should error on tc %v", i)
require.Nil(t, out, "Should return no value in tc %v", i) require.Nil(t, out, "Should return no value in tc %v", i)
} else { } else {
require.NoError(t, err, "Should not error on tc %v", i) require.NoError(t, err, "Should not error on tc %v", i)
...@@ -219,42 +230,46 @@ func TestBlockToBatchValidity(t *testing.T) { ...@@ -219,42 +230,46 @@ func TestBlockToBatchValidity(t *testing.T) {
require.ErrorContains(t, err, "has no transactions") require.ErrorContains(t, err, "has no transactions")
} }
func SpanChannelAndBatches(t *testing.T, target uint64, len int, algo CompressionAlgo) (*SpanChannelOut, []*SingularBatch) { func SpanChannelAndBatches(t *testing.T, targetOutputSize uint64, numBatches int, algo CompressionAlgo, opts ...SpanChannelOutOption) (*SpanChannelOut, []*SingularBatch) {
// target is larger than one batch, but smaller than two batches // target is larger than one batch, but smaller than two batches
rng := rand.New(rand.NewSource(0x543331)) rng := rand.New(rand.NewSource(0x543331))
chainID := big.NewInt(rng.Int63n(1000)) chainID := rollupCfg.L2ChainID
txCount := 1 txCount := 1
cout, err := NewSpanChannelOut(0, chainID, target, algo, rollup.NewChainSpec(&rollupCfg)) genesisTime := rollupCfg.Genesis.L2Time
cout, err := NewSpanChannelOut(genesisTime, chainID, targetOutputSize, algo, rollup.NewChainSpec(&rollupCfg), opts...)
require.NoError(t, err) require.NoError(t, err)
batches := make([]*SingularBatch, len) batches := make([]*SingularBatch, 0, numBatches)
// adding the first batch should not cause an error // adding the first batch should not cause an error
for i := 0; i < len; i++ { for i := 0; i < numBatches; i++ {
singularBatch := RandomSingularBatch(rng, txCount, chainID) singularBatch := RandomSingularBatch(rng, txCount, chainID)
batches[i] = singularBatch // use default 2 sec block time
singularBatch.Timestamp = genesisTime + 420_000 + rollupCfg.BlockTime*uint64(i)
batches = append(batches, singularBatch)
} }
return cout, batches return cout, batches
} }
func TestSpanChannelOut(t *testing.T) { func TestSpanChannelOut(t *testing.T) {
tests := []struct { tests := []func(t *testing.T, algo CompressionAlgo){
name string SpanChannelOutCompressionOnlyOneBatch,
f func(t *testing.T, algo CompressionAlgo) SpanChannelOutCompressionUndo,
}{ SpanChannelOutClose,
{"SpanChannelOutCompressionOnlyOneBatch", SpanChannelOutCompressionOnlyOneBatch},
{"SpanChannelOutCompressionUndo", SpanChannelOutCompressionUndo},
{"SpanChannelOutClose", SpanChannelOutClose},
} }
for _, test := range tests { for _, test := range tests {
test := test test := test
for _, algo := range CompressionAlgos { for _, algo := range CompressionAlgos {
t.Run(test.name+"_"+algo.String(), func(t *testing.T) { t.Run(funcName(test)+"_"+algo.String(), func(t *testing.T) {
test.f(t, algo) test(t, algo)
}) })
} }
} }
} }
func funcName(fn any) string {
return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
}
// TestSpanChannelOutCompressionOnlyOneBatch tests that the SpanChannelOut compression works as expected when there is only one batch // TestSpanChannelOutCompressionOnlyOneBatch tests that the SpanChannelOut compression works as expected when there is only one batch
// and it is larger than the target size. The single batch should be compressed, and the channel should now be full // and it is larger than the target size. The single batch should be compressed, and the channel should now be full
func SpanChannelOutCompressionOnlyOneBatch(t *testing.T, algo CompressionAlgo) { func SpanChannelOutCompressionOnlyOneBatch(t *testing.T, algo CompressionAlgo) {
...@@ -276,7 +291,7 @@ func SpanChannelOutCompressionOnlyOneBatch(t *testing.T, algo CompressionAlgo) { ...@@ -276,7 +291,7 @@ func SpanChannelOutCompressionOnlyOneBatch(t *testing.T, algo CompressionAlgo) {
// TestSpanChannelOutCompressionUndo tests that the SpanChannelOut compression rejects a batch that would cause the channel to be overfull // TestSpanChannelOutCompressionUndo tests that the SpanChannelOut compression rejects a batch that would cause the channel to be overfull
func SpanChannelOutCompressionUndo(t *testing.T, algo CompressionAlgo) { func SpanChannelOutCompressionUndo(t *testing.T, algo CompressionAlgo) {
// target is larger than one batch, but smaller than two batches // target is larger than one batch, but smaller than two batches
cout, singularBatches := SpanChannelAndBatches(t, 750, 2, algo) cout, singularBatches := SpanChannelAndBatches(t, 1100, 2, algo)
err := cout.AddSingularBatch(singularBatches[0], 0) err := cout.AddSingularBatch(singularBatches[0], 0)
require.NoError(t, err) require.NoError(t, err)
...@@ -301,7 +316,7 @@ func SpanChannelOutCompressionUndo(t *testing.T, algo CompressionAlgo) { ...@@ -301,7 +316,7 @@ func SpanChannelOutCompressionUndo(t *testing.T, algo CompressionAlgo) {
// TestSpanChannelOutClose tests that the SpanChannelOut compression works as expected when the channel is closed. // TestSpanChannelOutClose tests that the SpanChannelOut compression works as expected when the channel is closed.
// it should compress the batch even if it is smaller than the target size because the channel is closing // it should compress the batch even if it is smaller than the target size because the channel is closing
func SpanChannelOutClose(t *testing.T, algo CompressionAlgo) { func SpanChannelOutClose(t *testing.T, algo CompressionAlgo) {
target := uint64(600) target := uint64(1100)
cout, singularBatches := SpanChannelAndBatches(t, target, 1, algo) cout, singularBatches := SpanChannelAndBatches(t, target, 1, algo)
err := cout.AddSingularBatch(singularBatches[0], 0) err := cout.AddSingularBatch(singularBatches[0], 0)
...@@ -325,3 +340,150 @@ func SpanChannelOutClose(t *testing.T, algo CompressionAlgo) { ...@@ -325,3 +340,150 @@ func SpanChannelOutClose(t *testing.T, algo CompressionAlgo) {
require.Greater(t, cout.compressor.Len(), 0) require.Greater(t, cout.compressor.Len(), 0)
require.Equal(t, rlpLen, cout.activeRLP().Len()) require.Equal(t, rlpLen, cout.activeRLP().Len())
} }
type maxBlocksTest struct {
outputSize uint64
exactFull bool // whether the outputSize is exactly hit by the last batch
numBatches int // the last batch should cause the compressor to be full
maxBlocks int
expNumSpanBatches int
expLastNumBlocks int
}
// This tests sets a max blocks per span batch and causes multiple span batches
// within a single channel. It then does a full round trip, encoding and decoding
// the channel, confirming that the expected batches were encoded.
func TestSpanChannelOut_MaxBlocksPerSpanBatch(t *testing.T) {
for i, tt := range []maxBlocksTest{
{
outputSize: 10_751,
exactFull: true,
numBatches: 15,
maxBlocks: 4,
expNumSpanBatches: 4,
expLastNumBlocks: 3,
},
{
outputSize: 11_000,
numBatches: 16,
maxBlocks: 4,
expNumSpanBatches: 4,
expLastNumBlocks: 3,
},
{
outputSize: 11_154,
exactFull: true,
numBatches: 16,
maxBlocks: 4,
expNumSpanBatches: 4,
expLastNumBlocks: 4,
},
{
outputSize: 11_500,
numBatches: 17,
maxBlocks: 4,
expNumSpanBatches: 4,
expLastNumBlocks: 4,
},
{
outputSize: 11_801,
exactFull: true,
numBatches: 17,
maxBlocks: 4,
expNumSpanBatches: 5,
expLastNumBlocks: 1,
},
{
outputSize: 12_000,
numBatches: 18,
maxBlocks: 4,
expNumSpanBatches: 5,
expLastNumBlocks: 1,
},
} {
t.Run("test-"+strconv.Itoa(i), func(t *testing.T) {
testSpanChannelOut_MaxBlocksPerSpanBatch(t, tt)
})
}
}
func testSpanChannelOut_MaxBlocksPerSpanBatch(t *testing.T, tt maxBlocksTest) {
l1Origin := eth.L1BlockRef{Number: rollupCfg.Genesis.L1.Number + 42_000, Hash: common.Hash{0xde, 0xad, 0x42}}
l2SafeHead := eth.L2BlockRef{Number: rollupCfg.Genesis.L2Time + 40_000}
cout, bs := SpanChannelAndBatches(t, tt.outputSize, tt.numBatches, Brotli, WithMaxBlocksPerSpanBatch(tt.maxBlocks))
for i, b := range bs {
b.EpochNum = rollup.Epoch(l1Origin.Number)
b.EpochHash = l1Origin.Hash
err := cout.AddSingularBatch(b, uint64(i))
if i != tt.numBatches-1 || tt.exactFull {
require.NoErrorf(t, err, "iteration %d", i)
} else {
// adding last batch should not succeed, if not making compressor exactly full
require.ErrorIs(t, err, ErrCompressorFull)
t.Logf("full compressor length: %d", cout.compressor.Len())
}
}
require.ErrorIs(t, cout.FullErr(), ErrCompressorFull)
expSpanBatchBlocks := tt.expLastNumBlocks
if !tt.exactFull {
// if we didn't fill up exactly, we expect that one more block got
// added to the current span batch to detect that the compressor is full
expSpanBatchBlocks = tt.expLastNumBlocks%tt.maxBlocks + 1
}
require.Equal(t, expSpanBatchBlocks, cout.spanBatch.GetBlockCount(),
"last block should still have been added to the span batch")
require.NoError(t, cout.Close())
// write cannel into a single frame
var frameBuf bytes.Buffer
fn, err := cout.OutputFrame(&frameBuf, tt.outputSize+FrameV0OverHeadSize)
require.Zero(t, fn)
require.ErrorIs(t, err, io.EOF)
// now roundtrip to decode the batches
var frame Frame
require.NoError(t, frame.UnmarshalBinary(&frameBuf))
require.True(t, frame.IsLast)
spec := rollup.NewChainSpec(&rollupCfg)
ch := NewChannel(frame.ID, l1Origin)
require.False(t, ch.IsReady())
require.NoError(t, ch.AddFrame(frame, l1Origin))
require.True(t, ch.IsReady())
br, err := BatchReader(ch.Reader(), spec.MaxRLPBytesPerChannel(0), true)
require.NoError(t, err)
sbs := make([]*SingularBatch, 0, tt.numBatches-1)
for i := 0; i < tt.expNumSpanBatches; i++ {
t.Logf("iteration %d", i)
expBlocks := tt.maxBlocks
if i == tt.expNumSpanBatches-1 {
// last span batch possibly contains less
expBlocks = tt.expLastNumBlocks
}
bd, err := br()
require.NoError(t, err)
require.EqualValues(t, SpanBatchType, bd.GetBatchType())
sb, err := DeriveSpanBatch(bd, rollupCfg.BlockTime, rollupCfg.Genesis.L2Time, cout.spanBatch.ChainID)
require.NoError(t, err)
require.Equal(t, expBlocks, sb.GetBlockCount())
sbs0, err := sb.GetSingularBatches([]eth.L1BlockRef{l1Origin}, l2SafeHead)
require.NoError(t, err)
// last span batch contains one less
require.Len(t, sbs0, expBlocks)
sbs = append(sbs, sbs0...)
}
// batch reader should be exhausted
_, err = br()
require.ErrorIs(t, err, io.EOF)
for i, batch := range sbs {
batch0 := bs[i]
// clear the expected parent hash, as GetSingularBatches doesn't set these yet
// we still compare timestamps and txs, which is enough
batch0.ParentHash = (common.Hash{})
require.Equalf(t, batch0, batch, "iteration %d", i)
}
}
...@@ -2,7 +2,6 @@ package derive ...@@ -2,7 +2,6 @@ package derive
import ( import (
"bytes" "bytes"
"crypto/rand" "crypto/rand"
"fmt" "fmt"
"io" "io"
...@@ -37,6 +36,15 @@ type SpanChannelOut struct { ...@@ -37,6 +36,15 @@ type SpanChannelOut struct {
// spanBatch is the batch being built, which immutably holds genesis timestamp and chain ID, but otherwise can be reset // spanBatch is the batch being built, which immutably holds genesis timestamp and chain ID, but otherwise can be reset
spanBatch *SpanBatch spanBatch *SpanBatch
// maxBlocksPerSpanBatch is an optional limit on the number of blocks per span batch.
// If non-zero, a new span batch will be started after the current span batch has
// reached this maximum.
maxBlocksPerSpanBatch int
// sealedRLPBytes stores the sealed number of input RLP bytes. This is used when maxBlocksPerSpanBatch is non-zero
// to seal full span batches (that have reached the max block count) in the rlp slices.
sealedRLPBytes int
chainSpec *rollup.ChainSpec chainSpec *rollup.ChainSpec
} }
...@@ -49,7 +57,15 @@ func (co *SpanChannelOut) setRandomID() error { ...@@ -49,7 +57,15 @@ func (co *SpanChannelOut) setRandomID() error {
return err return err
} }
func NewSpanChannelOut(genesisTimestamp uint64, chainID *big.Int, targetOutputSize uint64, compressionAlgo CompressionAlgo, chainSpec *rollup.ChainSpec) (*SpanChannelOut, error) { type SpanChannelOutOption func(co *SpanChannelOut)
func WithMaxBlocksPerSpanBatch(maxBlock int) SpanChannelOutOption {
return func(co *SpanChannelOut) {
co.maxBlocksPerSpanBatch = maxBlock
}
}
func NewSpanChannelOut(genesisTimestamp uint64, chainID *big.Int, targetOutputSize uint64, compressionAlgo CompressionAlgo, chainSpec *rollup.ChainSpec, opts ...SpanChannelOutOption) (*SpanChannelOut, error) {
c := &SpanChannelOut{ c := &SpanChannelOut{
id: ChannelID{}, id: ChannelID{},
frame: 0, frame: 0,
...@@ -67,6 +83,10 @@ func NewSpanChannelOut(genesisTimestamp uint64, chainID *big.Int, targetOutputSi ...@@ -67,6 +83,10 @@ func NewSpanChannelOut(genesisTimestamp uint64, chainID *big.Int, targetOutputSi
return nil, err return nil, err
} }
for _, opt := range opts {
opt(c)
}
return c, nil return c, nil
} }
...@@ -74,15 +94,20 @@ func (co *SpanChannelOut) Reset() error { ...@@ -74,15 +94,20 @@ func (co *SpanChannelOut) Reset() error {
co.closed = false co.closed = false
co.full = nil co.full = nil
co.frame = 0 co.frame = 0
co.sealedRLPBytes = 0
co.rlp[0].Reset() co.rlp[0].Reset()
co.rlp[1].Reset() co.rlp[1].Reset()
co.lastCompressedRLPSize = 0 co.lastCompressedRLPSize = 0
co.compressor.Reset() co.compressor.Reset()
co.spanBatch = NewSpanBatch(co.spanBatch.GenesisTimestamp, co.spanBatch.ChainID) co.resetSpanBatch()
// setting the new randomID is the only part of the reset that can fail // setting the new randomID is the only part of the reset that can fail
return co.setRandomID() return co.setRandomID()
} }
func (co *SpanChannelOut) resetSpanBatch() {
co.spanBatch = NewSpanBatch(co.spanBatch.GenesisTimestamp, co.spanBatch.ChainID)
}
// activeRLP returns the active RLP buffer using the current rlpIndex // activeRLP returns the active RLP buffer using the current rlpIndex
func (co *SpanChannelOut) activeRLP() *bytes.Buffer { func (co *SpanChannelOut) activeRLP() *bytes.Buffer {
return co.rlp[co.rlpIndex] return co.rlp[co.rlpIndex]
...@@ -127,6 +152,7 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64) ...@@ -127,6 +152,7 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64)
return err return err
} }
co.ensureOpenSpanBatch()
// update the SpanBatch with the SingularBatch // update the SpanBatch with the SingularBatch
if err := co.spanBatch.AppendSingularBatch(batch, seqNum); err != nil { if err := co.spanBatch.AppendSingularBatch(batch, seqNum); err != nil {
return fmt.Errorf("failed to append SingularBatch to SpanBatch: %w", err) return fmt.Errorf("failed to append SingularBatch to SpanBatch: %w", err)
...@@ -137,11 +163,13 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64) ...@@ -137,11 +163,13 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64)
return fmt.Errorf("failed to convert SpanBatch into RawSpanBatch: %w", err) return fmt.Errorf("failed to convert SpanBatch into RawSpanBatch: %w", err)
} }
// switch to the other buffer and reset it for new use // switch to the other buffer and truncate it for new use
// (the RLP buffer which is being made inactive holds the RLP encoded span batch just before the new batch was added) // (the RLP buffer which is being made inactive holds the RLP encoded span batch(es)
// just before the new batch was added)
co.swapRLP() co.swapRLP()
co.activeRLP().Reset() active := co.activeRLP()
if err = rlp.Encode(co.activeRLP(), NewBatchData(rawSpanBatch)); err != nil { active.Truncate(co.sealedRLPBytes)
if err = rlp.Encode(active, NewBatchData(rawSpanBatch)); err != nil {
return fmt.Errorf("failed to encode RawSpanBatch into bytes: %w", err) return fmt.Errorf("failed to encode RawSpanBatch into bytes: %w", err)
} }
...@@ -150,14 +178,14 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64) ...@@ -150,14 +178,14 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64)
// of the current batch guarantees that this channel will be included in an L1 block with a timestamp well after // of the current batch guarantees that this channel will be included in an L1 block with a timestamp well after
// the Fjord activation. // the Fjord activation.
maxRLPBytesPerChannel := co.chainSpec.MaxRLPBytesPerChannel(batch.Timestamp) maxRLPBytesPerChannel := co.chainSpec.MaxRLPBytesPerChannel(batch.Timestamp)
if co.activeRLP().Len() > int(maxRLPBytesPerChannel) { if active.Len() > int(maxRLPBytesPerChannel) {
return fmt.Errorf("could not take %d bytes as replacement of channel of %d bytes, max is %d. err: %w", return fmt.Errorf("could not take %d bytes as replacement of channel of %d bytes, max is %d. err: %w",
co.activeRLP().Len(), co.inactiveRLP().Len(), maxRLPBytesPerChannel, ErrTooManyRLPBytes) active.Len(), co.inactiveRLP().Len(), maxRLPBytesPerChannel, ErrTooManyRLPBytes)
} }
// if the compressed data *plus* the new rlp data is under the target size, return early // if the compressed data *plus* the new rlp data is under the target size, return early
// this optimizes out cases where the compressor will obviously come in under the target size // this optimizes out cases where the compressor will obviously come in under the target size
rlpGrowth := co.activeRLP().Len() - co.lastCompressedRLPSize rlpGrowth := active.Len() - co.lastCompressedRLPSize
if uint64(co.compressor.Len()+rlpGrowth) < co.target { if uint64(co.compressor.Len()+rlpGrowth) < co.target {
return nil return nil
} }
...@@ -166,20 +194,24 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64) ...@@ -166,20 +194,24 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64)
if err = co.compress(); err != nil { if err = co.compress(); err != nil {
return err return err
} }
co.lastCompressedRLPSize = co.activeRLP().Len()
// if the channel is now full, either return the compressed data, or the compressed previous data // if the channel is now full, either return the compressed data, or the compressed previous data
if err := co.FullErr(); err != nil { if err := co.FullErr(); err != nil {
// if there is only one batch in the channel, it *must* be returned // if it's the first singular batch/block of the channel, it *must* fit in
if len(co.spanBatch.Batches) == 1 { if co.sealedRLPBytes == 0 && co.spanBatch.GetBlockCount() == 1 {
return nil
}
// if we just perfectly filled up the channel, also return nil to retain block
if uint64(co.compressor.Len()) == co.target {
return nil return nil
} }
// if there is more than one batch in the channel, we revert the last batch // if there is more than one batch in the channel, we revert the last batch
// by switching the RLP buffer and doing a fresh compression // by switching the RLP buffer and doing a fresh compression
co.swapRLP() co.swapRLP()
if err := co.compress(); err != nil { if cerr := co.compress(); cerr != nil {
return err return cerr
} }
// return the full error // return the full error
return err return err
...@@ -188,13 +220,36 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64) ...@@ -188,13 +220,36 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64)
return nil return nil
} }
func (co *SpanChannelOut) ensureOpenSpanBatch() {
if co.maxBlocksPerSpanBatch == 0 || co.spanBatch.GetBlockCount() < co.maxBlocksPerSpanBatch {
return
}
// we assume that the full span batch has been written to the last active rlp buffer
active, inactive := co.activeRLP(), co.inactiveRLP()
if inactive.Len() > active.Len() {
panic("inactive rlp unexpectedly larger")
}
co.sealedRLPBytes = active.Len()
// Copy active to inactive rlp buffer so both have the same sealed state
// and resetting by truncation works as intended.
inactive.Reset()
// err is guaranteed to always be nil
_, _ = inactive.Write(active.Bytes())
co.resetSpanBatch()
}
// compress compresses the active RLP buffer and checks if the compressed data is over the target size. // compress compresses the active RLP buffer and checks if the compressed data is over the target size.
// it resets all the compression buffers because Span Batches aren't meant to be compressed incrementally. // it resets all the compression buffers because Span Batches aren't meant to be compressed incrementally.
func (co *SpanChannelOut) compress() error { func (co *SpanChannelOut) compress() error {
co.compressor.Reset() co.compressor.Reset()
if _, err := co.compressor.Write(co.activeRLP().Bytes()); err != nil { // we write Bytes() of the active RLP to the compressor, so the active RLP's
// buffer is not advanced as a ReadWriter, making it possible to later use
// Truncate().
rlpBytes := co.activeRLP().Bytes()
if _, err := co.compressor.Write(rlpBytes); err != nil {
return err return err
} }
co.lastCompressedRLPSize = len(rlpBytes)
if err := co.compressor.Close(); err != nil { if err := co.compressor.Close(); err != nil {
return err return err
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment