Commit d0922bff authored by Michael de Hoog's avatar Michael de Hoog Committed by Brian Bland

Allow passing in ctx to Stop

Usa separate contexts for loading L2 blocks and tx submission
parent ae230d21
......@@ -29,8 +29,10 @@ type BatchSubmitter struct {
wg sync.WaitGroup
done chan struct{}
ctx context.Context
cancel context.CancelFunc
loadCtx context.Context
cancelLoad context.CancelFunc
txCtx context.Context
cancelTx context.CancelFunc
mutex sync.Mutex
running bool
......@@ -145,7 +147,8 @@ func (l *BatchSubmitter) Start() error {
l.running = true
l.done = make(chan struct{})
l.ctx, l.cancel = context.WithCancel(context.Background())
l.loadCtx, l.cancelLoad = context.WithCancel(context.Background())
l.txCtx, l.cancelTx = context.WithCancel(context.Background())
l.state.Clear()
l.lastStoredBlock = eth.BlockID{}
......@@ -158,10 +161,10 @@ func (l *BatchSubmitter) Start() error {
}
func (l *BatchSubmitter) StopIfRunning() {
_ = l.Stop()
_ = l.Stop(context.Background())
}
func (l *BatchSubmitter) Stop() error {
func (l *BatchSubmitter) Stop(ctx context.Context) error {
l.log.Info("Stopping Batch Submitter")
l.mutex.Lock()
......@@ -172,7 +175,16 @@ func (l *BatchSubmitter) Stop() error {
}
l.running = false
l.cancel()
// go routine will call cancelTx() if the passed in ctx is ever Done
cancelTx := l.cancelTx
wrapped, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
<-wrapped.Done()
cancelTx()
}()
l.cancelLoad()
close(l.done)
l.wg.Wait()
......@@ -285,9 +297,6 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.
func (l *BatchSubmitter) loop() {
defer l.wg.Done()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ticker := time.NewTicker(l.PollInterval)
defer ticker.Stop()
for {
......@@ -296,15 +305,15 @@ func (l *BatchSubmitter) loop() {
// prioritize the `done` condition over the ticker, even though select ordering is randomized
select {
case <-l.done:
l.publishStateToL1(ctx)
l.publishStateToL1(l.txCtx)
return
default:
}
l.loadBlocksIntoState(l.ctx)
l.publishStateToL1(ctx)
l.loadBlocksIntoState(l.loadCtx)
l.publishStateToL1(l.txCtx)
case <-l.done:
l.publishStateToL1(ctx)
l.publishStateToL1(l.txCtx)
return
}
}
......@@ -317,6 +326,8 @@ func (l *BatchSubmitter) publishStateToL1(ctx context.Context) {
// Attempt to gracefully terminate the current channel, ensuring that no new frames will be
// produced. Any remaining frames must still be published to the L1 to prevent stalling.
select {
case <-ctx.Done():
l.state.Close()
case <-l.done:
l.state.Close()
default:
......
......@@ -6,7 +6,7 @@ import (
type batcherClient interface {
Start() error
Stop() error
Stop(ctx context.Context) error
}
type adminAPI struct {
......@@ -23,6 +23,6 @@ func (a *adminAPI) StartBatcher(_ context.Context) error {
return a.b.Start()
}
func (a *adminAPI) StopBatcher(_ context.Context) error {
return a.b.Stop()
func (a *adminAPI) StopBatcher(ctx context.Context) error {
return a.b.Stop(ctx)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment