Commit 26f7a40f authored by George Knee's avatar George Knee Committed by GitHub

txmgr: `Queue.Send()` uses `q.txMgr.SendAsync` (#13120)

* txmgr: Queue.Send() uses q.txMgr.SendAsync

This should ensure that transactions are confirmed on chain in the order Queue.Send() is called, without sacrificing parallel tx submission.

* implement SendAsync in op-challenger test stubTxMgr

It doesn't preserve the nonces like the production txMgr does.

* TrySend also ensures synchronous tx nonce ordering (on success)

* factor out HandleResponse fn

* remove unused code

* unexport handler
parent e84868c2
......@@ -129,8 +129,14 @@ func (s *stubTxMgr) Send(ctx context.Context, candidate txmgr.TxCandidate) (*typ
return <-ch, nil
}
// SendAsync simply wraps Send to make it non blocking. It does not guarantee transaction nonce ordering,
// unlike the production txMgr.
func (s *stubTxMgr) SendAsync(ctx context.Context, candidate txmgr.TxCandidate, ch chan txmgr.SendResponse) {
panic("unimplemented")
go func() {
receipt, err := s.Send(ctx, candidate)
resp := txmgr.SendResponse{Receipt: receipt, Err: err}
ch <- resp
}()
}
func (s *stubTxMgr) recordTx(candidate txmgr.TxCandidate) chan *types.Receipt {
......
......@@ -51,17 +51,36 @@ func (q *Queue[T]) Wait() error {
return q.group.Wait()
}
// handleResponse will wait for the response on the first passed channel,
// and then forward it on the second passed channel (attaching the id). It returns
// the response error or the context error if the context is canceled.
func handleResponse[T any](ctx context.Context, c chan SendResponse, d chan TxReceipt[T], id T) error {
select {
case response := <-c:
d <- TxReceipt[T]{ID: id, Receipt: response.Receipt, Err: response.Err}
return response.Err
case <-ctx.Done():
d <- TxReceipt[T]{ID: id, Err: ctx.Err()}
return ctx.Err()
}
}
// Send will wait until the number of pending txs is below the max pending,
// and then send the next tx.
// and then send the next tx asynchronously. The nonce of the transaction is
// determined synchronously, so transactions should be confirmed on chain in
// the order they are sent using this method.
//
// The actual tx sending is non-blocking, with the receipt returned on the
// provided receipt channel. If the channel is unbuffered, the goroutine is
// blocked from completing until the channel is read from.
func (q *Queue[T]) Send(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) {
group, ctx := q.groupContext()
group.Go(func() error {
return q.sendTx(ctx, id, candidate, receiptCh)
})
responseChan := make(chan SendResponse, 1)
handleResponse := func() error {
return handleResponse(ctx, responseChan, receiptCh, id)
}
group.Go(handleResponse) // This blocks until the number of handlers is below the limit
q.txMgr.SendAsync(ctx, candidate, responseChan) // Nonce management handled synchronously, i.e. before this returns
}
// TrySend sends the next tx, but only if the number of pending txs is below the
......@@ -75,19 +94,17 @@ func (q *Queue[T]) Send(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]
// blocked from completing until the channel is read from.
func (q *Queue[T]) TrySend(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) bool {
group, ctx := q.groupContext()
return group.TryGo(func() error {
return q.sendTx(ctx, id, candidate, receiptCh)
})
}
func (q *Queue[T]) sendTx(ctx context.Context, id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) error {
receipt, err := q.txMgr.Send(ctx, candidate)
receiptCh <- TxReceipt[T]{
ID: id,
Receipt: receipt,
Err: err,
responseChan := make(chan SendResponse, 1)
handleResponse := func() error {
return handleResponse(ctx, responseChan, receiptCh, id)
}
ok := group.TryGo(handleResponse)
if !ok {
return false
} else {
q.txMgr.SendAsync(ctx, candidate, responseChan)
return true
}
return err
}
// groupContext returns a Group and a Context to use when sending a tx.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment