Commit d062c1c5 authored by Michael de Hoog's avatar Michael de Hoog Committed by GitHub

Remove AddSingularBatch from ChannelOut interface (prefer AddBlock) (#12079)

parent 8917511b
...@@ -178,12 +178,8 @@ func (c *ChannelBuilder) AddBlock(block *types.Block) (*derive.L1BlockInfo, erro ...@@ -178,12 +178,8 @@ func (c *ChannelBuilder) AddBlock(block *types.Block) (*derive.L1BlockInfo, erro
return nil, c.FullErr() return nil, c.FullErr()
} }
batch, l1info, err := derive.BlockToSingularBatch(c.rollupCfg, block) l1info, err := c.co.AddBlock(c.rollupCfg, block)
if err != nil { if errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.ErrCompressorFull) {
return l1info, fmt.Errorf("converting block to batch: %w", err)
}
if err = c.co.AddSingularBatch(batch, l1info.SequenceNumber); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.ErrCompressorFull) {
c.setFullErr(err) c.setFullErr(err)
return l1info, c.FullErr() return l1info, c.FullErr()
} else if err != nil { } else if err != nil {
...@@ -191,7 +187,7 @@ func (c *ChannelBuilder) AddBlock(block *types.Block) (*derive.L1BlockInfo, erro ...@@ -191,7 +187,7 @@ func (c *ChannelBuilder) AddBlock(block *types.Block) (*derive.L1BlockInfo, erro
} }
c.blocks = append(c.blocks, block) c.blocks = append(c.blocks, block)
c.updateSwTimeout(batch) c.updateSwTimeout(l1info.Number)
if l1info.Number > c.latestL1Origin.Number { if l1info.Number > c.latestL1Origin.Number {
c.latestL1Origin = eth.BlockID{ c.latestL1Origin = eth.BlockID{
...@@ -252,8 +248,8 @@ func (c *ChannelBuilder) updateDurationTimeout(l1BlockNum uint64) { ...@@ -252,8 +248,8 @@ func (c *ChannelBuilder) updateDurationTimeout(l1BlockNum uint64) {
// derived from the batch's origin L1 block. The timeout is only moved forward // derived from the batch's origin L1 block. The timeout is only moved forward
// if the derived sequencer window timeout is earlier than the currently set // if the derived sequencer window timeout is earlier than the currently set
// timeout. // timeout.
func (c *ChannelBuilder) updateSwTimeout(batch *derive.SingularBatch) { func (c *ChannelBuilder) updateSwTimeout(l1InfoNumber uint64) {
timeout := uint64(batch.EpochNum) + c.cfg.SeqWindowSize - c.cfg.SubSafetyMargin timeout := l1InfoNumber + c.cfg.SeqWindowSize - c.cfg.SubSafetyMargin
c.updateTimeout(timeout, ErrSeqWindowClose) c.updateTimeout(timeout, ErrSeqWindowClose)
} }
......
...@@ -250,7 +250,7 @@ func FuzzSeqWindowClose(f *testing.F) { ...@@ -250,7 +250,7 @@ func FuzzSeqWindowClose(f *testing.F) {
// Check the timeout // Check the timeout
cb.timeout = timeout cb.timeout = timeout
cb.updateSwTimeout(&derive.SingularBatch{EpochNum: rollup.Epoch(epochNum)}) cb.updateSwTimeout(epochNum)
calculatedTimeout := epochNum + seqWindowSize - subSafetyMargin calculatedTimeout := epochNum + seqWindowSize - subSafetyMargin
if timeout > calculatedTimeout && calculatedTimeout != 0 { if timeout > calculatedTimeout && calculatedTimeout != 0 {
cb.CheckTimeout(calculatedTimeout) cb.CheckTimeout(calculatedTimeout)
...@@ -278,7 +278,7 @@ func FuzzSeqWindowZeroTimeoutClose(f *testing.F) { ...@@ -278,7 +278,7 @@ func FuzzSeqWindowZeroTimeoutClose(f *testing.F) {
// Check the timeout // Check the timeout
cb.timeout = 0 cb.timeout = 0
cb.updateSwTimeout(&derive.SingularBatch{EpochNum: rollup.Epoch(epochNum)}) cb.updateSwTimeout(epochNum)
calculatedTimeout := epochNum + seqWindowSize - subSafetyMargin calculatedTimeout := epochNum + seqWindowSize - subSafetyMargin
cb.CheckTimeout(calculatedTimeout) cb.CheckTimeout(calculatedTimeout)
if cb.timeout != 0 { if cb.timeout != 0 {
......
...@@ -73,7 +73,7 @@ type Writer interface { ...@@ -73,7 +73,7 @@ type Writer interface {
type ChannelOutIface interface { type ChannelOutIface interface {
ID() derive.ChannelID ID() derive.ChannelID
Reset() error Reset() error
AddBlock(rollupCfg *rollup.Config, block *types.Block) error AddBlock(rollupCfg *rollup.Config, block *types.Block) (*derive.L1BlockInfo, error)
ReadyBytes() int ReadyBytes() int
Flush() error Flush() error
Close() error Close() error
...@@ -157,19 +157,19 @@ func (co *GarbageChannelOut) Reset() error { ...@@ -157,19 +157,19 @@ func (co *GarbageChannelOut) Reset() error {
// error that it returns is ErrTooManyRLPBytes. If this error // error that it returns is ErrTooManyRLPBytes. If this error
// is returned, the channel should be closed and a new one // is returned, the channel should be closed and a new one
// should be made. // should be made.
func (co *GarbageChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) error { func (co *GarbageChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) (*derive.L1BlockInfo, error) {
if co.closed { if co.closed {
return errors.New("already closed") return nil, errors.New("already closed")
} }
batch, err := blockToBatch(rollupCfg, block) batch, l1Info, err := blockToBatch(rollupCfg, block)
if err != nil { if err != nil {
return err return nil, err
} }
// We encode to a temporary buffer to determine the encoded length to // We encode to a temporary buffer to determine the encoded length to
// ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL // ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL
var buf bytes.Buffer var buf bytes.Buffer
if err := rlp.Encode(&buf, batch); err != nil { if err := rlp.Encode(&buf, batch); err != nil {
return err return nil, err
} }
if co.cfg.MalformRLP { if co.cfg.MalformRLP {
// Malform the RLP by incrementing the length prefix by 1. // Malform the RLP by incrementing the length prefix by 1.
...@@ -182,13 +182,13 @@ func (co *GarbageChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Blo ...@@ -182,13 +182,13 @@ func (co *GarbageChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Blo
chainSpec := rollup.NewChainSpec(rollupCfg) chainSpec := rollup.NewChainSpec(rollupCfg)
maxRLPBytesPerChannel := chainSpec.MaxRLPBytesPerChannel(block.Time()) maxRLPBytesPerChannel := chainSpec.MaxRLPBytesPerChannel(block.Time())
if co.rlpLength+buf.Len() > int(maxRLPBytesPerChannel) { if co.rlpLength+buf.Len() > int(maxRLPBytesPerChannel) {
return fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w", return nil, fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w",
buf.Len(), co.rlpLength, maxRLPBytesPerChannel, derive.ErrTooManyRLPBytes) buf.Len(), co.rlpLength, maxRLPBytesPerChannel, derive.ErrTooManyRLPBytes)
} }
co.rlpLength += buf.Len() co.rlpLength += buf.Len()
_, err = io.Copy(co.compress, &buf) _, err = io.Copy(co.compress, &buf)
return err return l1Info, err
} }
// ReadyBytes returns the number of bytes that the channel out can immediately output into a frame. // ReadyBytes returns the number of bytes that the channel out can immediately output into a frame.
...@@ -256,7 +256,7 @@ func (co *GarbageChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint1 ...@@ -256,7 +256,7 @@ func (co *GarbageChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint1
} }
// blockToBatch transforms a block into a batch object that can easily be RLP encoded. // blockToBatch transforms a block into a batch object that can easily be RLP encoded.
func blockToBatch(rollupCfg *rollup.Config, block *types.Block) (*derive.BatchData, error) { func blockToBatch(rollupCfg *rollup.Config, block *types.Block) (*derive.BatchData, *derive.L1BlockInfo, error) {
opaqueTxs := make([]hexutil.Bytes, 0, len(block.Transactions())) opaqueTxs := make([]hexutil.Bytes, 0, len(block.Transactions()))
for i, tx := range block.Transactions() { for i, tx := range block.Transactions() {
if tx.Type() == types.DepositTxType { if tx.Type() == types.DepositTxType {
...@@ -264,17 +264,17 @@ func blockToBatch(rollupCfg *rollup.Config, block *types.Block) (*derive.BatchDa ...@@ -264,17 +264,17 @@ func blockToBatch(rollupCfg *rollup.Config, block *types.Block) (*derive.BatchDa
} }
otx, err := tx.MarshalBinary() otx, err := tx.MarshalBinary()
if err != nil { if err != nil {
return nil, fmt.Errorf("could not encode tx %v in block %v: %w", i, tx.Hash(), err) return nil, nil, fmt.Errorf("could not encode tx %v in block %v: %w", i, tx.Hash(), err)
} }
opaqueTxs = append(opaqueTxs, otx) opaqueTxs = append(opaqueTxs, otx)
} }
l1InfoTx := block.Transactions()[0] l1InfoTx := block.Transactions()[0]
if l1InfoTx.Type() != types.DepositTxType { if l1InfoTx.Type() != types.DepositTxType {
return nil, derive.ErrNotDepositTx return nil, nil, derive.ErrNotDepositTx
} }
l1Info, err := derive.L1BlockInfoFromBytes(rollupCfg, block.Time(), l1InfoTx.Data()) l1Info, err := derive.L1BlockInfoFromBytes(rollupCfg, block.Time(), l1InfoTx.Data())
if err != nil { if err != nil {
return nil, fmt.Errorf("could not parse the L1 Info deposit: %w", err) return nil, nil, fmt.Errorf("could not parse the L1 Info deposit: %w", err)
} }
singularBatch := &derive.SingularBatch{ singularBatch := &derive.SingularBatch{
...@@ -285,5 +285,5 @@ func blockToBatch(rollupCfg *rollup.Config, block *types.Block) (*derive.BatchDa ...@@ -285,5 +285,5 @@ func blockToBatch(rollupCfg *rollup.Config, block *types.Block) (*derive.BatchDa
Transactions: opaqueTxs, Transactions: opaqueTxs,
} }
return derive.NewBatchData(singularBatch), nil return derive.NewBatchData(singularBatch), l1Info, nil
} }
...@@ -229,7 +229,7 @@ func (s *L2Batcher) Buffer(t Testing, opts ...BlockModifier) error { ...@@ -229,7 +229,7 @@ func (s *L2Batcher) Buffer(t Testing, opts ...BlockModifier) error {
require.NoError(t, err, "failed to create channel") require.NoError(t, err, "failed to create channel")
s.L2ChannelOut = ch s.L2ChannelOut = ch
} }
if err := s.L2ChannelOut.AddBlock(s.rollupCfg, block); err != nil { if _, err := s.L2ChannelOut.AddBlock(s.rollupCfg, block); err != nil {
return err return err
} }
ref, err := s.engCl.L2BlockRefByHash(t.Ctx(), block.Hash()) ref, err := s.engCl.L2BlockRefByHash(t.Ctx(), block.Hash())
......
...@@ -253,7 +253,7 @@ func TestBackupUnsafe(gt *testing.T) { ...@@ -253,7 +253,7 @@ func TestBackupUnsafe(gt *testing.T) {
block = block.WithBody(types.Body{Transactions: []*types.Transaction{block.Transactions()[0], invalidTx}}) block = block.WithBody(types.Body{Transactions: []*types.Transaction{block.Transactions()[0], invalidTx}})
} }
// Add A1, B2, B3, B4, B5 into the channel // Add A1, B2, B3, B4, B5 into the channel
err = channelOut.AddBlock(sd.RollupCfg, block) _, err = channelOut.AddBlock(sd.RollupCfg, block)
require.NoError(t, err) require.NoError(t, err)
} }
...@@ -414,7 +414,7 @@ func TestBackupUnsafeReorgForkChoiceInputError(gt *testing.T) { ...@@ -414,7 +414,7 @@ func TestBackupUnsafeReorgForkChoiceInputError(gt *testing.T) {
block = block.WithBody(types.Body{Transactions: []*types.Transaction{block.Transactions()[0], invalidTx}}) block = block.WithBody(types.Body{Transactions: []*types.Transaction{block.Transactions()[0], invalidTx}})
} }
// Add A1, B2, B3, B4, B5 into the channel // Add A1, B2, B3, B4, B5 into the channel
err = channelOut.AddBlock(sd.RollupCfg, block) _, err = channelOut.AddBlock(sd.RollupCfg, block)
require.NoError(t, err) require.NoError(t, err)
} }
...@@ -547,7 +547,7 @@ func TestBackupUnsafeReorgForkChoiceNotInputError(gt *testing.T) { ...@@ -547,7 +547,7 @@ func TestBackupUnsafeReorgForkChoiceNotInputError(gt *testing.T) {
block = block.WithBody(types.Body{Transactions: []*types.Transaction{block.Transactions()[0], invalidTx}}) block = block.WithBody(types.Body{Transactions: []*types.Transaction{block.Transactions()[0], invalidTx}})
} }
// Add A1, B2, B3, B4, B5 into the channel // Add A1, B2, B3, B4, B5 into the channel
err = channelOut.AddBlock(sd.RollupCfg, block) _, err = channelOut.AddBlock(sd.RollupCfg, block)
require.NoError(t, err) require.NoError(t, err)
} }
...@@ -924,7 +924,7 @@ func TestInvalidPayloadInSpanBatch(gt *testing.T) { ...@@ -924,7 +924,7 @@ func TestInvalidPayloadInSpanBatch(gt *testing.T) {
block = block.WithBody(types.Body{Transactions: []*types.Transaction{block.Transactions()[0], invalidTx}}) block = block.WithBody(types.Body{Transactions: []*types.Transaction{block.Transactions()[0], invalidTx}})
} }
// Add A1 ~ A12 into the channel // Add A1 ~ A12 into the channel
err = channelOut.AddBlock(sd.RollupCfg, block) _, err = channelOut.AddBlock(sd.RollupCfg, block)
require.NoError(t, err) require.NoError(t, err)
} }
...@@ -973,7 +973,7 @@ func TestInvalidPayloadInSpanBatch(gt *testing.T) { ...@@ -973,7 +973,7 @@ func TestInvalidPayloadInSpanBatch(gt *testing.T) {
block = block.WithBody(types.Body{Transactions: []*types.Transaction{block.Transactions()[0], tx}}) block = block.WithBody(types.Body{Transactions: []*types.Transaction{block.Transactions()[0], tx}})
} }
// Add B1, A2 ~ A12 into the channel // Add B1, A2 ~ A12 into the channel
err = channelOut.AddBlock(sd.RollupCfg, block) _, err = channelOut.AddBlock(sd.RollupCfg, block)
require.NoError(t, err) require.NoError(t, err)
} }
// Submit span batch(B1, A2, ... A12) // Submit span batch(B1, A2, ... A12)
......
...@@ -10,6 +10,9 @@ import ( ...@@ -10,6 +10,9 @@ import (
"github.com/ethereum-optimism/optimism/op-batcher/compressor" "github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
...@@ -101,6 +104,40 @@ func channelOutByType(b *testing.B, batchType uint, cd compressorDetails) (deriv ...@@ -101,6 +104,40 @@ func channelOutByType(b *testing.B, batchType uint, cd compressorDetails) (deriv
return nil, fmt.Errorf("unsupported batch type: %d", batchType) return nil, fmt.Errorf("unsupported batch type: %d", batchType)
} }
func randomBlock(cfg *rollup.Config, rng *rand.Rand, txCount int, timestamp uint64) (*types.Block, error) {
batch := derive.RandomSingularBatch(rng, txCount, cfg.L2ChainID)
batch.Timestamp = timestamp
return singularBatchToBlock(cfg, batch)
}
// singularBatchToBlock converts a singular batch to a block for use in the benchmarks. This function
// should only be used for testing purposes, as the batch input doesn't contain the necessary information
// to build the full block (only non-deposit transactions and a subset of header fields are populated).
func singularBatchToBlock(rollupCfg *rollup.Config, batch *derive.SingularBatch) (*types.Block, error) {
l1InfoTx, err := derive.L1InfoDeposit(rollupCfg, eth.SystemConfig{}, 0, &testutils.MockBlockInfo{
InfoNum: uint64(batch.EpochNum),
InfoHash: batch.EpochHash,
}, batch.Timestamp)
if err != nil {
return nil, fmt.Errorf("could not build L1 Info transaction: %w", err)
}
txs := []*types.Transaction{types.NewTx(l1InfoTx)}
for i, opaqueTx := range batch.Transactions {
var tx types.Transaction
err = tx.UnmarshalBinary(opaqueTx)
if err != nil {
return nil, fmt.Errorf("could not decode tx %d: %w", i, err)
}
txs = append(txs, &tx)
}
return types.NewBlockWithHeader(&types.Header{
ParentHash: batch.ParentHash,
Time: batch.Timestamp,
}).WithBody(types.Body{
Transactions: txs,
}), nil
}
// a test case for the benchmark controls the number of batches and transactions per batch, // a test case for the benchmark controls the number of batches and transactions per batch,
// as well as the batch type and compressor used // as well as the batch type and compressor used
type BatchingBenchmarkTC struct { type BatchingBenchmarkTC struct {
...@@ -155,16 +192,17 @@ func BenchmarkFinalBatchChannelOut(b *testing.B) { ...@@ -155,16 +192,17 @@ func BenchmarkFinalBatchChannelOut(b *testing.B) {
} }
for _, tc := range tests { for _, tc := range tests {
chainID := big.NewInt(333) cfg := &rollup.Config{L2ChainID: big.NewInt(333)}
rng := rand.New(rand.NewSource(0x543331)) rng := rand.New(rand.NewSource(0x543331))
// pre-generate batches to keep the benchmark from including the random generation // pre-generate batches to keep the benchmark from including the random generation
batches := make([]*derive.SingularBatch, tc.BatchCount) blocks := make([]*types.Block, tc.BatchCount)
t := time.Now() t := time.Now()
for i := 0; i < tc.BatchCount; i++ { for i := 0; i < tc.BatchCount; i++ {
batches[i] = derive.RandomSingularBatch(rng, tc.txPerBatch, chainID)
// set the timestamp to increase with each batch // set the timestamp to increase with each batch
// to leverage optimizations in the Batch Linked List // to leverage optimizations in the Batch Linked List
batches[i].Timestamp = uint64(t.Add(time.Duration(i) * time.Second).Unix()) var err error
blocks[i], err = randomBlock(cfg, rng, tc.txPerBatch, uint64(t.Add(time.Duration(i)*time.Second).Unix()))
require.NoError(b, err)
} }
b.Run(tc.String(), func(b *testing.B) { b.Run(tc.String(), func(b *testing.B) {
// reset the compressor used in the test case // reset the compressor used in the test case
...@@ -174,13 +212,13 @@ func BenchmarkFinalBatchChannelOut(b *testing.B) { ...@@ -174,13 +212,13 @@ func BenchmarkFinalBatchChannelOut(b *testing.B) {
cout, _ := channelOutByType(b, tc.BatchType, tc.cd) cout, _ := channelOutByType(b, tc.BatchType, tc.cd)
// add all but the final batch to the channel out // add all but the final batch to the channel out
for i := 0; i < tc.BatchCount-1; i++ { for i := 0; i < tc.BatchCount-1; i++ {
err := cout.AddSingularBatch(batches[i], 0) _, err := cout.AddBlock(cfg, blocks[i])
require.NoError(b, err) require.NoError(b, err)
} }
// measure the time to add the final batch // measure the time to add the final batch
b.StartTimer() b.StartTimer()
// add the final batch to the channel out // add the final batch to the channel out
err := cout.AddSingularBatch(batches[tc.BatchCount-1], 0) _, err := cout.AddBlock(cfg, blocks[tc.BatchCount-1])
require.NoError(b, err) require.NoError(b, err)
} }
}) })
...@@ -193,7 +231,7 @@ func BenchmarkFinalBatchChannelOut(b *testing.B) { ...@@ -193,7 +231,7 @@ func BenchmarkFinalBatchChannelOut(b *testing.B) {
// Hint: use -benchtime=1x to run the benchmarks for a single iteration // Hint: use -benchtime=1x to run the benchmarks for a single iteration
// it is not currently designed to use b.N // it is not currently designed to use b.N
func BenchmarkIncremental(b *testing.B) { func BenchmarkIncremental(b *testing.B) {
chainID := big.NewInt(333) cfg := &rollup.Config{L2ChainID: big.NewInt(333)}
rng := rand.New(rand.NewSource(0x543331)) rng := rand.New(rand.NewSource(0x543331))
// use the real compressor for this benchmark // use the real compressor for this benchmark
// use batchCount as the number of batches to add in each benchmark iteration // use batchCount as the number of batches to add in each benchmark iteration
...@@ -226,17 +264,20 @@ func BenchmarkIncremental(b *testing.B) { ...@@ -226,17 +264,20 @@ func BenchmarkIncremental(b *testing.B) {
b.StopTimer() b.StopTimer()
// prepare the batches // prepare the batches
t := time.Now() t := time.Now()
batches := make([]*derive.SingularBatch, tc.BatchCount) blocks := make([]*types.Block, tc.BatchCount)
for i := 0; i < tc.BatchCount; i++ { for i := 0; i < tc.BatchCount; i++ {
t := t.Add(time.Second)
batches[i] = derive.RandomSingularBatch(rng, tc.txPerBatch, chainID)
// set the timestamp to increase with each batch // set the timestamp to increase with each batch
// to leverage optimizations in the Batch Linked List // to leverage optimizations in the Batch Linked List
batches[i].Timestamp = uint64(t.Unix()) t = t.Add(time.Second)
blocks[i], err = randomBlock(cfg, rng, tc.txPerBatch, uint64(t.Unix()))
if err != nil {
done = true
return
}
} }
b.StartTimer() b.StartTimer()
for i := 0; i < tc.BatchCount; i++ { for i := 0; i < tc.BatchCount; i++ {
err := cout.AddSingularBatch(batches[i], 0) _, err := cout.AddBlock(cfg, blocks[i])
if err != nil { if err != nil {
done = true done = true
return return
...@@ -280,16 +321,17 @@ func BenchmarkAllBatchesChannelOut(b *testing.B) { ...@@ -280,16 +321,17 @@ func BenchmarkAllBatchesChannelOut(b *testing.B) {
} }
for _, tc := range tests { for _, tc := range tests {
chainID := big.NewInt(333) cfg := &rollup.Config{L2ChainID: big.NewInt(333)}
rng := rand.New(rand.NewSource(0x543331)) rng := rand.New(rand.NewSource(0x543331))
// pre-generate batches to keep the benchmark from including the random generation // pre-generate batches to keep the benchmark from including the random generation
batches := make([]*derive.SingularBatch, tc.BatchCount) blocks := make([]*types.Block, tc.BatchCount)
t := time.Now() t := time.Now()
for i := 0; i < tc.BatchCount; i++ { for i := 0; i < tc.BatchCount; i++ {
batches[i] = derive.RandomSingularBatch(rng, tc.txPerBatch, chainID)
// set the timestamp to increase with each batch // set the timestamp to increase with each batch
// to leverage optimizations in the Batch Linked List // to leverage optimizations in the Batch Linked List
batches[i].Timestamp = uint64(t.Add(time.Duration(i) * time.Second).Unix()) var err error
blocks[i], err = randomBlock(cfg, rng, tc.txPerBatch, uint64(t.Add(time.Duration(i)*time.Second).Unix()))
require.NoError(b, err)
} }
b.Run(tc.String(), func(b *testing.B) { b.Run(tc.String(), func(b *testing.B) {
// reset the compressor used in the test case // reset the compressor used in the test case
...@@ -300,7 +342,7 @@ func BenchmarkAllBatchesChannelOut(b *testing.B) { ...@@ -300,7 +342,7 @@ func BenchmarkAllBatchesChannelOut(b *testing.B) {
b.StartTimer() b.StartTimer()
// add all batches to the channel out // add all batches to the channel out
for i := 0; i < tc.BatchCount; i++ { for i := 0; i < tc.BatchCount; i++ {
err := cout.AddSingularBatch(batches[i], 0) _, err := cout.AddBlock(cfg, blocks[i])
require.NoError(b, err) require.NoError(b, err)
} }
} }
......
...@@ -53,8 +53,7 @@ type Compressor interface { ...@@ -53,8 +53,7 @@ type Compressor interface {
type ChannelOut interface { type ChannelOut interface {
ID() ChannelID ID() ChannelID
Reset() error Reset() error
AddBlock(*rollup.Config, *types.Block) error AddBlock(*rollup.Config, *types.Block) (*L1BlockInfo, error)
AddSingularBatch(*SingularBatch, uint64) error
InputBytes() int InputBytes() int
ReadyBytes() int ReadyBytes() int
Flush() error Flush() error
...@@ -107,31 +106,27 @@ func (co *SingularChannelOut) Reset() error { ...@@ -107,31 +106,27 @@ func (co *SingularChannelOut) Reset() error {
return err return err
} }
// AddBlock adds a block to the channel. It returns the RLP encoded byte size // AddBlock adds a block to the channel. It returns the block's L1BlockInfo
// and an error if there is a problem adding the block. The only sentinel error // and an error if there is a problem adding the block. The only sentinel error
// that it returns is ErrTooManyRLPBytes. If this error is returned, the channel // that it returns is ErrTooManyRLPBytes. If this error is returned, the channel
// should be closed and a new one should be made. // should be closed and a new one should be made.
func (co *SingularChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) error { func (co *SingularChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) (*L1BlockInfo, error) {
if co.closed { if co.closed {
return ErrChannelOutAlreadyClosed return nil, ErrChannelOutAlreadyClosed
} }
batch, l1Info, err := BlockToSingularBatch(rollupCfg, block) batch, l1Info, err := BlockToSingularBatch(rollupCfg, block)
if err != nil { if err != nil {
return err return nil, fmt.Errorf("converting block to batch: %w", err)
} }
return co.AddSingularBatch(batch, l1Info.SequenceNumber) return l1Info, co.addSingularBatch(batch, l1Info.SequenceNumber)
} }
// AddSingularBatch adds a batch to the channel. It returns the RLP encoded byte size // addSingularBatch adds a batch to the channel. It returns
// and an error if there is a problem adding the batch. The only sentinel error // an error if there is a problem adding the batch. The only sentinel error
// that it returns is ErrTooManyRLPBytes. If this error is returned, the channel // that it returns is ErrTooManyRLPBytes. If this error is returned, the channel
// should be closed and a new one should be made. // should be closed and a new one should be made.
// func (co *SingularChannelOut) addSingularBatch(batch *SingularBatch, _ uint64) error {
// AddSingularBatch should be used together with BlockToBatch if you need to access the
// BatchData before adding a block to the channel. It isn't possible to access
// the batch data with AddBlock.
func (co *SingularChannelOut) AddSingularBatch(batch *SingularBatch, _ uint64) error {
if co.closed { if co.closed {
return ErrChannelOutAlreadyClosed return ErrChannelOutAlreadyClosed
} }
......
...@@ -45,14 +45,19 @@ func (s *nonCompressor) FullErr() error { ...@@ -45,14 +45,19 @@ func (s *nonCompressor) FullErr() error {
return nil return nil
} }
type channelOut interface {
ChannelOut
addSingularBatch(batch *SingularBatch, seqNum uint64) error
}
// channelTypes allows tests to run against different channel types // channelTypes allows tests to run against different channel types
var channelTypes = []struct { var channelTypes = []struct {
ChannelOut func(t *testing.T, rcfg *rollup.Config) ChannelOut ChannelOut func(t *testing.T, rcfg *rollup.Config) channelOut
Name string Name string
}{ }{
{ {
Name: "Singular", Name: "Singular",
ChannelOut: func(t *testing.T, rcfg *rollup.Config) ChannelOut { ChannelOut: func(t *testing.T, rcfg *rollup.Config) channelOut {
cout, err := NewSingularChannelOut(&nonCompressor{}, rollup.NewChainSpec(rcfg)) cout, err := NewSingularChannelOut(&nonCompressor{}, rollup.NewChainSpec(rcfg))
require.NoError(t, err) require.NoError(t, err)
return cout return cout
...@@ -60,7 +65,7 @@ var channelTypes = []struct { ...@@ -60,7 +65,7 @@ var channelTypes = []struct {
}, },
{ {
Name: "Span", Name: "Span",
ChannelOut: func(t *testing.T, rcfg *rollup.Config) ChannelOut { ChannelOut: func(t *testing.T, rcfg *rollup.Config) channelOut {
cout, err := NewSpanChannelOut(128_000, Zlib, rollup.NewChainSpec(rcfg)) cout, err := NewSpanChannelOut(128_000, Zlib, rollup.NewChainSpec(rcfg))
require.NoError(t, err) require.NoError(t, err)
return cout return cout
...@@ -80,9 +85,9 @@ func TestChannelOutAddBlock(t *testing.T) { ...@@ -80,9 +85,9 @@ func TestChannelOutAddBlock(t *testing.T) {
}, },
}, },
) )
err := cout.AddBlock(&rollupCfg, block) _, err := cout.AddBlock(&rollupCfg, block)
require.Error(t, err) require.Error(t, err)
require.Equal(t, ErrNotDepositTx, err) require.ErrorIs(t, err, ErrNotDepositTx)
}) })
} }
} }
...@@ -114,7 +119,7 @@ func TestOutputFrameNoEmptyLastFrame(t *testing.T) { ...@@ -114,7 +119,7 @@ func TestOutputFrameNoEmptyLastFrame(t *testing.T) {
txCount := 1 txCount := 1
singularBatch := RandomSingularBatch(rng, txCount, rollupCfg.L2ChainID) singularBatch := RandomSingularBatch(rng, txCount, rollupCfg.L2ChainID)
err := cout.AddSingularBatch(singularBatch, 0) err := cout.addSingularBatch(singularBatch, 0)
var written uint64 var written uint64
require.NoError(t, err) require.NoError(t, err)
...@@ -274,7 +279,7 @@ func funcName(fn any) string { ...@@ -274,7 +279,7 @@ func funcName(fn any) string {
func SpanChannelOutCompressionOnlyOneBatch(t *testing.T, algo CompressionAlgo) { func SpanChannelOutCompressionOnlyOneBatch(t *testing.T, algo CompressionAlgo) {
cout, singularBatches := SpanChannelAndBatches(t, 300, 2, algo) cout, singularBatches := SpanChannelAndBatches(t, 300, 2, algo)
err := cout.AddSingularBatch(singularBatches[0], 0) err := cout.addSingularBatch(singularBatches[0], 0)
// confirm compression was not skipped // confirm compression was not skipped
require.Greater(t, cout.compressor.Len(), 0) require.Greater(t, cout.compressor.Len(), 0)
require.NoError(t, err) require.NoError(t, err)
...@@ -283,7 +288,7 @@ func SpanChannelOutCompressionOnlyOneBatch(t *testing.T, algo CompressionAlgo) { ...@@ -283,7 +288,7 @@ func SpanChannelOutCompressionOnlyOneBatch(t *testing.T, algo CompressionAlgo) {
require.ErrorIs(t, cout.FullErr(), ErrCompressorFull) require.ErrorIs(t, cout.FullErr(), ErrCompressorFull)
// confirm adding another batch would cause the same full error // confirm adding another batch would cause the same full error
err = cout.AddSingularBatch(singularBatches[1], 0) err = cout.addSingularBatch(singularBatches[1], 0)
require.ErrorIs(t, err, ErrCompressorFull) require.ErrorIs(t, err, ErrCompressorFull)
} }
...@@ -292,7 +297,7 @@ func SpanChannelOutCompressionUndo(t *testing.T, algo CompressionAlgo) { ...@@ -292,7 +297,7 @@ func SpanChannelOutCompressionUndo(t *testing.T, algo CompressionAlgo) {
// target is larger than one batch, but smaller than two batches // target is larger than one batch, but smaller than two batches
cout, singularBatches := SpanChannelAndBatches(t, 1100, 2, algo) cout, singularBatches := SpanChannelAndBatches(t, 1100, 2, algo)
err := cout.AddSingularBatch(singularBatches[0], 0) err := cout.addSingularBatch(singularBatches[0], 0)
require.NoError(t, err) require.NoError(t, err)
// confirm that the first compression was skipped // confirm that the first compression was skipped
if algo == Zlib { if algo == Zlib {
...@@ -303,7 +308,7 @@ func SpanChannelOutCompressionUndo(t *testing.T, algo CompressionAlgo) { ...@@ -303,7 +308,7 @@ func SpanChannelOutCompressionUndo(t *testing.T, algo CompressionAlgo) {
// record the RLP length to confirm it doesn't change when adding a rejected batch // record the RLP length to confirm it doesn't change when adding a rejected batch
rlp1 := cout.activeRLP().Len() rlp1 := cout.activeRLP().Len()
err = cout.AddSingularBatch(singularBatches[1], 0) err = cout.addSingularBatch(singularBatches[1], 0)
require.ErrorIs(t, err, ErrCompressorFull) require.ErrorIs(t, err, ErrCompressorFull)
// confirm that the second compression was not skipped // confirm that the second compression was not skipped
require.Greater(t, cout.compressor.Len(), 0) require.Greater(t, cout.compressor.Len(), 0)
...@@ -318,7 +323,7 @@ func SpanChannelOutClose(t *testing.T, algo CompressionAlgo) { ...@@ -318,7 +323,7 @@ func SpanChannelOutClose(t *testing.T, algo CompressionAlgo) {
target := uint64(1100) target := uint64(1100)
cout, singularBatches := SpanChannelAndBatches(t, target, 1, algo) cout, singularBatches := SpanChannelAndBatches(t, target, 1, algo)
err := cout.AddSingularBatch(singularBatches[0], 0) err := cout.addSingularBatch(singularBatches[0], 0)
require.NoError(t, err) require.NoError(t, err)
// confirm no compression has happened yet // confirm no compression has happened yet
...@@ -413,7 +418,7 @@ func testSpanChannelOut_MaxBlocksPerSpanBatch(t *testing.T, tt maxBlocksTest) { ...@@ -413,7 +418,7 @@ func testSpanChannelOut_MaxBlocksPerSpanBatch(t *testing.T, tt maxBlocksTest) {
for i, b := range bs { for i, b := range bs {
b.EpochNum = rollup.Epoch(l1Origin.Number) b.EpochNum = rollup.Epoch(l1Origin.Number)
b.EpochHash = l1Origin.Hash b.EpochHash = l1Origin.Hash
err := cout.AddSingularBatch(b, uint64(i)) err := cout.addSingularBatch(b, uint64(i))
if i != tt.numBatches-1 || tt.exactFull { if i != tt.numBatches-1 || tt.exactFull {
require.NoErrorf(t, err, "iteration %d", i) require.NoErrorf(t, err, "iteration %d", i)
} else { } else {
......
...@@ -122,27 +122,27 @@ func (co *SpanChannelOut) swapRLP() { ...@@ -122,27 +122,27 @@ func (co *SpanChannelOut) swapRLP() {
co.rlpIndex = (co.rlpIndex + 1) % 2 co.rlpIndex = (co.rlpIndex + 1) % 2
} }
// AddBlock adds a block to the channel. // AddBlock adds a block to the channel. It returns the block's L1BlockInfo
// returns an error if there is a problem adding the block. The only sentinel error // and an error if there is a problem adding the block. The only sentinel error
// that it returns is ErrTooManyRLPBytes. If this error is returned, the channel // that it returns is ErrTooManyRLPBytes. If this error is returned, the channel
// should be closed and a new one should be made. // should be closed and a new one should be made.
func (co *SpanChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) error { func (co *SpanChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) (*L1BlockInfo, error) {
if co.closed { if co.closed {
return ErrChannelOutAlreadyClosed return nil, ErrChannelOutAlreadyClosed
} }
batch, l1Info, err := BlockToSingularBatch(rollupCfg, block) batch, l1Info, err := BlockToSingularBatch(rollupCfg, block)
if err != nil { if err != nil {
return err return nil, fmt.Errorf("converting block to batch: %w", err)
} }
return co.AddSingularBatch(batch, l1Info.SequenceNumber) return l1Info, co.addSingularBatch(batch, l1Info.SequenceNumber)
} }
// AddSingularBatch adds a SingularBatch to the channel, compressing the data if necessary. // addSingularBatch adds a SingularBatch to the channel, compressing the data if necessary.
// if the new batch would make the channel exceed the target size, the last batch is reverted, // if the new batch would make the channel exceed the target size, the last batch is reverted,
// and the compression happens on the previous RLP buffer instead // and the compression happens on the previous RLP buffer instead
// if the input is too small to need compression, data is accumulated but not compressed // if the input is too small to need compression, data is accumulated but not compressed
func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64) error { func (co *SpanChannelOut) addSingularBatch(batch *SingularBatch, seqNum uint64) error {
// sentinel error for closed or full channel // sentinel error for closed or full channel
if co.closed { if co.closed {
return ErrChannelOutAlreadyClosed return ErrChannelOutAlreadyClosed
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment