Commit 029fb1cc authored by protolambda's avatar protolambda Committed by GitHub

op-node: event-system with tracing (#11099)

* op-node: event-system with tracing

* op-node: fix global synchronous events queue execution

* op-node: bump rate limits on action tests

* op-node: add missing read-lock to synchronous event executor

* op-node: automatically attach emitters to derivers that need one
parent bda36504
......@@ -110,7 +110,7 @@ func (s *L2Sequencer) ActL2EndBlock(t Testing) {
// This will ensure the sync-status and such reflect the latest changes.
s.synchronousEvents.Emit(engine.TryUpdateEngineEvent{})
s.synchronousEvents.Emit(engine.ForkchoiceRequestEvent{})
require.NoError(t, s.synchronousEvents.DrainUntil(func(ev event.Event) bool {
require.NoError(t, s.drainer.DrainUntil(func(ev event.Event) bool {
x, ok := ev.(engine.ForkchoiceUpdateEvent)
return ok && x.UnsafeL2Head == s.engine.UnsafeL2Head()
}, false))
......
......@@ -35,23 +35,23 @@ import (
// L2Verifier is an actor that functions like a rollup node,
// without the full P2P/API/Node stack, but just the derivation state, and simplified driver.
type L2Verifier struct {
eventSys event.System
log log.Logger
eng L2API
syncStatus driver.SyncStatusTracker
synchronousEvents event.EmitterDrainer
synchronousEvents event.Emitter
syncDeriver *driver.SyncDeriver
drainer event.Drainer
// L2 rollup
engine *engine.EngineController
derivation *derive.DerivationPipeline
clSync *clsync.CLSync
safeHeadListener rollup.SafeHeadListener
finalizer driver.Finalizer
syncCfg *sync.Config
l1 derive.L1Fetcher
......@@ -88,37 +88,53 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
rootDeriver := &event.DeriverMux{}
var synchronousEvents event.EmitterDrainer
synchronousEvents = event.NewQueue(log, ctx, rootDeriver, event.NoopMetrics{})
synchronousEvents = event.NewLimiterDrainer(ctx, synchronousEvents, rate.Limit(1000), 20, func() {
executor := event.NewGlobalSynchronous(ctx)
sys := event.NewSystem(log, executor)
t.Cleanup(sys.Stop)
opts := event.DefaultRegisterOpts()
opts.Emitter = event.EmitterOpts{
Limiting: true,
// TestSyncBatchType/DerivationWithFlakyL1RPC does *a lot* of quick retries
// TestL2BatcherBatchType/ExtendedTimeWithoutL1Batches as well.
Rate: rate.Limit(100_000),
Burst: 100_000,
OnLimited: func() {
log.Warn("Hitting events rate-limit. An events code-path may be hot-looping.")
t.Fatal("Tests must not hot-loop events")
})
},
}
metrics := &testutils.TestDerivationMetrics{}
ec := engine.NewEngineController(eng, log, metrics, cfg, syncCfg, synchronousEvents)
engineResetDeriver := engine.NewEngineResetDeriver(ctx, log, cfg, l1, eng, syncCfg, synchronousEvents)
ec := engine.NewEngineController(eng, log, metrics, cfg, syncCfg,
sys.Register("engine-controller", nil, opts))
clSync := clsync.NewCLSync(log, cfg, metrics, synchronousEvents)
sys.Register("engine-reset",
engine.NewEngineResetDeriver(ctx, log, cfg, l1, eng, syncCfg), opts)
clSync := clsync.NewCLSync(log, cfg, metrics)
sys.Register("cl-sync", clSync, opts)
var finalizer driver.Finalizer
if cfg.PlasmaEnabled() {
finalizer = finality.NewPlasmaFinalizer(ctx, log, cfg, l1, synchronousEvents, plasmaSrc)
finalizer = finality.NewPlasmaFinalizer(ctx, log, cfg, l1, plasmaSrc)
} else {
finalizer = finality.NewFinalizer(ctx, log, cfg, l1, synchronousEvents)
finalizer = finality.NewFinalizer(ctx, log, cfg, l1)
}
sys.Register("finalizer", finalizer, opts)
attributesHandler := attributes.NewAttributesHandler(log, cfg, ctx, eng, synchronousEvents)
sys.Register("attributes-handler",
attributes.NewAttributesHandler(log, cfg, ctx, eng), opts)
pipeline := derive.NewDerivationPipeline(log, cfg, l1, blobsSrc, plasmaSrc, eng, metrics)
pipelineDeriver := derive.NewPipelineDeriver(ctx, pipeline, synchronousEvents)
sys.Register("pipeline", derive.NewPipelineDeriver(ctx, pipeline), opts)
testActionEmitter := sys.Register("test-action", nil, opts)
syncStatusTracker := status.NewStatusTracker(log, metrics)
sys.Register("status", syncStatusTracker, opts)
syncDeriver := &driver.SyncDeriver{
sys.Register("sync", &driver.SyncDeriver{
Derivation: pipeline,
Finalizer: finalizer,
SafeHeadNotifs: safeHeadListener,
CLSync: clSync,
Engine: ec,
......@@ -126,44 +142,31 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
Config: cfg,
L1: l1,
L2: eng,
Emitter: synchronousEvents,
Log: log,
Ctx: ctx,
Drain: synchronousEvents.Drain,
}
Drain: executor.Drain,
}, opts)
engDeriv := engine.NewEngDeriver(log, ctx, cfg, ec, synchronousEvents)
sys.Register("engine", engine.NewEngDeriver(log, ctx, cfg, ec), opts)
rollupNode := &L2Verifier{
eventSys: sys,
log: log,
eng: eng,
engine: ec,
clSync: clSync,
derivation: pipeline,
finalizer: finalizer,
safeHeadListener: safeHeadListener,
syncCfg: syncCfg,
syncDeriver: syncDeriver,
drainer: executor,
l1: l1,
syncStatus: syncStatusTracker,
l2PipelineIdle: true,
l2Building: false,
rollupCfg: cfg,
rpc: rpc.NewServer(),
synchronousEvents: synchronousEvents,
}
*rootDeriver = event.DeriverMux{
syncStatusTracker,
syncDeriver,
engineResetDeriver,
engDeriv,
rollupNode,
clSync,
pipelineDeriver,
attributesHandler,
finalizer,
synchronousEvents: testActionEmitter,
}
sys.Register("verifier", rollupNode, opts)
t.Cleanup(rollupNode.rpc.Stop)
......@@ -280,7 +283,7 @@ func (s *L2Verifier) ActL1HeadSignal(t Testing) {
head, err := s.l1.L1BlockRefByLabel(t.Ctx(), eth.Unsafe)
require.NoError(t, err)
s.synchronousEvents.Emit(status.L1UnsafeEvent{L1Unsafe: head})
require.NoError(t, s.synchronousEvents.DrainUntil(func(ev event.Event) bool {
require.NoError(t, s.drainer.DrainUntil(func(ev event.Event) bool {
x, ok := ev.(status.L1UnsafeEvent)
return ok && x.L1Unsafe == head
}, false))
......@@ -291,7 +294,7 @@ func (s *L2Verifier) ActL1SafeSignal(t Testing) {
safe, err := s.l1.L1BlockRefByLabel(t.Ctx(), eth.Safe)
require.NoError(t, err)
s.synchronousEvents.Emit(status.L1SafeEvent{L1Safe: safe})
require.NoError(t, s.synchronousEvents.DrainUntil(func(ev event.Event) bool {
require.NoError(t, s.drainer.DrainUntil(func(ev event.Event) bool {
x, ok := ev.(status.L1SafeEvent)
return ok && x.L1Safe == safe
}, false))
......@@ -302,14 +305,14 @@ func (s *L2Verifier) ActL1FinalizedSignal(t Testing) {
finalized, err := s.l1.L1BlockRefByLabel(t.Ctx(), eth.Finalized)
require.NoError(t, err)
s.synchronousEvents.Emit(finality.FinalizeL1Event{FinalizedL1: finalized})
require.NoError(t, s.synchronousEvents.DrainUntil(func(ev event.Event) bool {
require.NoError(t, s.drainer.DrainUntil(func(ev event.Event) bool {
x, ok := ev.(finality.FinalizeL1Event)
return ok && x.FinalizedL1 == finalized
}, false))
require.Equal(t, finalized, s.syncStatus.SyncStatus().FinalizedL1)
}
func (s *L2Verifier) OnEvent(ev event.Event) {
func (s *L2Verifier) OnEvent(ev event.Event) bool {
switch x := ev.(type) {
case rollup.L1TemporaryErrorEvent:
s.log.Warn("L1 temporary error", "err", x.Err)
......@@ -324,7 +327,10 @@ func (s *L2Verifier) OnEvent(ev event.Event) {
panic(fmt.Errorf("derivation failed critically: %w", x.Err))
case derive.DeriverIdleEvent:
s.l2PipelineIdle = true
default:
return false
}
return true
}
func (s *L2Verifier) ActL2EventsUntilPending(t Testing, num uint64) {
......@@ -341,7 +347,7 @@ func (s *L2Verifier) ActL2EventsUntil(t Testing, fn func(ev event.Event) bool, m
return
}
for i := 0; i < max; i++ {
err := s.synchronousEvents.DrainUntil(fn, excl)
err := s.drainer.DrainUntil(fn, excl)
if err == nil {
return
}
......@@ -367,8 +373,8 @@ func (s *L2Verifier) ActL2PipelineFull(t Testing) {
t.InvalidAction("cannot derive new data while building L2 block")
return
}
s.syncDeriver.Emitter.Emit(driver.StepEvent{})
require.NoError(t, s.syncDeriver.Drain(), "complete all event processing triggered by deriver step")
s.synchronousEvents.Emit(driver.StepEvent{})
require.NoError(t, s.drainer.Drain(), "complete all event processing triggered by deriver step")
}
}
......
......@@ -37,18 +37,21 @@ type AttributesHandler struct {
attributes *derive.AttributesWithParent
}
func NewAttributesHandler(log log.Logger, cfg *rollup.Config, ctx context.Context, l2 L2, emitter event.Emitter) *AttributesHandler {
func NewAttributesHandler(log log.Logger, cfg *rollup.Config, ctx context.Context, l2 L2) *AttributesHandler {
return &AttributesHandler{
log: log,
cfg: cfg,
ctx: ctx,
l2: l2,
emitter: emitter,
attributes: nil,
}
}
func (eq *AttributesHandler) OnEvent(ev event.Event) {
func (eq *AttributesHandler) AttachEmitter(em event.Emitter) {
eq.emitter = em
}
func (eq *AttributesHandler) OnEvent(ev event.Event) bool {
// Events may be concurrent in the future. Prevent unsafe concurrent modifications to the attributes.
eq.mu.Lock()
defer eq.mu.Unlock()
......@@ -68,7 +71,10 @@ func (eq *AttributesHandler) OnEvent(ev event.Event) {
// Time to re-evaluate without attributes.
// (the pending-safe state will then be forwarded to our source of attributes).
eq.emitter.Emit(engine.PendingSafeRequestEvent{})
default:
return false
}
return true
}
// onPendingSafeUpdate applies the queued-up block attributes, if any, on top of the signaled pending state.
......
......@@ -161,7 +161,8 @@ func TestAttributesHandler(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
l2 := &testutils.MockL2Client{}
emitter := &testutils.MockEmitter{}
ah := NewAttributesHandler(logger, cfg, context.Background(), l2, emitter)
ah := NewAttributesHandler(logger, cfg, context.Background(), l2)
ah.AttachEmitter(emitter)
emitter.ExpectOnce(derive.ConfirmReceivedAttributesEvent{})
emitter.ExpectOnce(engine.PendingSafeRequestEvent{})
......@@ -182,7 +183,8 @@ func TestAttributesHandler(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
l2 := &testutils.MockL2Client{}
emitter := &testutils.MockEmitter{}
ah := NewAttributesHandler(logger, cfg, context.Background(), l2, emitter)
ah := NewAttributesHandler(logger, cfg, context.Background(), l2)
ah.AttachEmitter(emitter)
emitter.ExpectOnce(derive.ConfirmReceivedAttributesEvent{})
emitter.ExpectOnce(engine.PendingSafeRequestEvent{})
......@@ -204,7 +206,8 @@ func TestAttributesHandler(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
l2 := &testutils.MockL2Client{}
emitter := &testutils.MockEmitter{}
ah := NewAttributesHandler(logger, cfg, context.Background(), l2, emitter)
ah := NewAttributesHandler(logger, cfg, context.Background(), l2)
ah.AttachEmitter(emitter)
emitter.ExpectOnce(derive.ConfirmReceivedAttributesEvent{})
emitter.ExpectOnce(engine.PendingSafeRequestEvent{})
......@@ -229,7 +232,8 @@ func TestAttributesHandler(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
l2 := &testutils.MockL2Client{}
emitter := &testutils.MockEmitter{}
ah := NewAttributesHandler(logger, cfg, context.Background(), l2, emitter)
ah := NewAttributesHandler(logger, cfg, context.Background(), l2)
ah.AttachEmitter(emitter)
// attrA1Alt does not match block A1, so will cause force-reorg.
emitter.ExpectOnce(derive.ConfirmReceivedAttributesEvent{})
......@@ -264,7 +268,8 @@ func TestAttributesHandler(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
l2 := &testutils.MockL2Client{}
emitter := &testutils.MockEmitter{}
ah := NewAttributesHandler(logger, cfg, context.Background(), l2, emitter)
ah := NewAttributesHandler(logger, cfg, context.Background(), l2)
ah.AttachEmitter(emitter)
attr := &derive.AttributesWithParent{
Attributes: attrA1.Attributes, // attributes will match, passing consolidation
......@@ -316,7 +321,8 @@ func TestAttributesHandler(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
l2 := &testutils.MockL2Client{}
emitter := &testutils.MockEmitter{}
ah := NewAttributesHandler(logger, cfg, context.Background(), l2, emitter)
ah := NewAttributesHandler(logger, cfg, context.Background(), l2)
ah.AttachEmitter(emitter)
emitter.ExpectOnce(derive.ConfirmReceivedAttributesEvent{})
emitter.ExpectOnce(engine.PendingSafeRequestEvent{})
......@@ -351,7 +357,8 @@ func TestAttributesHandler(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
l2 := &testutils.MockL2Client{}
emitter := &testutils.MockEmitter{}
ah := NewAttributesHandler(logger, cfg, context.Background(), l2, emitter)
ah := NewAttributesHandler(logger, cfg, context.Background(), l2)
ah.AttachEmitter(emitter)
emitter.ExpectOnceType("ResetEvent")
ah.OnEvent(engine.PendingSafeUpdateEvent{
......@@ -366,7 +373,8 @@ func TestAttributesHandler(t *testing.T) {
logger := testlog.Logger(t, log.LevelInfo)
l2 := &testutils.MockL2Client{}
emitter := &testutils.MockEmitter{}
ah := NewAttributesHandler(logger, cfg, context.Background(), l2, emitter)
ah := NewAttributesHandler(logger, cfg, context.Background(), l2)
ah.AttachEmitter(emitter)
// If there are no attributes, we expect the pipeline to be requested to generate attributes.
emitter.ExpectOnce(derive.PipelineStepEvent{PendingSafe: refA1})
......
......@@ -33,16 +33,19 @@ type CLSync struct {
unsafePayloads *PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps and duplicates
}
func NewCLSync(log log.Logger, cfg *rollup.Config, metrics Metrics, emitter event.Emitter) *CLSync {
func NewCLSync(log log.Logger, cfg *rollup.Config, metrics Metrics) *CLSync {
return &CLSync{
log: log,
cfg: cfg,
metrics: metrics,
emitter: emitter,
unsafePayloads: NewPayloadsQueue(log, maxUnsafePayloadsMemory, payloadMemSize),
}
}
func (eq *CLSync) AttachEmitter(em event.Emitter) {
eq.emitter = em
}
// LowestQueuedUnsafeBlock retrieves the first queued-up L2 unsafe payload, or a zeroed reference if there is none.
func (eq *CLSync) LowestQueuedUnsafeBlock() eth.L2BlockRef {
payload := eq.unsafePayloads.Peek()
......@@ -64,7 +67,7 @@ func (ev ReceivedUnsafePayloadEvent) String() string {
return "received-unsafe-payload"
}
func (eq *CLSync) OnEvent(ev event.Event) {
func (eq *CLSync) OnEvent(ev event.Event) bool {
// Events may be concurrent in the future. Prevent unsafe concurrent modifications to the payloads queue.
eq.mu.Lock()
defer eq.mu.Unlock()
......@@ -76,7 +79,10 @@ func (eq *CLSync) OnEvent(ev event.Event) {
eq.onForkchoiceUpdate(x)
case ReceivedUnsafePayloadEvent:
eq.onUnsafePayload(x)
default:
return false
}
return true
}
// onInvalidPayload checks if the first next-up payload matches the invalid payload.
......
......@@ -127,7 +127,8 @@ func TestCLSync(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
emitter := &testutils.MockEmitter{}
cl := NewCLSync(logger, cfg, metrics, emitter)
cl := NewCLSync(logger, cfg, metrics)
cl.AttachEmitter(emitter)
emitter.ExpectOnce(engine.ForkchoiceRequestEvent{})
cl.OnEvent(ReceivedUnsafePayloadEvent{Envelope: payloadA1})
......@@ -148,7 +149,8 @@ func TestCLSync(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
emitter := &testutils.MockEmitter{}
cl := NewCLSync(logger, cfg, metrics, emitter)
cl := NewCLSync(logger, cfg, metrics)
cl.AttachEmitter(emitter)
emitter.ExpectOnce(engine.ForkchoiceRequestEvent{})
cl.OnEvent(ReceivedUnsafePayloadEvent{Envelope: payloadA1})
......@@ -170,7 +172,8 @@ func TestCLSync(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
emitter := &testutils.MockEmitter{}
cl := NewCLSync(logger, cfg, metrics, emitter)
cl := NewCLSync(logger, cfg, metrics)
cl.AttachEmitter(emitter)
emitter.ExpectOnce(engine.ForkchoiceRequestEvent{})
cl.OnEvent(ReceivedUnsafePayloadEvent{Envelope: payloadA1})
......@@ -190,7 +193,8 @@ func TestCLSync(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
emitter := &testutils.MockEmitter{}
cl := NewCLSync(logger, cfg, metrics, emitter)
cl := NewCLSync(logger, cfg, metrics)
cl.AttachEmitter(emitter)
emitter.ExpectOnce(engine.ForkchoiceRequestEvent{})
cl.OnEvent(ReceivedUnsafePayloadEvent{Envelope: payloadA2})
......@@ -210,7 +214,8 @@ func TestCLSync(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
emitter := &testutils.MockEmitter{}
cl := NewCLSync(logger, cfg, metrics, emitter)
cl := NewCLSync(logger, cfg, metrics)
cl.AttachEmitter(emitter)
emitter.AssertExpectations(t) // nothing to process yet
require.Nil(t, cl.unsafePayloads.Peek(), "no payloads yet")
......@@ -268,7 +273,8 @@ func TestCLSync(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
emitter := &testutils.MockEmitter{}
cl := NewCLSync(logger, cfg, metrics, emitter)
cl := NewCLSync(logger, cfg, metrics)
cl.AttachEmitter(emitter)
emitter.ExpectOnce(engine.ForkchoiceRequestEvent{})
cl.OnEvent(ReceivedUnsafePayloadEvent{Envelope: payloadA1})
......@@ -312,7 +318,8 @@ func TestCLSync(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
emitter := &testutils.MockEmitter{}
cl := NewCLSync(logger, cfg, metrics, emitter)
cl := NewCLSync(logger, cfg, metrics)
cl.AttachEmitter(emitter)
emitter.ExpectOnce(engine.ForkchoiceRequestEvent{})
cl.OnEvent(ReceivedUnsafePayloadEvent{Envelope: payloadA1})
......@@ -352,7 +359,8 @@ func TestCLSync(t *testing.T) {
t.Run("invalid payload error", func(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
emitter := &testutils.MockEmitter{}
cl := NewCLSync(logger, cfg, metrics, emitter)
cl := NewCLSync(logger, cfg, metrics)
cl.AttachEmitter(emitter)
// CLSync gets payload and requests engine state, to later determine if payload should be forwarded
emitter.ExpectOnce(engine.ForkchoiceRequestEvent{})
......
......@@ -73,15 +73,18 @@ type PipelineDeriver struct {
needAttributesConfirmation bool
}
func NewPipelineDeriver(ctx context.Context, pipeline *DerivationPipeline, emitter event.Emitter) *PipelineDeriver {
func NewPipelineDeriver(ctx context.Context, pipeline *DerivationPipeline) *PipelineDeriver {
return &PipelineDeriver{
pipeline: pipeline,
ctx: ctx,
emitter: emitter,
}
}
func (d *PipelineDeriver) OnEvent(ev event.Event) {
func (d *PipelineDeriver) AttachEmitter(em event.Emitter) {
d.emitter = em
}
func (d *PipelineDeriver) OnEvent(ev event.Event) bool {
switch x := ev.(type) {
case rollup.ResetEvent:
d.pipeline.Reset()
......@@ -89,7 +92,7 @@ func (d *PipelineDeriver) OnEvent(ev event.Event) {
// Don't generate attributes if there are already attributes in-flight
if d.needAttributesConfirmation {
d.pipeline.log.Debug("Previously sent attributes are unconfirmed to be received")
return
return true
}
d.pipeline.log.Trace("Derivation pipeline step", "onto_origin", d.pipeline.Origin())
preOrigin := d.pipeline.Origin()
......@@ -128,5 +131,8 @@ func (d *PipelineDeriver) OnEvent(ev event.Event) {
d.pipeline.ConfirmEngineReset()
case ConfirmReceivedAttributesEvent:
d.needAttributesConfirmation = false
default:
return false
}
return true
}
......@@ -4,8 +4,6 @@ import (
"context"
"time"
"golang.org/x/time/rate"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
......@@ -50,7 +48,6 @@ type Metrics interface {
L1FetcherMetrics
SequencerMetrics
event.Metrics
RecordEventsRateLimited()
}
type L1Chain interface {
......@@ -154,14 +151,6 @@ type SequencerStateListener interface {
SequencerStopped() error
}
// 10,000 events per second is plenty.
// If we are going through more events, the driver needs to breathe, and warn the user of a potential issue.
const eventsLimit = rate.Limit(10_000)
// 500 events of burst: the maximum amount of events to eat up
// past the rate limit before the rate limit becomes applicable.
const eventsBurst = 500
// NewDriver composes an events handler that tracks L1 state, triggers L2 Derivation, and optionally sequences new L2 blocks.
func NewDriver(
driverCfg *Config,
......@@ -180,34 +169,52 @@ func NewDriver(
plasma PlasmaIface,
) *Driver {
driverCtx, driverCancel := context.WithCancel(context.Background())
rootDeriver := &event.DeriverMux{}
var synchronousEvents event.EmitterDrainer
synchronousEvents = event.NewQueue(log, driverCtx, rootDeriver, metrics)
synchronousEvents = event.NewLimiterDrainer(context.Background(), synchronousEvents, eventsLimit, eventsBurst, func() {
metrics.RecordEventsRateLimited()
log.Warn("Driver is hitting events rate limit.")
})
var executor event.Executor
var drain func() error
// This instantiation will be one of more options: soon there will be a parallel events executor
{
s := event.NewGlobalSynchronous(driverCtx)
executor = s
drain = s.Drain
}
sys := event.NewSystem(log, executor)
opts := event.DefaultRegisterOpts()
statusTracker := status.NewStatusTracker(log, metrics)
sys.Register("status", statusTracker, opts)
l1 = NewMeteredL1Fetcher(l1, metrics)
sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, statusTracker.L1Head, l1)
findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth)
verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, statusTracker.L1Head, l1)
ec := engine.NewEngineController(l2, log, metrics, cfg, syncCfg, synchronousEvents)
engineResetDeriver := engine.NewEngineResetDeriver(driverCtx, log, cfg, l1, l2, syncCfg, synchronousEvents)
clSync := clsync.NewCLSync(log, cfg, metrics, synchronousEvents)
ec := engine.NewEngineController(l2, log, metrics, cfg, syncCfg,
sys.Register("engine-controller", nil, opts))
sys.Register("engine-reset",
engine.NewEngineResetDeriver(driverCtx, log, cfg, l1, l2, syncCfg), opts)
clSync := clsync.NewCLSync(log, cfg, metrics) // alt-sync still uses cl-sync state to determine what to sync to
sys.Register("cl-sync", clSync, opts)
var finalizer Finalizer
if cfg.PlasmaEnabled() {
finalizer = finality.NewPlasmaFinalizer(driverCtx, log, cfg, l1, synchronousEvents, plasma)
finalizer = finality.NewPlasmaFinalizer(driverCtx, log, cfg, l1, plasma)
} else {
finalizer = finality.NewFinalizer(driverCtx, log, cfg, l1, synchronousEvents)
finalizer = finality.NewFinalizer(driverCtx, log, cfg, l1)
}
sys.Register("finalizer", finalizer, opts)
sys.Register("attributes-handler",
attributes.NewAttributesHandler(log, cfg, driverCtx, l2), opts)
attributesHandler := attributes.NewAttributesHandler(log, cfg, driverCtx, l2, synchronousEvents)
derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l1Blobs, plasma, l2, metrics)
pipelineDeriver := derive.NewPipelineDeriver(driverCtx, derivationPipeline, synchronousEvents)
sys.Register("pipeline",
derive.NewPipelineDeriver(driverCtx, derivationPipeline), opts)
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2)
meteredEngine := NewMeteredEngine(cfg, ec, metrics, log) // Only use the metered engine in the sequencer b/c it records sequencing metrics.
sequencer := NewSequencer(log, cfg, meteredEngine, attrBuilder, findL1Origin, metrics)
......@@ -215,7 +222,6 @@ func NewDriver(
syncDeriver := &SyncDeriver{
Derivation: derivationPipeline,
Finalizer: finalizer,
SafeHeadNotifs: safeHeadListener,
CLSync: clSync,
Engine: ec,
......@@ -223,19 +229,25 @@ func NewDriver(
Config: cfg,
L1: l1,
L2: l2,
Emitter: synchronousEvents,
Log: log,
Ctx: driverCtx,
Drain: synchronousEvents.Drain,
Drain: drain,
}
engDeriv := engine.NewEngDeriver(log, driverCtx, cfg, ec, synchronousEvents)
schedDeriv := NewStepSchedulingDeriver(log, synchronousEvents)
sys.Register("sync", syncDeriver, opts)
sys.Register("engine", engine.NewEngDeriver(log, driverCtx, cfg, ec), opts)
schedDeriv := NewStepSchedulingDeriver(log)
sys.Register("step-scheduler", schedDeriv, opts)
driverEmitter := sys.Register("driver", nil, opts)
driver := &Driver{
eventSys: sys,
statusTracker: statusTracker,
SyncDeriver: syncDeriver,
sched: schedDeriv,
synchronousEvents: synchronousEvents,
emitter: driverEmitter,
drain: drain,
stateReq: make(chan chan struct{}),
forceReset: make(chan chan struct{}, 10),
startSequencer: make(chan hashAndErrorChannel, 10),
......@@ -258,18 +270,5 @@ func NewDriver(
sequencerConductor: sequencerConductor,
}
*rootDeriver = []event.Deriver{
syncDeriver,
engineResetDeriver,
engDeriv,
schedDeriv,
driver,
clSync,
pipelineDeriver,
attributesHandler,
finalizer,
statusTracker,
}
return driver
}
......@@ -36,13 +36,16 @@ type SyncStatus = eth.SyncStatus
const sealingDuration = time.Millisecond * 50
type Driver struct {
eventSys event.System
statusTracker SyncStatusTracker
*SyncDeriver
sched *StepSchedulingDeriver
synchronousEvents event.EmitterDrainer
emitter event.Emitter
drain func() error
// Requests to block the event loop for synchronous execution to avoid reading an inconsistent state
stateReq chan chan struct{}
......@@ -134,6 +137,7 @@ func (s *Driver) Start() error {
func (s *Driver) Close() error {
s.driverCancel()
s.wg.Wait()
s.eventSys.Stop()
s.asyncGossiper.Stop()
s.sequencerConductor.Close()
return nil
......@@ -189,7 +193,7 @@ func (s *Driver) eventLoop() {
// reqStep requests a derivation step nicely, with a delay if this is a reattempt, or not at all if we already scheduled a reattempt.
reqStep := func() {
s.Emit(StepReqEvent{})
s.emitter.Emit(StepReqEvent{})
}
// We call reqStep right away to finish syncing to the tip of the chain if we're behind.
......@@ -220,14 +224,16 @@ func (s *Driver) eventLoop() {
return
}
if s.drain != nil {
// While event-processing is synchronous we have to drain
// (i.e. process all queued-up events) before creating any new events.
if err := s.synchronousEvents.Drain(); err != nil {
if err := s.drain(); err != nil {
if s.driverCtx.Err() != nil {
return
}
s.log.Error("unexpected error from event-draining", "err", err)
}
}
// If we are sequencing, and the L1 state is ready, update the trigger for the next sequencer action.
// This may adjust at any time based on fork-choice changes or previous errors.
......@@ -311,12 +317,12 @@ func (s *Driver) eventLoop() {
s.Emitter.Emit(status.L1SafeEvent{L1Safe: newL1Safe})
// no step, justified L1 information does not do anything for L2 derivation or status
case newL1Finalized := <-s.l1FinalizedSig:
s.Emit(finality.FinalizeL1Event{FinalizedL1: newL1Finalized})
s.emitter.Emit(finality.FinalizeL1Event{FinalizedL1: newL1Finalized})
reqStep() // we may be able to mark more L2 data as finalized now
case <-s.sched.NextDelayedStep():
s.Emit(StepAttemptEvent{})
s.emitter.Emit(StepAttemptEvent{})
case <-s.sched.NextStep():
s.Emit(StepAttemptEvent{})
s.emitter.Emit(StepAttemptEvent{})
case respCh := <-s.stateReq:
respCh <- struct{}{}
case respCh := <-s.forceReset:
......@@ -366,7 +372,7 @@ func (s *Driver) eventLoop() {
// OnEvent handles broadcasted events.
// The Driver itself is a deriver to catch system-critical events.
// Other event-handling should be encapsulated into standalone derivers.
func (s *Driver) OnEvent(ev event.Event) {
func (s *Driver) OnEvent(ev event.Event) bool {
switch x := ev.(type) {
case rollup.CriticalErrorEvent:
s.Log.Error("Derivation process critical error", "err", x.Err)
......@@ -378,21 +384,17 @@ func (s *Driver) OnEvent(ev event.Event) {
logger.Error("Failed to shutdown driver on critical error", "err", err)
}
}()
return
return true
default:
return false
}
}
func (s *Driver) Emit(ev event.Event) {
s.synchronousEvents.Emit(ev)
}
type SyncDeriver struct {
// The derivation pipeline is reset whenever we reorg.
// The derivation pipeline determines the new l2Safe.
Derivation DerivationPipeline
Finalizer Finalizer
SafeHeadNotifs rollup.SafeHeadListener // notified when safe head is updated
CLSync CLSync
......@@ -418,7 +420,11 @@ type SyncDeriver struct {
Drain func() error
}
func (s *SyncDeriver) OnEvent(ev event.Event) {
func (s *SyncDeriver) AttachEmitter(em event.Emitter) {
s.Emitter = em
}
func (s *SyncDeriver) OnEvent(ev event.Event) bool {
switch x := ev.(type) {
case StepEvent:
s.onStepEvent()
......@@ -446,7 +452,10 @@ func (s *SyncDeriver) OnEvent(ev event.Event) {
s.Emitter.Emit(StepReqEvent{ResetBackoff: true})
case engine.SafeDerivedEvent:
s.onSafeDerivedBlock(x)
default:
return false
}
return true
}
func (s *SyncDeriver) onSafeDerivedBlock(x engine.SafeDerivedEvent) {
......
......@@ -72,17 +72,20 @@ type StepSchedulingDeriver struct {
emitter event.Emitter
}
func NewStepSchedulingDeriver(log log.Logger, emitter event.Emitter) *StepSchedulingDeriver {
func NewStepSchedulingDeriver(log log.Logger) *StepSchedulingDeriver {
return &StepSchedulingDeriver{
stepAttempts: 0,
bOffStrategy: retry.Exponential(),
stepReqCh: make(chan struct{}, 1),
delayedStepReq: nil,
log: log,
emitter: emitter,
}
}
func (s *StepSchedulingDeriver) AttachEmitter(em event.Emitter) {
s.emitter = em
}
// NextStep is a channel to await, and if triggered,
// the caller should emit a StepAttemptEvent to queue up a step while maintaining backoff.
func (s *StepSchedulingDeriver) NextStep() <-chan struct{} {
......@@ -96,7 +99,7 @@ func (s *StepSchedulingDeriver) NextDelayedStep() <-chan time.Time {
return s.delayedStepReq
}
func (s *StepSchedulingDeriver) OnEvent(ev event.Event) {
func (s *StepSchedulingDeriver) OnEvent(ev event.Event) bool {
step := func() {
s.delayedStepReq = nil
select {
......@@ -138,5 +141,8 @@ func (s *StepSchedulingDeriver) OnEvent(ev event.Event) {
s.emitter.Emit(StepEvent{})
case ResetStepBackoffEvent:
s.stepAttempts = 0
default:
return false
}
return true
}
......@@ -17,7 +17,8 @@ func TestStepSchedulingDeriver(t *testing.T) {
emitter := event.EmitterFunc(func(ev event.Event) {
queued = append(queued, ev)
})
sched := NewStepSchedulingDeriver(logger, emitter)
sched := NewStepSchedulingDeriver(logger)
sched.AttachEmitter(emitter)
require.Len(t, sched.NextStep(), 0, "start empty")
sched.OnEvent(StepReqEvent{})
require.Len(t, sched.NextStep(), 1, "take request")
......
......@@ -32,7 +32,7 @@ type EngineResetDeriver struct {
}
func NewEngineResetDeriver(ctx context.Context, log log.Logger, cfg *rollup.Config,
l1 sync.L1Chain, l2 sync.L2Chain, syncCfg *sync.Config, emitter event.Emitter) *EngineResetDeriver {
l1 sync.L1Chain, l2 sync.L2Chain, syncCfg *sync.Config) *EngineResetDeriver {
return &EngineResetDeriver{
ctx: ctx,
log: log,
......@@ -40,22 +40,28 @@ func NewEngineResetDeriver(ctx context.Context, log log.Logger, cfg *rollup.Conf
l1: l1,
l2: l2,
syncCfg: syncCfg,
emitter: emitter,
}
}
func (d *EngineResetDeriver) OnEvent(ev event.Event) {
func (d *EngineResetDeriver) AttachEmitter(em event.Emitter) {
d.emitter = em
}
func (d *EngineResetDeriver) OnEvent(ev event.Event) bool {
switch ev.(type) {
case ResetEngineRequestEvent:
result, err := sync.FindL2Heads(d.ctx, d.cfg, d.l1, d.l2, d.log, d.syncCfg)
if err != nil {
d.emitter.Emit(rollup.ResetEvent{Err: fmt.Errorf("failed to find the L2 Heads to start from: %w", err)})
return
return true
}
d.emitter.Emit(ForceEngineResetEvent{
Unsafe: result.Unsafe,
Safe: result.Safe,
Finalized: result.Finalized,
})
default:
return false
}
return true
}
......@@ -155,17 +155,20 @@ type EngDeriver struct {
var _ event.Deriver = (*EngDeriver)(nil)
func NewEngDeriver(log log.Logger, ctx context.Context, cfg *rollup.Config,
ec *EngineController, emitter event.Emitter) *EngDeriver {
ec *EngineController) *EngDeriver {
return &EngDeriver{
log: log,
cfg: cfg,
ec: ec,
ctx: ctx,
emitter: emitter,
}
}
func (d *EngDeriver) OnEvent(ev event.Event) {
func (d *EngDeriver) AttachEmitter(em event.Emitter) {
d.emitter = em
}
func (d *EngDeriver) OnEvent(ev event.Event) bool {
switch x := ev.(type) {
case TryBackupUnsafeReorgEvent:
// If we don't need to call FCU to restore unsafeHead using backupUnsafe, keep going b/c
......@@ -204,7 +207,7 @@ func (d *EngDeriver) OnEvent(ev event.Event) {
ref, err := derive.PayloadToBlockRef(d.cfg, x.Envelope.ExecutionPayload)
if err != nil {
d.log.Error("failed to decode L2 block ref from payload", "err", err)
return
return true
}
if err := d.ec.InsertUnsafePayload(d.ctx, x.Envelope, ref); err != nil {
d.log.Info("failed to insert payload", "ref", ref,
......@@ -259,16 +262,19 @@ func (d *EngDeriver) OnEvent(ev event.Event) {
case PromoteFinalizedEvent:
if x.Ref.Number < d.ec.Finalized().Number {
d.log.Error("Cannot rewind finality,", "ref", x.Ref, "finalized", d.ec.Finalized())
return
return true
}
if x.Ref.Number > d.ec.SafeL2Head().Number {
d.log.Error("Block must be safe before it can be finalized", "ref", x.Ref, "safe", d.ec.SafeL2Head())
return
return true
}
d.ec.SetFinalizedHead(x.Ref)
// Try to apply the forkchoice changes
d.emitter.Emit(TryUpdateEngineEvent{})
default:
return false
}
return true
}
// onForceNextSafeAttributes inserts the provided attributes, reorging away any conflicting unsafe chain.
......
......@@ -10,7 +10,7 @@ type Event interface {
}
type Deriver interface {
OnEvent(ev Event)
OnEvent(ev Event) bool
}
type Emitter interface {
......@@ -41,10 +41,12 @@ func (fn EmitterFunc) Emit(ev Event) {
// Technically this is a DeMux: single input to multi output.
type DeriverMux []Deriver
func (s *DeriverMux) OnEvent(ev Event) {
func (s *DeriverMux) OnEvent(ev Event) bool {
out := false
for _, d := range *s {
d.OnEvent(ev)
out = d.OnEvent(ev) || out
}
return out
}
var _ Deriver = (*DeriverMux)(nil)
......@@ -64,10 +66,10 @@ func (d NoopDeriver) OnEvent(ev Event) {}
// DeriverFunc implements the Deriver interface as a function,
// similar to how the std-lib http HandlerFunc implements a Handler.
// This can be used for small in-place derivers, test helpers, etc.
type DeriverFunc func(ev Event)
type DeriverFunc func(ev Event) bool
func (fn DeriverFunc) OnEvent(ev Event) {
fn(ev)
func (fn DeriverFunc) OnEvent(ev Event) bool {
return fn(ev)
}
type NoopEmitter struct{}
......
......@@ -15,14 +15,17 @@ func (ev TestEvent) String() string {
func TestDeriverMux_OnEvent(t *testing.T) {
result := ""
a := DeriverFunc(func(ev Event) {
a := DeriverFunc(func(ev Event) bool {
result += fmt.Sprintf("A:%s\n", ev)
return true
})
b := DeriverFunc(func(ev Event) {
b := DeriverFunc(func(ev Event) bool {
result += fmt.Sprintf("B:%s\n", ev)
return true
})
c := DeriverFunc(func(ev Event) {
c := DeriverFunc(func(ev Event) bool {
result += fmt.Sprintf("C:%s\n", ev)
return true
})
x := DeriverMux{}
......
package event
type Executable interface {
RunEvent(ev AnnotatedEvent)
}
// ExecutableFunc implements the Executable interface as a function,
// similar to how the std-lib http HandlerFunc implements a Handler.
// This can be used for small in-place executables, test helpers, etc.
type ExecutableFunc func(ev AnnotatedEvent)
func (fn ExecutableFunc) RunEvent(ev AnnotatedEvent) {
fn(ev)
}
type Executor interface {
Add(d Executable, opts *ExecutorOpts) (leaveExecutor func())
Enqueue(ev AnnotatedEvent) error
}
package event
import (
"context"
"fmt"
"io"
"slices"
"sync"
"sync/atomic"
)
// Don't queue up an endless number of events.
// At some point it's better to drop events and warn something is exploding the number of events.
const sanityEventLimit = 1000
type GlobalSyncExec struct {
eventsLock sync.Mutex
events []AnnotatedEvent
handles []*globalHandle
handlesLock sync.RWMutex
ctx context.Context
}
var _ Executor = (*GlobalSyncExec)(nil)
func NewGlobalSynchronous(ctx context.Context) *GlobalSyncExec {
return &GlobalSyncExec{ctx: ctx}
}
func (gs *GlobalSyncExec) Add(d Executable, _ *ExecutorOpts) (leaveExecutor func()) {
gs.handlesLock.Lock()
defer gs.handlesLock.Unlock()
h := &globalHandle{d: d}
h.g.Store(gs)
gs.handles = append(gs.handles, h)
return h.leave
}
func (gs *GlobalSyncExec) remove(h *globalHandle) {
gs.handlesLock.Lock()
defer gs.handlesLock.Unlock()
// Linear search to delete is fine,
// since we delete much less frequently than we process events with these.
for i, v := range gs.handles {
if v == h {
gs.handles = slices.Delete(gs.handles, i, i+1)
return
}
}
}
func (gs *GlobalSyncExec) Enqueue(ev AnnotatedEvent) error {
gs.eventsLock.Lock()
defer gs.eventsLock.Unlock()
// sanity limit, never queue too many events
if len(gs.events) >= sanityEventLimit {
return fmt.Errorf("something is very wrong, queued up too many events! Dropping event %q", ev)
}
gs.events = append(gs.events, ev)
return nil
}
func (gs *GlobalSyncExec) pop() AnnotatedEvent {
gs.eventsLock.Lock()
defer gs.eventsLock.Unlock()
if len(gs.events) == 0 {
return AnnotatedEvent{}
}
first := gs.events[0]
gs.events = gs.events[1:]
return first
}
func (gs *GlobalSyncExec) processEvent(ev AnnotatedEvent) {
gs.handlesLock.RLock() // read lock, to allow Drain() to be called during event processing.
defer gs.handlesLock.RUnlock()
for _, h := range gs.handles {
h.onEvent(ev)
}
}
func (gs *GlobalSyncExec) Drain() error {
for {
if gs.ctx.Err() != nil {
return gs.ctx.Err()
}
ev := gs.pop()
if ev.Event == nil {
return nil
}
// Note: event execution may call Drain(), that is allowed.
gs.processEvent(ev)
}
}
func (gs *GlobalSyncExec) DrainUntil(fn func(ev Event) bool, excl bool) error {
// In order of operation:
// stopExcl: stop draining, and leave the event.
// no stopExcl, and no event: EOF, exhausted events before condition hit.
// no stopExcl, and event: process event.
// stopIncl: stop draining, after having processed the event first.
iter := func() (ev AnnotatedEvent, stopIncl bool, stopExcl bool) {
gs.eventsLock.Lock()
defer gs.eventsLock.Unlock()
if len(gs.events) == 0 {
return AnnotatedEvent{}, false, false
}
ev = gs.events[0]
stop := fn(ev.Event)
if excl && stop {
ev = AnnotatedEvent{}
stopExcl = true
} else {
gs.events = gs.events[1:]
}
if stop {
stopIncl = true
}
return
}
for {
if gs.ctx.Err() != nil {
return gs.ctx.Err()
}
// includes popping of the event, so we can handle Drain() calls by onEvent() execution
ev, stopIncl, stopExcl := iter()
if stopExcl {
return nil
}
if ev.Event == nil {
return io.EOF
}
gs.processEvent(ev)
if stopIncl {
return nil
}
}
}
type globalHandle struct {
g atomic.Pointer[GlobalSyncExec]
d Executable
}
func (gh *globalHandle) onEvent(ev AnnotatedEvent) {
if gh.g.Load() == nil { // don't process more events while we are being removed
return
}
gh.d.RunEvent(ev)
}
func (gh *globalHandle) leave() {
if old := gh.g.Swap(nil); old != nil {
old.remove(gh)
}
}
package event
import (
"context"
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/testlog"
)
func TestGlobalExecutor(t *testing.T) {
count := 0
ex := ExecutableFunc(func(ev AnnotatedEvent) {
count += 1
})
exec := NewGlobalSynchronous(context.Background())
leave := exec.Add(ex, nil)
require.NoError(t, exec.Drain(), "can drain, even if empty")
require.NoError(t, exec.Enqueue(AnnotatedEvent{Event: TestEvent{}}))
require.Equal(t, 0, count, "no processing yet, queued event")
require.NoError(t, exec.Drain())
require.Equal(t, 1, count, "processed event")
require.NoError(t, exec.Enqueue(AnnotatedEvent{Event: TestEvent{}}))
require.NoError(t, exec.Enqueue(AnnotatedEvent{Event: TestEvent{}}))
require.Equal(t, 1, count, "no processing yet, queued events")
require.NoError(t, exec.Drain())
require.Equal(t, 3, count, "processed events")
leave()
require.NoError(t, exec.Enqueue(AnnotatedEvent{Event: TestEvent{}}))
require.NotEqual(t, exec.Drain(), "after deriver leaves the executor can still drain events")
require.Equal(t, 3, count, "didn't process event after trigger close")
}
func TestQueueSanityLimit(t *testing.T) {
count := 0
ex := ExecutableFunc(func(ev AnnotatedEvent) {
count += 1
})
exec := NewGlobalSynchronous(context.Background())
leave := exec.Add(ex, nil)
defer leave()
// emit 1 too many events
for i := 0; i < sanityEventLimit; i++ {
require.NoError(t, exec.Enqueue(AnnotatedEvent{Event: TestEvent{}}))
}
require.ErrorContains(t, exec.Enqueue(AnnotatedEvent{Event: TestEvent{}}), "too many events")
require.NoError(t, exec.Drain())
require.Equal(t, sanityEventLimit, count, "processed all non-dropped events")
require.NoError(t, exec.Enqueue(AnnotatedEvent{Event: TestEvent{}}))
require.NoError(t, exec.Drain())
require.Equal(t, sanityEventLimit+1, count, "back to normal after drain")
}
type CyclicEvent struct {
Count int
}
func (ev CyclicEvent) String() string {
return "cyclic-event"
}
func TestSynchronousCyclic(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
var exec *GlobalSyncExec
result := false
ex := ExecutableFunc(func(ev AnnotatedEvent) {
logger.Info("received event", "event", ev)
switch x := ev.Event.(type) {
case CyclicEvent:
if x.Count < 10 {
require.NoError(t, exec.Enqueue(AnnotatedEvent{Event: CyclicEvent{Count: x.Count + 1}}))
} else {
result = true
}
}
})
exec = NewGlobalSynchronous(context.Background())
leave := exec.Add(ex, nil)
defer leave()
require.NoError(t, exec.Enqueue(AnnotatedEvent{Event: CyclicEvent{Count: 0}}))
require.NoError(t, exec.Drain())
require.True(t, result, "expecting event processing to fully recurse")
}
func TestDrainCancel(t *testing.T) {
count := 0
ctx, cancel := context.WithCancel(context.Background())
ex := ExecutableFunc(func(ev AnnotatedEvent) {
count += 1
cancel()
})
exec := NewGlobalSynchronous(ctx)
leave := exec.Add(ex, nil)
defer leave()
require.NoError(t, exec.Enqueue(AnnotatedEvent{Event: TestEvent{}}))
require.NoError(t, exec.Enqueue(AnnotatedEvent{Event: TestEvent{}}))
drainErr := exec.Drain()
require.NotNil(t, ctx.Err())
require.ErrorIs(t, ctx.Err(), drainErr)
require.Equal(t, 1, count, "drain must be canceled before next event is processed")
}
func TestDrainUntilCancel(t *testing.T) {
count := 0
ctx, cancel := context.WithCancel(context.Background())
ex := ExecutableFunc(func(ev AnnotatedEvent) {
count += 1
if _, ok := ev.Event.(FooEvent); ok {
cancel()
}
})
exec := NewGlobalSynchronous(ctx)
leave := exec.Add(ex, nil)
defer leave()
require.NoError(t, exec.Enqueue(AnnotatedEvent{Event: TestEvent{}}))
require.NoError(t, exec.Enqueue(AnnotatedEvent{Event: FooEvent{}}))
require.NoError(t, exec.Enqueue(AnnotatedEvent{Event: TestEvent{}}))
drainErr := exec.DrainUntil(Is[FooEvent], false)
require.NoError(t, drainErr, "drained right until context started to matter")
require.Equal(t, 2, count, "drain must be stopped at Foo (incl)")
drainErr = exec.DrainUntil(Is[TestEvent], false)
require.NotNil(t, ctx.Err())
require.NotNil(t, drainErr)
require.ErrorIs(t, ctx.Err(), drainErr)
require.Equal(t, 2, count, "drain must be canceled, not processed next TestEvent")
}
func TestDrainUntilExcl(t *testing.T) {
count := 0
ex := ExecutableFunc(func(ev AnnotatedEvent) {
count += 1
})
exec := NewGlobalSynchronous(context.Background())
leave := exec.Add(ex, nil)
defer leave()
require.NoError(t, exec.Enqueue(AnnotatedEvent{Event: TestEvent{}}))
require.NoError(t, exec.Enqueue(AnnotatedEvent{Event: FooEvent{}}))
require.NoError(t, exec.Enqueue(AnnotatedEvent{Event: TestEvent{}}))
require.NoError(t, exec.Enqueue(AnnotatedEvent{Event: TestEvent{}}))
require.NoError(t, exec.DrainUntil(Is[FooEvent], true))
require.Equal(t, 1, count, "Foo must not be processed yet")
require.NoError(t, exec.DrainUntil(Is[FooEvent], true))
require.Equal(t, 1, count, "Foo still not processed, excl on first element")
require.NoError(t, exec.DrainUntil(Is[FooEvent], false))
require.Equal(t, 2, count, "Foo is processed, remainder is not, stop is inclusive now")
require.NoError(t, exec.Drain())
require.Equal(t, 4, count, "Done")
}
......@@ -3,6 +3,7 @@ package event
type Metrics interface {
RecordEmittedEvent(name string)
RecordProcessedEvent(name string)
RecordEventsRateLimited()
}
type NoopMetrics struct {
......@@ -12,4 +13,6 @@ func (n NoopMetrics) RecordEmittedEvent(name string) {}
func (n NoopMetrics) RecordProcessedEvent(name string) {}
func (n NoopMetrics) RecordEventsRateLimited() {}
var _ Metrics = NoopMetrics{}
package event
import "golang.org/x/time/rate"
type ExecutorOpts struct {
Capacity int // If there is a local buffer capacity
}
type EmitterOpts struct {
Limiting bool
Rate rate.Limit
Burst int
OnLimited func()
}
// RegisterOpts represents the set of parameters to configure a
// new deriver/emitter with that is registered with an event System.
// These options may be reused for multiple registrations.
type RegisterOpts struct {
Executor ExecutorOpts
Emitter EmitterOpts
}
// 200 events may be buffered per deriver before back-pressure has to kick in
const eventsBuffer = 200
// 10,000 events per second is plenty.
// If we are going through more events, the driver needs to breathe, and warn the user of a potential issue.
const eventsLimit = rate.Limit(10_000)
// 500 events of burst: the maximum amount of events to eat up
// past the rate limit before the rate limit becomes applicable.
const eventsBurst = 500
func DefaultRegisterOpts() *RegisterOpts {
return &RegisterOpts{
Executor: ExecutorOpts{
Capacity: eventsBuffer,
},
Emitter: EmitterOpts{
Limiting: true,
Rate: eventsLimit,
Burst: eventsBurst,
OnLimited: nil,
},
}
}
package event
import (
"context"
"io"
"sync"
"github.com/ethereum/go-ethereum/log"
)
// Don't queue up an endless number of events.
// At some point it's better to drop events and warn something is exploding the number of events.
const sanityEventLimit = 1000
// Queue is a event.Emitter that a event.Deriver can emit events to.
// The events will be queued up, and can then be executed synchronously by calling the Drain function,
// which will apply all events to the root Deriver.
// New events may be queued up while events are being processed by the root rollup.Deriver.
type Queue struct {
// The lock is no-op in FP execution, if running in synchronous FP-VM.
// This lock ensures that all emitted events are merged together correctly,
// if this util is used in a concurrent context.
evLock sync.Mutex
events []Event
log log.Logger
ctx context.Context
root Deriver
metrics Metrics
}
var _ EmitterDrainer = (*Queue)(nil)
func NewQueue(log log.Logger, ctx context.Context, root Deriver, metrics Metrics) *Queue {
return &Queue{
log: log,
ctx: ctx,
root: root,
metrics: metrics,
}
}
func (s *Queue) Emit(event Event) {
s.evLock.Lock()
defer s.evLock.Unlock()
s.log.Debug("Emitting event", "event", event)
s.metrics.RecordEmittedEvent(event.String())
if s.ctx.Err() != nil {
s.log.Warn("Ignoring emitted event during shutdown", "event", event)
return
}
// sanity limit, never queue too many events
if len(s.events) >= sanityEventLimit {
s.log.Error("Something is very wrong, queued up too many events! Dropping event", "ev", event)
return
}
s.events = append(s.events, event)
}
func (s *Queue) Drain() error {
for {
if s.ctx.Err() != nil {
return s.ctx.Err()
}
if len(s.events) == 0 {
return nil
}
s.evLock.Lock()
first := s.events[0]
s.events = s.events[1:]
s.evLock.Unlock()
s.log.Debug("Processing event", "event", first)
s.root.OnEvent(first)
s.metrics.RecordProcessedEvent(first.String())
}
}
func (s *Queue) DrainUntil(fn func(ev Event) bool, excl bool) error {
for {
if s.ctx.Err() != nil {
return s.ctx.Err()
}
if len(s.events) == 0 {
return io.EOF
}
s.evLock.Lock()
first := s.events[0]
stop := fn(first)
if excl && stop {
s.evLock.Unlock()
return nil
}
s.events = s.events[1:]
s.evLock.Unlock()
s.log.Debug("Processing event", "event", first)
s.root.OnEvent(first)
s.metrics.RecordProcessedEvent(first.String())
if stop {
return nil
}
}
}
var _ Emitter = (*Queue)(nil)
package event
import (
"context"
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/testlog"
)
func TestQueue(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
ctx, cancel := context.WithCancel(context.Background())
count := 0
deriver := DeriverFunc(func(ev Event) {
count += 1
})
syncEv := NewQueue(logger, ctx, deriver, NoopMetrics{})
require.NoError(t, syncEv.Drain(), "can drain, even if empty")
syncEv.Emit(TestEvent{})
require.Equal(t, 0, count, "no processing yet, queued event")
require.NoError(t, syncEv.Drain())
require.Equal(t, 1, count, "processed event")
syncEv.Emit(TestEvent{})
syncEv.Emit(TestEvent{})
require.Equal(t, 1, count, "no processing yet, queued events")
require.NoError(t, syncEv.Drain())
require.Equal(t, 3, count, "processed events")
cancel()
syncEv.Emit(TestEvent{})
require.Equal(t, ctx.Err(), syncEv.Drain(), "no draining after close")
require.Equal(t, 3, count, "didn't process event after trigger close")
}
func TestQueueSanityLimit(t *testing.T) {
logger := testlog.Logger(t, log.LevelCrit) // expecting error log of hitting sanity limit
count := 0
deriver := DeriverFunc(func(ev Event) {
count += 1
})
syncEv := NewQueue(logger, context.Background(), deriver, NoopMetrics{})
// emit 1 too many events
for i := 0; i < sanityEventLimit+1; i++ {
syncEv.Emit(TestEvent{})
}
require.NoError(t, syncEv.Drain())
require.Equal(t, sanityEventLimit, count, "processed all non-dropped events")
syncEv.Emit(TestEvent{})
require.NoError(t, syncEv.Drain())
require.Equal(t, sanityEventLimit+1, count, "back to normal after drain")
}
type CyclicEvent struct {
Count int
}
func (ev CyclicEvent) String() string {
return "cyclic-event"
}
func TestSynchronousCyclic(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
var emitter Emitter
result := false
deriver := DeriverFunc(func(ev Event) {
logger.Info("received event", "event", ev)
switch x := ev.(type) {
case CyclicEvent:
if x.Count < 10 {
emitter.Emit(CyclicEvent{Count: x.Count + 1})
} else {
result = true
}
}
})
syncEv := NewQueue(logger, context.Background(), deriver, NoopMetrics{})
emitter = syncEv
syncEv.Emit(CyclicEvent{Count: 0})
require.NoError(t, syncEv.Drain())
require.True(t, result, "expecting event processing to fully recurse")
}
package event
import (
"context"
"fmt"
"slices"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/log"
)
type System interface {
// Register registers a named event-emitter, optionally processing events itself:
// deriver may be nil, not all registrants have to process events.
// A non-nil deriver may implement AttachEmitter to automatically attach the Emitter to it,
// before the deriver itself becomes executable.
Register(name string, deriver Deriver, opts *RegisterOpts) Emitter
// Unregister removes a named emitter,
// also removing it from the set of events-receiving derivers (if registered with non-nil deriver).
Unregister(name string) (old Emitter)
// AddTracer registers a tracer to capture all event deriver/emitter work. It runs until RemoveTracer is called.
// Duplicate tracers are allowed.
AddTracer(t Tracer)
// RemoveTracer removes a tracer. This is a no-op if the tracer was not previously added.
// It will remove all added duplicates of the tracer.
RemoveTracer(t Tracer)
// Stop shuts down the System by un-registering all derivers/emitters.
Stop()
}
type AttachEmitter interface {
AttachEmitter(em Emitter)
}
type AnnotatedEvent struct {
Event Event
EmitContext uint64 // uniquely identifies the emission of the event, useful for debugging and creating diagrams
}
// systemActor is a deriver and/or emitter, registered in System with a name.
// If deriving, the actor is added as Executable to the Executor of the System.
type systemActor struct {
name string
sys *Sys
// To manage the execution peripherals, like rate-limiting, of this deriver
ctx context.Context
cancel context.CancelFunc
deriv Deriver
leaveExecutor func()
// 0 if event does not originate from Deriver-handling of another event
currentEvent uint64
}
// Emit is called by the end-user
func (r *systemActor) Emit(ev Event) {
if r.ctx.Err() != nil {
return
}
r.sys.emit(r.name, r.currentEvent, ev)
}
// RunEvent is called by the events executor.
// While different things may execute in parallel, only one event is executed per entry at a time.
func (r *systemActor) RunEvent(ev AnnotatedEvent) {
if r.deriv == nil {
return
}
if r.ctx.Err() != nil {
return
}
prev := r.currentEvent
start := time.Now()
r.currentEvent = r.sys.recordDerivStart(r.name, ev, start)
effect := r.deriv.OnEvent(ev.Event)
elapsed := time.Since(start)
r.sys.recordDerivEnd(r.name, ev, r.currentEvent, start, elapsed, effect)
r.currentEvent = prev
}
// Sys is the canonical implementation of System.
type Sys struct {
regs map[string]*systemActor
regsLock sync.Mutex
log log.Logger
executor Executor
// used to generate a unique id for each event deriver processing call.
derivContext atomic.Uint64
// used to generate a unique id for each event-emission.
emitContext atomic.Uint64
tracers []Tracer
tracersLock sync.RWMutex
}
func NewSystem(log log.Logger, ex Executor) *Sys {
return &Sys{
regs: make(map[string]*systemActor),
executor: ex,
log: log,
}
}
func (s *Sys) Register(name string, deriver Deriver, opts *RegisterOpts) Emitter {
s.regsLock.Lock()
defer s.regsLock.Unlock()
if _, ok := s.regs[name]; ok {
panic(fmt.Errorf("a deriver/emitter with name %q already exists", name))
}
ctx, cancel := context.WithCancel(context.Background())
r := &systemActor{
name: name,
deriv: deriver,
sys: s,
ctx: ctx,
cancel: cancel,
}
s.regs[name] = r
var em Emitter = r
if opts.Emitter.Limiting {
limitedCallback := opts.Emitter.OnLimited
em = NewLimiter(ctx, r, opts.Emitter.Rate, opts.Emitter.Burst, func() {
r.sys.recordRateLimited(name, r.currentEvent)
if limitedCallback != nil {
limitedCallback()
}
})
}
// If it can emit, attach an emitter to it
if attachTo, ok := deriver.(AttachEmitter); ok {
attachTo.AttachEmitter(em)
}
// If it can derive, add it to the executor (and only after attaching the emitter)
if deriver != nil {
r.leaveExecutor = s.executor.Add(r, &opts.Executor)
}
return em
}
func (s *Sys) Unregister(name string) (previous Emitter) {
s.regsLock.Lock()
defer s.regsLock.Unlock()
return s.unregister(name)
}
func (s *Sys) unregister(name string) (previous Emitter) {
r, ok := s.regs[name]
if !ok {
return nil
}
r.cancel()
// if this was registered as deriver with the executor, then leave the executor
if r.leaveExecutor != nil {
r.leaveExecutor()
}
delete(s.regs, name)
return r
}
// Stop shuts down the system
// by unregistering all emitters/derivers,
// freeing up executor resources.
func (s *Sys) Stop() {
s.regsLock.Lock()
defer s.regsLock.Unlock()
for _, r := range s.regs {
s.unregister(r.name)
}
}
func (s *Sys) AddTracer(t Tracer) {
s.tracersLock.Lock()
defer s.tracersLock.Unlock()
s.tracers = append(s.tracers, t)
}
func (s *Sys) RemoveTracer(t Tracer) {
s.tracersLock.Lock()
defer s.tracersLock.Unlock()
// We are not removing tracers often enough to optimize the deletion;
// instead we prefer fast and simple tracer iteration during regular operation.
s.tracers = slices.DeleteFunc(s.tracers, func(v Tracer) bool {
return v == t
})
}
// recordDeriv records that the deriver by name [deriv] is processing event [ev].
// This returns a unique integer (during lifetime of Sys), usable as ID to reference processing.
func (s *Sys) recordDerivStart(name string, ev AnnotatedEvent, startTime time.Time) uint64 {
derivContext := s.derivContext.Add(1)
s.tracersLock.RLock()
defer s.tracersLock.RUnlock()
for _, t := range s.tracers {
t.OnDeriveStart(name, ev, derivContext, startTime)
}
return derivContext
}
func (s *Sys) recordDerivEnd(name string, ev AnnotatedEvent, derivContext uint64, startTime time.Time, duration time.Duration, effect bool) {
s.tracersLock.RLock()
defer s.tracersLock.RUnlock()
for _, t := range s.tracers {
t.OnDeriveEnd(name, ev, derivContext, startTime, duration, effect)
}
}
func (s *Sys) recordRateLimited(name string, derivContext uint64) {
s.tracersLock.RLock()
defer s.tracersLock.RUnlock()
s.log.Warn("Event-system emitter component was rate-limited", "emitter", name)
for _, t := range s.tracers {
t.OnRateLimited(name, derivContext)
}
}
func (s *Sys) recordEmit(name string, ev AnnotatedEvent, derivContext uint64, emitTime time.Time) {
s.tracersLock.RLock()
defer s.tracersLock.RUnlock()
for _, t := range s.tracers {
t.OnEmit(name, ev, derivContext, emitTime)
}
}
// emit an event [ev] during the derivation of another event, referenced by derivContext.
// If the event was emitted not as part of deriver event execution, then the derivContext is 0.
// The name of the emitter is provided to further contextualize the event.
func (s *Sys) emit(name string, derivContext uint64, ev Event) {
emitContext := s.emitContext.Add(1)
annotated := AnnotatedEvent{Event: ev, EmitContext: emitContext}
emitTime := time.Now()
s.recordEmit(name, annotated, derivContext, emitTime)
err := s.executor.Enqueue(annotated)
if err != nil {
s.log.Error("Failed to enqueue event", "emitter", name, "event", ev, "context", derivContext)
return
}
}
package event
import (
"context"
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/testlog"
)
func TestSysTracing(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
ex := NewGlobalSynchronous(context.Background())
sys := NewSystem(logger, ex)
count := 0
foo := DeriverFunc(func(ev Event) bool {
switch ev.(type) {
case TestEvent:
count += 1
return true
}
return false
})
lgr, logs := testlog.CaptureLogger(t, log.LevelDebug)
logTracer := NewLogTracer(lgr, log.LevelDebug)
sys.AddTracer(logTracer)
em := sys.Register("foo", foo, DefaultRegisterOpts())
em.Emit(TestEvent{})
require.Equal(t, 0, count, "no event processing before synchronous executor explicitly drains")
require.NoError(t, ex.Drain())
require.Equal(t, 1, count)
hasDebugLevel := testlog.NewLevelFilter(log.LevelDebug)
require.NotNil(t, logs.FindLog(hasDebugLevel,
testlog.NewMessageContainsFilter("Emitting event")))
require.NotNil(t, logs.FindLog(hasDebugLevel,
testlog.NewMessageContainsFilter("Processing event")))
require.NotNil(t, logs.FindLog(hasDebugLevel,
testlog.NewMessageContainsFilter("Processed event")))
em.Emit(FooEvent{})
require.NoError(t, ex.Drain())
require.Equal(t, 1, count, "foo does not count")
em.Emit(TestEvent{})
require.NoError(t, ex.Drain())
require.Equal(t, 2, count)
logs.Clear()
sys.RemoveTracer(logTracer)
em.Emit(TestEvent{})
require.NoError(t, ex.Drain())
require.Equal(t, 3, count)
require.Equal(t, 0, len(*logs.Logs), "no logs when tracer is not active anymore")
}
func TestSystemBroadcast(t *testing.T) {
logger := testlog.Logger(t, log.LevelError)
ex := NewGlobalSynchronous(context.Background())
sys := NewSystem(logger, ex)
fooCount := 0
foo := DeriverFunc(func(ev Event) bool {
switch ev.(type) {
case TestEvent:
fooCount += 1
case FooEvent:
fooCount += 1
default:
return false
}
return true
})
barCount := 0
bar := DeriverFunc(func(ev Event) bool {
switch ev.(type) {
case TestEvent:
barCount += 1
case BarEvent:
barCount += 1
default:
return false
}
return true
})
fooEm := sys.Register("foo", foo, DefaultRegisterOpts())
fooEm.Emit(TestEvent{})
barEm := sys.Register("bar", bar, DefaultRegisterOpts())
barEm.Emit(TestEvent{})
// events are broadcast to every deriver, regardless who sends them
require.NoError(t, ex.Drain())
require.Equal(t, 2, fooCount)
require.Equal(t, 2, barCount)
// emit from bar, process in foo
barEm.Emit(FooEvent{})
require.NoError(t, ex.Drain())
require.Equal(t, 3, fooCount)
require.Equal(t, 2, barCount)
// emit from foo, process in bar
fooEm.Emit(BarEvent{})
require.NoError(t, ex.Drain())
require.Equal(t, 3, fooCount)
require.Equal(t, 3, barCount)
}
package event
import (
"time"
)
type Tracer interface {
OnDeriveStart(name string, ev AnnotatedEvent, derivContext uint64, startTime time.Time)
OnDeriveEnd(name string, ev AnnotatedEvent, derivContext uint64, startTime time.Time, duration time.Duration, effect bool)
OnRateLimited(name string, derivContext uint64)
OnEmit(name string, ev AnnotatedEvent, derivContext uint64, emitTime time.Time)
}
package event
import (
"time"
"golang.org/x/exp/slog"
"github.com/ethereum/go-ethereum/log"
)
type LogTracer struct {
log log.Logger
lvl slog.Level
}
var _ Tracer = (*LogTracer)(nil)
func NewLogTracer(log log.Logger, lvl slog.Level) *LogTracer {
return &LogTracer{
log: log,
lvl: lvl,
}
}
func (lt *LogTracer) OnDeriveStart(name string, ev AnnotatedEvent, derivContext uint64, startTime time.Time) {
lt.log.Log(lt.lvl, "Processing event", "deriver", name, "event", ev.Event,
"emit_context", ev.EmitContext, "deriv_context", derivContext)
}
func (lt *LogTracer) OnDeriveEnd(name string, ev AnnotatedEvent, derivContext uint64, startTime time.Time, duration time.Duration, effect bool) {
lt.log.Log(lt.lvl, "Processed event", "deriver", name, "duration", duration,
"event", ev.Event, "emit_context", ev.EmitContext, "deriv_context", derivContext, "effect", effect)
}
func (lt *LogTracer) OnRateLimited(name string, derivContext uint64) {
lt.log.Log(lt.lvl, "Rate-limited event-emission", "emitter", name, "context", derivContext)
}
func (lt *LogTracer) OnEmit(name string, ev AnnotatedEvent, derivContext uint64, emitTime time.Time) {
lt.log.Log(lt.lvl, "Emitting event", "emitter", name, "event", ev.Event, "emit_context", ev.EmitContext, "deriv_context", derivContext)
}
package event
import "time"
type MetricsTracer struct {
metrics Metrics
}
var _ Tracer = (*MetricsTracer)(nil)
func (mt *MetricsTracer) OnDeriveStart(name string, ev AnnotatedEvent, derivContext uint64, startTime time.Time) {
}
func (mt *MetricsTracer) OnDeriveEnd(name string, ev AnnotatedEvent, derivContext uint64, startTime time.Time, duration time.Duration, effect bool) {
if !effect { // don't count events that were just pass-through and not of any effect
return
}
mt.metrics.RecordProcessedEvent(ev.Event.String())
}
func (mt *MetricsTracer) OnRateLimited(name string, derivContext uint64) {
mt.metrics.RecordEventsRateLimited()
}
func (mt *MetricsTracer) OnEmit(name string, ev AnnotatedEvent, derivContext uint64, emitTime time.Time) {
mt.metrics.RecordEmittedEvent(ev.Event.String())
}
......@@ -94,7 +94,7 @@ type Finalizer struct {
l1Fetcher FinalizerL1Interface
}
func NewFinalizer(ctx context.Context, log log.Logger, cfg *rollup.Config, l1Fetcher FinalizerL1Interface, emitter event.Emitter) *Finalizer {
func NewFinalizer(ctx context.Context, log log.Logger, cfg *rollup.Config, l1Fetcher FinalizerL1Interface) *Finalizer {
lookback := calcFinalityLookback(cfg)
return &Finalizer{
ctx: ctx,
......@@ -104,10 +104,13 @@ func NewFinalizer(ctx context.Context, log log.Logger, cfg *rollup.Config, l1Fet
finalityData: make([]FinalityData, 0, lookback),
finalityLookback: lookback,
l1Fetcher: l1Fetcher,
emitter: emitter,
}
}
func (fi *Finalizer) AttachEmitter(em event.Emitter) {
fi.emitter = em
}
// FinalizedL1 identifies the L1 chain (incl.) that included and/or produced all the finalized L2 blocks.
// This may return a zeroed ID if no finalization signals have been seen yet.
func (fi *Finalizer) FinalizedL1() (out eth.L1BlockRef) {
......@@ -131,7 +134,7 @@ func (ev TryFinalizeEvent) String() string {
return "try-finalize"
}
func (fi *Finalizer) OnEvent(ev event.Event) {
func (fi *Finalizer) OnEvent(ev event.Event) bool {
switch x := ev.(type) {
case FinalizeL1Event:
fi.onL1Finalized(x.FinalizedL1)
......@@ -145,7 +148,10 @@ func (fi *Finalizer) OnEvent(ev event.Event) {
fi.tryFinalize()
case engine.ForkchoiceUpdateEvent:
fi.lastFinalizedL2 = x.FinalizedL2Head
default:
return false
}
return true
}
// onL1Finalized applies a L1 finality signal
......
......@@ -191,7 +191,8 @@ func TestEngineQueue_Finalize(t *testing.T) {
l1F.ExpectL1BlockRefByNumber(refD.Number, refD, nil)
emitter := &testutils.MockEmitter{}
fi := NewFinalizer(context.Background(), logger, &rollup.Config{}, l1F, emitter)
fi := NewFinalizer(context.Background(), logger, &rollup.Config{}, l1F)
fi.AttachEmitter(emitter)
// now say C1 was included in D and became the new safe head
fi.OnEvent(engine.SafeDerivedEvent{Safe: refC1, DerivedFrom: refD})
......@@ -225,7 +226,8 @@ func TestEngineQueue_Finalize(t *testing.T) {
l1F.ExpectL1BlockRefByNumber(refD.Number, refD, nil) // to check what was derived from (same in this case)
emitter := &testutils.MockEmitter{}
fi := NewFinalizer(context.Background(), logger, &rollup.Config{}, l1F, emitter)
fi := NewFinalizer(context.Background(), logger, &rollup.Config{}, l1F)
fi.AttachEmitter(emitter)
// now say C1 was included in D and became the new safe head
fi.OnEvent(engine.SafeDerivedEvent{Safe: refC1, DerivedFrom: refD})
......@@ -264,7 +266,8 @@ func TestEngineQueue_Finalize(t *testing.T) {
defer l1F.AssertExpectations(t)
emitter := &testutils.MockEmitter{}
fi := NewFinalizer(context.Background(), logger, &rollup.Config{}, l1F, emitter)
fi := NewFinalizer(context.Background(), logger, &rollup.Config{}, l1F)
fi.AttachEmitter(emitter)
fi.OnEvent(engine.SafeDerivedEvent{Safe: refC1, DerivedFrom: refD})
fi.OnEvent(derive.DeriverIdleEvent{Origin: refD})
......@@ -349,7 +352,8 @@ func TestEngineQueue_Finalize(t *testing.T) {
l1F.ExpectL1BlockRefByNumber(refC.Number, refC, nil) // check what we derived the L2 block from
emitter := &testutils.MockEmitter{}
fi := NewFinalizer(context.Background(), logger, &rollup.Config{}, l1F, emitter)
fi := NewFinalizer(context.Background(), logger, &rollup.Config{}, l1F)
fi.AttachEmitter(emitter)
// now say B1 was included in C and became the new safe head
fi.OnEvent(engine.SafeDerivedEvent{Safe: refB1, DerivedFrom: refC})
......@@ -385,7 +389,8 @@ func TestEngineQueue_Finalize(t *testing.T) {
l1F.ExpectL1BlockRefByNumber(refE.Number, refE, nil) // post-reorg
emitter := &testutils.MockEmitter{}
fi := NewFinalizer(context.Background(), logger, &rollup.Config{}, l1F, emitter)
fi := NewFinalizer(context.Background(), logger, &rollup.Config{}, l1F)
fi.AttachEmitter(emitter)
// now say B1 was included in C and became the new safe head
fi.OnEvent(engine.SafeDerivedEvent{Safe: refB1, DerivedFrom: refC})
......
......@@ -28,10 +28,10 @@ type PlasmaFinalizer struct {
}
func NewPlasmaFinalizer(ctx context.Context, log log.Logger, cfg *rollup.Config,
l1Fetcher FinalizerL1Interface, emitter event.Emitter,
l1Fetcher FinalizerL1Interface,
backend PlasmaBackend) *PlasmaFinalizer {
inner := NewFinalizer(ctx, log, cfg, l1Fetcher, emitter)
inner := NewFinalizer(ctx, log, cfg, l1Fetcher)
// In alt-da mode, the finalization signal is proxied through the plasma manager.
// Finality signal will come from the DA contract or L1 finality whichever is last.
......@@ -46,11 +46,12 @@ func NewPlasmaFinalizer(ctx context.Context, log log.Logger, cfg *rollup.Config,
}
}
func (fi *PlasmaFinalizer) OnEvent(ev event.Event) {
func (fi *PlasmaFinalizer) OnEvent(ev event.Event) bool {
switch x := ev.(type) {
case FinalizeL1Event:
fi.backend.Finalize(x.FinalizedL1)
return true
default:
fi.Finalizer.OnEvent(ev)
return fi.Finalizer.OnEvent(ev)
}
}
......@@ -97,7 +97,8 @@ func TestPlasmaFinalityData(t *testing.T) {
}
emitter := &testutils.MockEmitter{}
fi := NewPlasmaFinalizer(context.Background(), logger, cfg, l1F, emitter, plasmaBackend)
fi := NewPlasmaFinalizer(context.Background(), logger, cfg, l1F, plasmaBackend)
fi.AttachEmitter(emitter)
require.NotNil(t, plasmaBackend.forwardTo, "plasma backend must have access to underlying standard finalizer")
require.Equal(t, expFinalityLookback, cap(fi.finalityData))
......
......@@ -57,7 +57,7 @@ func NewStatusTracker(log log.Logger, metrics Metrics) *StatusTracker {
return st
}
func (st *StatusTracker) OnEvent(ev event.Event) {
func (st *StatusTracker) OnEvent(ev event.Event) bool {
st.mu.Lock()
defer st.mu.Unlock()
......@@ -110,7 +110,7 @@ func (st *StatusTracker) OnEvent(ev event.Event) {
st.data.SafeL2 = x.Safe
st.data.FinalizedL2 = x.Finalized
default: // other events do not affect the sync status
return
return false
}
// If anything changes, then copy the state to the published SyncStatus
......@@ -121,6 +121,7 @@ func (st *StatusTracker) OnEvent(ev event.Event) {
published = st.data
st.published.Store(&published)
}
return true
}
// SyncStatus is thread safe, and reads the latest view of L1 and L2 block labels
......
......@@ -37,12 +37,15 @@ func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher,
}
pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l1BlobsSource, plasma.Disabled, l2Source, metrics.NoopMetrics)
pipelineDeriver := derive.NewPipelineDeriver(context.Background(), pipeline, d)
pipelineDeriver := derive.NewPipelineDeriver(context.Background(), pipeline)
pipelineDeriver.AttachEmitter(d)
ec := engine.NewEngineController(l2Source, logger, metrics.NoopMetrics, cfg, &sync.Config{SyncMode: sync.CLSync}, d)
engineDeriv := engine.NewEngDeriver(logger, context.Background(), cfg, ec, d)
engineDeriv := engine.NewEngDeriver(logger, context.Background(), cfg, ec)
engineDeriv.AttachEmitter(d)
syncCfg := &sync.Config{SyncMode: sync.CLSync}
engResetDeriv := engine.NewEngineResetDeriver(context.Background(), logger, cfg, l1Source, l2Source, syncCfg, d)
engResetDeriv := engine.NewEngineResetDeriver(context.Background(), logger, cfg, l1Source, l2Source, syncCfg)
engResetDeriv.AttachEmitter(d)
prog := &ProgramDeriver{
logger: logger,
......
......@@ -33,8 +33,9 @@ func TestDriver(t *testing.T) {
logger: logger,
end: end,
}
d.deriver = event.DeriverFunc(func(ev event.Event) {
d.deriver = event.DeriverFunc(func(ev event.Event) bool {
onEvent(d, end, ev)
return true
})
return d
}
......
......@@ -33,7 +33,7 @@ func (d *ProgramDeriver) Result() error {
return d.result
}
func (d *ProgramDeriver) OnEvent(ev event.Event) {
func (d *ProgramDeriver) OnEvent(ev event.Event) bool {
switch x := ev.(type) {
case engine.EngineResetConfirmedEvent:
d.Emitter.Emit(derive.ConfirmPipelineResetEvent{})
......@@ -84,6 +84,7 @@ func (d *ProgramDeriver) OnEvent(ev event.Event) {
// Other events can be ignored safely.
// They are broadcast, but only consumed by the other derivers,
// or do not affect the state-transition.
return
return false
}
return true
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment