package backend

import (
	"context"
	"errors"
	"fmt"
	"slices"
	"sync/atomic"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/op-service/client"
	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum-optimism/optimism/op-service/locks"
	"github.com/ethereum-optimism/optimism/op-service/sources"
	"github.com/ethereum-optimism/optimism/op-supervisor/config"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/cross"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/sync"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/l1access"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/processors"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncnode"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)

type SupervisorBackend struct {
	started atomic.Bool
	logger  log.Logger
	m       Metrics
	dataDir string

	// depSet is the dependency set that the backend uses to know about the chains it is indexing
	depSet depset.DependencySet

	// chainDBs is the primary interface to the databases, including logs, derived-from information and L1 finalization
	chainDBs *db.ChainsDB

	// l1Accessor provides access to the L1 chain for the L1 processor and subscribes to new block events
	l1Accessor *l1access.L1Accessor

	// chainProcessors are notified of new unsafe blocks, and add the unsafe log events data into the events DB
	chainProcessors locks.RWMap[types.ChainID, *processors.ChainProcessor]

	// crossProcessors are used to index cross-chain dependency validity data once the log events are indexed
	crossSafeProcessors   locks.RWMap[types.ChainID, *cross.Worker]
	crossUnsafeProcessors locks.RWMap[types.ChainID, *cross.Worker]
	syncSources           locks.RWMap[types.ChainID, syncnode.SyncSource]

	// syncNodesController controls the derivation or reset of the sync nodes
	syncNodesController *syncnode.SyncNodesController

	// synchronousProcessors disables background-workers,
	// requiring manual triggers for the backend to process l2 data.
	synchronousProcessors bool

	// chainMetrics are used to track metrics for each chain
	// they are reused for processors and databases of the same chain
	chainMetrics locks.RWMap[types.ChainID, *chainMetrics]
}

var _ frontend.Backend = (*SupervisorBackend)(nil)

var errAlreadyStopped = errors.New("already stopped")

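// NewSupervisorBackend prepares the data directory, loads the dependency set,
// optionally syncs missing databases from a remote endpoint, and sets up the
// per-chain resources of a new supervisor backend.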
func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) {
	// attempt to prepare the data directory
	if err := db.PrepDataDir(cfg.Datadir); err != nil {
		return nil, err
	}

	// Load the dependency set
	depSet, err := cfg.DependencySetSource.LoadDependencySet(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to load dependency set: %w", err)
	}

	// Sync the databases from the remote server if configured
	// We only attempt to sync a database if it doesn't exist; we don't update existing databases
	if cfg.DatadirSyncEndpoint != "" {
		syncCfg := sync.Config{DataDir: cfg.Datadir, Logger: logger}
		syncClient, err := sync.NewClient(syncCfg, cfg.DatadirSyncEndpoint)
		if err != nil {
			return nil, fmt.Errorf("failed to create db sync client: %w", err)
		}
		if err := syncClient.SyncAll(ctx, depSet.Chains(), false); err != nil {
			return nil, fmt.Errorf("failed to sync databases: %w", err)
		}
	}

	// create initial per-chain resources
	chainsDBs := db.NewChainsDB(logger, depSet)

	l1Accessor := l1access.NewL1Accessor(
		logger,
		nil,
		processors.MaybeUpdateFinalizedL1Fn(context.Background(), logger, chainsDBs),
	)

	// create the supervisor backend
	super := &SupervisorBackend{
		logger:     logger,
		m:          m,
		dataDir:    cfg.Datadir,
		depSet:     depSet,
		chainDBs:   chainsDBs,
		l1Accessor: l1Accessor,
		// For testing we can avoid running the processors.
		synchronousProcessors: cfg.SynchronousProcessors,
	}

	// create node controller
	super.syncNodesController = syncnode.NewSyncNodesController(logger, depSet, chainsDBs, super)

	// Initialize the resources of the supervisor backend.
	// Stop the supervisor if any of the resources fails to be initialized.
	if err := super.initResources(ctx, cfg); err != nil {
		err = fmt.Errorf("failed to init resources: %w", err)
		return nil, errors.Join(err, super.Stop(ctx))
	}

	return super, nil
}

// initResources initializes all the resources, such as DBs and processors for chains.
// An error may be returned without closing the thus-far initialized resources.
// Upon error the caller should call Stop() on the supervisor backend to clean up and release resources.
func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Config) error {
	chains := su.depSet.Chains()

	// for each chain known to the dependency set, create the necessary DB resources
	for _, chainID := range chains {
		if err := su.openChainDBs(chainID); err != nil {
			return fmt.Errorf("failed to open chain %s: %w", chainID, err)
		}
	}

	// initialize all cross-unsafe processors
	for _, chainID := range chains {
		worker := cross.NewCrossUnsafeWorker(su.logger, chainID, su.chainDBs)
		su.crossUnsafeProcessors.Set(chainID, worker)
	}
	// initialize all cross-safe processors
	for _, chainID := range chains {
		worker := cross.NewCrossSafeWorker(su.logger, chainID, su.chainDBs)
		su.crossSafeProcessors.Set(chainID, worker)
	}
	// For each chain initialize a chain processor service,
	// after cross-unsafe workers are ready to receive updates
	for _, chainID := range chains {
		logProcessor := processors.NewLogProcessor(chainID, su.chainDBs)
		chainProcessor := processors.NewChainProcessor(su.logger, chainID, logProcessor, su.chainDBs, su.onIndexedLocalUnsafeData)
		su.chainProcessors.Set(chainID, chainProcessor)
	}
	// initialize sync sources
	for _, chainID := range chains {
		su.syncSources.Set(chainID, nil)
	}

	if cfg.L1RPC != "" {
		if err := su.attachL1RPC(ctx, cfg.L1RPC); err != nil {
			return fmt.Errorf("failed to create L1 processor: %w", err)
		}
	} else {
		su.logger.Warn("No L1 RPC configured, L1 processor will not be started")
	}

	setups, err := cfg.SyncSources.Load(ctx, su.logger)
	if err != nil {
		return fmt.Errorf("failed to load sync-source setups: %w", err)
	}
	// the config has some sync sources (RPC connections) to attach to the chain-processors
	for _, srcSetup := range setups {
		src, err := srcSetup.Setup(ctx, su.logger)
		if err != nil {
			return fmt.Errorf("failed to set up sync source: %w", err)
		}
		if _, err := su.AttachSyncNode(ctx, src, false); err != nil {
			return fmt.Errorf("failed to attach sync source %s: %w", src, err)
		}
	}
	return nil
}

// onIndexedLocalUnsafeData is called by the event indexing workers.
// This signals to cross-unsafe workers that there's data to index.
func (su *SupervisorBackend) onIndexedLocalUnsafeData() {
	// We signal all workers, since dependencies on a chain may be unblocked
	// by new data on other chains.
	// Busy workers don't block processing.
	// The signal is picked up only if the worker is running in the background.
	su.crossUnsafeProcessors.Range(func(_ types.ChainID, w *cross.Worker) bool {
		w.OnNewData()
		return true
	})
}

// onNewLocalSafeData is called by the safety-indexing.
// This signals to cross-safe workers that there's data to index.
func (su *SupervisorBackend) onNewLocalSafeData() {
	// We signal all workers, since dependencies on a chain may be unblocked
	// by new data on other chains.
	// Busy workers don't block processing.
	// The signal is picked up only if the worker is running in the background.
	su.crossSafeProcessors.Range(func(_ types.ChainID, w *cross.Worker) bool {
		w.OnNewData()
		return true
	})
}

// openChainDBs initializes all the DB resources of a specific chain.
// It is a sub-task of initResources.
func (su *SupervisorBackend) openChainDBs(chainID types.ChainID) error {
	cm := newChainMetrics(chainID, su.m)
	// create metrics and a logdb for the chain
	su.chainMetrics.Set(chainID, cm)

	logDB, err := db.OpenLogDB(su.logger, chainID, su.dataDir, cm)
	if err != nil {
		return fmt.Errorf("failed to open logDB of chain %s: %w", chainID, err)
	}
	su.chainDBs.AddLogDB(chainID, logDB)

	localDB, err := db.OpenLocalDerivedFromDB(su.logger, chainID, su.dataDir, cm)
	if err != nil {
		return fmt.Errorf("failed to open local derived-from DB of chain %s: %w", chainID, err)
	}
	su.chainDBs.AddLocalDerivedFromDB(chainID, localDB)

	crossDB, err := db.OpenCrossDerivedFromDB(su.logger, chainID, su.dataDir, cm)
	if err != nil {
		return fmt.Errorf("failed to open cross derived-from DB of chain %s: %w", chainID, err)
	}
	su.chainDBs.AddCrossDerivedFromDB(chainID, crossDB)

	su.chainDBs.AddCrossUnsafeTracker(chainID)

	su.chainDBs.AddSubscriptions(chainID)

	return nil
}

// AttachSyncNode attaches a node to be managed by the supervisor.
// If noSubscribe, the node is not actively polled/subscribed to, and requires manual Node.PullEvents calls.
func (su *SupervisorBackend) AttachSyncNode(ctx context.Context, src syncnode.SyncNode, noSubscribe bool) (syncnode.Node, error) {
	su.logger.Info("attaching sync source to chain processor", "source", src)

	chainID, err := src.ChainID(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to identify chain ID of sync source: %w", err)
	}
	if !su.depSet.HasChain(chainID) {
		return nil, fmt.Errorf("chain %s is not part of the interop dependency set: %w", chainID, types.ErrUnknownChain)
	}
	err = su.AttachProcessorSource(chainID, src)
	if err != nil {
		return nil, fmt.Errorf("failed to attach sync source to processor: %w", err)
	}
	err = su.AttachSyncSource(chainID, src)
	if err != nil {
		return nil, fmt.Errorf("failed to attach sync source to node: %w", err)
	}
	return su.syncNodesController.AttachNodeController(chainID, src, noSubscribe)
}

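// AttachProcessorSource attaches a data source to the chain processor of the given chain.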
func (su *SupervisorBackend) AttachProcessorSource(chainID types.ChainID, src processors.Source) error {
	proc, ok := su.chainProcessors.Get(chainID)
	if !ok {
		return fmt.Errorf("unknown chain %s, cannot attach RPC to processor", chainID)
	}
	proc.SetSource(src)
	return nil
}

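// AttachSyncSource registers src as the sync source of the given chain, replacing any previously attached source.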
func (su *SupervisorBackend) AttachSyncSource(chainID types.ChainID, src syncnode.SyncSource) error {
	_, ok := su.syncSources.Get(chainID)
	if !ok {
		return fmt.Errorf("unknown chain %s, cannot attach RPC to sync source", chainID)
	}
	su.syncSources.Set(chainID, src)
	return nil
}

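// attachL1RPC dials the given L1 RPC endpoint and attaches the resulting client as the L1 source.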
func (su *SupervisorBackend) attachL1RPC(ctx context.Context, l1RPCAddr string) error {
	su.logger.Info("attaching L1 RPC to L1 processor", "rpc", l1RPCAddr)

	logger := su.logger.New("l1-rpc", l1RPCAddr)
	l1RPC, err := client.NewRPC(ctx, logger, l1RPCAddr)
	if err != nil {
		return fmt.Errorf("failed to setup L1 RPC: %w", err)
	}
	l1Client, err := sources.NewL1Client(
		l1RPC,
		su.logger,
		nil,
		// placeholder config for the L1
		sources.L1ClientSimpleConfig(true, sources.RPCKindBasic, 100))
	if err != nil {
		return fmt.Errorf("failed to setup L1 Client: %w", err)
	}
	su.AttachL1Source(l1Client)
	return nil
}

// AttachL1Source attaches an L1 source to the L1 accessor
// if the L1 accessor does not exist, it is created
// if an L1 source is already attached, it is replaced
func (su *SupervisorBackend) AttachL1Source(source l1access.L1Source) {
	su.l1Accessor.AttachClient(source)
}

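// Start resumes the chain databases from their last sealed blocks and,
// unless synchronous processing is configured, starts the background workers
// of all chain and cross-chain processors.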
func (su *SupervisorBackend) Start(ctx context.Context) error {
	// ensure we only start once
	if !su.started.CompareAndSwap(false, true) {
		return errors.New("already started")
	}

	// initiate "ResumeFromLastSealedBlock" on the chains db,
	// which rewinds the database to the last block that is guaranteed to have been fully recorded
	if err := su.chainDBs.ResumeFromLastSealedBlock(); err != nil {
		return fmt.Errorf("failed to resume chains db: %w", err)
	}

	if !su.synchronousProcessors {
		// Make all the chain-processors run automatic background processing
		su.chainProcessors.Range(func(_ types.ChainID, processor *processors.ChainProcessor) bool {
			processor.StartBackground()
			return true
		})
		su.crossUnsafeProcessors.Range(func(_ types.ChainID, worker *cross.Worker) bool {
			worker.StartBackground()
			return true
		})
		su.crossSafeProcessors.Range(func(_ types.ChainID, worker *cross.Worker) bool {
			worker.StartBackground()
			return true
		})
	}

	return nil
}

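// Stop shuts down the backend: it closes all processors and the sync-nodes controller,
// and finally closes the databases. It returns errAlreadyStopped if the backend is not running.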
func (su *SupervisorBackend) Stop(ctx context.Context) error {
	if !su.started.CompareAndSwap(true, false) {
		return errAlreadyStopped
	}
	su.logger.Info("Closing supervisor backend")

	// close all processors
	su.chainProcessors.Range(func(id types.ChainID, processor *processors.ChainProcessor) bool {
		su.logger.Info("stopping chain processor", "chainID", id)
		processor.Close()
		return true
	})
	su.chainProcessors.Clear()

	su.crossUnsafeProcessors.Range(func(id types.ChainID, worker *cross.Worker) bool {
		su.logger.Info("stopping cross-unsafe processor", "chainID", id)
		worker.Close()
		return true
	})
	su.crossUnsafeProcessors.Clear()

	su.crossSafeProcessors.Range(func(id types.ChainID, worker *cross.Worker) bool {
		su.logger.Info("stopping cross-safe processor", "chainID", id)
		worker.Close()
		return true
	})
	su.crossSafeProcessors.Clear()

	su.syncNodesController.Close()

	// close the databases
	return su.chainDBs.Close()
}

// AddL2RPC attaches an RPC as the RPC for the given chain, overriding the previous RPC source, if any.
func (su *SupervisorBackend) AddL2RPC(ctx context.Context, rpc string, jwtSecret eth.Bytes32) error {
	setupSrc := &syncnode.RPCDialSetup{
		JWTSecret: jwtSecret,
		Endpoint:  rpc,
	}
	src, err := setupSrc.Setup(ctx, su.logger)
	if err != nil {
		return fmt.Errorf("failed to set up sync source from RPC: %w", err)
	}
	_, err = su.AttachSyncNode(ctx, src, false)
	return err
}

// Internal methods, for processors
// ----------------------------

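// DependencySet returns the dependency set this backend was configured with.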
func (su *SupervisorBackend) DependencySet() depset.DependencySet {
	return su.depSet
}

// Query methods
// ----------------------------

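// CheckMessage checks the safety of a single message.
// Messages that are not yet indexed return LocalUnsafe, conflicting messages return Invalid,
// and otherwise the safest known level of the message is returned.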
func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHash common.Hash) (types.SafetyLevel, error) {
	logHash := types.PayloadHashToLogHash(payloadHash, identifier.Origin)
	chainID := identifier.ChainID
	blockNum := identifier.BlockNumber
	logIdx := identifier.LogIndex
	_, err := su.chainDBs.Check(chainID, blockNum, identifier.Timestamp, logIdx, logHash)
	if errors.Is(err, types.ErrFuture) {
		su.logger.Debug("Future message", "identifier", identifier, "payloadHash", payloadHash, "err", err)
		return types.LocalUnsafe, nil
	}
	if errors.Is(err, types.ErrConflict) {
		su.logger.Debug("Conflicting message", "identifier", identifier, "payloadHash", payloadHash, "err", err)
		return types.Invalid, nil
	}
	if err != nil {
		return types.Invalid, fmt.Errorf("failed to check log: %w", err)
	}
	return su.chainDBs.Safest(chainID, blockNum, logIdx)
}

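// CheckMessages verifies that every message in the list meets the given minimum safety level.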
func (su *SupervisorBackend) CheckMessages(
	messages []types.Message,
	minSafety types.SafetyLevel) error {
	su.logger.Debug("Checking messages", "count", len(messages), "minSafety", minSafety)

	for _, msg := range messages {
		su.logger.Debug("Checking message",
			"identifier", msg.Identifier, "payloadHash", msg.PayloadHash.String())
		safety, err := su.CheckMessage(msg.Identifier, msg.PayloadHash)
		if err != nil {
			su.logger.Error("Check message failed", "err", err,
				"identifier", msg.Identifier, "payloadHash", msg.PayloadHash.String())
			return fmt.Errorf("failed to check message: %w", err)
		}
		if !safety.AtLeastAsSafe(minSafety) {
			su.logger.Error("Message is not sufficiently safe",
				"safety", safety, "minSafety", minSafety,
				"identifier", msg.Identifier, "payloadHash", msg.PayloadHash.String())
			return fmt.Errorf("message %v (safety level: %v) does not meet the minimum safety %v",
				msg.Identifier,
				safety,
				minSafety)
		}
	}
	return nil
}

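// CrossSafe returns the latest cross-safe block of the given chain, paired with the L1 block it was derived from.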
func (su *SupervisorBackend) CrossSafe(ctx context.Context, chainID types.ChainID) (types.DerivedIDPair, error) {
	p, err := su.chainDBs.CrossSafe(chainID)
	if err != nil {
		return types.DerivedIDPair{}, err
	}
	return types.DerivedIDPair{
		DerivedFrom: p.DerivedFrom.ID(),
		Derived:     p.Derived.ID(),
	}, nil
}

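// LocalSafe returns the latest local-safe block of the given chain, paired with the L1 block it was derived from.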
func (su *SupervisorBackend) LocalSafe(ctx context.Context, chainID types.ChainID) (types.DerivedIDPair, error) {
	p, err := su.chainDBs.LocalSafe(chainID)
	if err != nil {
		return types.DerivedIDPair{}, err
	}
	return types.DerivedIDPair{
		DerivedFrom: p.DerivedFrom.ID(),
		Derived:     p.Derived.ID(),
	}, nil
}

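// LocalUnsafe returns the latest local-unsafe block of the given chain.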
func (su *SupervisorBackend) LocalUnsafe(ctx context.Context, chainID types.ChainID) (eth.BlockID, error) {
	v, err := su.chainDBs.LocalUnsafe(chainID)
	if err != nil {
		return eth.BlockID{}, err
	}
	return v.ID(), nil
}

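// CrossUnsafe returns the latest cross-unsafe block of the given chain.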
func (su *SupervisorBackend) CrossUnsafe(ctx context.Context, chainID types.ChainID) (eth.BlockID, error) {
	v, err := su.chainDBs.CrossUnsafe(chainID)
	if err != nil {
		return eth.BlockID{}, err
	}
	return v.ID(), nil
}

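// SafeDerivedAt returns the safe block of the given chain that was derived from the given L1 block.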
func (su *SupervisorBackend) SafeDerivedAt(ctx context.Context, chainID types.ChainID, derivedFrom eth.BlockID) (eth.BlockID, error) {
	v, err := su.chainDBs.SafeDerivedAt(chainID, derivedFrom)
	if err != nil {
		return eth.BlockID{}, err
	}
	return v.ID(), nil
}

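// Finalized returns the latest finalized block of the given chain.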
func (su *SupervisorBackend) Finalized(ctx context.Context, chainID types.ChainID) (eth.BlockID, error) {
	v, err := su.chainDBs.Finalized(chainID)
	if err != nil {
		return eth.BlockID{}, err
	}
	return v.ID(), nil
}

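// FinalizedL1 returns the latest finalized L1 block known to the chain databases.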
func (su *SupervisorBackend) FinalizedL1() eth.BlockRef {
	return su.chainDBs.FinalizedL1()
}

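// CrossDerivedFrom returns the L1 block that the given cross-safe block was derived from.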
func (su *SupervisorBackend) CrossDerivedFrom(ctx context.Context, chainID types.ChainID, derived eth.BlockID) (derivedFrom eth.BlockRef, err error) {
	v, err := su.chainDBs.CrossDerivedFromBlockRef(chainID, derived)
	if err != nil {
		return eth.BlockRef{}, err
	}
	return v, nil
}

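// L1BlockRefByNumber fetches the L1 block reference for the given block number via the L1 accessor.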
func (su *SupervisorBackend) L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error) {
	return su.l1Accessor.L1BlockRefByNumber(ctx, number)
}

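// SuperRootAtTimestamp computes the super root at the given timestamp:
// it collects the canonical and pending outputs of every chain in the dependency set,
// ordered by chain ID, and combines the canonical outputs into a single super root.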
func (su *SupervisorBackend) SuperRootAtTimestamp(ctx context.Context, timestamp hexutil.Uint64) (types.SuperRootResponse, error) {
	chains := su.depSet.Chains()
	slices.SortFunc(chains, func(a, b types.ChainID) int {
		return a.Cmp(b)
	})
	chainInfos := make([]types.ChainRootInfo, len(chains))
	superRootChains := make([]eth.ChainIDAndOutput, len(chains))
	for i, chainID := range chains {
		src, ok := su.syncSources.Get(chainID)
		if !ok {
			su.logger.Error("bug: unknown chain %s, cannot get sync source", chainID)
			return types.SuperRootResponse{}, fmt.Errorf("unknown chain %s, cannot get sync source", chainID)
		}
		output, err := src.OutputV0AtTimestamp(ctx, uint64(timestamp))
		if err != nil {
			return types.SuperRootResponse{}, err
		}
		pending, err := src.PendingOutputV0AtTimestamp(ctx, uint64(timestamp))
		if err != nil {
			return types.SuperRootResponse{}, err
		}
		canonicalRoot := eth.OutputRoot(output)
		chainInfos[i] = types.ChainRootInfo{
			ChainID:   chainID,
			Canonical: canonicalRoot,
			Pending:   pending.Marshal(),
		}
		superRootChains[i] = eth.ChainIDAndOutput{ChainID: chainID.ToBig().Uint64(), Output: canonicalRoot}
	}
	superRoot := eth.SuperRoot(&eth.SuperV1{
		Timestamp: uint64(timestamp),
		Chains:    superRootChains,
	})
	return types.SuperRootResponse{
		Timestamp: uint64(timestamp),
		SuperRoot: superRoot,
		Chains:    chainInfos,
	}, nil
}

// Update methods
// ----------------------------

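// UpdateLocalUnsafe notifies the chain processor of the given chain of a new local-unsafe head.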
func (su *SupervisorBackend) UpdateLocalUnsafe(ctx context.Context, chainID types.ChainID, head eth.BlockRef) error {
	ch, ok := su.chainProcessors.Get(chainID)
	if !ok {
		return types.ErrUnknownChain
	}
	return ch.OnNewHead(head)
}

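// UpdateLocalSafe records a new local-safe block, derived from the given L1 block, and signals the cross-safe workers.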
func (su *SupervisorBackend) UpdateLocalSafe(ctx context.Context, chainID types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) error {
	err := su.chainDBs.UpdateLocalSafe(chainID, derivedFrom, lastDerived)
	if err != nil {
		return err
	}
	su.onNewLocalSafeData()
	return nil
}

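// RecordNewL1 records a new L1 block in the databases of the given chain.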
func (su *SupervisorBackend) RecordNewL1(ctx context.Context, chain types.ChainID, ref eth.BlockRef) error {
	return su.chainDBs.RecordNewL1(chain, ref)
}

// Access to synchronous processing for tests
// ----------------------------

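// SyncEvents synchronously processes pending events of the given chain, up to its current head.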
func (su *SupervisorBackend) SyncEvents(chainID types.ChainID) error {
	ch, ok := su.chainProcessors.Get(chainID)
	if !ok {
		return types.ErrUnknownChain
	}
	ch.ProcessToHead()
	return nil
}

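// SyncCrossUnsafe synchronously processes pending cross-unsafe work of the given chain.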
func (su *SupervisorBackend) SyncCrossUnsafe(chainID types.ChainID) error {
	ch, ok := su.crossUnsafeProcessors.Get(chainID)
	if !ok {
		return types.ErrUnknownChain
	}
	return ch.ProcessWork()
}

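// SyncCrossSafe synchronously processes pending cross-safe work of the given chain.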
func (su *SupervisorBackend) SyncCrossSafe(chainID types.ChainID) error {
	ch, ok := su.crossSafeProcessors.Get(chainID)
	if !ok {
		return types.ErrUnknownChain
	}
	return ch.ProcessWork()
}

// SyncFinalizedL1 is a test-only method to update the finalized L1 block without the use of a subscription
func (su *SupervisorBackend) SyncFinalizedL1(ref eth.BlockRef) {
	fn := processors.MaybeUpdateFinalizedL1Fn(context.Background(), su.logger, su.chainDBs)
	fn(context.Background(), ref)
}