Commit 085b83d5 authored by Axel Kingsley's avatar Axel Kingsley Committed by GitHub

txmgr: Add `closed` flag and behavior to SimpleTXManager (#8694)

* Add `closed` flag and behavior to SimpleTxManager

* Add additional Cancel opportunity; Add more test

* Cancel during publishTx submission loop

* Test Improvements

* update unit test
parent 0a22f8b8
...@@ -45,6 +45,7 @@ var ( ...@@ -45,6 +45,7 @@ var (
two = big.NewInt(2) two = big.NewInt(2)
ErrBlobFeeLimit = errors.New("blob fee limit reached") ErrBlobFeeLimit = errors.New("blob fee limit reached")
ErrClosed = errors.New("transaction manager is closed")
) )
// TxManager is an interface that allows callers to reliably publish txs, // TxManager is an interface that allows callers to reliably publish txs,
...@@ -119,6 +120,8 @@ type SimpleTxManager struct { ...@@ -119,6 +120,8 @@ type SimpleTxManager struct {
nonceLock sync.RWMutex nonceLock sync.RWMutex
pending atomic.Int64 pending atomic.Int64
closed atomic.Bool
} }
// NewSimpleTxManager initializes a new SimpleTxManager with the passed Config. // NewSimpleTxManager initializes a new SimpleTxManager with the passed Config.
...@@ -153,8 +156,11 @@ func (m *SimpleTxManager) BlockNumber(ctx context.Context) (uint64, error) { ...@@ -153,8 +156,11 @@ func (m *SimpleTxManager) BlockNumber(ctx context.Context) (uint64, error) {
return m.backend.BlockNumber(ctx) return m.backend.BlockNumber(ctx)
} }
// Close closes the underlying connection, and sets the closed flag.
// once closed, the tx manager will refuse to send any new transactions, and may abandon pending ones.
func (m *SimpleTxManager) Close() { func (m *SimpleTxManager) Close() {
m.backend.Close() m.backend.Close()
m.closed.Store(true)
} }
func (m *SimpleTxManager) txLogger(tx *types.Transaction, logGas bool) log.Logger { func (m *SimpleTxManager) txLogger(tx *types.Transaction, logGas bool) log.Logger {
...@@ -195,6 +201,10 @@ type TxCandidate struct { ...@@ -195,6 +201,10 @@ type TxCandidate struct {
// //
// NOTE: Send can be called concurrently, the nonce will be managed internally. // NOTE: Send can be called concurrently, the nonce will be managed internally.
func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) { func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
// refuse new requests if the tx manager is closed
if m.closed.Load() {
return nil, ErrClosed
}
m.metr.RecordPendingTx(m.pending.Add(1)) m.metr.RecordPendingTx(m.pending.Add(1))
defer func() { defer func() {
m.metr.RecordPendingTx(m.pending.Add(-1)) m.metr.RecordPendingTx(m.pending.Add(-1))
...@@ -214,6 +224,9 @@ func (m *SimpleTxManager) send(ctx context.Context, candidate TxCandidate) (*typ ...@@ -214,6 +224,9 @@ func (m *SimpleTxManager) send(ctx context.Context, candidate TxCandidate) (*typ
defer cancel() defer cancel()
} }
tx, err := retry.Do(ctx, 30, retry.Fixed(2*time.Second), func() (*types.Transaction, error) { tx, err := retry.Do(ctx, 30, retry.Fixed(2*time.Second), func() (*types.Transaction, error) {
if m.closed.Load() {
return nil, ErrClosed
}
tx, err := m.craftTx(ctx, candidate) tx, err := m.craftTx(ctx, candidate)
if err != nil { if err != nil {
m.l.Warn("Failed to create a transaction, will retry", "err", err) m.l.Warn("Failed to create a transaction, will retry", "err", err)
...@@ -418,6 +431,11 @@ func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*t ...@@ -418,6 +431,11 @@ func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*t
m.txLogger(tx, false).Warn("Aborting transaction submission") m.txLogger(tx, false).Warn("Aborting transaction submission")
return nil, errors.New("aborted transaction sending") return nil, errors.New("aborted transaction sending")
} }
// if the tx manager closed while we were waiting for the tx, give up
if m.closed.Load() {
m.txLogger(tx, false).Warn("TxManager closed, aborting transaction submission")
return nil, ErrClosed
}
tx = publishAndWait(tx, true) tx = publishAndWait(tx, true)
case <-ctx.Done(): case <-ctx.Done():
...@@ -440,6 +458,11 @@ func (m *SimpleTxManager) publishTx(ctx context.Context, tx *types.Transaction, ...@@ -440,6 +458,11 @@ func (m *SimpleTxManager) publishTx(ctx context.Context, tx *types.Transaction,
l.Info("Publishing transaction") l.Info("Publishing transaction")
for { for {
// if the tx manager closed, give up without bumping fees or retrying
if m.closed.Load() {
l.Warn("TxManager closed, aborting transaction submission")
return tx, false
}
if bumpFeesImmediately { if bumpFeesImmediately {
newTx, err := m.increaseGasPrice(ctx, tx) newTx, err := m.increaseGasPrice(ctx, tx)
if err != nil { if err != nil {
......
...@@ -1290,3 +1290,116 @@ func TestMinFees(t *testing.T) { ...@@ -1290,3 +1290,116 @@ func TestMinFees(t *testing.T) {
}) })
} }
} }
// TestClose ensures that the tx manager will refuse new work and cancel any in progress
func TestClose(t *testing.T) {
conf := configWithNumConfs(1)
conf.SafeAbortNonceTooLowCount = 100
h := newTestHarnessWithConfig(t, conf)
sendingSignal := make(chan struct{})
// Ensure the manager is not closed
require.False(t, h.mgr.closed.Load())
// sendTx will fail until it is called a retry-number of times
called := 0
const retries = 4
sendTx := func(ctx context.Context, tx *types.Transaction) (err error) {
called += 1
// sendingSignal is used when the tx begins to be sent
if called == 1 {
sendingSignal <- struct{}{}
}
if called%retries == 0 {
txHash := tx.Hash()
h.backend.mine(&txHash, tx.GasFeeCap(), big.NewInt(1))
} else {
time.Sleep(10 * time.Millisecond)
err = core.ErrNonceTooLow
}
return
}
h.backend.setTxSender(sendTx)
// on the first call, we don't use the sending signal but we still need to drain it
go func() {
<-sendingSignal
}()
// demonstrate that a tx is sent, even when it must retry repeatedly
ctx := context.Background()
_, err := h.mgr.Send(ctx, TxCandidate{
To: &common.Address{},
})
require.NoError(t, err)
require.Equal(t, retries, called)
called = 0
// Ensure the manager is *still* not closed
require.False(t, h.mgr.closed.Load())
// on the second call, we close the manager while the tx is in progress by consuming the sending signal
go func() {
<-sendingSignal
h.mgr.Close()
}()
// demonstrate that a tx will cancel if it is in progress when the manager is closed
_, err = h.mgr.Send(ctx, TxCandidate{
To: &common.Address{},
})
require.ErrorIs(t, ErrClosed, err)
// confirm that the tx was canceled before it retried to completion
require.Less(t, called, retries)
require.True(t, h.mgr.closed.Load())
called = 0
// demonstrate that new calls to Send will also fail when the manager is closed
// there should be no need to capture the sending signal here because the manager is already closed and will return immediately
_, err = h.mgr.Send(ctx, TxCandidate{
To: &common.Address{},
})
require.ErrorIs(t, ErrClosed, err)
// confirm that the tx was canceled before it ever made it to the backend
require.Equal(t, 0, called)
}
// TestCloseWaitingForConfirmation ensures that the tx manager will wait for confirmation of a tx in flight, even when closed
func TestCloseWaitingForConfirmation(t *testing.T) {
// two confirmations required so that we can mine and not yet be fully confirmed
conf := configWithNumConfs(2)
h := newTestHarnessWithConfig(t, conf)
// sendDone is a signal that the tx has been sent from the sendTx function
sendDone := make(chan struct{})
// closeDone is a signal that the txmanager has closed
closeDone := make(chan struct{})
sendTx := func(ctx context.Context, tx *types.Transaction) error {
txHash := tx.Hash()
h.backend.mine(&txHash, tx.GasFeeCap(), big.NewInt(1))
close(sendDone)
return nil
}
h.backend.setTxSender(sendTx)
// this goroutine will close the manager when the tx sending is complete
// the transaction is not yet confirmed, so the manager will wait for confirmation
go func() {
<-sendDone
h.mgr.Close()
close(closeDone)
}()
// this goroutine will complete confirmation of the tx when the manager is closed
// by forcing this to happen after close, we are able to observe a closing manager waiting for confirmation
go func() {
<-closeDone
h.backend.mine(nil, nil, big.NewInt(1))
}()
ctx := context.Background()
_, err := h.mgr.Send(ctx, TxCandidate{
To: &common.Address{},
})
require.True(t, h.mgr.closed.Load())
require.NoError(t, err)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment