Commit 9e3a8847 authored by Joshua Gutow's avatar Joshua Gutow

op-node: Switch channel bank to be pull based

This again requires a fair amount of changes to channel_in_reader.go
for the channel in reader to maintain its progress state.
parent 5aa961dd
......@@ -2,6 +2,7 @@ package derive
import (
"context"
"errors"
"fmt"
"io"
......@@ -11,6 +12,11 @@ import (
"github.com/ethereum/go-ethereum/log"
)
type NextDataProvider interface {
NextData(ctx context.Context) ([]byte, error)
Origin() eth.L1BlockRef
}
// ChannelBank is a stateful stage that does the following:
// 1. Unmarshalls frames from L1 transaction data
// 2. Applies those frames to a channel
......@@ -22,11 +28,6 @@ import (
// Specifically, the channel bank is not allowed to become too large between successive calls
// to `IngestData`. This means that we can do an ingest and then do a read while becoming too large.
type ChannelBankOutput interface {
StageProgress
WriteChannel(data []byte)
}
// ChannelBank buffers channel frames, and emits full channel data
type ChannelBank struct {
log log.Logger
......@@ -35,82 +36,79 @@ type ChannelBank struct {
channels map[ChannelID]*Channel // channels by ID
channelQueue []ChannelID // channels in FIFO order
progress Progress
origin eth.L1BlockRef
next ChannelBankOutput
prev *L1Retrieval
prev NextDataProvider
fetcher L1Fetcher
}
var _ Stage = (*ChannelBank)(nil)
var _ PullStage = (*ChannelBank)(nil)
// NewChannelBank creates a ChannelBank, which should be Reset(origin) before use.
func NewChannelBank(log log.Logger, cfg *rollup.Config, next ChannelBankOutput, prev *L1Retrieval) *ChannelBank {
func NewChannelBank(log log.Logger, cfg *rollup.Config, prev NextDataProvider, fetcher L1Fetcher) *ChannelBank {
return &ChannelBank{
log: log,
cfg: cfg,
channels: make(map[ChannelID]*Channel),
channelQueue: make([]ChannelID, 0, 10),
next: next,
prev: prev,
fetcher: fetcher,
}
}
func (ib *ChannelBank) Progress() Progress {
return ib.progress
func (cb *ChannelBank) Origin() eth.L1BlockRef {
return cb.prev.Origin()
}
func (ib *ChannelBank) prune() {
func (cb *ChannelBank) prune() {
// check total size
totalSize := uint64(0)
for _, ch := range ib.channels {
for _, ch := range cb.channels {
totalSize += ch.size
}
// prune until it is reasonable again. The high-priority channel failed to be read, so we start pruning there.
for totalSize > MaxChannelBankSize {
id := ib.channelQueue[0]
ch := ib.channels[id]
ib.channelQueue = ib.channelQueue[1:]
delete(ib.channels, id)
id := cb.channelQueue[0]
ch := cb.channels[id]
cb.channelQueue = cb.channelQueue[1:]
delete(cb.channels, id)
totalSize -= ch.size
}
}
// IngestData adds new L1 data to the channel bank.
// Read() should be called repeatedly first, until everything has been read, before adding new data.\
func (ib *ChannelBank) IngestData(data []byte) {
if ib.progress.Closed {
panic("write data to bank while closed")
}
ib.log.Debug("channel bank got new data", "origin", ib.progress.Origin, "data_len", len(data))
func (cb *ChannelBank) IngestData(data []byte) {
cb.log.Debug("channel bank got new data", "origin", cb.origin, "data_len", len(data))
// TODO: Why is the prune here?
ib.prune()
cb.prune()
frames, err := ParseFrames(data)
if err != nil {
ib.log.Warn("malformed frame", "err", err)
cb.log.Warn("malformed frame", "err", err)
return
}
// Process each frame
for _, f := range frames {
currentCh, ok := ib.channels[f.ID]
currentCh, ok := cb.channels[f.ID]
if !ok {
// create new channel if it doesn't exist yet
currentCh = NewChannel(f.ID, ib.progress.Origin)
ib.channels[f.ID] = currentCh
ib.channelQueue = append(ib.channelQueue, f.ID)
currentCh = NewChannel(f.ID, cb.origin)
cb.channels[f.ID] = currentCh
cb.channelQueue = append(cb.channelQueue, f.ID)
}
// check if the channel is not timed out
if currentCh.OpenBlockNumber()+ib.cfg.ChannelTimeout < ib.progress.Origin.Number {
ib.log.Warn("channel is timed out, ignore frame", "channel", f.ID, "frame", f.FrameNumber)
if currentCh.OpenBlockNumber()+cb.cfg.ChannelTimeout < cb.origin.Number {
cb.log.Warn("channel is timed out, ignore frame", "channel", f.ID, "frame", f.FrameNumber)
continue
}
ib.log.Trace("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data))
if err := currentCh.AddFrame(f, ib.progress.Origin); err != nil {
ib.log.Warn("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err)
cb.log.Trace("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data))
if err := currentCh.AddFrame(f, cb.origin); err != nil {
cb.log.Warn("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err)
continue
}
}
......@@ -118,78 +116,60 @@ func (ib *ChannelBank) IngestData(data []byte) {
// Read the raw data of the first channel, if it's timed-out or closed.
// Read returns io.EOF if there is nothing new to read.
func (ib *ChannelBank) Read() (data []byte, err error) {
if len(ib.channelQueue) == 0 {
func (cb *ChannelBank) Read() (data []byte, err error) {
if len(cb.channelQueue) == 0 {
return nil, io.EOF
}
first := ib.channelQueue[0]
ch := ib.channels[first]
timedOut := ch.OpenBlockNumber()+ib.cfg.ChannelTimeout < ib.progress.Origin.Number
first := cb.channelQueue[0]
ch := cb.channels[first]
timedOut := ch.OpenBlockNumber()+cb.cfg.ChannelTimeout < cb.origin.Number
if timedOut {
ib.log.Debug("channel timed out", "channel", first, "frames", len(ch.inputs))
delete(ib.channels, first)
ib.channelQueue = ib.channelQueue[1:]
cb.log.Debug("channel timed out", "channel", first, "frames", len(ch.inputs))
delete(cb.channels, first)
cb.channelQueue = cb.channelQueue[1:]
return nil, io.EOF
}
if !ch.IsReady() {
return nil, io.EOF
}
delete(ib.channels, first)
ib.channelQueue = ib.channelQueue[1:]
delete(cb.channels, first)
cb.channelQueue = cb.channelQueue[1:]
r := ch.Reader()
// Suprress error here. io.ReadAll does return nil instead of io.EOF though.
data, _ = io.ReadAll(r)
return data, nil
}
// Step does the advancement for the channel bank.
// Channel bank as the first non-pull stage does it's own progress maintentance.
// When closed, it checks against the previous origin to determine if to open itself
func (ib *ChannelBank) Step(ctx context.Context, _ Progress) error {
// Open ourselves
// This is ok to do b/c we would not have yielded control to the lower stages
// of the pipeline without being completely done reading from L1.
if ib.progress.Closed {
if ib.progress.Origin != ib.prev.Origin() {
ib.progress.Closed = false
ib.progress.Origin = ib.prev.Origin()
return nil
}
// NextData pulls the next piece of data from the channel bank.
// Note that it attempts to pull data out of the channel bank prior to
// loading data in (unlike most other stages). This is to ensure maintain
// consistency around channel bank pruning which depends upon the order
// of operations.
func (cb *ChannelBank) NextData(ctx context.Context) ([]byte, error) {
if cb.origin != cb.prev.Origin() {
cb.origin = cb.prev.Origin()
}
skipIngest := ib.next.Progress().Origin.Number > ib.progress.Origin.Number
outOfData := false
if data, err := ib.prev.NextData(ctx); err == io.EOF {
outOfData = true
// Do the read from the channel bank first
data, err := cb.Read()
if err == io.EOF {
// continue - We will attempt to load data into the channel bank
} else if err != nil {
return err
return nil, err
} else {
ib.IngestData(data)
return data, nil
}
// otherwise, read the next channel data from the bank
data, err := ib.Read()
if err == io.EOF { // need new L1 data in the bank before we can read more channel data
if outOfData {
if !ib.progress.Closed {
ib.progress.Closed = true
return nil
}
return io.EOF
} else {
return nil
}
// Then load data into the channel bank
if data, err := cb.prev.NextData(ctx); err == io.EOF {
return nil, io.EOF
} else if err != nil {
return err
return nil, err
} else {
if !skipIngest {
ib.next.WriteChannel(data)
return nil
}
cb.IngestData(data)
return nil, NewTemporaryError(errors.New("not enough data"))
}
return nil
}
// ResetStep walks back the L1 chain, starting at the origin of the next stage,
......@@ -197,19 +177,18 @@ func (ib *ChannelBank) Step(ctx context.Context, _ Progress) error {
// to get consistent reads starting at origin.
// Any channel data before this origin will be timed out by the time the channel bank is synced up to the origin,
// so it is not relevant to replay it into the bank.
func (ib *ChannelBank) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
ib.progress = ib.next.Progress()
ib.log.Debug("walking back to find reset origin for channel bank", "origin", ib.progress.Origin)
func (cb *ChannelBank) Reset(ctx context.Context, base eth.L1BlockRef) error {
cb.log.Debug("walking back to find reset origin for channel bank", "origin", base)
// go back in history if we are not distant enough from the next stage
resetBlock := ib.progress.Origin.Number - ib.cfg.ChannelTimeout
if ib.progress.Origin.Number < ib.cfg.ChannelTimeout {
resetBlock := base.Number - cb.cfg.ChannelTimeout
if base.Number < cb.cfg.ChannelTimeout {
resetBlock = 0 // don't underflow
}
parent, err := l1Fetcher.L1BlockRefByNumber(ctx, resetBlock)
parent, err := cb.fetcher.L1BlockRefByNumber(ctx, resetBlock)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to find channel bank block, failed to retrieve L1 reference: %w", err))
}
ib.progress.Origin = parent
cb.origin = parent
return io.EOF
}
......
......@@ -2,80 +2,63 @@ package derive
import (
"bytes"
"context"
"fmt"
"io"
"math/rand"
"strconv"
"strings"
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/testutils"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
type MockChannelBankOutput struct {
MockOriginStage
}
func (m *MockChannelBankOutput) WriteChannel(data []byte) {
m.MethodCalled("WriteChannel", data)
type fakeChannelBankInput struct {
origin eth.L1BlockRef
data []struct {
data []byte
err error
}
}
func (m *MockChannelBankOutput) ExpectWriteChannel(data []byte) {
m.On("WriteChannel", data).Once().Return()
func (f *fakeChannelBankInput) Origin() eth.L1BlockRef {
return f.origin
}
var _ ChannelBankOutput = (*MockChannelBankOutput)(nil)
type bankTestSetup struct {
origins []eth.L1BlockRef
t *testing.T
rng *rand.Rand
cb *ChannelBank
out *MockChannelBankOutput
l1 *testutils.MockL1Source
func (f *fakeChannelBankInput) NextData(_ context.Context) ([]byte, error) {
out := f.data[0]
f.data = f.data[1:]
return out.data, out.err
}
// nolint - this is getting picked up b/c of a t.Skip that will go away
type channelBankTestCase struct {
name string
originTimes []uint64
nextStartsAt int
channelTimeout uint64
fn func(bt *bankTestSetup)
func (f *fakeChannelBankInput) AddOutput(data []byte, err error) {
f.data = append(f.data, struct {
data []byte
err error
}{data: data, err: err})
}
// nolint
func (ct *channelBankTestCase) Run(t *testing.T) {
cfg := &rollup.Config{
ChannelTimeout: ct.channelTimeout,
}
bt := &bankTestSetup{
t: t,
rng: rand.New(rand.NewSource(1234)),
l1: &testutils.MockL1Source{},
}
bt.origins = append(bt.origins, testutils.RandomBlockRef(bt.rng))
for i := range ct.originTimes[1:] {
ref := testutils.NextRandomRef(bt.rng, bt.origins[i])
bt.origins = append(bt.origins, ref)
}
for i, x := range ct.originTimes {
bt.origins[i].Time = x
// ExpectNextFrameData takes a set of test frame & turns into the raw data
// for reading into the channel bank via `NextData`
func (f *fakeChannelBankInput) AddFrames(frames ...testFrame) {
data := new(bytes.Buffer)
data.WriteByte(DerivationVersion0)
for _, frame := range frames {
ff := frame.ToFrame()
if err := ff.MarshalBinary(data); err != nil {
panic(fmt.Errorf("error in making frame during test: %w", err))
}
}
bt.out = &MockChannelBankOutput{MockOriginStage{progress: Progress{Origin: bt.origins[ct.nextStartsAt], Closed: false}}}
bt.cb = NewChannelBank(testlog.Logger(t, log.LvlError), cfg, bt.out, nil)
ct.fn(bt)
f.AddOutput(data.Bytes(), nil)
}
var _ NextDataProvider = (*fakeChannelBankInput)(nil)
// format: <channelID-data>:<frame-number>:<content><optional-last-frame-marker "!">
// example: "abc:0:helloworld!"
type testFrame string
......@@ -115,154 +98,76 @@ func (tf testFrame) ToFrame() Frame {
}
}
func (bt *bankTestSetup) ingestData(data []byte) {
bt.cb.IngestData(data)
}
func TestChannelBankSimple(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
a := testutils.RandomBlockRef(rng)
func (bt *bankTestSetup) ingestFrames(frames ...testFrame) {
data := new(bytes.Buffer)
data.WriteByte(DerivationVersion0)
for _, fr := range frames {
f := fr.ToFrame()
if err := f.MarshalBinary(data); err != nil {
panic(fmt.Errorf("error in making frame during test: %w", err))
}
}
bt.ingestData(data.Bytes())
}
func (bt *bankTestSetup) repeatStep(max int, outer int, outerClosed bool, err error) {
require.Equal(bt.t, err, RepeatStep(bt.t, bt.cb.Step, Progress{Origin: bt.origins[outer], Closed: outerClosed}, max))
}
func (bt *bankTestSetup) repeatResetStep(max int, err error) {
require.Equal(bt.t, err, RepeatResetStep(bt.t, bt.cb.ResetStep, bt.l1, max))
}
input := &fakeChannelBankInput{origin: a}
input.AddFrames("a:0:first", "a:2:third!")
input.AddFrames("a:1:second")
input.AddOutput(nil, io.EOF)
func (bt *bankTestSetup) assertOrigin(i int) {
require.Equal(bt.t, bt.cb.progress.Origin, bt.origins[i])
}
func (bt *bankTestSetup) assertOriginTime(x uint64) {
require.Equal(bt.t, x, bt.cb.progress.Origin.Time)
}
func (bt *bankTestSetup) expectChannel(data string) {
bt.out.ExpectWriteChannel([]byte(data))
}
func (bt *bankTestSetup) expectL1BlockRefByNumber(i int) {
bt.l1.ExpectL1BlockRefByNumber(bt.origins[i].Number, bt.origins[i], nil)
}
func (bt *bankTestSetup) assertExpectations() {
bt.l1.AssertExpectations(bt.t)
bt.l1.ExpectedCalls = nil
bt.out.AssertExpectations(bt.t)
bt.out.ExpectedCalls = nil
}
cfg := &rollup.Config{ChannelTimeout: 10}
func TestL1ChannelBank(t *testing.T) {
t.Skip("broken b/c the fake L1Retrieval is not yet built")
testCases := []channelBankTestCase{
{
name: "time outs and buffering",
originTimes: []uint64{0, 1, 2, 3, 4, 5},
nextStartsAt: 3, // Start next stage at block #3
channelTimeout: 2, // Start at block #1
fn: func(bt *bankTestSetup) {
bt.expectL1BlockRefByNumber(1)
bt.repeatResetStep(10, nil)
bt.ingestFrames("a:0:first") // will time out b/c not closed
cb := NewChannelBank(testlog.Logger(t, log.LvlCrit), cfg, input, nil)
bt.repeatStep(10, 1, true, nil)
bt.repeatStep(10, 2, false, nil)
bt.assertOrigin(2)
// Load the first + third frame
out, err := cb.NextData(context.Background())
require.ErrorIs(t, err, ErrTemporary)
require.Equal(t, []byte(nil), out)
bt.repeatStep(10, 2, true, nil)
bt.repeatStep(10, 3, false, nil)
bt.assertOrigin(3)
// Load the second frame
out, err = cb.NextData(context.Background())
require.ErrorIs(t, err, ErrTemporary)
require.Equal(t, []byte(nil), out)
bt.repeatStep(10, 3, true, nil)
bt.repeatStep(10, 4, false, nil)
bt.assertOrigin(4)
// Pull out the channel data
out, err = cb.NextData(context.Background())
require.Nil(t, err)
require.Equal(t, "firstsecondthird", string(out))
// Properly closed channel
bt.expectChannel("foobarclosed")
bt.ingestFrames("b:0:foobar")
bt.ingestFrames("b:1:closed!")
bt.repeatStep(10, 4, true, nil)
bt.assertExpectations()
},
},
{
name: "duplicate frames",
originTimes: []uint64{0, 1, 2, 3, 4, 5},
nextStartsAt: 3, // Start next stage at block #3
channelTimeout: 2, // Start at block #1c
fn: func(bt *bankTestSetup) {
bt.expectL1BlockRefByNumber(1)
bt.repeatResetStep(10, nil)
bt.ingestFrames("a:0:first") // will time out b/c not closed
// No more data
out, err = cb.NextData(context.Background())
require.Nil(t, out)
require.Equal(t, io.EOF, err)
}
bt.repeatStep(10, 1, true, nil)
bt.repeatStep(10, 2, false, nil)
bt.assertOrigin(2)
func TestChannelBankDuplicates(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
a := testutils.RandomBlockRef(rng)
bt.repeatStep(10, 2, true, nil)
bt.repeatStep(10, 3, false, nil)
bt.assertOrigin(3)
input := &fakeChannelBankInput{origin: a}
input.AddFrames("a:0:first", "a:2:third!")
input.AddFrames("a:0:altfirst", "a:2:altthird!")
input.AddFrames("a:1:second")
input.AddOutput(nil, io.EOF)
bt.repeatStep(10, 3, true, nil)
bt.repeatStep(10, 4, false, nil)
bt.assertOrigin(4)
cfg := &rollup.Config{ChannelTimeout: 10}
bt.ingestFrames("a:0:first")
bt.repeatStep(1, 4, false, nil)
bt.ingestFrames("a:1:second")
bt.repeatStep(1, 4, false, nil)
bt.ingestFrames("a:0:altfirst") // ignored as duplicate
bt.repeatStep(1, 4, false, nil)
bt.ingestFrames("a:1:altsecond") // ignored as duplicate
bt.repeatStep(1, 4, false, nil)
bt.ingestFrames("b:0:new")
bt.repeatStep(1, 4, false, nil)
cb := NewChannelBank(testlog.Logger(t, log.LvlCrit), cfg, input, nil)
// close origin 4
bt.repeatStep(2, 4, true, nil)
// Load the first + third frame
out, err := cb.NextData(context.Background())
require.ErrorIs(t, err, ErrTemporary)
require.Equal(t, []byte(nil), out)
// open origin 1
bt.repeatStep(2, 5, false, nil)
bt.ingestFrames("b:1:hi!") // close the other one first, but blocked
bt.repeatStep(1, 5, false, nil)
bt.ingestFrames("a:2:!") // empty closing frame
bt.expectChannel("firstsecond")
bt.expectChannel("newhi")
bt.repeatStep(5, 5, false, nil)
bt.assertExpectations()
},
},
{
name: "skip bad frames",
originTimes: []uint64{101, 102},
nextStartsAt: 0,
channelTimeout: 3,
fn: func(bt *bankTestSetup) {
// don't do the whole setup process, just override where the stages are
bt.cb.progress = Progress{Origin: bt.origins[0], Closed: false}
bt.out.progress = Progress{Origin: bt.origins[0], Closed: false}
// Load the duplicate frames
out, err = cb.NextData(context.Background())
require.ErrorIs(t, err, ErrTemporary)
require.Equal(t, []byte(nil), out)
bt.assertOriginTime(101)
// Load the second frame
out, err = cb.NextData(context.Background())
require.ErrorIs(t, err, ErrTemporary)
require.Equal(t, []byte(nil), out)
badTx := new(bytes.Buffer)
badTx.WriteByte(DerivationVersion0)
goodFrame := testFrame("a:0:helloworld!").ToFrame()
if err := goodFrame.MarshalBinary(badTx); err != nil {
panic(fmt.Errorf("error in marshalling frame: %w", err))
}
badTx.Write(testutils.RandomData(bt.rng, 30)) // incomplete frame data
bt.ingestData(badTx.Bytes())
// Expect the bad frame to render the entire chunk invalid.
bt.repeatStep(2, 0, false, nil)
bt.assertExpectations()
},
},
}
for _, testCase := range testCases {
t.Run(testCase.name, testCase.Run)
}
// Pull out the channel data. Expect to see the original set & not the duplicates
out, err = cb.NextData(context.Background())
require.Nil(t, err)
require.Equal(t, "firstsecondthird", string(out))
// No more data
out, err = cb.NextData(context.Background())
require.Nil(t, out)
require.Equal(t, io.EOF, err)
}
......@@ -26,13 +26,18 @@ type ChannelInReader struct {
progress Progress
next BatchQueueStage
prev *ChannelBank
}
var _ ChannelBankOutput = (*ChannelInReader)(nil)
var _ Stage = (*ChannelInReader)(nil)
// NewChannelInReader creates a ChannelInReader, which should be Reset(origin) before use.
func NewChannelInReader(log log.Logger, next BatchQueueStage) *ChannelInReader {
return &ChannelInReader{log: log, next: next}
func NewChannelInReader(log log.Logger, next BatchQueueStage, prev *ChannelBank) *ChannelInReader {
return &ChannelInReader{
log: log,
next: next,
prev: prev,
}
}
func (cr *ChannelInReader) Progress() Progress {
......@@ -58,27 +63,45 @@ func (cr *ChannelInReader) NextChannel() {
}
func (cr *ChannelInReader) Step(ctx context.Context, outer Progress) error {
if changed, err := cr.progress.Update(outer); err != nil || changed {
return err
// Close ourselves if required
if cr.progress.Closed {
if cr.progress.Origin != cr.prev.Origin() {
cr.progress.Closed = false
cr.progress.Origin = cr.prev.Origin()
return nil
}
}
if cr.nextBatchFn == nil {
return io.EOF
}
// TODO: can batch be non nil while err == io.EOF
// This depends on the behavior of rlp.Stream
batch, err := cr.nextBatchFn()
if err == io.EOF {
return io.EOF
} else if err != nil {
cr.log.Warn("failed to read batch from channel reader, skipping to next channel now", "err", err)
cr.NextChannel()
if data, err := cr.prev.NextData(ctx); err == io.EOF {
if !cr.progress.Closed {
cr.progress.Closed = true
return nil
} else {
return io.EOF
}
} else if err != nil {
return err
} else {
cr.WriteChannel(data)
return nil
}
} else {
// TODO: can batch be non nil while err == io.EOF
// This depends on the behavior of rlp.Stream
batch, err := cr.nextBatchFn()
if err == io.EOF {
cr.NextChannel()
return io.EOF
} else if err != nil {
cr.log.Warn("failed to read batch from channel reader, skipping to next channel now", "err", err)
cr.NextChannel()
return nil
}
cr.next.AddBatch(batch.Batch)
return nil
}
cr.next.AddBatch(batch.Batch)
return nil
}
func (cr *ChannelInReader) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
......
......@@ -98,16 +98,16 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch
l1Traversal := NewL1Traversal(log, l1Fetcher)
dataSrc := NewDataSourceFactory(log, cfg, l1Fetcher) // auxiliary stage for L1Retrieval
l1Src := NewL1Retrieval(log, dataSrc, l1Traversal)
bank := NewChannelBank(log, cfg, l1Src, l1Fetcher)
// Push stages (that act like pull stages b/c we push from the innermost stages prior to the outermost stages)
eng := NewEngineQueue(log, cfg, engine, metrics)
attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, eng)
batchQueue := NewBatchQueue(log, cfg, attributesQueue)
chInReader := NewChannelInReader(log, batchQueue)
bank := NewChannelBank(log, cfg, chInReader, l1Src)
chInReader := NewChannelInReader(log, batchQueue, bank)
stages := []Stage{eng, attributesQueue, batchQueue, chInReader, bank}
pullStages := []PullStage{l1Src, l1Traversal}
stages := []Stage{eng, attributesQueue, batchQueue, chInReader}
pullStages := []PullStage{bank, l1Src, l1Traversal}
return &DerivationPipeline{
log: log,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment