Commit 752a2d1f authored by Matthew Slipper's avatar Matthew Slipper Committed by GitHub

Merge pull request #2165 from cfromknecht/bss-txmgr-mine-then-abort

fix: edge case in go/txmgr not waiting for numConfs
parents 99c40279 0b2ead8f
---
'@eth-optimism/batch-submitter-service': patch
---
Fixes a bug that causes the txmgr to not wait for the configured numConfirmations
......@@ -108,9 +108,10 @@ func Main(gitVersion string) func(ctx *cli.Context) error {
}
txManagerConfig := txmgr.Config{
ResubmissionTimeout: cfg.ResubmissionTimeout,
ReceiptQueryInterval: time.Second,
NumConfirmations: cfg.NumConfirmations,
ResubmissionTimeout: cfg.ResubmissionTimeout,
ReceiptQueryInterval: time.Second,
NumConfirmations: cfg.NumConfirmations,
SafeAbortNonceTooLowCount: cfg.SafeAbortNonceTooLowCount,
}
var services []*bsscore.Service
......
......@@ -89,6 +89,11 @@ type Config struct {
// appending new batches.
NumConfirmations uint64
// SafeAbortNonceTooLowCount is the number of ErrNonceTooLowObservations
// required to give up on a tx at a particular nonce without receiving
// confirmation.
SafeAbortNonceTooLowCount uint64
// ResubmissionTimeout is time we will wait before resubmitting a
// transaction.
ResubmissionTimeout time.Duration
......@@ -178,22 +183,23 @@ type Config struct {
func NewConfig(ctx *cli.Context) (Config, error) {
cfg := Config{
/* Required Flags */
BuildEnv: ctx.GlobalString(flags.BuildEnvFlag.Name),
EthNetworkName: ctx.GlobalString(flags.EthNetworkNameFlag.Name),
L1EthRpc: ctx.GlobalString(flags.L1EthRpcFlag.Name),
L2EthRpc: ctx.GlobalString(flags.L2EthRpcFlag.Name),
CTCAddress: ctx.GlobalString(flags.CTCAddressFlag.Name),
SCCAddress: ctx.GlobalString(flags.SCCAddressFlag.Name),
MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeFlag.Name),
MaxBatchSubmissionTime: ctx.GlobalDuration(flags.MaxBatchSubmissionTimeFlag.Name),
PollInterval: ctx.GlobalDuration(flags.PollIntervalFlag.Name),
NumConfirmations: ctx.GlobalUint64(flags.NumConfirmationsFlag.Name),
ResubmissionTimeout: ctx.GlobalDuration(flags.ResubmissionTimeoutFlag.Name),
FinalityConfirmations: ctx.GlobalUint64(flags.FinalityConfirmationsFlag.Name),
RunTxBatchSubmitter: ctx.GlobalBool(flags.RunTxBatchSubmitterFlag.Name),
RunStateBatchSubmitter: ctx.GlobalBool(flags.RunStateBatchSubmitterFlag.Name),
SafeMinimumEtherBalance: ctx.GlobalUint64(flags.SafeMinimumEtherBalanceFlag.Name),
ClearPendingTxs: ctx.GlobalBool(flags.ClearPendingTxsFlag.Name),
BuildEnv: ctx.GlobalString(flags.BuildEnvFlag.Name),
EthNetworkName: ctx.GlobalString(flags.EthNetworkNameFlag.Name),
L1EthRpc: ctx.GlobalString(flags.L1EthRpcFlag.Name),
L2EthRpc: ctx.GlobalString(flags.L2EthRpcFlag.Name),
CTCAddress: ctx.GlobalString(flags.CTCAddressFlag.Name),
SCCAddress: ctx.GlobalString(flags.SCCAddressFlag.Name),
MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeFlag.Name),
MaxBatchSubmissionTime: ctx.GlobalDuration(flags.MaxBatchSubmissionTimeFlag.Name),
PollInterval: ctx.GlobalDuration(flags.PollIntervalFlag.Name),
NumConfirmations: ctx.GlobalUint64(flags.NumConfirmationsFlag.Name),
SafeAbortNonceTooLowCount: ctx.GlobalUint64(flags.SafeAbortNonceTooLowCountFlag.Name),
ResubmissionTimeout: ctx.GlobalDuration(flags.ResubmissionTimeoutFlag.Name),
FinalityConfirmations: ctx.GlobalUint64(flags.FinalityConfirmationsFlag.Name),
RunTxBatchSubmitter: ctx.GlobalBool(flags.RunTxBatchSubmitterFlag.Name),
RunStateBatchSubmitter: ctx.GlobalBool(flags.RunStateBatchSubmitterFlag.Name),
SafeMinimumEtherBalance: ctx.GlobalUint64(flags.SafeMinimumEtherBalanceFlag.Name),
ClearPendingTxs: ctx.GlobalBool(flags.ClearPendingTxsFlag.Name),
/* Optional Flags */
LogLevel: ctx.GlobalString(flags.LogLevelFlag.Name),
LogTerminal: ctx.GlobalBool(flags.LogTerminalFlag.Name),
......
......@@ -227,10 +227,11 @@ func (d *Driver) CraftBatchTx(
}
}
// SubmitBatchTx using the passed transaction as a template, signs and
// publishes the transaction unmodified apart from sampling the current gas
// price. The final transaction is returned to the caller.
func (d *Driver) SubmitBatchTx(
// UpdateGasPrice signs an otherwise identical txn to the one provided but with
// updated gas prices sampled from the existing network conditions.
//
// NOTE: Thie method SHOULD NOT publish the resulting transaction.
func (d *Driver) UpdateGasPrice(
ctx context.Context,
tx *types.Transaction,
) (*types.Transaction, error) {
......@@ -243,6 +244,7 @@ func (d *Driver) SubmitBatchTx(
}
opts.Context = ctx
opts.Nonce = new(big.Int).SetUint64(tx.Nonce())
opts.NoSend = true
finalTx, err := d.rawSccContract.RawTransact(opts, tx.Data())
switch {
......@@ -265,3 +267,12 @@ func (d *Driver) SubmitBatchTx(
return nil, err
}
}
// SendTransaction injects a signed transaction into the pending pool for
// execution.
func (d *Driver) SendTransaction(
ctx context.Context,
tx *types.Transaction,
) error {
return d.cfg.L1Client.SendTransaction(ctx, tx)
}
......@@ -257,10 +257,11 @@ func (d *Driver) CraftBatchTx(
}
}
// SubmitBatchTx using the passed transaction as a template, signs and publishes
// the transaction unmodified apart from sampling the current gas price. The
// final transaction is returned to the caller.
func (d *Driver) SubmitBatchTx(
// UpdateGasPrice signs an otherwise identical txn to the one provided but with
// updated gas prices sampled from the existing network conditions.
//
// NOTE: Thie method SHOULD NOT publish the resulting transaction.
func (d *Driver) UpdateGasPrice(
ctx context.Context,
tx *types.Transaction,
) (*types.Transaction, error) {
......@@ -273,6 +274,7 @@ func (d *Driver) SubmitBatchTx(
}
opts.Context = ctx
opts.Nonce = new(big.Int).SetUint64(tx.Nonce())
opts.NoSend = true
finalTx, err := d.rawCtcContract.RawTransact(opts, tx.Data())
switch {
......@@ -295,3 +297,12 @@ func (d *Driver) SubmitBatchTx(
return nil, err
}
}
// SendTransaction injects a signed transaction into the pending pool for
// execution.
func (d *Driver) SendTransaction(
ctx context.Context,
tx *types.Transaction,
) error {
return d.cfg.L1Client.SendTransaction(ctx, tx)
}
......@@ -80,6 +80,14 @@ var (
Required: true,
EnvVar: prefixEnvVar("NUM_CONFIRMATIONS"),
}
SafeAbortNonceTooLowCountFlag = cli.Uint64Flag{
Name: "safe-abort-nonce-too-low-count",
Usage: "Number of ErrNonceTooLow observations required to " +
"give up on a tx at a particular nonce without receiving " +
"confirmation",
Required: true,
EnvVar: prefixEnvVar("SAFE_ABORT_NONCE_TOO_LOW_COUNT"),
}
ResubmissionTimeoutFlag = cli.DurationFlag{
Name: "resubmission-timeout",
Usage: "Duration we will wait before resubmitting a " +
......@@ -221,6 +229,7 @@ var requiredFlags = []cli.Flag{
MaxBatchSubmissionTimeFlag,
PollIntervalFlag,
NumConfirmationsFlag,
SafeAbortNonceTooLowCountFlag,
ResubmissionTimeoutFlag,
FinalityConfirmationsFlag,
RunTxBatchSubmitterFlag,
......
......@@ -48,7 +48,7 @@ func ClearPendingTx(
// Construct the clearing transaction submission clousure that will attempt
// to send the a clearing transaction transaction at the given nonce and gas
// price.
sendTx := func(
updateGasPrice := func(
ctx context.Context,
) (*types.Transaction, error) {
log.Info(name+" clearing pending tx", "nonce", nonce)
......@@ -61,11 +61,16 @@ func ClearPendingTx(
"err", err)
return nil, err
}
txHash := signedTx.Hash()
gasTipCap := signedTx.GasTipCap()
gasFeeCap := signedTx.GasFeeCap()
err = l1Client.SendTransaction(ctx, signedTx)
return signedTx, nil
}
sendTx := func(ctx context.Context, tx *types.Transaction) error {
txHash := tx.Hash()
gasTipCap := tx.GasTipCap()
gasFeeCap := tx.GasFeeCap()
err := l1Client.SendTransaction(ctx, tx)
switch {
// Clearing transaction successfully confirmed.
......@@ -74,7 +79,7 @@ func ClearPendingTx(
"gasTipCap", gasTipCap, "gasFeeCap", gasFeeCap,
"txHash", txHash)
return signedTx, nil
return nil
// Getting a nonce too low error implies that a previous transaction in
// the mempool has confirmed and we should abort trying to publish at
......@@ -83,7 +88,7 @@ func ClearPendingTx(
log.Info(name + " transaction from previous restart confirmed, " +
"aborting mempool clearing")
cancel()
return nil, context.Canceled
return context.Canceled
// An unexpected error occurred. This also handles the case where the
// clearing transaction has not yet bested the gas price a prior
......@@ -94,11 +99,11 @@ func ClearPendingTx(
log.Error(name+" unable to submit clearing tx",
"nonce", nonce, "gasTipCap", gasTipCap, "gasFeeCap", gasFeeCap,
"txHash", txHash, "err", err)
return nil, err
return err
}
}
receipt, err := txMgr.Send(ctx, sendTx)
receipt, err := txMgr.Send(ctx, updateGasPrice, sendTx)
switch {
// If the current context is canceled, a prior transaction in the mempool
......
......@@ -204,9 +204,10 @@ func newClearPendingTxHarnessWithNumConfs(
l1Client := mock.NewL1Client(l1ClientConfig)
txMgr := txmgr.NewSimpleTxManager("test", txmgr.Config{
ResubmissionTimeout: time.Second,
ReceiptQueryInterval: 50 * time.Millisecond,
NumConfirmations: numConfirmations,
ResubmissionTimeout: time.Second,
ReceiptQueryInterval: 50 * time.Millisecond,
NumConfirmations: numConfirmations,
SafeAbortNonceTooLowCount: 3,
}, l1Client)
return &clearPendingTxHarness{
......
......@@ -54,13 +54,18 @@ type Driver interface {
start, end, nonce *big.Int,
) (*types.Transaction, error)
// SubmitBatchTx using the passed transaction as a template, signs and
// publishes the transaction unmodified apart from sampling the current gas
// price. The final transaction is returned to the caller.
SubmitBatchTx(
// UpdateGasPrice signs an otherwise identical txn to the one provided but
// with updated gas prices sampled from the existing network conditions.
//
// NOTE: Thie method SHOULD NOT publish the resulting transaction.
UpdateGasPrice(
ctx context.Context,
tx *types.Transaction,
) (*types.Transaction, error)
// SendTransaction injects a signed transaction into the pending pool for
// execution.
SendTransaction(ctx context.Context, tx *types.Transaction) error
}
type ServiceConfig struct {
......@@ -193,30 +198,19 @@ func (s *Service) eventLoop() {
// Construct the transaction submission clousure that will attempt
// to send the next transaction at the given nonce and gas price.
sendTx := func(ctx context.Context) (*types.Transaction, error) {
log.Info(name+" attempting batch tx", "start", start,
updateGasPrice := func(ctx context.Context) (*types.Transaction, error) {
log.Info(name+" updating batch tx gas price", "start", start,
"end", end, "nonce", nonce)
tx, err := s.cfg.Driver.SubmitBatchTx(ctx, tx)
if err != nil {
return nil, err
}
log.Info(
name+" submitted batch tx",
"start", start,
"end", end,
"nonce", nonce,
"tx_hash", tx.Hash(),
)
return tx, nil
return s.cfg.Driver.UpdateGasPrice(ctx, tx)
}
// Wait until one of our submitted transactions confirms. If no
// receipt is received it's likely our gas price was too low.
batchConfirmationStart := time.Now()
receipt, err := s.txMgr.Send(s.ctx, sendTx)
receipt, err := s.txMgr.Send(
s.ctx, updateGasPrice, s.cfg.Driver.SendTransaction,
)
if err != nil {
log.Error(name+" unable to publish batch tx",
"err", err)
......
package txmgr
import (
"strings"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
)
// SendState tracks information about the publication state of a given txn. In
// this context, a txn may correspond to multiple different txn hashes due to
// varying gas prices, though we treat them all as the same logical txn. This
// struct is primarly used to determine whether or not the txmgr should abort a
// given txn and retry with a higher nonce.
type SendState struct {
minedTxs map[common.Hash]struct{}
nonceTooLowCount uint64
mu sync.RWMutex
safeAbortNonceTooLowCount uint64
}
// NewSendState parameterizes a new SendState from the passed
// safeAbortNonceTooLowCount.
func NewSendState(safeAbortNonceTooLowCount uint64) *SendState {
if safeAbortNonceTooLowCount == 0 {
panic("txmgr: safeAbortNonceTooLowCount cannot be zero")
}
return &SendState{
minedTxs: make(map[common.Hash]struct{}),
nonceTooLowCount: 0,
safeAbortNonceTooLowCount: safeAbortNonceTooLowCount,
}
}
// ProcessSendError should be invoked with the error returned for each
// publication. It is safe to call this method with nil or arbitrary errors.
// Currently it only acts on errors containing the ErrNonceTooLow message.
func (s *SendState) ProcessSendError(err error) {
// Nothing to do.
if err == nil {
return
}
// Only concerned with ErrNonceTooLow.
if !strings.Contains(err.Error(), core.ErrNonceTooLow.Error()) {
return
}
s.mu.Lock()
defer s.mu.Unlock()
// Record this nonce too low observation.
s.nonceTooLowCount++
}
// TxMined records that the txn with txnHash has been mined and is await
// confirmation. It is safe to call this function multiple times.
func (s *SendState) TxMined(txHash common.Hash) {
s.mu.Lock()
defer s.mu.Unlock()
s.minedTxs[txHash] = struct{}{}
}
// TxMined records that the txn with txnHash has not been mined or has been
// reorg'd out. It is safe to call this function multiple times.
func (s *SendState) TxNotMined(txHash common.Hash) {
s.mu.Lock()
defer s.mu.Unlock()
_, wasMined := s.minedTxs[txHash]
delete(s.minedTxs, txHash)
// If the txn got reorged and left us with no mined txns, reset the nonce
// too low count, otherwise we might abort too soon when processing the next
// error. If the nonce too low errors persist, we want to ensure we wait out
// the full safe abort count to enesure we have a sufficient number of
// observations.
if len(s.minedTxs) == 0 && wasMined {
s.nonceTooLowCount = 0
}
}
// ShouldAbortImmediately returns true if the txmgr should give up on trying a
// given txn with the target nonce. For now, this only happens if we see an
// extended period of getting ErrNonceTooLow without having a txn mined.
func (s *SendState) ShouldAbortImmediately() bool {
s.mu.RLock()
defer s.mu.RUnlock()
// Never abort if our latest sample reports having at least one mined txn.
if len(s.minedTxs) > 0 {
return false
}
// Only abort if we've observed enough ErrNonceTooLow to meet our safe abort
// threshold.
return s.nonceTooLowCount >= s.safeAbortNonceTooLowCount
}
// IsWaitingForConfirmation returns true if we have at least one confirmation on
// one of our txs.
func (s *SendState) IsWaitingForConfirmation() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return len(s.minedTxs) > 0
}
package txmgr_test
import (
"errors"
"testing"
"github.com/ethereum-optimism/optimism/go/bss-core/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/stretchr/testify/require"
)
const testSafeAbortNonceTooLowCount = 3
var (
testHash = common.HexToHash("0x01")
)
func newSendState() *txmgr.SendState {
return txmgr.NewSendState(testSafeAbortNonceTooLowCount)
}
func processNSendErrors(sendState *txmgr.SendState, err error, n int) {
for i := 0; i < n; i++ {
sendState.ProcessSendError(err)
}
}
// TestSendStateNoAbortAfterInit asserts that the default SendState won't
// trigger an abort even after the safe abort interval has elapsed.
func TestSendStateNoAbortAfterInit(t *testing.T) {
sendState := newSendState()
require.False(t, sendState.ShouldAbortImmediately())
require.False(t, sendState.IsWaitingForConfirmation())
}
// TestSendStateNoAbortAfterProcessNilError asserts that nil errors are not
// considered for abort status.
func TestSendStateNoAbortAfterProcessNilError(t *testing.T) {
sendState := newSendState()
processNSendErrors(sendState, nil, testSafeAbortNonceTooLowCount)
require.False(t, sendState.ShouldAbortImmediately())
}
// TestSendStateNoAbortAfterProcessOtherError asserts that non-nil errors other
// than ErrNonceTooLow are not considered for abort status.
func TestSendStateNoAbortAfterProcessOtherError(t *testing.T) {
sendState := newSendState()
otherError := errors.New("other error")
processNSendErrors(sendState, otherError, testSafeAbortNonceTooLowCount)
require.False(t, sendState.ShouldAbortImmediately())
}
// TestSendStateAbortSafelyAfterNonceTooLowButNoTxMined asserts that we will
// abort after the safe abort interval has elapsed if we haven't mined a tx.
func TestSendStateAbortSafelyAfterNonceTooLowButNoTxMined(t *testing.T) {
sendState := newSendState()
sendState.ProcessSendError(core.ErrNonceTooLow)
require.False(t, sendState.ShouldAbortImmediately())
sendState.ProcessSendError(core.ErrNonceTooLow)
require.False(t, sendState.ShouldAbortImmediately())
sendState.ProcessSendError(core.ErrNonceTooLow)
require.True(t, sendState.ShouldAbortImmediately())
}
// TestSendStateMiningTxCancelsAbort asserts that a tx getting mined after
// processing ErrNonceTooLow takes precedence and doesn't cause an abort.
func TestSendStateMiningTxCancelsAbort(t *testing.T) {
sendState := newSendState()
sendState.ProcessSendError(core.ErrNonceTooLow)
sendState.ProcessSendError(core.ErrNonceTooLow)
sendState.TxMined(testHash)
require.False(t, sendState.ShouldAbortImmediately())
sendState.ProcessSendError(core.ErrNonceTooLow)
require.False(t, sendState.ShouldAbortImmediately())
}
// TestSendStateReorgingTxResetsAbort asserts that unmining a tx does not
// consider ErrNonceTooLow's prior to being mined when determing whether to
// abort.
func TestSendStateReorgingTxResetsAbort(t *testing.T) {
sendState := newSendState()
sendState.ProcessSendError(core.ErrNonceTooLow)
sendState.ProcessSendError(core.ErrNonceTooLow)
sendState.TxMined(testHash)
sendState.TxNotMined(testHash)
sendState.ProcessSendError(core.ErrNonceTooLow)
require.False(t, sendState.ShouldAbortImmediately())
}
// TestSendStateNoAbortEvenIfNonceTooLowAfterTxMined asserts that we will not
// abort if we continue to get ErrNonceTooLow after a tx has been mined.
//
// NOTE: This is the most crucial role of the SendState, as we _expect_ to get
// ErrNonceTooLow failures after one of our txs has been mined, but that
// shouldn't cause us to not continue waiting for confirmations.
func TestSendStateNoAbortEvenIfNonceTooLowAfterTxMined(t *testing.T) {
sendState := newSendState()
sendState.TxMined(testHash)
processNSendErrors(
sendState, core.ErrNonceTooLow, testSafeAbortNonceTooLowCount,
)
require.False(t, sendState.ShouldAbortImmediately())
}
// TestSendStateSafeAbortIfNonceTooLowPersistsAfterUnmine asserts that we will
// correctly abort if we continue to get ErrNonceTooLow after a tx is unmined
// but not remined.
func TestSendStateSafeAbortIfNonceTooLowPersistsAfterUnmine(t *testing.T) {
sendState := newSendState()
sendState.TxMined(testHash)
sendState.TxNotMined(testHash)
sendState.ProcessSendError(core.ErrNonceTooLow)
sendState.ProcessSendError(core.ErrNonceTooLow)
require.False(t, sendState.ShouldAbortImmediately())
sendState.ProcessSendError(core.ErrNonceTooLow)
require.True(t, sendState.ShouldAbortImmediately())
}
// TestSendStateSafeAbortWhileCallingNotMinedOnUnminedTx asserts that we will
// correctly abort if we continue to call TxNotMined on txns that haven't been
// mined.
func TestSendStateSafeAbortWhileCallingNotMinedOnUnminedTx(t *testing.T) {
sendState := newSendState()
processNSendErrors(
sendState, core.ErrNonceTooLow, testSafeAbortNonceTooLowCount,
)
sendState.TxNotMined(testHash)
require.True(t, sendState.ShouldAbortImmediately())
}
// TestSendStateIsWaitingForConfirmationAfterTxMined asserts that we are waiting
// for confirmation after a tx is mined.
func TestSendStateIsWaitingForConfirmationAfterTxMined(t *testing.T) {
sendState := newSendState()
testHash2 := common.HexToHash("0x02")
sendState.TxMined(testHash)
require.True(t, sendState.IsWaitingForConfirmation())
sendState.TxMined(testHash2)
require.True(t, sendState.IsWaitingForConfirmation())
}
// TestSendStateIsNotWaitingForConfirmationAfterTxUnmined asserts that we are
// not waiting for confirmation after a tx is mined then unmined.
func TestSendStateIsNotWaitingForConfirmationAfterTxUnmined(t *testing.T) {
sendState := newSendState()
sendState.TxMined(testHash)
sendState.TxNotMined(testHash)
require.False(t, sendState.IsWaitingForConfirmation())
}
......@@ -8,15 +8,16 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
// SendTxFunc defines a function signature for publishing a desired tx with a
// specific gas price. Implementations of this signature should also return
// promptly when the context is canceled.
type SendTxFunc = func(ctx context.Context) (*types.Transaction, error)
// UpdateGasPriceSendTxFunc defines a function signature for publishing a
// desired tx with a specific gas price. Implementations of this signature
// should also return promptly when the context is canceled.
type UpdateGasPriceFunc = func(ctx context.Context) (*types.Transaction, error)
type SendTransactionFunc = func(ctx context.Context, tx *types.Transaction) error
// Config houses parameters for altering the behavior of a SimpleTxManager.
type Config struct {
......@@ -37,6 +38,11 @@ type Config struct {
// NumConfirmations specifies how many blocks are need to consider a
// transaction confirmed.
NumConfirmations uint64
// SafeAbortNonceTooLowCount specifies how many ErrNonceTooLow observations
// are required to give up on a tx at a particular nonce without receiving
// confirmation.
SafeAbortNonceTooLowCount uint64
}
// TxManager is an interface that allows callers to reliably publish txs,
......@@ -48,7 +54,11 @@ type TxManager interface {
// prices). The method may be canceled using the passed context.
//
// NOTE: Send should be called by AT MOST one caller at a time.
Send(ctx context.Context, sendTx SendTxFunc) (*types.Receipt, error)
Send(
ctx context.Context,
updateGasPrice UpdateGasPriceFunc,
sendTxn SendTransactionFunc,
) (*types.Receipt, error)
}
// ReceiptSource is a minimal function signature used to detect the confirmation
......@@ -96,7 +106,10 @@ func NewSimpleTxManager(
//
// NOTE: Send should be called by AT MOST one caller at a time.
func (m *SimpleTxManager) Send(
ctx context.Context, sendTx SendTxFunc) (*types.Receipt, error) {
ctx context.Context,
updateGasPrice UpdateGasPriceFunc,
sendTx SendTransactionFunc,
) (*types.Receipt, error) {
name := m.name
......@@ -112,6 +125,8 @@ func (m *SimpleTxManager) Send(
ctxc, cancel := context.WithCancel(ctx)
defer cancel()
sendState := NewSendState(m.cfg.SafeAbortNonceTooLowCount)
// Create a closure that will block on passed sendTx function in the
// background, returning the first successfully mined receipt back to
// the main event loop via receiptChan.
......@@ -119,36 +134,52 @@ func (m *SimpleTxManager) Send(
sendTxAsync := func() {
defer wg.Done()
tx, err := updateGasPrice(ctxc)
if err != nil {
if err == context.Canceled ||
strings.Contains(err.Error(), "context canceled") {
return
}
log.Error(name+" unable to update txn gas price", "err", err)
return
}
txHash := tx.Hash()
nonce := tx.Nonce()
gasTipCap := tx.GasTipCap()
gasFeeCap := tx.GasFeeCap()
log.Info(name+" publishing transaction", "txHash", txHash,
"nonce", nonce, "gasTipCap", gasTipCap, "gasFeeCap", gasFeeCap)
// Sign and publish transaction with current gas price.
tx, err := sendTx(ctxc)
err = sendTx(ctxc, tx)
sendState.ProcessSendError(err)
if err != nil {
if err == context.Canceled ||
strings.Contains(err.Error(), "context canceled") {
return
}
log.Error(name+" unable to publish transaction", "err", err)
if shouldAbortImmediately(err) {
if sendState.ShouldAbortImmediately() {
cancel()
}
// TODO(conner): add retry?
return
}
txHash := tx.Hash()
gasTipCap := tx.GasTipCap()
gasFeeCap := tx.GasFeeCap()
log.Info(name+" transaction published successfully", "hash", txHash,
"gasTipCap", gasTipCap, "gasFeeCap", gasFeeCap)
"nonce", nonce, "gasTipCap", gasTipCap, "gasFeeCap", gasFeeCap)
// Wait for the transaction to be mined, reporting the receipt
// back to the main event loop if found.
receipt, err := WaitMined(
receipt, err := waitMined(
ctxc, m.backend, tx, m.cfg.ReceiptQueryInterval,
m.cfg.NumConfirmations,
m.cfg.NumConfirmations, sendState,
)
if err != nil {
log.Debug(name+" send tx failed", "hash", txHash,
"gasTipCap", gasTipCap, "gasFeeCap", gasFeeCap, "err", err)
"nonce", nonce, "gasTipCap", gasTipCap, "gasFeeCap", gasFeeCap,
"err", err)
}
if receipt != nil {
// Use non-blocking select to ensure function can exit
......@@ -156,7 +187,8 @@ func (m *SimpleTxManager) Send(
select {
case receiptChan <- receipt:
log.Trace(name+" send tx succeeded", "hash", txHash,
"gasTipCap", gasTipCap, "gasFeeCap", gasFeeCap)
"nonce", nonce, "gasTipCap", gasTipCap,
"gasFeeCap", gasFeeCap)
default:
}
}
......@@ -174,6 +206,14 @@ func (m *SimpleTxManager) Send(
// Whenever a resubmission timeout has elapsed, bump the gas
// price and publish a new transaction.
case <-time.After(m.cfg.ResubmissionTimeout):
// Avoid republishing if we are waiting for confirmation on an
// existing tx. This is primarily an optimization to reduce the
// number of API calls we make, but also reduces the chances of
// getting a false postive reading for ShouldAbortImmediately.
if sendState.IsWaitingForConfirmation() {
continue
}
// Submit and wait for the bumped traction to confirm.
wg.Add(1)
go sendTxAsync()
......@@ -190,13 +230,6 @@ func (m *SimpleTxManager) Send(
}
}
// shouldAbortImmediately returns true if the txmgr should cancel all
// publication attempts and retry. For now, this only includes nonce errors, as
// that error indicates that none of the transactions will ever confirm.
func shouldAbortImmediately(err error) bool {
return strings.Contains(err.Error(), core.ErrNonceTooLow.Error())
}
// WaitMined blocks until the backend indicates confirmation of tx and returns
// the tx receipt. Queries are made every queryInterval, regardless of whether
// the backend returns an error. This method can be canceled using the passed
......@@ -208,6 +241,19 @@ func WaitMined(
queryInterval time.Duration,
numConfirmations uint64,
) (*types.Receipt, error) {
return waitMined(ctx, backend, tx, queryInterval, numConfirmations, nil)
}
// waitMined implements the core functionality of WaitMined, with the option to
// pass in a SendState to record whether or not the transaction is mined.
func waitMined(
ctx context.Context,
backend ReceiptSource,
tx *types.Transaction,
queryInterval time.Duration,
numConfirmations uint64,
sendState *SendState,
) (*types.Receipt, error) {
queryTicker := time.NewTicker(queryInterval)
defer queryTicker.Stop()
......@@ -218,6 +264,10 @@ func WaitMined(
receipt, err := backend.TransactionReceipt(ctx, txHash)
switch {
case receipt != nil:
if sendState != nil {
sendState.TxMined(txHash)
}
txHeight := receipt.BlockNumber.Uint64()
tipHeight, err := backend.BlockNumber(ctx)
if err != nil {
......@@ -252,6 +302,9 @@ func WaitMined(
"err", err)
default:
if sendState != nil {
sendState.TxNotMined(txHash)
}
log.Trace("Transaction not yet mined", "hash", txHash)
}
......
......@@ -10,6 +10,7 @@ import (
"github.com/ethereum-optimism/optimism/go/bss-core/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/require"
)
......@@ -44,9 +45,10 @@ func newTestHarness() *testHarness {
func configWithNumConfs(numConfirmations uint64) txmgr.Config {
return txmgr.Config{
ResubmissionTimeout: time.Second,
ReceiptQueryInterval: 50 * time.Millisecond,
NumConfirmations: numConfirmations,
ResubmissionTimeout: time.Second,
ReceiptQueryInterval: 50 * time.Millisecond,
NumConfirmations: numConfirmations,
SafeAbortNonceTooLowCount: 3,
}
}
......@@ -71,6 +73,10 @@ func (g *gasPricer) expGasFeeCap() *big.Int {
return gasFeeCap
}
func (g *gasPricer) shouldMine(gasFeeCap *big.Int) bool {
return g.expGasFeeCap().Cmp(gasFeeCap) == 0
}
func (g *gasPricer) feesForEpoch(epoch int64) (*big.Int, *big.Int) {
epochBaseFee := new(big.Int).Mul(g.baseBaseFee, big.NewInt(epoch))
epochGasTipCap := new(big.Int).Mul(g.baseGasTipFee, big.NewInt(epoch))
......@@ -79,15 +85,14 @@ func (g *gasPricer) feesForEpoch(epoch int64) (*big.Int, *big.Int) {
return epochGasTipCap, epochGasFeeCap
}
func (g *gasPricer) sample() (*big.Int, *big.Int, bool) {
func (g *gasPricer) sample() (*big.Int, *big.Int) {
g.mu.Lock()
defer g.mu.Unlock()
g.epoch++
epochGasTipCap, epochGasFeeCap := g.feesForEpoch(g.epoch)
shouldMine := g.epoch == g.mineAtEpoch
return epochGasTipCap, epochGasFeeCap, shouldMine
return epochGasTipCap, epochGasFeeCap
}
type minedTxInfo struct {
......@@ -171,23 +176,29 @@ func TestTxMgrConfirmAtMinGasPrice(t *testing.T) {
h := newTestHarness()
gasFeeCap := big.NewInt(5)
sendTxFunc := func(
ctx context.Context,
) (*types.Transaction, error) {
tx := types.NewTx(&types.DynamicFeeTx{
gasPricer := newGasPricer(1)
updateGasPrice := func(ctx context.Context) (*types.Transaction, error) {
gasTipCap, gasFeeCap := gasPricer.sample()
return types.NewTx(&types.DynamicFeeTx{
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
})
txHash := tx.Hash()
h.backend.mine(&txHash, gasFeeCap)
return tx, nil
}), nil
}
sendTx := func(ctx context.Context, tx *types.Transaction) error {
if gasPricer.shouldMine(tx.GasFeeCap()) {
txHash := tx.Hash()
h.backend.mine(&txHash, tx.GasFeeCap())
}
return nil
}
ctx := context.Background()
receipt, err := h.mgr.Send(ctx, sendTxFunc)
receipt, err := h.mgr.Send(ctx, updateGasPrice, sendTx)
require.Nil(t, err)
require.NotNil(t, receipt)
require.Equal(t, gasFeeCap.Uint64(), receipt.GasUsed)
require.Equal(t, gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
}
// TestTxMgrNeverConfirmCancel asserts that a Send can be canceled even if no
......@@ -198,19 +209,23 @@ func TestTxMgrNeverConfirmCancel(t *testing.T) {
h := newTestHarness()
sendTxFunc := func(
ctx context.Context,
) (*types.Transaction, error) {
// Don't publish tx to backend, simulating never being mined.
updateGasPrice := func(ctx context.Context) (*types.Transaction, error) {
gasTipCap, gasFeeCap := h.gasPricer.sample()
return types.NewTx(&types.DynamicFeeTx{
GasFeeCap: big.NewInt(5),
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
}), nil
}
sendTx := func(ctx context.Context, tx *types.Transaction) error {
// Don't publish tx to backend, simulating never being mined.
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
receipt, err := h.mgr.Send(ctx, sendTxFunc)
receipt, err := h.mgr.Send(ctx, updateGasPrice, sendTx)
require.Equal(t, err, context.DeadlineExceeded)
require.Nil(t, receipt)
}
......@@ -222,23 +237,24 @@ func TestTxMgrConfirmsAtHigherGasPrice(t *testing.T) {
h := newTestHarness()
sendTxFunc := func(
ctx context.Context,
) (*types.Transaction, error) {
gasTipCap, gasFeeCap, shouldMine := h.gasPricer.sample()
tx := types.NewTx(&types.DynamicFeeTx{
updateGasPrice := func(ctx context.Context) (*types.Transaction, error) {
gasTipCap, gasFeeCap := h.gasPricer.sample()
return types.NewTx(&types.DynamicFeeTx{
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
})
if shouldMine {
}), nil
}
sendTx := func(ctx context.Context, tx *types.Transaction) error {
if h.gasPricer.shouldMine(tx.GasFeeCap()) {
txHash := tx.Hash()
h.backend.mine(&txHash, gasFeeCap)
h.backend.mine(&txHash, tx.GasFeeCap())
}
return tx, nil
return nil
}
ctx := context.Background()
receipt, err := h.mgr.Send(ctx, sendTxFunc)
receipt, err := h.mgr.Send(ctx, updateGasPrice, sendTx)
require.Nil(t, err)
require.NotNil(t, receipt)
require.Equal(t, h.gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
......@@ -255,16 +271,22 @@ func TestTxMgrBlocksOnFailingRpcCalls(t *testing.T) {
h := newTestHarness()
sendTxFunc := func(
ctx context.Context,
) (*types.Transaction, error) {
return nil, errRpcFailure
updateGasPrice := func(ctx context.Context) (*types.Transaction, error) {
gasTipCap, gasFeeCap := h.gasPricer.sample()
return types.NewTx(&types.DynamicFeeTx{
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
}), nil
}
sendTx := func(ctx context.Context, tx *types.Transaction) error {
return errRpcFailure
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
receipt, err := h.mgr.Send(ctx, sendTxFunc)
receipt, err := h.mgr.Send(ctx, updateGasPrice, sendTx)
require.Equal(t, err, context.DeadlineExceeded)
require.Nil(t, receipt)
}
......@@ -277,27 +299,27 @@ func TestTxMgrOnlyOnePublicationSucceeds(t *testing.T) {
h := newTestHarness()
sendTxFunc := func(
ctx context.Context,
) (*types.Transaction, error) {
gasTipCap, gasFeeCap, shouldMine := h.gasPricer.sample()
updateGasPrice := func(ctx context.Context) (*types.Transaction, error) {
gasTipCap, gasFeeCap := h.gasPricer.sample()
return types.NewTx(&types.DynamicFeeTx{
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
}), nil
}
sendTx := func(ctx context.Context, tx *types.Transaction) error {
// Fail all but the final attempt.
if !shouldMine {
return nil, errRpcFailure
if !h.gasPricer.shouldMine(tx.GasFeeCap()) {
return errRpcFailure
}
tx := types.NewTx(&types.DynamicFeeTx{
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
})
txHash := tx.Hash()
h.backend.mine(&txHash, gasFeeCap)
return tx, nil
h.backend.mine(&txHash, tx.GasFeeCap())
return nil
}
ctx := context.Background()
receipt, err := h.mgr.Send(ctx, sendTxFunc)
receipt, err := h.mgr.Send(ctx, updateGasPrice, sendTx)
require.Nil(t, err)
require.NotNil(t, receipt)
......@@ -312,26 +334,72 @@ func TestTxMgrConfirmsMinGasPriceAfterBumping(t *testing.T) {
h := newTestHarness()
sendTxFunc := func(
ctx context.Context,
) (*types.Transaction, error) {
gasTipCap, gasFeeCap, shouldMine := h.gasPricer.sample()
tx := types.NewTx(&types.DynamicFeeTx{
updateGasPrice := func(ctx context.Context) (*types.Transaction, error) {
gasTipCap, gasFeeCap := h.gasPricer.sample()
return types.NewTx(&types.DynamicFeeTx{
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
})
}), nil
}
sendTx := func(ctx context.Context, tx *types.Transaction) error {
// Delay mining the tx with the min gas price.
if shouldMine {
if h.gasPricer.shouldMine(tx.GasFeeCap()) {
time.AfterFunc(5*time.Second, func() {
txHash := tx.Hash()
h.backend.mine(&txHash, gasFeeCap)
h.backend.mine(&txHash, tx.GasFeeCap())
})
}
return nil
}
ctx := context.Background()
receipt, err := h.mgr.Send(ctx, updateGasPrice, sendTx)
require.Nil(t, err)
require.NotNil(t, receipt)
require.Equal(t, h.gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
}
// TestTxMgrDoesntAbortNonceTooLowAfterMiningTx
func TestTxMgrDoesntAbortNonceTooLowAfterMiningTx(t *testing.T) {
t.Parallel()
h := newTestHarnessWithConfig(configWithNumConfs(2))
updateGasPrice := func(ctx context.Context) (*types.Transaction, error) {
gasTipCap, gasFeeCap := h.gasPricer.sample()
return types.NewTx(&types.DynamicFeeTx{
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
}), nil
}
sendTx := func(ctx context.Context, tx *types.Transaction) error {
switch {
// If the txn's gas fee cap is less than the one we expect to mine,
// accept the txn to the mempool.
case tx.GasFeeCap().Cmp(h.gasPricer.expGasFeeCap()) < 0:
return nil
// Accept and mine the actual txn we expect to confirm.
case h.gasPricer.shouldMine(tx.GasFeeCap()):
txHash := tx.Hash()
h.backend.mine(&txHash, tx.GasFeeCap())
time.AfterFunc(5*time.Second, func() {
h.backend.mine(nil, nil)
})
return nil
// For gas prices greater than our expected, return ErrNonceTooLow since
// the prior txn confirmed and will invalidate subsequent publications.
default:
return core.ErrNonceTooLow
}
return tx, nil
}
ctx := context.Background()
receipt, err := h.mgr.Send(ctx, sendTxFunc)
receipt, err := h.mgr.Send(ctx, updateGasPrice, sendTx)
require.Nil(t, err)
require.NotNil(t, receipt)
require.Equal(t, h.gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
......
......@@ -8,6 +8,7 @@ BATCH_SUBMITTER_MAX_L1_TX_SIZE=90000
BATCH_SUBMITTER_MAX_BATCH_SUBMISSION_TIME=0
BATCH_SUBMITTER_POLL_INTERVAL=500ms
BATCH_SUBMITTER_NUM_CONFIRMATIONS=1
BATCH_SUBMITTER_SAFE_ABORT_NONCE_TOO_LOW_COUNT=3
BATCH_SUBMITTER_RESUBMISSION_TIMEOUT=1s
BATCH_SUBMITTER_FINALITY_CONFIRMATIONS=0
BATCH_SUBMITTER_RUN_TX_BATCH_SUBMITTER=true
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment