Commit cd2975c6 authored by Joshua Gutow's avatar Joshua Gutow

txmgr: Restructure internals

parent 0fad37ea
...@@ -346,6 +346,7 @@ func TestMigration(t *testing.T) { ...@@ -346,6 +346,7 @@ func TestMigration(t *testing.T) {
NumConfirmations: 1, NumConfirmations: 1,
ResubmissionTimeout: 5 * time.Second, ResubmissionTimeout: 5 * time.Second,
SafeAbortNonceTooLowCount: 3, SafeAbortNonceTooLowCount: 3,
TxNotInMempoolTimeout: 2 * time.Minute,
}, },
LogConfig: oplog.CLIConfig{ LogConfig: oplog.CLIConfig{
Level: "info", Level: "info",
...@@ -371,6 +372,7 @@ func TestMigration(t *testing.T) { ...@@ -371,6 +372,7 @@ func TestMigration(t *testing.T) {
NumConfirmations: 1, NumConfirmations: 1,
ResubmissionTimeout: 3 * time.Second, ResubmissionTimeout: 3 * time.Second,
SafeAbortNonceTooLowCount: 3, SafeAbortNonceTooLowCount: 3,
TxNotInMempoolTimeout: 2 * time.Minute,
}, },
LogConfig: oplog.CLIConfig{ LogConfig: oplog.CLIConfig{
Level: "info", Level: "info",
......
...@@ -580,6 +580,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { ...@@ -580,6 +580,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
ResubmissionTimeout: 3 * time.Second, ResubmissionTimeout: 3 * time.Second,
ReceiptQueryInterval: 50 * time.Millisecond, ReceiptQueryInterval: 50 * time.Millisecond,
NetworkTimeout: 2 * time.Second, NetworkTimeout: 2 * time.Second,
TxNotInMempoolTimeout: 2 * time.Minute,
}, },
AllowNonFinalized: cfg.NonFinalizedProposals, AllowNonFinalized: cfg.NonFinalizedProposals,
LogConfig: oplog.CLIConfig{ LogConfig: oplog.CLIConfig{
...@@ -615,6 +616,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { ...@@ -615,6 +616,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
ResubmissionTimeout: 3 * time.Second, ResubmissionTimeout: 3 * time.Second,
ReceiptQueryInterval: 50 * time.Millisecond, ReceiptQueryInterval: 50 * time.Millisecond,
NetworkTimeout: 2 * time.Second, NetworkTimeout: 2 * time.Second,
TxNotInMempoolTimeout: 2 * time.Minute,
}, },
LogConfig: oplog.CLIConfig{ LogConfig: oplog.CLIConfig{
Level: "info", Level: "info",
......
...@@ -28,6 +28,7 @@ const ( ...@@ -28,6 +28,7 @@ const (
ResubmissionTimeoutFlagName = "resubmission-timeout" ResubmissionTimeoutFlagName = "resubmission-timeout"
NetworkTimeoutFlagName = "network-timeout" NetworkTimeoutFlagName = "network-timeout"
TxSendTimeoutFlagName = "txmgr.send-timeout" TxSendTimeoutFlagName = "txmgr.send-timeout"
TxNotInMempoolTimeoutFlagName = "txmgr.not-in-mempool-timeout"
ReceiptQueryIntervalFlagName = "txmgr.receipt-query-interval" ReceiptQueryIntervalFlagName = "txmgr.receipt-query-interval"
) )
...@@ -95,6 +96,12 @@ func CLIFlags(envPrefix string) []cli.Flag { ...@@ -95,6 +96,12 @@ func CLIFlags(envPrefix string) []cli.Flag {
Value: 0, Value: 0,
EnvVar: opservice.PrefixEnvVar(envPrefix, "TXMGR_TX_SEND_TIMEOUT"), EnvVar: opservice.PrefixEnvVar(envPrefix, "TXMGR_TX_SEND_TIMEOUT"),
}, },
cli.DurationFlag{
Name: TxNotInMempoolTimeoutFlagName,
Usage: "Timeout for aborting a tx send if the tx does not make it to the mempool.",
Value: 2 * time.Minute,
EnvVar: opservice.PrefixEnvVar(envPrefix, "TXMGR_TX_NOT_IN_MEMPOOL_TIMEOUT"),
},
cli.DurationFlag{ cli.DurationFlag{
Name: ReceiptQueryIntervalFlagName, Name: ReceiptQueryIntervalFlagName,
Usage: "Frequency to poll for receipts", Usage: "Frequency to poll for receipts",
...@@ -118,6 +125,7 @@ type CLIConfig struct { ...@@ -118,6 +125,7 @@ type CLIConfig struct {
ReceiptQueryInterval time.Duration ReceiptQueryInterval time.Duration
NetworkTimeout time.Duration NetworkTimeout time.Duration
TxSendTimeout time.Duration TxSendTimeout time.Duration
TxNotInMempoolTimeout time.Duration
} }
func (m CLIConfig) Check() error { func (m CLIConfig) Check() error {
...@@ -125,16 +133,22 @@ func (m CLIConfig) Check() error { ...@@ -125,16 +133,22 @@ func (m CLIConfig) Check() error {
return errors.New("must provide a L1 RPC url") return errors.New("must provide a L1 RPC url")
} }
if m.NumConfirmations == 0 { if m.NumConfirmations == 0 {
return errors.New("num confirmations must not be 0") return errors.New("NumConfirmations must not be 0")
} }
if m.NetworkTimeout == 0 { if m.NetworkTimeout == 0 {
return errors.New("must provide a network timeout") return errors.New("must provide NetworkTimeout")
} }
if m.ResubmissionTimeout == 0 { if m.ResubmissionTimeout == 0 {
return errors.New("must provide a resumbission interval") return errors.New("must provide ResubmissionTimeout")
} }
if m.ReceiptQueryInterval == 0 { if m.ReceiptQueryInterval == 0 {
return errors.New("must provide a receipt query interval") return errors.New("must provide ReceiptQueryInterval")
}
if m.TxNotInMempoolTimeout == 0 {
return errors.New("must provide TxNotInMempoolTimeout")
}
if m.SafeAbortNonceTooLowCount == 0 {
return errors.New("SafeAbortNonceTooLowCount must not be 0")
} }
if err := m.SignerCLIConfig.Check(); err != nil { if err := m.SignerCLIConfig.Check(); err != nil {
return err return err
...@@ -157,6 +171,7 @@ func ReadCLIConfig(ctx *cli.Context) CLIConfig { ...@@ -157,6 +171,7 @@ func ReadCLIConfig(ctx *cli.Context) CLIConfig {
ReceiptQueryInterval: ctx.GlobalDuration(ReceiptQueryIntervalFlagName), ReceiptQueryInterval: ctx.GlobalDuration(ReceiptQueryIntervalFlagName),
NetworkTimeout: ctx.GlobalDuration(NetworkTimeoutFlagName), NetworkTimeout: ctx.GlobalDuration(NetworkTimeoutFlagName),
TxSendTimeout: ctx.GlobalDuration(TxSendTimeoutFlagName), TxSendTimeout: ctx.GlobalDuration(TxSendTimeoutFlagName),
TxNotInMempoolTimeout: ctx.GlobalDuration(TxNotInMempoolTimeoutFlagName),
} }
} }
...@@ -197,6 +212,7 @@ func NewConfig(cfg CLIConfig, l log.Logger) (Config, error) { ...@@ -197,6 +212,7 @@ func NewConfig(cfg CLIConfig, l log.Logger) (Config, error) {
ResubmissionTimeout: cfg.ResubmissionTimeout, ResubmissionTimeout: cfg.ResubmissionTimeout,
ChainID: chainID, ChainID: chainID,
TxSendTimeout: cfg.TxSendTimeout, TxSendTimeout: cfg.TxSendTimeout,
TxNotInMempoolTimeout: cfg.TxNotInMempoolTimeout,
NetworkTimeout: cfg.NetworkTimeout, NetworkTimeout: cfg.NetworkTimeout,
ReceiptQueryInterval: cfg.ReceiptQueryInterval, ReceiptQueryInterval: cfg.ReceiptQueryInterval,
NumConfirmations: cfg.NumConfirmations, NumConfirmations: cfg.NumConfirmations,
...@@ -222,6 +238,10 @@ type Config struct { ...@@ -222,6 +238,10 @@ type Config struct {
// By default it is unbounded. If set, this is recommended to be at least 20 minutes. // By default it is unbounded. If set, this is recommended to be at least 20 minutes.
TxSendTimeout time.Duration TxSendTimeout time.Duration
// TxNotInMempoolTimeout is how long to wait before aborting a transaction send if the transaction does not
// make it to the mempool. If the tx is in the mempool, TxSendTimeout is used instead.
TxNotInMempoolTimeout time.Duration
// NetworkTimeout is the allowed duration for a single network request. // NetworkTimeout is the allowed duration for a single network request.
// This is intended to be used for network requests that can be replayed. // This is intended to be used for network requests that can be replayed.
NetworkTimeout time.Duration NetworkTimeout time.Duration
......
...@@ -3,6 +3,7 @@ package txmgr ...@@ -3,6 +3,7 @@ package txmgr
import ( import (
"strings" "strings"
"sync" "sync"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
...@@ -12,48 +13,47 @@ import ( ...@@ -12,48 +13,47 @@ import (
// this context, a txn may correspond to multiple different txn hashes due to // this context, a txn may correspond to multiple different txn hashes due to
// varying gas prices, though we treat them all as the same logical txn. This // varying gas prices, though we treat them all as the same logical txn. This
// struct is primarily used to determine whether or not the txmgr should abort a // struct is primarily used to determine whether or not the txmgr should abort a
// given txn and retry with a higher nonce. // given txn.
type SendState struct { type SendState struct {
minedTxs map[common.Hash]struct{} minedTxs map[common.Hash]struct{}
nonceTooLowCount uint64 mu sync.RWMutex
mu sync.RWMutex
safeAbortNonceTooLowCount uint64 // Config
nonceTooLowCount uint64
txInMempoolDeadline time.Time // deadline to abort at if no transactions are in the mempool
// Counts of the different types of errors
successFullPublishCount uint64 // nil error => tx made it to the mempool
safeAbortNonceTooLowCount uint64 // nonce too low error
} }
// NewSendState parameterizes a new SendState from the passed // NewSendState parameterizes a new SendState from the passed
// safeAbortNonceTooLowCount. // safeAbortNonceTooLowCount.
func NewSendState(safeAbortNonceTooLowCount uint64) *SendState { func NewSendState(safeAbortNonceTooLowCount uint64, unableToSendTimeout time.Duration) *SendState {
if safeAbortNonceTooLowCount == 0 { if safeAbortNonceTooLowCount == 0 {
panic("txmgr: safeAbortNonceTooLowCount cannot be zero") panic("txmgr: safeAbortNonceTooLowCount cannot be zero")
} }
return &SendState{ return &SendState{
minedTxs: make(map[common.Hash]struct{}), minedTxs: make(map[common.Hash]struct{}),
nonceTooLowCount: 0,
safeAbortNonceTooLowCount: safeAbortNonceTooLowCount, safeAbortNonceTooLowCount: safeAbortNonceTooLowCount,
txInMempoolDeadline: time.Now().Add(unableToSendTimeout),
} }
} }
// ProcessSendError should be invoked with the error returned for each // ProcessSendError should be invoked with the error returned for each
// publication. It is safe to call this method with nil or arbitrary errors. // publication. It is safe to call this method with nil or arbitrary errors.
// Currently it only acts on errors containing the ErrNonceTooLow message.
func (s *SendState) ProcessSendError(err error) { func (s *SendState) ProcessSendError(err error) {
// Nothing to do.
if err == nil {
return
}
// Only concerned with ErrNonceTooLow.
if !strings.Contains(err.Error(), core.ErrNonceTooLow.Error()) {
return
}
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
// Record this nonce too low observation. // Record the type of error
s.nonceTooLowCount++ switch {
case err == nil:
s.successFullPublishCount++
case strings.Contains(err.Error(), core.ErrNonceTooLow.Error()):
s.nonceTooLowCount++
}
} }
// TxMined records that the txn with txnHash has been mined and is await // TxMined records that the txn with txnHash has been mined and is await
...@@ -85,8 +85,9 @@ func (s *SendState) TxNotMined(txHash common.Hash) { ...@@ -85,8 +85,9 @@ func (s *SendState) TxNotMined(txHash common.Hash) {
} }
// ShouldAbortImmediately returns true if the txmgr should give up on trying a // ShouldAbortImmediately returns true if the txmgr should give up on trying a
// given txn with the target nonce. For now, this only happens if we see an // given txn with the target nonce.
// extended period of getting ErrNonceTooLow without having a txn mined. // This occurs when the set of errors recorded indicates that no further progress can be made
// on this transaction.
func (s *SendState) ShouldAbortImmediately() bool { func (s *SendState) ShouldAbortImmediately() bool {
s.mu.RLock() s.mu.RLock()
defer s.mu.RUnlock() defer s.mu.RUnlock()
...@@ -96,9 +97,14 @@ func (s *SendState) ShouldAbortImmediately() bool { ...@@ -96,9 +97,14 @@ func (s *SendState) ShouldAbortImmediately() bool {
return false return false
} }
// Only abort if we've observed enough ErrNonceTooLow to meet our safe abort // If we have exceeded the nonce too low count, abort
// threshold. if s.nonceTooLowCount >= s.safeAbortNonceTooLowCount ||
return s.nonceTooLowCount >= s.safeAbortNonceTooLowCount // If we have not published a transaction in the allotted time, abort
(s.successFullPublishCount == 0 && time.Now().After(s.txInMempoolDeadline)) {
return true
}
return false
} }
// IsWaitingForConfirmation returns true if we have at least one confirmation on // IsWaitingForConfirmation returns true if we have at least one confirmation on
......
...@@ -3,6 +3,7 @@ package txmgr_test ...@@ -3,6 +3,7 @@ package txmgr_test
import ( import (
"errors" "errors"
"testing" "testing"
"time"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
...@@ -11,14 +12,18 @@ import ( ...@@ -11,14 +12,18 @@ import (
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
) )
const testSafeAbortNonceTooLowCount = 3
var ( var (
testHash = common.HexToHash("0x01") testHash = common.HexToHash("0x01")
) )
const testSafeAbortNonceTooLowCount = 3
func newSendState() *txmgr.SendState { func newSendState() *txmgr.SendState {
return txmgr.NewSendState(testSafeAbortNonceTooLowCount) return newSendStateWithTimeout(time.Hour)
}
func newSendStateWithTimeout(t time.Duration) *txmgr.SendState {
return txmgr.NewSendState(testSafeAbortNonceTooLowCount, t)
} }
func processNSendErrors(sendState *txmgr.SendState, err error, n int) { func processNSendErrors(sendState *txmgr.SendState, err error, n int) {
...@@ -160,3 +165,20 @@ func TestSendStateIsNotWaitingForConfirmationAfterTxUnmined(t *testing.T) { ...@@ -160,3 +165,20 @@ func TestSendStateIsNotWaitingForConfirmationAfterTxUnmined(t *testing.T) {
sendState.TxNotMined(testHash) sendState.TxNotMined(testHash)
require.False(t, sendState.IsWaitingForConfirmation()) require.False(t, sendState.IsWaitingForConfirmation())
} }
// TestSendStateTimeoutAbort ensure that this will abort if it passes the tx pool timeout
// when no successful transactions have been recorded
func TestSendStateTimeoutAbort(t *testing.T) {
sendState := newSendStateWithTimeout(10 * time.Millisecond)
time.Sleep(20 * time.Millisecond)
require.True(t, sendState.ShouldAbortImmediately(), "Should abort after timing out")
}
// TestSendStateNoTimeoutAbortIfPublishedTx ensure that this will not abort if there is
// a successful transaction send.
func TestSendStateNoTimeoutAbortIfPublishedTx(t *testing.T) {
sendState := newSendStateWithTimeout(10 * time.Millisecond)
sendState.ProcessSendError(nil)
time.Sleep(20 * time.Millisecond)
require.False(t, sendState.ShouldAbortImmediately(), "Should not abort if published transcation successfully")
}
...@@ -5,11 +5,13 @@ import ( ...@@ -5,11 +5,13 @@ import (
"errors" "errors"
"fmt" "fmt"
"math/big" "math/big"
"strings"
"sync" "sync"
"time" "time"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -120,9 +122,8 @@ type TxCandidate struct { ...@@ -120,9 +122,8 @@ type TxCandidate struct {
// invocation of sendTx returns (called with differing gas prices). The method // invocation of sendTx returns (called with differing gas prices). The method
// may be canceled using the passed context. // may be canceled using the passed context.
// //
// The initially supplied transaction must be signed, have gas estimation done, and have a reasonable gas fee. // The transaction manager handles all signing. If and only if the gas limit is 0, the
// When the transaction is resubmitted the tx manager will re-sign the transaction at a different gas pricing // transaction manager will do a gas estimation.
// but retain the gas used, the nonce, and the data.
// //
// NOTE: Send should be called by AT MOST one caller at a time. // NOTE: Send should be called by AT MOST one caller at a time.
func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) { func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
...@@ -133,8 +134,7 @@ func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*typ ...@@ -133,8 +134,7 @@ func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*typ
} }
tx, err := m.craftTx(ctx, candidate) tx, err := m.craftTx(ctx, candidate)
if err != nil { if err != nil {
m.l.Error("Failed to create the transaction", "err", err) return nil, fmt.Errorf("failed to create the tx: %w", err)
return nil, err
} }
return m.send(ctx, tx) return m.send(ctx, tx)
} }
...@@ -147,7 +147,7 @@ func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*typ ...@@ -147,7 +147,7 @@ func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*typ
func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (*types.Transaction, error) { func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (*types.Transaction, error) {
gasTipCap, basefee, err := m.suggestGasPriceCaps(ctx) gasTipCap, basefee, err := m.suggestGasPriceCaps(ctx)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("failed to get gas price info: %w", err)
} }
gasFeeCap := calcGasFeeCap(basefee, gasTipCap) gasFeeCap := calcGasFeeCap(basefee, gasTipCap)
...@@ -193,77 +193,22 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (* ...@@ -193,77 +193,22 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (*
return m.cfg.Signer(ctx, candidate.From, types.NewTx(rawTx)) return m.cfg.Signer(ctx, candidate.From, types.NewTx(rawTx))
} }
// send submits the same transaction several times with increasing gas prices as necessary.
// It waits for the transaction to be confirmed on chain.
func (m *SimpleTxManager) send(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) { func (m *SimpleTxManager) send(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) {
// Initialize a wait group to track any spawned goroutines, and ensure
// we properly clean up any dangling resources this method generates.
// We assert that this is the case thoroughly in our unit tests.
var wg sync.WaitGroup var wg sync.WaitGroup
defer wg.Wait() defer wg.Wait()
// Initialize a subcontext for the goroutines spawned in this process.
// The defer to cancel is done here (in reverse order of Wait) so that
// the goroutines can exit before blocking on the wait group.
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
sendState := NewSendState(m.cfg.SafeAbortNonceTooLowCount) sendState := NewSendState(m.cfg.SafeAbortNonceTooLowCount, m.cfg.TxNotInMempoolTimeout)
// Create a closure that will block on submitting the tx in the
// background, returning the first successfully mined receipt back to
// the main event loop via receiptChan.
receiptChan := make(chan *types.Receipt, 1) receiptChan := make(chan *types.Receipt, 1)
sendTxAsync := func(tx *types.Transaction) { sendTxAsync := func(tx *types.Transaction) {
defer wg.Done() defer wg.Done()
m.publishAndWaitForTx(ctx, tx, sendState, receiptChan)
txHash := tx.Hash()
nonce := tx.Nonce()
gasTipCap := tx.GasTipCap()
gasFeeCap := tx.GasFeeCap()
log := m.l.New("txHash", txHash, "nonce", nonce, "gasTipCap", gasTipCap, "gasFeeCap", gasFeeCap)
log.Info("publishing transaction")
cCtx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
defer cancel()
err := m.backend.SendTransaction(cCtx, tx)
sendState.ProcessSendError(err)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
if errors.Is(err, txpool.ErrAlreadyKnown) {
log.Info("resubmitted already known transaction")
return
}
log.Error("unable to publish transaction", "err", err)
if sendState.ShouldAbortImmediately() {
log.Warn("Aborting transaction submission")
cancel()
}
return
}
log.Info("transaction published successfully")
// Wait for the transaction to be mined, reporting the receipt
// back to the main event loop if found.
receipt, err := m.waitMined(ctx, tx, sendState)
if err != nil {
log.Debug("send tx failed", "err", err)
}
if receipt != nil {
// Use non-blocking select to ensure function can exit
// if more than one receipt is discovered.
select {
case receiptChan <- receipt:
log.Trace("send tx succeeded")
default:
}
}
} }
// Submit and wait for the receipt at our first gas price in the // Immediately publish a transaction before starting the resumbission loop
// background, before entering the event loop and waiting out the
// resubmission timeout.
wg.Add(1) wg.Add(1)
go sendTxAsync(tx) go sendTxAsync(tx)
...@@ -272,124 +217,137 @@ func (m *SimpleTxManager) send(ctx context.Context, tx *types.Transaction) (*typ ...@@ -272,124 +217,137 @@ func (m *SimpleTxManager) send(ctx context.Context, tx *types.Transaction) (*typ
for { for {
select { select {
// Whenever a resubmission timeout has elapsed, bump the gas
// price and publish a new transaction.
case <-ticker.C: case <-ticker.C:
// Avoid republishing if we are waiting for confirmation on an // Don't resubmit a transaction if it has been mined, but we are waiting for the conf depth.
// existing tx. This is primarily an optimization to reduce the
// number of API calls we make, but also reduces the chances of
// getting a false positive reading for ShouldAbortImmediately.
if sendState.IsWaitingForConfirmation() { if sendState.IsWaitingForConfirmation() {
continue continue
} }
// If we see lots of unrecoverable errors (and no pending transactions) abort sending the transaction.
// Increase the gas price & submit the new transaction if sendState.ShouldAbortImmediately() {
newTx, err := m.increaseGasPrice(ctx, tx) m.l.Warn("Aborting transaction submission")
if err != nil { return nil, errors.New("aborted transaction sending")
m.l.Error("Failed to increase the gas price for the tx", "err", err)
// Don't `continue` here so we resubmit the transaction with the same gas price.
} else {
// Save the tx so we know it's gas price.
tx = newTx
} }
// Increase the gas price & submit the new transaction
tx = m.increaseGasPrice(ctx, tx)
wg.Add(1) wg.Add(1)
go sendTxAsync(tx) go sendTxAsync(tx)
// The passed context has been canceled, i.e. in the event of a
// shutdown.
case <-ctx.Done(): case <-ctx.Done():
return nil, ctx.Err() return nil, ctx.Err()
// The transaction has confirmed.
case receipt := <-receiptChan: case receipt := <-receiptChan:
return receipt, nil return receipt, nil
} }
} }
} }
// waitMined implements the core functionality of WaitMined, with the option to // publishAndWaitForTx publishes the transaction to the transaction pool and then waits for it with [waitMined].
// pass in a SendState to record whether or not the transaction is mined. // It should be called in a new go-routine. It will send the receipt to receiptChan in a non-blocking way if a receipt is found
func (m *SimpleTxManager) waitMined(ctx context.Context, tx *types.Transaction, sendState *SendState) (*types.Receipt, error) { // for the transaction.
queryTicker := time.NewTicker(m.cfg.ReceiptQueryInterval) func (m *SimpleTxManager) publishAndWaitForTx(ctx context.Context, tx *types.Transaction, sendState *SendState, receiptChan chan *types.Receipt) {
defer queryTicker.Stop() log := m.l.New("hash", tx.Hash(), "nonce", tx.Nonce(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
log.Info("publishing transaction")
txHash := tx.Hash() cCtx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
defer cancel()
err := m.backend.SendTransaction(cCtx, tx)
sendState.ProcessSendError(err)
for { // Properly log & exit if there is an error
cCtx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout) if err != nil {
receipt, err := m.backend.TransactionReceipt(cCtx, txHash)
cancel()
switch { switch {
case receipt != nil: case errStringMatch(err, core.ErrNonceTooLow):
if sendState != nil { log.Warn("nonce too low", "err", err)
sendState.TxMined(txHash) case errStringMatch(err, context.Canceled):
} log.Warn("transaction send cancelled", "err", err)
case errStringMatch(err, txpool.ErrAlreadyKnown):
txHeight := receipt.BlockNumber.Uint64() log.Warn("resubmitted already known transaction", "err", err)
tipHeight, err := m.backend.BlockNumber(ctx) case errStringMatch(err, txpool.ErrReplaceUnderpriced):
if err != nil { log.Warn("transaction replacement is underpriced", "err", err)
m.l.Error("Unable to fetch block number", "err", err) case errStringMatch(err, txpool.ErrUnderpriced):
break log.Warn("transaction is underpriced", "err", err)
}
m.l.Debug("Transaction mined, checking confirmations", "txHash", txHash, "txHeight", txHeight,
"tipHeight", tipHeight, "numConfirmations", m.cfg.NumConfirmations)
// The transaction is considered confirmed when
// txHeight+numConfirmations-1 <= tipHeight. Note that the -1 is
// needed to account for the fact that confirmations have an
// inherent off-by-one, i.e. when using 1 confirmation the
// transaction should be confirmed when txHeight is equal to
// tipHeight. The equation is rewritten in this form to avoid
// underflows.
if txHeight+m.cfg.NumConfirmations <= tipHeight+1 {
m.l.Info("Transaction confirmed", "txHash", txHash)
return receipt, nil
}
// Safe to subtract since we know the LHS above is greater.
confsRemaining := (txHeight + m.cfg.NumConfirmations) - (tipHeight + 1)
m.l.Debug("Transaction not yet confirmed", "txHash", txHash, "confsRemaining", confsRemaining)
case err != nil:
m.l.Trace("Receipt retrievel failed", "hash", txHash, "err", err)
default: default:
if sendState != nil { log.Error("unable to publish transaction", "err", err)
sendState.TxNotMined(txHash)
}
m.l.Trace("Transaction not yet mined", "hash", txHash)
} }
return
}
log.Info("Transaction successfully published")
// Poll for the transaction to be ready & then send the result to receiptChan
receipt, err := m.waitMined(ctx, tx, sendState)
if err != nil {
log.Warn("Transaction receipt not found", "err", err)
return
}
select {
case receiptChan <- receipt:
default:
}
}
// waitMined waits for the transaction to be mined or for the context to be cancelled.
func (m *SimpleTxManager) waitMined(ctx context.Context, tx *types.Transaction, sendState *SendState) (*types.Receipt, error) {
txHash := tx.Hash()
queryTicker := time.NewTicker(m.cfg.ReceiptQueryInterval)
defer queryTicker.Stop()
for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
m.l.Warn("context cancelled in waitMined")
return nil, ctx.Err() return nil, ctx.Err()
case <-queryTicker.C: case <-queryTicker.C:
if receipt := m.queryReceipt(ctx, txHash, sendState); receipt != nil {
return receipt, nil
}
} }
} }
} }
// suggestGasPriceCaps suggests what the new tip & new basefee should be based on the current L1 conditions // queryReceipt queries for the receipt and returns the receipt if it has passed the confirmation depth
func (m *SimpleTxManager) suggestGasPriceCaps(ctx context.Context) (*big.Int, *big.Int, error) { func (m *SimpleTxManager) queryReceipt(ctx context.Context, txHash common.Hash, sendState *SendState) *types.Receipt {
cCtx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout) ctx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
defer cancel() defer cancel()
tip, err := m.backend.SuggestGasTipCap(cCtx) receipt, err := m.backend.TransactionReceipt(ctx, txHash)
if err != nil { if errors.Is(err, ethereum.NotFound) {
return nil, nil, fmt.Errorf("failed to fetch the suggested gas tip cap: %w", err) sendState.TxNotMined(txHash)
} else if tip == nil { m.l.Trace("Transaction not yet mined", "hash", txHash)
return nil, nil, errors.New("the suggested tip was nil") return nil
} else if err != nil {
m.l.Info("Receipt retrieval failed", "hash", txHash, "err", err)
return nil
} else if receipt == nil {
m.l.Warn("Receipt and error are both nil", "hash", txHash)
return nil
} }
cCtx, cancel = context.WithTimeout(ctx, m.cfg.NetworkTimeout)
defer cancel() // Receipt is confirmed to be valid from this point on
head, err := m.backend.HeaderByNumber(cCtx, nil) sendState.TxMined(txHash)
txHeight := receipt.BlockNumber.Uint64()
tipHeight, err := m.backend.BlockNumber(ctx)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("failed to fetch the suggested basefee: %w", err) m.l.Error("Unable to fetch block number", "err", err)
} else if head.BaseFee == nil { return nil
return nil, nil, errors.New("txmgr does not support pre-london blocks that do not have a basefee")
} }
return tip, head.BaseFee, nil
m.l.Debug("Transaction mined, checking confirmations", "hash", txHash, "txHeight", txHeight,
"tipHeight", tipHeight, "numConfirmations", m.cfg.NumConfirmations)
// The transaction is considered confirmed when
// txHeight+numConfirmations-1 <= tipHeight. Note that the -1 is
// needed to account for the fact that confirmations have an
// inherent off-by-one, i.e. when using 1 confirmation the
// transaction should be confirmed when txHeight is equal to
// tipHeight. The equation is rewritten in this form to avoid
// underflows.
if txHeight+m.cfg.NumConfirmations <= tipHeight+1 {
m.l.Info("Transaction confirmed", "hash", txHash)
return receipt
}
// Safe to subtract since we know the LHS above is greater.
confsRemaining := (txHeight + m.cfg.NumConfirmations) - (tipHeight + 1)
m.l.Debug("Transaction not yet confirmed", "hash", txHash, "confsRemaining", confsRemaining)
return nil
} }
// increaseGasPrice takes the previous transaction & potentially clones then signs it with a higher tip. // increaseGasPrice takes the previous transaction & potentially clones then signs it with a higher tip.
...@@ -399,15 +357,18 @@ func (m *SimpleTxManager) suggestGasPriceCaps(ctx context.Context) (*big.Int, *b ...@@ -399,15 +357,18 @@ func (m *SimpleTxManager) suggestGasPriceCaps(ctx context.Context) (*big.Int, *b
// //
// We do not re-estimate the amount of gas used because for some stateful transactions (like output proposals) the // We do not re-estimate the amount of gas used because for some stateful transactions (like output proposals) the
// act of including the transaction renders the repeat of the transaction invalid. // act of including the transaction renders the repeat of the transaction invalid.
func (m *SimpleTxManager) increaseGasPrice(ctx context.Context, tx *types.Transaction) (*types.Transaction, error) { //
// If it encounters an error with creating the new transaction, it will return the old transaction.
func (m *SimpleTxManager) increaseGasPrice(ctx context.Context, tx *types.Transaction) *types.Transaction {
tip, basefee, err := m.suggestGasPriceCaps(ctx) tip, basefee, err := m.suggestGasPriceCaps(ctx)
if err != nil { if err != nil {
return nil, err m.l.Warn("failed to get suggested gas tip and basefee", "err", err)
return tx
} }
gasTipCap, gasFeeCap := updateFees(tx.GasTipCap(), tx.GasFeeCap(), tip, basefee, m.l) gasTipCap, gasFeeCap := updateFees(tx.GasTipCap(), tx.GasFeeCap(), tip, basefee, m.l)
if tx.GasTipCapIntCmp(gasTipCap) == 0 && tx.GasFeeCapIntCmp(gasFeeCap) == 0 { if tx.GasTipCapIntCmp(gasTipCap) == 0 && tx.GasFeeCapIntCmp(gasFeeCap) == 0 {
return tx, nil return tx
} }
rawTx := &types.DynamicFeeTx{ rawTx := &types.DynamicFeeTx{
...@@ -423,7 +384,33 @@ func (m *SimpleTxManager) increaseGasPrice(ctx context.Context, tx *types.Transa ...@@ -423,7 +384,33 @@ func (m *SimpleTxManager) increaseGasPrice(ctx context.Context, tx *types.Transa
} }
ctx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout) ctx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
defer cancel() defer cancel()
return m.cfg.Signer(ctx, m.cfg.From, types.NewTx(rawTx)) newTx, err := m.cfg.Signer(ctx, m.cfg.From, types.NewTx(rawTx))
if err != nil {
m.l.Warn("failed to sign new transaction", "err", err)
return tx
}
return newTx
}
// suggestGasPriceCaps suggests what the new tip & new basefee should be based on the current L1 conditions
func (m *SimpleTxManager) suggestGasPriceCaps(ctx context.Context) (*big.Int, *big.Int, error) {
cCtx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
defer cancel()
tip, err := m.backend.SuggestGasTipCap(cCtx)
if err != nil {
return nil, nil, fmt.Errorf("failed to fetch the suggested gas tip cap: %w", err)
} else if tip == nil {
return nil, nil, errors.New("the suggested tip was nil")
}
cCtx, cancel = context.WithTimeout(ctx, m.cfg.NetworkTimeout)
defer cancel()
head, err := m.backend.HeaderByNumber(cCtx, nil)
if err != nil {
return nil, nil, fmt.Errorf("failed to fetch the suggested basefee: %w", err)
} else if head.BaseFee == nil {
return nil, nil, errors.New("txmgr does not support pre-london blocks that do not have a basefee")
}
return tip, head.BaseFee, nil
} }
// calcThresholdValue returns x * priceBumpPercent / 100 // calcThresholdValue returns x * priceBumpPercent / 100
...@@ -478,3 +465,14 @@ func calcGasFeeCap(baseFee, gasTipCap *big.Int) *big.Int { ...@@ -478,3 +465,14 @@ func calcGasFeeCap(baseFee, gasTipCap *big.Int) *big.Int {
new(big.Int).Mul(baseFee, big.NewInt(2)), new(big.Int).Mul(baseFee, big.NewInt(2)),
) )
} }
// errStringMatch returns true if err.Error() is a substring in target.Error() or if both are nil.
// It can accept nil errors without issue.
func errStringMatch(err, target error) bool {
if err == nil && target == nil {
return true
} else if err == nil || target == nil {
return false
}
return strings.Contains(err.Error(), target.Error())
}
...@@ -3,6 +3,7 @@ package txmgr ...@@ -3,6 +3,7 @@ package txmgr
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"math/big" "math/big"
"sync" "sync"
"testing" "testing"
...@@ -20,6 +21,10 @@ import ( ...@@ -20,6 +21,10 @@ import (
type sendTransactionFunc func(ctx context.Context, tx *types.Transaction) error type sendTransactionFunc func(ctx context.Context, tx *types.Transaction) error
func testSendState() *SendState {
return NewSendState(100, time.Hour)
}
// testHarness houses the necessary resources to test the SimpleTxManager. // testHarness houses the necessary resources to test the SimpleTxManager.
type testHarness struct { type testHarness struct {
cfg Config cfg Config
...@@ -68,6 +73,7 @@ func configWithNumConfs(numConfirmations uint64) Config { ...@@ -68,6 +73,7 @@ func configWithNumConfs(numConfirmations uint64) Config {
ReceiptQueryInterval: 50 * time.Millisecond, ReceiptQueryInterval: 50 * time.Millisecond,
NumConfirmations: numConfirmations, NumConfirmations: numConfirmations,
SafeAbortNonceTooLowCount: 3, SafeAbortNonceTooLowCount: 3,
TxNotInMempoolTimeout: 1 * time.Hour,
Signer: func(ctx context.Context, from common.Address, tx *types.Transaction) (*types.Transaction, error) { Signer: func(ctx context.Context, from common.Address, tx *types.Transaction) (*types.Transaction, error) {
return tx, nil return tx, nil
}, },
...@@ -530,7 +536,7 @@ func TestWaitMinedReturnsReceiptOnFirstSuccess(t *testing.T) { ...@@ -530,7 +536,7 @@ func TestWaitMinedReturnsReceiptOnFirstSuccess(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
receipt, err := h.mgr.waitMined(ctx, tx, nil) receipt, err := h.mgr.waitMined(ctx, tx, testSendState())
require.Nil(t, err) require.Nil(t, err)
require.NotNil(t, receipt) require.NotNil(t, receipt)
require.Equal(t, receipt.TxHash, txHash) require.Equal(t, receipt.TxHash, txHash)
...@@ -549,7 +555,7 @@ func TestWaitMinedCanBeCanceled(t *testing.T) { ...@@ -549,7 +555,7 @@ func TestWaitMinedCanBeCanceled(t *testing.T) {
// Create an unimined tx. // Create an unimined tx.
tx := types.NewTx(&types.LegacyTx{}) tx := types.NewTx(&types.LegacyTx{})
receipt, err := h.mgr.waitMined(ctx, tx, nil) receipt, err := h.mgr.waitMined(ctx, tx, NewSendState(10, time.Hour))
require.Equal(t, err, context.DeadlineExceeded) require.Equal(t, err, context.DeadlineExceeded)
require.Nil(t, receipt) require.Nil(t, receipt)
} }
...@@ -570,7 +576,7 @@ func TestWaitMinedMultipleConfs(t *testing.T) { ...@@ -570,7 +576,7 @@ func TestWaitMinedMultipleConfs(t *testing.T) {
txHash := tx.Hash() txHash := tx.Hash()
h.backend.mine(&txHash, new(big.Int)) h.backend.mine(&txHash, new(big.Int))
receipt, err := h.mgr.waitMined(ctx, tx, nil) receipt, err := h.mgr.waitMined(ctx, tx, NewSendState(10, time.Hour))
require.Equal(t, err, context.DeadlineExceeded) require.Equal(t, err, context.DeadlineExceeded)
require.Nil(t, receipt) require.Nil(t, receipt)
...@@ -579,7 +585,7 @@ func TestWaitMinedMultipleConfs(t *testing.T) { ...@@ -579,7 +585,7 @@ func TestWaitMinedMultipleConfs(t *testing.T) {
// Mine an empty block, tx should now be confirmed. // Mine an empty block, tx should now be confirmed.
h.backend.mine(nil, nil) h.backend.mine(nil, nil)
receipt, err = h.mgr.waitMined(ctx, tx, nil) receipt, err = h.mgr.waitMined(ctx, tx, NewSendState(10, time.Hour))
require.Nil(t, err) require.Nil(t, err)
require.NotNil(t, receipt) require.NotNil(t, receipt)
require.Equal(t, txHash, receipt.TxHash) require.Equal(t, txHash, receipt.TxHash)
...@@ -692,7 +698,7 @@ func TestWaitMinedReturnsReceiptAfterFailure(t *testing.T) { ...@@ -692,7 +698,7 @@ func TestWaitMinedReturnsReceiptAfterFailure(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() defer cancel()
receipt, err := mgr.waitMined(ctx, tx, nil) receipt, err := mgr.waitMined(ctx, tx, testSendState())
require.Nil(t, err) require.Nil(t, err)
require.NotNil(t, receipt) require.NotNil(t, receipt)
require.Equal(t, receipt.TxHash, txHash) require.Equal(t, receipt.TxHash, txHash)
...@@ -724,8 +730,7 @@ func doGasPriceIncrease(t *testing.T, txTipCap, txFeeCap, newTip, newBaseFee int ...@@ -724,8 +730,7 @@ func doGasPriceIncrease(t *testing.T, txTipCap, txFeeCap, newTip, newBaseFee int
GasTipCap: big.NewInt(txTipCap), GasTipCap: big.NewInt(txTipCap),
GasFeeCap: big.NewInt(txFeeCap), GasFeeCap: big.NewInt(txFeeCap),
}) })
newTx, err := mgr.increaseGasPrice(context.Background(), tx) newTx := mgr.increaseGasPrice(context.Background(), tx)
require.NoError(t, err)
return tx, newTx return tx, newTx
} }
...@@ -831,11 +836,32 @@ func TestIncreaseGasPriceNotExponential(t *testing.T) { ...@@ -831,11 +836,32 @@ func TestIncreaseGasPriceNotExponential(t *testing.T) {
// Run IncreaseGasPrice a bunch of times in a row to simulate a very fast resubmit loop. // Run IncreaseGasPrice a bunch of times in a row to simulate a very fast resubmit loop.
for i := 0; i < 20; i++ { for i := 0; i < 20; i++ {
ctx := context.Background() ctx := context.Background()
newTx, err := mgr.increaseGasPrice(ctx, tx) newTx := mgr.increaseGasPrice(ctx, tx)
require.NoError(t, err)
require.True(t, newTx.GasFeeCap().Cmp(feeCap) == 0, "new tx fee cap must be equal L1") require.True(t, newTx.GasFeeCap().Cmp(feeCap) == 0, "new tx fee cap must be equal L1")
require.True(t, newTx.GasTipCap().Cmp(borkedBackend.gasTip) == 0, "new tx tip must be equal L1") require.True(t, newTx.GasTipCap().Cmp(borkedBackend.gasTip) == 0, "new tx tip must be equal L1")
tx = newTx tx = newTx
} }
} }
func TestErrStringMatch(t *testing.T) {
tests := []struct {
err error
target error
match bool
}{
{err: nil, target: nil, match: true},
{err: errors.New("exists"), target: nil, match: false},
{err: nil, target: errors.New("exists"), match: false},
{err: errors.New("exact match"), target: errors.New("exact match"), match: true},
{err: errors.New("partial: match"), target: errors.New("match"), match: true},
}
for i, test := range tests {
i := i
test := test
t.Run(fmt.Sprint(i), func(t *testing.T) {
require.Equal(t, test.match, errStringMatch(test.err, test.target))
})
}
}
...@@ -123,7 +123,6 @@ services: ...@@ -123,7 +123,6 @@ services:
OP_BATCHER_L1_ETH_RPC: http://l1:8545 OP_BATCHER_L1_ETH_RPC: http://l1:8545
OP_BATCHER_L2_ETH_RPC: http://l2:8545 OP_BATCHER_L2_ETH_RPC: http://l2:8545
OP_BATCHER_ROLLUP_RPC: http://op-node:8545 OP_BATCHER_ROLLUP_RPC: http://op-node:8545
TX_MANAGER_TIMEOUT: 10m
OFFLINE_GAS_ESTIMATION: false OFFLINE_GAS_ESTIMATION: false
OP_BATCHER_MAX_CHANNEL_DURATION: 1 OP_BATCHER_MAX_CHANNEL_DURATION: 1
OP_BATCHER_MAX_L1_TX_SIZE_BYTES: 120000 OP_BATCHER_MAX_L1_TX_SIZE_BYTES: 120000
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment