Commit ed9cc4cb authored by Sebastian Stammler's avatar Sebastian Stammler

op-batcher: Add channel duration tracking to the channel builder

The new channel configuration parameter MaxChannelDuration can be used
to set a max duration of L1-blocks for which a channel should be kept
open. This can be used to ensure that a channel is not kept open for too
long during times of low L2 tx volume.
parent 9fc081d3
...@@ -18,6 +18,15 @@ type ChannelConfig struct { ...@@ -18,6 +18,15 @@ type ChannelConfig struct {
// The maximum number of L1 blocks that the inclusion transactions of a // The maximum number of L1 blocks that the inclusion transactions of a
// channel's frames can span. // channel's frames can span.
ChannelTimeout uint64 ChannelTimeout uint64
// Builder Config
// MaxChannelDuration is the maximum duration (in #L1-blocks) to keep the
// channel open. This allows control over how long a channel is kept open
// during times of low transaction volume.
//
// If 0, duration checks are disabled.
MaxChannelDuration uint64
// The batcher tx submission safety margin (in #L1-blocks) to subtract from // The batcher tx submission safety margin (in #L1-blocks) to subtract from
// a channel's timeout and sequencing window, to guarantee safe inclusion of // a channel's timeout and sequencing window, to guarantee safe inclusion of
// a channel on L1. // a channel on L1.
...@@ -50,12 +59,17 @@ func (c ChannelConfig) InputThreshold() uint64 { ...@@ -50,12 +59,17 @@ func (c ChannelConfig) InputThreshold() uint64 {
type channelBuilder struct { type channelBuilder struct {
cfg ChannelConfig cfg ChannelConfig
// L1 block timestamp of combined channel & sequencing window timeout. 0 if // L1 block number timeout of combined
// no timeout set yet. // - channel duration timeout,
// - consensus channel timeout,
// - sequencing window timeout.
// 0 if no block number timeout set yet.
timeout uint64 timeout uint64
// reason for currently set timeout
timeoutReason error
// marked as full if a) max RLP input bytes, b) max num frames or c) max // Reason for the channel being full. Set by setFullErr so it's always
// allowed frame index (uint16) has been reached // guaranteed to be a ChannelFullError wrapping the specific reason.
fullErr error fullErr error
// current channel // current channel
co *derive.ChannelOut co *derive.ChannelOut
...@@ -102,28 +116,6 @@ func (c *channelBuilder) Reset() error { ...@@ -102,28 +116,6 @@ func (c *channelBuilder) Reset() error {
return c.co.Reset() return c.co.Reset()
} }
// FramePublished calculates the submission timeout of this channel from the
// given frame inclusion L1-block number. If an older frame tx has already been
// seen, the timeout is not updated.
func (c *channelBuilder) FramePublished(l1BlockNum uint64) {
timeout := l1BlockNum + c.cfg.ChannelTimeout - c.cfg.SubSafetyMargin
c.updateTimeout(timeout)
}
// TimedOut returns whether the passed block number is after the channel timeout
// block. If no block timeout is set yet, it returns false.
func (c *channelBuilder) TimedOut(blockNum uint64) bool {
return c.timeout != 0 && blockNum >= c.timeout
}
// CheckTimeout checks if the channel is timed out at the given block number and
// in this case marks the channel as full with reason ErrChannelTimedOut.
func (c *channelBuilder) CheckTimeout(blockNum uint64) {
if !c.IsFull() && c.TimedOut(blockNum) {
c.setFullErr(ErrChannelTimedOut)
}
}
// AddBlock adds a block to the channel compression pipeline. IsFull should be // AddBlock adds a block to the channel compression pipeline. IsFull should be
// called aftewards to test whether the channel is full. If full, a new channel // called aftewards to test whether the channel is full. If full, a new channel
// must be started. // must be started.
...@@ -159,22 +151,73 @@ func (c *channelBuilder) AddBlock(block *types.Block) error { ...@@ -159,22 +151,73 @@ func (c *channelBuilder) AddBlock(block *types.Block) error {
return nil return nil
} }
// Timeout management
// RegisterL1Block should be called whenever a new L1-block is seen.
//
// It ensures proper tracking of all possible timeouts (max channel duration,
// close to consensus channel timeout, close to end of sequencing window).
func (c *channelBuilder) RegisterL1Block(l1BlockNum uint64) {
c.updateDurationTimeout(l1BlockNum)
c.checkTimeout(l1BlockNum)
}
// FramePublished should be called whenever a frame of this channel got
// published with the L1-block number of the block that the frame got included
// in.
func (c *channelBuilder) FramePublished(l1BlockNum uint64) {
timeout := l1BlockNum + c.cfg.ChannelTimeout - c.cfg.SubSafetyMargin
c.updateTimeout(timeout, ErrChannelTimeoutClose)
}
// updateDurationTimeout updates the block timeout with the channel duration
// timeout derived from the given L1-block number. The timeout is only moved
// forward if the derived timeout is earlier than the currently set timeout.
//
// It does nothing if the max channel duration is set to 0.
func (c *channelBuilder) updateDurationTimeout(l1BlockNum uint64) {
if c.cfg.MaxChannelDuration == 0 {
return
}
timeout := l1BlockNum + c.cfg.MaxChannelDuration
c.updateTimeout(timeout, ErrMaxDurationReached)
}
// updateSwTimeout updates the block timeout with the sequencer window timeout // updateSwTimeout updates the block timeout with the sequencer window timeout
// derived from the batch's origin L1 block. The timeout is only moved forward // derived from the batch's origin L1 block. The timeout is only moved forward
// if the derived sequencer window timeout is earlier than the current. // if the derived sequencer window timeout is earlier than the currently set
// timeout.
func (c *channelBuilder) updateSwTimeout(batch *derive.BatchData) { func (c *channelBuilder) updateSwTimeout(batch *derive.BatchData) {
timeout := uint64(batch.EpochNum) + c.cfg.SeqWindowSize - c.cfg.SubSafetyMargin timeout := uint64(batch.EpochNum) + c.cfg.SeqWindowSize - c.cfg.SubSafetyMargin
c.updateTimeout(timeout) c.updateTimeout(timeout, ErrSeqWindowClose)
} }
// updateTimeout updates the timeout block to the given block number if it is // updateTimeout updates the timeout block to the given block number if it is
// earlier then the current block timeout, or if it still unset. // earlier than the current block timeout, or if it still unset.
func (c *channelBuilder) updateTimeout(timeoutBlockNum uint64) { //
// If the timeout is updated, the provided reason will be set as the channel
// full error reason in case the timeout is hit in the future.
func (c *channelBuilder) updateTimeout(timeoutBlockNum uint64, reason error) {
if c.timeout == 0 || c.timeout > timeoutBlockNum { if c.timeout == 0 || c.timeout > timeoutBlockNum {
c.timeout = timeoutBlockNum c.timeout = timeoutBlockNum
c.timeoutReason = reason
} }
} }
// checkTimeout checks if the channel is timed out at the given block number and
// in this case marks the channel as full, if it wasn't full alredy.
func (c *channelBuilder) checkTimeout(blockNum uint64) {
if !c.IsFull() && c.TimedOut(blockNum) {
c.setFullErr(c.timeoutReason)
}
}
// TimedOut returns whether the passed block number is after the timeout block
// number. If no block timeout is set yet, it returns false.
func (c *channelBuilder) TimedOut(blockNum uint64) bool {
return c.timeout != 0 && blockNum >= c.timeout
}
// inputTargetReached says whether the target amount of input data has been // inputTargetReached says whether the target amount of input data has been
// reached in this channel builder. No more blocks can be added afterwards. // reached in this channel builder. No more blocks can be added afterwards.
func (c *channelBuilder) inputTargetReached() bool { func (c *channelBuilder) inputTargetReached() bool {
...@@ -190,14 +233,16 @@ func (c *channelBuilder) IsFull() bool { ...@@ -190,14 +233,16 @@ func (c *channelBuilder) IsFull() bool {
// FullErr returns the reason why the channel is full. If not full yet, it // FullErr returns the reason why the channel is full. If not full yet, it
// returns nil. // returns nil.
// //
// It returns a ChannelFullError wrapping one of four possible reasons for the // It returns a ChannelFullError wrapping one of six possible reasons for the
// channel being full: // channel being full:
// - ErrInputTargetReached if the target amount of input data has been reached, // - ErrInputTargetReached if the target amount of input data has been reached,
// - derive.MaxRLPBytesPerChannel if the general maximum amount of input data // - derive.MaxRLPBytesPerChannel if the general maximum amount of input data
// would have been exceeded by the latest AddBlock call, // would have been exceeded by the latest AddBlock call,
// - ErrMaxFrameIndex if the maximum number of frames has been generated // - ErrMaxFrameIndex if the maximum number of frames has been generated
// (uint16), // (uint16),
// - ErrChannelTimedOut if the batcher channel timeout has been reached. // - ErrMaxDurationReached if the max channel duration got reached.
// - ErrChannelTimeoutClose if the consensus channel timeout got too close.
// - ErrSeqWindowClose if the end of the sequencer window got too close.
func (c *channelBuilder) FullErr() error { func (c *channelBuilder) FullErr() error {
return c.fullErr return c.fullErr
} }
...@@ -210,9 +255,9 @@ func (c *channelBuilder) setFullErr(err error) { ...@@ -210,9 +255,9 @@ func (c *channelBuilder) setFullErr(err error) {
// after AddBlock and before iterating over available frames with HasFrame and // after AddBlock and before iterating over available frames with HasFrame and
// NextFrame. // NextFrame.
// //
// If the input data target hasn't been reached yet, it will conservatively only // If the channel isn't full yet, it will conservatively only
// pull readily available frames from the compression output. // pull readily available frames from the compression output.
// If the target has been reached, the channel is closed and all remaining // If it is full, the channel is closed and all remaining
// frames will be created, possibly with a small leftover frame. // frames will be created, possibly with a small leftover frame.
func (c *channelBuilder) OutputFrames() error { func (c *channelBuilder) OutputFrames() error {
if c.IsFull() { if c.IsFull() {
...@@ -318,9 +363,11 @@ func (c *channelBuilder) PushFrame(id txID, frame []byte) { ...@@ -318,9 +363,11 @@ func (c *channelBuilder) PushFrame(id txID, frame []byte) {
} }
var ( var (
ErrInputTargetReached = errors.New("target amount of input data reached") ErrInputTargetReached = errors.New("target amount of input data reached")
ErrMaxFrameIndex = errors.New("max frame index reached (uint16)") ErrMaxFrameIndex = errors.New("max frame index reached (uint16)")
ErrChannelTimedOut = errors.New("channel timed out") ErrMaxDurationReached = errors.New("max channel duration reached")
ErrChannelTimeoutClose = errors.New("close to channel timeout")
ErrSeqWindowClose = errors.New("close to sequencer window timeout")
) )
type ChannelFullError struct { type ChannelFullError struct {
......
...@@ -188,9 +188,6 @@ func (s *channelManager) nextTxData() ([]byte, txID, error) { ...@@ -188,9 +188,6 @@ func (s *channelManager) nextTxData() ([]byte, txID, error) {
// It currently only uses one frame per transaction. If the pending channel is // It currently only uses one frame per transaction. If the pending channel is
// full, it only returns the remaining frames of this channel until it got // full, it only returns the remaining frames of this channel until it got
// successfully fully sent to L1. It returns io.EOF if there's no pending frame. // successfully fully sent to L1. It returns io.EOF if there's no pending frame.
//
// It currently ignores the l1Head provided and doesn't track channel timeouts
// or the sequencer window span yet.
func (s *channelManager) TxData(l1Head eth.BlockID) ([]byte, txID, error) { func (s *channelManager) TxData(l1Head eth.BlockID) ([]byte, txID, error) {
dataPending := s.pendingChannel != nil && s.pendingChannel.HasFrame() dataPending := s.pendingChannel != nil && s.pendingChannel.HasFrame()
s.log.Debug("Requested tx data", "l1Head", l1Head, "data_pending", dataPending, "blocks_pending", len(s.blocks)) s.log.Debug("Requested tx data", "l1Head", l1Head, "data_pending", dataPending, "blocks_pending", len(s.blocks))
...@@ -211,7 +208,7 @@ func (s *channelManager) TxData(l1Head eth.BlockID) ([]byte, txID, error) { ...@@ -211,7 +208,7 @@ func (s *channelManager) TxData(l1Head eth.BlockID) ([]byte, txID, error) {
return nil, txID{}, err return nil, txID{}, err
} }
s.checkTimeout(l1Head) s.registerL1Block(l1Head)
if err := s.processBlocks(); err != nil { if err := s.processBlocks(); err != nil {
return nil, txID{}, err return nil, txID{}, err
...@@ -239,14 +236,13 @@ func (s *channelManager) ensurePendingChannel(l1Head eth.BlockID) error { ...@@ -239,14 +236,13 @@ func (s *channelManager) ensurePendingChannel(l1Head eth.BlockID) error {
return nil return nil
} }
// checkTimeout checks the block timeout on the pending channel. // registerL1Block registers the given block at the pending channel.
func (s *channelManager) checkTimeout(l1Head eth.BlockID) { func (s *channelManager) registerL1Block(l1Head eth.BlockID) {
s.pendingChannel.CheckTimeout(l1Head.Number) s.pendingChannel.RegisterL1Block(l1Head.Number)
ferr := s.pendingChannel.FullErr() s.log.Debug("new L1-block registered at channel builder",
s.log.Debug("timeout triggered",
"l1Head", l1Head, "l1Head", l1Head,
"timed_out", errors.Is(ferr, ErrChannelTimedOut), "channel_full", s.pendingChannel.IsFull(),
"full_reason", ferr, "full_reason", s.pendingChannel.FullErr(),
) )
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment