Commit 21375b4a authored by protolambda, committed by GitHub

op-node: handle crit error events, lift event-system out of driver (#11932)

parent 8dd6fb36
......@@ -8,13 +8,11 @@ import (
"sync/atomic"
"time"
"github.com/ethereum-optimism/optimism/op-node/rollup/sequencing"
"github.com/hashicorp/go-multierror"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/event"
gethevent "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
altda "github.com/ethereum-optimism/optimism/op-alt-da"
......@@ -24,6 +22,8 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-node/rollup/sequencing"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/eth"
......@@ -52,6 +52,9 @@ type OpNode struct {
l1SafeSub ethereum.Subscription // Subscription to get L1 safe blocks, a.k.a. justified data (polling)
l1FinalizedSub ethereum.Subscription // Subscription to get L1 finalized blocks (polling)
eventSys event.System
eventDrain event.Drainer
l1Source *sources.L1Client // L1 Client to fetch data from
l2Driver *driver.Driver // L2 Engine to Sync
l2Source *sources.EngineClient // L2 Execution Engine RPC bindings
......@@ -126,6 +129,7 @@ func (n *OpNode) init(ctx context.Context, cfg *Config) error {
if err := n.initTracer(ctx, cfg); err != nil {
return fmt.Errorf("failed to init the trace: %w", err)
}
n.initEventSystem()
if err := n.initL1(ctx, cfg); err != nil {
return fmt.Errorf("failed to init L1: %w", err)
}
......@@ -159,6 +163,16 @@ func (n *OpNode) init(ctx context.Context, cfg *Config) error {
return nil
}
// initEventSystem builds the node-wide event system and stores it on the OpNode.
// It creates a global synchronous executor bound to n.resourcesCtx, attaches a
// metrics tracer, and registers the node itself (through onEvent) under the name "node".
// The system is kept as n.eventSys and the executor as n.eventDrain.
func (n *OpNode) initEventSystem() {
	// The executor is meant to become configurable later, to allow parallel event processing.
	syncExec := event.NewGlobalSynchronous(n.resourcesCtx)
	eventSystem := event.NewSystem(n.log, syncExec)

	metricsTracer := event.NewMetricsTracer(n.metrics)
	eventSystem.AddTracer(metricsTracer)

	nodeDeriver := event.DeriverFunc(n.onEvent)
	eventSystem.Register("node", nodeDeriver, event.DefaultRegisterOpts())

	n.eventDrain = syncExec
	n.eventSys = eventSystem
}
func (n *OpNode) initTracer(ctx context.Context, cfg *Config) error {
if cfg.Tracer != nil {
n.tracer = cfg.Tracer
......@@ -185,7 +199,7 @@ func (n *OpNode) initL1(ctx context.Context, cfg *Config) error {
}
// Keep subscribed to the L1 heads, which keeps the L1 maintainer pointing to the best headers to sync
n.l1HeadsSub = event.ResubscribeErr(time.Second*10, func(ctx context.Context, err error) (event.Subscription, error) {
n.l1HeadsSub = gethevent.ResubscribeErr(time.Second*10, func(ctx context.Context, err error) (gethevent.Subscription, error) {
if err != nil {
n.log.Warn("resubscribing after failed L1 subscription", "err", err)
}
......@@ -410,7 +424,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config) error {
} else {
n.safeDB = safedb.Disabled
}
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source,
n.l2Driver = driver.NewDriver(n.eventSys, n.eventDrain, &cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source,
n.supervisor, n.beacon, n, n, n.log, n.metrics, cfg.ConfigPersistence, n.safeDB, &cfg.Sync, sequencerConductor, altDA)
return nil
}
......@@ -509,6 +523,20 @@ func (n *OpNode) Start(ctx context.Context) error {
return nil
}
// onEvent is the OpNode's handler for events broadcast through the event system.
// The node acts as a deriver only to catch system-critical events: on a
// CriticalErrorEvent it logs the error and calls n.cancel with the wrapped error.
// Any other event handling belongs in dedicated standalone derivers.
// It returns true if the event was handled, false otherwise.
func (n *OpNode) onEvent(ev event.Event) bool {
	crit, isCrit := ev.(rollup.CriticalErrorEvent)
	if !isCrit {
		return false
	}
	n.log.Error("Critical error", "err", crit.Err)
	n.cancel(fmt.Errorf("critical error: %w", crit.Err))
	return true
}
func (n *OpNode) OnNewL1Head(ctx context.Context, sig eth.L1BlockRef) {
n.tracer.OnNewL1Head(ctx, sig)
......@@ -679,6 +707,10 @@ func (n *OpNode) Stop(ctx context.Context) error {
}
}
if n.eventSys != nil {
n.eventSys.Stop()
}
if n.safeDB != nil {
if err := n.safeDB.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close safe head db: %w", err))
......
......@@ -150,8 +150,14 @@ type SequencerStateListener interface {
SequencerStopped() error
}
// Drain is the part of the events executor that the driver depends on.
// NewDriver only uses its Drain method, which it hands to the sync deriver and to the Driver.
type Drain interface {
	// Drain reports failure through its error result.
	// NOTE(review): presumably this processes all queued events before returning — confirm against the executor implementation.
	Drain() error
}
// NewDriver composes an events handler that tracks L1 state, triggers L2 Derivation, and optionally sequences new L2 blocks.
func NewDriver(
sys event.Registry,
drain Drain,
driverCfg *Config,
cfg *rollup.Config,
l2 L2Chain,
......@@ -170,17 +176,6 @@ func NewDriver(
) *Driver {
driverCtx, driverCancel := context.WithCancel(context.Background())
var executor event.Executor
var drain func() error
// This instantiation will be one of more options: soon there will be a parallel events executor
{
s := event.NewGlobalSynchronous(driverCtx)
executor = s
drain = s.Drain
}
sys := event.NewSystem(log, executor)
sys.AddTracer(event.NewMetricsTracer(metrics))
opts := event.DefaultRegisterOpts()
// If interop is scheduled we start the driver.
......@@ -236,7 +231,7 @@ func NewDriver(
L2: l2,
Log: log,
Ctx: driverCtx,
Drain: drain,
Drain: drain.Drain,
}
sys.Register("sync", syncDeriver, opts)
......@@ -260,12 +255,11 @@ func NewDriver(
driverEmitter := sys.Register("driver", nil, opts)
driver := &Driver{
eventSys: sys,
statusTracker: statusTracker,
SyncDeriver: syncDeriver,
sched: schedDeriv,
emitter: driverEmitter,
drain: drain,
drain: drain.Drain,
stateReq: make(chan chan struct{}),
forceReset: make(chan chan struct{}, 10),
driverConfig: driverCfg,
......
......@@ -26,8 +26,6 @@ import (
type SyncStatus = eth.SyncStatus
type Driver struct {
eventSys event.System
statusTracker SyncStatusTracker
*SyncDeriver
......@@ -100,7 +98,6 @@ func (s *Driver) Start() error {
// Close shuts the driver down: it calls s.driverCancel, waits on the driver's
// wait group, and closes the sequencer. It always returns nil.
// NOTE(review): this hunk (-100,7 +98,6) shows one line more than the post-change
// function has; the s.eventSys.Stop() call is presumably the removed line, since
// OpNode.Stop stops the event system after this change — confirm against the raw diff.
func (s *Driver) Close() error {
	s.driverCancel()
	s.wg.Wait()
	s.eventSys.Stop()
	s.sequencer.Close()
	return nil
}
......@@ -282,27 +279,6 @@ func (s *Driver) eventLoop() {
}
}
// OnEvent is the Driver's handler for broadcast events.
// The Driver acts as a deriver solely to catch system-critical events;
// all other event handling should live in standalone derivers.
// It returns true when the event is a CriticalErrorEvent, false otherwise.
func (s *Driver) OnEvent(ev event.Event) bool {
	crit, isCrit := ev.(rollup.CriticalErrorEvent)
	if !isCrit {
		return false
	}
	s.Log.Error("Derivation process critical error", "err", crit.Err)
	// Close from a separate goroutine: event-processing has to be unblocked
	// for the driver to be able to close.
	go func() {
		// Grab the logger before closing, as the original code does.
		logger := s.Log
		if err := s.Close(); err != nil {
			logger.Error("Failed to shutdown driver on critical error", "err", err)
		}
	}()
	return true
}
type SyncDeriver struct {
// The derivation pipeline is reset whenever we reorg.
// The derivation pipeline determines the new l2Safe.
......
......@@ -37,12 +37,5 @@ func (ev ResetEvent) String() string {
return "reset-event"
}
// CriticalErrorEvent is an event that carries a critical error.
type CriticalErrorEvent struct {
	// Err is the error being reported.
	Err error
}

// Compile-time check that CriticalErrorEvent satisfies event.Event.
var _ event.Event = CriticalErrorEvent{}

// String returns the fixed event name "critical-error".
func (CriticalErrorEvent) String() string {
	return "critical-error"
}
// CriticalErrorEvent is an alias for event.CriticalErrorEvent.
// NOTE(review): presumably kept so existing references to rollup.CriticalErrorEvent keep compiling — confirm.
type CriticalErrorEvent = event.CriticalErrorEvent
......@@ -75,3 +75,13 @@ func (fn DeriverFunc) OnEvent(ev Event) bool {
// NoopEmitter is an emitter whose Emit method does nothing.
type NoopEmitter struct{}

// Emit discards ev.
func (NoopEmitter) Emit(ev Event) {}
// CriticalErrorEvent is an event that carries a critical error.
// Emitting one through Sys marks the system as aborting; from then on
// only CriticalErrorEvent itself is still processed.
type CriticalErrorEvent struct {
	// Err is the error being reported.
	Err error
}

// Compile-time check that CriticalErrorEvent satisfies Event.
var _ Event = CriticalErrorEvent{}

// String returns the fixed event name "critical-error".
func (CriticalErrorEvent) String() string {
	return "critical-error"
}
......@@ -11,7 +11,7 @@ import (
"github.com/ethereum/go-ethereum/log"
)
type System interface {
type Registry interface {
// Register registers a named event-emitter, optionally processing events itself:
// deriver may be nil, not all registrants have to process events.
// A non-nil deriver may implement AttachEmitter to automatically attach the Emitter to it,
......@@ -20,6 +20,10 @@ type System interface {
// Unregister removes a named emitter,
// also removing it from the set of events-receiving derivers (if registered with non-nil deriver).
Unregister(name string) (old Emitter)
}
type System interface {
Registry
// AddTracer registers a tracer to capture all event deriver/emitter work. It runs until RemoveTracer is called.
// Duplicate tracers are allowed.
AddTracer(t Tracer)
......@@ -73,6 +77,10 @@ func (r *systemActor) RunEvent(ev AnnotatedEvent) {
if r.ctx.Err() != nil {
return
}
if r.sys.abort.Load() && !Is[CriticalErrorEvent](ev.Event) {
// if aborting, and not the CriticalErrorEvent itself, then do not process the event
return
}
prev := r.currentEvent
start := time.Now()
......@@ -99,6 +107,9 @@ type Sys struct {
tracers []Tracer
tracersLock sync.RWMutex
// if true, no events may be processed, except CriticalError itself
abort atomic.Bool
}
func NewSystem(log log.Logger, ex Executor) *Sys {
......@@ -240,6 +251,12 @@ func (s *Sys) emit(name string, derivContext uint64, ev Event) {
emitContext := s.emitContext.Add(1)
annotated := AnnotatedEvent{Event: ev, EmitContext: emitContext}
// As soon as anything emits a critical event,
// make the system aware, before the executor event schedules it for processing.
if Is[CriticalErrorEvent](ev) {
s.abort.Store(true)
}
emitTime := time.Now()
s.recordEmit(name, annotated, derivContext, emitTime)
......
......@@ -2,6 +2,7 @@ package event
import (
"context"
"errors"
"testing"
"github.com/stretchr/testify/require"
......@@ -104,3 +105,46 @@ func TestSystemBroadcast(t *testing.T) {
require.Equal(t, 3, fooCount)
require.Equal(t, 3, barCount)
}
// TestCriticalError checks how the event system behaves around a CriticalErrorEvent:
// the crit is queued like any other event, but emitting it immediately flags the
// system as aborting, so on the next drain only the crit itself reaches the derivers
// and the other queued events are ignored.
func TestCriticalError(t *testing.T) {
	logger := testlog.Logger(t, log.LevelError)

	// regularSeen counts non-critical events, critSeen counts CriticalErrorEvents.
	// Both registrations share this handler, so one processed broadcast adds 2.
	regularSeen, critSeen := 0, 0
	handler := DeriverFunc(func(ev Event) bool {
		if _, isCrit := ev.(CriticalErrorEvent); isCrit {
			critSeen++
		} else {
			regularSeen++
		}
		return true
	})

	executor := NewGlobalSynchronous(context.Background())
	system := NewSystem(logger, executor)
	emitA := system.Register("a", handler, DefaultRegisterOpts())
	emitB := system.Register("b", handler, DefaultRegisterOpts())

	require.NoError(t, executor.Drain(), "can drain, even if empty")

	// A regular event is only processed once the executor is drained.
	emitA.Emit(TestEvent{})
	require.Equal(t, 0, regularSeen, "no processing yet, queued event")
	require.NoError(t, executor.Drain())
	require.Equal(t, 2, regularSeen, "both A and B processed the event")

	// Queue two regular events followed by a critical error.
	emitA.Emit(TestEvent{})
	emitB.Emit(TestEvent{})
	critErr := errors.New("test crit error")
	emitB.Emit(CriticalErrorEvent{Err: critErr})
	require.Equal(t, 2, regularSeen, "no processing yet, queued events")
	require.Equal(t, 0, critSeen, "critical error events are still scheduled like normal")
	require.True(t, system.abort.Load(), "we are aware of the crit")

	// Draining now delivers only the crit; the queued regular events are dropped.
	require.NoError(t, executor.Drain())
	require.Equal(t, 2, regularSeen, "still no processing, since we hit a crit error, the events are ignored")
	require.Equal(t, 2, critSeen, "but everyone has seen the crit now")

	// We are able to stop the processing now
	system.Stop()
	emitA.Emit(TestEvent{})
	require.NoError(t, executor.Drain(), "system is closed, no further event processing")
	require.Equal(t, 2, regularSeen)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment