Commit 0fd20eba authored by Joshua Gutow

txmgr: Configure timeouts better

Replace the hard-coded networkTimeout and txManagerTimeout constants in the
op-batcher and the defaultDialTimeout constant in the op-proposer with a
configurable NetworkTimeout, plumbed through both services' configs, and add
txmgr CLI flags (network-timeout, txmgr.send-timeout,
txmgr.receipt-query-interval) with validation. NetworkTimeout now bounds each
individual network request, while the optional TxSendTimeout bounds an entire
transaction send. IncreaseGasPrice is unexported along the way.

parent 84773de7
@@ -26,7 +26,8 @@ type Config struct {
RollupNode *sources.RollupClient
TxManager txmgr.TxManager
-PollInterval time.Duration
+NetworkTimeout time.Duration
+PollInterval   time.Duration
// RollupConfig is queried at startup
Rollup *rollup.Config
......
@@ -76,12 +76,13 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
txManager := txmgr.NewSimpleTxManager("batcher", l, txManagerConfig)
batcherCfg := Config{
-L1Client: l1Client,
-L2Client: l2Client,
-RollupNode: rollupClient,
-PollInterval: cfg.PollInterval,
-TxManager: txManager,
-Rollup: rcfg,
+L1Client:       l1Client,
+L2Client:       l2Client,
+RollupNode:     rollupClient,
+PollInterval:   cfg.PollInterval,
+NetworkTimeout: txManagerConfig.NetworkTimeout,
+TxManager:      txManager,
+Rollup:         rcfg,
Channel: ChannelConfig{
SeqWindowSize: rcfg.SeqWindowSize,
ChannelTimeout: rcfg.ChannelTimeout,
@@ -223,7 +224,7 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) {
// loadBlockIntoState fetches & stores a single block into `state`. It returns the block it loaded.
func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uint64) (*types.Block, error) {
-ctx, cancel := context.WithTimeout(ctx, txManagerTimeout)
+ctx, cancel := context.WithTimeout(ctx, l.NetworkTimeout)
defer cancel()
block, err := l.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumber))
if err != nil {
@@ -241,9 +242,9 @@ func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uin
// calculateL2BlockRangeToStore determines the range (start,end] that should be loaded into the local state.
// It also takes care of initializing some local state (i.e. will modify l.lastStoredBlock in certain conditions)
func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.BlockID, eth.BlockID, error) {
-childCtx, cancel := context.WithTimeout(ctx, txManagerTimeout)
+ctx, cancel := context.WithTimeout(ctx, l.NetworkTimeout)
defer cancel()
-syncStatus, err := l.RollupNode.SyncStatus(childCtx)
+syncStatus, err := l.RollupNode.SyncStatus(ctx)
// Ensure that we have the sync status
if err != nil {
return eth.BlockID{}, eth.BlockID{}, fmt.Errorf("failed to get sync status: %w", err)
@@ -343,13 +344,6 @@ func (l *BatchSubmitter) publishStateToL1(ctx context.Context) {
}
}
-const networkTimeout = 2 * time.Second // How long a single network request can take. TODO: put in a config somewhere
-// fix(refcell):
-// combined with above, these config variables should also be replicated in the op-proposer
-// along with op-proposer changes to include the updated tx manager
-const txManagerTimeout = 2 * time.Minute // How long the tx manager can take to send a transaction.
// sendTransaction creates & submits a transaction to the batch inbox address with the given `data`.
// It currently uses the underlying `txmgr` to handle transaction sending & price management.
// This is a blocking method. It should not be called concurrently.
@@ -361,8 +355,6 @@ func (l *BatchSubmitter) sendTransaction(ctx context.Context, data []byte) (*typ
}
// Send the transaction through the txmgr
-ctx, cancel := context.WithTimeout(ctx, txManagerTimeout)
-defer cancel()
if receipt, err := l.txMgr.Send(ctx, txmgr.TxCandidate{
To: l.Rollup.BatchInboxAddress,
TxData: data,
@@ -399,7 +391,7 @@ func (l *BatchSubmitter) recordConfirmedTx(id txID, receipt *types.Receipt) {
// l1Tip gets the current L1 tip as an L1BlockRef. The passed context is assumed
// to be a lifetime context, so it is internally wrapped with a network timeout.
func (l *BatchSubmitter) l1Tip(ctx context.Context) (eth.L1BlockRef, error) {
-tctx, cancel := context.WithTimeout(ctx, networkTimeout)
+tctx, cancel := context.WithTimeout(ctx, l.NetworkTimeout)
defer cancel()
head, err := l.L1Client.HeaderByNumber(tctx, nil)
if err != nil {
......
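The l1Tip change above is the commit's core convention: the caller keeps a long-lived "lifetime" context, and every individual RPC call gets a short child context bounded by the configured NetworkTimeout. A minimal sketch of the pattern — fetchHead and its parameters are illustrative names, not part of this diff:

```go
package main

import (
	"context"
	"time"

	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
)

// fetchHead wraps a single replayable request in a NetworkTimeout-bounded
// child context, so one slow request cannot stall the service indefinitely
// while the lifetime context stays untouched.
func fetchHead(ctx context.Context, client *ethclient.Client, networkTimeout time.Duration) (*types.Header, error) {
	tctx, cancel := context.WithTimeout(ctx, networkTimeout)
	defer cancel()
	// A nil block number requests the latest header.
	return client.HeaderByNumber(tctx, nil)
}
```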
@@ -53,6 +53,7 @@ func NewL2Proposer(t Testing, log log.Logger, cfg *ProposerCfg, l1 *ethclient.Cl
proposerCfg := proposer.Config{
L2OutputOracleAddr: cfg.OutputOracleAddr,
PollInterval: time.Second,
+NetworkTimeout: time.Second,
L1Client: l1,
RollupClient: rollupCl,
AllowNonFinalized: cfg.AllowNonFinalized,
......
@@ -579,6 +579,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
SafeAbortNonceTooLowCount: 3,
ResubmissionTimeout: 3 * time.Second,
ReceiptQueryInterval: 50 * time.Millisecond,
+NetworkTimeout: 2 * time.Second,
},
AllowNonFinalized: cfg.NonFinalizedProposals,
LogConfig: oplog.CLIConfig{
@@ -613,6 +614,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
SafeAbortNonceTooLowCount: 3,
ResubmissionTimeout: 3 * time.Second,
ReceiptQueryInterval: 50 * time.Millisecond,
+NetworkTimeout: 2 * time.Second,
},
LogConfig: oplog.CLIConfig{
Level: "info",
......
@@ -22,6 +22,7 @@ import (
type Config struct {
L2OutputOracleAddr common.Address
PollInterval time.Duration
+NetworkTimeout time.Duration
TxManager txmgr.TxManager
L1Client *ethclient.Client
RollupClient *sources.RollupClient
......
@@ -28,12 +28,6 @@ import (
"github.com/ethereum-optimism/optimism/op-service/txmgr"
)
-const (
-// defaultDialTimeout is default duration the service will wait on
-// startup to make a connection to either the L1 or L2 backends.
-defaultDialTimeout = 5 * time.Second
-)
var supportedL2OutputVersion = eth.Bytes32{}
// Main is the entrypoint into the L2 Output Submitter. This method executes the
@@ -138,7 +132,8 @@ type L2OutputSubmitter struct {
// This option is not necessary when higher proposal latency is acceptable and L1 is healthy.
allowNonFinalized bool
// How frequently to poll L2 for new finalized outputs
-pollInterval time.Duration
+pollInterval   time.Duration
+networkTimeout time.Duration
}
// NewL2OutputSubmitterFromCLIConfig creates a new L2 Output Submitter given the CLI Config
@@ -178,6 +173,7 @@ func NewL2OutputSubmitterConfigFromCLIConfig(cfg CLIConfig, l log.Logger) (*Conf
return &Config{
L2OutputOracleAddr: l2ooAddress,
PollInterval: cfg.PollInterval,
+NetworkTimeout: txManagerConfig.NetworkTimeout,
L1Client: l1Client,
RollupClient: rollupClient,
AllowNonFinalized: cfg.AllowNonFinalized,
@@ -196,7 +192,7 @@ func NewL2OutputSubmitter(cfg Config, l log.Logger, m metrics.Metricer) (*L2Outp
return nil, err
}
-cCtx, cCancel := context.WithTimeout(ctx, defaultDialTimeout)
+cCtx, cCancel := context.WithTimeout(ctx, cfg.NetworkTimeout)
defer cCancel()
version, err := l2ooContract.Version(&bind.CallOpts{Context: cCtx})
if err != nil {
@@ -227,6 +223,7 @@ func NewL2OutputSubmitter(cfg Config, l log.Logger, m metrics.Metricer) (*L2Outp
allowNonFinalized: cfg.AllowNonFinalized,
pollInterval: cfg.PollInterval,
+networkTimeout: cfg.NetworkTimeout,
}, nil
}
@@ -245,7 +242,7 @@ func (l *L2OutputSubmitter) Stop() {
// FetchNextOutputInfo gets the block number of the next proposal.
// It returns: the next block number, whether the proposal should be made, and an error
func (l *L2OutputSubmitter) FetchNextOutputInfo(ctx context.Context) (*eth.OutputResponse, bool, error) {
-cCtx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
+cCtx, cancel := context.WithTimeout(ctx, l.networkTimeout)
defer cancel()
callOpts := &bind.CallOpts{
From: l.txMgr.From(),
@@ -257,7 +254,7 @@ func (l *L2OutputSubmitter) FetchNextOutputInfo(ctx context.Context) (*eth.Outpu
return nil, false, err
}
// Fetch the current L2 heads
-cCtx, cancel = context.WithTimeout(ctx, defaultDialTimeout)
+cCtx, cancel = context.WithTimeout(ctx, l.networkTimeout)
defer cancel()
status, err := l.rollupClient.SyncStatus(cCtx)
if err != nil {
@@ -281,7 +278,7 @@ func (l *L2OutputSubmitter) FetchNextOutputInfo(ctx context.Context) (*eth.Outpu
}
func (l *L2OutputSubmitter) fetchOuput(ctx context.Context, block *big.Int) (*eth.OutputResponse, bool, error) {
-ctx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
+ctx, cancel := context.WithTimeout(ctx, l.networkTimeout)
defer cancel()
output, err := l.rollupClient.OutputAtBlock(ctx, block.Uint64())
if err != nil {
......
@@ -3,6 +3,7 @@ package proposer
import (
"context"
"fmt"
"time"
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/sources"
@@ -11,11 +12,12 @@ import (
"github.com/ethereum/go-ethereum/rpc"
)
+var defaultDialTimeout = 5 * time.Second
// dialEthClientWithTimeout attempts to dial the L1 provider using the provided
// URL. If the dial doesn't complete within defaultDialTimeout, this
// method will return an error.
func dialEthClientWithTimeout(ctx context.Context, url string) (*ethclient.Client, error) {
ctxt, cancel := context.WithTimeout(ctx, defaultDialTimeout)
defer cancel()
......
@@ -22,10 +22,13 @@ const (
MnemonicFlagName = "mnemonic"
HDPathFlagName = "hd-path"
PrivateKeyFlagName = "private-key"
-// Legacy TxMgr Flags
+// TxMgr Flags (new + legacy + some shared flags)
NumConfirmationsFlagName = "num-confirmations"
SafeAbortNonceTooLowCountFlagName = "safe-abort-nonce-too-low-count"
ResubmissionTimeoutFlagName = "resubmission-timeout"
+NetworkTimeoutFlagName = "network-timeout"
+TxSendTimeoutFlagName = "txmgr.send-timeout"
+ReceiptQueryIntervalFlagName = "txmgr.receipt-query-interval"
)
var (
@@ -69,17 +72,35 @@ func CLIFlags(envPrefix string) []cli.Flag {
EnvVar: opservice.PrefixEnvVar(envPrefix, "NUM_CONFIRMATIONS"),
},
cli.Uint64Flag{
Name: "safe-abort-nonce-too-low-count",
Name: SafeAbortNonceTooLowCountFlagName,
Usage: "Number of ErrNonceTooLow observations required to give up on a tx at a particular nonce without receiving confirmation",
Value: 3,
EnvVar: opservice.PrefixEnvVar(envPrefix, "SAFE_ABORT_NONCE_TOO_LOW_COUNT"),
},
cli.DurationFlag{
Name: "resubmission-timeout",
Name: ResubmissionTimeoutFlagName,
Usage: "Duration we will wait before resubmitting a transaction to L1",
Value: 30 * time.Second,
EnvVar: opservice.PrefixEnvVar(envPrefix, "RESUBMISSION_TIMEOUT"),
},
+cli.DurationFlag{
+Name: NetworkTimeoutFlagName,
+Usage: "Timeout for all network operations",
+Value: 2 * time.Second,
+EnvVar: opservice.PrefixEnvVar(envPrefix, "NETWORK_TIMEOUT"),
+},
+cli.DurationFlag{
+Name: TxSendTimeoutFlagName,
+Usage: "Timeout for sending transactions. If 0 it is disabled.",
+Value: 0,
+EnvVar: opservice.PrefixEnvVar(envPrefix, "TXMGR_TX_SEND_TIMEOUT"),
+},
+cli.DurationFlag{
+Name: ReceiptQueryIntervalFlagName,
+Usage: "Frequency to poll for receipts",
+Value: 30 * time.Second,
+EnvVar: opservice.PrefixEnvVar(envPrefix, "TXMGR_RECEIPT_QUERY_INTERVAL"),
+},
}, client.CLIFlags(envPrefix)...)
}
@@ -95,6 +116,8 @@ type CLIConfig struct {
SafeAbortNonceTooLowCount uint64
ResubmissionTimeout time.Duration
ReceiptQueryInterval time.Duration
+NetworkTimeout time.Duration
+TxSendTimeout time.Duration
}
func (m CLIConfig) Check() error {
@@ -104,6 +127,15 @@ func (m CLIConfig) Check() error {
if m.NumConfirmations == 0 {
return errors.New("num confirmations must not be 0")
}
+if m.NetworkTimeout == 0 {
+return errors.New("must provide a network timeout")
+}
+if m.ResubmissionTimeout == 0 {
+return errors.New("must provide a resubmission interval")
+}
+if m.ReceiptQueryInterval == 0 {
+return errors.New("must provide a receipt query interval")
+}
if err := m.SignerCLIConfig.Check(); err != nil {
return err
}
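With the validation above, a zero NetworkTimeout, ResubmissionTimeout, or ReceiptQueryInterval is now rejected. A minimal sketch of a CLIConfig that passes these checks — values mirror the flag defaults; signer and RPC fields are elided, and SignerCLIConfig.Check is assumed to accept an empty config:

```go
package main

import (
	"log"
	"time"

	"github.com/ethereum-optimism/optimism/op-service/txmgr"
)

func main() {
	cfg := txmgr.CLIConfig{
		NumConfirmations:          1,
		SafeAbortNonceTooLowCount: 3,
		ResubmissionTimeout:       30 * time.Second,
		NetworkTimeout:            2 * time.Second,
		ReceiptQueryInterval:      30 * time.Second,
		// TxSendTimeout stays 0: per the flag's usage text, zero disables
		// the overall send timeout.
	}
	if err := cfg.Check(); err != nil {
		log.Fatal(err) // Check also validates the embedded SignerCLIConfig
	}
}
```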
@@ -122,6 +154,9 @@ func ReadCLIConfig(ctx *cli.Context) CLIConfig {
NumConfirmations: ctx.GlobalUint64(NumConfirmationsFlagName),
SafeAbortNonceTooLowCount: ctx.GlobalUint64(SafeAbortNonceTooLowCountFlagName),
ResubmissionTimeout: ctx.GlobalDuration(ResubmissionTimeoutFlagName),
+ReceiptQueryInterval: ctx.GlobalDuration(ReceiptQueryIntervalFlagName),
+NetworkTimeout: ctx.GlobalDuration(NetworkTimeoutFlagName),
+TxSendTimeout: ctx.GlobalDuration(TxSendTimeoutFlagName),
}
}
@@ -130,21 +165,21 @@ func NewConfig(cfg CLIConfig, l log.Logger) (Config, error) {
return Config{}, err
}
-networkTimeout := 2 * time.Second
-ctx, cancel := context.WithTimeout(context.Background(), networkTimeout)
+ctx, cancel := context.WithTimeout(context.Background(), cfg.NetworkTimeout)
defer cancel()
l1, err := ethclient.DialContext(ctx, cfg.L1RPCURL)
if err != nil {
return Config{}, err
}
-ctx, cancel = context.WithTimeout(context.Background(), networkTimeout)
+ctx, cancel = context.WithTimeout(context.Background(), cfg.NetworkTimeout)
defer cancel()
chainID, err := l1.ChainID(ctx)
if err != nil {
return Config{}, err
}
// Allow backwards compatible ways of specifying the HD path
hdPath := cfg.HDPath
if hdPath == "" && cfg.SequencerHDPath != "" {
hdPath = cfg.SequencerHDPath
@@ -157,17 +192,13 @@ func NewConfig(cfg CLIConfig, l log.Logger) (Config, error) {
return Config{}, err
}
-receiptQueryInterval := 30 * time.Second
-if cfg.ReceiptQueryInterval != 0 {
-receiptQueryInterval = cfg.ReceiptQueryInterval
-}
return Config{
Backend: l1,
ResubmissionTimeout: cfg.ResubmissionTimeout,
ChainID: chainID,
-NetworkTimeout: networkTimeout,
-ReceiptQueryInterval: receiptQueryInterval,
+TxSendTimeout: cfg.TxSendTimeout,
+NetworkTimeout: cfg.NetworkTimeout,
+ReceiptQueryInterval: cfg.ReceiptQueryInterval,
NumConfirmations: cfg.NumConfirmations,
SafeAbortNonceTooLowCount: cfg.SafeAbortNonceTooLowCount,
Signer: signerFactory(chainID),
@@ -187,10 +218,12 @@ type Config struct {
// ChainID is the chain ID of the L1 chain.
ChainID *big.Int
+// TxSendTimeout is how long to wait for sending a transaction.
+// By default it is unbounded. If set, this is recommended to be at least 20 minutes.
+TxSendTimeout time.Duration
// NetworkTimeout is the allowed duration for a single network request.
// This is intended to be used for network requests that can be replayed.
//
-// If not set, this will default to 2 seconds.
NetworkTimeout time.Duration
// ReceiptQueryInterval is the interval at which the tx manager will
......
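To make the relationship between the two settings concrete: TxSendTimeout, when non-zero, caps the whole send-and-confirm operation, while NetworkTimeout caps every single request made inside it. A hedged sketch of how they nest — sendLoop and attempt are illustrative names, not the actual SimpleTxManager internals:

```go
package main

import (
	"context"
	"time"
)

// sendLoop is an illustrative reduction of the txmgr pattern: the optional
// TxSendTimeout bounds the entire retry loop, and NetworkTimeout bounds each
// replayable request within it.
func sendLoop(ctx context.Context, txSendTimeout, networkTimeout time.Duration, attempt func(context.Context) error) error {
	if txSendTimeout != 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, txSendTimeout)
		defer cancel()
	}
	for {
		cCtx, cancel := context.WithTimeout(ctx, networkTimeout)
		err := attempt(cCtx) // e.g. SendTransaction or TransactionReceipt
		cancel()
		if err == nil {
			return nil
		}
		if ctx.Err() != nil {
			return ctx.Err() // overall deadline (or caller cancellation) hit
		}
		// Otherwise fall through and retry, e.g. after ResubmissionTimeout.
	}
}
```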
@@ -126,6 +126,11 @@ type TxCandidate struct {
//
// NOTE: Send should be called by AT MOST one caller at a time.
func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
+if m.cfg.TxSendTimeout != 0 {
+var cancel context.CancelFunc
+ctx, cancel = context.WithTimeout(ctx, m.cfg.TxSendTimeout)
+defer cancel()
+}
tx, err := m.craftTx(ctx, candidate)
if err != nil {
m.l.Error("Failed to create the transaction", "err", err)
@@ -217,7 +222,9 @@ func (m *SimpleTxManager) send(ctx context.Context, tx *types.Transaction) (*typ
log := m.l.New("txHash", txHash, "nonce", nonce, "gasTipCap", gasTipCap, "gasFeeCap", gasFeeCap)
log.Info("publishing transaction")
-err := m.backend.SendTransaction(ctx, tx)
+cCtx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
+defer cancel()
+err := m.backend.SendTransaction(cCtx, tx)
sendState.ProcessSendError(err)
if err != nil {
if errors.Is(err, context.Canceled) {
@@ -232,7 +239,6 @@ func (m *SimpleTxManager) send(ctx context.Context, tx *types.Transaction) (*typ
log.Warn("Aborting transaction submission")
cancel()
}
-// TODO(conner): add retry?
return
}
@@ -279,7 +285,7 @@ func (m *SimpleTxManager) send(ctx context.Context, tx *types.Transaction) (*typ
}
// Increase the gas price & submit the new transaction
-newTx, err := m.IncreaseGasPrice(ctx, tx)
+newTx, err := m.increaseGasPrice(ctx, tx)
if err != nil {
m.l.Error("Failed to increase the gas price for the tx", "err", err)
// Don't `continue` here so we resubmit the transaction with the same gas price.
@@ -311,7 +317,9 @@ func (m *SimpleTxManager) waitMined(ctx context.Context, tx *types.Transaction,
txHash := tx.Hash()
for {
-receipt, err := m.backend.TransactionReceipt(ctx, txHash)
+cCtx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
+receipt, err := m.backend.TransactionReceipt(cCtx, txHash)
+cancel()
switch {
case receipt != nil:
if sendState != nil {
@@ -384,14 +392,14 @@ func (m *SimpleTxManager) suggestGasPriceCaps(ctx context.Context) (*big.Int, *b
return tip, head.BaseFee, nil
}
-// IncreaseGasPrice takes the previous transaction & potentially clones then signs it with a higher tip.
+// increaseGasPrice takes the previous transaction & potentially clones then signs it with a higher tip.
// If the tip + basefee suggested by the network are not greater than the previous values, the same transaction
// will be returned. If they are greater, this function will ensure that they are at least greater by 15% than
// the previous transaction's value to ensure that the price bump is large enough.
//
// We do not re-estimate the amount of gas used because for some stateful transactions (like output proposals) the
// act of including the transaction renders the repeat of the transaction invalid.
-func (m *SimpleTxManager) IncreaseGasPrice(ctx context.Context, tx *types.Transaction) (*types.Transaction, error) {
+func (m *SimpleTxManager) increaseGasPrice(ctx context.Context, tx *types.Transaction) (*types.Transaction, error) {
tip, basefee, err := m.suggestGasPriceCaps(ctx)
if err != nil {
return nil, err
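A worked instance of the bump rule documented above, with illustrative numbers in gwei: a previous tip of 100 and a fresh suggestion of 105 is an increase, but less than 15%, so the replacement tip is clamped up to 115; a suggestion of 100 or below would leave the original transaction untouched.

```go
package main

import (
	"fmt"
	"math/big"
)

func main() {
	prevTip := big.NewInt(100)   // previous tx's tip cap, in gwei
	suggested := big.NewInt(105) // fresh suggestion from suggestGasPriceCaps

	// Minimum acceptable replacement: prevTip * 115 / 100 = 115 gwei.
	minBump := new(big.Int).Div(new(big.Int).Mul(prevTip, big.NewInt(115)), big.NewInt(100))
	newTip := new(big.Int).Set(suggested)
	if newTip.Cmp(minBump) < 0 {
		newTip = minBump
	}
	fmt.Println(newTip) // prints 115
}
```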
@@ -413,6 +421,8 @@ func (m *SimpleTxManager) IncreaseGasPrice(ctx context.Context, tx *types.Transa
Data: tx.Data(),
AccessList: tx.AccessList(),
}
+ctx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
+defer cancel()
return m.cfg.Signer(ctx, m.cfg.From, types.NewTx(rawTx))
}
......
@@ -724,7 +724,7 @@ func doGasPriceIncrease(t *testing.T, txTipCap, txFeeCap, newTip, newBaseFee int
GasTipCap: big.NewInt(txTipCap),
GasFeeCap: big.NewInt(txFeeCap),
})
-newTx, err := mgr.IncreaseGasPrice(context.Background(), tx)
+newTx, err := mgr.increaseGasPrice(context.Background(), tx)
require.NoError(t, err)
return tx, newTx
}
@@ -831,7 +831,7 @@ func TestIncreaseGasPriceNotExponential(t *testing.T) {
// Run increaseGasPrice a bunch of times in a row to simulate a very fast resubmit loop.
for i := 0; i < 20; i++ {
ctx := context.Background()
-newTx, err := mgr.IncreaseGasPrice(ctx, tx)
+newTx, err := mgr.increaseGasPrice(ctx, tx)
require.NoError(t, err)
require.True(t, newTx.GasFeeCap().Cmp(feeCap) == 0, "new tx fee cap must be equal L1")
require.True(t, newTx.GasTipCap().Cmp(borkedBackend.gasTip) == 0, "new tx tip must be equal L1")
......