Commit 9452aa66, authored by Roberto Bayardo, committed by GitHub

op-service/txmgr: multiple fixes / improvements (#11614)

* - make immediate nonce-too-low error abort send (since it could never succeed otherwise)

- make txmgr resubmit a transaction when fee bumping fails in case it has been dropped from the mempool

- only bump fees when they really should be bumped

- set the txmgr overall default send timeout to 10 minutes. It was infinite, which led to permanently
  stuck transactions in combination with the other bugs fixed in this PR.

* Update op-service/txmgr/txmgr_test.go

---------
Co-authored-by: Sebastian Stammler <stammler.s@gmail.com>
parent 4c211faa
......@@ -79,7 +79,7 @@ var (
MinBaseFeeGwei: 1.0,
ResubmissionTimeout: 48 * time.Second,
NetworkTimeout: 10 * time.Second,
TxSendTimeout: 0 * time.Second,
TxSendTimeout: 10 * time.Minute,
TxNotInMempoolTimeout: 2 * time.Minute,
ReceiptQueryInterval: 12 * time.Second,
}
......
......@@ -37,6 +37,9 @@ type SendState struct {
// Whether any attempt to send the tx resulted in ErrAlreadyReserved
alreadyReserved bool
// Whether we should bump fees before trying to publish the tx again
bumpFees bool
// Miscellaneous tracking
bumpCount int // number of times we have bumped the gas price
}
......@@ -120,6 +123,10 @@ func (s *SendState) CriticalError() error {
case s.nonceTooLowCount >= s.safeAbortNonceTooLowCount:
// we have exceeded the nonce too low count
return core.ErrNonceTooLow
case s.successFullPublishCount == 0 && s.nonceTooLowCount > 0:
// A nonce too low error before successfully publishing any transaction means the tx will
// need a different nonce, which we can force by returning error.
return core.ErrNonceTooLow
case s.successFullPublishCount == 0 && s.now().After(s.txInMempoolDeadline):
// unable to get the tx into the mempool in the allotted time
return ErrMempoolDeadlineExpired
......
......@@ -58,11 +58,21 @@ func TestSendStateNoAbortAfterProcessOtherError(t *testing.T) {
require.Nil(t, sendState.CriticalError())
}
// TestSendStateAbortSafelyAfterNonceTooLowNoTxPublished asserts that we will abort after the very
// first nonce-too-low error if a tx hasn't yet been published.
func TestSendStateAbortSafelyAfterNonceTooLowNoTxPublished(t *testing.T) {
sendState := newSendState()
// The only event recorded on this state is a single nonce-too-low send error; no publish
// attempt has succeeded beforehand.
sendState.ProcessSendError(core.ErrNonceTooLow)
// That one error must already surface as the critical error.
require.ErrorIs(t, sendState.CriticalError(), core.ErrNonceTooLow)
}
// TestSendStateAbortSafelyAfterNonceTooLowButNoTxMined asserts that we will
// abort after the safe abort interval has elapsed if we haven't mined a tx.
func TestSendStateAbortSafelyAfterNonceTooLowButNoTxMined(t *testing.T) {
sendState := newSendState()
sendState.ProcessSendError(nil)
sendState.ProcessSendError(core.ErrNonceTooLow)
require.Nil(t, sendState.CriticalError())
sendState.ProcessSendError(core.ErrNonceTooLow)
......@@ -90,6 +100,7 @@ func TestSendStateMiningTxCancelsAbort(t *testing.T) {
func TestSendStateReorgingTxResetsAbort(t *testing.T) {
sendState := newSendState()
sendState.ProcessSendError(nil)
sendState.ProcessSendError(core.ErrNonceTooLow)
sendState.ProcessSendError(core.ErrNonceTooLow)
sendState.TxMined(testHash)
......@@ -120,6 +131,7 @@ func TestSendStateNoAbortEvenIfNonceTooLowAfterTxMined(t *testing.T) {
func TestSendStateSafeAbortIfNonceTooLowPersistsAfterUnmine(t *testing.T) {
sendState := newSendState()
sendState.ProcessSendError(nil)
sendState.TxMined(testHash)
sendState.TxNotMined(testHash)
sendState.ProcessSendError(core.ErrNonceTooLow)
......
......@@ -53,8 +53,8 @@ var (
type TxManager interface {
// Send is used to create & send a transaction. It will handle increasing
// the gas price & ensuring that the transaction remains in the transaction pool.
// It can be stopped by cancelling the provided context; however, the transaction
// may be included on L1 even if the context is cancelled.
// It can be stopped by canceling the provided context; however, the transaction
// may be included on L1 even if the context is canceled.
//
// NOTE: Send can be called concurrently, the nonce will be managed internally.
//
......@@ -470,44 +470,33 @@ func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*t
sendState := NewSendState(m.cfg.SafeAbortNonceTooLowCount, m.cfg.TxNotInMempoolTimeout)
receiptChan := make(chan *types.Receipt, 1)
publishAndWait := func(tx *types.Transaction, bumpFees bool) *types.Transaction {
resubmissionTimeout := m.GetBumpFeeRetryTime()
ticker := time.NewTicker(resubmissionTimeout)
defer ticker.Stop()
for {
if !sendState.IsWaitingForConfirmation() {
if m.closed.Load() {
// the tx manager closed and no txs are waiting to be confirmed, give up
m.txLogger(tx, false).Warn("TxManager closed, aborting transaction submission")
return nil, ErrClosed
}
var published bool
if tx, published = m.publishTx(ctx, tx, sendState); published {
wg.Add(1)
tx, published := m.publishTx(ctx, tx, sendState, bumpFees)
if published {
go func() {
defer wg.Done()
m.waitForTx(ctx, tx, sendState, receiptChan)
}()
} else {
wg.Done()
}
return tx
}
// Immediately publish a transaction before starting the resubmission loop
tx = publishAndWait(tx, false)
resubmissionTimeout := m.GetBumpFeeRetryTime()
ticker := time.NewTicker(resubmissionTimeout)
defer ticker.Stop()
for {
if err := sendState.CriticalError(); err != nil {
m.txLogger(tx, false).Warn("Aborting transaction submission", "err", err)
return nil, fmt.Errorf("aborted tx send due to critical error: %w", err)
}
select {
case <-ticker.C:
// Don't resubmit a transaction if it has been mined, but we are waiting for the conf depth.
if sendState.IsWaitingForConfirmation() {
continue
}
// if the tx manager closed while we were waiting for the tx, give up
if m.closed.Load() {
m.txLogger(tx, false).Warn("TxManager closed, aborting transaction submission")
return nil, ErrClosed
}
tx = publishAndWait(tx, true)
case <-ctx.Done():
return nil, ctx.Err()
......@@ -523,34 +512,34 @@ func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*t
// publishTx publishes the transaction to the transaction pool. If it receives any underpriced errors
// it will bump the fees and retry.
// Returns the latest fee bumped tx, and a boolean indicating whether the tx was sent or not
func (m *SimpleTxManager) publishTx(ctx context.Context, tx *types.Transaction, sendState *SendState, bumpFeesImmediately bool) (*types.Transaction, bool) {
func (m *SimpleTxManager) publishTx(ctx context.Context, tx *types.Transaction, sendState *SendState) (*types.Transaction, bool) {
l := m.txLogger(tx, true)
l.Info("Publishing transaction", "tx", tx.Hash())
for {
// if the tx manager closed, give up without bumping fees or retrying
if m.closed.Load() {
l.Warn("TxManager closed, aborting transaction submission")
return tx, false
}
if bumpFeesImmediately {
newTx, err := m.increaseGasPrice(ctx, tx)
if err != nil {
l.Error("unable to increase gas", "err", err)
if sendState.bumpFees {
if newTx, err := m.increaseGasPrice(ctx, tx); err != nil {
l.Warn("unable to increase gas, will try to re-publish the tx", "err", err)
m.metr.TxPublished("bump_failed")
// Even if we are unable to bump fees, we must still resubmit the transaction
// because a previously successfully published tx can get dropped from the
// mempool. If we don't try to resubmit it to either force a failure (eg. from
// nonce too low errors) or get it back into the mempool, we can end up waiting on
// it to get mined indefinitely.
} else {
if sendState.IsWaitingForConfirmation() {
// A previously published tx might get mined during the increaseGasPrice call
// above, in which case we can abort trying to replace it with a higher fee tx.
return tx, false
}
tx = newTx
sendState.bumpCount++
tx = newTx
l = m.txLogger(tx, true)
// Disable bumping fees again until the new transaction is successfully published,
// or we immediately get another underpriced error.
sendState.bumpFees = false
}
bumpFeesImmediately = true // bump fees next loop
if sendState.IsWaitingForConfirmation() {
// there is a chance the previous tx goes into "waiting for confirmation" state
// during the increaseGasPrice call; continue waiting rather than resubmit the tx
return tx, false
}
cCtx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
......@@ -561,6 +550,9 @@ func (m *SimpleTxManager) publishTx(ctx context.Context, tx *types.Transaction,
if err == nil {
m.metr.TxPublished("")
l.Info("Transaction successfully published", "tx", tx.Hash())
// Tx made it into the mempool, so we'll need a fee bump if we end up trying to replace
// it with another publish attempt.
sendState.bumpFees = true
return tx, true
}
......@@ -575,26 +567,33 @@ func (m *SimpleTxManager) publishTx(ctx context.Context, tx *types.Transaction,
m.metr.TxPublished("nonce_too_low")
case errStringMatch(err, context.Canceled):
m.metr.RPCError()
l.Warn("transaction send cancelled", "err", err)
m.metr.TxPublished("context_cancelled")
l.Warn("transaction send canceled", "err", err)
m.metr.TxPublished("context_canceled")
case errStringMatch(err, txpool.ErrAlreadyKnown):
l.Warn("resubmitted already known transaction", "err", err)
m.metr.TxPublished("tx_already_known")
case errStringMatch(err, txpool.ErrReplaceUnderpriced):
l.Warn("transaction replacement is underpriced", "err", err)
m.metr.TxPublished("tx_replacement_underpriced")
continue // retry with fee bump
// retry tx with fee bump, unless we already just tried to bump them
if !sendState.bumpFees {
sendState.bumpFees = true
continue
}
case errStringMatch(err, txpool.ErrUnderpriced):
l.Warn("transaction is underpriced", "err", err)
m.metr.TxPublished("tx_underpriced")
continue // retry with fee bump
// retry tx with fee bump, unless we already just tried to bump them
if !sendState.bumpFees {
sendState.bumpFees = true
continue
}
default:
m.metr.RPCError()
l.Error("unable to publish transaction", "err", err)
m.metr.TxPublished("unknown_error")
}
// on non-underpriced error return immediately; will retry on next resubmission timeout
return tx, false
}
}
......@@ -617,7 +616,7 @@ func (m *SimpleTxManager) waitForTx(ctx context.Context, tx *types.Transaction,
}
}
// waitMined waits for the transaction to be mined or for the context to be cancelled.
// waitMined waits for the transaction to be mined or for the context to be canceled.
func (m *SimpleTxManager) waitMined(ctx context.Context, tx *types.Transaction, sendState *SendState) (*types.Receipt, error) {
txHash := tx.Hash()
queryTicker := time.NewTicker(m.cfg.ReceiptQueryInterval)
......
......@@ -389,7 +389,7 @@ func TestTxMgrNeverConfirmCancel(t *testing.T) {
}
h.backend.setTxSender(sendTx)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
receipt, err := h.mgr.sendTx(ctx, tx)
......@@ -397,6 +397,37 @@ func TestTxMgrNeverConfirmCancel(t *testing.T) {
require.Nil(t, receipt)
}
// TestTxMgrTxSendTimeout tests that the TxSendTimeout is respected when trying to send a
// transaction, even if NetworkTimeout expires first.
func TestTxMgrTxSendTimeout(t *testing.T) {
t.Parallel()
conf := configWithNumConfs(1)
// The overall send timeout is three times the network timeout, leaving room for more than
// one send attempt before the send as a whole gives up.
conf.TxSendTimeout = 3 * time.Second
conf.NetworkTimeout = 1 * time.Second
h := newTestHarnessWithConfig(t, conf)
txCandidate := h.createTxCandidate()
// sendCount records how many times the stub sender below was invoked.
sendCount := 0
// Stub sender: counts the call, blocks until the context it was handed is done, then
// reports a deadline error, so no send attempt ever succeeds.
sendTx := func(ctx context.Context, tx *types.Transaction) error {
sendCount++
<-ctx.Done()
return context.DeadlineExceeded
}
h.backend.setTxSender(sendTx)
// The caller's context lasts an hour, far longer than either configured timeout.
ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
defer cancel()
receipt, err := h.mgr.send(ctx, txCandidate)
require.ErrorIs(t, err, context.DeadlineExceeded)
// Because network timeout is much shorter than send timeout, we should see multiple send attempts
// before the overall send fails.
require.Greater(t, sendCount, 1)
require.Nil(t, receipt)
}
// TestAlreadyReserved tests that AlreadyReserved error results in immediate abort of transaction
// sending.
func TestAlreadyReserved(t *testing.T) {
......@@ -664,7 +695,7 @@ func TestTxMgr_SigningFails(t *testing.T) {
// TestTxMgrOnlyOnePublicationSucceeds asserts that the tx manager will return a
// receipt so long as at least one of the publications is able to succeed with a
// simulated rpc failure.
// simulated failure.
func TestTxMgrOnlyOnePublicationSucceeds(t *testing.T) {
t.Parallel()
......@@ -679,7 +710,7 @@ func TestTxMgrOnlyOnePublicationSucceeds(t *testing.T) {
sendTx := func(ctx context.Context, tx *types.Transaction) error {
// Fail all but the final attempt.
if !h.gasPricer.shouldMine(tx.GasFeeCap()) {
return errRpcFailure
return txpool.ErrUnderpriced
}
txHash := tx.Hash()
......@@ -699,7 +730,7 @@ func TestTxMgrOnlyOnePublicationSucceeds(t *testing.T) {
// TestTxMgrConfirmsMinGasPriceAfterBumping delays the mining of the initial tx
// with the minimum gas price, and asserts that its receipt is returned even
// though if the gas price has been bumped in other goroutines.
// if the gas price has been bumped in other goroutines.
func TestTxMgrConfirmsMinGasPriceAfterBumping(t *testing.T) {
t.Parallel()
......@@ -731,6 +762,47 @@ func TestTxMgrConfirmsMinGasPriceAfterBumping(t *testing.T) {
require.Equal(t, h.gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
}
// TestTxMgrRetriesUnbumpableTx tests that a tx whose fees cannot be bumped will still be
// re-published in case it had been dropped from the mempool.
func TestTxMgrRetriesUnbumpableTx(t *testing.T) {
t.Parallel()
cfg := configWithNumConfs(1)
cfg.FeeLimitMultiplier.Store(1) // don't allow fees to be bumped over the suggested values
h := newTestHarnessWithConfig(t, cfg)
// Make the fees unbumpable by starting with fees that will be WAY over the suggested values
gasTipCap, gasFeeCap, _ := h.gasPricer.feesForEpoch(100)
txToSend := types.NewTx(&types.DynamicFeeTx{
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
})
// sameTxPublishAttempts counts how often the stub sender receives a tx whose hash equals
// that of the original txToSend.
sameTxPublishAttempts := 0
sendTx := func(ctx context.Context, tx *types.Transaction) error {
// count publish attempts that carry the original tx (matched by hash)
if tx.Hash().Cmp(txToSend.Hash()) == 0 {
sameTxPublishAttempts++
}
if h.gasPricer.shouldMine(tx.GasFeeCap()) {
// delay mining to give it enough time for ~3 retries
time.AfterFunc(3*time.Second, func() {
txHash := tx.Hash()
h.backend.mine(&txHash, tx.GasFeeCap(), nil)
})
}
// every publish attempt is reported as successful
return nil
}
h.backend.setTxSender(sendTx)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.sendTx(ctx, txToSend)
require.NoError(t, err)
require.NotNil(t, receipt)
require.Greater(t, sameTxPublishAttempts, 1, "expected the original tx to be retried at least once")
}
// TestTxMgrDoesntAbortNonceTooLowAfterMiningTx
func TestTxMgrDoesntAbortNonceTooLowAfterMiningTx(t *testing.T) {
t.Parallel()
......@@ -796,21 +868,21 @@ func TestWaitMinedReturnsReceiptOnFirstSuccess(t *testing.T) {
require.Equal(t, receipt.TxHash, txHash)
}
// TestWaitMinedCanBeCanceled ensures that waitMined exits of the passed context
// TestWaitMinedCanBeCanceled ensures that waitMined exits if the passed context
// is canceled before a receipt is found.
func TestWaitMinedCanBeCanceled(t *testing.T) {
t.Parallel()
h := newTestHarness(t)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
// Create an unmined tx.
tx := types.NewTx(&types.LegacyTx{})
receipt, err := h.mgr.waitMined(ctx, tx, NewSendState(10, time.Hour))
require.Equal(t, err, context.DeadlineExceeded)
require.ErrorIs(t, err, context.DeadlineExceeded)
require.Nil(t, receipt)
}
......@@ -831,7 +903,7 @@ func TestWaitMinedMultipleConfs(t *testing.T) {
h.backend.mine(&txHash, new(big.Int), nil)
receipt, err := h.mgr.waitMined(ctx, tx, NewSendState(10, time.Hour))
require.Equal(t, err, context.DeadlineExceeded)
require.ErrorIs(t, err, context.DeadlineExceeded)
require.Nil(t, receipt)
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
......@@ -1360,7 +1432,6 @@ func TestMinFees(t *testing.T) {
// TestClose ensures that the tx manager will refuse new work and cancel any in progress
func TestClose(t *testing.T) {
conf := configWithNumConfs(1)
conf.SafeAbortNonceTooLowCount = 100
h := newTestHarnessWithConfig(t, conf)
sendingSignal := make(chan struct{})
......@@ -1382,7 +1453,7 @@ func TestClose(t *testing.T) {
h.backend.mine(&txHash, tx.GasFeeCap(), big.NewInt(1))
} else {
time.Sleep(10 * time.Millisecond)
err = core.ErrNonceTooLow
err = errRpcFailure
}
return
}
......@@ -1412,7 +1483,7 @@ func TestClose(t *testing.T) {
_, err = h.mgr.Send(ctx, TxCandidate{
To: &common.Address{},
})
require.ErrorIs(t, ErrClosed, err)
require.ErrorIs(t, err, ErrClosed)
// confirm that the tx was canceled before it retried to completion
require.Less(t, called, retries)
require.True(t, h.mgr.closed.Load())
......@@ -1423,7 +1494,7 @@ func TestClose(t *testing.T) {
_, err = h.mgr.Send(ctx, TxCandidate{
To: &common.Address{},
})
require.ErrorIs(t, ErrClosed, err)
require.ErrorIs(t, err, ErrClosed)
// confirm that the tx was canceled before it ever made it to the backend
require.Equal(t, 0, called)
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment