diff --git a/op-batcher/batcher/channel_builder.go b/op-batcher/batcher/channel_builder.go
new file mode 100644
index 0000000000000000000000000000000000000000..783d814f52859d77d527660b4554cc144bf765c3
--- /dev/null
+++ b/op-batcher/batcher/channel_builder.go
@@ -0,0 +1,281 @@
+package batcher
+
+import (
+	"bytes"
+	"errors"
+	"fmt"
+	"io"
+	"math"
+
+	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
+	"github.com/ethereum/go-ethereum/core/types"
+)
+
+type (
+	// channelBuilder uses a ChannelOut to create a channel with output frame
+	// size approximation.
+	channelBuilder struct {
+		cfg ChannelConfig
+
+		// marked as full if a) the target amount of input data, b) the max RLP
+		// input bytes, or c) the max allowed frame index (uint16) has been reached
+		fullErr error
+		// current channel
+		co *derive.ChannelOut
+		// list of blocks in the channel. Saved in case the channel must be rebuilt
+		blocks []*types.Block
+		// frames data queue, to be sent as txs
+		frames []taggedData
+	}
+
+	ChannelConfig struct {
+		// ChannelTimeout is the maximum duration to attempt completing an opened
+		// channel. The batcher can decide to set it shorter than the actual timeout,
+		// since submitting continued channel data to L1 is not instantaneous. It's
+		// not worth it to work with nearly timed-out channels.
+		ChannelTimeout uint64
+		// The maximum byte-size a frame can have.
+		MaxFrameSize uint64
+		// The target byte-size to fill each frame up to. Note that if the realized
+		// compression ratio is worse than the approximate, more frames may actually
+		// be created. This also depends on how close TargetFrameSize is to
+		// MaxFrameSize.
+		TargetFrameSize uint64
+		// The target number of frames to create in this channel. If the realized
+		// compression ratio is worse than ApproxComprRatio, additional leftover
+		// frame(s) might get created.
+		TargetNumFrames int
+		// Approximate compression ratio to assume. Should be set slightly lower than
+		// the average observed in experiments to reduce the chance of creating a
+		// small additional leftover frame.
+		ApproxComprRatio float64
+	}
+
+	ChannelFullError struct {
+		Err error
+	}
+)
+
+func (e *ChannelFullError) Error() string {
+	return "channel full: " + e.Err.Error()
+}
+
+func (e *ChannelFullError) Unwrap() error {
+	return e.Err
+}
+
+var (
+	ErrInputTargetReached = errors.New("target amount of input data reached")
+	ErrMaxFrameIndex      = errors.New("max frame index reached (uint16)")
+)
+
+// InputThreshold calculates the input data threshold in bytes from the given
+// parameters.
+func (c ChannelConfig) InputThreshold() uint64 {
+	return uint64(float64(c.TargetNumFrames) * float64(c.TargetFrameSize) / c.ApproxComprRatio)
+}
+
+func newChannelBuilder(cfg ChannelConfig) (*channelBuilder, error) {
+	co, err := derive.NewChannelOut()
+	if err != nil {
+		return nil, err
+	}
+
+	return &channelBuilder{
+		cfg: cfg,
+		co:  co,
+	}, nil
+}
+
+func (c *channelBuilder) ID() derive.ChannelID {
+	return c.co.ID()
+}
+
+// InputBytes returns the total amount of input bytes added to the channel.
+func (c *channelBuilder) InputBytes() int {
+	return c.co.InputBytes()
+}
+
+// Blocks returns a backup list of all blocks that were added to the channel. It
+// can be used in case the channel needs to be rebuilt.
+func (c *channelBuilder) Blocks() []*types.Block {
+	return c.blocks
+}
+
+// Reset resets the internal state of the channel builder so that it can be
+// reused. Note that a new channel id is also generated by Reset.
+func (c *channelBuilder) Reset() error {
+	c.blocks = c.blocks[:0]
+	c.frames = c.frames[:0]
+	return c.co.Reset()
+}
+
+// AddBlock adds a block to the channel compression pipeline. IsFull should be
+// called afterwards to test whether the channel is full. If full, a new channel
+// must be started.
+//
+// AddBlock returns a ChannelFullError if called when the channel is already
+// full. See the description of FullErr for details.
+//
+// Call OutputFrames() afterwards to create frames.
+func (c *channelBuilder) AddBlock(block *types.Block) error {
+	if c.IsFull() {
+		return c.FullErr()
+	}
+
+	_, err := c.co.AddBlock(block)
+	if errors.Is(err, derive.ErrTooManyRLPBytes) {
+		c.setFullErr(err)
+		return c.FullErr()
+	} else if err != nil {
+		return fmt.Errorf("adding block to channel out: %w", err)
+	}
+	c.blocks = append(c.blocks, block)
+
+	if c.InputTargetReached() {
+		c.setFullErr(ErrInputTargetReached)
+		// Adding this block still worked, so don't return error, just mark as full
+	}
+
+	return nil
+}
+
+// InputTargetReached reports whether the target amount of input data has been
+// reached in this channel builder. No more blocks can be added afterwards.
+func (c *channelBuilder) InputTargetReached() bool {
+	return uint64(c.co.InputBytes()) >= c.cfg.InputThreshold()
+}
+
+// IsFull returns whether the channel is full.
+// Use FullErr to get the reason for the channel being full.
+func (c *channelBuilder) IsFull() bool {
+	return c.fullErr != nil
+}
+
+// FullErr returns the reason why the channel is full. If not full yet, it
+// returns nil.
+//
+// It returns a ChannelFullError wrapping one of three possible reasons for the
+// channel being full:
+// - ErrInputTargetReached if the target amount of input data has been reached,
+// - derive.ErrTooManyRLPBytes if the general maximum amount of input data
+// would have been exceeded by the latest AddBlock call,
+// - ErrMaxFrameIndex if the maximum number of frames has been generated (uint16)
+func (c *channelBuilder) FullErr() error {
+	return c.fullErr
+}
+
+func (c *channelBuilder) setFullErr(err error) {
+	c.fullErr = &ChannelFullError{Err: err}
+}
+
+// OutputFrames creates new frames with the channel out. It should be called
+// after AddBlock and before iterating over available frames with HasFrame and
+// NextFrame.
+//
+// If the input data target hasn't been reached yet, it will conservatively only
+// pull readily available frames from the compression output.
+// If the target has been reached, the channel is closed and all remaining
+// frames will be created, possibly with a small leftover frame.
+func (c *channelBuilder) OutputFrames() error {
+	if c.IsFull() {
+		return c.closeAndOutputAllFrames()
+	}
+	return c.outputReadyFrames()
+}
+
+// outputReadyFrames creates new frames as long as there's enough data ready in
+// the channel out compression pipeline.
+//
+// This is part of an optimization to already generate frames and send them off
+// as txs while still collecting blocks in the channel builder.
+func (c *channelBuilder) outputReadyFrames() error {
+	// TODO: Decide whether we want to fill frames to max size and use target
+	// only for estimation, or use target size.
+	for c.co.ReadyBytes() >= int(c.cfg.MaxFrameSize) {
+		if err := c.outputFrame(); err == io.EOF {
+			return nil
+		} else if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (c *channelBuilder) closeAndOutputAllFrames() error {
+	if err := c.co.Close(); err != nil {
+		return fmt.Errorf("closing channel out: %w", err)
+	}
+
+	for {
+		if err := c.outputFrame(); err == io.EOF {
+			return nil
+		} else if err != nil {
+			return err
+		}
+	}
+}
+
+// outputFrame creates one new frame and adds it to the frames queue.
+// Note that compressed output data must be available on the underlying
+// ChannelOut, or an empty frame will be produced.
+func (c *channelBuilder) outputFrame() error {
+	var buf bytes.Buffer
+	fn, err := c.co.OutputFrame(&buf, c.cfg.MaxFrameSize)
+	if err != io.EOF && err != nil {
+		return fmt.Errorf("writing frame[%d]: %w", fn, err)
+	}
+
+	// Mark as full if max index reached
+	// TODO: If there's still data in the compression pipeline of the channel out,
+	// we would miss it and the whole channel would be broken because the last
+	// frames would never be generated...
+	// Hitting the max index is impossible with current parameters, so ignore for
+	// now. Note that in order to properly catch this, we'd need to call Flush
+	// after every block addition to estimate how many more frames are coming.
+	if fn == math.MaxUint16 {
+		c.setFullErr(ErrMaxFrameIndex)
+	}
+
+	frame := taggedData{
+		id:   txID{chID: c.co.ID(), frameNumber: fn},
+		data: buf.Bytes(),
+	}
+	c.frames = append(c.frames, frame)
+	return err // possibly io.EOF (last frame)
+}
+
+// HasFrame returns whether there's any available frame. If true, it can be
+// popped using NextFrame().
+//
+// Call OutputFrames first to create new frames from the channel out
+// compression pipeline.
+func (c *channelBuilder) HasFrame() bool {
+	return len(c.frames) > 0
+}
+
+func (c *channelBuilder) NumFrames() int {
+	return len(c.frames)
+}
+
+// NextFrame returns the next available frame.
+// HasFrame must be called first to check whether a next frame is available.
+// Panics if called when there's no next frame.
+func (c *channelBuilder) NextFrame() (txID, []byte) {
+	if len(c.frames) == 0 {
+		panic("no next frame")
+	}
+
+	f := c.frames[0]
+	c.frames = c.frames[1:]
+	return f.id, f.data
+}
+
+// PushFrame adds the frame back to the internal frames queue. Panics if the
+// frame is not from the same channel.
+func (c *channelBuilder) PushFrame(id txID, frame []byte) {
+	if id.chID != c.ID() {
+		panic("wrong channel")
+	}
+	c.frames = append(c.frames, taggedData{id: id, data: frame})
+}
diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go
index b9556a664483d8d2daa5ecc25c12c04192ac955d..5a15854e7de16cae9b34c2a96baf0694ffcaa202 100644
--- a/op-batcher/batcher/channel_manager.go
+++ b/op-batcher/batcher/channel_manager.go
@@ -1,7 +1,6 @@
 package batcher
 
 import (
-	"bytes"
 	"errors"
 	"fmt"
 	"io"
@@ -9,6 +8,7 @@ import (
 
 	"github.com/ethereum-optimism/optimism/op-node/eth"
 	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
+	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/log"
 )
@@ -46,28 +46,28 @@ type taggedData struct {
 // channel.
 // Functions on channelManager are not safe for concurrent access.
 type channelManager struct {
-	log            log.Logger
-	channelTimeout uint64
+	log log.Logger
+	cfg ChannelConfig
 
 	// All blocks since the last request for new tx data.
blocks []*types.Block - datas []taggedData + // last block hash - for reorg detection + tip common.Hash // Pending data returned by TxData waiting on Tx Confirmed/Failed - // id of the pending channel - pendingChannel derive.ChannelID - // list of blocks in the channel. Saved in case the channel must be rebuilt - pendingBlocks []*types.Block + + // pending channel builder + pendingChannel *channelBuilder // Set of unconfirmed txID -> frame data. For tx resubmission pendingTransactions map[txID][]byte // Set of confirmed txID -> inclusion block. For determining if the channel is timed out confirmedTransactions map[txID]eth.BlockID } -func NewChannelManager(log log.Logger, channelTimeout uint64) *channelManager { +func NewChannelManager(log log.Logger, cfg ChannelConfig) *channelManager { return &channelManager{ log: log, - channelTimeout: channelTimeout, + cfg: cfg, pendingTransactions: make(map[txID][]byte), confirmedTransactions: make(map[txID]eth.BlockID), } @@ -78,7 +78,8 @@ func NewChannelManager(log log.Logger, channelTimeout uint64) *channelManager { func (s *channelManager) Clear() { s.log.Trace("clearing channel manager state") s.blocks = s.blocks[:0] - s.datas = s.datas[:0] + s.tip = common.Hash{} + s.clearPendingChannel() } // TxFailed records a transaction as failed. It will attempt to resubmit the data @@ -86,10 +87,10 @@ func (s *channelManager) Clear() { func (s *channelManager) TxFailed(id txID) { if data, ok := s.pendingTransactions[id]; ok { s.log.Trace("marked transaction as failed", "id", id) - s.datas = append(s.datas, taggedData{data, id}) + s.pendingChannel.PushFrame(id, data) delete(s.pendingTransactions, id) } else { - s.log.Info("marked transaction as failed despite having no record of it.", "id", id) + s.log.Warn("unknown transaction marked as failed", "id", id) } } @@ -100,7 +101,7 @@ func (s *channelManager) TxFailed(id txID) { func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) { s.log.Trace("marked transaction as confirmed", "id", id, "block", inclusionBlock) if _, ok := s.pendingTransactions[id]; !ok { - s.log.Info("marked transaction as confirmed despite having no record of it", "id", id, "block", inclusionBlock) + s.log.Warn("unknown transaction marked as confirmed", "id", id, "block", inclusionBlock) // TODO: This can occur if we clear the channel while there are still pending transactions // We need to keep track of stale transactions instead return @@ -111,13 +112,13 @@ func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) { // If this channel timed out, put the pending blocks back into the local saved blocks // and then reset this state so it can try to build a new channel. if s.pendingChannelIsTimedOut() { - s.log.Warn("Channel timed out", "chID", s.pendingChannel) - s.blocks = append(s.pendingBlocks, s.blocks...) + s.log.Warn("Channel timed out", "chID", s.pendingChannel.ID()) + s.blocks = append(s.pendingChannel.Blocks(), s.blocks...) s.clearPendingChannel() } // If we are done with this channel, record that. if s.pendingChannelIsFullySubmitted() { - s.log.Info("Channel is fully submitted", "chID", s.pendingChannel) + s.log.Info("Channel is fully submitted", "chID", s.pendingChannel.ID()) s.clearPendingChannel() } } @@ -125,8 +126,7 @@ func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) { // clearPendingChannel resets all pending state back to an initialized but empty state. 
// TODO: Create separate "pending" state func (s *channelManager) clearPendingChannel() { - s.pendingChannel = derive.ChannelID{} - s.pendingBlocks = nil + s.pendingChannel = nil s.pendingTransactions = make(map[txID][]byte) s.confirmedTransactions = make(map[txID]eth.BlockID) } @@ -135,7 +135,7 @@ func (s *channelManager) clearPendingChannel() { // A channel has timed out if the difference in L1 Inclusion blocks between // the first & last included block is greater than or equal to the channel timeout. func (s *channelManager) pendingChannelIsTimedOut() bool { - if s.pendingChannel == (derive.ChannelID{}) { + if s.pendingChannel == nil { return false // no channel to be timed out } // No confirmed transactions => not timed out @@ -153,130 +153,133 @@ func (s *channelManager) pendingChannelIsTimedOut() bool { max = inclusionBlock.Number } } - return max-min >= s.channelTimeout + return max-min >= s.cfg.ChannelTimeout } // pendingChannelIsFullySubmitted returns true if the channel has been fully submitted. func (s *channelManager) pendingChannelIsFullySubmitted() bool { - if s.pendingChannel == (derive.ChannelID{}) { + if s.pendingChannel == nil { return false // todo: can decide either way here. Nonsensical answer though } - return len(s.pendingTransactions)+len(s.datas) == 0 -} - -// blocksToFrames turns a set of blocks into a set of frames inside a channel. -// It will only create a single channel which contains up to `MAX_RLP_BYTES`. Any -// blocks not added to the channel are returned. It uses the max supplied frame size. -func blocksToFrames(blocks []*types.Block, maxFrameSize uint64) (derive.ChannelID, [][]byte, []*types.Block, error) { - ch, err := derive.NewChannelOut() - if err != nil { - return derive.ChannelID{}, nil, nil, err - } - - i := 0 - for ; i < len(blocks); i++ { - if err := ch.AddBlock(blocks[i]); err == derive.ErrTooManyRLPBytes { - break - } else if err != nil { - return derive.ChannelID{}, nil, nil, err - } - } - if err := ch.Close(); err != nil { - return derive.ChannelID{}, nil, nil, err - } - - var frames [][]byte - for { - var buf bytes.Buffer - buf.WriteByte(derive.DerivationVersion0) - err := ch.OutputFrame(&buf, maxFrameSize-1) - if err != io.EOF && err != nil { - return derive.ChannelID{}, nil, nil, err - } - frames = append(frames, buf.Bytes()) - if err == io.EOF { - break - } - } - return ch.ID(), frames, blocks[i:], nil + return s.pendingChannel.IsFull() && len(s.pendingTransactions)+s.pendingChannel.NumFrames() == 0 } // nextTxData pops off s.datas & handles updating the internal state func (s *channelManager) nextTxData() ([]byte, txID, error) { - if len(s.datas) != 0 { - r := s.datas[0] - s.log.Trace("returning next tx data", "id", r.id) - s.pendingTransactions[r.id] = r.data - s.datas = s.datas[1:] - return r.data, r.id, nil - } else { + if s.pendingChannel == nil || !s.pendingChannel.HasFrame() { + s.log.Trace("no next tx data") return nil, txID{}, io.EOF // TODO: not enough data error instead } + + id, data := s.pendingChannel.NextFrame() + // prepend version byte for first frame of transaction + // TODO: more memory efficient solution; shouldn't be responsibility of + // channelBuilder though. + data = append([]byte{0}, data...) + + s.log.Trace("returning next tx data", "id", id) + s.pendingTransactions[id] = data + return data, id, nil } -// TxData returns the next tx.data that should be submitted to L1. -// It is very simple & currently ignores the l1Head provided (this will change). -// It may buffer very large channels as well. 
+// TxData returns the next tx data that should be submitted to L1. +// +// It currently only uses one frame per transaction. If the pending channel is +// full, it only returns the remaining frames of this channel until it got +// successfully fully sent to L1. It returns io.EOF if there's no pending frame. +// +// It currently ignores the l1Head provided and doesn't track channel timeouts +// or the sequencer window span yet. func (s *channelManager) TxData(l1Head eth.L1BlockRef) ([]byte, txID, error) { - channelPending := s.pendingChannel != (derive.ChannelID{}) - s.log.Debug("Requested tx data", "l1Head", l1Head, "channel_pending", channelPending, "block_count", len(s.blocks)) + dataPending := s.pendingChannel != nil && s.pendingChannel.HasFrame() + s.log.Debug("Requested tx data", "l1Head", l1Head, "data_pending", dataPending, "blocks_pending", len(s.blocks)) - // Short circuit if there is a pending channel. - // We either submit the next frame from that channel or - if channelPending { + // Short circuit if there is a pending frame. + if dataPending { return s.nextTxData() } + + // No pending frame, so we have to add new blocks to the channel + // If we have no saved blocks, we will not be able to create valid frames if len(s.blocks) == 0 { return nil, txID{}, io.EOF } - // Select range of blocks - end := len(s.blocks) - if end > 100 { - end = 100 - } - blocks := s.blocks[:end] - s.blocks = s.blocks[end:] - - chID, frames, leftOverBlocks, err := blocksToFrames(blocks, 120_000) - // If the range of blocks serialized to be too large, restore - // blocks that could not be included inside the channel - if len(leftOverBlocks) != 0 { - s.blocks = append(leftOverBlocks, s.blocks...) - } - // TODO: Ensure that len(frames) < math.MaxUint16. Should not happen though. One tricky part - // is ensuring that s.blocks is properly restored. - if err != nil { - s.log.Warn("Failed to create channel from blocks", "err", err) + if err := s.ensurePendingChannel(l1Head); err != nil { return nil, txID{}, err } - s.log.Info("Created channel", "chID", chID, "frame_count", len(frames), "l1Head", l1Head) - var t []taggedData - for i, data := range frames { - t = append(t, taggedData{data: data, id: txID{chID: chID, frameNumber: uint16(i)}}) + if err := s.addBlocks(); err != nil { + return nil, txID{}, err } - // Load up pending state. Note: pending transactions is taken care of by nextTxData - s.datas = t - s.pendingChannel = chID - s.pendingBlocks = blocks[:len(leftOverBlocks)] + if err := s.pendingChannel.OutputFrames(); err != nil { + return nil, txID{}, fmt.Errorf("creating frames with channel builder: %w", err) + } return s.nextTxData() +} + +func (s *channelManager) ensurePendingChannel(l1Head eth.L1BlockRef) error { + if s.pendingChannel != nil { + return nil + } + + cb, err := newChannelBuilder(s.cfg) + if err != nil { + return fmt.Errorf("creating new channel: %w", err) + } + s.pendingChannel = cb + s.log.Info("Created channel", "chID", cb.ID(), "l1Head", l1Head) + + return nil +} +// addBlocks adds blocks from the blocks queue to the pending channel until +// either the queue got exhausted or the channel is full. 
+func (s *channelManager) addBlocks() error { + var blocksAdded int + var _chFullErr *ChannelFullError // throw away, just for type checking + for i, block := range s.blocks { + if err := s.pendingChannel.AddBlock(block); errors.As(err, &_chFullErr) { + // current block didn't get added because channel is already full + break + } else if err != nil { + return fmt.Errorf("adding block[%d] to channel builder: %w", i, err) + } + blocksAdded += 1 + // current block got added but channel is now full + if s.pendingChannel.IsFull() { + break + } + } + + s.log.Debug("Added blocks to channel", + "blocks_added", blocksAdded, + "channel_full", s.pendingChannel.IsFull(), + "blocks_pending", len(s.blocks)-blocksAdded, + "input_bytes", s.pendingChannel.InputBytes(), + ) + if blocksAdded == len(s.blocks) { + // all blocks processed, reuse slice + s.blocks = s.blocks[:0] + } else { + // remove processed blocks + s.blocks = s.blocks[blocksAdded:] + } + return nil } // AddL2Block saves an L2 block to the internal state. It returns ErrReorg // if the block does not extend the last block loaded into the state. -// If no block is already in the channel, the the parent hash check is skipped. -// TODO: Phantom last block b/c if the local state is fully drained we can reorg without realizing it. +// If no blocks were added yet, the parent hash check is skipped. func (s *channelManager) AddL2Block(block *types.Block) error { - if len(s.blocks) > 0 { - if s.blocks[len(s.blocks)-1].Hash() != block.ParentHash() { - return ErrReorg - } + if s.tip != (common.Hash{}) && s.tip != block.ParentHash() { + return ErrReorg } s.blocks = append(s.blocks, block) + s.tip = block.Hash() + return nil } diff --git a/op-batcher/batcher/config.go b/op-batcher/batcher/config.go index d511c6baef18dc1097e107200857720beb4d31d8..d558793ed0fa3335065bcbb80324245d4f0e0376 100644 --- a/op-batcher/batcher/config.go +++ b/op-batcher/batcher/config.go @@ -25,12 +25,6 @@ type Config struct { // RollupRpc is the HTTP provider URL for the L2 rollup node. RollupRpc string - // MinL1TxSize is the minimum size of a batch tx submitted to L1. - MinL1TxSize uint64 - - // MaxL1TxSize is the maximum size of a batch tx submitted to L1. - MaxL1TxSize uint64 - // ChannelTimeout is the maximum amount of time to attempt completing an opened channel, // as opposed to submitting missing blocks in new channels ChannelTimeout uint64 @@ -72,6 +66,19 @@ type Config struct { /* Optional Params */ + // MaxL1TxSize is the maximum size of a batch tx submitted to L1. + MaxL1TxSize uint64 + + // TargetL1TxSize is the target size of a batch tx submitted to L1. + TargetL1TxSize uint64 + + // TargetNumFrames is the target number of frames per channel. + TargetNumFrames int + + // ApproxComprRatio is the approximate compression ratio (<= 1.0) of the used + // compression algorithm. 
+ ApproxComprRatio float64 + LogConfig oplog.CLIConfig MetricsConfig opmetrics.CLIConfig @@ -105,16 +112,20 @@ func (c Config) Check() error { func NewConfig(ctx *cli.Context) Config { return Config{ /* Required Flags */ - L1EthRpc: ctx.GlobalString(flags.L1EthRpcFlag.Name), - L2EthRpc: ctx.GlobalString(flags.L2EthRpcFlag.Name), - RollupRpc: ctx.GlobalString(flags.RollupRpcFlag.Name), - MinL1TxSize: ctx.GlobalUint64(flags.MinL1TxSizeBytesFlag.Name), + L1EthRpc: ctx.GlobalString(flags.L1EthRpcFlag.Name), + L2EthRpc: ctx.GlobalString(flags.L2EthRpcFlag.Name), + RollupRpc: ctx.GlobalString(flags.RollupRpcFlag.Name), + ChannelTimeout: ctx.GlobalUint64(flags.ChannelTimeoutFlag.Name), + PollInterval: ctx.GlobalDuration(flags.PollIntervalFlag.Name), + NumConfirmations: ctx.GlobalUint64(flags.NumConfirmationsFlag.Name), + SafeAbortNonceTooLowCount: ctx.GlobalUint64(flags.SafeAbortNonceTooLowCountFlag.Name), + ResubmissionTimeout: ctx.GlobalDuration(flags.ResubmissionTimeoutFlag.Name), + + /* Optional Flags */ MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name), - ChannelTimeout: ctx.GlobalUint64(flags.ChannelTimeoutFlag.Name), - PollInterval: ctx.GlobalDuration(flags.PollIntervalFlag.Name), - NumConfirmations: ctx.GlobalUint64(flags.NumConfirmationsFlag.Name), - SafeAbortNonceTooLowCount: ctx.GlobalUint64(flags.SafeAbortNonceTooLowCountFlag.Name), - ResubmissionTimeout: ctx.GlobalDuration(flags.ResubmissionTimeoutFlag.Name), + TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name), + TargetNumFrames: ctx.GlobalInt(flags.TargetNumFramesFlag.Name), + ApproxComprRatio: ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name), Mnemonic: ctx.GlobalString(flags.MnemonicFlag.Name), SequencerHDPath: ctx.GlobalString(flags.SequencerHDPathFlag.Name), PrivateKey: ctx.GlobalString(flags.PrivateKeyFlag.Name), diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 9822b6eab298270f36d791ece5114138eec9b2f1..2f2c7a0134645fc44faf7e35a7a55b3cccb69154 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -146,12 +146,16 @@ func NewBatchSubmitterWithSigner(cfg Config, addr common.Address, signer SignerF L1Client: l1Client, L2Client: l2Client, RollupNode: rollupClient, - MinL1TxSize: cfg.MinL1TxSize, - MaxL1TxSize: cfg.MaxL1TxSize, BatchInboxAddress: batchInboxAddress, - ChannelTimeout: cfg.ChannelTimeout, - ChainID: chainID, - PollInterval: cfg.PollInterval, + Channel: ChannelConfig{ + ChannelTimeout: cfg.ChannelTimeout, + MaxFrameSize: cfg.MaxL1TxSize - 1, // subtract 1 byte for version + TargetFrameSize: cfg.TargetL1TxSize - 1, // subtract 1 byte for version + TargetNumFrames: cfg.TargetNumFrames, + ApproxComprRatio: cfg.ApproxComprRatio, + }, + ChainID: chainID, + PollInterval: cfg.PollInterval, } ctx, cancel := context.WithCancel(context.Background()) @@ -162,7 +166,7 @@ func NewBatchSubmitterWithSigner(cfg Config, addr common.Address, signer SignerF txMgr: NewTransactionManager(l, txManagerConfig, batchInboxAddress, chainID, addr, l1Client, signer(chainID)), done: make(chan struct{}), log: l, - state: NewChannelManager(l, cfg.ChannelTimeout), + state: NewChannelManager(l, batcherCfg.Channel), // TODO: this context only exists because the event loop doesn't reach done // if the tx manager is blocking forever due to e.g. insufficient balance. 
ctx: ctx, diff --git a/op-batcher/batcher/seq_driver.go b/op-batcher/batcher/seq_driver.go index 21da97971f4a4222e53f089f237530c4d866f6a1..d9cd2de94813909d6d760846ebdf62625d044098 100644 --- a/op-batcher/batcher/seq_driver.go +++ b/op-batcher/batcher/seq_driver.go @@ -23,17 +23,11 @@ type DriverConfig struct { RollupNode *sources.RollupClient - // Limit the size of txs - MinL1TxSize uint64 - MaxL1TxSize uint64 - // Where to send the batch txs to. BatchInboxAddress common.Address - // The batcher can decide to set it shorter than the actual timeout, - // since submitting continued channel data to L1 is not instantaneous. - // It's not worth it to work with nearly timed-out channels. - ChannelTimeout uint64 + // Channel creation parameters + Channel ChannelConfig // Chain ID of the L1 chain to submit txs to. ChainID *big.Int diff --git a/op-batcher/flags/flags.go b/op-batcher/flags/flags.go index 545b98fbc1511dd8acc9da5f8900bff4849e12d6..cf2c61330d9f27c71573d421febcc4bde1f59670 100644 --- a/op-batcher/flags/flags.go +++ b/op-batcher/flags/flags.go @@ -14,7 +14,7 @@ import ( const envVarPrefix = "OP_BATCHER" var ( - /* Required Flags */ + /* Required flags */ L1EthRpcFlag = cli.StringFlag{ Name: "l1-eth-rpc", @@ -34,21 +34,9 @@ var ( Required: true, EnvVar: opservice.PrefixEnvVar(envVarPrefix, "ROLLUP_RPC"), } - MinL1TxSizeBytesFlag = cli.Uint64Flag{ - Name: "min-l1-tx-size-bytes", - Usage: "The minimum size of a batch tx submitted to L1.", - Required: true, - EnvVar: opservice.PrefixEnvVar(envVarPrefix, "MIN_L1_TX_SIZE_BYTES"), - } - MaxL1TxSizeBytesFlag = cli.Uint64Flag{ - Name: "max-l1-tx-size-bytes", - Usage: "The maximum size of a batch tx submitted to L1.", - Required: true, - EnvVar: opservice.PrefixEnvVar(envVarPrefix, "MAX_L1_TX_SIZE_BYTES"), - } ChannelTimeoutFlag = cli.Uint64Flag{ Name: "channel-timeout", - Usage: "The maximum amount of time to attempt completing an opened channel, as opposed to submitting L2 blocks into a new channel.", + Usage: "The maximum duration (in seconds) to attempt completing an opened channel, as opposed to submitting L2 blocks into a new channel.", Required: true, EnvVar: opservice.PrefixEnvVar(envVarPrefix, "CHANNEL_TIMEOUT"), } @@ -81,6 +69,39 @@ var ( Required: true, EnvVar: opservice.PrefixEnvVar(envVarPrefix, "RESUBMISSION_TIMEOUT"), } + SequencerBatchInboxAddressFlag = cli.StringFlag{ + Name: "sequencer-batch-inbox-address", + Usage: "L1 Address to receive batch transactions", + Required: true, + EnvVar: opservice.PrefixEnvVar(envVarPrefix, "SEQUENCER_BATCH_INBOX_ADDRESS"), + } + + /* Optional flags */ + + MaxL1TxSizeBytesFlag = cli.Uint64Flag{ + Name: "max-l1-tx-size-bytes", + Usage: "The maximum size of a batch tx submitted to L1.", + Value: 120_000, + EnvVar: opservice.PrefixEnvVar(envVarPrefix, "MAX_L1_TX_SIZE_BYTES"), + } + TargetL1TxSizeBytesFlag = cli.Uint64Flag{ + Name: "target-l1-tx-size-bytes", + Usage: "The target size of a batch tx submitted to L1.", + Value: 100_000, + EnvVar: opservice.PrefixEnvVar(envVarPrefix, "TARGET_L1_TX_SIZE_BYTES"), + } + TargetNumFramesFlag = cli.IntFlag{ + Name: "target-num-frames", + Usage: "The target number of frames to create per channel", + Value: 1, + EnvVar: opservice.PrefixEnvVar(envVarPrefix, "TARGET_NUM_FRAMES"), + } + ApproxComprRatioFlag = cli.Float64Flag{ + Name: "approx-compr-ratio", + Usage: "The approximate compression ratio (<= 1.0)", + Value: 1.0, + EnvVar: opservice.PrefixEnvVar(envVarPrefix, "APPROX_COMPR_RATIO"), + } MnemonicFlag = cli.StringFlag{ Name: "mnemonic", Usage: "The mnemonic 
used to derive the wallets for either the " + @@ -98,20 +119,12 @@ var ( Usage: "The private key to use with the l2output wallet. Must not be used with mnemonic.", EnvVar: opservice.PrefixEnvVar(envVarPrefix, "PRIVATE_KEY"), } - SequencerBatchInboxAddressFlag = cli.StringFlag{ - Name: "sequencer-batch-inbox-address", - Usage: "L1 Address to receive batch transactions", - Required: true, - EnvVar: opservice.PrefixEnvVar(envVarPrefix, "SEQUENCER_BATCH_INBOX_ADDRESS"), - } ) var requiredFlags = []cli.Flag{ L1EthRpcFlag, L2EthRpcFlag, RollupRpcFlag, - MinL1TxSizeBytesFlag, - MaxL1TxSizeBytesFlag, ChannelTimeoutFlag, PollIntervalFlag, NumConfirmationsFlag, @@ -121,6 +134,10 @@ var requiredFlags = []cli.Flag{ } var optionalFlags = []cli.Flag{ + MaxL1TxSizeBytesFlag, + TargetL1TxSizeBytesFlag, + TargetNumFramesFlag, + ApproxComprRatioFlag, MnemonicFlag, SequencerHDPathFlag, PrivateKeyFlag, diff --git a/op-e2e/actions/garbage_channel_out.go b/op-e2e/actions/garbage_channel_out.go index 6604f9249d13c75569e30cc944e260c62469b617..c9431960b9848a65db9999b2b9672024c7e893f9 100644 --- a/op-e2e/actions/garbage_channel_out.go +++ b/op-e2e/actions/garbage_channel_out.go @@ -54,11 +54,11 @@ type Writer interface { type ChannelOutIface interface { ID() derive.ChannelID Reset() error - AddBlock(block *types.Block) error + AddBlock(block *types.Block) (uint64, error) ReadyBytes() int Flush() error Close() error - OutputFrame(w *bytes.Buffer, maxSize uint64) error + OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, error) } // Compile-time check for ChannelOutIface interface implementation for the ChannelOut type. @@ -135,19 +135,19 @@ func (co *GarbageChannelOut) Reset() error { // error that it returns is ErrTooManyRLPBytes. If this error // is returned, the channel should be closed and a new one // should be made. -func (co *GarbageChannelOut) AddBlock(block *types.Block) error { +func (co *GarbageChannelOut) AddBlock(block *types.Block) (uint64, error) { if co.closed { - return errors.New("already closed") + return 0, errors.New("already closed") } batch, err := blockToBatch(block) if err != nil { - return err + return 0, err } // We encode to a temporary buffer to determine the encoded length to // ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL var buf bytes.Buffer if err := rlp.Encode(&buf, batch); err != nil { - return err + return 0, err } if co.cfg.malformRLP { // Malform the RLP by incrementing the length prefix by 1. @@ -157,13 +157,13 @@ func (co *GarbageChannelOut) AddBlock(block *types.Block) error { buf.Write(bufBytes) } if co.rlpLength+buf.Len() > derive.MaxRLPBytesPerChannel { - return fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w", + return 0, fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w", buf.Len(), co.rlpLength, derive.MaxRLPBytesPerChannel, derive.ErrTooManyRLPBytes) } co.rlpLength += buf.Len() - _, err = io.Copy(co.compress, &buf) - return err + written, err := io.Copy(co.compress, &buf) + return uint64(written), err } // ReadyBytes returns the number of bytes that the channel out can immediately output into a frame. @@ -192,11 +192,12 @@ func (co *GarbageChannelOut) Close() error { // Returns io.EOF when the channel is closed & there are no more frames // Returns nil if there is still more buffered data. // Returns and error if it ran into an error during processing. 
-func (co *GarbageChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) error { +func (co *GarbageChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, error) { f := derive.Frame{ ID: co.id, FrameNumber: uint16(co.frame), } + fn := f.FrameNumber // Copy data from the local buffer into the frame data buffer // Don't go past the maxSize with the fixed frame overhead. @@ -214,18 +215,18 @@ func (co *GarbageChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) error f.Data = make([]byte, maxDataSize) if _, err := io.ReadFull(&co.buf, f.Data); err != nil { - return err + return fn, err } if err := f.MarshalBinary(w); err != nil { - return err + return fn, err } co.frame += 1 if f.IsLast { - return io.EOF + return fn, io.EOF } else { - return nil + return fn, nil } } diff --git a/op-e2e/actions/l2_batcher.go b/op-e2e/actions/l2_batcher.go index 71219a4e0757315b2a16dd22bed44eebe3f074e6..21470d453ed775c110e0e815e2553778d6bda45a 100644 --- a/op-e2e/actions/l2_batcher.go +++ b/op-e2e/actions/l2_batcher.go @@ -142,7 +142,7 @@ func (s *L2Batcher) ActL2BatchBuffer(t Testing) { s.l2BufferedBlock = syncStatus.SafeL2.ID() s.l2ChannelOut = nil } - if err := s.l2ChannelOut.AddBlock(block); err != nil { // should always succeed + if _, err := s.l2ChannelOut.AddBlock(block); err != nil { // should always succeed t.Fatalf("failed to add block to channel: %v", err) } s.l2BufferedBlock = eth.ToBlockID(block) @@ -168,7 +168,7 @@ func (s *L2Batcher) ActL2BatchSubmit(t Testing) { data := new(bytes.Buffer) data.WriteByte(derive.DerivationVersion0) // subtract one, to account for the version byte - if err := s.l2ChannelOut.OutputFrame(data, s.l2BatcherCfg.MaxL1TxSize-1); err == io.EOF { + if _, err := s.l2ChannelOut.OutputFrame(data, s.l2BatcherCfg.MaxL1TxSize-1); err == io.EOF { s.l2ChannelOut = nil s.l2Submitting = false } else if err != nil { @@ -218,7 +218,7 @@ func (s *L2Batcher) ActL2BatchSubmitGarbage(t Testing, kind GarbageKind) { data.WriteByte(derive.DerivationVersion0) // subtract one, to account for the version byte - if err := s.l2ChannelOut.OutputFrame(data, s.l2BatcherCfg.MaxL1TxSize-1); err == io.EOF { + if _, err := s.l2ChannelOut.OutputFrame(data, s.l2BatcherCfg.MaxL1TxSize-1); err == io.EOF { s.l2ChannelOut = nil s.l2Submitting = false } else if err != nil { diff --git a/op-e2e/migration_test.go b/op-e2e/migration_test.go index d1f2294a58ec646b4e541553b11535c07e4bfd1e..b74868df810d6ceddc84b59e306615a6b4414430 100644 --- a/op-e2e/migration_test.go +++ b/op-e2e/migration_test.go @@ -323,8 +323,10 @@ func TestMigration(t *testing.T) { L1EthRpc: forkedL1URL, L2EthRpc: gethNode.WSEndpoint(), RollupRpc: rollupNode.HTTPEndpoint(), - MinL1TxSize: 1, - MaxL1TxSize: 120000, + MaxL1TxSize: 120_000, + TargetL1TxSize: 1, + TargetNumFrames: 1, + ApproxComprRatio: 1.0, ChannelTimeout: deployCfg.ChannelTimeout, PollInterval: 50 * time.Millisecond, NumConfirmations: 1, diff --git a/op-e2e/setup.go b/op-e2e/setup.go index 5a5cde821d944957cb0a6f899a2981d4134064d7..109f8da63818d1069405d19808a1819c2fd717ce 100644 --- a/op-e2e/setup.go +++ b/op-e2e/setup.go @@ -526,8 +526,10 @@ func (cfg SystemConfig) Start() (*System, error) { L1EthRpc: sys.Nodes["l1"].WSEndpoint(), L2EthRpc: sys.Nodes["sequencer"].WSEndpoint(), RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(), - MinL1TxSize: 1, - MaxL1TxSize: 120000, + MaxL1TxSize: 120_000, + TargetL1TxSize: 1, + TargetNumFrames: 1, + ApproxComprRatio: 1.0, ChannelTimeout: cfg.DeployConfig.ChannelTimeout, PollInterval: 50 * time.Millisecond, NumConfirmations: 
1, diff --git a/op-e2e/system_test.go b/op-e2e/system_test.go index 00549d2aad16b62d44cb81226b165580661b241c..a8502e20fecfae3b9b6e6ef3513c04162a702157 100644 --- a/op-e2e/system_test.go +++ b/op-e2e/system_test.go @@ -883,7 +883,7 @@ func TestWithdrawals(t *testing.T) { require.Nil(t, err) // Get l2BlockNumber for proof generation - ctx, cancel = context.WithTimeout(context.Background(), 20*time.Duration(cfg.DeployConfig.L1BlockTime)*time.Second) + ctx, cancel = context.WithTimeout(context.Background(), 30*time.Duration(cfg.DeployConfig.L1BlockTime)*time.Second) defer cancel() blockNumber, err := withdrawals.WaitForFinalizationPeriod(ctx, l1Client, predeploys.DevOptimismPortalAddr, receipt.BlockNumber) require.Nil(t, err) @@ -933,7 +933,7 @@ func TestWithdrawals(t *testing.T) { require.Equal(t, types.ReceiptStatusSuccessful, proveReceipt.Status) // Wait for finalization and then create the Finalized Withdrawal Transaction - ctx, cancel = context.WithTimeout(context.Background(), 20*time.Duration(cfg.DeployConfig.L1BlockTime)*time.Second) + ctx, cancel = context.WithTimeout(context.Background(), 30*time.Duration(cfg.DeployConfig.L1BlockTime)*time.Second) defer cancel() _, err = withdrawals.WaitForFinalizationPeriod(ctx, l1Client, predeploys.DevOptimismPortalAddr, header.Number) require.Nil(t, err) @@ -1052,10 +1052,10 @@ func TestFees(t *testing.T) { err = l2Seq.SendTransaction(context.Background(), tx) require.Nil(t, err, "Sending L2 tx to sequencer") - _, err = waitForTransaction(tx.Hash(), l2Seq, 3*time.Duration(cfg.DeployConfig.L1BlockTime)*time.Second) + _, err = waitForTransaction(tx.Hash(), l2Seq, 4*time.Duration(cfg.DeployConfig.L1BlockTime)*time.Second) require.Nil(t, err, "Waiting for L2 tx on sequencer") - receipt, err := waitForTransaction(tx.Hash(), l2Verif, 3*time.Duration(cfg.DeployConfig.L1BlockTime)*time.Second) + receipt, err := waitForTransaction(tx.Hash(), l2Verif, 4*time.Duration(cfg.DeployConfig.L1BlockTime)*time.Second) require.Nil(t, err, "Waiting for L2 tx on verifier") require.Equal(t, types.ReceiptStatusSuccessful, receipt.Status, "TX should have succeeded") diff --git a/op-node/rollup/derive/channel_out.go b/op-node/rollup/derive/channel_out.go index 33a588f7a84c3c12c81ac5533c7ac0f6816c4c5d..ab4f7203b850eb1bb9df53e46225f7e5bfa8b2b1 100644 --- a/op-node/rollup/derive/channel_out.go +++ b/op-node/rollup/derive/channel_out.go @@ -64,39 +64,40 @@ func (co *ChannelOut) Reset() error { co.compress.Reset(&co.buf) co.closed = false _, err := rand.Read(co.id[:]) - if err != nil { - return err - } - return nil + return err } -// AddBlock adds a block to the channel. It returns an error -// if there is a problem adding the block. The only sentinel -// error that it returns is ErrTooManyRLPBytes. If this error -// is returned, the channel should be closed and a new one -// should be made. -func (co *ChannelOut) AddBlock(block *types.Block) error { +// AddBlock adds a block to the channel. It returns the RLP encoded byte size +// and an error if there is a problem adding the block. The only sentinel error +// that it returns is ErrTooManyRLPBytes. If this error is returned, the channel +// should be closed and a new one should be made. 
+func (co *ChannelOut) AddBlock(block *types.Block) (uint64, error) { if co.closed { - return errors.New("already closed") + return 0, errors.New("already closed") } batch, err := blockToBatch(block) if err != nil { - return err + return 0, err } // We encode to a temporary buffer to determine the encoded length to // ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL var buf bytes.Buffer if err := rlp.Encode(&buf, batch); err != nil { - return err + return 0, err } if co.rlpLength+buf.Len() > MaxRLPBytesPerChannel { - return fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w", + return 0, fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w", buf.Len(), co.rlpLength, MaxRLPBytesPerChannel, ErrTooManyRLPBytes) } co.rlpLength += buf.Len() - _, err = io.Copy(co.compress, &buf) - return err + written, err := io.Copy(co.compress, &buf) + return uint64(written), err +} + +// InputBytes returns the total amount of RLP-encoded input bytes. +func (co *ChannelOut) InputBytes() int { + return co.rlpLength } // ReadyBytes returns the number of bytes that the channel out can immediately output into a frame. @@ -120,12 +121,13 @@ func (co *ChannelOut) Close() error { return co.compress.Close() } -// OutputFrame writes a frame to w with a given max size +// OutputFrame writes a frame to w with a given max size and returns the frame +// number. // Use `ReadyBytes`, `Flush`, and `Close` to modify the ready buffer. // Returns io.EOF when the channel is closed & there are no more frames // Returns nil if there is still more buffered data. // Returns and error if it ran into an error during processing. -func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) error { +func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, error) { f := Frame{ ID: co.id, FrameNumber: uint16(co.frame), @@ -133,9 +135,8 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) error { // Copy data from the local buffer into the frame data buffer // Don't go past the maxSize with the fixed frame overhead. - // Fixed overhead: 32 + 8 + 2 + 4 + 1 = 47 bytes. - // Add one extra byte for the version byte (for the entire L1 tx though) - maxDataSize := maxSize - 47 - 1 + // Fixed overhead: 16 + 2 + 4 + 1 = 23 bytes. 
+ maxDataSize := maxSize - 23 if maxDataSize > uint64(co.buf.Len()) { maxDataSize = uint64(co.buf.Len()) // If we are closed & will not spill past the current frame @@ -147,18 +148,19 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) error { f.Data = make([]byte, maxDataSize) if _, err := io.ReadFull(&co.buf, f.Data); err != nil { - return err + return 0, err } if err := f.MarshalBinary(w); err != nil { - return err + return 0, err } co.frame += 1 + fn := f.FrameNumber if f.IsLast { - return io.EOF + return fn, io.EOF } else { - return nil + return fn, nil } } diff --git a/op-node/rollup/derive/channel_out_test.go b/op-node/rollup/derive/channel_out_test.go index 5083724e5450222057871be08ca5dba296f066c2..42e927039733fd60a6afc2dc60a1057f01e317a0 100644 --- a/op-node/rollup/derive/channel_out_test.go +++ b/op-node/rollup/derive/channel_out_test.go @@ -22,7 +22,7 @@ func TestChannelOutAddBlock(t *testing.T) { }, nil, ) - err := cout.AddBlock(block) + _, err := cout.AddBlock(block) require.Error(t, err) require.Equal(t, ErrNotDepositTx, err) }) diff --git a/ops-bedrock/docker-compose.yml b/ops-bedrock/docker-compose.yml index 8e233b800f9704971df00eee52457a2f0607ebdd..bfe66478e40f221291e1d11fcc3e360688ee3c69 100644 --- a/ops-bedrock/docker-compose.yml +++ b/ops-bedrock/docker-compose.yml @@ -119,8 +119,10 @@ services: OP_BATCHER_L1_ETH_RPC: http://l1:8545 OP_BATCHER_L2_ETH_RPC: http://l2:8545 OP_BATCHER_ROLLUP_RPC: http://op-node:8545 - OP_BATCHER_MIN_L1_TX_SIZE_BYTES: 1 OP_BATCHER_MAX_L1_TX_SIZE_BYTES: 120000 + OP_BATCHER_TARGET_L1_TX_SIZE_BYTES: 624 + OP_BATCHER_TARGET_NUM_FRAMES: 1 + OP_BATCHER_APPROX_COMPR_RATIO: 1.0 OP_BATCHER_CHANNEL_TIMEOUT: 40 OP_BATCHER_POLL_INTERVAL: 1s OP_BATCHER_NUM_CONFIRMATIONS: 1
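As a quick illustration of how the new sizing flags feed the channel builder, the following standalone Go sketch mirrors the ChannelConfig.InputThreshold formula from this diff and evaluates it with the new flag defaults (MaxL1TxSize 120_000, TargetL1TxSize 100_000, TargetNumFrames 1, ApproxComprRatio 1.0), subtracting one byte for the version byte as NewBatchSubmitterWithSigner does. The local channelConfig type is an illustrative copy, not the op-batcher type itself.

package main

import "fmt"

// channelConfig is an illustrative copy of the ChannelConfig fields added in
// this diff; it is not the op-batcher type.
type channelConfig struct {
	MaxFrameSize     uint64
	TargetFrameSize  uint64
	TargetNumFrames  int
	ApproxComprRatio float64
}

// inputThreshold mirrors ChannelConfig.InputThreshold: the number of input
// (RLP) bytes after which the channel builder marks itself as full with
// ErrInputTargetReached.
func (c channelConfig) inputThreshold() uint64 {
	return uint64(float64(c.TargetNumFrames) * float64(c.TargetFrameSize) / c.ApproxComprRatio)
}

func main() {
	// Flag defaults from op-batcher/flags: max 120_000 and target 100_000
	// bytes per L1 tx, one frame per channel, compression ratio 1.0.
	// One byte is subtracted from each tx size for the version byte.
	cfg := channelConfig{
		MaxFrameSize:     120_000 - 1,
		TargetFrameSize:  100_000 - 1,
		TargetNumFrames:  1,
		ApproxComprRatio: 1.0,
	}
	fmt.Println("channel input threshold (bytes):", cfg.inputThreshold()) // 99999
}

With a lower assumed compression ratio, e.g. ApproxComprRatio 0.4, the threshold grows to 249997 input bytes, so the builder buffers more blocks before marking the channel full and closing it.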