Commit 21ea079d authored by protolambda's avatar protolambda Committed by GitHub

op-node: fix driver step hot loop, improve events and add utils (#11040)

* op-node: fix driver step hot loop, improvement events and add utils

* op-node: fix verifier event processing liveness

* op-node: add events_rate_limited metric

* op-node: bump events rate limits
parent 6125faed
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine" "github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
) )
...@@ -109,7 +110,7 @@ func (s *L2Sequencer) ActL2EndBlock(t Testing) { ...@@ -109,7 +110,7 @@ func (s *L2Sequencer) ActL2EndBlock(t Testing) {
// This will ensure the sync-status and such reflect the latest changes. // This will ensure the sync-status and such reflect the latest changes.
s.synchronousEvents.Emit(engine.TryUpdateEngineEvent{}) s.synchronousEvents.Emit(engine.TryUpdateEngineEvent{})
s.synchronousEvents.Emit(engine.ForkchoiceRequestEvent{}) s.synchronousEvents.Emit(engine.ForkchoiceRequestEvent{})
require.NoError(t, s.synchronousEvents.DrainUntil(func(ev rollup.Event) bool { require.NoError(t, s.synchronousEvents.DrainUntil(func(ev event.Event) bool {
x, ok := ev.(engine.ForkchoiceUpdateEvent) x, ok := ev.(engine.ForkchoiceUpdateEvent)
return ok && x.UnsafeL2Head == s.engine.UnsafeL2Head() return ok && x.UnsafeL2Head == s.engine.UnsafeL2Head()
}, false)) }, false))
......
...@@ -6,11 +6,13 @@ import ( ...@@ -6,11 +6,13 @@ import (
"fmt" "fmt"
"io" "io"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
gnode "github.com/ethereum/go-ethereum/node" gnode "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/node" "github.com/ethereum-optimism/optimism/op-node/node"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
...@@ -19,6 +21,7 @@ import ( ...@@ -19,6 +21,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine" "github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-node/rollup/finality" "github.com/ethereum-optimism/optimism/op-node/rollup/finality"
"github.com/ethereum-optimism/optimism/op-node/rollup/status" "github.com/ethereum-optimism/optimism/op-node/rollup/status"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
...@@ -41,7 +44,7 @@ type L2Verifier struct { ...@@ -41,7 +44,7 @@ type L2Verifier struct {
syncStatus driver.SyncStatusTracker syncStatus driver.SyncStatusTracker
synchronousEvents *rollup.SynchronousEvents synchronousEvents event.EmitterDrainer
syncDeriver *driver.SyncDeriver syncDeriver *driver.SyncDeriver
...@@ -88,8 +91,13 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri ...@@ -88,8 +91,13 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel) t.Cleanup(cancel)
rootDeriver := &rollup.SynchronousDerivers{} rootDeriver := &event.DeriverMux{}
synchronousEvents := rollup.NewSynchronousEvents(log, ctx, rootDeriver) var synchronousEvents event.EmitterDrainer
synchronousEvents = event.NewQueue(log, ctx, rootDeriver, event.NoopMetrics{})
synchronousEvents = event.NewLimiterDrainer(ctx, synchronousEvents, rate.Limit(1000), 20, func() {
log.Warn("Hitting events rate-limit. An events code-path may be hot-looping.")
t.Fatal("Tests must not hot-loop events")
})
metrics := &testutils.TestDerivationMetrics{} metrics := &testutils.TestDerivationMetrics{}
ec := engine.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode, synchronousEvents) ec := engine.NewEngineController(eng, log, metrics, cfg, syncCfg.SyncMode, synchronousEvents)
...@@ -148,7 +156,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri ...@@ -148,7 +156,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, blobsSrc deri
synchronousEvents: synchronousEvents, synchronousEvents: synchronousEvents,
} }
*rootDeriver = rollup.SynchronousDerivers{ *rootDeriver = event.DeriverMux{
syncStatusTracker, syncStatusTracker,
syncDeriver, syncDeriver,
engineResetDeriver, engineResetDeriver,
...@@ -275,7 +283,7 @@ func (s *L2Verifier) ActL1HeadSignal(t Testing) { ...@@ -275,7 +283,7 @@ func (s *L2Verifier) ActL1HeadSignal(t Testing) {
head, err := s.l1.L1BlockRefByLabel(t.Ctx(), eth.Unsafe) head, err := s.l1.L1BlockRefByLabel(t.Ctx(), eth.Unsafe)
require.NoError(t, err) require.NoError(t, err)
s.synchronousEvents.Emit(status.L1UnsafeEvent{L1Unsafe: head}) s.synchronousEvents.Emit(status.L1UnsafeEvent{L1Unsafe: head})
require.NoError(t, s.synchronousEvents.DrainUntil(func(ev rollup.Event) bool { require.NoError(t, s.synchronousEvents.DrainUntil(func(ev event.Event) bool {
x, ok := ev.(status.L1UnsafeEvent) x, ok := ev.(status.L1UnsafeEvent)
return ok && x.L1Unsafe == head return ok && x.L1Unsafe == head
}, false)) }, false))
...@@ -286,7 +294,7 @@ func (s *L2Verifier) ActL1SafeSignal(t Testing) { ...@@ -286,7 +294,7 @@ func (s *L2Verifier) ActL1SafeSignal(t Testing) {
safe, err := s.l1.L1BlockRefByLabel(t.Ctx(), eth.Safe) safe, err := s.l1.L1BlockRefByLabel(t.Ctx(), eth.Safe)
require.NoError(t, err) require.NoError(t, err)
s.synchronousEvents.Emit(status.L1SafeEvent{L1Safe: safe}) s.synchronousEvents.Emit(status.L1SafeEvent{L1Safe: safe})
require.NoError(t, s.synchronousEvents.DrainUntil(func(ev rollup.Event) bool { require.NoError(t, s.synchronousEvents.DrainUntil(func(ev event.Event) bool {
x, ok := ev.(status.L1SafeEvent) x, ok := ev.(status.L1SafeEvent)
return ok && x.L1Safe == safe return ok && x.L1Safe == safe
}, false)) }, false))
...@@ -297,14 +305,14 @@ func (s *L2Verifier) ActL1FinalizedSignal(t Testing) { ...@@ -297,14 +305,14 @@ func (s *L2Verifier) ActL1FinalizedSignal(t Testing) {
finalized, err := s.l1.L1BlockRefByLabel(t.Ctx(), eth.Finalized) finalized, err := s.l1.L1BlockRefByLabel(t.Ctx(), eth.Finalized)
require.NoError(t, err) require.NoError(t, err)
s.synchronousEvents.Emit(finality.FinalizeL1Event{FinalizedL1: finalized}) s.synchronousEvents.Emit(finality.FinalizeL1Event{FinalizedL1: finalized})
require.NoError(t, s.synchronousEvents.DrainUntil(func(ev rollup.Event) bool { require.NoError(t, s.synchronousEvents.DrainUntil(func(ev event.Event) bool {
x, ok := ev.(finality.FinalizeL1Event) x, ok := ev.(finality.FinalizeL1Event)
return ok && x.FinalizedL1 == finalized return ok && x.FinalizedL1 == finalized
}, false)) }, false))
require.Equal(t, finalized, s.syncStatus.SyncStatus().FinalizedL1) require.Equal(t, finalized, s.syncStatus.SyncStatus().FinalizedL1)
} }
func (s *L2Verifier) OnEvent(ev rollup.Event) { func (s *L2Verifier) OnEvent(ev event.Event) {
switch x := ev.(type) { switch x := ev.(type) {
case rollup.L1TemporaryErrorEvent: case rollup.L1TemporaryErrorEvent:
s.log.Warn("L1 temporary error", "err", x.Err) s.log.Warn("L1 temporary error", "err", x.Err)
...@@ -323,13 +331,13 @@ func (s *L2Verifier) OnEvent(ev rollup.Event) { ...@@ -323,13 +331,13 @@ func (s *L2Verifier) OnEvent(ev rollup.Event) {
} }
func (s *L2Verifier) ActL2EventsUntilPending(t Testing, num uint64) { func (s *L2Verifier) ActL2EventsUntilPending(t Testing, num uint64) {
s.ActL2EventsUntil(t, func(ev rollup.Event) bool { s.ActL2EventsUntil(t, func(ev event.Event) bool {
x, ok := ev.(engine.PendingSafeUpdateEvent) x, ok := ev.(engine.PendingSafeUpdateEvent)
return ok && x.PendingSafe.Number == num return ok && x.PendingSafe.Number == num
}, 1000, false) }, 1000, false)
} }
func (s *L2Verifier) ActL2EventsUntil(t Testing, fn func(ev rollup.Event) bool, max int, excl bool) { func (s *L2Verifier) ActL2EventsUntil(t Testing, fn func(ev event.Event) bool, max int, excl bool) {
t.Helper() t.Helper()
if s.l2Building { if s.l2Building {
t.InvalidAction("cannot derive new data while building L2 block") t.InvalidAction("cannot derive new data while building L2 block")
......
...@@ -15,6 +15,7 @@ import ( ...@@ -15,6 +15,7 @@ import (
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/node/safedb" "github.com/ethereum-optimism/optimism/op-node/node/safedb"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
plasma "github.com/ethereum-optimism/optimism/op-plasma" plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-plasma/bindings" "github.com/ethereum-optimism/optimism/op-plasma/bindings"
...@@ -500,7 +501,7 @@ func TestPlasma_SequencerStalledMultiChallenges(gt *testing.T) { ...@@ -500,7 +501,7 @@ func TestPlasma_SequencerStalledMultiChallenges(gt *testing.T) {
// advance the pipeline until it errors out as it is still stuck // advance the pipeline until it errors out as it is still stuck
// on deriving the first commitment // on deriving the first commitment
a.sequencer.ActL2EventsUntil(t, func(ev rollup.Event) bool { a.sequencer.ActL2EventsUntil(t, func(ev event.Event) bool {
x, ok := ev.(rollup.EngineTemporaryErrorEvent) x, ok := ev.(rollup.EngineTemporaryErrorEvent)
if ok { if ok {
require.ErrorContains(t, x.Err, "failed to fetch input data") require.ErrorContains(t, x.Err, "failed to fetch input data")
......
...@@ -21,6 +21,7 @@ import ( ...@@ -21,6 +21,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
engine2 "github.com/ethereum-optimism/optimism/op-node/rollup/engine" engine2 "github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum-optimism/optimism/op-service/sources"
...@@ -446,10 +447,7 @@ func TestBackupUnsafeReorgForkChoiceInputError(gt *testing.T) { ...@@ -446,10 +447,7 @@ func TestBackupUnsafeReorgForkChoiceInputError(gt *testing.T) {
// B3 is invalid block // B3 is invalid block
// NextAttributes is called // NextAttributes is called
sequencer.ActL2EventsUntil(t, func(ev rollup.Event) bool { sequencer.ActL2EventsUntil(t, event.Is[engine2.ProcessAttributesEvent], 100, true)
_, ok := ev.(engine2.ProcessAttributesEvent)
return ok
}, 100, true)
// mock forkChoiceUpdate error while restoring previous unsafe chain using backupUnsafe. // mock forkChoiceUpdate error while restoring previous unsafe chain using backupUnsafe.
seqEng.ActL2RPCFail(t, eth.InputError{Inner: errors.New("mock L2 RPC error"), Code: eth.InvalidForkchoiceState}) seqEng.ActL2RPCFail(t, eth.InputError{Inner: errors.New("mock L2 RPC error"), Code: eth.InvalidForkchoiceState})
...@@ -582,20 +580,14 @@ func TestBackupUnsafeReorgForkChoiceNotInputError(gt *testing.T) { ...@@ -582,20 +580,14 @@ func TestBackupUnsafeReorgForkChoiceNotInputError(gt *testing.T) {
// B3 is invalid block // B3 is invalid block
// wait till attributes processing (excl.) before mocking errors // wait till attributes processing (excl.) before mocking errors
sequencer.ActL2EventsUntil(t, func(ev rollup.Event) bool { sequencer.ActL2EventsUntil(t, event.Is[engine2.ProcessAttributesEvent], 100, true)
_, ok := ev.(engine2.ProcessAttributesEvent)
return ok
}, 100, true)
serverErrCnt := 2 serverErrCnt := 2
for i := 0; i < serverErrCnt; i++ { for i := 0; i < serverErrCnt; i++ {
// mock forkChoiceUpdate failure while restoring previous unsafe chain using backupUnsafe. // mock forkChoiceUpdate failure while restoring previous unsafe chain using backupUnsafe.
seqEng.ActL2RPCFail(t, engine.GenericServerError) seqEng.ActL2RPCFail(t, engine.GenericServerError)
// TryBackupUnsafeReorg is called - forkChoiceUpdate returns GenericServerError so retry // TryBackupUnsafeReorg is called - forkChoiceUpdate returns GenericServerError so retry
sequencer.ActL2EventsUntil(t, func(ev rollup.Event) bool { sequencer.ActL2EventsUntil(t, event.Is[rollup.EngineTemporaryErrorEvent], 100, false)
_, ok := ev.(rollup.EngineTemporaryErrorEvent)
return ok
}, 100, false)
// backupUnsafeHead not emptied yet // backupUnsafeHead not emptied yet
require.Equal(t, targetUnsafeHeadHash, sequencer.L2BackupUnsafe().Hash) require.Equal(t, targetUnsafeHeadHash, sequencer.L2BackupUnsafe().Hash)
} }
...@@ -981,11 +973,8 @@ func TestSpanBatchAtomicity_Consolidation(gt *testing.T) { ...@@ -981,11 +973,8 @@ func TestSpanBatchAtomicity_Consolidation(gt *testing.T) {
verifier.l2PipelineIdle = false verifier.l2PipelineIdle = false
for !verifier.l2PipelineIdle { for !verifier.l2PipelineIdle {
// wait for next pending block // wait for next pending block
verifier.ActL2EventsUntil(t, func(ev rollup.Event) bool { verifier.ActL2EventsUntil(t, event.Any(
_, pending := ev.(engine2.PendingSafeUpdateEvent) event.Is[engine2.PendingSafeUpdateEvent], event.Is[derive.DeriverIdleEvent]), 1000, false)
_, idle := ev.(derive.DeriverIdleEvent)
return pending || idle
}, 1000, false)
if verifier.L2PendingSafe().Number < targetHeadNumber { if verifier.L2PendingSafe().Number < targetHeadNumber {
// If the span batch is not fully processed, the safe head must not advance. // If the span batch is not fully processed, the safe head must not advance.
require.Equal(t, verifier.L2Safe().Number, uint64(0)) require.Equal(t, verifier.L2Safe().Number, uint64(0))
...@@ -1033,11 +1022,8 @@ func TestSpanBatchAtomicity_ForceAdvance(gt *testing.T) { ...@@ -1033,11 +1022,8 @@ func TestSpanBatchAtomicity_ForceAdvance(gt *testing.T) {
verifier.l2PipelineIdle = false verifier.l2PipelineIdle = false
for !verifier.l2PipelineIdle { for !verifier.l2PipelineIdle {
// wait for next pending block // wait for next pending block
verifier.ActL2EventsUntil(t, func(ev rollup.Event) bool { verifier.ActL2EventsUntil(t, event.Any(
_, pending := ev.(engine2.PendingSafeUpdateEvent) event.Is[engine2.PendingSafeUpdateEvent], event.Is[derive.DeriverIdleEvent]), 1000, false)
_, idle := ev.(derive.DeriverIdleEvent)
return pending || idle
}, 1000, false)
if verifier.L2PendingSafe().Number < targetHeadNumber { if verifier.L2PendingSafe().Number < targetHeadNumber {
// If the span batch is not fully processed, the safe head must not advance. // If the span batch is not fully processed, the safe head must not advance.
require.Equal(t, verifier.L2Safe().Number, uint64(0)) require.Equal(t, verifier.L2Safe().Number, uint64(0))
......
...@@ -39,6 +39,9 @@ type Metricer interface { ...@@ -39,6 +39,9 @@ type Metricer interface {
RecordSequencingError() RecordSequencingError()
RecordPublishingError() RecordPublishingError()
RecordDerivationError() RecordDerivationError()
RecordEmittedEvent(name string)
RecordProcessedEvent(name string)
RecordEventsRateLimited()
RecordReceivedUnsafePayload(payload *eth.ExecutionPayloadEnvelope) RecordReceivedUnsafePayload(payload *eth.ExecutionPayloadEnvelope)
RecordRef(layer string, name string, num uint64, timestamp uint64, h common.Hash) RecordRef(layer string, name string, num uint64, timestamp uint64, h common.Hash)
RecordL1Ref(name string, ref eth.L1BlockRef) RecordL1Ref(name string, ref eth.L1BlockRef)
...@@ -92,6 +95,11 @@ type Metrics struct { ...@@ -92,6 +95,11 @@ type Metrics struct {
SequencingErrors *metrics.Event SequencingErrors *metrics.Event
PublishingErrors *metrics.Event PublishingErrors *metrics.Event
EmittedEvents *prometheus.CounterVec
ProcessedEvents *prometheus.CounterVec
EventsRateLimited *metrics.Event
DerivedBatches metrics.EventVec DerivedBatches metrics.EventVec
P2PReqDurationSeconds *prometheus.HistogramVec P2PReqDurationSeconds *prometheus.HistogramVec
...@@ -195,6 +203,24 @@ func NewMetrics(procName string) *Metrics { ...@@ -195,6 +203,24 @@ func NewMetrics(procName string) *Metrics {
SequencingErrors: metrics.NewEvent(factory, ns, "", "sequencing_errors", "sequencing errors"), SequencingErrors: metrics.NewEvent(factory, ns, "", "sequencing_errors", "sequencing errors"),
PublishingErrors: metrics.NewEvent(factory, ns, "", "publishing_errors", "p2p publishing errors"), PublishingErrors: metrics.NewEvent(factory, ns, "", "publishing_errors", "p2p publishing errors"),
EmittedEvents: factory.NewCounterVec(
prometheus.CounterOpts{
Namespace: ns,
Subsystem: "events",
Name: "emitted",
Help: "number of emitted events",
}, []string{"event_type"}),
ProcessedEvents: factory.NewCounterVec(
prometheus.CounterOpts{
Namespace: ns,
Subsystem: "events",
Name: "processed",
Help: "number of processed events",
}, []string{"event_type"}),
EventsRateLimited: metrics.NewEvent(factory, ns, "events", "rate_limited", "events rate limiter hits"),
DerivedBatches: metrics.NewEventVec(factory, ns, "", "derived_batches", "derived batches", []string{"type"}), DerivedBatches: metrics.NewEventVec(factory, ns, "", "derived_batches", "derived batches", []string{"type"}),
SequencerInconsistentL1Origin: metrics.NewEvent(factory, ns, "", "sequencer_inconsistent_l1_origin", "events when the sequencer selects an inconsistent L1 origin"), SequencerInconsistentL1Origin: metrics.NewEvent(factory, ns, "", "sequencer_inconsistent_l1_origin", "events when the sequencer selects an inconsistent L1 origin"),
...@@ -441,6 +467,18 @@ func (m *Metrics) RecordPublishingError() { ...@@ -441,6 +467,18 @@ func (m *Metrics) RecordPublishingError() {
m.PublishingErrors.Record() m.PublishingErrors.Record()
} }
func (m *Metrics) RecordEmittedEvent(name string) {
m.EmittedEvents.WithLabelValues(name).Inc()
}
func (m *Metrics) RecordProcessedEvent(name string) {
m.ProcessedEvents.WithLabelValues(name).Inc()
}
func (m *Metrics) RecordEventsRateLimited() {
m.EventsRateLimited.Record()
}
func (m *Metrics) RecordDerivationError() { func (m *Metrics) RecordDerivationError() {
m.DerivationErrors.Record() m.DerivationErrors.Record()
} }
...@@ -642,6 +680,15 @@ func (n *noopMetricer) RecordPublishingError() { ...@@ -642,6 +680,15 @@ func (n *noopMetricer) RecordPublishingError() {
func (n *noopMetricer) RecordDerivationError() { func (n *noopMetricer) RecordDerivationError() {
} }
func (n *noopMetricer) RecordEmittedEvent(name string) {
}
func (n *noopMetricer) RecordProcessedEvent(name string) {
}
func (n *noopMetricer) RecordEventsRateLimited() {
}
func (n *noopMetricer) RecordReceivedUnsafePayload(payload *eth.ExecutionPayloadEnvelope) { func (n *noopMetricer) RecordReceivedUnsafePayload(payload *eth.ExecutionPayloadEnvelope) {
} }
......
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine" "github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
) )
...@@ -31,12 +32,12 @@ type AttributesHandler struct { ...@@ -31,12 +32,12 @@ type AttributesHandler struct {
mu sync.Mutex mu sync.Mutex
emitter rollup.EventEmitter emitter event.Emitter
attributes *derive.AttributesWithParent attributes *derive.AttributesWithParent
} }
func NewAttributesHandler(log log.Logger, cfg *rollup.Config, ctx context.Context, l2 L2, emitter rollup.EventEmitter) *AttributesHandler { func NewAttributesHandler(log log.Logger, cfg *rollup.Config, ctx context.Context, l2 L2, emitter event.Emitter) *AttributesHandler {
return &AttributesHandler{ return &AttributesHandler{
log: log, log: log,
cfg: cfg, cfg: cfg,
...@@ -47,7 +48,7 @@ func NewAttributesHandler(log log.Logger, cfg *rollup.Config, ctx context.Contex ...@@ -47,7 +48,7 @@ func NewAttributesHandler(log log.Logger, cfg *rollup.Config, ctx context.Contex
} }
} }
func (eq *AttributesHandler) OnEvent(ev rollup.Event) { func (eq *AttributesHandler) OnEvent(ev event.Event) {
// Events may be concurrent in the future. Prevent unsafe concurrent modifications to the attributes. // Events may be concurrent in the future. Prevent unsafe concurrent modifications to the attributes.
eq.mu.Lock() eq.mu.Lock()
defer eq.mu.Unlock() defer eq.mu.Unlock()
......
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine" "github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
) )
...@@ -25,14 +26,14 @@ type CLSync struct { ...@@ -25,14 +26,14 @@ type CLSync struct {
cfg *rollup.Config cfg *rollup.Config
metrics Metrics metrics Metrics
emitter rollup.EventEmitter emitter event.Emitter
mu sync.Mutex mu sync.Mutex
unsafePayloads *PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps and duplicates unsafePayloads *PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps and duplicates
} }
func NewCLSync(log log.Logger, cfg *rollup.Config, metrics Metrics, emitter rollup.EventEmitter) *CLSync { func NewCLSync(log log.Logger, cfg *rollup.Config, metrics Metrics, emitter event.Emitter) *CLSync {
return &CLSync{ return &CLSync{
log: log, log: log,
cfg: cfg, cfg: cfg,
...@@ -63,7 +64,7 @@ func (ev ReceivedUnsafePayloadEvent) String() string { ...@@ -63,7 +64,7 @@ func (ev ReceivedUnsafePayloadEvent) String() string {
return "received-unsafe-payload" return "received-unsafe-payload"
} }
func (eq *CLSync) OnEvent(ev rollup.Event) { func (eq *CLSync) OnEvent(ev event.Event) {
// Events may be concurrent in the future. Prevent unsafe concurrent modifications to the payloads queue. // Events may be concurrent in the future. Prevent unsafe concurrent modifications to the payloads queue.
eq.mu.Lock() eq.mu.Lock()
defer eq.mu.Unlock() defer eq.mu.Unlock()
......
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"io" "io"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
) )
...@@ -67,12 +68,12 @@ type PipelineDeriver struct { ...@@ -67,12 +68,12 @@ type PipelineDeriver struct {
ctx context.Context ctx context.Context
emitter rollup.EventEmitter emitter event.Emitter
needAttributesConfirmation bool needAttributesConfirmation bool
} }
func NewPipelineDeriver(ctx context.Context, pipeline *DerivationPipeline, emitter rollup.EventEmitter) *PipelineDeriver { func NewPipelineDeriver(ctx context.Context, pipeline *DerivationPipeline, emitter event.Emitter) *PipelineDeriver {
return &PipelineDeriver{ return &PipelineDeriver{
pipeline: pipeline, pipeline: pipeline,
ctx: ctx, ctx: ctx,
...@@ -80,7 +81,7 @@ func NewPipelineDeriver(ctx context.Context, pipeline *DerivationPipeline, emitt ...@@ -80,7 +81,7 @@ func NewPipelineDeriver(ctx context.Context, pipeline *DerivationPipeline, emitt
} }
} }
func (d *PipelineDeriver) OnEvent(ev rollup.Event) { func (d *PipelineDeriver) OnEvent(ev event.Event) {
switch x := ev.(type) { switch x := ev.(type) {
case rollup.ResetEvent: case rollup.ResetEvent:
d.pipeline.Reset() d.pipeline.Reset()
......
...@@ -4,6 +4,8 @@ import ( ...@@ -4,6 +4,8 @@ import (
"context" "context"
"time" "time"
"golang.org/x/time/rate"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -14,6 +16,7 @@ import ( ...@@ -14,6 +16,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor" "github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine" "github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-node/rollup/finality" "github.com/ethereum-optimism/optimism/op-node/rollup/finality"
"github.com/ethereum-optimism/optimism/op-node/rollup/status" "github.com/ethereum-optimism/optimism/op-node/rollup/status"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
...@@ -46,6 +49,8 @@ type Metrics interface { ...@@ -46,6 +49,8 @@ type Metrics interface {
EngineMetrics EngineMetrics
L1FetcherMetrics L1FetcherMetrics
SequencerMetrics SequencerMetrics
event.Metrics
RecordEventsRateLimited()
} }
type L1Chain interface { type L1Chain interface {
...@@ -93,7 +98,7 @@ type AttributesHandler interface { ...@@ -93,7 +98,7 @@ type AttributesHandler interface {
type Finalizer interface { type Finalizer interface {
FinalizedL1() eth.L1BlockRef FinalizedL1() eth.L1BlockRef
rollup.Deriver event.Deriver
} }
type PlasmaIface interface { type PlasmaIface interface {
...@@ -106,7 +111,7 @@ type PlasmaIface interface { ...@@ -106,7 +111,7 @@ type PlasmaIface interface {
} }
type SyncStatusTracker interface { type SyncStatusTracker interface {
rollup.Deriver event.Deriver
SyncStatus() *eth.SyncStatus SyncStatus() *eth.SyncStatus
L1Head() eth.L1BlockRef L1Head() eth.L1BlockRef
} }
...@@ -149,6 +154,14 @@ type SequencerStateListener interface { ...@@ -149,6 +154,14 @@ type SequencerStateListener interface {
SequencerStopped() error SequencerStopped() error
} }
// 10,000 events per second is plenty.
// If we are going through more events, the driver needs to breathe, and warn the user of a potential issue.
const eventsLimit = rate.Limit(10_000)
// 500 events of burst: the maximum amount of events to eat up
// past the rate limit before the rate limit becomes applicable.
const eventsBurst = 500
// NewDriver composes an events handler that tracks L1 state, triggers L2 Derivation, and optionally sequences new L2 blocks. // NewDriver composes an events handler that tracks L1 state, triggers L2 Derivation, and optionally sequences new L2 blocks.
func NewDriver( func NewDriver(
driverCfg *Config, driverCfg *Config,
...@@ -167,8 +180,13 @@ func NewDriver( ...@@ -167,8 +180,13 @@ func NewDriver(
plasma PlasmaIface, plasma PlasmaIface,
) *Driver { ) *Driver {
driverCtx, driverCancel := context.WithCancel(context.Background()) driverCtx, driverCancel := context.WithCancel(context.Background())
rootDeriver := &rollup.SynchronousDerivers{} rootDeriver := &event.DeriverMux{}
synchronousEvents := rollup.NewSynchronousEvents(log, driverCtx, rootDeriver) var synchronousEvents event.EmitterDrainer
synchronousEvents = event.NewQueue(log, driverCtx, rootDeriver, metrics)
synchronousEvents = event.NewLimiterDrainer(context.Background(), synchronousEvents, eventsLimit, eventsBurst, func() {
metrics.RecordEventsRateLimited()
log.Warn("Driver is hitting events rate limit.")
})
statusTracker := status.NewStatusTracker(log, metrics) statusTracker := status.NewStatusTracker(log, metrics)
...@@ -240,7 +258,7 @@ func NewDriver( ...@@ -240,7 +258,7 @@ func NewDriver(
sequencerConductor: sequencerConductor, sequencerConductor: sequencerConductor,
} }
*rootDeriver = []rollup.Deriver{ *rootDeriver = []event.Deriver{
syncDeriver, syncDeriver,
engineResetDeriver, engineResetDeriver,
engDeriv, engDeriv,
......
...@@ -17,6 +17,7 @@ import ( ...@@ -17,6 +17,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor" "github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine" "github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-node/rollup/finality" "github.com/ethereum-optimism/optimism/op-node/rollup/finality"
"github.com/ethereum-optimism/optimism/op-node/rollup/status" "github.com/ethereum-optimism/optimism/op-node/rollup/status"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
...@@ -41,7 +42,7 @@ type Driver struct { ...@@ -41,7 +42,7 @@ type Driver struct {
sched *StepSchedulingDeriver sched *StepSchedulingDeriver
synchronousEvents *rollup.SynchronousEvents synchronousEvents event.EmitterDrainer
// Requests to block the event loop for synchronous execution to avoid reading an inconsistent state // Requests to block the event loop for synchronous execution to avoid reading an inconsistent state
stateReq chan chan struct{} stateReq chan chan struct{}
...@@ -365,7 +366,7 @@ func (s *Driver) eventLoop() { ...@@ -365,7 +366,7 @@ func (s *Driver) eventLoop() {
// OnEvent handles broadcasted events. // OnEvent handles broadcasted events.
// The Driver itself is a deriver to catch system-critical events. // The Driver itself is a deriver to catch system-critical events.
// Other event-handling should be encapsulated into standalone derivers. // Other event-handling should be encapsulated into standalone derivers.
func (s *Driver) OnEvent(ev rollup.Event) { func (s *Driver) OnEvent(ev event.Event) {
switch x := ev.(type) { switch x := ev.(type) {
case rollup.CriticalErrorEvent: case rollup.CriticalErrorEvent:
s.Log.Error("Derivation process critical error", "err", x.Err) s.Log.Error("Derivation process critical error", "err", x.Err)
...@@ -381,7 +382,7 @@ func (s *Driver) OnEvent(ev rollup.Event) { ...@@ -381,7 +382,7 @@ func (s *Driver) OnEvent(ev rollup.Event) {
} }
} }
func (s *Driver) Emit(ev rollup.Event) { func (s *Driver) Emit(ev event.Event) {
s.synchronousEvents.Emit(ev) s.synchronousEvents.Emit(ev)
} }
...@@ -408,7 +409,7 @@ type SyncDeriver struct { ...@@ -408,7 +409,7 @@ type SyncDeriver struct {
L1 L1Chain L1 L1Chain
L2 L2Chain L2 L2Chain
Emitter rollup.EventEmitter Emitter event.Emitter
Log log.Logger Log log.Logger
...@@ -417,7 +418,7 @@ type SyncDeriver struct { ...@@ -417,7 +418,7 @@ type SyncDeriver struct {
Drain func() error Drain func() error
} }
func (s *SyncDeriver) OnEvent(ev rollup.Event) { func (s *SyncDeriver) OnEvent(ev event.Event) {
switch x := ev.(type) { switch x := ev.(type) {
case StepEvent: case StepEvent:
s.onStepEvent() s.onStepEvent()
...@@ -508,7 +509,8 @@ func (s *SyncDeriver) onStepEvent() { ...@@ -508,7 +509,8 @@ func (s *SyncDeriver) onStepEvent() {
s.Log.Error("Derivation process error", "err", err) s.Log.Error("Derivation process error", "err", err)
s.Emitter.Emit(StepReqEvent{}) s.Emitter.Emit(StepReqEvent{})
} else { } else {
s.Emitter.Emit(StepReqEvent{ResetBackoff: true}) // continue with the next step if we can // Revisit SyncStep in 1/2 of a L2 block.
s.Emitter.Emit(StepDelayedReqEvent{Delay: (time.Duration(s.Config.BlockTime) * time.Second) / 2})
} }
} }
......
...@@ -5,7 +5,7 @@ import ( ...@@ -5,7 +5,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/retry" "github.com/ethereum-optimism/optimism/op-service/retry"
) )
...@@ -16,6 +16,14 @@ func (ev ResetStepBackoffEvent) String() string { ...@@ -16,6 +16,14 @@ func (ev ResetStepBackoffEvent) String() string {
return "reset-step-backoff" return "reset-step-backoff"
} }
type StepDelayedReqEvent struct {
Delay time.Duration
}
func (ev StepDelayedReqEvent) String() string {
return "step-delayed-req"
}
type StepReqEvent struct { type StepReqEvent struct {
ResetBackoff bool ResetBackoff bool
} }
...@@ -61,10 +69,10 @@ type StepSchedulingDeriver struct { ...@@ -61,10 +69,10 @@ type StepSchedulingDeriver struct {
log log.Logger log log.Logger
emitter rollup.EventEmitter emitter event.Emitter
} }
func NewStepSchedulingDeriver(log log.Logger, emitter rollup.EventEmitter) *StepSchedulingDeriver { func NewStepSchedulingDeriver(log log.Logger, emitter event.Emitter) *StepSchedulingDeriver {
return &StepSchedulingDeriver{ return &StepSchedulingDeriver{
stepAttempts: 0, stepAttempts: 0,
bOffStrategy: retry.Exponential(), bOffStrategy: retry.Exponential(),
...@@ -88,7 +96,7 @@ func (s *StepSchedulingDeriver) NextDelayedStep() <-chan time.Time { ...@@ -88,7 +96,7 @@ func (s *StepSchedulingDeriver) NextDelayedStep() <-chan time.Time {
return s.delayedStepReq return s.delayedStepReq
} }
func (s *StepSchedulingDeriver) OnEvent(ev rollup.Event) { func (s *StepSchedulingDeriver) OnEvent(ev event.Event) {
step := func() { step := func() {
s.delayedStepReq = nil s.delayedStepReq = nil
select { select {
...@@ -99,6 +107,10 @@ func (s *StepSchedulingDeriver) OnEvent(ev rollup.Event) { ...@@ -99,6 +107,10 @@ func (s *StepSchedulingDeriver) OnEvent(ev rollup.Event) {
} }
switch x := ev.(type) { switch x := ev.(type) {
case StepDelayedReqEvent:
if s.delayedStepReq == nil {
s.delayedStepReq = time.After(x.Delay)
}
case StepReqEvent: case StepReqEvent:
if x.ResetBackoff { if x.ResetBackoff {
s.stepAttempts = 0 s.stepAttempts = 0
......
...@@ -7,14 +7,14 @@ import ( ...@@ -7,14 +7,14 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
) )
func TestStepSchedulingDeriver(t *testing.T) { func TestStepSchedulingDeriver(t *testing.T) {
logger := testlog.Logger(t, log.LevelError) logger := testlog.Logger(t, log.LevelError)
var queued []rollup.Event var queued []event.Event
emitter := rollup.EmitterFunc(func(ev rollup.Event) { emitter := event.EmitterFunc(func(ev event.Event) {
queued = append(queued, ev) queued = append(queued, ev)
}) })
sched := NewStepSchedulingDeriver(logger, emitter) sched := NewStepSchedulingDeriver(logger, emitter)
...@@ -26,7 +26,7 @@ func TestStepSchedulingDeriver(t *testing.T) { ...@@ -26,7 +26,7 @@ func TestStepSchedulingDeriver(t *testing.T) {
require.Empty(t, queued, "only scheduled so far, no step attempts yet") require.Empty(t, queued, "only scheduled so far, no step attempts yet")
<-sched.NextStep() <-sched.NextStep()
sched.OnEvent(StepAttemptEvent{}) sched.OnEvent(StepAttemptEvent{})
require.Equal(t, []rollup.Event{StepEvent{}}, queued, "got step event") require.Equal(t, []event.Event{StepEvent{}}, queued, "got step event")
require.Nil(t, sched.NextDelayedStep(), "no delayed steps yet") require.Nil(t, sched.NextDelayedStep(), "no delayed steps yet")
sched.OnEvent(StepReqEvent{}) sched.OnEvent(StepReqEvent{})
require.NotNil(t, sched.NextDelayedStep(), "2nd attempt before backoff reset causes delayed step to be scheduled") require.NotNil(t, sched.NextDelayedStep(), "2nd attempt before backoff reset causes delayed step to be scheduled")
......
...@@ -14,6 +14,7 @@ import ( ...@@ -14,6 +14,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/async" "github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor" "github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/clock" "github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
...@@ -54,7 +55,7 @@ type EngineController struct { ...@@ -54,7 +55,7 @@ type EngineController struct {
elStart time.Time elStart time.Time
clock clock.Clock clock clock.Clock
emitter rollup.EventEmitter emitter event.Emitter
// Block Head State // Block Head State
unsafeHead eth.L2BlockRef unsafeHead eth.L2BlockRef
...@@ -78,7 +79,7 @@ type EngineController struct { ...@@ -78,7 +79,7 @@ type EngineController struct {
} }
func NewEngineController(engine ExecEngine, log log.Logger, metrics derive.Metrics, func NewEngineController(engine ExecEngine, log log.Logger, metrics derive.Metrics,
rollupCfg *rollup.Config, syncMode sync.Mode, emitter rollup.EventEmitter) *EngineController { rollupCfg *rollup.Config, syncMode sync.Mode, emitter event.Emitter) *EngineController {
syncStatus := syncStatusCL syncStatus := syncStatusCL
if syncMode == sync.ELSync { if syncMode == sync.ELSync {
syncStatus = syncStatusWillStartEL syncStatus = syncStatusWillStartEL
......
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
) )
...@@ -27,11 +28,11 @@ type EngineResetDeriver struct { ...@@ -27,11 +28,11 @@ type EngineResetDeriver struct {
l2 sync.L2Chain l2 sync.L2Chain
syncCfg *sync.Config syncCfg *sync.Config
emitter rollup.EventEmitter emitter event.Emitter
} }
func NewEngineResetDeriver(ctx context.Context, log log.Logger, cfg *rollup.Config, func NewEngineResetDeriver(ctx context.Context, log log.Logger, cfg *rollup.Config,
l1 sync.L1Chain, l2 sync.L2Chain, syncCfg *sync.Config, emitter rollup.EventEmitter) *EngineResetDeriver { l1 sync.L1Chain, l2 sync.L2Chain, syncCfg *sync.Config, emitter event.Emitter) *EngineResetDeriver {
return &EngineResetDeriver{ return &EngineResetDeriver{
ctx: ctx, ctx: ctx,
log: log, log: log,
...@@ -43,7 +44,7 @@ func NewEngineResetDeriver(ctx context.Context, log log.Logger, cfg *rollup.Conf ...@@ -43,7 +44,7 @@ func NewEngineResetDeriver(ctx context.Context, log log.Logger, cfg *rollup.Conf
} }
} }
func (d *EngineResetDeriver) OnEvent(ev rollup.Event) { func (d *EngineResetDeriver) OnEvent(ev event.Event) {
switch ev.(type) { switch ev.(type) {
case ResetEngineRequestEvent: case ResetEngineRequestEvent:
result, err := sync.FindL2Heads(d.ctx, d.cfg, d.l1, d.l2, d.log, d.syncCfg) result, err := sync.FindL2Heads(d.ctx, d.cfg, d.l1, d.l2, d.log, d.syncCfg)
......
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/async" "github.com/ethereum-optimism/optimism/op-node/rollup/async"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor" "github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
) )
...@@ -148,13 +149,13 @@ type EngDeriver struct { ...@@ -148,13 +149,13 @@ type EngDeriver struct {
cfg *rollup.Config cfg *rollup.Config
ec *EngineController ec *EngineController
ctx context.Context ctx context.Context
emitter rollup.EventEmitter emitter event.Emitter
} }
var _ rollup.Deriver = (*EngDeriver)(nil) var _ event.Deriver = (*EngDeriver)(nil)
func NewEngDeriver(log log.Logger, ctx context.Context, cfg *rollup.Config, func NewEngDeriver(log log.Logger, ctx context.Context, cfg *rollup.Config,
ec *EngineController, emitter rollup.EventEmitter) *EngDeriver { ec *EngineController, emitter event.Emitter) *EngDeriver {
return &EngDeriver{ return &EngDeriver{
log: log, log: log,
cfg: cfg, cfg: cfg,
...@@ -164,7 +165,7 @@ func NewEngDeriver(log log.Logger, ctx context.Context, cfg *rollup.Config, ...@@ -164,7 +165,7 @@ func NewEngDeriver(log log.Logger, ctx context.Context, cfg *rollup.Config,
} }
} }
func (d *EngDeriver) OnEvent(ev rollup.Event) { func (d *EngDeriver) OnEvent(ev event.Event) {
switch x := ev.(type) { switch x := ev.(type) {
case TryBackupUnsafeReorgEvent: case TryBackupUnsafeReorgEvent:
// If we don't need to call FCU to restore unsafeHead using backupUnsafe, keep going b/c // If we don't need to call FCU to restore unsafeHead using backupUnsafe, keep going b/c
......
package rollup package rollup
import "github.com/ethereum/go-ethereum/log" import "github.com/ethereum-optimism/optimism/op-node/rollup/event"
type Event interface {
String() string
}
type Deriver interface {
OnEvent(ev Event)
}
type EventEmitter interface {
Emit(ev Event)
}
type EmitterFunc func(ev Event)
func (fn EmitterFunc) Emit(ev Event) {
fn(ev)
}
// L1TemporaryErrorEvent identifies a temporary issue with the L1 data. // L1TemporaryErrorEvent identifies a temporary issue with the L1 data.
type L1TemporaryErrorEvent struct { type L1TemporaryErrorEvent struct {
Err error Err error
} }
var _ Event = L1TemporaryErrorEvent{} var _ event.Event = L1TemporaryErrorEvent{}
func (ev L1TemporaryErrorEvent) String() string { func (ev L1TemporaryErrorEvent) String() string {
return "l1-temporary-error" return "l1-temporary-error"
...@@ -39,7 +21,7 @@ type EngineTemporaryErrorEvent struct { ...@@ -39,7 +21,7 @@ type EngineTemporaryErrorEvent struct {
Err error Err error
} }
var _ Event = EngineTemporaryErrorEvent{} var _ event.Event = EngineTemporaryErrorEvent{}
func (ev EngineTemporaryErrorEvent) String() string { func (ev EngineTemporaryErrorEvent) String() string {
return "engine-temporary-error" return "engine-temporary-error"
...@@ -49,7 +31,7 @@ type ResetEvent struct { ...@@ -49,7 +31,7 @@ type ResetEvent struct {
Err error Err error
} }
var _ Event = ResetEvent{} var _ event.Event = ResetEvent{}
func (ev ResetEvent) String() string { func (ev ResetEvent) String() string {
return "reset-event" return "reset-event"
...@@ -59,43 +41,8 @@ type CriticalErrorEvent struct { ...@@ -59,43 +41,8 @@ type CriticalErrorEvent struct {
Err error Err error
} }
var _ Event = CriticalErrorEvent{} var _ event.Event = CriticalErrorEvent{}
func (ev CriticalErrorEvent) String() string { func (ev CriticalErrorEvent) String() string {
return "critical-error" return "critical-error"
} }
type SynchronousDerivers []Deriver
func (s *SynchronousDerivers) OnEvent(ev Event) {
for _, d := range *s {
d.OnEvent(ev)
}
}
var _ Deriver = (*SynchronousDerivers)(nil)
type DebugDeriver struct {
Log log.Logger
}
func (d DebugDeriver) OnEvent(ev Event) {
d.Log.Debug("on-event", "event", ev)
}
type NoopDeriver struct{}
func (d NoopDeriver) OnEvent(ev Event) {}
// DeriverFunc implements the Deriver interface as a function,
// similar to how the std-lib http HandlerFunc implements a Handler.
// This can be used for small in-place derivers, test helpers, etc.
type DeriverFunc func(ev Event)
func (fn DeriverFunc) OnEvent(ev Event) {
fn(ev)
}
type NoopEmitter struct{}
func (e NoopEmitter) Emit(ev Event) {}
package event
import "github.com/ethereum/go-ethereum/log"
type Event interface {
// String returns the name of the event.
// The name must be simple and identify the event type, not the event content.
// This name is used for metric-labeling.
String() string
}
type Deriver interface {
OnEvent(ev Event)
}
type Emitter interface {
Emit(ev Event)
}
type Drainer interface {
// Drain processes all events.
Drain() error
// DrainUntil processes all events until a condition is hit.
// If excl, the event that matches the condition is not processed yet.
// If not excl, the event that matches is processed.
DrainUntil(fn func(ev Event) bool, excl bool) error
}
type EmitterDrainer interface {
Emitter
Drainer
}
type EmitterFunc func(ev Event)
func (fn EmitterFunc) Emit(ev Event) {
fn(ev)
}
// DeriverMux takes an event-signal as deriver, and synchronously fans it out to all contained Deriver ends.
// Technically this is a DeMux: single input to multi output.
type DeriverMux []Deriver
func (s *DeriverMux) OnEvent(ev Event) {
for _, d := range *s {
d.OnEvent(ev)
}
}
var _ Deriver = (*DeriverMux)(nil)
type DebugDeriver struct {
Log log.Logger
}
func (d DebugDeriver) OnEvent(ev Event) {
d.Log.Debug("on-event", "event", ev)
}
type NoopDeriver struct{}
func (d NoopDeriver) OnEvent(ev Event) {}
// DeriverFunc implements the Deriver interface as a function,
// similar to how the std-lib http HandlerFunc implements a Handler.
// This can be used for small in-place derivers, test helpers, etc.
type DeriverFunc func(ev Event)
func (fn DeriverFunc) OnEvent(ev Event) {
fn(ev)
}
type NoopEmitter struct{}
func (e NoopEmitter) Emit(ev Event) {}
package rollup package event
import ( import (
"fmt" "fmt"
...@@ -13,7 +13,7 @@ func (ev TestEvent) String() string { ...@@ -13,7 +13,7 @@ func (ev TestEvent) String() string {
return "X" return "X"
} }
func TestSynchronousDerivers_OnEvent(t *testing.T) { func TestDeriverMux_OnEvent(t *testing.T) {
result := "" result := ""
a := DeriverFunc(func(ev Event) { a := DeriverFunc(func(ev Event) {
result += fmt.Sprintf("A:%s\n", ev) result += fmt.Sprintf("A:%s\n", ev)
...@@ -25,26 +25,26 @@ func TestSynchronousDerivers_OnEvent(t *testing.T) { ...@@ -25,26 +25,26 @@ func TestSynchronousDerivers_OnEvent(t *testing.T) {
result += fmt.Sprintf("C:%s\n", ev) result += fmt.Sprintf("C:%s\n", ev)
}) })
x := SynchronousDerivers{} x := DeriverMux{}
x.OnEvent(TestEvent{}) x.OnEvent(TestEvent{})
require.Equal(t, "", result) require.Equal(t, "", result)
x = SynchronousDerivers{a} x = DeriverMux{a}
x.OnEvent(TestEvent{}) x.OnEvent(TestEvent{})
require.Equal(t, "A:X\n", result) require.Equal(t, "A:X\n", result)
result = "" result = ""
x = SynchronousDerivers{a, a} x = DeriverMux{a, a}
x.OnEvent(TestEvent{}) x.OnEvent(TestEvent{})
require.Equal(t, "A:X\nA:X\n", result) require.Equal(t, "A:X\nA:X\n", result)
result = "" result = ""
x = SynchronousDerivers{a, b} x = DeriverMux{a, b}
x.OnEvent(TestEvent{}) x.OnEvent(TestEvent{})
require.Equal(t, "A:X\nB:X\n", result) require.Equal(t, "A:X\nB:X\n", result)
result = "" result = ""
x = SynchronousDerivers{a, b, c} x = DeriverMux{a, b, c}
x.OnEvent(TestEvent{}) x.OnEvent(TestEvent{})
require.Equal(t, "A:X\nB:X\nC:X\n", result) require.Equal(t, "A:X\nB:X\nC:X\n", result)
} }
package event
import (
"context"
"golang.org/x/time/rate"
)
type Limiter[E Emitter] struct {
ctx context.Context
emitter E
rl *rate.Limiter
onLimited func()
}
// NewLimiter returns an event rate-limiter.
// This can be used to prevent event loops from (accidentally) running too hot.
// The eventRate is the number of events per second.
// The eventBurst is the margin of events to eat into until the rate-limit kicks in.
// The onLimited function is optional, and will be called if an emitted event is getting rate-limited
func NewLimiter[E Emitter](ctx context.Context, em E, eventRate rate.Limit, eventBurst int, onLimited func()) *Limiter[E] {
return &Limiter[E]{
ctx: ctx,
emitter: em,
rl: rate.NewLimiter(eventRate, eventBurst),
onLimited: onLimited,
}
}
// Emit is thread-safe, multiple parallel derivers can safely emit events to it.
func (l *Limiter[E]) Emit(ev Event) {
if l.onLimited != nil && l.rl.Tokens() < 1.0 {
l.onLimited()
}
if err := l.rl.Wait(l.ctx); err != nil {
return // ctx error, safe to ignore.
}
l.emitter.Emit(ev)
}
// LimiterDrainer is a variant of Limiter that supports event draining.
type LimiterDrainer Limiter[EmitterDrainer]
func NewLimiterDrainer(ctx context.Context, em EmitterDrainer, eventRate rate.Limit, eventBurst int, onLimited func()) *LimiterDrainer {
return (*LimiterDrainer)(NewLimiter(ctx, em, eventRate, eventBurst, onLimited))
}
func (l *LimiterDrainer) Emit(ev Event) {
l.emitter.Emit(ev)
}
func (l *LimiterDrainer) Drain() error {
return l.emitter.Drain()
}
func (l *LimiterDrainer) DrainUntil(fn func(ev Event) bool, excl bool) error {
return l.emitter.DrainUntil(fn, excl)
}
package event
import (
"context"
"testing"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
)
func TestLimiter(t *testing.T) {
count := uint64(0)
em := EmitterFunc(func(ev Event) {
count += 1
})
hitRateLimitAt := uint64(0)
// Test that we are able to hit the specified rate limit, and no earlier
lim := NewLimiter(context.Background(), em, rate.Limit(10), 10, func() {
if hitRateLimitAt != 0 {
return
}
hitRateLimitAt = count
})
for i := 0; i < 30; i++ {
lim.Emit(TestEvent{})
}
require.LessOrEqual(t, uint64(10), hitRateLimitAt)
}
package event
type Metrics interface {
RecordEmittedEvent(name string)
RecordProcessedEvent(name string)
}
type NoopMetrics struct {
}
func (n NoopMetrics) RecordEmittedEvent(name string) {}
func (n NoopMetrics) RecordProcessedEvent(name string) {}
var _ Metrics = NoopMetrics{}
package rollup package event
import ( import (
"context" "context"
...@@ -12,11 +12,11 @@ import ( ...@@ -12,11 +12,11 @@ import (
// At some point it's better to drop events and warn something is exploding the number of events. // At some point it's better to drop events and warn something is exploding the number of events.
const sanityEventLimit = 1000 const sanityEventLimit = 1000
// SynchronousEvents is a rollup.EventEmitter that a rollup.Deriver can emit events to. // Queue is a event.Emitter that a event.Deriver can emit events to.
// The events will be queued up, and can then be executed synchronously by calling the Drain function, // The events will be queued up, and can then be executed synchronously by calling the Drain function,
// which will apply all events to the root Deriver. // which will apply all events to the root Deriver.
// New events may be queued up while events are being processed by the root rollup.Deriver. // New events may be queued up while events are being processed by the root rollup.Deriver.
type SynchronousEvents struct { type Queue struct {
// The lock is no-op in FP execution, if running in synchronous FP-VM. // The lock is no-op in FP execution, if running in synchronous FP-VM.
// This lock ensures that all emitted events are merged together correctly, // This lock ensures that all emitted events are merged together correctly,
// if this util is used in a concurrent context. // if this util is used in a concurrent context.
...@@ -29,20 +29,28 @@ type SynchronousEvents struct { ...@@ -29,20 +29,28 @@ type SynchronousEvents struct {
ctx context.Context ctx context.Context
root Deriver root Deriver
metrics Metrics
} }
func NewSynchronousEvents(log log.Logger, ctx context.Context, root Deriver) *SynchronousEvents { var _ EmitterDrainer = (*Queue)(nil)
return &SynchronousEvents{
log: log, func NewQueue(log log.Logger, ctx context.Context, root Deriver, metrics Metrics) *Queue {
ctx: ctx, return &Queue{
root: root, log: log,
ctx: ctx,
root: root,
metrics: metrics,
} }
} }
func (s *SynchronousEvents) Emit(event Event) { func (s *Queue) Emit(event Event) {
s.evLock.Lock() s.evLock.Lock()
defer s.evLock.Unlock() defer s.evLock.Unlock()
s.log.Debug("Emitting event", "event", event)
s.metrics.RecordEmittedEvent(event.String())
if s.ctx.Err() != nil { if s.ctx.Err() != nil {
s.log.Warn("Ignoring emitted event during shutdown", "event", event) s.log.Warn("Ignoring emitted event during shutdown", "event", event)
return return
...@@ -56,7 +64,7 @@ func (s *SynchronousEvents) Emit(event Event) { ...@@ -56,7 +64,7 @@ func (s *SynchronousEvents) Emit(event Event) {
s.events = append(s.events, event) s.events = append(s.events, event)
} }
func (s *SynchronousEvents) Drain() error { func (s *Queue) Drain() error {
for { for {
if s.ctx.Err() != nil { if s.ctx.Err() != nil {
return s.ctx.Err() return s.ctx.Err()
...@@ -70,11 +78,13 @@ func (s *SynchronousEvents) Drain() error { ...@@ -70,11 +78,13 @@ func (s *SynchronousEvents) Drain() error {
s.events = s.events[1:] s.events = s.events[1:]
s.evLock.Unlock() s.evLock.Unlock()
s.log.Debug("Processing event", "event", first)
s.root.OnEvent(first) s.root.OnEvent(first)
s.metrics.RecordProcessedEvent(first.String())
} }
} }
func (s *SynchronousEvents) DrainUntil(fn func(ev Event) bool, excl bool) error { func (s *Queue) DrainUntil(fn func(ev Event) bool, excl bool) error {
for { for {
if s.ctx.Err() != nil { if s.ctx.Err() != nil {
return s.ctx.Err() return s.ctx.Err()
...@@ -93,11 +103,13 @@ func (s *SynchronousEvents) DrainUntil(fn func(ev Event) bool, excl bool) error ...@@ -93,11 +103,13 @@ func (s *SynchronousEvents) DrainUntil(fn func(ev Event) bool, excl bool) error
s.events = s.events[1:] s.events = s.events[1:]
s.evLock.Unlock() s.evLock.Unlock()
s.log.Debug("Processing event", "event", first)
s.root.OnEvent(first) s.root.OnEvent(first)
s.metrics.RecordProcessedEvent(first.String())
if stop { if stop {
return nil return nil
} }
} }
} }
var _ EventEmitter = (*SynchronousEvents)(nil) var _ Emitter = (*Queue)(nil)
package rollup package event
import ( import (
"context" "context"
...@@ -11,14 +11,14 @@ import ( ...@@ -11,14 +11,14 @@ import (
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
) )
func TestSynchronousEvents(t *testing.T) { func TestQueue(t *testing.T) {
logger := testlog.Logger(t, log.LevelError) logger := testlog.Logger(t, log.LevelError)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
count := 0 count := 0
deriver := DeriverFunc(func(ev Event) { deriver := DeriverFunc(func(ev Event) {
count += 1 count += 1
}) })
syncEv := NewSynchronousEvents(logger, ctx, deriver) syncEv := NewQueue(logger, ctx, deriver, NoopMetrics{})
require.NoError(t, syncEv.Drain(), "can drain, even if empty") require.NoError(t, syncEv.Drain(), "can drain, even if empty")
syncEv.Emit(TestEvent{}) syncEv.Emit(TestEvent{})
...@@ -38,13 +38,13 @@ func TestSynchronousEvents(t *testing.T) { ...@@ -38,13 +38,13 @@ func TestSynchronousEvents(t *testing.T) {
require.Equal(t, 3, count, "didn't process event after trigger close") require.Equal(t, 3, count, "didn't process event after trigger close")
} }
func TestSynchronousEventsSanityLimit(t *testing.T) { func TestQueueSanityLimit(t *testing.T) {
logger := testlog.Logger(t, log.LevelError) logger := testlog.Logger(t, log.LevelCrit) // expecting error log of hitting sanity limit
count := 0 count := 0
deriver := DeriverFunc(func(ev Event) { deriver := DeriverFunc(func(ev Event) {
count += 1 count += 1
}) })
syncEv := NewSynchronousEvents(logger, context.Background(), deriver) syncEv := NewQueue(logger, context.Background(), deriver, NoopMetrics{})
// emit 1 too many events // emit 1 too many events
for i := 0; i < sanityEventLimit+1; i++ { for i := 0; i < sanityEventLimit+1; i++ {
syncEv.Emit(TestEvent{}) syncEv.Emit(TestEvent{})
...@@ -67,7 +67,7 @@ func (ev CyclicEvent) String() string { ...@@ -67,7 +67,7 @@ func (ev CyclicEvent) String() string {
func TestSynchronousCyclic(t *testing.T) { func TestSynchronousCyclic(t *testing.T) {
logger := testlog.Logger(t, log.LevelError) logger := testlog.Logger(t, log.LevelError)
var emitter EventEmitter var emitter Emitter
result := false result := false
deriver := DeriverFunc(func(ev Event) { deriver := DeriverFunc(func(ev Event) {
logger.Info("received event", "event", ev) logger.Info("received event", "event", ev)
...@@ -80,7 +80,7 @@ func TestSynchronousCyclic(t *testing.T) { ...@@ -80,7 +80,7 @@ func TestSynchronousCyclic(t *testing.T) {
} }
} }
}) })
syncEv := NewSynchronousEvents(logger, context.Background(), deriver) syncEv := NewQueue(logger, context.Background(), deriver, NoopMetrics{})
emitter = syncEv emitter = syncEv
syncEv.Emit(CyclicEvent{Count: 0}) syncEv.Emit(CyclicEvent{Count: 0})
require.NoError(t, syncEv.Drain()) require.NoError(t, syncEv.Drain())
......
package event
// Is as helper function is syntax-sugar to do an Event type check as a boolean function
func Is[T Event](ev Event) bool {
_, ok := ev.(T)
return ok
}
// Any as helper function combines different event conditions into a single function
func Any(fns ...func(ev Event) bool) func(ev Event) bool {
return func(ev Event) bool {
for _, fn := range fns {
if fn(ev) {
return true
}
}
return false
}
}
package event
import (
"testing"
"github.com/stretchr/testify/require"
)
type FooEvent struct{}
func (ev FooEvent) String() string {
return "foo"
}
type BarEvent struct{}
func (ev BarEvent) String() string {
return "bar"
}
func TestIs(t *testing.T) {
require.False(t, Is[TestEvent](FooEvent{}))
require.False(t, Is[TestEvent](BarEvent{}))
require.True(t, Is[FooEvent](FooEvent{}))
require.True(t, Is[BarEvent](BarEvent{}))
}
func TestAny(t *testing.T) {
require.False(t, Any()(FooEvent{}))
require.False(t, Any(Is[BarEvent])(FooEvent{}))
require.True(t, Any(Is[FooEvent])(FooEvent{}))
require.False(t, Any(Is[TestEvent], Is[BarEvent])(FooEvent{}))
require.True(t, Any(Is[TestEvent], Is[BarEvent], Is[FooEvent])(FooEvent{}))
require.True(t, Any(Is[FooEvent], Is[BarEvent], Is[TestEvent])(FooEvent{}))
require.True(t, Any(Is[FooEvent], Is[FooEvent], Is[FooEvent])(FooEvent{}))
require.False(t, Any(Is[FooEvent], Is[FooEvent], Is[FooEvent])(BarEvent{}))
}
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine" "github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
) )
...@@ -72,7 +73,7 @@ type Finalizer struct { ...@@ -72,7 +73,7 @@ type Finalizer struct {
ctx context.Context ctx context.Context
emitter rollup.EventEmitter emitter event.Emitter
// finalizedL1 is the currently perceived finalized L1 block. // finalizedL1 is the currently perceived finalized L1 block.
// This may be ahead of the current traversed origin when syncing. // This may be ahead of the current traversed origin when syncing.
...@@ -93,7 +94,7 @@ type Finalizer struct { ...@@ -93,7 +94,7 @@ type Finalizer struct {
l1Fetcher FinalizerL1Interface l1Fetcher FinalizerL1Interface
} }
func NewFinalizer(ctx context.Context, log log.Logger, cfg *rollup.Config, l1Fetcher FinalizerL1Interface, emitter rollup.EventEmitter) *Finalizer { func NewFinalizer(ctx context.Context, log log.Logger, cfg *rollup.Config, l1Fetcher FinalizerL1Interface, emitter event.Emitter) *Finalizer {
lookback := calcFinalityLookback(cfg) lookback := calcFinalityLookback(cfg)
return &Finalizer{ return &Finalizer{
ctx: ctx, ctx: ctx,
...@@ -130,7 +131,7 @@ func (ev TryFinalizeEvent) String() string { ...@@ -130,7 +131,7 @@ func (ev TryFinalizeEvent) String() string {
return "try-finalize" return "try-finalize"
} }
func (fi *Finalizer) OnEvent(ev rollup.Event) { func (fi *Finalizer) OnEvent(ev event.Event) {
switch x := ev.(type) { switch x := ev.(type) {
case FinalizeL1Event: case FinalizeL1Event:
fi.onL1Finalized(x.FinalizedL1) fi.onL1Finalized(x.FinalizedL1)
......
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
plasma "github.com/ethereum-optimism/optimism/op-plasma" plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
) )
...@@ -27,7 +28,7 @@ type PlasmaFinalizer struct { ...@@ -27,7 +28,7 @@ type PlasmaFinalizer struct {
} }
func NewPlasmaFinalizer(ctx context.Context, log log.Logger, cfg *rollup.Config, func NewPlasmaFinalizer(ctx context.Context, log log.Logger, cfg *rollup.Config,
l1Fetcher FinalizerL1Interface, emitter rollup.EventEmitter, l1Fetcher FinalizerL1Interface, emitter event.Emitter,
backend PlasmaBackend) *PlasmaFinalizer { backend PlasmaBackend) *PlasmaFinalizer {
inner := NewFinalizer(ctx, log, cfg, l1Fetcher, emitter) inner := NewFinalizer(ctx, log, cfg, l1Fetcher, emitter)
...@@ -45,7 +46,7 @@ func NewPlasmaFinalizer(ctx context.Context, log log.Logger, cfg *rollup.Config, ...@@ -45,7 +46,7 @@ func NewPlasmaFinalizer(ctx context.Context, log log.Logger, cfg *rollup.Config,
} }
} }
func (fi *PlasmaFinalizer) OnEvent(ev rollup.Event) { func (fi *PlasmaFinalizer) OnEvent(ev event.Event) {
switch x := ev.(type) { switch x := ev.(type) {
case FinalizeL1Event: case FinalizeL1Event:
fi.backend.Finalize(x.FinalizedL1) fi.backend.Finalize(x.FinalizedL1)
......
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine" "github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
plasma "github.com/ethereum-optimism/optimism/op-plasma" plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
...@@ -134,7 +135,7 @@ func TestPlasmaFinalityData(t *testing.T) { ...@@ -134,7 +135,7 @@ func TestPlasmaFinalityData(t *testing.T) {
emitter.AssertExpectations(t) emitter.AssertExpectations(t)
} }
// might trigger finalization attempt, if expired finality delay // might trigger finalization attempt, if expired finality delay
emitter.ExpectMaybeRun(func(ev rollup.Event) { emitter.ExpectMaybeRun(func(ev event.Event) {
require.IsType(t, TryFinalizeEvent{}, ev) require.IsType(t, TryFinalizeEvent{}, ev)
}) })
fi.OnEvent(derive.DeriverIdleEvent{}) fi.OnEvent(derive.DeriverIdleEvent{})
...@@ -166,7 +167,7 @@ func TestPlasmaFinalityData(t *testing.T) { ...@@ -166,7 +167,7 @@ func TestPlasmaFinalityData(t *testing.T) {
l1F.ExpectL1BlockRefByNumber(commitmentInclusionFinalized.Number, commitmentInclusionFinalized, nil) l1F.ExpectL1BlockRefByNumber(commitmentInclusionFinalized.Number, commitmentInclusionFinalized, nil)
l1F.ExpectL1BlockRefByNumber(commitmentInclusionFinalized.Number, commitmentInclusionFinalized, nil) l1F.ExpectL1BlockRefByNumber(commitmentInclusionFinalized.Number, commitmentInclusionFinalized, nil)
var finalizedL2 eth.L2BlockRef var finalizedL2 eth.L2BlockRef
emitter.ExpectOnceRun(func(ev rollup.Event) { emitter.ExpectOnceRun(func(ev event.Event) {
if x, ok := ev.(engine.PromoteFinalizedEvent); ok { if x, ok := ev.(engine.PromoteFinalizedEvent); ok {
finalizedL2 = x.Ref finalizedL2 = x.Ref
} else { } else {
......
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine" "github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-node/rollup/finality" "github.com/ethereum-optimism/optimism/op-node/rollup/finality"
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
) )
...@@ -56,7 +57,7 @@ func NewStatusTracker(log log.Logger, metrics Metrics) *StatusTracker { ...@@ -56,7 +57,7 @@ func NewStatusTracker(log log.Logger, metrics Metrics) *StatusTracker {
return st return st
} }
func (st *StatusTracker) OnEvent(ev rollup.Event) { func (st *StatusTracker) OnEvent(ev event.Event) {
st.mu.Lock() st.mu.Lock()
defer st.mu.Unlock() defer st.mu.Unlock()
......
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine" "github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/rollup/sync"
plasma "github.com/ethereum-optimism/optimism/op-plasma" plasma "github.com/ethereum-optimism/optimism/op-plasma"
) )
...@@ -22,10 +23,10 @@ type EndCondition interface { ...@@ -22,10 +23,10 @@ type EndCondition interface {
type Driver struct { type Driver struct {
logger log.Logger logger log.Logger
events []rollup.Event events []event.Event
end EndCondition end EndCondition
deriver rollup.Deriver deriver event.Deriver
} }
func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher,
...@@ -51,7 +52,7 @@ func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, ...@@ -51,7 +52,7 @@ func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher,
targetBlockNum: targetBlockNum, targetBlockNum: targetBlockNum,
} }
d.deriver = &rollup.SynchronousDerivers{ d.deriver = &event.DeriverMux{
prog, prog,
engineDeriv, engineDeriv,
pipelineDeriver, pipelineDeriver,
...@@ -62,7 +63,7 @@ func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, ...@@ -62,7 +63,7 @@ func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher,
return d return d
} }
func (d *Driver) Emit(ev rollup.Event) { func (d *Driver) Emit(ev event.Event) {
if d.end.Closing() { if d.end.Closing() {
return return
} }
......
...@@ -8,7 +8,7 @@ import ( ...@@ -8,7 +8,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
) )
...@@ -26,21 +26,21 @@ func (d *fakeEnd) Result() error { ...@@ -26,21 +26,21 @@ func (d *fakeEnd) Result() error {
} }
func TestDriver(t *testing.T) { func TestDriver(t *testing.T) {
newTestDriver := func(t *testing.T, onEvent func(d *Driver, end *fakeEnd, ev rollup.Event)) *Driver { newTestDriver := func(t *testing.T, onEvent func(d *Driver, end *fakeEnd, ev event.Event)) *Driver {
logger := testlog.Logger(t, log.LevelInfo) logger := testlog.Logger(t, log.LevelInfo)
end := &fakeEnd{} end := &fakeEnd{}
d := &Driver{ d := &Driver{
logger: logger, logger: logger,
end: end, end: end,
} }
d.deriver = rollup.DeriverFunc(func(ev rollup.Event) { d.deriver = event.DeriverFunc(func(ev event.Event) {
onEvent(d, end, ev) onEvent(d, end, ev)
}) })
return d return d
} }
t.Run("insta complete", func(t *testing.T) { t.Run("insta complete", func(t *testing.T) {
d := newTestDriver(t, func(d *Driver, end *fakeEnd, ev rollup.Event) { d := newTestDriver(t, func(d *Driver, end *fakeEnd, ev event.Event) {
end.closing = true end.closing = true
}) })
require.NoError(t, d.RunComplete()) require.NoError(t, d.RunComplete())
...@@ -48,7 +48,7 @@ func TestDriver(t *testing.T) { ...@@ -48,7 +48,7 @@ func TestDriver(t *testing.T) {
t.Run("insta error", func(t *testing.T) { t.Run("insta error", func(t *testing.T) {
mockErr := errors.New("mock error") mockErr := errors.New("mock error")
d := newTestDriver(t, func(d *Driver, end *fakeEnd, ev rollup.Event) { d := newTestDriver(t, func(d *Driver, end *fakeEnd, ev event.Event) {
end.closing = true end.closing = true
end.result = mockErr end.result = mockErr
}) })
...@@ -57,7 +57,7 @@ func TestDriver(t *testing.T) { ...@@ -57,7 +57,7 @@ func TestDriver(t *testing.T) {
t.Run("success after a few events", func(t *testing.T) { t.Run("success after a few events", func(t *testing.T) {
count := 0 count := 0
d := newTestDriver(t, func(d *Driver, end *fakeEnd, ev rollup.Event) { d := newTestDriver(t, func(d *Driver, end *fakeEnd, ev event.Event) {
if count > 3 { if count > 3 {
end.closing = true end.closing = true
return return
...@@ -71,7 +71,7 @@ func TestDriver(t *testing.T) { ...@@ -71,7 +71,7 @@ func TestDriver(t *testing.T) {
t.Run("error after a few events", func(t *testing.T) { t.Run("error after a few events", func(t *testing.T) {
count := 0 count := 0
mockErr := errors.New("mock error") mockErr := errors.New("mock error")
d := newTestDriver(t, func(d *Driver, end *fakeEnd, ev rollup.Event) { d := newTestDriver(t, func(d *Driver, end *fakeEnd, ev event.Event) {
if count > 3 { if count > 3 {
end.closing = true end.closing = true
end.result = mockErr end.result = mockErr
...@@ -85,7 +85,7 @@ func TestDriver(t *testing.T) { ...@@ -85,7 +85,7 @@ func TestDriver(t *testing.T) {
t.Run("exhaust events", func(t *testing.T) { t.Run("exhaust events", func(t *testing.T) {
count := 0 count := 0
d := newTestDriver(t, func(d *Driver, end *fakeEnd, ev rollup.Event) { d := newTestDriver(t, func(d *Driver, end *fakeEnd, ev event.Event) {
if count < 3 { // stop generating events after a while, without changing end condition if count < 3 { // stop generating events after a while, without changing end condition
d.Emit(TestEvent{}) d.Emit(TestEvent{})
} }
...@@ -96,7 +96,7 @@ func TestDriver(t *testing.T) { ...@@ -96,7 +96,7 @@ func TestDriver(t *testing.T) {
t.Run("queued events", func(t *testing.T) { t.Run("queued events", func(t *testing.T) {
count := 0 count := 0
d := newTestDriver(t, func(d *Driver, end *fakeEnd, ev rollup.Event) { d := newTestDriver(t, func(d *Driver, end *fakeEnd, ev event.Event) {
if count < 3 { if count < 3 {
d.Emit(TestEvent{}) d.Emit(TestEvent{})
d.Emit(TestEvent{}) d.Emit(TestEvent{})
......
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/engine" "github.com/ethereum-optimism/optimism/op-node/rollup/engine"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
) )
// ProgramDeriver expresses how engine and derivation events are // ProgramDeriver expresses how engine and derivation events are
...@@ -17,7 +18,7 @@ import ( ...@@ -17,7 +18,7 @@ import (
type ProgramDeriver struct { type ProgramDeriver struct {
logger log.Logger logger log.Logger
Emitter rollup.EventEmitter Emitter event.Emitter
closing bool closing bool
result error result error
...@@ -32,7 +33,7 @@ func (d *ProgramDeriver) Result() error { ...@@ -32,7 +33,7 @@ func (d *ProgramDeriver) Result() error {
return d.result return d.result
} }
func (d *ProgramDeriver) OnEvent(ev rollup.Event) { func (d *ProgramDeriver) OnEvent(ev event.Event) {
switch x := ev.(type) { switch x := ev.(type) {
case engine.EngineResetConfirmedEvent: case engine.EngineResetConfirmedEvent:
d.Emitter.Emit(derive.ConfirmPipelineResetEvent{}) d.Emitter.Emit(derive.ConfirmPipelineResetEvent{})
......
package testutils package testutils
import ( import (
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/ethereum-optimism/optimism/op-node/rollup"
) )
type MockEmitter struct { type MockEmitter struct {
mock.Mock mock.Mock
} }
func (m *MockEmitter) Emit(ev rollup.Event) { func (m *MockEmitter) Emit(ev event.Event) {
m.Mock.MethodCalled("Emit", ev) m.Mock.MethodCalled("Emit", ev)
} }
func (m *MockEmitter) ExpectOnce(expected rollup.Event) { func (m *MockEmitter) ExpectOnce(expected event.Event) {
m.Mock.On("Emit", expected).Once() m.Mock.On("Emit", expected).Once()
} }
func (m *MockEmitter) ExpectMaybeRun(fn func(ev rollup.Event)) { func (m *MockEmitter) ExpectMaybeRun(fn func(ev event.Event)) {
m.Mock.On("Emit", mock.Anything).Maybe().Run(func(args mock.Arguments) { m.Mock.On("Emit", mock.Anything).Maybe().Run(func(args mock.Arguments) {
fn(args.Get(0).(rollup.Event)) fn(args.Get(0).(event.Event))
}) })
} }
...@@ -28,9 +27,9 @@ func (m *MockEmitter) ExpectOnceType(typ string) { ...@@ -28,9 +27,9 @@ func (m *MockEmitter) ExpectOnceType(typ string) {
m.Mock.On("Emit", mock.AnythingOfType(typ)).Once() m.Mock.On("Emit", mock.AnythingOfType(typ)).Once()
} }
func (m *MockEmitter) ExpectOnceRun(fn func(ev rollup.Event)) { func (m *MockEmitter) ExpectOnceRun(fn func(ev event.Event)) {
m.Mock.On("Emit", mock.Anything).Once().Run(func(args mock.Arguments) { m.Mock.On("Emit", mock.Anything).Once().Run(func(args mock.Arguments) {
fn(args.Get(0).(rollup.Event)) fn(args.Get(0).(event.Event))
}) })
} }
...@@ -38,4 +37,4 @@ func (m *MockEmitter) AssertExpectations(t mock.TestingT) { ...@@ -38,4 +37,4 @@ func (m *MockEmitter) AssertExpectations(t mock.TestingT) {
m.Mock.AssertExpectations(t) m.Mock.AssertExpectations(t)
} }
var _ rollup.EventEmitter = (*MockEmitter)(nil) var _ event.Emitter = (*MockEmitter)(nil)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment