Commit 873b3e0d authored by George Knee's avatar George Knee Committed by GitHub

op-batcher: fix channel duration timeout management (#12916)

* op-batcher: fix channel duration timeout management

Previously, we would use L1 data to help track channel durations. For example, the batcher would be configured to post data every hour. We updated a global state variable with the latest L1 origin of a channel when it closed, and computed the deadline for that channel using a duration delta starting at that L1 origin timestamp.

Since we changed the way autoDA switching works, a channel can be _closed_ (due to a duration timeout or other reason) and this will cause the l1 origin state variable to move forward, extending the deadline ready for the next channel. Crucially, with autoDA switching nowadays, the closed channel will not always be submitted on chain (it can be discarded and the blocks requeued). If it is discarded, the channel duration timeout has already been extended.

The fix for this is to update the global state variable at channel submission time, not channel closing time.

* add regression test for channel duration timeouts during requeue
parent 7550853e
......@@ -39,8 +39,9 @@ type channelManager struct {
// All blocks since the last request for new tx data.
blocks queue.Queue[*types.Block]
// The latest L1 block from all the L2 blocks in the most recently closed channel
l1OriginLastClosedChannel eth.BlockID
// The latest L1 block from all the L2 blocks in the most recently submitted channel.
// Used to track channel duration timeouts.
l1OriginLastSubmittedChannel eth.BlockID
// The default ChannelConfig to use for the next channel
defaultCfg ChannelConfig
// last block hash - for reorg detection
......@@ -75,12 +76,12 @@ func (s *channelManager) SetChannelOutFactory(outFactory ChannelOutFactory) {
// Clear clears the entire state of the channel manager.
// It is intended to be used before launching op-batcher and after an L2 reorg.
func (s *channelManager) Clear(l1OriginLastClosedChannel eth.BlockID) {
func (s *channelManager) Clear(l1OriginLastSubmittedChannel eth.BlockID) {
s.mu.Lock()
defer s.mu.Unlock()
s.log.Trace("clearing channel manager state")
s.blocks.Clear()
s.l1OriginLastClosedChannel = l1OriginLastClosedChannel
s.l1OriginLastSubmittedChannel = l1OriginLastSubmittedChannel
s.tip = common.Hash{}
s.closed = false
s.currentChannel = nil
......@@ -160,6 +161,12 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) {
return txData{}, io.EOF // TODO: not enough data error instead
}
tx := channel.NextTxData()
// update s.l1OriginLastSubmittedChannel so that the next
// channel's duration timeout will trigger properly
if channel.LatestL1Origin().Number > s.l1OriginLastSubmittedChannel.Number {
s.l1OriginLastSubmittedChannel = channel.LatestL1Origin()
}
s.txChannels[tx.ID().String()] = channel
return tx, nil
}
......@@ -284,7 +291,7 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
return fmt.Errorf("creating channel out: %w", err)
}
pc := newChannel(s.log, s.metr, cfg, s.rollupCfg, s.l1OriginLastClosedChannel.Number, channelOut)
pc := newChannel(s.log, s.metr, cfg, s.rollupCfg, s.l1OriginLastSubmittedChannel.Number, channelOut)
s.currentChannel = pc
s.channelQueue = append(s.channelQueue, pc)
......@@ -292,7 +299,7 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
s.log.Info("Created channel",
"id", pc.ID(),
"l1Head", l1Head,
"l1OriginLastClosedChannel", s.l1OriginLastClosedChannel,
"l1OriginLastSubmittedChannel", s.l1OriginLastSubmittedChannel,
"blocks_pending", s.blocks.Len(),
"batch_type", cfg.BatchType,
"compression_algo", cfg.CompressorConfig.CompressionAlgo,
......@@ -374,11 +381,6 @@ func (s *channelManager) outputFrames() error {
return nil
}
lastClosedL1Origin := s.currentChannel.LatestL1Origin()
if lastClosedL1Origin.Number > s.l1OriginLastClosedChannel.Number {
s.l1OriginLastClosedChannel = lastClosedL1Origin
}
inBytes, outBytes := s.currentChannel.InputBytes(), s.currentChannel.OutputBytes()
s.metr.RecordChannelClosed(
s.currentChannel.ID(),
......@@ -401,12 +403,11 @@ func (s *channelManager) outputFrames() error {
"input_bytes", inBytes,
"output_bytes", outBytes,
"oldest_l1_origin", s.currentChannel.OldestL1Origin(),
"l1_origin", lastClosedL1Origin,
"l1_origin", s.currentChannel.LatestL1Origin(),
"oldest_l2", s.currentChannel.OldestL2(),
"latest_l2", s.currentChannel.LatestL2(),
"full_reason", s.currentChannel.FullErr(),
"compr_ratio", comprRatio,
"latest_l1_origin", s.l1OriginLastClosedChannel,
)
return nil
}
......
......@@ -130,7 +130,7 @@ func ChannelManager_Clear(t *testing.T, batchType uint) {
// Channel Manager state should be empty by default
require.Empty(m.blocks)
require.Equal(eth.BlockID{}, m.l1OriginLastClosedChannel)
require.Equal(eth.BlockID{}, m.l1OriginLastSubmittedChannel)
require.Equal(common.Hash{}, m.tip)
require.Nil(m.currentChannel)
require.Empty(m.channelQueue)
......@@ -161,8 +161,8 @@ func ChannelManager_Clear(t *testing.T, batchType uint) {
require.NoError(m.outputFrames())
_, err := m.nextTxData(m.currentChannel)
require.NoError(err)
require.NotNil(m.l1OriginLastClosedChannel)
require.Len(m.blocks, 0)
require.NotNil(m.l1OriginLastSubmittedChannel)
require.Equal(newL1Tip, m.tip)
require.Len(m.currentChannel.pendingTransactions, 1)
......@@ -184,7 +184,7 @@ func ChannelManager_Clear(t *testing.T, batchType uint) {
// Check that the entire channel manager state cleared
require.Empty(m.blocks)
require.Equal(uint64(123), m.l1OriginLastClosedChannel.Number)
require.Equal(uint64(123), m.l1OriginLastSubmittedChannel.Number)
require.Equal(common.Hash{}, m.tip)
require.Nil(m.currentChannel)
require.Empty(m.channelQueue)
......@@ -475,7 +475,7 @@ func TestChannelManager_ChannelCreation(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)
m.l1OriginLastClosedChannel = test.safeL1Block
m.l1OriginLastSubmittedChannel = test.safeL1Block
require.Nil(t, m.currentChannel)
require.NoError(t, m.ensureChannelWithSpace(eth.BlockID{}))
......@@ -639,6 +639,8 @@ func TestChannelManager_Requeue(t *testing.T) {
// Assert that at least one block was processed into the channel
require.NotContains(t, m.blocks, blockA)
l1OriginBeforeRequeue := m.l1OriginLastSubmittedChannel
// Call the function we are testing
m.Requeue(m.defaultCfg)
......@@ -646,6 +648,12 @@ func TestChannelManager_Requeue(t *testing.T) {
require.Equal(t, m.blocks, stateSnapshot)
require.Empty(t, m.channelQueue)
// Ensure the l1OriginLastSubmittedChannel was
// not changed. This ensures the next channel
// has its duration timeout deadline computed
// properly.
require.Equal(t, l1OriginBeforeRequeue, m.l1OriginLastSubmittedChannel)
// Trigger the blocks -> channelQueue data pipelining again
require.NoError(t, m.ensureChannelWithSpace(eth.BlockID{}))
require.NotEmpty(t, m.channelQueue)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment