Commit 460bb3f3 authored by Tei Im's avatar Tei Im Committed by protolambda

Implement span batch submission for op-batcher

parent d5f9ebfe
...@@ -26,8 +26,8 @@ type channel struct { ...@@ -26,8 +26,8 @@ type channel struct {
confirmedTransactions map[txID]eth.BlockID confirmedTransactions map[txID]eth.BlockID
} }
func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig) (*channel, error) { func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, spanBatchBuilder *derive.SpanBatchBuilder) (*channel, error) {
cb, err := newChannelBuilder(cfg) cb, err := newChannelBuilder(cfg, spanBatchBuilder)
if err != nil { if err != nil {
return nil, fmt.Errorf("creating new channel: %w", err) return nil, fmt.Errorf("creating new channel: %w", err)
} }
......
...@@ -58,6 +58,9 @@ type ChannelConfig struct { ...@@ -58,6 +58,9 @@ type ChannelConfig struct {
// CompressorConfig contains the configuration for creating new compressors. // CompressorConfig contains the configuration for creating new compressors.
CompressorConfig compressor.Config CompressorConfig compressor.Config
// BatchType indicates whether the channel uses SingularBatch or SpanBatch.
BatchType uint
} }
// Check validates the [ChannelConfig] parameters. // Check validates the [ChannelConfig] parameters.
...@@ -83,6 +86,10 @@ func (cc *ChannelConfig) Check() error { ...@@ -83,6 +86,10 @@ func (cc *ChannelConfig) Check() error {
return fmt.Errorf("max frame size %d is less than the minimum 23", cc.MaxFrameSize) return fmt.Errorf("max frame size %d is less than the minimum 23", cc.MaxFrameSize)
} }
if cc.BatchType > derive.SpanBatchType {
return fmt.Errorf("unrecognized batch type: %d", cc.BatchType)
}
return nil return nil
} }
...@@ -127,12 +134,12 @@ type channelBuilder struct { ...@@ -127,12 +134,12 @@ type channelBuilder struct {
// newChannelBuilder creates a new channel builder or returns an error if the // newChannelBuilder creates a new channel builder or returns an error if the
// channel out could not be created. // channel out could not be created.
func newChannelBuilder(cfg ChannelConfig) (*channelBuilder, error) { func newChannelBuilder(cfg ChannelConfig, spanBatchBuilder *derive.SpanBatchBuilder) (*channelBuilder, error) {
c, err := cfg.CompressorConfig.NewCompressor() c, err := cfg.CompressorConfig.NewCompressor()
if err != nil { if err != nil {
return nil, err return nil, err
} }
co, err := derive.NewChannelOut(c) co, err := derive.NewChannelOut(c, cfg.BatchType, spanBatchBuilder)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -194,12 +201,12 @@ func (c *channelBuilder) AddBlock(block *types.Block) (derive.L1BlockInfo, error ...@@ -194,12 +201,12 @@ func (c *channelBuilder) AddBlock(block *types.Block) (derive.L1BlockInfo, error
return derive.L1BlockInfo{}, c.FullErr() return derive.L1BlockInfo{}, c.FullErr()
} }
batch, l1info, err := derive.BlockToBatch(block) batch, l1info, err := derive.BlockToSingularBatch(block)
if err != nil { if err != nil {
return l1info, fmt.Errorf("converting block to batch: %w", err) return l1info, fmt.Errorf("converting block to batch: %w", err)
} }
if _, err = c.co.AddBatch(batch); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.CompressorFullErr) { if _, err = c.co.AddSingularBatch(batch); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.CompressorFullErr) {
c.setFullErr(err) c.setFullErr(err)
return l1info, c.FullErr() return l1info, c.FullErr()
} else if err != nil { } else if err != nil {
...@@ -252,7 +259,7 @@ func (c *channelBuilder) updateDurationTimeout(l1BlockNum uint64) { ...@@ -252,7 +259,7 @@ func (c *channelBuilder) updateDurationTimeout(l1BlockNum uint64) {
// derived from the batch's origin L1 block. The timeout is only moved forward // derived from the batch's origin L1 block. The timeout is only moved forward
// if the derived sequencer window timeout is earlier than the currently set // if the derived sequencer window timeout is earlier than the currently set
// timeout. // timeout.
func (c *channelBuilder) updateSwTimeout(batch *derive.BatchData) { func (c *channelBuilder) updateSwTimeout(batch *derive.SingularBatch) {
timeout := uint64(batch.EpochNum) + c.cfg.SeqWindowSize - c.cfg.SubSafetyMargin timeout := uint64(batch.EpochNum) + c.cfg.SeqWindowSize - c.cfg.SubSafetyMargin
c.updateTimeout(timeout, ErrSeqWindowClose) c.updateTimeout(timeout, ErrSeqWindowClose)
} }
......
...@@ -32,6 +32,15 @@ var defaultTestChannelConfig = ChannelConfig{ ...@@ -32,6 +32,15 @@ var defaultTestChannelConfig = ChannelConfig{
TargetNumFrames: 1, TargetNumFrames: 1,
ApproxComprRatio: 0.4, ApproxComprRatio: 0.4,
}, },
BatchType: derive.SingularBatchType,
}
// getSpanBatchBuilder returns a SpanBatchBuilder for use in tests when
// batchType is derive.SpanBatchType, and nil otherwise (SingularBatch
// channels do not need a builder). The genesis timestamp and L2 block
// time are both fixed at 0, and a dummy chain ID (1234) is used, since
// the tests only exercise batching mechanics, not chain identity.
func getSpanBatchBuilder(batchType uint) *derive.SpanBatchBuilder {
if batchType == derive.SpanBatchType {
// Arbitrary test chain ID; any non-nil value works for encoding.
chainId := big.NewInt(1234)
return derive.NewSpanBatchBuilder(uint64(0), uint64(0), chainId)
}
return nil
} }
// TestChannelConfig_Check tests the [ChannelConfig] [Check] function. // TestChannelConfig_Check tests the [ChannelConfig] [Check] function.
...@@ -158,8 +167,9 @@ func newMiniL2BlockWithNumberParent(numTx int, number *big.Int, parent common.Ha ...@@ -158,8 +167,9 @@ func newMiniL2BlockWithNumberParent(numTx int, number *big.Int, parent common.Ha
// addTooManyBlocks adds blocks to the channel until it hits an error, // addTooManyBlocks adds blocks to the channel until it hits an error,
// which is presumably ErrTooManyRLPBytes. // which is presumably ErrTooManyRLPBytes.
func addTooManyBlocks(cb *channelBuilder) error { func addTooManyBlocks(cb *channelBuilder) error {
rng := rand.New(rand.NewSource(1234))
for i := 0; i < 10_000; i++ { for i := 0; i < 10_000; i++ {
block := newMiniL2Block(100) block, _ := dtest.RandomL2Block(rng, 1000)
_, err := cb.AddBlock(block) _, err := cb.AddBlock(block)
if err != nil { if err != nil {
return err return err
...@@ -178,7 +188,7 @@ func FuzzDurationTimeoutZeroMaxChannelDuration(f *testing.F) { ...@@ -178,7 +188,7 @@ func FuzzDurationTimeoutZeroMaxChannelDuration(f *testing.F) {
f.Fuzz(func(t *testing.T, l1BlockNum uint64) { f.Fuzz(func(t *testing.T, l1BlockNum uint64) {
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig
channelConfig.MaxChannelDuration = 0 channelConfig.MaxChannelDuration = 0
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig, nil)
require.NoError(t, err) require.NoError(t, err)
cb.timeout = 0 cb.timeout = 0
cb.updateDurationTimeout(l1BlockNum) cb.updateDurationTimeout(l1BlockNum)
...@@ -201,7 +211,7 @@ func FuzzChannelBuilder_DurationZero(f *testing.F) { ...@@ -201,7 +211,7 @@ func FuzzChannelBuilder_DurationZero(f *testing.F) {
// Create the channel builder // Create the channel builder
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig
channelConfig.MaxChannelDuration = maxChannelDuration channelConfig.MaxChannelDuration = maxChannelDuration
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig, nil)
require.NoError(t, err) require.NoError(t, err)
// Whenever the timeout is set to 0, the channel builder should have a duration timeout // Whenever the timeout is set to 0, the channel builder should have a duration timeout
...@@ -228,7 +238,7 @@ func FuzzDurationTimeoutMaxChannelDuration(f *testing.F) { ...@@ -228,7 +238,7 @@ func FuzzDurationTimeoutMaxChannelDuration(f *testing.F) {
// Create the channel builder // Create the channel builder
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig
channelConfig.MaxChannelDuration = maxChannelDuration channelConfig.MaxChannelDuration = maxChannelDuration
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig, nil)
require.NoError(t, err) require.NoError(t, err)
// Whenever the timeout is greater than the l1BlockNum, // Whenever the timeout is greater than the l1BlockNum,
...@@ -262,7 +272,7 @@ func FuzzChannelCloseTimeout(f *testing.F) { ...@@ -262,7 +272,7 @@ func FuzzChannelCloseTimeout(f *testing.F) {
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig
channelConfig.ChannelTimeout = channelTimeout channelConfig.ChannelTimeout = channelTimeout
channelConfig.SubSafetyMargin = subSafetyMargin channelConfig.SubSafetyMargin = subSafetyMargin
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig, nil)
require.NoError(t, err) require.NoError(t, err)
// Check the timeout // Check the timeout
...@@ -290,7 +300,7 @@ func FuzzChannelZeroCloseTimeout(f *testing.F) { ...@@ -290,7 +300,7 @@ func FuzzChannelZeroCloseTimeout(f *testing.F) {
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig
channelConfig.ChannelTimeout = channelTimeout channelConfig.ChannelTimeout = channelTimeout
channelConfig.SubSafetyMargin = subSafetyMargin channelConfig.SubSafetyMargin = subSafetyMargin
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig, nil)
require.NoError(t, err) require.NoError(t, err)
// Check the timeout // Check the timeout
...@@ -317,16 +327,12 @@ func FuzzSeqWindowClose(f *testing.F) { ...@@ -317,16 +327,12 @@ func FuzzSeqWindowClose(f *testing.F) {
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig
channelConfig.SeqWindowSize = seqWindowSize channelConfig.SeqWindowSize = seqWindowSize
channelConfig.SubSafetyMargin = subSafetyMargin channelConfig.SubSafetyMargin = subSafetyMargin
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig, nil)
require.NoError(t, err) require.NoError(t, err)
// Check the timeout // Check the timeout
cb.timeout = timeout cb.timeout = timeout
cb.updateSwTimeout(derive.NewSingularBatchData( cb.updateSwTimeout(&derive.SingularBatch{EpochNum: rollup.Epoch(epochNum)})
derive.SingularBatch{
EpochNum: rollup.Epoch(epochNum),
},
))
calculatedTimeout := epochNum + seqWindowSize - subSafetyMargin calculatedTimeout := epochNum + seqWindowSize - subSafetyMargin
if timeout > calculatedTimeout && calculatedTimeout != 0 { if timeout > calculatedTimeout && calculatedTimeout != 0 {
cb.checkTimeout(calculatedTimeout) cb.checkTimeout(calculatedTimeout)
...@@ -349,16 +355,12 @@ func FuzzSeqWindowZeroTimeoutClose(f *testing.F) { ...@@ -349,16 +355,12 @@ func FuzzSeqWindowZeroTimeoutClose(f *testing.F) {
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig
channelConfig.SeqWindowSize = seqWindowSize channelConfig.SeqWindowSize = seqWindowSize
channelConfig.SubSafetyMargin = subSafetyMargin channelConfig.SubSafetyMargin = subSafetyMargin
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig, nil)
require.NoError(t, err) require.NoError(t, err)
// Check the timeout // Check the timeout
cb.timeout = 0 cb.timeout = 0
cb.updateSwTimeout(derive.NewSingularBatchData( cb.updateSwTimeout(&derive.SingularBatch{EpochNum: rollup.Epoch(epochNum)})
derive.SingularBatch{
EpochNum: rollup.Epoch(epochNum),
},
))
calculatedTimeout := epochNum + seqWindowSize - subSafetyMargin calculatedTimeout := epochNum + seqWindowSize - subSafetyMargin
cb.checkTimeout(calculatedTimeout) cb.checkTimeout(calculatedTimeout)
if cb.timeout != 0 { if cb.timeout != 0 {
...@@ -367,12 +369,40 @@ func FuzzSeqWindowZeroTimeoutClose(f *testing.F) { ...@@ -367,12 +369,40 @@ func FuzzSeqWindowZeroTimeoutClose(f *testing.F) {
}) })
} }
// TestChannelBuilderBatchType runs every parameterized channel-builder
// test twice: once with SingularBatchType and once with SpanBatchType,
// registering each run as a named subtest.
func TestChannelBuilderBatchType(t *testing.T) {
cases := []struct {
name string
fn   func(t *testing.T, batchType uint)
}{
{"ChannelBuilder_MaxRLPBytesPerChannel", ChannelBuilder_MaxRLPBytesPerChannel},
{"ChannelBuilder_OutputFramesMaxFrameIndex", ChannelBuilder_OutputFramesMaxFrameIndex},
{"ChannelBuilder_AddBlock", ChannelBuilder_AddBlock},
{"ChannelBuilder_Reset", ChannelBuilder_Reset},
{"ChannelBuilder_PendingFrames_TotalFrames", ChannelBuilder_PendingFrames_TotalFrames},
{"ChannelBuilder_InputBytes", ChannelBuilder_InputBytes},
{"ChannelBuilder_OutputBytes", ChannelBuilder_OutputBytes},
}
// First pass: singular batches.
for _, tc := range cases {
tc := tc // capture range variable for the closure (pre-Go 1.22 semantics)
t.Run(tc.name+"_SingularBatch", func(t *testing.T) {
tc.fn(t, derive.SingularBatchType)
})
}
// Second pass: span batches.
for _, tc := range cases {
tc := tc // capture range variable for the closure (pre-Go 1.22 semantics)
t.Run(tc.name+"_SpanBatch", func(t *testing.T) {
tc.fn(t, derive.SpanBatchType)
})
}
}
// TestChannelBuilder_NextFrame tests calling NextFrame on a ChannelBuilder with only one frame // TestChannelBuilder_NextFrame tests calling NextFrame on a ChannelBuilder with only one frame
func TestChannelBuilder_NextFrame(t *testing.T) { func TestChannelBuilder_NextFrame(t *testing.T) {
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig
// Create a new channel builder // Create a new channel builder
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig, nil)
require.NoError(t, err) require.NoError(t, err)
// Mock the internals of `channelBuilder.outputFrame` // Mock the internals of `channelBuilder.outputFrame`
...@@ -412,14 +442,14 @@ func TestChannelBuilder_OutputWrongFramePanic(t *testing.T) { ...@@ -412,14 +442,14 @@ func TestChannelBuilder_OutputWrongFramePanic(t *testing.T) {
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig
// Construct a channel builder // Construct a channel builder
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig, nil)
require.NoError(t, err) require.NoError(t, err)
// Mock the internals of `channelBuilder.outputFrame` // Mock the internals of `channelBuilder.outputFrame`
// to construct a single frame // to construct a single frame
c, err := channelConfig.CompressorConfig.NewCompressor() c, err := channelConfig.CompressorConfig.NewCompressor()
require.NoError(t, err) require.NoError(t, err)
co, err := derive.NewChannelOut(c) co, err := derive.NewChannelOut(c, derive.SingularBatchType, nil)
require.NoError(t, err) require.NoError(t, err)
var buf bytes.Buffer var buf bytes.Buffer
fn, err := co.OutputFrame(&buf, channelConfig.MaxFrameSize) fn, err := co.OutputFrame(&buf, channelConfig.MaxFrameSize)
...@@ -445,7 +475,7 @@ func TestChannelBuilder_OutputFramesWorks(t *testing.T) { ...@@ -445,7 +475,7 @@ func TestChannelBuilder_OutputFramesWorks(t *testing.T) {
channelConfig.MaxFrameSize = 24 channelConfig.MaxFrameSize = 24
// Construct the channel builder // Construct the channel builder
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig, nil)
require.NoError(t, err) require.NoError(t, err)
require.False(t, cb.IsFull()) require.False(t, cb.IsFull())
require.Equal(t, 0, cb.PendingFrames()) require.Equal(t, 0, cb.PendingFrames())
...@@ -480,17 +510,68 @@ func TestChannelBuilder_OutputFramesWorks(t *testing.T) { ...@@ -480,17 +510,68 @@ func TestChannelBuilder_OutputFramesWorks(t *testing.T) {
} }
} }
// TestChannelBuilder_MaxRLPBytesPerChannel tests the [channelBuilder.OutputFrames] // TestChannelBuilder_OutputFramesWorks_SpanBatch tests that the [ChannelBuilder] OutputFrames is successful for span batches.
// TestChannelBuilder_OutputFramesWorks_SpanBatch checks that OutputFrames
// succeeds for a span-batch channel: frames are only produced once the
// compressor reports the channel full, and all but the last frame are
// exactly MaxFrameSize bytes.
func TestChannelBuilder_OutputFramesWorks_SpanBatch(t *testing.T) {
channelConfig := defaultTestChannelConfig
// 24 is near the minimum frame size, forcing many small frames.
channelConfig.MaxFrameSize = 24
channelConfig.CompressorConfig.TargetFrameSize = 50
channelConfig.BatchType = derive.SpanBatchType
// Construct the channel builder
cb, err := newChannelBuilder(channelConfig, getSpanBatchBuilder(derive.SpanBatchType))
require.NoError(t, err)
require.False(t, cb.IsFull())
require.Equal(t, 0, cb.PendingFrames())
// Calling OutputFrames without having called [AddBlock]
// should return no error
require.NoError(t, cb.OutputFrames())
// There should be no ready bytes yet
require.Equal(t, 0, cb.co.ReadyBytes())
// fill up: keep adding blocks until the compressor reports full.
for {
err = addMiniBlock(cb)
if err == nil {
require.False(t, cb.IsFull())
// There should be no ready bytes until the channel is full
// (span-batch data is buffered until compression flushes).
require.Equal(t, cb.co.ReadyBytes(), 0)
} else {
// The only acceptable terminating error is the compressor
// reporting itself full.
require.ErrorIs(t, err, derive.CompressorFullErr)
break
}
}
require.True(t, cb.IsFull())
// Check how many ready bytes
// There should be more than the max frame size ready
require.Greater(t, uint64(cb.co.ReadyBytes()), channelConfig.MaxFrameSize)
// No frames have been emitted yet — only raw ready bytes.
require.Equal(t, 0, cb.PendingFrames())
// We should be able to output the frames
require.NoError(t, cb.OutputFrames())
// There should be many frames in the channel builder now
require.Greater(t, cb.PendingFrames(), 1)
// Every frame except the last must be exactly MaxFrameSize bytes.
for i := 0; i < cb.numFrames-1; i++ {
require.Len(t, cb.frames[i].data, int(channelConfig.MaxFrameSize))
}
// The final frame may be short, but never oversized.
require.LessOrEqual(t, len(cb.frames[len(cb.frames)-1].data), int(channelConfig.MaxFrameSize))
}
// ChannelBuilder_MaxRLPBytesPerChannel tests the [channelBuilder.OutputFrames]
// function errors when the max RLP bytes per channel is reached. // function errors when the max RLP bytes per channel is reached.
func TestChannelBuilder_MaxRLPBytesPerChannel(t *testing.T) { func ChannelBuilder_MaxRLPBytesPerChannel(t *testing.T, batchType uint) {
t.Parallel() t.Parallel()
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig
channelConfig.MaxFrameSize = derive.MaxRLPBytesPerChannel * 2 channelConfig.MaxFrameSize = derive.MaxRLPBytesPerChannel * 2
channelConfig.CompressorConfig.TargetFrameSize = derive.MaxRLPBytesPerChannel * 2 channelConfig.CompressorConfig.TargetFrameSize = derive.MaxRLPBytesPerChannel * 2
channelConfig.CompressorConfig.ApproxComprRatio = 1 channelConfig.CompressorConfig.ApproxComprRatio = 1
channelConfig.BatchType = batchType
// Construct the channel builder // Construct the channel builder
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig, getSpanBatchBuilder(batchType))
require.NoError(t, err) require.NoError(t, err)
// Add a block that overflows the [ChannelOut] // Add a block that overflows the [ChannelOut]
...@@ -498,61 +579,55 @@ func TestChannelBuilder_MaxRLPBytesPerChannel(t *testing.T) { ...@@ -498,61 +579,55 @@ func TestChannelBuilder_MaxRLPBytesPerChannel(t *testing.T) {
require.ErrorIs(t, err, derive.ErrTooManyRLPBytes) require.ErrorIs(t, err, derive.ErrTooManyRLPBytes)
} }
// TestChannelBuilder_OutputFramesMaxFrameIndex tests the [ChannelBuilder.OutputFrames] // ChannelBuilder_OutputFramesMaxFrameIndex tests the [ChannelBuilder.OutputFrames]
// function errors when the max frame index is reached. // function errors when the max frame index is reached.
func TestChannelBuilder_OutputFramesMaxFrameIndex(t *testing.T) { func ChannelBuilder_OutputFramesMaxFrameIndex(t *testing.T, batchType uint) {
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig
channelConfig.MaxFrameSize = 24 channelConfig.MaxFrameSize = 24
channelConfig.CompressorConfig.TargetNumFrames = math.MaxInt channelConfig.CompressorConfig.TargetNumFrames = 6000
channelConfig.CompressorConfig.TargetFrameSize = 24 channelConfig.CompressorConfig.TargetFrameSize = 24
channelConfig.CompressorConfig.ApproxComprRatio = 0 channelConfig.CompressorConfig.ApproxComprRatio = 1
channelConfig.BatchType = batchType
rng := rand.New(rand.NewSource(123))
// Continuously add blocks until the max frame index is reached // Continuously add blocks until the max frame index is reached
// This should cause the [channelBuilder.OutputFrames] function // This should cause the [channelBuilder.OutputFrames] function
// to error // to error
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig, getSpanBatchBuilder(batchType))
require.NoError(t, err) require.NoError(t, err)
require.False(t, cb.IsFull()) require.False(t, cb.IsFull())
require.Equal(t, 0, cb.PendingFrames()) require.Equal(t, 0, cb.PendingFrames())
for { for {
lBlock := types.NewBlock(&types.Header{ a, _ := dtest.RandomL2Block(rng, 1)
BaseFee: common.Big0,
Difficulty: common.Big0,
Number: common.Big0,
}, nil, nil, nil, trie.NewStackTrie(nil))
l1InfoTx, _ := derive.L1InfoDeposit(0, eth.BlockToInfo(lBlock), eth.SystemConfig{}, false)
txs := []*types.Transaction{types.NewTx(l1InfoTx)}
a := types.NewBlock(&types.Header{
Number: big.NewInt(0),
}, txs, nil, nil, trie.NewStackTrie(nil))
_, err = cb.AddBlock(a) _, err = cb.AddBlock(a)
require.NoError(t, cb.co.Flush())
if cb.IsFull() { if cb.IsFull() {
fullErr := cb.FullErr() fullErr := cb.FullErr()
require.ErrorIs(t, fullErr, ErrMaxFrameIndex) require.ErrorIs(t, fullErr, derive.CompressorFullErr)
break break
} }
require.NoError(t, err) require.NoError(t, err)
_ = cb.OutputFrames()
// Flushing so we can construct new frames
_ = cb.co.Flush()
} }
_ = cb.OutputFrames()
require.ErrorIs(t, cb.FullErr(), ErrMaxFrameIndex)
} }
// TestChannelBuilder_AddBlock tests the AddBlock function // ChannelBuilder_AddBlock tests the AddBlock function
func TestChannelBuilder_AddBlock(t *testing.T) { func ChannelBuilder_AddBlock(t *testing.T, batchType uint) {
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig
channelConfig.BatchType = batchType
// Lower the max frame size so that we can batch // Lower the max frame size so that we can batch
channelConfig.MaxFrameSize = 30 channelConfig.MaxFrameSize = 20
// Configure the Input Threshold params so we observe a full channel // Configure the Input Threshold params so we observe a full channel
channelConfig.CompressorConfig.TargetFrameSize = 30 channelConfig.CompressorConfig.TargetFrameSize = 20
channelConfig.CompressorConfig.TargetNumFrames = 2 channelConfig.CompressorConfig.TargetNumFrames = 2
channelConfig.CompressorConfig.ApproxComprRatio = 1 channelConfig.CompressorConfig.ApproxComprRatio = 1
// Construct the channel builder // Construct the channel builder
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig, getSpanBatchBuilder(batchType))
require.NoError(t, err) require.NoError(t, err)
// Add a nonsense block to the channel builder // Add a nonsense block to the channel builder
...@@ -560,7 +635,11 @@ func TestChannelBuilder_AddBlock(t *testing.T) { ...@@ -560,7 +635,11 @@ func TestChannelBuilder_AddBlock(t *testing.T) {
require.NoError(t, cb.co.Flush()) require.NoError(t, cb.co.Flush())
// Check the fields reset in the AddBlock function // Check the fields reset in the AddBlock function
require.Equal(t, 74, cb.co.InputBytes()) expectedInputBytes := 74
if batchType == derive.SpanBatchType {
expectedInputBytes = 47
}
require.Equal(t, expectedInputBytes, cb.co.InputBytes())
require.Equal(t, 1, len(cb.blocks)) require.Equal(t, 1, len(cb.blocks))
require.Equal(t, 0, len(cb.frames)) require.Equal(t, 0, len(cb.frames))
require.True(t, cb.IsFull()) require.True(t, cb.IsFull())
...@@ -570,14 +649,18 @@ func TestChannelBuilder_AddBlock(t *testing.T) { ...@@ -570,14 +649,18 @@ func TestChannelBuilder_AddBlock(t *testing.T) {
require.ErrorIs(t, addMiniBlock(cb), derive.CompressorFullErr) require.ErrorIs(t, addMiniBlock(cb), derive.CompressorFullErr)
} }
// TestChannelBuilder_Reset tests the [Reset] function // ChannelBuilder_Reset tests the [Reset] function
func TestChannelBuilder_Reset(t *testing.T) { func ChannelBuilder_Reset(t *testing.T, batchType uint) {
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig
channelConfig.BatchType = batchType
// Lower the max frame size so that we can batch // Lower the max frame size so that we can batch
channelConfig.MaxFrameSize = 24 channelConfig.MaxFrameSize = 24
channelConfig.CompressorConfig.TargetNumFrames = 1
channelConfig.CompressorConfig.TargetFrameSize = 24
channelConfig.CompressorConfig.ApproxComprRatio = 1
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig, getSpanBatchBuilder(batchType))
require.NoError(t, err) require.NoError(t, err)
// Add a nonsense block to the channel builder // Add a nonsense block to the channel builder
...@@ -590,20 +673,16 @@ func TestChannelBuilder_Reset(t *testing.T) { ...@@ -590,20 +673,16 @@ func TestChannelBuilder_Reset(t *testing.T) {
// Timeout should be updated in the AddBlock internal call to `updateSwTimeout` // Timeout should be updated in the AddBlock internal call to `updateSwTimeout`
timeout := uint64(100) + cb.cfg.SeqWindowSize - cb.cfg.SubSafetyMargin timeout := uint64(100) + cb.cfg.SeqWindowSize - cb.cfg.SubSafetyMargin
require.Equal(t, timeout, cb.timeout) require.Equal(t, timeout, cb.timeout)
require.NoError(t, cb.fullErr) require.Error(t, cb.fullErr)
// Output frames so we can set the channel builder frames // Output frames so we can set the channel builder frames
require.NoError(t, cb.OutputFrames()) require.NoError(t, cb.OutputFrames())
// Add another block to increment the block count
require.NoError(t, addMiniBlock(cb))
require.NoError(t, cb.co.Flush())
// Check the fields reset in the Reset function // Check the fields reset in the Reset function
require.Equal(t, 2, len(cb.blocks)) require.Equal(t, 1, len(cb.blocks))
require.Greater(t, len(cb.frames), 1)
require.Equal(t, timeout, cb.timeout) require.Equal(t, timeout, cb.timeout)
require.NoError(t, cb.fullErr) require.Error(t, cb.fullErr)
require.Greater(t, len(cb.frames), 1)
// Reset the channel builder // Reset the channel builder
require.NoError(t, cb.Reset()) require.NoError(t, cb.Reset())
...@@ -622,7 +701,7 @@ func TestBuilderRegisterL1Block(t *testing.T) { ...@@ -622,7 +701,7 @@ func TestBuilderRegisterL1Block(t *testing.T) {
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig
// Construct the channel builder // Construct the channel builder
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig, nil)
require.NoError(t, err) require.NoError(t, err)
// Assert params modified in RegisterL1Block // Assert params modified in RegisterL1Block
...@@ -645,7 +724,7 @@ func TestBuilderRegisterL1BlockZeroMaxChannelDuration(t *testing.T) { ...@@ -645,7 +724,7 @@ func TestBuilderRegisterL1BlockZeroMaxChannelDuration(t *testing.T) {
channelConfig.MaxChannelDuration = 0 channelConfig.MaxChannelDuration = 0
// Construct the channel builder // Construct the channel builder
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig, nil)
require.NoError(t, err) require.NoError(t, err)
// Assert params modified in RegisterL1Block // Assert params modified in RegisterL1Block
...@@ -666,7 +745,7 @@ func TestFramePublished(t *testing.T) { ...@@ -666,7 +745,7 @@ func TestFramePublished(t *testing.T) {
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig
// Construct the channel builder // Construct the channel builder
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig, nil)
require.NoError(t, err) require.NoError(t, err)
// Let's say the block number is fed in as 100 // Let's say the block number is fed in as 100
...@@ -682,7 +761,7 @@ func TestFramePublished(t *testing.T) { ...@@ -682,7 +761,7 @@ func TestFramePublished(t *testing.T) {
require.Equal(t, uint64(1000), cb.timeout) require.Equal(t, uint64(1000), cb.timeout)
} }
func TestChannelBuilder_PendingFrames_TotalFrames(t *testing.T) { func ChannelBuilder_PendingFrames_TotalFrames(t *testing.T, batchType uint) {
const tnf = 8 const tnf = 8
rng := rand.New(rand.NewSource(94572314)) rng := rand.New(rand.NewSource(94572314))
require := require.New(t) require := require.New(t)
...@@ -691,7 +770,8 @@ func TestChannelBuilder_PendingFrames_TotalFrames(t *testing.T) { ...@@ -691,7 +770,8 @@ func TestChannelBuilder_PendingFrames_TotalFrames(t *testing.T) {
cfg.MaxFrameSize = 1000 cfg.MaxFrameSize = 1000
cfg.CompressorConfig.TargetNumFrames = tnf cfg.CompressorConfig.TargetNumFrames = tnf
cfg.CompressorConfig.Kind = "shadow" cfg.CompressorConfig.Kind = "shadow"
cb, err := newChannelBuilder(cfg) cfg.BatchType = batchType
cb, err := newChannelBuilder(cfg, getSpanBatchBuilder(batchType))
require.NoError(err) require.NoError(err)
// initial builder should be empty // initial builder should be empty
...@@ -725,25 +805,40 @@ func TestChannelBuilder_PendingFrames_TotalFrames(t *testing.T) { ...@@ -725,25 +805,40 @@ func TestChannelBuilder_PendingFrames_TotalFrames(t *testing.T) {
} }
} }
func TestChannelBuilder_InputBytes(t *testing.T) { func ChannelBuilder_InputBytes(t *testing.T, batchType uint) {
require := require.New(t) require := require.New(t)
rng := rand.New(rand.NewSource(4982432)) rng := rand.New(rand.NewSource(4982432))
cb, _ := defaultChannelBuilderSetup(t) cfg := defaultTestChannelConfig
cfg.BatchType = batchType
spanBatchBuilder := getSpanBatchBuilder(batchType)
cb, err := newChannelBuilder(cfg, getSpanBatchBuilder(batchType))
require.NoError(err)
require.Zero(cb.InputBytes()) require.Zero(cb.InputBytes())
var l int var l int
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
block := newMiniL2Block(rng.Intn(32)) block := newMiniL2Block(rng.Intn(32))
if batchType == derive.SingularBatchType {
l += blockBatchRlpSize(t, block) l += blockBatchRlpSize(t, block)
} else {
singularBatch, _, err := derive.BlockToSingularBatch(block)
require.NoError(err)
spanBatchBuilder.AppendSingularBatch(singularBatch)
rawSpanBatch, err := spanBatchBuilder.GetRawSpanBatch()
require.NoError(err)
batch := derive.NewSpanBatchData(*rawSpanBatch)
var buf bytes.Buffer
require.NoError(batch.EncodeRLP(&buf))
l = buf.Len()
}
_, err := cb.AddBlock(block) _, err := cb.AddBlock(block)
require.NoError(err) require.NoError(err)
require.Equal(cb.InputBytes(), l) require.Equal(cb.InputBytes(), l)
} }
} }
func TestChannelBuilder_OutputBytes(t *testing.T) { func ChannelBuilder_OutputBytes(t *testing.T, batchType uint) {
require := require.New(t) require := require.New(t)
rng := rand.New(rand.NewSource(9860372)) rng := rand.New(rand.NewSource(9860372))
cfg := defaultTestChannelConfig cfg := defaultTestChannelConfig
...@@ -751,7 +846,8 @@ func TestChannelBuilder_OutputBytes(t *testing.T) { ...@@ -751,7 +846,8 @@ func TestChannelBuilder_OutputBytes(t *testing.T) {
cfg.MaxFrameSize = 1000 cfg.MaxFrameSize = 1000
cfg.CompressorConfig.TargetNumFrames = 16 cfg.CompressorConfig.TargetNumFrames = 16
cfg.CompressorConfig.ApproxComprRatio = 1.0 cfg.CompressorConfig.ApproxComprRatio = 1.0
cb, err := newChannelBuilder(cfg) cfg.BatchType = batchType
cb, err := newChannelBuilder(cfg, getSpanBatchBuilder(batchType))
require.NoError(err, "newChannelBuilder") require.NoError(err, "newChannelBuilder")
require.Zero(cb.OutputBytes()) require.Zero(cb.OutputBytes())
...@@ -778,17 +874,10 @@ func TestChannelBuilder_OutputBytes(t *testing.T) { ...@@ -778,17 +874,10 @@ func TestChannelBuilder_OutputBytes(t *testing.T) {
require.Equal(cb.OutputBytes(), flen) require.Equal(cb.OutputBytes(), flen)
} }
func defaultChannelBuilderSetup(t *testing.T) (*channelBuilder, ChannelConfig) {
t.Helper()
cfg := defaultTestChannelConfig
cb, err := newChannelBuilder(cfg)
require.NoError(t, err, "newChannelBuilder")
return cb, cfg
}
func blockBatchRlpSize(t *testing.T, b *types.Block) int { func blockBatchRlpSize(t *testing.T, b *types.Block) int {
t.Helper() t.Helper()
batch, _, err := derive.BlockToBatch(b) singularBatch, _, err := derive.BlockToSingularBatch(b)
batch := derive.NewSingularBatchData(*singularBatch)
require.NoError(t, err) require.NoError(t, err)
var buf bytes.Buffer var buf bytes.Buffer
require.NoError(t, batch.EncodeRLP(&buf), "RLP-encoding batch") require.NoError(t, batch.EncodeRLP(&buf), "RLP-encoding batch")
......
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"sync" "sync"
"github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -28,12 +29,16 @@ type channelManager struct { ...@@ -28,12 +29,16 @@ type channelManager struct {
log log.Logger log log.Logger
metr metrics.Metricer metr metrics.Metricer
cfg ChannelConfig cfg ChannelConfig
rcfg *rollup.Config
// All blocks since the last request for new tx data. // All blocks since the last request for new tx data.
blocks []*types.Block blocks []*types.Block
// last block hash - for reorg detection // last block hash - for reorg detection
tip common.Hash tip common.Hash
// last block added to channel. nil at first.
lastProcessedBlock *eth.L2BlockRef
// channel to write new block data to // channel to write new block data to
currentChannel *channel currentChannel *channel
// channels to read frame data from, for writing batches onchain // channels to read frame data from, for writing batches onchain
...@@ -45,18 +50,21 @@ type channelManager struct { ...@@ -45,18 +50,21 @@ type channelManager struct {
closed bool closed bool
} }
func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig) *channelManager { func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rcfg *rollup.Config) *channelManager {
return &channelManager{ return &channelManager{
log: log, log: log,
metr: metr, metr: metr,
cfg: cfg, cfg: cfg,
rcfg: rcfg,
txChannels: make(map[txID]*channel), txChannels: make(map[txID]*channel),
lastProcessedBlock: nil,
} }
} }
// Clear clears the entire state of the channel manager. // Clear clears the entire state of the channel manager.
// It is intended to be used after an L2 reorg. // It is intended to be used before launching op-batcher and after an L2 reorg.
func (s *channelManager) Clear() { // Must set lastProcessedBlock as current L2 safe head fetched from L2 node.
func (s *channelManager) Clear(safeHead *eth.L2BlockRef) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
s.log.Trace("clearing channel manager state") s.log.Trace("clearing channel manager state")
...@@ -66,6 +74,7 @@ func (s *channelManager) Clear() { ...@@ -66,6 +74,7 @@ func (s *channelManager) Clear() {
s.currentChannel = nil s.currentChannel = nil
s.channelQueue = nil s.channelQueue = nil
s.txChannels = make(map[txID]*channel) s.txChannels = make(map[txID]*channel)
s.lastProcessedBlock = safeHead
} }
// TxFailed records a transaction as failed. It will attempt to resubmit the data // TxFailed records a transaction as failed. It will attempt to resubmit the data
...@@ -195,7 +204,19 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error { ...@@ -195,7 +204,19 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
return nil return nil
} }
pc, err := newChannel(s.log, s.metr, s.cfg) var spanBatchBuilder *derive.SpanBatchBuilder
if s.cfg.BatchType == derive.SpanBatchType {
if s.lastProcessedBlock == nil {
// TODO: we can remove "lastProcessedBlock" if we change the data-builder
// to append a singular-batch *with* the L2 metadata such as the L1-block-info seq-number;
// this helps determine whether or not the L1 origin changed in the first block of the span,
// without having to remember the last block from before the span.
return errors.New("last block is not initialized")
}
// Pass the current lastProcessedBlock as the parent
spanBatchBuilder = derive.NewSpanBatchBuilder(s.lastProcessedBlock.L1Origin.Number, s.rcfg.Genesis.L2Time, s.rcfg.L2ChainID)
}
pc, err := newChannel(s.log, s.metr, s.cfg, spanBatchBuilder)
if err != nil { if err != nil {
return fmt.Errorf("creating new channel: %w", err) return fmt.Errorf("creating new channel: %w", err)
} }
...@@ -241,6 +262,7 @@ func (s *channelManager) processBlocks() error { ...@@ -241,6 +262,7 @@ func (s *channelManager) processBlocks() error {
blocksAdded += 1 blocksAdded += 1
latestL2ref = l2BlockRefFromBlockAndL1Info(block, l1info) latestL2ref = l2BlockRefFromBlockAndL1Info(block, l1info)
s.metr.RecordL2BlockInChannel(block) s.metr.RecordL2BlockInChannel(block)
s.lastProcessedBlock = &latestL2ref
// current block got added but channel is now full // current block got added but channel is now full
if s.currentChannel.IsFull() { if s.currentChannel.IsFull() {
break break
......
...@@ -9,21 +9,53 @@ import ( ...@@ -9,21 +9,53 @@ import (
"github.com/ethereum-optimism/optimism/op-batcher/compressor" "github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
derivetest "github.com/ethereum-optimism/optimism/op-node/rollup/derive/test" derivetest "github.com/ethereum-optimism/optimism/op-node/rollup/derive/test"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
// TestChannelManagerReturnsErrReorg ensures that the channel manager func TestChannelManagerBatchType(t *testing.T) {
tests := []struct {
name string
f func(t *testing.T, batchType uint)
}{
{"ChannelManagerReturnsErrReorg", ChannelManagerReturnsErrReorg},
{"ChannelManagerReturnsErrReorgWhenDrained", ChannelManagerReturnsErrReorgWhenDrained},
{"ChannelManager_Clear", ChannelManager_Clear},
{"ChannelManager_TxResend", ChannelManager_TxResend},
{"ChannelManagerCloseBeforeFirstUse", ChannelManagerCloseBeforeFirstUse},
{"ChannelManagerCloseNoPendingChannel", ChannelManagerCloseNoPendingChannel},
{"ChannelManagerClosePendingChannel", ChannelManagerClosePendingChannel},
{"ChannelManagerCloseAllTxsFailed", ChannelManagerCloseAllTxsFailed},
}
for _, test := range tests {
test := test
t.Run(test.name+"_SingularBatch", func(t *testing.T) {
test.f(t, derive.SingularBatchType)
})
}
for _, test := range tests {
test := test
t.Run(test.name+"_SpanBatch", func(t *testing.T) {
test.f(t, derive.SpanBatchType)
})
}
}
// ChannelManagerReturnsErrReorg ensures that the channel manager
// detects a reorg when it has cached L1 blocks. // detects a reorg when it has cached L1 blocks.
func TestChannelManagerReturnsErrReorg(t *testing.T) { func ChannelManagerReturnsErrReorg(t *testing.T, batchType uint) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}) m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{BatchType: batchType}, &rollup.Config{})
m.Clear(&eth.L2BlockRef{})
a := types.NewBlock(&types.Header{ a := types.NewBlock(&types.Header{
Number: big.NewInt(0), Number: big.NewInt(0),
...@@ -49,9 +81,9 @@ func TestChannelManagerReturnsErrReorg(t *testing.T) { ...@@ -49,9 +81,9 @@ func TestChannelManagerReturnsErrReorg(t *testing.T) {
require.Equal(t, []*types.Block{a, b, c}, m.blocks) require.Equal(t, []*types.Block{a, b, c}, m.blocks)
} }
// TestChannelManagerReturnsErrReorgWhenDrained ensures that the channel manager // ChannelManagerReturnsErrReorgWhenDrained ensures that the channel manager
// detects a reorg even if it does not have any blocks inside it. // detects a reorg even if it does not have any blocks inside it.
func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) { func ChannelManagerReturnsErrReorgWhenDrained(t *testing.T, batchType uint) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
...@@ -61,7 +93,11 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) { ...@@ -61,7 +93,11 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
TargetNumFrames: 1, TargetNumFrames: 1,
ApproxComprRatio: 1.0, ApproxComprRatio: 1.0,
}, },
}) BatchType: batchType,
},
&rollup.Config{},
)
m.Clear(&eth.L2BlockRef{})
a := newMiniL2Block(0) a := newMiniL2Block(0)
x := newMiniL2BlockWithNumberParent(0, big.NewInt(1), common.Hash{0xff}) x := newMiniL2BlockWithNumberParent(0, big.NewInt(1), common.Hash{0xff})
...@@ -76,8 +112,8 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) { ...@@ -76,8 +112,8 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
require.ErrorIs(t, m.AddL2Block(x), ErrReorg) require.ErrorIs(t, m.AddL2Block(x), ErrReorg)
} }
// TestChannelManager_Clear tests clearing the channel manager. // ChannelManager_Clear tests clearing the channel manager.
func TestChannelManager_Clear(t *testing.T) { func ChannelManager_Clear(t *testing.T, batchType uint) {
require := require.New(t) require := require.New(t)
// Create a channel manager // Create a channel manager
...@@ -96,7 +132,10 @@ func TestChannelManager_Clear(t *testing.T) { ...@@ -96,7 +132,10 @@ func TestChannelManager_Clear(t *testing.T) {
TargetNumFrames: 1, TargetNumFrames: 1,
ApproxComprRatio: 1.0, ApproxComprRatio: 1.0,
}, },
}) BatchType: batchType,
},
&rollup.Config{},
)
// Channel Manager state should be empty by default // Channel Manager state should be empty by default
require.Empty(m.blocks) require.Empty(m.blocks)
...@@ -104,6 +143,9 @@ func TestChannelManager_Clear(t *testing.T) { ...@@ -104,6 +143,9 @@ func TestChannelManager_Clear(t *testing.T) {
require.Nil(m.currentChannel) require.Nil(m.currentChannel)
require.Empty(m.channelQueue) require.Empty(m.channelQueue)
require.Empty(m.txChannels) require.Empty(m.txChannels)
require.Nil(m.lastProcessedBlock)
// Set the last block
m.Clear(&eth.L2BlockRef{})
// Add a block to the channel manager // Add a block to the channel manager
a, _ := derivetest.RandomL2Block(rng, 4) a, _ := derivetest.RandomL2Block(rng, 4)
...@@ -143,7 +185,8 @@ func TestChannelManager_Clear(t *testing.T) { ...@@ -143,7 +185,8 @@ func TestChannelManager_Clear(t *testing.T) {
require.Equal(b.Hash(), m.tip) require.Equal(b.Hash(), m.tip)
// Clear the channel manager // Clear the channel manager
m.Clear() safeHead := testutils.RandomL2BlockRef(rng)
m.Clear(&safeHead)
// Check that the entire channel manager state cleared // Check that the entire channel manager state cleared
require.Empty(m.blocks) require.Empty(m.blocks)
...@@ -151,9 +194,10 @@ func TestChannelManager_Clear(t *testing.T) { ...@@ -151,9 +194,10 @@ func TestChannelManager_Clear(t *testing.T) {
require.Nil(m.currentChannel) require.Nil(m.currentChannel)
require.Empty(m.channelQueue) require.Empty(m.channelQueue)
require.Empty(m.txChannels) require.Empty(m.txChannels)
require.Equal(m.lastProcessedBlock, &safeHead)
} }
func TestChannelManager_TxResend(t *testing.T) { func ChannelManager_TxResend(t *testing.T, batchType uint) {
require := require.New(t) require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano())) rng := rand.New(rand.NewSource(time.Now().UnixNano()))
log := testlog.Logger(t, log.LvlError) log := testlog.Logger(t, log.LvlError)
...@@ -165,7 +209,11 @@ func TestChannelManager_TxResend(t *testing.T) { ...@@ -165,7 +209,11 @@ func TestChannelManager_TxResend(t *testing.T) {
TargetNumFrames: 1, TargetNumFrames: 1,
ApproxComprRatio: 1.0, ApproxComprRatio: 1.0,
}, },
}) BatchType: batchType,
},
&rollup.Config{},
)
m.Clear(&eth.L2BlockRef{})
a, _ := derivetest.RandomL2Block(rng, 4) a, _ := derivetest.RandomL2Block(rng, 4)
...@@ -195,9 +243,9 @@ func TestChannelManager_TxResend(t *testing.T) { ...@@ -195,9 +243,9 @@ func TestChannelManager_TxResend(t *testing.T) {
require.Len(fs, 1) require.Len(fs, 1)
} }
// TestChannelManagerCloseBeforeFirstUse ensures that the channel manager // ChannelManagerCloseBeforeFirstUse ensures that the channel manager
// will not produce any frames if closed immediately. // will not produce any frames if closed immediately.
func TestChannelManagerCloseBeforeFirstUse(t *testing.T) { func ChannelManagerCloseBeforeFirstUse(t *testing.T, batchType uint) {
require := require.New(t) require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano())) rng := rand.New(rand.NewSource(time.Now().UnixNano()))
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
...@@ -209,7 +257,11 @@ func TestChannelManagerCloseBeforeFirstUse(t *testing.T) { ...@@ -209,7 +257,11 @@ func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {
TargetFrameSize: 0, TargetFrameSize: 0,
ApproxComprRatio: 1.0, ApproxComprRatio: 1.0,
}, },
}) BatchType: batchType,
},
&rollup.Config{},
)
m.Clear(&eth.L2BlockRef{})
a, _ := derivetest.RandomL2Block(rng, 4) a, _ := derivetest.RandomL2Block(rng, 4)
...@@ -222,10 +274,10 @@ func TestChannelManagerCloseBeforeFirstUse(t *testing.T) { ...@@ -222,10 +274,10 @@ func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {
require.ErrorIs(err, io.EOF, "Expected closed channel manager to contain no tx data") require.ErrorIs(err, io.EOF, "Expected closed channel manager to contain no tx data")
} }
// TestChannelManagerCloseNoPendingChannel ensures that the channel manager // ChannelManagerCloseNoPendingChannel ensures that the channel manager
// can gracefully close with no pending channels, and will not emit any new // can gracefully close with no pending channels, and will not emit any new
// channel frames. // channel frames.
func TestChannelManagerCloseNoPendingChannel(t *testing.T) { func ChannelManagerCloseNoPendingChannel(t *testing.T, batchType uint) {
require := require.New(t) require := require.New(t)
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
...@@ -237,7 +289,11 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) { ...@@ -237,7 +289,11 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
TargetNumFrames: 1, TargetNumFrames: 1,
ApproxComprRatio: 1.0, ApproxComprRatio: 1.0,
}, },
}) BatchType: batchType,
},
&rollup.Config{},
)
m.Clear(&eth.L2BlockRef{})
a := newMiniL2Block(0) a := newMiniL2Block(0)
b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash()) b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash())
...@@ -261,10 +317,10 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) { ...@@ -261,10 +317,10 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
require.ErrorIs(err, io.EOF, "Expected closed channel manager to return no new tx data") require.ErrorIs(err, io.EOF, "Expected closed channel manager to return no new tx data")
} }
// TestChannelManagerCloseNoPendingChannel ensures that the channel manager // ChannelManagerCloseNoPendingChannel ensures that the channel manager
// can gracefully close with a pending channel, and will not produce any // can gracefully close with a pending channel, and will not produce any
// new channel frames after this point. // new channel frames after this point.
func TestChannelManagerClosePendingChannel(t *testing.T) { func ChannelManagerClosePendingChannel(t *testing.T, batchType uint) {
require := require.New(t) require := require.New(t)
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
...@@ -272,13 +328,23 @@ func TestChannelManagerClosePendingChannel(t *testing.T) { ...@@ -272,13 +328,23 @@ func TestChannelManagerClosePendingChannel(t *testing.T) {
MaxFrameSize: 1000, MaxFrameSize: 1000,
ChannelTimeout: 1000, ChannelTimeout: 1000,
CompressorConfig: compressor.Config{ CompressorConfig: compressor.Config{
TargetNumFrames: 100, TargetNumFrames: 1,
TargetFrameSize: 1000, TargetFrameSize: 1000,
ApproxComprRatio: 1.0, ApproxComprRatio: 1.0,
}, },
}) BatchType: batchType,
},
a := newMiniL2Block(50_000) &rollup.Config{},
)
m.Clear(&eth.L2BlockRef{})
numTx := 50000
if batchType == derive.SpanBatchType {
// Adjust number of txs to make 2 frames
// Encoding empty txs as span batch requires more data size because span batch encodes tx signature to fixed length
numTx = 20000
}
a := newMiniL2Block(numTx)
b := newMiniL2BlockWithNumberParent(10, big.NewInt(1), a.Hash()) b := newMiniL2BlockWithNumberParent(10, big.NewInt(1), a.Hash())
err := m.AddL2Block(a) err := m.AddL2Block(a)
...@@ -306,10 +372,10 @@ func TestChannelManagerClosePendingChannel(t *testing.T) { ...@@ -306,10 +372,10 @@ func TestChannelManagerClosePendingChannel(t *testing.T) {
require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data") require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data")
} }
// TestChannelManagerCloseAllTxsFailed ensures that the channel manager // ChannelManagerCloseAllTxsFailed ensures that the channel manager
// can gracefully close after producing transaction frames if none of these // can gracefully close after producing transaction frames if none of these
// have successfully landed on chain. // have successfully landed on chain.
func TestChannelManagerCloseAllTxsFailed(t *testing.T) { func ChannelManagerCloseAllTxsFailed(t *testing.T, batchType uint) {
require := require.New(t) require := require.New(t)
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
...@@ -321,7 +387,10 @@ func TestChannelManagerCloseAllTxsFailed(t *testing.T) { ...@@ -321,7 +387,10 @@ func TestChannelManagerCloseAllTxsFailed(t *testing.T) {
TargetFrameSize: 1000, TargetFrameSize: 1000,
ApproxComprRatio: 1.0, ApproxComprRatio: 1.0,
}, },
}) BatchType: batchType,
}, &rollup.Config{},
)
m.Clear(&eth.L2BlockRef{})
a := newMiniL2Block(50_000) a := newMiniL2Block(50_000)
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"testing" "testing"
"github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
...@@ -20,7 +21,8 @@ func TestChannelTimeout(t *testing.T) { ...@@ -20,7 +21,8 @@ func TestChannelTimeout(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{ m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{
ChannelTimeout: 100, ChannelTimeout: 100,
}) }, &rollup.Config{})
m.Clear(&eth.L2BlockRef{})
// Pending channel is nil so is cannot be timed out // Pending channel is nil so is cannot be timed out
require.Nil(t, m.currentChannel) require.Nil(t, m.currentChannel)
...@@ -61,7 +63,8 @@ func TestChannelTimeout(t *testing.T) { ...@@ -61,7 +63,8 @@ func TestChannelTimeout(t *testing.T) {
// TestChannelNextTxData checks the nextTxData function. // TestChannelNextTxData checks the nextTxData function.
func TestChannelNextTxData(t *testing.T) { func TestChannelNextTxData(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}) m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{})
m.Clear(&eth.L2BlockRef{})
// Nil pending channel should return EOF // Nil pending channel should return EOF
returnedTxData, err := m.nextTxData(nil) returnedTxData, err := m.nextTxData(nil)
...@@ -109,7 +112,8 @@ func TestChannelTxConfirmed(t *testing.T) { ...@@ -109,7 +112,8 @@ func TestChannelTxConfirmed(t *testing.T) {
// channels on confirmation. This would result in [TxConfirmed] // channels on confirmation. This would result in [TxConfirmed]
// clearing confirmed transactions, and reseting the pendingChannels map // clearing confirmed transactions, and reseting the pendingChannels map
ChannelTimeout: 10, ChannelTimeout: 10,
}) }, &rollup.Config{})
m.Clear(&eth.L2BlockRef{})
// Let's add a valid pending transaction to the channel manager // Let's add a valid pending transaction to the channel manager
// So we can demonstrate that TxConfirmed's correctness // So we can demonstrate that TxConfirmed's correctness
...@@ -157,7 +161,8 @@ func TestChannelTxConfirmed(t *testing.T) { ...@@ -157,7 +161,8 @@ func TestChannelTxConfirmed(t *testing.T) {
func TestChannelTxFailed(t *testing.T) { func TestChannelTxFailed(t *testing.T) {
// Create a channel manager // Create a channel manager
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}) m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{})
m.Clear(&eth.L2BlockRef{})
// Let's add a valid pending transaction to the channel // Let's add a valid pending transaction to the channel
// manager so we can demonstrate correctness // manager so we can demonstrate correctness
......
...@@ -52,6 +52,8 @@ type CLIConfig struct { ...@@ -52,6 +52,8 @@ type CLIConfig struct {
Stopped bool Stopped bool
BatchType uint
TxMgrConfig txmgr.CLIConfig TxMgrConfig txmgr.CLIConfig
LogConfig oplog.CLIConfig LogConfig oplog.CLIConfig
MetricsConfig opmetrics.CLIConfig MetricsConfig opmetrics.CLIConfig
...@@ -93,6 +95,7 @@ func NewConfig(ctx *cli.Context) *CLIConfig { ...@@ -93,6 +95,7 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
MaxChannelDuration: ctx.Uint64(flags.MaxChannelDurationFlag.Name), MaxChannelDuration: ctx.Uint64(flags.MaxChannelDurationFlag.Name),
MaxL1TxSize: ctx.Uint64(flags.MaxL1TxSizeBytesFlag.Name), MaxL1TxSize: ctx.Uint64(flags.MaxL1TxSizeBytesFlag.Name),
Stopped: ctx.Bool(flags.StoppedFlag.Name), Stopped: ctx.Bool(flags.StoppedFlag.Name),
BatchType: ctx.Uint(flags.BatchTypeFlag.Name),
TxMgrConfig: txmgr.ReadCLIConfig(ctx), TxMgrConfig: txmgr.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx), LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx), MetricsConfig: opmetrics.ReadCLIConfig(ctx),
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"io" "io"
"math/big" "math/big"
_ "net/http/pprof" _ "net/http/pprof"
...@@ -16,7 +17,6 @@ import ( ...@@ -16,7 +17,6 @@ import (
"github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/txmgr" "github.com/ethereum-optimism/optimism/op-service/txmgr"
) )
...@@ -74,7 +74,7 @@ type BatchSubmitter struct { ...@@ -74,7 +74,7 @@ type BatchSubmitter struct {
func NewBatchSubmitter(setup DriverSetup) *BatchSubmitter { func NewBatchSubmitter(setup DriverSetup) *BatchSubmitter {
return &BatchSubmitter{ return &BatchSubmitter{
DriverSetup: setup, DriverSetup: setup,
state: NewChannelManager(setup.Log, setup.Metr, setup.Channel), state: NewChannelManager(setup.Log, setup.Metr, setup.Channel, setup.RollupCfg),
} }
} }
...@@ -91,7 +91,11 @@ func (l *BatchSubmitter) StartBatchSubmitting() error { ...@@ -91,7 +91,11 @@ func (l *BatchSubmitter) StartBatchSubmitting() error {
l.shutdownCtx, l.cancelShutdownCtx = context.WithCancel(context.Background()) l.shutdownCtx, l.cancelShutdownCtx = context.WithCancel(context.Background())
l.killCtx, l.cancelKillCtx = context.WithCancel(context.Background()) l.killCtx, l.cancelKillCtx = context.WithCancel(context.Background())
l.state.Clear() syncStatus, err := fetchSyncStatus(l.shutdownCtx, l.RollupClient, l.Cfg.NetworkTimeout)
if err != nil {
return err
}
l.state.Clear(&syncStatus.SafeL2)
l.lastStoredBlock = eth.BlockID{} l.lastStoredBlock = eth.BlockID{}
l.wg.Add(1) l.wg.Add(1)
...@@ -201,15 +205,9 @@ func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uin ...@@ -201,15 +205,9 @@ func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uin
// calculateL2BlockRangeToStore determines the range (start,end] that should be loaded into the local state. // calculateL2BlockRangeToStore determines the range (start,end] that should be loaded into the local state.
// It also takes care of initializing some local state (i.e. will modify l.lastStoredBlock in certain conditions) // It also takes care of initializing some local state (i.e. will modify l.lastStoredBlock in certain conditions)
func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.BlockID, eth.BlockID, error) { func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.BlockID, eth.BlockID, error) {
ctx, cancel := context.WithTimeout(ctx, l.Cfg.NetworkTimeout) syncStatus, err := fetchSyncStatus(ctx, l.RollupClient, l.Cfg.NetworkTimeout)
defer cancel()
syncStatus, err := l.RollupClient.SyncStatus(ctx)
// Ensure that we have the sync status
if err != nil { if err != nil {
return eth.BlockID{}, eth.BlockID{}, fmt.Errorf("failed to get sync status: %w", err) return eth.BlockID{}, eth.BlockID{}, err
}
if syncStatus.HeadL1 == (eth.L1BlockRef{}) {
return eth.BlockID{}, eth.BlockID{}, errors.New("empty sync status")
} }
// Check last stored to see if it needs to be set on startup OR set if is lagged behind. // Check last stored to see if it needs to be set on startup OR set if is lagged behind.
...@@ -259,7 +257,12 @@ func (l *BatchSubmitter) loop() { ...@@ -259,7 +257,12 @@ func (l *BatchSubmitter) loop() {
l.Log.Error("error closing the channel manager to handle a L2 reorg", "err", err) l.Log.Error("error closing the channel manager to handle a L2 reorg", "err", err)
} }
l.publishStateToL1(queue, receiptsCh, true) l.publishStateToL1(queue, receiptsCh, true)
l.state.Clear() if syncStatus, err := fetchSyncStatus(l.shutdownCtx, l.RollupClient, l.Cfg.NetworkTimeout); err == nil {
l.state.Clear(&syncStatus.SafeL2)
} else {
// if fetchSyncStatus failed, ErrReorg will be returned again
l.Log.Error("error fetching sync status from L2 node", "err", err)
}
continue continue
} }
l.publishStateToL1(queue, receiptsCh, false) l.publishStateToL1(queue, receiptsCh, false)
...@@ -395,3 +398,17 @@ func (l *BatchSubmitter) l1Tip(ctx context.Context) (eth.L1BlockRef, error) { ...@@ -395,3 +398,17 @@ func (l *BatchSubmitter) l1Tip(ctx context.Context) (eth.L1BlockRef, error) {
} }
return eth.InfoToL1BlockRef(eth.HeaderBlockInfo(head)), nil return eth.InfoToL1BlockRef(eth.HeaderBlockInfo(head)), nil
} }
func fetchSyncStatus(ctx context.Context, rollupNode RollupClient, timeout time.Duration) (*eth.SyncStatus, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
syncStatus, err := rollupNode.SyncStatus(ctx)
// Ensure that we have the sync status
if err != nil {
return &eth.SyncStatus{}, fmt.Errorf("failed to get sync status: %w", err)
}
if syncStatus.SafeL2 == (eth.L2BlockRef{}) {
return &eth.SyncStatus{}, errors.New("empty sync status")
}
return syncStatus, nil
}
...@@ -173,6 +173,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error { ...@@ -173,6 +173,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
SubSafetyMargin: cfg.SubSafetyMargin, SubSafetyMargin: cfg.SubSafetyMargin,
MaxFrameSize: cfg.MaxL1TxSize - 1, // subtract 1 byte for version MaxFrameSize: cfg.MaxL1TxSize - 1, // subtract 1 byte for version
CompressorConfig: cfg.CompressorConfig.Config(), CompressorConfig: cfg.CompressorConfig.Config(),
BatchType: cfg.BatchType,
} }
if err := bs.Channel.Check(); err != nil { if err := bs.Channel.Check(); err != nil {
return fmt.Errorf("invalid channel configuration: %w", err) return fmt.Errorf("invalid channel configuration: %w", err)
......
...@@ -76,6 +76,12 @@ var ( ...@@ -76,6 +76,12 @@ var (
Usage: "Initialize the batcher in a stopped state. The batcher can be started using the admin_startBatcher RPC", Usage: "Initialize the batcher in a stopped state. The batcher can be started using the admin_startBatcher RPC",
EnvVars: prefixEnvVars("STOPPED"), EnvVars: prefixEnvVars("STOPPED"),
} }
BatchTypeFlag = &cli.UintFlag{
Name: "batch-type",
Usage: "The batch type. 0 for SingularBatch and 1 for SpanBatch.",
Value: 0,
EnvVars: prefixEnvVars("BATCH_TYPE"),
}
// Legacy Flags // Legacy Flags
SequencerHDPathFlag = txmgr.SequencerHDPathFlag SequencerHDPathFlag = txmgr.SequencerHDPathFlag
) )
...@@ -94,6 +100,7 @@ var optionalFlags = []cli.Flag{ ...@@ -94,6 +100,7 @@ var optionalFlags = []cli.Flag{
MaxL1TxSizeBytesFlag, MaxL1TxSizeBytesFlag,
StoppedFlag, StoppedFlag,
SequencerHDPathFlag, SequencerHDPathFlag,
BatchTypeFlag,
} }
func init() { func init() {
......
...@@ -140,7 +140,7 @@ func (s *L2Batcher) Buffer(t Testing) error { ...@@ -140,7 +140,7 @@ func (s *L2Batcher) Buffer(t Testing) error {
ApproxComprRatio: 1, ApproxComprRatio: 1,
}) })
require.NoError(t, e, "failed to create compressor") require.NoError(t, e, "failed to create compressor")
ch, err = derive.NewChannelOut(c) ch, err = derive.NewChannelOut(c, derive.SingularBatchType, nil)
} }
require.NoError(t, err, "failed to create channel") require.NoError(t, err, "failed to create channel")
s.l2ChannelOut = ch s.l2ChannelOut = ch
......
...@@ -49,6 +49,7 @@ import ( ...@@ -49,6 +49,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/p2p" "github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/p2p/store" "github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
proposermetrics "github.com/ethereum-optimism/optimism/op-proposer/metrics" proposermetrics "github.com/ethereum-optimism/optimism/op-proposer/metrics"
l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer" l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer"
...@@ -679,6 +680,10 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste ...@@ -679,6 +680,10 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
return nil, fmt.Errorf("unable to start l2 output submitter: %w", err) return nil, fmt.Errorf("unable to start l2 output submitter: %w", err)
} }
batchType := derive.SingularBatchType
if os.Getenv("OP_E2E_USE_SPAN_BATCH") == "true" {
batchType = derive.SpanBatchType
}
batcherCLIConfig := &bss.CLIConfig{ batcherCLIConfig := &bss.CLIConfig{
L1EthRpc: sys.EthInstances["l1"].WSEndpoint(), L1EthRpc: sys.EthInstances["l1"].WSEndpoint(),
L2EthRpc: sys.EthInstances["sequencer"].WSEndpoint(), L2EthRpc: sys.EthInstances["sequencer"].WSEndpoint(),
...@@ -699,6 +704,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste ...@@ -699,6 +704,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
Format: oplog.FormatText, Format: oplog.FormatText,
}, },
Stopped: sys.cfg.DisableBatcher, // Batch submitter may be enabled later Stopped: sys.cfg.DisableBatcher, // Batch submitter may be enabled later
BatchType: uint(batchType),
} }
// Batch Submitter // Batch Submitter
batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.cfg.Loggers["batcher"]) batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.cfg.Loggers["batcher"])
......
...@@ -48,6 +48,13 @@ type Compressor interface { ...@@ -48,6 +48,13 @@ type Compressor interface {
FullErr() error FullErr() error
} }
type ChannelOutReader interface {
io.Writer
io.Reader
Reset()
Len() int
}
type ChannelOut struct { type ChannelOut struct {
id ChannelID id ChannelID
// Frame ID of the next frame to emit. Increment after emitting // Frame ID of the next frame to emit. Increment after emitting
...@@ -57,20 +64,35 @@ type ChannelOut struct { ...@@ -57,20 +64,35 @@ type ChannelOut struct {
// Compressor stage. Write input data to it // Compressor stage. Write input data to it
compress Compressor compress Compressor
// closed indicates if the channel is closed
closed bool closed bool
// batchType indicates whether this channel uses SingularBatch or SpanBatch
batchType uint
// spanBatchBuilder contains information requires to build SpanBatch
spanBatchBuilder *SpanBatchBuilder
// reader contains compressed data for making output frames
reader ChannelOutReader
} }
func (co *ChannelOut) ID() ChannelID { func (co *ChannelOut) ID() ChannelID {
return co.id return co.id
} }
func NewChannelOut(compress Compressor) (*ChannelOut, error) { func NewChannelOut(compress Compressor, batchType uint, spanBatchBuilder *SpanBatchBuilder) (*ChannelOut, error) {
// If the channel uses SingularBatch, use compressor directly as its reader
var reader ChannelOutReader = compress
if batchType == SpanBatchType {
// If the channel uses SpanBatch, create empty buffer for reader
reader = &bytes.Buffer{}
}
c := &ChannelOut{ c := &ChannelOut{
id: ChannelID{}, // TODO: use GUID here instead of fully random data id: ChannelID{}, // TODO: use GUID here instead of fully random data
frame: 0, frame: 0,
rlpLength: 0, rlpLength: 0,
compress: compress, compress: compress,
batchType: batchType,
spanBatchBuilder: spanBatchBuilder,
reader: reader,
} }
_, err := rand.Read(c.id[:]) _, err := rand.Read(c.id[:])
if err != nil { if err != nil {
...@@ -85,7 +107,11 @@ func (co *ChannelOut) Reset() error { ...@@ -85,7 +107,11 @@ func (co *ChannelOut) Reset() error {
co.frame = 0 co.frame = 0
co.rlpLength = 0 co.rlpLength = 0
co.compress.Reset() co.compress.Reset()
co.reader.Reset()
co.closed = false co.closed = false
if co.spanBatchBuilder != nil {
co.spanBatchBuilder.Reset()
}
_, err := rand.Read(co.id[:]) _, err := rand.Read(co.id[:])
return err return err
} }
...@@ -99,30 +125,41 @@ func (co *ChannelOut) AddBlock(block *types.Block) (uint64, error) { ...@@ -99,30 +125,41 @@ func (co *ChannelOut) AddBlock(block *types.Block) (uint64, error) {
return 0, errors.New("already closed") return 0, errors.New("already closed")
} }
batch, _, err := BlockToBatch(block) batch, _, err := BlockToSingularBatch(block)
if err != nil { if err != nil {
return 0, err return 0, err
} }
return co.AddBatch(batch) return co.AddSingularBatch(batch)
} }
// AddBatch adds a batch to the channel. It returns the RLP encoded byte size // AddSingularBatch adds a batch to the channel. It returns the RLP encoded byte size
// and an error if there is a problem adding the batch. The only sentinel error // and an error if there is a problem adding the batch. The only sentinel error
// that it returns is ErrTooManyRLPBytes. If this error is returned, the channel // that it returns is ErrTooManyRLPBytes. If this error is returned, the channel
// should be closed and a new one should be made. // should be closed and a new one should be made.
// //
// AddBatch should be used together with BlockToBatch if you need to access the // AddSingularBatch should be used together with BlockToSingularBatch if you need to access the
// BatchData before adding a block to the channel. It isn't possible to access // BatchData before adding a block to the channel. It isn't possible to access
// the batch data with AddBlock. // the batch data with AddBlock.
func (co *ChannelOut) AddBatch(batch *BatchData) (uint64, error) { func (co *ChannelOut) AddSingularBatch(batch *SingularBatch) (uint64, error) {
if co.closed { if co.closed {
return 0, errors.New("already closed") return 0, errors.New("already closed")
} }
switch co.batchType {
case SingularBatchType:
return co.writeSingularBatch(batch)
case SpanBatchType:
return co.writeSpanBatch(batch)
default:
return 0, fmt.Errorf("unrecognized batch type: %d", co.batchType)
}
}
func (co *ChannelOut) writeSingularBatch(batch *SingularBatch) (uint64, error) {
var buf bytes.Buffer
// We encode to a temporary buffer to determine the encoded length to // We encode to a temporary buffer to determine the encoded length to
// ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL // ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL
var buf bytes.Buffer if err := rlp.Encode(&buf, NewSingularBatchData(*batch)); err != nil {
if err := rlp.Encode(&buf, batch); err != nil {
return 0, err return 0, err
} }
if co.rlpLength+buf.Len() > MaxRLPBytesPerChannel { if co.rlpLength+buf.Len() > MaxRLPBytesPerChannel {
...@@ -136,6 +173,70 @@ func (co *ChannelOut) AddBatch(batch *BatchData) (uint64, error) { ...@@ -136,6 +173,70 @@ func (co *ChannelOut) AddBatch(batch *BatchData) (uint64, error) {
return uint64(written), err return uint64(written), err
} }
// writeSpanBatch appends a SingularBatch to the channel's SpanBatch.
// A channel can have only one SpanBatch. And compressed results should not be accessible until the channel is closed, since the prefix and payload can be changed.
// So it resets channel contents and rewrites the entire SpanBatch each time, and compressed results are copied to reader after the channel is closed.
// It makes we can only get frames once the channel is full or closed, in the case of SpanBatch.
func (co *ChannelOut) writeSpanBatch(batch *SingularBatch) (uint64, error) {
if co.FullErr() != nil {
// channel is already full
return 0, co.FullErr()
}
var buf bytes.Buffer
// Append Singular batch to its span batch builder
co.spanBatchBuilder.AppendSingularBatch(batch)
// Convert Span batch to RawSpanBatch
rawSpanBatch, err := co.spanBatchBuilder.GetRawSpanBatch()
if err != nil {
return 0, fmt.Errorf("failed to convert SpanBatch into RawSpanBatch: %w", err)
}
// Encode RawSpanBatch into bytes
if err = rlp.Encode(&buf, NewSpanBatchData(*rawSpanBatch)); err != nil {
return 0, fmt.Errorf("failed to encode RawSpanBatch into bytes: %w", err)
}
co.rlpLength = 0
// Ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL
if co.rlpLength+buf.Len() > MaxRLPBytesPerChannel {
return 0, fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w",
buf.Len(), co.rlpLength, MaxRLPBytesPerChannel, ErrTooManyRLPBytes)
}
co.rlpLength = buf.Len()
if co.spanBatchBuilder.GetBlockCount() > 1 {
// Flush compressed data into reader to preserve current result.
// If the channel is full after this block is appended, we should use preserved data.
if err := co.compress.Flush(); err != nil {
return 0, fmt.Errorf("failed to flush compressor: %w", err)
}
_, err = io.Copy(co.reader, co.compress)
if err != nil {
// Must reset reader to avoid partial output
co.reader.Reset()
return 0, fmt.Errorf("failed to copy compressed data to reader: %w", err)
}
}
// Reset compressor to rewrite the entire span batch
co.compress.Reset()
// Avoid using io.Copy here, because we need all or nothing
written, err := co.compress.Write(buf.Bytes())
if co.compress.FullErr() != nil {
err = co.compress.FullErr()
if co.spanBatchBuilder.GetBlockCount() == 1 {
// Do not return CompressorFullErr for the first block in the batch
// In this case, reader must be empty. then the contents of compressor will be copied to reader when the channel is closed.
err = nil
}
// If there are more than one blocks in the channel, reader should have data that preserves previous compression result before adding this block.
// So, as a result, this block is not added to the channel and the channel will be closed.
return uint64(written), err
}
// If compressor is not full yet, reader must be reset to avoid submitting invalid frames
co.reader.Reset()
return uint64(written), err
}
// InputBytes returns the total amount of RLP-encoded input bytes. // InputBytes returns the total amount of RLP-encoded input bytes.
func (co *ChannelOut) InputBytes() int { func (co *ChannelOut) InputBytes() int {
return co.rlpLength return co.rlpLength
...@@ -145,13 +246,24 @@ func (co *ChannelOut) InputBytes() int { ...@@ -145,13 +246,24 @@ func (co *ChannelOut) InputBytes() int {
// Use `Flush` or `Close` to move data from the compression buffer into the ready buffer if more bytes // Use `Flush` or `Close` to move data from the compression buffer into the ready buffer if more bytes
// are needed. Add blocks may add to the ready buffer, but it is not guaranteed due to the compression stage. // are needed. Add blocks may add to the ready buffer, but it is not guaranteed due to the compression stage.
func (co *ChannelOut) ReadyBytes() int { func (co *ChannelOut) ReadyBytes() int {
return co.compress.Len() return co.reader.Len()
} }
// Flush flushes the internal compression stage to the ready buffer. It enables pulling a larger & more // Flush flushes the internal compression stage to the ready buffer. It enables pulling a larger & more
// complete frame. It reduces the compression efficiency. // complete frame. It reduces the compression efficiency.
func (co *ChannelOut) Flush() error { func (co *ChannelOut) Flush() error {
return co.compress.Flush() if err := co.compress.Flush(); err != nil {
return err
}
if co.batchType == SpanBatchType && co.closed && co.ReadyBytes() == 0 && co.compress.Len() > 0 {
_, err := io.Copy(co.reader, co.compress)
if err != nil {
// Must reset reader to avoid partial output
co.reader.Reset()
return fmt.Errorf("failed to flush compressed data to reader: %w", err)
}
}
return nil
} }
func (co *ChannelOut) FullErr() error { func (co *ChannelOut) FullErr() error {
...@@ -163,6 +275,11 @@ func (co *ChannelOut) Close() error { ...@@ -163,6 +275,11 @@ func (co *ChannelOut) Close() error {
return errors.New("already closed") return errors.New("already closed")
} }
co.closed = true co.closed = true
if co.batchType == SpanBatchType {
if err := co.Flush(); err != nil {
return err
}
}
return co.compress.Close() return co.compress.Close()
} }
...@@ -186,8 +303,8 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, erro ...@@ -186,8 +303,8 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, erro
// Copy data from the local buffer into the frame data buffer // Copy data from the local buffer into the frame data buffer
maxDataSize := maxSize - FrameV0OverHeadSize maxDataSize := maxSize - FrameV0OverHeadSize
if maxDataSize > uint64(co.compress.Len()) { if maxDataSize > uint64(co.ReadyBytes()) {
maxDataSize = uint64(co.compress.Len()) maxDataSize = uint64(co.ReadyBytes())
// If we are closed & will not spill past the current frame // If we are closed & will not spill past the current frame
// mark it is the final frame of the channel. // mark it is the final frame of the channel.
if co.closed { if co.closed {
...@@ -196,7 +313,7 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, erro ...@@ -196,7 +313,7 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, erro
} }
f.Data = make([]byte, maxDataSize) f.Data = make([]byte, maxDataSize)
if _, err := io.ReadFull(co.compress, f.Data); err != nil { if _, err := io.ReadFull(co.reader, f.Data); err != nil {
return 0, err return 0, err
} }
...@@ -213,8 +330,8 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, erro ...@@ -213,8 +330,8 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, erro
} }
} }
// BlockToBatch transforms a block into a batch object that can easily be RLP encoded. // BlockToSingularBatch transforms a block into a batch object that can easily be RLP encoded.
func BlockToBatch(block *types.Block) (*BatchData, L1BlockInfo, error) { func BlockToSingularBatch(block *types.Block) (*SingularBatch, L1BlockInfo, error) {
opaqueTxs := make([]hexutil.Bytes, 0, len(block.Transactions())) opaqueTxs := make([]hexutil.Bytes, 0, len(block.Transactions()))
for i, tx := range block.Transactions() { for i, tx := range block.Transactions() {
if tx.Type() == types.DepositTxType { if tx.Type() == types.DepositTxType {
...@@ -238,15 +355,13 @@ func BlockToBatch(block *types.Block) (*BatchData, L1BlockInfo, error) { ...@@ -238,15 +355,13 @@ func BlockToBatch(block *types.Block) (*BatchData, L1BlockInfo, error) {
return nil, l1Info, fmt.Errorf("could not parse the L1 Info deposit: %w", err) return nil, l1Info, fmt.Errorf("could not parse the L1 Info deposit: %w", err)
} }
return NewSingularBatchData( return &SingularBatch{
SingularBatch{
ParentHash: block.ParentHash(), ParentHash: block.ParentHash(),
EpochNum: rollup.Epoch(l1Info.Number), EpochNum: rollup.Epoch(l1Info.Number),
EpochHash: l1Info.BlockHash, EpochHash: l1Info.BlockHash,
Timestamp: block.Time(), Timestamp: block.Time(),
Transactions: opaqueTxs, Transactions: opaqueTxs,
}, }, l1Info, nil
), l1Info, nil
} }
// ForceCloseTxData generates the transaction data for a transaction which will force close // ForceCloseTxData generates the transaction data for a transaction which will force close
......
...@@ -29,7 +29,7 @@ func (s *nonCompressor) FullErr() error { ...@@ -29,7 +29,7 @@ func (s *nonCompressor) FullErr() error {
} }
func TestChannelOutAddBlock(t *testing.T) { func TestChannelOutAddBlock(t *testing.T) {
cout, err := NewChannelOut(&nonCompressor{}) cout, err := NewChannelOut(&nonCompressor{}, SingularBatchType, nil)
require.NoError(t, err) require.NoError(t, err)
t.Run("returns err if first tx is not an l1info tx", func(t *testing.T) { t.Run("returns err if first tx is not an l1info tx", func(t *testing.T) {
...@@ -50,7 +50,7 @@ func TestChannelOutAddBlock(t *testing.T) { ...@@ -50,7 +50,7 @@ func TestChannelOutAddBlock(t *testing.T) {
// max size that is below the fixed frame size overhead of 23, will return // max size that is below the fixed frame size overhead of 23, will return
// an error. // an error.
func TestOutputFrameSmallMaxSize(t *testing.T) { func TestOutputFrameSmallMaxSize(t *testing.T) {
cout, err := NewChannelOut(&nonCompressor{}) cout, err := NewChannelOut(&nonCompressor{}, SingularBatchType, nil)
require.NoError(t, err) require.NoError(t, err)
// Call OutputFrame with the range of small max size values that err // Call OutputFrame with the range of small max size values that err
...@@ -97,42 +97,42 @@ func TestForceCloseTxData(t *testing.T) { ...@@ -97,42 +97,42 @@ func TestForceCloseTxData(t *testing.T) {
output: "", output: "",
}, },
{ {
frames: []Frame{Frame{FrameNumber: 0, IsLast: false}, Frame{ID: id, FrameNumber: 1, IsLast: true}}, frames: []Frame{{FrameNumber: 0, IsLast: false}, {ID: id, FrameNumber: 1, IsLast: true}},
errors: true, errors: true,
output: "", output: "",
}, },
{ {
frames: []Frame{Frame{ID: id, FrameNumber: 0, IsLast: false}}, frames: []Frame{{ID: id, FrameNumber: 0, IsLast: false}},
errors: false, errors: false,
output: "00deadbeefdeadbeefdeadbeefdeadbeef00000000000001", output: "00deadbeefdeadbeefdeadbeefdeadbeef00000000000001",
}, },
{ {
frames: []Frame{Frame{ID: id, FrameNumber: 0, IsLast: true}}, frames: []Frame{{ID: id, FrameNumber: 0, IsLast: true}},
errors: false, errors: false,
output: "00", output: "00",
}, },
{ {
frames: []Frame{Frame{ID: id, FrameNumber: 1, IsLast: false}}, frames: []Frame{{ID: id, FrameNumber: 1, IsLast: false}},
errors: false, errors: false,
output: "00deadbeefdeadbeefdeadbeefdeadbeef00000000000001", output: "00deadbeefdeadbeefdeadbeefdeadbeef00000000000001",
}, },
{ {
frames: []Frame{Frame{ID: id, FrameNumber: 1, IsLast: true}}, frames: []Frame{{ID: id, FrameNumber: 1, IsLast: true}},
errors: false, errors: false,
output: "00deadbeefdeadbeefdeadbeefdeadbeef00000000000000", output: "00deadbeefdeadbeefdeadbeefdeadbeef00000000000000",
}, },
{ {
frames: []Frame{Frame{ID: id, FrameNumber: 2, IsLast: true}}, frames: []Frame{{ID: id, FrameNumber: 2, IsLast: true}},
errors: false, errors: false,
output: "00deadbeefdeadbeefdeadbeefdeadbeef00000000000000deadbeefdeadbeefdeadbeefdeadbeef00010000000000", output: "00deadbeefdeadbeefdeadbeefdeadbeef00000000000000deadbeefdeadbeefdeadbeefdeadbeef00010000000000",
}, },
{ {
frames: []Frame{Frame{ID: id, FrameNumber: 1, IsLast: false}, Frame{ID: id, FrameNumber: 3, IsLast: true}}, frames: []Frame{{ID: id, FrameNumber: 1, IsLast: false}, {ID: id, FrameNumber: 3, IsLast: true}},
errors: false, errors: false,
output: "00deadbeefdeadbeefdeadbeefdeadbeef00000000000000deadbeefdeadbeefdeadbeefdeadbeef00020000000000", output: "00deadbeefdeadbeefdeadbeefdeadbeef00000000000000deadbeefdeadbeefdeadbeefdeadbeef00020000000000",
}, },
{ {
frames: []Frame{Frame{ID: id, FrameNumber: 1, IsLast: false}, Frame{ID: id, FrameNumber: 3, IsLast: true}, Frame{ID: id, FrameNumber: 5, IsLast: true}}, frames: []Frame{{ID: id, FrameNumber: 1, IsLast: false}, {ID: id, FrameNumber: 3, IsLast: true}, {ID: id, FrameNumber: 5, IsLast: true}},
errors: false, errors: false,
output: "00deadbeefdeadbeefdeadbeefdeadbeef00000000000000deadbeefdeadbeefdeadbeefdeadbeef00020000000000", output: "00deadbeefdeadbeefdeadbeefdeadbeef00000000000000deadbeefdeadbeefdeadbeefdeadbeef00020000000000",
}, },
...@@ -152,6 +152,6 @@ func TestForceCloseTxData(t *testing.T) { ...@@ -152,6 +152,6 @@ func TestForceCloseTxData(t *testing.T) {
func TestBlockToBatchValidity(t *testing.T) { func TestBlockToBatchValidity(t *testing.T) {
block := new(types.Block) block := new(types.Block)
_, _, err := BlockToBatch(block) _, _, err := BlockToSingularBatch(block)
require.ErrorContains(t, err, "has no transactions") require.ErrorContains(t, err, "has no transactions")
} }
...@@ -155,6 +155,7 @@ services: ...@@ -155,6 +155,7 @@ services:
OP_BATCHER_PPROF_ENABLED: "true" OP_BATCHER_PPROF_ENABLED: "true"
OP_BATCHER_METRICS_ENABLED: "true" OP_BATCHER_METRICS_ENABLED: "true"
OP_BATCHER_RPC_ENABLE_ADMIN: "true" OP_BATCHER_RPC_ENABLE_ADMIN: "true"
OP_BATCHER_BATCH_TYPE: 0
artifact-server: artifact-server:
depends_on: depends_on:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment