Commit acc22d76 authored by Brian Bland's avatar Brian Bland

Address PR feedback, improve context naming and simplify selects

parent decf9825
...@@ -358,8 +358,9 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info derive.L1BlockInfo) ...@@ -358,8 +358,9 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info derive.L1BlockInfo)
// Close closes the current pending channel, if one exists, and prevents the // Close closes the current pending channel, if one exists, and prevents the
// creation of any new channels. // creation of any new channels.
// This ensures that no new frames will be produced, but there may be any number // This ensures that no new blocks will be added to the channel, but there may be any number
// of pending frames produced before this call which should still be published. // of frames still produced by calling `OutputFrames()`, which flushes the compression buffer.
// These frames still need to be published.
func (s *channelManager) Close() { func (s *channelManager) Close() {
if s.closed { if s.closed {
return return
......
...@@ -27,12 +27,11 @@ type BatchSubmitter struct { ...@@ -27,12 +27,11 @@ type BatchSubmitter struct {
txMgr txmgr.TxManager txMgr txmgr.TxManager
wg sync.WaitGroup wg sync.WaitGroup
done chan struct{}
loadCtx context.Context shutdownCtx context.Context
cancelLoad context.CancelFunc cancelShutdownCtx context.CancelFunc
txCtx context.Context killCtx context.Context
cancelTx context.CancelFunc cancelKillCtx context.CancelFunc
mutex sync.Mutex mutex sync.Mutex
running bool running bool
...@@ -146,9 +145,8 @@ func (l *BatchSubmitter) Start() error { ...@@ -146,9 +145,8 @@ func (l *BatchSubmitter) Start() error {
} }
l.running = true l.running = true
l.done = make(chan struct{}) l.shutdownCtx, l.cancelShutdownCtx = context.WithCancel(context.Background())
l.loadCtx, l.cancelLoad = context.WithCancel(context.Background()) l.killCtx, l.cancelKillCtx = context.WithCancel(context.Background())
l.txCtx, l.cancelTx = context.WithCancel(context.Background())
l.state.Clear() l.state.Clear()
l.lastStoredBlock = eth.BlockID{} l.lastStoredBlock = eth.BlockID{}
...@@ -175,18 +173,9 @@ func (l *BatchSubmitter) Stop(ctx context.Context) error { ...@@ -175,18 +173,9 @@ func (l *BatchSubmitter) Stop(ctx context.Context) error {
} }
l.running = false l.running = false
// go routine will call cancelTx() if the passed in ctx is ever Done l.cancelShutdownCtx()
cancelTx := l.cancelTx
wrapped, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
<-wrapped.Done()
cancelTx()
}()
l.cancelLoad()
close(l.done)
l.wg.Wait() l.wg.Wait()
l.cancelKillCtx()
l.log.Info("Batch Submitter stopped") l.log.Info("Batch Submitter stopped")
...@@ -302,18 +291,10 @@ func (l *BatchSubmitter) loop() { ...@@ -302,18 +291,10 @@ func (l *BatchSubmitter) loop() {
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
// prioritize the `done` condition over the ticker, even though select ordering is randomized l.loadBlocksIntoState(l.shutdownCtx)
select { l.publishStateToL1(l.killCtx)
case <-l.done: case <-l.shutdownCtx.Done():
l.publishStateToL1(l.txCtx) l.publishStateToL1(l.killCtx)
return
default:
}
l.loadBlocksIntoState(l.loadCtx)
l.publishStateToL1(l.txCtx)
case <-l.done:
l.publishStateToL1(l.txCtx)
return return
} }
} }
...@@ -328,7 +309,7 @@ func (l *BatchSubmitter) publishStateToL1(ctx context.Context) { ...@@ -328,7 +309,7 @@ func (l *BatchSubmitter) publishStateToL1(ctx context.Context) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
l.state.Close() l.state.Close()
case <-l.done: case <-l.shutdownCtx.Done():
l.state.Close() l.state.Close()
default: default:
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment