Commit be1cb549 authored by mergify[bot]'s avatar mergify[bot] Committed by GitHub

Merge pull request #3741 from ethereum-optimism/jg/limit_decompressed_size_channel_out

op-node: Limit decompressed size in channel_out
parents 7fbf8cf7 4cc4b938
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"compress/zlib" "compress/zlib"
"crypto/rand" "crypto/rand"
"errors" "errors"
"fmt"
"io" "io"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
...@@ -14,15 +15,14 @@ import ( ...@@ -14,15 +15,14 @@ import (
) )
var ErrNotDepositTx = errors.New("first transaction in block is not a deposit tx") var ErrNotDepositTx = errors.New("first transaction in block is not a deposit tx")
var ErrTooManyRLPBytes = errors.New("batch would cause RLP bytes to go over limit")
type ChannelOut struct { type ChannelOut struct {
id ChannelID id ChannelID
// Frame ID of the next frame to emit. Increment after emitting // Frame ID of the next frame to emit. Increment after emitting
frame uint64 frame uint64
// How much we've pulled from the reader so far // rlpLength is the uncompressed size of the channel. Must be less than MAX_RLP_BYTES_PER_CHANNEL
offset uint64 rlpLength int
// scratch for temporary buffering
scratch bytes.Buffer
// Compressor stage. Write input data to it // Compressor stage. Write input data to it
compress *zlib.Writer compress *zlib.Writer
...@@ -38,9 +38,9 @@ func (co *ChannelOut) ID() ChannelID { ...@@ -38,9 +38,9 @@ func (co *ChannelOut) ID() ChannelID {
func NewChannelOut() (*ChannelOut, error) { func NewChannelOut() (*ChannelOut, error) {
c := &ChannelOut{ c := &ChannelOut{
id: ChannelID{}, // TODO: use GUID here instead of fully random data id: ChannelID{}, // TODO: use GUID here instead of fully random data
frame: 0, frame: 0,
offset: 0, rlpLength: 0,
} }
_, err := rand.Read(c.id[:]) _, err := rand.Read(c.id[:])
if err != nil { if err != nil {
...@@ -59,9 +59,8 @@ func NewChannelOut() (*ChannelOut, error) { ...@@ -59,9 +59,8 @@ func NewChannelOut() (*ChannelOut, error) {
// TODO: reuse ChannelOut for performance // TODO: reuse ChannelOut for performance
func (co *ChannelOut) Reset() error { func (co *ChannelOut) Reset() error {
co.frame = 0 co.frame = 0
co.offset = 0 co.rlpLength = 0
co.buf.Reset() co.buf.Reset()
co.scratch.Reset()
co.compress.Reset(&co.buf) co.compress.Reset(&co.buf)
co.closed = false co.closed = false
_, err := rand.Read(co.id[:]) _, err := rand.Read(co.id[:])
...@@ -71,11 +70,33 @@ func (co *ChannelOut) Reset() error { ...@@ -71,11 +70,33 @@ func (co *ChannelOut) Reset() error {
return nil return nil
} }
// AddBlock adds a block to the channel. It returns an error
// if there is a problem adding the block. The only sentinel
// error that it returns is ErrTooManyRLPBytes. If this error
// is returned, the channel should be closed and a new one
// should be made.
func (co *ChannelOut) AddBlock(block *types.Block) error { func (co *ChannelOut) AddBlock(block *types.Block) error {
if co.closed { if co.closed {
return errors.New("already closed") return errors.New("already closed")
} }
return blockToBatch(block, co.compress) batch, err := blockToBatch(block)
if err != nil {
return err
}
// We encode to a temporary buffer to determine the encoded length to
// ensure that the total size of all RLP elements is less than MAX_RLP_BYTES_PER_CHANNEL
var buf bytes.Buffer
if err := rlp.Encode(&buf, batch); err != nil {
return err
}
if co.rlpLength+buf.Len() > MaxRLPBytesPerChannel {
return fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w",
buf.Len(), co.rlpLength, MaxRLPBytesPerChannel, ErrTooManyRLPBytes)
}
co.rlpLength += buf.Len()
_, err = io.Copy(co.compress, &buf)
return err
} }
// ReadyBytes returns the number of bytes that the channel out can immediately output into a frame. // ReadyBytes returns the number of bytes that the channel out can immediately output into a frame.
...@@ -141,35 +162,35 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) error { ...@@ -141,35 +162,35 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) error {
} }
} }
// blockToBatch writes the raw block bytes (after batch encoding) to the writer // blockToBatch transforms a block into a batch object that can easily be RLP encoded.
func blockToBatch(block *types.Block, w io.Writer) error { func blockToBatch(block *types.Block) (*BatchData, error) {
var opaqueTxs []hexutil.Bytes var opaqueTxs []hexutil.Bytes
for _, tx := range block.Transactions() { for i, tx := range block.Transactions() {
if tx.Type() == types.DepositTxType { if tx.Type() == types.DepositTxType {
continue continue
} }
otx, err := tx.MarshalBinary() otx, err := tx.MarshalBinary()
if err != nil { if err != nil {
return err // TODO: wrap err return nil, fmt.Errorf("could not encode tx %v in block %v: %w", i, tx.Hash(), err)
} }
opaqueTxs = append(opaqueTxs, otx) opaqueTxs = append(opaqueTxs, otx)
} }
l1InfoTx := block.Transactions()[0] l1InfoTx := block.Transactions()[0]
if l1InfoTx.Type() != types.DepositTxType { if l1InfoTx.Type() != types.DepositTxType {
return ErrNotDepositTx return nil, ErrNotDepositTx
} }
l1Info, err := L1InfoDepositTxData(l1InfoTx.Data()) l1Info, err := L1InfoDepositTxData(l1InfoTx.Data())
if err != nil { if err != nil {
return err // TODO: wrap err return nil, fmt.Errorf("could not parse the L1 Info deposit: %w", err)
} }
batch := &BatchData{BatchV1{ return &BatchData{
ParentHash: block.ParentHash(), BatchV1{
EpochNum: rollup.Epoch(l1Info.Number), ParentHash: block.ParentHash(),
EpochHash: l1Info.BlockHash, EpochNum: rollup.Epoch(l1Info.Number),
Timestamp: block.Time(), EpochHash: l1Info.BlockHash,
Transactions: opaqueTxs, Timestamp: block.Time(),
}, Transactions: opaqueTxs,
} },
return rlp.Encode(w, batch) }, nil
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment