Commit 11a1976f authored by Sebastian Stammler

op-batcher: Rework batcher to fill up txs

Created an abstraction around channel creation, channelBuilder, that
tracks the amount of input RLP bytes and decides when to close the
channel based on an estimated compression ratio.
parent b5eb40de
package batcher
import (
"bytes"
"errors"
"fmt"
"io"
"math"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/core/types"
)
type (
// channelBuilder uses a ChannelOut to create a channel with output frame
// size approximation.
channelBuilder struct {
cfg ChannelConfig
// tracks total input data rlp bytes
inputSize uint64
// marked as full if a) the target amount of input data has been reached,
// b) the max RLP input bytes limit was hit, or c) the max allowed frame
// index (uint16) has been reached
fullErr error
// current channel
co *derive.ChannelOut
// list of blocks in the channel. Saved in case the channel must be rebuilt
blocks []*types.Block
// frames data queue, to be sent as txs
frames []taggedData
}
ChannelConfig struct {
// ChannelTimeout is the maximum duration, in L1 blocks, over which to attempt
// completing an opened channel. The batcher can decide to set it shorter than the
// actual timeout, since submitting continued channel data to L1 is not
// instantaneous. It's not worth it to work with nearly timed-out channels.
ChannelTimeout uint64
// The maximum byte-size a frame can have.
MaxFrameSize uint64
// The target byte-size of each output frame. Note that if the realized
// compression ratio is worse than the approximate, more frames may
// actually be created. This also depends on how close TargetFrameSize is to
// MaxFrameSize.
TargetFrameSize uint64
// The target number of frames to create in this channel. If the realized
// compression ratio is worse than approxComprRatio, additional leftover
// frame(s) might get created.
TargetNumFrames int
// Approximated compression ratio to assume. Should be slightly smaller than
// the average ratio seen in experiments to reduce the chance of creating a
// small additional leftover frame.
ApproxComprRatio float64
}
ChannelFullError struct {
Err error
}
)
func (e *ChannelFullError) Error() string {
return "channel full: " + e.Err.Error()
}
func (e *ChannelFullError) Unwrap() error {
return e.Err
}
var (
ErrInputTargetReached = errors.New("target amount of input data reached")
ErrMaxFrameIndex = errors.New("max frame index reached (uint16)")
)
// InputThreshold calculates the input data threshold in bytes from the given
// channel parameters.
func (c ChannelConfig) InputThreshold() uint64 {
return uint64(float64(c.TargetNumFrames) * float64(c.TargetFrameSize) / c.ApproxComprRatio)
}
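// Worked example (illustrative values, not defaults from this commit): with
// TargetNumFrames = 1, TargetFrameSize = 100_000 and ApproxComprRatio = 0.4,
// InputThreshold returns uint64(1*100_000/0.4) = 250_000, i.e. the builder
// accepts roughly 250 KB of input RLP data before marking the channel full.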
func newChannelBuilder(cfg ChannelConfig) (*channelBuilder, error) {
co, err := derive.NewChannelOut()
if err != nil {
return nil, err
}
return &channelBuilder{
cfg: cfg,
co: co,
}, nil
}
// ID returns the channel ID.
func (c *channelBuilder) ID() derive.ChannelID {
return c.co.ID()
}
// Blocks returns the blocks that have been added to this channel so far.
func (c *channelBuilder) Blocks() []*types.Block {
return c.blocks
}
// Reset resets the internal state of the channel builder so it can be reused
// for a new channel.
func (c *channelBuilder) Reset() error {
c.inputSize = 0
c.blocks = c.blocks[:0]
c.frames = c.frames[:0]
return c.co.Reset()
}
// AddBlock adds a block to the channel compression pipeline. InputTargetReached
// should be called afterwards to test whether the target input data amount has
// been reached. In this case, a new channel must be started.
//
// AddBlock returns ErrInputTargetReached if called despite having already
// reached the input data target.
// It returns ErrTooManyRLPBytes if the channel cannot take more input data.
// Then, too, a new channel has to be started.
//
// Call OutputFrames() afterwards to create frames.
func (c *channelBuilder) AddBlock(block *types.Block) error {
if c.IsFull() {
return c.FullErr()
}
rlpsize, err := c.co.AddBlock(block)
if errors.Is(err, derive.ErrTooManyRLPBytes) {
c.setFullErr(err)
return c.FullErr()
} else if err != nil {
return fmt.Errorf("adding block to channel out: %w", err)
}
c.inputSize += rlpsize
c.blocks = append(c.blocks, block)
if c.InputTargetReached() {
c.setFullErr(ErrInputTargetReached)
// Adding this block still worked, so don't return error, just mark as full
}
return nil
}
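// Example (illustrative sketch): a caller holding a builder cb can distinguish
// a full channel from a hard error by unwrapping the returned
// *ChannelFullError. In both full-channel cases, the offending block was not
// added and can be re-added to a fresh channel:
//
//	var fullErr *ChannelFullError
//	if err := cb.AddBlock(block); errors.As(err, &fullErr) {
//		// channel is full; start a new channel and re-add the block there
//	} else if err != nil {
//		// hard error from the compression pipeline
//	}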
// InputTargetReached says whether the target amount of input data has been
// reached in this channel builder. No more blocks can be added afterwards.
func (c *channelBuilder) InputTargetReached() bool {
return c.inputSize >= c.cfg.InputThreshold()
}
// IsFull returns whether the channel is full.
func (c *channelBuilder) IsFull() bool {
return c.fullErr != nil
}
// FullErr returns the reason why the channel is full.
func (c *channelBuilder) FullErr() error {
return c.fullErr
}
func (c *channelBuilder) setFullErr(err error) {
c.fullErr = &ChannelFullError{Err: err}
}
// OutputFrames creates new frames with the channel out. It should be called
// after AddBlock and before iterating over available frames with HasFrame and
// NextFrame.
//
// If the input data target hasn't been reached yet, it will conservatively only
// pull readily available frames from the compression output.
// If the target has been reached, the channel is closed and all remaining
// frames will be created, possibly with a small leftover frame.
func (c *channelBuilder) OutputFrames() error {
if c.InputTargetReached() {
return c.closeAndOutputAllFrames()
}
return c.outputReadyFrames()
}
// outputReadyFrames creates new frames as long as there's enough data ready in
// the channel out compression pipeline.
//
// This is part of an optimization to already generate frames and send them off
// as txs while still collecting blocks in the channel builder.
func (c *channelBuilder) outputReadyFrames() error {
// TODO: Decide whether we want to fill frames to max size and use target
// only for estimation, or use target size.
for c.co.ReadyBytes() >= int(c.cfg.MaxFrameSize) {
if err := c.outputFrame(); err == io.EOF {
return nil
} else if err != nil {
return err
}
}
return nil
}
func (c *channelBuilder) closeAndOutputAllFrames() error {
if err := c.co.Close(); err != nil {
return fmt.Errorf("closing channel out: %w", err)
}
for {
if err := c.outputFrame(); err == io.EOF {
return nil
} else if err != nil {
return err
}
}
}
func (c *channelBuilder) outputFrame() error {
var buf bytes.Buffer
fn, err := c.co.OutputFrame(&buf, c.cfg.MaxFrameSize)
if err != io.EOF && err != nil {
return fmt.Errorf("writing frame[%d]: %w", fn, err)
}
// Mark as full if max index reached
// TODO: If there's still data in the compression pipeline of the channel out,
// we would miss it and the whole channel would be broken because the last
// frames would never be generated...
// Hitting the max index is impossible with current parameters, so ignore for
// now. Note that in order to properly catch this, we'd need to call Flush
// after every block addition to estimate how many more frames are coming.
if fn == math.MaxUint16 {
c.setFullErr(ErrMaxFrameIndex)
}
frame := taggedData{
id: txID{chID: c.co.ID(), frameNumber: fn},
data: buf.Bytes(),
}
c.frames = append(c.frames, frame)
return err // possibly io.EOF (last frame)
}
// HasFrame returns whether there's any available frame. If true, it can be
// popped using NextFrame().
//
// Call OutputFrames first to create new frames from the channel out
// compression pipeline.
func (c *channelBuilder) HasFrame() bool {
return len(c.frames) > 0
}
// NumFrames returns the number of frames currently in the queue.
func (c *channelBuilder) NumFrames() int {
return len(c.frames)
}
// NextFrame returns the next available frame.
// HasFrame must be called prior to check if there's a next frame available.
// Panics if called when there's no next frame.
func (c *channelBuilder) NextFrame() (txID, []byte) {
if len(c.frames) == 0 {
panic("no next frame")
}
f := c.frames[0]
c.frames = c.frames[1:]
return f.id, f.data
}
// PushFrame adds the frame back to the internal frames queue. Panics if the
// frame is not from the same channel.
func (c *channelBuilder) PushFrame(id txID, frame []byte) {
if id.chID != c.ID() {
panic("wrong channel")
}
c.frames = append(c.frames, taggedData{id: id, data: frame})
}
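The intended lifecycle of a channelBuilder, as wired into the channel manager changes below, roughly follows this loop (a minimal sketch; submitTx is a hypothetical stand-in for the batcher's transaction submission, not an API from this commit):

cb, err := newChannelBuilder(cfg)
if err != nil {
	return err
}
for _, block := range blocks {
	if err := cb.AddBlock(block); err != nil {
		break // channel is full (ChannelFullError) or a hard error occurred
	}
	if cb.IsFull() {
		break // input data target reached with this block
	}
}
if err := cb.OutputFrames(); err != nil {
	return err
}
for cb.HasFrame() {
	id, data := cb.NextFrame()
	submitTx(id, data) // hypothetical: submit the frame data as an L1 tx
}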
package batcher
import (
"bytes"
"errors"
"fmt"
"io"
@@ -51,13 +50,11 @@ type channelManager struct {
 // All blocks since the last request for new tx data.
 blocks []*types.Block
-datas []taggedData
-// Pending data returned by TxData waiting on Tx Confirmed/Failed
-// id of the pending channel
-pendingChannel derive.ChannelID
-// list of blocks in the channel. Saved in case the channel must be rebuilt
-pendingBlocks []*types.Block
+// pending channel builder
+pendingChannel *channelBuilder
 // Set of unconfirmed txID -> frame data. For tx resubmission
 pendingTransactions map[txID][]byte
 // Set of confirmed txID -> inclusion block. For determining if the channel is timed out
@@ -78,7 +75,7 @@ func NewChannelManager(log log.Logger, cfg ChannelConfig) *channelManager {
 func (s *channelManager) Clear() {
 s.log.Trace("clearing channel manager state")
 s.blocks = s.blocks[:0]
-s.datas = s.datas[:0]
+s.clearPendingChannel()
 }
 // TxFailed records a transaction as failed. It will attempt to resubmit the data
@@ -86,10 +83,10 @@ func (s *channelManager) Clear() {
 func (s *channelManager) TxFailed(id txID) {
 if data, ok := s.pendingTransactions[id]; ok {
 s.log.Trace("marked transaction as failed", "id", id)
-s.datas = append(s.datas, taggedData{data, id})
+s.pendingChannel.PushFrame(id, data)
 delete(s.pendingTransactions, id)
 } else {
-s.log.Info("marked transaction as failed despite having no record of it.", "id", id)
+s.log.Warn("unknown frame transaction marked as failed", "id", id)
 }
 }
@@ -112,7 +109,7 @@ func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
 // and then reset this state so it can try to build a new channel.
 if s.pendingChannelIsTimedOut() {
 s.log.Warn("Channel timed out", "chID", s.pendingChannel)
-s.blocks = append(s.pendingBlocks, s.blocks...)
+s.blocks = append(s.pendingChannel.Blocks(), s.blocks...)
 s.clearPendingChannel()
 }
 // If we are done with this channel, record that.
@@ -125,8 +122,7 @@ func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) {
 // clearPendingChannel resets all pending state back to an initialized but empty state.
 // TODO: Create separate "pending" state
 func (s *channelManager) clearPendingChannel() {
-s.pendingChannel = derive.ChannelID{}
-s.pendingBlocks = nil
+s.pendingChannel = nil
 s.pendingTransactions = make(map[txID][]byte)
 s.confirmedTransactions = make(map[txID]eth.BlockID)
 }
@@ -135,7 +131,7 @@ func (s *channelManager) clearPendingChannel() {
 // A channel has timed out if the difference in L1 Inclusion blocks between
 // the first & last included block is greater than or equal to the channel timeout.
 func (s *channelManager) pendingChannelIsTimedOut() bool {
-if s.pendingChannel == (derive.ChannelID{}) {
+if s.pendingChannel == nil {
 return false // no channel to be timed out
 }
 // No confirmed transactions => not timed out
@@ -153,118 +149,107 @@ func (s *channelManager) pendingChannelIsTimedOut() bool {
 max = inclusionBlock.Number
 }
 }
-return max-min >= s.channelTimeout
+return max-min >= s.cfg.ChannelTimeout
 }
 // pendingChannelIsFullySubmitted returns true if the channel has been fully submitted.
 func (s *channelManager) pendingChannelIsFullySubmitted() bool {
-if s.pendingChannel == (derive.ChannelID{}) {
+if s.pendingChannel == nil {
 return false // todo: can decide either way here. Nonsensical answer though
 }
-return len(s.pendingTransactions)+len(s.datas) == 0
-}
-// blocksToFrames turns a set of blocks into a set of frames inside a channel.
-// It will only create a single channel which contains up to `MAX_RLP_BYTES`. Any
-// blocks not added to the channel are returned. It uses the max supplied frame size.
-func blocksToFrames(blocks []*types.Block, maxFrameSize uint64) (derive.ChannelID, [][]byte, []*types.Block, error) {
-ch, err := derive.NewChannelOut()
-if err != nil {
-return derive.ChannelID{}, nil, nil, err
-}
-i := 0
-for ; i < len(blocks); i++ {
-if err := ch.AddBlock(blocks[i]); err == derive.ErrTooManyRLPBytes {
-break
-} else if err != nil {
-return derive.ChannelID{}, nil, nil, err
-}
-}
-if err := ch.Close(); err != nil {
-return derive.ChannelID{}, nil, nil, err
-}
-var frames [][]byte
-for {
-var buf bytes.Buffer
-buf.WriteByte(derive.DerivationVersion0)
-err := ch.OutputFrame(&buf, maxFrameSize-1)
-if err != io.EOF && err != nil {
-return derive.ChannelID{}, nil, nil, err
-}
-frames = append(frames, buf.Bytes())
-if err == io.EOF {
-break
-}
-}
-return ch.ID(), frames, blocks[i:], nil
+return s.pendingChannel.IsFull() && len(s.pendingTransactions)+s.pendingChannel.NumFrames() == 0
 }
 // nextTxData pops off s.datas & handles updating the internal state
 func (s *channelManager) nextTxData() ([]byte, txID, error) {
-if len(s.datas) != 0 {
-r := s.datas[0]
-s.log.Trace("returning next tx data", "id", r.id)
-s.pendingTransactions[r.id] = r.data
-s.datas = s.datas[1:]
-return r.data, r.id, nil
-} else {
+if s.pendingChannel == nil || !s.pendingChannel.HasFrame() {
 return nil, txID{}, io.EOF // TODO: not enough data error instead
 }
+id, data := s.pendingChannel.NextFrame()
+if id.frameNumber == 0 {
+// prepend version byte for first frame
+// TODO: more memory efficient solution; shouldn't be responsibility of
+// channelBuilder though.
+data = append([]byte{0}, data...)
+}
+s.log.Trace("returning next tx data", "id", id)
+s.pendingTransactions[id] = data
+return data, id, nil
 }
 // TxData returns the next tx.data that should be submitted to L1.
 // It is very simple & currently ignores the l1Head provided (this will change).
 // It may buffer very large channels as well.
 func (s *channelManager) TxData(l1Head eth.L1BlockRef) ([]byte, txID, error) {
-channelPending := s.pendingChannel != (derive.ChannelID{})
-s.log.Debug("Requested tx data", "l1Head", l1Head, "channel_pending", channelPending, "block_count", len(s.blocks))
+dataPending := s.pendingChannel != nil && s.pendingChannel.HasFrame()
+s.log.Debug("Requested tx data", "l1Head", l1Head, "data_pending", dataPending, "block_count", len(s.blocks))
-// Short circuit if there is a pending channel.
-// We either submit the next frame from that channel or
-if channelPending {
+// Short circuit if there is a pending frame.
+if dataPending {
 return s.nextTxData()
 }
+// No pending frame, so we have to add new blocks to the channel
 // If we have no saved blocks, we will not be able to create valid frames
 if len(s.blocks) == 0 {
 return nil, txID{}, io.EOF
 }
-// Select range of blocks
-end := len(s.blocks)
-if end > 100 {
-end = 100
-}
-blocks := s.blocks[:end]
-s.blocks = s.blocks[end:]
-chID, frames, leftOverBlocks, err := blocksToFrames(blocks, 120_000)
-// If the range of blocks serialized to be too large, restore
-// blocks that could not be included inside the channel
-if len(leftOverBlocks) != 0 {
-s.blocks = append(leftOverBlocks, s.blocks...)
-}
-// TODO: Ensure that len(frames) < math.MaxUint16. Should not happen though. One tricky part
-// is ensuring that s.blocks is properly restored.
-if err != nil {
-s.log.Warn("Failed to create channel from blocks", "err", err)
+if err := s.ensurePendingChannel(l1Head); err != nil {
 return nil, txID{}, err
 }
-s.log.Info("Created channel", "chID", chID, "frame_count", len(frames), "l1Head", l1Head)
-var t []taggedData
-for i, data := range frames {
-t = append(t, taggedData{data: data, id: txID{chID: chID, frameNumber: uint16(i)}})
+if err := s.addBlocks(); err != nil {
+return nil, txID{}, err
 }
-// Load up pending state. Note: pending transactions is taken care of by nextTxData
-s.datas = t
-s.pendingChannel = chID
-s.pendingBlocks = blocks[:len(leftOverBlocks)]
+if err := s.pendingChannel.OutputFrames(); err != nil {
+return nil, txID{}, fmt.Errorf("create frames with channel builder: %w", err)
+}
 return s.nextTxData()
 }
+// ensurePendingChannel starts a new pending channel builder, if none exists yet.
+func (s *channelManager) ensurePendingChannel(l1Head eth.L1BlockRef) error {
+if s.pendingChannel != nil {
+return nil
+}
+cb, err := newChannelBuilder(s.cfg)
+if err != nil {
+return fmt.Errorf("creating new channel: %w", err)
+}
+s.pendingChannel = cb
+s.log.Info("Created channel", "chID", cb.ID(), "l1Head", l1Head)
+return nil
+}
+// addBlocks adds blocks from the saved queue to the pending channel until the
+// channel is full or all queued blocks have been added.
+func (s *channelManager) addBlocks() error {
+var blocksAdded int
+for _, block := range s.blocks {
+err := s.pendingChannel.AddBlock(block)
+var fullErr *ChannelFullError
+if errors.As(err, &fullErr) {
+break // channel is full; this block was not added
+} else if err != nil {
+return fmt.Errorf("adding block[%d] to channel builder: %w", blocksAdded, err)
+}
+blocksAdded++
+if s.pendingChannel.IsFull() {
+break // the channel became full with this block
+}
+}
+s.log.Debug("Added blocks to channel", "blocks_added", blocksAdded, "channel_full", s.pendingChannel.IsFull())
+if blocksAdded == len(s.blocks) {
+// all blocks processed, reuse slice
+s.blocks = s.blocks[:0]
+} else {
+// remove processed blocks
+s.blocks = s.blocks[blocksAdded:]
+}
+return nil
+}
 // AddL2Block saves an L2 block to the internal state. It returns ErrReorg
@@ -272,11 +257,10 @@ func (s *channelManager) TxData(l1Head eth.L1BlockRef) ([]byte, txID, error) {
 // If no block is already in the channel, the parent hash check is skipped.
 // TODO: Phantom last block b/c if the local state is fully drained we can reorg without realizing it.
 func (s *channelManager) AddL2Block(block *types.Block) error {
-if len(s.blocks) > 0 {
-if s.blocks[len(s.blocks)-1].Hash() != block.ParentHash() {
-return ErrReorg
-}
+if l := len(s.blocks); l > 0 && s.blocks[l-1].Hash() != block.ParentHash() {
+return ErrReorg
 }
 s.blocks = append(s.blocks, block)
 return nil
 }