Commit 8d590713 authored by Michael de Hoog's avatar Michael de Hoog

Cleanup

parent 019ce89c
...@@ -28,7 +28,7 @@ type Queue[T any] struct { ...@@ -28,7 +28,7 @@ type Queue[T any] struct {
// NewQueue creates a new transaction sending Queue, with the following parameters: // NewQueue creates a new transaction sending Queue, with the following parameters:
// - maxPending: max number of pending txs at once (0 == no limit) // - maxPending: max number of pending txs at once (0 == no limit)
// - pendingChanged: called whenever a job starts or finishes. The // - pendingChanged: called whenever a tx send starts or finishes. The
// number of currently pending txs is passed as a parameter. // number of currently pending txs is passed as a parameter.
func NewQueue[T any](txMgr TxManager, maxPending uint64, pendingChanged func(uint64)) *Queue[T] { func NewQueue[T any](txMgr TxManager, maxPending uint64, pendingChanged func(uint64)) *Queue[T] {
if maxPending > math.MaxInt64 { if maxPending > math.MaxInt64 {
...@@ -47,7 +47,7 @@ func NewQueue[T any](txMgr TxManager, maxPending uint64, pendingChanged func(uin ...@@ -47,7 +47,7 @@ func NewQueue[T any](txMgr TxManager, maxPending uint64, pendingChanged func(uin
} }
} }
// Wait waits on all running jobs to stop. // Wait waits for all pending txs to complete (or fail).
func (q *Queue[T]) Wait() { func (q *Queue[T]) Wait() {
q.wg.Wait() q.wg.Wait()
} }
...@@ -57,8 +57,7 @@ func (q *Queue[T]) Wait() { ...@@ -57,8 +57,7 @@ func (q *Queue[T]) Wait() {
// tx does not exist. Returns the error returned from the TxFactory (if any). // tx does not exist. Returns the error returned from the TxFactory (if any).
func (q *Queue[T]) Send(ctx context.Context, factory TxFactory[T], receiptCh chan TxReceipt[T]) error { func (q *Queue[T]) Send(ctx context.Context, factory TxFactory[T], receiptCh chan TxReceipt[T]) error {
if q.semaphore != nil { if q.semaphore != nil {
err := q.semaphore.Acquire(ctx, 1) if err := q.semaphore.Acquire(ctx, 1); err != nil {
if err != nil {
return err return err
} }
} }
...@@ -71,10 +70,8 @@ func (q *Queue[T]) Send(ctx context.Context, factory TxFactory[T], receiptCh cha ...@@ -71,10 +70,8 @@ func (q *Queue[T]) Send(ctx context.Context, factory TxFactory[T], receiptCh cha
// The TxFactory should return `nil` if the next tx does not exist. Returns // The TxFactory should return `nil` if the next tx does not exist. Returns
// the error returned from the TxFactory (if any). // the error returned from the TxFactory (if any).
func (q *Queue[T]) TrySend(ctx context.Context, factory TxFactory[T], receiptCh chan TxReceipt[T]) error { func (q *Queue[T]) TrySend(ctx context.Context, factory TxFactory[T], receiptCh chan TxReceipt[T]) error {
if q.semaphore != nil { if q.semaphore != nil && !q.semaphore.TryAcquire(1) {
if !q.semaphore.TryAcquire(1) { return nil
return nil
}
} }
return q.trySend(ctx, factory, receiptCh) return q.trySend(ctx, factory, receiptCh)
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment