Commit e5adb7f2 authored by mergify[bot], committed by GitHub

Merge branch 'develop' into aj/update-geth-erszbat

parents b1c38583 a70a4e3e
...@@ -51,7 +51,7 @@ func Main(version string, cliCtx *cli.Context) error { ...@@ -51,7 +51,7 @@ func Main(version string, cliCtx *cli.Context) error {
return err return err
} }
} }
defer batchSubmitter.StopIfRunning() defer batchSubmitter.StopIfRunning(context.Background())
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
...@@ -73,7 +73,7 @@ func Main(version string, cliCtx *cli.Context) error { ...@@ -73,7 +73,7 @@ func Main(version string, cliCtx *cli.Context) error {
l.Error("error starting metrics server", err) l.Error("error starting metrics server", err)
} }
}() }()
m.StartBalanceMetrics(ctx, l, batchSubmitter.L1Client, batchSubmitter.From) m.StartBalanceMetrics(ctx, l, batchSubmitter.L1Client, batchSubmitter.TxManager.From())
} }
rpcCfg := cfg.RPCConfig rpcCfg := cfg.RPCConfig
......
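With this change the entrypoint passes a context into StopIfRunning, so the caller decides how long a graceful shutdown may take. Below is a minimal, self-contained sketch of that calling pattern; the stopper interface, fakeSubmitter type and the 10-second budget are illustrative assumptions, not code from this PR.

package main

import (
	"context"
	"fmt"
	"time"
)

// stopper mirrors the new shutdown shape: StopIfRunning takes a context so
// the caller can bound the graceful shutdown. Illustrative stand-in only.
type stopper interface {
	StopIfRunning(ctx context.Context)
}

type fakeSubmitter struct{}

func (fakeSubmitter) StopIfRunning(ctx context.Context) {
	select {
	case <-ctx.Done():
		fmt.Println("shutdown deadline hit, forcing stop")
	case <-time.After(100 * time.Millisecond):
		fmt.Println("graceful shutdown finished")
	}
}

func main() {
	var s stopper = fakeSubmitter{}
	// Give the batcher a bounded window to flush remaining channel frames.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	s.StopIfRunning(ctx)
}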
...@@ -18,6 +18,7 @@ var ( ...@@ -18,6 +18,7 @@ var (
ErrMaxDurationReached = errors.New("max channel duration reached") ErrMaxDurationReached = errors.New("max channel duration reached")
ErrChannelTimeoutClose = errors.New("close to channel timeout") ErrChannelTimeoutClose = errors.New("close to channel timeout")
ErrSeqWindowClose = errors.New("close to sequencer window timeout") ErrSeqWindowClose = errors.New("close to sequencer window timeout")
ErrTerminated = errors.New("channel terminated")
) )
type ChannelFullError struct { type ChannelFullError struct {
...@@ -188,7 +189,7 @@ func (c *channelBuilder) Reset() error { ...@@ -188,7 +189,7 @@ func (c *channelBuilder) Reset() error {
} }
// AddBlock adds a block to the channel compression pipeline. IsFull should be // AddBlock adds a block to the channel compression pipeline. IsFull should be
// called aftewards to test whether the channel is full. If full, a new channel // called afterwards to test whether the channel is full. If full, a new channel
// must be started. // must be started.
// //
// AddBlock returns a ChannelFullError if called even though the channel is // AddBlock returns a ChannelFullError if called even though the channel is
...@@ -307,16 +308,17 @@ func (c *channelBuilder) IsFull() bool { ...@@ -307,16 +308,17 @@ func (c *channelBuilder) IsFull() bool {
// FullErr returns the reason why the channel is full. If not full yet, it // FullErr returns the reason why the channel is full. If not full yet, it
// returns nil. // returns nil.
// //
// It returns a ChannelFullError wrapping one of six possible reasons for the // It returns a ChannelFullError wrapping one of the following possible reasons
// channel being full: // for the channel being full:
// - ErrInputTargetReached if the target amount of input data has been reached, // - ErrInputTargetReached if the target amount of input data has been reached,
// - derive.MaxRLPBytesPerChannel if the general maximum amount of input data // - derive.MaxRLPBytesPerChannel if the general maximum amount of input data
// would have been exceeded by the latest AddBlock call, // would have been exceeded by the latest AddBlock call,
// - ErrMaxFrameIndex if the maximum number of frames has been generated // - ErrMaxFrameIndex if the maximum number of frames has been generated
// (uint16), // (uint16),
// - ErrMaxDurationReached if the max channel duration got reached. // - ErrMaxDurationReached if the max channel duration got reached,
// - ErrChannelTimeoutClose if the consensus channel timeout got too close. // - ErrChannelTimeoutClose if the consensus channel timeout got too close,
// - ErrSeqWindowClose if the end of the sequencer window got too close. // - ErrSeqWindowClose if the end of the sequencer window got too close,
// - ErrTerminated if the channel was explicitly terminated.
func (c *channelBuilder) FullErr() error { func (c *channelBuilder) FullErr() error {
return c.fullErr return c.fullErr
} }
...@@ -402,6 +404,14 @@ func (c *channelBuilder) outputFrame() error { ...@@ -402,6 +404,14 @@ func (c *channelBuilder) outputFrame() error {
return err // possibly io.EOF (last frame) return err // possibly io.EOF (last frame)
} }
// Close immediately marks the channel as full with an ErrTerminated
// if the channel is not already full.
func (c *channelBuilder) Close() {
if !c.IsFull() {
c.setFullErr(ErrTerminated)
}
}
// HasFrame returns whether there's any available frame. If true, it can be // HasFrame returns whether there's any available frame. If true, it can be
// popped using NextFrame(). // popped using NextFrame().
// //
......
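Close() reuses the existing full-error bookkeeping: terminating a channel simply records ErrTerminated as the "full" reason, and callers can distinguish it with errors.Is. A distilled, stand-alone sketch of that pattern follows; the builder type here is a stand-in, not the real channelBuilder.

package main

import (
	"errors"
	"fmt"
)

// ErrTerminated mirrors the sentinel added in this diff.
var ErrTerminated = errors.New("channel terminated")

// builder is a tiny stand-in showing only the full-error bookkeeping
// that Close() relies on.
type builder struct {
	fullErr error
}

func (b *builder) IsFull() bool          { return b.fullErr != nil }
func (b *builder) FullErr() error        { return b.fullErr }
func (b *builder) setFullErr(err error)  { b.fullErr = err }

// Close marks the builder full with ErrTerminated unless another full
// reason was already recorded.
func (b *builder) Close() {
	if !b.IsFull() {
		b.setFullErr(ErrTerminated)
	}
}

func main() {
	b := &builder{}
	b.Close()
	fmt.Println(b.IsFull(), errors.Is(b.FullErr(), ErrTerminated)) // true true
}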
...@@ -41,6 +41,9 @@ type channelManager struct { ...@@ -41,6 +41,9 @@ type channelManager struct {
pendingTransactions map[txID]txData pendingTransactions map[txID]txData
// Set of confirmed txID -> inclusion block. For determining if the channel is timed out // Set of confirmed txID -> inclusion block. For determining if the channel is timed out
confirmedTransactions map[txID]eth.BlockID confirmedTransactions map[txID]eth.BlockID
// if set to true, prevents production of any new channel frames
closed bool
} }
func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig) *channelManager { func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig) *channelManager {
...@@ -60,6 +63,7 @@ func (s *channelManager) Clear() { ...@@ -60,6 +63,7 @@ func (s *channelManager) Clear() {
s.log.Trace("clearing channel manager state") s.log.Trace("clearing channel manager state")
s.blocks = s.blocks[:0] s.blocks = s.blocks[:0]
s.tip = common.Hash{} s.tip = common.Hash{}
s.closed = false
s.clearPendingChannel() s.clearPendingChannel()
} }
...@@ -78,6 +82,10 @@ func (s *channelManager) TxFailed(id txID) { ...@@ -78,6 +82,10 @@ func (s *channelManager) TxFailed(id txID) {
} }
s.metr.RecordBatchTxFailed() s.metr.RecordBatchTxFailed()
if s.closed && len(s.confirmedTransactions) == 0 && len(s.pendingTransactions) == 0 {
s.log.Info("Channel has no submitted transactions, clearing for shutdown", "chID", s.pendingChannel.ID())
s.clearPendingChannel()
}
} }
// TxConfirmed marks a transaction as confirmed on L1. Unfortunately even if all frames in // TxConfirmed marks a transaction as confirmed on L1. Unfortunately even if all frames in
...@@ -179,8 +187,8 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) { ...@@ -179,8 +187,8 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
dataPending := s.pendingChannel != nil && s.pendingChannel.HasFrame() dataPending := s.pendingChannel != nil && s.pendingChannel.HasFrame()
s.log.Debug("Requested tx data", "l1Head", l1Head, "data_pending", dataPending, "blocks_pending", len(s.blocks)) s.log.Debug("Requested tx data", "l1Head", l1Head, "data_pending", dataPending, "blocks_pending", len(s.blocks))
// Short circuit if there is a pending frame. // Short circuit if there is a pending frame or the channel manager is closed.
if dataPending { if dataPending || s.closed {
return s.nextTxData() return s.nextTxData()
} }
...@@ -344,3 +352,27 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info derive.L1BlockInfo) ...@@ -344,3 +352,27 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info derive.L1BlockInfo)
SequenceNumber: l1info.SequenceNumber, SequenceNumber: l1info.SequenceNumber,
} }
} }
// Close closes the current pending channel, if one exists, outputs any remaining frames,
// and prevents the creation of any new channels.
// Any outputted frames still need to be published.
func (s *channelManager) Close() error {
if s.closed {
return nil
}
s.closed = true
// Any pending state can be proactively cleared if there are no submitted transactions
if len(s.confirmedTransactions) == 0 && len(s.pendingTransactions) == 0 {
s.clearPendingChannel()
}
if s.pendingChannel == nil {
return nil
}
s.pendingChannel.Close()
return s.outputFrames()
}
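After channelManager.Close(), TxData keeps handing out frames that were already built and reports io.EOF once the pending channel is drained, which is what lets the shutdown path flush without creating new work. A hedged sketch of that drain loop against a hypothetical frameSource stand-in (not the real *channelManager API):

package main

import (
	"errors"
	"fmt"
	"io"
)

// frameSource is a hypothetical stand-in for a closed channelManager:
// TxData returns remaining frames and io.EOF once nothing is left.
type frameSource interface {
	TxData() ([]byte, error)
}

type staticFrames struct{ frames [][]byte }

func (s *staticFrames) TxData() ([]byte, error) {
	if len(s.frames) == 0 {
		return nil, io.EOF
	}
	f := s.frames[0]
	s.frames = s.frames[1:]
	return f, nil
}

// drain publishes every remaining frame and stops cleanly on io.EOF.
func drain(src frameSource) error {
	for {
		data, err := src.TxData()
		if errors.Is(err, io.EOF) {
			return nil // nothing left to publish
		}
		if err != nil {
			return err
		}
		fmt.Printf("publish %d bytes\n", len(data))
	}
}

func main() {
	_ = drain(&staticFrames{frames: [][]byte{{1, 2}, {3}}})
}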
...@@ -363,3 +363,145 @@ func TestChannelManager_TxResend(t *testing.T) { ...@@ -363,3 +363,145 @@ func TestChannelManager_TxResend(t *testing.T) {
require.NoError(err) require.NoError(err)
require.Len(fs, 1) require.Len(fs, 1)
} }
// TestChannelManagerCloseBeforeFirstUse ensures that the channel manager
// will not produce any frames if closed immediately.
func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {
require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{
TargetFrameSize: 0,
MaxFrameSize: 100,
ApproxComprRatio: 1.0,
ChannelTimeout: 1000,
})
a, _ := derivetest.RandomL2Block(rng, 4)
m.Close()
err := m.AddL2Block(a)
require.NoError(err, "Failed to add L2 block")
_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected closed channel manager to contain no tx data")
}
// TestChannelManagerCloseNoPendingChannel ensures that the channel manager
// can gracefully close with no pending channels, and will not emit any new
// channel frames.
func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
require := require.New(t)
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{
TargetFrameSize: 0,
MaxFrameSize: 100,
ApproxComprRatio: 1.0,
ChannelTimeout: 1000,
})
a := newMiniL2Block(0)
b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash())
err := m.AddL2Block(a)
require.NoError(err, "Failed to add L2 block")
txdata, err := m.TxData(eth.BlockID{})
require.NoError(err, "Expected channel manager to return valid tx data")
m.TxConfirmed(txdata.ID(), eth.BlockID{})
_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected channel manager to EOF")
m.Close()
err = m.AddL2Block(b)
require.NoError(err, "Failed to add L2 block")
_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected closed channel manager to return no new tx data")
}
// TestChannelManagerCloseNoPendingChannel ensures that the channel manager
// can gracefully close with a pending channel, and will not produce any
// new channel frames after this point.
func TestChannelManagerClosePendingChannel(t *testing.T) {
require := require.New(t)
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{
TargetNumFrames: 100,
TargetFrameSize: 1000,
MaxFrameSize: 1000,
ApproxComprRatio: 1.0,
ChannelTimeout: 1000,
})
a := newMiniL2Block(50_000)
b := newMiniL2BlockWithNumberParent(10, big.NewInt(1), a.Hash())
err := m.AddL2Block(a)
require.NoError(err, "Failed to add L2 block")
txdata, err := m.TxData(eth.BlockID{})
require.NoError(err, "Expected channel manager to produce valid tx data")
m.TxConfirmed(txdata.ID(), eth.BlockID{})
m.Close()
txdata, err = m.TxData(eth.BlockID{})
require.NoError(err, "Expected channel manager to produce tx data from remaining L2 block data")
m.TxConfirmed(txdata.ID(), eth.BlockID{})
_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected channel manager to have no more tx data")
err = m.AddL2Block(b)
require.NoError(err, "Failed to add L2 block")
_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data")
}
// TestChannelManagerCloseAllTxsFailed ensures that the channel manager
// can gracefully close after producing transaction frames if none of these
// have successfully landed on chain.
func TestChannelManagerCloseAllTxsFailed(t *testing.T) {
require := require.New(t)
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{
TargetNumFrames: 100,
TargetFrameSize: 1000,
MaxFrameSize: 1000,
ApproxComprRatio: 1.0,
ChannelTimeout: 1000,
})
a := newMiniL2Block(50_000)
err := m.AddL2Block(a)
require.NoError(err, "Failed to add L2 block")
txdata, err := m.TxData(eth.BlockID{})
require.NoError(err, "Expected channel manager to produce valid tx data")
m.TxFailed(txdata.ID())
// Show that this data will continue to be emitted as long as the transaction
// fails and the channel manager is not closed
txdata, err = m.TxData(eth.BlockID{})
require.NoError(err, "Expected channel manager to re-attempt the failed transaction")
m.TxFailed(txdata.ID())
m.Close()
_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data")
}
...@@ -3,7 +3,6 @@ package batcher ...@@ -3,7 +3,6 @@ package batcher
import ( import (
"time" "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli" "github.com/urfave/cli"
...@@ -17,7 +16,6 @@ import ( ...@@ -17,7 +16,6 @@ import (
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof" oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
"github.com/ethereum-optimism/optimism/op-service/txmgr" "github.com/ethereum-optimism/optimism/op-service/txmgr"
opsigner "github.com/ethereum-optimism/optimism/op-signer/client"
) )
type Config struct { type Config struct {
...@@ -26,11 +24,9 @@ type Config struct { ...@@ -26,11 +24,9 @@ type Config struct {
L1Client *ethclient.Client L1Client *ethclient.Client
L2Client *ethclient.Client L2Client *ethclient.Client
RollupNode *sources.RollupClient RollupNode *sources.RollupClient
TxManager txmgr.TxManager
PollInterval time.Duration PollInterval time.Duration
From common.Address
TxManagerConfig txmgr.Config
// RollupConfig is queried at startup // RollupConfig is queried at startup
Rollup *rollup.Config Rollup *rollup.Config
...@@ -51,8 +47,6 @@ func (c *Config) Check() error { ...@@ -51,8 +47,6 @@ func (c *Config) Check() error {
} }
type CLIConfig struct { type CLIConfig struct {
/* Required Params */
// L1EthRpc is the HTTP provider URL for L1. // L1EthRpc is the HTTP provider URL for L1.
L1EthRpc string L1EthRpc string
...@@ -81,43 +75,6 @@ type CLIConfig struct { ...@@ -81,43 +75,6 @@ type CLIConfig struct {
// and creating a new batch. // and creating a new batch.
PollInterval time.Duration PollInterval time.Duration
// NumConfirmations is the number of confirmations which we will wait after
// appending new batches.
NumConfirmations uint64
// SafeAbortNonceTooLowCount is the number of ErrNonceTooLowObservations
// required to give up on a tx at a particular nonce without receiving
// confirmation.
SafeAbortNonceTooLowCount uint64
// ResubmissionTimeout is time we will wait before resubmitting a
// transaction.
ResubmissionTimeout time.Duration
// Mnemonic is the HD seed used to derive the wallet private keys for both
// the sequence and proposer. Must be used in conjunction with
// SequencerHDPath and ProposerHDPath.
Mnemonic string
// SequencerHDPath is the derivation path used to obtain the private key for
// batched submission of sequencer transactions.
SequencerHDPath string
// PrivateKey is the private key used to submit sequencer transactions.
PrivateKey string
RPCConfig rpc.CLIConfig
/* Optional Params */
// TxManagerTimeout is the max amount of time to wait for the [txmgr].
// This will default to: 10 * time.Minute.
TxManagerTimeout time.Duration
// OfflineGasEstimation specifies whether the batcher should calculate
// gas estimations offline using the [core.IntrinsicGas] function.
OfflineGasEstimation bool
// MaxL1TxSize is the maximum size of a batch tx submitted to L1. // MaxL1TxSize is the maximum size of a batch tx submitted to L1.
MaxL1TxSize uint64 MaxL1TxSize uint64
...@@ -133,14 +90,11 @@ type CLIConfig struct { ...@@ -133,14 +90,11 @@ type CLIConfig struct {
 	Stopped bool

+	TxMgrConfig txmgr.CLIConfig
+	RPCConfig   rpc.CLIConfig
 	LogConfig oplog.CLIConfig
 	MetricsConfig opmetrics.CLIConfig
 	PprofConfig oppprof.CLIConfig
-	// SignerConfig contains the client config for op-signer service
-	SignerConfig opsigner.CLIConfig
 }
func (c CLIConfig) Check() error { func (c CLIConfig) Check() error {
...@@ -156,7 +110,7 @@ func (c CLIConfig) Check() error { ...@@ -156,7 +110,7 @@ func (c CLIConfig) Check() error {
if err := c.PprofConfig.Check(); err != nil { if err := c.PprofConfig.Check(); err != nil {
return err return err
} }
if err := c.SignerConfig.Check(); err != nil { if err := c.TxMgrConfig.Check(); err != nil {
return err return err
} }
return nil return nil
...@@ -166,31 +120,23 @@ func (c CLIConfig) Check() error { ...@@ -166,31 +120,23 @@ func (c CLIConfig) Check() error {
func NewConfig(ctx *cli.Context) CLIConfig { func NewConfig(ctx *cli.Context) CLIConfig {
return CLIConfig{ return CLIConfig{
/* Required Flags */ /* Required Flags */
L1EthRpc: ctx.GlobalString(flags.L1EthRpcFlag.Name), L1EthRpc: ctx.GlobalString(flags.L1EthRpcFlag.Name),
L2EthRpc: ctx.GlobalString(flags.L2EthRpcFlag.Name), L2EthRpc: ctx.GlobalString(flags.L2EthRpcFlag.Name),
RollupRpc: ctx.GlobalString(flags.RollupRpcFlag.Name), RollupRpc: ctx.GlobalString(flags.RollupRpcFlag.Name),
SubSafetyMargin: ctx.GlobalUint64(flags.SubSafetyMarginFlag.Name), SubSafetyMargin: ctx.GlobalUint64(flags.SubSafetyMarginFlag.Name),
PollInterval: ctx.GlobalDuration(flags.PollIntervalFlag.Name), PollInterval: ctx.GlobalDuration(flags.PollIntervalFlag.Name),
NumConfirmations: ctx.GlobalUint64(flags.NumConfirmationsFlag.Name),
SafeAbortNonceTooLowCount: ctx.GlobalUint64(flags.SafeAbortNonceTooLowCountFlag.Name),
ResubmissionTimeout: ctx.GlobalDuration(flags.ResubmissionTimeoutFlag.Name),
/* Optional Flags */ /* Optional Flags */
-		OfflineGasEstimation: ctx.GlobalBool(flags.OfflineGasEstimationFlag.Name),
-		TxManagerTimeout:     ctx.GlobalDuration(flags.TxManagerTimeoutFlag.Name),
 		MaxChannelDuration: ctx.GlobalUint64(flags.MaxChannelDurationFlag.Name),
 		MaxL1TxSize:        ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name),
 		TargetL1TxSize:     ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name),
 		TargetNumFrames:    ctx.GlobalInt(flags.TargetNumFramesFlag.Name),
 		ApproxComprRatio:   ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name),
 		Stopped:            ctx.GlobalBool(flags.StoppedFlag.Name),
-		Mnemonic:           ctx.GlobalString(flags.MnemonicFlag.Name),
-		SequencerHDPath:    ctx.GlobalString(flags.SequencerHDPathFlag.Name),
-		PrivateKey:         ctx.GlobalString(flags.PrivateKeyFlag.Name),
+		TxMgrConfig:        txmgr.ReadCLIConfig(ctx),
 		RPCConfig:          rpc.ReadCLIConfig(ctx),
 		LogConfig:          oplog.ReadCLIConfig(ctx),
 		MetricsConfig:      opmetrics.ReadCLIConfig(ctx),
 		PprofConfig:        oppprof.ReadCLIConfig(ctx),
-		SignerConfig:       opsigner.ReadCLIConfig(ctx),
} }
} }
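With the signer and tx-manager flags folded into txmgr.CLIConfig, a programmatic user now populates the embedded TxMgrConfig instead of the removed top-level fields. A sketch mirroring the e2e wiring later in this diff; the endpoint URLs and private key are placeholders, and field values are only examples.

package main

import (
	"time"

	bss "github.com/ethereum-optimism/optimism/op-batcher/batcher"
	"github.com/ethereum-optimism/optimism/op-service/txmgr"
)

// batcherCLIConfig builds a batcher CLIConfig with the new embedded
// txmgr.CLIConfig. Endpoints and key below are placeholders.
func batcherCLIConfig() bss.CLIConfig {
	return bss.CLIConfig{
		L1EthRpc:         "ws://localhost:8546",
		L2EthRpc:         "ws://localhost:9546",
		RollupRpc:        "http://localhost:7545",
		SubSafetyMargin:  4,
		PollInterval:     time.Second,
		MaxL1TxSize:      120_000,
		TargetL1TxSize:   100_000,
		TargetNumFrames:  1,
		ApproxComprRatio: 0.4,
		TxMgrConfig: txmgr.CLIConfig{
			L1RPCURL:                  "ws://localhost:8546",
			PrivateKey:                "0xabc123", // placeholder key
			NumConfirmations:          1,
			SafeAbortNonceTooLowCount: 3,
			ResubmissionTimeout:       5 * time.Second,
			ReceiptQueryInterval:      time.Second,
		},
	}
}

func main() {
	_ = batcherCLIConfig()
}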
...@@ -13,7 +13,6 @@ import ( ...@@ -13,7 +13,6 @@ import (
"github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto"
"github.com/ethereum-optimism/optimism/op-service/txmgr" "github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
...@@ -27,10 +26,11 @@ type BatchSubmitter struct { ...@@ -27,10 +26,11 @@ type BatchSubmitter struct {
txMgr txmgr.TxManager txMgr txmgr.TxManager
wg sync.WaitGroup wg sync.WaitGroup
done chan struct{}
ctx context.Context shutdownCtx context.Context
cancel context.CancelFunc cancelShutdownCtx context.CancelFunc
killCtx context.Context
cancelKillCtx context.CancelFunc
mutex sync.Mutex mutex sync.Mutex
running bool running bool
...@@ -47,11 +47,6 @@ type BatchSubmitter struct { ...@@ -47,11 +47,6 @@ type BatchSubmitter struct {
func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metricer) (*BatchSubmitter, error) { func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metricer) (*BatchSubmitter, error) {
ctx := context.Background() ctx := context.Background()
signer, fromAddress, err := opcrypto.SignerFactoryFromConfig(l, cfg.PrivateKey, cfg.Mnemonic, cfg.SequencerHDPath, cfg.SignerConfig)
if err != nil {
return nil, err
}
// Connect to L1 and L2 providers. Perform these last since they are the // Connect to L1 and L2 providers. Perform these last since they are the
// most expensive. // most expensive.
l1Client, err := dialEthClientWithTimeout(ctx, cfg.L1EthRpc) l1Client, err := dialEthClientWithTimeout(ctx, cfg.L1EthRpc)
...@@ -74,24 +69,19 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri ...@@ -74,24 +69,19 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
return nil, fmt.Errorf("querying rollup config: %w", err) return nil, fmt.Errorf("querying rollup config: %w", err)
} }
-	txManagerConfig := txmgr.Config{
-		ResubmissionTimeout:       cfg.ResubmissionTimeout,
-		ReceiptQueryInterval:      time.Second,
-		NumConfirmations:          cfg.NumConfirmations,
-		SafeAbortNonceTooLowCount: cfg.SafeAbortNonceTooLowCount,
-		From:                      fromAddress,
-		ChainID:                   rcfg.L1ChainID,
-		Signer:                    signer(rcfg.L1ChainID),
+	txManagerConfig, err := txmgr.NewConfig(cfg.TxMgrConfig, l)
+	if err != nil {
+		return nil, err
 	}
+	txManager := txmgr.NewSimpleTxManager("batcher", l, txManagerConfig)

 	batcherCfg := Config{
 		L1Client:     l1Client,
 		L2Client:     l2Client,
 		RollupNode:   rollupClient,
 		PollInterval: cfg.PollInterval,
-		TxManagerConfig: txManagerConfig,
-		From:            fromAddress,
-		Rollup:          rcfg,
+		TxManager:    txManager,
+		Rollup:       rcfg,
Channel: ChannelConfig{ Channel: ChannelConfig{
SeqWindowSize: rcfg.SeqWindowSize, SeqWindowSize: rcfg.SeqWindowSize,
ChannelTimeout: rcfg.ChannelTimeout, ChannelTimeout: rcfg.ChannelTimeout,
...@@ -115,19 +105,19 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri ...@@ -115,19 +105,19 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
// NewBatchSubmitter initializes the BatchSubmitter, gathering any resources // NewBatchSubmitter initializes the BatchSubmitter, gathering any resources
// that will be needed during operation. // that will be needed during operation.
func NewBatchSubmitter(ctx context.Context, cfg Config, l log.Logger, m metrics.Metricer) (*BatchSubmitter, error) { func NewBatchSubmitter(ctx context.Context, cfg Config, l log.Logger, m metrics.Metricer) (*BatchSubmitter, error) {
balance, err := cfg.L1Client.BalanceAt(ctx, cfg.From, nil) balance, err := cfg.L1Client.BalanceAt(ctx, cfg.TxManager.From(), nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
cfg.log = l cfg.log = l
cfg.log.Info("creating batch submitter", "submitter_addr", cfg.From, "submitter_bal", balance) cfg.log.Info("creating batch submitter", "submitter_addr", cfg.TxManager.From(), "submitter_bal", balance)
cfg.metr = m cfg.metr = m
return &BatchSubmitter{ return &BatchSubmitter{
Config: cfg, Config: cfg,
txMgr: txmgr.NewSimpleTxManager("batcher", l, cfg.TxManagerConfig, cfg.L1Client), txMgr: cfg.TxManager,
state: NewChannelManager(l, m, cfg.Channel), state: NewChannelManager(l, m, cfg.Channel),
}, nil }, nil
...@@ -144,10 +134,8 @@ func (l *BatchSubmitter) Start() error { ...@@ -144,10 +134,8 @@ func (l *BatchSubmitter) Start() error {
} }
l.running = true l.running = true
-	l.done = make(chan struct{})
-	// TODO: this context only exists because the event loop doesn't reach done
-	// if the tx manager is blocking forever due to e.g. insufficient balance.
-	l.ctx, l.cancel = context.WithCancel(context.Background())
+	l.shutdownCtx, l.cancelShutdownCtx = context.WithCancel(context.Background())
+	l.killCtx, l.cancelKillCtx = context.WithCancel(context.Background())
l.state.Clear() l.state.Clear()
l.lastStoredBlock = eth.BlockID{} l.lastStoredBlock = eth.BlockID{}
...@@ -159,11 +147,11 @@ func (l *BatchSubmitter) Start() error { ...@@ -159,11 +147,11 @@ func (l *BatchSubmitter) Start() error {
return nil return nil
} }
func (l *BatchSubmitter) StopIfRunning() { func (l *BatchSubmitter) StopIfRunning(ctx context.Context) {
_ = l.Stop() _ = l.Stop(ctx)
} }
func (l *BatchSubmitter) Stop() error { func (l *BatchSubmitter) Stop(ctx context.Context) error {
l.log.Info("Stopping Batch Submitter") l.log.Info("Stopping Batch Submitter")
l.mutex.Lock() l.mutex.Lock()
...@@ -174,9 +162,18 @@ func (l *BatchSubmitter) Stop() error { ...@@ -174,9 +162,18 @@ func (l *BatchSubmitter) Stop() error {
} }
l.running = false l.running = false
-	l.cancel()
-	close(l.done)
+	// go routine will call cancelKill() if the passed in ctx is ever Done
+	cancelKill := l.cancelKillCtx
+	wrapped, cancel := context.WithCancel(ctx)
+	defer cancel()
+	go func() {
+		<-wrapped.Done()
+		cancelKill()
+	}()
+
+	l.cancelShutdownCtx()
 	l.wg.Wait()
+	l.cancelKillCtx()

 	l.log.Info("Batch Submitter stopped")
@@ -292,47 +289,57 @@ func (l *BatchSubmitter) loop() {
 	for {
 		select {
 		case <-ticker.C:
-			l.loadBlocksIntoState(l.ctx)
-
-		blockLoop:
-			for {
-				l1tip, err := l.l1Tip(l.ctx)
-				if err != nil {
-					l.log.Error("Failed to query L1 tip", "error", err)
-					break
-				}
-				l.recordL1Tip(l1tip)
-
-				// Collect next transaction data
-				txdata, err := l.state.TxData(l1tip.ID())
-				if err == io.EOF {
-					l.log.Trace("no transaction data available")
-					break // local for loop
-				} else if err != nil {
-					l.log.Error("unable to get tx data", "err", err)
-					break
-				}
-				// Record TX Status
-				if receipt, err := l.sendTransaction(l.ctx, txdata.Bytes()); err != nil {
-					l.recordFailedTx(txdata.ID(), err)
-				} else {
-					l.recordConfirmedTx(txdata.ID(), receipt)
-				}
-
-				// hack to exit this loop. Proper fix is to do request another send tx or parallel tx sending
-				// from the channel manager rather than sending the channel in a loop. This stalls b/c if the
-				// context is cancelled while sending, it will never fully clear the pending txns.
-				select {
-				case <-l.ctx.Done():
-					break blockLoop
-				default:
-				}
-			}
-
-		case <-l.done:
+			l.loadBlocksIntoState(l.shutdownCtx)
+			l.publishStateToL1(l.killCtx)
+		case <-l.shutdownCtx.Done():
+			l.publishStateToL1(l.killCtx)
 			return
 		}
 	}
 }
+
+// publishStateToL1 loops through the block data loaded into `state` and
+// submits the associated data to the L1 in the form of channel frames.
+func (l *BatchSubmitter) publishStateToL1(ctx context.Context) {
+	for {
+		// Attempt to gracefully terminate the current channel, ensuring that no new frames will be
+		// produced. Any remaining frames must still be published to the L1 to prevent stalling.
+		select {
+		case <-ctx.Done():
+			err := l.state.Close()
+			if err != nil {
+				l.log.Error("error closing the channel manager", "err", err)
+			}
+		case <-l.shutdownCtx.Done():
+			err := l.state.Close()
+			if err != nil {
+				l.log.Error("error closing the channel manager", "err", err)
+			}
+		default:
+		}
+
+		l1tip, err := l.l1Tip(ctx)
+		if err != nil {
+			l.log.Error("Failed to query L1 tip", "error", err)
+			return
+		}
+		l.recordL1Tip(l1tip)
+
+		// Collect next transaction data
+		txdata, err := l.state.TxData(l1tip.ID())
+		if err == io.EOF {
+			l.log.Trace("no transaction data available")
+			break
+		} else if err != nil {
+			l.log.Error("unable to get tx data", "err", err)
+			break
+		}
+		// Record TX Status
+		if receipt, err := l.sendTransaction(ctx, txdata.Bytes()); err != nil {
+			l.recordFailedTx(txdata.ID(), err)
+		} else {
+			l.recordConfirmedTx(txdata.ID(), receipt)
+		}
+	}
+}
...@@ -359,7 +366,7 @@ func (l *BatchSubmitter) sendTransaction(ctx context.Context, data []byte) (*typ ...@@ -359,7 +366,7 @@ func (l *BatchSubmitter) sendTransaction(ctx context.Context, data []byte) (*typ
if receipt, err := l.txMgr.Send(ctx, txmgr.TxCandidate{ if receipt, err := l.txMgr.Send(ctx, txmgr.TxCandidate{
To: l.Rollup.BatchInboxAddress, To: l.Rollup.BatchInboxAddress,
TxData: data, TxData: data,
From: l.From, From: l.txMgr.From(),
GasLimit: intrinsicGas, GasLimit: intrinsicGas,
}); err != nil { }); err != nil {
l.log.Warn("unable to publish tx", "err", err, "data_size", len(data)) l.log.Warn("unable to publish tx", "err", err, "data_size", len(data))
......
package batcher
import (
"context"
"math/big"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum-optimism/optimism/op-service/txmgr/mocks"
)
// TestBatchSubmitter_SendTransaction tests the driver's
// [SendTransaction] external facing function.
func TestBatchSubmitter_SendTransaction(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit)
txMgr := mocks.TxManager{}
batcherInboxAddress := common.HexToAddress("0x42000000000000000000000000000000000000ff")
chainID := big.NewInt(1)
sender := common.HexToAddress("0xdeadbeef")
bs := BatchSubmitter{
Config: Config{
log: log,
From: sender,
Rollup: &rollup.Config{
L1ChainID: chainID,
BatchInboxAddress: batcherInboxAddress,
},
},
txMgr: &txMgr,
}
txData := []byte{0x00, 0x01, 0x02}
gasTipCap := big.NewInt(136)
gasFeeCap := big.NewInt(137)
gas := uint64(1337)
// Candidate gas should be calculated with [core.IntrinsicGas]
intrinsicGas, err := core.IntrinsicGas(txData, nil, false, true, true, false)
require.NoError(t, err)
candidate := txmgr.TxCandidate{
To: batcherInboxAddress,
TxData: txData,
From: sender,
GasLimit: intrinsicGas,
}
tx := types.NewTx(&types.DynamicFeeTx{
ChainID: chainID,
Nonce: 0,
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Gas: gas,
To: &batcherInboxAddress,
Data: txData,
})
txHash := tx.Hash()
expectedReceipt := types.Receipt{
Type: 1,
PostState: []byte{},
Status: uint64(1),
CumulativeGasUsed: gas,
TxHash: txHash,
GasUsed: gas,
}
txMgr.On("Send", mock.Anything, candidate).Return(&expectedReceipt, nil)
receipt, err := bs.sendTransaction(context.Background(), tx.Data())
require.NoError(t, err)
require.Equal(t, receipt, &expectedReceipt)
}
package flags package flags
import ( import (
"time"
"github.com/urfave/cli" "github.com/urfave/cli"
"github.com/ethereum-optimism/optimism/op-batcher/rpc" "github.com/ethereum-optimism/optimism/op-batcher/rpc"
...@@ -11,14 +9,13 @@ import ( ...@@ -11,14 +9,13 @@ import (
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof" oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc" oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
opsigner "github.com/ethereum-optimism/optimism/op-signer/client" "github.com/ethereum-optimism/optimism/op-service/txmgr"
) )
const envVarPrefix = "OP_BATCHER" const envVarPrefix = "OP_BATCHER"
var ( var (
/* Required flags */ // Required flags
L1EthRpcFlag = cli.StringFlag{ L1EthRpcFlag = cli.StringFlag{
Name: "l1-eth-rpc", Name: "l1-eth-rpc",
Usage: "HTTP provider URL for L1", Usage: "HTTP provider URL for L1",
...@@ -52,41 +49,8 @@ var ( ...@@ -52,41 +49,8 @@ var (
Required: true, Required: true,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "POLL_INTERVAL"), EnvVar: opservice.PrefixEnvVar(envVarPrefix, "POLL_INTERVAL"),
} }
NumConfirmationsFlag = cli.Uint64Flag{
Name: "num-confirmations",
Usage: "Number of confirmations which we will wait after " +
"appending a new batch",
Required: true,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "NUM_CONFIRMATIONS"),
}
SafeAbortNonceTooLowCountFlag = cli.Uint64Flag{
Name: "safe-abort-nonce-too-low-count",
Usage: "Number of ErrNonceTooLow observations required to " +
"give up on a tx at a particular nonce without receiving " +
"confirmation",
Required: true,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "SAFE_ABORT_NONCE_TOO_LOW_COUNT"),
}
ResubmissionTimeoutFlag = cli.DurationFlag{
Name: "resubmission-timeout",
Usage: "Duration we will wait before resubmitting a " +
"transaction to L1",
Required: true,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "RESUBMISSION_TIMEOUT"),
}
/* Optional flags */ // Optional flags
OfflineGasEstimationFlag = cli.BoolFlag{
Name: "offline-gas-estimation",
Usage: "Whether to use offline gas estimation",
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "OFFLINE_GAS_ESTIMATION"),
}
TxManagerTimeoutFlag = cli.DurationFlag{
Name: "tx-manager-timeout",
Usage: "Maximum duration to wait for L1 transactions, including resubmissions",
Value: 10 * time.Minute,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "TX_MANAGER_TIMEOUT"),
}
MaxChannelDurationFlag = cli.Uint64Flag{ MaxChannelDurationFlag = cli.Uint64Flag{
Name: "max-channel-duration", Name: "max-channel-duration",
Usage: "The maximum duration of L1-blocks to keep a channel open. 0 to disable.", Usage: "The maximum duration of L1-blocks to keep a channel open. 0 to disable.",
...@@ -122,23 +86,8 @@ var ( ...@@ -122,23 +86,8 @@ var (
Usage: "Initialize the batcher in a stopped state. The batcher can be started using the admin_startBatcher RPC", Usage: "Initialize the batcher in a stopped state. The batcher can be started using the admin_startBatcher RPC",
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "STOPPED"), EnvVar: opservice.PrefixEnvVar(envVarPrefix, "STOPPED"),
} }
-	MnemonicFlag = cli.StringFlag{
-		Name: "mnemonic",
-		Usage: "The mnemonic used to derive the wallets for either the " +
-			"sequencer or the l2output",
-		EnvVar: opservice.PrefixEnvVar(envVarPrefix, "MNEMONIC"),
-	}
-	SequencerHDPathFlag = cli.StringFlag{
-		Name: "sequencer-hd-path",
-		Usage: "The HD path used to derive the sequencer wallet from the " +
-			"mnemonic. The mnemonic flag must also be set.",
-		EnvVar: opservice.PrefixEnvVar(envVarPrefix, "SEQUENCER_HD_PATH"),
-	}
-	PrivateKeyFlag = cli.StringFlag{
-		Name: "private-key",
-		Usage: "The private key to use with the l2output wallet. Must not be used with mnemonic.",
-		EnvVar: opservice.PrefixEnvVar(envVarPrefix, "PRIVATE_KEY"),
-	}
+	// Legacy Flags
+	SequencerHDPathFlag = txmgr.SequencerHDPathFlag
) )
var requiredFlags = []cli.Flag{ var requiredFlags = []cli.Flag{
...@@ -147,23 +96,15 @@ var requiredFlags = []cli.Flag{ ...@@ -147,23 +96,15 @@ var requiredFlags = []cli.Flag{
RollupRpcFlag, RollupRpcFlag,
SubSafetyMarginFlag, SubSafetyMarginFlag,
PollIntervalFlag, PollIntervalFlag,
NumConfirmationsFlag,
SafeAbortNonceTooLowCountFlag,
ResubmissionTimeoutFlag,
} }
var optionalFlags = []cli.Flag{ var optionalFlags = []cli.Flag{
OfflineGasEstimationFlag,
TxManagerTimeoutFlag,
MaxChannelDurationFlag, MaxChannelDurationFlag,
MaxL1TxSizeBytesFlag, MaxL1TxSizeBytesFlag,
TargetL1TxSizeBytesFlag, TargetL1TxSizeBytesFlag,
TargetNumFramesFlag, TargetNumFramesFlag,
ApproxComprRatioFlag, ApproxComprRatioFlag,
StoppedFlag, StoppedFlag,
MnemonicFlag,
SequencerHDPathFlag,
PrivateKeyFlag,
} }
func init() { func init() {
...@@ -172,8 +113,8 @@ func init() { ...@@ -172,8 +113,8 @@ func init() {
optionalFlags = append(optionalFlags, oplog.CLIFlags(envVarPrefix)...) optionalFlags = append(optionalFlags, oplog.CLIFlags(envVarPrefix)...)
optionalFlags = append(optionalFlags, opmetrics.CLIFlags(envVarPrefix)...) optionalFlags = append(optionalFlags, opmetrics.CLIFlags(envVarPrefix)...)
optionalFlags = append(optionalFlags, oppprof.CLIFlags(envVarPrefix)...) optionalFlags = append(optionalFlags, oppprof.CLIFlags(envVarPrefix)...)
optionalFlags = append(optionalFlags, opsigner.CLIFlags(envVarPrefix)...)
optionalFlags = append(optionalFlags, rpc.CLIFlags(envVarPrefix)...) optionalFlags = append(optionalFlags, rpc.CLIFlags(envVarPrefix)...)
optionalFlags = append(optionalFlags, txmgr.CLIFlags(envVarPrefix)...)
Flags = append(requiredFlags, optionalFlags...) Flags = append(requiredFlags, optionalFlags...)
} }
......
...@@ -6,7 +6,7 @@ import ( ...@@ -6,7 +6,7 @@ import (
type batcherClient interface { type batcherClient interface {
Start() error Start() error
Stop() error Stop(ctx context.Context) error
} }
type adminAPI struct { type adminAPI struct {
...@@ -23,6 +23,6 @@ func (a *adminAPI) StartBatcher(_ context.Context) error { ...@@ -23,6 +23,6 @@ func (a *adminAPI) StartBatcher(_ context.Context) error {
return a.b.Start() return a.b.Start()
} }
func (a *adminAPI) StopBatcher(_ context.Context) error { func (a *adminAPI) StopBatcher(ctx context.Context) error {
return a.b.Stop() return a.b.Stop(ctx)
} }
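Since admin_stopBatcher now forwards the request context into Stop, an operator-side caller can bound the graceful shutdown by putting a deadline on the RPC call itself. A sketch using go-ethereum's rpc client; the endpoint URL and the 30-second deadline are placeholders.

package main

import (
	"context"
	"log"
	"time"

	"github.com/ethereum/go-ethereum/rpc"
)

func main() {
	// Bound both the dial and the admin_stopBatcher call with one deadline;
	// the batcher uses this context to limit its graceful shutdown.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	client, err := rpc.DialContext(ctx, "http://localhost:8548") // placeholder endpoint
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	if err := client.CallContext(ctx, nil, "admin_stopBatcher"); err != nil {
		log.Fatal(err)
	}
	log.Println("batcher stopped")
}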
...@@ -18,7 +18,6 @@ import ( ...@@ -18,7 +18,6 @@ import (
"github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-proposer/metrics" "github.com/ethereum-optimism/optimism/op-proposer/metrics"
"github.com/ethereum-optimism/optimism/op-proposer/proposer" "github.com/ethereum-optimism/optimism/op-proposer/proposer"
opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto"
"github.com/ethereum-optimism/optimism/op-service/txmgr" "github.com/ethereum-optimism/optimism/op-service/txmgr"
) )
...@@ -34,37 +33,31 @@ type L2Proposer struct { ...@@ -34,37 +33,31 @@ type L2Proposer struct {
driver *proposer.L2OutputSubmitter driver *proposer.L2OutputSubmitter
address common.Address address common.Address
privKey *ecdsa.PrivateKey privKey *ecdsa.PrivateKey
signer opcrypto.SignerFn
contractAddr common.Address contractAddr common.Address
lastTx common.Hash lastTx common.Hash
} }
type fakeTxMgr struct {
from common.Address
}
func (f fakeTxMgr) From() common.Address {
return f.from
}
func (f fakeTxMgr) Send(_ context.Context, _ txmgr.TxCandidate) (*types.Receipt, error) {
panic("unimplemented")
}
func NewL2Proposer(t Testing, log log.Logger, cfg *ProposerCfg, l1 *ethclient.Client, rollupCl *sources.RollupClient) *L2Proposer { func NewL2Proposer(t Testing, log log.Logger, cfg *ProposerCfg, l1 *ethclient.Client, rollupCl *sources.RollupClient) *L2Proposer {
signer := func(chainID *big.Int) opcrypto.SignerFn {
s := opcrypto.PrivateKeySignerFn(cfg.ProposerKey, chainID)
return func(_ context.Context, addr common.Address, tx *types.Transaction) (*types.Transaction, error) {
return s(addr, tx)
}
}
from := crypto.PubkeyToAddress(cfg.ProposerKey.PublicKey)
proposerCfg := proposer.Config{ proposerCfg := proposer.Config{
L2OutputOracleAddr: cfg.OutputOracleAddr, L2OutputOracleAddr: cfg.OutputOracleAddr,
PollInterval: time.Second, PollInterval: time.Second,
-		TxManagerConfig: txmgr.Config{
-			ResubmissionTimeout:       5 * time.Second,
-			ReceiptQueryInterval:      time.Second,
-			NumConfirmations:          1,
-			SafeAbortNonceTooLowCount: 4,
-			From:                      from,
-			ChainID:                   big.NewInt(420),
-			// Signer is loaded in `proposer.NewL2OutputSubmitter`
-		},
-		L1Client:          l1,
-		RollupClient:      rollupCl,
-		AllowNonFinalized: cfg.AllowNonFinalized,
-		From:              from,
-		SignerFnFactory:   signer,
+		L1Client:          l1,
+		RollupClient:      rollupCl,
+		AllowNonFinalized: cfg.AllowNonFinalized,
+		// We use custom signing here instead of using the transaction manager.
+		TxManager: fakeTxMgr{from: crypto.PubkeyToAddress(cfg.ProposerKey.PublicKey)},
 	}
dr, err := proposer.NewL2OutputSubmitter(proposerCfg, log, metrics.NoopMetrics) dr, err := proposer.NewL2OutputSubmitter(proposerCfg, log, metrics.NoopMetrics)
...@@ -76,7 +69,6 @@ func NewL2Proposer(t Testing, log log.Logger, cfg *ProposerCfg, l1 *ethclient.Cl ...@@ -76,7 +69,6 @@ func NewL2Proposer(t Testing, log log.Logger, cfg *ProposerCfg, l1 *ethclient.Cl
driver: dr, driver: dr,
address: crypto.PubkeyToAddress(cfg.ProposerKey.PublicKey), address: crypto.PubkeyToAddress(cfg.ProposerKey.PublicKey),
privKey: cfg.ProposerKey, privKey: cfg.ProposerKey,
signer: proposerCfg.TxManagerConfig.Signer,
contractAddr: cfg.OutputOracleAddr, contractAddr: cfg.OutputOracleAddr,
} }
} }
......
...@@ -18,6 +18,7 @@ import ( ...@@ -18,6 +18,7 @@ import (
proposermetrics "github.com/ethereum-optimism/optimism/op-proposer/metrics" proposermetrics "github.com/ethereum-optimism/optimism/op-proposer/metrics"
l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer" l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer"
oplog "github.com/ethereum-optimism/optimism/op-service/log" oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/container"
...@@ -329,46 +330,52 @@ func TestMigration(t *testing.T) { ...@@ -329,46 +330,52 @@ func TestMigration(t *testing.T) {
}) })
 	batcher, err := bss.NewBatchSubmitterFromCLIConfig(bss.CLIConfig{
 		L1EthRpc:           forkedL1URL,
 		L2EthRpc:           gethNode.WSEndpoint(),
 		RollupRpc:          rollupNode.HTTPEndpoint(),
-		TxManagerTimeout:     10 * time.Minute,
-		OfflineGasEstimation: true,
 		MaxChannelDuration: 1,
 		MaxL1TxSize:        120_000,
 		TargetL1TxSize:     100_000,
 		TargetNumFrames:    1,
 		ApproxComprRatio:   0.4,
 		SubSafetyMargin:    4,
 		PollInterval:       50 * time.Millisecond,
-		NumConfirmations:          1,
-		ResubmissionTimeout:       5 * time.Second,
-		SafeAbortNonceTooLowCount: 3,
+		TxMgrConfig: txmgr.CLIConfig{
+			L1RPCURL:                  forkedL1URL,
+			PrivateKey:                hexPriv(secrets.Batcher),
+			NumConfirmations:          1,
+			ResubmissionTimeout:       5 * time.Second,
+			SafeAbortNonceTooLowCount: 3,
+		},
 		LogConfig: oplog.CLIConfig{
 			Level:  "info",
 			Format: "text",
 		},
-		PrivateKey: hexPriv(secrets.Batcher),
 	}, lgr.New("module", "batcher"), batchermetrics.NoopMetrics)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { t.Cleanup(func() {
-		batcher.StopIfRunning()
+		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+		defer cancel()
+		batcher.StopIfRunning(ctx)
}) })
 	proposer, err := l2os.NewL2OutputSubmitterFromCLIConfig(l2os.CLIConfig{
 		L1EthRpc:          forkedL1URL,
 		RollupRpc:         rollupNode.HTTPEndpoint(),
 		L2OOAddress:       l2OS.Address.String(),
 		PollInterval:      50 * time.Millisecond,
-		NumConfirmations:          1,
-		ResubmissionTimeout:       3 * time.Second,
-		SafeAbortNonceTooLowCount: 3,
-		AllowNonFinalized:         true,
+		AllowNonFinalized: true,
+		TxMgrConfig: txmgr.CLIConfig{
+			L1RPCURL:                  forkedL1URL,
+			PrivateKey:                hexPriv(secrets.Proposer),
+			NumConfirmations:          1,
+			ResubmissionTimeout:       3 * time.Second,
+			SafeAbortNonceTooLowCount: 3,
+		},
 		LogConfig: oplog.CLIConfig{
 			Level:  "info",
 			Format: "text",
 		},
-		PrivateKey: hexPriv(secrets.Proposer),
 	}, lgr.New("module", "proposer"), proposermetrics.NoopMetrics)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { t.Cleanup(func() {
......
...@@ -40,6 +40,7 @@ import ( ...@@ -40,6 +40,7 @@ import (
proposermetrics "github.com/ethereum-optimism/optimism/op-proposer/metrics" proposermetrics "github.com/ethereum-optimism/optimism/op-proposer/metrics"
l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer" l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer"
oplog "github.com/ethereum-optimism/optimism/op-service/log" oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
) )
var ( var (
...@@ -219,7 +220,9 @@ func (sys *System) Close() { ...@@ -219,7 +220,9 @@ func (sys *System) Close() {
sys.L2OutputSubmitter.Stop() sys.L2OutputSubmitter.Stop()
} }
if sys.BatchSubmitter != nil { if sys.BatchSubmitter != nil {
-		sys.BatchSubmitter.StopIfRunning()
+		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+		defer cancel()
+		sys.BatchSubmitter.StopIfRunning(ctx)
} }
for _, node := range sys.RollupNodes { for _, node := range sys.RollupNodes {
...@@ -565,19 +568,23 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { ...@@ -565,19 +568,23 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
// L2Output Submitter // L2Output Submitter
 	sys.L2OutputSubmitter, err = l2os.NewL2OutputSubmitterFromCLIConfig(l2os.CLIConfig{
 		L1EthRpc:     sys.Nodes["l1"].WSEndpoint(),
 		RollupRpc:    sys.RollupNodes["sequencer"].HTTPEndpoint(),
 		L2OOAddress:  predeploys.DevL2OutputOracleAddr.String(),
 		PollInterval: 50 * time.Millisecond,
-		NumConfirmations:          1,
-		ResubmissionTimeout:       3 * time.Second,
-		SafeAbortNonceTooLowCount: 3,
-		AllowNonFinalized:         cfg.NonFinalizedProposals,
+		TxMgrConfig: txmgr.CLIConfig{
+			L1RPCURL:                  sys.Nodes["l1"].WSEndpoint(),
+			PrivateKey:                hexPriv(cfg.Secrets.Proposer),
+			NumConfirmations:          1,
+			SafeAbortNonceTooLowCount: 3,
+			ResubmissionTimeout:       3 * time.Second,
+			ReceiptQueryInterval:      50 * time.Millisecond,
+		},
+		AllowNonFinalized: cfg.NonFinalizedProposals,
 		LogConfig: oplog.CLIConfig{
 			Level:  "info",
 			Format: "text",
 		},
-		PrivateKey: hexPriv(cfg.Secrets.Proposer),
 	}, sys.cfg.Loggers["proposer"], proposermetrics.NoopMetrics)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to setup l2 output submitter: %w", err) return nil, fmt.Errorf("unable to setup l2 output submitter: %w", err)
...@@ -588,28 +595,29 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { ...@@ -588,28 +595,29 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
} }
// Batch Submitter // Batch Submitter
-	txManagerTimeout := 10 * time.Minute
 	sys.BatchSubmitter, err = bss.NewBatchSubmitterFromCLIConfig(bss.CLIConfig{
 		L1EthRpc:           sys.Nodes["l1"].WSEndpoint(),
 		L2EthRpc:           sys.Nodes["sequencer"].WSEndpoint(),
 		RollupRpc:          sys.RollupNodes["sequencer"].HTTPEndpoint(),
-		TxManagerTimeout:     txManagerTimeout,
-		OfflineGasEstimation: true,
 		MaxChannelDuration: 1,
 		MaxL1TxSize:        120_000,
 		TargetL1TxSize:     100_000,
 		TargetNumFrames:    1,
 		ApproxComprRatio:   0.4,
 		SubSafetyMargin:    4,
 		PollInterval:       50 * time.Millisecond,
-		NumConfirmations:          1,
-		ResubmissionTimeout:       5 * time.Second,
-		SafeAbortNonceTooLowCount: 3,
+		TxMgrConfig: txmgr.CLIConfig{
+			L1RPCURL:                  sys.Nodes["l1"].WSEndpoint(),
+			PrivateKey:                hexPriv(cfg.Secrets.Batcher),
+			NumConfirmations:          1,
+			SafeAbortNonceTooLowCount: 3,
+			ResubmissionTimeout:       3 * time.Second,
+			ReceiptQueryInterval:      50 * time.Millisecond,
+		},
 		LogConfig: oplog.CLIConfig{
 			Level:  "info",
 			Format: "text",
 		},
-		PrivateKey: hexPriv(cfg.Secrets.Batcher),
 	}, sys.cfg.Loggers["batcher"], batchermetrics.NoopMetrics)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to setup batch submitter: %w", err) return nil, fmt.Errorf("failed to setup batch submitter: %w", err)
......
...@@ -1449,7 +1449,7 @@ func TestStopStartBatcher(t *testing.T) { ...@@ -1449,7 +1449,7 @@ func TestStopStartBatcher(t *testing.T) {
require.Greater(t, newSeqStatus.SafeL2.Number, seqStatus.SafeL2.Number, "Safe chain did not advance") require.Greater(t, newSeqStatus.SafeL2.Number, seqStatus.SafeL2.Number, "Safe chain did not advance")
// stop the batch submission // stop the batch submission
err = sys.BatchSubmitter.Stop() err = sys.BatchSubmitter.Stop(context.Background())
require.Nil(t, err) require.Nil(t, err)
// wait for any old safe blocks being submitted / derived // wait for any old safe blocks being submitted / derived
......
...@@ -8,14 +8,13 @@ import ( ...@@ -8,14 +8,13 @@ import (
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof" oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc" oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
opsigner "github.com/ethereum-optimism/optimism/op-signer/client" "github.com/ethereum-optimism/optimism/op-service/txmgr"
) )
const envVarPrefix = "OP_PROPOSER" const envVarPrefix = "OP_PROPOSER"
var ( var (
/* Required Flags */ // Required Flags
L1EthRpcFlag = cli.StringFlag{ L1EthRpcFlag = cli.StringFlag{
Name: "l1-eth-rpc", Name: "l1-eth-rpc",
Usage: "HTTP provider URL for L1", Usage: "HTTP provider URL for L1",
...@@ -41,53 +40,14 @@ var ( ...@@ -41,53 +40,14 @@ var (
Required: true, Required: true,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "POLL_INTERVAL"), EnvVar: opservice.PrefixEnvVar(envVarPrefix, "POLL_INTERVAL"),
} }
NumConfirmationsFlag = cli.Uint64Flag{ // Optional flags
Name: "num-confirmations",
Usage: "Number of confirmations which we will wait after " +
"appending a new batch",
Required: true,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "NUM_CONFIRMATIONS"),
}
SafeAbortNonceTooLowCountFlag = cli.Uint64Flag{
Name: "safe-abort-nonce-too-low-count",
Usage: "Number of ErrNonceTooLow observations required to " +
"give up on a tx at a particular nonce without receiving " +
"confirmation",
Required: true,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "SAFE_ABORT_NONCE_TOO_LOW_COUNT"),
}
ResubmissionTimeoutFlag = cli.DurationFlag{
Name: "resubmission-timeout",
Usage: "Duration we will wait before resubmitting a " +
"transaction to L1",
Required: true,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "RESUBMISSION_TIMEOUT"),
}
/* Optional flags */
MnemonicFlag = cli.StringFlag{
Name: "mnemonic",
Usage: "The mnemonic used to derive the wallets for either the " +
"sequencer or the l2output",
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "MNEMONIC"),
}
L2OutputHDPathFlag = cli.StringFlag{
Name: "l2-output-hd-path",
Usage: "The HD path used to derive the l2output wallet from the " +
"mnemonic. The mnemonic flag must also be set.",
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "L2_OUTPUT_HD_PATH"),
}
PrivateKeyFlag = cli.StringFlag{
Name: "private-key",
Usage: "The private key to use with the l2output wallet. Must not be used with mnemonic.",
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "PRIVATE_KEY"),
}
AllowNonFinalizedFlag = cli.BoolFlag{ AllowNonFinalizedFlag = cli.BoolFlag{
Name: "allow-non-finalized", Name: "allow-non-finalized",
Usage: "Allow the proposer to submit proposals for L2 blocks derived from non-finalized L1 blocks.", Usage: "Allow the proposer to submit proposals for L2 blocks derived from non-finalized L1 blocks.",
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "ALLOW_NON_FINALIZED"), EnvVar: opservice.PrefixEnvVar(envVarPrefix, "ALLOW_NON_FINALIZED"),
} }
// Legacy Flags
L2OutputHDPathFlag = txmgr.L2OutputHDPathFlag
) )
var requiredFlags = []cli.Flag{ var requiredFlags = []cli.Flag{
...@@ -95,15 +55,9 @@ var requiredFlags = []cli.Flag{ ...@@ -95,15 +55,9 @@ var requiredFlags = []cli.Flag{
RollupRpcFlag, RollupRpcFlag,
L2OOAddressFlag, L2OOAddressFlag,
PollIntervalFlag, PollIntervalFlag,
NumConfirmationsFlag,
SafeAbortNonceTooLowCountFlag,
ResubmissionTimeoutFlag,
} }
var optionalFlags = []cli.Flag{ var optionalFlags = []cli.Flag{
MnemonicFlag,
L2OutputHDPathFlag,
PrivateKeyFlag,
AllowNonFinalizedFlag, AllowNonFinalizedFlag,
} }
...@@ -113,7 +67,7 @@ func init() { ...@@ -113,7 +67,7 @@ func init() {
optionalFlags = append(optionalFlags, oplog.CLIFlags(envVarPrefix)...) optionalFlags = append(optionalFlags, oplog.CLIFlags(envVarPrefix)...)
optionalFlags = append(optionalFlags, opmetrics.CLIFlags(envVarPrefix)...) optionalFlags = append(optionalFlags, opmetrics.CLIFlags(envVarPrefix)...)
optionalFlags = append(optionalFlags, oppprof.CLIFlags(envVarPrefix)...) optionalFlags = append(optionalFlags, oppprof.CLIFlags(envVarPrefix)...)
optionalFlags = append(optionalFlags, opsigner.CLIFlags(envVarPrefix)...) optionalFlags = append(optionalFlags, txmgr.CLIFlags(envVarPrefix)...)
Flags = append(requiredFlags, optionalFlags...) Flags = append(requiredFlags, optionalFlags...)
} }
......
...@@ -10,13 +10,11 @@ import ( ...@@ -10,13 +10,11 @@ import (
"github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-proposer/flags" "github.com/ethereum-optimism/optimism/op-proposer/flags"
opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto"
oplog "github.com/ethereum-optimism/optimism/op-service/log" oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof" oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc" oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-service/txmgr" "github.com/ethereum-optimism/optimism/op-service/txmgr"
opsigner "github.com/ethereum-optimism/optimism/op-signer/client"
) )
// Config contains the well typed fields that are used to initialize the output submitter. // Config contains the well typed fields that are used to initialize the output submitter.
...@@ -24,12 +22,10 @@ import ( ...@@ -24,12 +22,10 @@ import (
type Config struct { type Config struct {
L2OutputOracleAddr common.Address L2OutputOracleAddr common.Address
PollInterval time.Duration PollInterval time.Duration
TxManagerConfig txmgr.Config TxManager txmgr.TxManager
L1Client *ethclient.Client L1Client *ethclient.Client
RollupClient *sources.RollupClient RollupClient *sources.RollupClient
AllowNonFinalized bool AllowNonFinalized bool
From common.Address
SignerFnFactory opcrypto.SignerFactory
} }
// CLIConfig is a well typed config that is parsed from the CLI params. // CLIConfig is a well typed config that is parsed from the CLI params.
...@@ -51,47 +47,19 @@ type CLIConfig struct { ...@@ -51,47 +47,19 @@ type CLIConfig struct {
// and creating a new batch. // and creating a new batch.
PollInterval time.Duration PollInterval time.Duration
// NumConfirmations is the number of confirmations which we will wait after
// appending new batches.
NumConfirmations uint64
// SafeAbortNonceTooLowCount is the number of ErrNonceTooLowObservations
// required to give up on a tx at a particular nonce without receiving
// confirmation.
SafeAbortNonceTooLowCount uint64
// ResubmissionTimeout is time we will wait before resubmitting a
// transaction.
ResubmissionTimeout time.Duration
// Mnemonic is the HD seed used to derive the wallet private keys for both
// the sequence and proposer. Must be used in conjunction with
// SequencerHDPath and ProposerHDPath.
Mnemonic string
// L2OutputHDPath is the derivation path used to obtain the private key for
// the l2output transactions.
L2OutputHDPath string
// PrivateKey is the private key used for l2output transactions.
PrivateKey string
RPCConfig oprpc.CLIConfig
/* Optional Params */
// AllowNonFinalized can be set to true to propose outputs // AllowNonFinalized can be set to true to propose outputs
// for L2 blocks derived from non-finalized L1 data. // for L2 blocks derived from non-finalized L1 data.
AllowNonFinalized bool AllowNonFinalized bool
TxMgrConfig txmgr.CLIConfig
RPCConfig oprpc.CLIConfig
LogConfig oplog.CLIConfig LogConfig oplog.CLIConfig
MetricsConfig opmetrics.CLIConfig MetricsConfig opmetrics.CLIConfig
PprofConfig oppprof.CLIConfig PprofConfig oppprof.CLIConfig
// SignerConfig contains the client config for op-signer service
SignerConfig opsigner.CLIConfig
} }
func (c CLIConfig) Check() error { func (c CLIConfig) Check() error {
...@@ -107,7 +75,7 @@ func (c CLIConfig) Check() error { ...@@ -107,7 +75,7 @@ func (c CLIConfig) Check() error {
if err := c.PprofConfig.Check(); err != nil { if err := c.PprofConfig.Check(); err != nil {
return err return err
} }
if err := c.SignerConfig.Check(); err != nil { if err := c.TxMgrConfig.Check(); err != nil {
return err return err
} }
return nil return nil
...@@ -117,22 +85,16 @@ func (c CLIConfig) Check() error { ...@@ -117,22 +85,16 @@ func (c CLIConfig) Check() error {
func NewConfig(ctx *cli.Context) CLIConfig { func NewConfig(ctx *cli.Context) CLIConfig {
return CLIConfig{ return CLIConfig{
// Required Flags // Required Flags
L1EthRpc: ctx.GlobalString(flags.L1EthRpcFlag.Name), L1EthRpc: ctx.GlobalString(flags.L1EthRpcFlag.Name),
RollupRpc: ctx.GlobalString(flags.RollupRpcFlag.Name), RollupRpc: ctx.GlobalString(flags.RollupRpcFlag.Name),
L2OOAddress: ctx.GlobalString(flags.L2OOAddressFlag.Name), L2OOAddress: ctx.GlobalString(flags.L2OOAddressFlag.Name),
PollInterval: ctx.GlobalDuration(flags.PollIntervalFlag.Name), PollInterval: ctx.GlobalDuration(flags.PollIntervalFlag.Name),
NumConfirmations: ctx.GlobalUint64(flags.NumConfirmationsFlag.Name), TxMgrConfig: txmgr.ReadCLIConfig(ctx),
SafeAbortNonceTooLowCount: ctx.GlobalUint64(flags.SafeAbortNonceTooLowCountFlag.Name),
ResubmissionTimeout: ctx.GlobalDuration(flags.ResubmissionTimeoutFlag.Name),
Mnemonic: ctx.GlobalString(flags.MnemonicFlag.Name),
L2OutputHDPath: ctx.GlobalString(flags.L2OutputHDPathFlag.Name),
PrivateKey: ctx.GlobalString(flags.PrivateKeyFlag.Name),
// Optional Flags // Optional Flags
AllowNonFinalized: ctx.GlobalBool(flags.AllowNonFinalizedFlag.Name), AllowNonFinalized: ctx.GlobalBool(flags.AllowNonFinalizedFlag.Name),
RPCConfig: oprpc.ReadCLIConfig(ctx), RPCConfig: oprpc.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx), LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx), MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx), PprofConfig: oppprof.ReadCLIConfig(ctx),
SignerConfig: opsigner.ReadCLIConfig(ctx),
} }
} }
...@@ -15,7 +15,6 @@ import ( ...@@ -15,7 +15,6 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli" "github.com/urfave/cli"
...@@ -23,7 +22,6 @@ import ( ...@@ -23,7 +22,6 @@ import (
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-proposer/metrics" "github.com/ethereum-optimism/optimism/op-proposer/metrics"
opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto"
oplog "github.com/ethereum-optimism/optimism/op-service/log" oplog "github.com/ethereum-optimism/optimism/op-service/log"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof" oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc" oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
...@@ -50,7 +48,13 @@ func Main(version string, cliCtx *cli.Context) error { ...@@ -50,7 +48,13 @@ func Main(version string, cliCtx *cli.Context) error {
m := metrics.NewMetrics("default") m := metrics.NewMetrics("default")
l.Info("Initializing L2 Output Submitter") l.Info("Initializing L2 Output Submitter")
l2OutputSubmitter, err := NewL2OutputSubmitterFromCLIConfig(cfg, l, m) proposerConfig, err := NewL2OutputSubmitterConfigFromCLIConfig(cfg, l)
if err != nil {
l.Error("Unable to create the L2 Output Submitter", "error", err)
return err
}
l2OutputSubmitter, err := NewL2OutputSubmitter(*proposerConfig, l, m)
if err != nil { if err != nil {
l.Error("Unable to create the L2 Output Submitter", "error", err) l.Error("Unable to create the L2 Output Submitter", "error", err)
return err return err
...@@ -58,7 +62,6 @@ func Main(version string, cliCtx *cli.Context) error { ...@@ -58,7 +62,6 @@ func Main(version string, cliCtx *cli.Context) error {
l.Info("Starting L2 Output Submitter") l.Info("Starting L2 Output Submitter")
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
if err := l2OutputSubmitter.Start(); err != nil { if err := l2OutputSubmitter.Start(); err != nil {
cancel() cancel()
l.Error("Unable to start L2 Output Submitter", "error", err) l.Error("Unable to start L2 Output Submitter", "error", err)
...@@ -85,7 +88,7 @@ func Main(version string, cliCtx *cli.Context) error { ...@@ -85,7 +88,7 @@ func Main(version string, cliCtx *cli.Context) error {
l.Error("error starting metrics server", err) l.Error("error starting metrics server", err)
} }
}() }()
m.StartBalanceMetrics(ctx, l, l2OutputSubmitter.l1Client, l2OutputSubmitter.from) m.StartBalanceMetrics(ctx, l, proposerConfig.L1Client, proposerConfig.TxManager.From())
} }
rpcCfg := cfg.RPCConfig rpcCfg := cfg.RPCConfig
...@@ -122,8 +125,6 @@ type L2OutputSubmitter struct { ...@@ -122,8 +125,6 @@ type L2OutputSubmitter struct {
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
// l1Client is retained to make it easier to start the metrics balance check
l1Client *ethclient.Client
// RollupClient is used to retrieve output roots from // RollupClient is used to retrieve output roots from
rollupClient *sources.RollupClient rollupClient *sources.RollupClient
...@@ -136,85 +137,66 @@ type L2OutputSubmitter struct { ...@@ -136,85 +137,66 @@ type L2OutputSubmitter struct {
// is never valid on an alternative L1 chain that would produce different L2 data. // is never valid on an alternative L1 chain that would produce different L2 data.
// This option is not necessary when higher proposal latency is acceptable and L1 is healthy. // This option is not necessary when higher proposal latency is acceptable and L1 is healthy.
allowNonFinalized bool allowNonFinalized bool
// From is the address to send transactions from
from common.Address
// How frequently to poll L2 for new finalized outputs // How frequently to poll L2 for new finalized outputs
pollInterval time.Duration pollInterval time.Duration
} }
// NewL2OutputSubmitterFromCLIConfig creates a new L2 Output Submitter given the CLI Config // NewL2OutputSubmitterFromCLIConfig creates a new L2 Output Submitter given the CLI Config
func NewL2OutputSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metricer) (*L2OutputSubmitter, error) { func NewL2OutputSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metricer) (*L2OutputSubmitter, error) {
signer, fromAddress, err := opcrypto.SignerFactoryFromConfig(l, cfg.PrivateKey, cfg.Mnemonic, cfg.L2OutputHDPath, cfg.SignerConfig) proposerConfig, err := NewL2OutputSubmitterConfigFromCLIConfig(cfg, l)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return NewL2OutputSubmitter(*proposerConfig, l, m)
}
// NewL2OutputSubmitterConfigFromCLIConfig creates the proposer config from the CLI config.
func NewL2OutputSubmitterConfigFromCLIConfig(cfg CLIConfig, l log.Logger) (*Config, error) {
l2ooAddress, err := parseAddress(cfg.L2OOAddress) l2ooAddress, err := parseAddress(cfg.L2OOAddress)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Connect to L1 and L2 providers. Perform these last since they are the most expensive. txManagerConfig, err := txmgr.NewConfig(cfg.TxMgrConfig, l)
ctx := context.Background()
l1Client, err := dialEthClientWithTimeout(ctx, cfg.L1EthRpc)
if err != nil { if err != nil {
return nil, err return nil, err
} }
txManager := txmgr.NewSimpleTxManager("proposer", l, txManagerConfig)
rollupClient, err := dialRollupClientWithTimeout(ctx, cfg.RollupRpc) // Connect to L1 and L2 providers. Perform these last since they are the most expensive.
ctx := context.Background()
l1Client, err := dialEthClientWithTimeout(ctx, cfg.L1EthRpc)
if err != nil { if err != nil {
return nil, err return nil, err
} }
chainID, err := l1Client.ChainID(context.Background()) rollupClient, err := dialRollupClientWithTimeout(ctx, cfg.RollupRpc)
if err != nil { if err != nil {
return nil, err return nil, err
} }
txMgrConfg := txmgr.Config{ return &Config{
ResubmissionTimeout: cfg.ResubmissionTimeout,
ReceiptQueryInterval: time.Second,
NumConfirmations: cfg.NumConfirmations,
SafeAbortNonceTooLowCount: cfg.SafeAbortNonceTooLowCount,
From: fromAddress,
ChainID: chainID,
}
proposerCfg := Config{
L2OutputOracleAddr: l2ooAddress, L2OutputOracleAddr: l2ooAddress,
PollInterval: cfg.PollInterval, PollInterval: cfg.PollInterval,
TxManagerConfig: txMgrConfg,
L1Client: l1Client, L1Client: l1Client,
RollupClient: rollupClient, RollupClient: rollupClient,
AllowNonFinalized: cfg.AllowNonFinalized, AllowNonFinalized: cfg.AllowNonFinalized,
From: fromAddress, TxManager: txManager,
SignerFnFactory: signer, }, nil
}
return NewL2OutputSubmitter(proposerCfg, l, m)
} }
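With the constructor split above, a caller can build the proposer Config once and reuse its fields before creating the submitter, which is what Main does for the balance metrics. A minimal sketch in Go, assuming cliCfg, logger, m (the metrics instance), and ctx are already in scope:

    proposerCfg, err := NewL2OutputSubmitterConfigFromCLIConfig(cliCfg, logger)
    if err != nil {
        return err
    }
    l2OutputSubmitter, err := NewL2OutputSubmitter(*proposerCfg, logger, m)
    if err != nil {
        return err
    }
    // The Config stays in scope, so its fields feed the balance metrics directly.
    m.StartBalanceMetrics(ctx, logger, proposerCfg.L1Client, proposerCfg.TxManager.From())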
// NewL2OutputSubmitter creates a new L2 Output Submitter // NewL2OutputSubmitter creates a new L2 Output Submitter
func NewL2OutputSubmitter(cfg Config, l log.Logger, m metrics.Metricer) (*L2OutputSubmitter, error) { func NewL2OutputSubmitter(cfg Config, l log.Logger, m metrics.Metricer) (*L2OutputSubmitter, error) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
cCtx, cCancel := context.WithTimeout(ctx, defaultDialTimeout)
chainID, err := cfg.L1Client.ChainID(cCtx)
cCancel()
if err != nil {
cancel()
return nil, err
}
signer := cfg.SignerFnFactory(chainID)
cfg.TxManagerConfig.Signer = signer
l2ooContract, err := bindings.NewL2OutputOracleCaller(cfg.L2OutputOracleAddr, cfg.L1Client) l2ooContract, err := bindings.NewL2OutputOracleCaller(cfg.L2OutputOracleAddr, cfg.L1Client)
if err != nil { if err != nil {
cancel() cancel()
return nil, err return nil, err
} }
cCtx, cCancel = context.WithTimeout(ctx, defaultDialTimeout) cCtx, cCancel := context.WithTimeout(ctx, defaultDialTimeout)
defer cCancel() defer cCancel()
version, err := l2ooContract.Version(&bind.CallOpts{Context: cCtx}) version, err := l2ooContract.Version(&bind.CallOpts{Context: cCtx})
if err != nil { if err != nil {
...@@ -230,14 +212,13 @@ func NewL2OutputSubmitter(cfg Config, l log.Logger, m metrics.Metricer) (*L2Outp ...@@ -230,14 +212,13 @@ func NewL2OutputSubmitter(cfg Config, l log.Logger, m metrics.Metricer) (*L2Outp
} }
return &L2OutputSubmitter{ return &L2OutputSubmitter{
txMgr: txmgr.NewSimpleTxManager("proposer", l, cfg.TxManagerConfig, cfg.L1Client), txMgr: cfg.TxManager,
done: make(chan struct{}), done: make(chan struct{}),
log: l, log: l,
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
metr: m, metr: m,
l1Client: cfg.L1Client,
rollupClient: cfg.RollupClient, rollupClient: cfg.RollupClient,
l2ooContract: l2ooContract, l2ooContract: l2ooContract,
...@@ -245,7 +226,6 @@ func NewL2OutputSubmitter(cfg Config, l log.Logger, m metrics.Metricer) (*L2Outp ...@@ -245,7 +226,6 @@ func NewL2OutputSubmitter(cfg Config, l log.Logger, m metrics.Metricer) (*L2Outp
l2ooABI: parsed, l2ooABI: parsed,
allowNonFinalized: cfg.AllowNonFinalized, allowNonFinalized: cfg.AllowNonFinalized,
from: cfg.From,
pollInterval: cfg.PollInterval, pollInterval: cfg.PollInterval,
}, nil }, nil
} }
...@@ -268,7 +248,7 @@ func (l *L2OutputSubmitter) FetchNextOutputInfo(ctx context.Context) (*eth.Outpu ...@@ -268,7 +248,7 @@ func (l *L2OutputSubmitter) FetchNextOutputInfo(ctx context.Context) (*eth.Outpu
cCtx, cancel := context.WithTimeout(ctx, defaultDialTimeout) cCtx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
defer cancel() defer cancel()
callOpts := &bind.CallOpts{ callOpts := &bind.CallOpts{
From: l.from, From: l.txMgr.From(),
Context: cCtx, Context: cCtx,
} }
nextCheckpointBlock, err := l.l2ooContract.NextBlockNumber(callOpts) nextCheckpointBlock, err := l.l2ooContract.NextBlockNumber(callOpts)
...@@ -354,7 +334,7 @@ func (l *L2OutputSubmitter) sendTransaction(ctx context.Context, output *eth.Out ...@@ -354,7 +334,7 @@ func (l *L2OutputSubmitter) sendTransaction(ctx context.Context, output *eth.Out
TxData: data, TxData: data,
To: l.l2ooContractAddr, To: l.l2ooContractAddr,
GasLimit: 0, GasLimit: 0,
From: l.from, From: l.txMgr.From(),
}) })
if err != nil { if err != nil {
return err return err
......
package txmgr
import (
"context"
"errors"
"math/big"
"time"
opservice "github.com/ethereum-optimism/optimism/op-service"
opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto"
"github.com/ethereum-optimism/optimism/op-signer/client"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli"
)
const (
// Duplicated L1 RPC flag
L1RPCFlagName = "l1-eth-rpc"
// Key Management Flags (also have op-signer client flags)
MnemonicFlagName = "mnemonic"
HDPathFlagName = "hd-path"
PrivateKeyFlagName = "private-key"
// Legacy TxMgr Flags
NumConfirmationsFlagName = "num-confirmations"
SafeAbortNonceTooLowCountFlagName = "safe-abort-nonce-too-low-count"
ResubmissionTimeoutFlagName = "resubmission-timeout"
)
var (
SequencerHDPathFlag = cli.StringFlag{
Name: "sequencer-hd-path",
Usage: "DEPRECATED: The HD path used to derive the sequencer wallet from the " +
"mnemonic. The mnemonic flag must also be set.",
EnvVar: "OP_BATCHER_SEQUENCER_HD_PATH",
}
L2OutputHDPathFlag = cli.StringFlag{
Name: "l2-output-hd-path",
Usage: "DEPRECATED:The HD path used to derive the l2output wallet from the " +
"mnemonic. The mnemonic flag must also be set.",
EnvVar: "OP_PROPOSER_L2_OUTPUT_HD_PATH",
}
)
func CLIFlags(envPrefix string) []cli.Flag {
return append([]cli.Flag{
cli.StringFlag{
Name: MnemonicFlagName,
Usage: "The mnemonic used to derive the wallets for either the service",
EnvVar: opservice.PrefixEnvVar(envPrefix, "MNEMONIC"),
},
cli.StringFlag{
Name: HDPathFlagName,
Usage: "The HD path used to derive the sequencer wallet from the mnemonic. The mnemonic flag must also be set.",
EnvVar: opservice.PrefixEnvVar(envPrefix, "HD_PATH"),
},
SequencerHDPathFlag,
L2OutputHDPathFlag,
cli.StringFlag{
Name: "private-key",
Usage: "The private key to use with the service. Must not be used with mnemonic.",
EnvVar: opservice.PrefixEnvVar(envPrefix, "PRIVATE_KEY"),
},
cli.Uint64Flag{
Name: NumConfirmationsFlagName,
Usage: "Number of confirmations which we will wait after sending a transaction",
Value: 10,
EnvVar: opservice.PrefixEnvVar(envPrefix, "NUM_CONFIRMATIONS"),
},
cli.Uint64Flag{
Name: "safe-abort-nonce-too-low-count",
Usage: "Number of ErrNonceTooLow observations required to give up on a tx at a particular nonce without receiving confirmation",
Value: 3,
EnvVar: opservice.PrefixEnvVar(envPrefix, "SAFE_ABORT_NONCE_TOO_LOW_COUNT"),
},
cli.DurationFlag{
Name: "resubmission-timeout",
Usage: "Duration we will wait before resubmitting a transaction to L1",
Value: 30 * time.Second,
EnvVar: opservice.PrefixEnvVar(envPrefix, "RESUBMISSION_TIMEOUT"),
},
}, client.CLIFlags(envPrefix)...)
}
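Any service embedding the tx manager can register this shared flag set once alongside its own flags, as the batcher and proposer flag packages do earlier in this diff. A rough sketch, assuming a urfave/cli (v1) app and a placeholder OP_EXAMPLE env prefix:

    app := cli.NewApp()
    app.Flags = append(app.Flags, txmgr.CLIFlags("OP_EXAMPLE")...)
    app.Action = func(ctx *cli.Context) error {
        txCLIConfig := txmgr.ReadCLIConfig(ctx)
        return txCLIConfig.Check() // validates the L1 RPC URL, confirmation count, and signer config
    }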
type CLIConfig struct {
L1RPCURL string
Mnemonic string
HDPath string
SequencerHDPath string
L2OutputHDPath string
PrivateKey string
SignerCLIConfig client.CLIConfig
NumConfirmations uint64
SafeAbortNonceTooLowCount uint64
ResubmissionTimeout time.Duration
ReceiptQueryInterval time.Duration
}
func (m CLIConfig) Check() error {
if m.L1RPCURL == "" {
return errors.New("must provide a L1 RPC url")
}
if m.NumConfirmations == 0 {
return errors.New("num confirmations must not be 0")
}
if err := m.SignerCLIConfig.Check(); err != nil {
return err
}
return nil
}
func ReadCLIConfig(ctx *cli.Context) CLIConfig {
return CLIConfig{
L1RPCURL: ctx.GlobalString(L1RPCFlagName),
Mnemonic: ctx.GlobalString(MnemonicFlagName),
HDPath: ctx.GlobalString(HDPathFlagName),
SequencerHDPath: ctx.GlobalString(SequencerHDPathFlag.Name),
L2OutputHDPath: ctx.GlobalString(L2OutputHDPathFlag.Name),
PrivateKey: ctx.GlobalString(PrivateKeyFlagName),
SignerCLIConfig: client.ReadCLIConfig(ctx),
NumConfirmations: ctx.GlobalUint64(NumConfirmationsFlagName),
SafeAbortNonceTooLowCount: ctx.GlobalUint64(SafeAbortNonceTooLowCountFlagName),
ResubmissionTimeout: ctx.GlobalDuration(ResubmissionTimeoutFlagName),
}
}
func NewConfig(cfg CLIConfig, l log.Logger) (Config, error) {
if err := cfg.Check(); err != nil {
return Config{}, err
}
networkTimeout := 2 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), networkTimeout)
defer cancel()
l1, err := ethclient.DialContext(ctx, cfg.L1RPCURL)
if err != nil {
return Config{}, err
}
ctx, cancel = context.WithTimeout(context.Background(), networkTimeout)
defer cancel()
chainID, err := l1.ChainID(ctx)
if err != nil {
return Config{}, err
}
hdPath := cfg.HDPath
if hdPath == "" && cfg.SequencerHDPath != "" {
hdPath = cfg.SequencerHDPath
} else if hdPath == "" && cfg.L2OutputHDPath != "" {
hdPath = cfg.L2OutputHDPath
}
signerFactory, from, err := opcrypto.SignerFactoryFromConfig(l, cfg.PrivateKey, cfg.Mnemonic, hdPath, cfg.SignerCLIConfig)
if err != nil {
return Config{}, err
}
receiptQueryInterval := 30 * time.Second
if cfg.ReceiptQueryInterval != 0 {
receiptQueryInterval = cfg.ReceiptQueryInterval
}
return Config{
Backend: l1,
ResubmissionTimeout: cfg.ResubmissionTimeout,
ChainID: chainID,
NetworkTimeout: networkTimeout,
ReceiptQueryInterval: receiptQueryInterval,
NumConfirmations: cfg.NumConfirmations,
SafeAbortNonceTooLowCount: cfg.SafeAbortNonceTooLowCount,
Signer: signerFactory(chainID),
From: from,
}, nil
}
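Putting the pieces together, the flow the proposer now follows appears to be: parse the shared flags into a CLIConfig, let NewConfig dial L1 and resolve the chain ID and signer, then hand the result to NewSimpleTxManager. A hedged sketch, assuming a parsed *cli.Context and a logger are available:

    txCLIConfig := txmgr.ReadCLIConfig(cliCtx)
    txConfig, err := txmgr.NewConfig(txCLIConfig, logger)
    if err != nil {
        return err
    }
    txManager := txmgr.NewSimpleTxManager("proposer", logger, txConfig)
    // The sender address now comes from the tx manager instead of a separate From field.
    fromAddr := txManager.From()
    _ = fromAddr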
// Config houses parameters for altering the behavior of a SimpleTxManager.
type Config struct {
Backend ETHBackend
// ResubmissionTimeout is the interval at which, if no previously
// published transaction has been mined, the new tx with a bumped gas
// price will be published. Only one publication at MaxGasPrice will be
// attempted.
ResubmissionTimeout time.Duration
// ChainID is the chain ID of the L1 chain.
ChainID *big.Int
// NetworkTimeout is the allowed duration for a single network request.
// This is intended to be used for network requests that can be replayed.
//
// If not set, this will default to 2 seconds.
NetworkTimeout time.Duration
// ReceiptQueryInterval is the interval at which the tx manager will
// query the backend to check for confirmations after a tx at a
// specific gas price has been published.
ReceiptQueryInterval time.Duration
// NumConfirmations specifies how many blocks are needed to consider a
// transaction confirmed.
NumConfirmations uint64
// SafeAbortNonceTooLowCount specifies how many ErrNonceTooLow observations
// are required to give up on a tx at a particular nonce without receiving
// confirmation.
SafeAbortNonceTooLowCount uint64
// Signer is used to sign transactions when the gas price is increased.
Signer opcrypto.SignerFn
From common.Address
}
...@@ -5,9 +5,12 @@ package mocks ...@@ -5,9 +5,12 @@ package mocks
import ( import (
context "context" context "context"
txmgr "github.com/ethereum-optimism/optimism/op-service/txmgr" common "github.com/ethereum/go-ethereum/common"
mock "github.com/stretchr/testify/mock" mock "github.com/stretchr/testify/mock"
txmgr "github.com/ethereum-optimism/optimism/op-service/txmgr"
types "github.com/ethereum/go-ethereum/core/types" types "github.com/ethereum/go-ethereum/core/types"
) )
...@@ -16,6 +19,22 @@ type TxManager struct { ...@@ -16,6 +19,22 @@ type TxManager struct {
mock.Mock mock.Mock
} }
// From provides a mock function with given fields:
func (_m *TxManager) From() common.Address {
ret := _m.Called()
var r0 common.Address
if rf, ok := ret.Get(0).(func() common.Address); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(common.Address)
}
}
return r0
}
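In tests, the regenerated mock can stub From alongside Send, which is presumably how callers of the new TxManager interface are exercised. A small illustrative sketch with testify (the address value and the *testing.T variable t are placeholders):

    txMgr := new(mocks.TxManager)
    txMgr.On("From").Return(common.HexToAddress("0x01"))
    require.Equal(t, common.HexToAddress("0x01"), txMgr.From())
    txMgr.AssertExpectations(t)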
// Send provides a mock function with given fields: ctx, candidate // Send provides a mock function with given fields: ctx, candidate
func (_m *TxManager) Send(ctx context.Context, candidate txmgr.TxCandidate) (*types.Receipt, error) { func (_m *TxManager) Send(ctx context.Context, candidate txmgr.TxCandidate) (*types.Receipt, error) {
ret := _m.Called(ctx, candidate) ret := _m.Called(ctx, candidate)
......
...@@ -20,7 +20,7 @@ type priceBumpTest struct { ...@@ -20,7 +20,7 @@ type priceBumpTest struct {
} }
func (tc *priceBumpTest) run(t *testing.T) { func (tc *priceBumpTest) run(t *testing.T) {
prevFC := CalcGasFeeCap(big.NewInt(tc.prevBasefee), big.NewInt(tc.prevGasTip)) prevFC := calcGasFeeCap(big.NewInt(tc.prevBasefee), big.NewInt(tc.prevGasTip))
lgr := testlog.Logger(t, log.LvlCrit) lgr := testlog.Logger(t, log.LvlCrit)
tip, fc := updateFees(big.NewInt(tc.prevGasTip), prevFC, big.NewInt(tc.newGasTip), big.NewInt(tc.newBasefee), lgr) tip, fc := updateFees(big.NewInt(tc.prevGasTip), prevFC, big.NewInt(tc.newGasTip), big.NewInt(tc.newBasefee), lgr)
......
This diff is collapsed.
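The bulk of the txmgr.go changes are collapsed above, so the rename from CalcGasFeeCap to calcGasFeeCap is only visible through these test updates. If the helper follows the usual EIP-1559 heuristic (an assumption, since the implementation is not shown here), it would look roughly like:

    // Assumed shape, not taken from the collapsed diff: feeCap = 2*baseFee + tip,
    // leaving headroom for several blocks of base-fee increases.
    func calcGasFeeCap(baseFee, gasTipCap *big.Int) *big.Int {
        return new(big.Int).Add(gasTipCap, new(big.Int).Mul(baseFee, big.NewInt(2)))
    }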
...@@ -18,6 +18,8 @@ import ( ...@@ -18,6 +18,8 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
type sendTransactionFunc func(ctx context.Context, tx *types.Transaction) error
// testHarness houses the necessary resources to test the SimpleTxManager. // testHarness houses the necessary resources to test the SimpleTxManager.
type testHarness struct { type testHarness struct {
cfg Config cfg Config
...@@ -31,7 +33,8 @@ type testHarness struct { ...@@ -31,7 +33,8 @@ type testHarness struct {
func newTestHarnessWithConfig(t *testing.T, cfg Config) *testHarness { func newTestHarnessWithConfig(t *testing.T, cfg Config) *testHarness {
g := newGasPricer(3) g := newGasPricer(3)
backend := newMockBackend(g) backend := newMockBackend(g)
mgr := NewSimpleTxManager("TEST", testlog.Logger(t, log.LvlCrit), cfg, backend) cfg.Backend = backend
mgr := NewSimpleTxManager("TEST", testlog.Logger(t, log.LvlCrit), cfg)
return &testHarness{ return &testHarness{
cfg: cfg, cfg: cfg,
...@@ -100,7 +103,7 @@ func (g *gasPricer) shouldMine(gasFeeCap *big.Int) bool { ...@@ -100,7 +103,7 @@ func (g *gasPricer) shouldMine(gasFeeCap *big.Int) bool {
func (g *gasPricer) feesForEpoch(epoch int64) (*big.Int, *big.Int) { func (g *gasPricer) feesForEpoch(epoch int64) (*big.Int, *big.Int) {
epochBaseFee := new(big.Int).Mul(g.baseBaseFee, big.NewInt(epoch)) epochBaseFee := new(big.Int).Mul(g.baseBaseFee, big.NewInt(epoch))
epochGasTipCap := new(big.Int).Mul(g.baseGasTipFee, big.NewInt(epoch)) epochGasTipCap := new(big.Int).Mul(g.baseGasTipFee, big.NewInt(epoch))
epochGasFeeCap := CalcGasFeeCap(epochBaseFee, epochGasTipCap) epochGasFeeCap := calcGasFeeCap(epochBaseFee, epochGasTipCap)
return epochGasTipCap, epochGasFeeCap return epochGasTipCap, epochGasFeeCap
} }
...@@ -132,7 +135,7 @@ type mockBackend struct { ...@@ -132,7 +135,7 @@ type mockBackend struct {
mu sync.RWMutex mu sync.RWMutex
g *gasPricer g *gasPricer
send SendTransactionFunc send sendTransactionFunc
// blockHeight tracks the current height of the chain. // blockHeight tracks the current height of the chain.
blockHeight uint64 blockHeight uint64
...@@ -149,8 +152,8 @@ func newMockBackend(g *gasPricer) *mockBackend { ...@@ -149,8 +152,8 @@ func newMockBackend(g *gasPricer) *mockBackend {
} }
} }
// setTxSender sets the implementation for the SendTransactionFunction // setTxSender sets the implementation for the sendTransactionFunction
func (b *mockBackend) setTxSender(s SendTransactionFunc) { func (b *mockBackend) setTxSender(s sendTransactionFunc) {
b.send = s b.send = s
} }
...@@ -204,6 +207,10 @@ func (b *mockBackend) NonceAt(ctx context.Context, account common.Address, block ...@@ -204,6 +207,10 @@ func (b *mockBackend) NonceAt(ctx context.Context, account common.Address, block
return 0, nil return 0, nil
} }
func (b *mockBackend) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) {
return 0, nil
}
func (*mockBackend) ChainID(ctx context.Context) (*big.Int, error) { func (*mockBackend) ChainID(ctx context.Context) (*big.Int, error) {
return big.NewInt(1), nil return big.NewInt(1), nil
} }
...@@ -650,6 +657,10 @@ func (b *failingBackend) NonceAt(_ context.Context, _ common.Address, _ *big.Int ...@@ -650,6 +657,10 @@ func (b *failingBackend) NonceAt(_ context.Context, _ common.Address, _ *big.Int
return 0, errors.New("unimplemented") return 0, errors.New("unimplemented")
} }
func (b *failingBackend) PendingNonceAt(_ context.Context, _ common.Address) (uint64, error) {
return 0, errors.New("unimplemented")
}
func (b *failingBackend) ChainID(ctx context.Context) (*big.Int, error) { func (b *failingBackend) ChainID(ctx context.Context) (*big.Int, error) {
return nil, errors.New("unimplemented") return nil, errors.New("unimplemented")
} }
...@@ -663,7 +674,7 @@ func TestWaitMinedReturnsReceiptAfterFailure(t *testing.T) { ...@@ -663,7 +674,7 @@ func TestWaitMinedReturnsReceiptAfterFailure(t *testing.T) {
var borkedBackend failingBackend var borkedBackend failingBackend
mgr := &SimpleTxManager{ mgr := &SimpleTxManager{
Config: Config{ cfg: Config{
ResubmissionTimeout: time.Second, ResubmissionTimeout: time.Second,
ReceiptQueryInterval: 50 * time.Millisecond, ReceiptQueryInterval: 50 * time.Millisecond,
NumConfirmations: 1, NumConfirmations: 1,
...@@ -694,7 +705,7 @@ func doGasPriceIncrease(t *testing.T, txTipCap, txFeeCap, newTip, newBaseFee int ...@@ -694,7 +705,7 @@ func doGasPriceIncrease(t *testing.T, txTipCap, txFeeCap, newTip, newBaseFee int
} }
mgr := &SimpleTxManager{ mgr := &SimpleTxManager{
Config: Config{ cfg: Config{
ResubmissionTimeout: time.Second, ResubmissionTimeout: time.Second,
ReceiptQueryInterval: 50 * time.Millisecond, ReceiptQueryInterval: 50 * time.Millisecond,
NumConfirmations: 1, NumConfirmations: 1,
...@@ -795,10 +806,10 @@ func TestIncreaseGasPriceNotExponential(t *testing.T) { ...@@ -795,10 +806,10 @@ func TestIncreaseGasPriceNotExponential(t *testing.T) {
gasTip: big.NewInt(10), gasTip: big.NewInt(10),
baseFee: big.NewInt(45), baseFee: big.NewInt(45),
} }
feeCap := CalcGasFeeCap(borkedBackend.baseFee, borkedBackend.gasTip) feeCap := calcGasFeeCap(borkedBackend.baseFee, borkedBackend.gasTip)
mgr := &SimpleTxManager{ mgr := &SimpleTxManager{
Config: Config{ cfg: Config{
ResubmissionTimeout: time.Second, ResubmissionTimeout: time.Second,
ReceiptQueryInterval: 50 * time.Millisecond, ReceiptQueryInterval: 50 * time.Millisecond,
NumConfirmations: 1, NumConfirmations: 1,
......