Commit 3152615f authored by mergify[bot]'s avatar mergify[bot] Committed by GitHub

Merge pull request #5191 from ethereum-optimism/refcell/feat/txcreation

feat(op-service): Move Tx Creation to the txmgr
parents f7d0e1ac be704d54
...@@ -110,6 +110,14 @@ type CLIConfig struct { ...@@ -110,6 +110,14 @@ type CLIConfig struct {
/* Optional Params */ /* Optional Params */
// TxManagerTimeout is the max amount of time to wait for the [txmgr].
// This will default to: 10 * time.Minute.
TxManagerTimeout time.Duration
// OfflineGasEstimation specifies whether the batcher should calculate
// gas estimations offline using the [core.IntrinsicGas] function.
OfflineGasEstimation bool
// MaxL1TxSize is the maximum size of a batch tx submitted to L1. // MaxL1TxSize is the maximum size of a batch tx submitted to L1.
MaxL1TxSize uint64 MaxL1TxSize uint64
...@@ -168,19 +176,21 @@ func NewConfig(ctx *cli.Context) CLIConfig { ...@@ -168,19 +176,21 @@ func NewConfig(ctx *cli.Context) CLIConfig {
ResubmissionTimeout: ctx.GlobalDuration(flags.ResubmissionTimeoutFlag.Name), ResubmissionTimeout: ctx.GlobalDuration(flags.ResubmissionTimeoutFlag.Name),
/* Optional Flags */ /* Optional Flags */
MaxChannelDuration: ctx.GlobalUint64(flags.MaxChannelDurationFlag.Name), OfflineGasEstimation: ctx.GlobalBool(flags.OfflineGasEstimationFlag.Name),
MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name), TxManagerTimeout: ctx.GlobalDuration(flags.TxManagerTimeoutFlag.Name),
TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name), MaxChannelDuration: ctx.GlobalUint64(flags.MaxChannelDurationFlag.Name),
TargetNumFrames: ctx.GlobalInt(flags.TargetNumFramesFlag.Name), MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name),
ApproxComprRatio: ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name), TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name),
Stopped: ctx.GlobalBool(flags.StoppedFlag.Name), TargetNumFrames: ctx.GlobalInt(flags.TargetNumFramesFlag.Name),
Mnemonic: ctx.GlobalString(flags.MnemonicFlag.Name), ApproxComprRatio: ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name),
SequencerHDPath: ctx.GlobalString(flags.SequencerHDPathFlag.Name), Stopped: ctx.GlobalBool(flags.StoppedFlag.Name),
PrivateKey: ctx.GlobalString(flags.PrivateKeyFlag.Name), Mnemonic: ctx.GlobalString(flags.MnemonicFlag.Name),
RPCConfig: rpc.ReadCLIConfig(ctx), SequencerHDPath: ctx.GlobalString(flags.SequencerHDPathFlag.Name),
LogConfig: oplog.ReadCLIConfig(ctx), PrivateKey: ctx.GlobalString(flags.PrivateKeyFlag.Name),
MetricsConfig: opmetrics.ReadCLIConfig(ctx), RPCConfig: rpc.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx), LogConfig: oplog.ReadCLIConfig(ctx),
SignerConfig: opsigner.ReadCLIConfig(ctx), MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
SignerConfig: opsigner.ReadCLIConfig(ctx),
} }
} }
...@@ -15,6 +15,7 @@ import ( ...@@ -15,6 +15,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto" opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto"
"github.com/ethereum-optimism/optimism/op-service/txmgr" "github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
...@@ -24,7 +25,7 @@ import ( ...@@ -24,7 +25,7 @@ import (
type BatchSubmitter struct { type BatchSubmitter struct {
Config // directly embed the config + sources Config // directly embed the config + sources
txMgr *TransactionManager txMgr txmgr.TxManager
wg sync.WaitGroup wg sync.WaitGroup
done chan struct{} done chan struct{}
...@@ -79,6 +80,7 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri ...@@ -79,6 +80,7 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
NumConfirmations: cfg.NumConfirmations, NumConfirmations: cfg.NumConfirmations,
SafeAbortNonceTooLowCount: cfg.SafeAbortNonceTooLowCount, SafeAbortNonceTooLowCount: cfg.SafeAbortNonceTooLowCount,
From: fromAddress, From: fromAddress,
ChainID: rcfg.L1ChainID,
Signer: signer(rcfg.L1ChainID), Signer: signer(rcfg.L1ChainID),
} }
...@@ -125,10 +127,8 @@ func NewBatchSubmitter(ctx context.Context, cfg Config, l log.Logger, m metrics. ...@@ -125,10 +127,8 @@ func NewBatchSubmitter(ctx context.Context, cfg Config, l log.Logger, m metrics.
return &BatchSubmitter{ return &BatchSubmitter{
Config: cfg, Config: cfg,
txMgr: NewTransactionManager(l, txMgr: txmgr.NewSimpleTxManager("batcher", l, cfg.TxManagerConfig, cfg.L1Client),
cfg.TxManagerConfig, cfg.Rollup.BatchInboxAddress, cfg.Rollup.L1ChainID, state: NewChannelManager(l, m, cfg.Channel),
cfg.From, cfg.L1Client),
state: NewChannelManager(l, m, cfg.Channel),
}, nil }, nil
} }
...@@ -226,7 +226,7 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) { ...@@ -226,7 +226,7 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) {
// loadBlockIntoState fetches & stores a single block into `state`. It returns the block it loaded. // loadBlockIntoState fetches & stores a single block into `state`. It returns the block it loaded.
func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uint64) (*types.Block, error) { func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uint64) (*types.Block, error) {
ctx, cancel := context.WithTimeout(ctx, networkTimeout) ctx, cancel := context.WithTimeout(ctx, txManagerTimeout)
defer cancel() defer cancel()
block, err := l.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumber)) block, err := l.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumber))
if err != nil { if err != nil {
...@@ -244,7 +244,7 @@ func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uin ...@@ -244,7 +244,7 @@ func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uin
// calculateL2BlockRangeToStore determines the range (start,end] that should be loaded into the local state. // calculateL2BlockRangeToStore determines the range (start,end] that should be loaded into the local state.
// It also takes care of initializing some local state (i.e. will modify l.lastStoredBlock in certain conditions) // It also takes care of initializing some local state (i.e. will modify l.lastStoredBlock in certain conditions)
func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.BlockID, eth.BlockID, error) { func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.BlockID, eth.BlockID, error) {
childCtx, cancel := context.WithTimeout(ctx, networkTimeout) childCtx, cancel := context.WithTimeout(ctx, txManagerTimeout)
defer cancel() defer cancel()
syncStatus, err := l.RollupNode.SyncStatus(childCtx) syncStatus, err := l.RollupNode.SyncStatus(childCtx)
// Ensure that we have the sync status // Ensure that we have the sync status
...@@ -312,8 +312,9 @@ func (l *BatchSubmitter) loop() { ...@@ -312,8 +312,9 @@ func (l *BatchSubmitter) loop() {
l.log.Error("unable to get tx data", "err", err) l.log.Error("unable to get tx data", "err", err)
break break
} }
// Record TX Status // Record TX Status
if receipt, err := l.txMgr.SendTransaction(l.ctx, txdata.Bytes()); err != nil { if receipt, err := l.SendTransaction(l.ctx, txdata.Bytes()); err != nil {
l.recordFailedTx(txdata.ID(), err) l.recordFailedTx(txdata.ID(), err)
} else { } else {
l.recordConfirmedTx(txdata.ID(), receipt) l.recordConfirmedTx(txdata.ID(), receipt)
...@@ -335,6 +336,46 @@ func (l *BatchSubmitter) loop() { ...@@ -335,6 +336,46 @@ func (l *BatchSubmitter) loop() {
} }
} }
const networkTimeout = 2 * time.Second // How long a single network request can take. TODO: put in a config somewhere
// fix(refcell):
// combined with above, these config variables should also be replicated in the op-proposer
// along with op-proposer changes to include the updated tx manager
const txManagerTimeout = 2 * time.Minute // How long the tx manager can take to send a transaction.
// SendTransaction creates & submits a transaction to the batch inbox address with the given `data`.
// It currently uses the underlying `txmgr` to handle transaction sending & price management.
// This is a blocking method. It should not be called concurrently.
func (l *BatchSubmitter) SendTransaction(ctx context.Context, data []byte) (*types.Receipt, error) {
// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.
intrinsicGas, err := core.IntrinsicGas(data, nil, false, true, true, false)
if err != nil {
return nil, fmt.Errorf("failed to calculate intrinsic gas: %w", err)
}
// Create the transaction
tx, err := l.txMgr.CraftTx(ctx, txmgr.TxCandidate{
To: l.Rollup.BatchInboxAddress,
TxData: data,
From: l.From,
GasLimit: intrinsicGas,
})
if err != nil {
return nil, fmt.Errorf("failed to create tx: %w", err)
}
// Send the transaction through the txmgr
ctx, cancel := context.WithTimeout(ctx, txManagerTimeout)
defer cancel()
if receipt, err := l.txMgr.Send(ctx, tx); err != nil {
l.log.Warn("unable to publish tx", "err", err, "data_size", len(data))
return nil, err
} else {
l.log.Info("tx successfully published", "tx_hash", receipt.TxHash, "data_size", len(data))
return receipt, nil
}
}
func (l *BatchSubmitter) recordL1Tip(l1tip eth.L1BlockRef) { func (l *BatchSubmitter) recordL1Tip(l1tip eth.L1BlockRef) {
if l.lastL1Tip == l1tip { if l.lastL1Tip == l1tip {
return return
......
package batcher
import (
"context"
"math/big"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum-optimism/optimism/op-service/txmgr/mocks"
)
// TestBatchSubmitter_SendTransaction tests the driver's
// [SendTransaction] external facing function.
func TestBatchSubmitter_SendTransaction(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit)
txMgr := mocks.TxManager{}
batcherInboxAddress := common.HexToAddress("0x42000000000000000000000000000000000000ff")
chainID := big.NewInt(1)
sender := common.HexToAddress("0xdeadbeef")
bs := BatchSubmitter{
Config: Config{
log: log,
From: sender,
Rollup: &rollup.Config{
L1ChainID: chainID,
BatchInboxAddress: batcherInboxAddress,
},
},
txMgr: &txMgr,
}
txData := []byte{0x00, 0x01, 0x02}
gasTipCap := big.NewInt(136)
gasFeeCap := big.NewInt(137)
gas := uint64(1337)
// Candidate gas should be calculated with [core.IntrinsicGas]
intrinsicGas, err := core.IntrinsicGas(txData, nil, false, true, true, false)
require.NoError(t, err)
candidate := txmgr.TxCandidate{
To: batcherInboxAddress,
TxData: txData,
From: sender,
GasLimit: intrinsicGas,
}
tx := types.NewTx(&types.DynamicFeeTx{
ChainID: chainID,
Nonce: 0,
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Gas: gas,
To: &batcherInboxAddress,
Data: txData,
})
txHash := tx.Hash()
expectedReceipt := types.Receipt{
Type: 1,
PostState: []byte{},
Status: uint64(1),
CumulativeGasUsed: gas,
TxHash: txHash,
GasUsed: gas,
}
txMgr.On("CraftTx", mock.Anything, candidate).Return(tx, nil)
txMgr.On("Send", mock.Anything, tx).Return(&expectedReceipt, nil)
receipt, err := bs.SendTransaction(context.Background(), tx.Data())
require.NoError(t, err)
require.Equal(t, receipt, &expectedReceipt)
}
package batcher
import (
"context"
"fmt"
"math/big"
"time"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto"
)
const networkTimeout = 2 * time.Second // How long a single network request can take. TODO: put in a config somewhere
// TransactionManager wraps the simple txmgr package to make it easy to send & wait for transactions
type TransactionManager struct {
// Config
batchInboxAddress common.Address
senderAddress common.Address
chainID *big.Int
// Outside world
txMgr txmgr.TxManager
l1Client *ethclient.Client
signerFn opcrypto.SignerFn
log log.Logger
}
func NewTransactionManager(log log.Logger, txMgrConfg txmgr.Config, batchInboxAddress common.Address, chainID *big.Int, senderAddress common.Address, l1Client *ethclient.Client) *TransactionManager {
t := &TransactionManager{
batchInboxAddress: batchInboxAddress,
senderAddress: senderAddress,
chainID: chainID,
txMgr: txmgr.NewSimpleTxManager("batcher", log, txMgrConfg, l1Client),
l1Client: l1Client,
signerFn: txMgrConfg.Signer,
log: log,
}
return t
}
// SendTransaction creates & submits a transaction to the batch inbox address with the given `data`.
// It currently uses the underlying `txmgr` to handle transaction sending & price management.
// This is a blocking method. It should not be called concurrently.
// TODO: where to put concurrent transaction handling logic.
func (t *TransactionManager) SendTransaction(ctx context.Context, data []byte) (*types.Receipt, error) {
tx, err := t.CraftTx(ctx, data)
if err != nil {
return nil, fmt.Errorf("failed to create tx: %w", err)
}
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) // TODO: Select a timeout that makes sense here.
defer cancel()
if receipt, err := t.txMgr.Send(ctx, tx); err != nil {
t.log.Warn("unable to publish tx", "err", err, "data_size", len(data))
return nil, err
} else {
t.log.Info("tx successfully published", "tx_hash", receipt.TxHash, "data_size", len(data))
return receipt, nil
}
}
// calcGasTipAndFeeCap queries L1 to determine what a suitable miner tip & basefee limit would be for timely inclusion
func (t *TransactionManager) calcGasTipAndFeeCap(ctx context.Context) (gasTipCap *big.Int, gasFeeCap *big.Int, err error) {
childCtx, cancel := context.WithTimeout(ctx, networkTimeout)
gasTipCap, err = t.l1Client.SuggestGasTipCap(childCtx)
cancel()
if err != nil {
return nil, nil, fmt.Errorf("failed to get suggested gas tip cap: %w", err)
}
if gasTipCap == nil {
t.log.Warn("unexpected unset gasTipCap, using default 2 gwei")
gasTipCap = new(big.Int).SetUint64(params.GWei * 2)
}
childCtx, cancel = context.WithTimeout(ctx, networkTimeout)
head, err := t.l1Client.HeaderByNumber(childCtx, nil)
cancel()
if err != nil || head == nil {
return nil, nil, fmt.Errorf("failed to get L1 head block for fee cap: %w", err)
}
if head.BaseFee == nil {
return nil, nil, fmt.Errorf("failed to get L1 basefee in block %d for fee cap", head.Number)
}
gasFeeCap = txmgr.CalcGasFeeCap(head.BaseFee, gasTipCap)
return gasTipCap, gasFeeCap, nil
}
// CraftTx creates the signed transaction to the batchInboxAddress.
// It queries L1 for the current fee market conditions as well as for the nonce.
// NOTE: This method SHOULD NOT publish the resulting transaction.
func (t *TransactionManager) CraftTx(ctx context.Context, data []byte) (*types.Transaction, error) {
gasTipCap, gasFeeCap, err := t.calcGasTipAndFeeCap(ctx)
if err != nil {
return nil, err
}
childCtx, cancel := context.WithTimeout(ctx, networkTimeout)
nonce, err := t.l1Client.NonceAt(childCtx, t.senderAddress, nil)
cancel()
if err != nil {
return nil, fmt.Errorf("failed to get nonce: %w", err)
}
rawTx := &types.DynamicFeeTx{
ChainID: t.chainID,
Nonce: nonce,
To: &t.batchInboxAddress,
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Data: data,
}
t.log.Info("creating tx", "to", rawTx.To, "from", t.senderAddress)
gas, err := core.IntrinsicGas(rawTx.Data, nil, false, true, true, false)
if err != nil {
return nil, fmt.Errorf("failed to calculate intrinsic gas: %w", err)
}
rawTx.Gas = gas
ctx, cancel = context.WithTimeout(ctx, networkTimeout)
defer cancel()
tx := types.NewTx(rawTx)
return t.signerFn(ctx, t.senderAddress, tx)
}
package flags package flags
import ( import (
"time"
"github.com/urfave/cli" "github.com/urfave/cli"
"github.com/ethereum-optimism/optimism/op-batcher/rpc" "github.com/ethereum-optimism/optimism/op-batcher/rpc"
...@@ -74,7 +76,17 @@ var ( ...@@ -74,7 +76,17 @@ var (
} }
/* Optional flags */ /* Optional flags */
OfflineGasEstimationFlag = cli.BoolFlag{
Name: "offline-gas-estimation",
Usage: "Whether to use offline gas estimation",
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "OFFLINE_GAS_ESTIMATION"),
}
TxManagerTimeoutFlag = cli.DurationFlag{
Name: "tx-manager-timeout",
Usage: "Maximum duration to wait for L1 transactions, including resubmissions",
Value: 10 * time.Minute,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "TX_MANAGER_TIMEOUT"),
}
MaxChannelDurationFlag = cli.Uint64Flag{ MaxChannelDurationFlag = cli.Uint64Flag{
Name: "max-channel-duration", Name: "max-channel-duration",
Usage: "The maximum duration of L1-blocks to keep a channel open. 0 to disable.", Usage: "The maximum duration of L1-blocks to keep a channel open. 0 to disable.",
...@@ -141,6 +153,8 @@ var requiredFlags = []cli.Flag{ ...@@ -141,6 +153,8 @@ var requiredFlags = []cli.Flag{
} }
var optionalFlags = []cli.Flag{ var optionalFlags = []cli.Flag{
OfflineGasEstimationFlag,
TxManagerTimeoutFlag,
MaxChannelDurationFlag, MaxChannelDurationFlag,
MaxL1TxSizeBytesFlag, MaxL1TxSizeBytesFlag,
TargetL1TxSizeBytesFlag, TargetL1TxSizeBytesFlag,
......
...@@ -52,6 +52,7 @@ func NewL2Proposer(t Testing, log log.Logger, cfg *ProposerCfg, l1 *ethclient.Cl ...@@ -52,6 +52,7 @@ func NewL2Proposer(t Testing, log log.Logger, cfg *ProposerCfg, l1 *ethclient.Cl
NumConfirmations: 1, NumConfirmations: 1,
SafeAbortNonceTooLowCount: 4, SafeAbortNonceTooLowCount: 4,
From: from, From: from,
ChainID: big.NewInt(420),
// Signer is loaded in `proposer.NewL2OutputSubmitter` // Signer is loaded in `proposer.NewL2OutputSubmitter`
}, },
L1Client: l1, L1Client: l1,
......
...@@ -329,6 +329,8 @@ func TestMigration(t *testing.T) { ...@@ -329,6 +329,8 @@ func TestMigration(t *testing.T) {
L1EthRpc: forkedL1URL, L1EthRpc: forkedL1URL,
L2EthRpc: gethNode.WSEndpoint(), L2EthRpc: gethNode.WSEndpoint(),
RollupRpc: rollupNode.HTTPEndpoint(), RollupRpc: rollupNode.HTTPEndpoint(),
TxManagerTimeout: 10 * time.Minute,
OfflineGasEstimation: true,
MaxChannelDuration: 1, MaxChannelDuration: 1,
MaxL1TxSize: 120_000, MaxL1TxSize: 120_000,
TargetL1TxSize: 100_000, TargetL1TxSize: 100_000,
......
...@@ -590,10 +590,13 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { ...@@ -590,10 +590,13 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
} }
// Batch Submitter // Batch Submitter
txManagerTimeout := 10 * time.Minute
sys.BatchSubmitter, err = bss.NewBatchSubmitterFromCLIConfig(bss.CLIConfig{ sys.BatchSubmitter, err = bss.NewBatchSubmitterFromCLIConfig(bss.CLIConfig{
L1EthRpc: sys.Nodes["l1"].WSEndpoint(), L1EthRpc: sys.Nodes["l1"].WSEndpoint(),
L2EthRpc: sys.Nodes["sequencer"].WSEndpoint(), L2EthRpc: sys.Nodes["sequencer"].WSEndpoint(),
RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(), RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(),
TxManagerTimeout: txManagerTimeout,
OfflineGasEstimation: true,
MaxChannelDuration: 1, MaxChannelDuration: 1,
MaxL1TxSize: 120_000, MaxL1TxSize: 120_000,
TargetL1TxSize: 100_000, TargetL1TxSize: 100_000,
......
...@@ -169,12 +169,18 @@ func NewL2OutputSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Me ...@@ -169,12 +169,18 @@ func NewL2OutputSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Me
return nil, err return nil, err
} }
chainID, err := l1Client.ChainID(context.Background())
if err != nil {
return nil, err
}
txMgrConfg := txmgr.Config{ txMgrConfg := txmgr.Config{
ResubmissionTimeout: cfg.ResubmissionTimeout, ResubmissionTimeout: cfg.ResubmissionTimeout,
ReceiptQueryInterval: time.Second, ReceiptQueryInterval: time.Second,
NumConfirmations: cfg.NumConfirmations, NumConfirmations: cfg.NumConfirmations,
SafeAbortNonceTooLowCount: cfg.SafeAbortNonceTooLowCount, SafeAbortNonceTooLowCount: cfg.SafeAbortNonceTooLowCount,
From: fromAddress, From: fromAddress,
ChainID: chainID,
} }
proposerCfg := Config{ proposerCfg := Config{
......
// Code generated by mockery v2.22.1. DO NOT EDIT.
package mocks
import (
context "context"
txmgr "github.com/ethereum-optimism/optimism/op-service/txmgr"
mock "github.com/stretchr/testify/mock"
types "github.com/ethereum/go-ethereum/core/types"
)
// TxManager is an autogenerated mock type for the TxManager type
type TxManager struct {
mock.Mock
}
// CraftTx provides a mock function with given fields: ctx, candidate
func (_m *TxManager) CraftTx(ctx context.Context, candidate txmgr.TxCandidate) (*types.Transaction, error) {
ret := _m.Called(ctx, candidate)
var r0 *types.Transaction
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, txmgr.TxCandidate) (*types.Transaction, error)); ok {
return rf(ctx, candidate)
}
if rf, ok := ret.Get(0).(func(context.Context, txmgr.TxCandidate) *types.Transaction); ok {
r0 = rf(ctx, candidate)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.Transaction)
}
}
if rf, ok := ret.Get(1).(func(context.Context, txmgr.TxCandidate) error); ok {
r1 = rf(ctx, candidate)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Send provides a mock function with given fields: ctx, tx
func (_m *TxManager) Send(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) {
ret := _m.Called(ctx, tx)
var r0 *types.Receipt
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *types.Transaction) (*types.Receipt, error)); ok {
return rf(ctx, tx)
}
if rf, ok := ret.Get(0).(func(context.Context, *types.Transaction) *types.Receipt); ok {
r0 = rf(ctx, tx)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.Receipt)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *types.Transaction) error); ok {
r1 = rf(ctx, tx)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
type mockConstructorTestingTNewTxManager interface {
mock.TestingT
Cleanup(func())
}
// NewTxManager creates a new instance of TxManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewTxManager(t mockConstructorTestingTNewTxManager) *TxManager {
mock := &TxManager{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
...@@ -3,14 +3,17 @@ package txmgr ...@@ -3,14 +3,17 @@ package txmgr
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"math/big" "math/big"
"sync" "sync"
"time" "time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto" opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto"
) )
...@@ -38,6 +41,15 @@ type Config struct { ...@@ -38,6 +41,15 @@ type Config struct {
// attempted. // attempted.
ResubmissionTimeout time.Duration ResubmissionTimeout time.Duration
// ChainID is the chain ID of the L1 chain.
ChainID *big.Int
// NetworkTimeout is the allowed duration for a single network request.
// This is intended to be used for network requests that can be replayed.
//
// If not set, this will default to 2 seconds.
NetworkTimeout time.Duration
// RequireQueryInterval is the interval at which the tx manager will // RequireQueryInterval is the interval at which the tx manager will
// query the backend to check for confirmations after a tx at a // query the backend to check for confirmations after a tx at a
// specific gas price has been published. // specific gas price has been published.
...@@ -59,6 +71,8 @@ type Config struct { ...@@ -59,6 +71,8 @@ type Config struct {
// TxManager is an interface that allows callers to reliably publish txs, // TxManager is an interface that allows callers to reliably publish txs,
// bumping the gas price if needed, and obtain the receipt of the resulting tx. // bumping the gas price if needed, and obtain the receipt of the resulting tx.
//
//go:generate mockery --name TxManager --output ./mocks
type TxManager interface { type TxManager interface {
// Send is used to publish a transaction with incrementally higher gas // Send is used to publish a transaction with incrementally higher gas
// prices until the transaction eventually confirms. This method blocks // prices until the transaction eventually confirms. This method blocks
...@@ -69,6 +83,9 @@ type TxManager interface { ...@@ -69,6 +83,9 @@ type TxManager interface {
// //
// NOTE: Send should be called by AT MOST one caller at a time. // NOTE: Send should be called by AT MOST one caller at a time.
Send(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) Send(ctx context.Context, tx *types.Transaction) (*types.Receipt, error)
// CraftTx is used to craft a transaction using a [TxCandidate].
CraftTx(ctx context.Context, candidate TxCandidate) (*types.Transaction, error)
} }
// ETHBackend is the set of methods that the transaction manager uses to resubmit gas & determine // ETHBackend is the set of methods that the transaction manager uses to resubmit gas & determine
...@@ -89,18 +106,119 @@ type ETHBackend interface { ...@@ -89,18 +106,119 @@ type ETHBackend interface {
// TODO(CLI-3318): Maybe need a generic interface to support different RPC providers // TODO(CLI-3318): Maybe need a generic interface to support different RPC providers
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
SuggestGasTipCap(ctx context.Context) (*big.Int, error) SuggestGasTipCap(ctx context.Context) (*big.Int, error)
// NonceAt returns the account nonce of the given account.
// The block number can be nil, in which case the nonce is taken from the latest known block.
NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)
/// EstimateGas returns an estimate of the amount of gas needed to execute the given
/// transaction against the current pending block.
EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error)
} }
// SimpleTxManager is a implementation of TxManager that performs linear fee // SimpleTxManager is a implementation of TxManager that performs linear fee
// bumping of a tx until it confirms. // bumping of a tx until it confirms.
type SimpleTxManager struct { type SimpleTxManager struct {
Config // embed the config directly Config // embed the config directly
name string name string
chainID *big.Int
backend ETHBackend backend ETHBackend
l log.Logger l log.Logger
} }
// TxCandidate is a transaction candidate that can be submitted to ask the
// [TxManager] to construct a transaction with gas price bounds.
type TxCandidate struct {
// TxData is the transaction data to be used in the constructed tx.
TxData []byte
// To is the recipient of the constructed tx.
To common.Address
// GasLimit is the gas limit to be used in the constructed tx.
GasLimit uint64
// From is the sender (or `from`) of the constructed tx.
From common.Address
}
// calcGasTipAndFeeCap queries L1 to determine what a suitable miner tip & basefee limit would be for timely inclusion
func (m *SimpleTxManager) calcGasTipAndFeeCap(ctx context.Context) (gasTipCap *big.Int, gasFeeCap *big.Int, err error) {
childCtx, cancel := context.WithTimeout(ctx, m.Config.NetworkTimeout)
gasTipCap, err = m.backend.SuggestGasTipCap(childCtx)
cancel()
if err != nil {
return nil, nil, fmt.Errorf("failed to get suggested gas tip cap: %w", err)
}
if gasTipCap == nil {
m.l.Warn("unexpected unset gasTipCap, using default 2 gwei")
gasTipCap = new(big.Int).SetUint64(params.GWei * 2)
}
childCtx, cancel = context.WithTimeout(ctx, m.Config.NetworkTimeout)
head, err := m.backend.HeaderByNumber(childCtx, nil)
cancel()
if err != nil || head == nil {
return nil, nil, fmt.Errorf("failed to get L1 head block for fee cap: %w", err)
}
if head.BaseFee == nil {
return nil, nil, fmt.Errorf("failed to get L1 basefee in block %d for fee cap", head.Number)
}
gasFeeCap = CalcGasFeeCap(head.BaseFee, gasTipCap)
return gasTipCap, gasFeeCap, nil
}
// CraftTx creates the signed transaction to the batchInboxAddress.
// It queries L1 for the current fee market conditions as well as for the nonce.
// NOTE: This method SHOULD NOT publish the resulting transaction.
// NOTE: If the [TxCandidate.GasLimit] is non-zero, it will be used as the transaction's gas.
// NOTE: Otherwise, the [SimpleTxManager] will query the specified backend for an estimate.
func (m *SimpleTxManager) CraftTx(ctx context.Context, candidate TxCandidate) (*types.Transaction, error) {
gasTipCap, gasFeeCap, err := m.calcGasTipAndFeeCap(ctx)
if err != nil {
return nil, err
}
// Fetch the sender's nonce from the latest known block (nil `blockNumber`)
childCtx, cancel := context.WithTimeout(ctx, m.Config.NetworkTimeout)
defer cancel()
nonce, err := m.backend.NonceAt(childCtx, candidate.From, nil)
if err != nil {
return nil, fmt.Errorf("failed to get nonce: %w", err)
}
rawTx := &types.DynamicFeeTx{
ChainID: m.chainID,
Nonce: nonce,
To: &candidate.To,
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Data: candidate.TxData,
}
m.l.Info("creating tx", "to", rawTx.To, "from", candidate.From)
// If the gas limit is set, we can use that as the gas
if candidate.GasLimit != 0 {
rawTx.Gas = candidate.GasLimit
} else {
// Calculate the intrinsic gas for the transaction
gas, err := m.backend.EstimateGas(ctx, ethereum.CallMsg{
From: candidate.From,
To: &candidate.To,
GasFeeCap: gasFeeCap,
GasTipCap: gasTipCap,
Data: rawTx.Data,
})
if err != nil {
return nil, fmt.Errorf("failed to estimate gas: %w", err)
}
rawTx.Gas = gas
}
ctx, cancel = context.WithTimeout(ctx, m.Config.NetworkTimeout)
defer cancel()
return m.Signer(ctx, candidate.From, types.NewTx(rawTx))
}
// IncreaseGasPrice takes the previous transaction & potentially clones then signs it with a higher tip. // IncreaseGasPrice takes the previous transaction & potentially clones then signs it with a higher tip.
// If the tip + basefee suggested by the network are not greater than the previous values, the same transaction // If the tip + basefee suggested by the network are not greater than the previous values, the same transaction
// will be returned. If they are greater, this function will ensure that they are at least greater by 15% than // will be returned. If they are greater, this function will ensure that they are at least greater by 15% than
...@@ -196,8 +314,12 @@ func NewSimpleTxManager(name string, l log.Logger, cfg Config, backend ETHBacken ...@@ -196,8 +314,12 @@ func NewSimpleTxManager(name string, l log.Logger, cfg Config, backend ETHBacken
if cfg.NumConfirmations == 0 { if cfg.NumConfirmations == 0 {
panic("txmgr: NumConfirmations cannot be zero") panic("txmgr: NumConfirmations cannot be zero")
} }
if cfg.NetworkTimeout == 0 {
cfg.NetworkTimeout = 2 * time.Second
}
return &SimpleTxManager{ return &SimpleTxManager{
chainID: cfg.ChainID,
name: name, name: name,
Config: cfg, Config: cfg,
backend: backend, backend: backend,
......
...@@ -14,6 +14,7 @@ import ( ...@@ -14,6 +14,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/testutils" "github.com/ethereum-optimism/optimism/op-node/testutils"
opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto" opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
...@@ -50,6 +51,18 @@ func newTestHarness(t *testing.T) *testHarness { ...@@ -50,6 +51,18 @@ func newTestHarness(t *testing.T) *testHarness {
return newTestHarnessWithConfig(t, configWithNumConfs(1)) return newTestHarnessWithConfig(t, configWithNumConfs(1))
} }
// createTxCandidate creates a mock [TxCandidate].
func (h testHarness) createTxCandidate() TxCandidate {
inbox := common.HexToAddress("0x42000000000000000000000000000000000000ff")
sender := common.HexToAddress("0xdeadbeef")
return TxCandidate{
To: inbox,
TxData: []byte{0x00, 0x01, 0x02},
From: sender,
GasLimit: uint64(1337),
}
}
func configWithNumConfs(numConfirmations uint64) Config { func configWithNumConfs(numConfirmations uint64) Config {
return Config{ return Config{
ResubmissionTimeout: time.Second, ResubmissionTimeout: time.Second,
...@@ -175,6 +188,10 @@ func (b *mockBackend) HeaderByNumber(ctx context.Context, number *big.Int) (*typ ...@@ -175,6 +188,10 @@ func (b *mockBackend) HeaderByNumber(ctx context.Context, number *big.Int) (*typ
}, nil }, nil
} }
func (b *mockBackend) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) {
return b.g.basefee().Uint64(), nil
}
func (b *mockBackend) SuggestGasTipCap(ctx context.Context) (*big.Int, error) { func (b *mockBackend) SuggestGasTipCap(ctx context.Context) (*big.Int, error) {
tip, _ := b.g.sample() tip, _ := b.g.sample()
return tip, nil return tip, nil
...@@ -185,7 +202,14 @@ func (b *mockBackend) SendTransaction(ctx context.Context, tx *types.Transaction ...@@ -185,7 +202,14 @@ func (b *mockBackend) SendTransaction(ctx context.Context, tx *types.Transaction
panic("set sender function was not set") panic("set sender function was not set")
} }
return b.send(ctx, tx) return b.send(ctx, tx)
}
func (b *mockBackend) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) {
return 0, nil
}
func (*mockBackend) ChainID(ctx context.Context) (*big.Int, error) {
return big.NewInt(1), nil
} }
// TransactionReceipt queries the mockBackend for a mined txHash. If none is // TransactionReceipt queries the mockBackend for a mined txHash. If none is
...@@ -330,6 +354,51 @@ func TestTxMgrBlocksOnFailingRpcCalls(t *testing.T) { ...@@ -330,6 +354,51 @@ func TestTxMgrBlocksOnFailingRpcCalls(t *testing.T) {
require.Nil(t, receipt) require.Nil(t, receipt)
} }
// TestTxMgr_CraftTx ensures that the tx manager will create transactions as expected.
func TestTxMgr_CraftTx(t *testing.T) {
t.Parallel()
h := newTestHarness(t)
candidate := h.createTxCandidate()
// Craft the transaction.
gasTipCap, gasFeeCap := h.gasPricer.feesForEpoch(h.gasPricer.epoch + 1)
tx, err := h.mgr.CraftTx(context.Background(), candidate)
require.Nil(t, err)
require.NotNil(t, tx)
// Validate the gas tip cap and fee cap.
require.Equal(t, gasTipCap, tx.GasTipCap())
require.Equal(t, gasFeeCap, tx.GasFeeCap())
// Validate the nonce was set correctly using the backend.
require.Zero(t, tx.Nonce())
// Check that the gas was set using the gas limit.
require.Equal(t, candidate.GasLimit, tx.Gas())
}
// TestTxMgr_EstimateGas ensures that the tx manager will estimate
// the gas when candidate gas limit is zero in [CraftTx].
func TestTxMgr_EstimateGas(t *testing.T) {
t.Parallel()
h := newTestHarness(t)
candidate := h.createTxCandidate()
// Set the gas limit to zero to trigger gas estimation.
candidate.GasLimit = 0
// Gas estimate
gasEstimate := h.gasPricer.baseBaseFee.Uint64()
// Craft the transaction.
tx, err := h.mgr.CraftTx(context.Background(), candidate)
require.Nil(t, err)
require.NotNil(t, tx)
// Check that the gas was estimated correctly.
require.Equal(t, gasEstimate, tx.Gas())
}
// TestTxMgrOnlyOnePublicationSucceeds asserts that the tx manager will return a // TestTxMgrOnlyOnePublicationSucceeds asserts that the tx manager will return a
// receipt so long as at least one of the publications is able to succeed with a // receipt so long as at least one of the publications is able to succeed with a
// simulated rpc failure. // simulated rpc failure.
...@@ -577,6 +646,18 @@ func (b *failingBackend) SuggestGasTipCap(_ context.Context) (*big.Int, error) { ...@@ -577,6 +646,18 @@ func (b *failingBackend) SuggestGasTipCap(_ context.Context) (*big.Int, error) {
return b.gasTip, nil return b.gasTip, nil
} }
func (b *failingBackend) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) {
return b.baseFee.Uint64(), nil
}
func (b *failingBackend) NonceAt(_ context.Context, _ common.Address, _ *big.Int) (uint64, error) {
return 0, errors.New("unimplemented")
}
func (b *failingBackend) ChainID(ctx context.Context) (*big.Int, error) {
return nil, errors.New("unimplemented")
}
// TestWaitMinedReturnsReceiptAfterFailure asserts that WaitMined is able to // TestWaitMinedReturnsReceiptAfterFailure asserts that WaitMined is able to
// recover from failed calls to the backend. It uses the failedBackend to // recover from failed calls to the backend. It uses the failedBackend to
// simulate an rpc call failure, followed by the successful return of a receipt. // simulate an rpc call failure, followed by the successful return of a receipt.
......
...@@ -123,6 +123,8 @@ services: ...@@ -123,6 +123,8 @@ services:
OP_BATCHER_L1_ETH_RPC: http://l1:8545 OP_BATCHER_L1_ETH_RPC: http://l1:8545
OP_BATCHER_L2_ETH_RPC: http://l2:8545 OP_BATCHER_L2_ETH_RPC: http://l2:8545
OP_BATCHER_ROLLUP_RPC: http://op-node:8545 OP_BATCHER_ROLLUP_RPC: http://op-node:8545
TX_MANAGER_TIMEOUT: 10m
OFFLINE_GAS_ESTIMATION: false
OP_BATCHER_MAX_CHANNEL_DURATION: 1 OP_BATCHER_MAX_CHANNEL_DURATION: 1
OP_BATCHER_MAX_L1_TX_SIZE_BYTES: 120000 OP_BATCHER_MAX_L1_TX_SIZE_BYTES: 120000
OP_BATCHER_TARGET_L1_TX_SIZE_BYTES: 100000 OP_BATCHER_TARGET_L1_TX_SIZE_BYTES: 100000
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment