Commit 19d7b721 authored by protolambda, committed by GitHub

op-node: implement event emitter/handler derivers, support first few Engine events (#10783)

* op-node: driver now uses event processing

* op-node: deriver event processing review fixes
parent 9147c6e9
@@ -34,7 +34,7 @@ func (m *MockL1OriginSelector) FindL1Origin(ctx context.Context, l2Head eth.L2Bl
 // L2Sequencer is an actor that functions like a rollup node,
 // without the full P2P/API/Node stack, but just the derivation state, and simplified driver with sequencing ability.
 type L2Sequencer struct {
-	L2Verifier
+	*L2Verifier
 	sequencer *driver.Sequencer
@@ -52,7 +52,7 @@ func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, blobSrc deri
 		actual: driver.NewL1OriginSelector(log, cfg, seqConfDepthL1),
 	}
 	return &L2Sequencer{
-		L2Verifier:              *ver,
+		L2Verifier:              ver,
 		sequencer:               driver.NewSequencer(log, cfg, ver.engine, attrBuilder, l1OriginSelector, metrics.NoopMetrics),
 		mockL1OriginSelector:    l1OriginSelector,
 		failL2GossipUnsafeBlock: nil,
...
@@ -3,13 +3,14 @@ package actions
 import (
 	"context"
 	"errors"
-	"io"
+	"fmt"
+
+	"github.com/stretchr/testify/require"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/log"
 	gnode "github.com/ethereum/go-ethereum/node"
 	"github.com/ethereum/go-ethereum/rpc"
-	"github.com/stretchr/testify/require"
 
 	"github.com/ethereum-optimism/optimism/op-node/node"
 	"github.com/ethereum-optimism/optimism/op-node/rollup"
@@ -22,6 +23,7 @@ import (
 	"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
 	"github.com/ethereum-optimism/optimism/op-service/client"
 	"github.com/ethereum-optimism/optimism/op-service/eth"
+	"github.com/ethereum-optimism/optimism/op-service/safego"
 	"github.com/ethereum-optimism/optimism/op-service/sources"
 	"github.com/ethereum-optimism/optimism/op-service/testutils"
 )
@@ -59,6 +61,10 @@ type L2Verifier struct {
 	rpc *rpc.Server
 
 	failRPC error // mock error
+
+	// The L2Verifier actor is embedded in the L2Sequencer actor,
+	// but must not be copied for the deriver-functionality to modify the same state.
+	_ safego.NoCopy
 }
 type L2API interface {
@@ -77,39 +83,57 @@ type safeDB interface {
 func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc derive.L1BlobsFetcher, plasmaSrc driver.PlasmaIface, eng L2API, cfg *rollup.Config, syncCfg *sync.Config, safeHeadListener safeDB) *L2Verifier {
 	metrics := &testutils.TestDerivationMetrics{}
-	engine := engine.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode)
-	clSync := clsync.NewCLSync(log, cfg, metrics, engine)
+	ec := engine.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode)
+	clSync := clsync.NewCLSync(log, cfg, metrics, ec)
 
 	var finalizer driver.Finalizer
 	if cfg.PlasmaEnabled() {
-		finalizer = finality.NewPlasmaFinalizer(log, cfg, l1, engine, plasmaSrc)
+		finalizer = finality.NewPlasmaFinalizer(log, cfg, l1, ec, plasmaSrc)
 	} else {
-		finalizer = finality.NewFinalizer(log, cfg, l1, engine)
+		finalizer = finality.NewFinalizer(log, cfg, l1, ec)
 	}
-	attributesHandler := attributes.NewAttributesHandler(log, cfg, engine, eng)
+	attributesHandler := attributes.NewAttributesHandler(log, cfg, ec, eng)
 	pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, plasmaSrc, eng, metrics)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	t.Cleanup(cancel)
+
+	rootDeriver := &rollup.SynchronousDerivers{}
+	synchronousEvents := driver.NewSynchronousEvents(log, ctx, rootDeriver)
+
+	syncDeriver := &driver.SyncDeriver{
+		Derivation:        pipeline,
+		Finalizer:         finalizer,
+		AttributesHandler: attributesHandler,
+		SafeHeadNotifs:    safeHeadListener,
+		CLSync:            clSync,
+		Engine:            ec,
+		SyncCfg:           syncCfg,
+		Config:            cfg,
+		L1:                l1,
+		L2:                eng,
+		Emitter:           synchronousEvents,
+		Log:               log,
+		Ctx:               ctx,
+		Drain:             synchronousEvents.Drain,
+	}
+
+	engDeriv := engine.NewEngDeriver(log, ctx, cfg, ec, synchronousEvents)
+
 	rollupNode := &L2Verifier{
 		log:               log,
 		eng:               eng,
-		engine:            engine,
+		engine:            ec,
 		clSync:            clSync,
 		derivation:        pipeline,
 		finalizer:         finalizer,
 		attributesHandler: attributesHandler,
 		safeHeadListener:  safeHeadListener,
 		syncCfg:           syncCfg,
-		syncDeriver: &driver.SyncDeriver{
-			Derivation:        pipeline,
-			Finalizer:         finalizer,
-			AttributesHandler: attributesHandler,
-			SafeHeadNotifs:    safeHeadListener,
-			CLSync:            clSync,
-			Engine:            engine,
-		},
+		syncDeriver:       syncDeriver,
 		l1:                l1,
 		l1State:           driver.NewL1State(log, metrics),
 		l2PipelineIdle:    true,
@@ -117,6 +141,13 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
 		rollupCfg:         cfg,
 		rpc:               rpc.NewServer(),
 	}
+
+	*rootDeriver = rollup.SynchronousDerivers{
+		syncDeriver,
+		engDeriv,
+		rollupNode,
+	}
+
 	t.Cleanup(rollupNode.rpc.Stop)
 
 	// setup RPC server for rollup node, hooked to the actor as backend
@@ -253,9 +284,20 @@ func (s *L2Verifier) ActL1FinalizedSignal(t Testing) {
 	s.finalizer.Finalize(t.Ctx(), finalized)
 }
-// syncStep represents the Driver.syncStep
-func (s *L2Verifier) syncStep(ctx context.Context) error {
-	return s.syncDeriver.SyncStep(ctx)
+func (s *L2Verifier) OnEvent(ev rollup.Event) {
+	switch x := ev.(type) {
+	case rollup.EngineTemporaryErrorEvent:
+		s.log.Warn("Derivation process temporary error", "err", x.Err)
+		if errors.Is(x.Err, sync.WrongChainErr) { // action-tests don't back off on temporary errors. Avoid a bad genesis setup from looping.
+			panic(fmt.Errorf("genesis setup issue: %w", x.Err))
+		}
+	case rollup.ResetEvent:
+		s.log.Warn("Derivation pipeline is being reset", "err", x.Err)
+	case rollup.CriticalErrorEvent:
+		panic(fmt.Errorf("derivation failed critically: %w", x.Err))
+	case driver.DeriverIdleEvent:
+		s.l2PipelineIdle = true
+	}
 }
 
 // ActL2PipelineStep runs one iteration of the L2 derivation pipeline
@@ -264,41 +306,21 @@ func (s *L2Verifier) ActL2PipelineStep(t Testing) {
 		t.InvalidAction("cannot derive new data while building L2 block")
 		return
 	}
-
-	err := s.syncStep(t.Ctx())
-	if err == io.EOF || (err != nil && errors.Is(err, derive.EngineELSyncing)) {
-		s.l2PipelineIdle = true
-		return
-	} else if err != nil && errors.Is(err, derive.NotEnoughData) {
-		return
-	} else if err != nil && errors.Is(err, derive.ErrReset) {
-		s.log.Warn("Derivation pipeline is reset", "err", err)
-		s.derivation.Reset()
-		if err := engine.ResetEngine(t.Ctx(), s.log, s.rollupCfg, s.engine, s.l1, s.eng, s.syncCfg, s.safeHeadListener); err != nil {
-			s.log.Error("Derivation pipeline not ready, failed to reset engine", "err", err)
-			// Derivation-pipeline will return a new ResetError until we confirm the engine has been successfully reset.
-			return
-		}
-		s.derivation.ConfirmEngineReset()
-		return
-	} else if err != nil && errors.Is(err, derive.ErrTemporary) {
-		s.log.Warn("Derivation process temporary error", "err", err)
-		if errors.Is(err, sync.WrongChainErr) { // action-tests don't back off on temporary errors. Avoid a bad genesis setup from looping.
-			t.Fatalf("genesis setup issue: %v", err)
-		}
-		return
-	} else if err != nil && errors.Is(err, derive.ErrCritical) {
-		t.Fatalf("derivation failed critically: %v", err)
-	} else if err != nil {
-		t.Fatalf("derivation failed: %v", err)
-	} else {
-		return
-	}
+	s.syncDeriver.Emitter.Emit(driver.StepEvent{})
+	require.NoError(t, s.syncDeriver.Drain(), "complete all event processing triggered by deriver step")
 }
 
 func (s *L2Verifier) ActL2PipelineFull(t Testing) {
 	s.l2PipelineIdle = false
+	i := 0
 	for !s.l2PipelineIdle {
+		i += 1
+		// Some tests do generate a lot of derivation steps
+		// (e.g. thousand blocks span-batch, or deep reorgs).
+		// Hence we set the sanity limit to something really high.
+		if i > 10_000 {
+			t.Fatalf("ActL2PipelineFull running for too long. Is a deriver looping?")
+		}
 		s.ActL2PipelineStep(t)
 	}
 }
...
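Worth spelling out for reviewers: the `L2Sequencer` embed changed from `L2Verifier` to `*L2Verifier` because the verifier now mutates its own state from event handlers (see the `safego.NoCopy` marker and `OnEvent` above), and a value embed would operate on a stale copy. A self-contained sketch of the difference, using toy types that are not from this PR:

```go
package main

import "fmt"

type verifier struct{ idle bool }

// old style: a value embed copies the verifier's state into the sequencer
type seqByValue struct{ verifier }

// new style: a pointer embed shares one verifier state
type seqByPointer struct{ *verifier }

func main() {
	v := &verifier{}
	byVal := seqByValue{verifier: *v}
	byPtr := seqByPointer{verifier: v}
	v.idle = true // e.g. a DeriverIdleEvent handler flipping the flag
	fmt.Println(byVal.idle) // false: the copy never sees the update
	fmt.Println(byPtr.idle) // true: the shared state observes it
}
```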
@@ -22,6 +22,7 @@ type Metrics interface {
 	RecordFrame()
 	RecordDerivedBatches(batchType string)
 	SetDerivationIdle(idle bool)
+	RecordPipelineReset()
 }
 
 type L1Fetcher interface {
@@ -195,6 +196,8 @@ func (dp *DerivationPipeline) Step(ctx context.Context, pendingSafeHead eth.L2Bl
 func (dp *DerivationPipeline) initialReset(ctx context.Context, resetL2Safe eth.L2BlockRef) error {
 	dp.log.Info("Rewinding derivation-pipeline L1 traversal to handle reset")
 
+	dp.metrics.RecordPipelineReset()
+
 	// Walk back L2 chain to find the L1 origin that is old enough to start buffering channel data from.
 	pipelineL2 := resetL2Safe
 	l1Origin := resetL2Safe.L1Origin
...
@@ -178,48 +178,62 @@ func NewDriver(
 	sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1)
 	findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth)
 	verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, l1State.L1Head, l1)
-	engine := engine.NewEngineController(l2, log, metrics, cfg, syncCfg.SyncMode)
-	clSync := clsync.NewCLSync(log, cfg, metrics, engine)
+	ec := engine.NewEngineController(l2, log, metrics, cfg, syncCfg.SyncMode)
+	clSync := clsync.NewCLSync(log, cfg, metrics, ec)
 	var finalizer Finalizer
 	if cfg.PlasmaEnabled() {
-		finalizer = finality.NewPlasmaFinalizer(log, cfg, l1, engine, plasma)
+		finalizer = finality.NewPlasmaFinalizer(log, cfg, l1, ec, plasma)
 	} else {
-		finalizer = finality.NewFinalizer(log, cfg, l1, engine)
+		finalizer = finality.NewFinalizer(log, cfg, l1, ec)
 	}
-	attributesHandler := attributes.NewAttributesHandler(log, cfg, engine, l2)
+	attributesHandler := attributes.NewAttributesHandler(log, cfg, ec, l2)
 	derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l1Blobs, plasma, l2, metrics)
 	attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2)
-	meteredEngine := NewMeteredEngine(cfg, engine, metrics, log) // Only use the metered engine in the sequencer b/c it records sequencing metrics.
+	meteredEngine := NewMeteredEngine(cfg, ec, metrics, log) // Only use the metered engine in the sequencer b/c it records sequencing metrics.
 	sequencer := NewSequencer(log, cfg, meteredEngine, attrBuilder, findL1Origin, metrics)
 	driverCtx, driverCancel := context.WithCancel(context.Background())
 	asyncGossiper := async.NewAsyncGossiper(driverCtx, network, log, metrics)
-	return &Driver{
-		l1State: l1State,
-		SyncDeriver: &SyncDeriver{
-			Derivation:        derivationPipeline,
-			Finalizer:         finalizer,
-			AttributesHandler: attributesHandler,
-			SafeHeadNotifs:    safeHeadListener,
-			CLSync:            clSync,
-			Engine:            engine,
-		},
+
+	rootDeriver := &rollup.SynchronousDerivers{}
+	synchronousEvents := NewSynchronousEvents(log, driverCtx, rootDeriver)
+
+	syncDeriver := &SyncDeriver{
+		Derivation:        derivationPipeline,
+		Finalizer:         finalizer,
+		AttributesHandler: attributesHandler,
+		SafeHeadNotifs:    safeHeadListener,
+		CLSync:            clSync,
+		Engine:            ec,
+		SyncCfg:           syncCfg,
+		Config:            cfg,
+		L1:                l1,
+		L2:                l2,
+		Emitter:           synchronousEvents,
+		Log:               log,
+		Ctx:               driverCtx,
+		Drain:             synchronousEvents.Drain,
+	}
+	engDeriv := engine.NewEngDeriver(log, driverCtx, cfg, ec, synchronousEvents)
+	schedDeriv := NewStepSchedulingDeriver(log, synchronousEvents)
+
+	driver := &Driver{
+		l1State:           l1State,
+		SyncDeriver:       syncDeriver,
+		sched:             schedDeriv,
+		synchronousEvents: synchronousEvents,
 		stateReq:          make(chan chan struct{}),
 		forceReset:        make(chan chan struct{}, 10),
 		startSequencer:    make(chan hashAndErrorChannel, 10),
 		stopSequencer:     make(chan chan hashAndError, 10),
 		sequencerActive:   make(chan chan bool, 10),
 		sequencerNotifs:   sequencerStateListener,
-		config:            cfg,
-		syncCfg:           syncCfg,
 		driverConfig:      driverCfg,
 		driverCtx:         driverCtx,
 		driverCancel:      driverCancel,
 		log:               log,
 		snapshotLog:       snapshotLog,
-		l1:                l1,
-		l2:                l2,
 		sequencer:         sequencer,
 		network:           network,
 		metrics:           metrics,
@@ -231,4 +245,13 @@ func NewDriver(
 		asyncGossiper:      asyncGossiper,
 		sequencerConductor: sequencerConductor,
 	}
+
+	*rootDeriver = []rollup.Deriver{
+		syncDeriver,
+		engDeriv,
+		schedDeriv,
+		driver,
+	}
+
+	return driver
 }
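The construction order above solves a chicken-and-egg problem: the derivers need the event emitter at construction time, while the emitter needs the full deriver list. `NewDriver` breaks the cycle by allocating an empty `SynchronousDerivers` first and only assigning the real list once everything is built. A minimal standalone sketch of that late-binding pattern, with stand-in types rather than the real op-node ones:

```go
package main

import "fmt"

// Minimal stand-ins for rollup.Event / rollup.Deriver.
type Event interface{ String() string }
type Deriver interface{ OnEvent(ev Event) }

// derivers broadcasts an event to every registered deriver, like SynchronousDerivers.
type derivers []Deriver

func (d *derivers) OnEvent(ev Event) {
	for _, dr := range *d {
		dr.OnEvent(ev)
	}
}

type ping struct{}

func (ping) String() string { return "ping" }

type printer struct{}

func (printer) OnEvent(ev Event) { fmt.Println("got", ev) }

func main() {
	// 1. allocate the (still empty) root deriver list
	root := &derivers{}
	// 2. build components that capture the root (via the emitter in the real code)
	p := printer{}
	// 3. only now fill in the list; the earlier reference stays valid
	*root = derivers{p}
	root.OnEvent(ping{}) // prints "got ping"
}
```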
@@ -20,7 +20,6 @@ import (
 	"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
 	"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
 	"github.com/ethereum-optimism/optimism/op-service/eth"
-	"github.com/ethereum-optimism/optimism/op-service/retry"
 )
 
 var (
@@ -39,6 +38,10 @@ type Driver struct {
 	*SyncDeriver
 
+	sched *StepSchedulingDeriver
+
+	synchronousEvents *SynchronousEvents
+
 	// Requests to block the event loop for synchronous execution to avoid reading an inconsistent state
 	stateReq chan chan struct{}
@@ -62,17 +65,11 @@ type Driver struct {
 	// sequencerNotifs is notified when the sequencer is started or stopped
 	sequencerNotifs SequencerStateListener
 
-	// Rollup config: rollup chain configuration
-	config *rollup.Config
-
 	sequencerConductor conductor.SequencerConductor
 
 	// Driver config: verifier and sequencer settings
 	driverConfig *Config
 
-	// Sync Mod Config
-	syncCfg *sync.Config
-
 	// L1 Signals:
 	//
 	// Not all L1 blocks, or all changes, have to be signalled:
@@ -94,8 +91,6 @@ type Driver struct {
 	unsafeL2Payloads chan *eth.ExecutionPayloadEnvelope
 
-	l1 L1Chain
-	l2 L2Chain
-
 	sequencer SequencerIface
 	network   Network // may be nil, network for is optional
@@ -191,39 +186,9 @@ func (s *Driver) eventLoop() {
 	defer s.driverCancel()
 
-	// stepReqCh is used to request that the driver attempts to step forward by one L1 block.
-	stepReqCh := make(chan struct{}, 1)
-
-	// channel, nil by default (not firing), but used to schedule re-attempts with delay
-	var delayedStepReq <-chan time.Time
-
-	// keep track of consecutive failed attempts, to adjust the backoff time accordingly
-	bOffStrategy := retry.Exponential()
-	stepAttempts := 0
-
-	// step requests a derivation step to be taken. Won't deadlock if the channel is full.
-	step := func() {
-		select {
-		case stepReqCh <- struct{}{}:
-		// Don't deadlock if the channel is already full
-		default:
-		}
-	}
-
 	// reqStep requests a derivation step nicely, with a delay if this is a reattempt, or not at all if we already scheduled a reattempt.
 	reqStep := func() {
-		if stepAttempts > 0 {
-			// if this is not the first attempt, we re-schedule with a backoff, *without blocking other events*
-			if delayedStepReq == nil {
-				delay := bOffStrategy.Duration(stepAttempts)
-				s.log.Debug("scheduling re-attempt with delay", "attempts", stepAttempts, "delay", delay)
-				delayedStepReq = time.After(delay)
-			} else {
-				s.log.Debug("ignoring step request, already scheduled re-attempt after previous failure", "attempts", stepAttempts)
-			}
-		} else {
-			step()
-		}
+		s.Emit(StepReqEvent{})
 	}
 
 	// We call reqStep right away to finish syncing to the tip of the chain if we're behind.
@@ -244,7 +209,7 @@
 	// Create a ticker to check if there is a gap in the engine queue. Whenever
 	// there is, we send requests to sync source to retrieve the missing payloads.
-	syncCheckInterval := time.Duration(s.config.BlockTime) * time.Second * 2
+	syncCheckInterval := time.Duration(s.Config.BlockTime) * time.Second * 2
 	altSyncTicker := time.NewTicker(syncCheckInterval)
 	defer altSyncTicker.Stop()
 	lastUnsafeL2 := s.Engine.UnsafeL2Head()
@@ -254,6 +219,15 @@
 			return
 		}
 
+		// While event-processing is synchronous we have to drain
+		// (i.e. process all queued-up events) before creating any new events.
+		if err := s.synchronousEvents.Drain(); err != nil {
+			if s.driverCtx.Err() != nil {
+				return
+			}
+			s.log.Error("unexpected error from event-draining", "err", err)
+		}
+
 		// If we are sequencing, and the L1 state is ready, update the trigger for the next sequencer action.
 		// This may adjust at any time based on fork-choice changes or previous errors.
 		// And avoid sequencing if the derivation pipeline indicates the engine is not ready.
@@ -311,13 +285,13 @@
 		case envelope := <-s.unsafeL2Payloads:
 			s.snapshot("New unsafe payload")
 			// If we are doing CL sync or done with engine syncing, fallback to the unsafe payload queue & CL P2P sync.
-			if s.syncCfg.SyncMode == sync.CLSync || !s.Engine.IsEngineSyncing() {
+			if s.SyncCfg.SyncMode == sync.CLSync || !s.Engine.IsEngineSyncing() {
 				s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", envelope.ExecutionPayload.ID())
 				s.CLSync.AddUnsafePayload(envelope)
 				s.metrics.RecordReceivedUnsafePayload(envelope)
 				reqStep()
-			} else if s.syncCfg.SyncMode == sync.ELSync {
-				ref, err := derive.PayloadToBlockRef(s.config, envelope.ExecutionPayload)
+			} else if s.SyncCfg.SyncMode == sync.ELSync {
+				ref, err := derive.PayloadToBlockRef(s.Config, envelope.ExecutionPayload)
 				if err != nil {
 					s.log.Info("Failed to turn execution payload into a block ref", "id", envelope.ExecutionPayload.ID(), "err", err)
 					continue
@@ -342,58 +316,10 @@
 			s.Finalizer.Finalize(ctx, newL1Finalized)
 			cancel()
 			reqStep() // we may be able to mark more L2 data as finalized now
-		case <-delayedStepReq:
-			delayedStepReq = nil
-			step()
-		case <-stepReqCh:
-			// Don't start the derivation pipeline until we are done with EL sync
-			if s.Engine.IsEngineSyncing() {
-				continue
-			}
-			s.log.Debug("Sync process step", "onto_origin", s.Derivation.Origin(), "attempts", stepAttempts)
-			err := s.SyncStep(s.driverCtx)
-			stepAttempts += 1 // count as attempt by default. We reset to 0 if we are making healthy progress.
-			if err == io.EOF {
-				s.log.Debug("Derivation process went idle", "progress", s.Derivation.Origin(), "err", err)
-				stepAttempts = 0
-				continue
-			} else if err != nil && errors.Is(err, derive.EngineELSyncing) {
-				s.log.Debug("Derivation process went idle because the engine is syncing", "progress", s.Derivation.Origin(), "unsafe_head", s.Engine.UnsafeL2Head(), "err", err)
-				stepAttempts = 0
-				continue
-			} else if err != nil && errors.Is(err, derive.ErrReset) {
-				// If the pipeline corrupts, e.g. due to a reorg, simply reset it
-				s.log.Warn("Derivation pipeline is reset", "err", err)
-				s.Derivation.Reset()
-				s.Finalizer.Reset()
-				s.metrics.RecordPipelineReset()
-				reqStep()
-				if err := engine.ResetEngine(s.driverCtx, s.log, s.config, s.Engine, s.l1, s.l2, s.syncCfg, s.SafeHeadNotifs); err != nil {
-					s.log.Error("Derivation pipeline not ready, failed to reset engine", "err", err)
-					// Derivation-pipeline will return a new ResetError until we confirm the engine has been successfully reset.
-					continue
-				}
-				s.Derivation.ConfirmEngineReset()
-				continue
-			} else if err != nil && errors.Is(err, derive.ErrTemporary) {
-				s.log.Warn("Derivation process temporary error", "attempts", stepAttempts, "err", err)
-				reqStep()
-				continue
-			} else if err != nil && errors.Is(err, derive.ErrCritical) {
-				s.log.Error("Derivation process critical error", "err", err)
-				return
-			} else if err != nil && errors.Is(err, derive.NotEnoughData) {
-				stepAttempts = 0 // don't do a backoff for this error
-				reqStep()
-				continue
-			} else if err != nil {
-				s.log.Error("Derivation process error", "attempts", stepAttempts, "err", err)
-				reqStep()
-				continue
-			} else {
-				stepAttempts = 0
-				reqStep() // continue with the next step if we can
-			}
+		case <-s.sched.NextDelayedStep():
+			s.Emit(StepAttemptEvent{})
+		case <-s.sched.NextStep():
+			s.Emit(StepAttemptEvent{})
 		case respCh := <-s.stateReq:
 			respCh <- struct{}{}
 		case respCh := <-s.forceReset:
@@ -440,6 +366,29 @@
 	}
 }
 
+// OnEvent handles broadcasted events.
+// The Driver itself is a deriver to catch system-critical events.
+// Other event-handling should be encapsulated into standalone derivers.
+func (s *Driver) OnEvent(ev rollup.Event) {
+	switch x := ev.(type) {
+	case rollup.CriticalErrorEvent:
+		s.Log.Error("Derivation process critical error", "err", x.Err)
+		// we need to unblock event-processing to be able to close
+		go func() {
+			logger := s.Log
+			err := s.Close()
+			if err != nil {
+				logger.Error("Failed to shutdown driver on critical error", "err", err)
+			}
+		}()
+		return
+	}
+}
+
+func (s *Driver) Emit(ev rollup.Event) {
+	s.synchronousEvents.Emit(ev)
+}
 type SyncDeriver struct {
 	// The derivation pipeline is reset whenever we reorg.
 	// The derivation pipeline determines the new l2Safe.
@@ -457,20 +406,101 @@ type SyncDeriver struct {
 	// The engine controller is used by the sequencer & Derivation components.
 	// We will also use it for EL sync in a future PR.
 	Engine EngineController
+
+	// Sync Mod Config
+	SyncCfg *sync.Config
+
+	Config *rollup.Config
+
+	L1 L1Chain
+	L2 L2Chain
+
+	Emitter rollup.EventEmitter
+
+	Log log.Logger
+
+	Ctx context.Context
+
+	Drain func() error
+}
+
+func (s *SyncDeriver) OnEvent(ev rollup.Event) {
+	switch x := ev.(type) {
+	case StepEvent:
+		s.onStepEvent()
+	case rollup.ResetEvent:
+		s.onResetEvent(x)
+	case rollup.EngineTemporaryErrorEvent:
+		s.Log.Warn("Derivation process temporary error", "err", x.Err)
+		s.Emitter.Emit(StepReqEvent{})
+	}
+}
+
+func (s *SyncDeriver) onStepEvent() {
+	s.Log.Debug("Sync process step", "onto_origin", s.Derivation.Origin())
+	// Note: while we refactor the SyncStep to be entirely event-based we have an intermediate phase
+	// where some things are triggered through events, and some through this synchronous step function.
+	// We just translate the results into their equivalent events,
+	// to merge the error-handling with that of the new event-based system.
+	err := s.SyncStep(s.Ctx)
+	if err == io.EOF {
+		s.Log.Debug("Derivation process went idle", "progress", s.Derivation.Origin(), "err", err)
+		s.Emitter.Emit(ResetStepBackoffEvent{})
+		s.Emitter.Emit(DeriverIdleEvent{})
+	} else if err != nil && errors.Is(err, derive.EngineELSyncing) {
+		s.Log.Debug("Derivation process went idle because the engine is syncing", "progress", s.Derivation.Origin(), "unsafe_head", s.Engine.UnsafeL2Head(), "err", err)
+		s.Emitter.Emit(ResetStepBackoffEvent{})
+	} else if err != nil && errors.Is(err, derive.ErrReset) {
+		s.Emitter.Emit(rollup.ResetEvent{Err: err})
+	} else if err != nil && errors.Is(err, derive.ErrTemporary) {
+		s.Emitter.Emit(rollup.EngineTemporaryErrorEvent{Err: err})
+	} else if err != nil && errors.Is(err, derive.ErrCritical) {
+		s.Emitter.Emit(rollup.CriticalErrorEvent{Err: err})
+	} else if err != nil && errors.Is(err, derive.NotEnoughData) {
+		// don't do a backoff for this error
+		s.Emitter.Emit(StepReqEvent{ResetBackoff: true})
+	} else if err != nil {
+		s.Log.Error("Derivation process error", "err", err)
+		s.Emitter.Emit(StepReqEvent{})
+	} else {
+		s.Emitter.Emit(StepReqEvent{ResetBackoff: true}) // continue with the next step if we can
+	}
+}
+
+func (s *SyncDeriver) onResetEvent(x rollup.ResetEvent) {
+	// If the pipeline corrupts, e.g. due to a reorg, simply reset it
+	s.Log.Warn("Derivation pipeline is reset", "err", x.Err)
+	s.Derivation.Reset()
+	s.Finalizer.Reset()
+	s.Emitter.Emit(StepReqEvent{})
+	if err := engine.ResetEngine(s.Ctx, s.Log, s.Config, s.Engine, s.L1, s.L2, s.SyncCfg, s.SafeHeadNotifs); err != nil {
+		s.Log.Error("Derivation pipeline not ready, failed to reset engine", "err", err)
+		// Derivation-pipeline will return a new ResetError until we confirm the engine has been successfully reset.
+		return
+	}
+	s.Derivation.ConfirmEngineReset()
+}
+
+type DeriverIdleEvent struct{}
+
+func (d DeriverIdleEvent) String() string {
+	return "derivation-idle"
+}
 // SyncStep performs the sequence of encapsulated syncing steps.
 // Warning: this sequence will be broken apart as outlined in op-node derivers design doc.
 func (s *SyncDeriver) SyncStep(ctx context.Context) error {
-	// If we don't need to call FCU to restore unsafeHead using backupUnsafe, keep going b/c
-	// this was a no-op(except correcting invalid state when backupUnsafe is empty but TryBackupUnsafeReorg called).
-	// If we needed to perform a network call, then we should yield even if we did not encounter an error.
-	if fcuCalled, err := s.Engine.TryBackupUnsafeReorg(ctx); fcuCalled {
+	if err := s.Drain(); err != nil {
+		return err
+	}
+
+	s.Emitter.Emit(engine.TryBackupUnsafeReorgEvent{})
+	if err := s.Drain(); err != nil {
 		return err
 	}
-	// If we don't need to call FCU, keep going b/c this was a no-op. If we needed to
-	// perform a network call, then we should yield even if we did not encounter an error.
-	if err := s.Engine.TryUpdateEngine(ctx); !errors.Is(err, engine.ErrNoFCUNeeded) {
+
+	s.Emitter.Emit(engine.TryUpdateEngineEvent{})
+	if err := s.Drain(); err != nil {
 		return err
 	}
@@ -636,7 +666,7 @@ func (s *Driver) BlockRefWithStatus(ctx context.Context, num uint64) (eth.L2Bloc
 	select {
 	case s.stateReq <- wait:
 		resp := s.syncStatus()
-		ref, err := s.l2.L2BlockRefByNumber(ctx, num)
+		ref, err := s.L2.L2BlockRefByNumber(ctx, num)
 		<-wait
 		return ref, resp, err
 	case <-ctx.Done():
...
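The `onStepEvent` handler above is essentially a translation table from the old sentinel errors to the new events. A stripped-down sketch of that mapping, using stand-in error values and a hypothetical `classify` helper in place of the real `Emitter.Emit` calls:

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

var (
	errReset     = errors.New("reset")     // stands in for derive.ErrReset
	errTemporary = errors.New("temporary") // stands in for derive.ErrTemporary
	errCritical  = errors.New("critical")  // stands in for derive.ErrCritical
)

// classify names the event that a sync-step result would be translated into.
func classify(err error) string {
	switch {
	case err == nil:
		return "StepReqEvent{ResetBackoff: true}" // healthy progress, keep stepping
	case errors.Is(err, io.EOF):
		return "DeriverIdleEvent{}" // all available L1 data consumed
	case errors.Is(err, errReset):
		return "rollup.ResetEvent{}"
	case errors.Is(err, errTemporary):
		return "rollup.EngineTemporaryErrorEvent{}"
	case errors.Is(err, errCritical):
		return "rollup.CriticalErrorEvent{}"
	default:
		return "StepReqEvent{}" // unknown error: retry with backoff
	}
}

func main() {
	fmt.Println(classify(io.EOF))       // DeriverIdleEvent{}
	fmt.Println(classify(errTemporary)) // rollup.EngineTemporaryErrorEvent{}
}
```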
package driver
import (
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/retry"
)
type ResetStepBackoffEvent struct {
}
func (ev ResetStepBackoffEvent) String() string {
return "reset-step-backoff"
}
type StepReqEvent struct {
ResetBackoff bool
}
func (ev StepReqEvent) String() string {
return "step-req"
}
type StepAttemptEvent struct{}
func (ev StepAttemptEvent) String() string {
return "step-attempt"
}
type StepEvent struct{}
func (ev StepEvent) String() string {
return "step"
}
// StepSchedulingDeriver is a deriver that emits StepEvent events.
// The deriver can be requested to schedule a step with a StepReqEvent.
//
// It is then up to the caller to translate scheduling into StepAttemptEvent emissions, by waiting for
// NextStep or NextDelayedStep channels (nil if there is nothing to wait for, for channel-merging purposes).
//
// Upon StepAttemptEvent the scheduler will then emit a StepEvent,
// while maintaining backoff state, to not spam steps.
//
// Backoff can be reset by sending a request with StepReqEvent.ResetBackoff
// set to true, or by sending a ResetStepBackoffEvent.
type StepSchedulingDeriver struct {
// keep track of consecutive failed attempts, to adjust the backoff time accordingly
stepAttempts int
bOffStrategy retry.Strategy
// channel, nil by default (not firing), but used to schedule re-attempts with delay
delayedStepReq <-chan time.Time
// stepReqCh is used to request that the driver attempts to step forward by one L1 block.
stepReqCh chan struct{}
log log.Logger
emitter rollup.EventEmitter
}
func NewStepSchedulingDeriver(log log.Logger, emitter rollup.EventEmitter) *StepSchedulingDeriver {
return &StepSchedulingDeriver{
stepAttempts: 0,
bOffStrategy: retry.Exponential(),
stepReqCh: make(chan struct{}, 1),
delayedStepReq: nil,
log: log,
emitter: emitter,
}
}
// NextStep is a channel to await, and if triggered,
// the caller should emit a StepAttemptEvent to queue up a step while maintaining backoff.
func (s *StepSchedulingDeriver) NextStep() <-chan struct{} {
return s.stepReqCh
}
// NextDelayedStep is a temporary channel to await, and if triggered,
// the caller should emit a StepAttemptEvent to queue up a step while maintaining backoff.
// The returned channel may be nil, if there is no requested step with delay scheduled.
func (s *StepSchedulingDeriver) NextDelayedStep() <-chan time.Time {
return s.delayedStepReq
}
func (s *StepSchedulingDeriver) OnEvent(ev rollup.Event) {
step := func() {
s.delayedStepReq = nil
select {
case s.stepReqCh <- struct{}{}:
// Don't deadlock if the channel is already full
default:
}
}
switch x := ev.(type) {
case StepReqEvent:
if x.ResetBackoff {
s.stepAttempts = 0
}
if s.stepAttempts > 0 {
// if this is not the first attempt, we re-schedule with a backoff, *without blocking other events*
if s.delayedStepReq == nil {
delay := s.bOffStrategy.Duration(s.stepAttempts)
s.log.Debug("scheduling re-attempt with delay", "attempts", s.stepAttempts, "delay", delay)
s.delayedStepReq = time.After(delay)
} else {
s.log.Debug("ignoring step request, already scheduled re-attempt after previous failure", "attempts", s.stepAttempts)
}
} else {
step()
}
case StepAttemptEvent:
// clear the delayed-step channel
s.delayedStepReq = nil
if s.stepAttempts > 0 {
s.log.Debug("Running step retry", "attempts", s.stepAttempts)
}
// count as attempt by default. We reset to 0 if we are making healthy progress.
s.stepAttempts += 1
s.emitter.Emit(StepEvent{})
case ResetStepBackoffEvent:
s.stepAttempts = 0
}
}
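Per the doc comment, the scheduler only exposes channels; translating a channel firing into a `StepAttemptEvent` is the caller's responsibility (in this PR, `Driver.eventLoop`). A hypothetical helper, not part of the PR, showing that contract in isolation:

```go
package driver

import (
	"context"

	"github.com/ethereum-optimism/optimism/op-node/rollup"
)

// runStepSchedule (hypothetical) shows the channel contract of the scheduler:
// whenever NextStep or NextDelayedStep fires, the caller emits a
// StepAttemptEvent, and the scheduler answers with a StepEvent while
// tracking backoff. The real wiring lives in Driver.eventLoop's select.
func runStepSchedule(ctx context.Context, sched *StepSchedulingDeriver, emitter rollup.EventEmitter) {
	for {
		select {
		case <-sched.NextStep():
			emitter.Emit(StepAttemptEvent{})
		case <-sched.NextDelayedStep():
			emitter.Emit(StepAttemptEvent{})
		case <-ctx.Done():
			return
		}
	}
}
```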
package driver
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/testlog"
)
func TestStepSchedulingDeriver(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
var queued []rollup.Event
emitter := rollup.EmitterFunc(func(ev rollup.Event) {
queued = append(queued, ev)
})
sched := NewStepSchedulingDeriver(logger, emitter)
require.Len(t, sched.NextStep(), 0, "start empty")
sched.OnEvent(StepReqEvent{})
require.Len(t, sched.NextStep(), 1, "take request")
sched.OnEvent(StepReqEvent{})
require.Len(t, sched.NextStep(), 1, "ignore duplicate request")
require.Empty(t, queued, "only scheduled so far, no step attempts yet")
<-sched.NextStep()
sched.OnEvent(StepAttemptEvent{})
require.Equal(t, []rollup.Event{StepEvent{}}, queued, "got step event")
require.Nil(t, sched.NextDelayedStep(), "no delayed steps yet")
sched.OnEvent(StepReqEvent{})
require.NotNil(t, sched.NextDelayedStep(), "2nd attempt before backoff reset causes delayed step to be scheduled")
sched.OnEvent(StepReqEvent{})
require.NotNil(t, sched.NextDelayedStep(), "can continue to request attempts")
sched.OnEvent(StepReqEvent{})
require.Len(t, sched.NextStep(), 0, "no step requests accepted without delay if backoff is counting")
sched.OnEvent(StepReqEvent{ResetBackoff: true})
require.Len(t, sched.NextStep(), 1, "request accepted if backoff is reset")
<-sched.NextStep()
sched.OnEvent(StepReqEvent{})
require.Len(t, sched.NextStep(), 1, "no backoff, no attempt has been made yet")
<-sched.NextStep()
sched.OnEvent(StepAttemptEvent{})
sched.OnEvent(StepReqEvent{})
require.Len(t, sched.NextStep(), 0, "backoff again")
sched.OnEvent(ResetStepBackoffEvent{})
sched.OnEvent(StepReqEvent{})
require.Len(t, sched.NextStep(), 1, "reset backoff accepted, was able to schedule non-delayed step")
}
package driver
import (
"context"
"sync"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
)
// Don't queue up an endless number of events.
// At some point it's better to drop events and warn something is exploding the number of events.
const sanityEventLimit = 1000
// SynchronousEvents is a rollup.EventEmitter that a rollup.Deriver can emit events to.
// The events will be queued up, and can then be executed synchronously by calling the Drain function,
// which will apply all events to the root Deriver.
// New events may be queued up while events are being processed by the root rollup.Deriver.
type SynchronousEvents struct {
// The lock is no-op in FP execution, if running in synchronous FP-VM.
// This lock ensures that all emitted events are merged together correctly,
// if this util is used in a concurrent context.
evLock sync.Mutex
events []rollup.Event
log log.Logger
ctx context.Context
root rollup.Deriver
}
func NewSynchronousEvents(log log.Logger, ctx context.Context, root rollup.Deriver) *SynchronousEvents {
return &SynchronousEvents{
log: log,
ctx: ctx,
root: root,
}
}
func (s *SynchronousEvents) Emit(event rollup.Event) {
s.evLock.Lock()
defer s.evLock.Unlock()
if s.ctx.Err() != nil {
s.log.Warn("Ignoring emitted event during shutdown", "event", event)
return
}
// sanity limit, never queue too many events
if len(s.events) >= sanityEventLimit {
s.log.Error("Something is very wrong, queued up too many events! Dropping event", "ev", event)
return
}
s.events = append(s.events, event)
}
func (s *SynchronousEvents) Drain() error {
for {
if s.ctx.Err() != nil {
return s.ctx.Err()
}
if len(s.events) == 0 {
return nil
}
s.evLock.Lock()
first := s.events[0]
s.events = s.events[1:]
s.evLock.Unlock()
s.root.OnEvent(first)
}
}
var _ rollup.EventEmitter = (*SynchronousEvents)(nil)
package driver
import (
"context"
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/testlog"
)
type TestEvent struct{}
func (ev TestEvent) String() string {
return "X"
}
func TestSynchronousEvents(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
ctx, cancel := context.WithCancel(context.Background())
count := 0
deriver := rollup.DeriverFunc(func(ev rollup.Event) {
count += 1
})
syncEv := NewSynchronousEvents(logger, ctx, deriver)
require.NoError(t, syncEv.Drain(), "can drain, even if empty")
syncEv.Emit(TestEvent{})
require.Equal(t, 0, count, "no processing yet, queued event")
require.NoError(t, syncEv.Drain())
require.Equal(t, 1, count, "processed event")
syncEv.Emit(TestEvent{})
syncEv.Emit(TestEvent{})
require.Equal(t, 1, count, "no processing yet, queued events")
require.NoError(t, syncEv.Drain())
require.Equal(t, 3, count, "processed events")
cancel()
syncEv.Emit(TestEvent{})
require.Equal(t, ctx.Err(), syncEv.Drain(), "no draining after close")
require.Equal(t, 3, count, "didn't process event after trigger close")
}
func TestSynchronousEventsSanityLimit(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
count := 0
deriver := rollup.DeriverFunc(func(ev rollup.Event) {
count += 1
})
syncEv := NewSynchronousEvents(logger, context.Background(), deriver)
// emit 1 too many events
for i := 0; i < sanityEventLimit+1; i++ {
syncEv.Emit(TestEvent{})
}
require.NoError(t, syncEv.Drain())
require.Equal(t, sanityEventLimit, count, "processed all non-dropped events")
syncEv.Emit(TestEvent{})
require.NoError(t, syncEv.Drain())
require.Equal(t, sanityEventLimit+1, count, "back to normal after drain")
}
type CyclicEvent struct {
Count int
}
func (ev CyclicEvent) String() string {
return "cyclic-event"
}
func TestSynchronousCyclic(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
var emitter rollup.EventEmitter
result := false
deriver := rollup.DeriverFunc(func(ev rollup.Event) {
logger.Info("received event", "event", ev)
switch x := ev.(type) {
case CyclicEvent:
if x.Count < 10 {
emitter.Emit(CyclicEvent{Count: x.Count + 1})
} else {
result = true
}
}
})
syncEv := NewSynchronousEvents(logger, context.Background(), deriver)
emitter = syncEv
syncEv.Emit(CyclicEvent{Count: 0})
require.NoError(t, syncEv.Drain())
require.True(t, result, "expecting event processing to fully recurse")
}
package engine
import (
"context"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)
type TryBackupUnsafeReorgEvent struct {
}
func (ev TryBackupUnsafeReorgEvent) String() string {
return "try-backup-unsafe-reorg"
}
type TryUpdateEngineEvent struct {
}
func (ev TryUpdateEngineEvent) String() string {
return "try-update-engine"
}
type EngDeriver struct {
log log.Logger
cfg *rollup.Config
ec *EngineController
ctx context.Context
emitter rollup.EventEmitter
}
var _ rollup.Deriver = (*EngDeriver)(nil)
func NewEngDeriver(log log.Logger, ctx context.Context, cfg *rollup.Config,
ec *EngineController, emitter rollup.EventEmitter) *EngDeriver {
return &EngDeriver{
log: log,
cfg: cfg,
ec: ec,
ctx: ctx,
emitter: emitter,
}
}
func (d *EngDeriver) OnEvent(ev rollup.Event) {
switch ev.(type) {
case TryBackupUnsafeReorgEvent:
// If we don't need to call FCU to restore unsafeHead using backupUnsafe, keep going b/c
// this was a no-op(except correcting invalid state when backupUnsafe is empty but TryBackupUnsafeReorg called).
fcuCalled, err := d.ec.TryBackupUnsafeReorg(d.ctx)
// Dealing with legacy here: it used to skip over the error-handling if fcuCalled was false.
// But that combination is not actually a code-path in TryBackupUnsafeReorg.
// We should drop fcuCalled, and make the function emit events directly,
// once there are no more synchronous callers.
if !fcuCalled && err != nil {
d.log.Crit("unexpected TryBackupUnsafeReorg error after no FCU call", "err", err)
}
if err != nil {
// If we needed to perform a network call, then we should yield even if we did not encounter an error.
if errors.Is(err, derive.ErrReset) {
d.emitter.Emit(rollup.ResetEvent{Err: err})
} else if errors.Is(err, derive.ErrTemporary) {
d.emitter.Emit(rollup.EngineTemporaryErrorEvent{Err: err})
} else {
d.emitter.Emit(rollup.CriticalErrorEvent{Err: fmt.Errorf("unexpected TryBackupUnsafeReorg error type: %w", err)})
}
}
case TryUpdateEngineEvent:
// If we don't need to call FCU, keep going b/c this was a no-op. If we needed to
// perform a network call, then we should yield even if we did not encounter an error.
if err := d.ec.TryUpdateEngine(d.ctx); err != nil && !errors.Is(err, ErrNoFCUNeeded) {
if errors.Is(err, derive.ErrReset) {
d.emitter.Emit(rollup.ResetEvent{Err: err})
} else if errors.Is(err, derive.ErrTemporary) {
d.emitter.Emit(rollup.EngineTemporaryErrorEvent{Err: err})
} else {
d.emitter.Emit(rollup.CriticalErrorEvent{Err: fmt.Errorf("unexpected TryUpdateEngine error type: %w", err)})
}
}
}
}
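For reference, a hypothetical wiring sketch showing how a `TryUpdateEngineEvent` flows through the `SynchronousEvents` queue into the `EngDeriver`, which in turn reports failures as `Reset`/`TemporaryError`/`CriticalError` events on the same queue. It is placed in the `driver` package only so that `NewSynchronousEvents` is in scope; the helper itself is not part of this PR:

```go
package driver

import (
	"context"

	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/op-node/rollup"
	"github.com/ethereum-optimism/optimism/op-node/rollup/engine"
)

// wireEngineDeriver (hypothetical) queues a TryUpdateEngineEvent and drains it
// through an EngDeriver; any engine error comes back as a follow-up event.
func wireEngineDeriver(ctx context.Context, logger log.Logger, cfg *rollup.Config, ec *engine.EngineController) error {
	root := &rollup.SynchronousDerivers{}
	q := NewSynchronousEvents(logger, ctx, root)
	engDeriv := engine.NewEngDeriver(logger, ctx, cfg, ec, q)
	*root = rollup.SynchronousDerivers{engDeriv}

	q.Emit(engine.TryUpdateEngineEvent{})
	return q.Drain() // runs EngDeriver.OnEvent, which may emit further events
}
```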
package rollup
import "github.com/ethereum/go-ethereum/log"
type Event interface {
String() string
}
type Deriver interface {
OnEvent(ev Event)
}
type EventEmitter interface {
Emit(ev Event)
}
type EmitterFunc func(ev Event)
func (fn EmitterFunc) Emit(ev Event) {
fn(ev)
}
type EngineTemporaryErrorEvent struct {
Err error
}
var _ Event = EngineTemporaryErrorEvent{}
func (ev EngineTemporaryErrorEvent) String() string {
return "engine-temporary-error"
}
type ResetEvent struct {
Err error
}
var _ Event = ResetEvent{}
func (ev ResetEvent) String() string {
return "reset-event"
}
type CriticalErrorEvent struct {
Err error
}
var _ Event = CriticalErrorEvent{}
func (ev CriticalErrorEvent) String() string {
return "critical-error"
}
type SynchronousDerivers []Deriver
func (s *SynchronousDerivers) OnEvent(ev Event) {
for _, d := range *s {
d.OnEvent(ev)
}
}
var _ Deriver = (*SynchronousDerivers)(nil)
type DebugDeriver struct {
Log log.Logger
}
func (d DebugDeriver) OnEvent(ev Event) {
d.Log.Debug("on-event", "event", ev)
}
type NoopDeriver struct{}
func (d NoopDeriver) OnEvent(ev Event) {}
// DeriverFunc implements the Deriver interface as a function,
// similar to how the std-lib http HandlerFunc implements a Handler.
// This can be used for small in-place derivers, test helpers, etc.
type DeriverFunc func(ev Event)
func (fn DeriverFunc) OnEvent(ev Event) {
fn(ev)
}
package rollup
import (
"fmt"
"testing"
"github.com/stretchr/testify/require"
)
type TestEvent struct{}
func (ev TestEvent) String() string {
return "X"
}
func TestSynchronousDerivers_OnEvent(t *testing.T) {
result := ""
a := DeriverFunc(func(ev Event) {
result += fmt.Sprintf("A:%s\n", ev)
})
b := DeriverFunc(func(ev Event) {
result += fmt.Sprintf("B:%s\n", ev)
})
c := DeriverFunc(func(ev Event) {
result += fmt.Sprintf("C:%s\n", ev)
})
x := SynchronousDerivers{}
x.OnEvent(TestEvent{})
require.Equal(t, "", result)
x = SynchronousDerivers{a}
x.OnEvent(TestEvent{})
require.Equal(t, "A:X\n", result)
result = ""
x = SynchronousDerivers{a, a}
x.OnEvent(TestEvent{})
require.Equal(t, "A:X\nA:X\n", result)
result = ""
x = SynchronousDerivers{a, b}
x.OnEvent(TestEvent{})
require.Equal(t, "A:X\nB:X\n", result)
result = ""
x = SynchronousDerivers{a, b, c}
x.OnEvent(TestEvent{})
require.Equal(t, "A:X\nB:X\nC:X\n", result)
}
package safego
// NoCopy is a super simple safety util taken from the Go atomic lib.
//
// NoCopy may be added to structs which must not be copied
// after the first use.
//
// The NoCopy struct is empty, so should be a zero-cost util at runtime.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
//
// Note that it must not be embedded, due to the Lock and Unlock methods.
//
// Like:
// ```
//
// type Example struct {
// V uint64
// _ NoCopy
// }
//
// Then run: `go vet -copylocks .`
// ```
type NoCopy struct{}
// Lock is a no-op used by -copylocks checker from `go vet`.
func (*NoCopy) Lock() {}
func (*NoCopy) Unlock() {}
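To make the doc comment concrete: a small program (hypothetical, not part of the PR) that `go vet -copylocks` rejects once a `NoCopy` field is present:

```go
package main

import "github.com/ethereum-optimism/optimism/op-service/safego"

// Example mirrors the struct in the NoCopy doc comment above.
type Example struct {
	V uint64
	_ safego.NoCopy
}

func main() {
	a := Example{V: 1}
	b := a // go vet -copylocks: assignment copies lock value to b
	_ = b
}
```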
@@ -69,3 +69,6 @@ func (n *TestRPCMetrics) RecordRPCClientRequest(method string) func(err error) {
 func (n *TestRPCMetrics) RecordRPCClientResponse(method string, err error) {}
 
 func (t *TestDerivationMetrics) SetDerivationIdle(idle bool) {}
+
+func (t *TestDerivationMetrics) RecordPipelineReset() {
+}