Commit 5d7fa8d2 authored by Hamdi Allam's avatar Hamdi Allam

ensure w.Add is called in the same routine as as w.Done

parent f8b3e028
...@@ -72,39 +72,40 @@ func (i *Indexer) Run(ctx context.Context) error { ...@@ -72,39 +72,40 @@ func (i *Indexer) Run(ctx context.Context) error {
var wg sync.WaitGroup var wg sync.WaitGroup
errCh := make(chan error, 3) errCh := make(chan error, 3)
// If either processor errors out, we stop // if any goroutine halts, we stop the entire indexer
subCtx, cancel := context.WithCancel(ctx) subCtx, cancel := context.WithCancel(ctx)
run := func(start func(ctx context.Context) error) { run := func(start func(ctx context.Context) error) {
wg.Add(1) wg.Add(1)
defer func() { go func() {
if err := recover(); err != nil { defer func() {
i.log.Error("halting indexer on panic", "err", err) if err := recover(); err != nil {
debug.PrintStack() i.log.Error("halting indexer on panic", "err", err)
errCh <- fmt.Errorf("panic: %v", err) debug.PrintStack()
} errCh <- fmt.Errorf("panic: %v", err)
}
cancel()
wg.Done() cancel()
wg.Done()
}()
errCh <- start(subCtx)
}() }()
err := start(subCtx)
if err != nil {
i.log.Error("halting indexer on error", "err", err)
}
// Send a value down regardless if we've received an error
// or halted via cancellation where err == nil
errCh <- err
} }
// Kick off all the dependent routines // Kick off all the dependent routines
go run(i.L1ETL.Start) run(i.L1ETL.Start)
go run(i.L2ETL.Start) run(i.L2ETL.Start)
go run(i.BridgeProcessor.Start) run(i.BridgeProcessor.Start)
wg.Wait()
// Since we wait to receipt of an error
err := <-errCh err := <-errCh
if err != nil {
i.log.Error("indexer stopped", "err", err)
} else {
i.log.Info("indexer stopped")
}
wg.Wait()
i.log.Info("indexer stopped")
return err return err
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment