Commit 21627e4f authored by Joshua Gutow's avatar Joshua Gutow Committed by GitHub

op-node: Switch to 128 bit channel ID & block # timeout (#3359)

* op-node: Switch to 128 bit channel ID & block # timeout

This PR removes the 256 bit channel ID + timestamp. With the timestamp
being removed, the channel timeout is now done with block numbers.

* Update specs/derivation.md

* fix system test config
parent 83deb505
...@@ -303,7 +303,7 @@ mainLoop: ...@@ -303,7 +303,7 @@ mainLoop:
l.log.Warn("last submitted block lagged behind L2 safe head: batch submission will continue from the safe head now", "last", l.lastSubmittedBlock, "safe", syncStatus.SafeL2) l.log.Warn("last submitted block lagged behind L2 safe head: batch submission will continue from the safe head now", "last", l.lastSubmittedBlock, "safe", syncStatus.SafeL2)
l.lastSubmittedBlock = syncStatus.SafeL2.ID() l.lastSubmittedBlock = syncStatus.SafeL2.ID()
} }
if ch, err := derive.NewChannelOut(syncStatus.HeadL1.Time); err != nil { if ch, err := derive.NewChannelOut(); err != nil {
l.log.Error("Error creating channel", "err", err) l.log.Error("Error creating channel", "err", err)
continue continue
} else { } else {
......
...@@ -137,7 +137,7 @@ func defaultSystemConfig(t *testing.T) SystemConfig { ...@@ -137,7 +137,7 @@ func defaultSystemConfig(t *testing.T) SystemConfig {
BlockTime: 1, BlockTime: 1,
MaxSequencerDrift: 10, MaxSequencerDrift: 10,
SeqWindowSize: 30, SeqWindowSize: 30,
ChannelTimeout: 20, ChannelTimeout: 10,
L1ChainID: big.NewInt(900), L1ChainID: big.NewInt(900),
L2ChainID: big.NewInt(901), L2ChainID: big.NewInt(901),
// TODO pick defaults // TODO pick defaults
......
...@@ -19,7 +19,8 @@ import ( ...@@ -19,7 +19,8 @@ import (
// channel may mark itself as ready for reading once all intervening frames have been added // channel may mark itself as ready for reading once all intervening frames have been added
type Channel struct { type Channel struct {
// id of the channel // id of the channel
id ChannelID id ChannelID
openBlock eth.L1BlockRef
// estimated memory size, used to drop the channel if we have too much data // estimated memory size, used to drop the channel if we have too much data
size uint64 size uint64
...@@ -42,10 +43,11 @@ type Channel struct { ...@@ -42,10 +43,11 @@ type Channel struct {
highestL1InclusionBlock eth.L1BlockRef highestL1InclusionBlock eth.L1BlockRef
} }
func NewChannel(id ChannelID) *Channel { func NewChannel(id ChannelID, openBlock eth.L1BlockRef) *Channel {
return &Channel{ return &Channel{
id: id, id: id,
inputs: make(map[uint64][]byte), inputs: make(map[uint64][]byte),
openBlock: openBlock,
} }
} }
...@@ -80,6 +82,12 @@ func (ch *Channel) AddFrame(frame Frame, l1InclusionBlock eth.L1BlockRef) error ...@@ -80,6 +82,12 @@ func (ch *Channel) AddFrame(frame Frame, l1InclusionBlock eth.L1BlockRef) error
return nil return nil
} }
// OpenBlockNumber returns the block number of L1 block that contained
// the first frame for this channel.
func (ch *Channel) OpenBlockNumber() uint64 {
return ch.openBlock.Number
}
// Size returns the current size of the channel including frame overhead. // Size returns the current size of the channel including frame overhead.
// Reading from the channel does not reduce the size as reading is done // Reading from the channel does not reduce the size as reading is done
// on uncompressed data while this size is over compressed data. // on uncompressed data while this size is over compressed data.
......
...@@ -35,8 +35,6 @@ type ChannelBank struct { ...@@ -35,8 +35,6 @@ type ChannelBank struct {
channels map[ChannelID]*Channel // channels by ID channels map[ChannelID]*Channel // channels by ID
channelQueue []ChannelID // channels in FIFO order channelQueue []ChannelID // channels in FIFO order
resetting bool
progress Progress progress Progress
next ChannelBankOutput next ChannelBankOutput
...@@ -94,25 +92,20 @@ func (ib *ChannelBank) IngestData(data []byte) { ...@@ -94,25 +92,20 @@ func (ib *ChannelBank) IngestData(data []byte) {
// Process each frame // Process each frame
for _, f := range frames { for _, f := range frames {
// check if the channel is not timed out
if f.ID.Time+ib.cfg.ChannelTimeout < ib.progress.Origin.Time {
ib.log.Warn("channel is timed out, ignore frame", "channel", f.ID, "id_time", f.ID.Time, "frame", f.FrameNumber)
continue
}
// check if the channel is not included too soon (otherwise timeouts wouldn't be effective)
if f.ID.Time > ib.progress.Origin.Time {
ib.log.Warn("channel claims to be from the future, ignore frame", "channel", f.ID, "id_time", f.ID.Time, "frame", f.FrameNumber)
continue
}
currentCh, ok := ib.channels[f.ID] currentCh, ok := ib.channels[f.ID]
if !ok { if !ok {
// create new channel if it doesn't exist yet // create new channel if it doesn't exist yet
currentCh = NewChannel(f.ID) currentCh = NewChannel(f.ID, ib.progress.Origin)
ib.channels[f.ID] = currentCh ib.channels[f.ID] = currentCh
ib.channelQueue = append(ib.channelQueue, f.ID) ib.channelQueue = append(ib.channelQueue, f.ID)
} }
// check if the channel is not timed out
if currentCh.OpenBlockNumber()+ib.cfg.ChannelTimeout < ib.progress.Origin.Number {
ib.log.Warn("channel is timed out, ignore frame", "channel", f.ID, "frame", f.FrameNumber)
continue
}
ib.log.Trace("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data)) ib.log.Trace("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data))
if err := currentCh.AddFrame(f, ib.progress.Origin); err != nil { if err := currentCh.AddFrame(f, ib.progress.Origin); err != nil {
ib.log.Warn("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err) ib.log.Warn("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err)
...@@ -129,16 +122,17 @@ func (ib *ChannelBank) Read() (data []byte, err error) { ...@@ -129,16 +122,17 @@ func (ib *ChannelBank) Read() (data []byte, err error) {
} }
first := ib.channelQueue[0] first := ib.channelQueue[0]
ch := ib.channels[first] ch := ib.channels[first]
timedOut := first.Time+ib.cfg.ChannelTimeout < ib.progress.Origin.Time timedOut := ch.OpenBlockNumber()+ib.cfg.ChannelTimeout < ib.progress.Origin.Number
if timedOut { if timedOut {
ib.log.Debug("channel timed out", "channel", first, "frames", len(ch.inputs)) ib.log.Debug("channel timed out", "channel", first, "frames", len(ch.inputs))
delete(ib.channels, first)
ib.channelQueue = ib.channelQueue[1:]
return nil, io.EOF
} }
if ch.IsReady() { if !ch.IsReady() {
ib.log.Debug("channel ready", "channel", first)
}
if !timedOut && !ch.IsReady() { // check if channel is readya (can then be read)
return nil, io.EOF return nil, io.EOF
} }
delete(ib.channels, first) delete(ib.channels, first)
ib.channelQueue = ib.channelQueue[1:] ib.channelQueue = ib.channelQueue[1:]
r := ch.Reader() r := ch.Reader()
...@@ -176,26 +170,19 @@ func (ib *ChannelBank) Step(ctx context.Context, outer Progress) error { ...@@ -176,26 +170,19 @@ func (ib *ChannelBank) Step(ctx context.Context, outer Progress) error {
// Any channel data before this origin will be timed out by the time the channel bank is synced up to the origin, // Any channel data before this origin will be timed out by the time the channel bank is synced up to the origin,
// so it is not relevant to replay it into the bank. // so it is not relevant to replay it into the bank.
func (ib *ChannelBank) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error { func (ib *ChannelBank) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
if !ib.resetting { ib.progress = ib.next.Progress()
ib.progress = ib.next.Progress()
ib.resetting = true
return nil
}
if ib.progress.Origin.Time+ib.cfg.ChannelTimeout < ib.next.Progress().Origin.Time || ib.progress.Origin.Number <= ib.cfg.Genesis.L1.Number {
ib.log.Debug("found reset origin for channel bank", "origin", ib.progress.Origin)
ib.resetting = false
return io.EOF
}
ib.log.Debug("walking back to find reset origin for channel bank", "origin", ib.progress.Origin) ib.log.Debug("walking back to find reset origin for channel bank", "origin", ib.progress.Origin)
// go back in history if we are not distant enough from the next stage // go back in history if we are not distant enough from the next stage
parent, err := l1Fetcher.L1BlockRefByHash(ctx, ib.progress.Origin.ParentHash) resetBlock := ib.progress.Origin.Number - ib.cfg.ChannelTimeout
if ib.progress.Origin.Number < ib.cfg.ChannelTimeout {
resetBlock = 0 // don't underflow
}
parent, err := l1Fetcher.L1BlockRefByNumber(ctx, resetBlock)
if err != nil { if err != nil {
return NewTemporaryError(fmt.Errorf("failed to find channel bank block, failed to retrieve L1 reference: %w", err)) return NewTemporaryError(fmt.Errorf("failed to find channel bank block, failed to retrieve L1 reference: %w", err))
} }
ib.progress.Origin = parent ib.progress.Origin = parent
return nil return io.EOF
} }
type L1BlockRefByHashFetcher interface { type L1BlockRefByHashFetcher interface {
......
...@@ -73,25 +73,20 @@ func (ct *channelBankTestCase) Run(t *testing.T) { ...@@ -73,25 +73,20 @@ func (ct *channelBankTestCase) Run(t *testing.T) {
ct.fn(bt) ct.fn(bt)
} }
// format: <channelID-data>:<channelID-time>:<frame-number>:<content><optional-last-frame-marker "!"> // format: <channelID-data>:<frame-number>:<content><optional-last-frame-marker "!">
// example: "abc:123:0:helloworld!" // example: "abc:0:helloworld!"
type testFrame string type testFrame string
func (tf testFrame) ChannelID() ChannelID { func (tf testFrame) ChannelID() ChannelID {
parts := strings.Split(string(tf), ":") parts := strings.Split(string(tf), ":")
var chID ChannelID var chID ChannelID
copy(chID.Data[:], parts[0]) copy(chID[:], parts[0])
x, err := strconv.ParseUint(parts[1], 0, 64)
if err != nil {
panic(err)
}
chID.Time = x
return chID return chID
} }
func (tf testFrame) FrameNumber() uint64 { func (tf testFrame) FrameNumber() uint64 {
parts := strings.Split(string(tf), ":") parts := strings.Split(string(tf), ":")
frameNum, err := strconv.ParseUint(parts[2], 0, 64) frameNum, err := strconv.ParseUint(parts[1], 0, 64)
if err != nil { if err != nil {
panic(err) panic(err)
} }
...@@ -100,12 +95,12 @@ func (tf testFrame) FrameNumber() uint64 { ...@@ -100,12 +95,12 @@ func (tf testFrame) FrameNumber() uint64 {
func (tf testFrame) IsLast() bool { func (tf testFrame) IsLast() bool {
parts := strings.Split(string(tf), ":") parts := strings.Split(string(tf), ":")
return strings.HasSuffix(parts[3], "!") return strings.HasSuffix(parts[2], "!")
} }
func (tf testFrame) Content() []byte { func (tf testFrame) Content() []byte {
parts := strings.Split(string(tf), ":") parts := strings.Split(string(tf), ":")
return []byte(strings.TrimSuffix(parts[3], "!")) return []byte(strings.TrimSuffix(parts[2], "!"))
} }
func (tf testFrame) ToFrame() Frame { func (tf testFrame) ToFrame() Frame {
...@@ -138,12 +133,7 @@ func (bt *bankTestSetup) repeatStep(max int, outer int, outerClosed bool, err er ...@@ -138,12 +133,7 @@ func (bt *bankTestSetup) repeatStep(max int, outer int, outerClosed bool, err er
func (bt *bankTestSetup) repeatResetStep(max int, err error) { func (bt *bankTestSetup) repeatResetStep(max int, err error) {
require.Equal(bt.t, err, RepeatResetStep(bt.t, bt.cb.ResetStep, bt.l1, max)) require.Equal(bt.t, err, RepeatResetStep(bt.t, bt.cb.ResetStep, bt.l1, max))
} }
func (bt *bankTestSetup) assertProgressOpen() {
require.False(bt.t, bt.cb.progress.Closed)
}
func (bt *bankTestSetup) assertProgressClosed() {
require.True(bt.t, bt.cb.progress.Closed)
}
func (bt *bankTestSetup) assertOrigin(i int) { func (bt *bankTestSetup) assertOrigin(i int) {
require.Equal(bt.t, bt.cb.progress.Origin, bt.origins[i]) require.Equal(bt.t, bt.cb.progress.Origin, bt.origins[i])
} }
...@@ -153,8 +143,8 @@ func (bt *bankTestSetup) assertOriginTime(x uint64) { ...@@ -153,8 +143,8 @@ func (bt *bankTestSetup) assertOriginTime(x uint64) {
func (bt *bankTestSetup) expectChannel(data string) { func (bt *bankTestSetup) expectChannel(data string) {
bt.out.ExpectWriteChannel([]byte(data)) bt.out.ExpectWriteChannel([]byte(data))
} }
func (bt *bankTestSetup) expectL1RefByHash(i int) { func (bt *bankTestSetup) expectL1BlockRefByNumber(i int) {
bt.l1.ExpectL1BlockRefByHash(bt.origins[i].Hash, bt.origins[i], nil) bt.l1.ExpectL1BlockRefByNumber(bt.origins[i].Number, bt.origins[i], nil)
} }
func (bt *bankTestSetup) assertExpectations() { func (bt *bankTestSetup) assertExpectations() {
bt.l1.AssertExpectations(bt.t) bt.l1.AssertExpectations(bt.t)
...@@ -162,120 +152,83 @@ func (bt *bankTestSetup) assertExpectations() { ...@@ -162,120 +152,83 @@ func (bt *bankTestSetup) assertExpectations() {
bt.out.AssertExpectations(bt.t) bt.out.AssertExpectations(bt.t)
bt.out.ExpectedCalls = nil bt.out.ExpectedCalls = nil
} }
func (bt *bankTestSetup) logf(format string, args ...any) {
bt.t.Logf(format, args...)
}
func TestL1ChannelBank(t *testing.T) { func TestL1ChannelBank(t *testing.T) {
testCases := []channelBankTestCase{ testCases := []channelBankTestCase{
{ {
name: "time outs and buffering", name: "time outs and buffering",
originTimes: []uint64{101, 102, 105, 107, 109}, originTimes: []uint64{0, 1, 2, 3, 4, 5},
nextStartsAt: 3, // start next stage at 107 nextStartsAt: 3, // Start next stage at block #3
channelTimeout: 3, // 107-3 = 104, reset to next lower origin, thus 102 channelTimeout: 2, // Start at block #1
fn: func(bt *bankTestSetup) { fn: func(bt *bankTestSetup) {
bt.logf("reset to an origin that is timed out") bt.expectL1BlockRefByNumber(1)
bt.expectL1RefByHash(2) bt.repeatResetStep(10, nil)
bt.expectL1RefByHash(1) bt.ingestFrames("a:0:first") // will time out b/c not closed
bt.repeatResetStep(10, nil) // bank rewinds to origin pre-timeout
bt.assertExpectations()
bt.assertOrigin(1)
bt.assertOriginTime(102)
bt.logf("first step after reset should be EOF to start getting data")
bt.repeatStep(1, 1, false, nil)
bt.logf("read from there onwards, but drop content since we did not reach start origin yet")
bt.ingestFrames("a:98:0:too old") // timed out, can continue
bt.repeatStep(3, 1, false, nil)
bt.ingestFrames("b:99:0:just new enough!") // closed frame, can be read, but dropped
bt.repeatStep(3, 1, false, nil)
bt.logf("close origin 1") bt.repeatStep(10, 1, true, nil)
bt.repeatStep(2, 1, true, nil) bt.repeatStep(10, 2, false, nil)
bt.assertOrigin(1)
bt.assertProgressClosed()
bt.logf("open and close 2 without data")
bt.repeatStep(2, 2, false, nil)
bt.assertOrigin(2) bt.assertOrigin(2)
bt.assertProgressOpen()
bt.repeatStep(2, 2, true, nil)
bt.assertProgressClosed()
bt.logf("open 3, where we meet the next stage. Data isn't dropped anymore") bt.repeatStep(10, 2, true, nil)
bt.repeatStep(2, 3, false, nil) bt.repeatStep(10, 3, false, nil)
bt.assertOrigin(3) bt.assertOrigin(3)
bt.assertProgressOpen()
bt.assertOriginTime(107)
bt.ingestFrames("c:104:0:foobar")
bt.repeatStep(1, 3, false, nil)
bt.ingestFrames("d:104:0:other!")
bt.repeatStep(1, 3, false, nil)
bt.ingestFrames("e:105:0:time-out-later") // timed out when we get to 109
bt.repeatStep(1, 3, false, nil)
bt.ingestFrames("c:104:1:close!")
bt.expectChannel("foobarclose")
bt.expectChannel("other")
bt.repeatStep(3, 3, false, nil)
bt.assertExpectations()
bt.logf("close 3") bt.repeatStep(10, 3, true, nil)
bt.repeatStep(2, 3, true, nil) bt.repeatStep(10, 4, false, nil)
bt.assertProgressClosed() bt.assertOrigin(4)
bt.logf("open 4") // Properly closed channel
bt.expectChannel("time-out-later") // not closed, but processed after timeout bt.expectChannel("foobarclosed")
bt.repeatStep(3, 4, false, nil) bt.ingestFrames("b:0:foobar")
bt.assertExpectations() bt.ingestFrames("b:1:closed!")
bt.assertProgressOpen() bt.repeatStep(10, 4, true, nil)
bt.assertOriginTime(109)
bt.logf("data from 4")
bt.ingestFrames("f:108:0:hello!")
bt.expectChannel("hello")
bt.repeatStep(2, 4, false, nil)
bt.assertExpectations() bt.assertExpectations()
}, },
}, },
{ {
name: "duplicate frames", name: "duplicate frames",
originTimes: []uint64{101, 102}, originTimes: []uint64{0, 1, 2, 3, 4, 5},
nextStartsAt: 0, nextStartsAt: 3, // Start next stage at block #3
channelTimeout: 3, channelTimeout: 2, // Start at block #1c
fn: func(bt *bankTestSetup) { fn: func(bt *bankTestSetup) {
// don't do the whole setup process, just override where the stages are bt.expectL1BlockRefByNumber(1)
bt.cb.progress = Progress{Origin: bt.origins[0], Closed: false} bt.repeatResetStep(10, nil)
bt.out.progress = Progress{Origin: bt.origins[0], Closed: false} bt.ingestFrames("a:0:first") // will time out b/c not closed
bt.assertOriginTime(101) bt.repeatStep(10, 1, true, nil)
bt.repeatStep(10, 2, false, nil)
bt.assertOrigin(2)
bt.ingestFrames("x:102:0:foobar") // future frame is ignored when included too early bt.repeatStep(10, 2, true, nil)
bt.repeatStep(2, 0, false, nil) bt.repeatStep(10, 3, false, nil)
bt.assertOrigin(3)
bt.repeatStep(10, 3, true, nil)
bt.repeatStep(10, 4, false, nil)
bt.assertOrigin(4)
bt.ingestFrames("a:101:0:first") bt.ingestFrames("a:0:first")
bt.repeatStep(1, 0, false, nil) bt.repeatStep(1, 4, false, nil)
bt.ingestFrames("a:101:1:second") bt.ingestFrames("a:1:second")
bt.repeatStep(1, 0, false, nil) bt.repeatStep(1, 4, false, nil)
bt.ingestFrames("a:101:0:altfirst") // ignored as duplicate bt.ingestFrames("a:0:altfirst") // ignored as duplicate
bt.repeatStep(1, 0, false, nil) bt.repeatStep(1, 4, false, nil)
bt.ingestFrames("a:101:1:altsecond") // ignored as duplicate bt.ingestFrames("a:1:altsecond") // ignored as duplicate
bt.repeatStep(1, 0, false, nil) bt.repeatStep(1, 4, false, nil)
bt.ingestFrames("a:100:0:new") // different time, considered to be different channel bt.ingestFrames("b:0:new")
bt.repeatStep(1, 0, false, nil) bt.repeatStep(1, 4, false, nil)
// close origin 0 // close origin 4
bt.repeatStep(2, 0, true, nil) bt.repeatStep(2, 4, true, nil)
// open origin 1 // open origin 1
bt.repeatStep(2, 1, false, nil) bt.repeatStep(2, 5, false, nil)
bt.ingestFrames("a:100:1:hi!") // close the other one first, but blocked bt.ingestFrames("b:1:hi!") // close the other one first, but blocked
bt.repeatStep(1, 1, false, nil) bt.repeatStep(1, 5, false, nil)
bt.ingestFrames("a:101:2:!") // empty closing frame bt.ingestFrames("a:2:!") // empty closing frame
bt.expectChannel("firstsecond") bt.expectChannel("firstsecond")
bt.expectChannel("newhi") bt.expectChannel("newhi")
bt.repeatStep(3, 1, false, nil) bt.repeatStep(5, 5, false, nil)
bt.assertExpectations() bt.assertExpectations()
}, },
}, },
...@@ -293,7 +246,7 @@ func TestL1ChannelBank(t *testing.T) { ...@@ -293,7 +246,7 @@ func TestL1ChannelBank(t *testing.T) {
badTx := new(bytes.Buffer) badTx := new(bytes.Buffer)
badTx.WriteByte(DerivationVersion0) badTx.WriteByte(DerivationVersion0)
goodFrame := testFrame("a:101:0:helloworld!").ToFrame() goodFrame := testFrame("a:0:helloworld!").ToFrame()
if err := goodFrame.MarshalBinary(badTx); err != nil { if err := goodFrame.MarshalBinary(badTx); err != nil {
panic(fmt.Errorf("error in marshalling frame: %w", err)) panic(fmt.Errorf("error in marshalling frame: %w", err))
} }
......
...@@ -34,15 +34,13 @@ func (co *ChannelOut) ID() ChannelID { ...@@ -34,15 +34,13 @@ func (co *ChannelOut) ID() ChannelID {
return co.id return co.id
} }
func NewChannelOut(channelTime uint64) (*ChannelOut, error) { func NewChannelOut() (*ChannelOut, error) {
c := &ChannelOut{ c := &ChannelOut{
id: ChannelID{ id: ChannelID{}, // TODO: use GUID here instead of fully random data
Time: channelTime,
},
frame: 0, frame: 0,
offset: 0, offset: 0,
} }
_, err := rand.Read(c.id.Data[:]) _, err := rand.Read(c.id[:])
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -57,15 +55,14 @@ func NewChannelOut(channelTime uint64) (*ChannelOut, error) { ...@@ -57,15 +55,14 @@ func NewChannelOut(channelTime uint64) (*ChannelOut, error) {
} }
// TODO: reuse ChannelOut for performance // TODO: reuse ChannelOut for performance
func (co *ChannelOut) Reset(channelTime uint64) error { func (co *ChannelOut) Reset() error {
co.frame = 0 co.frame = 0
co.offset = 0 co.offset = 0
co.buf.Reset() co.buf.Reset()
co.scratch.Reset() co.scratch.Reset()
co.compress.Reset(&co.buf) co.compress.Reset(&co.buf)
co.closed = false co.closed = false
co.id.Time = channelTime _, err := rand.Read(co.id[:])
_, err := rand.Read(co.id.Data[:])
if err != nil { if err != nil {
return err return err
} }
......
...@@ -17,9 +17,7 @@ const MaxFrameLen = 1_000_000 ...@@ -17,9 +17,7 @@ const MaxFrameLen = 1_000_000
// //
// frame = channel_id ++ frame_number ++ frame_data_length ++ frame_data ++ is_last // frame = channel_id ++ frame_number ++ frame_data_length ++ frame_data ++ is_last
// //
// channel_id = random ++ timestamp // channel_id = bytes16
// random = bytes32
// timestamp = uint64
// frame_number = uint16 // frame_number = uint16
// frame_data_length = uint32 // frame_data_length = uint32
// frame_data = bytes // frame_data = bytes
...@@ -36,13 +34,10 @@ type Frame struct { ...@@ -36,13 +34,10 @@ type Frame struct {
// It returns any errors encountered while writing, but // It returns any errors encountered while writing, but
// generally expects the writer very rarely fail. // generally expects the writer very rarely fail.
func (f *Frame) MarshalBinary(w io.Writer) error { func (f *Frame) MarshalBinary(w io.Writer) error {
_, err := w.Write(f.ID.Data[:]) _, err := w.Write(f.ID[:])
if err != nil { if err != nil {
return err return err
} }
if err := binary.Write(w, binary.BigEndian, f.ID.Time); err != nil {
return err
}
if err := binary.Write(w, binary.BigEndian, f.FrameNumber); err != nil { if err := binary.Write(w, binary.BigEndian, f.FrameNumber); err != nil {
return err return err
} }
...@@ -74,13 +69,9 @@ type ByteReader interface { ...@@ -74,13 +69,9 @@ type ByteReader interface {
// If `r` fails a read, it returns the error from the reader // If `r` fails a read, it returns the error from the reader
// The reader will be left in a partially read state. // The reader will be left in a partially read state.
func (f *Frame) UnmarshalBinary(r ByteReader) error { func (f *Frame) UnmarshalBinary(r ByteReader) error {
if _, err := io.ReadFull(r, f.ID.Data[:]); err != nil { if _, err := io.ReadFull(r, f.ID[:]); err != nil {
return fmt.Errorf("error reading ID: %w", err) return fmt.Errorf("error reading ID: %w", err)
} }
if err := binary.Read(r, binary.BigEndian, &f.ID.Time); err != nil {
return fmt.Errorf("error reading ID time: %w", err)
}
if err := binary.Read(r, binary.BigEndian, &f.FrameNumber); err != nil { if err := binary.Read(r, binary.BigEndian, &f.FrameNumber); err != nil {
return fmt.Errorf("error reading frame number: %w", err) return fmt.Errorf("error reading frame number: %w", err)
} }
......
package derive package derive
import ( import (
"encoding/hex"
"errors" "errors"
"fmt" "fmt"
"strconv"
) )
// count the tagging info as 200 in terms of buffer size. // count the tagging info as 200 in terms of buffer size.
...@@ -20,50 +18,17 @@ const MaxChannelBankSize = 100_000_000 ...@@ -20,50 +18,17 @@ const MaxChannelBankSize = 100_000_000
// DuplicateErr is returned when a newly read frame is already known // DuplicateErr is returned when a newly read frame is already known
var DuplicateErr = errors.New("duplicate frame") var DuplicateErr = errors.New("duplicate frame")
// ChannelIDDataSize defines the length of the channel ID data part // ChannelIDLength defines the length of the channel IDs
const ChannelIDDataSize = 32 const ChannelIDLength = 16
// ChannelID identifies a "channel" a stream encoding a sequence of L2 information. // ChannelID is an opaque identifier for a channel. It is 128 bits to be globally unique.
// A channelID is part random data (this may become a hash commitment to restrict who opens which channel), type ChannelID [ChannelIDLength]byte
// and part timestamp. The timestamp invalidates the ID,
// to ensure channels cannot be re-opened after timeout, or opened too soon.
//
// The ChannelID type is flat and can be used as map key.
type ChannelID struct {
Data [ChannelIDDataSize]byte
Time uint64
}
func (id ChannelID) String() string { func (id ChannelID) String() string {
return fmt.Sprintf("%x:%d", id.Data[:], id.Time) return fmt.Sprintf("%x", id[:])
}
func (id ChannelID) MarshalText() ([]byte, error) {
return []byte(id.String()), nil
}
func (id *ChannelID) UnmarshalText(text []byte) error {
if id == nil {
return errors.New("cannot unmarshal text into nil Channel ID")
}
if len(text) < ChannelIDDataSize+1 {
return fmt.Errorf("channel ID too short: %d", len(text))
}
if _, err := hex.Decode(id.Data[:], text[:ChannelIDDataSize]); err != nil {
return fmt.Errorf("failed to unmarshal hex data part of channel ID: %v", err)
}
if c := text[ChannelIDDataSize*2]; c != ':' {
return fmt.Errorf("expected : separator in channel ID, but got %d", c)
}
v, err := strconv.ParseUint(string(text[ChannelIDDataSize*2+1:]), 10, 64)
if err != nil {
return fmt.Errorf("failed to unmarshal decimal time part of channel ID: %v", err)
}
id.Time = v
return nil
} }
// TerminalString implements log.TerminalStringer, formatting a string for console output during logging. // TerminalString implements log.TerminalStringer, formatting a string for console output during logging.
func (id ChannelID) TerminalString() string { func (id ChannelID) TerminalString() string {
return fmt.Sprintf("%x..%x-%d", id.Data[:3], id.Data[29:], id.Time) return fmt.Sprintf("%x..%x", id[:3], id[13:])
} }
...@@ -32,7 +32,7 @@ type Config struct { ...@@ -32,7 +32,7 @@ type Config struct {
MaxSequencerDrift uint64 `json:"max_sequencer_drift"` MaxSequencerDrift uint64 `json:"max_sequencer_drift"`
// Number of epochs (L1 blocks) per sequencing window, including the epoch L1 origin block itself // Number of epochs (L1 blocks) per sequencing window, including the epoch L1 origin block itself
SeqWindowSize uint64 `json:"seq_window_size"` SeqWindowSize uint64 `json:"seq_window_size"`
// Number of seconds (w.r.t. L1 time) that a frame can be valid when included in L1 // Number of L1 blocks between when a channel can be opened and when it must be closed by.
ChannelTimeout uint64 `json:"channel_timeout"` ChannelTimeout uint64 `json:"channel_timeout"`
// Required to verify L1 signatures // Required to verify L1 signatures
L1ChainID *big.Int `json:"l1_chain_id"` L1ChainID *big.Int `json:"l1_chain_id"`
......
...@@ -313,38 +313,26 @@ A [channel frame][g-channel-frame] is encoded as: ...@@ -313,38 +313,26 @@ A [channel frame][g-channel-frame] is encoded as:
```text ```text
frame = channel_id ++ frame_number ++ frame_data_length ++ frame_data ++ is_last frame = channel_id ++ frame_number ++ frame_data_length ++ frame_data ++ is_last
channel_id = random ++ timestamp channel_id = bytes16
random = bytes32
timestamp = uint64
frame_number = uint16 frame_number = uint16
frame_data_length = uint32 frame_data_length = uint32
frame_data = bytes frame_data = bytes
is_last = bool is_last = bool
``` ```
Where `uint64`, `uint32` and `uint16` are all big-endian unsigned integers. Type names should be interpreted to and Where `uint32` and `uint16` are all big-endian unsigned integers. Type names should be interpreted to and
encoded according to [the Solidity ABI][solidity-abi]. encoded according to [the Solidity ABI][solidity-abi].
[solidity-abi]: https://docs.soliditylang.org/en/v0.8.16/abi-spec.html [solidity-abi]: https://docs.soliditylang.org/en/v0.8.16/abi-spec.html
All data in a frame is fixed-size, except the `frame_data`. The fixed overhead is `32 + 8 + 2 + 4 + 1 = 47 bytes`. All data in a frame is fixed-size, except the `frame_data`. The fixed overhead is `16 + 2 + 4 + 1 = 23 bytes`.
Fixed-size frame metadata avoids a circular dependency with the target total data length, Fixed-size frame metadata avoids a circular dependency with the target total data length,
to simplify packing of frames with varying content length. to simplify packing of frames with varying content length.
where: where:
- `channel_id` uniquely identifies a channel as the concatenation of a random value and a timestamp - `channel_id` is an opaque identifier for the channel. It should not be reused and is suggested to be random; however,
- `random` is a random value such that two channels with different batches should have a different random value outside of timeout rules, it is not checked for validity
- `timestamp` is the time at which the channel was created (UNIX time in seconds)
- The ID includes both the random value and the timestamp, in order to prevent a malicious sequencer from reusing the
random value after the channel has [timed out][g-channel-timeout] (refer to the [batcher
specification][batcher-spec] to learn more about channel timeouts). This will also allow us substitute `random` by a
hash commitment to the batches, should we want to do so in the future.
- Frames with a channel ID whose timestamp are higher than that of the L1 block on which the frame appears must be
ignored. Note that L1 nodes cannot easily manipulate the L1 block timestamp: L1 nodes have a soft constraint to
ignore blocks whose timestamps that are ahead of the wallclock time by a certain margin. (A soft constraint is not a
consensus rule — nodes will accept such blocks in the canonical chain but will not attempt to build directly on
them.) This issue goes away with Proof of Stake, where timestamps are determined by [L1 time slots][g-time-slot].
- `frame_number` identifies the index of the frame within the channel - `frame_number` identifies the index of the frame within the channel
- `frame_data_length` is the length of `frame_data` in bytes. It is capped to 1,000,000 bytes. - `frame_data_length` is the length of `frame_data` in bytes. It is capped to 1,000,000 bytes.
- `frame_data` is a sequence of bytes belonging to the channel, logically after the bytes from the previous frames - `frame_data` is a sequence of bytes belonging to the channel, logically after the bytes from the previous frames
...@@ -505,7 +493,8 @@ However, our current implementation doesn't support streaming decompression, so ...@@ -505,7 +493,8 @@ However, our current implementation doesn't support streaming decompression, so
- We have received all frames in the channel: i.e. we received the last frame in the channel (`is_last == 1`) and every - We have received all frames in the channel: i.e. we received the last frame in the channel (`is_last == 1`) and every
frame with a lower number. frame with a lower number.
- The channel has timed out (in which we case we read all contiguous sequential frames from the start of the channel). - The channel has timed out (in which we case we read all contiguous sequential frames from the start of the channel).
- A channel is considered to be *timed out* if `currentL1Block.timestamp > channeld_id.timestamp + CHANNEL_TIMEOUT`. - A channel is considered to be *timed out* if
`currentL1Block.number > channeld_id.starting_l1_number + CHANNEL_TIMEOUT`.
- where `currentL1Block` is the L1 block maintained by this stage, which is the most recent L1 block whose frames - where `currentL1Block` is the L1 block maintained by this stage, which is the most recent L1 block whose frames
have been added to the channel bank. have been added to the channel bank.
- The channel is pruned out of the channel bank (see below), in which case it isn't passed to the further stages. - The channel is pruned out of the channel bank (see below), in which case it isn't passed to the further stages.
...@@ -945,12 +934,12 @@ epoch sequencing window, assuming it is in the same epoch as `safeL2Head`). ...@@ -945,12 +934,12 @@ epoch sequencing window, assuming it is in the same epoch as `safeL2Head`).
> could post a batch for the L2 block `safeL2Head + 1` on L1 block `safeL2Head.1Origin`. > could post a batch for the L2 block `safeL2Head + 1` on L1 block `safeL2Head.1Origin`.
Keeping things worst case, `safeL2Head.l1Origin` would also be the last allowable block for the frame to land. The Keeping things worst case, `safeL2Head.l1Origin` would also be the last allowable block for the frame to land. The
allowed time range for frames within a channel to land on L1 is `[channel_id.timestamp, channel_id.timestamp + allowed time range for frames within a channel to land on L1 is `[channel_id.number, channel_id.number +
CHANNEL_TIMEOUT]`. The allowed L1 block range for these frames are any L1 block whose timestamp falls inside this time CHANNEL_TIMEOUT]`. The allowed L1 block range for these frames are any L1 block whose number falls inside this block
range. range.
Therefore, to be safe, we can reset the L1 head of Channel Buffering to the oldest L1 block whose timestamp is higher Therefore, to be safe, we can reset the L1 head of Channel Buffering to the L1 block whose number is
than `safeL2Head.l1Origin.timestamp - CHANNEL_TIMEOUT`. `safeL2Head.l1Origin.number - CHANNEL_TIMEOUT`.
> **Note** The above is what the implementation currently does. > **Note** The above is what the implementation currently does.
...@@ -958,7 +947,7 @@ In reality it's only strictly necessary to reset the oldest L1 block whose times ...@@ -958,7 +947,7 @@ In reality it's only strictly necessary to reset the oldest L1 block whose times
`channel_id.timestamp` found in the batcher transaction that is not older than `safeL2Head.l1Origin.timestamp - `channel_id.timestamp` found in the batcher transaction that is not older than `safeL2Head.l1Origin.timestamp -
CHANNEL_TIMEOUT`. CHANNEL_TIMEOUT`.
We define `CHANNEL_TIMEOUT = 600`, i.e. 10 hours. We define `CHANNEL_TIMEOUT = 50`, i.e. 10mins
> **TODO** does `CHANNEL_TIMEOUT` have a relationship with `SWS`? > **TODO** does `CHANNEL_TIMEOUT` have a relationship with `SWS`?
> >
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment