Commit 3172ccfe authored by Michael de Hoog's avatar Michael de Hoog

Simplify

parent 3d632f70
...@@ -69,13 +69,11 @@ func (q *Queue[T]) Wait() { ...@@ -69,13 +69,11 @@ func (q *Queue[T]) Wait() {
func (q *Queue[T]) Send(ctx context.Context, factory TxFactory[T], receiptCh chan TxReceipt[T]) error { func (q *Queue[T]) Send(ctx context.Context, factory TxFactory[T], receiptCh chan TxReceipt[T]) error {
ctx, cancel := mergeContexts(ctx, q.ctx) ctx, cancel := mergeContexts(ctx, q.ctx)
defer cancel() defer cancel()
errChan := make(chan error) factoryErrCh := make(chan error)
q.group.Go(func() error { q.group.Go(func() error {
sender, err := q.createTxSender(ctx, factory, receiptCh) return q.sendTx(ctx, factory, factoryErrCh, receiptCh)
errChan <- err
return sender()
}) })
return <-errChan return <-factoryErrCh
} }
// TrySend sends the next tx, but only if the number of pending txs is below the // TrySend sends the next tx, but only if the number of pending txs is below the
...@@ -91,42 +89,41 @@ func (q *Queue[T]) Send(ctx context.Context, factory TxFactory[T], receiptCh cha ...@@ -91,42 +89,41 @@ func (q *Queue[T]) Send(ctx context.Context, factory TxFactory[T], receiptCh cha
func (q *Queue[T]) TrySend(ctx context.Context, factory TxFactory[T], receiptCh chan TxReceipt[T]) (bool, error) { func (q *Queue[T]) TrySend(ctx context.Context, factory TxFactory[T], receiptCh chan TxReceipt[T]) (bool, error) {
ctx, cancel := mergeContexts(ctx, q.ctx) ctx, cancel := mergeContexts(ctx, q.ctx)
defer cancel() defer cancel()
errChan := make(chan error) factoryErrCh := make(chan error)
queued := q.group.TryGo(func() error { started := q.group.TryGo(func() error {
sender, err := q.createTxSender(ctx, factory, receiptCh) return q.sendTx(ctx, factory, factoryErrCh, receiptCh)
errChan <- err
return sender()
}) })
if !queued { if !started {
return false, nil return false, nil
} }
err := <-errChan err := <-factoryErrCh
return err != nil, err return err != nil, err
} }
func (q *Queue[T]) createTxSender(ctx context.Context, factory TxFactory[T], receiptCh chan TxReceipt[T]) (func() error, error) { func (q *Queue[T]) sendTx(ctx context.Context, factory TxFactory[T], factoryErrorCh chan error, receiptCh chan TxReceipt[T]) error {
// lock to prevent concurrent access to the tx factory // lock to prevent concurrent access to the tx factory
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
candidate, id, err := factory(ctx) candidate, id, err := factory(ctx)
factoryErrorCh <- err
if err != nil { if err != nil {
return nil, err // Factory returned an error which was returned in the channel. This means
// there was no tx to send, so return nil.
return nil
} }
q.pendingChanged(q.pending.Add(1)) q.pendingChanged(q.pending.Add(1))
return func() error { defer func() {
defer func() { q.pendingChanged(q.pending.Add(^uint64(0))) // -1
q.pendingChanged(q.pending.Add(^uint64(0))) // -1 }()
}() receipt, err := q.txMgr.Send(ctx, candidate)
receipt, err := q.txMgr.Send(ctx, candidate) receiptCh <- TxReceipt[T]{
receiptCh <- TxReceipt[T]{ ID: id,
ID: id, Receipt: receipt,
Receipt: receipt, Err: err,
Err: err, }
} return err
return err
}, nil
} }
// mergeContexts creates a new Context that is canceled if either of the two // mergeContexts creates a new Context that is canceled if either of the two
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment