Commit 9673eca7 authored by Joshua Gutow's avatar Joshua Gutow Committed by GitHub

op-node: Isolated channel frame serialization (#3125)

* op-node: Add channel frame serialization

This creates a struct & serialization & deserialization methods
for it. This is to replace the current serialization code which
is embedded in more complex methods.

The reading is done with a reader API instead of a bytes API because
the frame is variable length & is originally merged together with
other frames without a clean division. The writing API uses a
writer for simplicity (but is not required).

This lays the groundwork for easily switching to fixed int sizes.

* op-node: Use channel frame deserialization

This uses the new channel frame object for deserialization. Some
of the API interaction is a little weird in the channel_bank
IngestData loop, but the code is not able to be easily tested and
upgraded.

* op-node: Use channel frame in serialization

This uses the default implementation (through a struct) rather
than the custom logic. It might make sense to use a slightly
different API for serialization than deserialization given the
inputs, but splitting out the logic into it's own function is
still an improvement.
Co-authored-by: default avatarmergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
parent dac4a9f0
...@@ -15,6 +15,7 @@ cloud.google.com/go/compute v1.3.0/go.mod h1:cCZiE1NHEtai4wiufUhW8I8S1JKkAnhnQJW ...@@ -15,6 +15,7 @@ cloud.google.com/go/compute v1.3.0/go.mod h1:cCZiE1NHEtai4wiufUhW8I8S1JKkAnhnQJW
cloud.google.com/go/compute v1.5.0/go.mod h1:9SMHyhJlzhlkJqrPAc839t2BZFTSk6Jdj6mkzQJeu0M= cloud.google.com/go/compute v1.5.0/go.mod h1:9SMHyhJlzhlkJqrPAc839t2BZFTSk6Jdj6mkzQJeu0M=
cloud.google.com/go/compute v1.6.0/go.mod h1:T29tfhtVbq1wvAPo0E3+7vhgmkOYeXjhFvz/FMzPu0s= cloud.google.com/go/compute v1.6.0/go.mod h1:T29tfhtVbq1wvAPo0E3+7vhgmkOYeXjhFvz/FMzPu0s=
cloud.google.com/go/compute v1.6.1/go.mod h1:g85FgpzFvNULZ+S8AYq87axRKuf2Kh7deLqV/jJ3thU= cloud.google.com/go/compute v1.6.1/go.mod h1:g85FgpzFvNULZ+S8AYq87axRKuf2Kh7deLqV/jJ3thU=
github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I=
github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/btcsuite/btcd/btcec/v2 v2.1.3/go.mod h1:ctjw4H1kknNJmRN4iP1R7bTQ+v3GJkZBd6mui8ZsAZE= github.com/btcsuite/btcd/btcec/v2 v2.1.3/go.mod h1:ctjw4H1kknNJmRN4iP1R7bTQ+v3GJkZBd6mui8ZsAZE=
github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
......
package derive package derive
import ( import (
"bytes"
"context" "context"
"encoding/binary"
"fmt" "fmt"
"io" "io"
...@@ -81,85 +81,72 @@ func (ib *ChannelBank) IngestData(data []byte) error { ...@@ -81,85 +81,72 @@ func (ib *ChannelBank) IngestData(data []byte) error {
if data[0] != DerivationVersion0 { if data[0] != DerivationVersion0 {
return fmt.Errorf("unrecognized derivation version: %d", data) return fmt.Errorf("unrecognized derivation version: %d", data)
} }
buf := bytes.NewBuffer(data[1:])
ib.prune() ib.prune()
offset := 1 if buf.Len() < minimumFrameSize {
if len(data[offset:]) < minimumFrameSize {
return fmt.Errorf("data must be at least have one frame") return fmt.Errorf("data must be at least have one frame")
} }
// Iterate over all frames. They may have different channel IDs to indicate that they stream consumer should reset. // Iterate over all frames. They may have different channel IDs to indicate that they stream consumer should reset.
for { for {
if len(data) < offset+ChannelIDDataSize+1 { // Don't try to unmarshal from an empty buffer.
// The if done checks should catch most/all of this case though.
if buf.Len() < ChannelIDDataSize+1 {
return nil return nil
} }
var chID ChannelID done := false
copy(chID.Data[:], data[offset:]) var f Frame
offset += ChannelIDDataSize if err := (&f).UnmarshalBinary(buf); err == io.EOF {
chIDTime, n := binary.Uvarint(data[offset:]) done = true
if n <= 0 { } else if err != nil {
return fmt.Errorf("failed to read frame number") return fmt.Errorf("failed to unmarshal a frame: %w", err)
} }
offset += n
chID.Time = chIDTime
// stop reading and ignore remaining data if we encounter a zeroed ID // stop reading and ignore remaining data if we encounter a zeroed ID
if chID == (ChannelID{}) { if f.ID == (ChannelID{}) {
ib.log.Info("empty channel ID")
return nil return nil
} }
frameNumber, n := binary.Uvarint(data[offset:])
if n <= 0 {
return fmt.Errorf("failed to read frame number")
}
offset += n
frameLength, n := binary.Uvarint(data[offset:])
if n <= 0 {
return fmt.Errorf("failed to read frame length")
}
offset += n
if remaining := uint64(len(data) - offset); remaining < frameLength {
return fmt.Errorf("not enough data left for frame: %d < %d", remaining, frameLength)
}
frameData := data[offset : uint64(offset)+frameLength]
offset += int(frameLength)
if offset >= len(data) {
return fmt.Errorf("failed to read frame end byte, no data left, offset past length %d", len(data))
}
isLastNum := data[offset]
if isLastNum > 1 {
return fmt.Errorf("invalid isLast bool value: %d", data[offset])
}
isLast := isLastNum == 1
offset += 1
// check if the channel is not timed out // check if the channel is not timed out
if chID.Time+ib.cfg.ChannelTimeout < ib.progress.Origin.Time { if f.ID.Time+ib.cfg.ChannelTimeout < ib.progress.Origin.Time {
ib.log.Info("channel is timed out, ignore frame", "channel", chID, "id_time", chID.Time, "frame", frameNumber) ib.log.Info("channel is timed out, ignore frame", "channel", f.ID, "id_time", f.ID.Time, "frame", f.FrameNumber)
if done {
return nil
}
continue continue
} }
// check if the channel is not included too soon (otherwise timeouts wouldn't be effective) // check if the channel is not included too soon (otherwise timeouts wouldn't be effective)
if chID.Time > ib.progress.Origin.Time { if f.ID.Time > ib.progress.Origin.Time {
ib.log.Info("channel claims to be from the future, ignore frame", "channel", chID, "id_time", chID.Time, "frame", frameNumber) ib.log.Info("channel claims to be from the future, ignore frame", "channel", f.ID, "id_time", f.ID.Time, "frame", f.FrameNumber)
if done {
return nil
}
continue continue
} }
currentCh, ok := ib.channels[chID] currentCh, ok := ib.channels[f.ID]
if !ok { // create new channel if it doesn't exist yet if !ok { // create new channel if it doesn't exist yet
currentCh = &ChannelIn{id: chID} currentCh = &ChannelIn{id: f.ID}
ib.channels[chID] = currentCh ib.channels[f.ID] = currentCh
ib.channelQueue = append(ib.channelQueue, chID) ib.channelQueue = append(ib.channelQueue, f.ID)
} }
ib.log.Debug("ingesting frame", "channel", chID, "frame_number", frameNumber, "length", len(frameData)) ib.log.Debug("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data))
if err := currentCh.IngestData(frameNumber, isLast, frameData); err != nil { if err := currentCh.IngestData(f.FrameNumber, f.IsLast, f.Data); err != nil {
ib.log.Debug("failed to ingest frame into channel", "channel", chID, "frame_number", frameNumber, "err", err) ib.log.Debug("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err)
if done {
return nil
}
continue continue
} }
if done {
return nil
}
} }
} }
......
package derive
import (
"encoding/binary"
"errors"
"fmt"
"io"
)
// Frames cannot be larger than 1 MB.
// Data transactions that carry frames are generally not larger than 128 KB due to L1 network conditions,
// but we leave space to grow larger anyway (gas limit allows for more data).
const MaxFrameLen = 1_000_000
var ErrNotEnoughFrameBytes = errors.New("not enough available bytes for the frame")
// Data Format
//
// frame = channel_id ++ frame_number ++ frame_data_length ++ frame_data ++ is_last
//
// channel_id = random ++ timestamp
// random = bytes32
// timestamp = uvarint
// frame_number = uvarint
// frame_data_length = uvarint
// frame_data = bytes
// is_last = bool
type Frame struct {
ID ChannelID
FrameNumber uint64
Data []byte
IsLast bool
}
// MarshalBinary writes the frame to `w`.
// It returns the number of bytes written as well as any
// error encountered while writing.
func (f *Frame) MarshalBinary(w io.Writer) (int, error) {
n, err := w.Write(f.ID.Data[:])
if err != nil {
return n, err
}
l, err := w.Write(makeUVarint(f.ID.Time))
n += l
if err != nil {
return n, err
}
l, err = w.Write(makeUVarint(f.FrameNumber))
n += l
if err != nil {
return n, err
}
l, err = w.Write(makeUVarint(uint64(len(f.Data))))
n += l
if err != nil {
return n, err
}
l, err = w.Write(f.Data)
n += l
if err != nil {
return n, err
}
if f.IsLast {
l, err = w.Write([]byte{1})
n += l
if err != nil {
return n, err
}
} else {
l, err = w.Write([]byte{0})
n += l
if err != nil {
return n, err
}
}
return n, nil
}
type ByteReader interface {
io.Reader
io.ByteReader
}
// UnmarshalBinary consumes a full frame from the reader.
// If `r` fails a read, it returns the error from the reader
// The reader will be left in a partially read state.
func (f *Frame) UnmarshalBinary(r ByteReader) error {
_, err := io.ReadFull(r, f.ID.Data[:])
if err != nil {
return fmt.Errorf("error reading ID: %w", err)
}
f.ID.Time, err = binary.ReadUvarint(r)
if err != nil {
return fmt.Errorf("error reading ID.Time: %w", err)
}
// stop reading and ignore remaining data if we encounter a zeroed ID
if f.ID == (ChannelID{}) {
return io.EOF
}
f.FrameNumber, err = binary.ReadUvarint(r)
if err != nil {
return fmt.Errorf("error reading frame number: %w", err)
}
frameLength, err := binary.ReadUvarint(r)
if err != nil {
return fmt.Errorf("error reading frame length: %w", err)
}
// Cap frame length to MaxFrameLen (currently 1MB)
if frameLength > MaxFrameLen {
return fmt.Errorf("frameLength is too large: %d", frameLength)
}
f.Data = make([]byte, int(frameLength))
if _, err := io.ReadFull(r, f.Data); err != nil {
return fmt.Errorf("error reading frame data: %w", err)
}
isLastByte, err := r.ReadByte()
if err != nil && err != io.EOF {
return fmt.Errorf("error reading final byte: %w", err)
}
if isLastByte == 0 {
f.IsLast = false
} else if isLastByte == 1 {
f.IsLast = true
} else {
return errors.New("invalid byte as is_last")
}
return err
}
...@@ -6,7 +6,6 @@ import ( ...@@ -6,7 +6,6 @@ import (
"crypto/rand" "crypto/rand"
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt"
"io" "io"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
...@@ -114,41 +113,38 @@ func (co *ChannelOut) Close() error { ...@@ -114,41 +113,38 @@ func (co *ChannelOut) Close() error {
// Returns nil if there is still more buffered data. // Returns nil if there is still more buffered data.
// Returns and error if it ran into an error during processing. // Returns and error if it ran into an error during processing.
func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) error { func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) error {
w.Write(co.id.Data[:]) f := Frame{
w.Write(makeUVarint(co.id.Time)) ID: co.id,
w.Write(makeUVarint(co.frame)) FrameNumber: co.frame,
}
// Copy data from the local buffer into the frame data buffer
// Don't go past the maxSize even with the max possible uvarints
// +1 for single byte of frame content, +1 for lastFrame bool // +1 for single byte of frame content, +1 for lastFrame bool
if uint64(w.Len())+2 > maxSize { // +24 for maximum uvarints
return fmt.Errorf("no more space: %d > %d", w.Len(), maxSize) // +32 for the data ID
maxDataSize := maxSize - 32 - 24 - 1 - 1
if maxDataSize >= uint64(co.buf.Len()) {
maxDataSize = uint64(co.buf.Len())
// If we are closed & will not spill past the current frame, end it.
if co.closed {
f.IsLast = true
} }
}
f.Data = make([]byte, maxDataSize)
remaining := maxSize - uint64(w.Len()) if _, err := io.ReadFull(&co.buf, f.Data); err != nil {
maxFrameLen := remaining - 1 // -1 for the bool at the end
// estimate how many bytes we lose with encoding the length of the frame
// by encoding the max length (larger uvarints take more space)
maxFrameLen -= uint64(len(makeUVarint(maxFrameLen)))
// Pull the data into a temporary buffer b/c we use uvarints to record the length
// Could theoretically use the min of co.buf.Len() & maxFrameLen
co.scratch.Reset()
_, err := io.CopyN(&co.scratch, &co.buf, int64(maxFrameLen))
if err != nil && err != io.EOF {
return err return err
} }
frameLen := uint64(co.scratch.Len())
co.offset += frameLen if _, err := f.MarshalBinary(w); err != nil {
w.Write(makeUVarint(frameLen))
if _, err := w.ReadFrom(&co.scratch); err != nil {
return err return err
} }
co.frame += 1 co.frame += 1
// Only mark as closed if the channel is closed & there is no more data available if f.IsLast {
if co.closed && err == io.EOF {
w.WriteByte(1)
return io.EOF return io.EOF
} else { } else {
w.WriteByte(0)
return nil return nil
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment