Commit a7e473e1 authored by Joshua Gutow's avatar Joshua Gutow Committed by GitHub

op-node: Initialize EngineController inside the Driver (#8966)

This moves the EngineController up to be able to use it without having
to intialize the Derivation Pipeline.
parent 49abfb92
......@@ -49,7 +49,7 @@ func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, c
}
return &L2Sequencer{
L2Verifier: *ver,
sequencer: driver.NewSequencer(log, cfg, ver.derivation, attrBuilder, l1OriginSelector, metrics.NoopMetrics),
sequencer: driver.NewSequencer(log, cfg, ver.engine, attrBuilder, l1OriginSelector, metrics.NoopMetrics),
mockL1OriginSelector: l1OriginSelector,
failL2GossipUnsafeBlock: nil,
}
......@@ -104,7 +104,7 @@ func (s *L2Sequencer) ActL2EndBlock(t Testing) {
// ActL2KeepL1Origin makes the sequencer use the current L1 origin, even if the next origin is available.
func (s *L2Sequencer) ActL2KeepL1Origin(t Testing) {
parent := s.derivation.UnsafeL2Head()
parent := s.engine.UnsafeL2Head()
// force old origin, for testing purposes
oldOrigin, err := s.l1.L1BlockRefByHash(t.Ctx(), parent.L1Origin.Hash)
require.NoError(t, err, "failed to get current origin: %s", parent.L1Origin)
......@@ -113,7 +113,7 @@ func (s *L2Sequencer) ActL2KeepL1Origin(t Testing) {
// ActBuildToL1Head builds empty blocks until (incl.) the L1 head becomes the L2 origin
func (s *L2Sequencer) ActBuildToL1Head(t Testing) {
for s.derivation.UnsafeL2Head().L1Origin.Number < s.l1State.L1Head().Number {
for s.engine.UnsafeL2Head().L1Origin.Number < s.l1State.L1Head().Number {
s.ActL2PipelineFull(t)
s.ActL2StartBlock(t)
s.ActL2EndBlock(t)
......@@ -122,7 +122,7 @@ func (s *L2Sequencer) ActBuildToL1Head(t Testing) {
// ActBuildToL1HeadUnsafe builds empty blocks until (incl.) the L1 head becomes the L1 origin of the L2 head
func (s *L2Sequencer) ActBuildToL1HeadUnsafe(t Testing) {
for s.derivation.UnsafeL2Head().L1Origin.Number < s.l1State.L1Head().Number {
for s.engine.UnsafeL2Head().L1Origin.Number < s.l1State.L1Head().Number {
// Note: the derivation pipeline does not run, we are just sequencing a block on top of the existing L2 chain.
s.ActL2StartBlock(t)
s.ActL2EndBlock(t)
......@@ -133,7 +133,7 @@ func (s *L2Sequencer) ActBuildToL1HeadUnsafe(t Testing) {
func (s *L2Sequencer) ActBuildToL1HeadExcl(t Testing) {
for {
s.ActL2PipelineFull(t)
nextOrigin, err := s.mockL1OriginSelector.FindL1Origin(t.Ctx(), s.derivation.UnsafeL2Head())
nextOrigin, err := s.mockL1OriginSelector.FindL1Origin(t.Ctx(), s.engine.UnsafeL2Head())
require.NoError(t, err)
if nextOrigin.Number >= s.l1State.L1Head().Number {
break
......@@ -147,7 +147,7 @@ func (s *L2Sequencer) ActBuildToL1HeadExcl(t Testing) {
func (s *L2Sequencer) ActBuildToL1HeadExclUnsafe(t Testing) {
for {
// Note: the derivation pipeline does not run, we are just sequencing a block on top of the existing L2 chain.
nextOrigin, err := s.mockL1OriginSelector.FindL1Origin(t.Ctx(), s.derivation.UnsafeL2Head())
nextOrigin, err := s.mockL1OriginSelector.FindL1Origin(t.Ctx(), s.engine.UnsafeL2Head())
require.NoError(t, err)
if nextOrigin.Number >= s.l1State.L1Head().Number {
break
......
......@@ -33,6 +33,7 @@ type L2Verifier struct {
}
// L2 rollup
engine *derive.EngineController
derivation *derive.DerivationPipeline
l1 derive.L1Fetcher
......@@ -59,12 +60,14 @@ type L2API interface {
func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, cfg *rollup.Config, syncCfg *sync.Config) *L2Verifier {
metrics := &testutils.TestDerivationMetrics{}
pipeline := derive.NewDerivationPipeline(log, cfg, l1, nil, eng, metrics, syncCfg)
engine := derive.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode)
pipeline := derive.NewDerivationPipeline(log, cfg, l1, nil, eng, engine, metrics, syncCfg)
pipeline.Reset()
rollupNode := &L2Verifier{
log: log,
eng: eng,
engine: engine,
derivation: pipeline,
l1: l1,
l1State: driver.NewL1State(log, metrics),
......@@ -132,19 +135,19 @@ func (s *l2VerifierBackend) OnUnsafeL2Payload(ctx context.Context, payload *eth.
}
func (s *L2Verifier) L2Finalized() eth.L2BlockRef {
return s.derivation.Finalized()
return s.engine.Finalized()
}
func (s *L2Verifier) L2Safe() eth.L2BlockRef {
return s.derivation.SafeL2Head()
return s.engine.SafeL2Head()
}
func (s *L2Verifier) L2PendingSafe() eth.L2BlockRef {
return s.derivation.PendingSafeL2Head()
return s.engine.PendingSafeL2Head()
}
func (s *L2Verifier) L2Unsafe() eth.L2BlockRef {
return s.derivation.UnsafeL2Head()
return s.engine.UnsafeL2Head()
}
func (s *L2Verifier) SyncStatus() *eth.SyncStatus {
......@@ -158,7 +161,6 @@ func (s *L2Verifier) SyncStatus() *eth.SyncStatus {
SafeL2: s.L2Safe(),
FinalizedL2: s.L2Finalized(),
PendingSafeL2: s.L2PendingSafe(),
UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(),
}
}
......
......@@ -833,11 +833,11 @@ func SyncAfterReorg(gt *testing.T, deltaTimeOffset *hexutil.Uint64) {
miner.ActL1SetFeeRecipient(common.Address{'A', 0})
miner.ActEmptyBlock(t)
sequencer.ActL1HeadSignal(t)
for sequencer.derivation.UnsafeL2Head().L1Origin.Number < sequencer.l1State.L1Head().Number {
for sequencer.engine.UnsafeL2Head().L1Origin.Number < sequencer.l1State.L1Head().Number {
// build L2 blocks until the L1 origin is the current L1 head(A0)
sequencer.ActL2PipelineFull(t)
sequencer.ActL2StartBlock(t)
if sequencer.derivation.UnsafeL2Head().Number == 11 {
if sequencer.engine.UnsafeL2Head().Number == 11 {
// include a user tx at L2 block #12 to make a state transition
alice.L2.ActResetTxOpts(t)
alice.L2.ActSetTxToAddr(&dp.Addresses.Bob)(t)
......
......@@ -509,7 +509,7 @@ func TestSpanBatchLowThroughputChain(gt *testing.T) {
totalTxCount := 0
// Make 600 L2 blocks (L1BlockTime / L2BlockTime * 50) including 1~3 txs
for i := 0; i < 50; i++ {
for sequencer.derivation.UnsafeL2Head().L1Origin.Number < sequencer.l1State.L1Head().Number {
for sequencer.engine.UnsafeL2Head().L1Origin.Number < sequencer.l1State.L1Head().Number {
sequencer.ActL2StartBlock(t)
// fill the block with random number of L2 txs
for j := 0; j < rand.Intn(3); j++ {
......@@ -646,7 +646,7 @@ func TestBatchEquivalence(gt *testing.T) {
sequencer.ActL2PipelineFull(t)
totalTxCount := 0
// Build random blocks
for sequencer.derivation.UnsafeL2Head().L1Origin.Number < sequencer.l1State.L1Head().Number {
for sequencer.engine.UnsafeL2Head().L1Origin.Number < sequencer.l1State.L1Head().Number {
sequencer.ActL2StartBlock(t)
// fill the block with random number of L2 txs
for j := 0; j < rand.Intn(3); j++ {
......
......@@ -161,7 +161,6 @@ func randomSyncStatus(rng *rand.Rand) *eth.SyncStatus {
SafeL2: testutils.RandomL2BlockRef(rng),
FinalizedL2: testutils.RandomL2BlockRef(rng),
PendingSafeL2: testutils.RandomL2BlockRef(rng),
UnsafeL2SyncTarget: testutils.RandomL2BlockRef(rng),
}
}
......
......@@ -32,7 +32,7 @@ type EngineController struct {
// Block Head State
unsafeHead eth.L2BlockRef
pendingSafeHead eth.L2BlockRef
pendingSafeHead eth.L2BlockRef // L2 block processed from the middle of a span batch, but not marked as the safe block yet.
safeHead eth.L2BlockRef
finalizedHead eth.L2BlockRef
needFCUCall bool
......@@ -165,7 +165,6 @@ func (e *EngineController) ConfirmPayload(ctx context.Context) (out *eth.Executi
e.unsafeHead = ref
e.metrics.RecordL2Ref("l2_unsafe", ref)
e.metrics.RecordL2Ref("l2_engineSyncTarget", ref)
if e.buildingSafe {
e.metrics.RecordL2Ref("l2_pending_safe", ref)
e.pendingSafeHead = ref
......
......@@ -153,15 +153,13 @@ type EngineQueue struct {
syncCfg *sync.Config
}
var _ EngineControl = (*EngineQueue)(nil)
// NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use.
func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider, l1Fetcher L1Fetcher, syncCfg *sync.Config) *EngineQueue {
func NewEngineQueue(log log.Logger, cfg *rollup.Config, l2Source L2Source, engine LocalEngineControl, metrics Metrics, prev NextAttributesProvider, l1Fetcher L1Fetcher, syncCfg *sync.Config) *EngineQueue {
return &EngineQueue{
log: log,
cfg: cfg,
ec: NewEngineController(engine, log, metrics, cfg, syncCfg.SyncMode),
engine: engine,
ec: engine,
engine: l2Source,
metrics: metrics,
finalityData: make([]FinalityData, 0, finalityLookback),
unsafePayloads: NewPayloadsQueue(maxUnsafePayloadsMemory, payloadMemSize),
......@@ -225,20 +223,17 @@ func (eq *EngineQueue) FinalizedL1() eth.L1BlockRef {
return eq.finalizedL1
}
func (eq *EngineQueue) Finalized() eth.L2BlockRef {
return eq.ec.Finalized()
}
func (eq *EngineQueue) UnsafeL2Head() eth.L2BlockRef {
return eq.ec.UnsafeL2Head()
}
func (eq *EngineQueue) SafeL2Head() eth.L2BlockRef {
return eq.ec.SafeL2Head()
}
func (eq *EngineQueue) PendingSafeL2Head() eth.L2BlockRef {
return eq.ec.PendingSafeL2Head()
// LowestQueuedUnsafeBlock returns the block
func (eq *EngineQueue) LowestQueuedUnsafeBlock() eth.L2BlockRef {
payload := eq.unsafePayloads.Peek()
if payload == nil {
return eth.L2BlockRef{}
}
ref, err := PayloadToBlockRef(eq.cfg, payload)
if err != nil {
return eth.L2BlockRef{}
}
return ref
}
// Determine if the engine is syncing to the target block
......@@ -261,7 +256,7 @@ func (eq *EngineQueue) Step(ctx context.Context) error {
// EOF error means we can't process the next unsafe payload. Then we should process next safe attributes.
}
if eq.isEngineSyncing() {
// Make pipeline first focus to sync unsafe blocks to engineSyncTarget
// The pipeline cannot move forwards if doing EL sync.
return EngineELSyncing
}
if eq.safeAttributes != nil {
......
......@@ -248,12 +248,13 @@ func TestEngineQueue_Finalize(t *testing.T) {
prev := &fakeAttributesQueue{}
eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{})
ec := NewEngineController(eng, logger, metrics, &rollup.Config{}, sync.CLSync)
eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{})
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)
require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
require.Equal(t, refB1, ec.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
require.Equal(t, refB, eq.Origin(), "Expecting to be set back derivation L1 progress to B")
require.Equal(t, refA1, eq.Finalized(), "A1 is recognized as finalized before we run any steps")
require.Equal(t, refA1, ec.Finalized(), "A1 is recognized as finalized before we run any steps")
// now say C1 was included in D and became the new safe head
eq.origin = refD
......@@ -270,7 +271,7 @@ func TestEngineQueue_Finalize(t *testing.T) {
// let's finalize D (current L1), from which we fully derived C1 (it was safe head), but not D0 (included in E)
eq.Finalize(refD)
require.Equal(t, refC1, eq.Finalized(), "C1 was included in finalized D, and should now be finalized")
require.Equal(t, refC1, ec.Finalized(), "C1 was included in finalized D, and should now be finalized")
l1F.AssertExpectations(t)
eng.AssertExpectations(t)
......@@ -483,12 +484,13 @@ func TestEngineQueue_ResetWhenUnsafeOriginNotCanonical(t *testing.T) {
prev := &fakeAttributesQueue{origin: refE}
eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{})
ec := NewEngineController(eng, logger, metrics, &rollup.Config{}, sync.CLSync)
eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{})
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)
require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
require.Equal(t, refB1, ec.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
require.Equal(t, refB, eq.Origin(), "Expecting to be set back derivation L1 progress to B")
require.Equal(t, refA1, eq.Finalized(), "A1 is recognized as finalized before we run any steps")
require.Equal(t, refA1, ec.Finalized(), "A1 is recognized as finalized before we run any steps")
// First step after reset will do a fork choice update
eng.ExpectForkchoiceUpdate(&eth.ForkchoiceState{
......@@ -508,7 +510,6 @@ func TestEngineQueue_ResetWhenUnsafeOriginNotCanonical(t *testing.T) {
ParentHash: refE.Hash,
Time: refF.Time,
}
eq.UnsafeL2Head()
err = eq.Step(context.Background())
require.ErrorIs(t, err, ErrReset, "should reset pipeline due to mismatched origin")
......@@ -813,12 +814,13 @@ func TestVerifyNewL1Origin(t *testing.T) {
}, nil)
prev := &fakeAttributesQueue{origin: refE}
eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{})
ec := NewEngineController(eng, logger, metrics, &rollup.Config{}, sync.CLSync)
eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{})
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)
require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
require.Equal(t, refB1, ec.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
require.Equal(t, refB, eq.Origin(), "Expecting to be set back derivation L1 progress to B")
require.Equal(t, refA1, eq.Finalized(), "A1 is recognized as finalized before we run any steps")
require.Equal(t, refA1, ec.Finalized(), "A1 is recognized as finalized before we run any steps")
// First step after reset will do a fork choice update
eng.ExpectForkchoiceUpdate(&eth.ForkchoiceState{
......@@ -833,7 +835,6 @@ func TestVerifyNewL1Origin(t *testing.T) {
// L1 chain reorgs so new origin is at same slot as refF but on a different fork
prev.origin = test.newOrigin
eq.UnsafeL2Head()
err = eq.Step(context.Background())
if test.expectReset {
require.ErrorIs(t, err, ErrReset, "should reset pipeline due to mismatched origin")
......@@ -910,7 +911,8 @@ func TestBlockBuildingRace(t *testing.T) {
}
prev := &fakeAttributesQueue{origin: refA, attrs: attrs, islastInSpan: true}
eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{})
ec := NewEngineController(eng, logger, metrics, &rollup.Config{}, sync.CLSync)
eq := NewEngineQueue(logger, cfg, eng, ec, metrics, prev, l1F, &sync.Config{})
require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF)
id := eth.PayloadID{0xff}
......@@ -1001,7 +1003,7 @@ func TestBlockBuildingRace(t *testing.T) {
// Now complete the job, as external user of the engine
_, _, err = eq.ConfirmPayload(context.Background())
require.NoError(t, err)
require.Equal(t, refA1, eq.SafeL2Head(), "safe head should have changed")
require.Equal(t, refA1, ec.SafeL2Head(), "safe head should have changed")
require.NoError(t, eq.Step(context.Background()))
require.Nil(t, eq.safeAttributes, "attributes should now be invalidated")
......@@ -1080,7 +1082,8 @@ func TestResetLoop(t *testing.T) {
prev := &fakeAttributesQueue{origin: refA, attrs: attrs, islastInSpan: true}
eq := NewEngineQueue(logger, cfg, eng, metrics.NoopMetrics, prev, l1F, &sync.Config{})
ec := NewEngineController(eng, logger, metrics.NoopMetrics, &rollup.Config{}, sync.CLSync)
eq := NewEngineQueue(logger, cfg, eng, ec, metrics.NoopMetrics, prev, l1F, &sync.Config{})
eq.ec.SetUnsafeHead(refA2)
eq.ec.SetSafeHead(refA1)
eq.ec.SetFinalizedHead(refA0)
......@@ -1185,7 +1188,8 @@ func TestEngineQueue_StepPopOlderUnsafe(t *testing.T) {
prev := &fakeAttributesQueue{origin: refA}
eq := NewEngineQueue(logger, cfg, eng, metrics.NoopMetrics, prev, l1F, &sync.Config{})
ec := NewEngineController(eng, logger, metrics.NoopMetrics, &rollup.Config{}, sync.CLSync)
eq := NewEngineQueue(logger, cfg, eng, ec, metrics.NoopMetrics, prev, l1F, &sync.Config{})
eq.ec.SetUnsafeHead(refA2)
eq.ec.SetSafeHead(refA0)
eq.ec.SetFinalizedHead(refA0)
......
......@@ -32,33 +32,19 @@ type L1Fetcher interface {
L1TransactionFetcher
}
// ResettableEngineControl wraps EngineControl with reset-functionality,
// which handles reorgs like the derivation pipeline:
// by determining the last valid block references to continue from.
type ResettableEngineControl interface {
EngineControl
Reset()
}
type ResettableStage interface {
// Reset resets a pull stage. `base` refers to the L1 Block Reference to reset to, with corresponding configuration.
Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemConfig) error
}
type EngineQueueStage interface {
EngineControl
LowestQueuedUnsafeBlock() eth.L2BlockRef
FinalizedL1() eth.L1BlockRef
Finalized() eth.L2BlockRef
UnsafeL2Head() eth.L2BlockRef
SafeL2Head() eth.L2BlockRef
PendingSafeL2Head() eth.L2BlockRef
Origin() eth.L1BlockRef
SystemConfig() eth.SystemConfig
Finalize(l1Origin eth.L1BlockRef)
AddUnsafePayload(payload *eth.ExecutionPayload)
UnsafeL2SyncTarget() eth.L2BlockRef
Step(context.Context) error
}
......@@ -81,7 +67,8 @@ type DerivationPipeline struct {
}
// NewDerivationPipeline creates a derivation pipeline, which should be reset before use.
func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L1Fetcher, l1Blobs L1BlobsFetcher, engine Engine, metrics Metrics, syncCfg *sync.Config) *DerivationPipeline {
func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L1Fetcher, l1Blobs L1BlobsFetcher, l2Source L2Source, engine LocalEngineControl, metrics Metrics, syncCfg *sync.Config) *DerivationPipeline {
// Pull stages
l1Traversal := NewL1Traversal(log, rollupCfg, l1Fetcher)
......@@ -90,12 +77,12 @@ func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L
frameQueue := NewFrameQueue(log, l1Src)
bank := NewChannelBank(log, rollupCfg, frameQueue, l1Fetcher, metrics)
chInReader := NewChannelInReader(rollupCfg, log, bank, metrics)
batchQueue := NewBatchQueue(log, rollupCfg, chInReader, engine)
attrBuilder := NewFetchingAttributesBuilder(rollupCfg, l1Fetcher, engine)
batchQueue := NewBatchQueue(log, rollupCfg, chInReader, l2Source)
attrBuilder := NewFetchingAttributesBuilder(rollupCfg, l1Fetcher, l2Source)
attributesQueue := NewAttributesQueue(log, rollupCfg, attrBuilder, batchQueue)
// Step stages
eng := NewEngineQueue(log, rollupCfg, engine, metrics, attributesQueue, l1Fetcher, syncCfg)
eng := NewEngineQueue(log, rollupCfg, l2Source, engine, metrics, attributesQueue, l1Fetcher, syncCfg)
// Reset from engine queue then up from L1 Traversal. The stages do not talk to each other during
// the reset, but after the engine queue, this is the order in which the stages could talk to each other.
......@@ -140,47 +127,15 @@ func (dp *DerivationPipeline) FinalizedL1() eth.L1BlockRef {
return dp.eng.FinalizedL1()
}
func (dp *DerivationPipeline) Finalized() eth.L2BlockRef {
return dp.eng.Finalized()
}
func (dp *DerivationPipeline) SafeL2Head() eth.L2BlockRef {
return dp.eng.SafeL2Head()
}
func (dp *DerivationPipeline) PendingSafeL2Head() eth.L2BlockRef {
return dp.eng.PendingSafeL2Head()
}
// UnsafeL2Head returns the head of the L2 chain that we are deriving for, this may be past what we derived from L1
func (dp *DerivationPipeline) UnsafeL2Head() eth.L2BlockRef {
return dp.eng.UnsafeL2Head()
}
func (dp *DerivationPipeline) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *AttributesWithParent, updateSafe bool) (errType BlockInsertionErrType, err error) {
return dp.eng.StartPayload(ctx, parent, attrs, updateSafe)
}
func (dp *DerivationPipeline) ConfirmPayload(ctx context.Context) (out *eth.ExecutionPayload, errTyp BlockInsertionErrType, err error) {
return dp.eng.ConfirmPayload(ctx)
}
func (dp *DerivationPipeline) CancelPayload(ctx context.Context, force bool) error {
return dp.eng.CancelPayload(ctx, force)
}
func (dp *DerivationPipeline) BuildingPayload() (onto eth.L2BlockRef, id eth.PayloadID, safe bool) {
return dp.eng.BuildingPayload()
}
// AddUnsafePayload schedules an execution payload to be processed, ahead of deriving it from L1
func (dp *DerivationPipeline) AddUnsafePayload(payload *eth.ExecutionPayload) {
dp.eng.AddUnsafePayload(payload)
}
// UnsafeL2SyncTarget retrieves the first queued-up L2 unsafe payload, or a zeroed reference if there is none.
func (dp *DerivationPipeline) UnsafeL2SyncTarget() eth.L2BlockRef {
return dp.eng.UnsafeL2SyncTarget()
// LowestQueuedUnsafeBlock returns the lowest queued unsafe block. If the gap is filled from the unsafe head
// to this block, the EngineQueue will be able to apply the queued payloads.
func (dp *DerivationPipeline) LowestQueuedUnsafeBlock() eth.L2BlockRef {
return dp.eng.LowestQueuedUnsafeBlock()
}
// Step tries to progress the buffer.
......
......@@ -56,15 +56,11 @@ type DerivationPipeline interface {
Reset()
Step(ctx context.Context) error
AddUnsafePayload(payload *eth.ExecutionPayload)
UnsafeL2SyncTarget() eth.L2BlockRef
Finalize(ref eth.L1BlockRef)
FinalizedL1() eth.L1BlockRef
Finalized() eth.L2BlockRef
SafeL2Head() eth.L2BlockRef
UnsafeL2Head() eth.L2BlockRef
PendingSafeL2Head() eth.L2BlockRef
Origin() eth.L1BlockRef
EngineReady() bool
LowestQueuedUnsafeBlock() eth.L2BlockRef
}
type L1StateIface interface {
......@@ -122,15 +118,16 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, l1
sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1)
findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth)
verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, l1State.L1Head, l1)
derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l1Blobs, l2, metrics, syncCfg)
engine := derive.NewEngineController(l2, log, metrics, cfg, syncCfg.SyncMode)
derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l1Blobs, l2, engine, metrics, syncCfg)
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2)
engine := derivationPipeline
meteredEngine := NewMeteredEngine(cfg, engine, metrics, log)
meteredEngine := NewMeteredEngine(cfg, engine, metrics, log) // Only use the metered engine in the sequencer b/c it records sequencing metrics.
sequencer := NewSequencer(log, cfg, meteredEngine, attrBuilder, findL1Origin, metrics)
driverCtx, driverCancel := context.WithCancel(context.Background())
return &Driver{
l1State: l1State,
derivation: derivationPipeline,
engineController: engine,
stateReq: make(chan chan struct{}),
forceReset: make(chan chan struct{}, 10),
startSequencer: make(chan hashAndErrorChannel, 10),
......
......@@ -21,7 +21,7 @@ type EngineMetrics interface {
// MeteredEngine wraps an EngineControl and adds metrics such as block building time diff and sealing time
type MeteredEngine struct {
inner derive.ResettableEngineControl
inner derive.EngineControl
cfg *rollup.Config
metrics EngineMetrics
......@@ -30,10 +30,7 @@ type MeteredEngine struct {
buildingStartTime time.Time
}
// MeteredEngine implements derive.ResettableEngineControl
var _ derive.ResettableEngineControl = (*MeteredEngine)(nil)
func NewMeteredEngine(cfg *rollup.Config, inner derive.ResettableEngineControl, metrics EngineMetrics, log log.Logger) *MeteredEngine {
func NewMeteredEngine(cfg *rollup.Config, inner derive.EngineControl, metrics EngineMetrics, log log.Logger) *MeteredEngine {
return &MeteredEngine{
inner: inner,
cfg: cfg,
......@@ -93,7 +90,3 @@ func (m *MeteredEngine) CancelPayload(ctx context.Context, force bool) error {
func (m *MeteredEngine) BuildingPayload() (onto eth.L2BlockRef, id eth.PayloadID, safe bool) {
return m.inner.BuildingPayload()
}
func (m *MeteredEngine) Reset() {
m.inner.Reset()
}
......@@ -34,7 +34,7 @@ type Sequencer struct {
log log.Logger
rollupCfg *rollup.Config
engine derive.ResettableEngineControl
engine derive.EngineControl
attrBuilder derive.AttributesBuilder
l1OriginSelector L1OriginSelectorIface
......@@ -47,7 +47,7 @@ type Sequencer struct {
nextAction time.Time
}
func NewSequencer(log log.Logger, rollupCfg *rollup.Config, engine derive.ResettableEngineControl, attributesBuilder derive.AttributesBuilder, l1OriginSelector L1OriginSelectorIface, metrics SequencerMetrics) *Sequencer {
func NewSequencer(log log.Logger, rollupCfg *rollup.Config, engine derive.EngineControl, attributesBuilder derive.AttributesBuilder, l1OriginSelector L1OriginSelectorIface, metrics SequencerMetrics) *Sequencer {
return &Sequencer{
log: log,
rollupCfg: rollupCfg,
......@@ -214,7 +214,7 @@ func (d *Sequencer) RunNextSequencerAction(ctx context.Context) (*eth.ExecutionP
d.metrics.RecordSequencerReset()
d.nextAction = d.timeNow().Add(time.Second * time.Duration(d.rollupCfg.BlockTime)) // hold off from sequencing for a full block
d.CancelBuildingBlock(ctx)
d.engine.Reset()
return nil, err
} else if errors.Is(err, derive.ErrTemporary) {
d.log.Error("sequencer failed temporarily to seal new block", "err", err)
d.nextAction = d.timeNow().Add(time.Second)
......@@ -239,7 +239,7 @@ func (d *Sequencer) RunNextSequencerAction(ctx context.Context) (*eth.ExecutionP
d.log.Error("sequencer failed to seal new block, requiring derivation reset", "err", err)
d.metrics.RecordSequencerReset()
d.nextAction = d.timeNow().Add(time.Second * time.Duration(d.rollupCfg.BlockTime)) // hold off from sequencing for a full block
d.engine.Reset()
return nil, err
} else if errors.Is(err, derive.ErrTemporary) {
d.log.Error("sequencer temporarily failed to start building new block", "err", err)
d.nextAction = d.timeNow().Add(time.Second)
......
......@@ -125,11 +125,7 @@ func (m *FakeEngineControl) resetBuildingState() {
m.buildingAttrs = nil
}
func (m *FakeEngineControl) Reset() {
m.err = nil
}
var _ derive.ResettableEngineControl = (*FakeEngineControl)(nil)
var _ derive.EngineControl = (*FakeEngineControl)(nil)
type testAttrBuilderFn func(ctx context.Context, l2Parent eth.L2BlockRef, epoch eth.BlockID) (attrs *eth.PayloadAttributes, err error)
......@@ -349,7 +345,11 @@ func TestSequencerChaosMonkey(t *testing.T) {
// no error
}
payload, err := seq.RunNextSequencerAction(context.Background())
// RunNextSequencerAction passes ErrReset & ErrCritical through.
// Only suppress ErrReset, not ErrCritical
if !errors.Is(err, derive.ErrReset) {
require.NoError(t, err)
}
if payload != nil {
require.Equal(t, engControl.UnsafeL2Head().ID(), payload.ID(), "head must stay in sync with emitted payloads")
var tx types.Transaction
......
......@@ -32,6 +32,10 @@ type Driver struct {
// The derivation pipeline determines the new l2Safe.
derivation DerivationPipeline
// The engine controller is used by the sequencer & derivation components.
// We will also use it for EL sync in a future PR.
engineController *derive.EngineController
// Requests to block the event loop for synchronous execution to avoid reading an inconsistent state
stateReq chan chan struct{}
......@@ -229,7 +233,7 @@ func (s *Driver) eventLoop() {
syncCheckInterval := time.Duration(s.config.BlockTime) * time.Second * 2
altSyncTicker := time.NewTicker(syncCheckInterval)
defer altSyncTicker.Stop()
lastUnsafeL2 := s.derivation.UnsafeL2Head()
lastUnsafeL2 := s.engineController.UnsafeL2Head()
for {
if s.driverCtx.Err() != nil { // don't try to schedule/handle more work when we are closing.
......@@ -241,18 +245,18 @@ func (s *Driver) eventLoop() {
// And avoid sequencing if the derivation pipeline indicates the engine is not ready.
if s.driverConfig.SequencerEnabled && !s.driverConfig.SequencerStopped &&
s.l1State.L1Head() != (eth.L1BlockRef{}) && s.derivation.EngineReady() {
if s.driverConfig.SequencerMaxSafeLag > 0 && s.derivation.SafeL2Head().Number+s.driverConfig.SequencerMaxSafeLag <= s.derivation.UnsafeL2Head().Number {
if s.driverConfig.SequencerMaxSafeLag > 0 && s.engineController.SafeL2Head().Number+s.driverConfig.SequencerMaxSafeLag <= s.engineController.UnsafeL2Head().Number {
// If the safe head has fallen behind by a significant number of blocks, delay creating new blocks
// until the safe lag is below SequencerMaxSafeLag.
if sequencerCh != nil {
s.log.Warn(
"Delay creating new block since safe lag exceeds limit",
"safe_l2", s.derivation.SafeL2Head(),
"unsafe_l2", s.derivation.UnsafeL2Head(),
"safe_l2", s.engineController.SafeL2Head(),
"unsafe_l2", s.engineController.UnsafeL2Head(),
)
sequencerCh = nil
}
} else if s.sequencer.BuildingOnto().ID() != s.derivation.UnsafeL2Head().ID() {
} else if s.sequencer.BuildingOnto().ID() != s.engineController.UnsafeL2Head().ID() {
// If we are sequencing, and the L1 state is ready, update the trigger for the next sequencer action.
// This may adjust at any time based on fork-choice changes or previous errors.
//
......@@ -265,7 +269,7 @@ func (s *Driver) eventLoop() {
// If the engine is not ready, or if the L2 head is actively changing, then reset the alt-sync:
// there is no need to request L2 blocks when we are syncing already.
if head := s.derivation.UnsafeL2Head(); head != lastUnsafeL2 || !s.derivation.EngineReady() {
if head := s.engineController.UnsafeL2Head(); head != lastUnsafeL2 || !s.derivation.EngineReady() {
lastUnsafeL2 = head
altSyncTicker.Reset(syncCheckInterval)
}
......@@ -273,7 +277,9 @@ func (s *Driver) eventLoop() {
select {
case <-sequencerCh:
payload, err := s.sequencer.RunNextSequencerAction(s.driverCtx)
if err != nil {
if errors.Is(err, derive.ErrReset) {
s.derivation.Reset()
} else if err != nil {
s.log.Error("Sequencer critical error", "err", err)
return
}
......@@ -325,7 +331,7 @@ func (s *Driver) eventLoop() {
s.metrics.SetDerivationIdle(true)
continue
} else if err != nil && errors.Is(err, derive.EngineELSyncing) {
s.log.Debug("Derivation process went idle because the engine is syncing", "progress", s.derivation.Origin(), "unsafe_head", s.derivation.UnsafeL2Head(), "err", err)
s.log.Debug("Derivation process went idle because the engine is syncing", "progress", s.derivation.Origin(), "unsafe_head", s.engineController.UnsafeL2Head(), "err", err)
stepAttempts = 0
s.metrics.SetDerivationIdle(true)
continue
......@@ -362,7 +368,7 @@ func (s *Driver) eventLoop() {
s.metrics.RecordPipelineReset()
close(respCh)
case resp := <-s.startSequencer:
unsafeHead := s.derivation.UnsafeL2Head().Hash
unsafeHead := s.engineController.UnsafeL2Head().Hash
if !s.driverConfig.SequencerStopped {
resp.err <- errors.New("sequencer already running")
} else if !bytes.Equal(unsafeHead[:], resp.hash[:]) {
......@@ -390,7 +396,7 @@ func (s *Driver) eventLoop() {
// Cancel any inflight block building. If we don't cancel this, we can resume sequencing an old block
// even if we've received new unsafe heads in the interim, causing us to introduce a re-org.
s.sequencer.CancelBuildingBlock(s.driverCtx)
respCh <- hashAndError{hash: s.derivation.UnsafeL2Head().Hash}
respCh <- hashAndError{hash: s.engineController.UnsafeL2Head().Hash}
}
case respCh := <-s.sequencerActive:
respCh <- !s.driverConfig.SequencerStopped
......@@ -484,11 +490,10 @@ func (s *Driver) syncStatus() *eth.SyncStatus {
HeadL1: s.l1State.L1Head(),
SafeL1: s.l1State.L1Safe(),
FinalizedL1: s.l1State.L1Finalized(),
UnsafeL2: s.derivation.UnsafeL2Head(),
SafeL2: s.derivation.SafeL2Head(),
FinalizedL2: s.derivation.Finalized(),
PendingSafeL2: s.derivation.PendingSafeL2Head(),
UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(),
UnsafeL2: s.engineController.UnsafeL2Head(),
SafeL2: s.engineController.SafeL2Head(),
FinalizedL2: s.engineController.Finalized(),
PendingSafeL2: s.engineController.PendingSafeL2Head(),
}
}
......@@ -537,9 +542,9 @@ func (s *Driver) snapshot(event string) {
"event", event,
"l1Head", deferJSONString{s.l1State.L1Head()},
"l1Current", deferJSONString{s.derivation.Origin()},
"l2Head", deferJSONString{s.derivation.UnsafeL2Head()},
"l2Safe", deferJSONString{s.derivation.SafeL2Head()},
"l2FinalizedHead", deferJSONString{s.derivation.Finalized()})
"l2Head", deferJSONString{s.engineController.UnsafeL2Head()},
"l2Safe", deferJSONString{s.engineController.SafeL2Head()},
"l2FinalizedHead", deferJSONString{s.engineController.Finalized()})
}
type hashAndError struct {
......@@ -556,8 +561,8 @@ type hashAndErrorChannel struct {
// WARNING: This is only an outgoing signal, the blocks are not guaranteed to be retrieved.
// Results are received through OnUnsafeL2Payload.
func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) error {
start := s.derivation.UnsafeL2Head()
end := s.derivation.UnsafeL2SyncTarget()
start := s.engineController.UnsafeL2Head()
end := s.derivation.LowestQueuedUnsafeBlock()
// Check if we have missing blocks between the start and end. Request them if we do.
if end == (eth.L2BlockRef{}) {
s.log.Debug("requesting sync with open-end range", "start", start)
......
......@@ -20,6 +20,9 @@ var (
type Derivation interface {
Step(ctx context.Context) error
}
type EngineState interface {
SafeL2Head() eth.L2BlockRef
}
......@@ -31,16 +34,19 @@ type L2Source interface {
type Driver struct {
logger log.Logger
pipeline Derivation
engine EngineState
l2OutputRoot func(uint64) (eth.Bytes32, error)
targetBlockNum uint64
}
func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, l2Source L2Source, targetBlockNum uint64) *Driver {
pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, nil, l2Source, metrics.NoopMetrics, &sync.Config{})
engine := derive.NewEngineController(l2Source, logger, metrics.NoopMetrics, cfg, sync.CLSync)
pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, nil, l2Source, engine, metrics.NoopMetrics, &sync.Config{})
pipeline.Reset()
return &Driver{
logger: logger,
pipeline: pipeline,
engine: engine,
l2OutputRoot: l2Source.L2OutputRoot,
targetBlockNum: targetBlockNum,
}
......@@ -52,10 +58,10 @@ func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher,
// Returns a non-EOF error if the derivation failed
func (d *Driver) Step(ctx context.Context) error {
if err := d.pipeline.Step(ctx); errors.Is(err, io.EOF) {
d.logger.Info("Derivation complete: reached L1 head", "head", d.pipeline.SafeL2Head())
d.logger.Info("Derivation complete: reached L1 head", "head", d.engine.SafeL2Head())
return io.EOF
} else if errors.Is(err, derive.NotEnoughData) {
head := d.pipeline.SafeL2Head()
head := d.engine.SafeL2Head()
if head.Number >= d.targetBlockNum {
d.logger.Info("Derivation complete: reached L2 block", "head", head)
return io.EOF
......@@ -74,7 +80,7 @@ func (d *Driver) Step(ctx context.Context) error {
}
func (d *Driver) SafeHead() eth.L2BlockRef {
return d.pipeline.SafeL2Head()
return d.engine.SafeL2Head()
}
func (d *Driver) ValidateClaim(l2ClaimBlockNum uint64, claimedOutputRoot eth.Bytes32) error {
......
......@@ -109,6 +109,7 @@ func createDriverWithNextBlock(t *testing.T, derivationResult error, nextBlockNu
return &Driver{
logger: testlog.Logger(t, log.LvlDebug),
pipeline: derivation,
engine: derivation,
targetBlockNum: 1_000_000,
}
}
......
......@@ -34,7 +34,4 @@ type SyncStatus struct {
FinalizedL2 L2BlockRef `json:"finalized_l2"`
// PendingSafeL2 points to the L2 block processed from the batch, but not consolidated to the safe block yet.
PendingSafeL2 L2BlockRef `json:"pending_safe_l2"`
// UnsafeL2SyncTarget points to the first unprocessed unsafe L2 block.
// It may be zeroed if there is no targeted block.
UnsafeL2SyncTarget L2BlockRef `json:"queued_unsafe_l2"`
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment