Commit acdd051d authored by Brian Bland

Immediately flush channel builder on channel manager close

parent 8b87c2d2
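
The sketch below is a rough illustration of the shutdown contract this commit establishes: Close() now returns an error because it immediately flushes the channel builder via outputFrames(), and TxData keeps serving frames while the manager is closed, so a caller can drain and publish everything the flush produced. The closer interface, frame type, publish callback, and the io.EOF "no more data" sentinel are illustrative stand-ins for this sketch, not code from this repository.

	package sketch

	import (
		"errors"
		"fmt"
		"io"
	)

	// frame stands in for the batcher's txData payload type.
	type frame []byte

	// closer is a hypothetical stand-in for the channelManager surface this
	// commit changes: Close returns an error because it now flushes the
	// channel builder, and TxData keeps returning frames while closed.
	type closer interface {
		Close() error
		TxData() (frame, error)
	}

	// drain closes the manager, then publishes whatever frames the flush
	// produced. Treating io.EOF as "nothing left to publish" is an
	// assumption of this sketch.
	func drain(s closer, publish func(frame) error) error {
		if err := s.Close(); err != nil {
			return fmt.Errorf("closing channel manager: %w", err)
		}
		for {
			f, err := s.TxData()
			if errors.Is(err, io.EOF) {
				return nil // all flushed frames published
			}
			if err != nil {
				return err
			}
			if err := publish(f); err != nil {
				return err
			}
		}
	}
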
@@ -187,8 +187,8 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
 	dataPending := s.pendingChannel != nil && s.pendingChannel.HasFrame()
 	s.log.Debug("Requested tx data", "l1Head", l1Head, "data_pending", dataPending, "blocks_pending", len(s.blocks))

-	// Short circuit if there is a pending frame.
-	if dataPending {
+	// Short circuit if there is a pending frame or the channel manager is closed.
+	if dataPending || s.closed {
 		return s.nextTxData()
 	}

@@ -203,11 +203,8 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
 		return txData{}, err
 	}

-	// Avoid processing blocks if the channel manager has been explicitly closed.
-	if !s.closed {
-		if err := s.processBlocks(); err != nil {
-			return txData{}, err
-		}
+	if err := s.processBlocks(); err != nil {
+		return txData{}, err
 	}

 	// Register current L1 head only after all pending blocks have been
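
With the closed check folded into the short-circuit above, TxData no longer needs a separate guard around processBlocks: a closed manager returns its pending frame data before reaching this point, so no new blocks are processed after Close.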
@@ -356,14 +353,12 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info derive.L1BlockInfo)
 	}
 }

-// Close closes the current pending channel, if one exists, and prevents the
-// creation of any new channels.
-// This ensures that no new blocks will be added to the channel, but there may be any number
-// of frames still produced by calling `OutputFrames()`, which flushes the compression buffer.
-// These frames still need to be published.
-func (s *channelManager) Close() {
+// Close closes the current pending channel, if one exists, outputs any remaining frames,
+// and prevents the creation of any new channels.
+// Any outputted frames still need to be published.
+func (s *channelManager) Close() error {
 	if s.closed {
-		return
+		return nil
 	}

 	s.closed = true
@@ -374,8 +369,10 @@ func (s *channelManager) Close() {
 	}

 	if s.pendingChannel == nil {
-		return
+		return nil
 	}

 	s.pendingChannel.Close()
+
+	return s.outputFrames()
 }
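
Close now returns an error because it ends by calling outputFrames, which flushes the compression buffer and can fail; the call site below logs that error during shutdown instead of silently dropping it.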
@@ -317,9 +317,15 @@ func (l *BatchSubmitter) publishStateToL1(ctx context.Context) {
 	// produced. Any remaining frames must still be published to the L1 to prevent stalling.
 	select {
 	case <-ctx.Done():
-		l.state.Close()
+		err := l.state.Close()
+		if err != nil {
+			l.log.Error("error closing the channel manager", "err", err)
+		}
 	case <-l.shutdownCtx.Done():
-		l.state.Close()
+		err := l.state.Close()
+		if err != nil {
+			l.log.Error("error closing the channel manager", "err", err)
+		}
 	default:
 	}

...
@@ -352,7 +352,9 @@ func TestMigration(t *testing.T) {
 	}, lgr.New("module", "batcher"), batchermetrics.NoopMetrics)
 	require.NoError(t, err)
 	t.Cleanup(func() {
-		batcher.StopIfRunning(context.Background())
+		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+		defer cancel()
+		batcher.StopIfRunning(ctx)
 	})

 	proposer, err := l2os.NewL2OutputSubmitterFromCLIConfig(l2os.CLIConfig{
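
The bounded context keeps test cleanup from blocking indefinitely if the batcher cannot stop cleanly; StopIfRunning now gets at most ten seconds before the timeout fires.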
...