Commit 8a64179f authored by Michael de Hoog's avatar Michael de Hoog

Add concurrent sends to batcher

parent 392b8872
...@@ -26,8 +26,9 @@ type Config struct { ...@@ -26,8 +26,9 @@ type Config struct {
RollupNode *sources.RollupClient RollupNode *sources.RollupClient
TxManager txmgr.TxManager TxManager txmgr.TxManager
NetworkTimeout time.Duration NetworkTimeout time.Duration
PollInterval time.Duration PollInterval time.Duration
MaxPendingTransactions uint64
// RollupConfig is queried at startup // RollupConfig is queried at startup
Rollup *rollup.Config Rollup *rollup.Config
...@@ -76,6 +77,10 @@ type CLIConfig struct { ...@@ -76,6 +77,10 @@ type CLIConfig struct {
// and creating a new batch. // and creating a new batch.
PollInterval time.Duration PollInterval time.Duration
// MaxPendingTransactions is the maximum number of concurrent pending
// transactions sent to the transaction manager.
MaxPendingTransactions uint64
// MaxL1TxSize is the maximum size of a batch tx submitted to L1. // MaxL1TxSize is the maximum size of a batch tx submitted to L1.
MaxL1TxSize uint64 MaxL1TxSize uint64
...@@ -128,16 +133,17 @@ func NewConfig(ctx *cli.Context) CLIConfig { ...@@ -128,16 +133,17 @@ func NewConfig(ctx *cli.Context) CLIConfig {
PollInterval: ctx.GlobalDuration(flags.PollIntervalFlag.Name), PollInterval: ctx.GlobalDuration(flags.PollIntervalFlag.Name),
/* Optional Flags */ /* Optional Flags */
MaxChannelDuration: ctx.GlobalUint64(flags.MaxChannelDurationFlag.Name), MaxPendingTransactions: ctx.GlobalUint64(flags.MaxPendingTransactionsFlag.Name),
MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name), MaxChannelDuration: ctx.GlobalUint64(flags.MaxChannelDurationFlag.Name),
TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name), MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name),
TargetNumFrames: ctx.GlobalInt(flags.TargetNumFramesFlag.Name), TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name),
ApproxComprRatio: ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name), TargetNumFrames: ctx.GlobalInt(flags.TargetNumFramesFlag.Name),
Stopped: ctx.GlobalBool(flags.StoppedFlag.Name), ApproxComprRatio: ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name),
TxMgrConfig: txmgr.ReadCLIConfig(ctx), Stopped: ctx.GlobalBool(flags.StoppedFlag.Name),
RPCConfig: rpc.ReadCLIConfig(ctx), TxMgrConfig: txmgr.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx), RPCConfig: rpc.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx), LogConfig: oplog.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx), MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
} }
} }
...@@ -75,13 +75,14 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri ...@@ -75,13 +75,14 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
} }
batcherCfg := Config{ batcherCfg := Config{
L1Client: l1Client, L1Client: l1Client,
L2Client: l2Client, L2Client: l2Client,
RollupNode: rollupClient, RollupNode: rollupClient,
PollInterval: cfg.PollInterval, PollInterval: cfg.PollInterval,
NetworkTimeout: cfg.TxMgrConfig.NetworkTimeout, MaxPendingTransactions: cfg.MaxPendingTransactions,
TxManager: txManager, NetworkTimeout: cfg.TxMgrConfig.NetworkTimeout,
Rollup: rcfg, TxManager: txManager,
Rollup: rcfg,
Channel: ChannelConfig{ Channel: ChannelConfig{
SeqWindowSize: rcfg.SeqWindowSize, SeqWindowSize: rcfg.SeqWindowSize,
ChannelTimeout: rcfg.ChannelTimeout, ChannelTimeout: rcfg.ChannelTimeout,
...@@ -301,6 +302,11 @@ func (l *BatchSubmitter) loop() { ...@@ -301,6 +302,11 @@ func (l *BatchSubmitter) loop() {
// publishStateToL1 loops through the block data loaded into `state` and // publishStateToL1 loops through the block data loaded into `state` and
// submits the associated data to the L1 in the form of channel frames. // submits the associated data to the L1 in the form of channel frames.
func (l *BatchSubmitter) publishStateToL1(ctx context.Context) { func (l *BatchSubmitter) publishStateToL1(ctx context.Context) {
maxPending := l.MaxPendingTransactions
if maxPending == 0 {
maxPending = 1<<64 - 1
}
for { for {
// Attempt to gracefully terminate the current channel, ensuring that no new frames will be // Attempt to gracefully terminate the current channel, ensuring that no new frames will be
// produced. Any remaining frames must still be published to the L1 to prevent stalling. // produced. Any remaining frames must still be published to the L1 to prevent stalling.
...@@ -326,20 +332,29 @@ func (l *BatchSubmitter) publishStateToL1(ctx context.Context) { ...@@ -326,20 +332,29 @@ func (l *BatchSubmitter) publishStateToL1(ctx context.Context) {
l.recordL1Tip(l1tip) l.recordL1Tip(l1tip)
// Collect next transaction data // Collect next transaction data
txdata, err := l.state.TxData(l1tip.ID()) var wg sync.WaitGroup
if err == io.EOF { for i := uint64(0); i < maxPending; i++ {
l.log.Trace("no transaction data available") var txdata txData
break txdata, err = l.state.TxData(l1tip.ID())
} else if err != nil { if err == io.EOF {
l.log.Error("unable to get tx data", "err", err) l.log.Trace("no transaction data available")
break break
} } else if err != nil {
// Record TX Status l.log.Error("unable to get tx data", "err", err)
if receipt, err := l.sendTransaction(ctx, txdata.Bytes()); err != nil { break
l.recordFailedTx(txdata.ID(), err) }
} else { wg.Add(1)
l.recordConfirmedTx(txdata.ID(), receipt) go func() {
defer wg.Done()
// Record TX Status
if receipt, err := l.sendTransaction(ctx, txdata.Bytes()); err != nil {
l.recordFailedTx(txdata.ID(), err)
} else {
l.recordConfirmedTx(txdata.ID(), receipt)
}
}()
} }
wg.Wait()
} }
} }
......
...@@ -48,6 +48,12 @@ var ( ...@@ -48,6 +48,12 @@ var (
} }
// Optional flags // Optional flags
MaxPendingTransactionsFlag = cli.Uint64Flag{
Name: "max-pending-tx",
Usage: "The maximum number of pending transactions. 0 for no limit.",
Value: 1,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "MAX_PENDING_TX"),
}
MaxChannelDurationFlag = cli.Uint64Flag{ MaxChannelDurationFlag = cli.Uint64Flag{
Name: "max-channel-duration", Name: "max-channel-duration",
Usage: "The maximum duration of L1-blocks to keep a channel open. 0 to disable.", Usage: "The maximum duration of L1-blocks to keep a channel open. 0 to disable.",
...@@ -96,6 +102,7 @@ var requiredFlags = []cli.Flag{ ...@@ -96,6 +102,7 @@ var requiredFlags = []cli.Flag{
} }
var optionalFlags = []cli.Flag{ var optionalFlags = []cli.Flag{
MaxPendingTransactionsFlag,
MaxChannelDurationFlag, MaxChannelDurationFlag,
MaxL1TxSizeBytesFlag, MaxL1TxSizeBytesFlag,
TargetL1TxSizeBytesFlag, TargetL1TxSizeBytesFlag,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment