Commit 392b8872 authored by Michael de Hoog's avatar Michael de Hoog

Thread-safe transaction manager

parent 13d2bccf
......@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync/atomic"
"math/big"
"strings"
......@@ -28,6 +29,8 @@ const priceBump int64 = 15
var priceBumpPercent = big.NewInt(100 + priceBump)
var oneHundred = big.NewInt(100)
var ResetErr = errors.New("transaction manager reset")
// TxManager is an interface that allows callers to reliably publish txs,
// bumping the gas price if needed, and obtain the receipt of the resulting tx.
//
......@@ -38,7 +41,7 @@ type TxManager interface {
// It can be stopped by cancelling the provided context; however, the transaction
// may be included on L1 even if the context is cancelled.
//
// NOTE: Send should be called by AT MOST one caller at a time.
// NOTE: Send can be called concurrently, the nonce will be managed internally.
Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error)
// From returns the sending address associated with the instance of the transaction manager.
......@@ -84,6 +87,13 @@ type SimpleTxManager struct {
backend ETHBackend
l log.Logger
metr metrics.TxMetricer
nonce *uint64
lock sync.RWMutex
wg sync.WaitGroup
resetC chan struct{}
resetWG sync.WaitGroup
resetting atomic.Bool
}
// NewSimpleTxManager initializes a new SimpleTxManager with the passed Config.
......@@ -100,6 +110,7 @@ func NewSimpleTxManager(name string, l log.Logger, m metrics.TxMetricer, cfg CLI
backend: conf.Backend,
l: l.New("service", name),
metr: m,
resetC: make(chan struct{}),
}, nil
}
......@@ -126,8 +137,52 @@ type TxCandidate struct {
// The transaction manager handles all signing. If and only if the gas limit is 0, the
// transaction manager will do a gas estimation.
//
// NOTE: Send should be called by AT MOST one caller at a time.
// NOTE: Send can be called concurrently, the nonce will be managed internally.
func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
m.resetWG.Wait() // wait for any current resets
receipt, err := m.send(ctx, candidate)
if err != nil {
m.reset()
}
return receipt, err
}
// reset the transaction manager. All currently pending transactions will receive
// a ResetErr error. This is called if any pending send returns an error.
func (m *SimpleTxManager) reset() {
m.resetWG.Add(1)
defer m.resetWG.Done()
if m.resetting.Swap(true) {
// already resetting
return
}
close(m.resetChannel())
m.wg.Wait()
m.lock.Lock()
defer m.lock.Unlock()
m.nonce = nil
m.resetC = make(chan struct{})
m.resetting.Store(false)
}
// resetChannel is a thread-safe getter for the channel that is closed upon
// transaction manager resets.
func (m *SimpleTxManager) resetChannel() chan struct{} {
m.lock.RLock()
defer m.lock.RUnlock()
return m.resetC
}
// send performs the actual transaction creation and sending.
func (m *SimpleTxManager) send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
m.wg.Add(1)
defer m.wg.Done()
if m.cfg.TxSendTimeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, m.cfg.TxSendTimeout)
......@@ -137,7 +192,7 @@ func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*typ
if err != nil {
return nil, fmt.Errorf("failed to create the tx: %w", err)
}
return m.send(ctx, tx)
return m.sendTx(ctx, tx)
}
// craftTx creates the signed transaction
......@@ -153,15 +208,10 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (*
}
gasFeeCap := calcGasFeeCap(basefee, gasTipCap)
// Fetch the sender's nonce from the latest known block (nil `blockNumber`)
childCtx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
defer cancel()
nonce, err := m.backend.NonceAt(childCtx, m.cfg.From, nil)
nonce, err := m.nextNonce(ctx)
if err != nil {
m.metr.RPCError()
return nil, fmt.Errorf("failed to get nonce: %w", err)
return nil, err
}
m.metr.RecordNonce(nonce)
rawTx := &types.DynamicFeeTx{
ChainID: m.chainID,
......@@ -192,14 +242,40 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (*
rawTx.Gas = gas
}
ctx, cancel = context.WithTimeout(ctx, m.cfg.NetworkTimeout)
ctx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
defer cancel()
return m.cfg.Signer(ctx, m.cfg.From, types.NewTx(rawTx))
}
// nextNonce returns a nonce to use for the next transaction. It uses
// eth_getTransactionCount with "latest" once, and then subsequent calls simply
// increment this number. If the transaction manager is reset, it will query the
// eth_getTransactionCount nonce again.
func (m *SimpleTxManager) nextNonce(ctx context.Context) (uint64, error) {
m.lock.Lock()
defer m.lock.Unlock()
if m.nonce == nil {
// Fetch the sender's nonce from the latest known block (nil `blockNumber`)
childCtx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
defer cancel()
nonce, err := m.backend.NonceAt(childCtx, m.cfg.From, nil)
if err != nil {
m.metr.RPCError()
return 0, fmt.Errorf("failed to get nonce: %w", err)
}
m.nonce = &nonce
} else {
*m.nonce++
}
m.metr.RecordNonce(*m.nonce)
return *m.nonce, nil
}
// send submits the same transaction several times with increasing gas prices as necessary.
// It waits for the transaction to be confirmed on chain.
func (m *SimpleTxManager) send(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) {
func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) {
var wg sync.WaitGroup
defer wg.Wait()
ctx, cancel := context.WithCancel(ctx)
......@@ -218,6 +294,7 @@ func (m *SimpleTxManager) send(ctx context.Context, tx *types.Transaction) (*typ
ticker := time.NewTicker(m.cfg.ResubmissionTimeout)
defer ticker.Stop()
resetChan := m.resetChannel()
bumpCounter := 0
for {
......@@ -241,6 +318,9 @@ func (m *SimpleTxManager) send(ctx context.Context, tx *types.Transaction) (*typ
case <-ctx.Done():
return nil, ctx.Err()
case <-resetChan:
return nil, ResetErr
case receipt := <-receiptChan:
m.metr.RecordGasBumpCount(bumpCounter)
m.metr.TxConfirmed(receipt)
......
......@@ -277,7 +277,7 @@ func TestTxMgrConfirmAtMinGasPrice(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Nil(t, err)
require.NotNil(t, receipt)
require.Equal(t, gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
......@@ -305,7 +305,7 @@ func TestTxMgrNeverConfirmCancel(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Equal(t, err, context.DeadlineExceeded)
require.Nil(t, receipt)
}
......@@ -334,7 +334,7 @@ func TestTxMgrConfirmsAtHigherGasPrice(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Nil(t, err)
require.NotNil(t, receipt)
require.Equal(t, h.gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
......@@ -365,7 +365,7 @@ func TestTxMgrBlocksOnFailingRpcCalls(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Equal(t, err, context.DeadlineExceeded)
require.Nil(t, receipt)
}
......@@ -443,7 +443,7 @@ func TestTxMgrOnlyOnePublicationSucceeds(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Nil(t, err)
require.NotNil(t, receipt)
......@@ -478,7 +478,7 @@ func TestTxMgrConfirmsMinGasPriceAfterBumping(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Nil(t, err)
require.NotNil(t, receipt)
require.Equal(t, h.gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
......@@ -523,7 +523,7 @@ func TestTxMgrDoesntAbortNonceTooLowAfterMiningTx(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Nil(t, err)
require.NotNil(t, receipt)
require.Equal(t, h.gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment