Commit 7fd8443b authored by Michael de Hoog's avatar Michael de Hoog

Ensure that Queue.Wait also waits for tx receipts

parent e49b0068
...@@ -24,6 +24,7 @@ type Queue[T any] struct { ...@@ -24,6 +24,7 @@ type Queue[T any] struct {
txMgr TxManager txMgr TxManager
maxPending uint64 maxPending uint64
pendingChanged func(uint64) pendingChanged func(uint64)
receiptWg sync.WaitGroup
pending atomic.Uint64 pending atomic.Uint64
groupLock sync.Mutex groupLock sync.Mutex
groupCtx context.Context groupCtx context.Context
...@@ -49,6 +50,7 @@ func NewQueue[T any](ctx context.Context, txMgr TxManager, maxPending uint64, pe ...@@ -49,6 +50,7 @@ func NewQueue[T any](ctx context.Context, txMgr TxManager, maxPending uint64, pe
// Wait waits for all pending txs to complete (or fail). // Wait waits for all pending txs to complete (or fail).
func (q *Queue[T]) Wait() { func (q *Queue[T]) Wait() {
q.receiptWg.Wait()
if q.group == nil { if q.group == nil {
return return
} }
...@@ -61,6 +63,7 @@ func (q *Queue[T]) Wait() { ...@@ -61,6 +63,7 @@ func (q *Queue[T]) Wait() {
// The actual tx sending is non-blocking, with the receipt returned on the // The actual tx sending is non-blocking, with the receipt returned on the
// provided receipt channel. // provided receipt channel.
func (q *Queue[T]) Send(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) { func (q *Queue[T]) Send(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) {
q.receiptWg.Add(1)
group, ctx := q.groupContext() group, ctx := q.groupContext()
group.Go(func() error { group.Go(func() error {
return q.sendTx(ctx, id, candidate, receiptCh) return q.sendTx(ctx, id, candidate, receiptCh)
...@@ -76,10 +79,16 @@ func (q *Queue[T]) Send(id T, candidate TxCandidate, receiptCh chan TxReceipt[T] ...@@ -76,10 +79,16 @@ func (q *Queue[T]) Send(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]
// The actual tx sending is non-blocking, with the receipt returned on the // The actual tx sending is non-blocking, with the receipt returned on the
// provided receipt channel. // provided receipt channel.
func (q *Queue[T]) TrySend(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) bool { func (q *Queue[T]) TrySend(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) bool {
q.receiptWg.Add(1)
group, ctx := q.groupContext() group, ctx := q.groupContext()
return group.TryGo(func() error { started := group.TryGo(func() error {
return q.sendTx(ctx, id, candidate, receiptCh) return q.sendTx(ctx, id, candidate, receiptCh)
}) })
if !started {
// send didn't start so receipt will never be available
q.receiptWg.Done()
}
return started
} }
func (q *Queue[T]) sendTx(ctx context.Context, id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) error { func (q *Queue[T]) sendTx(ctx context.Context, id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) error {
...@@ -95,6 +104,7 @@ func (q *Queue[T]) sendTx(ctx context.Context, id T, candidate TxCandidate, rece ...@@ -95,6 +104,7 @@ func (q *Queue[T]) sendTx(ctx context.Context, id T, candidate TxCandidate, rece
Receipt: receipt, Receipt: receipt,
Err: err, Err: err,
} }
q.receiptWg.Done()
}() }()
return err return err
} }
...@@ -110,7 +120,9 @@ func (q *Queue[T]) groupContext() (*errgroup.Group, context.Context) { ...@@ -110,7 +120,9 @@ func (q *Queue[T]) groupContext() (*errgroup.Group, context.Context) {
if q.groupCtx == nil || q.groupCtx.Err() != nil { if q.groupCtx == nil || q.groupCtx.Err() != nil {
// no group exists, or the existing context has an error, so we need to wait // no group exists, or the existing context has an error, so we need to wait
// for existing group threads to complete (if any) and create a new group // for existing group threads to complete (if any) and create a new group
q.Wait() if q.group != nil {
_ = q.group.Wait()
}
q.group, q.groupCtx = errgroup.WithContext(q.ctx) q.group, q.groupCtx = errgroup.WithContext(q.ctx)
if q.maxPending > 0 { if q.maxPending > 0 {
q.group.SetLimit(int(q.maxPending)) q.group.SetLimit(int(q.maxPending))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment