Commit 2d0e83a9 authored by protolambda's avatar protolambda Committed by GitHub

op-node: reset engine through events (#10961)

parent 258a480d
...@@ -90,6 +90,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri ...@@ -90,6 +90,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
metrics := &testutils.TestDerivationMetrics{} metrics := &testutils.TestDerivationMetrics{}
ec := engine.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode, synchronousEvents) ec := engine.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode, synchronousEvents)
engineResetDeriver := engine.NewEngineResetDeriver(ctx, log, cfg, l1, eng, syncCfg, synchronousEvents)
clSync := clsync.NewCLSync(log, cfg, metrics, synchronousEvents) clSync := clsync.NewCLSync(log, cfg, metrics, synchronousEvents)
...@@ -144,6 +145,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri ...@@ -144,6 +145,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
*rootDeriver = rollup.SynchronousDerivers{ *rootDeriver = rollup.SynchronousDerivers{
syncDeriver, syncDeriver,
engineResetDeriver,
engDeriv, engDeriv,
rollupNode, rollupNode,
clSync, clSync,
......
...@@ -181,6 +181,7 @@ func NewDriver( ...@@ -181,6 +181,7 @@ func NewDriver(
findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth) findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth)
verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, l1State.L1Head, l1) verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, l1State.L1Head, l1)
ec := engine.NewEngineController(l2, log, metrics, cfg, syncCfg.SyncMode, synchronousEvents) ec := engine.NewEngineController(l2, log, metrics, cfg, syncCfg.SyncMode, synchronousEvents)
engineResetDeriver := engine.NewEngineResetDeriver(driverCtx, log, cfg, l1, l2, syncCfg, synchronousEvents)
clSync := clsync.NewCLSync(log, cfg, metrics, synchronousEvents) clSync := clsync.NewCLSync(log, cfg, metrics, synchronousEvents)
var finalizer Finalizer var finalizer Finalizer
...@@ -246,6 +247,7 @@ func NewDriver( ...@@ -246,6 +247,7 @@ func NewDriver(
*rootDeriver = []rollup.Deriver{ *rootDeriver = []rollup.Deriver{
syncDeriver, syncDeriver,
engineResetDeriver,
engDeriv, engDeriv,
schedDeriv, schedDeriv,
driver, driver,
......
...@@ -434,9 +434,40 @@ func (s *SyncDeriver) OnEvent(ev rollup.Event) { ...@@ -434,9 +434,40 @@ func (s *SyncDeriver) OnEvent(ev rollup.Event) {
case rollup.EngineTemporaryErrorEvent: case rollup.EngineTemporaryErrorEvent:
s.Log.Warn("Derivation process temporary error", "err", x.Err) s.Log.Warn("Derivation process temporary error", "err", x.Err)
s.Emitter.Emit(StepReqEvent{}) s.Emitter.Emit(StepReqEvent{})
case engine.EngineResetConfirmedEvent:
s.onEngineConfirmedReset(x)
} }
} }
func (s *SyncDeriver) onEngineConfirmedReset(x engine.EngineResetConfirmedEvent) {
// If the listener update fails, we return,
// and don't confirm the engine-reset with the derivation pipeline.
// The pipeline will re-trigger a reset as necessary.
if s.SafeHeadNotifs != nil {
if err := s.SafeHeadNotifs.SafeHeadReset(x.Safe); err != nil {
s.Log.Error("Failed to warn safe-head notifier of safe-head reset", "safe", x.Safe)
return
}
if s.SafeHeadNotifs.Enabled() && x.Safe.Number == s.Config.Genesis.L2.Number && x.Safe.Hash == s.Config.Genesis.L2.Hash {
// The rollup genesis block is always safe by definition. So if the pipeline resets this far back we know
// we will process all safe head updates and can record genesis as always safe from L1 genesis.
// Note that it is not safe to use cfg.Genesis.L1 here as it is the block immediately before the L2 genesis
// but the contracts may have been deployed earlier than that, allowing creating a dispute game
// with a L1 head prior to cfg.Genesis.L1
l1Genesis, err := s.L1.L1BlockRefByNumber(s.Ctx, 0)
if err != nil {
s.Log.Error("Failed to retrieve L1 genesis, cannot notify genesis as safe block", "err", err)
return
}
if err := s.SafeHeadNotifs.SafeHeadUpdated(x.Safe, l1Genesis.ID()); err != nil {
s.Log.Error("Failed to notify safe-head listener of safe-head", "err", err)
return
}
}
}
s.Derivation.ConfirmEngineReset()
}
func (s *SyncDeriver) onStepEvent() { func (s *SyncDeriver) onStepEvent() {
s.Log.Debug("Sync process step", "onto_origin", s.Derivation.Origin()) s.Log.Debug("Sync process step", "onto_origin", s.Derivation.Origin())
// Note: while we refactor the SyncStep to be entirely event-based we have an intermediate phase // Note: while we refactor the SyncStep to be entirely event-based we have an intermediate phase
...@@ -474,12 +505,7 @@ func (s *SyncDeriver) onResetEvent(x rollup.ResetEvent) { ...@@ -474,12 +505,7 @@ func (s *SyncDeriver) onResetEvent(x rollup.ResetEvent) {
s.Derivation.Reset() s.Derivation.Reset()
s.Finalizer.Reset() s.Finalizer.Reset()
s.Emitter.Emit(StepReqEvent{}) s.Emitter.Emit(StepReqEvent{})
if err := engine.ResetEngine(s.Ctx, s.Log, s.Config, s.Engine, s.L1, s.L2, s.SyncCfg, s.SafeHeadNotifs); err != nil { s.Emitter.Emit(engine.ResetEngineRequestEvent{})
s.Log.Error("Derivation pipeline not ready, failed to reset engine", "err", err)
// Derivation-pipeline will return a new ResetError until we confirm the engine has been successfully reset.
return
}
s.Derivation.ConfirmEngineReset()
} }
type DeriverIdleEvent struct{} type DeriverIdleEvent struct{}
......
...@@ -7,72 +7,54 @@ import ( ...@@ -7,72 +7,54 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
) )
type ResetL2 interface { // ResetEngineRequestEvent requests the EngineResetDeriver to walk
sync.L2Chain // the L2 chain backwards until it finds a plausible unsafe head,
derive.SystemConfigL2Fetcher // and find an L2 safe block that is guaranteed to still be from the L1 chain.
} type ResetEngineRequestEvent struct{}
type ResetEngineControl interface { func (ev ResetEngineRequestEvent) String() string {
SetUnsafeHead(eth.L2BlockRef) return "reset-engine-request"
SetSafeHead(eth.L2BlockRef) }
SetFinalizedHead(eth.L2BlockRef)
SetBackupUnsafeL2Head(block eth.L2BlockRef, triggerReorg bool) type EngineResetDeriver struct {
SetPendingSafeL2Head(eth.L2BlockRef) ctx context.Context
log log.Logger
cfg *rollup.Config
l1 sync.L1Chain
l2 sync.L2Chain
syncCfg *sync.Config
ResetBuildingState() emitter rollup.EventEmitter
} }
// ResetEngine walks the L2 chain backwards until it finds a plausible unsafe head, func NewEngineResetDeriver(ctx context.Context, log log.Logger, cfg *rollup.Config,
// and an L2 safe block that is guaranteed to still be from the L1 chain. l1 sync.L1Chain, l2 sync.L2Chain, syncCfg *sync.Config, emitter rollup.EventEmitter) *EngineResetDeriver {
func ResetEngine(ctx context.Context, log log.Logger, cfg *rollup.Config, ec ResetEngineControl, l1 sync.L1Chain, l2 ResetL2, syncCfg *sync.Config, safeHeadNotifs rollup.SafeHeadListener) error { return &EngineResetDeriver{
result, err := sync.FindL2Heads(ctx, cfg, l1, l2, log, syncCfg) ctx: ctx,
if err != nil { log: log,
return derive.NewTemporaryError(fmt.Errorf("failed to find the L2 Heads to start from: %w", err)) cfg: cfg,
} l1: l1,
finalized, safe, unsafe := result.Finalized, result.Safe, result.Unsafe l2: l2,
l1Origin, err := l1.L1BlockRefByHash(ctx, safe.L1Origin.Hash) syncCfg: syncCfg,
if err != nil { emitter: emitter,
return derive.NewTemporaryError(fmt.Errorf("failed to fetch the new L1 progress: origin: %v; err: %w", safe.L1Origin, err))
}
if safe.Time < l1Origin.Time {
return derive.NewResetError(fmt.Errorf("cannot reset block derivation to start at L2 block %s with time %d older than its L1 origin %s with time %d, time invariant is broken",
safe, safe.Time, l1Origin, l1Origin.Time))
} }
}
ec.SetUnsafeHead(unsafe) func (d *EngineResetDeriver) OnEvent(ev rollup.Event) {
ec.SetSafeHead(safe) switch ev.(type) {
ec.SetPendingSafeL2Head(safe) case ResetEngineRequestEvent:
ec.SetFinalizedHead(finalized) result, err := sync.FindL2Heads(d.ctx, d.cfg, d.l1, d.l2, d.log, d.syncCfg)
ec.SetBackupUnsafeL2Head(eth.L2BlockRef{}, false) if err != nil {
ec.ResetBuildingState() d.emitter.Emit(rollup.ResetEvent{Err: fmt.Errorf("failed to find the L2 Heads to start from: %w", err)})
return
log.Debug("Reset of Engine is completed", "safeHead", safe, "unsafe", unsafe, "safe_timestamp", safe.Time,
"unsafe_timestamp", unsafe.Time, "l1Origin", l1Origin)
if safeHeadNotifs != nil {
if err := safeHeadNotifs.SafeHeadReset(safe); err != nil {
return err
}
if safeHeadNotifs.Enabled() && safe.Number == cfg.Genesis.L2.Number && safe.Hash == cfg.Genesis.L2.Hash {
// The rollup genesis block is always safe by definition. So if the pipeline resets this far back we know
// we will process all safe head updates and can record genesis as always safe from L1 genesis.
// Note that it is not safe to use cfg.Genesis.L1 here as it is the block immediately before the L2 genesis
// but the contracts may have been deployed earlier than that, allowing creating a dispute game
// with a L1 head prior to cfg.Genesis.L1
l1Genesis, err := l1.L1BlockRefByNumber(ctx, 0)
if err != nil {
return fmt.Errorf("failed to retrieve L1 genesis: %w", err)
}
if err := safeHeadNotifs.SafeHeadUpdated(safe, l1Genesis.ID()); err != nil {
return err
}
} }
d.emitter.Emit(ForceEngineResetEvent{
Unsafe: result.Unsafe,
Safe: result.Safe,
Finalized: result.Finalized,
})
} }
return nil
} }
...@@ -61,6 +61,22 @@ func (ev TryUpdateEngineEvent) String() string { ...@@ -61,6 +61,22 @@ func (ev TryUpdateEngineEvent) String() string {
return "try-update-engine" return "try-update-engine"
} }
type ForceEngineResetEvent struct {
Unsafe, Safe, Finalized eth.L2BlockRef
}
func (ev ForceEngineResetEvent) String() string {
return "force-engine-reset"
}
type EngineResetConfirmedEvent struct {
Unsafe, Safe, Finalized eth.L2BlockRef
}
func (ev EngineResetConfirmedEvent) String() string {
return "engine-reset-confirmed"
}
type EngDeriver struct { type EngDeriver struct {
log log.Logger log log.Logger
cfg *rollup.Config cfg *rollup.Config
...@@ -146,5 +162,34 @@ func (d *EngDeriver) OnEvent(ev rollup.Event) { ...@@ -146,5 +162,34 @@ func (d *EngDeriver) OnEvent(ev rollup.Event) {
SafeL2Head: d.ec.SafeL2Head(), SafeL2Head: d.ec.SafeL2Head(),
FinalizedL2Head: d.ec.Finalized(), FinalizedL2Head: d.ec.Finalized(),
}) })
case ForceEngineResetEvent:
ForceEngineReset(d.ec, x)
// Time to apply the changes to the underlying engine
d.emitter.Emit(TryUpdateEngineEvent{})
log.Debug("Reset of Engine is completed",
"safeHead", x.Safe, "unsafe", x.Unsafe, "safe_timestamp", x.Safe.Time,
"unsafe_timestamp", x.Unsafe.Time)
d.emitter.Emit(EngineResetConfirmedEvent{})
} }
} }
type ResetEngineControl interface {
SetUnsafeHead(eth.L2BlockRef)
SetSafeHead(eth.L2BlockRef)
SetFinalizedHead(eth.L2BlockRef)
SetBackupUnsafeL2Head(block eth.L2BlockRef, triggerReorg bool)
SetPendingSafeL2Head(eth.L2BlockRef)
ResetBuildingState()
}
// ForceEngineReset is not to be used. The op-program needs it for now, until event processing is adopted there.
func ForceEngineReset(ec ResetEngineControl, x ForceEngineResetEvent) {
ec.SetUnsafeHead(x.Unsafe)
ec.SetSafeHead(x.Safe)
ec.SetPendingSafeL2Head(x.Safe)
ec.SetFinalizedHead(x.Finalized)
ec.SetBackupUnsafeL2Head(eth.L2BlockRef{}, false)
ec.ResetBuildingState()
}
...@@ -66,9 +66,18 @@ func (d *MinimalSyncDeriver) SyncStep(ctx context.Context) error { ...@@ -66,9 +66,18 @@ func (d *MinimalSyncDeriver) SyncStep(ctx context.Context) error {
if err := d.engine.TryUpdateEngine(ctx); !errors.Is(err, engine.ErrNoFCUNeeded) { if err := d.engine.TryUpdateEngine(ctx); !errors.Is(err, engine.ErrNoFCUNeeded) {
return err return err
} }
if err := engine.ResetEngine(ctx, d.logger, d.cfg, d.engine, d.l1Source, d.l2Source, d.syncCfg, nil); err != nil { // The below two calls emulate ResetEngine, without event-processing.
return err // This will be omitted after op-program adopts events, and the deriver code is used instead.
result, err := sync.FindL2Heads(ctx, d.cfg, d.l1Source, d.l2Source, d.logger, d.syncCfg)
if err != nil {
// not really a temporary error in this context, but preserves old ResetEngine behavior.
return derive.NewTemporaryError(fmt.Errorf("failed to determine starting point: %w", err))
} }
engine.ForceEngineReset(d.engine, engine.ForceEngineResetEvent{
Unsafe: result.Unsafe,
Safe: result.Safe,
Finalized: result.Finalized,
})
d.pipeline.ConfirmEngineReset() d.pipeline.ConfirmEngineReset()
d.initialResetDone = true d.initialResetDone = true
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment