Commit 6ae28f56 authored by Sebastian Stammler's avatar Sebastian Stammler Committed by GitHub

op-node/rollup/derive: Add Holocene Channel Stage (#12334)

This only adds the new stage, but doesn't wire it into the derivation
pipeline yet.
parent f0d77382
......@@ -94,7 +94,7 @@ func writeChannel(ch ChannelWithMetadata, filename string) error {
// from the channel. Returns a ChannelWithMetadata struct containing all the relevant data.
func ProcessFrames(cfg Config, rollupCfg *rollup.Config, id derive.ChannelID, frames []FrameWithMetadata) ChannelWithMetadata {
spec := rollup.NewChainSpec(rollupCfg)
ch := derive.NewChannel(id, eth.L1BlockRef{Number: frames[0].InclusionBlock})
ch := derive.NewChannel(id, eth.L1BlockRef{Number: frames[0].InclusionBlock}, rollupCfg.IsHolocene(frames[0].Timestamp))
invalidFrame := false
for _, frame := range frames {
......
......@@ -18,13 +18,15 @@ const (
)
// A Channel is a set of batches that are split into at least one, but possibly multiple frames.
// Frames are allowed to be ingested out of order.
// Frames are allowed to be ingested out of order, unless the channel is set to follow Holocene
// rules.
// Each frame is ingested one by one. Once a frame with `closed` is added to the channel, the
// channel may mark itself as ready for reading once all intervening frames have been added
type Channel struct {
// id of the channel
id ChannelID
openBlock eth.L1BlockRef
id ChannelID
openBlock eth.L1BlockRef
requireInOrder bool
// estimated memory size, used to drop the channel if we have too much data
size uint64
......@@ -45,11 +47,14 @@ type Channel struct {
highestL1InclusionBlock eth.L1BlockRef
}
func NewChannel(id ChannelID, openBlock eth.L1BlockRef) *Channel {
// NewChannel creates a new channel with the given id and openening block. If requireInOrder is
// true, frames must be added in order.
func NewChannel(id ChannelID, openBlock eth.L1BlockRef, requireInOrder bool) *Channel {
return &Channel{
id: id,
inputs: make(map[uint64]Frame),
openBlock: openBlock,
id: id,
inputs: make(map[uint64]Frame),
openBlock: openBlock,
requireInOrder: requireInOrder,
}
}
......@@ -70,18 +75,22 @@ func (ch *Channel) AddFrame(frame Frame, l1InclusionBlock eth.L1BlockRef) error
if ch.closed && frame.FrameNumber >= ch.endFrameNumber {
return fmt.Errorf("frame number (%d) is greater than or equal to end frame number (%d) of a closed channel", frame.FrameNumber, ch.endFrameNumber)
}
if ch.requireInOrder && int(frame.FrameNumber) != len(ch.inputs) {
return fmt.Errorf("frame out of order, expected %d, got %d", len(ch.inputs), frame.FrameNumber)
}
// Guaranteed to succeed. Now update internal state
if frame.IsLast {
ch.endFrameNumber = frame.FrameNumber
ch.closed = true
}
// Prune frames with a number higher than the closing frame number when we receive a closing frame
// Prune frames with a number higher than the closing frame number when we receive a closing frame.
// Note that the following condition is guaranteed to never be true with strict Holocene ordering.
if frame.IsLast && ch.endFrameNumber < ch.highestFrameNumber {
// Do a linear scan over saved inputs instead of ranging over ID numbers
for id, prunedFrame := range ch.inputs {
if id >= uint64(ch.endFrameNumber) {
delete(ch.inputs, id)
for idx, prunedFrame := range ch.inputs {
if idx >= uint64(ch.endFrameNumber) {
delete(ch.inputs, idx)
}
ch.size -= frameSize(prunedFrame)
}
......@@ -119,6 +128,10 @@ func (ch *Channel) Size() uint64 {
return ch.size
}
func (ch *Channel) ID() ChannelID {
return ch.id
}
// IsReady returns true iff the channel is ready to be read.
func (ch *Channel) IsReady() bool {
// Must see the last frame before the channel is ready to be read
......
package derive
import (
"context"
"errors"
"io"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/log"
)
// ChannelAssembler assembles frames into a raw channel. It replaces the ChannelBank since Holocene.
type ChannelAssembler struct {
log log.Logger
spec ChannelStageSpec
metrics Metrics
channel *Channel
prev NextFrameProvider
}
var _ ResettableStage = (*ChannelAssembler)(nil)
type ChannelStageSpec interface {
ChannelTimeout(t uint64) uint64
MaxRLPBytesPerChannel(t uint64) uint64
}
// NewChannelStage creates a Holocene ChannelStage.
// It must only be used for derivation from Holocene activation.
func NewChannelStage(log log.Logger, spec ChannelStageSpec, prev NextFrameProvider, m Metrics) *ChannelAssembler {
return &ChannelAssembler{
log: log,
spec: spec,
metrics: m,
prev: prev,
}
}
func (ca *ChannelAssembler) Log() log.Logger {
return ca.log.New("stage", "channel", "origin", ca.Origin())
}
func (ca *ChannelAssembler) Origin() eth.L1BlockRef {
return ca.prev.Origin()
}
func (ca *ChannelAssembler) Reset(context.Context, eth.L1BlockRef, eth.SystemConfig) error {
ca.resetChannel()
return io.EOF
}
func (ca *ChannelAssembler) resetChannel() {
ca.channel = nil
}
// Returns whether the current staging channel is timed out. Panics if there's no current channel.
func (ca *ChannelAssembler) channelTimedOut() bool {
return ca.channel.OpenBlockNumber()+ca.spec.ChannelTimeout(ca.Origin().Time) < ca.Origin().Number
}
func (ca *ChannelAssembler) NextData(ctx context.Context) ([]byte, error) {
if ca.channel != nil && ca.channelTimedOut() {
ca.metrics.RecordChannelTimedOut()
ca.resetChannel()
}
lgr := ca.Log()
origin := ca.Origin()
// Note that if the current channel was already completed, we would have forwarded its data
// already. So we start by reading in frames.
if ca.channel != nil && ca.channel.IsReady() {
return nil, NewCriticalError(errors.New("unexpected ready channel"))
}
// Ingest frames until we either hit an error (including io.EOF and NotEnoughData) or complete a
// channel.
// Note that we ingest the frame queue in a loop instead of returning NotEnoughData after a
// single frame ingestion, because it is guaranteed that the total size of new frames ingested
// per L1 origin block is limited by the size of batcher transactions in that block and it
// doesn't make a difference in computational effort if these are many small frames or one large
// frame of that size. Plus, this is really just moving data around, no decompression etc. yet.
for {
frame, err := ca.prev.NextFrame(ctx)
if err != nil { // includes io.EOF; a last frame broke the loop already
return nil, err
}
// first frames always start a new channel, discarding an existing one
if frame.FrameNumber == 0 {
ca.metrics.RecordHeadChannelOpened()
ca.channel = NewChannel(frame.ID, origin, true)
}
if frame.FrameNumber > 0 && ca.channel == nil {
lgr.Warn("dropping non-first frame without channel",
"frame_channel", frame.ID, "frame_number", frame.FrameNumber)
continue // read more frames
}
// Catches Holocene ordering rules. Note that even though the frame queue is guaranteed to
// only hold ordered frames in the current queue, it cannot guarantee this w.r.t. frames
// that already got dequeued. So ordering has to be checked here again.
if err := ca.channel.AddFrame(frame, origin); err != nil {
lgr.Warn("failed to add frame to channel",
"channel", ca.channel.ID(), "frame_channel", frame.ID,
"frame_number", frame.FrameNumber, "err", err)
continue // read more frames
}
if ca.channel.Size() > ca.spec.MaxRLPBytesPerChannel(ca.Origin().Time) {
lgr.Warn("dropping oversized channel",
"channel", ca.channel.ID(), "frame_number", frame.FrameNumber)
ca.resetChannel()
continue // read more frames
}
ca.metrics.RecordFrame()
if frame.IsLast {
break // forward current complete channel
}
}
ch := ca.channel
// Note that if we exit the frame ingestion loop, we're guaranteed to have a ready channel.
if ch == nil || !ch.IsReady() {
return nil, NewCriticalError(errors.New("unexpected non-ready channel"))
}
ca.resetChannel()
r := ch.Reader()
return io.ReadAll(r)
}
package derive
import (
"context"
"io"
"log/slog"
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
rolluptest "github.com/ethereum-optimism/optimism/op-node/rollup/test"
"github.com/ethereum-optimism/optimism/op-service/testlog"
)
func TestChannelStage_NextData(t *testing.T) {
for _, tc := range []struct {
desc string
frames [][]testFrame
expErr []error
expData []string
expChID []string
rlpOverride *uint64
}{
{
desc: "simple",
frames: [][]testFrame{
{"a:0:first!"},
},
expErr: []error{nil},
expData: []string{"first"},
expChID: []string{""},
},
{
desc: "simple-two",
frames: [][]testFrame{
{"a:0:first", "a:1:second!"},
},
expErr: []error{nil},
expData: []string{"firstsecond"},
expChID: []string{""},
},
{
desc: "drop-other",
frames: [][]testFrame{
{"a:0:first", "b:1:foo"},
{"a:1:second", "c:1:bar!"},
{"a:2:third!"},
},
expErr: []error{io.EOF, io.EOF, nil},
expData: []string{"", "", "firstsecondthird"},
expChID: []string{"a", "a", ""},
},
{
desc: "drop-non-first",
frames: [][]testFrame{
{"a:1:foo"},
},
expErr: []error{io.EOF},
expData: []string{""},
expChID: []string{""},
},
{
desc: "first-discards",
frames: [][]testFrame{
{"b:0:foo"},
{"a:0:first!"},
},
expErr: []error{io.EOF, nil},
expData: []string{"", "first"},
expChID: []string{"b", ""},
},
{
desc: "already-closed",
frames: [][]testFrame{
{"a:0:foo"},
{"a:1:bar!", "a:2:baz!"},
},
expErr: []error{io.EOF, nil},
expData: []string{"", "foobar"},
expChID: []string{"a", ""},
},
{
desc: "max-size",
frames: [][]testFrame{
{"a:0:0123456789!"},
},
expErr: []error{nil},
expData: []string{"0123456789"},
expChID: []string{""},
rlpOverride: ptr[uint64](frameOverhead + 10),
},
{
desc: "oversized",
frames: [][]testFrame{
{"a:0:0123456789x!"},
},
expErr: []error{io.EOF},
expData: []string{""},
expChID: []string{""},
rlpOverride: ptr[uint64](frameOverhead + 10),
},
} {
t.Run(tc.desc, func(t *testing.T) {
fq := &fakeChannelBankInput{}
lgr := testlog.Logger(t, slog.LevelWarn)
spec := &rolluptest.ChainSpec{
ChainSpec: rollup.NewChainSpec(&rollup.Config{}),
MaxRLPBytesPerChannelOverride: tc.rlpOverride,
}
cs := NewChannelStage(lgr, spec, fq, metrics.NoopMetrics)
for i, fs := range tc.frames {
fq.AddFrames(fs...)
data, err := cs.NextData(context.Background())
require.Equal(t, tc.expData[i], string(data))
require.ErrorIs(t, tc.expErr[i], err)
// invariant: never holds a ready channel
require.True(t, cs.channel == nil || !cs.channel.IsReady())
cid := tc.expChID[i]
if cid == "" {
require.Nil(t, cs.channel)
} else {
require.Equal(t, strChannelID(cid), cs.channel.ID())
}
}
// final call should always be io.EOF after exhausting frame queue
data, err := cs.NextData(context.Background())
require.Nil(t, data)
require.Equal(t, io.EOF, err)
})
}
}
func TestChannelStage_NextData_Timeout(t *testing.T) {
require := require.New(t)
fq := &fakeChannelBankInput{}
lgr := testlog.Logger(t, slog.LevelWarn)
spec := rollup.NewChainSpec(&rollup.Config{GraniteTime: ptr(uint64(0))}) // const channel timeout
cs := NewChannelStage(lgr, spec, fq, metrics.NoopMetrics)
fq.AddFrames("a:0:foo")
data, err := cs.NextData(context.Background())
require.Nil(data)
require.Equal(io.EOF, err)
require.NotNil(cs.channel)
require.Equal(strChannelID("a"), cs.channel.ID())
// move close to timeout
fq.origin.Number = spec.ChannelTimeout(0)
fq.AddFrames("a:1:bar")
data, err = cs.NextData(context.Background())
require.Nil(data)
require.Equal(io.EOF, err)
require.NotNil(cs.channel)
require.Equal(strChannelID("a"), cs.channel.ID())
// timeout channel by moving origin past timeout
fq.origin.Number = spec.ChannelTimeout(0) + 1
fq.AddFrames("a:2:baz!")
data, err = cs.NextData(context.Background())
require.Nil(data)
require.Equal(io.EOF, err)
require.Nil(cs.channel)
}
......@@ -89,7 +89,7 @@ func (cb *ChannelBank) IngestFrame(f Frame) {
cb.metrics.RecordHeadChannelOpened()
}
// create new channel if it doesn't exist yet
currentCh = NewChannel(f.ID, origin)
currentCh = NewChannel(f.ID, origin, false)
cb.channels[f.ID] = currentCh
cb.channelQueue = append(cb.channelQueue, f.ID)
log.Info("created new channel")
......
......@@ -30,6 +30,9 @@ func (f *fakeChannelBankInput) Origin() eth.L1BlockRef {
}
func (f *fakeChannelBankInput) NextFrame(_ context.Context) (Frame, error) {
if len(f.data) == 0 {
return Frame{}, io.EOF
}
out := f.data[0]
f.data = f.data[1:]
return out.frame, out.err
......@@ -58,8 +61,12 @@ type testFrame string
func (tf testFrame) ChannelID() ChannelID {
parts := strings.Split(string(tf), ":")
return strChannelID(parts[0])
}
func strChannelID(s string) ChannelID {
var chID ChannelID
copy(chID[:], parts[0])
copy(chID[:], s)
return chID
}
......@@ -98,7 +105,6 @@ func TestChannelBankSimple(t *testing.T) {
input := &fakeChannelBankInput{origin: a}
input.AddFrames("a:0:first", "a:2:third!")
input.AddFrames("a:1:second")
input.AddFrame(Frame{}, io.EOF)
cfg := &rollup.Config{ChannelTimeoutBedrock: 10}
......@@ -142,7 +148,6 @@ func TestChannelBankInterleavedPreCanyon(t *testing.T) {
input.AddFrames("b:1:deux", "a:2:third!")
input.AddFrames("b:0:premiere")
input.AddFrames("a:1:second")
input.AddFrame(Frame{}, io.EOF)
cfg := &rollup.Config{ChannelTimeoutBedrock: 10, CanyonTime: nil}
......@@ -206,7 +211,6 @@ func TestChannelBankInterleaved(t *testing.T) {
input.AddFrames("b:1:deux", "a:2:third!")
input.AddFrames("b:0:premiere")
input.AddFrames("a:1:second")
input.AddFrame(Frame{}, io.EOF)
ct := uint64(0)
cfg := &rollup.Config{ChannelTimeoutBedrock: 10, CanyonTime: &ct}
......@@ -267,7 +271,6 @@ func TestChannelBankDuplicates(t *testing.T) {
input.AddFrames("a:0:first", "a:2:third!")
input.AddFrames("a:0:altfirst", "a:2:altthird!")
input.AddFrames("a:1:second")
input.AddFrame(Frame{}, io.EOF)
cfg := &rollup.Config{ChannelTimeoutBedrock: 10}
......
......@@ -450,7 +450,7 @@ func testSpanChannelOut_MaxBlocksPerSpanBatch(t *testing.T, tt maxBlocksTest) {
require.NoError(t, frame.UnmarshalBinary(&frameBuf))
require.True(t, frame.IsLast)
spec := rollup.NewChainSpec(&rollupCfg)
ch := NewChannel(frame.ID, l1Origin)
ch := NewChannel(frame.ID, l1Origin, false)
require.False(t, ch.IsReady())
require.NoError(t, ch.AddFrame(frame, l1Origin))
require.True(t, ch.IsReady())
......
......@@ -18,12 +18,13 @@ type frameValidityTC struct {
frames []Frame
shouldErr []bool
sizes []uint64
holocene bool
}
func (tc *frameValidityTC) Run(t *testing.T) {
id := [16]byte{0xff}
block := eth.L1BlockRef{}
ch := NewChannel(id, block)
ch := NewChannel(id, block, tc.holocene)
if len(tc.frames) != len(tc.shouldErr) || len(tc.frames) != len(tc.sizes) {
t.Errorf("lengths should be the same. frames: %d, shouldErr: %d, sizes: %d", len(tc.frames), len(tc.shouldErr), len(tc.sizes))
......@@ -32,9 +33,9 @@ func (tc *frameValidityTC) Run(t *testing.T) {
for i, frame := range tc.frames {
err := ch.AddFrame(frame, block)
if tc.shouldErr[i] {
require.NotNil(t, err)
require.Error(t, err)
} else {
require.Nil(t, err)
require.NoError(t, err)
}
require.Equal(t, tc.sizes[i], ch.Size())
}
......@@ -105,6 +106,36 @@ func TestFrameValidity(t *testing.T) {
shouldErr: []bool{false, false},
sizes: []uint64{207, 411},
},
{
name: "holocene non first",
holocene: true,
frames: []Frame{
{ID: id, FrameNumber: 2, Data: []byte("four")},
},
shouldErr: []bool{true},
sizes: []uint64{0},
},
{
name: "holocene out of order",
holocene: true,
frames: []Frame{
{ID: id, FrameNumber: 0, Data: []byte("four")},
{ID: id, FrameNumber: 2, Data: []byte("seven__")},
},
shouldErr: []bool{false, true},
sizes: []uint64{204, 204},
},
{
name: "holocene in order",
holocene: true,
frames: []Frame{
{ID: id, FrameNumber: 0, Data: []byte("four")},
{ID: id, FrameNumber: 1, Data: []byte("seven__")},
{ID: id, FrameNumber: 2, IsLast: true, Data: []byte("2_")},
},
shouldErr: []bool{false, false, false},
sizes: []uint64{204, 411, 613},
},
}
for _, tc := range testCases {
......
package test
import "github.com/ethereum-optimism/optimism/op-node/rollup"
// ChainSpec wraps a *rollup.ChainSpec, allowing to optionally override individual values,
// otherwise just returning the underlying ChainSpec's values.
type ChainSpec struct {
*rollup.ChainSpec
MaxRLPBytesPerChannelOverride *uint64 // MaxRLPBytesPerChannel override
}
func (cs *ChainSpec) MaxRLPBytesPerChannel(t uint64) uint64 {
if o := cs.MaxRLPBytesPerChannelOverride; o != nil {
return *o
}
return cs.ChainSpec.MaxRLPBytesPerChannel(t)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment