Commit bfa22916 authored by Joshua Gutow's avatar Joshua Gutow Committed by GitHub

op-node: Dedicated frame parsing code (#3230)

* op-node: Rename channel_frame.go -> fame.go

* op-node: New ParseFrames function + fuzzers

* op-node: Use new ParseFrames function

* fix test to new behavior

* small fixes

* fix lint

* small fixes

* fix nits

* fix nits

* fix logging format
Co-authored-by: default avatarmergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
parent 1c443841
...@@ -26,6 +26,8 @@ fuzz: ...@@ -26,6 +26,8 @@ fuzz:
go test -run NOTAREALTEST -v -fuzztime 10s -fuzz FuzzL1InfoRoundTrip ./rollup/derive go test -run NOTAREALTEST -v -fuzztime 10s -fuzz FuzzL1InfoRoundTrip ./rollup/derive
go test -run NOTAREALTEST -v -fuzztime 10s -fuzz FuzzL1InfoAgainstContract ./rollup/derive go test -run NOTAREALTEST -v -fuzztime 10s -fuzz FuzzL1InfoAgainstContract ./rollup/derive
go test -run NOTAREALTEST -v -fuzztime 10s -fuzz FuzzUnmarshallLogEvent ./rollup/derive go test -run NOTAREALTEST -v -fuzztime 10s -fuzz FuzzUnmarshallLogEvent ./rollup/derive
go test -run NOTAREALTEST -v -fuzztime 10s -fuzz FuzzParseFrames ./rollup/derive
go test -run NOTAREALTEST -v -fuzztime 10s -fuzz FuzzFrameUnmarshalBinary ./rollup/derive
.PHONY: \ .PHONY: \
......
package derive package derive
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"io" "io"
...@@ -83,61 +82,26 @@ func (ib *ChannelBank) IngestData(data []byte) { ...@@ -83,61 +82,26 @@ func (ib *ChannelBank) IngestData(data []byte) {
panic("write data to bank while closed") panic("write data to bank while closed")
} }
ib.log.Debug("channel bank got new data", "origin", ib.progress.Origin, "data_len", len(data)) ib.log.Debug("channel bank got new data", "origin", ib.progress.Origin, "data_len", len(data))
if len(data) < 1 {
ib.log.Warn("data must be at least have a version byte, but got empty string")
return
}
if data[0] != DerivationVersion0 {
ib.log.Warn("unrecognized derivation version", "version", data)
return
}
buf := bytes.NewBuffer(data[1:])
// TODO: Why is the prune here?
ib.prune() ib.prune()
if buf.Len() < minimumFrameSize { frames, err := ParseFrames(data)
ib.log.Warn("data must be at least have one frame", "length", buf.Len()) if err != nil {
return ib.log.Warn("malformed frame", "err", err)
}
// Iterate over all frames. They may have different channel IDs to indicate that they stream consumer should reset.
for {
// Don't try to unmarshal from an empty buffer.
// The if done checks should catch most/all of this case though.
if buf.Len() < ChannelIDDataSize+1 {
return
}
done := false
var f Frame
if err := (&f).UnmarshalBinary(buf); err == io.EOF {
done = true
} else if err != nil {
ib.log.Warn("malformed frame: %w", err)
return
}
// stop reading and ignore remaining data if we encounter a zeroed ID,
// this happens when there is zero padding at the end.
if f.ID == (ChannelID{}) {
ib.log.Trace("empty channel ID")
return return
} }
// Process each frame
for _, f := range frames {
// check if the channel is not timed out // check if the channel is not timed out
if f.ID.Time+ib.cfg.ChannelTimeout < ib.progress.Origin.Time { if f.ID.Time+ib.cfg.ChannelTimeout < ib.progress.Origin.Time {
ib.log.Warn("channel is timed out, ignore frame", "channel", f.ID, "id_time", f.ID.Time, "frame", f.FrameNumber) ib.log.Warn("channel is timed out, ignore frame", "channel", f.ID, "id_time", f.ID.Time, "frame", f.FrameNumber)
if done {
return
}
continue continue
} }
// check if the channel is not included too soon (otherwise timeouts wouldn't be effective) // check if the channel is not included too soon (otherwise timeouts wouldn't be effective)
if f.ID.Time > ib.progress.Origin.Time { if f.ID.Time > ib.progress.Origin.Time {
ib.log.Warn("channel claims to be from the future, ignore frame", "channel", f.ID, "id_time", f.ID.Time, "frame", f.FrameNumber) ib.log.Warn("channel claims to be from the future, ignore frame", "channel", f.ID, "id_time", f.ID.Time, "frame", f.FrameNumber)
if done {
return
}
continue continue
} }
...@@ -151,15 +115,8 @@ func (ib *ChannelBank) IngestData(data []byte) { ...@@ -151,15 +115,8 @@ func (ib *ChannelBank) IngestData(data []byte) {
ib.log.Trace("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data)) ib.log.Trace("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data))
if err := currentCh.IngestData(uint64(f.FrameNumber), f.IsLast, f.Data); err != nil { if err := currentCh.IngestData(uint64(f.FrameNumber), f.IsLast, f.Data); err != nil {
ib.log.Warn("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err) ib.log.Warn("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err)
if done {
return
}
continue continue
} }
if done {
return
}
} }
} }
......
...@@ -299,7 +299,7 @@ func TestL1ChannelBank(t *testing.T) { ...@@ -299,7 +299,7 @@ func TestL1ChannelBank(t *testing.T) {
} }
badTx.Write(testutils.RandomData(bt.rng, 30)) // incomplete frame data badTx.Write(testutils.RandomData(bt.rng, 30)) // incomplete frame data
bt.ingestData(badTx.Bytes()) bt.ingestData(badTx.Bytes())
bt.expectChannel("helloworld") // can still read the frames before the invalid data // Expect the bad frame to render the entire chunk invalid.
bt.repeatStep(2, 0, false, nil) bt.repeatStep(2, 0, false, nil)
bt.assertExpectations() bt.assertExpectations()
}, },
......
package derive package derive
import ( import (
"bytes"
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt" "fmt"
...@@ -79,10 +80,6 @@ func (f *Frame) UnmarshalBinary(r ByteReader) error { ...@@ -79,10 +80,6 @@ func (f *Frame) UnmarshalBinary(r ByteReader) error {
if err := binary.Read(r, binary.BigEndian, &f.ID.Time); err != nil { if err := binary.Read(r, binary.BigEndian, &f.ID.Time); err != nil {
return fmt.Errorf("error reading ID time: %w", err) return fmt.Errorf("error reading ID time: %w", err)
} }
// stop reading and ignore remaining data if we encounter a zeroed ID
if f.ID == (ChannelID{}) {
return io.EOF
}
if err := binary.Read(r, binary.BigEndian, &f.FrameNumber); err != nil { if err := binary.Read(r, binary.BigEndian, &f.FrameNumber); err != nil {
return fmt.Errorf("error reading frame number: %w", err) return fmt.Errorf("error reading frame number: %w", err)
...@@ -114,3 +111,37 @@ func (f *Frame) UnmarshalBinary(r ByteReader) error { ...@@ -114,3 +111,37 @@ func (f *Frame) UnmarshalBinary(r ByteReader) error {
return errors.New("invalid byte as is_last") return errors.New("invalid byte as is_last")
} }
} }
// Frames on stored in L1 transactions with the following format:
// data = DerivationVersion0 ++ Frame(s)
// Where there is one or more frames concatenated together.
// ParseFrames parse the on chain serialization of frame(s) in
// an L1 transaction. Currently only version 0 of the serialization
// format is supported.
// All frames must be parsed without error and there must not be
// any left over data and there must be at least one frame.
func ParseFrames(data []byte) ([]Frame, error) {
if len(data) == 0 {
return nil, errors.New("data array must not be empty")
}
if data[0] != DerivationVersion0 {
return nil, fmt.Errorf("invalid derivation format byte: got %d", data[0])
}
buf := bytes.NewBuffer(data[1:])
var frames []Frame
for buf.Len() > 0 {
var f Frame
if err := (&f).UnmarshalBinary(buf); err != io.EOF && err != nil {
return nil, err
}
frames = append(frames, f)
}
if buf.Len() != 0 {
return nil, fmt.Errorf("did not fully consume data: have %d frames and %d bytes left", len(frames), buf.Len())
}
if len(frames) == 0 {
return nil, errors.New("was not able to find any frames")
}
return frames, nil
}
package derive
import (
"bytes"
"testing"
)
func FuzzFrameUnmarshalBinary(f *testing.F) {
f.Fuzz(func(t *testing.T, data []byte) {
buf := bytes.NewBuffer(data)
var f Frame
_ = (&f).UnmarshalBinary(buf)
})
}
func FuzzParseFrames(f *testing.F) {
f.Fuzz(func(t *testing.T, data []byte) {
frames, err := ParseFrames(data)
if err != nil && len(frames) != 0 {
t.Fatal("non-nil error with an amount of return data")
} else if err == nil && len(frames) == 0 {
t.Fatal("must return data with a non-nil error")
}
})
}
...@@ -12,9 +12,6 @@ const frameOverhead = 200 ...@@ -12,9 +12,6 @@ const frameOverhead = 200
const DerivationVersion0 = 0 const DerivationVersion0 = 0
// channel ID (data + time), frame number, frame length, last frame bool
const minimumFrameSize = (ChannelIDDataSize + 1) + 1 + 1 + 1
// MaxChannelBankSize is the amount of memory space, in number of bytes, // MaxChannelBankSize is the amount of memory space, in number of bytes,
// till the bank is pruned by removing channels, // till the bank is pruned by removing channels,
// starting with the oldest channel. // starting with the oldest channel.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment