Commit 72d57dc7 authored by protolambda's avatar protolambda Committed by GitHub

op-node: wrap derivation pipeline with event deriver (#10962)

parent 9e68cb0e
......@@ -104,6 +104,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
attributesHandler := attributes.NewAttributesHandler(log, cfg, ec, eng)
pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, plasmaSrc, eng, metrics)
pipelineDeriver := derive.NewPipelineDeriver(ctx, pipeline, synchronousEvents)
syncDeriver := &driver.SyncDeriver{
Derivation: pipeline,
......@@ -149,6 +150,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
engDeriv,
rollupNode,
clSync,
pipelineDeriver,
}
t.Cleanup(rollupNode.rpc.Stop)
......@@ -298,7 +300,7 @@ func (s *L2Verifier) OnEvent(ev rollup.Event) {
s.log.Warn("Derivation pipeline is being reset", "err", x.Err)
case rollup.CriticalErrorEvent:
panic(fmt.Errorf("derivation failed critically: %w", x.Err))
case driver.DeriverIdleEvent:
case derive.DeriverIdleEvent:
s.l2PipelineIdle = true
}
}
......
package derive
import (
"context"
"errors"
"io"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
type DeriverIdleEvent struct{}
func (d DeriverIdleEvent) String() string {
return "derivation-idle"
}
type DeriverMoreEvent struct{}
func (d DeriverMoreEvent) String() string {
return "deriver-more"
}
type ConfirmPipelineResetEvent struct{}
func (d ConfirmPipelineResetEvent) String() string {
return "confirm-pipeline-reset"
}
// DerivedAttributesEvent is emitted when new attributes are available to apply to the engine.
type DerivedAttributesEvent struct {
Attributes *AttributesWithParent
}
func (ev DerivedAttributesEvent) String() string {
return "derived-attributes"
}
type PipelineStepEvent struct {
PendingSafe eth.L2BlockRef
}
func (ev PipelineStepEvent) String() string {
return "pipeline-step"
}
type PipelineDeriver struct {
pipeline *DerivationPipeline
ctx context.Context
emitter rollup.EventEmitter
}
func NewPipelineDeriver(ctx context.Context, pipeline *DerivationPipeline, emitter rollup.EventEmitter) *PipelineDeriver {
return &PipelineDeriver{
pipeline: pipeline,
ctx: ctx,
emitter: emitter,
}
}
func (d *PipelineDeriver) OnEvent(ev rollup.Event) {
switch x := ev.(type) {
case rollup.ResetEvent:
d.pipeline.Reset()
case PipelineStepEvent:
d.pipeline.log.Trace("Derivation pipeline step", "onto_origin", d.pipeline.Origin())
attrib, err := d.pipeline.Step(d.ctx, x.PendingSafe)
if err == io.EOF {
d.pipeline.log.Debug("Derivation process went idle", "progress", d.pipeline.Origin(), "err", err)
d.emitter.Emit(DeriverIdleEvent{})
} else if err != nil && errors.Is(err, EngineELSyncing) {
d.pipeline.log.Debug("Derivation process went idle because the engine is syncing", "progress", d.pipeline.Origin(), "err", err)
d.emitter.Emit(DeriverIdleEvent{})
} else if err != nil && errors.Is(err, ErrReset) {
d.emitter.Emit(rollup.ResetEvent{Err: err})
} else if err != nil && errors.Is(err, ErrTemporary) {
d.emitter.Emit(rollup.EngineTemporaryErrorEvent{Err: err})
} else if err != nil && errors.Is(err, ErrCritical) {
d.emitter.Emit(rollup.CriticalErrorEvent{Err: err})
} else if err != nil && errors.Is(err, NotEnoughData) {
// don't do a backoff for this error
d.emitter.Emit(DeriverMoreEvent{})
} else if err != nil {
d.pipeline.log.Error("Derivation process error", "err", err)
d.emitter.Emit(rollup.EngineTemporaryErrorEvent{Err: err})
} else {
if attrib != nil {
d.emitter.Emit(DerivedAttributesEvent{Attributes: attrib})
} else {
d.emitter.Emit(DeriverMoreEvent{}) // continue with the next step if we can
}
}
case ConfirmPipelineResetEvent:
d.pipeline.ConfirmEngineReset()
}
}
......@@ -193,6 +193,7 @@ func NewDriver(
attributesHandler := attributes.NewAttributesHandler(log, cfg, ec, l2)
derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l1Blobs, plasma, l2, metrics)
pipelineDeriver := derive.NewPipelineDeriver(driverCtx, derivationPipeline, synchronousEvents)
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2)
meteredEngine := NewMeteredEngine(cfg, ec, metrics, log) // Only use the metered engine in the sequencer b/c it records sequencing metrics.
sequencer := NewSequencer(log, cfg, meteredEngine, attrBuilder, findL1Origin, metrics)
......@@ -252,6 +253,7 @@ func NewDriver(
schedDeriv,
driver,
clSync,
pipelineDeriver,
}
return driver
......
......@@ -269,7 +269,7 @@ func (s *Driver) eventLoop() {
// so, we don't need to receive the payload here
_, err := s.sequencer.RunNextSequencerAction(s.driverCtx, s.asyncGossiper, s.sequencerConductor)
if errors.Is(err, derive.ErrReset) {
s.Derivation.Reset()
s.Emitter.Emit(rollup.ResetEvent{})
} else if err != nil {
s.log.Error("Sequencer critical error", "err", err)
return
......@@ -436,6 +436,16 @@ func (s *SyncDeriver) OnEvent(ev rollup.Event) {
s.Emitter.Emit(StepReqEvent{})
case engine.EngineResetConfirmedEvent:
s.onEngineConfirmedReset(x)
case derive.DeriverIdleEvent:
// Once derivation is idle the system is healthy
// and we can wait for new inputs. No backoff necessary.
s.Emitter.Emit(ResetStepBackoffEvent{})
case derive.DeriverMoreEvent:
// If there is more data to process,
// continue derivation quickly
s.Emitter.Emit(StepReqEvent{ResetBackoff: true})
case derive.DerivedAttributesEvent:
s.AttributesHandler.SetAttributes(x.Attributes)
}
}
......@@ -465,22 +475,18 @@ func (s *SyncDeriver) onEngineConfirmedReset(x engine.EngineResetConfirmedEvent)
}
}
}
s.Derivation.ConfirmEngineReset()
s.Emitter.Emit(derive.ConfirmPipelineResetEvent{})
}
func (s *SyncDeriver) onStepEvent() {
s.Log.Debug("Sync process step", "onto_origin", s.Derivation.Origin())
s.Log.Debug("Sync process step")
// Note: while we refactor the SyncStep to be entirely event-based we have an intermediate phase
// where some things are triggered through events, and some through this synchronous step function.
// We just translate the results into their equivalent events,
// to merge the error-handling with that of the new event-based system.
err := s.SyncStep(s.Ctx)
if err == io.EOF {
s.Log.Debug("Derivation process went idle", "progress", s.Derivation.Origin(), "err", err)
s.Emitter.Emit(ResetStepBackoffEvent{})
s.Emitter.Emit(DeriverIdleEvent{})
} else if err != nil && errors.Is(err, derive.EngineELSyncing) {
s.Log.Debug("Derivation process went idle because the engine is syncing", "progress", s.Derivation.Origin(), "unsafe_head", s.Engine.UnsafeL2Head(), "err", err)
if err != nil && errors.Is(err, derive.EngineELSyncing) {
s.Log.Debug("Derivation process went idle because the engine is syncing", "unsafe_head", s.Engine.UnsafeL2Head(), "err", err)
s.Emitter.Emit(ResetStepBackoffEvent{})
} else if err != nil && errors.Is(err, derive.ErrReset) {
s.Emitter.Emit(rollup.ResetEvent{Err: err})
......@@ -488,9 +494,6 @@ func (s *SyncDeriver) onStepEvent() {
s.Emitter.Emit(rollup.EngineTemporaryErrorEvent{Err: err})
} else if err != nil && errors.Is(err, derive.ErrCritical) {
s.Emitter.Emit(rollup.CriticalErrorEvent{Err: err})
} else if err != nil && errors.Is(err, derive.NotEnoughData) {
// don't do a backoff for this error
s.Emitter.Emit(StepReqEvent{ResetBackoff: true})
} else if err != nil {
s.Log.Error("Derivation process error", "err", err)
s.Emitter.Emit(StepReqEvent{})
......@@ -500,20 +503,13 @@ func (s *SyncDeriver) onStepEvent() {
}
func (s *SyncDeriver) onResetEvent(x rollup.ResetEvent) {
// If the pipeline corrupts, e.g. due to a reorg, simply reset it
s.Log.Warn("Derivation pipeline is reset", "err", x.Err)
s.Derivation.Reset()
// If the system corrupts, e.g. due to a reorg, simply reset it
s.Log.Warn("Deriver system is resetting", "err", x.Err)
s.Finalizer.Reset()
s.Emitter.Emit(StepReqEvent{})
s.Emitter.Emit(engine.ResetEngineRequestEvent{})
}
type DeriverIdleEvent struct{}
func (d DeriverIdleEvent) String() string {
return "derivation-idle"
}
// SyncStep performs the sequence of encapsulated syncing steps.
// Warning: this sequence will be broken apart as outlined in op-node derivers design doc.
func (s *SyncDeriver) SyncStep(ctx context.Context) error {
......@@ -562,12 +558,7 @@ func (s *SyncDeriver) SyncStep(ctx context.Context) error {
return fmt.Errorf("finalizer OnDerivationL1End error: %w", err)
}
attr, err := s.Derivation.Step(ctx, s.Engine.PendingSafeL2Head())
if err != nil {
return err
}
s.AttributesHandler.SetAttributes(attr)
s.Emitter.Emit(derive.PipelineStepEvent{PendingSafe: s.Engine.PendingSafeL2Head()})
return nil
}
......@@ -711,7 +702,6 @@ func (s *Driver) snapshot(event string) {
s.snapshotLog.Info("Rollup State Snapshot",
"event", event,
"l1Head", deferJSONString{s.l1State.L1Head()},
"l1Current", deferJSONString{s.Derivation.Origin()},
"l2Head", deferJSONString{s.Engine.UnsafeL2Head()},
"l2Safe", deferJSONString{s.Engine.SafeL2Head()},
"l2FinalizedHead", deferJSONString{s.Engine.Finalized()})
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment