Commit c4a679cb authored by Brian Bland's avatar Brian Bland

Gracefully shut down the BatchSubmitter, closing current channel

parent c6c9b864
......@@ -18,6 +18,7 @@ var (
ErrMaxDurationReached = errors.New("max channel duration reached")
ErrChannelTimeoutClose = errors.New("close to channel timeout")
ErrSeqWindowClose = errors.New("close to sequencer window timeout")
ErrTerminated = errors.New("channel terminated")
)
type ChannelFullError struct {
......@@ -188,7 +189,7 @@ func (c *channelBuilder) Reset() error {
}
// AddBlock adds a block to the channel compression pipeline. IsFull should be
// called aftewards to test whether the channel is full. If full, a new channel
// called afterwards to test whether the channel is full. If full, a new channel
// must be started.
//
// AddBlock returns a ChannelFullError if called even though the channel is
......@@ -314,9 +315,10 @@ func (c *channelBuilder) IsFull() bool {
// would have been exceeded by the latest AddBlock call,
// - ErrMaxFrameIndex if the maximum number of frames has been generated
// (uint16),
// - ErrMaxDurationReached if the max channel duration got reached.
// - ErrChannelTimeoutClose if the consensus channel timeout got too close.
// - ErrSeqWindowClose if the end of the sequencer window got too close.
// - ErrMaxDurationReached if the max channel duration got reached,
// - ErrChannelTimeoutClose if the consensus channel timeout got too close,
// - ErrSeqWindowClose if the end of the sequencer window got too close,
// - ErrTerminated if the channel was explicitly terminated.
func (c *channelBuilder) FullErr() error {
return c.fullErr
}
......@@ -402,6 +404,16 @@ func (c *channelBuilder) outputFrame() error {
return err // possibly io.EOF (last frame)
}
// Close immediately marks the channel as full with an ErrTerminated
// if the channel is not already full. This ensures that no additional
// frames will be added to the channel.
func (c *channelBuilder) Close() error {
if !c.IsFull() {
c.setFullErr(ErrTerminated)
}
return c.FullErr()
}
// HasFrame returns whether there's any available frame. If true, it can be
// popped using NextFrame().
//
......
......@@ -344,3 +344,13 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info derive.L1BlockInfo)
SequenceNumber: l1info.SequenceNumber,
}
}
// CloseCurrentChannel closes the current pending channel, if one exists.
// This ensures that no new frames will be produced, but there still may be any
// number of pending frames produced before this call.
func (s *channelManager) CloseCurrentChannel() error {
if s.pendingChannel == nil {
return nil
}
return s.pendingChannel.Close()
}
......@@ -145,8 +145,6 @@ func (l *BatchSubmitter) Start() error {
l.running = true
l.done = make(chan struct{})
// TODO: this context only exists because the event loop doesn't reach done
// if the tx manager is blocking forever due to e.g. insufficient balance.
l.ctx, l.cancel = context.WithCancel(context.Background())
l.state.Clear()
l.lastStoredBlock = eth.BlockID{}
......@@ -287,6 +285,9 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.
func (l *BatchSubmitter) loop() {
defer l.wg.Done()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ticker := time.NewTicker(l.PollInterval)
defer ticker.Stop()
for {
......@@ -294,9 +295,8 @@ func (l *BatchSubmitter) loop() {
case <-ticker.C:
l.loadBlocksIntoState(l.ctx)
blockLoop:
for {
l1tip, err := l.l1Tip(l.ctx)
l1tip, err := l.l1Tip(ctx)
if err != nil {
l.log.Error("Failed to query L1 tip", "error", err)
break
......@@ -320,12 +320,11 @@ func (l *BatchSubmitter) loop() {
l.recordConfirmedTx(txdata.ID(), receipt)
}
// hack to exit this loop. Proper fix is to do request another send tx or parallel tx sending
// from the channel manager rather than sending the channel in a loop. This stalls b/c if the
// context is cancelled while sending, it will never fully clear the pending txns.
// Attempt to gracefully terminate the current channel, ensuring that no new frames will be
// produced. Any remaining frames must still be published to the L1 to prevent stalling.
select {
case <-l.ctx.Done():
break blockLoop
case <-l.done:
l.state.CloseCurrentChannel()
default:
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment