Commit d8e30b15 authored by Joshua Gutow's avatar Joshua Gutow

op-node: Add frame queue to enable frame by frame processing

This changes the channel bank to match the specs that it should
ingest a single frame at a time.
parent 39d4f1d9
......@@ -10,8 +10,8 @@ import (
"github.com/ethereum/go-ethereum/log"
)
type NextDataProvider interface {
NextData(ctx context.Context) ([]byte, error)
type NextFrameProvider interface {
NextFrame(ctx context.Context) (Frame, error)
Origin() eth.L1BlockRef
}
......@@ -34,14 +34,14 @@ type ChannelBank struct {
channels map[ChannelID]*Channel // channels by ID
channelQueue []ChannelID // channels in FIFO order
prev NextDataProvider
prev NextFrameProvider
fetcher L1Fetcher
}
var _ ResetableStage = (*ChannelBank)(nil)
// NewChannelBank creates a ChannelBank, which should be Reset(origin) before use.
func NewChannelBank(log log.Logger, cfg *rollup.Config, prev NextDataProvider, fetcher L1Fetcher) *ChannelBank {
func NewChannelBank(log log.Logger, cfg *rollup.Config, prev NextFrameProvider, fetcher L1Fetcher) *ChannelBank {
return &ChannelBank{
log: log,
cfg: cfg,
......@@ -73,43 +73,34 @@ func (cb *ChannelBank) prune() {
}
// IngestData adds new L1 data to the channel bank.
// Read() should be called repeatedly first, until everything has been read, before adding new data.\
func (cb *ChannelBank) IngestData(data []byte) {
// Read() should be called repeatedly first, until everything has been read, before adding new data.
func (cb *ChannelBank) IngestFrame(f Frame) {
origin := cb.Origin()
cb.log.Debug("channel bank got new data", "origin", origin, "data_len", len(data))
log := log.New("origin", origin, "channel", f.ID, "length", len(f.Data), "frame_number", f.FrameNumber)
log.Debug("channel bank got new data")
currentCh, ok := cb.channels[f.ID]
if !ok {
// create new channel if it doesn't exist yet
currentCh = NewChannel(f.ID, origin)
cb.channels[f.ID] = currentCh
cb.channelQueue = append(cb.channelQueue, f.ID)
}
frames, err := ParseFrames(data)
if err != nil {
cb.log.Warn("malformed frame", "err", err)
// check if the channel is not timed out
if currentCh.OpenBlockNumber()+cb.cfg.ChannelTimeout < origin.Number {
log.Warn("channel is timed out, ignore frame")
return
}
// Process each frame
for _, f := range frames {
currentCh, ok := cb.channels[f.ID]
if !ok {
// create new channel if it doesn't exist yet
currentCh = NewChannel(f.ID, origin)
cb.channels[f.ID] = currentCh
cb.channelQueue = append(cb.channelQueue, f.ID)
}
// check if the channel is not timed out
if currentCh.OpenBlockNumber()+cb.cfg.ChannelTimeout < origin.Number {
cb.log.Warn("channel is timed out, ignore frame", "channel", f.ID, "frame", f.FrameNumber)
continue
}
cb.log.Trace("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data))
if err := currentCh.AddFrame(f, origin); err != nil {
cb.log.Warn("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err)
continue
}
// Prune after the frame is loaded.
// TODO: Ingest a single frame at a time (must enable reads before ingesting the data / after this prune.)
cb.prune()
log.Trace("ingesting frame")
if err := currentCh.AddFrame(f, origin); err != nil {
log.Warn("failed to ingest frame into channel", "err", err)
return
}
// Prune after the frame is loaded.
cb.prune()
}
// Read the raw data of the first channel, if it's timed-out or closed.
......@@ -157,12 +148,12 @@ func (cb *ChannelBank) NextData(ctx context.Context) ([]byte, error) {
}
// Then load data into the channel bank
if data, err := cb.prev.NextData(ctx); err == io.EOF {
if frame, err := cb.prev.NextFrame(ctx); err == io.EOF {
return nil, io.EOF
} else if err != nil {
return nil, err
} else {
cb.IngestData(data)
cb.IngestFrame(frame)
return nil, NotEnoughData
}
}
......
package derive
import (
"bytes"
"context"
"fmt"
"io"
"math/rand"
"strconv"
......@@ -21,8 +19,8 @@ import (
type fakeChannelBankInput struct {
origin eth.L1BlockRef
data []struct {
data []byte
err error
frame Frame
err error
}
}
......@@ -30,34 +28,28 @@ func (f *fakeChannelBankInput) Origin() eth.L1BlockRef {
return f.origin
}
func (f *fakeChannelBankInput) NextData(_ context.Context) ([]byte, error) {
func (f *fakeChannelBankInput) NextFrame(_ context.Context) (Frame, error) {
out := f.data[0]
f.data = f.data[1:]
return out.data, out.err
return out.frame, out.err
}
func (f *fakeChannelBankInput) AddOutput(data []byte, err error) {
func (f *fakeChannelBankInput) AddFrame(frame Frame, err error) {
f.data = append(f.data, struct {
data []byte
err error
}{data: data, err: err})
frame Frame
err error
}{frame: frame, err: err})
}
// ExpectNextFrameData takes a set of test frame & turns into the raw data
// for reading into the channel bank via `NextData`
func (f *fakeChannelBankInput) AddFrames(frames ...testFrame) {
data := new(bytes.Buffer)
data.WriteByte(DerivationVersion0)
for _, frame := range frames {
ff := frame.ToFrame()
if err := ff.MarshalBinary(data); err != nil {
panic(fmt.Errorf("error in making frame during test: %w", err))
}
f.AddFrame(frame.ToFrame(), nil)
}
f.AddOutput(data.Bytes(), nil)
}
var _ NextDataProvider = (*fakeChannelBankInput)(nil)
var _ NextFrameProvider = (*fakeChannelBankInput)(nil)
// format: <channelID-data>:<frame-number>:<content><optional-last-frame-marker "!">
// example: "abc:0:helloworld!"
......@@ -105,17 +97,22 @@ func TestChannelBankSimple(t *testing.T) {
input := &fakeChannelBankInput{origin: a}
input.AddFrames("a:0:first", "a:2:third!")
input.AddFrames("a:1:second")
input.AddOutput(nil, io.EOF)
input.AddFrame(Frame{}, io.EOF)
cfg := &rollup.Config{ChannelTimeout: 10}
cb := NewChannelBank(testlog.Logger(t, log.LvlCrit), cfg, input, nil)
// Load the first + third frame
// Load the first frame
out, err := cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load the third frame
out, err = cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load the second frame
out, err = cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
......@@ -140,21 +137,29 @@ func TestChannelBankDuplicates(t *testing.T) {
input.AddFrames("a:0:first", "a:2:third!")
input.AddFrames("a:0:altfirst", "a:2:altthird!")
input.AddFrames("a:1:second")
input.AddOutput(nil, io.EOF)
input.AddFrame(Frame{}, io.EOF)
cfg := &rollup.Config{ChannelTimeout: 10}
cb := NewChannelBank(testlog.Logger(t, log.LvlCrit), cfg, input, nil)
// Load the first + third frame
// Load the first frame
out, err := cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load the third frame
out, err = cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load the duplicate frames
out, err = cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
out, err = cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load the second frame
out, err = cb.NextData(context.Background())
......
package derive
import (
"context"
"io"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum/go-ethereum/log"
)
var _ NextFrameProvider = &FrameQueue{}
type NextDataProvider interface {
NextData(context.Context) ([]byte, error)
Origin() eth.L1BlockRef
}
type FrameQueue struct {
log log.Logger
frames []Frame
prev NextDataProvider
}
func NewFrameQueue(log log.Logger, prev NextDataProvider) *FrameQueue {
return &FrameQueue{
log: log,
prev: prev,
}
}
func (fq *FrameQueue) Origin() eth.L1BlockRef {
return fq.prev.Origin()
}
func (fq *FrameQueue) NextFrame(ctx context.Context) (Frame, error) {
// Find more frames if we need to
if len(fq.frames) == 0 {
if data, err := fq.prev.NextData(ctx); err != nil {
return Frame{}, err
} else {
if new, err := ParseFrames(data); err == nil {
fq.frames = append(fq.frames, new...)
} else {
fq.log.Warn("Failed to parse frames", "origin", fq.prev.Origin(), "err", err)
}
}
}
// If we did not add more frames but still have more data, retry this function.
if len(fq.frames) == 0 {
return Frame{}, NotEnoughData
}
ret := fq.frames[0]
fq.frames = fq.frames[1:]
return ret, nil
}
func (fq *FrameQueue) Reset(ctx context.Context, base eth.L1BlockRef) error {
fq.frames = fq.frames[:0]
return io.EOF
}
......@@ -67,7 +67,8 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch
l1Traversal := NewL1Traversal(log, l1Fetcher)
dataSrc := NewDataSourceFactory(log, cfg, l1Fetcher) // auxiliary stage for L1Retrieval
l1Src := NewL1Retrieval(log, dataSrc, l1Traversal)
bank := NewChannelBank(log, cfg, l1Src, l1Fetcher)
frameQueue := NewFrameQueue(log, l1Src)
bank := NewChannelBank(log, cfg, frameQueue, l1Fetcher)
chInReader := NewChannelInReader(log, bank)
batchQueue := NewBatchQueue(log, cfg, chInReader)
attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, batchQueue)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment