Commit d975a70e authored by protolambda's avatar protolambda Committed by GitHub

op-node: cleanup driver, closer to removing stepping (#11464)

* op-node: cleanup driver, closer to removing stepping

* op-node: add comment about event Drain error

* op-node: fix lint

* op-node: handle drain errors
parent 32eea1b6
......@@ -340,7 +340,7 @@ func (s *SyncDeriver) AttachEmitter(em event.Emitter) {
func (s *SyncDeriver) OnEvent(ev event.Event) bool {
switch x := ev.(type) {
case StepEvent:
s.onStepEvent()
s.SyncStep()
case rollup.ResetEvent:
s.onResetEvent(x)
case rollup.L1TemporaryErrorEvent:
......@@ -409,31 +409,6 @@ func (s *SyncDeriver) onEngineConfirmedReset(x engine.EngineResetConfirmedEvent)
s.Emitter.Emit(derive.ConfirmPipelineResetEvent{})
}
func (s *SyncDeriver) onStepEvent() {
s.Log.Debug("Sync process step")
// Note: while we refactor the SyncStep to be entirely event-based we have an intermediate phase
// where some things are triggered through events, and some through this synchronous step function.
// We just translate the results into their equivalent events,
// to merge the error-handling with that of the new event-based system.
err := s.SyncStep()
if err != nil && errors.Is(err, derive.EngineELSyncing) {
s.Log.Debug("Derivation process went idle because the engine is syncing", "unsafe_head", s.Engine.UnsafeL2Head(), "err", err)
s.Emitter.Emit(ResetStepBackoffEvent{})
} else if err != nil && errors.Is(err, derive.ErrReset) {
s.Emitter.Emit(rollup.ResetEvent{Err: err})
} else if err != nil && errors.Is(err, derive.ErrTemporary) {
s.Emitter.Emit(rollup.EngineTemporaryErrorEvent{Err: err})
} else if err != nil && errors.Is(err, derive.ErrCritical) {
s.Emitter.Emit(rollup.CriticalErrorEvent{Err: err})
} else if err != nil {
s.Log.Error("Derivation process error", "err", err)
s.Emitter.Emit(StepReqEvent{})
} else {
// Revisit SyncStep in 1/2 of a L2 block.
s.Emitter.Emit(StepDelayedReqEvent{Delay: (time.Duration(s.Config.BlockTime) * time.Second) / 2})
}
}
func (s *SyncDeriver) onResetEvent(x rollup.ResetEvent) {
// If the system corrupts, e.g. due to a reorg, simply reset it
s.Log.Warn("Deriver system is resetting", "err", x.Err)
......@@ -443,24 +418,42 @@ func (s *SyncDeriver) onResetEvent(x rollup.ResetEvent) {
// SyncStep performs the sequence of encapsulated syncing steps.
// Warning: this sequence will be broken apart as outlined in op-node derivers design doc.
func (s *SyncDeriver) SyncStep() error {
if err := s.Drain(); err != nil {
return err
func (s *SyncDeriver) SyncStep() {
s.Log.Debug("Sync process step")
drain := func() (ok bool) {
if err := s.Drain(); err != nil {
if errors.Is(err, context.Canceled) {
return false
} else {
s.Emitter.Emit(rollup.CriticalErrorEvent{
Err: fmt.Errorf("unexpected error on SyncStep event Drain: %w", err)})
return false
}
}
return true
}
if !drain() {
return
}
s.Emitter.Emit(engine.TryBackupUnsafeReorgEvent{})
if err := s.Drain(); err != nil {
return err
if !drain() {
return
}
s.Emitter.Emit(engine.TryUpdateEngineEvent{})
if err := s.Drain(); err != nil {
return err
if !drain() {
return
}
if s.Engine.IsEngineSyncing() {
// The pipeline cannot move forwards if doing EL sync.
return derive.EngineELSyncing
s.Log.Debug("Rollup driver is backing off because execution engine is syncing.",
"unsafe_head", s.Engine.UnsafeL2Head())
s.Emitter.Emit(ResetStepBackoffEvent{})
return
}
// Any now processed forkchoice updates will trigger CL-sync payload processing, if any payload is queued up.
......@@ -472,7 +465,6 @@ func (s *SyncDeriver) SyncStep() error {
// Upon the pending-safe signal the attributes deriver can then ask the pipeline
// to generate new attributes, if no attributes are known already.
s.Emitter.Emit(engine.PendingSafeRequestEvent{})
return nil
}
// ResetDerivationPipeline forces a reset of the derivation pipeline.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment