Commit 67e21e8c authored by Michael de Hoog's avatar Michael de Hoog

Cleanup

parent a34a46aa
...@@ -300,6 +300,14 @@ func (l *BatchSubmitter) loop() { ...@@ -300,6 +300,14 @@ func (l *BatchSubmitter) loop() {
publishTicker := time.NewTicker(100 * time.Millisecond) publishTicker := time.NewTicker(100 * time.Millisecond)
defer publishTicker.Stop() defer publishTicker.Stop()
receiptsCh := make(chan txReceipt) receiptsCh := make(chan txReceipt)
receiptHandler := func(r txReceipt) {
// Record TX Status
if r.err != nil {
l.recordFailedTx(r.id, r.err)
} else {
l.recordConfirmedTx(r.id, r.receipt)
}
}
for { for {
select { select {
...@@ -307,21 +315,16 @@ func (l *BatchSubmitter) loop() { ...@@ -307,21 +315,16 @@ func (l *BatchSubmitter) loop() {
l.loadBlocksIntoState(l.shutdownCtx) l.loadBlocksIntoState(l.shutdownCtx)
case <-publishTicker.C: case <-publishTicker.C:
_ = l.publishStateToL1(l.killCtx, receiptsCh) _ = l.publishStateToL1(l.killCtx, receiptsCh)
case res := <-receiptsCh: case r := <-receiptsCh:
// Record TX Status receiptHandler(r)
if res.err != nil {
l.recordFailedTx(res.id, res.err)
} else {
l.recordConfirmedTx(res.id, res.receipt)
}
case <-l.shutdownCtx.Done(): case <-l.shutdownCtx.Done():
l.drainState(receiptsCh) l.drainState(receiptsCh, receiptHandler)
return return
} }
} }
} }
func (l *BatchSubmitter) drainState(receiptsCh chan txReceipt) { func (l *BatchSubmitter) drainState(receiptsCh chan txReceipt, receiptHandler func(res txReceipt)) {
err := l.state.Close() err := l.state.Close()
if err != nil { if err != nil {
l.log.Error("error closing the channel manager", "err", err) l.log.Error("error closing the channel manager", "err", err)
...@@ -343,27 +346,23 @@ func (l *BatchSubmitter) drainState(receiptsCh chan txReceipt) { ...@@ -343,27 +346,23 @@ func (l *BatchSubmitter) drainState(receiptsCh chan txReceipt) {
} }
} }
}() }()
var receipts []txReceipt
receiptsDone := make(chan struct{}) txDone := make(chan struct{})
go func() { go func() {
for { // wait for all transactions to complete
select { l.txWg.Wait()
case res := <-receiptsCh: close(txDone)
receipts = append(receipts, res)
case <-receiptsDone:
return
}
}
}() }()
// wait for all transactions to complete // handle the remaining receipts
l.txWg.Wait() for {
close(receiptsDone) select {
// process the receipts case r := <-receiptsCh:
for _, res := range receipts { receiptHandler(r)
if res.err != nil { case <-txDone:
l.recordFailedTx(res.id, res.err) for len(receiptsCh) > 0 {
} else { receiptHandler(<-receiptsCh)
l.recordConfirmedTx(res.id, res.receipt) }
return
} }
} }
} }
......
...@@ -159,7 +159,7 @@ func (m *SimpleTxManager) reset() { ...@@ -159,7 +159,7 @@ func (m *SimpleTxManager) reset() {
return return
} }
close(m.resetChannel()) close(m.getResetChannel())
m.wg.Wait() m.wg.Wait()
m.lock.Lock() m.lock.Lock()
...@@ -170,9 +170,9 @@ func (m *SimpleTxManager) reset() { ...@@ -170,9 +170,9 @@ func (m *SimpleTxManager) reset() {
m.resetting.Store(false) m.resetting.Store(false)
} }
// resetChannel is a thread-safe getter for the channel that is closed upon // getResetChannel is a thread-safe getter for the channel that is closed upon
// transaction manager resets. // transaction manager resets.
func (m *SimpleTxManager) resetChannel() chan struct{} { func (m *SimpleTxManager) getResetChannel() chan struct{} {
m.lock.RLock() m.lock.RLock()
defer m.lock.RUnlock() defer m.lock.RUnlock()
return m.resetC return m.resetC
...@@ -294,7 +294,7 @@ func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*t ...@@ -294,7 +294,7 @@ func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*t
ticker := time.NewTicker(m.cfg.ResubmissionTimeout) ticker := time.NewTicker(m.cfg.ResubmissionTimeout)
defer ticker.Stop() defer ticker.Stop()
resetChan := m.resetChannel() resetChan := m.getResetChannel()
bumpCounter := 0 bumpCounter := 0
for { for {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment