Commit 31653e5e authored by protolambda's avatar protolambda Committed by GitHub

op-node: change finalizer to use events (#10972)

parent e449dd28
......@@ -17,6 +17,7 @@ import (
batcherFlags "github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/finality"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
......@@ -246,7 +247,7 @@ func L2Finalization(gt *testing.T, deltaTimeOffset *hexutil.Uint64) {
// If we get this false signal, we shouldn't finalize the L2 chain.
altBlock4 := sequencer.SyncStatus().SafeL1
altBlock4.Hash = common.HexToHash("0xdead")
sequencer.finalizer.Finalize(t.Ctx(), altBlock4)
sequencer.synchronousEvents.Emit(finality.FinalizeL1Event{FinalizedL1: altBlock4})
sequencer.ActL2PipelineFull(t)
require.Equal(t, uint64(3), sequencer.SyncStatus().FinalizedL1.Number)
require.Equal(t, heightToSubmit, sequencer.SyncStatus().FinalizedL2.Number, "unknown/bad finalized L1 blocks are ignored")
......
......@@ -96,6 +96,7 @@ func RunProposerTest(gt *testing.T, deltaTimeOffset *hexutil.Uint64) {
sequencer.ActL2PipelineFull(t)
sequencer.ActL1SafeSignal(t)
sequencer.ActL1FinalizedSignal(t)
sequencer.ActL2PipelineFull(t)
require.Equal(t, sequencer.SyncStatus().UnsafeL2, sequencer.SyncStatus().FinalizedL2)
require.True(t, proposer.CanPropose(t))
......
......@@ -98,9 +98,9 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
var finalizer driver.Finalizer
if cfg.PlasmaEnabled() {
finalizer = finality.NewPlasmaFinalizer(log, cfg, l1, ec, plasmaSrc)
finalizer = finality.NewPlasmaFinalizer(ctx, log, cfg, l1, synchronousEvents, plasmaSrc)
} else {
finalizer = finality.NewFinalizer(log, cfg, l1, ec)
finalizer = finality.NewFinalizer(ctx, log, cfg, l1, synchronousEvents)
}
attributesHandler := attributes.NewAttributesHandler(log, cfg, ctx, eng, synchronousEvents)
......@@ -153,6 +153,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
clSync,
pipelineDeriver,
attributesHandler,
finalizer,
}
t.Cleanup(rollupNode.rpc.Stop)
......@@ -288,13 +289,15 @@ func (s *L2Verifier) ActL1FinalizedSignal(t Testing) {
finalized, err := s.l1.L1BlockRefByLabel(t.Ctx(), eth.Finalized)
require.NoError(t, err)
s.l1State.HandleNewL1FinalizedBlock(finalized)
s.finalizer.Finalize(t.Ctx(), finalized)
s.synchronousEvents.Emit(finality.FinalizeL1Event{FinalizedL1: finalized})
}
func (s *L2Verifier) OnEvent(ev rollup.Event) {
switch x := ev.(type) {
case rollup.L1TemporaryErrorEvent:
s.log.Warn("L1 temporary error", "err", x.Err)
case rollup.EngineTemporaryErrorEvent:
s.log.Warn("Derivation process temporary error", "err", x.Err)
s.log.Warn("Engine temporary error", "err", x.Err)
if errors.Is(x.Err, sync.WrongChainErr) { // action-tests don't back off on temporary errors. Avoid a bad genesis setup from looping.
panic(fmt.Errorf("genesis setup issue: %w", x.Err))
}
......
......@@ -149,6 +149,7 @@ func (eq *AttributesHandler) consolidateNextSafeAttributes(attributes *derive.At
eq.emitter.Emit(engine.PromotePendingSafeEvent{
Ref: ref,
Safe: attributes.IsLastInSpan,
DerivedFrom: attributes.DerivedFrom,
})
}
......
......@@ -25,6 +25,13 @@ func TestAttributesHandler(t *testing.T) {
rng := rand.New(rand.NewSource(1234))
refA := testutils.RandomBlockRef(rng)
refB := eth.L1BlockRef{
Hash: testutils.RandomHash(rng),
Number: refA.Number + 1,
ParentHash: refA.Hash,
Time: refA.Time + 12,
}
aL1Info := &testutils.MockBlockInfo{
InfoParentHash: refA.ParentHash,
InfoNum: refA.Number,
......@@ -263,6 +270,7 @@ func TestAttributesHandler(t *testing.T) {
Attributes: attrA1.Attributes, // attributes will match, passing consolidation
Parent: attrA1.Parent,
IsLastInSpan: lastInSpan,
DerivedFrom: refB,
}
emitter.ExpectOnce(derive.ConfirmReceivedAttributesEvent{})
emitter.ExpectOnce(engine.PendingSafeRequestEvent{})
......@@ -276,6 +284,7 @@ func TestAttributesHandler(t *testing.T) {
emitter.ExpectOnce(engine.PromotePendingSafeEvent{
Ref: refA1,
Safe: lastInSpan, // last in span becomes safe instantaneously
DerivedFrom: refB,
})
ah.OnEvent(engine.PendingSafeUpdateEvent{
PendingSafe: refA0,
......
......@@ -9,7 +9,9 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type DeriverIdleEvent struct{}
type DeriverIdleEvent struct {
Origin eth.L1BlockRef
}
func (d DeriverIdleEvent) String() string {
return "derivation-idle"
......@@ -84,10 +86,10 @@ func (d *PipelineDeriver) OnEvent(ev rollup.Event) {
attrib, err := d.pipeline.Step(d.ctx, x.PendingSafe)
if err == io.EOF {
d.pipeline.log.Debug("Derivation process went idle", "progress", d.pipeline.Origin(), "err", err)
d.emitter.Emit(DeriverIdleEvent{})
d.emitter.Emit(DeriverIdleEvent{Origin: d.pipeline.Origin()})
} else if err != nil && errors.Is(err, EngineELSyncing) {
d.pipeline.log.Debug("Derivation process went idle because the engine is syncing", "progress", d.pipeline.Origin(), "err", err)
d.emitter.Emit(DeriverIdleEvent{})
d.emitter.Emit(DeriverIdleEvent{Origin: d.pipeline.Origin()})
} else if err != nil && errors.Is(err, ErrReset) {
d.emitter.Emit(rollup.ResetEvent{Err: err})
} else if err != nil && errors.Is(err, ErrTemporary) {
......
......@@ -91,9 +91,8 @@ type AttributesHandler interface {
}
type Finalizer interface {
Finalize(ctx context.Context, ref eth.L1BlockRef)
FinalizedL1() eth.L1BlockRef
engine.FinalizerHooks
rollup.Deriver
}
type PlasmaIface interface {
......@@ -186,9 +185,9 @@ func NewDriver(
var finalizer Finalizer
if cfg.PlasmaEnabled() {
finalizer = finality.NewPlasmaFinalizer(log, cfg, l1, ec, plasma)
finalizer = finality.NewPlasmaFinalizer(driverCtx, log, cfg, l1, synchronousEvents, plasma)
} else {
finalizer = finality.NewFinalizer(log, cfg, l1, ec)
finalizer = finality.NewFinalizer(driverCtx, log, cfg, l1, synchronousEvents)
}
attributesHandler := attributes.NewAttributesHandler(log, cfg, driverCtx, l2, synchronousEvents)
......@@ -254,6 +253,7 @@ func NewDriver(
clSync,
pipelineDeriver,
attributesHandler,
finalizer,
}
return driver
......
......@@ -18,6 +18,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/finality"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
......@@ -312,9 +313,7 @@ func (s *Driver) eventLoop() {
// no step, justified L1 information does not do anything for L2 derivation or status
case newL1Finalized := <-s.l1FinalizedSig:
s.l1State.HandleNewL1FinalizedBlock(newL1Finalized)
ctx, cancel := context.WithTimeout(s.driverCtx, time.Second*5)
s.Finalizer.Finalize(ctx, newL1Finalized)
cancel()
s.Emit(finality.FinalizeL1Event{FinalizedL1: newL1Finalized})
reqStep() // we may be able to mark more L2 data as finalized now
case <-s.sched.NextDelayedStep():
s.Emit(StepAttemptEvent{})
......@@ -397,7 +396,6 @@ type SyncDeriver struct {
Finalizer Finalizer
SafeHeadNotifs rollup.SafeHeadListener // notified when safe head is updated
lastNotifiedSafeHead eth.L2BlockRef
CLSync CLSync
......@@ -428,8 +426,11 @@ func (s *SyncDeriver) OnEvent(ev rollup.Event) {
s.onStepEvent()
case rollup.ResetEvent:
s.onResetEvent(x)
case rollup.L1TemporaryErrorEvent:
s.Log.Warn("L1 temporary error", "err", x.Err)
s.Emitter.Emit(StepReqEvent{})
case rollup.EngineTemporaryErrorEvent:
s.Log.Warn("Derivation process temporary error", "err", x.Err)
s.Log.Warn("Engine temporary error", "err", x.Err)
// Make sure that for any temporarily failed attributes we retry processing.
s.Emitter.Emit(engine.PendingSafeRequestEvent{})
......@@ -445,6 +446,19 @@ func (s *SyncDeriver) OnEvent(ev rollup.Event) {
// If there is more data to process,
// continue derivation quickly
s.Emitter.Emit(StepReqEvent{ResetBackoff: true})
case engine.SafeDerivedEvent:
s.onSafeDerivedBlock(x)
}
}
func (s *SyncDeriver) onSafeDerivedBlock(x engine.SafeDerivedEvent) {
if s.SafeHeadNotifs != nil && s.SafeHeadNotifs.Enabled() {
if err := s.SafeHeadNotifs.SafeHeadUpdated(x.Safe, x.DerivedFrom.ID()); err != nil {
// At this point our state is in a potentially inconsistent state as we've updated the safe head
// in the execution client but failed to post process it. Reset the pipeline so the safe head rolls back
// a little (it always rolls back at least 1 block) and then it will retry storing the entry
s.Emitter.Emit(rollup.ResetEvent{Err: fmt.Errorf("safe head notifications failed: %w", err)})
}
}
}
......@@ -457,7 +471,7 @@ func (s *SyncDeriver) onEngineConfirmedReset(x engine.EngineResetConfirmedEvent)
s.Log.Error("Failed to warn safe-head notifier of safe-head reset", "safe", x.Safe)
return
}
if s.SafeHeadNotifs.Enabled() && x.Safe.Number == s.Config.Genesis.L2.Number && x.Safe.Hash == s.Config.Genesis.L2.Hash {
if s.SafeHeadNotifs.Enabled() && x.Safe.ID() == s.Config.Genesis.L2 {
// The rollup genesis block is always safe by definition. So if the pipeline resets this far back we know
// we will process all safe head updates and can record genesis as always safe from L1 genesis.
// Note that it is not safe to use cfg.Genesis.L1 here as it is the block immediately before the L2 genesis
......@@ -483,7 +497,7 @@ func (s *SyncDeriver) onStepEvent() {
// where some things are triggered through events, and some through this synchronous step function.
// We just translate the results into their equivalent events,
// to merge the error-handling with that of the new event-based system.
err := s.SyncStep(s.Ctx)
err := s.SyncStep()
if err != nil && errors.Is(err, derive.EngineELSyncing) {
s.Log.Debug("Derivation process went idle because the engine is syncing", "unsafe_head", s.Engine.UnsafeL2Head(), "err", err)
s.Emitter.Emit(ResetStepBackoffEvent{})
......@@ -504,14 +518,13 @@ func (s *SyncDeriver) onStepEvent() {
func (s *SyncDeriver) onResetEvent(x rollup.ResetEvent) {
// If the system corrupts, e.g. due to a reorg, simply reset it
s.Log.Warn("Deriver system is resetting", "err", x.Err)
s.Finalizer.Reset()
s.Emitter.Emit(StepReqEvent{})
s.Emitter.Emit(engine.ResetEngineRequestEvent{})
}
// SyncStep performs the sequence of encapsulated syncing steps.
// Warning: this sequence will be broken apart as outlined in op-node derivers design doc.
func (s *SyncDeriver) SyncStep(ctx context.Context) error {
func (s *SyncDeriver) SyncStep() error {
if err := s.Drain(); err != nil {
return err
}
......@@ -533,25 +546,6 @@ func (s *SyncDeriver) SyncStep(ctx context.Context) error {
// Any now processed forkchoice updates will trigger CL-sync payload processing, if any payload is queued up.
derivationOrigin := s.Derivation.Origin()
if s.SafeHeadNotifs != nil && s.SafeHeadNotifs.Enabled() && s.Derivation.DerivationReady() &&
s.lastNotifiedSafeHead != s.Engine.SafeL2Head() {
s.lastNotifiedSafeHead = s.Engine.SafeL2Head()
// make sure we track the last L2 safe head for every new L1 block
if err := s.SafeHeadNotifs.SafeHeadUpdated(s.lastNotifiedSafeHead, derivationOrigin.ID()); err != nil {
// At this point our state is in a potentially inconsistent state as we've updated the safe head
// in the execution client but failed to post process it. Reset the pipeline so the safe head rolls back
// a little (it always rolls back at least 1 block) and then it will retry storing the entry
return derive.NewResetError(fmt.Errorf("safe head notifications failed: %w", err))
}
}
s.Finalizer.PostProcessSafeL2(s.Engine.SafeL2Head(), derivationOrigin)
// try to finalize the L2 blocks we have synced so far (no-op if L1 finality is behind)
if err := s.Finalizer.OnDerivationL1End(ctx, derivationOrigin); err != nil {
return fmt.Errorf("finalizer OnDerivationL1End error: %w", err)
}
// Since we don't force attributes to be processed at this point,
// we cannot safely directly trigger the derivation, as that may generate new attributes that
// conflict with what attributes have not been applied yet.
......
......@@ -64,12 +64,23 @@ func (ev PendingSafeUpdateEvent) String() string {
type PromotePendingSafeEvent struct {
Ref eth.L2BlockRef
Safe bool
DerivedFrom eth.L1BlockRef
}
func (ev PromotePendingSafeEvent) String() string {
return "promote-pending-safe"
}
// SafeDerivedEvent signals that a block was determined to be safe, and derived from the given L1 block
type SafeDerivedEvent struct {
Safe eth.L2BlockRef
DerivedFrom eth.L1BlockRef
}
func (ev SafeDerivedEvent) String() string {
return "safe-derived"
}
type ProcessAttributesEvent struct {
Attributes *derive.AttributesWithParent
}
......@@ -123,6 +134,15 @@ func (ev EngineResetConfirmedEvent) String() string {
return "engine-reset-confirmed"
}
// PromoteFinalizedEvent signals that a block can be marked as finalized.
type PromoteFinalizedEvent struct {
Ref eth.L2BlockRef
}
func (ev PromoteFinalizedEvent) String() string {
return "promote-finalized"
}
type EngDeriver struct {
log log.Logger
cfg *rollup.Config
......@@ -217,7 +237,7 @@ func (d *EngDeriver) OnEvent(ev rollup.Event) {
log.Debug("Reset of Engine is completed",
"safeHead", x.Safe, "unsafe", x.Unsafe, "safe_timestamp", x.Safe.Time,
"unsafe_timestamp", x.Unsafe.Time)
d.emitter.Emit(EngineResetConfirmedEvent{})
d.emitter.Emit(EngineResetConfirmedEvent(x))
case ProcessAttributesEvent:
d.onForceNextSafeAttributes(x.Attributes)
case PendingSafeRequestEvent:
......@@ -233,7 +253,18 @@ func (d *EngDeriver) OnEvent(ev rollup.Event) {
}
if x.Safe && x.Ref.Number > d.ec.SafeL2Head().Number {
d.ec.SetSafeHead(x.Ref)
d.emitter.Emit(SafeDerivedEvent{Safe: x.Ref, DerivedFrom: x.DerivedFrom})
}
case PromoteFinalizedEvent:
if x.Ref.Number < d.ec.Finalized().Number {
d.log.Error("Cannot rewind finality,", "ref", x.Ref, "finalized", d.ec.Finalized())
return
}
if x.Ref.Number > d.ec.SafeL2Head().Number {
d.log.Error("Block must be safe before it can be finalized", "ref", x.Ref, "safe", d.ec.SafeL2Head())
return
}
d.ec.SetFinalizedHead(x.Ref)
}
}
......@@ -301,6 +332,7 @@ func (eq *EngDeriver) onForceNextSafeAttributes(attributes *derive.AttributesWit
eq.ec.SetPendingSafeL2Head(ref)
if attributes.IsLastInSpan {
eq.ec.SetSafeHead(ref)
eq.emitter.Emit(SafeDerivedEvent{Safe: ref, DerivedFrom: attributes.DerivedFrom})
}
eq.emitter.Emit(PendingSafeUpdateEvent{
PendingSafe: eq.ec.PendingSafeL2Head(),
......
......@@ -20,6 +20,21 @@ func (fn EmitterFunc) Emit(ev Event) {
fn(ev)
}
// L1TemporaryErrorEvent identifies a temporary issue with the L1 data.
type L1TemporaryErrorEvent struct {
Err error
}
var _ Event = L1TemporaryErrorEvent{}
func (ev L1TemporaryErrorEvent) String() string {
return "l1-temporary-error"
}
// EngineTemporaryErrorEvent identifies a temporary processing issue.
// It applies to both L1 and L2 data, often inter-related.
// This scope will be reduced over time, to only capture L2-engine specific temporary errors.
// See L1TemporaryErrorEvent for L1 related temporary errors.
type EngineTemporaryErrorEvent struct {
Err error
}
......
......@@ -4,11 +4,13 @@ import (
"context"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
......@@ -68,10 +70,17 @@ type Finalizer struct {
log log.Logger
ctx context.Context
emitter rollup.EventEmitter
// finalizedL1 is the currently perceived finalized L1 block.
// This may be ahead of the current traversed origin when syncing.
finalizedL1 eth.L1BlockRef
// lastFinalizedL2 maintains how far we finalized, so we don't have to emit re-attempts.
lastFinalizedL2 eth.L2BlockRef
// triedFinalizeAt tracks at which L1 block number we last tried to finalize during sync.
triedFinalizeAt uint64
......@@ -82,20 +91,19 @@ type Finalizer struct {
finalityLookback uint64
l1Fetcher FinalizerL1Interface
ec FinalizerEngine
}
func NewFinalizer(log log.Logger, cfg *rollup.Config, l1Fetcher FinalizerL1Interface, ec FinalizerEngine) *Finalizer {
func NewFinalizer(ctx context.Context, log log.Logger, cfg *rollup.Config, l1Fetcher FinalizerL1Interface, emitter rollup.EventEmitter) *Finalizer {
lookback := calcFinalityLookback(cfg)
return &Finalizer{
ctx: ctx,
log: log,
finalizedL1: eth.L1BlockRef{},
triedFinalizeAt: 0,
finalityData: make([]FinalityData, 0, lookback),
finalityLookback: lookback,
l1Fetcher: l1Fetcher,
ec: ec,
emitter: emitter,
}
}
......@@ -108,8 +116,39 @@ func (fi *Finalizer) FinalizedL1() (out eth.L1BlockRef) {
return
}
// Finalize applies a L1 finality signal, without any fork-choice or L2 state changes.
func (fi *Finalizer) Finalize(ctx context.Context, l1Origin eth.L1BlockRef) {
type FinalizeL1Event struct {
FinalizedL1 eth.L1BlockRef
}
func (ev FinalizeL1Event) String() string {
return "finalized-l1"
}
type TryFinalizeEvent struct{}
func (ev TryFinalizeEvent) String() string {
return "try-finalize"
}
func (fi *Finalizer) OnEvent(ev rollup.Event) {
switch x := ev.(type) {
case FinalizeL1Event:
fi.onL1Finalized(x.FinalizedL1)
case engine.SafeDerivedEvent:
fi.onDerivedSafeBlock(x.Safe, x.DerivedFrom)
case derive.DeriverIdleEvent:
fi.onDerivationIdle(x.Origin)
case rollup.ResetEvent:
fi.onReset()
case TryFinalizeEvent:
fi.tryFinalize()
case engine.ForkchoiceUpdateEvent:
fi.lastFinalizedL2 = x.FinalizedL2Head
}
}
// onL1Finalized applies a L1 finality signal
func (fi *Finalizer) onL1Finalized(l1Origin eth.L1BlockRef) {
fi.mu.Lock()
defer fi.mu.Unlock()
prevFinalizedL1 := fi.finalizedL1
......@@ -127,13 +166,11 @@ func (fi *Finalizer) Finalize(ctx context.Context, l1Origin eth.L1BlockRef) {
fi.finalizedL1 = l1Origin
}
// remnant of finality in EngineQueue: the finalization work does not inherit a context from the caller.
if err := fi.tryFinalize(ctx); err != nil {
fi.log.Warn("received L1 finalization signal, but was unable to determine and apply L2 finality", "err", err)
}
// when the L1 change we can suggest to try to finalize, as the pre-condition for L2 finality has now changed
fi.emitter.Emit(TryFinalizeEvent{})
}
// OnDerivationL1End is called when a L1 block has been fully exhausted (i.e. no more L2 blocks to derive from).
// onDerivationIdle is called when the pipeline is exhausted of new data (i.e. no more L2 blocks to derive from).
//
// Since finality applies to all L2 blocks fully derived from the same block,
// it optimal to only check after the derivation from the L1 block has been exhausted.
......@@ -141,24 +178,27 @@ func (fi *Finalizer) Finalize(ctx context.Context, l1Origin eth.L1BlockRef) {
// This will look at what has been buffered so far,
// sanity-check we are on the finalizing L1 chain,
// and finalize any L2 blocks that were fully derived from known finalized L1 blocks.
func (fi *Finalizer) OnDerivationL1End(ctx context.Context, derivedFrom eth.L1BlockRef) error {
func (fi *Finalizer) onDerivationIdle(derivedFrom eth.L1BlockRef) {
fi.mu.Lock()
defer fi.mu.Unlock()
if fi.finalizedL1 == (eth.L1BlockRef{}) {
return nil // if no L1 information is finalized yet, then skip this
return // if no L1 information is finalized yet, then skip this
}
// If we recently tried finalizing, then don't try again just yet, but traverse more of L1 first.
if fi.triedFinalizeAt != 0 && derivedFrom.Number <= fi.triedFinalizeAt+finalityDelay {
return nil
return
}
fi.log.Info("processing L1 finality information", "l1_finalized", fi.finalizedL1, "derived_from", derivedFrom, "previous", fi.triedFinalizeAt)
fi.log.Debug("processing L1 finality information", "l1_finalized", fi.finalizedL1, "derived_from", derivedFrom, "previous", fi.triedFinalizeAt)
fi.triedFinalizeAt = derivedFrom.Number
return fi.tryFinalize(ctx)
fi.emitter.Emit(TryFinalizeEvent{})
}
func (fi *Finalizer) tryFinalize(ctx context.Context) error {
// default to keep the same finalized block
finalizedL2 := fi.ec.Finalized()
func (fi *Finalizer) tryFinalize() {
fi.mu.Lock()
defer fi.mu.Unlock()
// overwritten if we finalize
finalizedL2 := fi.lastFinalizedL2 // may be zeroed if nothing was finalized since startup.
var finalizedDerivedFrom eth.BlockID
// go through the latest inclusion data, and find the last L2 block that was derived from a finalized L1 block
for _, fd := range fi.finalityData {
......@@ -169,37 +209,41 @@ func (fi *Finalizer) tryFinalize(ctx context.Context) error {
}
}
if finalizedDerivedFrom != (eth.BlockID{}) {
ctx, cancel := context.WithTimeout(fi.ctx, time.Second*10)
defer cancel()
// Sanity check the finality signal of L1.
// Even though the signal is trusted and we do the below check also,
// the signal itself has to be canonical to proceed.
// TODO(#10724): This check could be removed if the finality signal is fully trusted, and if tests were more flexible for this case.
signalRef, err := fi.l1Fetcher.L1BlockRefByNumber(ctx, fi.finalizedL1.Number)
if err != nil {
return derive.NewTemporaryError(fmt.Errorf("failed to check if on finalizing L1 chain, could not fetch block %d: %w", fi.finalizedL1.Number, err))
fi.emitter.Emit(rollup.L1TemporaryErrorEvent{Err: fmt.Errorf("failed to check if on finalizing L1 chain, could not fetch block %d: %w", fi.finalizedL1.Number, err)})
return
}
if signalRef.Hash != fi.finalizedL1.Hash {
return derive.NewResetError(fmt.Errorf("need to reset, we assumed %s is finalized, but canonical chain is %s", fi.finalizedL1, signalRef))
fi.emitter.Emit(rollup.ResetEvent{Err: fmt.Errorf("need to reset, we assumed %s is finalized, but canonical chain is %s", fi.finalizedL1, signalRef)})
return
}
// Sanity check we are indeed on the finalizing chain, and not stuck on something else.
// We assume that the block-by-number query is consistent with the previously received finalized chain signal
derivedRef, err := fi.l1Fetcher.L1BlockRefByNumber(ctx, finalizedDerivedFrom.Number)
if err != nil {
return derive.NewTemporaryError(fmt.Errorf("failed to check if on finalizing L1 chain, could not fetch block %d: %w", finalizedDerivedFrom.Number, err))
fi.emitter.Emit(rollup.L1TemporaryErrorEvent{Err: fmt.Errorf("failed to check if on finalizing L1 chain, could not fetch block %d: %w", finalizedDerivedFrom.Number, err)})
return
}
if derivedRef.Hash != finalizedDerivedFrom.Hash {
return derive.NewResetError(fmt.Errorf("need to reset, we are on %s, not on the finalizing L1 chain %s (towards %s)",
finalizedDerivedFrom, derivedRef, fi.finalizedL1))
fi.emitter.Emit(rollup.ResetEvent{Err: fmt.Errorf("need to reset, we are on %s, not on the finalizing L1 chain %s (towards %s)",
finalizedDerivedFrom, derivedRef, fi.finalizedL1)})
return
}
fi.ec.SetFinalizedHead(finalizedL2)
fi.emitter.Emit(engine.PromoteFinalizedEvent{Ref: finalizedL2})
}
return nil
}
// PostProcessSafeL2 buffers the L1 block the safe head was fully derived from,
// onDerivedSafeBlock buffers the L1 block the safe head was fully derived from,
// to finalize it once the derived-from L1 block, or a later L1 block, finalizes.
func (fi *Finalizer) PostProcessSafeL2(l2Safe eth.L2BlockRef, derivedFrom eth.L1BlockRef) {
func (fi *Finalizer) onDerivedSafeBlock(l2Safe eth.L2BlockRef, derivedFrom eth.L1BlockRef) {
fi.mu.Lock()
defer fi.mu.Unlock()
// remember the last L2 block that we fully derived from the given finality data
......@@ -225,9 +269,9 @@ func (fi *Finalizer) PostProcessSafeL2(l2Safe eth.L2BlockRef, derivedFrom eth.L1
}
}
// Reset clears the recent history of safe-L2 blocks used for finalization,
// onReset clears the recent history of safe-L2 blocks used for finalization,
// to avoid finalizing any reorged-out L2 blocks.
func (fi *Finalizer) Reset() {
func (fi *Finalizer) onReset() {
fi.mu.Lock()
defer fi.mu.Unlock()
fi.finalityData = fi.finalityData[:0]
......
This diff is collapsed.
......@@ -26,17 +26,17 @@ type PlasmaFinalizer struct {
backend PlasmaBackend
}
func NewPlasmaFinalizer(log log.Logger, cfg *rollup.Config,
l1Fetcher FinalizerL1Interface, ec FinalizerEngine,
func NewPlasmaFinalizer(ctx context.Context, log log.Logger, cfg *rollup.Config,
l1Fetcher FinalizerL1Interface, emitter rollup.EventEmitter,
backend PlasmaBackend) *PlasmaFinalizer {
inner := NewFinalizer(log, cfg, l1Fetcher, ec)
inner := NewFinalizer(ctx, log, cfg, l1Fetcher, emitter)
// In alt-da mode, the finalization signal is proxied through the plasma manager.
// Finality signal will come from the DA contract or L1 finality whichever is last.
// The plasma module will then call the inner.Finalize function when applicable.
backend.OnFinalizedHeadSignal(func(ref eth.L1BlockRef) {
inner.Finalize(context.Background(), ref) // plasma backend context passing can be improved
inner.OnEvent(FinalizeL1Event{FinalizedL1: ref})
})
return &PlasmaFinalizer{
......@@ -45,6 +45,11 @@ func NewPlasmaFinalizer(log log.Logger, cfg *rollup.Config,
}
}
func (fi *PlasmaFinalizer) Finalize(ctx context.Context, l1Origin eth.L1BlockRef) {
fi.backend.Finalize(l1Origin)
func (fi *PlasmaFinalizer) OnEvent(ev rollup.Event) {
switch x := ev.(type) {
case FinalizeL1Event:
fi.backend.Finalize(x.FinalizedL1)
default:
fi.Finalizer.OnEvent(ev)
}
}
......@@ -11,6 +11,8 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
......@@ -83,9 +85,6 @@ func TestPlasmaFinalityData(t *testing.T) {
SequenceNumber: 1,
}
ec := &fakeEngine{}
ec.SetFinalizedHead(refA1)
// Simulate plasma finality by waiting for the finalized-inclusion
// of a commitment to turn into undisputed finalized data.
commitmentInclusionFinalized := eth.L1BlockRef{}
......@@ -95,7 +94,9 @@ func TestPlasmaFinalityData(t *testing.T) {
},
forwardTo: nil,
}
fi := NewPlasmaFinalizer(logger, cfg, l1F, ec, plasmaBackend)
emitter := &testutils.MockEmitter{}
fi := NewPlasmaFinalizer(context.Background(), logger, cfg, l1F, emitter, plasmaBackend)
require.NotNil(t, plasmaBackend.forwardTo, "plasma backend must have access to underlying standard finalizer")
require.Equal(t, expFinalityLookback, cap(fi.finalityData))
......@@ -107,7 +108,9 @@ func TestPlasmaFinalityData(t *testing.T) {
// and post processing.
for i := uint64(0); i < 200; i++ {
if i == 10 { // finalize a L1 commitment
fi.Finalize(context.Background(), l1parent)
fi.OnEvent(FinalizeL1Event{FinalizedL1: l1parent})
emitter.AssertExpectations(t) // no events emitted upon L1 finality
require.Equal(t, l1parent, commitmentInclusionFinalized, "plasma backend received L1 signal")
}
previous := l1parent
......@@ -127,24 +130,56 @@ func TestPlasmaFinalityData(t *testing.T) {
L1Origin: previous.ID(), // reference previous origin, not the block the batch was included in
SequenceNumber: j,
}
fi.PostProcessSafeL2(l2parent, l1parent)
fi.OnEvent(engine.SafeDerivedEvent{Safe: l2parent, DerivedFrom: l1parent})
emitter.AssertExpectations(t)
}
require.NoError(t, fi.OnDerivationL1End(context.Background(), l1parent))
plasmaFinalization := commitmentInclusionFinalized.Number + cfg.PlasmaConfig.DAChallengeWindow
if i == plasmaFinalization {
// might trigger finalization attempt, if expired finality delay
emitter.ExpectMaybeRun(func(ev rollup.Event) {
require.IsType(t, TryFinalizeEvent{}, ev)
})
fi.OnEvent(derive.DeriverIdleEvent{})
emitter.AssertExpectations(t)
// clear expectations
emitter.Mock.ExpectedCalls = nil
// no L2 finalize event, as no L1 finality signal has been forwarded by plasma backend yet
fi.OnEvent(TryFinalizeEvent{})
emitter.AssertExpectations(t)
// Pretend to be the plasma backend,
// send the original finalization signal to the underlying finalizer,
// now that we are sure the commitment itself is not just finalized,
// but the referenced data cannot be disputed anymore.
plasmaFinalization := commitmentInclusionFinalized.Number + cfg.PlasmaConfig.DAChallengeWindow
if commitmentInclusionFinalized != (eth.L1BlockRef{}) && l1parent.Number == plasmaFinalization {
// When the signal is forwarded, a finalization attempt will be scheduled
emitter.ExpectOnce(TryFinalizeEvent{})
plasmaBackend.forwardTo(commitmentInclusionFinalized)
}
// The next time OnDerivationL1End is called, after the finality signal was triggered by plasma backend,
// we should have a finalized L2 block.
// The L1 origin of the simulated L2 blocks lags 1 behind the block the L2 block is included in on L1.
emitter.AssertExpectations(t)
require.Equal(t, commitmentInclusionFinalized, fi.finalizedL1, "finality signal now made its way in regular finalizer")
// As soon as a finalization attempt is made, after the finality signal was triggered by plasma backend,
// we should get an attempt to get a finalized L2 block.
// In this test the L1 origin of the simulated L2 blocks lags 1 behind the block the L2 block is included in on L1.
// So to check the L2 finality progress, we check if the next L1 block after the L1 origin
// of the safe block matches that of the finalized L1 block.
if i == plasmaFinalization+1 {
require.Equal(t, plasmaFinalization, ec.Finalized().L1Origin.Number+1)
l1F.ExpectL1BlockRefByNumber(commitmentInclusionFinalized.Number, commitmentInclusionFinalized, nil)
l1F.ExpectL1BlockRefByNumber(commitmentInclusionFinalized.Number, commitmentInclusionFinalized, nil)
var finalizedL2 eth.L2BlockRef
emitter.ExpectOnceRun(func(ev rollup.Event) {
if x, ok := ev.(engine.PromoteFinalizedEvent); ok {
finalizedL2 = x.Ref
} else {
t.Fatalf("expected L2 finalization, but got: %s", ev)
}
})
fi.OnEvent(TryFinalizeEvent{})
l1F.AssertExpectations(t)
emitter.AssertExpectations(t)
require.Equal(t, commitmentInclusionFinalized.Number, finalizedL2.L1Origin.Number+1)
// Confirm finalization, so there will be no repeats of the PromoteFinalizedEvent
fi.OnEvent(engine.ForkchoiceUpdateEvent{FinalizedL2Head: finalizedL2})
emitter.AssertExpectations(t)
}
}
......
......@@ -68,6 +68,9 @@ func (d *ProgramDeriver) OnEvent(ev rollup.Event) {
case rollup.ResetEvent:
d.closing = true
d.result = fmt.Errorf("unexpected reset error: %w", x.Err)
case rollup.L1TemporaryErrorEvent:
d.closing = true
d.result = fmt.Errorf("unexpected L1 error: %w", x.Err)
case rollup.EngineTemporaryErrorEvent:
// (Legacy case): While most temporary errors are due to requests for external data failing which can't happen,
// they may also be returned due to other events like channels timing out so need to be handled
......
......@@ -120,8 +120,16 @@ func TestProgramDeriver(t *testing.T) {
require.True(t, p.closing)
require.NotNil(t, p.result)
})
// on temporary error: continue derivation.
t.Run("temp error event", func(t *testing.T) {
// on L1 temporary error: stop with error
t.Run("L1 temporary error event", func(t *testing.T) {
p, m := newProgram(t, 1000)
p.OnEvent(rollup.L1TemporaryErrorEvent{Err: errors.New("temp test err")})
m.AssertExpectations(t)
require.True(t, p.closing)
require.NotNil(t, p.result)
})
// on engine temporary error: continue derivation (because legacy, not all connection related)
t.Run("engine temp error event", func(t *testing.T) {
p, m := newProgram(t, 1000)
m.ExpectOnce(engine.PendingSafeRequestEvent{})
p.OnEvent(rollup.EngineTemporaryErrorEvent{Err: errors.New("temp test err")})
......
......@@ -18,10 +18,22 @@ func (m *MockEmitter) ExpectOnce(expected rollup.Event) {
m.Mock.On("Emit", expected).Once()
}
func (m *MockEmitter) ExpectMaybeRun(fn func(ev rollup.Event)) {
m.Mock.On("Emit", mock.Anything).Maybe().Run(func(args mock.Arguments) {
fn(args.Get(0).(rollup.Event))
})
}
func (m *MockEmitter) ExpectOnceType(typ string) {
m.Mock.On("Emit", mock.AnythingOfType(typ)).Once()
}
func (m *MockEmitter) ExpectOnceRun(fn func(ev rollup.Event)) {
m.Mock.On("Emit", mock.Anything).Once().Run(func(args mock.Arguments) {
fn(args.Get(0).(rollup.Event))
})
}
func (m *MockEmitter) AssertExpectations(t mock.TestingT) {
m.Mock.AssertExpectations(t)
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment