Commit c0ebd1a4 authored by Andreas Bigger's avatar Andreas Bigger

stub out txmgr tx creation

parent 2351ed46
...@@ -27,6 +27,7 @@ type Config struct { ...@@ -27,6 +27,7 @@ type Config struct {
PollInterval time.Duration PollInterval time.Duration
TxManagerConfig txmgr.Config TxManagerConfig txmgr.Config
From common.Address From common.Address
NetworkTimeout time.Duration
// RollupConfig is queried at startup // RollupConfig is queried at startup
Rollup *rollup.Config Rollup *rollup.Config
......
...@@ -22,7 +22,7 @@ import ( ...@@ -22,7 +22,7 @@ import (
type BatchSubmitter struct { type BatchSubmitter struct {
Config // directly embed the config + sources Config // directly embed the config + sources
txMgr *TransactionManager txMgr txmgr.TxManager
wg sync.WaitGroup wg sync.WaitGroup
done chan struct{} done chan struct{}
...@@ -120,9 +120,10 @@ func NewBatchSubmitter(ctx context.Context, cfg Config, l log.Logger) (*BatchSub ...@@ -120,9 +120,10 @@ func NewBatchSubmitter(ctx context.Context, cfg Config, l log.Logger) (*BatchSub
return &BatchSubmitter{ return &BatchSubmitter{
Config: cfg, Config: cfg,
txMgr: NewTransactionManager(l, txMgr: txmgr.NewSimpleTxManager("batcher", l, cfg.TxManagerConfig, cfg.L1Client),
cfg.TxManagerConfig, cfg.Rollup.BatchInboxAddress, cfg.Rollup.L1ChainID, // NewTransactionManager(l,
cfg.From, cfg.L1Client), // cfg.TxManagerConfig, cfg.Rollup.BatchInboxAddress, cfg.Rollup.L1ChainID,
// cfg.From, cfg.L1Client),
state: NewChannelManager(l, cfg.Channel), state: NewChannelManager(l, cfg.Channel),
}, nil }, nil
...@@ -209,7 +210,7 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) { ...@@ -209,7 +210,7 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) {
// loadBlockIntoState fetches & stores a single block into `state`. It returns the block it loaded. // loadBlockIntoState fetches & stores a single block into `state`. It returns the block it loaded.
func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uint64) (eth.BlockID, error) { func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uint64) (eth.BlockID, error) {
ctx, cancel := context.WithTimeout(ctx, networkTimeout) ctx, cancel := context.WithTimeout(ctx, l.NetworkTimeout)
block, err := l.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumber)) block, err := l.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumber))
cancel() cancel()
if err != nil { if err != nil {
...@@ -226,7 +227,7 @@ func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uin ...@@ -226,7 +227,7 @@ func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uin
// calculateL2BlockRangeToStore determines the range (start,end] that should be loaded into the local state. // calculateL2BlockRangeToStore determines the range (start,end] that should be loaded into the local state.
// It also takes care of initializing some local state (i.e. will modify l.lastStoredBlock in certain conditions) // It also takes care of initializing some local state (i.e. will modify l.lastStoredBlock in certain conditions)
func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.BlockID, eth.BlockID, error) { func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.BlockID, eth.BlockID, error) {
childCtx, cancel := context.WithTimeout(ctx, networkTimeout) childCtx, cancel := context.WithTimeout(ctx, l.NetworkTimeout)
defer cancel() defer cancel()
syncStatus, err := l.RollupNode.SyncStatus(childCtx) syncStatus, err := l.RollupNode.SyncStatus(childCtx)
// Ensure that we have the sync status // Ensure that we have the sync status
...@@ -294,7 +295,7 @@ func (l *BatchSubmitter) loop() { ...@@ -294,7 +295,7 @@ func (l *BatchSubmitter) loop() {
break break
} }
// Record TX Status // Record TX Status
if receipt, err := l.txMgr.SendTransaction(l.ctx, txdata.Bytes()); err != nil { if receipt, err := l.SendTransaction(l.ctx, txdata.Bytes()); err != nil {
l.recordFailedTx(txdata.ID(), err) l.recordFailedTx(txdata.ID(), err)
} else { } else {
l.recordConfirmedTx(txdata.ID(), receipt) l.recordConfirmedTx(txdata.ID(), receipt)
...@@ -316,6 +317,36 @@ func (l *BatchSubmitter) loop() { ...@@ -316,6 +317,36 @@ func (l *BatchSubmitter) loop() {
} }
} }
// SendTransaction creates & submits a transaction to the batch inbox address with the given `data`.
// It currently uses the underlying `txmgr` to handle transaction sending & price management.
// This is a blocking method. It should not be called concurrently.
// TODO: where to put concurrent transaction handling logic.
func (l *BatchSubmitter) SendTransaction(ctx context.Context, data []byte) (*types.Receipt, error) {
tx, err := l.txMgr.CraftTx(ctx, txmgr.TxCandidate{
Recipient: l.Rollup.BatchInboxAddress,
TxData: data,
From: l.From,
ChainID: l.Rollup.L1ChainID,
// Explicit instantiation here so we can make a note that a gas
// limit of 0 will cause the [txmgr] to estimate the gas limit.
GasLimit: 0,
})
if err != nil {
return nil, fmt.Errorf("failed to create tx: %w", err)
}
// TODO: Select a timeout that makes sense here.
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
if receipt, err := l.txMgr.Send(ctx, tx); err != nil {
l.log.Warn("unable to publish tx", "err", err, "data_size", len(data))
return nil, err
} else {
l.log.Info("tx successfully published", "tx_hash", receipt.TxHash, "data_size", len(data))
return receipt, nil
}
}
func (l *BatchSubmitter) recordFailedTx(id txID, err error) { func (l *BatchSubmitter) recordFailedTx(id txID, err error) {
l.log.Warn("Failed to send transaction", "err", err) l.log.Warn("Failed to send transaction", "err", err)
l.state.TxFailed(id) l.state.TxFailed(id)
......
...@@ -3,14 +3,17 @@ package txmgr ...@@ -3,14 +3,17 @@ package txmgr
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"math/big" "math/big"
"sync" "sync"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto" opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto"
) )
...@@ -38,6 +41,12 @@ type Config struct { ...@@ -38,6 +41,12 @@ type Config struct {
// attempted. // attempted.
ResubmissionTimeout time.Duration ResubmissionTimeout time.Duration
// NetworkTimeout is the allowed duration for a single network request.
// This is intended to be used for network requests that can be replayed.
//
// If not set, this will default to 2 seconds.
NetworkTimeout time.Duration
// RequireQueryInterval is the interval at which the tx manager will // RequireQueryInterval is the interval at which the tx manager will
// query the backend to check for confirmations after a tx at a // query the backend to check for confirmations after a tx at a
// specific gas price has been published. // specific gas price has been published.
...@@ -69,6 +78,9 @@ type TxManager interface { ...@@ -69,6 +78,9 @@ type TxManager interface {
// //
// NOTE: Send should be called by AT MOST one caller at a time. // NOTE: Send should be called by AT MOST one caller at a time.
Send(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) Send(ctx context.Context, tx *types.Transaction) (*types.Receipt, error)
// CraftTx is used to craft a transaction using a [TxCandidate].
CraftTx(ctx context.Context, candidate TxCandidate) (*types.Transaction, error)
} }
// ETHBackend is the set of methods that the transaction manager uses to resubmit gas & determine // ETHBackend is the set of methods that the transaction manager uses to resubmit gas & determine
...@@ -89,6 +101,7 @@ type ETHBackend interface { ...@@ -89,6 +101,7 @@ type ETHBackend interface {
// TODO(CLI-3318): Maybe need a generic interface to support different RPC providers // TODO(CLI-3318): Maybe need a generic interface to support different RPC providers
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
SuggestGasTipCap(ctx context.Context) (*big.Int, error) SuggestGasTipCap(ctx context.Context) (*big.Int, error)
NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)
} }
// SimpleTxManager is a implementation of TxManager that performs linear fee // SimpleTxManager is a implementation of TxManager that performs linear fee
...@@ -101,6 +114,86 @@ type SimpleTxManager struct { ...@@ -101,6 +114,86 @@ type SimpleTxManager struct {
l log.Logger l log.Logger
} }
// TxCandidate is a transaction candidate that can be submitted to ask the
// [TxManager] to construct a transaction with gas price bounds.
type TxCandidate struct {
// TxData is the transaction data to be used in the constructed tx.
TxData []byte
// Recipient is the recipient (or `to`) of the constructed tx.
Recipient common.Address
// GasLimit is the gas limit to be used in the constructed tx.
GasLimit uint64
// From is the sender (or `from`) of the constructed tx.
From common.Address
/// ChainID is the chain ID to be used in the constructed tx.
ChainID *big.Int
}
// calcGasTipAndFeeCap queries L1 to determine what a suitable miner tip & basefee limit would be for timely inclusion
func (m *SimpleTxManager) calcGasTipAndFeeCap(ctx context.Context) (gasTipCap *big.Int, gasFeeCap *big.Int, err error) {
childCtx, cancel := context.WithTimeout(ctx, m.Config.NetworkTimeout)
gasTipCap, err = m.backend.SuggestGasTipCap(childCtx)
cancel()
if err != nil {
return nil, nil, fmt.Errorf("failed to get suggested gas tip cap: %w", err)
}
if gasTipCap == nil {
m.l.Warn("unexpected unset gasTipCap, using default 2 gwei")
gasTipCap = new(big.Int).SetUint64(params.GWei * 2)
}
childCtx, cancel = context.WithTimeout(ctx, m.Config.NetworkTimeout)
head, err := m.backend.HeaderByNumber(childCtx, nil)
cancel()
if err != nil || head == nil {
return nil, nil, fmt.Errorf("failed to get L1 head block for fee cap: %w", err)
}
if head.BaseFee == nil {
return nil, nil, fmt.Errorf("failed to get L1 basefee in block %d for fee cap", head.Number)
}
gasFeeCap = CalcGasFeeCap(head.BaseFee, gasTipCap)
return gasTipCap, gasFeeCap, nil
}
// CraftTx creates the signed transaction to the batchInboxAddress.
// It queries L1 for the current fee market conditions as well as for the nonce.
// NOTE: This method SHOULD NOT publish the resulting transaction.
func (m *SimpleTxManager) CraftTx(ctx context.Context, candidate TxCandidate) (*types.Transaction, error) {
gasTipCap, gasFeeCap, err := m.calcGasTipAndFeeCap(ctx)
if err != nil {
return nil, err
}
childCtx, cancel := context.WithTimeout(ctx, m.Config.NetworkTimeout)
nonce, err := m.backend.NonceAt(childCtx, candidate.From, nil)
cancel()
if err != nil {
return nil, fmt.Errorf("failed to get nonce: %w", err)
}
rawTx := &types.DynamicFeeTx{
ChainID: candidate.ChainID,
Nonce: nonce,
To: &candidate.Recipient,
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Data: candidate.TxData,
}
m.l.Info("creating tx", "to", rawTx.To, "from", candidate.From)
gas, err := core.IntrinsicGas(rawTx.Data, nil, false, true, true, false)
if err != nil {
return nil, fmt.Errorf("failed to calculate intrinsic gas: %w", err)
}
rawTx.Gas = gas
ctx, cancel = context.WithTimeout(ctx, m.Config.NetworkTimeout)
defer cancel()
return m.Signer(ctx, candidate.From, types.NewTx(rawTx))
}
// IncreaseGasPrice takes the previous transaction & potentially clones then signs it with a higher tip. // IncreaseGasPrice takes the previous transaction & potentially clones then signs it with a higher tip.
// If the tip + basefee suggested by the network are not greater than the previous values, the same transaction // If the tip + basefee suggested by the network are not greater than the previous values, the same transaction
// will be returned. If they are greater, this function will ensure that they are at least greater by 15% than // will be returned. If they are greater, this function will ensure that they are at least greater by 15% than
......
...@@ -185,7 +185,10 @@ func (b *mockBackend) SendTransaction(ctx context.Context, tx *types.Transaction ...@@ -185,7 +185,10 @@ func (b *mockBackend) SendTransaction(ctx context.Context, tx *types.Transaction
panic("set sender function was not set") panic("set sender function was not set")
} }
return b.send(ctx, tx) return b.send(ctx, tx)
}
func (b *mockBackend) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) {
return 0, nil
} }
// TransactionReceipt queries the mockBackend for a mined txHash. If none is // TransactionReceipt queries the mockBackend for a mined txHash. If none is
...@@ -577,6 +580,10 @@ func (b *failingBackend) SuggestGasTipCap(_ context.Context) (*big.Int, error) { ...@@ -577,6 +580,10 @@ func (b *failingBackend) SuggestGasTipCap(_ context.Context) (*big.Int, error) {
return b.gasTip, nil return b.gasTip, nil
} }
func (b *failingBackend) NonceAt(_ context.Context, _ common.Address, _ *big.Int) (uint64, error) {
return 0, errors.New("unimplemented")
}
// TestWaitMinedReturnsReceiptAfterFailure asserts that WaitMined is able to // TestWaitMinedReturnsReceiptAfterFailure asserts that WaitMined is able to
// recover from failed calls to the backend. It uses the failedBackend to // recover from failed calls to the backend. It uses the failedBackend to
// simulate an rpc call failure, followed by the successful return of a receipt. // simulate an rpc call failure, followed by the successful return of a receipt.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment