Commit 5f5db777 authored by Tei Im's avatar Tei Im

Use sequencer number to calculate origin bit of span batch

Remove lastProcessedBlock from ChannelManager
Add seqNum arg to ChannelOut AddSingularBatch interface
parent 33983267
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"math" "math"
"github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
...@@ -26,8 +27,8 @@ type channel struct { ...@@ -26,8 +27,8 @@ type channel struct {
confirmedTransactions map[txID]eth.BlockID confirmedTransactions map[txID]eth.BlockID
} }
func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, spanBatchBuilder *derive.SpanBatchBuilder) (*channel, error) { func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rcfg *rollup.Config) (*channel, error) {
cb, err := newChannelBuilder(cfg, spanBatchBuilder) cb, err := newChannelBuilder(cfg, rcfg)
if err != nil { if err != nil {
return nil, fmt.Errorf("creating new channel: %w", err) return nil, fmt.Errorf("creating new channel: %w", err)
} }
......
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"math" "math"
"github.com/ethereum-optimism/optimism/op-batcher/compressor" "github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
) )
...@@ -134,11 +135,15 @@ type channelBuilder struct { ...@@ -134,11 +135,15 @@ type channelBuilder struct {
// newChannelBuilder creates a new channel builder or returns an error if the // newChannelBuilder creates a new channel builder or returns an error if the
// channel out could not be created. // channel out could not be created.
func newChannelBuilder(cfg ChannelConfig, spanBatchBuilder *derive.SpanBatchBuilder) (*channelBuilder, error) { func newChannelBuilder(cfg ChannelConfig, rcfg *rollup.Config) (*channelBuilder, error) {
c, err := cfg.CompressorConfig.NewCompressor() c, err := cfg.CompressorConfig.NewCompressor()
if err != nil { if err != nil {
return nil, err return nil, err
} }
var spanBatchBuilder *derive.SpanBatchBuilder
if cfg.BatchType == derive.SpanBatchType {
spanBatchBuilder = derive.NewSpanBatchBuilder(rcfg.Genesis.L2Time, rcfg.L2ChainID)
}
co, err := derive.NewChannelOut(cfg.BatchType, c, spanBatchBuilder) co, err := derive.NewChannelOut(cfg.BatchType, c, spanBatchBuilder)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -206,7 +211,7 @@ func (c *channelBuilder) AddBlock(block *types.Block) (derive.L1BlockInfo, error ...@@ -206,7 +211,7 @@ func (c *channelBuilder) AddBlock(block *types.Block) (derive.L1BlockInfo, error
return l1info, fmt.Errorf("converting block to batch: %w", err) return l1info, fmt.Errorf("converting block to batch: %w", err)
} }
if _, err = c.co.AddSingularBatch(batch); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.CompressorFullErr) { if _, err = c.co.AddSingularBatch(batch, l1info.SequenceNumber); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.CompressorFullErr) {
c.setFullErr(err) c.setFullErr(err)
return l1info, c.FullErr() return l1info, c.FullErr()
} else if err != nil { } else if err != nil {
......
...@@ -35,12 +35,9 @@ var defaultTestChannelConfig = ChannelConfig{ ...@@ -35,12 +35,9 @@ var defaultTestChannelConfig = ChannelConfig{
BatchType: derive.SingularBatchType, BatchType: derive.SingularBatchType,
} }
func getSpanBatchBuilder(batchType uint) *derive.SpanBatchBuilder { var defaultTestRollupConfig = rollup.Config{
if batchType == derive.SpanBatchType { Genesis: rollup.Genesis{L2: eth.BlockID{Number: 0}},
chainId := big.NewInt(1234) L2ChainID: big.NewInt(1234),
return derive.NewSpanBatchBuilder(uint64(0), uint64(0), chainId)
}
return nil
} }
// TestChannelConfig_Check tests the [ChannelConfig] [Check] function. // TestChannelConfig_Check tests the [ChannelConfig] [Check] function.
...@@ -169,7 +166,7 @@ func newMiniL2BlockWithNumberParent(numTx int, number *big.Int, parent common.Ha ...@@ -169,7 +166,7 @@ func newMiniL2BlockWithNumberParent(numTx int, number *big.Int, parent common.Ha
func addTooManyBlocks(cb *channelBuilder) error { func addTooManyBlocks(cb *channelBuilder) error {
rng := rand.New(rand.NewSource(1234)) rng := rand.New(rand.NewSource(1234))
for i := 0; i < 10_000; i++ { for i := 0; i < 10_000; i++ {
block, _ := dtest.RandomL2Block(rng, 1000) block := dtest.RandomL2BlockWithChainId(rng, 1000, defaultTestRollupConfig.L2ChainID)
_, err := cb.AddBlock(block) _, err := cb.AddBlock(block)
if err != nil { if err != nil {
return err return err
...@@ -518,7 +515,7 @@ func TestChannelBuilder_OutputFramesWorks_SpanBatch(t *testing.T) { ...@@ -518,7 +515,7 @@ func TestChannelBuilder_OutputFramesWorks_SpanBatch(t *testing.T) {
channelConfig.BatchType = derive.SpanBatchType channelConfig.BatchType = derive.SpanBatchType
// Construct the channel builder // Construct the channel builder
cb, err := newChannelBuilder(channelConfig, getSpanBatchBuilder(derive.SpanBatchType)) cb, err := newChannelBuilder(channelConfig, &defaultTestRollupConfig)
require.NoError(t, err) require.NoError(t, err)
require.False(t, cb.IsFull()) require.False(t, cb.IsFull())
require.Equal(t, 0, cb.PendingFrames()) require.Equal(t, 0, cb.PendingFrames())
...@@ -571,7 +568,7 @@ func ChannelBuilder_MaxRLPBytesPerChannel(t *testing.T, batchType uint) { ...@@ -571,7 +568,7 @@ func ChannelBuilder_MaxRLPBytesPerChannel(t *testing.T, batchType uint) {
channelConfig.BatchType = batchType channelConfig.BatchType = batchType
// Construct the channel builder // Construct the channel builder
cb, err := newChannelBuilder(channelConfig, getSpanBatchBuilder(batchType)) cb, err := newChannelBuilder(channelConfig, &defaultTestRollupConfig)
require.NoError(t, err) require.NoError(t, err)
// Add a block that overflows the [ChannelOut] // Add a block that overflows the [ChannelOut]
...@@ -594,12 +591,12 @@ func ChannelBuilder_OutputFramesMaxFrameIndex(t *testing.T, batchType uint) { ...@@ -594,12 +591,12 @@ func ChannelBuilder_OutputFramesMaxFrameIndex(t *testing.T, batchType uint) {
// Continuously add blocks until the max frame index is reached // Continuously add blocks until the max frame index is reached
// This should cause the [channelBuilder.OutputFrames] function // This should cause the [channelBuilder.OutputFrames] function
// to error // to error
cb, err := newChannelBuilder(channelConfig, getSpanBatchBuilder(batchType)) cb, err := newChannelBuilder(channelConfig, &defaultTestRollupConfig)
require.NoError(t, err) require.NoError(t, err)
require.False(t, cb.IsFull()) require.False(t, cb.IsFull())
require.Equal(t, 0, cb.PendingFrames()) require.Equal(t, 0, cb.PendingFrames())
for { for {
a, _ := dtest.RandomL2Block(rng, 1) a := dtest.RandomL2BlockWithChainId(rng, 1, defaultTestRollupConfig.L2ChainID)
_, err = cb.AddBlock(a) _, err = cb.AddBlock(a)
if cb.IsFull() { if cb.IsFull() {
fullErr := cb.FullErr() fullErr := cb.FullErr()
...@@ -627,7 +624,7 @@ func ChannelBuilder_AddBlock(t *testing.T, batchType uint) { ...@@ -627,7 +624,7 @@ func ChannelBuilder_AddBlock(t *testing.T, batchType uint) {
channelConfig.CompressorConfig.ApproxComprRatio = 1 channelConfig.CompressorConfig.ApproxComprRatio = 1
// Construct the channel builder // Construct the channel builder
cb, err := newChannelBuilder(channelConfig, getSpanBatchBuilder(batchType)) cb, err := newChannelBuilder(channelConfig, &defaultTestRollupConfig)
require.NoError(t, err) require.NoError(t, err)
// Add a nonsense block to the channel builder // Add a nonsense block to the channel builder
...@@ -660,7 +657,7 @@ func ChannelBuilder_Reset(t *testing.T, batchType uint) { ...@@ -660,7 +657,7 @@ func ChannelBuilder_Reset(t *testing.T, batchType uint) {
channelConfig.CompressorConfig.TargetFrameSize = 24 channelConfig.CompressorConfig.TargetFrameSize = 24
channelConfig.CompressorConfig.ApproxComprRatio = 1 channelConfig.CompressorConfig.ApproxComprRatio = 1
cb, err := newChannelBuilder(channelConfig, getSpanBatchBuilder(batchType)) cb, err := newChannelBuilder(channelConfig, &defaultTestRollupConfig)
require.NoError(t, err) require.NoError(t, err)
// Add a nonsense block to the channel builder // Add a nonsense block to the channel builder
...@@ -771,7 +768,7 @@ func ChannelBuilder_PendingFrames_TotalFrames(t *testing.T, batchType uint) { ...@@ -771,7 +768,7 @@ func ChannelBuilder_PendingFrames_TotalFrames(t *testing.T, batchType uint) {
cfg.CompressorConfig.TargetNumFrames = tnf cfg.CompressorConfig.TargetNumFrames = tnf
cfg.CompressorConfig.Kind = "shadow" cfg.CompressorConfig.Kind = "shadow"
cfg.BatchType = batchType cfg.BatchType = batchType
cb, err := newChannelBuilder(cfg, getSpanBatchBuilder(batchType)) cb, err := newChannelBuilder(cfg, &defaultTestRollupConfig)
require.NoError(err) require.NoError(err)
// initial builder should be empty // initial builder should be empty
...@@ -780,7 +777,7 @@ func ChannelBuilder_PendingFrames_TotalFrames(t *testing.T, batchType uint) { ...@@ -780,7 +777,7 @@ func ChannelBuilder_PendingFrames_TotalFrames(t *testing.T, batchType uint) {
// fill up // fill up
for { for {
block, _ := dtest.RandomL2Block(rng, 4) block := dtest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID)
_, err := cb.AddBlock(block) _, err := cb.AddBlock(block)
if cb.IsFull() { if cb.IsFull() {
break break
...@@ -810,21 +807,25 @@ func ChannelBuilder_InputBytes(t *testing.T, batchType uint) { ...@@ -810,21 +807,25 @@ func ChannelBuilder_InputBytes(t *testing.T, batchType uint) {
rng := rand.New(rand.NewSource(4982432)) rng := rand.New(rand.NewSource(4982432))
cfg := defaultTestChannelConfig cfg := defaultTestChannelConfig
cfg.BatchType = batchType cfg.BatchType = batchType
spanBatchBuilder := getSpanBatchBuilder(batchType) var spanBatchBuilder *derive.SpanBatchBuilder
cb, err := newChannelBuilder(cfg, getSpanBatchBuilder(batchType)) if batchType == derive.SpanBatchType {
chainId := big.NewInt(1234)
spanBatchBuilder = derive.NewSpanBatchBuilder(uint64(0), chainId)
}
cb, err := newChannelBuilder(cfg, &defaultTestRollupConfig)
require.NoError(err) require.NoError(err)
require.Zero(cb.InputBytes()) require.Zero(cb.InputBytes())
var l int var l int
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
block := newMiniL2Block(rng.Intn(32)) block := dtest.RandomL2BlockWithChainId(rng, rng.Intn(32), defaultTestRollupConfig.L2ChainID)
if batchType == derive.SingularBatchType { if batchType == derive.SingularBatchType {
l += blockBatchRlpSize(t, block) l += blockBatchRlpSize(t, block)
} else { } else {
singularBatch, _, err := derive.BlockToSingularBatch(block) singularBatch, l1Info, err := derive.BlockToSingularBatch(block)
require.NoError(err) require.NoError(err)
spanBatchBuilder.AppendSingularBatch(singularBatch) spanBatchBuilder.AppendSingularBatch(singularBatch, l1Info.SequenceNumber)
rawSpanBatch, err := spanBatchBuilder.GetRawSpanBatch() rawSpanBatch, err := spanBatchBuilder.GetRawSpanBatch()
require.NoError(err) require.NoError(err)
batch := derive.NewSpanBatchData(*rawSpanBatch) batch := derive.NewSpanBatchData(*rawSpanBatch)
...@@ -847,13 +848,13 @@ func ChannelBuilder_OutputBytes(t *testing.T, batchType uint) { ...@@ -847,13 +848,13 @@ func ChannelBuilder_OutputBytes(t *testing.T, batchType uint) {
cfg.CompressorConfig.TargetNumFrames = 16 cfg.CompressorConfig.TargetNumFrames = 16
cfg.CompressorConfig.ApproxComprRatio = 1.0 cfg.CompressorConfig.ApproxComprRatio = 1.0
cfg.BatchType = batchType cfg.BatchType = batchType
cb, err := newChannelBuilder(cfg, getSpanBatchBuilder(batchType)) cb, err := newChannelBuilder(cfg, &defaultTestRollupConfig)
require.NoError(err, "newChannelBuilder") require.NoError(err, "newChannelBuilder")
require.Zero(cb.OutputBytes()) require.Zero(cb.OutputBytes())
for { for {
block, _ := dtest.RandomL2Block(rng, rng.Intn(32)) block := dtest.RandomL2BlockWithChainId(rng, rng.Intn(32), defaultTestRollupConfig.L2ChainID)
_, err := cb.AddBlock(block) _, err := cb.AddBlock(block)
if errors.Is(err, derive.CompressorFullErr) { if errors.Is(err, derive.CompressorFullErr) {
break break
......
...@@ -36,9 +36,6 @@ type channelManager struct { ...@@ -36,9 +36,6 @@ type channelManager struct {
// last block hash - for reorg detection // last block hash - for reorg detection
tip common.Hash tip common.Hash
// last block added to channel. nil at first.
lastProcessedBlock *eth.L2BlockRef
// channel to write new block data to // channel to write new block data to
currentChannel *channel currentChannel *channel
// channels to read frame data from, for writing batches onchain // channels to read frame data from, for writing batches onchain
...@@ -52,19 +49,18 @@ type channelManager struct { ...@@ -52,19 +49,18 @@ type channelManager struct {
func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rcfg *rollup.Config) *channelManager { func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rcfg *rollup.Config) *channelManager {
return &channelManager{ return &channelManager{
log: log, log: log,
metr: metr, metr: metr,
cfg: cfg, cfg: cfg,
rcfg: rcfg, rcfg: rcfg,
txChannels: make(map[txID]*channel), txChannels: make(map[txID]*channel),
lastProcessedBlock: nil,
} }
} }
// Clear clears the entire state of the channel manager. // Clear clears the entire state of the channel manager.
// It is intended to be used before launching op-batcher and after an L2 reorg. // It is intended to be used before launching op-batcher and after an L2 reorg.
// Must set lastProcessedBlock as current L2 safe head fetched from L2 node. // Must set lastProcessedBlock as current L2 safe head fetched from L2 node.
func (s *channelManager) Clear(safeHead *eth.L2BlockRef) { func (s *channelManager) Clear() {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
s.log.Trace("clearing channel manager state") s.log.Trace("clearing channel manager state")
...@@ -74,7 +70,6 @@ func (s *channelManager) Clear(safeHead *eth.L2BlockRef) { ...@@ -74,7 +70,6 @@ func (s *channelManager) Clear(safeHead *eth.L2BlockRef) {
s.currentChannel = nil s.currentChannel = nil
s.channelQueue = nil s.channelQueue = nil
s.txChannels = make(map[txID]*channel) s.txChannels = make(map[txID]*channel)
s.lastProcessedBlock = safeHead
} }
// TxFailed records a transaction as failed. It will attempt to resubmit the data // TxFailed records a transaction as failed. It will attempt to resubmit the data
...@@ -204,19 +199,7 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error { ...@@ -204,19 +199,7 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
return nil return nil
} }
var spanBatchBuilder *derive.SpanBatchBuilder pc, err := newChannel(s.log, s.metr, s.cfg, s.rcfg)
if s.cfg.BatchType == derive.SpanBatchType {
if s.lastProcessedBlock == nil {
// TODO: we can remove "lastProcessedBlock" if we change the data-builder
// to append a singular-batch *with* the L2 metadata such as the L1-block-info seq-number;
// this helps determine whether or not the L1 origin changed in the first block of the span,
// without having to remember the last block from before the span.
return errors.New("last block is not initialized")
}
// Pass the current lastProcessedBlock as the parent
spanBatchBuilder = derive.NewSpanBatchBuilder(s.lastProcessedBlock.L1Origin.Number, s.rcfg.Genesis.L2Time, s.rcfg.L2ChainID)
}
pc, err := newChannel(s.log, s.metr, s.cfg, spanBatchBuilder)
if err != nil { if err != nil {
return fmt.Errorf("creating new channel: %w", err) return fmt.Errorf("creating new channel: %w", err)
} }
...@@ -262,7 +245,6 @@ func (s *channelManager) processBlocks() error { ...@@ -262,7 +245,6 @@ func (s *channelManager) processBlocks() error {
blocksAdded += 1 blocksAdded += 1
latestL2ref = l2BlockRefFromBlockAndL1Info(block, l1info) latestL2ref = l2BlockRefFromBlockAndL1Info(block, l1info)
s.metr.RecordL2BlockInChannel(block) s.metr.RecordL2BlockInChannel(block)
s.lastProcessedBlock = &latestL2ref
// current block got added but channel is now full // current block got added but channel is now full
if s.currentChannel.IsFull() { if s.currentChannel.IsFull() {
break break
......
...@@ -14,7 +14,6 @@ import ( ...@@ -14,7 +14,6 @@ import (
derivetest "github.com/ethereum-optimism/optimism/op-node/rollup/derive/test" derivetest "github.com/ethereum-optimism/optimism/op-node/rollup/derive/test"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -55,7 +54,7 @@ func TestChannelManagerBatchType(t *testing.T) { ...@@ -55,7 +54,7 @@ func TestChannelManagerBatchType(t *testing.T) {
func ChannelManagerReturnsErrReorg(t *testing.T, batchType uint) { func ChannelManagerReturnsErrReorg(t *testing.T, batchType uint) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{BatchType: batchType}, &rollup.Config{}) m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{BatchType: batchType}, &rollup.Config{})
m.Clear(&eth.L2BlockRef{}) m.Clear()
a := types.NewBlock(&types.Header{ a := types.NewBlock(&types.Header{
Number: big.NewInt(0), Number: big.NewInt(0),
...@@ -97,7 +96,7 @@ func ChannelManagerReturnsErrReorgWhenDrained(t *testing.T, batchType uint) { ...@@ -97,7 +96,7 @@ func ChannelManagerReturnsErrReorgWhenDrained(t *testing.T, batchType uint) {
}, },
&rollup.Config{}, &rollup.Config{},
) )
m.Clear(&eth.L2BlockRef{}) m.Clear()
a := newMiniL2Block(0) a := newMiniL2Block(0)
x := newMiniL2BlockWithNumberParent(0, big.NewInt(1), common.Hash{0xff}) x := newMiniL2BlockWithNumberParent(0, big.NewInt(1), common.Hash{0xff})
...@@ -134,7 +133,7 @@ func ChannelManager_Clear(t *testing.T, batchType uint) { ...@@ -134,7 +133,7 @@ func ChannelManager_Clear(t *testing.T, batchType uint) {
}, },
BatchType: batchType, BatchType: batchType,
}, },
&rollup.Config{}, &defaultTestRollupConfig,
) )
// Channel Manager state should be empty by default // Channel Manager state should be empty by default
...@@ -143,12 +142,11 @@ func ChannelManager_Clear(t *testing.T, batchType uint) { ...@@ -143,12 +142,11 @@ func ChannelManager_Clear(t *testing.T, batchType uint) {
require.Nil(m.currentChannel) require.Nil(m.currentChannel)
require.Empty(m.channelQueue) require.Empty(m.channelQueue)
require.Empty(m.txChannels) require.Empty(m.txChannels)
require.Nil(m.lastProcessedBlock)
// Set the last block // Set the last block
m.Clear(&eth.L2BlockRef{}) m.Clear()
// Add a block to the channel manager // Add a block to the channel manager
a, _ := derivetest.RandomL2Block(rng, 4) a := derivetest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID)
newL1Tip := a.Hash() newL1Tip := a.Hash()
l1BlockID := eth.BlockID{ l1BlockID := eth.BlockID{
Hash: a.Hash(), Hash: a.Hash(),
...@@ -185,8 +183,7 @@ func ChannelManager_Clear(t *testing.T, batchType uint) { ...@@ -185,8 +183,7 @@ func ChannelManager_Clear(t *testing.T, batchType uint) {
require.Equal(b.Hash(), m.tip) require.Equal(b.Hash(), m.tip)
// Clear the channel manager // Clear the channel manager
safeHead := testutils.RandomL2BlockRef(rng) m.Clear()
m.Clear(&safeHead)
// Check that the entire channel manager state cleared // Check that the entire channel manager state cleared
require.Empty(m.blocks) require.Empty(m.blocks)
...@@ -194,7 +191,6 @@ func ChannelManager_Clear(t *testing.T, batchType uint) { ...@@ -194,7 +191,6 @@ func ChannelManager_Clear(t *testing.T, batchType uint) {
require.Nil(m.currentChannel) require.Nil(m.currentChannel)
require.Empty(m.channelQueue) require.Empty(m.channelQueue)
require.Empty(m.txChannels) require.Empty(m.txChannels)
require.Equal(m.lastProcessedBlock, &safeHead)
} }
func ChannelManager_TxResend(t *testing.T, batchType uint) { func ChannelManager_TxResend(t *testing.T, batchType uint) {
...@@ -211,11 +207,11 @@ func ChannelManager_TxResend(t *testing.T, batchType uint) { ...@@ -211,11 +207,11 @@ func ChannelManager_TxResend(t *testing.T, batchType uint) {
}, },
BatchType: batchType, BatchType: batchType,
}, },
&rollup.Config{}, &defaultTestRollupConfig,
) )
m.Clear(&eth.L2BlockRef{}) m.Clear()
a, _ := derivetest.RandomL2Block(rng, 4) a := derivetest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID)
require.NoError(m.AddL2Block(a)) require.NoError(m.AddL2Block(a))
...@@ -259,11 +255,11 @@ func ChannelManagerCloseBeforeFirstUse(t *testing.T, batchType uint) { ...@@ -259,11 +255,11 @@ func ChannelManagerCloseBeforeFirstUse(t *testing.T, batchType uint) {
}, },
BatchType: batchType, BatchType: batchType,
}, },
&rollup.Config{}, &defaultTestRollupConfig,
) )
m.Clear(&eth.L2BlockRef{}) m.Clear()
a, _ := derivetest.RandomL2Block(rng, 4) a := derivetest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID)
m.Close() m.Close()
...@@ -291,9 +287,9 @@ func ChannelManagerCloseNoPendingChannel(t *testing.T, batchType uint) { ...@@ -291,9 +287,9 @@ func ChannelManagerCloseNoPendingChannel(t *testing.T, batchType uint) {
}, },
BatchType: batchType, BatchType: batchType,
}, },
&rollup.Config{}, &defaultTestRollupConfig,
) )
m.Clear(&eth.L2BlockRef{}) m.Clear()
a := newMiniL2Block(0) a := newMiniL2Block(0)
b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash()) b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash())
...@@ -322,30 +318,30 @@ func ChannelManagerCloseNoPendingChannel(t *testing.T, batchType uint) { ...@@ -322,30 +318,30 @@ func ChannelManagerCloseNoPendingChannel(t *testing.T, batchType uint) {
// new channel frames after this point. // new channel frames after this point.
func ChannelManagerClosePendingChannel(t *testing.T, batchType uint) { func ChannelManagerClosePendingChannel(t *testing.T, batchType uint) {
require := require.New(t) require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
MaxFrameSize: 1000, MaxFrameSize: 10000,
ChannelTimeout: 1000, ChannelTimeout: 1000,
CompressorConfig: compressor.Config{ CompressorConfig: compressor.Config{
TargetNumFrames: 1, TargetNumFrames: 1,
TargetFrameSize: 1000, TargetFrameSize: 10000,
ApproxComprRatio: 1.0, ApproxComprRatio: 1.0,
}, },
BatchType: batchType, BatchType: batchType,
}, },
&rollup.Config{}, &defaultTestRollupConfig,
) )
m.Clear(&eth.L2BlockRef{}) m.Clear()
numTx := 50000 numTx := 20 // Adjust number of txs to make 2 frames
if batchType == derive.SpanBatchType { a := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
// Adjust number of txs to make 2 frames b := derivetest.RandomL2BlockWithChainId(rng, 10, defaultTestRollupConfig.L2ChainID)
// Encoding empty txs as span batch requires more data size because span batch encodes tx signature to fixed length bHeader := b.Header()
numTx = 20000 bHeader.Number = new(big.Int).Add(a.Number(), big.NewInt(1))
} bHeader.ParentHash = a.Hash()
a := newMiniL2Block(numTx) b = b.WithSeal(bHeader)
b := newMiniL2BlockWithNumberParent(10, big.NewInt(1), a.Hash())
err := m.AddL2Block(a) err := m.AddL2Block(a)
require.NoError(err, "Failed to add L2 block") require.NoError(err, "Failed to add L2 block")
...@@ -377,6 +373,7 @@ func ChannelManagerClosePendingChannel(t *testing.T, batchType uint) { ...@@ -377,6 +373,7 @@ func ChannelManagerClosePendingChannel(t *testing.T, batchType uint) {
// have successfully landed on chain. // have successfully landed on chain.
func ChannelManagerCloseAllTxsFailed(t *testing.T, batchType uint) { func ChannelManagerCloseAllTxsFailed(t *testing.T, batchType uint) {
require := require.New(t) require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
...@@ -388,11 +385,11 @@ func ChannelManagerCloseAllTxsFailed(t *testing.T, batchType uint) { ...@@ -388,11 +385,11 @@ func ChannelManagerCloseAllTxsFailed(t *testing.T, batchType uint) {
ApproxComprRatio: 1.0, ApproxComprRatio: 1.0,
}, },
BatchType: batchType, BatchType: batchType,
}, &rollup.Config{}, }, &defaultTestRollupConfig,
) )
m.Clear(&eth.L2BlockRef{}) m.Clear()
a := newMiniL2Block(50_000) a := derivetest.RandomL2BlockWithChainId(rng, 50000, defaultTestRollupConfig.L2ChainID)
err := m.AddL2Block(a) err := m.AddL2Block(a)
require.NoError(err, "Failed to add L2 block") require.NoError(err, "Failed to add L2 block")
......
...@@ -22,7 +22,7 @@ func TestChannelTimeout(t *testing.T) { ...@@ -22,7 +22,7 @@ func TestChannelTimeout(t *testing.T) {
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{ m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{
ChannelTimeout: 100, ChannelTimeout: 100,
}, &rollup.Config{}) }, &rollup.Config{})
m.Clear(&eth.L2BlockRef{}) m.Clear()
// Pending channel is nil so is cannot be timed out // Pending channel is nil so is cannot be timed out
require.Nil(t, m.currentChannel) require.Nil(t, m.currentChannel)
...@@ -64,7 +64,7 @@ func TestChannelTimeout(t *testing.T) { ...@@ -64,7 +64,7 @@ func TestChannelTimeout(t *testing.T) {
func TestChannelNextTxData(t *testing.T) { func TestChannelNextTxData(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{}) m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{})
m.Clear(&eth.L2BlockRef{}) m.Clear()
// Nil pending channel should return EOF // Nil pending channel should return EOF
returnedTxData, err := m.nextTxData(nil) returnedTxData, err := m.nextTxData(nil)
...@@ -113,7 +113,7 @@ func TestChannelTxConfirmed(t *testing.T) { ...@@ -113,7 +113,7 @@ func TestChannelTxConfirmed(t *testing.T) {
// clearing confirmed transactions, and reseting the pendingChannels map // clearing confirmed transactions, and reseting the pendingChannels map
ChannelTimeout: 10, ChannelTimeout: 10,
}, &rollup.Config{}) }, &rollup.Config{})
m.Clear(&eth.L2BlockRef{}) m.Clear()
// Let's add a valid pending transaction to the channel manager // Let's add a valid pending transaction to the channel manager
// So we can demonstrate that TxConfirmed's correctness // So we can demonstrate that TxConfirmed's correctness
...@@ -162,7 +162,7 @@ func TestChannelTxFailed(t *testing.T) { ...@@ -162,7 +162,7 @@ func TestChannelTxFailed(t *testing.T) {
// Create a channel manager // Create a channel manager
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{}) m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{})
m.Clear(&eth.L2BlockRef{}) m.Clear()
// Let's add a valid pending transaction to the channel // Let's add a valid pending transaction to the channel
// manager so we can demonstrate correctness // manager so we can demonstrate correctness
......
...@@ -91,11 +91,7 @@ func (l *BatchSubmitter) StartBatchSubmitting() error { ...@@ -91,11 +91,7 @@ func (l *BatchSubmitter) StartBatchSubmitting() error {
l.shutdownCtx, l.cancelShutdownCtx = context.WithCancel(context.Background()) l.shutdownCtx, l.cancelShutdownCtx = context.WithCancel(context.Background())
l.killCtx, l.cancelKillCtx = context.WithCancel(context.Background()) l.killCtx, l.cancelKillCtx = context.WithCancel(context.Background())
syncStatus, err := fetchSyncStatus(l.shutdownCtx, l.RollupClient, l.Cfg.NetworkTimeout) l.state.Clear()
if err != nil {
return err
}
l.state.Clear(&syncStatus.SafeL2)
l.lastStoredBlock = eth.BlockID{} l.lastStoredBlock = eth.BlockID{}
l.wg.Add(1) l.wg.Add(1)
...@@ -205,9 +201,15 @@ func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uin ...@@ -205,9 +201,15 @@ func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uin
// calculateL2BlockRangeToStore determines the range (start,end] that should be loaded into the local state. // calculateL2BlockRangeToStore determines the range (start,end] that should be loaded into the local state.
// It also takes care of initializing some local state (i.e. will modify l.lastStoredBlock in certain conditions) // It also takes care of initializing some local state (i.e. will modify l.lastStoredBlock in certain conditions)
func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.BlockID, eth.BlockID, error) { func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.BlockID, eth.BlockID, error) {
syncStatus, err := fetchSyncStatus(ctx, l.RollupClient, l.Cfg.NetworkTimeout) ctx, cancel := context.WithTimeout(ctx, l.Cfg.NetworkTimeout)
defer cancel()
syncStatus, err := l.RollupClient.SyncStatus(ctx)
// Ensure that we have the sync status
if err != nil { if err != nil {
return eth.BlockID{}, eth.BlockID{}, err return eth.BlockID{}, eth.BlockID{}, fmt.Errorf("failed to get sync status: %w", err)
}
if syncStatus.HeadL1 == (eth.L1BlockRef{}) {
return eth.BlockID{}, eth.BlockID{}, errors.New("empty sync status")
} }
// Check last stored to see if it needs to be set on startup OR set if is lagged behind. // Check last stored to see if it needs to be set on startup OR set if is lagged behind.
...@@ -257,12 +259,7 @@ func (l *BatchSubmitter) loop() { ...@@ -257,12 +259,7 @@ func (l *BatchSubmitter) loop() {
l.Log.Error("error closing the channel manager to handle a L2 reorg", "err", err) l.Log.Error("error closing the channel manager to handle a L2 reorg", "err", err)
} }
l.publishStateToL1(queue, receiptsCh, true) l.publishStateToL1(queue, receiptsCh, true)
if syncStatus, err := fetchSyncStatus(l.shutdownCtx, l.RollupClient, l.Cfg.NetworkTimeout); err == nil { l.state.Clear()
l.state.Clear(&syncStatus.SafeL2)
} else {
// if fetchSyncStatus failed, ErrReorg will be returned again
l.Log.Error("error fetching sync status from L2 node", "err", err)
}
continue continue
} }
l.publishStateToL1(queue, receiptsCh, false) l.publishStateToL1(queue, receiptsCh, false)
...@@ -398,17 +395,3 @@ func (l *BatchSubmitter) l1Tip(ctx context.Context) (eth.L1BlockRef, error) { ...@@ -398,17 +395,3 @@ func (l *BatchSubmitter) l1Tip(ctx context.Context) (eth.L1BlockRef, error) {
} }
return eth.InfoToL1BlockRef(eth.HeaderBlockInfo(head)), nil return eth.InfoToL1BlockRef(eth.HeaderBlockInfo(head)), nil
} }
func fetchSyncStatus(ctx context.Context, rollupNode RollupClient, timeout time.Duration) (*eth.SyncStatus, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
syncStatus, err := rollupNode.SyncStatus(ctx)
// Ensure that we have the sync status
if err != nil {
return &eth.SyncStatus{}, fmt.Errorf("failed to get sync status: %w", err)
}
if syncStatus.SafeL2 == (eth.L2BlockRef{}) {
return &eth.SyncStatus{}, errors.New("empty sync status")
}
return syncStatus, nil
}
...@@ -52,7 +52,7 @@ type ChannelOut interface { ...@@ -52,7 +52,7 @@ type ChannelOut interface {
ID() ChannelID ID() ChannelID
Reset() error Reset() error
AddBlock(*types.Block) (uint64, error) AddBlock(*types.Block) (uint64, error)
AddSingularBatch(*SingularBatch) (uint64, error) AddSingularBatch(*SingularBatch, uint64) (uint64, error)
InputBytes() int InputBytes() int
ReadyBytes() int ReadyBytes() int
Flush() error Flush() error
...@@ -123,11 +123,11 @@ func (co *SingularChannelOut) AddBlock(block *types.Block) (uint64, error) { ...@@ -123,11 +123,11 @@ func (co *SingularChannelOut) AddBlock(block *types.Block) (uint64, error) {
return 0, errors.New("already closed") return 0, errors.New("already closed")
} }
batch, _, err := BlockToSingularBatch(block) batch, l1Info, err := BlockToSingularBatch(block)
if err != nil { if err != nil {
return 0, err return 0, err
} }
return co.AddSingularBatch(batch) return co.AddSingularBatch(batch, l1Info.SequenceNumber)
} }
// AddSingularBatch adds a batch to the channel. It returns the RLP encoded byte size // AddSingularBatch adds a batch to the channel. It returns the RLP encoded byte size
...@@ -138,7 +138,7 @@ func (co *SingularChannelOut) AddBlock(block *types.Block) (uint64, error) { ...@@ -138,7 +138,7 @@ func (co *SingularChannelOut) AddBlock(block *types.Block) (uint64, error) {
// AddSingularBatch should be used together with BlockToBatch if you need to access the // AddSingularBatch should be used together with BlockToBatch if you need to access the
// BatchData before adding a block to the channel. It isn't possible to access // BatchData before adding a block to the channel. It isn't possible to access
// the batch data with AddBlock. // the batch data with AddBlock.
func (co *SingularChannelOut) AddSingularBatch(batch *SingularBatch) (uint64, error) { func (co *SingularChannelOut) AddSingularBatch(batch *SingularBatch, _ uint64) (uint64, error) {
if co.closed { if co.closed {
return 0, errors.New("already closed") return 0, errors.New("already closed")
} }
......
...@@ -597,31 +597,32 @@ func NewSpanBatch(singularBatches []*SingularBatch) *SpanBatch { ...@@ -597,31 +597,32 @@ func NewSpanBatch(singularBatches []*SingularBatch) *SpanBatch {
// SpanBatchBuilder is a utility type to build a SpanBatch by adding a SingularBatch one by one. // SpanBatchBuilder is a utility type to build a SpanBatch by adding a SingularBatch one by one.
// makes easier to stack SingularBatches and convert to RawSpanBatch for encoding. // makes easier to stack SingularBatches and convert to RawSpanBatch for encoding.
type SpanBatchBuilder struct { type SpanBatchBuilder struct {
parentEpoch uint64
genesisTimestamp uint64 genesisTimestamp uint64
chainID *big.Int chainID *big.Int
spanBatch *SpanBatch spanBatch *SpanBatch
originChangedBit uint
} }
func NewSpanBatchBuilder(parentEpoch uint64, genesisTimestamp uint64, chainID *big.Int) *SpanBatchBuilder { func NewSpanBatchBuilder(genesisTimestamp uint64, chainID *big.Int) *SpanBatchBuilder {
return &SpanBatchBuilder{ return &SpanBatchBuilder{
parentEpoch: parentEpoch,
genesisTimestamp: genesisTimestamp, genesisTimestamp: genesisTimestamp,
chainID: chainID, chainID: chainID,
spanBatch: &SpanBatch{}, spanBatch: &SpanBatch{},
} }
} }
func (b *SpanBatchBuilder) AppendSingularBatch(singularBatch *SingularBatch) { func (b *SpanBatchBuilder) AppendSingularBatch(singularBatch *SingularBatch, seqNum uint64) {
if b.GetBlockCount() == 0 {
b.originChangedBit = 0
if seqNum == 0 {
b.originChangedBit = 1
}
}
b.spanBatch.AppendSingularBatch(singularBatch) b.spanBatch.AppendSingularBatch(singularBatch)
} }
func (b *SpanBatchBuilder) GetRawSpanBatch() (*RawSpanBatch, error) { func (b *SpanBatchBuilder) GetRawSpanBatch() (*RawSpanBatch, error) {
originChangedBit := 0 raw, err := b.spanBatch.ToRawSpanBatch(b.originChangedBit, b.genesisTimestamp, b.chainID)
if uint64(b.spanBatch.GetStartEpochNum()) != b.parentEpoch {
originChangedBit = 1
}
raw, err := b.spanBatch.ToRawSpanBatch(uint(originChangedBit), b.genesisTimestamp, b.chainID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -493,16 +493,16 @@ func TestSpanBatchBuilder(t *testing.T) { ...@@ -493,16 +493,16 @@ func TestSpanBatchBuilder(t *testing.T) {
} }
genesisTimeStamp := 1 + singularBatches[0].Timestamp - 128 genesisTimeStamp := 1 + singularBatches[0].Timestamp - 128
parentEpoch := uint64(singularBatches[0].EpochNum) var seqNum uint64 = 1
if originChangedBit == 1 { if originChangedBit == 1 {
parentEpoch -= 1 seqNum = 0
} }
spanBatchBuilder := NewSpanBatchBuilder(parentEpoch, genesisTimeStamp, chainID) spanBatchBuilder := NewSpanBatchBuilder(genesisTimeStamp, chainID)
assert.Equal(t, 0, spanBatchBuilder.GetBlockCount()) assert.Equal(t, 0, spanBatchBuilder.GetBlockCount())
for i := 0; i < len(singularBatches); i++ { for i := 0; i < len(singularBatches); i++ {
spanBatchBuilder.AppendSingularBatch(singularBatches[i]) spanBatchBuilder.AppendSingularBatch(singularBatches[i], seqNum)
assert.Equal(t, i+1, spanBatchBuilder.GetBlockCount()) assert.Equal(t, i+1, spanBatchBuilder.GetBlockCount())
assert.Equal(t, singularBatches[0].ParentHash.Bytes()[:20], spanBatchBuilder.spanBatch.parentCheck) assert.Equal(t, singularBatches[0].ParentHash.Bytes()[:20], spanBatchBuilder.spanBatch.parentCheck)
assert.Equal(t, singularBatches[i].EpochHash.Bytes()[:20], spanBatchBuilder.spanBatch.l1OriginCheck) assert.Equal(t, singularBatches[i].EpochHash.Bytes()[:20], spanBatchBuilder.spanBatch.l1OriginCheck)
......
...@@ -70,11 +70,11 @@ func (co *SpanChannelOut) AddBlock(block *types.Block) (uint64, error) { ...@@ -70,11 +70,11 @@ func (co *SpanChannelOut) AddBlock(block *types.Block) (uint64, error) {
return 0, errors.New("already closed") return 0, errors.New("already closed")
} }
batch, _, err := BlockToSingularBatch(block) batch, l1Info, err := BlockToSingularBatch(block)
if err != nil { if err != nil {
return 0, err return 0, err
} }
return co.AddSingularBatch(batch) return co.AddSingularBatch(batch, l1Info.SequenceNumber)
} }
// AddSingularBatch adds a batch to the channel. It returns the RLP encoded byte size // AddSingularBatch adds a batch to the channel. It returns the RLP encoded byte size
...@@ -90,7 +90,7 @@ func (co *SpanChannelOut) AddBlock(block *types.Block) (uint64, error) { ...@@ -90,7 +90,7 @@ func (co *SpanChannelOut) AddBlock(block *types.Block) (uint64, error) {
// A channel can have only one SpanBatch. And compressed results should not be accessible until the channel is closed, since the prefix and payload can be changed. // A channel can have only one SpanBatch. And compressed results should not be accessible until the channel is closed, since the prefix and payload can be changed.
// So it resets channel contents and rewrites the entire SpanBatch each time, and compressed results are copied to reader after the channel is closed. // So it resets channel contents and rewrites the entire SpanBatch each time, and compressed results are copied to reader after the channel is closed.
// It makes we can only get frames once the channel is full or closed, in the case of SpanBatch. // It makes we can only get frames once the channel is full or closed, in the case of SpanBatch.
func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch) (uint64, error) { func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64) (uint64, error) {
if co.closed { if co.closed {
return 0, errors.New("already closed") return 0, errors.New("already closed")
} }
...@@ -100,7 +100,7 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch) (uint64, error) ...@@ -100,7 +100,7 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch) (uint64, error)
} }
var buf bytes.Buffer var buf bytes.Buffer
// Append Singular batch to its span batch builder // Append Singular batch to its span batch builder
co.spanBatchBuilder.AppendSingularBatch(batch) co.spanBatchBuilder.AppendSingularBatch(batch, seqNum)
// Convert Span batch to RawSpanBatch // Convert Span batch to RawSpanBatch
rawSpanBatch, err := co.spanBatchBuilder.GetRawSpanBatch() rawSpanBatch, err := co.spanBatchBuilder.GetRawSpanBatch()
if err != nil { if err != nil {
......
package test package test
import ( import (
"math/big"
"math/rand" "math/rand"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
...@@ -21,3 +22,13 @@ func RandomL2Block(rng *rand.Rand, txCount int) (*types.Block, []*types.Receipt) ...@@ -21,3 +22,13 @@ func RandomL2Block(rng *rand.Rand, txCount int) (*types.Block, []*types.Receipt)
} }
return testutils.RandomBlockPrependTxs(rng, txCount, types.NewTx(l1InfoTx)) return testutils.RandomBlockPrependTxs(rng, txCount, types.NewTx(l1InfoTx))
} }
func RandomL2BlockWithChainId(rng *rand.Rand, txCount int, chainId *big.Int) *types.Block {
signer := types.NewLondonSigner(chainId)
block, _ := RandomL2Block(rng, 0)
txs := []*types.Transaction{block.Transactions()[0]} // L1 info deposit TX
for i := 0; i < txCount; i++ {
txs = append(txs, testutils.RandomTx(rng, big.NewInt(int64(rng.Uint32())), signer))
}
return block.WithBody(txs, nil)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment