Commit a2189aab authored by Sebastian Stammler's avatar Sebastian Stammler

op-batcher: Rework batcher to use submission safety margin

- Removes the ChannelSubTimeout config flag and instead uses the SubSafetyMargin
  as a #L1-block safety margin for both the channel timeout and the sequencing
  window timeout
- Remove unused flags SequencerBatchInboxAddress and ChannelTimeout as
  these values are now queried from the rollup node.
parent e864ee9e
......@@ -17,12 +17,10 @@ type (
channelBuilder struct {
cfg ChannelConfig
// L1 block timestamp of channel timeout. 0 if no timeout set yet.
// L1 block timestamp of combined channel & sequencing window timeout. 0 if
// no timeout set yet.
timeout uint64
// sequencer window timeout block. 0 if not set yet.
swTimeoutBlock uint64
// marked as full if a) max RLP input bytes, b) max num frames or c) max
// allowed frame index (uint16) has been reached
fullErr error
......@@ -41,13 +39,10 @@ type (
// The maximum number of L1 blocks that the inclusion transactions of a
// channel's frames can span.
ChannelTimeout uint64
// ChannelSubTimeout is the maximum duration, in seconds, to attempt
// completing an opened channel. When reached, the channel is closed and all
// remaining frames are submitted. The batcher should set it shorter than
// the actual channel timeout (specified in number of L1 blocks), since
// submitting continued channel data to L1 is not instantaneous. It's not
// worth it to work with nearly timed-out channels.
ChannelSubTimeout uint64
// The batcher tx submission safety margin (in #L1-blocks) to subtract from
// a channel's timeout and sequencing window, to guarantee safe inclusion of
// a channel on L1.
SubSafetyMargin uint64
// The maximum byte-size a frame can have.
MaxFrameSize uint64
// The target number of frames to create per channel. Note that if the
......@@ -128,25 +123,23 @@ func (c *channelBuilder) Reset() error {
}
// FramePublished calculates the submission timeout of this channel from the
// given frame inclusion tx timestamp. If an older frame tx has already been
// given frame inclusion L1-block number. If an older frame tx has already been
// seen, the timeout is not updated.
func (c *channelBuilder) FramePublished(ts uint64) {
timeout := ts + c.cfg.ChannelSubTimeout
if c.timeout == 0 || c.timeout > timeout {
c.timeout = timeout
}
func (c *channelBuilder) FramePublished(l1BlockNum uint64) {
// The channel timeout counts from the frame's L1 inclusion block; the
// submission safety margin is subtracted so that the channel is closed
// that many L1 blocks before the actual timeout would hit.
// NOTE(review): this is unsigned arithmetic and wraps around if
// SubSafetyMargin exceeds l1BlockNum + ChannelTimeout — presumably ruled
// out by config validation elsewhere; confirm.
timeout := l1BlockNum + c.cfg.ChannelTimeout - c.cfg.SubSafetyMargin
// updateTimeout only stores the value if no timeout is set yet or if it is
// earlier than the current one, so the earliest known timeout always wins.
c.updateTimeout(timeout)
}
// TimedOut returns whether the passed timestamp is after the channel timeout.
// If no timeout is set yet, it returns false.
func (c *channelBuilder) TimedOut(ts uint64) bool {
return c.timeout != 0 && ts >= c.timeout
// TimedOut reports whether blockNum has reached or passed the channel's
// timeout block. As long as no timeout block has been set (timeout is 0), it
// always reports false.
func (c *channelBuilder) TimedOut(blockNum uint64) bool {
	if c.timeout == 0 {
		// No timeout recorded yet, so the channel cannot be timed out.
		return false
	}
	return c.timeout <= blockNum
}
// TriggerTimeout checks if the channel is timed out at the given L1 block
// number and in this case sets the channel as full with reason ErrChannelTimedOut.
func (c *channelBuilder) TriggerTimeout(ts uint64) {
if !c.IsFull() && c.TimedOut(ts) {
func (c *channelBuilder) TriggerTimeout(blockNum uint64) {
if !c.IsFull() && c.TimedOut(blockNum) {
c.setFullErr(ErrChannelTimedOut)
}
}
......@@ -186,12 +179,20 @@ func (c *channelBuilder) AddBlock(block *types.Block) error {
return nil
}
// updateSwTimeout updates the block timeout with the sequencer window timeout
// derived from the batch's origin L1 block. The timeout is only moved forward
// if the derived sequencer window timeout is earlier than the current.
func (c *channelBuilder) updateSwTimeout(batch *derive.BatchData) {
if c.swTimeoutBlock != 0 {
return
timeout := uint64(batch.EpochNum) + c.cfg.SeqWindowSize - c.cfg.SubSafetyMargin
c.updateTimeout(timeout)
}
// updateTimeout updates the timeout block to the given block number if it is
// earlier than the current block timeout, or if it is still unset.
func (c *channelBuilder) updateTimeout(timeoutBlockNum uint64) {
if c.timeout == 0 || c.timeout > timeoutBlockNum {
c.timeout = timeoutBlockNum
}
// TODO: subtract safety margin
c.swTimeoutBlock = uint64(batch.EpochNum) + c.cfg.SeqWindowSize
}
// InputTargetReached says whether the target amount of input data has been
......
......@@ -61,7 +61,7 @@ type channelManager struct {
// Set of unconfirmed txID -> frame data. For tx resubmission
pendingTransactions map[txID][]byte
// Set of confirmed txID -> inclusion block. For determining if the channel is timed out
confirmedTransactions map[txID]eth.L1BlockRef
confirmedTransactions map[txID]eth.BlockID
}
func NewChannelManager(log log.Logger, cfg ChannelConfig) *channelManager {
......@@ -69,7 +69,7 @@ func NewChannelManager(log log.Logger, cfg ChannelConfig) *channelManager {
log: log,
cfg: cfg,
pendingTransactions: make(map[txID][]byte),
confirmedTransactions: make(map[txID]eth.L1BlockRef),
confirmedTransactions: make(map[txID]eth.BlockID),
}
}
......@@ -98,7 +98,7 @@ func (s *channelManager) TxFailed(id txID) {
// a channel have been marked as confirmed on L1 the channel may be invalid & need to be
// resubmitted.
// This function may reset the pending channel if the pending channel has timed out.
func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.L1BlockRef) {
func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
s.log.Trace("marked transaction as confirmed", "id", id, "block", inclusionBlock)
if _, ok := s.pendingTransactions[id]; !ok {
s.log.Warn("unknown transaction marked as confirmed", "id", id, "block", inclusionBlock)
......@@ -108,7 +108,7 @@ func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.L1BlockRef) {
}
delete(s.pendingTransactions, id)
s.confirmedTransactions[id] = inclusionBlock
s.pendingChannel.FramePublished(inclusionBlock.Time)
s.pendingChannel.FramePublished(inclusionBlock.Number)
// If this channel timed out, put the pending blocks back into the local saved blocks
// and then reset this state so it can try to build a new channel.
......@@ -129,7 +129,7 @@ func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.L1BlockRef) {
func (s *channelManager) clearPendingChannel() {
s.pendingChannel = nil
s.pendingTransactions = make(map[txID][]byte)
s.confirmedTransactions = make(map[txID]eth.L1BlockRef)
s.confirmedTransactions = make(map[txID]eth.BlockID)
}
// pendingChannelIsTimedOut returns true if submitted channel has timed out.
......@@ -191,7 +191,7 @@ func (s *channelManager) nextTxData() ([]byte, txID, error) {
//
// It currently ignores the l1Head provided and doesn't track channel timeouts
// or the sequencer window span yet.
func (s *channelManager) TxData(l1Head eth.L1BlockRef) ([]byte, txID, error) {
func (s *channelManager) TxData(l1Head eth.BlockID) ([]byte, txID, error) {
dataPending := s.pendingChannel != nil && s.pendingChannel.HasFrame()
s.log.Debug("Requested tx data", "l1Head", l1Head, "data_pending", dataPending, "blocks_pending", len(s.blocks))
......@@ -224,7 +224,7 @@ func (s *channelManager) TxData(l1Head eth.L1BlockRef) ([]byte, txID, error) {
return s.nextTxData()
}
func (s *channelManager) ensurePendingChannel(l1Head eth.L1BlockRef) error {
func (s *channelManager) ensurePendingChannel(l1Head eth.BlockID) error {
if s.pendingChannel != nil {
return nil
}
......@@ -239,8 +239,8 @@ func (s *channelManager) ensurePendingChannel(l1Head eth.L1BlockRef) error {
return nil
}
func (s *channelManager) triggerTimeout(l1Head eth.L1BlockRef) {
s.pendingChannel.TriggerTimeout(l1Head.Time)
func (s *channelManager) triggerTimeout(l1Head eth.BlockID) {
s.pendingChannel.TriggerTimeout(l1Head.Number)
ferr := s.pendingChannel.FullErr()
s.log.Debug("timeout triggered",
"l1Head", l1Head,
......
......@@ -49,17 +49,10 @@ type CLIConfig struct {
// RollupRpc is the HTTP provider URL for the L2 rollup node.
RollupRpc string
// The maximum number of L1 blocks that the inclusion transactions of a
// channel's frames can span.
ChannelTimeout uint64
// ChannelSubTimeout is the maximum duration, in seconds, to attempt
// completing an opened channel. When reached, the channel is closed and all
// remaining frames are submitted. The batcher should set it shorter than
// the actual channel timeout (specified in number of L1 blocks), since
// submitting continued channel data to L1 is not instantaneous. It's not
// worth it to work with nearly timed-out channels.
ChannelSubTimeout uint64
// The batcher tx submission safety margin (in #L1-blocks) to subtract from
// a channel's timeout and sequencing window, to guarantee safe inclusion of
// a channel on L1.
SubSafetyMargin uint64
// PollInterval is the delay between querying L2 for more transaction
// and creating a new batch.
......@@ -90,10 +83,6 @@ type CLIConfig struct {
// PrivateKey is the private key used to submit sequencer transactions.
PrivateKey string
// SequencerBatchInboxAddress is the address in which to send batch
// transactions.
SequencerBatchInboxAddress string
RPCConfig oprpc.CLIConfig
/* Optional Params */
......@@ -147,26 +136,24 @@ func NewConfig(ctx *cli.Context) CLIConfig {
L1EthRpc: ctx.GlobalString(flags.L1EthRpcFlag.Name),
L2EthRpc: ctx.GlobalString(flags.L2EthRpcFlag.Name),
RollupRpc: ctx.GlobalString(flags.RollupRpcFlag.Name),
ChannelTimeout: ctx.GlobalUint64(flags.ChannelTimeoutFlag.Name),
ChannelSubTimeout: ctx.GlobalUint64(flags.ChannelSubTimeoutFlag.Name),
SubSafetyMargin: ctx.GlobalUint64(flags.SubSafetyMarginFlag.Name),
PollInterval: ctx.GlobalDuration(flags.PollIntervalFlag.Name),
NumConfirmations: ctx.GlobalUint64(flags.NumConfirmationsFlag.Name),
SafeAbortNonceTooLowCount: ctx.GlobalUint64(flags.SafeAbortNonceTooLowCountFlag.Name),
ResubmissionTimeout: ctx.GlobalDuration(flags.ResubmissionTimeoutFlag.Name),
/* Optional Flags */
MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name),
TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name),
TargetNumFrames: ctx.GlobalInt(flags.TargetNumFramesFlag.Name),
ApproxComprRatio: ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name),
Mnemonic: ctx.GlobalString(flags.MnemonicFlag.Name),
SequencerHDPath: ctx.GlobalString(flags.SequencerHDPathFlag.Name),
PrivateKey: ctx.GlobalString(flags.PrivateKeyFlag.Name),
SequencerBatchInboxAddress: ctx.GlobalString(flags.SequencerBatchInboxAddressFlag.Name),
RPCConfig: oprpc.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
SignerConfig: opsigner.ReadCLIConfig(ctx),
MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name),
TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name),
TargetNumFrames: ctx.GlobalInt(flags.TargetNumFramesFlag.Name),
ApproxComprRatio: ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name),
Mnemonic: ctx.GlobalString(flags.MnemonicFlag.Name),
SequencerHDPath: ctx.GlobalString(flags.SequencerHDPathFlag.Name),
PrivateKey: ctx.GlobalString(flags.PrivateKeyFlag.Name),
RPCConfig: oprpc.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
SignerConfig: opsigner.ReadCLIConfig(ctx),
}
}
......@@ -89,13 +89,13 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger) (*BatchSubmitte
SignerFnFactory: signer,
Rollup: rcfg,
Channel: ChannelConfig{
SeqWindowSize: rcfg.SeqWindowSize,
ChannelTimeout: rcfg.ChannelTimeout,
ChannelSubTimeout: cfg.ChannelSubTimeout,
MaxFrameSize: cfg.MaxL1TxSize - 1, // subtract 1 byte for version
TargetFrameSize: cfg.TargetL1TxSize - 1, // subtract 1 byte for version
TargetNumFrames: cfg.TargetNumFrames,
ApproxComprRatio: cfg.ApproxComprRatio,
SeqWindowSize: rcfg.SeqWindowSize,
ChannelTimeout: rcfg.ChannelTimeout,
SubSafetyMargin: cfg.SubSafetyMargin,
MaxFrameSize: cfg.MaxL1TxSize - 1, // subtract 1 byte for version
TargetFrameSize: cfg.TargetL1TxSize - 1, // subtract 1 byte for version
TargetNumFrames: cfg.TargetNumFrames,
ApproxComprRatio: cfg.ApproxComprRatio,
},
}
......@@ -245,12 +245,12 @@ func (l *BatchSubmitter) loop() {
for {
l1tip, err := l.l1Tip(l.ctx)
if err != nil {
l.log.Error("Failed to query L1 tip")
l.log.Error("Failed to query L1 tip", "error", err)
break
}
// Collect next transaction data
data, id, err := l.state.TxData(l1tip)
data, id, err := l.state.TxData(l1tip.ID())
if err == io.EOF {
l.log.Trace("no transaction data available")
break // local for loop
......@@ -288,20 +288,8 @@ func (l *BatchSubmitter) recordFailedTx(id txID, err error) {
func (l *BatchSubmitter) recordConfirmedTx(id txID, receipt *types.Receipt) {
l.log.Info("Transaction confirmed", "tx_hash", receipt.TxHash, "status", receipt.Status, "block_hash", receipt.BlockHash, "block_number", receipt.BlockNumber)
// Unfortunately, a tx receipt doesn't include the timestamp, so we have to
// query the header.
l1ref, err := l.l1BlockRefByReceipt(l.ctx, receipt)
if err != nil {
// Very unlikely that tx sending worked but then we cannot get the
// header. Fall back to latest known L1 time to be on the safe side.
l1ref.Time = l.lastKnownL1Time
l.log.Warn("Failed to get block ref for successful batcher tx. Setting timestamp to latest know L1 block time.", "block_ref", l1ref)
} else {
l.lastKnownL1Time = l1ref.Time
}
// l1ref is guaranteed to have at least fields Hash, Number and Time set.
l.state.TxConfirmed(id, l1ref)
l1block := eth.BlockID{Number: receipt.BlockNumber.Uint64(), Hash: receipt.BlockHash}
l.state.TxConfirmed(id, l1block)
}
// l1Tip gets the current L1 tip as a L1BlockRef. The passed context is assumed
......
......@@ -34,17 +34,13 @@ var (
Required: true,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "ROLLUP_RPC"),
}
ChannelTimeoutFlag = cli.Uint64Flag{
Name: "channel-timeout",
Usage: "The maximum number of L1 blocks that the inclusion transactions of a channel's frames can span",
SubSafetyMarginFlag = cli.Uint64Flag{
Name: "sub-safety-margin",
Usage: "The batcher tx submission safety margin (in #L1-blocks) to subtract " +
"from a channel's timeout and sequencing window, to guarantee safe inclusion " +
"of a channel on L1.",
Required: true,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "CHANNEL_TIMEOUT"),
}
ChannelSubTimeoutFlag = cli.Uint64Flag{
Name: "channel-sub-timeout",
Usage: "The maximum duration (in seconds) to attempt completing an opened channel, as opposed to submitting L2 blocks into a new channel.",
Required: true,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "CHANNEL_SUB_TIMEOUT"),
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "SUB_SAFETY_MARGIN"),
}
PollIntervalFlag = cli.DurationFlag{
Name: "poll-interval",
......@@ -75,12 +71,6 @@ var (
Required: true,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "RESUBMISSION_TIMEOUT"),
}
SequencerBatchInboxAddressFlag = cli.StringFlag{
Name: "sequencer-batch-inbox-address",
Usage: "L1 Address to receive batch transactions",
Required: true,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "SEQUENCER_BATCH_INBOX_ADDRESS"),
}
/* Optional flags */
......@@ -131,13 +121,11 @@ var requiredFlags = []cli.Flag{
L1EthRpcFlag,
L2EthRpcFlag,
RollupRpcFlag,
ChannelTimeoutFlag,
ChannelSubTimeoutFlag,
SubSafetyMarginFlag,
PollIntervalFlag,
NumConfirmationsFlag,
SafeAbortNonceTooLowCountFlag,
ResubmissionTimeoutFlag,
SequencerBatchInboxAddressFlag,
}
var optionalFlags = []cli.Flag{
......
......@@ -324,11 +324,10 @@ func TestMigration(t *testing.T) {
L2EthRpc: gethNode.WSEndpoint(),
RollupRpc: rollupNode.HTTPEndpoint(),
MaxL1TxSize: 120_000,
TargetL1TxSize: 1,
TargetL1TxSize: 624,
TargetNumFrames: 1,
ApproxComprRatio: 1.0,
ChannelTimeout: deployCfg.ChannelTimeout,
ChannelSubTimeout: 24,
SubSafetyMargin: testSafetyMargin(deployCfg),
PollInterval: 50 * time.Millisecond,
NumConfirmations: 1,
ResubmissionTimeout: 5 * time.Second,
......@@ -337,8 +336,7 @@ func TestMigration(t *testing.T) {
Level: "info",
Format: "text",
},
PrivateKey: hexPriv(secrets.Batcher),
SequencerBatchInboxAddress: deployCfg.BatchSenderAddress.String(),
PrivateKey: hexPriv(secrets.Batcher),
}, lgr.New("module", "batcher"))
require.NoError(t, err)
t.Cleanup(func() {
......
......@@ -527,11 +527,10 @@ func (cfg SystemConfig) Start() (*System, error) {
L2EthRpc: sys.Nodes["sequencer"].WSEndpoint(),
RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(),
MaxL1TxSize: 120_000,
TargetL1TxSize: 1,
TargetL1TxSize: 160, //624,
TargetNumFrames: 1,
ApproxComprRatio: 1.0,
ChannelTimeout: cfg.DeployConfig.ChannelTimeout,
ChannelSubTimeout: 24,
SubSafetyMargin: testSafetyMargin(cfg.DeployConfig),
PollInterval: 50 * time.Millisecond,
NumConfirmations: 1,
ResubmissionTimeout: 5 * time.Second,
......@@ -540,8 +539,7 @@ func (cfg SystemConfig) Start() (*System, error) {
Level: "info",
Format: "text",
},
PrivateKey: hexPriv(cfg.Secrets.Batcher),
SequencerBatchInboxAddress: cfg.DeployConfig.BatchInboxAddress.String(),
PrivateKey: hexPriv(cfg.Secrets.Batcher),
}, sys.cfg.Loggers["batcher"])
if err != nil {
return nil, fmt.Errorf("failed to setup batch submitter: %w", err)
......@@ -572,3 +570,24 @@ func hexPriv(in *ecdsa.PrivateKey) string {
b := e2eutils.EncodePrivKey(in)
return hexutil.Encode(b)
}
// testSafetyMargin returns a submission safety margin (in L1 blocks) that
// heuristically leads to a short channel lifetime of netChannelDuration L1
// blocks. In current testing setups, we want channels to close quickly to have
// a low latency. We don't optimize for gas consumption.
//
// All arithmetic is on uint64 and saturates at zero: if the configured
// sequencing window or channel timeout is too small to leave room for the
// target duration, a margin of 0 is returned instead of a wrapped-around,
// huge value.
func testSafetyMargin(cfg *genesis.DeployConfig) uint64 {
	const (
		// Target channel duration after the first frame is included on L1.
		netChannelDuration = 2
		// 1 block distance from the origin block plus 1 block for mining the
		// first frame.
		originToInclusion = 2
	)

	// The sequencing window timeout starts from the L1 origin, whereas the
	// channel timeout starts from the first L1 inclusion block of any frame.
	// So to have comparable values, the sequencing window size is converted to
	// an effective sequencing window from the first L1 inclusion block,
	// assuming that L2 blocks are quickly included on L1.
	var openChannelSeqWindow uint64
	if cfg.SequencerWindowSize > originToInclusion {
		openChannelSeqWindow = cfg.SequencerWindowSize - originToInclusion
	}

	// The tighter of the two windows decides when the channel has to close.
	window := cfg.ChannelTimeout
	if openChannelSeqWindow < window {
		window = openChannelSeqWindow
	}

	// With a window no longer than the target duration, no margin is needed
	// (and subtracting would underflow).
	if window <= netChannelDuration {
		return 0
	}
	return window - netChannelDuration
}
......@@ -123,8 +123,7 @@ services:
OP_BATCHER_TARGET_L1_TX_SIZE_BYTES: 624
OP_BATCHER_TARGET_NUM_FRAMES: 1
OP_BATCHER_APPROX_COMPR_RATIO: 1.0
OP_BATCHER_CHANNEL_TIMEOUT: 40
OP_BATCHER_CHANNEL_SUB_TIMEOUT: 0
OP_BATCHER_SUB_SAFETY_MARGIN: 6 # SWS is 15, ChannelTimeout is 40
OP_BATCHER_POLL_INTERVAL: 1s
OP_BATCHER_NUM_CONFIRMATIONS: 1
OP_BATCHER_SAFE_ABORT_NONCE_TOO_LOW_COUNT: 3
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment