Commit fe78c148 authored by Joshua Gutow's avatar Joshua Gutow

op-node: Switch L1 Traversal to a pull based model

The L1 Retrieval stage is now responsible for pulling data from
the L1 Traversal stage. In addition, the pipeline is responsible
for advancing the state of the L1 Traversal stage.

The L1 Traversal stage only provides access to the current L1 block
once - it pretends to be a queue that is consumed from.
parent 3b98cc61
......@@ -17,24 +17,29 @@ type DataAvailabilitySource interface {
OpenData(ctx context.Context, id eth.BlockID) DataIter
}
type NextBlockProvider interface {
NextL1Block(context.Context) (eth.L1BlockRef, error)
}
type L1Retrieval struct {
log log.Logger
dataSrc DataAvailabilitySource
next L1SourceOutput
prev NextBlockProvider
progress Progress
data eth.Data
datas DataIter
}
var _ Stage = (*L1Retrieval)(nil)
func NewL1Retrieval(log log.Logger, dataSrc DataAvailabilitySource, next L1SourceOutput) *L1Retrieval {
func NewL1Retrieval(log log.Logger, dataSrc DataAvailabilitySource, next L1SourceOutput, prev NextBlockProvider) *L1Retrieval {
return &L1Retrieval{
log: log,
dataSrc: dataSrc,
next: next,
prev: prev,
}
}
......@@ -42,48 +47,46 @@ func (l1r *L1Retrieval) Progress() Progress {
return l1r.progress
}
func (l1r *L1Retrieval) Step(ctx context.Context, outer Progress) error {
if changed, err := l1r.progress.Update(outer); err != nil || changed {
return err
}
// specific to L1 source: if the L1 origin is closed, there is no more data to retrieve.
if l1r.progress.Closed {
return io.EOF
}
// create a source if we have none
if l1r.datas == nil {
l1r.datas = l1r.dataSrc.OpenData(ctx, l1r.progress.Origin.ID())
return nil
}
// buffer data if we have none
if l1r.data == nil {
// Step does an action in the L1 Retrieval stage
// If there is data, it pushes it to the next stage.
// If there is no more data open ourselves if we are closed or close ourselves if we are open
func (l1r *L1Retrieval) Step(ctx context.Context, _ Progress) error {
if l1r.datas != nil {
l1r.log.Debug("fetching next piece of data")
data, err := l1r.datas.Next(ctx)
if err == io.EOF {
l1r.progress.Closed = true
l1r.datas = nil
return io.EOF
} else if err != nil {
return err
} else {
l1r.data = data
l1r.next.IngestData(data)
return nil
}
} else {
if l1r.progress.Closed {
next, err := l1r.prev.NextL1Block(ctx)
if err == io.EOF {
return io.EOF
} else if err != nil {
return err
}
l1r.datas = l1r.dataSrc.OpenData(ctx, next.ID())
l1r.progress.Origin = next
l1r.progress.Closed = false
} else {
l1r.progress.Closed = true
}
return nil
}
// flush the data to next stage
l1r.next.IngestData(l1r.data)
// and nil the data, the next step will retrieve the next data
l1r.data = nil
return nil
}
// ResetStep re-initializes the L1 Retrieval stage to block of it's `next` progress.
// Note that we open up the `l1r.datas` here because it is requires to maintain the
// internal invariants that later propagate up the derivation pipeline.
func (l1r *L1Retrieval) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
l1r.progress = l1r.next.Progress()
l1r.datas = nil
l1r.data = nil
l1r.datas = l1r.dataSrc.OpenData(ctx, l1r.progress.Origin.ID())
l1r.log.Info("Reset of L1Retrieval done", "origin", l1r.progress.Origin)
return io.EOF
}
......@@ -44,6 +44,19 @@ func (m *MockDataSource) ExpectOpenData(id eth.BlockID, iter DataIter, err error
var _ DataAvailabilitySource = (*MockDataSource)(nil)
type MockL1Traversal struct {
mock.Mock
}
func (m *MockL1Traversal) NextL1Block(_ context.Context) (eth.L1BlockRef, error) {
out := m.Mock.MethodCalled("NextL1Block")
return out[0].(eth.L1BlockRef), out[1].(error)
}
func (m *MockL1Traversal) ExpectNextL1Block(block eth.L1BlockRef, err error) {
m.Mock.On("NextL1Block").Return(block, err)
}
type MockIngestData struct {
MockOriginStage
}
......@@ -63,12 +76,13 @@ func TestL1Retrieval_Step(t *testing.T) {
next := &MockIngestData{MockOriginStage{progress: Progress{Origin: testutils.RandomBlockRef(rng), Closed: true}}}
dataSrc := &MockDataSource{}
prev := &MockL1Traversal{}
a := testutils.RandomData(rng, 10)
b := testutils.RandomData(rng, 15)
iter := &fakeDataIter{data: []eth.Data{a, b}}
outer := Progress{Origin: testutils.NextRandomRef(rng, next.progress.Origin), Closed: false}
outer := next.progress
// mock some L1 data to open for the origin that is opened by the outer stage
dataSrc.ExpectOpenData(outer.Origin.ID(), iter, nil)
......@@ -79,7 +93,7 @@ func TestL1Retrieval_Step(t *testing.T) {
defer dataSrc.AssertExpectations(t)
defer next.AssertExpectations(t)
l1r := NewL1Retrieval(testlog.Logger(t, log.LvlError), dataSrc, next)
l1r := NewL1Retrieval(testlog.Logger(t, log.LvlError), dataSrc, next, prev)
// first we expect the stage to reset to the origin of the inner stage
require.NoError(t, RepeatResetStep(t, l1r.ResetStep, nil, 1))
......
......@@ -11,42 +11,42 @@ import (
"github.com/ethereum/go-ethereum/log"
)
// L1 Traversal fetches the next L1 block and exposes it through the progress API
type L1BlockRefByNumberFetcher interface {
L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error)
}
type L1Traversal struct {
log log.Logger
block eth.L1BlockRef
done bool
l1Blocks L1BlockRefByNumberFetcher
next StageProgress
progress Progress
log log.Logger
}
var _ Stage = (*L1Traversal)(nil)
var _ PullStage = (*L1Traversal)(nil)
func NewL1Traversal(log log.Logger, l1Blocks L1BlockRefByNumberFetcher, next StageProgress) *L1Traversal {
func NewL1Traversal(log log.Logger, l1Blocks L1BlockRefByNumberFetcher) *L1Traversal {
return &L1Traversal{
log: log,
l1Blocks: l1Blocks,
next: next,
}
}
func (l1t *L1Traversal) Progress() Progress {
return l1t.progress
}
func (l1t *L1Traversal) Step(ctx context.Context, outer Progress) error {
if !l1t.progress.Closed { // close origin and do another pipeline sweep, before we try to move to the next origin
l1t.progress.Closed = true
return nil
// NextL1Block returns the next block. It does not advance, but it can only be
// called once before returning io.EOF
func (l1t *L1Traversal) NextL1Block(_ context.Context) (eth.L1BlockRef, error) {
if !l1t.done {
l1t.done = true
return l1t.block, nil
} else {
return eth.L1BlockRef{}, io.EOF
}
}
// If we reorg to a shorter chain, then we'll only derive new L2 data once the L1 reorg
// becomes longer than the previous L1 chain.
// This is fine, assuming the new L1 chain is live, but we may want to reconsider this.
origin := l1t.progress.Origin
// AdvanceL1Block advances the internal state of L1 Traversal
func (l1t *L1Traversal) AdvanceL1Block(ctx context.Context) error {
origin := l1t.block
nextL1Origin, err := l1t.l1Blocks.L1BlockRefByNumber(ctx, origin.Number+1)
if errors.Is(err, ethereum.NotFound) {
l1t.log.Debug("can't find next L1 block info (yet)", "number", origin.Number+1, "origin", origin)
......@@ -54,16 +54,20 @@ func (l1t *L1Traversal) Step(ctx context.Context, outer Progress) error {
} else if err != nil {
return NewTemporaryError(fmt.Errorf("failed to find L1 block info by number, at origin %s next %d: %w", origin, origin.Number+1, err))
}
if l1t.progress.Origin.Hash != nextL1Origin.ParentHash {
return NewResetError(fmt.Errorf("detected L1 reorg from %s to %s with conflicting parent %s", l1t.progress.Origin, nextL1Origin, nextL1Origin.ParentID()))
if l1t.block.Hash != nextL1Origin.ParentHash {
return NewResetError(fmt.Errorf("detected L1 reorg from %s to %s with conflicting parent %s", l1t.block, nextL1Origin, nextL1Origin.ParentID()))
}
l1t.progress.Origin = nextL1Origin
l1t.progress.Closed = false
l1t.block = nextL1Origin
l1t.done = false
return nil
}
func (l1t *L1Traversal) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
l1t.progress = l1t.next.Progress()
l1t.log.Info("completed reset of derivation pipeline", "origin", l1t.progress.Origin)
// Reset sets the internal L1 block to the supplied base.
// Note that the next call to `NextL1Block` will return the block after `base`
// TODO: Walk one back/figure this out.
func (l1t *L1Traversal) Reset(ctx context.Context, base eth.L1BlockRef) error {
l1t.block = base
l1t.done = false
l1t.log.Info("completed reset of derivation pipeline", "origin", base)
return io.EOF
}
package derive
import (
"context"
"errors"
"io"
"math/rand"
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/testutils"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/log"
)
func TestL1Traversal_Step(t *testing.T) {
// TestL1TraversalNext tests that the `Next` function only returns
// a block reference once and then properly returns io.EOF afterwards
func TestL1TraversalNext(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
a := testutils.RandomBlockRef(rng)
tr := NewL1Traversal(testlog.Logger(t, log.LvlError), nil)
// Load up the initial state with a reset
_ = tr.Reset(context.Background(), a)
// First call should always succeed
ref, err := tr.NextL1Block(context.Background())
require.Nil(t, err)
require.Equal(t, a, ref)
// Subsequent calls should return io.EOF
ref, err = tr.NextL1Block(context.Background())
require.Equal(t, eth.L1BlockRef{}, ref)
require.Equal(t, io.EOF, err)
ref, err = tr.NextL1Block(context.Background())
require.Equal(t, eth.L1BlockRef{}, ref)
require.Equal(t, io.EOF, err)
}
// TestL1TraversalAdvance tests that the `Advance` function properly
// handles different error cases and returns the expected block ref
// if there is no error.
func TestL1TraversalAdvance(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
a := testutils.RandomBlockRef(rng)
b := testutils.NextRandomRef(rng, a)
c := testutils.NextRandomRef(rng, b)
d := testutils.NextRandomRef(rng, c)
e := testutils.NextRandomRef(rng, d)
f := testutils.RandomBlockRef(rng) // a fork, doesn't build on d
f.Number = e.Number + 1 // even though it might be the next number
l1Fetcher := &testutils.MockL1Source{}
l1Fetcher.ExpectL1BlockRefByNumber(b.Number, b, nil)
// pretend there's an RPC error
l1Fetcher.ExpectL1BlockRefByNumber(c.Number, c, errors.New("rpc error - check back later"))
l1Fetcher.ExpectL1BlockRefByNumber(c.Number, c, nil)
// pretend the block is not there yet for a while
l1Fetcher.ExpectL1BlockRefByNumber(d.Number, d, ethereum.NotFound)
l1Fetcher.ExpectL1BlockRefByNumber(d.Number, d, ethereum.NotFound)
// it will show up though
l1Fetcher.ExpectL1BlockRefByNumber(d.Number, d, nil)
l1Fetcher.ExpectL1BlockRefByNumber(e.Number, e, nil)
l1Fetcher.ExpectL1BlockRefByNumber(f.Number, f, nil)
next := &MockOriginStage{progress: Progress{Origin: a, Closed: false}}
tr := NewL1Traversal(testlog.Logger(t, log.LvlError), l1Fetcher, next)
defer l1Fetcher.AssertExpectations(t)
defer next.AssertExpectations(t)
require.NoError(t, RepeatResetStep(t, tr.ResetStep, nil, 1))
require.Equal(t, a, tr.Progress().Origin, "stage needs to adopt the origin of next stage on reset")
require.False(t, tr.Progress().Closed, "stage needs to be open after reset")
require.ErrorIs(t, RepeatStep(t, tr.Step, Progress{}, 10), ErrTemporary, "expected temporary error because of RPC mock fail")
require.NoError(t, RepeatStep(t, tr.Step, Progress{}, 10))
require.Equal(t, c, tr.Progress().Origin, "expected to be stuck on ethereum.NotFound on d")
require.NoError(t, RepeatStep(t, tr.Step, Progress{}, 1))
require.Equal(t, c, tr.Progress().Origin, "expected to be stuck again, should get the EOF within 1 step")
require.ErrorIs(t, RepeatStep(t, tr.Step, Progress{}, 10), ErrReset, "completed pipeline, until L1 input f that causes a reorg")
// x is at the same height as b but does not extend `a`
x := testutils.RandomBlockRef(rng)
x.Number = b.Number
tests := []struct {
name string
startBlock eth.L1BlockRef
nextBlock eth.L1BlockRef
fetcherErr error
expectedErr error
}{
{
name: "simple extension",
startBlock: a,
nextBlock: b,
fetcherErr: nil,
expectedErr: nil,
},
{
name: "reorg",
startBlock: a,
nextBlock: x,
fetcherErr: nil,
expectedErr: ErrReset,
},
{
name: "not found",
startBlock: a,
nextBlock: eth.L1BlockRef{},
fetcherErr: ethereum.NotFound,
expectedErr: io.EOF,
},
{
name: "temporary error",
startBlock: a,
nextBlock: eth.L1BlockRef{},
fetcherErr: errors.New("interrupted connection"),
expectedErr: ErrTemporary,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
src := &testutils.MockL1Source{}
src.ExpectL1BlockRefByNumber(test.startBlock.Number+1, test.nextBlock, test.fetcherErr)
tr := NewL1Traversal(testlog.Logger(t, log.LvlError), src)
// Load up the initial state with a reset
_ = tr.Reset(context.Background(), test.startBlock)
// Advance it + assert output
err := tr.AdvanceL1Block(context.Background())
require.ErrorIs(t, err, test.expectedErr)
if test.expectedErr == nil {
ref, err := tr.NextL1Block(context.Background())
require.Nil(t, err)
require.Equal(t, test.nextBlock, ref)
}
src.AssertExpectations(t)
})
}
}
......@@ -28,6 +28,12 @@ type StageProgress interface {
Progress() Progress
}
type PullStage interface {
// Reset resets a pull stage. `base` refers to the L1 Block Reference to reset to.
// TODO: Return L1 Block reference
Reset(ctx context.Context, base eth.L1BlockRef) error
}
type Stage interface {
StageProgress
......@@ -68,7 +74,8 @@ type DerivationPipeline struct {
// Index of the stage that is currently being reset.
// >= len(stages) if no additional resetting is required
resetting int
resetting int
pullResetIdx int
// Index of the stage that is currently being processed.
active int
......@@ -76,6 +83,9 @@ type DerivationPipeline struct {
// stages in execution order. A stage Step that:
stages []Stage
pullStages []PullStage
traversal *L1Traversal
eng EngineQueueStage
metrics Metrics
......@@ -83,30 +93,40 @@ type DerivationPipeline struct {
// NewDerivationPipeline creates a derivation pipeline, which should be reset before use.
func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetcher, engine Engine, metrics Metrics) *DerivationPipeline {
// Pull stages
l1Traversal := NewL1Traversal(log, l1Fetcher)
// Push stages (that act like pull stages b/c we push from the innermost stages prior to the outermost stages)
eng := NewEngineQueue(log, cfg, engine, metrics)
attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, eng)
batchQueue := NewBatchQueue(log, cfg, attributesQueue)
chInReader := NewChannelInReader(log, batchQueue)
bank := NewChannelBank(log, cfg, chInReader)
dataSrc := NewDataSourceFactory(log, cfg, l1Fetcher)
l1Src := NewL1Retrieval(log, dataSrc, bank)
l1Traversal := NewL1Traversal(log, l1Fetcher, l1Src)
stages := []Stage{eng, attributesQueue, batchQueue, chInReader, bank, l1Src, l1Traversal}
l1Src := NewL1Retrieval(log, dataSrc, bank, l1Traversal)
stages := []Stage{eng, attributesQueue, batchQueue, chInReader, bank, l1Src}
pullStages := []PullStage{l1Traversal}
return &DerivationPipeline{
log: log,
cfg: cfg,
l1Fetcher: l1Fetcher,
resetting: 0,
active: 0,
stages: stages,
eng: eng,
metrics: metrics,
log: log,
cfg: cfg,
l1Fetcher: l1Fetcher,
resetting: 0,
active: 0,
stages: stages,
pullStages: pullStages,
eng: eng,
metrics: metrics,
traversal: l1Traversal,
}
}
func (dp *DerivationPipeline) Reset() {
dp.resetting = 0
dp.pullResetIdx = 0
}
func (dp *DerivationPipeline) Progress() Progress {
......@@ -160,7 +180,24 @@ func (dp *DerivationPipeline) Step(ctx context.Context) error {
return nil
}
}
// Then reset the pull based stages
if dp.pullResetIdx < len(dp.pullStages) {
// Use the last stage's progress as the one to pull from
inner := dp.stages[len(dp.stages)-1].Progress()
// Do the reset
if err := dp.pullStages[dp.pullResetIdx].Reset(ctx, inner.Origin); err == io.EOF {
// dp.log.Debug("reset of stage completed", "stage", dp.pullResetIdx, "origin", dp.pullStages[dp.pullResetIdx].Progress().Origin)
dp.pullResetIdx += 1
return nil
} else if err != nil {
return fmt.Errorf("stage %d failed resetting: %w", dp.pullResetIdx, err)
} else {
return nil
}
}
// Lastly advance the stages
for i, stage := range dp.stages {
var outer Progress
if i+1 < len(dp.stages) {
......@@ -174,5 +211,6 @@ func (dp *DerivationPipeline) Step(ctx context.Context) error {
return nil
}
}
return io.EOF
// If every stage has returned io.EOF, try to advance the L1 Origin
return dp.traversal.AdvanceL1Block(ctx)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment