Commit 46202cdd authored by Joshua Gutow's avatar Joshua Gutow Committed by GitHub

op-node: Fully remove the progress API (#3623)

* op-node: Clean up pipeline

Now that the engine queue is the only step stage, it is easy to
consolidate different loops inside the derivation pipeline.

* op-node: Fully remove the progress API

It has been partially replaced with the Origin API, but the open/closed
distinction no longer exists.
Co-authored-by: default avatarMatthew Slipper <me@matthewslipper.com>
parent f39853ef
......@@ -47,7 +47,7 @@ func NewL2Verifier(log log.Logger, l1 derive.L1Fetcher, eng derive.Engine, cfg *
func (s *L2Verifier) SyncStatus() *eth.SyncStatus {
return &eth.SyncStatus{
CurrentL1: s.derivation.Progress().Origin,
CurrentL1: s.derivation.Origin(),
HeadL1: s.l1Head,
SafeL1: s.l1Safe,
FinalizedL1: s.l1Finalized,
......
......@@ -26,12 +26,6 @@ import (
// It is internally responsible for making sure that batches with L1 inclusions block outside it's
// working range are not considered or pruned.
type BatchQueueOutput interface {
StageProgress
AddBatch(batch *BatchData)
SafeL2Head() eth.L2BlockRef
}
type NextBatchProvider interface {
Origin() eth.L1BlockRef
NextBatch(ctx context.Context) (*BatchData, error)
......
......@@ -38,7 +38,7 @@ type ChannelBank struct {
fetcher L1Fetcher
}
var _ PullStage = (*ChannelBank)(nil)
var _ ResetableStage = (*ChannelBank)(nil)
// NewChannelBank creates a ChannelBank, which should be Reset(origin) before use.
func NewChannelBank(log log.Logger, cfg *rollup.Config, prev NextDataProvider, fetcher L1Fetcher) *ChannelBank {
......
......@@ -22,7 +22,7 @@ type ChannelInReader struct {
prev *ChannelBank
}
var _ PullStage = (*ChannelInReader)(nil)
var _ ResetableStage = (*ChannelInReader)(nil)
// NewChannelInReader creates a ChannelInReader, which should be Reset(origin) before use.
func NewChannelInReader(log log.Logger, prev *ChannelBank) *ChannelInReader {
......
......@@ -78,13 +78,14 @@ type EngineQueue struct {
engine Engine
prev NextAttributesProvider
progress Progress // only used for pipeline resets
origin eth.L1BlockRef // only used for pipeline resets
metrics Metrics
metrics Metrics
l1Fetcher L1Fetcher
}
// NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use.
func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider) *EngineQueue {
func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider, l1Fetcher L1Fetcher) *EngineQueue {
return &EngineQueue{
log: log,
cfg: cfg,
......@@ -95,12 +96,13 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M
MaxSize: maxUnsafePayloadsMemory,
SizeFn: payloadMemSize,
},
prev: prev,
prev: prev,
l1Fetcher: l1Fetcher,
}
}
func (eq *EngineQueue) Progress() Progress {
return eq.progress
func (eq *EngineQueue) Origin() eth.L1BlockRef {
return eq.origin
}
func (eq *EngineQueue) SetUnsafeHead(head eth.L2BlockRef) {
......@@ -151,7 +153,7 @@ func (eq *EngineQueue) LastL2Time() uint64 {
return uint64(eq.safeAttributes[len(eq.safeAttributes)-1].Timestamp)
}
func (eq *EngineQueue) Step(ctx context.Context, _ Progress) error {
func (eq *EngineQueue) Step(ctx context.Context) error {
if len(eq.safeAttributes) > 0 {
return eq.tryNextSafeAttributes(ctx)
}
......@@ -402,13 +404,13 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
// ResetStep Walks the L2 chain backwards until it finds an L2 block whose L1 origin is canonical.
// The unsafe head is set to the head of the L2 chain, unless the existing safe head is not canonical.
func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
result, err := sync.FindL2Heads(ctx, eq.cfg, l1Fetcher, eq.engine)
func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef) error {
result, err := sync.FindL2Heads(ctx, eq.cfg, eq.l1Fetcher, eq.engine)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to find the L2 Heads to start from: %w", err))
}
finalized, safe, unsafe := result.Finalized, result.Safe, result.Unsafe
l1Origin, err := l1Fetcher.L1BlockRefByHash(ctx, safe.L1Origin.Hash)
l1Origin, err := eq.l1Fetcher.L1BlockRefByHash(ctx, safe.L1Origin.Hash)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %v; err: %w", safe.L1Origin, err))
}
......@@ -421,7 +423,7 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
if l1Origin.Number < eq.cfg.ChannelTimeout {
pipelineNumber = 0
}
pipelineOrigin, err := l1Fetcher.L1BlockRefByNumber(ctx, pipelineNumber)
pipelineOrigin, err := eq.l1Fetcher.L1BlockRefByNumber(ctx, pipelineNumber)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %v; err: %w", pipelineNumber, err))
}
......@@ -431,9 +433,7 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
eq.finalized = finalized
eq.finalityData = eq.finalityData[:0]
// note: we do not clear the unsafe payloadds queue; if the payloads are not applicable anymore the parent hash checks will clear out the old payloads.
eq.progress = Progress{
Origin: pipelineOrigin,
}
eq.origin = pipelineOrigin
eq.metrics.RecordL2Ref("l2_finalized", finalized)
eq.metrics.RecordL2Ref("l2_safe", safe)
eq.metrics.RecordL2Ref("l2_unsafe", unsafe)
......
......@@ -230,21 +230,21 @@ func TestEngineQueue_Finalize(t *testing.T) {
prev := &fakeAttributesQueue{}
eq := NewEngineQueue(logger, cfg, eng, metrics, prev)
require.ErrorIs(t, eq.ResetStep(context.Background(), l1F), io.EOF)
eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F)
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}), io.EOF)
require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
require.Equal(t, refB, eq.Progress().Origin, "Expecting to be set back derivation L1 progress to B")
require.Equal(t, refB, eq.Origin(), "Expecting to be set back derivation L1 progress to B")
require.Equal(t, refA1, eq.Finalized(), "A1 is recognized as finalized before we run any steps")
// now say C1 was included in D and became the new safe head
eq.progress.Origin = refD
eq.origin = refD
prev.origin = refD
eq.safeHead = refC1
eq.postProcessSafeL2()
// now say D0 was included in E and became the new safe head
eq.progress.Origin = refE
eq.origin = refE
prev.origin = refE
eq.safeHead = refD0
eq.postProcessSafeL2()
......
......@@ -25,7 +25,7 @@ type L1Retrieval struct {
datas DataIter
}
var _ PullStage = (*L1Retrieval)(nil)
var _ ResetableStage = (*L1Retrieval)(nil)
func NewL1Retrieval(log log.Logger, dataSrc DataAvailabilitySource, prev NextBlockProvider) *L1Retrieval {
return &L1Retrieval{
......
......@@ -24,7 +24,7 @@ type L1Traversal struct {
log log.Logger
}
var _ PullStage = (*L1Traversal)(nil)
var _ ResetableStage = (*L1Traversal)(nil)
func NewL1Traversal(log log.Logger, l1Blocks L1BlockRefByNumberFetcher) *L1Traversal {
return &L1Traversal{
......
......@@ -24,46 +24,22 @@ type L1Fetcher interface {
L1TransactionFetcher
}
type StageProgress interface {
Progress() Progress
}
type PullStage interface {
type ResetableStage interface {
// Reset resets a pull stage. `base` refers to the L1 Block Reference to reset to.
// TODO: Return L1 Block reference
Reset(ctx context.Context, base eth.L1BlockRef) error
}
type Stage interface {
StageProgress
// Step tries to progress the state.
// The outer stage progress informs the step what to do.
//
// If the stage:
// - returns EOF: the stage will be skipped
// - returns another error: the stage will make the pipeline error.
// - returns nil: the stage will be repeated next Step
Step(ctx context.Context, outer Progress) error
// ResetStep prepares the state for usage in regular steps.
// Similar to Step(ctx) it returns:
// - EOF if the next stage should be reset
// - error if the reset should start all over again
// - nil if the reset should continue resetting this stage.
ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
}
type EngineQueueStage interface {
Finalized() eth.L2BlockRef
UnsafeL2Head() eth.L2BlockRef
SafeL2Head() eth.L2BlockRef
Progress() Progress
Origin() eth.L1BlockRef
SetUnsafeHead(head eth.L2BlockRef)
Finalize(l1Origin eth.BlockID)
AddSafeAttributes(attributes *eth.PayloadAttributes)
AddUnsafePayload(payload *eth.ExecutionPayload)
Step(context.Context) error
}
// DerivationPipeline is updated with new L1 data, and the Step() function can be iterated on to keep the L2 Engine in sync.
......@@ -74,19 +50,12 @@ type DerivationPipeline struct {
// Index of the stage that is currently being reset.
// >= len(stages) if no additional resetting is required
resetting int
pullResetIdx int
// Index of the stage that is currently being processed.
active int
resetting int
stages []ResetableStage
// stages in execution order. A stage Step that:
stages []Stage
pullStages []PullStage
traversal *L1Traversal
eng EngineQueueStage
// Special stages to keep track of
traversal *L1Traversal
eng EngineQueueStage
metrics Metrics
}
......@@ -103,33 +72,32 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch
batchQueue := NewBatchQueue(log, cfg, chInReader)
attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, batchQueue)
// Push stages (that act like pull stages b/c we push from the innermost stages prior to the outermost stages)
eng := NewEngineQueue(log, cfg, engine, metrics, attributesQueue)
// Step stages
eng := NewEngineQueue(log, cfg, engine, metrics, attributesQueue, l1Fetcher)
stages := []Stage{eng}
pullStages := []PullStage{attributesQueue, batchQueue, chInReader, bank, l1Src, l1Traversal}
// Reset from engine queue then up from L1 Traversal. The stages do not talk to each other during
// the reset, but after the engine queue, this is the order in which the stages could talk to each other.
// Note: The engine queue stage is the only reset that can fail.
stages := []ResetableStage{eng, l1Traversal, l1Src, bank, chInReader, batchQueue, attributesQueue}
return &DerivationPipeline{
log: log,
cfg: cfg,
l1Fetcher: l1Fetcher,
resetting: 0,
active: 0,
stages: stages,
pullStages: pullStages,
eng: eng,
metrics: metrics,
traversal: l1Traversal,
log: log,
cfg: cfg,
l1Fetcher: l1Fetcher,
resetting: 0,
stages: stages,
eng: eng,
metrics: metrics,
traversal: l1Traversal,
}
}
func (dp *DerivationPipeline) Reset() {
dp.resetting = 0
dp.pullResetIdx = 0
}
func (dp *DerivationPipeline) Progress() Progress {
return dp.eng.Progress()
func (dp *DerivationPipeline) Origin() eth.L1BlockRef {
return dp.eng.Origin()
}
func (dp *DerivationPipeline) Finalize(l1Origin eth.BlockID) {
......@@ -165,12 +133,12 @@ func (dp *DerivationPipeline) AddUnsafePayload(payload *eth.ExecutionPayload) {
// An error is expected when the underlying source closes.
// When Step returns nil, it should be called again, to continue the derivation process.
func (dp *DerivationPipeline) Step(ctx context.Context) error {
defer dp.metrics.RecordL1Ref("l1_derived", dp.Progress().Origin)
defer dp.metrics.RecordL1Ref("l1_derived", dp.Origin())
// if any stages need to be reset, do that first.
if dp.resetting < len(dp.stages) {
if err := dp.stages[dp.resetting].ResetStep(ctx, dp.l1Fetcher); err == io.EOF {
dp.log.Debug("reset of stage completed", "stage", dp.resetting, "origin", dp.stages[dp.resetting].Progress().Origin)
if err := dp.stages[dp.resetting].Reset(ctx, dp.eng.Origin()); err == io.EOF {
dp.log.Debug("reset of stage completed", "stage", dp.resetting, "origin", dp.eng.Origin())
dp.resetting += 1
return nil
} else if err != nil {
......@@ -179,37 +147,14 @@ func (dp *DerivationPipeline) Step(ctx context.Context) error {
return nil
}
}
// Then reset the pull based stages
if dp.pullResetIdx < len(dp.pullStages) {
// Use the last stage's progress as the one to pull from
inner := dp.stages[len(dp.stages)-1].Progress()
// Do the reset
if err := dp.pullStages[dp.pullResetIdx].Reset(ctx, inner.Origin); err == io.EOF {
// dp.log.Debug("reset of stage completed", "stage", dp.pullResetIdx, "origin", dp.pullStages[dp.pullResetIdx].Progress().Origin)
dp.pullResetIdx += 1
return nil
} else if err != nil {
return fmt.Errorf("stage %d failed resetting: %w", dp.pullResetIdx, err)
} else {
return nil
}
}
// Lastly advance the stages
for i, stage := range dp.stages {
var outer Progress
if i+1 < len(dp.stages) {
outer = dp.stages[i+1].Progress()
}
if err := stage.Step(ctx, outer); err == io.EOF {
continue
} else if err != nil {
return fmt.Errorf("stage %d failed: %w", i, err)
} else {
return nil
}
// Now step the engine queue. It will pull earlier data as needed.
if err := dp.eng.Step(ctx); err == io.EOF {
// If every stage has returned io.EOF, try to advance the L1 Origin
return dp.traversal.AdvanceL1Block(ctx)
} else if err != nil {
return fmt.Errorf("engine stage failed: %w", err)
} else {
return nil
}
// If every stage has returned io.EOF, try to advance the L1 Origin
return dp.traversal.AdvanceL1Block(ctx)
}
package derive
import (
"context"
"io"
"testing"
"github.com/stretchr/testify/mock"
"github.com/ethereum-optimism/optimism/op-node/testutils"
)
import "github.com/ethereum-optimism/optimism/op-node/testutils"
var _ Engine = (*testutils.MockEngine)(nil)
var _ L1Fetcher = (*testutils.MockL1Source)(nil)
type MockOriginStage struct {
mock.Mock
progress Progress
}
func (m *MockOriginStage) Progress() Progress {
return m.progress
}
var _ StageProgress = (*MockOriginStage)(nil)
// RepeatResetStep is a test util that will repeat the ResetStep function until an error.
// If the step runs too many times, it will fail the test.
func RepeatResetStep(t *testing.T, step func(ctx context.Context, l1Fetcher L1Fetcher) error, l1Fetcher L1Fetcher, max int) error {
ctx := context.Background()
for i := 0; i < max; i++ {
err := step(ctx, l1Fetcher)
if err == io.EOF {
return nil
}
if err != nil {
return err
}
}
t.Fatal("ran out of steps")
return nil
}
// RepeatStep is a test util that will repeat the Step function until an error.
// If the step runs too many times, it will fail the test.
func RepeatStep(t *testing.T, step func(ctx context.Context, outer Progress) error, outer Progress, max int) error {
ctx := context.Background()
for i := 0; i < max; i++ {
err := step(ctx, outer)
if err == io.EOF {
return nil
}
if err != nil {
return err
}
}
t.Fatal("ran out of steps")
return nil
}
var _ Metrics = (*testutils.TestDerivationMetrics)(nil)
package derive
import (
"fmt"
"github.com/ethereum-optimism/optimism/op-node/eth"
)
// Progress represents the progress of a derivation stage:
// the input L1 block that is being processed, and whether it's fully processed yet.
type Progress struct {
Origin eth.L1BlockRef
// Closed means that the Current has no more data that the stage may need.
Closed bool
}
func (pr *Progress) Update(outer Progress) (changed bool, err error) {
if outer.Origin.Number < pr.Origin.Number {
return false, nil
}
if pr.Closed {
if outer.Closed {
if pr.Origin.ID() != outer.Origin.ID() {
return true, NewResetError(fmt.Errorf("outer stage changed origin from %s to %s without opening it", pr.Origin, outer.Origin))
}
return false, nil
} else {
if pr.Origin.Hash != outer.Origin.ParentHash {
return true, NewResetError(fmt.Errorf("detected internal pipeline reorg of L1 origin data from %s to %s", pr.Origin, outer.Origin))
}
pr.Origin = outer.Origin
pr.Closed = false
return true, nil
}
} else {
if pr.Origin.ID() != outer.Origin.ID() {
return true, NewResetError(fmt.Errorf("outer stage changed origin from %s to %s before closing it", pr.Origin, outer.Origin))
}
if outer.Closed {
pr.Closed = true
return true, nil
} else {
return false, nil
}
}
}
......@@ -59,7 +59,7 @@ type DerivationPipeline interface {
Finalized() eth.L2BlockRef
SafeL2Head() eth.L2BlockRef
UnsafeL2Head() eth.L2BlockRef
Progress() derive.Progress
Origin() eth.L1BlockRef
}
type outputInterface interface {
......
......@@ -416,13 +416,13 @@ func (s *state) eventLoop() {
case <-stepReqCh:
s.metrics.SetDerivationIdle(false)
s.idleDerivation = false
s.log.Debug("Derivation process step", "onto_origin", s.derivation.Progress().Origin, "onto_closed", s.derivation.Progress().Closed, "attempts", stepAttempts)
s.log.Debug("Derivation process step", "onto_origin", s.derivation.Origin(), "attempts", stepAttempts)
stepCtx, cancel := context.WithTimeout(ctx, time.Second*10) // TODO pick a timeout for executing a single step
err := s.derivation.Step(stepCtx)
cancel()
stepAttempts += 1 // count as attempt by default. We reset to 0 if we are making healthy progress.
if err == io.EOF {
s.log.Debug("Derivation process went idle", "progress", s.derivation.Progress().Origin)
s.log.Debug("Derivation process went idle", "progress", s.derivation.Origin())
s.idleDerivation = true
stepAttempts = 0
s.metrics.SetDerivationIdle(true)
......@@ -454,7 +454,7 @@ func (s *state) eventLoop() {
}
case respCh := <-s.syncStatusReq:
respCh <- eth.SyncStatus{
CurrentL1: s.derivation.Progress().Origin,
CurrentL1: s.derivation.Origin(),
HeadL1: s.l1Head,
SafeL1: s.l1Safe,
FinalizedL1: s.l1Finalized,
......@@ -520,7 +520,7 @@ func (s *state) snapshot(event string) {
s.snapshotLog.Info("Rollup State Snapshot",
"event", event,
"l1Head", deferJSONString{s.l1Head},
"l1Current", deferJSONString{s.derivation.Progress().Origin},
"l1Current", deferJSONString{s.derivation.Origin()},
"l2Head", deferJSONString{s.derivation.UnsafeL2Head()},
"l2Safe", deferJSONString{s.derivation.SafeL2Head()},
"l2FinalizedHead", deferJSONString{s.derivation.Finalized()})
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment