Commit 5097797e authored by Conner Fromknecht's avatar Conner Fromknecht

feat: add batch-submitter/txmgr for tx gas bumping

This commit adds a SimpleTxManager for tracking and bumping fees on txs
the batch submitter needs to publish. The bulk of the logic is adapated
from the existing tx manager (YNATM) used in the typescript version to
minimize any new classes of bugs that are not already considered.

The manager is configured via a min and max gas price, as well as an
additive gas price step that is applied after each resubmission interval
elapses, before signing and broadcasting a new transaction. This
corresponds to the LINEAR fee policy available in YNATM.

Txs generated from the same call to Send are treated as equivalent, thus
the method blocks until the first tx confirms. Care is taken to
throughly unit test the interactions and edge cases, as subtle bugs in
tx publication can lead to big headaches in prod. To this end, we
achieve 100% test coverage in the txmgr package:

```
coverage: 100.0% of statements
ok  	github.com/ethereum-optimism/go/batch-submitter/txmgr	10.311s
```
parent f7380e17
package txmgr
import (
"context"
"errors"
"math/big"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
// ErrPublishTimeout signals that the tx manager did not receive a confirmation
// for a given tx after publishing with the maximum gas price and waiting out a
// resubmission timeout.
var ErrPublishTimeout = errors.New("failed to publish tx with max gas price")
// SendTxFunc defines a function signature for publishing a desired tx with a
// specific gas price. Implementations of this signature should also return
// promptly when the context is canceled.
type SendTxFunc = func(
ctx context.Context, gasPrice *big.Int) (*types.Transaction, error)
// Config houses parameters for altering the behavior of a SimpleTxManager.
type Config struct {
// MinGasPrice is the minimum gas price (in gwei). This is used as the
// initial publication attempt.
MinGasPrice *big.Int
// MaxGasPrice is the maximum gas price (in gwei). This is used to clamp
// the upper end of the range that the TxManager will ever publish when
// attempting to confirm a transaction.
MaxGasPrice *big.Int
// GasRetryIncrement is the additive gas price (in gwei) that will be
// used to bump each successive tx after a ResubmissionTimeout has
// elapsed.
GasRetryIncrement *big.Int
// ResubmissionTimeout is the interval at which, if no previously
// published transaction has been mined, the new tx with a bumped gas
// price will be published. Only one publication at MaxGasPrice will be
// attempted.
ResubmissionTimeout time.Duration
// RequireQueryInterval is the interval at which the tx manager will
// query the backend to check for confirmations after a tx at a
// specific gas price has been published.
ReceiptQueryInterval time.Duration
}
// TxManager is an interface that allows callers to reliably publish txs,
// bumping the gas price if needed, and obtain the receipt of the resulting tx.
type TxManager interface {
// Send is used to publish a transaction with incrementally higher gas
// prices until the transaction eventually confirms. This method blocks
// until an invocation of sendTx returns (called with differing gas
// prices). The method may be canceled using the passed context.
//
// NOTE: Send should be called by AT MOST one caller at a time.
Send(ctx context.Context, sendTx SendTxFunc) (*types.Receipt, error)
}
// ReceiptSource is a minimal function signature used to detect the confirmation
// of published txs.
//
// NOTE: This is a subset of bind.DeployBackend.
type ReceiptSource interface {
// TransactionReceipt queries the backend for a receipt associated with
// txHash. If lookup does not fail, but the transaction is not found,
// nil should be returned for both values.
TransactionReceipt(
ctx context.Context, txHash common.Hash) (*types.Receipt, error)
}
// SimpleTxManager is a implementation of TxManager that performs linear fee
// bumping of a tx until it confirms.
type SimpleTxManager struct {
cfg Config
backend ReceiptSource
}
// NewSimpleTxManager initializes a new SimpleTxManager with the passed Config.
func NewSimpleTxManager(cfg Config, backend ReceiptSource) *SimpleTxManager {
return &SimpleTxManager{
cfg: cfg,
backend: backend,
}
}
// Send is used to publish a transaction with incrementally higher gas prices
// until the transaction eventually confirms. This method blocks until an
// invocation of sendTx returns (called with differing gas prices). The method
// may be canceled using the passed context.
//
// NOTE: Send should be called by AT MOST one caller at a time.
func (m *SimpleTxManager) Send(
ctx context.Context, sendTx SendTxFunc) (*types.Receipt, error) {
// Initialize a wait group to track any spawned goroutines, and ensure
// we properly clean up any dangling resources this method generates.
// We assert that this is the case thoroughly in our unit tests.
var wg sync.WaitGroup
defer wg.Wait()
// Initialize a subcontext for the goroutines spawned in this process.
// The defer to cancel is done here (in reverse order of Wait) so that
// the goroutines can exit before blocking on the wait group.
ctxc, cancel := context.WithCancel(ctx)
defer cancel()
// Create a closure that will block on passed sendTx function in the
// background, returning the first successfully mined receipt back to
// the main event loop via receiptChan.
receiptChan := make(chan *types.Receipt, 1)
sendTxAsync := func(gasPrice *big.Int) {
defer wg.Done()
// Sign and publish transaction with current gas price.
tx, err := sendTx(ctxc, gasPrice)
if err != nil {
log.Error("Unable to publish transaction",
"gas_price", gasPrice, "err", err)
// TODO(conner): add retry?
return
}
txHash := tx.Hash()
log.Info("Transaction published successfully", "hash", txHash,
"gas_price", gasPrice)
// Wait for the transaction to be mined, reporting the receipt
// back to the main event loop if found.
receipt, err := WaitMined(
ctxc, m.backend, tx, m.cfg.ReceiptQueryInterval,
)
if err != nil {
log.Trace("Send tx failed", "hash", txHash,
"gas_price", gasPrice, "err", err)
}
if receipt != nil {
// Use non-blocking select to ensure function can exit
// if more than one receipt is discovered.
select {
case receiptChan <- receipt:
log.Trace("Send tx succeeded", "hash", txHash,
"gas_price", gasPrice)
default:
}
}
}
// Initialize our initial gas price to the configured minimum.
curGasPrice := new(big.Int).Set(m.cfg.MinGasPrice)
// Submit and wait for the receipt at our first gas price in the
// background, before entering the event loop and waiting out the
// resubmission timeout.
wg.Add(1)
go sendTxAsync(curGasPrice)
for {
select {
// Whenever a resubmission timeout has elapsed, bump the gas
// price and publish a new transaction.
case <-time.After(m.cfg.ResubmissionTimeout):
// If our last attempt published at the max gas price,
// return an error as we are unlikely to succeed in
// publishing. This also indicates that the max gas
// price should likely be adjusted higher for the
// daemon.
if curGasPrice.Cmp(m.cfg.MaxGasPrice) >= 0 {
return nil, ErrPublishTimeout
}
// Bump the gas price using linear gas price increments.
curGasPrice = NextGasPrice(
curGasPrice, m.cfg.GasRetryIncrement,
m.cfg.MaxGasPrice,
)
// Submit and wait for the bumped traction to confirm.
wg.Add(1)
go sendTxAsync(curGasPrice)
// The passed context has been canceled, i.e. in the event of a
// shutdown.
case <-ctxc.Done():
return nil, ctxc.Err()
// The transaction has confirmed.
case receipt := <-receiptChan:
return receipt, nil
}
}
}
// WaitMined blocks until the backend indicates confirmation of tx and returns
// the tx receipt. Queries are made every queryInterval, regardless of whether
// the backend returns an error. This method can be canceled using the passed
// context.
func WaitMined(
ctx context.Context,
backend ReceiptSource,
tx *types.Transaction,
queryInterval time.Duration,
) (*types.Receipt, error) {
queryTicker := time.NewTicker(queryInterval)
defer queryTicker.Stop()
txHash := tx.Hash()
for {
receipt, err := backend.TransactionReceipt(ctx, txHash)
if receipt != nil {
return receipt, nil
}
if err != nil {
log.Trace("Receipt retrievel failed", "hash", txHash,
"err", err)
} else {
log.Trace("Transaction not yet mined", "hash", txHash)
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-queryTicker.C:
}
}
}
// NextGasPrice bumps the current gas price using an additive gasRetryIncrement,
// clamping the resulting value to maxGasPrice.
//
// NOTE: This method does not mutate curGasPrice, but instead returns a copy.
// This removes the possiblity of races occuring from goroutines sharing access
// to the same underlying big.Int.
func NextGasPrice(curGasPrice, gasRetryIncrement, maxGasPrice *big.Int) *big.Int {
nextGasPrice := new(big.Int).Set(curGasPrice)
nextGasPrice.Add(nextGasPrice, gasRetryIncrement)
if nextGasPrice.Cmp(maxGasPrice) == 1 {
nextGasPrice.Set(maxGasPrice)
}
return nextGasPrice
}
package txmgr_test
import (
"context"
"errors"
"math/big"
"sync"
"testing"
"time"
"github.com/ethereum-optimism/go/batch-submitter/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/require"
)
// TestNextGasPrice asserts that NextGasPrice properly bumps the passed current
// gas price, and clamps it to the max gas price. It also tests that
// NextGasPrice doesn't mutate the passed curGasPrice argument.
func TestNextGasPrice(t *testing.T) {
t.Parallel()
tests := []struct {
name string
curGasPrice *big.Int
gasRetryIncrement *big.Int
maxGasPrice *big.Int
expGasPrice *big.Int
}{
{
name: "increment below max",
curGasPrice: new(big.Int).SetUint64(5),
gasRetryIncrement: new(big.Int).SetUint64(10),
maxGasPrice: new(big.Int).SetUint64(20),
expGasPrice: new(big.Int).SetUint64(15),
},
{
name: "increment equal max",
curGasPrice: new(big.Int).SetUint64(5),
gasRetryIncrement: new(big.Int).SetUint64(10),
maxGasPrice: new(big.Int).SetUint64(15),
expGasPrice: new(big.Int).SetUint64(15),
},
{
name: "increment above max",
curGasPrice: new(big.Int).SetUint64(5),
gasRetryIncrement: new(big.Int).SetUint64(10),
maxGasPrice: new(big.Int).SetUint64(12),
expGasPrice: new(big.Int).SetUint64(12),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// Copy curGasPrice, as we will later test for mutation.
curGasPrice := new(big.Int).Set(test.curGasPrice)
nextGasPrice := txmgr.NextGasPrice(
curGasPrice, test.gasRetryIncrement,
test.maxGasPrice,
)
require.Equal(t, nextGasPrice, test.expGasPrice)
// Ensure curGasPrice hasn't been mutated. This check
// enforces that NextGasPrice creates a copy internally.
// Failure to do so could result in gas price bumps
// being read concurrently from other goroutines, and
// introduce race conditions.
require.Equal(t, curGasPrice, test.curGasPrice)
})
}
}
// testHarness houses the necessary resources to test the SimpleTxManager.
type testHarness struct {
cfg txmgr.Config
mgr txmgr.TxManager
backend *mockBackend
}
// newTestHarnessWithConfig initializes a testHarness with a specific
// configuration.
func newTestHarnessWithConfig(cfg txmgr.Config) *testHarness {
backend := newMockBackend()
mgr := txmgr.NewSimpleTxManager(cfg, backend)
return &testHarness{
cfg: cfg,
mgr: mgr,
backend: backend,
}
}
// newTestHarness initializes a testHarness with a defualt configuration that is
// suitable for most tests.
func newTestHarness() *testHarness {
return newTestHarnessWithConfig(txmgr.Config{
MinGasPrice: new(big.Int).SetUint64(5),
MaxGasPrice: new(big.Int).SetUint64(50),
GasRetryIncrement: new(big.Int).SetUint64(5),
ResubmissionTimeout: time.Second,
ReceiptQueryInterval: 50 * time.Millisecond,
})
}
// mockBackend implements txmgr.ReceiptSource that tracks mined transactions
// along with the gas price used.
type mockBackend struct {
mu sync.RWMutex
// txHashMinedWithGasPrice tracks the has of a mined transaction to its
// gas price.
txHashMinedWithGasPrice map[common.Hash]*big.Int
}
// newMockBackend initializes a new mockBackend.
func newMockBackend() *mockBackend {
return &mockBackend{
txHashMinedWithGasPrice: make(map[common.Hash]*big.Int),
}
}
// mine records a (txHash, gasPrice) as confirmed. Subsequent calls to
// TransactionReceipt with a matching txHash will result in a non-nil receipt.
func (b *mockBackend) mine(txHash common.Hash, gasPrice *big.Int) {
b.mu.Lock()
defer b.mu.Unlock()
b.txHashMinedWithGasPrice[txHash] = gasPrice
}
// TransactionReceipt queries the mockBackend for a mined txHash. If none is
// found, nil is returned for both return values. Otherwise, it retruns a
// receipt containing the txHash and the gasPrice used in the GasUsed to make
// the value accessible from our test framework.
func (b *mockBackend) TransactionReceipt(
ctx context.Context,
txHash common.Hash,
) (*types.Receipt, error) {
b.mu.RLock()
defer b.mu.RUnlock()
gasPrice, ok := b.txHashMinedWithGasPrice[txHash]
if !ok {
return nil, nil
}
// Return the gas price for the transaction in the GasUsed field so that
// we can assert the proper tx confirmed in our tests.
return &types.Receipt{
TxHash: txHash,
GasUsed: gasPrice.Uint64(),
}, nil
}
// TestTxMgrConfirmAtMinGasPrice asserts that Send returns the min gas price tx
// if the tx is mined instantly.
func TestTxMgrConfirmAtMinGasPrice(t *testing.T) {
t.Parallel()
h := newTestHarness()
sendTxFunc := func(
ctx context.Context,
gasPrice *big.Int,
) (*types.Transaction, error) {
tx := types.NewTx(&types.LegacyTx{
GasPrice: gasPrice,
})
h.backend.mine(tx.Hash(), gasPrice)
return tx, nil
}
ctx := context.Background()
receipt, err := h.mgr.Send(ctx, sendTxFunc)
require.Nil(t, err)
require.NotNil(t, receipt)
require.Equal(t, receipt.GasUsed, h.cfg.MinGasPrice.Uint64())
}
// TestTxMgrNeverConfirmCancel asserts that a Send can be canceled even if no
// transaction is mined. This is done to ensure the the tx mgr can properly
// abort on shutdown, even if a txn is in the process of being published.
func TestTxMgrNeverConfirmCancel(t *testing.T) {
t.Parallel()
h := newTestHarness()
sendTxFunc := func(
ctx context.Context,
gasPrice *big.Int,
) (*types.Transaction, error) {
// Don't publish tx to backend, simulating never being mined.
return types.NewTx(&types.LegacyTx{
GasPrice: gasPrice,
}), nil
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
receipt, err := h.mgr.Send(ctx, sendTxFunc)
require.Equal(t, err, context.DeadlineExceeded)
require.Nil(t, receipt)
}
// TestTxMgrConfirmsAtMaxGasPrice asserts that Send properly returns the max gas
// price receipt if none of the lower gas price txs were mined.
func TestTxMgrConfirmsAtMaxGasPrice(t *testing.T) {
t.Parallel()
h := newTestHarness()
sendTxFunc := func(
ctx context.Context,
gasPrice *big.Int,
) (*types.Transaction, error) {
tx := types.NewTx(&types.LegacyTx{
GasPrice: gasPrice,
})
if gasPrice.Cmp(h.cfg.MaxGasPrice) == 0 {
h.backend.mine(tx.Hash(), gasPrice)
}
return tx, nil
}
ctx := context.Background()
receipt, err := h.mgr.Send(ctx, sendTxFunc)
require.Nil(t, err)
require.NotNil(t, receipt)
require.Equal(t, receipt.GasUsed, h.cfg.MaxGasPrice.Uint64())
}
// TestTxMgrConfirmsAtMaxGasPriceDelayed asserts that after the maximum gas
// price tx has been published, and a resubmission timeout has elapsed, that an
// error is returned signaling that even our max gas price is taking too long.
func TestTxMgrConfirmsAtMaxGasPriceDelayed(t *testing.T) {
t.Parallel()
h := newTestHarness()
sendTxFunc := func(
ctx context.Context,
gasPrice *big.Int,
) (*types.Transaction, error) {
tx := types.NewTx(&types.LegacyTx{
GasPrice: gasPrice,
})
// Delay mining of the max gas price tx by more than the
// resubmission timeout. Default config uses 1 second. Send
// should still return an error beforehand.
if gasPrice.Cmp(h.cfg.MaxGasPrice) == 0 {
time.AfterFunc(2*time.Second, func() {
h.backend.mine(tx.Hash(), gasPrice)
})
}
return tx, nil
}
ctx := context.Background()
receipt, err := h.mgr.Send(ctx, sendTxFunc)
require.Equal(t, err, txmgr.ErrPublishTimeout)
require.Nil(t, receipt)
}
// errRpcFailure is a sentinel error used in testing to fail publications.
var errRpcFailure = errors.New("rpc failure")
// TestTxMgrBlocksOnFailingRpcCalls asserts that if all of the publication
// attempts fail due to rpc failures, that the tx manager will return
// txmgr.ErrPublishTimeout.
func TestTxMgrBlocksOnFailingRpcCalls(t *testing.T) {
t.Parallel()
h := newTestHarness()
sendTxFunc := func(
ctx context.Context,
gasPrice *big.Int,
) (*types.Transaction, error) {
return nil, errRpcFailure
}
ctx := context.Background()
receipt, err := h.mgr.Send(ctx, sendTxFunc)
require.Equal(t, err, txmgr.ErrPublishTimeout)
require.Nil(t, receipt)
}
// TestTxMgrOnlyOnePublicationSucceeds asserts that the tx manager will return a
// receipt so long as at least one of the publications is able to succeed with a
// simulated rpc failure.
func TestTxMgrOnlyOnePublicationSucceeds(t *testing.T) {
t.Parallel()
h := newTestHarness()
sendTxFunc := func(
ctx context.Context,
gasPrice *big.Int,
) (*types.Transaction, error) {
// Fail all but the final attempt.
if gasPrice.Cmp(h.cfg.MaxGasPrice) != 0 {
return nil, errRpcFailure
}
tx := types.NewTx(&types.LegacyTx{
GasPrice: gasPrice,
})
h.backend.mine(tx.Hash(), gasPrice)
return tx, nil
}
ctx := context.Background()
receipt, err := h.mgr.Send(ctx, sendTxFunc)
require.Nil(t, err)
require.NotNil(t, receipt)
require.Equal(t, receipt.GasUsed, h.cfg.MaxGasPrice.Uint64())
}
// TestTxMgrConfirmsMinGasPriceAfterBumping delays the mining of the initial tx
// with the minimum gas price, and asserts that it's receipt is returned even
// though if the gas price has been bumped in other goroutines.
func TestTxMgrConfirmsMinGasPriceAfterBumping(t *testing.T) {
t.Parallel()
h := newTestHarness()
sendTxFunc := func(
ctx context.Context,
gasPrice *big.Int,
) (*types.Transaction, error) {
tx := types.NewTx(&types.LegacyTx{
GasPrice: gasPrice,
})
// Delay mining the tx with the min gas price.
if gasPrice.Cmp(h.cfg.MinGasPrice) == 0 {
time.AfterFunc(5*time.Second, func() {
h.backend.mine(tx.Hash(), gasPrice)
})
}
return tx, nil
}
ctx := context.Background()
receipt, err := h.mgr.Send(ctx, sendTxFunc)
require.Nil(t, err)
require.NotNil(t, receipt)
require.Equal(t, receipt.GasUsed, h.cfg.MinGasPrice.Uint64())
}
// TestWaitMinedReturnsReceiptOnFirstSuccess insta-mines a transaction and
// asserts that WaitMined returns the appropriate receipt.
func TestWaitMinedReturnsReceiptOnFirstSuccess(t *testing.T) {
t.Parallel()
h := newTestHarness()
// Create a tx and mine it immediately using the default backend.
tx := types.NewTx(&types.LegacyTx{})
txHash := tx.Hash()
h.backend.mine(txHash, new(big.Int))
ctx := context.Background()
receipt, err := txmgr.WaitMined(ctx, h.backend, tx, 50*time.Millisecond)
require.Nil(t, err)
require.NotNil(t, receipt)
require.Equal(t, receipt.TxHash, txHash)
}
// TestWaitMinedCanBeCanceled ensures that WaitMined exits of the passed context
// is canceled before a receipt is found.
func TestWaitMinedCanBeCanceled(t *testing.T) {
t.Parallel()
h := newTestHarness()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Create an unimined tx.
tx := types.NewTx(&types.LegacyTx{})
receipt, err := txmgr.WaitMined(ctx, h.backend, tx, 50*time.Millisecond)
require.Equal(t, err, context.DeadlineExceeded)
require.Nil(t, receipt)
}
// failingBackend implements txmgr.ReceiptSource, returning a failure on the
// first call but a success on the second call. This allows us to test that the
// inner loop of WaitMined properly handles this case.
type failingBackend struct {
returnSuccess bool
}
// TransactionReceipt for the failingBackend returns errRpcFailure on the first
// invocation, and a receipt containing the passed TxHash on the second.
func (b *failingBackend) TransactionReceipt(
ctx context.Context, txHash common.Hash) (*types.Receipt, error) {
if !b.returnSuccess {
b.returnSuccess = true
return nil, errRpcFailure
}
return &types.Receipt{
TxHash: txHash,
}, nil
}
// TestWaitMinedReturnsReceiptAfterFailure asserts that WaitMined is able to
// recover from failed calls to the backend. It uses the failedBackend to
// simulate an rpc call failure, followed by the successful return of a receipt.
func TestWaitMinedReturnsReceiptAfterFailure(t *testing.T) {
t.Parallel()
var borkedBackend failingBackend
// Don't mine the tx with the default backend. The failingBackend will
// return the txHash on the second call.
tx := types.NewTx(&types.LegacyTx{})
txHash := tx.Hash()
ctx := context.Background()
receipt, err := txmgr.WaitMined(ctx, &borkedBackend, tx, 50*time.Millisecond)
require.Nil(t, err)
require.NotNil(t, receipt)
require.Equal(t, receipt.TxHash, txHash)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment