Commit 04a39a47 authored by Joshua Gutow's avatar Joshua Gutow Committed by GitHub

op-node: Fixed Integers in channel frame header (#3126)

* goals for fixed int

* op-node: Fixed integer sizes in the channel header

This mainly modifies the channel_frame, but has some knock on effects
as the width of some of the fields have been reduced. The channel
frame code is also changed more than I expected due to differences
in API of working with fixed int vs uvarints in go. Otherwise the
code reads very similarly with using Reader/Writer APIs.

* op-node: fix frame unmarshal func to return correct err

* specs: update frame format specs

* Update op-node/rollup/derive/channel_bank_test.go
Co-authored-by: default avatarprotolambda <proto@protolambda.com>
Co-authored-by: default avatarMatthew Slipper <me@matthewslipper.com>
parent d37c7ff5
...@@ -136,7 +136,7 @@ func (ib *ChannelBank) IngestData(data []byte) error { ...@@ -136,7 +136,7 @@ func (ib *ChannelBank) IngestData(data []byte) error {
} }
ib.log.Debug("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data)) ib.log.Debug("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data))
if err := currentCh.IngestData(f.FrameNumber, f.IsLast, f.Data); err != nil { if err := currentCh.IngestData(uint64(f.FrameNumber), f.IsLast, f.Data); err != nil {
ib.log.Debug("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err) ib.log.Debug("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err)
if done { if done {
return nil return nil
......
package derive package derive
import ( import (
"bytes"
"fmt"
"math/rand" "math/rand"
"strconv" "strconv"
"strings" "strings"
...@@ -106,32 +108,28 @@ func (tf testFrame) Content() []byte { ...@@ -106,32 +108,28 @@ func (tf testFrame) Content() []byte {
return []byte(strings.TrimSuffix(parts[3], "!")) return []byte(strings.TrimSuffix(parts[3], "!"))
} }
func (tf testFrame) Encode() []byte { func (tf testFrame) ToFrame() Frame {
chID := tf.ChannelID() return Frame{
var out []byte ID: tf.ChannelID(),
out = append(out, chID.Data[:]...) FrameNumber: uint16(tf.FrameNumber()),
out = append(out, makeUVarint(chID.Time)...) Data: tf.Content(),
out = append(out, makeUVarint(tf.FrameNumber())...) IsLast: tf.IsLast(),
content := tf.Content()
out = append(out, makeUVarint(uint64(len(content)))...)
out = append(out, content...)
if tf.IsLast() {
out = append(out, 1)
} else {
out = append(out, 0)
} }
return out
} }
func (bt *bankTestSetup) ingestData(data []byte) { func (bt *bankTestSetup) ingestData(data []byte) {
require.NoError(bt.t, bt.cb.IngestData(data)) require.NoError(bt.t, bt.cb.IngestData(data))
} }
func (bt *bankTestSetup) ingestFrames(frames ...testFrame) { func (bt *bankTestSetup) ingestFrames(frames ...testFrame) {
data := []byte{DerivationVersion0} data := new(bytes.Buffer)
data.WriteByte(DerivationVersion0)
for _, fr := range frames { for _, fr := range frames {
data = append(data, fr.Encode()...) f := fr.ToFrame()
if err := f.MarshalBinary(data); err != nil {
panic(fmt.Errorf("error in making frame during test: %w", err))
}
} }
bt.ingestData(data) bt.ingestData(data.Bytes())
} }
func (bt *bankTestSetup) repeatStep(max int, outer int, outerClosed bool, err error) { func (bt *bankTestSetup) repeatStep(max int, outer int, outerClosed bool, err error) {
require.Equal(bt.t, err, RepeatStep(bt.t, bt.cb.Step, Progress{Origin: bt.origins[outer], Closed: outerClosed}, max)) require.Equal(bt.t, err, RepeatStep(bt.t, bt.cb.Step, Progress{Origin: bt.origins[outer], Closed: outerClosed}, max))
...@@ -292,10 +290,14 @@ func TestL1ChannelBank(t *testing.T) { ...@@ -292,10 +290,14 @@ func TestL1ChannelBank(t *testing.T) {
bt.assertOriginTime(101) bt.assertOriginTime(101)
badTx := []byte{DerivationVersion0} badTx := new(bytes.Buffer)
badTx = append(badTx, testFrame("a:101:0:helloworld!").Encode()...) badTx.WriteByte(DerivationVersion0)
badTx = append(badTx, testutils.RandomData(bt.rng, 30)...) // incomplete frame data goodFrame := testFrame("a:101:0:helloworld!").ToFrame()
bt.ingestData(badTx) if err := goodFrame.MarshalBinary(badTx); err != nil {
panic(fmt.Errorf("error in marshalling frame: %w", err))
}
badTx.Write(testutils.RandomData(bt.rng, 30)) // incomplete frame data
bt.ingestData(badTx.Bytes())
bt.expectChannel("helloworld") // can still read the frames before the invalid data bt.expectChannel("helloworld") // can still read the frames before the invalid data
bt.repeatStep(2, 0, false, nil) bt.repeatStep(2, 0, false, nil)
bt.assertExpectations() bt.assertExpectations()
......
...@@ -12,70 +12,56 @@ import ( ...@@ -12,70 +12,56 @@ import (
// but we leave space to grow larger anyway (gas limit allows for more data). // but we leave space to grow larger anyway (gas limit allows for more data).
const MaxFrameLen = 1_000_000 const MaxFrameLen = 1_000_000
var ErrNotEnoughFrameBytes = errors.New("not enough available bytes for the frame")
// Data Format // Data Format
// //
// frame = channel_id ++ frame_number ++ frame_data_length ++ frame_data ++ is_last // frame = channel_id ++ frame_number ++ frame_data_length ++ frame_data ++ is_last
// //
// channel_id = random ++ timestamp // channel_id = random ++ timestamp
// random = bytes32 // random = bytes32
// timestamp = uvarint // timestamp = uint64
// frame_number = uvarint // frame_number = uint16
// frame_data_length = uvarint // frame_data_length = uint32
// frame_data = bytes // frame_data = bytes
// is_last = bool // is_last = bool
type Frame struct { type Frame struct {
ID ChannelID ID ChannelID
FrameNumber uint64 FrameNumber uint16
Data []byte Data []byte
IsLast bool IsLast bool
} }
// MarshalBinary writes the frame to `w`. // MarshalBinary writes the frame to `w`.
// It returns the number of bytes written as well as any // It returns any errors encountered while writing, but
// error encountered while writing. // generally expects the writer very rarely fail.
func (f *Frame) MarshalBinary(w io.Writer) (int, error) { func (f *Frame) MarshalBinary(w io.Writer) error {
n, err := w.Write(f.ID.Data[:]) _, err := w.Write(f.ID.Data[:])
if err != nil { if err != nil {
return n, err return err
} }
l, err := w.Write(makeUVarint(f.ID.Time)) if err := binary.Write(w, binary.BigEndian, f.ID.Time); err != nil {
n += l return err
if err != nil {
return n, err
} }
l, err = w.Write(makeUVarint(f.FrameNumber)) if err := binary.Write(w, binary.BigEndian, f.FrameNumber); err != nil {
n += l return err
if err != nil {
return n, err
} }
if err := binary.Write(w, binary.BigEndian, uint32(len(f.Data))); err != nil {
l, err = w.Write(makeUVarint(uint64(len(f.Data)))) return err
n += l
if err != nil {
return n, err
} }
l, err = w.Write(f.Data) _, err = w.Write(f.Data)
n += l
if err != nil { if err != nil {
return n, err return err
} }
if f.IsLast { if f.IsLast {
l, err = w.Write([]byte{1}) if _, err = w.Write([]byte{1}); err != nil {
n += l return err
if err != nil {
return n, err
} }
} else { } else {
l, err = w.Write([]byte{0}) if _, err = w.Write([]byte{0}); err != nil {
n += l return err
if err != nil {
return n, err
} }
} }
return n, nil return nil
} }
type ByteReader interface { type ByteReader interface {
...@@ -87,25 +73,23 @@ type ByteReader interface { ...@@ -87,25 +73,23 @@ type ByteReader interface {
// If `r` fails a read, it returns the error from the reader // If `r` fails a read, it returns the error from the reader
// The reader will be left in a partially read state. // The reader will be left in a partially read state.
func (f *Frame) UnmarshalBinary(r ByteReader) error { func (f *Frame) UnmarshalBinary(r ByteReader) error {
_, err := io.ReadFull(r, f.ID.Data[:]) if _, err := io.ReadFull(r, f.ID.Data[:]); err != nil {
if err != nil {
return fmt.Errorf("error reading ID: %w", err) return fmt.Errorf("error reading ID: %w", err)
} }
f.ID.Time, err = binary.ReadUvarint(r) if err := binary.Read(r, binary.BigEndian, &f.ID.Time); err != nil {
if err != nil { return fmt.Errorf("error reading ID time: %w", err)
return fmt.Errorf("error reading ID.Time: %w", err)
} }
// stop reading and ignore remaining data if we encounter a zeroed ID // stop reading and ignore remaining data if we encounter a zeroed ID
if f.ID == (ChannelID{}) { if f.ID == (ChannelID{}) {
return io.EOF return io.EOF
} }
f.FrameNumber, err = binary.ReadUvarint(r)
if err != nil { if err := binary.Read(r, binary.BigEndian, &f.FrameNumber); err != nil {
return fmt.Errorf("error reading frame number: %w", err) return fmt.Errorf("error reading frame number: %w", err)
} }
frameLength, err := binary.ReadUvarint(r) var frameLength uint32
if err != nil { if err := binary.Read(r, binary.BigEndian, &frameLength); err != nil {
return fmt.Errorf("error reading frame length: %w", err) return fmt.Errorf("error reading frame length: %w", err)
} }
...@@ -118,16 +102,15 @@ func (f *Frame) UnmarshalBinary(r ByteReader) error { ...@@ -118,16 +102,15 @@ func (f *Frame) UnmarshalBinary(r ByteReader) error {
return fmt.Errorf("error reading frame data: %w", err) return fmt.Errorf("error reading frame data: %w", err)
} }
isLastByte, err := r.ReadByte() if isLastByte, err := r.ReadByte(); err != nil && err != io.EOF {
if err != nil && err != io.EOF {
return fmt.Errorf("error reading final byte: %w", err) return fmt.Errorf("error reading final byte: %w", err)
} } else if isLastByte == 0 {
if isLastByte == 0 {
f.IsLast = false f.IsLast = false
return err
} else if isLastByte == 1 { } else if isLastByte == 1 {
f.IsLast = true f.IsLast = true
return err
} else { } else {
return errors.New("invalid byte as is_last") return errors.New("invalid byte as is_last")
} }
return err
} }
...@@ -4,7 +4,6 @@ import ( ...@@ -4,7 +4,6 @@ import (
"bytes" "bytes"
"compress/zlib" "compress/zlib"
"crypto/rand" "crypto/rand"
"encoding/binary"
"errors" "errors"
"io" "io"
...@@ -80,12 +79,6 @@ func (co *ChannelOut) AddBlock(block *types.Block) error { ...@@ -80,12 +79,6 @@ func (co *ChannelOut) AddBlock(block *types.Block) error {
return blockToBatch(block, co.compress) return blockToBatch(block, co.compress)
} }
func makeUVarint(x uint64) []byte {
var tmp [binary.MaxVarintLen64]byte
n := binary.PutUvarint(tmp[:], x)
return tmp[:n]
}
// ReadyBytes returns the number of bytes that the channel out can immediately output into a frame. // ReadyBytes returns the number of bytes that the channel out can immediately output into a frame.
// Use `Flush` or `Close` to move data from the compression buffer into the ready buffer if more bytes // Use `Flush` or `Close` to move data from the compression buffer into the ready buffer if more bytes
// are needed. Add blocks may add to the ready buffer, but it is not guaranteed due to the compression stage. // are needed. Add blocks may add to the ready buffer, but it is not guaranteed due to the compression stage.
...@@ -115,18 +108,18 @@ func (co *ChannelOut) Close() error { ...@@ -115,18 +108,18 @@ func (co *ChannelOut) Close() error {
func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) error { func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) error {
f := Frame{ f := Frame{
ID: co.id, ID: co.id,
FrameNumber: co.frame, FrameNumber: uint16(co.frame),
} }
// Copy data from the local buffer into the frame data buffer // Copy data from the local buffer into the frame data buffer
// Don't go past the maxSize even with the max possible uvarints // Don't go past the maxSize with the fixed frame overhead.
// +1 for single byte of frame content, +1 for lastFrame bool // Fixed overhead: 32 + 8 + 2 + 4 + 1 = 47 bytes.
// +24 for maximum uvarints // Add one extra byte for the version byte (for the entire L1 tx though)
// +32 for the data ID maxDataSize := maxSize - 47 - 1
maxDataSize := maxSize - 32 - 24 - 1 - 1 if maxDataSize > uint64(co.buf.Len()) {
if maxDataSize >= uint64(co.buf.Len()) {
maxDataSize = uint64(co.buf.Len()) maxDataSize = uint64(co.buf.Len())
// If we are closed & will not spill past the current frame, end it. // If we are closed & will not spill past the current frame
// mark it is the final frame of the channel.
if co.closed { if co.closed {
f.IsLast = true f.IsLast = true
} }
...@@ -137,7 +130,7 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) error { ...@@ -137,7 +130,7 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) error {
return err return err
} }
if _, err := f.MarshalBinary(w); err != nil { if err := f.MarshalBinary(w); err != nil {
return err return err
} }
......
...@@ -265,19 +265,21 @@ frame = channel_id ++ frame_number ++ frame_data_length ++ frame_data ++ is_last ...@@ -265,19 +265,21 @@ frame = channel_id ++ frame_number ++ frame_data_length ++ frame_data ++ is_last
channel_id = random ++ timestamp channel_id = random ++ timestamp
random = bytes32 random = bytes32
timestamp = uvarint timestamp = uint64
frame_number = uvarint frame_number = uint16
frame_data_length = uvarint frame_data_length = uint32
frame_data = bytes frame_data = bytes
is_last = bool is_last = bool
Where `uint64`, `uint32` and `uint16` are all big-endian unsigned integers.
``` ```
> **TODO** replace `uvarint` by fixed size integers All data in a frame is fixed-size, except the `frame_data`. The fixed overhead is `32 + 8 + 2 + 4 + 1 = 47 bytes`.
Fixed-size frame metadata avoids a circular dependency with the target total data length,
to simplify packing of frames with varying content length.
where: where:
- `uvarint` is a variable-length encoding of a 64-bit unsigned integer into between 1 and 9 bytes, [as specified in
SQLite 4][sqlite-uvarint].
- `channel_id` uniquely identifies a channel as the concatenation of a random value and a timestamp - `channel_id` uniquely identifies a channel as the concatenation of a random value and a timestamp
- `random` is a random value such that two channels with different batches should have a different random value - `random` is a random value such that two channels with different batches should have a different random value
- `timestamp` is the time at which the channel was created (UNIX time in seconds) - `timestamp` is the time at which the channel was created (UNIX time in seconds)
...@@ -290,7 +292,7 @@ where: ...@@ -290,7 +292,7 @@ where:
margin. (A soft constraint is not a consensus rule — nodes will accept such blocks in the canonical chain but will margin. (A soft constraint is not a consensus rule — nodes will accept such blocks in the canonical chain but will
not attempt to build directly on them.) not attempt to build directly on them.)
- `frame_number` identifies the index of the frame within the channel - `frame_number` identifies the index of the frame within the channel
- `frame_data_length` is the length of `frame_data` in bytes - `frame_data_length` is the length of `frame_data` in bytes. It is capped to 1,000,000 bytes.
- `frame_data` is a sequence of bytes belonging to the channel, logically after the bytes from the previous frames - `frame_data` is a sequence of bytes belonging to the channel, logically after the bytes from the previous frames
- `is_last` is a single byte with a value of 1 if the frame is the last in the channel, 0 if there are frames in the - `is_last` is a single byte with a value of 1 if the frame is the last in the channel, 0 if there are frames in the
channel. Any other value makes the frame invalid (it must be ignored by the rollup node). channel. Any other value makes the frame invalid (it must be ignored by the rollup node).
...@@ -302,8 +304,7 @@ where: ...@@ -302,8 +304,7 @@ where:
> - Do we drop the channel or just the first frame? End result is the same but this changes the channel bank size, which > - Do we drop the channel or just the first frame? End result is the same but this changes the channel bank size, which
> can influence things down the line!! > can influence things down the line!!
[sqlite-uvarint]: https://www.sqlite.org/src4/doc/trunk/www/varint.wiki [batcher-spec]: batching.md
[batcher-spec]: batcher.md
### Channel Format ### Channel Format
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment