Commit 1214d486 authored by mergify[bot], committed by GitHub

Merge pull request #4780 from ethereum-optimism/seb/batcher-impr-txsize

[ENG-2626] op-batcher: Efficient batching
parents ca810b29 21484beb
package batcher
import (
"bytes"
"errors"
"fmt"
"io"
"math"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/core/types"
)
type (
// channelBuilder uses a ChannelOut to create a channel with output frame
// size approximation.
channelBuilder struct {
cfg ChannelConfig
// marked as full if a) the target amount of input data, b) the max RLP input
// bytes per channel, or c) the max allowed frame index (uint16) has been reached
fullErr error
// current channel
co *derive.ChannelOut
// list of blocks in the channel. Saved in case the channel must be rebuilt
blocks []*types.Block
// frames data queue, to be sent as txs
frames []taggedData
}
ChannelConfig struct {
// ChannelTimeout is the maximum duration, in seconds, to attempt completing
// an opened channel. The batcher can decide to set it shorter than the
// actual timeout, since submitting continued channel data to L1 is not
// instantaneous. It's not worth it to work with nearly timed-out channels.
ChannelTimeout uint64
// The maximum byte-size a frame can have.
MaxFrameSize uint64
// The target frame size, in bytes, to create per channel. Note that if the
// realized compression ratio is worse than the approximate, more frames may
// actually be created. This also depends on how close TargetFrameSize is to
// MaxFrameSize.
TargetFrameSize uint64
// The target number of frames to create in this channel. If the realized
// compression ratio is worse than approxComprRatio, additional leftover
// frame(s) might get created.
TargetNumFrames int
// Approximate compression ratio to assume. Should be set slightly below the
// average ratio observed in experiments to reduce the chance of creating a
// small additional leftover frame.
ApproxComprRatio float64
}
ChannelFullError struct {
Err error
}
)
func (e *ChannelFullError) Error() string {
return "channel full: " + e.Err.Error()
}
func (e *ChannelFullError) Unwrap() error {
return e.Err
}
var (
ErrInputTargetReached = errors.New("target amount of input data reached")
ErrMaxFrameIndex = errors.New("max frame index reached (uint16)")
)
// InputThreshold calculates the input data threshold in bytes from the given
// parameters.
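// As a worked example (hypothetical values): with TargetNumFrames = 1,
// TargetFrameSize = 100_000 and ApproxComprRatio = 0.4, the threshold comes
// out to 1 * 100_000 / 0.4 = 250_000 bytes of RLP input.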
func (c ChannelConfig) InputThreshold() uint64 {
return uint64(float64(c.TargetNumFrames) * float64(c.TargetFrameSize) / c.ApproxComprRatio)
}
func newChannelBuilder(cfg ChannelConfig) (*channelBuilder, error) {
co, err := derive.NewChannelOut()
if err != nil {
return nil, err
}
return &channelBuilder{
cfg: cfg,
co: co,
}, nil
}
func (c *channelBuilder) ID() derive.ChannelID {
return c.co.ID()
}
// InputBytes returns the total amount of input bytes added to the channel.
func (c *channelBuilder) InputBytes() int {
return c.co.InputBytes()
}
// Blocks returns a backup list of all blocks that were added to the channel. It
// can be used in case the channel needs to be rebuilt.
func (c *channelBuilder) Blocks() []*types.Block {
return c.blocks
}
// Reset resets the internal state of the channel builder so that it can be
// reused. Note that a new channel id is also generated by Reset.
func (c *channelBuilder) Reset() error {
c.blocks = c.blocks[:0]
c.frames = c.frames[:0]
return c.co.Reset()
}
// AddBlock adds a block to the channel compression pipeline. IsFull should be
// called afterwards to test whether the channel is full. If full, a new channel
// must be started.
//
// AddBlock returns a ChannelFullError if called even though the channel is
// already full. See description of FullErr for details.
//
// Call OutputFrames() afterwards to create frames.
func (c *channelBuilder) AddBlock(block *types.Block) error {
if c.IsFull() {
return c.FullErr()
}
_, err := c.co.AddBlock(block)
if errors.Is(err, derive.ErrTooManyRLPBytes) {
c.setFullErr(err)
return c.FullErr()
} else if err != nil {
return fmt.Errorf("adding block to channel out: %w", err)
}
c.blocks = append(c.blocks, block)
if c.InputTargetReached() {
c.setFullErr(ErrInputTargetReached)
// Adding this block still worked, so don't return error, just mark as full
}
return nil
}
// InputTargetReached says whether the target amount of input data has been
// reached in this channel builder. No more blocks can be added afterwards.
func (c *channelBuilder) InputTargetReached() bool {
return uint64(c.co.InputBytes()) >= c.cfg.InputThreshold()
}
// IsFull returns whether the channel is full.
// FullErr returns the reason for the channel being full.
func (c *channelBuilder) IsFull() bool {
return c.fullErr != nil
}
// FullErr returns the reason why the channel is full. If not full yet, it
// returns nil.
//
// It returns a ChannelFullError wrapping one of three possible reasons for the
// channel being full:
// - ErrInputTargetReached if the target amount of input data has been reached,
// - derive.ErrTooManyRLPBytes if the general maximum amount of input data
// (derive.MaxRLPBytesPerChannel) would have been exceeded by the latest AddBlock call,
// - ErrMaxFrameIndex if the maximum number of frames has been generated (uint16)
func (c *channelBuilder) FullErr() error {
return c.fullErr
}
func (c *channelBuilder) setFullErr(err error) {
c.fullErr = &ChannelFullError{Err: err}
}
// OutputFrames creates new frames with the channel out. It should be called
// after AddBlock and before iterating over available frames with HasFrame and
// NextFrame.
//
// If the input data target hasn't been reached yet, it will conservatively only
// pull readily available frames from the compression output.
// If the target has been reached, the channel is closed and all remaining
// frames will be created, possibly with a small leftover frame.
func (c *channelBuilder) OutputFrames() error {
if c.IsFull() {
return c.closeAndOutputAllFrames()
}
return c.outputReadyFrames()
}
// outputReadyFrames creates new frames as long as there's enough data ready in
// the channel out compression pipeline.
//
// This is part of an optimization to already generate frames and send them off
// as txs while still collecting blocks in the channel builder.
func (c *channelBuilder) outputReadyFrames() error {
// TODO: Decide whether we want to fill frames to max size and use target
// only for estimation, or use target size.
for c.co.ReadyBytes() >= int(c.cfg.MaxFrameSize) {
if err := c.outputFrame(); err == io.EOF {
return nil
} else if err != nil {
return err
}
}
return nil
}
func (c *channelBuilder) closeAndOutputAllFrames() error {
if err := c.co.Close(); err != nil {
return fmt.Errorf("closing channel out: %w", err)
}
for {
if err := c.outputFrame(); err == io.EOF {
return nil
} else if err != nil {
return err
}
}
}
// outputFrame creates one new frame and adds it to the frames queue.
// Note that compressed output data must be available on the underlying
// ChannelOut, or an empty frame will be produced.
func (c *channelBuilder) outputFrame() error {
var buf bytes.Buffer
fn, err := c.co.OutputFrame(&buf, c.cfg.MaxFrameSize)
if err != io.EOF && err != nil {
return fmt.Errorf("writing frame[%d]: %w", fn, err)
}
// Mark as full if max index reached
// TODO: If there's still data in the compression pipeline of the channel out,
// we would miss it and the whole channel would be broken because the last
// frames would never be generated...
// Hitting the max index is impossible with current parameters, so ignore for
// now. Note that in order to properly catch this, we'd need to call Flush
// after every block addition to estimate how many more frames are coming.
if fn == math.MaxUint16 {
c.setFullErr(ErrMaxFrameIndex)
}
frame := taggedData{
id: txID{chID: c.co.ID(), frameNumber: fn},
data: buf.Bytes(),
}
c.frames = append(c.frames, frame)
return err // possibly io.EOF (last frame)
}
// HasFrame returns whether there's any available frame. If true, it can be
// popped using NextFrame().
//
// Call OutputFrames before to create new frames from the channel out
// compression pipeline.
func (c *channelBuilder) HasFrame() bool {
return len(c.frames) > 0
}
func (c *channelBuilder) NumFrames() int {
return len(c.frames)
}
// NextFrame returns the next available frame.
// HasFrame must be called beforehand to check whether a next frame is available.
// Panics if called when there's no next frame.
func (c *channelBuilder) NextFrame() (txID, []byte) {
if len(c.frames) == 0 {
panic("no next frame")
}
f := c.frames[0]
c.frames = c.frames[1:]
return f.id, f.data
}
// PushFrame adds the frame back to the internal frames queue. Panics if not of
// the same channel.
func (c *channelBuilder) PushFrame(id txID, frame []byte) {
if id.chID != c.ID() {
panic("wrong channel")
}
c.frames = append(c.frames, taggedData{id: id, data: frame})
}
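The intended produce/consume cycle of the builder is: add blocks until the builder reports full, create frames, then drain them. A minimal sketch, assuming it sits in the same batcher package as the code above (the helper name buildAndDrain and the Printf are illustrative only, not part of this commit):
func buildAndDrain(cfg ChannelConfig, blocks []*types.Block) error {
	cb, err := newChannelBuilder(cfg)
	if err != nil {
		return err
	}
	for _, block := range blocks {
		err := cb.AddBlock(block)
		var fullErr *ChannelFullError
		if errors.As(err, &fullErr) {
			break // channel was already full, this block belongs in the next channel
		} else if err != nil {
			return err
		}
		if cb.IsFull() {
			break // block was still added, but the input target is now reached
		}
	}
	// Once full, OutputFrames closes the channel and drains all remaining data;
	// otherwise it only emits frames for readily compressed data.
	if err := cb.OutputFrames(); err != nil {
		return err
	}
	for cb.HasFrame() {
		id, data := cb.NextFrame()
		fmt.Printf("frame %v: %d bytes\n", id, len(data))
	}
	return nil
}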
package batcher
import (
- "bytes"
"errors"
"fmt"
"io"
@@ -9,6 +8,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
+ "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
@@ -46,28 +46,28 @@ type taggedData struct {
// channel.
// Functions on channelManager are not safe for concurrent access.
type channelManager struct {
log log.Logger
- channelTimeout uint64
+ cfg ChannelConfig
// All blocks since the last request for new tx data.
blocks []*types.Block
- datas []taggedData
+ // last block hash - for reorg detection
+ tip common.Hash
// Pending data returned by TxData waiting on Tx Confirmed/Failed
- // id of the pending channel
- pendingChannel derive.ChannelID
- // list of blocks in the channel. Saved in case the channel must be rebuilt
- pendingBlocks []*types.Block
+ // pending channel builder
+ pendingChannel *channelBuilder
// Set of unconfirmed txID -> frame data. For tx resubmission
pendingTransactions map[txID][]byte
// Set of confirmed txID -> inclusion block. For determining if the channel is timed out
confirmedTransactions map[txID]eth.BlockID
}
- func NewChannelManager(log log.Logger, channelTimeout uint64) *channelManager {
+ func NewChannelManager(log log.Logger, cfg ChannelConfig) *channelManager {
return &channelManager{
log: log,
- channelTimeout: channelTimeout,
+ cfg: cfg,
pendingTransactions: make(map[txID][]byte),
confirmedTransactions: make(map[txID]eth.BlockID),
}
@@ -78,7 +78,8 @@ func NewChannelManager(log log.Logger, channelTimeout uint64) *channelManager {
func (s *channelManager) Clear() {
s.log.Trace("clearing channel manager state")
s.blocks = s.blocks[:0]
- s.datas = s.datas[:0]
+ s.tip = common.Hash{}
+ s.clearPendingChannel()
}
// TxFailed records a transaction as failed. It will attempt to resubmit the data
@@ -86,10 +87,10 @@ func (s *channelManager) Clear() {
func (s *channelManager) TxFailed(id txID) {
if data, ok := s.pendingTransactions[id]; ok {
s.log.Trace("marked transaction as failed", "id", id)
- s.datas = append(s.datas, taggedData{data, id})
+ s.pendingChannel.PushFrame(id, data)
delete(s.pendingTransactions, id)
} else {
- s.log.Info("marked transaction as failed despite having no record of it.", "id", id)
+ s.log.Warn("unknown transaction marked as failed", "id", id)
}
}
@@ -100,7 +101,7 @@ func (s *channelManager) TxFailed(id txID) {
func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
s.log.Trace("marked transaction as confirmed", "id", id, "block", inclusionBlock)
if _, ok := s.pendingTransactions[id]; !ok {
- s.log.Info("marked transaction as confirmed despite having no record of it", "id", id, "block", inclusionBlock)
+ s.log.Warn("unknown transaction marked as confirmed", "id", id, "block", inclusionBlock)
// TODO: This can occur if we clear the channel while there are still pending transactions
// We need to keep track of stale transactions instead
return
@@ -111,13 +112,13 @@ func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
// If this channel timed out, put the pending blocks back into the local saved blocks
// and then reset this state so it can try to build a new channel.
if s.pendingChannelIsTimedOut() {
- s.log.Warn("Channel timed out", "chID", s.pendingChannel)
- s.blocks = append(s.pendingBlocks, s.blocks...)
+ s.log.Warn("Channel timed out", "chID", s.pendingChannel.ID())
+ s.blocks = append(s.pendingChannel.Blocks(), s.blocks...)
s.clearPendingChannel()
}
// If we are done with this channel, record that.
if s.pendingChannelIsFullySubmitted() {
- s.log.Info("Channel is fully submitted", "chID", s.pendingChannel)
+ s.log.Info("Channel is fully submitted", "chID", s.pendingChannel.ID())
s.clearPendingChannel()
}
}
@@ -125,8 +126,7 @@ func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
// clearPendingChannel resets all pending state back to an initialized but empty state.
// TODO: Create separate "pending" state
func (s *channelManager) clearPendingChannel() {
- s.pendingChannel = derive.ChannelID{}
- s.pendingBlocks = nil
+ s.pendingChannel = nil
s.pendingTransactions = make(map[txID][]byte)
s.confirmedTransactions = make(map[txID]eth.BlockID)
}
@@ -135,7 +135,7 @@ func (s *channelManager) clearPendingChannel() {
// A channel has timed out if the difference in L1 Inclusion blocks between
// the first & last included block is greater than or equal to the channel timeout.
func (s *channelManager) pendingChannelIsTimedOut() bool {
- if s.pendingChannel == (derive.ChannelID{}) {
+ if s.pendingChannel == nil {
return false // no channel to be timed out
}
// No confirmed transactions => not timed out
@@ -153,130 +153,133 @@ func (s *channelManager) pendingChannelIsTimedOut() bool {
max = inclusionBlock.Number
}
}
- return max-min >= s.channelTimeout
+ return max-min >= s.cfg.ChannelTimeout
}
// pendingChannelIsFullySubmitted returns true if the channel has been fully submitted.
func (s *channelManager) pendingChannelIsFullySubmitted() bool {
- if s.pendingChannel == (derive.ChannelID{}) {
+ if s.pendingChannel == nil {
return false // todo: can decide either way here. Nonsensical answer though
}
- return len(s.pendingTransactions)+len(s.datas) == 0
+ return s.pendingChannel.IsFull() && len(s.pendingTransactions)+s.pendingChannel.NumFrames() == 0
}
- // blocksToFrames turns a set of blocks into a set of frames inside a channel.
- // It will only create a single channel which contains up to `MAX_RLP_BYTES`. Any
- // blocks not added to the channel are returned. It uses the max supplied frame size.
- func blocksToFrames(blocks []*types.Block, maxFrameSize uint64) (derive.ChannelID, [][]byte, []*types.Block, error) {
- ch, err := derive.NewChannelOut()
- if err != nil {
- return derive.ChannelID{}, nil, nil, err
- }
- i := 0
- for ; i < len(blocks); i++ {
- if err := ch.AddBlock(blocks[i]); err == derive.ErrTooManyRLPBytes {
- break
- } else if err != nil {
- return derive.ChannelID{}, nil, nil, err
- }
- }
- if err := ch.Close(); err != nil {
- return derive.ChannelID{}, nil, nil, err
- }
- var frames [][]byte
- for {
- var buf bytes.Buffer
- buf.WriteByte(derive.DerivationVersion0)
- err := ch.OutputFrame(&buf, maxFrameSize-1)
- if err != io.EOF && err != nil {
- return derive.ChannelID{}, nil, nil, err
- }
- frames = append(frames, buf.Bytes())
- if err == io.EOF {
- break
- }
- }
- return ch.ID(), frames, blocks[i:], nil
- }
// nextTxData pops off s.datas & handles updating the internal state
func (s *channelManager) nextTxData() ([]byte, txID, error) {
- if len(s.datas) != 0 {
- r := s.datas[0]
- s.log.Trace("returning next tx data", "id", r.id)
- s.pendingTransactions[r.id] = r.data
- s.datas = s.datas[1:]
- return r.data, r.id, nil
- } else {
+ if s.pendingChannel == nil || !s.pendingChannel.HasFrame() {
+ s.log.Trace("no next tx data")
return nil, txID{}, io.EOF // TODO: not enough data error instead
}
+ id, data := s.pendingChannel.NextFrame()
+ // prepend version byte for first frame of transaction
+ // TODO: more memory efficient solution; shouldn't be responsibility of
+ // channelBuilder though.
+ data = append([]byte{0}, data...)
+ s.log.Trace("returning next tx data", "id", id)
+ s.pendingTransactions[id] = data
+ return data, id, nil
}
- // TxData returns the next tx.data that should be submitted to L1.
- // It is very simple & currently ignores the l1Head provided (this will change).
- // It may buffer very large channels as well.
+ // TxData returns the next tx data that should be submitted to L1.
+ //
+ // It currently only uses one frame per transaction. If the pending channel is
+ // full, it only returns the remaining frames of this channel until it got
+ // successfully fully sent to L1. It returns io.EOF if there's no pending frame.
+ //
+ // It currently ignores the l1Head provided and doesn't track channel timeouts
+ // or the sequencer window span yet.
func (s *channelManager) TxData(l1Head eth.L1BlockRef) ([]byte, txID, error) {
- channelPending := s.pendingChannel != (derive.ChannelID{})
- s.log.Debug("Requested tx data", "l1Head", l1Head, "channel_pending", channelPending, "block_count", len(s.blocks))
+ dataPending := s.pendingChannel != nil && s.pendingChannel.HasFrame()
+ s.log.Debug("Requested tx data", "l1Head", l1Head, "data_pending", dataPending, "blocks_pending", len(s.blocks))
- // Short circuit if there is a pending channel.
- // We either submit the next frame from that channel or
- if channelPending {
+ // Short circuit if there is a pending frame.
+ if dataPending {
return s.nextTxData()
}
+ // No pending frame, so we have to add new blocks to the channel
// If we have no saved blocks, we will not be able to create valid frames
if len(s.blocks) == 0 {
return nil, txID{}, io.EOF
}
- // Select range of blocks
- end := len(s.blocks)
- if end > 100 {
- end = 100
- }
- blocks := s.blocks[:end]
- s.blocks = s.blocks[end:]
- chID, frames, leftOverBlocks, err := blocksToFrames(blocks, 120_000)
- // If the range of blocks serialized to be too large, restore
- // blocks that could not be included inside the channel
- if len(leftOverBlocks) != 0 {
- s.blocks = append(leftOverBlocks, s.blocks...)
- }
- // TODO: Ensure that len(frames) < math.MaxUint16. Should not happen though. One tricky part
- // is ensuring that s.blocks is properly restored.
- if err != nil {
- s.log.Warn("Failed to create channel from blocks", "err", err)
- return nil, txID{}, err
- }
- s.log.Info("Created channel", "chID", chID, "frame_count", len(frames), "l1Head", l1Head)
- var t []taggedData
- for i, data := range frames {
- t = append(t, taggedData{data: data, id: txID{chID: chID, frameNumber: uint16(i)}})
- }
- // Load up pending state. Note: pending transactions is taken care of by nextTxData
- s.datas = t
- s.pendingChannel = chID
- s.pendingBlocks = blocks[:len(leftOverBlocks)]
+ if err := s.ensurePendingChannel(l1Head); err != nil {
+ return nil, txID{}, err
+ }
+ if err := s.addBlocks(); err != nil {
+ return nil, txID{}, err
+ }
+ if err := s.pendingChannel.OutputFrames(); err != nil {
+ return nil, txID{}, fmt.Errorf("creating frames with channel builder: %w", err)
+ }
return s.nextTxData()
+ }
+ func (s *channelManager) ensurePendingChannel(l1Head eth.L1BlockRef) error {
+ if s.pendingChannel != nil {
+ return nil
+ }
+ cb, err := newChannelBuilder(s.cfg)
+ if err != nil {
+ return fmt.Errorf("creating new channel: %w", err)
+ }
+ s.pendingChannel = cb
+ s.log.Info("Created channel", "chID", cb.ID(), "l1Head", l1Head)
+ return nil
+ }
+ // addBlocks adds blocks from the blocks queue to the pending channel until
+ // either the queue got exhausted or the channel is full.
+ func (s *channelManager) addBlocks() error {
+ var blocksAdded int
+ var _chFullErr *ChannelFullError // throw away, just for type checking
+ for i, block := range s.blocks {
+ if err := s.pendingChannel.AddBlock(block); errors.As(err, &_chFullErr) {
+ // current block didn't get added because channel is already full
+ break
+ } else if err != nil {
+ return fmt.Errorf("adding block[%d] to channel builder: %w", i, err)
+ }
+ blocksAdded += 1
+ // current block got added but channel is now full
+ if s.pendingChannel.IsFull() {
+ break
+ }
+ }
+ s.log.Debug("Added blocks to channel",
+ "blocks_added", blocksAdded,
+ "channel_full", s.pendingChannel.IsFull(),
+ "blocks_pending", len(s.blocks)-blocksAdded,
+ "input_bytes", s.pendingChannel.InputBytes(),
+ )
+ if blocksAdded == len(s.blocks) {
+ // all blocks processed, reuse slice
+ s.blocks = s.blocks[:0]
+ } else {
+ // remove processed blocks
+ s.blocks = s.blocks[blocksAdded:]
+ }
+ return nil
}
// AddL2Block saves an L2 block to the internal state. It returns ErrReorg
// if the block does not extend the last block loaded into the state.
- // If no block is already in the channel, the the parent hash check is skipped.
- // TODO: Phantom last block b/c if the local state is fully drained we can reorg without realizing it.
+ // If no blocks were added yet, the parent hash check is skipped.
func (s *channelManager) AddL2Block(block *types.Block) error {
- if len(s.blocks) > 0 {
- if s.blocks[len(s.blocks)-1].Hash() != block.ParentHash() {
- return ErrReorg
- }
+ if s.tip != (common.Hash{}) && s.tip != block.ParentHash() {
+ return ErrReorg
}
s.blocks = append(s.blocks, block)
+ s.tip = block.Hash()
return nil
}
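The hardcoded 0 prepended in nextTxData is the derivation version byte (derive.DerivationVersion0), so every batcher transaction now carries exactly one frame behind a one-byte prefix. A hypothetical in-package helper showing the resulting calldata layout:
func wrapFrame(frame []byte) []byte {
	// calldata = version byte ++ a single channel frame
	return append([]byte{derive.DerivationVersion0}, frame...)
}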
@@ -25,12 +25,6 @@ type Config struct {
// RollupRpc is the HTTP provider URL for the L2 rollup node.
RollupRpc string
- // MinL1TxSize is the minimum size of a batch tx submitted to L1.
- MinL1TxSize uint64
- // MaxL1TxSize is the maximum size of a batch tx submitted to L1.
- MaxL1TxSize uint64
// ChannelTimeout is the maximum amount of time to attempt completing an opened channel,
// as opposed to submitting missing blocks in new channels
ChannelTimeout uint64
@@ -72,6 +66,19 @@ type Config struct {
/* Optional Params */
+ // MaxL1TxSize is the maximum size of a batch tx submitted to L1.
+ MaxL1TxSize uint64
+ // TargetL1TxSize is the target size of a batch tx submitted to L1.
+ TargetL1TxSize uint64
+ // TargetNumFrames is the target number of frames per channel.
+ TargetNumFrames int
+ // ApproxComprRatio is the approximate compression ratio (<= 1.0) of the used
+ // compression algorithm.
+ ApproxComprRatio float64
LogConfig oplog.CLIConfig
MetricsConfig opmetrics.CLIConfig
@@ -105,16 +112,20 @@ func (c Config) Check() error {
func NewConfig(ctx *cli.Context) Config {
return Config{
/* Required Flags */
L1EthRpc: ctx.GlobalString(flags.L1EthRpcFlag.Name),
L2EthRpc: ctx.GlobalString(flags.L2EthRpcFlag.Name),
RollupRpc: ctx.GlobalString(flags.RollupRpcFlag.Name),
- MinL1TxSize: ctx.GlobalUint64(flags.MinL1TxSizeBytesFlag.Name),
- MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name),
- ChannelTimeout: ctx.GlobalUint64(flags.ChannelTimeoutFlag.Name),
- PollInterval: ctx.GlobalDuration(flags.PollIntervalFlag.Name),
- NumConfirmations: ctx.GlobalUint64(flags.NumConfirmationsFlag.Name),
- SafeAbortNonceTooLowCount: ctx.GlobalUint64(flags.SafeAbortNonceTooLowCountFlag.Name),
- ResubmissionTimeout: ctx.GlobalDuration(flags.ResubmissionTimeoutFlag.Name),
+ ChannelTimeout: ctx.GlobalUint64(flags.ChannelTimeoutFlag.Name),
+ PollInterval: ctx.GlobalDuration(flags.PollIntervalFlag.Name),
+ NumConfirmations: ctx.GlobalUint64(flags.NumConfirmationsFlag.Name),
+ SafeAbortNonceTooLowCount: ctx.GlobalUint64(flags.SafeAbortNonceTooLowCountFlag.Name),
+ ResubmissionTimeout: ctx.GlobalDuration(flags.ResubmissionTimeoutFlag.Name),
+ /* Optional Flags */
+ MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name),
+ TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name),
+ TargetNumFrames: ctx.GlobalInt(flags.TargetNumFramesFlag.Name),
+ ApproxComprRatio: ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name),
Mnemonic: ctx.GlobalString(flags.MnemonicFlag.Name),
SequencerHDPath: ctx.GlobalString(flags.SequencerHDPathFlag.Name),
PrivateKey: ctx.GlobalString(flags.PrivateKeyFlag.Name),
......
@@ -146,12 +146,16 @@ func NewBatchSubmitterWithSigner(cfg Config, addr common.Address, signer SignerF
L1Client: l1Client,
L2Client: l2Client,
RollupNode: rollupClient,
- MinL1TxSize: cfg.MinL1TxSize,
- MaxL1TxSize: cfg.MaxL1TxSize,
BatchInboxAddress: batchInboxAddress,
- ChannelTimeout: cfg.ChannelTimeout,
- ChainID: chainID,
- PollInterval: cfg.PollInterval,
+ Channel: ChannelConfig{
+ ChannelTimeout: cfg.ChannelTimeout,
+ MaxFrameSize: cfg.MaxL1TxSize - 1, // subtract 1 byte for version
+ TargetFrameSize: cfg.TargetL1TxSize - 1, // subtract 1 byte for version
+ TargetNumFrames: cfg.TargetNumFrames,
+ ApproxComprRatio: cfg.ApproxComprRatio,
+ },
+ ChainID: chainID,
+ PollInterval: cfg.PollInterval,
}
ctx, cancel := context.WithCancel(context.Background())
@@ -162,7 +166,7 @@ func NewBatchSubmitterWithSigner(cfg Config, addr common.Address, signer SignerF
txMgr: NewTransactionManager(l, txManagerConfig, batchInboxAddress, chainID, addr, l1Client, signer(chainID)),
done: make(chan struct{}),
log: l,
- state: NewChannelManager(l, cfg.ChannelTimeout),
+ state: NewChannelManager(l, batcherCfg.Channel),
// TODO: this context only exists because the event loop doesn't reach done
// if the tx manager is blocking forever due to e.g. insufficient balance.
ctx: ctx,
......
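With the flag defaults introduced further below (120_000 bytes max, 100_000 bytes target, one frame, ratio 1.0), this mapping works out as follows; a standalone sketch with the defaults plugged in, not a recommendation:
package main

import "fmt"

func main() {
	maxL1TxSize := uint64(120_000)    // MaxL1TxSizeBytesFlag default
	targetL1TxSize := uint64(100_000) // TargetL1TxSizeBytesFlag default
	targetNumFrames := 1              // TargetNumFramesFlag default
	approxComprRatio := 1.0           // ApproxComprRatioFlag default

	maxFrameSize := maxL1TxSize - 1       // 119_999, one byte reserved for the tx version
	targetFrameSize := targetL1TxSize - 1 // 99_999
	inputThreshold := uint64(float64(targetNumFrames) * float64(targetFrameSize) / approxComprRatio)

	fmt.Println(maxFrameSize, targetFrameSize, inputThreshold) // 119999 99999 99999
}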
@@ -23,17 +23,11 @@ type DriverConfig struct {
RollupNode *sources.RollupClient
- // Limit the size of txs
- MinL1TxSize uint64
- MaxL1TxSize uint64
// Where to send the batch txs to.
BatchInboxAddress common.Address
- // The batcher can decide to set it shorter than the actual timeout,
- // since submitting continued channel data to L1 is not instantaneous.
- // It's not worth it to work with nearly timed-out channels.
- ChannelTimeout uint64
+ // Channel creation parameters
+ Channel ChannelConfig
// Chain ID of the L1 chain to submit txs to.
ChainID *big.Int
......
@@ -14,7 +14,7 @@ import (
const envVarPrefix = "OP_BATCHER"
var (
- /* Required Flags */
+ /* Required flags */
L1EthRpcFlag = cli.StringFlag{
Name: "l1-eth-rpc",
@@ -34,21 +34,9 @@ var (
Required: true,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "ROLLUP_RPC"),
}
- MinL1TxSizeBytesFlag = cli.Uint64Flag{
- Name: "min-l1-tx-size-bytes",
- Usage: "The minimum size of a batch tx submitted to L1.",
- Required: true,
- EnvVar: opservice.PrefixEnvVar(envVarPrefix, "MIN_L1_TX_SIZE_BYTES"),
- }
- MaxL1TxSizeBytesFlag = cli.Uint64Flag{
- Name: "max-l1-tx-size-bytes",
- Usage: "The maximum size of a batch tx submitted to L1.",
- Required: true,
- EnvVar: opservice.PrefixEnvVar(envVarPrefix, "MAX_L1_TX_SIZE_BYTES"),
- }
ChannelTimeoutFlag = cli.Uint64Flag{
Name: "channel-timeout",
- Usage: "The maximum amount of time to attempt completing an opened channel, as opposed to submitting L2 blocks into a new channel.",
+ Usage: "The maximum duration (in seconds) to attempt completing an opened channel, as opposed to submitting L2 blocks into a new channel.",
Required: true,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "CHANNEL_TIMEOUT"),
}
@@ -81,6 +69,39 @@ var (
Required: true,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "RESUBMISSION_TIMEOUT"),
}
+ SequencerBatchInboxAddressFlag = cli.StringFlag{
+ Name: "sequencer-batch-inbox-address",
+ Usage: "L1 Address to receive batch transactions",
+ Required: true,
+ EnvVar: opservice.PrefixEnvVar(envVarPrefix, "SEQUENCER_BATCH_INBOX_ADDRESS"),
+ }
+ /* Optional flags */
+ MaxL1TxSizeBytesFlag = cli.Uint64Flag{
+ Name: "max-l1-tx-size-bytes",
+ Usage: "The maximum size of a batch tx submitted to L1.",
+ Value: 120_000,
+ EnvVar: opservice.PrefixEnvVar(envVarPrefix, "MAX_L1_TX_SIZE_BYTES"),
+ }
+ TargetL1TxSizeBytesFlag = cli.Uint64Flag{
+ Name: "target-l1-tx-size-bytes",
+ Usage: "The target size of a batch tx submitted to L1.",
+ Value: 100_000,
+ EnvVar: opservice.PrefixEnvVar(envVarPrefix, "TARGET_L1_TX_SIZE_BYTES"),
+ }
+ TargetNumFramesFlag = cli.IntFlag{
+ Name: "target-num-frames",
+ Usage: "The target number of frames to create per channel",
+ Value: 1,
+ EnvVar: opservice.PrefixEnvVar(envVarPrefix, "TARGET_NUM_FRAMES"),
+ }
+ ApproxComprRatioFlag = cli.Float64Flag{
+ Name: "approx-compr-ratio",
+ Usage: "The approximate compression ratio (<= 1.0)",
+ Value: 1.0,
+ EnvVar: opservice.PrefixEnvVar(envVarPrefix, "APPROX_COMPR_RATIO"),
+ }
MnemonicFlag = cli.StringFlag{
Name: "mnemonic",
Usage: "The mnemonic used to derive the wallets for either the " +
@@ -98,20 +119,12 @@ var (
Usage: "The private key to use with the l2output wallet. Must not be used with mnemonic.",
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "PRIVATE_KEY"),
}
- SequencerBatchInboxAddressFlag = cli.StringFlag{
- Name: "sequencer-batch-inbox-address",
- Usage: "L1 Address to receive batch transactions",
- Required: true,
- EnvVar: opservice.PrefixEnvVar(envVarPrefix, "SEQUENCER_BATCH_INBOX_ADDRESS"),
- }
)
var requiredFlags = []cli.Flag{
L1EthRpcFlag,
L2EthRpcFlag,
RollupRpcFlag,
- MinL1TxSizeBytesFlag,
- MaxL1TxSizeBytesFlag,
ChannelTimeoutFlag,
PollIntervalFlag,
NumConfirmationsFlag,
@@ -121,6 +134,10 @@ var requiredFlags = []cli.Flag{
}
var optionalFlags = []cli.Flag{
+ MaxL1TxSizeBytesFlag,
+ TargetL1TxSizeBytesFlag,
+ TargetNumFramesFlag,
+ ApproxComprRatioFlag,
MnemonicFlag,
SequencerHDPathFlag,
PrivateKeyFlag,
......
@@ -54,11 +54,11 @@ type Writer interface {
type ChannelOutIface interface {
ID() derive.ChannelID
Reset() error
- AddBlock(block *types.Block) error
+ AddBlock(block *types.Block) (uint64, error)
ReadyBytes() int
Flush() error
Close() error
- OutputFrame(w *bytes.Buffer, maxSize uint64) error
+ OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, error)
}
// Compile-time check for ChannelOutIface interface implementation for the ChannelOut type.
@@ -135,19 +135,19 @@ func (co *GarbageChannelOut) Reset() error {
// error that it returns is ErrTooManyRLPBytes. If this error
// is returned, the channel should be closed and a new one
// should be made.
- func (co *GarbageChannelOut) AddBlock(block *types.Block) error {
+ func (co *GarbageChannelOut) AddBlock(block *types.Block) (uint64, error) {
if co.closed {
- return errors.New("already closed")
+ return 0, errors.New("already closed")
}
batch, err := blockToBatch(block)
if err != nil {
- return err
+ return 0, err
}
// We encode to a temporary buffer to determine the encoded length to
// ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL
var buf bytes.Buffer
if err := rlp.Encode(&buf, batch); err != nil {
- return err
+ return 0, err
}
if co.cfg.malformRLP {
// Malform the RLP by incrementing the length prefix by 1.
@@ -157,13 +157,13 @@ func (co *GarbageChannelOut) AddBlock(block *types.Block) error {
buf.Write(bufBytes)
}
if co.rlpLength+buf.Len() > derive.MaxRLPBytesPerChannel {
- return fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w",
+ return 0, fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w",
buf.Len(), co.rlpLength, derive.MaxRLPBytesPerChannel, derive.ErrTooManyRLPBytes)
}
co.rlpLength += buf.Len()
- _, err = io.Copy(co.compress, &buf)
- return err
+ written, err := io.Copy(co.compress, &buf)
+ return uint64(written), err
}
// ReadyBytes returns the number of bytes that the channel out can immediately output into a frame.
@@ -192,11 +192,12 @@ func (co *GarbageChannelOut) Close() error {
// Returns io.EOF when the channel is closed & there are no more frames
// Returns nil if there is still more buffered data.
// Returns and error if it ran into an error during processing.
- func (co *GarbageChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) error {
+ func (co *GarbageChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, error) {
f := derive.Frame{
ID: co.id,
FrameNumber: uint16(co.frame),
}
+ fn := f.FrameNumber
// Copy data from the local buffer into the frame data buffer
// Don't go past the maxSize with the fixed frame overhead.
@@ -214,18 +215,18 @@ func (co *GarbageChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) error
f.Data = make([]byte, maxDataSize)
if _, err := io.ReadFull(&co.buf, f.Data); err != nil {
- return err
+ return fn, err
}
if err := f.MarshalBinary(w); err != nil {
- return err
+ return fn, err
}
co.frame += 1
if f.IsLast {
- return io.EOF
+ return fn, io.EOF
} else {
- return nil
+ return fn, nil
}
}
......
@@ -142,7 +142,7 @@ func (s *L2Batcher) ActL2BatchBuffer(t Testing) {
s.l2BufferedBlock = syncStatus.SafeL2.ID()
s.l2ChannelOut = nil
}
- if err := s.l2ChannelOut.AddBlock(block); err != nil { // should always succeed
+ if _, err := s.l2ChannelOut.AddBlock(block); err != nil { // should always succeed
t.Fatalf("failed to add block to channel: %v", err)
}
s.l2BufferedBlock = eth.ToBlockID(block)
@@ -168,7 +168,7 @@ func (s *L2Batcher) ActL2BatchSubmit(t Testing) {
data := new(bytes.Buffer)
data.WriteByte(derive.DerivationVersion0)
// subtract one, to account for the version byte
- if err := s.l2ChannelOut.OutputFrame(data, s.l2BatcherCfg.MaxL1TxSize-1); err == io.EOF {
+ if _, err := s.l2ChannelOut.OutputFrame(data, s.l2BatcherCfg.MaxL1TxSize-1); err == io.EOF {
s.l2ChannelOut = nil
s.l2Submitting = false
} else if err != nil {
@@ -218,7 +218,7 @@ func (s *L2Batcher) ActL2BatchSubmitGarbage(t Testing, kind GarbageKind) {
data.WriteByte(derive.DerivationVersion0)
// subtract one, to account for the version byte
- if err := s.l2ChannelOut.OutputFrame(data, s.l2BatcherCfg.MaxL1TxSize-1); err == io.EOF {
+ if _, err := s.l2ChannelOut.OutputFrame(data, s.l2BatcherCfg.MaxL1TxSize-1); err == io.EOF {
s.l2ChannelOut = nil
s.l2Submitting = false
} else if err != nil {
......
@@ -323,8 +323,10 @@ func TestMigration(t *testing.T) {
L1EthRpc: forkedL1URL,
L2EthRpc: gethNode.WSEndpoint(),
RollupRpc: rollupNode.HTTPEndpoint(),
- MinL1TxSize: 1,
- MaxL1TxSize: 120000,
+ MaxL1TxSize: 120_000,
+ TargetL1TxSize: 1,
+ TargetNumFrames: 1,
+ ApproxComprRatio: 1.0,
ChannelTimeout: deployCfg.ChannelTimeout,
PollInterval: 50 * time.Millisecond,
NumConfirmations: 1,
......
@@ -526,8 +526,10 @@ func (cfg SystemConfig) Start() (*System, error) {
L1EthRpc: sys.Nodes["l1"].WSEndpoint(),
L2EthRpc: sys.Nodes["sequencer"].WSEndpoint(),
RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(),
- MinL1TxSize: 1,
- MaxL1TxSize: 120000,
+ MaxL1TxSize: 120_000,
+ TargetL1TxSize: 1,
+ TargetNumFrames: 1,
+ ApproxComprRatio: 1.0,
ChannelTimeout: cfg.DeployConfig.ChannelTimeout,
PollInterval: 50 * time.Millisecond,
NumConfirmations: 1,
......
@@ -883,7 +883,7 @@ func TestWithdrawals(t *testing.T) {
require.Nil(t, err)
// Get l2BlockNumber for proof generation
- ctx, cancel = context.WithTimeout(context.Background(), 20*time.Duration(cfg.DeployConfig.L1BlockTime)*time.Second)
+ ctx, cancel = context.WithTimeout(context.Background(), 30*time.Duration(cfg.DeployConfig.L1BlockTime)*time.Second)
defer cancel()
blockNumber, err := withdrawals.WaitForFinalizationPeriod(ctx, l1Client, predeploys.DevOptimismPortalAddr, receipt.BlockNumber)
require.Nil(t, err)
@@ -933,7 +933,7 @@ func TestWithdrawals(t *testing.T) {
require.Equal(t, types.ReceiptStatusSuccessful, proveReceipt.Status)
// Wait for finalization and then create the Finalized Withdrawal Transaction
- ctx, cancel = context.WithTimeout(context.Background(), 20*time.Duration(cfg.DeployConfig.L1BlockTime)*time.Second)
+ ctx, cancel = context.WithTimeout(context.Background(), 30*time.Duration(cfg.DeployConfig.L1BlockTime)*time.Second)
defer cancel()
_, err = withdrawals.WaitForFinalizationPeriod(ctx, l1Client, predeploys.DevOptimismPortalAddr, header.Number)
require.Nil(t, err)
@@ -1052,10 +1052,10 @@ func TestFees(t *testing.T) {
err = l2Seq.SendTransaction(context.Background(), tx)
require.Nil(t, err, "Sending L2 tx to sequencer")
- _, err = waitForTransaction(tx.Hash(), l2Seq, 3*time.Duration(cfg.DeployConfig.L1BlockTime)*time.Second)
+ _, err = waitForTransaction(tx.Hash(), l2Seq, 4*time.Duration(cfg.DeployConfig.L1BlockTime)*time.Second)
require.Nil(t, err, "Waiting for L2 tx on sequencer")
- receipt, err := waitForTransaction(tx.Hash(), l2Verif, 3*time.Duration(cfg.DeployConfig.L1BlockTime)*time.Second)
+ receipt, err := waitForTransaction(tx.Hash(), l2Verif, 4*time.Duration(cfg.DeployConfig.L1BlockTime)*time.Second)
require.Nil(t, err, "Waiting for L2 tx on verifier")
require.Equal(t, types.ReceiptStatusSuccessful, receipt.Status, "TX should have succeeded")
......
@@ -64,39 +64,40 @@ func (co *ChannelOut) Reset() error {
co.compress.Reset(&co.buf)
co.closed = false
_, err := rand.Read(co.id[:])
- if err != nil {
- return err
- }
- return nil
+ return err
}
- // AddBlock adds a block to the channel. It returns an error
- // if there is a problem adding the block. The only sentinel
- // error that it returns is ErrTooManyRLPBytes. If this error
- // is returned, the channel should be closed and a new one
- // should be made.
- func (co *ChannelOut) AddBlock(block *types.Block) error {
+ // AddBlock adds a block to the channel. It returns the RLP encoded byte size
+ // and an error if there is a problem adding the block. The only sentinel error
+ // that it returns is ErrTooManyRLPBytes. If this error is returned, the channel
+ // should be closed and a new one should be made.
+ func (co *ChannelOut) AddBlock(block *types.Block) (uint64, error) {
if co.closed {
- return errors.New("already closed")
+ return 0, errors.New("already closed")
}
batch, err := blockToBatch(block)
if err != nil {
- return err
+ return 0, err
}
// We encode to a temporary buffer to determine the encoded length to
// ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL
var buf bytes.Buffer
if err := rlp.Encode(&buf, batch); err != nil {
- return err
+ return 0, err
}
if co.rlpLength+buf.Len() > MaxRLPBytesPerChannel {
- return fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w",
+ return 0, fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w",
buf.Len(), co.rlpLength, MaxRLPBytesPerChannel, ErrTooManyRLPBytes)
}
co.rlpLength += buf.Len()
- _, err = io.Copy(co.compress, &buf)
- return err
+ written, err := io.Copy(co.compress, &buf)
+ return uint64(written), err
+ }
+ // InputBytes returns the total amount of RLP-encoded input bytes.
+ func (co *ChannelOut) InputBytes() int {
+ return co.rlpLength
}
// ReadyBytes returns the number of bytes that the channel out can immediately output into a frame.
@@ -120,12 +121,13 @@ func (co *ChannelOut) Close() error {
return co.compress.Close()
}
- // OutputFrame writes a frame to w with a given max size
+ // OutputFrame writes a frame to w with a given max size and returns the frame
+ // number.
// Use `ReadyBytes`, `Flush`, and `Close` to modify the ready buffer.
// Returns io.EOF when the channel is closed & there are no more frames
// Returns nil if there is still more buffered data.
// Returns and error if it ran into an error during processing.
- func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) error {
+ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, error) {
f := Frame{
ID: co.id,
FrameNumber: uint16(co.frame),
@@ -133,9 +135,8 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) error {
// Copy data from the local buffer into the frame data buffer
// Don't go past the maxSize with the fixed frame overhead.
- // Fixed overhead: 32 + 8 + 2 + 4 + 1 = 47 bytes.
- // Add one extra byte for the version byte (for the entire L1 tx though)
- maxDataSize := maxSize - 47 - 1
+ // Fixed overhead: 16 + 2 + 4 + 1 = 23 bytes.
+ maxDataSize := maxSize - 23
if maxDataSize > uint64(co.buf.Len()) {
maxDataSize = uint64(co.buf.Len())
// If we are closed & will not spill past the current frame
@@ -147,18 +148,19 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) error {
f.Data = make([]byte, maxDataSize)
if _, err := io.ReadFull(&co.buf, f.Data); err != nil {
- return err
+ return 0, err
}
if err := f.MarshalBinary(w); err != nil {
- return err
+ return 0, err
}
co.frame += 1
+ fn := f.FrameNumber
if f.IsLast {
- return io.EOF
+ return fn, io.EOF
} else {
- return nil
+ return fn, nil
}
}
......
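The new fixed overhead of 23 bytes corresponds to the frame header under the 16-byte channel ID format: channel_id (16) + frame_number (uint16, 2) + frame_data_length (uint32, 4) + is_last (1); the per-tx version byte is no longer counted here. A small sketch of the per-frame data budget (the field breakdown is inferred from the frame format, not spelled out in the diff):
// maxFrameDataSize returns how many payload bytes fit into one frame of at
// most maxFrameSize bytes, mirroring the new overhead accounting above.
func maxFrameDataSize(maxFrameSize uint64) uint64 {
	const frameOverhead = 16 + 2 + 4 + 1 // = 23 bytes
	return maxFrameSize - frameOverhead  // e.g. 119_999 - 23 = 119_976
}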
@@ -22,7 +22,7 @@ func TestChannelOutAddBlock(t *testing.T) {
},
nil,
)
- err := cout.AddBlock(block)
+ _, err := cout.AddBlock(block)
require.Error(t, err)
require.Equal(t, ErrNotDepositTx, err)
})
......
@@ -119,8 +119,10 @@ services:
OP_BATCHER_L1_ETH_RPC: http://l1:8545
OP_BATCHER_L2_ETH_RPC: http://l2:8545
OP_BATCHER_ROLLUP_RPC: http://op-node:8545
- OP_BATCHER_MIN_L1_TX_SIZE_BYTES: 1
OP_BATCHER_MAX_L1_TX_SIZE_BYTES: 120000
+ OP_BATCHER_TARGET_L1_TX_SIZE_BYTES: 624
+ OP_BATCHER_TARGET_NUM_FRAMES: 1
+ OP_BATCHER_APPROX_COMPR_RATIO: 1.0
OP_BATCHER_CHANNEL_TIMEOUT: 40
OP_BATCHER_POLL_INTERVAL: 1s
OP_BATCHER_NUM_CONFIRMATIONS: 1
......