Commit 90533145 authored by Danyal Prout's avatar Danyal Prout Committed by GitHub

op-batcher: more accurate max channel duration tracking (#9769)

* Update channel timeout duration logic to persist across restarts
Co-authored-by: Sebastian Stammler <stammler.s@gmail.com>

* Add tests for fetching safe l1 origin

---------
Co-authored-by: Sebastian Stammler <stammler.s@gmail.com>
parent e57787ea
...@@ -34,11 +34,12 @@ type channel struct { ...@@ -34,11 +34,12 @@ type channel struct {
maxInclusionBlock uint64 maxInclusionBlock uint64
} }
func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollupCfg *rollup.Config) (*channel, error) { func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollupCfg *rollup.Config, latestL1OriginBlockNum uint64) (*channel, error) {
cb, err := NewChannelBuilder(cfg, *rollupCfg) cb, err := NewChannelBuilder(cfg, *rollupCfg, latestL1OriginBlockNum)
if err != nil { if err != nil {
return nil, fmt.Errorf("creating new channel: %w", err) return nil, fmt.Errorf("creating new channel: %w", err)
} }
return &channel{ return &channel{
log: log, log: log,
metr: metr, metr: metr,
...@@ -101,6 +102,11 @@ func (s *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) (bool, []*t ...@@ -101,6 +102,11 @@ func (s *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) (bool, []*t
return false, nil return false, nil
} }
// Timeout returns the channel timeout L1 block number. If there is no timeout set, it returns 0.
// The value is delegated straight from the underlying channel builder, which tracks
// the earliest of the duration/sequencing-window/consensus timeouts.
func (s *channel) Timeout() uint64 {
	return s.channelBuilder.Timeout()
}
// updateInclusionBlocks finds the first & last confirmed tx and saves its inclusion numbers // updateInclusionBlocks finds the first & last confirmed tx and saves its inclusion numbers
func (s *channel) updateInclusionBlocks() { func (s *channel) updateInclusionBlocks() {
if len(s.confirmedTransactions) == 0 || !s.confirmedTxUpdated { if len(s.confirmedTransactions) == 0 || !s.confirmedTxUpdated {
...@@ -183,8 +189,8 @@ func (s *channel) FullErr() error { ...@@ -183,8 +189,8 @@ func (s *channel) FullErr() error {
return s.channelBuilder.FullErr() return s.channelBuilder.FullErr()
} }
func (s *channel) RegisterL1Block(l1BlockNum uint64) { func (s *channel) CheckTimeout(l1BlockNum uint64) {
s.channelBuilder.RegisterL1Block(l1BlockNum) s.channelBuilder.CheckTimeout(l1BlockNum)
} }
func (s *channel) AddBlock(block *types.Block) (*derive.L1BlockInfo, error) { func (s *channel) AddBlock(block *types.Block) (*derive.L1BlockInfo, error) {
...@@ -215,6 +221,11 @@ func (s *channel) OutputFrames() error { ...@@ -215,6 +221,11 @@ func (s *channel) OutputFrames() error {
return s.channelBuilder.OutputFrames() return s.channelBuilder.OutputFrames()
} }
// LatestL1Origin returns the latest L1 block origin from all the L2 blocks that have
// been added to the channel. Receiver renamed to `s` for consistency with the other
// channel methods (Timeout, CheckTimeout, Close, ...).
func (s *channel) LatestL1Origin() eth.BlockID {
	return s.channelBuilder.LatestL1Origin()
}
func (s *channel) Close() { func (s *channel) Close() {
s.channelBuilder.Close() s.channelBuilder.Close()
} }
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"github.com/ethereum-optimism/optimism/op-batcher/compressor" "github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
) )
...@@ -141,6 +142,8 @@ type ChannelBuilder struct { ...@@ -141,6 +142,8 @@ type ChannelBuilder struct {
co derive.ChannelOut co derive.ChannelOut
// list of blocks in the channel. Saved in case the channel must be rebuilt // list of blocks in the channel. Saved in case the channel must be rebuilt
blocks []*types.Block blocks []*types.Block
// latestL1Origin is the latest L1 origin of all the L2 blocks that have been added to the channel
latestL1Origin eth.BlockID
// frames data queue, to be send as txs // frames data queue, to be send as txs
frames []frameData frames []frameData
// total frames counter // total frames counter
...@@ -151,7 +154,7 @@ type ChannelBuilder struct { ...@@ -151,7 +154,7 @@ type ChannelBuilder struct {
// newChannelBuilder creates a new channel builder or returns an error if the // newChannelBuilder creates a new channel builder or returns an error if the
// channel out could not be created. // channel out could not be created.
func NewChannelBuilder(cfg ChannelConfig, rollupCfg rollup.Config) (*ChannelBuilder, error) { func NewChannelBuilder(cfg ChannelConfig, rollupCfg rollup.Config, latestL1OriginBlockNum uint64) (*ChannelBuilder, error) {
c, err := cfg.CompressorConfig.NewCompressor() c, err := cfg.CompressorConfig.NewCompressor()
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -165,11 +168,15 @@ func NewChannelBuilder(cfg ChannelConfig, rollupCfg rollup.Config) (*ChannelBuil ...@@ -165,11 +168,15 @@ func NewChannelBuilder(cfg ChannelConfig, rollupCfg rollup.Config) (*ChannelBuil
return nil, err return nil, err
} }
return &ChannelBuilder{ cb := &ChannelBuilder{
cfg: cfg, cfg: cfg,
rollupCfg: rollupCfg, rollupCfg: rollupCfg,
co: co, co: co,
}, nil }
cb.updateDurationTimeout(latestL1OriginBlockNum)
return cb, nil
} }
func (c *ChannelBuilder) ID() derive.ChannelID { func (c *ChannelBuilder) ID() derive.ChannelID {
...@@ -197,14 +204,9 @@ func (c *ChannelBuilder) Blocks() []*types.Block { ...@@ -197,14 +204,9 @@ func (c *ChannelBuilder) Blocks() []*types.Block {
return c.blocks return c.blocks
} }
// Reset resets the internal state of the channel builder so that it can be // LatestL1Origin returns the latest L1 block origin from all the L2 blocks that have been added to the channel
// reused. Note that a new channel id is also generated by Reset. func (c *ChannelBuilder) LatestL1Origin() eth.BlockID {
func (c *ChannelBuilder) Reset() error { return c.latestL1Origin
c.blocks = c.blocks[:0]
c.frames = c.frames[:0]
c.timeout = 0
c.fullErr = nil
return c.co.Reset()
} }
// AddBlock adds a block to the channel compression pipeline. IsFull should be // AddBlock adds a block to the channel compression pipeline. IsFull should be
...@@ -234,9 +236,17 @@ func (c *ChannelBuilder) AddBlock(block *types.Block) (*derive.L1BlockInfo, erro ...@@ -234,9 +236,17 @@ func (c *ChannelBuilder) AddBlock(block *types.Block) (*derive.L1BlockInfo, erro
} else if err != nil { } else if err != nil {
return l1info, fmt.Errorf("adding block to channel out: %w", err) return l1info, fmt.Errorf("adding block to channel out: %w", err)
} }
c.blocks = append(c.blocks, block) c.blocks = append(c.blocks, block)
c.updateSwTimeout(batch) c.updateSwTimeout(batch)
if l1info.Number > c.latestL1Origin.Number {
c.latestL1Origin = eth.BlockID{
Hash: l1info.BlockHash,
Number: l1info.Number,
}
}
if err = c.co.FullErr(); err != nil { if err = c.co.FullErr(); err != nil {
c.setFullErr(err) c.setFullErr(err)
// Adding this block still worked, so don't return error, just mark as full // Adding this block still worked, so don't return error, just mark as full
...@@ -247,13 +257,9 @@ func (c *ChannelBuilder) AddBlock(block *types.Block) (*derive.L1BlockInfo, erro ...@@ -247,13 +257,9 @@ func (c *ChannelBuilder) AddBlock(block *types.Block) (*derive.L1BlockInfo, erro
// Timeout management // Timeout management
// RegisterL1Block should be called whenever a new L1-block is seen. // Timeout returns the block number of the channel timeout. If no timeout is set it returns 0
// func (c *ChannelBuilder) Timeout() uint64 {
// It ensures proper tracking of all possible timeouts (max channel duration, return c.timeout
// close to consensus channel timeout, close to end of sequencing window).
func (c *ChannelBuilder) RegisterL1Block(l1BlockNum uint64) {
c.updateDurationTimeout(l1BlockNum)
c.checkTimeout(l1BlockNum)
} }
// FramePublished should be called whenever a frame of this channel got // FramePublished should be called whenever a frame of this channel got
...@@ -298,10 +304,10 @@ func (c *ChannelBuilder) updateTimeout(timeoutBlockNum uint64, reason error) { ...@@ -298,10 +304,10 @@ func (c *ChannelBuilder) updateTimeout(timeoutBlockNum uint64, reason error) {
} }
} }
// checkTimeout checks if the channel is timed out at the given block number and // CheckTimeout checks if the channel is timed out at the given block number and
// in this case marks the channel as full, if it wasn't full already. // in this case marks the channel as full, if it wasn't full already.
func (c *ChannelBuilder) checkTimeout(blockNum uint64) { func (c *ChannelBuilder) CheckTimeout(l1BlockNum uint64) {
if !c.IsFull() && c.TimedOut(blockNum) { if !c.IsFull() && c.TimedOut(l1BlockNum) {
c.setFullErr(c.timeoutReason) c.setFullErr(c.timeoutReason)
} }
} }
......
This diff is collapsed.
...@@ -33,6 +33,8 @@ type channelManager struct { ...@@ -33,6 +33,8 @@ type channelManager struct {
// All blocks since the last request for new tx data. // All blocks since the last request for new tx data.
blocks []*types.Block blocks []*types.Block
// The latest L1 block from all the L2 blocks in the most recently closed channel
l1OriginLastClosedChannel eth.BlockID
// last block hash - for reorg detection // last block hash - for reorg detection
tip common.Hash tip common.Hash
...@@ -59,11 +61,12 @@ func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, ...@@ -59,11 +61,12 @@ func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig,
// Clear clears the entire state of the channel manager. // Clear clears the entire state of the channel manager.
// It is intended to be used before launching op-batcher and after an L2 reorg. // It is intended to be used before launching op-batcher and after an L2 reorg.
func (s *channelManager) Clear() { func (s *channelManager) Clear(l1OriginLastClosedChannel eth.BlockID) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
s.log.Trace("clearing channel manager state") s.log.Trace("clearing channel manager state")
s.blocks = s.blocks[:0] s.blocks = s.blocks[:0]
s.l1OriginLastClosedChannel = l1OriginLastClosedChannel
s.tip = common.Hash{} s.tip = common.Hash{}
s.closed = false s.closed = false
s.currentChannel = nil s.currentChannel = nil
...@@ -200,15 +203,19 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error { ...@@ -200,15 +203,19 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
return nil return nil
} }
pc, err := newChannel(s.log, s.metr, s.cfg, s.rollupCfg) pc, err := newChannel(s.log, s.metr, s.cfg, s.rollupCfg, s.l1OriginLastClosedChannel.Number)
if err != nil { if err != nil {
return fmt.Errorf("creating new channel: %w", err) return fmt.Errorf("creating new channel: %w", err)
} }
s.currentChannel = pc s.currentChannel = pc
s.channelQueue = append(s.channelQueue, pc) s.channelQueue = append(s.channelQueue, pc)
s.log.Info("Created channel", s.log.Info("Created channel",
"id", pc.ID(), "id", pc.ID(),
"l1Head", l1Head, "l1Head", l1Head,
"l1OriginLastClosedChannel", s.l1OriginLastClosedChannel,
"blocks_pending", len(s.blocks), "blocks_pending", len(s.blocks),
"batch_type", s.cfg.BatchType, "batch_type", s.cfg.BatchType,
"max_frame_size", s.cfg.MaxFrameSize, "max_frame_size", s.cfg.MaxFrameSize,
...@@ -220,7 +227,7 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error { ...@@ -220,7 +227,7 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
// registerL1Block registers the given block at the pending channel. // registerL1Block registers the given block at the pending channel.
func (s *channelManager) registerL1Block(l1Head eth.BlockID) { func (s *channelManager) registerL1Block(l1Head eth.BlockID) {
s.currentChannel.RegisterL1Block(l1Head.Number) s.currentChannel.CheckTimeout(l1Head.Number)
s.log.Debug("new L1-block registered at channel builder", s.log.Debug("new L1-block registered at channel builder",
"l1Head", l1Head, "l1Head", l1Head,
"channel_full", s.currentChannel.IsFull(), "channel_full", s.currentChannel.IsFull(),
...@@ -286,6 +293,11 @@ func (s *channelManager) outputFrames() error { ...@@ -286,6 +293,11 @@ func (s *channelManager) outputFrames() error {
return nil return nil
} }
lastClosedL1Origin := s.currentChannel.LatestL1Origin()
if lastClosedL1Origin.Number > s.l1OriginLastClosedChannel.Number {
s.l1OriginLastClosedChannel = lastClosedL1Origin
}
inBytes, outBytes := s.currentChannel.InputBytes(), s.currentChannel.OutputBytes() inBytes, outBytes := s.currentChannel.InputBytes(), s.currentChannel.OutputBytes()
s.metr.RecordChannelClosed( s.metr.RecordChannelClosed(
s.currentChannel.ID(), s.currentChannel.ID(),
...@@ -300,14 +312,17 @@ func (s *channelManager) outputFrames() error { ...@@ -300,14 +312,17 @@ func (s *channelManager) outputFrames() error {
if inBytes > 0 { if inBytes > 0 {
comprRatio = float64(outBytes) / float64(inBytes) comprRatio = float64(outBytes) / float64(inBytes)
} }
s.log.Info("Channel closed", s.log.Info("Channel closed",
"id", s.currentChannel.ID(), "id", s.currentChannel.ID(),
"blocks_pending", len(s.blocks), "blocks_pending", len(s.blocks),
"num_frames", s.currentChannel.TotalFrames(), "num_frames", s.currentChannel.TotalFrames(),
"input_bytes", inBytes, "input_bytes", inBytes,
"output_bytes", outBytes, "output_bytes", outBytes,
"l1_origin", lastClosedL1Origin,
"full_reason", s.currentChannel.FullErr(), "full_reason", s.currentChannel.FullErr(),
"compr_ratio", comprRatio, "compr_ratio", comprRatio,
"latest_l1_origin", s.l1OriginLastClosedChannel,
) )
return nil return nil
} }
......
...@@ -54,7 +54,7 @@ func TestChannelManagerBatchType(t *testing.T) { ...@@ -54,7 +54,7 @@ func TestChannelManagerBatchType(t *testing.T) {
func ChannelManagerReturnsErrReorg(t *testing.T, batchType uint) { func ChannelManagerReturnsErrReorg(t *testing.T, batchType uint) {
log := testlog.Logger(t, log.LevelCrit) log := testlog.Logger(t, log.LevelCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{BatchType: batchType}, &rollup.Config{}) m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{BatchType: batchType}, &rollup.Config{})
m.Clear() m.Clear(eth.BlockID{})
a := types.NewBlock(&types.Header{ a := types.NewBlock(&types.Header{
Number: big.NewInt(0), Number: big.NewInt(0),
...@@ -96,7 +96,7 @@ func ChannelManagerReturnsErrReorgWhenDrained(t *testing.T, batchType uint) { ...@@ -96,7 +96,7 @@ func ChannelManagerReturnsErrReorgWhenDrained(t *testing.T, batchType uint) {
}, },
&rollup.Config{}, &rollup.Config{},
) )
m.Clear() m.Clear(eth.BlockID{})
a := newMiniL2Block(0) a := newMiniL2Block(0)
x := newMiniL2BlockWithNumberParent(0, big.NewInt(1), common.Hash{0xff}) x := newMiniL2BlockWithNumberParent(0, big.NewInt(1), common.Hash{0xff})
...@@ -138,12 +138,13 @@ func ChannelManager_Clear(t *testing.T, batchType uint) { ...@@ -138,12 +138,13 @@ func ChannelManager_Clear(t *testing.T, batchType uint) {
// Channel Manager state should be empty by default // Channel Manager state should be empty by default
require.Empty(m.blocks) require.Empty(m.blocks)
require.Equal(eth.BlockID{}, m.l1OriginLastClosedChannel)
require.Equal(common.Hash{}, m.tip) require.Equal(common.Hash{}, m.tip)
require.Nil(m.currentChannel) require.Nil(m.currentChannel)
require.Empty(m.channelQueue) require.Empty(m.channelQueue)
require.Empty(m.txChannels) require.Empty(m.txChannels)
// Set the last block // Set the last block
m.Clear() m.Clear(eth.BlockID{})
// Add a block to the channel manager // Add a block to the channel manager
a := derivetest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID) a := derivetest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID)
...@@ -165,9 +166,10 @@ func ChannelManager_Clear(t *testing.T, batchType uint) { ...@@ -165,9 +166,10 @@ func ChannelManager_Clear(t *testing.T, batchType uint) {
// the list // the list
require.NoError(m.processBlocks()) require.NoError(m.processBlocks())
require.NoError(m.currentChannel.channelBuilder.co.Flush()) require.NoError(m.currentChannel.channelBuilder.co.Flush())
require.NoError(m.currentChannel.OutputFrames()) require.NoError(m.outputFrames())
_, err := m.nextTxData(m.currentChannel) _, err := m.nextTxData(m.currentChannel)
require.NoError(err) require.NoError(err)
require.NotNil(m.l1OriginLastClosedChannel)
require.Len(m.blocks, 0) require.Len(m.blocks, 0)
require.Equal(newL1Tip, m.tip) require.Equal(newL1Tip, m.tip)
require.Len(m.currentChannel.pendingTransactions, 1) require.Len(m.currentChannel.pendingTransactions, 1)
...@@ -182,11 +184,15 @@ func ChannelManager_Clear(t *testing.T, batchType uint) { ...@@ -182,11 +184,15 @@ func ChannelManager_Clear(t *testing.T, batchType uint) {
require.Len(m.blocks, 1) require.Len(m.blocks, 1)
require.Equal(b.Hash(), m.tip) require.Equal(b.Hash(), m.tip)
safeL1Origin := eth.BlockID{
Number: 123,
}
// Clear the channel manager // Clear the channel manager
m.Clear() m.Clear(safeL1Origin)
// Check that the entire channel manager state cleared // Check that the entire channel manager state cleared
require.Empty(m.blocks) require.Empty(m.blocks)
require.Equal(uint64(123), m.l1OriginLastClosedChannel.Number)
require.Equal(common.Hash{}, m.tip) require.Equal(common.Hash{}, m.tip)
require.Nil(m.currentChannel) require.Nil(m.currentChannel)
require.Empty(m.channelQueue) require.Empty(m.channelQueue)
...@@ -209,7 +215,7 @@ func ChannelManager_TxResend(t *testing.T, batchType uint) { ...@@ -209,7 +215,7 @@ func ChannelManager_TxResend(t *testing.T, batchType uint) {
}, },
&defaultTestRollupConfig, &defaultTestRollupConfig,
) )
m.Clear() m.Clear(eth.BlockID{})
a := derivetest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID) a := derivetest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID)
...@@ -257,7 +263,7 @@ func ChannelManagerCloseBeforeFirstUse(t *testing.T, batchType uint) { ...@@ -257,7 +263,7 @@ func ChannelManagerCloseBeforeFirstUse(t *testing.T, batchType uint) {
}, },
&defaultTestRollupConfig, &defaultTestRollupConfig,
) )
m.Clear() m.Clear(eth.BlockID{})
a := derivetest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID) a := derivetest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID)
...@@ -289,7 +295,7 @@ func ChannelManagerCloseNoPendingChannel(t *testing.T, batchType uint) { ...@@ -289,7 +295,7 @@ func ChannelManagerCloseNoPendingChannel(t *testing.T, batchType uint) {
}, },
&defaultTestRollupConfig, &defaultTestRollupConfig,
) )
m.Clear() m.Clear(eth.BlockID{})
a := newMiniL2Block(0) a := newMiniL2Block(0)
b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash()) b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash())
...@@ -335,7 +341,7 @@ func ChannelManagerClosePendingChannel(t *testing.T, batchType uint) { ...@@ -335,7 +341,7 @@ func ChannelManagerClosePendingChannel(t *testing.T, batchType uint) {
}, },
&defaultTestRollupConfig, &defaultTestRollupConfig,
) )
m.Clear() m.Clear(eth.BlockID{})
numTx := 20 // Adjust number of txs to make 2 frames numTx := 20 // Adjust number of txs to make 2 frames
a := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID) a := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
...@@ -394,7 +400,7 @@ func TestChannelManager_Close_PartiallyPendingChannel(t *testing.T) { ...@@ -394,7 +400,7 @@ func TestChannelManager_Close_PartiallyPendingChannel(t *testing.T) {
}, },
&defaultTestRollupConfig, &defaultTestRollupConfig,
) )
m.Clear() m.Clear(eth.BlockID{})
numTx := 3 // Adjust number of txs to make 2 frames numTx := 3 // Adjust number of txs to make 2 frames
a := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID) a := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
...@@ -454,7 +460,7 @@ func ChannelManagerCloseAllTxsFailed(t *testing.T, batchType uint) { ...@@ -454,7 +460,7 @@ func ChannelManagerCloseAllTxsFailed(t *testing.T, batchType uint) {
BatchType: batchType, BatchType: batchType,
}, &defaultTestRollupConfig, }, &defaultTestRollupConfig,
) )
m.Clear() m.Clear(eth.BlockID{})
a := derivetest.RandomL2BlockWithChainId(rng, 50000, defaultTestRollupConfig.L2ChainID) a := derivetest.RandomL2BlockWithChainId(rng, 50000, defaultTestRollupConfig.L2ChainID)
...@@ -478,3 +484,53 @@ func ChannelManagerCloseAllTxsFailed(t *testing.T, batchType uint) { ...@@ -478,3 +484,53 @@ func ChannelManagerCloseAllTxsFailed(t *testing.T, batchType uint) {
_, err = m.TxData(eth.BlockID{}) _, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data") require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data")
} }
// TestChannelManager_ChannelCreation checks that a newly created channel derives
// its max-duration timeout from the L1 origin of the last closed channel.
func TestChannelManager_ChannelCreation(t *testing.T) {
	l := testlog.Logger(t, log.LevelCrit)

	maxChannelDuration := uint64(15)
	cfg := ChannelConfig{
		MaxChannelDuration: maxChannelDuration,
		MaxFrameSize:       1000,
		CompressorConfig: compressor.Config{
			TargetNumFrames:  100,
			TargetFrameSize:  1000,
			ApproxComprRatio: 1.0,
		},
	}

	cases := []struct {
		name                   string
		safeL1Block            eth.BlockID
		expectedChannelTimeout uint64
	}{
		{
			name:        "UseSafeHeadWhenNoLastL1Block",
			safeL1Block: eth.BlockID{Number: uint64(123)},
			// Safe head + maxChannelDuration
			expectedChannelTimeout: 123 + maxChannelDuration,
		},
		{
			name:        "NoLastL1BlockNoSafeL1Block",
			safeL1Block: eth.BlockID{Number: 0},
			// No timeout
			expectedChannelTimeout: 0 + maxChannelDuration,
		},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			m := NewChannelManager(l, metrics.NoopMetrics, cfg, &defaultTestRollupConfig)
			m.l1OriginLastClosedChannel = tc.safeL1Block

			require.Nil(t, m.currentChannel)
			require.NoError(t, m.ensureChannelWithSpace(eth.BlockID{}))
			require.NotNil(t, m.currentChannel)
			require.Equal(t, tc.expectedChannelTimeout, m.currentChannel.Timeout())
		})
	}
}
...@@ -31,7 +31,7 @@ func TestChannelTimeout(t *testing.T) { ...@@ -31,7 +31,7 @@ func TestChannelTimeout(t *testing.T) {
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{ m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{
ChannelTimeout: 100, ChannelTimeout: 100,
}, &rollup.Config{}) }, &rollup.Config{})
m.Clear() m.Clear(eth.BlockID{})
// Pending channel is nil so is cannot be timed out // Pending channel is nil so is cannot be timed out
require.Nil(t, m.currentChannel) require.Nil(t, m.currentChannel)
...@@ -73,7 +73,7 @@ func TestChannelTimeout(t *testing.T) { ...@@ -73,7 +73,7 @@ func TestChannelTimeout(t *testing.T) {
func TestChannelManager_NextTxData(t *testing.T) { func TestChannelManager_NextTxData(t *testing.T) {
log := testlog.Logger(t, log.LevelCrit) log := testlog.Logger(t, log.LevelCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{}) m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{})
m.Clear() m.Clear(eth.BlockID{})
// Nil pending channel should return EOF // Nil pending channel should return EOF
returnedTxData, err := m.nextTxData(nil) returnedTxData, err := m.nextTxData(nil)
...@@ -121,7 +121,7 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) { ...@@ -121,7 +121,7 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) {
CompressorConfig: compressor.Config{ CompressorConfig: compressor.Config{
TargetNumFrames: n, TargetNumFrames: n,
}, },
}, &rollup.Config{}) }, &rollup.Config{}, latestL1BlockOrigin)
require.NoError(err) require.NoError(err)
chID := ch.ID() chID := ch.ID()
...@@ -161,7 +161,7 @@ func TestChannel_NextTxData_multiFrameTx(t *testing.T) { ...@@ -161,7 +161,7 @@ func TestChannel_NextTxData_multiFrameTx(t *testing.T) {
CompressorConfig: compressor.Config{ CompressorConfig: compressor.Config{
TargetNumFrames: n, TargetNumFrames: n,
}, },
}, &rollup.Config{}) }, &rollup.Config{}, latestL1BlockOrigin)
require.NoError(err) require.NoError(err)
chID := ch.ID() chID := ch.ID()
...@@ -208,7 +208,7 @@ func TestChannelTxConfirmed(t *testing.T) { ...@@ -208,7 +208,7 @@ func TestChannelTxConfirmed(t *testing.T) {
// clearing confirmed transactions, and resetting the pendingChannels map // clearing confirmed transactions, and resetting the pendingChannels map
ChannelTimeout: 10, ChannelTimeout: 10,
}, &rollup.Config{}) }, &rollup.Config{})
m.Clear() m.Clear(eth.BlockID{})
// Let's add a valid pending transaction to the channel manager // Let's add a valid pending transaction to the channel manager
// So we can demonstrate that TxConfirmed's correctness // So we can demonstrate that TxConfirmed's correctness
...@@ -257,7 +257,7 @@ func TestChannelTxFailed(t *testing.T) { ...@@ -257,7 +257,7 @@ func TestChannelTxFailed(t *testing.T) {
// Create a channel manager // Create a channel manager
log := testlog.Logger(t, log.LevelCrit) log := testlog.Logger(t, log.LevelCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{}) m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{})
m.Clear() m.Clear(eth.BlockID{})
// Let's add a valid pending transaction to the channel // Let's add a valid pending transaction to the channel
// manager so we can demonstrate correctness // manager so we can demonstrate correctness
......
...@@ -10,10 +10,6 @@ import ( ...@@ -10,10 +10,6 @@ import (
"sync" "sync"
"time" "time"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
...@@ -21,6 +17,9 @@ import ( ...@@ -21,6 +17,9 @@ import (
"github.com/ethereum-optimism/optimism/op-service/dial" "github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/txmgr" "github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
) )
var ErrBatcherNotRunning = errors.New("batcher is not running") var ErrBatcherNotRunning = errors.New("batcher is not running")
...@@ -93,7 +92,7 @@ func (l *BatchSubmitter) StartBatchSubmitting() error { ...@@ -93,7 +92,7 @@ func (l *BatchSubmitter) StartBatchSubmitting() error {
l.shutdownCtx, l.cancelShutdownCtx = context.WithCancel(context.Background()) l.shutdownCtx, l.cancelShutdownCtx = context.WithCancel(context.Background())
l.killCtx, l.cancelKillCtx = context.WithCancel(context.Background()) l.killCtx, l.cancelKillCtx = context.WithCancel(context.Background())
l.state.Clear() l.clearState(l.shutdownCtx)
l.lastStoredBlock = eth.BlockID{} l.lastStoredBlock = eth.BlockID{}
l.wg.Add(1) l.wg.Add(1)
...@@ -299,7 +298,7 @@ func (l *BatchSubmitter) loop() { ...@@ -299,7 +298,7 @@ func (l *BatchSubmitter) loop() {
// on reorg we want to publish all pending state then wait until each result clears before resetting // on reorg we want to publish all pending state then wait until each result clears before resetting
// the state. // the state.
publishAndWait() publishAndWait()
l.state.Clear() l.clearState(l.shutdownCtx)
continue continue
} }
l.publishStateToL1(queue, receiptsCh) l.publishStateToL1(queue, receiptsCh)
...@@ -344,7 +343,46 @@ func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txData], receiptsCh ...@@ -344,7 +343,46 @@ func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txData], receiptsCh
} }
} }
// publishTxToL1 queues a single tx to be published to the L1 // clearState clears the state of the channel manager
func (l *BatchSubmitter) clearState(ctx context.Context) {
l.Log.Info("Clearing state")
defer l.Log.Info("State cleared")
clearStateWithL1Origin := func() bool {
l1SafeOrigin, err := l.safeL1Origin(ctx)
if err != nil {
l.Log.Warn("Failed to query L1 safe origin, will retry", "err", err)
return false
} else {
l.Log.Info("Clearing state with safe L1 origin", "origin", l1SafeOrigin)
l.state.Clear(l1SafeOrigin)
return true
}
}
// Attempt to set the L1 safe origin and clear the state, if fetching fails -- fall through to an infinite retry
if clearStateWithL1Origin() {
return
}
tick := time.NewTicker(5 * time.Second)
defer tick.Stop()
for {
select {
case <-tick.C:
if clearStateWithL1Origin() {
return
}
case <-ctx.Done():
l.Log.Warn("Clearing state cancelled")
l.state.Clear(eth.BlockID{})
return
}
}
}
// publishTxToL1 submits a single state tx to the L1
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) error { func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) error {
// send all available transactions // send all available transactions
l1tip, err := l.l1Tip(ctx) l1tip, err := l.l1Tip(ctx)
...@@ -356,6 +394,7 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t ...@@ -356,6 +394,7 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
// Collect next transaction data // Collect next transaction data
txdata, err := l.state.TxData(l1tip.ID()) txdata, err := l.state.TxData(l1tip.ID())
if err == io.EOF { if err == io.EOF {
l.Log.Trace("no transaction data available") l.Log.Trace("no transaction data available")
return err return err
...@@ -370,6 +409,30 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t ...@@ -370,6 +409,30 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
return nil return nil
} }
// safeL1Origin queries the rollup node for the L1 origin of the current safe
// L2 head. If the safe head's L1 origin number is 0 (i.e. still at genesis),
// the L1 genesis block from the rollup config is returned instead.
func (l *BatchSubmitter) safeL1Origin(ctx context.Context) (eth.BlockID, error) {
	ctx, cancel := context.WithTimeout(ctx, l.Config.NetworkTimeout)
	defer cancel()

	c, err := l.EndpointProvider.RollupClient(ctx)
	if err != nil {
		// Use the batcher's own logger (l.Log) rather than the global log
		// package, for consistency with the rest of this type's logging.
		l.Log.Error("Failed to get rollup client", "err", err)
		return eth.BlockID{}, fmt.Errorf("safe l1 origin: error getting rollup client: %w", err)
	}

	status, err := c.SyncStatus(ctx)
	if err != nil {
		l.Log.Error("Failed to get sync status", "err", err)
		return eth.BlockID{}, fmt.Errorf("safe l1 origin: error getting sync status: %w", err)
	}

	// If the safe L2 block origin is 0, we are at the genesis block and should use the L1 origin from the rollup config.
	if status.SafeL2.L1Origin.Number == 0 {
		return l.RollupConfig.Genesis.L1, nil
	}

	return status.SafeL2.L1Origin, nil
}
// sendTransaction creates & queues for sending a transaction to the batch inbox address with the given `txData`. // sendTransaction creates & queues for sending a transaction to the batch inbox address with the given `txData`.
// The method will block if the queue's MaxPendingTransactions is exceeded. // The method will block if the queue's MaxPendingTransactions is exceeded.
func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) error { func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) error {
......
package batcher
import (
"context"
"errors"
"testing"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
// mockL2EndpointProvider is a test double for the batcher's L2 endpoint
// provider. The *Err fields, when non-nil, are returned by the corresponding
// accessor so tests can force client-resolution failures.
type mockL2EndpointProvider struct {
	ethClient       *testutils.MockL2Client
	ethClientErr    error
	rollupClient    *testutils.MockRollupClient
	rollupClientErr error
}
// newEndpointProvider builds a mockL2EndpointProvider with fresh mock clients
// and no injected errors.
func newEndpointProvider() *mockL2EndpointProvider {
	p := &mockL2EndpointProvider{}
	p.ethClient = new(testutils.MockL2Client)
	p.rollupClient = new(testutils.MockRollupClient)
	return p
}
// EthClient returns the mock L2 eth client, or the injected error if set.
func (p *mockL2EndpointProvider) EthClient(context.Context) (dial.EthClientInterface, error) {
	return p.ethClient, p.ethClientErr
}

// RollupClient returns the mock rollup client, or the injected error if set.
func (p *mockL2EndpointProvider) RollupClient(context.Context) (dial.RollupClientInterface, error) {
	return p.rollupClient, p.rollupClientErr
}

// Close is a no-op for the mock provider.
func (p *mockL2EndpointProvider) Close() {}
const genesisL1Origin = uint64(123)
// setup constructs a BatchSubmitter wired to a mock endpoint provider, using a
// rollup config whose L1 genesis number is genesisL1Origin.
func setup(t *testing.T) (*BatchSubmitter, *mockL2EndpointProvider) {
	ep := newEndpointProvider()

	cfg := defaultTestRollupConfig
	cfg.Genesis.L1.Number = genesisL1Origin

	ds := DriverSetup{
		Log:              testlog.Logger(t, log.LevelDebug),
		Metr:             metrics.NoopMetrics,
		RollupConfig:     &cfg,
		EndpointProvider: ep,
	}
	return NewBatchSubmitter(ds), ep
}
// TestBatchSubmitter_SafeL1Origin exercises safeL1Origin across three cases:
// an existing safe origin, the genesis fallback when the origin is 0, and a
// sync-status fetch failure.
func TestBatchSubmitter_SafeL1Origin(t *testing.T) {
	bs, ep := setup(t)

	for _, tc := range []struct {
		name                   string
		currentSafeOrigin      uint64
		failsToFetchSyncStatus bool
		expectResult           uint64
		expectErr              bool
	}{
		{
			name:              "ExistingSafeL1Origin",
			currentSafeOrigin: 999,
			expectResult:      999,
		},
		{
			name:              "NoExistingSafeL1OriginUsesGenesis",
			currentSafeOrigin: 0,
			expectResult:      genesisL1Origin,
		},
		{
			name:                   "ErrorFetchingSyncStatus",
			failsToFetchSyncStatus: true,
			expectErr:              true,
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			if tc.failsToFetchSyncStatus {
				ep.rollupClient.ExpectSyncStatus(&eth.SyncStatus{}, errors.New("failed to fetch sync status"))
			} else {
				status := &eth.SyncStatus{
					SafeL2: eth.L2BlockRef{
						L1Origin: eth.BlockID{Number: tc.currentSafeOrigin},
					},
				}
				ep.rollupClient.ExpectSyncStatus(status, nil)
			}

			id, err := bs.safeL1Origin(context.Background())
			if tc.expectErr {
				require.Error(t, err)
				return
			}
			require.NoError(t, err)
			require.Equal(t, tc.expectResult, id.Number)
		})
	}
}
// TestBatchSubmitter_SafeL1Origin_FailsToResolveRollupClient verifies that a
// failure to resolve the rollup client surfaces as an error from safeL1Origin.
func TestBatchSubmitter_SafeL1Origin_FailsToResolveRollupClient(t *testing.T) {
	bs, ep := setup(t)
	ep.rollupClientErr = errors.New("failed to resolve rollup client")

	_, err := bs.safeL1Origin(context.Background())
	require.Error(t, err)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment