Commit c38a81a4 authored by Michael de Hoog

Better receipt and send separation (fix potential deadlock)

parent bb2ccfab
......@@ -295,33 +295,44 @@ func (l *BatchSubmitter) loop() {
select {
case <-ticker.C:
l.loadBlocksIntoState(l.shutdownCtx)
l.publishStateToL1(l.killCtx, queue, receiptsCh)
l.publishStateToL1(queue, receiptsCh, false)
case r := <-receiptsCh:
l.handleReceipt(r)
case <-l.shutdownCtx.Done():
l.drainState(receiptsCh, queue)
return
}
}
}
func (l *BatchSubmitter) drainState(receiptsCh chan txmgr.TxReceipt[txData], queue *txmgr.Queue[txData]) {
err := l.state.Close()
if err != nil {
l.log.Error("error closing the channel manager", "err", err)
}
l.publishStateToL1(queue, receiptsCh, true)
return
}
}
}
// publishStateToL1 loops through the block data loaded into `state` and
// submits the associated data to the L1 in the form of channel frames.
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData], drain bool) {
txDone := make(chan struct{})
// send/wait and receipt reading must be on separate goroutines to avoid deadlocks
go func() {
// publish remaining state
l.publishStateToL1(l.killCtx, queue, receiptsCh)
// wait for all transactions to complete
defer func() {
if drain {
// if draining, we wait for all transactions to complete
queue.Wait()
// notify that we're done
}
close(txDone)
}()
for {
err := l.publishTxToL1(l.killCtx, queue, receiptsCh)
if err != nil {
if drain && err != io.EOF {
l.log.Error("error sending tx while draining state", "err", err)
}
return
}
}
}()
// drain and handle the remaining receipts
for {
select {
case r := <-receiptsCh:
......@@ -332,15 +343,13 @@ func (l *BatchSubmitter) drainState(receiptsCh chan txmgr.TxReceipt[txData], que
}
}
// publishStateToL1 loops through the block data loaded into `state` and
// submits the associated data to the L1 in the form of channel frames.
func (l *BatchSubmitter) publishStateToL1(ctx context.Context, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) {
// publishTxToL1 submits a single state tx to the L1
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) error {
// send all available transactions
for {
l1tip, err := l.l1Tip(ctx)
if err != nil {
l.log.Error("Failed to query L1 tip", "error", err)
return
return err
}
l.recordL1Tip(l1tip)
......@@ -348,14 +357,14 @@ func (l *BatchSubmitter) publishStateToL1(ctx context.Context, queue *txmgr.Queu
txdata, err := l.state.TxData(l1tip.ID())
if err == io.EOF {
l.log.Trace("no transaction data available")
return
return err
} else if err != nil {
l.log.Error("unable to get tx data", "err", err)
return
return err
}
l.sendTransaction(txdata, queue, receiptsCh)
}
return nil
}
// sendTransaction creates & submits a transaction to the batch inbox address with the given `data`.
......
......@@ -24,7 +24,6 @@ type Queue[T any] struct {
txMgr TxManager
maxPending uint64
pendingChanged func(uint64)
receiptWg sync.WaitGroup
pending atomic.Uint64
groupLock sync.Mutex
groupCtx context.Context
......@@ -50,7 +49,6 @@ func NewQueue[T any](ctx context.Context, txMgr TxManager, maxPending uint64, pe
// Wait waits for all pending txs to complete (or fail).
func (q *Queue[T]) Wait() {
q.receiptWg.Wait()
if q.group == nil {
return
}
......@@ -61,9 +59,9 @@ func (q *Queue[T]) Wait() {
// and then send the next tx.
//
// The actual tx sending is non-blocking, with the receipt returned on the
// provided receipt channel.
// provided receipt channel. If the channel is unbuffered, the goroutine is
// blocked from completing until the channel is read from.
func (q *Queue[T]) Send(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) {
q.receiptWg.Add(1)
group, ctx := q.groupContext()
group.Go(func() error {
return q.sendTx(ctx, id, candidate, receiptCh)
......@@ -77,18 +75,13 @@ func (q *Queue[T]) Send(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]
// transaction is queued and this method returns true.
//
// The actual tx sending is non-blocking, with the receipt returned on the
// provided receipt channel.
// provided receipt channel. If the channel is unbuffered, the goroutine is
// blocked from completing until the channel is read from.
func (q *Queue[T]) TrySend(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) bool {
q.receiptWg.Add(1)
group, ctx := q.groupContext()
started := group.TryGo(func() error {
return group.TryGo(func() error {
return q.sendTx(ctx, id, candidate, receiptCh)
})
if !started {
// send didn't start so receipt will never be available
q.receiptWg.Done()
}
return started
}
func (q *Queue[T]) sendTx(ctx context.Context, id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) error {
......@@ -97,15 +90,11 @@ func (q *Queue[T]) sendTx(ctx context.Context, id T, candidate TxCandidate, rece
q.pendingChanged(q.pending.Add(^uint64(0))) // -1
}()
receipt, err := q.txMgr.Send(ctx, candidate)
go func() {
// notify from a goroutine to ensure the receipt channel won't block method completion
receiptCh <- TxReceipt[T]{
ID: id,
Receipt: receipt,
Err: err,
}
q.receiptWg.Done()
}()
return err
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment