Commit 97aa08a6 authored by Matthew Slipper, committed by GitHub

Add SendAsync to TxMgr (#11843)

* Add SendAsync to TxMgr

Adds a SendAsync method to TxMgr. I'd like to use this for `op-deployer`, which needs to send multiple transactions in parallel but with predictable nonces. `SendAsync` delivers the result of each send on a caller-provided channel, but synchronously increments the nonce and prepares the send before returning (a usage sketch follows below).

* review updates + tests
parent d887cfa9
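
As an illustration of the intended usage (not part of this commit; `sendAll`, `candidates`, and the import path are assumptions), a minimal sketch of fanning out several transactions through one TxManager while keeping nonce order:

// Illustrative only - not part of this diff.
import (
	"context"
	"fmt"

	"github.com/ethereum-optimism/optimism/op-service/txmgr"
)

// sendAll fans several candidates out through one TxManager. Each SendAsync
// call reserves the nonce and prepares the tx before it returns, so issuing
// the calls in order yields consecutive nonces.
func sendAll(ctx context.Context, mgr txmgr.TxManager, candidates []txmgr.TxCandidate) error {
	// The channel must be buffered: SendAsync panics on an unbuffered channel.
	ch := make(chan txmgr.SendResponse, len(candidates))
	for _, cand := range candidates {
		mgr.SendAsync(ctx, cand, ch)
	}
	// Completions may arrive out of order; SendResponse.Nonce identifies each send.
	for range candidates {
		resp := <-ch
		if resp.Err != nil {
			return fmt.Errorf("send with nonce %d failed: %w", resp.Nonce, resp.Err)
		}
	}
	return nil
}

Requiring a buffered channel lets the manager's internal goroutine deliver its response without blocking, which is why SendAsync guards against cap(ch) == 0 (see the panic check and TestSendAsyncUnbufferedChan further down).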
@@ -128,6 +128,10 @@ func (s *stubTxMgr) Send(ctx context.Context, candidate txmgr.TxCandidate) (*typ
 	return <-ch, nil
 }
 
+func (s *stubTxMgr) SendAsync(ctx context.Context, candidate txmgr.TxCandidate, ch chan txmgr.SendResponse) {
+	panic("unimplemented")
+}
+
 func (s *stubTxMgr) recordTx(candidate txmgr.TxCandidate) chan *types.Receipt {
 	s.m.Lock()
 	defer s.m.Unlock()
...
@@ -69,6 +69,10 @@ func (f fakeTxMgr) Send(_ context.Context, _ txmgr.TxCandidate) (*types.Receipt,
 	panic("unimplemented")
 }
 
+func (f fakeTxMgr) SendAsync(ctx context.Context, candidate txmgr.TxCandidate, ch chan txmgr.SendResponse) {
+	panic("unimplemented")
+}
+
 func (f fakeTxMgr) Close() {
 }
...
@@ -120,6 +120,11 @@ func (_m *TxManager) Send(ctx context.Context, candidate txmgr.TxCandidate) (*ty
 	return r0, r1
 }
 
+// SendAsync provides a mock function with given fields: ctx, candidate, ch
+func (_m *TxManager) SendAsync(ctx context.Context, candidate txmgr.TxCandidate, ch chan txmgr.SendResponse) {
+	_m.Called(ctx, candidate, ch)
+}
+
 type mockConstructorTestingTNewTxManager interface {
 	mock.TestingT
 	Cleanup(func())
...
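
A hedged sketch of how the generated mock might be exercised with testify's mock package (the test name, package layout, and canned nonce are assumptions, not part of this diff):

func TestTxManagerMockSendAsync(t *testing.T) {
	m := new(TxManager) // the generated mock shown above
	// Push a canned response onto whatever channel the code under test supplies.
	m.On("SendAsync", mock.Anything, mock.Anything, mock.Anything).
		Run(func(args mock.Arguments) {
			ch := args.Get(2).(chan txmgr.SendResponse)
			ch <- txmgr.SendResponse{Nonce: 7}
		})

	ch := make(chan txmgr.SendResponse, 1)
	m.SendAsync(context.Background(), txmgr.TxCandidate{}, ch)
	require.Equal(t, uint64(7), (<-ch).Nonce)
	m.AssertExpectations(t)
}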
@@ -46,6 +46,12 @@ var (
 	ErrClosed = errors.New("transaction manager is closed")
 )
 
+type SendResponse struct {
+	Receipt *types.Receipt
+	Nonce   uint64
+	Err     error
+}
+
 // TxManager is an interface that allows callers to reliably publish txs,
 // bumping the gas price if needed, and obtain the receipt of the resulting tx.
 //
@@ -63,6 +69,14 @@ type TxManager interface {
 	// mempool and is in need of replacement or cancellation.
 	Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error)
 
+	// SendAsync is used to create & send a transaction asynchronously. It has similar internal
+	// semantics to Send; however, the result of the send operation is delivered on the
+	// caller-provided channel once it completes. Transactions are crafted synchronously - that
+	// is, nonce management and gas estimation happen prior to the method returning. This allows
+	// callers that rely on predictable nonces to send multiple transactions in parallel while
+	// preserving the order of nonce increments.
+	SendAsync(ctx context.Context, candidate TxCandidate, ch chan SendResponse)
+
 	// From returns the sending address associated with the instance of the transaction manager.
 	// It is static for a single instance of a TxManager.
 	From() common.Address
@@ -222,24 +236,83 @@ func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*typ
 	if m.closed.Load() {
 		return nil, ErrClosed
 	}
 	m.metr.RecordPendingTx(m.pending.Add(1))
-	defer func() {
-		m.metr.RecordPendingTx(m.pending.Add(-1))
-	}()
-	receipt, err := m.send(ctx, candidate)
+	defer m.metr.RecordPendingTx(m.pending.Add(-1))
+
+	var cancel context.CancelFunc
+	if m.cfg.TxSendTimeout == 0 {
+		ctx, cancel = context.WithCancel(ctx)
+	} else {
+		ctx, cancel = context.WithTimeout(ctx, m.cfg.TxSendTimeout)
+	}
+	defer cancel()
+
+	tx, err := m.prepare(ctx, candidate)
 	if err != nil {
 		m.resetNonce()
+		return nil, err
+	}
+
+	receipt, err := m.sendTx(ctx, tx)
+	if err != nil {
+		m.resetNonce()
+		return nil, err
 	}
 	return receipt, err
 }
 
-// send performs the actual transaction creation and sending.
-func (m *SimpleTxManager) send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
-	if m.cfg.TxSendTimeout != 0 {
-		var cancel context.CancelFunc
-		ctx, cancel = context.WithTimeout(ctx, m.cfg.TxSendTimeout)
-		defer cancel()
-	}
+func (m *SimpleTxManager) SendAsync(ctx context.Context, candidate TxCandidate, ch chan SendResponse) {
+	if cap(ch) == 0 {
+		panic("SendAsync: channel must be buffered")
+	}
+
+	// refuse new requests if the tx manager is closed
+	if m.closed.Load() {
+		ch <- SendResponse{
+			Receipt: nil,
+			Err:     ErrClosed,
+		}
+		return
+	}
+
+	m.metr.RecordPendingTx(m.pending.Add(1))
+
+	var cancel context.CancelFunc
+	if m.cfg.TxSendTimeout == 0 {
+		ctx, cancel = context.WithCancel(ctx)
+	} else {
+		ctx, cancel = context.WithTimeout(ctx, m.cfg.TxSendTimeout)
+	}
+
+	tx, err := m.prepare(ctx, candidate)
+	if err != nil {
+		m.resetNonce()
+		cancel()
+		m.metr.RecordPendingTx(m.pending.Add(-1))
+		ch <- SendResponse{
+			Receipt: nil,
+			Err:     err,
+		}
+		return
+	}
+
+	go func() {
+		defer m.metr.RecordPendingTx(m.pending.Add(-1))
+		defer cancel()
+		receipt, err := m.sendTx(ctx, tx)
+		if err != nil {
+			m.resetNonce()
+		}
+		ch <- SendResponse{
+			Receipt: receipt,
+			Nonce:   tx.Nonce(),
+			Err:     err,
		}
+	}()
+}
+
+// prepare prepares the transaction for sending.
+func (m *SimpleTxManager) prepare(ctx context.Context, candidate TxCandidate) (*types.Transaction, error) {
 	tx, err := retry.Do(ctx, 30, retry.Fixed(2*time.Second), func() (*types.Transaction, error) {
 		if m.closed.Load() {
 			return nil, ErrClosed
@@ -253,7 +326,7 @@ func (m *SimpleTxManager) send(ctx context.Context, candidate TxCandidate) (*typ
 	if err != nil {
 		return nil, fmt.Errorf("failed to create the tx: %w", err)
 	}
-	return m.sendTx(ctx, tx)
+	return tx, nil
 }
 
 // craftTx creates the signed transaction
...
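
To illustrate the Nonce field and the ordering guarantee described in the interface comment above, here is a hypothetical helper (not part of the commit; name and import path are assumed) that restores submission order from out-of-order completions:

import (
	"sort"

	"github.com/ethereum-optimism/optimism/op-service/txmgr"
)

// collectByNonce assumes exactly n SendAsync calls were made against ch and
// blocks until all n responses have arrived. Failed sends that never reserved
// a nonce (e.g. ErrClosed) carry the zero value in Nonce.
func collectByNonce(ch <-chan txmgr.SendResponse, n int) []txmgr.SendResponse {
	out := make([]txmgr.SendResponse, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, <-ch)
	}
	// Nonces are reserved in call order, so sorting by nonce restores that order.
	sort.Slice(out, func(i, j int) bool { return out[i].Nonce < out[j].Nonce })
	return out
}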
@@ -338,6 +338,27 @@ func (b *mockBackend) TransactionReceipt(ctx context.Context, txHash common.Hash
 func (b *mockBackend) Close() {
 }
 
+type testSendVariantsFn func(ctx context.Context, h *testHarness, tx TxCandidate) (*types.Receipt, error)
+
+func testSendVariants(t *testing.T, testFn func(t *testing.T, send testSendVariantsFn)) {
+	t.Parallel()
+
+	t.Run("Send", func(t *testing.T) {
+		testFn(t, func(ctx context.Context, h *testHarness, tx TxCandidate) (*types.Receipt, error) {
+			return h.mgr.Send(ctx, tx)
+		})
+	})
+
+	t.Run("SendAsync", func(t *testing.T) {
+		testFn(t, func(ctx context.Context, h *testHarness, tx TxCandidate) (*types.Receipt, error) {
+			ch := make(chan SendResponse, 1)
+			h.mgr.SendAsync(ctx, tx, ch)
+			res := <-ch
+			return res.Receipt, res.Err
+		})
+	})
+}
+
 // TestTxMgrConfirmAtMinGasPrice asserts that Send returns the min gas price tx
 // if the tx is mined instantly.
 func TestTxMgrConfirmAtMinGasPrice(t *testing.T) {
@@ -400,32 +421,32 @@ func TestTxMgrNeverConfirmCancel(t *testing.T) {
 // TestTxMgrTxSendTimeout tests that the TxSendTimeout is respected when trying to send a
 // transaction, even if NetworkTimeout expires first.
 func TestTxMgrTxSendTimeout(t *testing.T) {
-	t.Parallel()
+	testSendVariants(t, func(t *testing.T, send testSendVariantsFn) {
 		conf := configWithNumConfs(1)
 		conf.TxSendTimeout = 3 * time.Second
 		conf.NetworkTimeout = 1 * time.Second
 
 		h := newTestHarnessWithConfig(t, conf)
 		txCandidate := h.createTxCandidate()
 		sendCount := 0
 		sendTx := func(ctx context.Context, tx *types.Transaction) error {
 			sendCount++
 			<-ctx.Done()
 			return context.DeadlineExceeded
 		}
 		h.backend.setTxSender(sendTx)
 
 		ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
 		defer cancel()
-	receipt, err := h.mgr.send(ctx, txCandidate)
+		receipt, err := send(ctx, h, txCandidate)
 		require.ErrorIs(t, err, context.DeadlineExceeded)
 		// Because network timeout is much shorter than send timeout, we should see multiple send attempts
 		// before the overall send fails.
 		require.Greater(t, sendCount, 1)
 		require.Nil(t, receipt)
+	})
 }
 
 // TestAlreadyReserved tests that AlreadyReserved error results in immediate abort of transaction
@@ -1326,40 +1347,42 @@ func TestErrStringMatch(t *testing.T) {
 }
 
 func TestNonceReset(t *testing.T) {
+	testSendVariants(t, func(t *testing.T, send testSendVariantsFn) {
 		conf := configWithNumConfs(1)
 		conf.SafeAbortNonceTooLowCount = 1
 		h := newTestHarnessWithConfig(t, conf)
 
 		index := -1
 		var nonces []uint64
 		sendTx := func(ctx context.Context, tx *types.Transaction) error {
 			index++
 			nonces = append(nonces, tx.Nonce())
 			// fail every 3rd tx
 			if index%3 == 0 {
 				return core.ErrNonceTooLow
 			}
 			txHash := tx.Hash()
 			h.backend.mine(&txHash, tx.GasFeeCap(), nil)
 			return nil
 		}
 		h.backend.setTxSender(sendTx)
 
 		ctx := context.Background()
 		for i := 0; i < 8; i++ {
-		_, err := h.mgr.Send(ctx, TxCandidate{
+			_, err := send(ctx, h, TxCandidate{
 				To: &common.Address{},
 			})
 			// expect every 3rd tx to fail
 			if i%3 == 0 {
 				require.Error(t, err)
 			} else {
 				require.NoError(t, err)
 			}
 		}
 
 		// internal nonce tracking should be reset to startingNonce value every 3rd tx
 		require.Equal(t, []uint64{1, 1, 2, 3, 1, 2, 3, 1}, nonces)
+	})
 }
 
 func TestMinFees(t *testing.T) {
@@ -1431,114 +1454,118 @@ func TestMinFees(t *testing.T) {
 // TestClose ensures that the tx manager will refuse new work and cancel any in progress
 func TestClose(t *testing.T) {
+	testSendVariants(t, func(t *testing.T, send testSendVariantsFn) {
 		conf := configWithNumConfs(1)
 		h := newTestHarnessWithConfig(t, conf)
 
 		sendingSignal := make(chan struct{})
 
 		// Ensure the manager is not closed
 		require.False(t, h.mgr.closed.Load())
 
 		// sendTx will fail until it is called a retry-number of times
 		called := 0
 		const retries = 4
 		sendTx := func(ctx context.Context, tx *types.Transaction) (err error) {
 			called += 1
 			// sendingSignal is used when the tx begins to be sent
 			if called == 1 {
 				sendingSignal <- struct{}{}
 			}
 			if called%retries == 0 {
 				txHash := tx.Hash()
 				h.backend.mine(&txHash, tx.GasFeeCap(), big.NewInt(1))
 			} else {
 				time.Sleep(10 * time.Millisecond)
 				err = errRpcFailure
 			}
 			return
 		}
 		h.backend.setTxSender(sendTx)
 
 		// on the first call, we don't use the sending signal but we still need to drain it
 		go func() {
 			<-sendingSignal
 		}()
 		// demonstrate that a tx is sent, even when it must retry repeatedly
 		ctx := context.Background()
-	_, err := h.mgr.Send(ctx, TxCandidate{
+		_, err := send(ctx, h, TxCandidate{
 			To: &common.Address{},
 		})
 		require.NoError(t, err)
 		require.Equal(t, retries, called)
 		called = 0
 		// Ensure the manager is *still* not closed
 		require.False(t, h.mgr.closed.Load())
 
 		// on the second call, we close the manager while the tx is in progress by consuming the sending signal
 		go func() {
 			<-sendingSignal
 			h.mgr.Close()
 		}()
 		// demonstrate that a tx will cancel if it is in progress when the manager is closed
-	_, err = h.mgr.Send(ctx, TxCandidate{
+		_, err = send(ctx, h, TxCandidate{
 			To: &common.Address{},
 		})
 		require.ErrorIs(t, err, ErrClosed)
 		// confirm that the tx was canceled before it retried to completion
 		require.Less(t, called, retries)
 		require.True(t, h.mgr.closed.Load())
 		called = 0
 
 		// demonstrate that new calls to Send will also fail when the manager is closed
 		// there should be no need to capture the sending signal here because the manager is already closed and will return immediately
-	_, err = h.mgr.Send(ctx, TxCandidate{
+		_, err = send(ctx, h, TxCandidate{
 			To: &common.Address{},
 		})
 		require.ErrorIs(t, err, ErrClosed)
 		// confirm that the tx was canceled before it ever made it to the backend
 		require.Equal(t, 0, called)
+	})
 }
 
 // TestCloseWaitingForConfirmation ensures that the tx manager will wait for confirmation of a tx in flight, even when closed
 func TestCloseWaitingForConfirmation(t *testing.T) {
+	testSendVariants(t, func(t *testing.T, send testSendVariantsFn) {
 		// two confirmations required so that we can mine and not yet be fully confirmed
 		conf := configWithNumConfs(2)
 		h := newTestHarnessWithConfig(t, conf)
 
 		// sendDone is a signal that the tx has been sent from the sendTx function
 		sendDone := make(chan struct{})
 		// closeDone is a signal that the txmanager has closed
 		closeDone := make(chan struct{})
 
 		sendTx := func(ctx context.Context, tx *types.Transaction) error {
 			txHash := tx.Hash()
 			h.backend.mine(&txHash, tx.GasFeeCap(), big.NewInt(1))
 			close(sendDone)
 			return nil
 		}
 		h.backend.setTxSender(sendTx)
 
 		// this goroutine will close the manager when the tx sending is complete
 		// the transaction is not yet confirmed, so the manager will wait for confirmation
 		go func() {
 			<-sendDone
 			h.mgr.Close()
 			close(closeDone)
 		}()
 
 		// this goroutine will complete confirmation of the tx when the manager is closed
 		// by forcing this to happen after close, we are able to observe a closing manager waiting for confirmation
 		go func() {
 			<-closeDone
 			h.backend.mine(nil, nil, big.NewInt(1))
 		}()
 
 		ctx := context.Background()
-	_, err := h.mgr.Send(ctx, TxCandidate{
+		_, err := send(ctx, h, TxCandidate{
 			To: &common.Address{},
 		})
 		require.True(t, h.mgr.closed.Load())
 		require.NoError(t, err)
+	})
 }
 
 func TestMakeSidecar(t *testing.T) {
@@ -1561,3 +1588,12 @@ func TestMakeSidecar(t *testing.T) {
 		require.Equal(t, hashes[i], eth.KZGToVersionedHash(commit))
 	}
 }
+
+func TestSendAsyncUnbufferedChan(t *testing.T) {
+	conf := configWithNumConfs(2)
+	h := newTestHarnessWithConfig(t, conf)
+
+	require.Panics(t, func() {
+		h.mgr.SendAsync(context.Background(), TxCandidate{}, make(chan SendResponse))
+	})
+}