Commit 43b78a6b authored by Joshua Gutow's avatar Joshua Gutow Committed by GitHub

op-batcher: New Transaction Manager (#3828)

This is a wrapper on top of the transaction manger. It does simple nonce management
and provides a nice interface to the underlying transaction manager.
parent d8144c4e
...@@ -23,7 +23,7 @@ import ( ...@@ -23,7 +23,7 @@ import (
// BatchSubmitter encapsulates a service responsible for submitting L2 tx // BatchSubmitter encapsulates a service responsible for submitting L2 tx
// batches to L1 for availability. // batches to L1 for availability.
type BatchSubmitter struct { type BatchSubmitter struct {
txMgr txmgr.TxManager txMgr *TransactionManager
addr common.Address addr common.Address
cfg sequencer.Config cfg sequencer.Config
wg sync.WaitGroup wg sync.WaitGroup
...@@ -145,7 +145,7 @@ func NewBatchSubmitter(cfg Config, l log.Logger) (*BatchSubmitter, error) { ...@@ -145,7 +145,7 @@ func NewBatchSubmitter(cfg Config, l log.Logger) (*BatchSubmitter, error) {
return &BatchSubmitter{ return &BatchSubmitter{
cfg: batcherCfg, cfg: batcherCfg,
addr: addr, addr: addr,
txMgr: txmgr.NewSimpleTxManager("batcher", txManagerConfig, l1Client), txMgr: NewTransactionManger(l, txManagerConfig, batchInboxAddress, chainID, sequencerPrivKey, l1Client),
done: make(chan struct{}), done: make(chan struct{}),
log: l, log: l,
state: new(channelManager), state: new(channelManager),
...@@ -243,7 +243,10 @@ func (l *BatchSubmitter) loop() { ...@@ -243,7 +243,10 @@ func (l *BatchSubmitter) loop() {
l.log.Error("unable to get tx data", "err", err) l.log.Error("unable to get tx data", "err", err)
break break
} }
_ = l.submitTransaction(data) // Drop receipt + error for now
if _, err := l.txMgr.SendTransaction(l.ctx, data); err != nil {
l.log.Error("Failed to send transaction", "err", err)
}
} }
case <-l.done: case <-l.done:
......
...@@ -2,108 +2,144 @@ package op_batcher ...@@ -2,108 +2,144 @@ package op_batcher
import ( import (
"context" "context"
"crypto/ecdsa"
"fmt"
"math/big"
"time" "time"
"github.com/ethereum-optimism/optimism/op-proposer/txmgr" "github.com/ethereum-optimism/optimism/op-proposer/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
) )
func (l *BatchSubmitter) submitTransaction(data []byte) error { const networkTimeout = 2 * time.Second // How long a single network request can take. TODO: put in a config somewhere
// Query for the submitter's current nonce.
ctx, cancel := context.WithTimeout(l.ctx, time.Second*10) // TransactionManager wraps the simple txmgr package to make it easy to send & wait for transactions
nonce, err := l.cfg.L1Client.NonceAt(ctx, l.addr, nil) type TransactionManager struct {
cancel() // Config
if err != nil { batchInboxAddress common.Address
l.log.Error("unable to get current nonce", "err", err) senderAddress common.Address
return err chainID *big.Int
// Outside world
txMgr txmgr.TxManager
l1Client *ethclient.Client
signerFn func(types.TxData) (*types.Transaction, error)
log log.Logger
}
func NewTransactionManger(log log.Logger, txMgrConfg txmgr.Config, batchInboxAddress common.Address, chainID *big.Int, privKey *ecdsa.PrivateKey, l1Client *ethclient.Client) *TransactionManager {
signerFn := func(rawTx types.TxData) (*types.Transaction, error) {
return types.SignNewTx(privKey, types.LatestSignerForChainID(chainID), rawTx)
} }
// Create the transaction t := &TransactionManager{
ctx, cancel = context.WithTimeout(l.ctx, time.Second*10) batchInboxAddress: batchInboxAddress,
tx, err := l.CraftTx(ctx, data, nonce) senderAddress: crypto.PubkeyToAddress(privKey.PublicKey),
cancel() chainID: chainID,
if err != nil { txMgr: txmgr.NewSimpleTxManager("batcher", txMgrConfg, l1Client),
l.log.Error("unable to craft tx", "err", err) l1Client: l1Client,
return err signerFn: signerFn,
log: log,
} }
return t
}
// Construct the a closure that will update the txn with the current gas prices. // SendTransaction creates & submits a transaction to the batch inbox address with the given `data`.
// It currently uses the underlying `txmgr` to handle transaction sending & price management.
// This is a blocking method. It should not be called concurrently.
// TODO: where to put concurrent transaction handling logic.
func (t *TransactionManager) SendTransaction(ctx context.Context, data []byte) (*types.Receipt, error) {
tx, err := t.CraftTx(ctx, data)
if err != nil {
return nil, fmt.Errorf("failed to create tx: %w", err)
}
// Construct a closure that will update the txn with the current gas prices.
updateGasPrice := func(ctx context.Context) (*types.Transaction, error) { updateGasPrice := func(ctx context.Context) (*types.Transaction, error) {
l.log.Debug("updating batch tx gas price") return t.UpdateGasPrice(ctx, tx)
return l.UpdateGasPrice(ctx, tx)
} }
// Wait until one of our submitted transactions confirms. If no ctx, cancel := context.WithTimeout(ctx, 100*time.Second) // TODO: Select a timeout that makes sense here.
// receipt is received it's likely our gas price was too low. defer cancel()
// TODO: does the tx manager nicely replace the tx? if receipt, err := t.txMgr.Send(ctx, updateGasPrice, t.l1Client.SendTransaction); err != nil {
// (submit a new one, that's within the channel timeout, but higher fee than previously submitted tx? Or use a cheap cancel tx?) t.log.Warn("unable to publish tx", "err", err)
ctx, cancel = context.WithTimeout(l.ctx, time.Second*time.Duration(l.cfg.ChannelTimeout)) return nil, err
receipt, err := l.txMgr.Send(ctx, updateGasPrice, l.cfg.L1Client.SendTransaction) } else {
t.log.Info("tx successfully published", "tx_hash", receipt.TxHash)
return receipt, nil
}
}
// calcGasTipAndFeeCap queries L1 to determine what a suitable miner tip & basefee limit would be for timely inclusion
func (t *TransactionManager) calcGasTipAndFeeCap(ctx context.Context) (gasTipCap *big.Int, gasFeeCap *big.Int, err error) {
childCtx, cancel := context.WithTimeout(ctx, networkTimeout)
gasTipCap, err = t.l1Client.SuggestGasTipCap(childCtx)
cancel() cancel()
if err != nil { if err != nil {
l.log.Warn("unable to publish tx", "err", err) return nil, nil, fmt.Errorf("failed to get suggested gas tip cap: %w", err)
return err
} }
// The transaction was successfully submitted. childCtx, cancel = context.WithTimeout(ctx, networkTimeout)
l.log.Info("tx successfully published", "tx_hash", receipt.TxHash) head, err := t.l1Client.HeaderByNumber(childCtx, nil)
return nil cancel()
if err != nil {
return nil, nil, fmt.Errorf("failed to get L1 head block for fee cap: %w", err)
}
gasFeeCap = txmgr.CalcGasFeeCap(head.BaseFee, gasTipCap)
return gasTipCap, gasFeeCap, nil
} }
// CraftTx creates the signed transaction to the batchInboxAddress.
// It queries L1 for the current fee market conditions as well as for the nonce.
// NOTE: This method SHOULD NOT publish the resulting transaction. // NOTE: This method SHOULD NOT publish the resulting transaction.
func (l *BatchSubmitter) CraftTx(ctx context.Context, data []byte, nonce uint64) (*types.Transaction, error) { func (t *TransactionManager) CraftTx(ctx context.Context, data []byte) (*types.Transaction, error) {
gasTipCap, err := l.cfg.L1Client.SuggestGasTipCap(ctx) gasTipCap, gasFeeCap, err := t.calcGasTipAndFeeCap(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
head, err := l.cfg.L1Client.HeaderByNumber(ctx, nil) ctx, cancel := context.WithTimeout(ctx, networkTimeout)
nonce, err := t.l1Client.NonceAt(ctx, t.senderAddress, nil)
cancel()
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("failed to get nonce: %w", err)
} }
gasFeeCap := txmgr.CalcGasFeeCap(head.BaseFee, gasTipCap)
rawTx := &types.DynamicFeeTx{ rawTx := &types.DynamicFeeTx{
ChainID: l.cfg.ChainID, ChainID: t.chainID,
Nonce: nonce, Nonce: nonce,
To: &l.cfg.BatchInboxAddress, To: &t.batchInboxAddress,
GasTipCap: gasTipCap, GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap, GasFeeCap: gasFeeCap,
Data: data, Data: data,
} }
l.log.Debug("creating tx", "to", rawTx.To, "from", crypto.PubkeyToAddress(l.cfg.PrivKey.PublicKey)) t.log.Info("creating tx", "to", rawTx.To, "from", t.senderAddress)
gas, err := core.IntrinsicGas(rawTx.Data, nil, false, true, true) gas, err := core.IntrinsicGas(rawTx.Data, nil, false, true, true)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("failed to calculate intrinsic gas: %w", err)
} }
rawTx.Gas = gas rawTx.Gas = gas
return types.SignNewTx(l.cfg.PrivKey, types.LatestSignerForChainID(l.cfg.ChainID), rawTx) return t.signerFn(rawTx)
} }
// UpdateGasPrice signs an otherwise identical txn to the one provided but with // UpdateGasPrice signs an otherwise identical txn to the one provided but with
// updated gas prices sampled from the existing network conditions. // updated gas prices sampled from the existing network conditions.
// //
// NOTE: Thie method SHOULD NOT publish the resulting transaction. // NOTE: This method SHOULD NOT publish the resulting transaction.
func (l *BatchSubmitter) UpdateGasPrice(ctx context.Context, tx *types.Transaction) (*types.Transaction, error) { func (t *TransactionManager) UpdateGasPrice(ctx context.Context, tx *types.Transaction) (*types.Transaction, error) {
gasTipCap, err := l.cfg.L1Client.SuggestGasTipCap(ctx) gasTipCap, gasFeeCap, err := t.calcGasTipAndFeeCap(ctx)
if err != nil {
return nil, err
}
head, err := l.cfg.L1Client.HeaderByNumber(ctx, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
gasFeeCap := txmgr.CalcGasFeeCap(head.BaseFee, gasTipCap)
rawTx := &types.DynamicFeeTx{ rawTx := &types.DynamicFeeTx{
ChainID: l.cfg.ChainID, ChainID: t.chainID,
Nonce: tx.Nonce(), Nonce: tx.Nonce(),
To: tx.To(), To: tx.To(),
GasTipCap: gasTipCap, GasTipCap: gasTipCap,
...@@ -111,12 +147,8 @@ func (l *BatchSubmitter) UpdateGasPrice(ctx context.Context, tx *types.Transacti ...@@ -111,12 +147,8 @@ func (l *BatchSubmitter) UpdateGasPrice(ctx context.Context, tx *types.Transacti
Gas: tx.Gas(), Gas: tx.Gas(),
Data: tx.Data(), Data: tx.Data(),
} }
// Only log the new tip/fee cap because the updateGasPrice closure reuses the same initial transaction
t.log.Trace("updating gas price", "tip_cap", gasTipCap, "fee_cap", gasFeeCap)
return types.SignNewTx(l.cfg.PrivKey, types.LatestSignerForChainID(l.cfg.ChainID), rawTx) return t.signerFn(rawTx)
}
// SendTransaction injects a signed transaction into the pending pool for
// execution.
func (l *BatchSubmitter) SendTransaction(ctx context.Context, tx *types.Transaction) error {
return l.cfg.L1Client.SendTransaction(ctx, tx)
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment