Commit b5a91778 authored by Joshua Gutow's avatar Joshua Gutow Committed by GitHub

Merge pull request #4830 from ethereum-optimism/jg/txmgr_cleanup_pt2

txmgr: Simplify resubmit logic and scaffolding
parents eac9b3ae 3468c6af
...@@ -72,6 +72,8 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger) (*BatchSubmitte ...@@ -72,6 +72,8 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger) (*BatchSubmitte
ReceiptQueryInterval: time.Second, ReceiptQueryInterval: time.Second,
NumConfirmations: cfg.NumConfirmations, NumConfirmations: cfg.NumConfirmations,
SafeAbortNonceTooLowCount: cfg.SafeAbortNonceTooLowCount, SafeAbortNonceTooLowCount: cfg.SafeAbortNonceTooLowCount,
From: fromAddress,
Signer: signer(rcfg.L1ChainID),
} }
batcherCfg := Config{ batcherCfg := Config{
......
...@@ -54,14 +54,10 @@ func (t *TransactionManager) SendTransaction(ctx context.Context, data []byte) ( ...@@ -54,14 +54,10 @@ func (t *TransactionManager) SendTransaction(ctx context.Context, data []byte) (
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create tx: %w", err) return nil, fmt.Errorf("failed to create tx: %w", err)
} }
// Construct a closure that will update the txn with the current gas prices.
updateGasPrice := func(ctx context.Context) (*types.Transaction, error) {
return t.UpdateGasPrice(ctx, tx)
}
ctx, cancel := context.WithTimeout(ctx, 100*time.Second) // TODO: Select a timeout that makes sense here. ctx, cancel := context.WithTimeout(ctx, 100*time.Second) // TODO: Select a timeout that makes sense here.
defer cancel() defer cancel()
if receipt, err := t.txMgr.Send(ctx, updateGasPrice, t.l1Client.SendTransaction); err != nil { if receipt, err := t.txMgr.Send(ctx, tx); err != nil {
t.log.Warn("unable to publish tx", "err", err, "data_size", len(data)) t.log.Warn("unable to publish tx", "err", err, "data_size", len(data))
return nil, err return nil, err
} else { } else {
...@@ -135,29 +131,3 @@ func (t *TransactionManager) CraftTx(ctx context.Context, data []byte) (*types.T ...@@ -135,29 +131,3 @@ func (t *TransactionManager) CraftTx(ctx context.Context, data []byte) (*types.T
tx := types.NewTx(rawTx) tx := types.NewTx(rawTx)
return t.signerFn(ctx, t.senderAddress, tx) return t.signerFn(ctx, t.senderAddress, tx)
} }
// UpdateGasPrice signs an otherwise identical txn to the one provided but with
// updated gas prices sampled from the existing network conditions.
//
// NOTE: This method SHOULD NOT publish the resulting transaction.
func (t *TransactionManager) UpdateGasPrice(ctx context.Context, tx *types.Transaction) (*types.Transaction, error) {
gasTipCap, gasFeeCap, err := t.calcGasTipAndFeeCap(ctx)
if err != nil {
return nil, err
}
rawTx := &types.DynamicFeeTx{
ChainID: t.chainID,
Nonce: tx.Nonce(),
To: tx.To(),
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Gas: tx.Gas(),
Data: tx.Data(),
}
// Only log the new tip/fee cap because the updateGasPrice closure reuses the same initial transaction
t.log.Trace("updating gas price", "tip_cap", gasTipCap, "fee_cap", gasFeeCap)
finalTx := types.NewTx(rawTx)
return t.signerFn(ctx, t.senderAddress, finalTx)
}
...@@ -50,6 +50,8 @@ func NewL2Proposer(t Testing, log log.Logger, cfg *ProposerCfg, l1 *ethclient.Cl ...@@ -50,6 +50,8 @@ func NewL2Proposer(t Testing, log log.Logger, cfg *ProposerCfg, l1 *ethclient.Cl
ReceiptQueryInterval: time.Second, ReceiptQueryInterval: time.Second,
NumConfirmations: 1, NumConfirmations: 1,
SafeAbortNonceTooLowCount: 4, SafeAbortNonceTooLowCount: 4,
From: from,
// Signer is loaded in `proposer.NewL2OutputSubmitter`
}, },
L1Client: l1, L1Client: l1,
RollupClient: rollupCl, RollupClient: rollupCl,
...@@ -85,7 +87,9 @@ func (p *L2Proposer) ActMakeProposalTx(t Testing) { ...@@ -85,7 +87,9 @@ func (p *L2Proposer) ActMakeProposalTx(t Testing) {
tx, err := p.driver.CreateProposalTx(t.Ctx(), output) tx, err := p.driver.CreateProposalTx(t.Ctx(), output)
require.NoError(t, err) require.NoError(t, err)
err = p.driver.SendTransaction(t.Ctx(), tx) // Note: Use L1 instead of the output submitter's transaction manager because
// this is non-blocking while the txmgr is blocking & deadlocks the tests
err = p.l1.SendTransaction(t.Ctx(), tx)
require.NoError(t, err) require.NoError(t, err)
p.lastTx = tx.Hash() p.lastTx = tx.Hash()
......
...@@ -171,6 +171,7 @@ func NewL2OutputSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger) (*L2OutputSu ...@@ -171,6 +171,7 @@ func NewL2OutputSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger) (*L2OutputSu
ReceiptQueryInterval: time.Second, ReceiptQueryInterval: time.Second,
NumConfirmations: cfg.NumConfirmations, NumConfirmations: cfg.NumConfirmations,
SafeAbortNonceTooLowCount: cfg.SafeAbortNonceTooLowCount, SafeAbortNonceTooLowCount: cfg.SafeAbortNonceTooLowCount,
From: fromAddress,
} }
proposerCfg := Config{ proposerCfg := Config{
...@@ -198,6 +199,8 @@ func NewL2OutputSubmitter(cfg Config, l log.Logger) (*L2OutputSubmitter, error) ...@@ -198,6 +199,8 @@ func NewL2OutputSubmitter(cfg Config, l log.Logger) (*L2OutputSubmitter, error)
cancel() cancel()
return nil, err return nil, err
} }
signer := cfg.SignerFnFactory(chainID)
cfg.TxManagerConfig.Signer = signer
l2ooContract, err := bindings.NewL2OutputOracle(cfg.L2OutputOracleAddr, cfg.L1Client) l2ooContract, err := bindings.NewL2OutputOracle(cfg.L2OutputOracleAddr, cfg.L1Client)
if err != nil { if err != nil {
...@@ -227,7 +230,7 @@ func NewL2OutputSubmitter(cfg Config, l log.Logger) (*L2OutputSubmitter, error) ...@@ -227,7 +230,7 @@ func NewL2OutputSubmitter(cfg Config, l log.Logger) (*L2OutputSubmitter, error)
allowNonFinalized: cfg.AllowNonFinalized, allowNonFinalized: cfg.AllowNonFinalized,
from: cfg.From, from: cfg.From,
signerFn: cfg.SignerFnFactory(chainID), signerFn: signer,
pollInterval: cfg.PollInterval, pollInterval: cfg.PollInterval,
}, nil }, nil
} }
...@@ -261,12 +264,6 @@ func (l *L2OutputSubmitter) UpdateGasPrice(ctx context.Context, tx *types.Transa ...@@ -261,12 +264,6 @@ func (l *L2OutputSubmitter) UpdateGasPrice(ctx context.Context, tx *types.Transa
return l.rawL2ooContract.RawTransact(opts, tx.Data()) return l.rawL2ooContract.RawTransact(opts, tx.Data())
} }
// SendTransaction injects a signed transaction into the pending pool for execution.
func (l *L2OutputSubmitter) SendTransaction(ctx context.Context, tx *types.Transaction) error {
l.log.Info("proposer sending transaction", "tx", tx.Hash())
return l.l1Client.SendTransaction(ctx, tx)
}
// FetchNextOutputInfo gets the block number of the next proposal. // FetchNextOutputInfo gets the block number of the next proposal.
// It returns: the next block number, if the proposal should be made, error // It returns: the next block number, if the proposal should be made, error
func (l *L2OutputSubmitter) FetchNextOutputInfo(ctx context.Context) (*eth.OutputResponse, bool, error) { func (l *L2OutputSubmitter) FetchNextOutputInfo(ctx context.Context) (*eth.OutputResponse, bool, error) {
...@@ -356,22 +353,15 @@ func (l *L2OutputSubmitter) CreateProposalTx(ctx context.Context, output *eth.Ou ...@@ -356,22 +353,15 @@ func (l *L2OutputSubmitter) CreateProposalTx(ctx context.Context, output *eth.Ou
return tx, nil return tx, nil
} }
// SendTransactionExt sends a transaction through the transaction manager which handles automatic // SendTransaction sends a transaction through the transaction manager which handles automatic
// price bumping. // price bumping.
// It also hardcodes a timeout of 100s. // It also hardcodes a timeout of 100s.
func (l *L2OutputSubmitter) SendTransactionExt(ctx context.Context, tx *types.Transaction) error { func (l *L2OutputSubmitter) SendTransaction(ctx context.Context, tx *types.Transaction) error {
// Construct the closure that will update the txn with the current gas prices.
nonce := tx.Nonce()
updateGasPrice := func(ctx context.Context) (*types.Transaction, error) {
l.log.Info("proposer updating batch tx gas price", "nonce", nonce)
return l.UpdateGasPrice(ctx, tx)
}
// Wait until one of our submitted transactions confirms. If no // Wait until one of our submitted transactions confirms. If no
// receipt is received it's likely our gas price was too low. // receipt is received it's likely our gas price was too low.
cCtx, cancel := context.WithTimeout(ctx, 100*time.Second) cCtx, cancel := context.WithTimeout(ctx, 100*time.Second)
defer cancel() defer cancel()
receipt, err := l.txMgr.Send(cCtx, updateGasPrice, l.SendTransaction) receipt, err := l.txMgr.Send(cCtx, tx)
if err != nil { if err != nil {
l.log.Error("proposer unable to publish tx", "err", err) l.log.Error("proposer unable to publish tx", "err", err)
return err return err
...@@ -411,7 +401,7 @@ func (l *L2OutputSubmitter) loop() { ...@@ -411,7 +401,7 @@ func (l *L2OutputSubmitter) loop() {
cancel() cancel()
break break
} }
if err := l.SendTransactionExt(cCtx, tx); err != nil { if err := l.SendTransaction(cCtx, tx); err != nil {
l.log.Error("Failed to send proposal transaction", "err", err) l.log.Error("Failed to send proposal transaction", "err", err)
cancel() cancel()
break break
......
...@@ -2,14 +2,17 @@ package txmgr ...@@ -2,14 +2,17 @@ package txmgr
import ( import (
"context" "context"
"errors"
"math/big" "math/big"
"strings"
"sync" "sync"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto"
) )
// UpdateGasPriceSendTxFunc defines a function signature for publishing a // UpdateGasPriceSendTxFunc defines a function signature for publishing a
...@@ -40,6 +43,10 @@ type Config struct { ...@@ -40,6 +43,10 @@ type Config struct {
// are required to give up on a tx at a particular nonce without receiving // are required to give up on a tx at a particular nonce without receiving
// confirmation. // confirmation.
SafeAbortNonceTooLowCount uint64 SafeAbortNonceTooLowCount uint64
// Signer is used to sign transactions when the gas price is increased.
Signer opcrypto.SignerFn
From common.Address
} }
// TxManager is an interface that allows callers to reliably publish txs, // TxManager is an interface that allows callers to reliably publish txs,
...@@ -50,15 +57,15 @@ type TxManager interface { ...@@ -50,15 +57,15 @@ type TxManager interface {
// until an invocation of sendTx returns (called with differing gas // until an invocation of sendTx returns (called with differing gas
// prices). The method may be canceled using the passed context. // prices). The method may be canceled using the passed context.
// //
// The initial transaction MUST be signed & ready to submit.
//
// NOTE: Send should be called by AT MOST one caller at a time. // NOTE: Send should be called by AT MOST one caller at a time.
Send(ctx context.Context, updateGasPrice UpdateGasPriceFunc, sendTxn SendTransactionFunc) (*types.Receipt, error) Send(ctx context.Context, tx *types.Transaction) (*types.Receipt, error)
} }
// ReceiptSource is a minimal function signature used to detect the confirmation // ETHBackend is the set of methods that the transaction manager uses to resubmit gas & determine
// of published txs. // when transactions are included on L1.
// type ETHBackend interface {
// NOTE: This is a subset of bind.DeployBackend.
type ReceiptSource interface {
// BlockNumber returns the most recent block number. // BlockNumber returns the most recent block number.
BlockNumber(ctx context.Context) (uint64, error) BlockNumber(ctx context.Context) (uint64, error)
...@@ -66,6 +73,14 @@ type ReceiptSource interface { ...@@ -66,6 +73,14 @@ type ReceiptSource interface {
// txHash. If lookup does not fail, but the transaction is not found, // txHash. If lookup does not fail, but the transaction is not found,
// nil should be returned for both values. // nil should be returned for both values.
TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error)
// SendTransaction submits a signed transaction to L1.
SendTransaction(ctx context.Context, tx *types.Transaction) error
// These functions are used to estimate what the basefee & priority fee should be set to.
// TODO(CLI-3318): Maybe need a generic interface to support different RPC providers
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
SuggestGasTipCap(ctx context.Context) (*big.Int, error)
} }
// SimpleTxManager is a implementation of TxManager that performs linear fee // SimpleTxManager is a implementation of TxManager that performs linear fee
...@@ -74,12 +89,55 @@ type SimpleTxManager struct { ...@@ -74,12 +89,55 @@ type SimpleTxManager struct {
Config // embed the config directly Config // embed the config directly
name string name string
backend ReceiptSource backend ETHBackend
l log.Logger l log.Logger
} }
// IncreaseGasPrice takes the previous transaction & potentially clones then signs it with a higher tip.
// If the basefee + priority fee did not increase by a minimum percent (geth's replacement percent) an
// error will be returned.
// We do not re-estimate the amount of gas used because for some stateful transactions (like output proposals) the
// act of including the transaction renders the repeat of the transaction invalid.
func (m *SimpleTxManager) IncreaseGasPrice(ctx context.Context, tx *types.Transaction) (*types.Transaction, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
var gasTipCap, gasFeeCap *big.Int
if tip, err := m.backend.SuggestGasTipCap(ctx); err != nil {
return nil, err
} else if tip == nil {
return nil, errors.New("the suggested tip was nil")
} else {
gasTipCap = tip
}
if head, err := m.backend.HeaderByNumber(ctx, nil); err != nil {
return nil, err
} else if head.BaseFee == nil {
return nil, errors.New("txmgr does not support pre-london blocks that do not have a basefee")
} else {
gasFeeCap = CalcGasFeeCap(head.BaseFee, gasTipCap)
}
// TODO (CLI-2630): Check for a large enough price bump
rawTx := &types.DynamicFeeTx{
ChainID: tx.ChainId(),
Nonce: tx.Nonce(),
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Gas: tx.Gas(),
To: tx.To(),
Value: tx.Value(),
Data: tx.Data(),
AccessList: tx.AccessList(),
}
return m.Signer(ctx, m.From, types.NewTx(rawTx))
}
// NewSimpleTxManager initializes a new SimpleTxManager with the passed Config. // NewSimpleTxManager initializes a new SimpleTxManager with the passed Config.
func NewSimpleTxManager(name string, l log.Logger, cfg Config, backend ReceiptSource) *SimpleTxManager { func NewSimpleTxManager(name string, l log.Logger, cfg Config, backend ETHBackend) *SimpleTxManager {
if cfg.NumConfirmations == 0 { if cfg.NumConfirmations == 0 {
panic("txmgr: NumConfirmations cannot be zero") panic("txmgr: NumConfirmations cannot be zero")
} }
...@@ -97,8 +155,12 @@ func NewSimpleTxManager(name string, l log.Logger, cfg Config, backend ReceiptSo ...@@ -97,8 +155,12 @@ func NewSimpleTxManager(name string, l log.Logger, cfg Config, backend ReceiptSo
// invocation of sendTx returns (called with differing gas prices). The method // invocation of sendTx returns (called with differing gas prices). The method
// may be canceled using the passed context. // may be canceled using the passed context.
// //
// The initially supplied transaction must be signed, have gas estimation done, and have a reasonable gas fee.
// When the transaction is resubmitted the tx manager will re-sign the transaction at a different gas pricing
// but retain the gas used, the nonce, and the data.
//
// NOTE: Send should be called by AT MOST one caller at a time. // NOTE: Send should be called by AT MOST one caller at a time.
func (m *SimpleTxManager) Send(ctx context.Context, updateGasPrice UpdateGasPriceFunc, sendTx SendTransactionFunc) (*types.Receipt, error) { func (m *SimpleTxManager) Send(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) {
// Initialize a wait group to track any spawned goroutines, and ensure // Initialize a wait group to track any spawned goroutines, and ensure
// we properly clean up any dangling resources this method generates. // we properly clean up any dangling resources this method generates.
...@@ -114,22 +176,13 @@ func (m *SimpleTxManager) Send(ctx context.Context, updateGasPrice UpdateGasPric ...@@ -114,22 +176,13 @@ func (m *SimpleTxManager) Send(ctx context.Context, updateGasPrice UpdateGasPric
sendState := NewSendState(m.SafeAbortNonceTooLowCount) sendState := NewSendState(m.SafeAbortNonceTooLowCount)
// Create a closure that will block on passed sendTx function in the // Create a closure that will block on submitting the tx in the
// background, returning the first successfully mined receipt back to // background, returning the first successfully mined receipt back to
// the main event loop via receiptChan. // the main event loop via receiptChan.
receiptChan := make(chan *types.Receipt, 1) receiptChan := make(chan *types.Receipt, 1)
sendTxAsync := func() { sendTxAsync := func(tx *types.Transaction) {
defer wg.Done() defer wg.Done()
tx, err := updateGasPrice(ctx)
if err != nil {
if err == context.Canceled || strings.Contains(err.Error(), "context canceled") {
return
}
m.l.Error("unable to update txn gas price", "err", err)
return
}
txHash := tx.Hash() txHash := tx.Hash()
nonce := tx.Nonce() nonce := tx.Nonce()
gasTipCap := tx.GasTipCap() gasTipCap := tx.GasTipCap()
...@@ -137,12 +190,14 @@ func (m *SimpleTxManager) Send(ctx context.Context, updateGasPrice UpdateGasPric ...@@ -137,12 +190,14 @@ func (m *SimpleTxManager) Send(ctx context.Context, updateGasPrice UpdateGasPric
log := m.l.New("txHash", txHash, "nonce", nonce, "gasTipCap", gasTipCap, "gasFeeCap", gasFeeCap) log := m.l.New("txHash", txHash, "nonce", nonce, "gasTipCap", gasTipCap, "gasFeeCap", gasFeeCap)
log.Info("publishing transaction") log.Info("publishing transaction")
// Sign and publish transaction with current gas price. err := m.backend.SendTransaction(ctx, tx)
err = sendTx(ctx, tx)
sendState.ProcessSendError(err) sendState.ProcessSendError(err)
if err != nil { if err != nil {
if err == context.Canceled || if errors.Is(err, context.Canceled) {
strings.Contains(err.Error(), "context canceled") { return
}
if errors.Is(err, txpool.ErrAlreadyKnown) {
log.Info("resubmitted already known transaction")
return return
} }
log.Error("unable to publish transaction", "err", err) log.Error("unable to publish transaction", "err", err)
...@@ -177,7 +232,7 @@ func (m *SimpleTxManager) Send(ctx context.Context, updateGasPrice UpdateGasPric ...@@ -177,7 +232,7 @@ func (m *SimpleTxManager) Send(ctx context.Context, updateGasPrice UpdateGasPric
// background, before entering the event loop and waiting out the // background, before entering the event loop and waiting out the
// resubmission timeout. // resubmission timeout.
wg.Add(1) wg.Add(1)
go sendTxAsync() go sendTxAsync(tx)
ticker := time.NewTicker(m.ResubmissionTimeout) ticker := time.NewTicker(m.ResubmissionTimeout)
defer ticker.Stop() defer ticker.Stop()
...@@ -196,9 +251,17 @@ func (m *SimpleTxManager) Send(ctx context.Context, updateGasPrice UpdateGasPric ...@@ -196,9 +251,17 @@ func (m *SimpleTxManager) Send(ctx context.Context, updateGasPrice UpdateGasPric
continue continue
} }
// Submit and wait for the bumped traction to confirm. // Increase the gas price & submit the new transaction
newTx, err := m.IncreaseGasPrice(ctx, tx)
if err != nil {
m.l.Error("Failed to increase the gas price for the tx", "err", err)
// Don't `continue` here so we resubmit the transaction with the same gas price.
} else {
// Save the tx so we know it's gas price.
tx = newTx
}
wg.Add(1) wg.Add(1)
go sendTxAsync() go sendTxAsync(tx)
// The passed context has been canceled, i.e. in the event of a // The passed context has been canceled, i.e. in the event of a
// shutdown. // shutdown.
...@@ -235,7 +298,7 @@ func (m *SimpleTxManager) waitMined(ctx context.Context, tx *types.Transaction, ...@@ -235,7 +298,7 @@ func (m *SimpleTxManager) waitMined(ctx context.Context, tx *types.Transaction,
break break
} }
m.l.Trace("Transaction mined, checking confirmations", "txHash", txHash, "txHeight", txHeight, m.l.Debug("Transaction mined, checking confirmations", "txHash", txHash, "txHeight", txHeight,
"tipHeight", tipHeight, "numConfirmations", m.NumConfirmations) "tipHeight", tipHeight, "numConfirmations", m.NumConfirmations)
// The transaction is considered confirmed when // The transaction is considered confirmed when
...@@ -252,7 +315,7 @@ func (m *SimpleTxManager) waitMined(ctx context.Context, tx *types.Transaction, ...@@ -252,7 +315,7 @@ func (m *SimpleTxManager) waitMined(ctx context.Context, tx *types.Transaction,
// Safe to subtract since we know the LHS above is greater. // Safe to subtract since we know the LHS above is greater.
confsRemaining := (txHeight + m.NumConfirmations) - (tipHeight + 1) confsRemaining := (txHeight + m.NumConfirmations) - (tipHeight + 1)
m.l.Info("Transaction not yet confirmed", "txHash", txHash, "confsRemaining", confsRemaining) m.l.Debug("Transaction not yet confirmed", "txHash", txHash, "confsRemaining", confsRemaining)
case err != nil: case err != nil:
m.l.Trace("Receipt retrievel failed", "hash", txHash, "err", err) m.l.Trace("Receipt retrievel failed", "hash", txHash, "err", err)
...@@ -266,6 +329,7 @@ func (m *SimpleTxManager) waitMined(ctx context.Context, tx *types.Transaction, ...@@ -266,6 +329,7 @@ func (m *SimpleTxManager) waitMined(ctx context.Context, tx *types.Transaction,
select { select {
case <-ctx.Done(): case <-ctx.Done():
m.l.Warn("context cancelled in waitMined")
return nil, ctx.Err() return nil, ctx.Err()
case <-queryTicker.C: case <-queryTicker.C:
} }
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment