Commit 4a91c9ae authored by protolambda's avatar protolambda Committed by GitHub

derivation: remove `EngineQueue` (#10643)

* op-node: remove engine queue

(squashed) remove debug line

* op-node: test VerifyNewL1Origin

* op-node: engine-queue removal review fixes
parent f8143c8c
...@@ -145,9 +145,15 @@ func TestL2Sequencer_SequencerOnlyReorg(gt *testing.T) { ...@@ -145,9 +145,15 @@ func TestL2Sequencer_SequencerOnlyReorg(gt *testing.T) {
sequencer.ActL1HeadSignal(t) sequencer.ActL1HeadSignal(t)
sequencer.ActBuildToL1HeadUnsafe(t) sequencer.ActBuildToL1HeadUnsafe(t)
// If sequencer does not pick up on pre-reorg chain in derivation,
// then derivation won't see the difference in L1 chains,
// and not trigger a reorg if we traverse from 0 to the new chain later on
// (but would once it gets to consolidate unsafe head later).
sequencer.ActL2PipelineFull(t)
status := sequencer.SyncStatus() status := sequencer.SyncStatus()
require.Zero(t, status.SafeL2.L1Origin.Number, "no safe head progress") require.Zero(t, status.SafeL2.L1Origin.Number, "no safe head progress")
require.Equal(t, status.HeadL1.Hash, status.UnsafeL2.L1Origin.Hash, "have head L1 origin") require.Equal(t, status.HeadL1.Hash, status.UnsafeL2.L1Origin.Hash, "have head L1 origin")
require.NotZero(t, status.UnsafeL2.L1Origin.Number, "have head L1 origin")
// reorg out block with coinbase A, and make a block with coinbase B // reorg out block with coinbase A, and make a block with coinbase B
miner.ActL1RewindToParent(t) miner.ActL1RewindToParent(t)
miner.ActL1SetFeeRecipient(common.Address{'B'}) miner.ActL1SetFeeRecipient(common.Address{'B'})
...@@ -163,6 +169,11 @@ func TestL2Sequencer_SequencerOnlyReorg(gt *testing.T) { ...@@ -163,6 +169,11 @@ func TestL2Sequencer_SequencerOnlyReorg(gt *testing.T) {
// No batches are submitted yet however, // No batches are submitted yet however,
// so it'll keep the L2 block with the old L1 origin, since no conflict is detected. // so it'll keep the L2 block with the old L1 origin, since no conflict is detected.
sequencer.ActL1HeadSignal(t) sequencer.ActL1HeadSignal(t)
postReorgStatus := sequencer.SyncStatus()
require.Zero(t, postReorgStatus.SafeL2.L1Origin.Number, "no safe head progress")
require.NotEqual(t, postReorgStatus.HeadL1.Hash, postReorgStatus.UnsafeL2.L1Origin.Hash, "no longer have head L1 origin")
sequencer.ActL2PipelineFull(t) sequencer.ActL2PipelineFull(t)
// Verifier should detect the inconsistency of the L1 origin and reset the pipeline to follow the reorg // Verifier should detect the inconsistency of the L1 origin and reset the pipeline to follow the reorg
newStatus := sequencer.SyncStatus() newStatus := sequencer.SyncStatus()
...@@ -174,9 +185,6 @@ func TestL2Sequencer_SequencerOnlyReorg(gt *testing.T) { ...@@ -174,9 +185,6 @@ func TestL2Sequencer_SequencerOnlyReorg(gt *testing.T) {
require.Equal(t, status.UnsafeL2.L1Origin.Number, newStatus.HeadL1.Number-1, "seeing N+1 to attempt to build on N") require.Equal(t, status.UnsafeL2.L1Origin.Number, newStatus.HeadL1.Number-1, "seeing N+1 to attempt to build on N")
require.NotEqual(t, status.UnsafeL2.L1Origin.Hash, newStatus.HeadL1.ParentHash, "but N+1 cannot fit on N") require.NotEqual(t, status.UnsafeL2.L1Origin.Hash, newStatus.HeadL1.ParentHash, "but N+1 cannot fit on N")
// After hitting a reset error, it resets derivation, and drops the old L1 chain
sequencer.ActL2PipelineFull(t)
// Can build new L2 blocks with good L1 origin // Can build new L2 blocks with good L1 origin
sequencer.ActBuildToL1HeadUnsafe(t) sequencer.ActBuildToL1HeadUnsafe(t)
require.Equal(t, newStatus.HeadL1.Hash, sequencer.SyncStatus().UnsafeL2.L1Origin.Hash, "build L2 chain with new correct L1 origins") require.Equal(t, newStatus.HeadL1.Hash, sequencer.SyncStatus().UnsafeL2.L1Origin.Hash, "build L2 chain with new correct L1 origins")
......
...@@ -35,12 +35,17 @@ type L2Verifier struct { ...@@ -35,12 +35,17 @@ type L2Verifier struct {
L2BlockRefByNumber(ctx context.Context, num uint64) (eth.L2BlockRef, error) L2BlockRefByNumber(ctx context.Context, num uint64) (eth.L2BlockRef, error)
} }
syncDeriver *driver.SyncDeriver
// L2 rollup // L2 rollup
engine *derive.EngineController engine *derive.EngineController
derivation *derive.DerivationPipeline derivation *derive.DerivationPipeline
clSync *clsync.CLSync clSync *clsync.CLSync
finalizer driver.Finalizer attributesHandler driver.AttributesHandler
safeHeadListener derive.SafeHeadListener
finalizer driver.Finalizer
syncCfg *sync.Config
l1 derive.L1Fetcher l1 derive.L1Fetcher
l1State *driver.L1State l1State *driver.L1State
...@@ -84,17 +89,26 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri ...@@ -84,17 +89,26 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
attributesHandler := attributes.NewAttributesHandler(log, cfg, engine, eng) attributesHandler := attributes.NewAttributesHandler(log, cfg, engine, eng)
pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, plasmaSrc, eng, engine, metrics, pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, plasmaSrc, eng, metrics)
syncCfg, safeHeadListener, finalizer, attributesHandler)
pipeline.Reset()
rollupNode := &L2Verifier{ rollupNode := &L2Verifier{
log: log, log: log,
eng: eng, eng: eng,
engine: engine, engine: engine,
clSync: clSync, clSync: clSync,
derivation: pipeline, derivation: pipeline,
finalizer: finalizer, finalizer: finalizer,
attributesHandler: attributesHandler,
safeHeadListener: safeHeadListener,
syncCfg: syncCfg,
syncDeriver: &driver.SyncDeriver{
Derivation: pipeline,
Finalizer: finalizer,
AttributesHandler: attributesHandler,
SafeHeadNotifs: safeHeadListener,
CLSync: clSync,
Engine: engine,
},
l1: l1, l1: l1,
l1State: driver.NewL1State(log, metrics), l1State: driver.NewL1State(log, metrics),
l2PipelineIdle: true, l2PipelineIdle: true,
...@@ -240,18 +254,7 @@ func (s *L2Verifier) ActL1FinalizedSignal(t Testing) { ...@@ -240,18 +254,7 @@ func (s *L2Verifier) ActL1FinalizedSignal(t Testing) {
// syncStep represents the Driver.syncStep // syncStep represents the Driver.syncStep
func (s *L2Verifier) syncStep(ctx context.Context) error { func (s *L2Verifier) syncStep(ctx context.Context) error {
if fcuCalled, err := s.engine.TryBackupUnsafeReorg(ctx); fcuCalled { return s.syncDeriver.SyncStep(ctx)
return err
}
if err := s.engine.TryUpdateEngine(ctx); !errors.Is(err, derive.ErrNoFCUNeeded) {
return err
}
if err := s.clSync.Proceed(ctx); err != io.EOF {
return err
}
s.l2PipelineIdle = false
return s.derivation.Step(ctx)
} }
// ActL2PipelineStep runs one iteration of the L2 derivation pipeline // ActL2PipelineStep runs one iteration of the L2 derivation pipeline
...@@ -270,6 +273,12 @@ func (s *L2Verifier) ActL2PipelineStep(t Testing) { ...@@ -270,6 +273,12 @@ func (s *L2Verifier) ActL2PipelineStep(t Testing) {
} else if err != nil && errors.Is(err, derive.ErrReset) { } else if err != nil && errors.Is(err, derive.ErrReset) {
s.log.Warn("Derivation pipeline is reset", "err", err) s.log.Warn("Derivation pipeline is reset", "err", err)
s.derivation.Reset() s.derivation.Reset()
if err := derive.ResetEngine(t.Ctx(), s.log, s.rollupCfg, s.engine, s.l1, s.eng, s.syncCfg, s.safeHeadListener); err != nil {
s.log.Error("Derivation pipeline not ready, failed to reset engine", "err", err)
// Derivation-pipeline will return a new ResetError until we confirm the engine has been successfully reset.
return
}
s.derivation.ConfirmEngineReset()
return return
} else if err != nil && errors.Is(err, derive.ErrTemporary) { } else if err != nil && errors.Is(err, derive.ErrTemporary) {
s.log.Warn("Derivation process temporary error", "err", err) s.log.Warn("Derivation process temporary error", "err", err)
......
...@@ -31,6 +31,8 @@ type AttributesWithParent struct { ...@@ -31,6 +31,8 @@ type AttributesWithParent struct {
Attributes *eth.PayloadAttributes Attributes *eth.PayloadAttributes
Parent eth.L2BlockRef Parent eth.L2BlockRef
IsLastInSpan bool IsLastInSpan bool
DerivedFrom eth.L1BlockRef
} }
type AttributesQueue struct { type AttributesQueue struct {
...@@ -71,7 +73,12 @@ func (aq *AttributesQueue) NextAttributes(ctx context.Context, parent eth.L2Bloc ...@@ -71,7 +73,12 @@ func (aq *AttributesQueue) NextAttributes(ctx context.Context, parent eth.L2Bloc
return nil, err return nil, err
} else { } else {
// Clear out the local state once we will succeed // Clear out the local state once we will succeed
attr := AttributesWithParent{attrs, parent, aq.isLastInSpan} attr := AttributesWithParent{
Attributes: attrs,
Parent: parent,
IsLastInSpan: aq.isLastInSpan,
DerivedFrom: aq.Origin(),
}
aq.batch = nil aq.batch = nil
aq.isLastInSpan = false aq.isLastInSpan = false
return &attr, nil return &attr, nil
......
package derive
import (
"context"
"fmt"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type L1BlockRefByNumber interface {
L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error)
}
// VerifyNewL1Origin checks that the L2 unsafe head still has a L1 origin that is on the canonical chain.
// If the unsafe head origin is after the new L1 origin it is assumed to still be canonical.
// The check is only required when moving to a new L1 origin.
func VerifyNewL1Origin(ctx context.Context, unsafeOrigin eth.L1BlockRef, l1 L1BlockRefByNumber, newOrigin eth.L1BlockRef) error {
if newOrigin.Number == unsafeOrigin.Number && newOrigin != unsafeOrigin {
return NewResetError(fmt.Errorf("l1 origin was inconsistent with l2 unsafe head origin, need reset to resolve: l1 origin: %v; unsafe origin: %v",
newOrigin.ID(), unsafeOrigin))
}
// Avoid requesting an older block by checking against the parent hash
if newOrigin.Number == unsafeOrigin.Number+1 && newOrigin.ParentHash != unsafeOrigin.Hash {
return NewResetError(fmt.Errorf("l2 unsafe head origin is no longer canonical, need reset to resolve: canonical hash: %v; unsafe origin hash: %v",
newOrigin.ParentHash, unsafeOrigin.Hash))
}
if newOrigin.Number > unsafeOrigin.Number+1 {
// If unsafe origin is further behind new origin, check it's still on the canonical chain.
canonical, err := l1.L1BlockRefByNumber(ctx, unsafeOrigin.Number)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to fetch canonical L1 block at slot: %v; err: %w", unsafeOrigin.Number, err))
}
if canonical != unsafeOrigin {
return NewResetError(fmt.Errorf("l2 unsafe head origin is no longer canonical, need reset to resolve: canonical: %v; unsafe origin: %v",
canonical, unsafeOrigin))
}
}
return nil
}
package derive
import (
"context"
"errors"
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testutils"
)
func TestVerifyNewL1Origin(t *testing.T) {
t.Run("same height inconsistency", func(t *testing.T) {
l1F := &testutils.MockL1Source{}
defer l1F.AssertExpectations(t)
a := eth.L1BlockRef{Number: 123, Hash: common.Hash{0xa}}
b := eth.L1BlockRef{Number: 123, Hash: common.Hash{0xb}}
err := VerifyNewL1Origin(context.Background(), a, l1F, b)
require.ErrorIs(t, err, ErrReset, "different origin at same height, must be a reorg")
})
t.Run("same height success", func(t *testing.T) {
l1F := &testutils.MockL1Source{}
defer l1F.AssertExpectations(t)
a := eth.L1BlockRef{Number: 123, Hash: common.Hash{0xa}}
b := eth.L1BlockRef{Number: 123, Hash: common.Hash{0xa}}
err := VerifyNewL1Origin(context.Background(), a, l1F, b)
require.NoError(t, err, "same origin")
})
t.Run("parent-hash inconsistency", func(t *testing.T) {
l1F := &testutils.MockL1Source{}
defer l1F.AssertExpectations(t)
a := eth.L1BlockRef{Number: 123, Hash: common.Hash{0xa}}
b := eth.L1BlockRef{Number: 123 + 1, Hash: common.Hash{0xb}, ParentHash: common.Hash{42}}
err := VerifyNewL1Origin(context.Background(), a, l1F, b)
require.ErrorIs(t, err, ErrReset, "parent hash of new origin does not match")
})
t.Run("parent-hash success", func(t *testing.T) {
l1F := &testutils.MockL1Source{}
defer l1F.AssertExpectations(t)
a := eth.L1BlockRef{Number: 123, Hash: common.Hash{0xa}}
b := eth.L1BlockRef{Number: 123 + 1, Hash: common.Hash{0xb}, ParentHash: common.Hash{0xa}}
err := VerifyNewL1Origin(context.Background(), a, l1F, b)
require.NoError(t, err, "expecting block b just after a")
})
t.Run("failed canonical check", func(t *testing.T) {
l1F := &testutils.MockL1Source{}
defer l1F.AssertExpectations(t)
mockErr := errors.New("test error")
l1F.ExpectL1BlockRefByNumber(123, eth.L1BlockRef{}, mockErr)
a := eth.L1BlockRef{Number: 123, Hash: common.Hash{0xa}}
b := eth.L1BlockRef{Number: 123 + 2, Hash: common.Hash{0xb}}
err := VerifyNewL1Origin(context.Background(), a, l1F, b)
require.ErrorIs(t, err, ErrTemporary, "temporary fetching error")
require.ErrorIs(t, err, mockErr, "wraps the underlying error")
})
t.Run("older not canonical", func(t *testing.T) {
l1F := &testutils.MockL1Source{}
defer l1F.AssertExpectations(t)
l1F.ExpectL1BlockRefByNumber(123, eth.L1BlockRef{Number: 123, Hash: common.Hash{42}}, nil)
a := eth.L1BlockRef{Number: 123, Hash: common.Hash{0xa}}
b := eth.L1BlockRef{Number: 123 + 2, Hash: common.Hash{0xb}}
err := VerifyNewL1Origin(context.Background(), a, l1F, b)
require.ErrorIs(t, err, ErrReset, "block A is no longer canonical, need to reset")
})
t.Run("success older block", func(t *testing.T) {
l1F := &testutils.MockL1Source{}
defer l1F.AssertExpectations(t)
l1F.ExpectL1BlockRefByNumber(123, eth.L1BlockRef{Number: 123, Hash: common.Hash{0xa}}, nil)
a := eth.L1BlockRef{Number: 123, Hash: common.Hash{0xa}}
b := eth.L1BlockRef{Number: 123 + 2, Hash: common.Hash{0xb}}
err := VerifyNewL1Origin(context.Background(), a, l1F, b)
require.NoError(t, err, "block A is still canonical, can proceed")
})
}
This diff is collapsed.
package derive
import (
"context"
"fmt"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type ResetL2 interface {
sync.L2Chain
SystemConfigL2Fetcher
}
// ResetEngine walks the L2 chain backwards until it finds a plausible unsafe head,
// and an L2 safe block that is guaranteed to still be from the L1 chain.
func ResetEngine(ctx context.Context, log log.Logger, cfg *rollup.Config, ec ResetEngineControl, l1 sync.L1Chain, l2 ResetL2, syncCfg *sync.Config, safeHeadNotifs SafeHeadListener) error {
result, err := sync.FindL2Heads(ctx, cfg, l1, l2, log, syncCfg)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to find the L2 Heads to start from: %w", err))
}
finalized, safe, unsafe := result.Finalized, result.Safe, result.Unsafe
l1Origin, err := l1.L1BlockRefByHash(ctx, safe.L1Origin.Hash)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %v; err: %w", safe.L1Origin, err))
}
if safe.Time < l1Origin.Time {
return NewResetError(fmt.Errorf("cannot reset block derivation to start at L2 block %s with time %d older than its L1 origin %s with time %d, time invariant is broken",
safe, safe.Time, l1Origin, l1Origin.Time))
}
ec.SetUnsafeHead(unsafe)
ec.SetSafeHead(safe)
ec.SetPendingSafeL2Head(safe)
ec.SetFinalizedHead(finalized)
ec.SetBackupUnsafeL2Head(eth.L2BlockRef{}, false)
ec.ResetBuildingState()
log.Debug("Reset of Engine is completed", "safeHead", safe, "unsafe", unsafe, "safe_timestamp", safe.Time,
"unsafe_timestamp", unsafe.Time, "l1Origin", l1Origin)
if safeHeadNotifs != nil {
if err := safeHeadNotifs.SafeHeadReset(safe); err != nil {
return err
}
if safeHeadNotifs.Enabled() && safe.Number == cfg.Genesis.L2.Number && safe.Hash == cfg.Genesis.L2.Hash {
// The rollup genesis block is always safe by definition. So if the pipeline resets this far back we know
// we will process all safe head updates and can record genesis as always safe from L1 genesis.
// Note that it is not safe to use cfg.Genesis.L1 here as it is the block immediately before the L2 genesis
// but the contracts may have been deployed earlier than that, allowing creating a dispute game
// with a L1 head prior to cfg.Genesis.L1
l1Genesis, err := l1.L1BlockRefByNumber(ctx, 0)
if err != nil {
return fmt.Errorf("failed to retrieve L1 genesis: %w", err)
}
if err := safeHeadNotifs.SafeHeadUpdated(safe, l1Genesis.ID()); err != nil {
return err
}
}
}
return nil
}
...@@ -9,7 +9,6 @@ import ( ...@@ -9,7 +9,6 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
) )
...@@ -21,6 +20,7 @@ type Metrics interface { ...@@ -21,6 +20,7 @@ type Metrics interface {
RecordChannelTimedOut() RecordChannelTimedOut()
RecordFrame() RecordFrame()
RecordDerivedBatches(batchType string) RecordDerivedBatches(batchType string)
SetDerivationIdle(idle bool)
} }
type L1Fetcher interface { type L1Fetcher interface {
...@@ -36,19 +36,15 @@ type ResettableStage interface { ...@@ -36,19 +36,15 @@ type ResettableStage interface {
Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemConfig) error Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemConfig) error
} }
type EngineQueueStage interface { // DerivationPipeline is updated with new L1 data, and the Step() function can be iterated on to generate attributes
Origin() eth.L1BlockRef
SystemConfig() eth.SystemConfig
Step(context.Context) error
}
// DerivationPipeline is updated with new L1 data, and the Step() function can be iterated on to keep the L2 Engine in sync.
type DerivationPipeline struct { type DerivationPipeline struct {
log log.Logger log log.Logger
rollupCfg *rollup.Config rollupCfg *rollup.Config
l1Fetcher L1Fetcher l1Fetcher L1Fetcher
plasma PlasmaInputFetcher plasma PlasmaInputFetcher
l2 L2Source
// Index of the stage that is currently being reset. // Index of the stage that is currently being reset.
// >= len(stages) if no additional resetting is required // >= len(stages) if no additional resetting is required
resetting int resetting int
...@@ -56,16 +52,21 @@ type DerivationPipeline struct { ...@@ -56,16 +52,21 @@ type DerivationPipeline struct {
// Special stages to keep track of // Special stages to keep track of
traversal *L1Traversal traversal *L1Traversal
eng EngineQueueStage
attrib *AttributesQueue
// L1 block that the next returned attributes are derived from, i.e. at the L2-end of the pipeline.
origin eth.L1BlockRef
resetL2Safe eth.L2BlockRef
resetSysConfig eth.SystemConfig
engineIsReset bool
metrics Metrics metrics Metrics
} }
// NewDerivationPipeline creates a derivation pipeline, which should be reset before use. // NewDerivationPipeline creates a DerivationPipeline, to turn L1 data into L2 block-inputs.
func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L1Fetcher, l1Blobs L1BlobsFetcher, func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L1Fetcher, l1Blobs L1BlobsFetcher,
plasma PlasmaInputFetcher, l2Source L2Source, engine LocalEngineControl, metrics Metrics, plasma PlasmaInputFetcher, l2Source L2Source, metrics Metrics) *DerivationPipeline {
syncCfg *sync.Config, safeHeadListener SafeHeadListener, finalizer FinalizerHooks, attributesHandler AttributesHandler) *DerivationPipeline {
// Pull stages // Pull stages
l1Traversal := NewL1Traversal(log, rollupCfg, l1Fetcher) l1Traversal := NewL1Traversal(log, rollupCfg, l1Fetcher)
...@@ -78,14 +79,10 @@ func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L ...@@ -78,14 +79,10 @@ func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L
attrBuilder := NewFetchingAttributesBuilder(rollupCfg, l1Fetcher, l2Source) attrBuilder := NewFetchingAttributesBuilder(rollupCfg, l1Fetcher, l2Source)
attributesQueue := NewAttributesQueue(log, rollupCfg, attrBuilder, batchQueue) attributesQueue := NewAttributesQueue(log, rollupCfg, attrBuilder, batchQueue)
// Step stages
eng := NewEngineQueue(log, rollupCfg, l2Source, engine, metrics, attributesQueue,
l1Fetcher, syncCfg, safeHeadListener, finalizer, attributesHandler)
// Reset from engine queue then up from L1 Traversal. The stages do not talk to each other during // Reset from engine queue then up from L1 Traversal. The stages do not talk to each other during
// the reset, but after the engine queue, this is the order in which the stages could talk to each other. // the reset, but after the engine queue, this is the order in which the stages could talk to each other.
// Note: The engine queue stage is the only reset that can fail. // Note: The engine queue stage is the only reset that can fail.
stages := []ResettableStage{eng, l1Traversal, l1Src, plasma, frameQueue, bank, chInReader, batchQueue, attributesQueue} stages := []ResettableStage{l1Traversal, l1Src, plasma, frameQueue, bank, chInReader, batchQueue, attributesQueue}
return &DerivationPipeline{ return &DerivationPipeline{
log: log, log: log,
...@@ -94,26 +91,30 @@ func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L ...@@ -94,26 +91,30 @@ func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L
plasma: plasma, plasma: plasma,
resetting: 0, resetting: 0,
stages: stages, stages: stages,
eng: eng,
metrics: metrics, metrics: metrics,
traversal: l1Traversal, traversal: l1Traversal,
attrib: attributesQueue,
l2: l2Source,
} }
} }
// EngineReady returns true if the engine is ready to be used. // DerivationReady returns true if the derivation pipeline is ready to be used.
// When it's being reset its state is inconsistent, and should not be used externally. // When it's being reset its state is inconsistent, and should not be used externally.
func (dp *DerivationPipeline) EngineReady() bool { func (dp *DerivationPipeline) DerivationReady() bool {
return dp.resetting > 0 return dp.engineIsReset && dp.resetting > 0
} }
func (dp *DerivationPipeline) Reset() { func (dp *DerivationPipeline) Reset() {
dp.resetting = 0 dp.resetting = 0
dp.resetSysConfig = eth.SystemConfig{}
dp.resetL2Safe = eth.L2BlockRef{}
dp.engineIsReset = false
} }
// Origin is the L1 block of the inner-most stage of the derivation pipeline, // Origin is the L1 block of the inner-most stage of the derivation pipeline,
// i.e. the L1 chain up to and including this point included and/or produced all the safe L2 blocks. // i.e. the L1 chain up to and including this point included and/or produced all the safe L2 blocks.
func (dp *DerivationPipeline) Origin() eth.L1BlockRef { func (dp *DerivationPipeline) Origin() eth.L1BlockRef {
return dp.eng.Origin() return dp.origin
} }
// Step tries to progress the buffer. // Step tries to progress the buffer.
...@@ -122,31 +123,101 @@ func (dp *DerivationPipeline) Origin() eth.L1BlockRef { ...@@ -122,31 +123,101 @@ func (dp *DerivationPipeline) Origin() eth.L1BlockRef {
// Any other error is critical and the derivation pipeline should be reset. // Any other error is critical and the derivation pipeline should be reset.
// An error is expected when the underlying source closes. // An error is expected when the underlying source closes.
// When Step returns nil, it should be called again, to continue the derivation process. // When Step returns nil, it should be called again, to continue the derivation process.
func (dp *DerivationPipeline) Step(ctx context.Context) error { func (dp *DerivationPipeline) Step(ctx context.Context, pendingSafeHead eth.L2BlockRef) (outAttrib *AttributesWithParent, outErr error) {
defer dp.metrics.RecordL1Ref("l1_derived", dp.Origin()) defer dp.metrics.RecordL1Ref("l1_derived", dp.Origin())
dp.metrics.SetDerivationIdle(false)
defer func() {
if outErr == io.EOF || errors.Is(outErr, EngineELSyncing) {
dp.metrics.SetDerivationIdle(true)
}
}()
// if any stages need to be reset, do that first. // if any stages need to be reset, do that first.
if dp.resetting < len(dp.stages) { if dp.resetting < len(dp.stages) {
if err := dp.stages[dp.resetting].Reset(ctx, dp.eng.Origin(), dp.eng.SystemConfig()); err == io.EOF { if !dp.engineIsReset {
dp.log.Debug("reset of stage completed", "stage", dp.resetting, "origin", dp.eng.Origin()) return nil, NewResetError(errors.New("cannot continue derivation until Engine has been reset"))
}
// After the Engine has been reset to ensure it is derived from the canonical L1 chain,
// we still need to internally rewind the L1 traversal further,
// so we can read all the L2 data necessary for constructing the next batches that come after the safe head.
if pendingSafeHead != dp.resetL2Safe {
if err := dp.initialReset(ctx, pendingSafeHead); err != nil {
return nil, fmt.Errorf("failed initial reset work: %w", err)
}
}
if err := dp.stages[dp.resetting].Reset(ctx, dp.origin, dp.resetSysConfig); err == io.EOF {
dp.log.Debug("reset of stage completed", "stage", dp.resetting, "origin", dp.origin)
dp.resetting += 1 dp.resetting += 1
return nil return nil, nil
} else if err != nil { } else if err != nil {
return fmt.Errorf("stage %d failed resetting: %w", dp.resetting, err) return nil, fmt.Errorf("stage %d failed resetting: %w", dp.resetting, err)
} else { } else {
return nil return nil, nil
} }
} }
// Now step the engine queue. It will pull earlier data as needed. prevOrigin := dp.origin
if err := dp.eng.Step(ctx); err == io.EOF { newOrigin := dp.attrib.Origin()
if prevOrigin != newOrigin {
// Check if the L2 unsafe head origin is consistent with the new origin
if err := VerifyNewL1Origin(ctx, prevOrigin, dp.l1Fetcher, newOrigin); err != nil {
return nil, fmt.Errorf("failed to verify L1 origin transition: %w", err)
}
dp.origin = newOrigin
}
if attrib, err := dp.attrib.NextAttributes(ctx, pendingSafeHead); err == nil {
return attrib, nil
} else if err == io.EOF {
// If every stage has returned io.EOF, try to advance the L1 Origin // If every stage has returned io.EOF, try to advance the L1 Origin
return dp.traversal.AdvanceL1Block(ctx) return nil, dp.traversal.AdvanceL1Block(ctx)
} else if errors.Is(err, EngineELSyncing) { } else if errors.Is(err, EngineELSyncing) {
return err return nil, err
} else if err != nil {
return fmt.Errorf("engine stage failed: %w", err)
} else { } else {
return nil return nil, fmt.Errorf("derivation failed: %w", err)
}
}
// initialReset does the initial reset work of finding the L1 point to rewind back to
func (dp *DerivationPipeline) initialReset(ctx context.Context, resetL2Safe eth.L2BlockRef) error {
dp.log.Info("Rewinding derivation-pipeline L1 traversal to handle reset")
// Walk back L2 chain to find the L1 origin that is old enough to start buffering channel data from.
pipelineL2 := resetL2Safe
l1Origin := resetL2Safe.L1Origin
for {
afterL2Genesis := pipelineL2.Number > dp.rollupCfg.Genesis.L2.Number
afterL1Genesis := pipelineL2.L1Origin.Number > dp.rollupCfg.Genesis.L1.Number
afterChannelTimeout := pipelineL2.L1Origin.Number+dp.rollupCfg.ChannelTimeout > l1Origin.Number
if afterL2Genesis && afterL1Genesis && afterChannelTimeout {
parent, err := dp.l2.L2BlockRefByHash(ctx, pipelineL2.ParentHash)
if err != nil {
return NewResetError(fmt.Errorf("failed to fetch L2 parent block %s", pipelineL2.ParentID()))
}
pipelineL2 = parent
} else {
break
}
} }
pipelineOrigin, err := dp.l1Fetcher.L1BlockRefByHash(ctx, pipelineL2.L1Origin.Hash)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %s; err: %w", pipelineL2.L1Origin, err))
}
sysCfg, err := dp.l2.SystemConfigByL2Hash(ctx, pipelineL2.Hash)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to fetch L1 config of L2 block %s: %w", pipelineL2.ID(), err))
}
dp.origin = pipelineOrigin
dp.resetSysConfig = sysCfg
dp.resetL2Safe = resetL2Safe
return nil
}
func (dp *DerivationPipeline) ConfirmEngineReset() {
dp.engineIsReset = true
} }
...@@ -60,9 +60,18 @@ type L2Chain interface { ...@@ -60,9 +60,18 @@ type L2Chain interface {
type DerivationPipeline interface { type DerivationPipeline interface {
Reset() Reset()
Step(ctx context.Context) error Step(ctx context.Context, pendingSafeHead eth.L2BlockRef) (*derive.AttributesWithParent, error)
Origin() eth.L1BlockRef Origin() eth.L1BlockRef
EngineReady() bool DerivationReady() bool
ConfirmEngineReset()
}
type EngineController interface {
derive.LocalEngineControl
IsEngineSyncing() bool
InsertUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope, ref eth.L2BlockRef) error
TryUpdateEngine(ctx context.Context) error
TryBackupUnsafeReorg(ctx context.Context) (bool, error)
} }
type CLSync interface { type CLSync interface {
...@@ -71,6 +80,11 @@ type CLSync interface { ...@@ -71,6 +80,11 @@ type CLSync interface {
Proceed(ctx context.Context) error Proceed(ctx context.Context) error
} }
type AttributesHandler interface {
SetAttributes(attributes *derive.AttributesWithParent)
Proceed(ctx context.Context) error
}
type Finalizer interface { type Finalizer interface {
Finalize(ctx context.Context, ref eth.L1BlockRef) Finalize(ctx context.Context, ref eth.L1BlockRef)
FinalizedL1() eth.L1BlockRef FinalizedL1() eth.L1BlockRef
...@@ -134,7 +148,7 @@ type SequencerStateListener interface { ...@@ -134,7 +148,7 @@ type SequencerStateListener interface {
SequencerStopped() error SequencerStopped() error
} }
// NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks. // NewDriver composes an events handler that tracks L1 state, triggers L2 Derivation, and optionally sequences new L2 blocks.
func NewDriver( func NewDriver(
driverCfg *Config, driverCfg *Config,
cfg *rollup.Config, cfg *rollup.Config,
...@@ -168,19 +182,22 @@ func NewDriver( ...@@ -168,19 +182,22 @@ func NewDriver(
} }
attributesHandler := attributes.NewAttributesHandler(log, cfg, engine, l2) attributesHandler := attributes.NewAttributesHandler(log, cfg, engine, l2)
derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l1Blobs, plasma, l2, engine, derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l1Blobs, plasma, l2, metrics)
metrics, syncCfg, safeHeadListener, finalizer, attributesHandler)
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2) attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2)
meteredEngine := NewMeteredEngine(cfg, engine, metrics, log) // Only use the metered engine in the sequencer b/c it records sequencing metrics. meteredEngine := NewMeteredEngine(cfg, engine, metrics, log) // Only use the metered engine in the sequencer b/c it records sequencing metrics.
sequencer := NewSequencer(log, cfg, meteredEngine, attrBuilder, findL1Origin, metrics) sequencer := NewSequencer(log, cfg, meteredEngine, attrBuilder, findL1Origin, metrics)
driverCtx, driverCancel := context.WithCancel(context.Background()) driverCtx, driverCancel := context.WithCancel(context.Background())
asyncGossiper := async.NewAsyncGossiper(driverCtx, network, log, metrics) asyncGossiper := async.NewAsyncGossiper(driverCtx, network, log, metrics)
return &Driver{ return &Driver{
l1State: l1State, l1State: l1State,
derivation: derivationPipeline, SyncDeriver: &SyncDeriver{
clSync: clSync, Derivation: derivationPipeline,
finalizer: finalizer, Finalizer: finalizer,
engineController: engine, AttributesHandler: attributesHandler,
SafeHeadNotifs: safeHeadListener,
CLSync: clSync,
Engine: engine,
},
stateReq: make(chan chan struct{}), stateReq: make(chan chan struct{}),
forceReset: make(chan chan struct{}, 10), forceReset: make(chan chan struct{}, 10),
startSequencer: make(chan hashAndErrorChannel, 10), startSequencer: make(chan hashAndErrorChannel, 10),
......
This diff is collapsed.
...@@ -9,10 +9,10 @@ import ( ...@@ -9,10 +9,10 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/node/safedb"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/attributes" "github.com/ethereum-optimism/optimism/op-node/rollup/attributes"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
plasma "github.com/ethereum-optimism/optimism/op-plasma" plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
...@@ -24,8 +24,16 @@ type Derivation interface { ...@@ -24,8 +24,16 @@ type Derivation interface {
Step(ctx context.Context) error Step(ctx context.Context) error
} }
type EngineState interface { type Pipeline interface {
Step(ctx context.Context, pendingSafeHead eth.L2BlockRef) (outAttrib *derive.AttributesWithParent, outErr error)
ConfirmEngineReset()
}
type Engine interface {
SafeL2Head() eth.L2BlockRef SafeL2Head() eth.L2BlockRef
PendingSafeL2Head() eth.L2BlockRef
TryUpdateEngine(ctx context.Context) error
derive.ResetEngineControl
} }
type L2Source interface { type L2Source interface {
...@@ -33,22 +41,60 @@ type L2Source interface { ...@@ -33,22 +41,60 @@ type L2Source interface {
L2OutputRoot(uint64) (eth.Bytes32, error) L2OutputRoot(uint64) (eth.Bytes32, error)
} }
type NoopFinalizer struct{} type Deriver interface {
SafeL2Head() eth.L2BlockRef
SyncStep(ctx context.Context) error
}
func (n NoopFinalizer) OnDerivationL1End(ctx context.Context, derivedFrom eth.L1BlockRef) error { type MinimalSyncDeriver struct {
return nil logger log.Logger
pipeline Pipeline
attributesHandler driver.AttributesHandler
l1Source derive.L1Fetcher
l2Source L2Source
engine Engine
syncCfg *sync.Config
initialResetDone bool
cfg *rollup.Config
}
func (d *MinimalSyncDeriver) SafeL2Head() eth.L2BlockRef {
return d.engine.SafeL2Head()
} }
func (n NoopFinalizer) PostProcessSafeL2(l2Safe eth.L2BlockRef, derivedFrom eth.L1BlockRef) {} func (d *MinimalSyncDeriver) SyncStep(ctx context.Context) error {
if !d.initialResetDone {
if err := d.engine.TryUpdateEngine(ctx); !errors.Is(err, derive.ErrNoFCUNeeded) {
return err
}
if err := derive.ResetEngine(ctx, d.logger, d.cfg, d.engine, d.l1Source, d.l2Source, d.syncCfg, nil); err != nil {
return err
}
d.pipeline.ConfirmEngineReset()
d.initialResetDone = true
}
func (n NoopFinalizer) Reset() {} if err := d.engine.TryUpdateEngine(ctx); !errors.Is(err, derive.ErrNoFCUNeeded) {
return err
}
if err := d.attributesHandler.Proceed(ctx); err != io.EOF {
// EOF error means we can't process the next attributes. Then we should derive the next attributes.
return err
}
var _ derive.FinalizerHooks = (*NoopFinalizer)(nil) attrib, err := d.pipeline.Step(ctx, d.engine.PendingSafeL2Head())
if err != nil {
return err
}
d.attributesHandler.SetAttributes(attrib)
return nil
}
type Driver struct { type Driver struct {
logger log.Logger logger log.Logger
pipeline Derivation
engine EngineState deriver Deriver
l2OutputRoot func(uint64) (eth.Bytes32, error) l2OutputRoot func(uint64) (eth.Bytes32, error)
targetBlockNum uint64 targetBlockNum uint64
} }
...@@ -56,12 +102,20 @@ type Driver struct { ...@@ -56,12 +102,20 @@ type Driver struct {
func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, l1BlobsSource derive.L1BlobsFetcher, l2Source L2Source, targetBlockNum uint64) *Driver { func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, l1BlobsSource derive.L1BlobsFetcher, l2Source L2Source, targetBlockNum uint64) *Driver {
engine := derive.NewEngineController(l2Source, logger, metrics.NoopMetrics, cfg, sync.CLSync) engine := derive.NewEngineController(l2Source, logger, metrics.NoopMetrics, cfg, sync.CLSync)
attributesHandler := attributes.NewAttributesHandler(logger, cfg, engine, l2Source) attributesHandler := attributes.NewAttributesHandler(logger, cfg, engine, l2Source)
pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l1BlobsSource, plasma.Disabled, l2Source, engine, metrics.NoopMetrics, &sync.Config{}, safedb.Disabled, NoopFinalizer{}, attributesHandler) syncCfg := &sync.Config{SyncMode: sync.CLSync}
pipeline.Reset() pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l1BlobsSource, plasma.Disabled, l2Source, metrics.NoopMetrics)
return &Driver{ return &Driver{
logger: logger, logger: logger,
pipeline: pipeline, deriver: &MinimalSyncDeriver{
engine: engine, logger: logger,
pipeline: pipeline,
attributesHandler: attributesHandler,
l1Source: l1Source,
l2Source: l2Source,
engine: engine,
syncCfg: syncCfg,
cfg: cfg,
},
l2OutputRoot: l2Source.L2OutputRoot, l2OutputRoot: l2Source.L2OutputRoot,
targetBlockNum: targetBlockNum, targetBlockNum: targetBlockNum,
} }
...@@ -72,17 +126,14 @@ func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, ...@@ -72,17 +126,14 @@ func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher,
// Returns io.EOF if the derivation completed successfully // Returns io.EOF if the derivation completed successfully
// Returns a non-EOF error if the derivation failed // Returns a non-EOF error if the derivation failed
func (d *Driver) Step(ctx context.Context) error { func (d *Driver) Step(ctx context.Context) error {
if err := d.pipeline.Step(ctx); errors.Is(err, io.EOF) { if err := d.deriver.SyncStep(ctx); errors.Is(err, io.EOF) {
d.logger.Info("Derivation complete: reached L1 head", "head", d.engine.SafeL2Head()) d.logger.Info("Derivation complete: reached L1 head", "head", d.deriver.SafeL2Head())
return io.EOF return io.EOF
} else if errors.Is(err, derive.NotEnoughData) { } else if errors.Is(err, derive.NotEnoughData) {
head := d.engine.SafeL2Head() // NotEnoughData is not handled differently than a nil error.
if head.Number >= d.targetBlockNum { // This used to be returned by the EngineQueue when a block was derived, but also other stages.
d.logger.Info("Derivation complete: reached L2 block", "head", head) // Instead, every driver-loop iteration we check if the target block number has been reached.
return io.EOF
}
d.logger.Debug("Data is lacking") d.logger.Debug("Data is lacking")
return nil
} else if errors.Is(err, derive.ErrTemporary) { } else if errors.Is(err, derive.ErrTemporary) {
// While most temporary errors are due to requests for external data failing which can't happen, // While most temporary errors are due to requests for external data failing which can't happen,
// they may also be returned due to other events like channels timing out so need to be handled // they may also be returned due to other events like channels timing out so need to be handled
...@@ -91,11 +142,16 @@ func (d *Driver) Step(ctx context.Context) error { ...@@ -91,11 +142,16 @@ func (d *Driver) Step(ctx context.Context) error {
} else if err != nil { } else if err != nil {
return fmt.Errorf("pipeline err: %w", err) return fmt.Errorf("pipeline err: %w", err)
} }
head := d.deriver.SafeL2Head()
if head.Number >= d.targetBlockNum {
d.logger.Info("Derivation complete: reached L2 block", "head", head)
return io.EOF
}
return nil return nil
} }
func (d *Driver) SafeHead() eth.L2BlockRef { func (d *Driver) SafeHead() eth.L2BlockRef {
return d.engine.SafeL2Head() return d.deriver.SafeL2Head()
} }
func (d *Driver) ValidateClaim(l2ClaimBlockNum uint64, claimedOutputRoot eth.Bytes32) error { func (d *Driver) ValidateClaim(l2ClaimBlockNum uint64, claimedOutputRoot eth.Bytes32) error {
......
...@@ -131,25 +131,25 @@ func createDriver(t *testing.T, derivationResult error) *Driver { ...@@ -131,25 +131,25 @@ func createDriver(t *testing.T, derivationResult error) *Driver {
} }
func createDriverWithNextBlock(t *testing.T, derivationResult error, nextBlockNum uint64) *Driver { func createDriverWithNextBlock(t *testing.T, derivationResult error, nextBlockNum uint64) *Driver {
derivation := &stubDerivation{nextErr: derivationResult, nextBlockNum: nextBlockNum} derivation := &stubDeriver{nextErr: derivationResult, nextBlockNum: nextBlockNum}
return &Driver{ return &Driver{
logger: testlog.Logger(t, log.LevelDebug), logger: testlog.Logger(t, log.LevelDebug),
pipeline: derivation, deriver: derivation,
engine: derivation, l2OutputRoot: nil,
targetBlockNum: 1_000_000, targetBlockNum: 1_000_000,
} }
} }
type stubDerivation struct { type stubDeriver struct {
nextErr error nextErr error
nextBlockNum uint64 nextBlockNum uint64
} }
func (s stubDerivation) Step(ctx context.Context) error { func (s *stubDeriver) SyncStep(ctx context.Context) error {
return s.nextErr return s.nextErr
} }
func (s stubDerivation) SafeL2Head() eth.L2BlockRef { func (s *stubDeriver) SafeL2Head() eth.L2BlockRef {
return eth.L2BlockRef{ return eth.L2BlockRef{
Number: s.nextBlockNum, Number: s.nextBlockNum,
} }
......
...@@ -67,3 +67,5 @@ func (n *TestRPCMetrics) RecordRPCClientRequest(method string) func(err error) { ...@@ -67,3 +67,5 @@ func (n *TestRPCMetrics) RecordRPCClientRequest(method string) func(err error) {
} }
func (n *TestRPCMetrics) RecordRPCClientResponse(method string, err error) {} func (n *TestRPCMetrics) RecordRPCClientResponse(method string, err error) {}
func (t *TestDerivationMetrics) SetDerivationIdle(idle bool) {}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment