Commit f7ce7829 authored by protolambda's avatar protolambda Committed by GitHub

op-supervisor: use event system like op-node, for easy synchronization (#13752)

* op-supervisor: use event system like op-node, for easy synchronization

* op-supervisor,op-node: simplify local-safe derivation updates

* op-supervisor: fixes, report back problem if local-safe update fails

* op-e2e: fix fp interop action test

* op-service: bigger timeout on eventual-case in poll test
parent f5fc73db
......@@ -41,10 +41,6 @@ var interopJWTSecret = [32]byte{4}
type InteropControl interface {
PullEvents(ctx context.Context) (pulledAny bool, err error)
AwaitSentCrossUnsafeUpdate(ctx context.Context, minNum uint64) error
AwaitSentCrossSafeUpdate(ctx context.Context, minNum uint64) error
AwaitSentFinalizedUpdate(ctx context.Context, minNum uint64) error
}
// L2Verifier is an actor that functions like a rollup node,
......@@ -448,21 +444,6 @@ func (s *L2Verifier) ActL2InsertUnsafePayload(payload *eth.ExecutionPayloadEnvel
}
}
func (s *L2Verifier) AwaitSentCrossUnsafeUpdate(t Testing, minNum uint64) {
require.NotNil(t, s.InteropControl, "must be managed by op-supervisor")
require.NoError(t, s.InteropControl.AwaitSentCrossUnsafeUpdate(t.Ctx(), minNum))
}
func (s *L2Verifier) AwaitSentCrossSafeUpdate(t Testing, minNum uint64) {
require.NotNil(t, s.InteropControl, "must be managed by op-supervisor")
require.NoError(t, s.InteropControl.AwaitSentCrossSafeUpdate(t.Ctx(), minNum))
}
func (s *L2Verifier) AwaitSentFinalizedUpdate(t Testing, minNum uint64) {
require.NotNil(t, s.InteropControl, "must be managed by op-supervisor")
require.NoError(t, s.InteropControl.AwaitSentFinalizedUpdate(t.Ctx(), minNum))
}
func (s *L2Verifier) SyncSupervisor(t Testing) {
require.NotNil(t, s.InteropControl, "must be managed by op-supervisor")
_, err := s.InteropControl.PullEvents(t.Ctx())
......
......@@ -17,7 +17,7 @@ import (
"github.com/ethereum-optimism/optimism/op-e2e/actions/helpers"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-supervisor/config"
......@@ -128,32 +128,22 @@ func (is *InteropSetup) CreateActors() *InteropActors {
// SupervisorActor represents a supervisor, instrumented to run synchronously for action-test purposes.
type SupervisorActor struct {
exec *event.GlobalSyncExec
backend *backend.SupervisorBackend
frontend.QueryFrontend
frontend.AdminFrontend
}
func (sa *SupervisorActor) SyncEvents(t helpers.Testing, chainID types.ChainID) {
require.NoError(t, sa.backend.SyncEvents(chainID))
func (sa *SupervisorActor) ProcessFull(t helpers.Testing) {
require.NoError(t, sa.exec.Drain(), "process all supervisor events")
}
func (sa *SupervisorActor) SyncCrossUnsafe(t helpers.Testing, chainID types.ChainID) {
err := sa.backend.SyncCrossUnsafe(chainID)
if err != nil {
require.ErrorIs(t, err, types.ErrFuture)
}
}
func (sa *SupervisorActor) SyncCrossSafe(t helpers.Testing, chainID types.ChainID) {
err := sa.backend.SyncCrossSafe(chainID)
if err != nil {
require.ErrorIs(t, err, types.ErrFuture)
}
func (sa *SupervisorActor) SignalLatestL1(t helpers.Testing) {
require.NoError(t, sa.backend.PullLatestL1())
}
func (sa *SupervisorActor) SyncFinalizedL1(t helpers.Testing, ref eth.BlockRef) {
sa.backend.SyncFinalizedL1(ref)
require.Equal(t, ref, sa.backend.FinalizedL1())
func (sa *SupervisorActor) SignalFinalizedL1(t helpers.Testing) {
require.NoError(t, sa.backend.PullFinalizedL1())
}
// worldToDepSet converts a set of chain configs into a dependency-set for the supervisor.
......@@ -182,10 +172,12 @@ func NewSupervisor(t helpers.Testing, logger log.Logger, depSet depset.Dependenc
Datadir: supervisorDataDir,
SyncSources: &syncnode.CLISyncNodes{}, // sources are added dynamically afterwards
}
b, err := backend.NewSupervisorBackend(t.Ctx(),
logger.New("role", "supervisor"), metrics.NoopMetrics, svCfg)
evExec := event.NewGlobalSynchronous(t.Ctx())
b, err := backend.NewSupervisorBackend(t.Ctx(), logger, metrics.NoopMetrics, svCfg, evExec)
require.NoError(t, err)
b.SetConfDepthL1(0)
return &SupervisorActor{
exec: evExec,
backend: b,
QueryFrontend: frontend.QueryFrontend{
Supervisor: b,
......
......@@ -37,15 +37,8 @@ func TestFullInterop(gt *testing.T) {
status := actors.ChainA.Sequencer.SyncStatus()
require.Equal(t, uint64(0), status.UnsafeL2.Number)
// sync chain A
actors.Supervisor.SyncEvents(t, actors.ChainA.ChainID)
actors.Supervisor.SyncCrossUnsafe(t, actors.ChainA.ChainID)
actors.Supervisor.SyncCrossSafe(t, actors.ChainA.ChainID)
// sync chain B
actors.Supervisor.SyncEvents(t, actors.ChainB.ChainID)
actors.Supervisor.SyncCrossUnsafe(t, actors.ChainB.ChainID)
actors.Supervisor.SyncCrossSafe(t, actors.ChainB.ChainID)
// sync initial chain A and B
actors.Supervisor.ProcessFull(t)
// Build L2 block on chain A
actors.ChainA.Sequencer.ActL2StartBlock(t)
......@@ -62,9 +55,7 @@ func TestFullInterop(gt *testing.T) {
actors.ChainA.Sequencer.SyncSupervisor(t)
// Verify as cross-unsafe with supervisor
actors.Supervisor.SyncEvents(t, actors.ChainA.ChainID)
actors.Supervisor.SyncCrossUnsafe(t, actors.ChainA.ChainID)
actors.ChainA.Sequencer.AwaitSentCrossUnsafeUpdate(t, 1)
actors.Supervisor.ProcessFull(t)
actors.ChainA.Sequencer.ActL2PipelineFull(t)
status = actors.ChainA.Sequencer.SyncStatus()
require.Equal(t, head, status.UnsafeL2.ID())
......@@ -83,6 +74,7 @@ func TestFullInterop(gt *testing.T) {
// it needs the supervisor to see the L1 block first,
// and provide it to the node.
actors.ChainA.Sequencer.ActL2EventsUntil(t, event.Is[derive.ExhaustedL1Event], 100, false)
actors.Supervisor.SignalLatestL1(t) // supervisor will be aware of latest L1
actors.ChainA.Sequencer.SyncSupervisor(t) // supervisor to react to exhaust-L1
actors.ChainA.Sequencer.ActL2PipelineFull(t) // node to complete syncing to L1 head.
......@@ -97,12 +89,14 @@ func TestFullInterop(gt *testing.T) {
n := actors.ChainA.SequencerEngine.L2Chain().CurrentSafeBlock().Number.Uint64()
require.Equal(t, uint64(0), n)
// Make the supervisor aware of the new L1 block
actors.Supervisor.SignalLatestL1(t)
// Ingest the new local-safe event
actors.ChainA.Sequencer.SyncSupervisor(t)
// Cross-safe verify it
actors.Supervisor.SyncCrossSafe(t, actors.ChainA.ChainID)
actors.ChainA.Sequencer.AwaitSentCrossSafeUpdate(t, 1)
actors.Supervisor.ProcessFull(t)
actors.ChainA.Sequencer.ActL2PipelineFull(t)
status = actors.ChainA.Sequencer.SyncStatus()
require.Equal(t, head, status.UnsafeL2.ID())
......@@ -119,8 +113,8 @@ func TestFullInterop(gt *testing.T) {
actors.L1Miner.ActL1FinalizeNext(t)
actors.ChainA.Sequencer.ActL1SafeSignal(t) // TODO old source of finality
actors.ChainA.Sequencer.ActL1FinalizedSignal(t)
actors.Supervisor.SyncFinalizedL1(t, status.HeadL1)
actors.ChainA.Sequencer.AwaitSentFinalizedUpdate(t, 1)
actors.Supervisor.SignalFinalizedL1(t)
actors.Supervisor.ProcessFull(t)
actors.ChainA.Sequencer.ActL2PipelineFull(t)
finalizedL2BlockID, err := actors.Supervisor.Finalized(t.Ctx(), actors.ChainA.ChainID)
require.NoError(t, err)
......@@ -155,15 +149,8 @@ func TestInteropFaultProofs(gt *testing.T) {
status := actors.ChainA.Sequencer.SyncStatus()
require.Equal(t, uint64(0), status.UnsafeL2.Number)
// sync chain A
actors.Supervisor.SyncEvents(t, actors.ChainA.ChainID)
actors.Supervisor.SyncCrossUnsafe(t, actors.ChainA.ChainID)
actors.Supervisor.SyncCrossSafe(t, actors.ChainA.ChainID)
// sync chain B
actors.Supervisor.SyncEvents(t, actors.ChainB.ChainID)
actors.Supervisor.SyncCrossUnsafe(t, actors.ChainB.ChainID)
actors.Supervisor.SyncCrossSafe(t, actors.ChainB.ChainID)
// sync chain A and B
actors.Supervisor.ProcessFull(t)
// Build L2 block on chain A
actors.ChainA.Sequencer.ActL2StartBlock(t)
......@@ -180,16 +167,11 @@ func TestInteropFaultProofs(gt *testing.T) {
actors.ChainB.Sequencer.SyncSupervisor(t)
// Verify as cross-unsafe with supervisor
actors.Supervisor.SyncEvents(t, actors.ChainA.ChainID)
actors.Supervisor.SyncEvents(t, actors.ChainB.ChainID)
actors.Supervisor.SyncCrossUnsafe(t, actors.ChainA.ChainID)
actors.Supervisor.SyncCrossUnsafe(t, actors.ChainB.ChainID)
actors.ChainA.Sequencer.AwaitSentCrossUnsafeUpdate(t, 1)
actors.Supervisor.ProcessFull(t)
actors.ChainA.Sequencer.ActL2PipelineFull(t)
status = actors.ChainA.Sequencer.SyncStatus()
require.Equal(gt, uint64(1), status.UnsafeL2.Number)
require.Equal(gt, uint64(1), status.CrossUnsafeL2.Number)
actors.ChainB.Sequencer.AwaitSentCrossUnsafeUpdate(t, 1)
actors.ChainB.Sequencer.ActL2PipelineFull(t)
status = actors.ChainB.Sequencer.SyncStatus()
require.Equal(gt, uint64(1), status.UnsafeL2.Number)
......@@ -202,6 +184,7 @@ func TestInteropFaultProofs(gt *testing.T) {
actors.L1Miner.ActL1IncludeTx(actors.ChainA.BatcherAddr)(t)
actors.L1Miner.ActL1IncludeTx(actors.ChainB.BatcherAddr)(t)
actors.L1Miner.ActL1EndBlock(t)
actors.Supervisor.SignalLatestL1(t)
// The node will exhaust L1 data,
// it needs the supervisor to see the L1 block first, and provide it to the node.
actors.ChainA.Sequencer.ActL2EventsUntil(t, event.Is[derive.ExhaustedL1Event], 100, false)
......@@ -223,13 +206,10 @@ func TestInteropFaultProofs(gt *testing.T) {
actors.ChainB.Sequencer.SyncSupervisor(t)
// Cross-safe verify it
actors.Supervisor.SyncCrossSafe(t, actors.ChainA.ChainID)
actors.Supervisor.SyncCrossSafe(t, actors.ChainB.ChainID)
actors.ChainA.Sequencer.AwaitSentCrossSafeUpdate(t, 1)
actors.Supervisor.ProcessFull(t)
actors.ChainA.Sequencer.ActL2PipelineFull(t)
status = actors.ChainA.Sequencer.SyncStatus()
require.Equal(gt, uint64(1), status.SafeL2.Number)
actors.ChainB.Sequencer.AwaitSentCrossSafeUpdate(t, 1)
actors.ChainB.Sequencer.ActL2PipelineFull(t)
status = actors.ChainB.Sequencer.SyncStatus()
require.Equal(gt, uint64(1), status.SafeL2.Number)
......
......@@ -115,16 +115,19 @@ func (m *ManagedMode) OnEvent(ev event.Event) bool {
ref := x.Ref.BlockRef()
m.events.Send(&supervisortypes.ManagedEvent{UnsafeBlock: &ref})
case engine.LocalSafeUpdateEvent:
m.log.Info("Emitting local safe update because of L2 block", "derivedFrom", x.DerivedFrom, "derived", x.Ref)
m.events.Send(&supervisortypes.ManagedEvent{DerivationUpdate: &supervisortypes.DerivedBlockRefPair{
DerivedFrom: x.DerivedFrom,
Derived: x.Ref.BlockRef(),
}})
case derive.DeriverL1StatusEvent:
m.log.Info("Emitting local safe update because of L1 traversal", "derivedFrom", x.Origin, "derived", x.LastL2)
m.events.Send(&supervisortypes.ManagedEvent{DerivationUpdate: &supervisortypes.DerivedBlockRefPair{
DerivedFrom: x.Origin,
Derived: x.LastL2.BlockRef(),
}})
case derive.ExhaustedL1Event:
m.log.Info("Exhausted L1 data", "derivedFrom", x.L1Ref, "derived", x.LastL2)
m.events.Send(&supervisortypes.ManagedEvent{ExhaustL1: &supervisortypes.DerivedBlockRefPair{
DerivedFrom: x.L1Ref,
Derived: x.LastL2.BlockRef(),
......
......@@ -12,6 +12,20 @@ type RWMap[K comparable, V any] struct {
mu sync.RWMutex
}
// Default creates a value at the given key, if the key is not set yet.
func (m *RWMap[K, V]) Default(key K, fn func() V) (changed bool) {
m.mu.Lock()
defer m.mu.Unlock()
if m.inner == nil {
m.inner = make(map[K]V)
}
_, ok := m.inner[key]
if !ok {
m.inner[key] = fn()
}
return !ok // if it exists, nothing changed
}
func (m *RWMap[K, V]) Has(key K) (ok bool) {
m.mu.RLock()
defer m.mu.RUnlock()
......
......@@ -60,4 +60,27 @@ func TestRWMap(t *testing.T) {
// remove a non-existent value
m.Delete(132983213)
m.Set(10001, 100)
m.Default(10001, func() int64 {
t.Fatal("should not replace existing value")
return 0
})
m.Default(10002, func() int64 {
return 42
})
v, ok = m.Get(10002)
require.True(t, ok)
require.Equal(t, int64(42), v)
}
func TestRWMap_DefaultOnEmpty(t *testing.T) {
m := &RWMap[uint64, int64]{}
// this should work, even if the first call to the map.
m.Default(10002, func() int64 {
return 42
})
v, ok := m.Get(10002)
require.True(t, ok)
require.Equal(t, int64(42), v)
}
package tasks
import (
"context"
"sync"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
)
// Poller runs a function on repeat at a set interval.
// Warning: ticks can be missed, if the function execution is slow.
type Poller struct {
fn func()
clock clock.Clock
interval time.Duration
ticker clock.Ticker // nil if not running
mu sync.Mutex
ctx context.Context // non-nil when running
cancel context.CancelFunc
wg sync.WaitGroup
}
func NewPoller(fn func(), clock clock.Clock, interval time.Duration) *Poller {
return &Poller{
fn: fn,
clock: clock,
interval: interval,
}
}
// Start starts polling in a background routine.
// Duplicate start calls are ignored. Only one routine runs.
func (pd *Poller) Start() {
pd.mu.Lock()
defer pd.mu.Unlock()
if pd.ctx != nil {
return // already running
}
pd.ctx, pd.cancel = context.WithCancel(context.Background())
pd.ticker = pd.clock.NewTicker(pd.interval)
pd.wg.Add(1)
go func() {
defer pd.wg.Done()
defer pd.ticker.Stop()
for {
select {
case <-pd.ticker.Ch():
pd.fn()
case <-pd.ctx.Done():
return // quiting
}
}
}()
}
// Stop stops the polling. Duplicate calls are ignored.
// Only if active the polling routine is stopped.
func (pd *Poller) Stop() {
pd.mu.Lock()
defer pd.mu.Unlock()
if pd.ctx == nil {
return // not running, nothing to stop
}
pd.cancel()
pd.wg.Wait()
pd.ctx = nil
pd.cancel = nil
pd.ticker = nil
}
// SetInterval changes the polling interval.
func (pd *Poller) SetInterval(interval time.Duration) {
pd.mu.Lock()
defer pd.mu.Unlock()
pd.interval = interval
// if we're currently running, change the interval of the active ticker
if pd.ticker != nil {
pd.ticker.Reset(interval)
}
}
package tasks
import (
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-service/clock"
)
const eventualTimeout = 10 * time.Second
func TestPoller(t *testing.T) {
cl := clock.NewDeterministicClock(time.Now())
counter := new(atomic.Int64)
poller := NewPoller(func() {
counter.Add(1)
}, cl, time.Second*5)
poller.Start()
cl.AdvanceTime(time.Second * 6) // hit the first tick
require.Eventually(t, func() bool {
t.Log("counter", counter.Load())
return counter.Load() == 1
}, eventualTimeout, time.Millisecond*100)
cl.AdvanceTime(time.Second * 3) // no hit yet, 9 seconds have passsed now
require.Never(t, func() bool {
return counter.Load() == 2
}, time.Second, time.Millisecond*100)
// hit the second tick at 10s
cl.AdvanceTime(time.Second * 2) // 11 seconds have passed now
require.Eventually(t, func() bool {
return counter.Load() == 2
}, eventualTimeout, time.Millisecond*100)
poller.Stop()
// Poller was stopped, this shouldn't affect it
cl.AdvanceTime(time.Second * 1000)
// We should have stopped counting
require.Never(t, func() bool {
return counter.Load() > 2
}, time.Second, time.Millisecond*100)
// Start back up
poller.Start()
// No previously buffered ticks
require.Never(t, func() bool {
return counter.Load() > 2
}, time.Second, time.Millisecond*100)
// Change the interval, so we poll faster
poller.SetInterval(time.Second * 2)
cl.AdvanceTime(time.Second * 3)
require.Eventually(t, func() bool {
t.Log("counter", counter.Load())
return counter.Load() == 3
}, eventualTimeout, time.Millisecond*100)
poller.Stop()
}
......@@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/locks"
......@@ -22,6 +23,7 @@ import (
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/l1access"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/processors"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/superevents"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncnode"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
......@@ -33,6 +35,11 @@ type SupervisorBackend struct {
m Metrics
dataDir string
eventSys event.System
sysContext context.Context
sysCancel context.CancelFunc
// depSet is the dependency set that the backend uses to know about the chains it is indexing
depSet depset.DependencySet
......@@ -44,9 +51,7 @@ type SupervisorBackend struct {
// chainProcessors are notified of new unsafe blocks, and add the unsafe log events data into the events DB
chainProcessors locks.RWMap[types.ChainID, *processors.ChainProcessor]
// crossProcessors are used to index cross-chain dependency validity data once the log events are indexed
crossSafeProcessors locks.RWMap[types.ChainID, *cross.Worker]
crossUnsafeProcessors locks.RWMap[types.ChainID, *cross.Worker]
syncSources locks.RWMap[types.ChainID, syncnode.SyncSource]
// syncNodesController controls the derivation or reset of the sync nodes
......@@ -59,13 +64,17 @@ type SupervisorBackend struct {
// chainMetrics are used to track metrics for each chain
// they are reused for processors and databases of the same chain
chainMetrics locks.RWMap[types.ChainID, *chainMetrics]
emitter event.Emitter
}
var _ event.AttachEmitter = (*SupervisorBackend)(nil)
var _ frontend.Backend = (*SupervisorBackend)(nil)
var errAlreadyStopped = errors.New("already stopped")
func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) {
func NewSupervisorBackend(ctx context.Context, logger log.Logger,
m Metrics, cfg *config.Config, eventExec event.Executor) (*SupervisorBackend, error) {
// attempt to prepare the data directory
if err := db.PrepDataDir(cfg.Datadir); err != nil {
return nil, err
......@@ -90,14 +99,16 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg
}
}
eventSys := event.NewSystem(logger, eventExec)
sysCtx, sysCancel := context.WithCancel(ctx)
// create initial per-chain resources
chainsDBs := db.NewChainsDB(logger, depSet)
eventSys.Register("chainsDBs", chainsDBs, event.DefaultRegisterOpts())
l1Accessor := l1access.NewL1Accessor(
logger,
nil,
processors.MaybeUpdateFinalizedL1Fn(context.Background(), logger, chainsDBs),
)
l1Accessor := l1access.NewL1Accessor(sysCtx, logger, nil)
eventSys.Register("l1Accessor", l1Accessor, event.DefaultRegisterOpts())
// create the supervisor backend
super := &SupervisorBackend{
......@@ -109,10 +120,15 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg
l1Accessor: l1Accessor,
// For testing we can avoid running the processors.
synchronousProcessors: cfg.SynchronousProcessors,
eventSys: eventSys,
sysCancel: sysCancel,
sysContext: sysCtx,
}
eventSys.Register("backend", super, event.DefaultRegisterOpts())
// create node controller
super.syncNodesController = syncnode.NewSyncNodesController(logger, depSet, chainsDBs, super)
super.syncNodesController = syncnode.NewSyncNodesController(logger, depSet, eventSys, super)
eventSys.Register("sync-controller", super.syncNodesController, event.DefaultRegisterOpts())
// Initialize the resources of the supervisor backend.
// Stop the supervisor if any of the resources fails to be initialized.
......@@ -124,6 +140,31 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg
return super, nil
}
func (su *SupervisorBackend) OnEvent(ev event.Event) bool {
switch x := ev.(type) {
case superevents.LocalUnsafeReceivedEvent:
su.emitter.Emit(superevents.ChainProcessEvent{
ChainID: x.ChainID,
Target: x.NewLocalUnsafe.Number,
})
case superevents.LocalUnsafeUpdateEvent:
su.emitter.Emit(superevents.UpdateCrossUnsafeRequestEvent{
ChainID: x.ChainID,
})
case superevents.LocalSafeUpdateEvent:
su.emitter.Emit(superevents.UpdateCrossSafeRequestEvent{
ChainID: x.ChainID,
})
default:
return false
}
return true
}
func (su *SupervisorBackend) AttachEmitter(em event.Emitter) {
su.emitter = em
}
// initResources initializes all the resources, such as DBs and processors for chains.
// An error may returned, without closing the thus-far initialized resources.
// Upon error the caller should call Stop() on the supervisor backend to clean up and release resources.
......@@ -137,21 +178,23 @@ func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Conf
}
}
eventOpts := event.DefaultRegisterOpts()
// initialize all cross-unsafe processors
for _, chainID := range chains {
worker := cross.NewCrossUnsafeWorker(su.logger, chainID, su.chainDBs)
su.crossUnsafeProcessors.Set(chainID, worker)
su.eventSys.Register(fmt.Sprintf("cross-unsafe-%s", chainID), worker, eventOpts)
}
// initialize all cross-safe processors
for _, chainID := range chains {
worker := cross.NewCrossSafeWorker(su.logger, chainID, su.chainDBs)
su.crossSafeProcessors.Set(chainID, worker)
su.eventSys.Register(fmt.Sprintf("cross-safe-%s", chainID), worker, eventOpts)
}
// For each chain initialize a chain processor service,
// after cross-unsafe workers are ready to receive updates
for _, chainID := range chains {
logProcessor := processors.NewLogProcessor(chainID, su.chainDBs, su.depSet)
chainProcessor := processors.NewChainProcessor(su.logger, chainID, logProcessor, su.chainDBs, su.onIndexedLocalUnsafeData)
chainProcessor := processors.NewChainProcessor(su.sysContext, su.logger, chainID, logProcessor, su.chainDBs)
su.eventSys.Register(fmt.Sprintf("events-%s", chainID), chainProcessor, eventOpts)
su.chainProcessors.Set(chainID, chainProcessor)
}
// initialize sync sources
......@@ -184,32 +227,6 @@ func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Conf
return nil
}
// onIndexedLocalUnsafeData is called by the event indexing workers.
// This signals to cross-unsafe workers that there's data to index.
func (su *SupervisorBackend) onIndexedLocalUnsafeData() {
// We signal all workers, since dependencies on a chain may be unblocked
// by new data on other chains.
// Busy workers don't block processing.
// The signal is picked up only if the worker is running in the background.
su.crossUnsafeProcessors.Range(func(_ types.ChainID, w *cross.Worker) bool {
w.OnNewData()
return true
})
}
// onNewLocalSafeData is called by the safety-indexing.
// This signals to cross-safe workers that there's data to index.
func (su *SupervisorBackend) onNewLocalSafeData() {
// We signal all workers, since dependencies on a chain may be unblocked
// by new data on other chains.
// Busy workers don't block processing.
// The signal is picked up only if the worker is running in the background.
su.crossSafeProcessors.Range(func(_ types.ChainID, w *cross.Worker) bool {
w.OnNewData()
return true
})
}
// openChainDBs initializes all the DB resources of a specific chain.
// It is a sub-task of initResources.
func (su *SupervisorBackend) openChainDBs(chainID types.ChainID) error {
......@@ -237,8 +254,6 @@ func (su *SupervisorBackend) openChainDBs(chainID types.ChainID) error {
su.chainDBs.AddCrossUnsafeTracker(chainID)
su.chainDBs.AddSubscriptions(chainID)
return nil
}
......@@ -308,7 +323,7 @@ func (su *SupervisorBackend) attachL1RPC(ctx context.Context, l1RPCAddr string)
// if the L1 accessor does not exist, it is created
// if an L1 source is already attached, it is replaced
func (su *SupervisorBackend) AttachL1Source(source l1access.L1Source) {
su.l1Accessor.AttachClient(source)
su.l1Accessor.AttachClient(source, !su.synchronousProcessors)
}
func (su *SupervisorBackend) Start(ctx context.Context) error {
......@@ -323,22 +338,6 @@ func (su *SupervisorBackend) Start(ctx context.Context) error {
return fmt.Errorf("failed to resume chains db: %w", err)
}
if !su.synchronousProcessors {
// Make all the chain-processors run automatic background processing
su.chainProcessors.Range(func(_ types.ChainID, processor *processors.ChainProcessor) bool {
processor.StartBackground()
return true
})
su.crossUnsafeProcessors.Range(func(_ types.ChainID, worker *cross.Worker) bool {
worker.StartBackground()
return true
})
su.crossSafeProcessors.Range(func(_ types.ChainID, worker *cross.Worker) bool {
worker.StartBackground()
return true
})
}
return nil
}
......@@ -348,27 +347,10 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error {
}
su.logger.Info("Closing supervisor backend")
// close all processors
su.chainProcessors.Range(func(id types.ChainID, processor *processors.ChainProcessor) bool {
su.logger.Info("stopping chain processor", "chainID", id)
processor.Close()
return true
})
su.chainProcessors.Clear()
su.crossUnsafeProcessors.Range(func(id types.ChainID, worker *cross.Worker) bool {
su.logger.Info("stopping cross-unsafe processor", "chainID", id)
worker.Close()
return true
})
su.crossUnsafeProcessors.Clear()
su.sysCancel()
defer su.eventSys.Stop()
su.crossSafeProcessors.Range(func(id types.ChainID, worker *cross.Worker) bool {
su.logger.Info("stopping cross-safe processor", "chainID", id)
worker.Close()
return true
})
su.crossSafeProcessors.Clear()
su.chainProcessors.Clear()
su.syncNodesController.Close()
......@@ -557,60 +539,17 @@ func (su *SupervisorBackend) SuperRootAtTimestamp(ctx context.Context, timestamp
}, nil
}
// Update methods
// ----------------------------
func (su *SupervisorBackend) UpdateLocalUnsafe(ctx context.Context, chainID types.ChainID, head eth.BlockRef) error {
ch, ok := su.chainProcessors.Get(chainID)
if !ok {
return types.ErrUnknownChain
}
return ch.OnNewHead(head)
// PullLatestL1 makes the supervisor aware of the latest L1 block. Exposed for testing purposes.
func (su *SupervisorBackend) PullLatestL1() error {
return su.l1Accessor.PullLatest()
}
func (su *SupervisorBackend) UpdateLocalSafe(ctx context.Context, chainID types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) error {
err := su.chainDBs.UpdateLocalSafe(chainID, derivedFrom, lastDerived)
if err != nil {
return err
}
su.onNewLocalSafeData()
return nil
}
func (su *SupervisorBackend) RecordNewL1(ctx context.Context, chain types.ChainID, ref eth.BlockRef) error {
return su.chainDBs.RecordNewL1(chain, ref)
}
// Access to synchronous processing for tests
// ----------------------------
func (su *SupervisorBackend) SyncEvents(chainID types.ChainID) error {
ch, ok := su.chainProcessors.Get(chainID)
if !ok {
return types.ErrUnknownChain
}
ch.ProcessToHead()
return nil
}
func (su *SupervisorBackend) SyncCrossUnsafe(chainID types.ChainID) error {
ch, ok := su.crossUnsafeProcessors.Get(chainID)
if !ok {
return types.ErrUnknownChain
}
return ch.ProcessWork()
}
func (su *SupervisorBackend) SyncCrossSafe(chainID types.ChainID) error {
ch, ok := su.crossSafeProcessors.Get(chainID)
if !ok {
return types.ErrUnknownChain
}
return ch.ProcessWork()
// PullFinalizedL1 makes the supervisor aware of the finalized L1 block. Exposed for testing purposes.
func (su *SupervisorBackend) PullFinalizedL1() error {
return su.l1Accessor.PullFinalized()
}
// SyncFinalizedL1 is a test-only method to update the finalized L1 block without the use of a subscription
func (su *SupervisorBackend) SyncFinalizedL1(ref eth.BlockRef) {
fn := processors.MaybeUpdateFinalizedL1Fn(context.Background(), su.logger, su.chainDBs)
fn(context.Background(), ref)
// SetConfDepthL1 changes the confirmation depth of the L1 chain that is accessible to the supervisor.
func (su *SupervisorBackend) SetConfDepthL1(depth uint64) {
su.l1Accessor.SetConfDepth(depth)
}
......@@ -13,6 +13,7 @@ import (
types2 "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/eth"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
......@@ -24,6 +25,7 @@ import (
"github.com/ethereum-optimism/optimism/op-supervisor/metrics"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/processors"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/superevents"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncnode"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
......@@ -61,7 +63,8 @@ func TestBackendLifetime(t *testing.T) {
Datadir: dataDir,
}
b, err := NewSupervisorBackend(context.Background(), logger, m, cfg)
ex := event.NewGlobalSynchronous(context.Background())
b, err := NewSupervisorBackend(context.Background(), logger, m, cfg, ex)
require.NoError(t, err)
t.Log("initialized!")
......@@ -106,12 +109,13 @@ func TestBackendLifetime(t *testing.T) {
src.ExpectBlockRefByNumber(2, eth.L1BlockRef{}, ethereum.NotFound)
err = b.UpdateLocalUnsafe(context.Background(), chainA, blockY)
require.NoError(t, err)
b.emitter.Emit(superevents.LocalUnsafeReceivedEvent{
ChainID: chainA,
NewLocalUnsafe: blockY,
})
// Make the processing happen, so we can rely on the new chain information,
// and not run into errors for future data that isn't mocked at this time.
proc, _ := b.chainProcessors.Get(chainA)
proc.ProcessToHead()
require.NoError(t, ex.Drain())
_, err = b.CrossUnsafe(context.Background(), chainA)
require.ErrorIs(t, err, types.ErrFuture, "still no data yet, need cross-unsafe")
......
package cross
import (
"context"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/superevents"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
......@@ -26,7 +27,7 @@ type CrossSafeDeps interface {
UpdateCrossSafe(chain types.ChainID, l1View eth.BlockRef, lastCrossDerived eth.BlockRef) error
}
func CrossSafeUpdate(ctx context.Context, logger log.Logger, chainID types.ChainID, d CrossSafeDeps) error {
func CrossSafeUpdate(logger log.Logger, chainID types.ChainID, d CrossSafeDeps) error {
logger.Debug("Cross-safe update call")
// TODO(#11693): establish L1 reorg-lock of scopeDerivedFrom
// defer unlock once we are done checking the chain
......@@ -100,13 +101,6 @@ func scopedCrossSafeUpdate(logger log.Logger, chainID types.ChainID, d CrossSafe
return candidateScope, nil
}
func NewCrossSafeWorker(logger log.Logger, chainID types.ChainID, d CrossSafeDeps) *Worker {
logger = logger.New("chain", chainID)
return NewWorker(logger, func(ctx context.Context) error {
return CrossSafeUpdate(ctx, logger, chainID, d)
})
}
func sliceOfExecMsgs(execMsgs map[uint32]*types.ExecutingMessage) []*types.ExecutingMessage {
msgs := make([]*types.ExecutingMessage, 0, len(execMsgs))
for _, msg := range execMsgs {
......@@ -114,3 +108,39 @@ func sliceOfExecMsgs(execMsgs map[uint32]*types.ExecutingMessage) []*types.Execu
}
return msgs
}
type CrossSafeWorker struct {
logger log.Logger
chainID types.ChainID
d CrossSafeDeps
}
func (c *CrossSafeWorker) OnEvent(ev event.Event) bool {
switch x := ev.(type) {
case superevents.UpdateCrossSafeRequestEvent:
if x.ChainID != c.chainID {
return false
}
if err := CrossSafeUpdate(c.logger, c.chainID, c.d); err != nil {
if errors.Is(err, types.ErrFuture) {
c.logger.Debug("Worker awaits additional blocks", "err", err)
} else {
c.logger.Warn("Failed to process work", "err", err)
}
}
default:
return false
}
return true
}
var _ event.Deriver = (*CrossUnsafeWorker)(nil)
func NewCrossSafeWorker(logger log.Logger, chainID types.ChainID, d CrossSafeDeps) *CrossSafeWorker {
logger = logger.New("chain", chainID)
return &CrossSafeWorker{
logger: logger,
chainID: chainID,
d: d,
}
}
package cross
import (
"context"
"errors"
"testing"
......@@ -16,7 +15,6 @@ import (
func TestCrossSafeUpdate(t *testing.T) {
t.Run("scopedCrossSafeUpdate passes", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LevelDebug)
chainID := types.ChainIDFromUInt64(0)
csd := &mockCrossSafeDeps{}
......@@ -36,11 +34,10 @@ func TestCrossSafeUpdate(t *testing.T) {
csd.deps = mockDependencySet{}
// when scopedCrossSafeUpdate returns no error,
// no error is returned
err := CrossSafeUpdate(ctx, logger, chainID, csd)
err := CrossSafeUpdate(logger, chainID, csd)
require.NoError(t, err)
})
t.Run("scopedCrossSafeUpdate returns error", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LevelDebug)
chainID := types.ChainIDFromUInt64(0)
csd := &mockCrossSafeDeps{}
......@@ -56,11 +53,10 @@ func TestCrossSafeUpdate(t *testing.T) {
// when scopedCrossSafeUpdate returns an error,
// (by way of OpenBlock returning an error),
// the error is returned
err := CrossSafeUpdate(ctx, logger, chainID, csd)
err := CrossSafeUpdate(logger, chainID, csd)
require.ErrorContains(t, err, "some error")
})
t.Run("scopedCrossSafeUpdate returns ErrOutOfScope", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LevelDebug)
chainID := types.ChainIDFromUInt64(0)
csd := &mockCrossSafeDeps{}
......@@ -98,7 +94,7 @@ func TestCrossSafeUpdate(t *testing.T) {
// CrossSafeUpdate proceeds anyway and calls UpdateCrossSafe
// the update uses the new scope returned by NextDerivedFrom
// and a crossSafeRef made from the current crossSafe and its parent
err := CrossSafeUpdate(ctx, logger, chainID, csd)
err := CrossSafeUpdate(logger, chainID, csd)
require.NoError(t, err)
require.Equal(t, chainID, updatingChain)
require.Equal(t, newScope, updatingCandidateScope)
......@@ -106,7 +102,6 @@ func TestCrossSafeUpdate(t *testing.T) {
require.Equal(t, crossSafeRef, updatingCandidate)
})
t.Run("NextDerivedFrom returns error", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LevelDebug)
chainID := types.ChainIDFromUInt64(0)
csd := &mockCrossSafeDeps{}
......@@ -125,11 +120,10 @@ func TestCrossSafeUpdate(t *testing.T) {
// when scopedCrossSafeUpdate returns Out of Scope error,
// and NextDerivedFrom returns an error,
// the error is returned
err := CrossSafeUpdate(ctx, logger, chainID, csd)
err := CrossSafeUpdate(logger, chainID, csd)
require.ErrorContains(t, err, "some error")
})
t.Run("PreviousDerived returns error", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LevelDebug)
chainID := types.ChainIDFromUInt64(0)
csd := &mockCrossSafeDeps{}
......@@ -148,11 +142,10 @@ func TestCrossSafeUpdate(t *testing.T) {
// when scopedCrossSafeUpdate returns Out of Scope error,
// and PreviousDerived returns an error,
// the error is returned
err := CrossSafeUpdate(ctx, logger, chainID, csd)
err := CrossSafeUpdate(logger, chainID, csd)
require.ErrorContains(t, err, "some error")
})
t.Run("UpdateCrossSafe returns error", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LevelDebug)
chainID := types.ChainIDFromUInt64(0)
csd := &mockCrossSafeDeps{}
......@@ -171,7 +164,7 @@ func TestCrossSafeUpdate(t *testing.T) {
// when scopedCrossSafeUpdate returns Out of Scope error,
// and UpdateCrossSafe returns an error,
// the error is returned
err := CrossSafeUpdate(ctx, logger, chainID, csd)
err := CrossSafeUpdate(logger, chainID, csd)
require.ErrorContains(t, err, "some error")
})
}
......
package cross
import (
"context"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/superevents"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
......@@ -22,7 +23,7 @@ type CrossUnsafeDeps interface {
UpdateCrossUnsafe(chain types.ChainID, crossUnsafe types.BlockSeal) error
}
func CrossUnsafeUpdate(ctx context.Context, logger log.Logger, chainID types.ChainID, d CrossUnsafeDeps) error {
func CrossUnsafeUpdate(logger log.Logger, chainID types.ChainID, d CrossUnsafeDeps) error {
var candidate types.BlockSeal
var execMsgs []*types.ExecutingMessage
......@@ -71,9 +72,38 @@ func CrossUnsafeUpdate(ctx context.Context, logger log.Logger, chainID types.Cha
return nil
}
func NewCrossUnsafeWorker(logger log.Logger, chainID types.ChainID, d CrossUnsafeDeps) *Worker {
type CrossUnsafeWorker struct {
logger log.Logger
chainID types.ChainID
d CrossUnsafeDeps
}
func (c *CrossUnsafeWorker) OnEvent(ev event.Event) bool {
switch x := ev.(type) {
case superevents.UpdateCrossUnsafeRequestEvent:
if x.ChainID != c.chainID {
return false
}
if err := CrossUnsafeUpdate(c.logger, c.chainID, c.d); err != nil {
if errors.Is(err, types.ErrFuture) {
c.logger.Debug("Worker awaits additional blocks", "err", err)
} else {
c.logger.Warn("Failed to process work", "err", err)
}
}
default:
return false
}
return true
}
var _ event.Deriver = (*CrossUnsafeWorker)(nil)
func NewCrossUnsafeWorker(logger log.Logger, chainID types.ChainID, d CrossUnsafeDeps) *CrossUnsafeWorker {
logger = logger.New("chain", chainID)
return NewWorker(logger, func(ctx context.Context) error {
return CrossUnsafeUpdate(ctx, logger, chainID, d)
})
return &CrossUnsafeWorker{
logger: logger,
chainID: chainID,
d: d,
}
}
package cross
import (
"context"
"errors"
"testing"
......@@ -16,7 +15,6 @@ import (
func TestCrossUnsafeUpdate(t *testing.T) {
t.Run("CrossUnsafe returns error", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LevelDebug)
chainID := types.ChainIDFromUInt64(0)
usd := &mockCrossUnsafeDeps{}
......@@ -26,11 +24,10 @@ func TestCrossUnsafeUpdate(t *testing.T) {
usd.deps = mockDependencySet{}
// when an error is returned by CrossUnsafe,
// the error is returned
err := CrossUnsafeUpdate(ctx, logger, chainID, usd)
err := CrossUnsafeUpdate(logger, chainID, usd)
require.ErrorContains(t, err, "some error")
})
t.Run("CrossUnsafe returns ErrFuture", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LevelDebug)
chainID := types.ChainIDFromUInt64(0)
usd := &mockCrossUnsafeDeps{}
......@@ -40,11 +37,10 @@ func TestCrossUnsafeUpdate(t *testing.T) {
usd.deps = mockDependencySet{}
// when a ErrFuture is returned by CrossUnsafe,
// no error is returned
err := CrossUnsafeUpdate(ctx, logger, chainID, usd)
err := CrossUnsafeUpdate(logger, chainID, usd)
require.NoError(t, err)
})
t.Run("OpenBlock returns error", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LevelDebug)
chainID := types.ChainIDFromUInt64(0)
usd := &mockCrossUnsafeDeps{}
......@@ -54,11 +50,10 @@ func TestCrossUnsafeUpdate(t *testing.T) {
usd.deps = mockDependencySet{}
// when an error is returned by OpenBlock,
// the error is returned
err := CrossUnsafeUpdate(ctx, logger, chainID, usd)
err := CrossUnsafeUpdate(logger, chainID, usd)
require.ErrorContains(t, err, "some error")
})
t.Run("opened block parent hash does not match", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LevelDebug)
chainID := types.ChainIDFromUInt64(0)
usd := &mockCrossUnsafeDeps{}
......@@ -73,11 +68,10 @@ func TestCrossUnsafeUpdate(t *testing.T) {
usd.deps = mockDependencySet{}
// when the parent hash of the opened block does not match the cross-unsafe block,
// an ErrConflict is returned
err := CrossUnsafeUpdate(ctx, logger, chainID, usd)
err := CrossUnsafeUpdate(logger, chainID, usd)
require.ErrorIs(t, err, types.ErrConflict)
})
t.Run("CrossSafeHazards returns error", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LevelDebug)
chainID := types.ChainIDFromUInt64(0)
usd := &mockCrossUnsafeDeps{}
......@@ -97,11 +91,10 @@ func TestCrossUnsafeUpdate(t *testing.T) {
}
// when CrossSafeHazards returns an error,
// the error is returned
err := CrossUnsafeUpdate(ctx, logger, chainID, usd)
err := CrossUnsafeUpdate(logger, chainID, usd)
require.ErrorContains(t, err, "some error")
})
t.Run("HazardUnsafeFrontierChecks returns error", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LevelDebug)
chainID := types.ChainIDFromUInt64(0)
usd := &mockCrossUnsafeDeps{}
......@@ -128,11 +121,10 @@ func TestCrossUnsafeUpdate(t *testing.T) {
}
// when HazardUnsafeFrontierChecks returns an error,
// the error is returned
err := CrossUnsafeUpdate(ctx, logger, chainID, usd)
err := CrossUnsafeUpdate(logger, chainID, usd)
require.ErrorContains(t, err, "some error")
})
t.Run("HazardCycleChecks returns error", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LevelDebug)
chainID := types.ChainIDFromUInt64(0)
usd := &mockCrossUnsafeDeps{}
......@@ -152,12 +144,11 @@ func TestCrossUnsafeUpdate(t *testing.T) {
usd.deps = mockDependencySet{}
// HazardCycleChecks returns an error with appropriate wrapping
err := CrossUnsafeUpdate(ctx, logger, chainID, usd)
err := CrossUnsafeUpdate(logger, chainID, usd)
require.ErrorContains(t, err, "cycle detected")
require.ErrorContains(t, err, "failed to verify block")
})
t.Run("successful update", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LevelDebug)
chainID := types.ChainIDFromUInt64(0)
usd := &mockCrossUnsafeDeps{}
......@@ -181,7 +172,7 @@ func TestCrossUnsafeUpdate(t *testing.T) {
}
// when there are no errors, the cross-unsafe block is updated
// the updated block is the block opened in OpenBlock
err := CrossUnsafeUpdate(ctx, logger, chainID, usd)
err := CrossUnsafeUpdate(logger, chainID, usd)
require.NoError(t, err)
require.Equal(t, chainID, updatingChainID)
require.Equal(t, types.BlockSealFromRef(bl), updatingBlock)
......
package cross
import (
"context"
"errors"
"sync"
"time"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/log"
)
// Worker iterates work
type Worker struct {
log log.Logger
// workFn is the function to call to process the scope
workFn workFn
// channel with capacity of 1, full if there is work to do
poke chan struct{}
pollDuration time.Duration
// lifetime management of the chain processor
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
// workFn is a function used by the worker
// it is opaque to the worker, and is set by the constructor
type workFn func(ctx context.Context) error
// NewWorker creates a new worker to process updates
func NewWorker(log log.Logger, workFn workFn) *Worker {
ctx, cancel := context.WithCancel(context.Background())
out := &Worker{
log: log,
poke: make(chan struct{}, 1),
// The data may have changed, and we may have missed a poke, so re-attempt regularly.
pollDuration: 250 * time.Millisecond,
ctx: ctx,
cancel: cancel,
}
out.workFn = workFn
return out
}
func (s *Worker) StartBackground() {
s.wg.Add(1)
go s.worker()
}
func (s *Worker) ProcessWork() error {
return s.workFn(s.ctx)
}
func (s *Worker) worker() {
defer s.wg.Done()
delay := time.NewTicker(s.pollDuration)
for {
if s.ctx.Err() != nil { // check if we are closing down
return
}
// do the work
err := s.workFn(s.ctx)
if err != nil {
if errors.Is(err, s.ctx.Err()) {
return
}
if errors.Is(err, types.ErrFuture) {
s.log.Debug("Worker awaits additional blocks", "err", err)
} else {
s.log.Warn("Failed to process work", "err", err)
}
}
// await next time we process, or detect shutdown
select {
case <-s.ctx.Done():
delay.Stop()
return
case <-s.poke:
s.log.Debug("Continuing cross-safe verification after hint of new data")
continue
case <-delay.C:
s.log.Debug("Checking for cross-safe updates")
continue
}
}
}
func (s *Worker) OnNewData() {
// signal that we have something to process
select {
case s.poke <- struct{}{}:
default:
// already requested an update
}
}
func (s *Worker) Close() {
s.cancel()
s.wg.Wait()
}
package cross
import (
"context"
"sync/atomic"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
func TestWorker(t *testing.T) {
logger := testlog.Logger(t, log.LevelDebug)
t.Run("do work", func(t *testing.T) {
var count int32
w := NewWorker(logger, func(ctx context.Context) error {
atomic.AddInt32(&count, 1)
return nil
})
t.Cleanup(w.Close)
// when ProcessWork is called, the workFn is called once
require.NoError(t, w.ProcessWork())
require.EqualValues(t, 1, atomic.LoadInt32(&count))
})
t.Run("background worker", func(t *testing.T) {
var count int32
w := NewWorker(logger, func(ctx context.Context) error {
atomic.AddInt32(&count, 1)
return nil
})
t.Cleanup(w.Close)
// set a long poll duration so the worker does not auto-run
w.pollDuration = 100 * time.Second
// when StartBackground is called, the worker runs in the background
// the count should increment once
w.StartBackground()
require.Eventually(t, func() bool {
return atomic.LoadInt32(&count) == 1
}, 2*time.Second, 100*time.Millisecond)
})
t.Run("background worker OnNewData", func(t *testing.T) {
var count int32
w := NewWorker(logger, func(ctx context.Context) error {
atomic.AddInt32(&count, 1)
return nil
})
t.Cleanup(w.Close)
// set a long poll duration so the worker does not auto-run
w.pollDuration = 100 * time.Second
// when StartBackground is called, the worker runs in the background
// the count should increment once
w.StartBackground()
require.Eventually(t, func() bool {
return atomic.LoadInt32(&count) == 1
}, 2*time.Second, 100*time.Millisecond)
// when OnNewData is called, the worker runs again
w.OnNewData()
require.Eventually(t, func() bool {
return atomic.LoadInt32(&count) == 2
}, 2*time.Second, 100*time.Millisecond)
// and due to the long poll duration, the worker does not run again
require.Never(t, func() bool {
return atomic.LoadInt32(&count) > 2
}, time.Second, 100*time.Millisecond)
})
t.Run("background fast poll", func(t *testing.T) {
var count int32
w := NewWorker(logger, func(ctx context.Context) error {
atomic.AddInt32(&count, 1)
return nil
})
t.Cleanup(w.Close)
// set a long poll duration so the worker does not auto-run
w.pollDuration = 100 * time.Millisecond
// when StartBackground is called, the worker runs in the background
// the count should increment rapidly and reach at least 10 in 1 second
w.StartBackground()
require.Eventually(t, func() bool {
return atomic.LoadInt32(&count) >= 10
}, 2*time.Second, 100*time.Millisecond)
})
t.Run("close", func(t *testing.T) {
var count int32
w := NewWorker(logger, func(ctx context.Context) error {
atomic.AddInt32(&count, 1)
return nil
})
t.Cleanup(w.Close) // close on cleanup in case of early error
// set a long poll duration so the worker does not auto-run
w.pollDuration = 100 * time.Millisecond
// when StartBackground is called, the worker runs in the background
// the count should increment rapidly and reach at least 10 in 1 second
w.StartBackground()
require.Eventually(t, func() bool {
return atomic.LoadInt32(&count) >= 10
}, 10*time.Second, time.Second)
// once the worker is closed, it stops running
// and the count does not increment
w.Close()
stopCount := atomic.LoadInt32(&count)
require.Never(t, func() bool {
return atomic.LoadInt32(&count) != stopCount
}, time.Second, 100*time.Millisecond)
})
}
package db
import (
"errors"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
// maybeInitSafeDB initializes the chain database if it is not already initialized
// it checks if the Local Safe database is empty, and loads it with the Anchor Point if so
func (db *ChainsDB) maybeInitSafeDB(id types.ChainID, anchor types.DerivedBlockRefPair) {
_, err := db.LocalSafe(id)
if errors.Is(err, types.ErrFuture) {
db.logger.Debug("initializing chain database", "chain", id)
if err := db.UpdateCrossSafe(id, anchor.DerivedFrom, anchor.Derived); err != nil {
db.logger.Warn("failed to initialize cross safe", "chain", id, "error", err)
}
db.UpdateLocalSafe(id, anchor.DerivedFrom, anchor.Derived)
} else if err != nil {
db.logger.Warn("failed to check if chain database is initialized", "chain", id, "error", err)
} else {
db.logger.Debug("chain database already initialized", "chain", id)
}
}
func (db *ChainsDB) maybeInitEventsDB(id types.ChainID, anchor types.DerivedBlockRefPair) {
_, _, _, err := db.OpenBlock(id, 0)
if errors.Is(err, types.ErrFuture) {
db.logger.Debug("initializing events database", "chain", id)
err := db.SealBlock(id, anchor.Derived)
if err != nil {
db.logger.Warn("failed to seal initial block", "chain", id, "error", err)
}
db.logger.Debug("initialized events database", "chain", id)
} else if err != nil {
db.logger.Warn("failed to check if logDB is initialized", "chain", id, "error", err)
} else {
db.logger.Debug("events database already initialized", "chain", id)
}
}
......@@ -8,13 +8,14 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/locks"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/fromda"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/superevents"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
gethevent "github.com/ethereum/go-ethereum/event"
)
type LogStorage interface {
......@@ -87,12 +88,6 @@ type ChainsDB struct {
// cross-safe: index of L2 blocks we know to only have cross-L2 valid dependencies
crossDBs locks.RWMap[types.ChainID, CrossDerivedFromStorage]
localUnsafeFeeds locks.RWMap[types.ChainID, *gethevent.FeedOf[types.BlockSeal]]
crossUnsafeFeeds locks.RWMap[types.ChainID, *gethevent.FeedOf[types.BlockSeal]]
localSafeFeeds locks.RWMap[types.ChainID, *gethevent.FeedOf[types.DerivedBlockSealPair]]
crossSafeFeeds locks.RWMap[types.ChainID, *gethevent.FeedOf[types.DerivedBlockSealPair]]
l2FinalityFeeds locks.RWMap[types.ChainID, *gethevent.FeedOf[types.BlockSeal]]
// finalized: the L1 finality progress. This can be translated into what may be considered as finalized in L2.
// It is initially zeroed, and the L2 finality query will return
// an error until it has this L1 finality to work with.
......@@ -103,8 +98,13 @@ type ChainsDB struct {
depSet depset.DependencySet
logger log.Logger
// emitter used to signal when the DB changes, for other modules to react to
emitter event.Emitter
}
var _ event.AttachEmitter = (*ChainsDB)(nil)
func NewChainsDB(l log.Logger, depSet depset.DependencySet) *ChainsDB {
return &ChainsDB{
logger: l,
......@@ -112,6 +112,25 @@ func NewChainsDB(l log.Logger, depSet depset.DependencySet) *ChainsDB {
}
}
func (db *ChainsDB) AttachEmitter(em event.Emitter) {
db.emitter = em
}
func (db *ChainsDB) OnEvent(ev event.Event) bool {
switch x := ev.(type) {
case superevents.AnchorEvent:
db.maybeInitEventsDB(x.ChainID, x.Anchor)
db.maybeInitSafeDB(x.ChainID, x.Anchor)
case superevents.LocalDerivedEvent:
db.UpdateLocalSafe(x.ChainID, x.Derived.DerivedFrom, x.Derived.Derived)
case superevents.FinalizedL1RequestEvent:
db.onFinalizedL1(x.FinalizedL1)
default:
return false
}
return true
}
func (db *ChainsDB) AddLogDB(chainID types.ChainID, logDB LogStorage) {
if db.logDBs.Has(chainID) {
db.logger.Warn("overwriting existing log DB for chain", "chain", chainID)
......@@ -143,14 +162,6 @@ func (db *ChainsDB) AddCrossUnsafeTracker(chainID types.ChainID) {
db.crossUnsafe.Set(chainID, &locks.RWValue[types.BlockSeal]{})
}
func (db *ChainsDB) AddSubscriptions(chainID types.ChainID) {
locks.InitPtrMaybe(&db.l2FinalityFeeds, chainID)
locks.InitPtrMaybe(&db.crossSafeFeeds, chainID)
locks.InitPtrMaybe(&db.localSafeFeeds, chainID)
locks.InitPtrMaybe(&db.crossUnsafeFeeds, chainID)
locks.InitPtrMaybe(&db.localUnsafeFeeds, chainID)
}
// ResumeFromLastSealedBlock prepares the chains db to resume recording events after a restart.
// It rewinds the database to the last block that is guaranteed to have been fully recorded to the database,
// to ensure it can resume recording from the first log of the next block.
......
package db
import (
"fmt"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
gethevent "github.com/ethereum/go-ethereum/event"
)
func (db *ChainsDB) SubscribeLocalUnsafe(chainID types.ChainID, c chan<- types.BlockSeal) (gethevent.Subscription, error) {
sub, ok := db.localUnsafeFeeds.Get(chainID)
if !ok {
return nil, fmt.Errorf("cannot subscribe to local-unsafe: %w: %s", types.ErrUnknownChain, chainID)
}
return sub.Subscribe(c), nil
}
func (db *ChainsDB) SubscribeCrossUnsafe(chainID types.ChainID, c chan<- types.BlockSeal) (gethevent.Subscription, error) {
sub, ok := db.localUnsafeFeeds.Get(chainID)
if !ok {
return nil, fmt.Errorf("cannot subscribe to cross-unsafe: %w: %s", types.ErrUnknownChain, chainID)
}
return sub.Subscribe(c), nil
}
func (db *ChainsDB) SubscribeLocalSafe(chainID types.ChainID, c chan<- types.DerivedBlockSealPair) (gethevent.Subscription, error) {
sub, ok := db.localSafeFeeds.Get(chainID)
if !ok {
return nil, fmt.Errorf("cannot subscribe to cross-safe: %w: %s", types.ErrUnknownChain, chainID)
}
return sub.Subscribe(c), nil
}
func (db *ChainsDB) SubscribeCrossSafe(chainID types.ChainID, c chan<- types.DerivedBlockSealPair) (gethevent.Subscription, error) {
sub, ok := db.crossSafeFeeds.Get(chainID)
if !ok {
return nil, fmt.Errorf("cannot subscribe to cross-safe: %w: %s", types.ErrUnknownChain, chainID)
}
return sub.Subscribe(c), nil
}
func (db *ChainsDB) SubscribeFinalized(chainID types.ChainID, c chan<- types.BlockSeal) (gethevent.Subscription, error) {
sub, ok := db.l2FinalityFeeds.Get(chainID)
if !ok {
return nil, fmt.Errorf("cannot subscribe to finalized: %w: %s", types.ErrUnknownChain, chainID)
}
return sub.Subscribe(c), nil
}
package db
import (
"errors"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/superevents"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
......@@ -33,10 +33,10 @@ func (db *ChainsDB) SealBlock(chain types.ChainID, block eth.BlockRef) error {
return fmt.Errorf("failed to seal block %v: %w", block, err)
}
db.logger.Info("Updated local unsafe", "chain", chain, "block", block)
feed, ok := db.localUnsafeFeeds.Get(chain)
if ok {
feed.Send(types.BlockSealFromRef(block))
}
db.emitter.Emit(superevents.LocalUnsafeUpdateEvent{
ChainID: chain,
NewLocalUnsafe: block,
})
return nil
}
......@@ -48,23 +48,31 @@ func (db *ChainsDB) Rewind(chain types.ChainID, headBlockNum uint64) error {
return logDB.Rewind(headBlockNum)
}
func (db *ChainsDB) UpdateLocalSafe(chain types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) error {
func (db *ChainsDB) UpdateLocalSafe(chain types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) {
logger := db.logger.New("chain", chain, "derivedFrom", derivedFrom, "lastDerived", lastDerived)
localDB, ok := db.localDBs.Get(chain)
if !ok {
return fmt.Errorf("cannot UpdateLocalSafe: %w: %v", types.ErrUnknownChain, chain)
logger.Error("Cannot update local-safe DB, unknown chain")
return
}
db.logger.Debug("Updating local safe", "chain", chain, "derivedFrom", derivedFrom, "lastDerived", lastDerived)
logger.Debug("Updating local safe DB")
if err := localDB.AddDerived(derivedFrom, lastDerived); err != nil {
return err
db.logger.Warn("Failed to update local safe")
db.emitter.Emit(superevents.LocalSafeOutOfSyncEvent{
ChainID: chain,
L1Ref: derivedFrom,
Err: err,
})
return
}
feed, ok := db.localSafeFeeds.Get(chain)
if ok {
feed.Send(types.DerivedBlockSealPair{
db.logger.Info("Updated local safe DB")
db.emitter.Emit(superevents.LocalSafeUpdateEvent{
ChainID: chain,
NewLocalSafe: types.DerivedBlockSealPair{
DerivedFrom: types.BlockSealFromRef(derivedFrom),
Derived: types.BlockSealFromRef(lastDerived),
},
})
}
return nil
}
func (db *ChainsDB) UpdateCrossUnsafe(chain types.ChainID, crossUnsafe types.BlockSeal) error {
......@@ -73,11 +81,11 @@ func (db *ChainsDB) UpdateCrossUnsafe(chain types.ChainID, crossUnsafe types.Blo
return fmt.Errorf("cannot UpdateCrossUnsafe: %w: %s", types.ErrUnknownChain, chain)
}
v.Set(crossUnsafe)
feed, ok := db.crossUnsafeFeeds.Get(chain)
if ok {
feed.Send(crossUnsafe)
}
db.logger.Info("Updated cross-unsafe", "chain", chain, "crossUnsafe", crossUnsafe)
db.emitter.Emit(superevents.CrossUnsafeUpdateEvent{
ChainID: chain,
NewCrossUnsafe: crossUnsafe,
})
return nil
}
......@@ -90,89 +98,40 @@ func (db *ChainsDB) UpdateCrossSafe(chain types.ChainID, l1View eth.BlockRef, la
return err
}
db.logger.Info("Updated cross-safe", "chain", chain, "l1View", l1View, "lastCrossDerived", lastCrossDerived)
// notify subscribers
sub, ok := db.crossSafeFeeds.Get(chain)
if ok {
sub.Send(types.DerivedBlockSealPair{
db.emitter.Emit(superevents.CrossSafeUpdateEvent{
ChainID: chain,
NewCrossSafe: types.DerivedBlockSealPair{
DerivedFrom: types.BlockSealFromRef(l1View),
Derived: types.BlockSealFromRef(lastCrossDerived),
},
})
}
return nil
}
func (db *ChainsDB) UpdateFinalizedL1(finalized eth.BlockRef) error {
func (db *ChainsDB) onFinalizedL1(finalized eth.BlockRef) {
// Lock, so we avoid race-conditions in-between getting (for comparison) and setting.
// Unlock is managed explicitly, in this function so we can call NotifyL2Finalized after releasing the lock.
db.finalizedL1.Lock()
if v := db.finalizedL1.Value; v.Number > finalized.Number {
if v := db.finalizedL1.Value; v != (eth.BlockRef{}) && v.Number > finalized.Number {
db.finalizedL1.Unlock()
return fmt.Errorf("cannot rewind finalized L1 head from %s to %s", v, finalized)
db.logger.Warn("Cannot rewind finalized L1 block", "current", v, "signal", finalized)
return
}
db.finalizedL1.Value = finalized
db.logger.Info("Updated finalized L1", "finalizedL1", finalized)
db.finalizedL1.Unlock()
// whenver the L1 Finalized changes, the L2 Finalized may change, notify subscribers
db.emitter.Emit(superevents.FinalizedL1UpdateEvent{
FinalizedL1: finalized,
})
// whenever the L1 Finalized changes, the L2 Finalized may change, notify subscribers
for _, chain := range db.depSet.Chains() {
db.NotifyL2Finalized(chain)
}
return nil
}
// NotifyL2Finalized notifies all L2 finality subscribers of the latest L2 finalized block for the given chain.
func (db *ChainsDB) NotifyL2Finalized(chain types.ChainID) {
f, err := db.Finalized(chain)
if err != nil {
db.logger.Error("Failed to get finalized L1 block", "chain", chain, "err", err)
return
}
sub, ok := db.l2FinalityFeeds.Get(chain)
if ok {
sub.Send(f)
}
}
// RecordNewL1 records a new L1 block in the database for a given chain.
// It uses the latest derived L2 block as the derived block for the new L1 block.
// It also triggers L2 Finality Notifications, as a new L1 may change L2 finality.
// NOTE: callers to this function are responsible for ensuring that advancing the L1 block is correct
// (ie that no further L2 blocks need to be recorded) because if the L1 block is recorded with a gap in derived blocks,
// the database is considered corrupted and the supervisor will not be able to proceed without pruning the database.
// The database cannot protect against this because it is does not know how many L2 blocks to expect for a given L1 block.
func (db *ChainsDB) RecordNewL1(chain types.ChainID, ref eth.BlockRef) error {
// get local derivation database
ldb, ok := db.localDBs.Get(chain)
if !ok {
return fmt.Errorf("cannot RecordNewL1 to chain %s: %w", chain, types.ErrUnknownChain)
}
// get the latest derived and derivedFrom blocks
derivedFrom, derived, err := ldb.Latest()
fin, err := db.Finalized(chain)
if err != nil {
return fmt.Errorf("failed to get latest derivedFrom for chain %s: %w", chain, err)
}
// make a ref from the latest derived block
derivedParent, err := ldb.PreviousDerived(derived.ID())
if errors.Is(err, types.ErrFuture) {
db.logger.Warn("Empty DB, Recording first L1 block", "chain", chain, "err", err)
} else if err != nil {
db.logger.Warn("Failed to get latest derivedfrom to insert new L1 block", "chain", chain, "err", err)
return err
db.logger.Warn("Unable to determine finalized L2 block", "chain", chain, "l1Finalized", finalized)
continue
}
derivedRef := derived.MustWithParent(derivedParent.ID())
// don't push the new L1 block if it's not newer than the latest derived block
if derivedFrom.Number >= ref.Number {
db.logger.Warn("L1 block has already been processed for this height", "chain", chain, "block", ref, "latest", derivedFrom)
return nil
}
// the database is extended with the new L1 and the existing L2
if err = db.UpdateLocalSafe(chain, ref, derivedRef); err != nil {
db.logger.Error("Failed to update local safe", "chain", chain, "block", ref, "derived", derived, "err", err)
return err
db.emitter.Emit(superevents.FinalizedL2UpdateEvent{ChainID: chain, FinalizedL2: fin})
}
// now tht the db has the new L1, we can attempt to to notify the L2 finality subscribers
db.NotifyL2Finalized(chain)
return nil
}
package backend
type Executor interface {
Start()
}
......@@ -3,14 +3,20 @@ package l1access
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/superevents"
)
const reqTimeout = time.Second * 10
type L1Source interface {
L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error)
L1BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L1BlockRef, error)
......@@ -25,10 +31,12 @@ type L1Source interface {
// When requests for blocks are more recent than the tip minus the confirmation depth, a NotFound error is returned.
type L1Accessor struct {
log log.Logger
client L1Source // may be nil if no source is attached
clientMu sync.RWMutex
finalityHandler eth.HeadSignalFn
emitter event.Emitter
finalitySub ethereum.Subscription
// tipHeight is the height of the L1 chain tip
......@@ -36,22 +44,36 @@ type L1Accessor struct {
tipHeight uint64
latestSub ethereum.Subscription
confDepth uint64
// to interrupt requests, so the system can shut down quickly
sysCtx context.Context
}
func NewL1Accessor(log log.Logger, client L1Source, finalityHandler eth.HeadSignalFn) *L1Accessor {
var _ event.AttachEmitter = (*L1Accessor)(nil)
func NewL1Accessor(sysCtx context.Context, log log.Logger, client L1Source) *L1Accessor {
return &L1Accessor{
log: log.New("service", "l1-processor"),
client: client,
finalityHandler: finalityHandler,
// placeholder confirmation depth
confDepth: 2,
sysCtx: sysCtx,
}
}
// AttachClient attaches a new client to the processor
// if an existing client is attached, the old subscriptions are unsubscribed
// and new subscriptions are created
func (p *L1Accessor) AttachClient(client L1Source) {
func (p *L1Accessor) AttachEmitter(em event.Emitter) {
p.emitter = em
}
func (p *L1Accessor) OnEvent(ev event.Event) bool {
return false
}
// AttachClient attaches a new client to the processor.
// If an existing client was attached, the old subscriptions are unsubscribed.
// New subscriptions are created if subscribe is true.
// If subscribe is false, L1 status has to be fetched manually with PullFinalized and PullLatest.
func (p *L1Accessor) AttachClient(client L1Source, subscribe bool) {
p.clientMu.Lock()
defer p.clientMu.Unlock()
......@@ -67,11 +89,8 @@ func (p *L1Accessor) AttachClient(client L1Source) {
p.client = client
// resubscribe to the finality handler
p.SubscribeFinalityHandler()
// if we have a handler function, resubscribe to the finality handler
if p.finalityHandler != nil {
if client != nil && subscribe {
p.SubscribeLatestHandler()
p.SubscribeFinalityHandler()
}
}
......@@ -80,23 +99,66 @@ func (p *L1Accessor) SubscribeFinalityHandler() {
p.finalitySub = eth.PollBlockChanges(
p.log,
p.client,
p.finalityHandler,
p.onFinalized,
eth.Finalized,
3*time.Second,
10*time.Second)
reqTimeout)
}
func (p *L1Accessor) SubscribeLatestHandler() {
p.latestSub = eth.PollBlockChanges(
p.log,
p.client,
p.SetTipHeight,
p.onLatest,
eth.Unsafe,
3*time.Second,
10*time.Second)
reqTimeout)
}
func (p *L1Accessor) SetConfDepth(depth uint64) {
p.confDepth = depth
}
func (p *L1Accessor) PullFinalized() error {
p.clientMu.RLock()
defer p.clientMu.RUnlock()
if p.client == nil {
return errors.New("no L1 source configured")
}
ctx, cancel := context.WithTimeout(p.sysCtx, reqTimeout)
defer cancel()
ref, err := p.client.L1BlockRefByLabel(ctx, eth.Finalized)
if err != nil {
return fmt.Errorf("failed to pull finalized block ref: %w", err)
}
p.onFinalized(p.sysCtx, ref)
return nil
}
func (p *L1Accessor) PullLatest() error {
p.clientMu.RLock()
defer p.clientMu.RUnlock()
if p.client == nil {
return errors.New("no L1 source configured")
}
ctx, cancel := context.WithTimeout(p.sysCtx, reqTimeout)
defer cancel()
ref, err := p.client.L1BlockRefByLabel(ctx, eth.Unsafe)
if err != nil {
return fmt.Errorf("failed to pull latest block ref: %w", err)
}
p.onLatest(p.sysCtx, ref)
return nil
}
func (p *L1Accessor) onFinalized(ctx context.Context, ref eth.L1BlockRef) {
p.emitter.Emit(superevents.FinalizedL1RequestEvent{FinalizedL1: ref})
}
func (p *L1Accessor) SetTipHeight(ctx context.Context, ref eth.L1BlockRef) {
func (p *L1Accessor) onLatest(ctx context.Context, ref eth.L1BlockRef) {
p.tipHeight = ref.Number
}
......
......@@ -40,7 +40,7 @@ func TestL1Accessor(t *testing.T) {
Number: number,
}, nil
}
accessor := NewL1Accessor(log, source, nil)
accessor := NewL1Accessor(context.Background(), log, source)
accessor.tipHeight = 10
// Test L1BlockRefByNumber
......@@ -54,7 +54,7 @@ func TestL1Accessor(t *testing.T) {
// attach a new source
source2 := &mockL1Source{}
accessor.AttachClient(source2)
accessor.AttachClient(source2, false)
require.Equal(t, source2, accessor.client)
}
......@@ -6,7 +6,6 @@ import (
"fmt"
"slices"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum"
......@@ -14,7 +13,9 @@ import (
gethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/superevents"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
......@@ -48,57 +49,42 @@ type ChainProcessor struct {
chain types.ChainID
systemContext context.Context
processor LogProcessor
rewinder DatabaseRewinder
// the last known head. May be 0 if not known.
lastHead atomic.Uint64
// channel with capacity of 1, full if there is work to do
newHead chan struct{}
// to signal to the other services that new indexed data is available
onIndexed func()
// lifetime management of the chain processor
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
emitter event.Emitter
maxFetcherThreads int
}
func NewChainProcessor(log log.Logger, chain types.ChainID, processor LogProcessor, rewinder DatabaseRewinder, onIndexed func()) *ChainProcessor {
ctx, cancel := context.WithCancel(context.Background())
var _ event.AttachEmitter = (*ChainProcessor)(nil)
var _ event.Deriver = (*ChainProcessor)(nil)
func NewChainProcessor(systemContext context.Context, log log.Logger, chain types.ChainID, processor LogProcessor, rewinder DatabaseRewinder) *ChainProcessor {
out := &ChainProcessor{
systemContext: systemContext,
log: log.New("chain", chain),
client: nil,
chain: chain,
processor: processor,
rewinder: rewinder,
newHead: make(chan struct{}, 1),
onIndexed: onIndexed,
ctx: ctx,
cancel: cancel,
maxFetcherThreads: 10,
}
return out
}
func (s *ChainProcessor) AttachEmitter(em event.Emitter) {
s.emitter = em
}
func (s *ChainProcessor) SetSource(cl Source) {
s.clientLock.Lock()
defer s.clientLock.Unlock()
s.client = cl
}
func (s *ChainProcessor) StartBackground() {
s.wg.Add(1)
go s.worker()
}
func (s *ChainProcessor) ProcessToHead() {
s.work()
}
func (s *ChainProcessor) nextNum() uint64 {
headNum, ok := s.rewinder.LatestBlockNum(s.chain)
if !ok {
......@@ -107,36 +93,21 @@ func (s *ChainProcessor) nextNum() uint64 {
return headNum + 1
}
// worker is the main loop of the chain processor's worker
// it manages work by request or on a timer, and watches for shutdown
func (s *ChainProcessor) worker() {
defer s.wg.Done()
delay := time.NewTicker(time.Second * 5)
for {
// await next time we process, or detect shutdown
select {
case <-s.ctx.Done():
delay.Stop()
return
case <-s.newHead:
s.log.Debug("Responding to new head signal")
s.work()
case <-delay.C:
s.log.Debug("Checking for updates")
s.work()
func (s *ChainProcessor) OnEvent(ev event.Event) bool {
switch x := ev.(type) {
case superevents.ChainProcessEvent:
if x.ChainID != s.chain {
return false
}
s.onRequest(x.Target)
default:
return false
}
return true
}
// work processes the next block in the chain repeatedly until it reaches the head
func (s *ChainProcessor) work() {
for {
if s.ctx.Err() != nil { // check if we are closing down
return
}
_, err := s.rangeUpdate()
target := s.nextNum()
func (s *ChainProcessor) onRequest(target uint64) {
_, err := s.rangeUpdate(target)
if err != nil {
if errors.Is(err, ethereum.NotFound) {
s.log.Debug("Event-indexer cannot find next block yet", "target", target, "err", err)
......@@ -145,17 +116,18 @@ func (s *ChainProcessor) work() {
} else {
s.log.Error("Failed to process new block", "err", err)
}
} else if x := s.lastHead.Load(); target+1 <= x {
s.log.Debug("Continuing with next block", "newTarget", target+1, "lastHead", x)
continue // instantly continue processing, no need to idle
} else if x := s.nextNum(); x <= target {
s.log.Debug("Continuing with next block", "target", target, "next", x)
s.emitter.Emit(superevents.ChainProcessEvent{
ChainID: s.chain,
Target: target,
}) // instantly continue processing, no need to idle
} else {
s.log.Debug("Idling block-processing, reached latest block", "head", target)
}
return
}
}
func (s *ChainProcessor) rangeUpdate() (int, error) {
func (s *ChainProcessor) rangeUpdate(target uint64) (int, error) {
s.clientLock.Lock()
defer s.clientLock.Unlock()
if s.client == nil {
......@@ -165,7 +137,7 @@ func (s *ChainProcessor) rangeUpdate() (int, error) {
// define the range of blocks to fetch
// [next, last] inclusive with a max of s.fetcherThreads blocks
next := s.nextNum()
last := s.lastHead.Load()
last := target
nums := make([]uint64, 0, s.maxFetcherThreads)
for i := next; i <= last; i++ {
......@@ -201,7 +173,7 @@ func (s *ChainProcessor) rangeUpdate() (int, error) {
defer func() { parallelResults <- result }()
// fetch the block ref
ctx, cancel := context.WithTimeout(s.ctx, time.Second*10)
ctx, cancel := context.WithTimeout(s.systemContext, time.Second*10)
nextL1, err := s.client.BlockRefByNumber(ctx, num)
cancel()
if err != nil {
......@@ -217,7 +189,7 @@ func (s *ChainProcessor) rangeUpdate() (int, error) {
result.blockRef = &next
// fetch receipts
ctx, cancel = context.WithTimeout(s.ctx, time.Second*10)
ctx, cancel = context.WithTimeout(s.systemContext, time.Second*10)
receipts, err := s.client.FetchReceipts(ctx, next.Hash)
cancel()
if err != nil {
......@@ -258,7 +230,7 @@ func (s *ChainProcessor) rangeUpdate() (int, error) {
return i, fmt.Errorf("failed to fetch block %d: %w", results[i].num, results[i].err)
}
// process the receipts
err := s.process(s.ctx, *results[i].blockRef, results[i].receipts)
err := s.process(s.systemContext, *results[i].blockRef, results[i].receipts)
if err != nil {
return i, fmt.Errorf("failed to process block %d: %w", results[i].num, err)
}
......@@ -283,24 +255,5 @@ func (s *ChainProcessor) process(ctx context.Context, next eth.BlockRef, receipt
return err
}
s.log.Info("Indexed block events", "block", next, "txs", len(receipts))
s.onIndexed()
return nil
}
func (s *ChainProcessor) OnNewHead(head eth.BlockRef) error {
// update the latest target
s.lastHead.Store(head.Number)
// signal that we have something to process
select {
case s.newHead <- struct{}{}:
default:
// already requested an update
}
return nil
}
func (s *ChainProcessor) Close() {
s.cancel()
s.wg.Wait()
}
package processors
import (
"context"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/log"
)
type chainsDB interface {
FinalizedL1() eth.BlockRef
UpdateFinalizedL1(finalized eth.BlockRef) error
}
// MaybeUpdateFinalizedL1Fn returns a HeadSignalFn that updates the database with the new finalized block if it is newer than the current one.
func MaybeUpdateFinalizedL1Fn(ctx context.Context, logger log.Logger, db chainsDB) eth.HeadSignalFn {
return func(ctx context.Context, ref eth.L1BlockRef) {
// do something with the new block
logger.Debug("Received new Finalized L1 block", "block", ref)
currentFinalized := db.FinalizedL1()
if currentFinalized.Number > ref.Number {
logger.Warn("Finalized block in database is newer than subscribed finalized block", "current", currentFinalized, "new", ref)
return
}
if ref.Number > currentFinalized.Number || currentFinalized == (eth.BlockRef{}) {
// update the database with the new finalized block
if err := db.UpdateFinalizedL1(ref); err != nil {
logger.Warn("Failed to update finalized L1", "err", err)
return
}
logger.Debug("Updated finalized L1 block", "block", ref)
}
}
}
......@@ -18,11 +18,6 @@ type LogStorage interface {
AddLog(chain types.ChainID, logHash common.Hash, parentBlock eth.BlockID, logIdx uint32, execMsg *types.ExecutingMessage) error
}
type ChainsDBClientForLogProcessor interface {
SealBlock(chain types.ChainID, block eth.BlockRef) error
AddLog(chain types.ChainID, logHash common.Hash, parentBlock eth.BlockID, logIdx uint32, execMsg *types.ExecutingMessage) error
}
type logProcessor struct {
chain types.ChainID
logStore LogStorage
......
package superevents
import (
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
type ChainProcessEvent struct {
ChainID types.ChainID
Target uint64
}
func (ev ChainProcessEvent) String() string {
return "chain-process"
}
type UpdateCrossUnsafeRequestEvent struct {
ChainID types.ChainID
}
func (ev UpdateCrossUnsafeRequestEvent) String() string {
return "update-cross-unsafe-request"
}
type UpdateCrossSafeRequestEvent struct {
ChainID types.ChainID
}
func (ev UpdateCrossSafeRequestEvent) String() string {
return "update-cross-safe-request"
}
type LocalUnsafeUpdateEvent struct {
ChainID types.ChainID
NewLocalUnsafe eth.BlockRef
}
func (ev LocalUnsafeUpdateEvent) String() string {
return "local-unsafe-update"
}
type LocalSafeUpdateEvent struct {
ChainID types.ChainID
NewLocalSafe types.DerivedBlockSealPair
}
func (ev LocalSafeUpdateEvent) String() string {
return "local-safe-update"
}
type CrossUnsafeUpdateEvent struct {
ChainID types.ChainID
NewCrossUnsafe types.BlockSeal
}
func (ev CrossUnsafeUpdateEvent) String() string {
return "cross-unsafe-update"
}
type CrossSafeUpdateEvent struct {
ChainID types.ChainID
NewCrossSafe types.DerivedBlockSealPair
}
func (ev CrossSafeUpdateEvent) String() string {
return "cross-safe-update"
}
type FinalizedL1RequestEvent struct {
FinalizedL1 eth.BlockRef
}
func (ev FinalizedL1RequestEvent) String() string {
return "finalized-l1-request"
}
type FinalizedL1UpdateEvent struct {
FinalizedL1 eth.BlockRef
}
func (ev FinalizedL1UpdateEvent) String() string {
return "finalized-l1-update"
}
type FinalizedL2UpdateEvent struct {
ChainID types.ChainID
FinalizedL2 types.BlockSeal
}
func (ev FinalizedL2UpdateEvent) String() string {
return "finalized-l2-update"
}
type LocalSafeOutOfSyncEvent struct {
ChainID types.ChainID
L1Ref eth.BlockRef
Err error
}
func (ev LocalSafeOutOfSyncEvent) String() string {
return "local-safe-out-of-sync"
}
type LocalUnsafeReceivedEvent struct {
ChainID types.ChainID
NewLocalUnsafe eth.BlockRef
}
func (ev LocalUnsafeReceivedEvent) String() string {
return "local-unsafe-received"
}
type LocalDerivedEvent struct {
ChainID types.ChainID
Derived types.DerivedBlockRefPair
}
func (ev LocalDerivedEvent) String() string {
return "local-derived"
}
type AnchorEvent struct {
ChainID types.ChainID
Anchor types.DerivedBlockRefPair
}
func (ev AnchorEvent) String() string {
return "anchor"
}
......@@ -2,13 +2,15 @@ package syncnode
import (
"context"
"errors"
"fmt"
"sync/atomic"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/locks"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/superevents"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
......@@ -18,24 +20,38 @@ import (
type SyncNodesController struct {
logger log.Logger
id atomic.Uint64
controllers locks.RWMap[types.ChainID, *locks.RWMap[*ManagedNode, struct{}]]
eventSys event.System
emitter event.Emitter
backend backend
db chainsDB
depSet depset.DependencySet
}
var _ event.AttachEmitter = (*SyncNodesController)(nil)
// NewSyncNodesController creates a new SyncNodeController
func NewSyncNodesController(l log.Logger, depset depset.DependencySet, db chainsDB, backend backend) *SyncNodesController {
func NewSyncNodesController(l log.Logger, depset depset.DependencySet, eventSys event.System, backend backend) *SyncNodesController {
return &SyncNodesController{
logger: l,
depSet: depset,
db: db,
eventSys: eventSys,
backend: backend,
}
}
func (snc *SyncNodesController) AttachEmitter(em event.Emitter) {
snc.emitter = em
}
func (snc *SyncNodesController) OnEvent(ev event.Event) bool {
return false
}
func (snc *SyncNodesController) Close() error {
snc.controllers.Range(func(chainID types.ChainID, controllers *locks.RWMap[*ManagedNode, struct{}]) bool {
controllers.Range(func(node *ManagedNode, _ struct{}) bool {
......@@ -49,59 +65,30 @@ func (snc *SyncNodesController) Close() error {
// AttachNodeController attaches a node to be managed by the supervisor.
// If noSubscribe, the node is not actively polled/subscribed to, and requires manual ManagedNode.PullEvents calls.
func (snc *SyncNodesController) AttachNodeController(id types.ChainID, ctrl SyncControl, noSubscribe bool) (Node, error) {
if !snc.depSet.HasChain(id) {
return nil, fmt.Errorf("chain %v not in dependency set: %w", id, types.ErrUnknownChain)
func (snc *SyncNodesController) AttachNodeController(chainID types.ChainID, ctrl SyncControl, noSubscribe bool) (Node, error) {
if !snc.depSet.HasChain(chainID) {
return nil, fmt.Errorf("chain %v not in dependency set: %w", chainID, types.ErrUnknownChain)
}
// lazy init the controllers map for this chain
if !snc.controllers.Has(id) {
snc.controllers.Set(id, &locks.RWMap[*ManagedNode, struct{}]{})
}
controllersForChain, _ := snc.controllers.Get(id)
node := NewManagedNode(snc.logger, id, ctrl, snc.db, snc.backend, noSubscribe)
snc.controllers.Default(chainID, func() *locks.RWMap[*ManagedNode, struct{}] {
return &locks.RWMap[*ManagedNode, struct{}]{}
})
controllersForChain, _ := snc.controllers.Get(chainID)
node := NewManagedNode(snc.logger, chainID, ctrl, snc.backend, noSubscribe)
nodeID := snc.id.Add(1)
name := fmt.Sprintf("syncnode-%s-%d", chainID, nodeID)
snc.eventSys.Register(name, node, event.DefaultRegisterOpts())
controllersForChain.Set(node, struct{}{})
anchor, err := ctrl.AnchorPoint(context.Background())
if err != nil {
return nil, fmt.Errorf("failed to get anchor point: %w", err)
}
snc.maybeInitSafeDB(id, anchor)
snc.maybeInitEventsDB(id, anchor)
snc.emitter.Emit(superevents.AnchorEvent{
ChainID: chainID,
Anchor: anchor,
})
node.Start()
return node, nil
}
// maybeInitSafeDB initializes the chain database if it is not already initialized
// it checks if the Local Safe database is empty, and loads it with the Anchor Point if so
func (snc *SyncNodesController) maybeInitSafeDB(id types.ChainID, anchor types.DerivedBlockRefPair) {
_, err := snc.db.LocalSafe(id)
if errors.Is(err, types.ErrFuture) {
snc.logger.Debug("initializing chain database", "chain", id)
if err := snc.db.UpdateCrossSafe(id, anchor.DerivedFrom, anchor.Derived); err != nil {
snc.logger.Warn("failed to initialize cross safe", "chain", id, "error", err)
}
if err := snc.db.UpdateLocalSafe(id, anchor.DerivedFrom, anchor.Derived); err != nil {
snc.logger.Warn("failed to initialize local safe", "chain", id, "error", err)
}
snc.logger.Debug("initialized chain database", "chain", id, "anchor", anchor)
} else if err != nil {
snc.logger.Warn("failed to check if chain database is initialized", "chain", id, "error", err)
} else {
snc.logger.Debug("chain database already initialized", "chain", id)
}
}
func (snc *SyncNodesController) maybeInitEventsDB(id types.ChainID, anchor types.DerivedBlockRefPair) {
_, _, _, err := snc.db.OpenBlock(id, 0)
if errors.Is(err, types.ErrFuture) {
snc.logger.Debug("initializing events database", "chain", id)
err := snc.backend.UpdateLocalUnsafe(context.Background(), id, anchor.Derived)
if err != nil {
snc.logger.Warn("failed to seal initial block", "chain", id, "error", err)
}
snc.logger.Debug("initialized events database", "chain", id)
} else if err != nil {
snc.logger.Warn("failed to check if logDB is initialized", "chain", id, "error", err)
} else {
snc.logger.Debug("events database already initialized", "chain", id)
}
}
......@@ -4,69 +4,20 @@ import (
"context"
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum"
gethevent "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/superevents"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum"
gethevent "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
type mockChainsDB struct {
localSafeFn func(chainID types.ChainID) (types.DerivedBlockSealPair, error)
updateLocalSafeFn func(chainID types.ChainID, ref eth.BlockRef, derived eth.BlockRef) error
updateCrossSafeFn func(chainID types.ChainID, ref eth.BlockRef, derived eth.BlockRef) error
openBlockFn func(chainID types.ChainID, i uint64) (seal eth.BlockRef, logCount uint32, execMsgs map[uint32]*types.ExecutingMessage, err error)
subscribeCrossUnsafe gethevent.FeedOf[types.BlockSeal]
subscribeCrosSafe gethevent.FeedOf[types.DerivedBlockSealPair]
subscribeFinalized gethevent.FeedOf[types.BlockSeal]
}
func (m *mockChainsDB) OpenBlock(chainID types.ChainID, i uint64) (seal eth.BlockRef, logCount uint32, execMsgs map[uint32]*types.ExecutingMessage, err error) {
if m.openBlockFn != nil {
return m.openBlockFn(chainID, i)
}
return eth.BlockRef{}, 0, nil, nil
}
func (m *mockChainsDB) UpdateLocalSafe(chainID types.ChainID, ref eth.BlockRef, derived eth.BlockRef) error {
if m.updateLocalSafeFn != nil {
return m.updateLocalSafeFn(chainID, ref, derived)
}
return nil
}
func (m *mockChainsDB) LocalSafe(chainID types.ChainID) (types.DerivedBlockSealPair, error) {
if m.localSafeFn != nil {
return m.localSafeFn(chainID)
}
return types.DerivedBlockSealPair{}, nil
}
func (m *mockChainsDB) UpdateCrossSafe(chainID types.ChainID, ref eth.BlockRef, derived eth.BlockRef) error {
if m.updateCrossSafeFn != nil {
return m.updateCrossSafeFn(chainID, ref, derived)
}
return nil
}
func (m *mockChainsDB) SubscribeCrossUnsafe(chainID types.ChainID, c chan<- types.BlockSeal) (gethevent.Subscription, error) {
return m.subscribeCrossUnsafe.Subscribe(c), nil
}
func (m *mockChainsDB) SubscribeCrossSafe(chainID types.ChainID, c chan<- types.DerivedBlockSealPair) (gethevent.Subscription, error) {
return m.subscribeCrosSafe.Subscribe(c), nil
}
func (m *mockChainsDB) SubscribeFinalized(chainID types.ChainID, c chan<- types.BlockSeal) (gethevent.Subscription, error) {
return m.subscribeFinalized.Subscribe(c), nil
}
var _ chainsDB = (*mockChainsDB)(nil)
type mockSyncControl struct {
anchorPointFn func(ctx context.Context) (types.DerivedBlockRefPair, error)
provideL1Fn func(ctx context.Context, ref eth.BlockRef) error
......@@ -134,11 +85,7 @@ func (m *mockSyncControl) UpdateFinalized(ctx context.Context, id eth.BlockID) e
var _ SyncControl = (*mockSyncControl)(nil)
type mockBackend struct {
updateLocalUnsafeFn func(ctx context.Context, chainID types.ChainID, head eth.BlockRef) error
updateLocalSafeFn func(ctx context.Context, chainID types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) error
recordL1Fn func(ctx context.Context, chain types.ChainID, ref eth.BlockRef) error
}
type mockBackend struct{}
func (m *mockBackend) LocalSafe(ctx context.Context, chainID types.ChainID) (pair types.DerivedIDPair, err error) {
return types.DerivedIDPair{}, nil
......@@ -148,10 +95,6 @@ func (m *mockBackend) LocalUnsafe(ctx context.Context, chainID types.ChainID) (e
return eth.BlockID{}, nil
}
func (m *mockBackend) LatestUnsafe(ctx context.Context, chainID types.ChainID) (eth.BlockID, error) {
return eth.BlockID{}, nil
}
func (m *mockBackend) SafeDerivedAt(ctx context.Context, chainID types.ChainID, derivedFrom eth.BlockID) (derived eth.BlockID, err error) {
return eth.BlockID{}, nil
}
......@@ -160,31 +103,10 @@ func (m *mockBackend) Finalized(ctx context.Context, chainID types.ChainID) (eth
return eth.BlockID{}, nil
}
func (m *mockBackend) UpdateLocalSafe(ctx context.Context, chainID types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) error {
if m.updateLocalSafeFn != nil {
return m.updateLocalSafeFn(ctx, chainID, derivedFrom, lastDerived)
}
return nil
}
func (m *mockBackend) UpdateLocalUnsafe(ctx context.Context, chainID types.ChainID, head eth.BlockRef) error {
if m.updateLocalUnsafeFn != nil {
return m.updateLocalUnsafeFn(ctx, chainID, head)
}
return nil
}
func (m *mockBackend) L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error) {
return eth.L1BlockRef{}, nil
}
func (m *mockBackend) RecordNewL1(ctx context.Context, chain types.ChainID, ref eth.BlockRef) error {
if m.recordL1Fn != nil {
return m.recordL1Fn(ctx, chain, ref)
}
return nil
}
var _ backend = (*mockBackend)(nil)
func sampleDepSet(t *testing.T) depset.DependencySet {
......@@ -205,11 +127,38 @@ func sampleDepSet(t *testing.T) depset.DependencySet {
return depSet
}
type eventMonitor struct {
anchorCalled int
localDerived int
receivedLocalUnsafe int
}
func (m *eventMonitor) OnEvent(ev event.Event) bool {
switch ev.(type) {
case superevents.AnchorEvent:
m.anchorCalled += 1
case superevents.LocalDerivedEvent:
m.localDerived += 1
case superevents.LocalUnsafeReceivedEvent:
m.receivedLocalUnsafe += 1
default:
return false
}
return true
}
// TestInitFromAnchorPoint tests that the SyncNodesController uses the Anchor Point to initialize databases
func TestInitFromAnchorPoint(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
depSet := sampleDepSet(t)
controller := NewSyncNodesController(logger, depSet, &mockChainsDB{}, &mockBackend{})
ex := event.NewGlobalSynchronous(context.Background())
eventSys := event.NewSystem(logger, ex)
mon := &eventMonitor{}
eventSys.Register("monitor", mon, event.DefaultRegisterOpts())
controller := NewSyncNodesController(logger, depSet, eventSys, &mockBackend{})
eventSys.Register("controller", controller, event.DefaultRegisterOpts())
require.Zero(t, controller.controllers.Len(), "controllers should be empty to start")
......@@ -223,53 +172,18 @@ func TestInitFromAnchorPoint(t *testing.T) {
}, nil
}
// have the local safe return an error to trigger the initialization
controller.db.(*mockChainsDB).localSafeFn = func(chainID types.ChainID) (types.DerivedBlockSealPair, error) {
return types.DerivedBlockSealPair{}, types.ErrFuture
}
// record when the updateLocalSafe function is called
localCalled := 0
controller.db.(*mockChainsDB).updateLocalSafeFn = func(chainID types.ChainID, ref eth.BlockRef, derived eth.BlockRef) error {
localCalled++
return nil
}
// record when the updateCrossSafe function is called
crossCalled := 0
controller.db.(*mockChainsDB).updateCrossSafeFn = func(chainID types.ChainID, ref eth.BlockRef, derived eth.BlockRef) error {
crossCalled++
return nil
}
// have OpenBlock return an error to trigger the initialization
controller.db.(*mockChainsDB).openBlockFn = func(chainID types.ChainID, i uint64) (seal eth.BlockRef, logCount uint32, execMsgs map[uint32]*types.ExecutingMessage, err error) {
return eth.BlockRef{}, 0, nil, types.ErrFuture
}
unsafeCalled := 0
controller.backend.(*mockBackend).updateLocalUnsafeFn = func(ctx context.Context, chainID types.ChainID, head eth.BlockRef) error {
unsafeCalled++
return nil
}
// after the first attach, both databases are called for update
_, err := controller.AttachNodeController(types.ChainIDFromUInt64(900), &ctrl, false)
require.NoError(t, err)
require.Equal(t, 1, localCalled, "local safe should have been updated once")
require.Equal(t, 1, crossCalled, "cross safe should have been updated twice")
require.Equal(t, 1, unsafeCalled, "local unsafe should have been updated once")
require.NoError(t, ex.Drain())
require.Equal(t, 1, mon.anchorCalled, "an anchor point should be received")
// reset the local safe function to return no error
controller.db.(*mockChainsDB).localSafeFn = nil
// reset the open block function to return no error
controller.db.(*mockChainsDB).openBlockFn = nil
// after the second attach, there are no additional updates (no empty signal from the DB)
// on second attach we send the anchor again; it's up to the DB to use it or not.
ctrl2 := mockSyncControl{}
_, err = controller.AttachNodeController(types.ChainIDFromUInt64(901), &ctrl2, false)
require.NoError(t, err)
require.Equal(t, 1, localCalled, "local safe should have been updated once")
require.Equal(t, 1, crossCalled, "cross safe should have been updated once")
require.Equal(t, 1, unsafeCalled, "local unsafe should have been updated once")
require.NoError(t, ex.Drain())
require.Equal(t, 2, mon.anchorCalled, "anchor point again")
}
// TestAttachNodeController tests the AttachNodeController function of the SyncNodesController.
......@@ -277,8 +191,10 @@ func TestInitFromAnchorPoint(t *testing.T) {
func TestAttachNodeController(t *testing.T) {
logger := log.New()
depSet := sampleDepSet(t)
controller := NewSyncNodesController(logger, depSet, &mockChainsDB{}, &mockBackend{})
ex := event.NewGlobalSynchronous(context.Background())
eventSys := event.NewSystem(logger, ex)
controller := NewSyncNodesController(logger, depSet, eventSys, &mockBackend{})
eventSys.Register("controller", controller, event.DefaultRegisterOpts())
require.Zero(t, controller.controllers.Len(), "controllers should be empty to start")
// Attach a controller for chain 900
......
......@@ -51,8 +51,4 @@ type SyncNode interface {
type Node interface {
PullEvents(ctx context.Context) (pulledAny bool, err error)
AwaitSentCrossUnsafeUpdate(ctx context.Context, minNum uint64) error
AwaitSentCrossSafeUpdate(ctx context.Context, minNum uint64) error
AwaitSentFinalizedUpdate(ctx context.Context, minNum uint64) error
}
......@@ -14,31 +14,19 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/locks"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/superevents"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
gethevent "github.com/ethereum/go-ethereum/event"
)
type chainsDB interface {
LocalSafe(chainID types.ChainID) (types.DerivedBlockSealPair, error)
OpenBlock(chainID types.ChainID, blockNum uint64) (seal eth.BlockRef, logCount uint32, execMsgs map[uint32]*types.ExecutingMessage, err error)
UpdateLocalSafe(chainID types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) error
UpdateCrossSafe(chainID types.ChainID, l1View eth.BlockRef, lastCrossDerived eth.BlockRef) error
SubscribeCrossUnsafe(chainID types.ChainID, c chan<- types.BlockSeal) (gethevent.Subscription, error)
SubscribeCrossSafe(chainID types.ChainID, c chan<- types.DerivedBlockSealPair) (gethevent.Subscription, error)
SubscribeFinalized(chainID types.ChainID, c chan<- types.BlockSeal) (gethevent.Subscription, error)
}
type backend interface {
UpdateLocalSafe(ctx context.Context, chainID types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) error
UpdateLocalUnsafe(ctx context.Context, chainID types.ChainID, head eth.BlockRef) error
LocalSafe(ctx context.Context, chainID types.ChainID) (pair types.DerivedIDPair, err error)
LocalUnsafe(ctx context.Context, chainID types.ChainID) (eth.BlockID, error)
SafeDerivedAt(ctx context.Context, chainID types.ChainID, derivedFrom eth.BlockID) (derived eth.BlockID, err error)
Finalized(ctx context.Context, chainID types.ChainID) (eth.BlockID, error)
L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error)
RecordNewL1(ctx context.Context, chainID types.ChainID, l1 eth.BlockRef) error
}
const (
......@@ -53,28 +41,23 @@ type ManagedNode struct {
backend backend
lastSentCrossUnsafe locks.Watch[eth.BlockID]
lastSentCrossSafe locks.Watch[types.DerivedIDPair]
lastSentFinalized locks.Watch[eth.BlockID]
// when the supervisor has a cross-safe update for the node
crossSafeUpdateChan chan types.DerivedBlockSealPair
// when the supervisor has a cross-unsafe update for the node
crossUnsafeUpdateChan chan types.BlockSeal
// when the supervisor has a finality update for the node
finalizedUpdateChan chan types.BlockSeal
// when the node has an update for us
// When the node has an update for us
// Nil when node events are pulled synchronously.
nodeEvents chan *types.ManagedEvent
subscriptions []gethevent.Subscription
emitter event.Emitter
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
func NewManagedNode(log log.Logger, id types.ChainID, node SyncControl, db chainsDB, backend backend, noSubscribe bool) *ManagedNode {
var _ event.AttachEmitter = (*ManagedNode)(nil)
var _ event.Deriver = (*ManagedNode)(nil)
func NewManagedNode(log log.Logger, id types.ChainID, node SyncControl, backend backend, noSubscribe bool) *ManagedNode {
ctx, cancel := context.WithCancel(context.Background())
m := &ManagedNode{
log: log.New("chain", id),
......@@ -84,7 +67,6 @@ func NewManagedNode(log log.Logger, id types.ChainID, node SyncControl, db chain
ctx: ctx,
cancel: cancel,
}
m.SubscribeToDBEvents(db)
if !noSubscribe {
m.SubscribeToNodeEvents()
}
......@@ -92,25 +74,37 @@ func NewManagedNode(log log.Logger, id types.ChainID, node SyncControl, db chain
return m
}
func (m *ManagedNode) SubscribeToDBEvents(db chainsDB) {
m.crossUnsafeUpdateChan = make(chan types.BlockSeal, 10)
m.crossSafeUpdateChan = make(chan types.DerivedBlockSealPair, 10)
m.finalizedUpdateChan = make(chan types.BlockSeal, 10)
if sub, err := db.SubscribeCrossUnsafe(m.chainID, m.crossUnsafeUpdateChan); err != nil {
m.log.Warn("failed to subscribe to cross unsafe", "err", err)
} else {
m.subscriptions = append(m.subscriptions, sub)
}
if sub, err := db.SubscribeCrossSafe(m.chainID, m.crossSafeUpdateChan); err != nil {
m.log.Warn("failed to subscribe to cross safe", "err", err)
} else {
m.subscriptions = append(m.subscriptions, sub)
}
if sub, err := db.SubscribeFinalized(m.chainID, m.finalizedUpdateChan); err != nil {
m.log.Warn("failed to subscribe to finalized", "err", err)
} else {
m.subscriptions = append(m.subscriptions, sub)
}
func (m *ManagedNode) AttachEmitter(em event.Emitter) {
m.emitter = em
}
func (m *ManagedNode) OnEvent(ev event.Event) bool {
switch x := ev.(type) {
case superevents.CrossUnsafeUpdateEvent:
if x.ChainID != m.chainID {
return false
}
m.onCrossUnsafeUpdate(x.NewCrossUnsafe)
case superevents.CrossSafeUpdateEvent:
if x.ChainID != m.chainID {
return false
}
m.onCrossSafeUpdate(x.NewCrossSafe)
case superevents.FinalizedL2UpdateEvent:
if x.ChainID != m.chainID {
return false
}
m.onFinalizedL2(x.FinalizedL2)
case superevents.LocalSafeOutOfSyncEvent:
if x.ChainID != m.chainID {
return false
}
m.resetSignal(x.Err, x.L1Ref)
// TODO: watch for reorg events from DB. Send a reset signal to op-node if needed
default:
return false
}
return true
}
func (m *ManagedNode) SubscribeToNodeEvents() {
......@@ -159,13 +153,7 @@ func (m *ManagedNode) Start() {
case <-m.ctx.Done():
m.log.Info("Exiting node syncing")
return
case seal := <-m.crossUnsafeUpdateChan:
m.onCrossUnsafeUpdate(seal)
case pair := <-m.crossSafeUpdateChan:
m.onCrossSafeUpdate(pair)
case seal := <-m.finalizedUpdateChan:
m.onFinalizedL2(seal)
case ev := <-m.nodeEvents:
case ev := <-m.nodeEvents: // nil, indefinitely blocking, if no node-events subscriber is set up.
m.onNodeEvent(ev)
}
}
......@@ -229,7 +217,6 @@ func (m *ManagedNode) onCrossUnsafeUpdate(seal types.BlockSeal) {
m.log.Warn("Node failed cross-unsafe updating", "err", err)
return
}
m.lastSentCrossUnsafe.Set(id)
}
func (m *ManagedNode) onCrossSafeUpdate(pair types.DerivedBlockSealPair) {
......@@ -242,7 +229,6 @@ func (m *ManagedNode) onCrossSafeUpdate(pair types.DerivedBlockSealPair) {
m.log.Warn("Node failed cross-safe updating", "err", err)
return
}
m.lastSentCrossSafe.Set(pairIDs)
}
func (m *ManagedNode) onFinalizedL2(seal types.BlockSeal) {
......@@ -255,31 +241,33 @@ func (m *ManagedNode) onFinalizedL2(seal types.BlockSeal) {
m.log.Warn("Node failed finality updating", "err", err)
return
}
m.lastSentFinalized.Set(id)
}
func (m *ManagedNode) onUnsafeBlock(unsafeRef eth.BlockRef) {
m.log.Info("Node has new unsafe block", "unsafeBlock", unsafeRef)
ctx, cancel := context.WithTimeout(m.ctx, internalTimeout)
defer cancel()
if err := m.backend.UpdateLocalUnsafe(ctx, m.chainID, unsafeRef); err != nil {
m.log.Warn("Backend failed to pick up on new unsafe block", "unsafeBlock", unsafeRef, "err", err)
// TODO: if conflict error -> send reset to drop
// TODO: if future error -> send reset to rewind
// TODO: if out of order -> warn, just old data
}
m.emitter.Emit(superevents.LocalUnsafeReceivedEvent{
ChainID: m.chainID,
NewLocalUnsafe: unsafeRef,
})
}
func (m *ManagedNode) onDerivationUpdate(pair types.DerivedBlockRefPair) {
m.log.Info("Node derived new block", "derived", pair.Derived,
"derivedParent", pair.Derived.ParentID(), "derivedFrom", pair.DerivedFrom)
ctx, cancel := context.WithTimeout(m.ctx, internalTimeout)
defer cancel()
if err := m.backend.UpdateLocalSafe(ctx, m.chainID, pair.DerivedFrom, pair.Derived); err != nil {
m.log.Warn("Backend failed to process local-safe update",
"derived", pair.Derived, "derivedFrom", pair.DerivedFrom, "err", err)
m.resetSignal(err, pair.DerivedFrom)
}
m.emitter.Emit(superevents.LocalDerivedEvent{
ChainID: m.chainID,
Derived: pair,
})
// TODO: keep synchronous local-safe DB update feedback?
// We'll still need more async ways of doing this for reorg handling.
//ctx, cancel := context.WithTimeout(m.ctx, internalTimeout)
//defer cancel()
//if err := m.backend.UpdateLocalSafe(ctx, m.chainID, pair.DerivedFrom, pair.Derived); err != nil {
// m.log.Warn("Backend failed to process local-safe update",
// "derived", pair.Derived, "derivedFrom", pair.DerivedFrom, "err", err)
// m.resetSignal(err, pair.DerivedFrom)
//}
}
func (m *ManagedNode) resetSignal(errSignal error, l1Ref eth.BlockRef) {
......@@ -358,36 +346,6 @@ func (m *ManagedNode) onExhaustL1Event(completed types.DerivedBlockRefPair) {
// but does not fit on the derivation state.
return
}
// now that the node has the next L1 block, we can add it to the database
// this ensures that only the L1 *or* the L2 ever increments in the derivation database,
// as RecordNewL1 will insert the new L1 block with the latest L2 block
ctx, cancel := context.WithTimeout(m.ctx, internalTimeout)
defer cancel()
err = m.backend.RecordNewL1(ctx, m.chainID, nextL1)
if err != nil {
m.log.Warn("Failed to record new L1 block", "l1Block", nextL1, "err", err)
}
}
func (m *ManagedNode) AwaitSentCrossUnsafeUpdate(ctx context.Context, minNum uint64) error {
_, err := m.lastSentCrossUnsafe.Catch(ctx, func(id eth.BlockID) bool {
return id.Number >= minNum
})
return err
}
func (m *ManagedNode) AwaitSentCrossSafeUpdate(ctx context.Context, minNum uint64) error {
_, err := m.lastSentCrossSafe.Catch(ctx, func(pair types.DerivedIDPair) bool {
return pair.Derived.Number >= minNum
})
return err
}
func (m *ManagedNode) AwaitSentFinalizedUpdate(ctx context.Context, minNum uint64) error {
_, err := m.lastSentFinalized.Catch(ctx, func(id eth.BlockID) bool {
return id.Number >= minNum
})
return err
}
func (m *ManagedNode) Close() error {
......
......@@ -5,31 +5,39 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/superevents"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
func TestEventResponse(t *testing.T) {
chainID := types.ChainIDFromUInt64(1)
logger := testlog.Logger(t, log.LvlInfo)
syncCtrl := &mockSyncControl{}
db := &mockChainsDB{}
backend := &mockBackend{}
node := NewManagedNode(logger, chainID, syncCtrl, db, backend, false)
ex := event.NewGlobalSynchronous(context.Background())
eventSys := event.NewSystem(logger, ex)
mon := &eventMonitor{}
eventSys.Register("monitor", mon, event.DefaultRegisterOpts())
node := NewManagedNode(logger, chainID, syncCtrl, backend, false)
eventSys.Register("node", node, event.DefaultRegisterOpts())
emitter := eventSys.Register("test", nil, event.DefaultRegisterOpts())
crossUnsafe := 0
crossSafe := 0
finalized := 0
nodeUnsafe := 0
nodeDerivation := 0
nodeExhausted := 0
// recordL1 is called along with nodeExhausted
recordL1 := 0
// the node will call UpdateCrossUnsafe when a cross-unsafe event is received from the database
syncCtrl.updateCrossUnsafeFn = func(ctx context.Context, id eth.BlockID) error {
......@@ -47,26 +55,12 @@ func TestEventResponse(t *testing.T) {
return nil
}
// track events from the node
// the node will call UpdateLocalUnsafe when a new unsafe block is received
backend.updateLocalUnsafeFn = func(ctx context.Context, chID types.ChainID, unsafe eth.BlockRef) error {
nodeUnsafe++
return nil
}
// the node will call UpdateLocalSafe when a new safe and L1 derivation source is received
backend.updateLocalSafeFn = func(ctx context.Context, chainID types.ChainID, derivedFrom eth.L1BlockRef, lastDerived eth.L1BlockRef) error {
nodeDerivation++
return nil
}
// the node will call ProvideL1 when the node is exhausted and needs a new L1 derivation source
syncCtrl.provideL1Fn = func(ctx context.Context, nextL1 eth.BlockRef) error {
nodeExhausted++
return nil
}
backend.recordL1Fn = func(ctx context.Context, chainID types.ChainID, ref eth.L1BlockRef) error {
recordL1++
return nil
}
// TODO(#13595): rework node-reset, and include testing for it here
node.Start()
......@@ -74,9 +68,10 @@ func TestEventResponse(t *testing.T) {
// send events and continue to do so until at least one of each type has been received
require.Eventually(t, func() bool {
// send in one event of each type
db.subscribeCrossUnsafe.Send(types.BlockSeal{})
db.subscribeCrosSafe.Send(types.DerivedBlockSealPair{})
db.subscribeFinalized.Send(types.BlockSeal{})
emitter.Emit(superevents.CrossUnsafeUpdateEvent{ChainID: chainID})
emitter.Emit(superevents.CrossSafeUpdateEvent{ChainID: chainID})
emitter.Emit(superevents.FinalizedL2UpdateEvent{ChainID: chainID})
syncCtrl.subscribeEvents.Send(&types.ManagedEvent{
UnsafeBlock: &eth.BlockRef{Number: 1}})
syncCtrl.subscribeEvents.Send(&types.ManagedEvent{
......@@ -84,14 +79,13 @@ func TestEventResponse(t *testing.T) {
syncCtrl.subscribeEvents.Send(&types.ManagedEvent{
ExhaustL1: &types.DerivedBlockRefPair{DerivedFrom: eth.BlockRef{Number: 1}, Derived: eth.BlockRef{Number: 2}}})
require.NoError(t, ex.Drain())
return crossUnsafe >= 1 &&
crossSafe >= 1 &&
finalized >= 1 &&
nodeUnsafe >= 1 &&
nodeDerivation >= 1 &&
mon.receivedLocalUnsafe >= 1 &&
mon.localDerived >= 1 &&
nodeExhausted >= 1
}, 4*time.Second, 250*time.Millisecond)
// recordL1 is called every time nodeExhausted is called
require.Equal(t, nodeExhausted, recordL1)
}
......@@ -5,15 +5,19 @@ import (
"errors"
"fmt"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-node/rollup/event"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum-optimism/optimism/op-service/httputil"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-service/oppprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-service/tasks"
"github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum-optimism/optimism/op-supervisor/metrics"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend"
......@@ -34,6 +38,8 @@ type SupervisorService struct {
metrics metrics.Metricer
poller *tasks.Poller
backend Backend
pprofService *oppprof.Service
......@@ -72,11 +78,21 @@ func (su *SupervisorService) initFromCLIConfig(ctx context.Context, cfg *config.
}
func (su *SupervisorService) initBackend(ctx context.Context, cfg *config.Config) error {
// In the future we may introduce other executors.
// For now, we just use a synchronous executor, and poll the drain function of it.
ex := event.NewGlobalSynchronous(ctx)
su.poller = tasks.NewPoller(func() {
if err := ex.Drain(); err != nil {
su.log.Warn("Failed to execute events", "err", err)
}
}, clock.SystemClock, time.Millisecond*100)
if cfg.MockRun {
su.backend = backend.NewMockBackend()
return nil
}
be, err := backend.NewSupervisorBackend(ctx, su.log, su.metrics, cfg)
be, err := backend.NewSupervisorBackend(ctx, su.log, su.metrics, cfg, ex)
if err != nil {
return fmt.Errorf("failed to create supervisor backend: %w", err)
}
......@@ -179,6 +195,8 @@ func (su *SupervisorService) Start(ctx context.Context) error {
return fmt.Errorf("unable to start RPC server: %w", err)
}
su.poller.Start()
if err := su.backend.Start(ctx); err != nil {
return fmt.Errorf("unable to start backend: %w", err)
}
......@@ -219,6 +237,10 @@ func (su *SupervisorService) Stop(ctx context.Context) error {
}
}
su.log.Info("JSON-RPC server stopped")
if su.poller != nil {
su.poller.Stop()
}
su.log.Info("Event processing stopped")
return result
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment