backend.go 19.7 KB
Newer Older
1 2 3 4 5
package backend

import (
	"context"
	"errors"
6
	"fmt"
7
	"slices"
8 9
	"sync/atomic"

10
	"github.com/ethereum/go-ethereum/common"
11
	"github.com/ethereum/go-ethereum/common/hexutil"
12 13
	"github.com/ethereum/go-ethereum/log"

14
	"github.com/ethereum-optimism/optimism/op-node/rollup/event"
15
	"github.com/ethereum-optimism/optimism/op-service/client"
16
	"github.com/ethereum-optimism/optimism/op-service/eth"
17
	"github.com/ethereum-optimism/optimism/op-service/locks"
18
	"github.com/ethereum-optimism/optimism/op-service/sources"
19
	"github.com/ethereum-optimism/optimism/op-supervisor/config"
20
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/cross"
21
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
22
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/sync"
23
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
24
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/l1access"
25
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/processors"
26
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/superevents"
27
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncnode"
28 29
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
30 31 32 33
)

type SupervisorBackend struct {
	started atomic.Bool
34
	logger  log.Logger
35 36
	m       Metrics
	dataDir string
37

38 39 40 41 42
	eventSys event.System

	sysContext context.Context
	sysCancel  context.CancelFunc

43
	// depSet is the dependency set that the backend uses to know about the chains it is indexing
44 45
	depSet depset.DependencySet

46
	// chainDBs is the primary interface to the databases, including logs, derived-from information and L1 finalization
47
	chainDBs *db.ChainsDB
48

49 50
	// l1Accessor provides access to the L1 chain for the L1 processor and subscribes to new block events
	l1Accessor *l1access.L1Accessor
51

52
	// chainProcessors are notified of new unsafe blocks, and add the unsafe log events data into the events DB
53
	chainProcessors locks.RWMap[eth.ChainID, *processors.ChainProcessor]
54

55
	syncSources locks.RWMap[eth.ChainID, syncnode.SyncSource]
56

57 58
	// syncNodesController controls the derivation or reset of the sync nodes
	syncNodesController *syncnode.SyncNodesController
59

60 61 62
	// synchronousProcessors disables background-workers,
	// requiring manual triggers for the backend to process l2 data.
	synchronousProcessors bool
63 64 65

	// chainMetrics are used to track metrics for each chain
	// they are reused for processors and databases of the same chain
66
	chainMetrics locks.RWMap[eth.ChainID, *chainMetrics]
67 68

	emitter event.Emitter
69 70
}

71
var _ event.AttachEmitter = (*SupervisorBackend)(nil)
72 73
var _ frontend.Backend = (*SupervisorBackend)(nil)

74
var errAlreadyStopped = errors.New("already stopped")
75

76 77
func NewSupervisorBackend(ctx context.Context, logger log.Logger,
	m Metrics, cfg *config.Config, eventExec event.Executor) (*SupervisorBackend, error) {
78
	// attempt to prepare the data directory
79
	if err := db.PrepDataDir(cfg.Datadir); err != nil {
80 81
		return nil, err
	}
82

83 84 85 86 87 88
	// Load the dependency set
	depSet, err := cfg.DependencySetSource.LoadDependencySet(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to load dependency set: %w", err)
	}

89 90 91 92 93 94 95 96 97 98 99 100 101
	// Sync the databases from the remote server if configured
	// We only attempt to sync a database if it doesn't exist; we don't update existing databases
	if cfg.DatadirSyncEndpoint != "" {
		syncCfg := sync.Config{DataDir: cfg.Datadir, Logger: logger}
		syncClient, err := sync.NewClient(syncCfg, cfg.DatadirSyncEndpoint)
		if err != nil {
			return nil, fmt.Errorf("failed to create db sync client: %w", err)
		}
		if err := syncClient.SyncAll(ctx, depSet.Chains(), false); err != nil {
			return nil, fmt.Errorf("failed to sync databases: %w", err)
		}
	}

102 103 104 105
	eventSys := event.NewSystem(logger, eventExec)

	sysCtx, sysCancel := context.WithCancel(ctx)

106
	// create initial per-chain resources
107
	chainsDBs := db.NewChainsDB(logger, depSet)
108
	eventSys.Register("chainsDBs", chainsDBs, event.DefaultRegisterOpts())
109

110 111
	l1Accessor := l1access.NewL1Accessor(sysCtx, logger, nil)
	eventSys.Register("l1Accessor", l1Accessor, event.DefaultRegisterOpts())
112

113 114
	// create the supervisor backend
	super := &SupervisorBackend{
115 116 117 118 119 120
		logger:     logger,
		m:          m,
		dataDir:    cfg.Datadir,
		depSet:     depSet,
		chainDBs:   chainsDBs,
		l1Accessor: l1Accessor,
121 122
		// For testing we can avoid running the processors.
		synchronousProcessors: cfg.SynchronousProcessors,
123 124 125
		eventSys:              eventSys,
		sysCancel:             sysCancel,
		sysContext:            sysCtx,
126
	}
127
	eventSys.Register("backend", super, event.DefaultRegisterOpts())
128

129
	// create node controller
130 131
	super.syncNodesController = syncnode.NewSyncNodesController(logger, depSet, eventSys, super)
	eventSys.Register("sync-controller", super.syncNodesController, event.DefaultRegisterOpts())
132

133 134 135 136 137
	// Initialize the resources of the supervisor backend.
	// Stop the supervisor if any of the resources fails to be initialized.
	if err := super.initResources(ctx, cfg); err != nil {
		err = fmt.Errorf("failed to init resources: %w", err)
		return nil, errors.Join(err, super.Stop(ctx))
138 139
	}

140 141 142
	return super, nil
}

143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
// OnEvent translates local chain updates into follow-up requests, emitted through
// the attached emitter. It reports whether the event type was handled:
//   - LocalUnsafeReceivedEvent -> ChainProcessEvent targeting the new local-unsafe block number
//   - LocalUnsafeUpdateEvent   -> UpdateCrossUnsafeRequestEvent for the same chain
//   - LocalSafeUpdateEvent     -> UpdateCrossSafeRequestEvent for the same chain
func (su *SupervisorBackend) OnEvent(ev event.Event) bool {
	switch e := ev.(type) {
	case superevents.LocalUnsafeReceivedEvent:
		next := superevents.ChainProcessEvent{
			ChainID: e.ChainID,
			Target:  e.NewLocalUnsafe.Number,
		}
		su.emitter.Emit(next)
		return true
	case superevents.LocalUnsafeUpdateEvent:
		next := superevents.UpdateCrossUnsafeRequestEvent{ChainID: e.ChainID}
		su.emitter.Emit(next)
		return true
	case superevents.LocalSafeUpdateEvent:
		next := superevents.UpdateCrossSafeRequestEvent{ChainID: e.ChainID}
		su.emitter.Emit(next)
		return true
	}
	// Any other event type is not handled by the backend.
	return false
}

// AttachEmitter stores the given emitter on the backend.
// OnEvent uses this emitter to emit its follow-up events.
func (su *SupervisorBackend) AttachEmitter(em event.Emitter) {
	su.emitter = em
}

168 169 170 171 172 173 174 175 176 177 178 179 180
// initResources initializes all the resources, such as DBs and processors for chains.
// An error may returned, without closing the thus-far initialized resources.
// Upon error the caller should call Stop() on the supervisor backend to clean up and release resources.
func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Config) error {
	chains := su.depSet.Chains()

	// for each chain known to the dependency set, create the necessary DB resources
	for _, chainID := range chains {
		if err := su.openChainDBs(chainID); err != nil {
			return fmt.Errorf("failed to open chain %s: %w", chainID, err)
		}
	}

181
	eventOpts := event.DefaultRegisterOpts()
182 183 184
	// initialize all cross-unsafe processors
	for _, chainID := range chains {
		worker := cross.NewCrossUnsafeWorker(su.logger, chainID, su.chainDBs)
185
		su.eventSys.Register(fmt.Sprintf("cross-unsafe-%s", chainID), worker, eventOpts)
186 187 188 189
	}
	// initialize all cross-safe processors
	for _, chainID := range chains {
		worker := cross.NewCrossSafeWorker(su.logger, chainID, su.chainDBs)
190
		su.eventSys.Register(fmt.Sprintf("cross-safe-%s", chainID), worker, eventOpts)
191
	}
192 193 194
	// For each chain initialize a chain processor service,
	// after cross-unsafe workers are ready to receive updates
	for _, chainID := range chains {
195
		logProcessor := processors.NewLogProcessor(chainID, su.chainDBs, su.depSet)
196 197
		chainProcessor := processors.NewChainProcessor(su.sysContext, su.logger, chainID, logProcessor, su.chainDBs)
		su.eventSys.Register(fmt.Sprintf("events-%s", chainID), chainProcessor, eventOpts)
198
		su.chainProcessors.Set(chainID, chainProcessor)
199
	}
200 201 202 203
	// initialize sync sources
	for _, chainID := range chains {
		su.syncSources.Set(chainID, nil)
	}
204

205 206 207 208 209 210 211 212
	if cfg.L1RPC != "" {
		if err := su.attachL1RPC(ctx, cfg.L1RPC); err != nil {
			return fmt.Errorf("failed to create L1 processor: %w", err)
		}
	} else {
		su.logger.Warn("No L1 RPC configured, L1 processor will not be started")
	}

213 214 215 216 217 218 219
	setups, err := cfg.SyncSources.Load(ctx, su.logger)
	if err != nil {
		return fmt.Errorf("failed to load sync-source setups: %w", err)
	}
	// the config has some sync sources (RPC connections) to attach to the chain-processors
	for _, srcSetup := range setups {
		src, err := srcSetup.Setup(ctx, su.logger)
220
		if err != nil {
221 222
			return fmt.Errorf("failed to set up sync source: %w", err)
		}
223
		if _, err := su.AttachSyncNode(ctx, src, false); err != nil {
224
			return fmt.Errorf("failed to attach sync source %s: %w", src, err)
225
		}
226
	}
227
	return nil
228
}
229

230 231
// openChainDBs initializes all the DB resources of a specific chain.
// It is a sub-task of initResources.
232
func (su *SupervisorBackend) openChainDBs(chainID eth.ChainID) error {
233 234
	cm := newChainMetrics(chainID, su.m)
	// create metrics and a logdb for the chain
235
	su.chainMetrics.Set(chainID, cm)
236 237

	logDB, err := db.OpenLogDB(su.logger, chainID, su.dataDir, cm)
238
	if err != nil {
239
		return fmt.Errorf("failed to open logDB of chain %s: %w", chainID, err)
240
	}
241 242 243
	su.chainDBs.AddLogDB(chainID, logDB)

	localDB, err := db.OpenLocalDerivedFromDB(su.logger, chainID, su.dataDir, cm)
244
	if err != nil {
245
		return fmt.Errorf("failed to open local derived-from DB of chain %s: %w", chainID, err)
246
	}
247 248 249
	su.chainDBs.AddLocalDerivedFromDB(chainID, localDB)

	crossDB, err := db.OpenCrossDerivedFromDB(su.logger, chainID, su.dataDir, cm)
250
	if err != nil {
251
		return fmt.Errorf("failed to open cross derived-from DB of chain %s: %w", chainID, err)
252
	}
253 254 255
	su.chainDBs.AddCrossDerivedFromDB(chainID, crossDB)

	su.chainDBs.AddCrossUnsafeTracker(chainID)
256

257 258 259
	return nil
}

260 261 262
// AttachSyncNode attaches a node to be managed by the supervisor.
// If noSubscribe, the node is not actively polled/subscribed to, and requires manual Node.PullEvents calls.
func (su *SupervisorBackend) AttachSyncNode(ctx context.Context, src syncnode.SyncNode, noSubscribe bool) (syncnode.Node, error) {
263
	su.logger.Info("attaching sync source to chain processor", "source", src)
264

265
	chainID, err := src.ChainID(ctx)
266
	if err != nil {
267
		return nil, fmt.Errorf("failed to identify chain ID of sync source: %w", err)
268 269
	}
	if !su.depSet.HasChain(chainID) {
270
		return nil, fmt.Errorf("chain %s is not part of the interop dependency set: %w", chainID, types.ErrUnknownChain)
271
	}
272 273
	err = su.AttachProcessorSource(chainID, src)
	if err != nil {
274
		return nil, fmt.Errorf("failed to attach sync source to processor: %w", err)
275
	}
276 277 278 279
	err = su.AttachSyncSource(chainID, src)
	if err != nil {
		return nil, fmt.Errorf("failed to attach sync source to node: %w", err)
	}
280
	return su.syncNodesController.AttachNodeController(chainID, src, noSubscribe)
281 282
}

283
func (su *SupervisorBackend) AttachProcessorSource(chainID eth.ChainID, src processors.Source) error {
284
	proc, ok := su.chainProcessors.Get(chainID)
285 286 287 288
	if !ok {
		return fmt.Errorf("unknown chain %s, cannot attach RPC to processor", chainID)
	}
	proc.SetSource(src)
289
	return nil
290 291
}

292
func (su *SupervisorBackend) AttachSyncSource(chainID eth.ChainID, src syncnode.SyncSource) error {
293 294 295 296 297 298 299 300
	_, ok := su.syncSources.Get(chainID)
	if !ok {
		return fmt.Errorf("unknown chain %s, cannot attach RPC to sync source", chainID)
	}
	su.syncSources.Set(chainID, src)
	return nil
}

301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321
// attachL1RPC dials the L1 RPC at l1RPCAddr, wraps it in an L1 client and
// attaches that client as the L1 source of the backend.
// An error is returned when the RPC cannot be set up or the L1 client cannot be
// created; in the latter case the already-opened RPC is closed again.
func (su *SupervisorBackend) attachL1RPC(ctx context.Context, l1RPCAddr string) error {
	su.logger.Info("attaching L1 RPC to L1 processor", "rpc", l1RPCAddr)

	logger := su.logger.New("l1-rpc", l1RPCAddr)
	l1RPC, err := client.NewRPC(ctx, logger, l1RPCAddr)
	if err != nil {
		return fmt.Errorf("failed to setup L1 RPC: %w", err)
	}
	l1Client, err := sources.NewL1Client(
		l1RPC,
		su.logger,
		nil,
		// placeholder config for the L1
		sources.L1ClientSimpleConfig(true, sources.RPCKindBasic, 100))
	if err != nil {
		// Nothing owns the RPC yet at this point: close it so it does not leak.
		l1RPC.Close()
		return fmt.Errorf("failed to setup L1 Client: %w", err)
	}
	su.AttachL1Source(l1Client)
	return nil
}

322 323 324 325
// AttachL1Source attaches an L1 source to the L1 accessor
// if the L1 accessor does not exist, it is created
// if an L1 source is already attached, it is replaced
func (su *SupervisorBackend) AttachL1Source(source l1access.L1Source) {
326
	su.l1Accessor.AttachClient(source, !su.synchronousProcessors)
327 328
}

329
func (su *SupervisorBackend) Start(ctx context.Context) error {
330
	// ensure we only start once
331 332 333
	if !su.started.CompareAndSwap(false, true) {
		return errors.New("already started")
	}
334

335 336
	// initiate "ResumeFromLastSealedBlock" on the chains db,
	// which rewinds the database to the last block that is guaranteed to have been fully recorded
337
	if err := su.chainDBs.ResumeFromLastSealedBlock(); err != nil {
338 339
		return fmt.Errorf("failed to resume chains db: %w", err)
	}
340

341 342 343 344 345
	return nil
}

func (su *SupervisorBackend) Stop(ctx context.Context) error {
	if !su.started.CompareAndSwap(true, false) {
346
		return errAlreadyStopped
347
	}
348
	su.logger.Info("Closing supervisor backend")
349

350 351
	su.sysCancel()
	defer su.eventSys.Stop()
352

353
	su.chainProcessors.Clear()
354

355 356
	su.syncNodesController.Close()

357
	// close the databases
358
	return su.chainDBs.Close()
359 360
}

361
// AddL2RPC attaches an RPC as the RPC for the given chain, overriding the previous RPC source, if any.
362
func (su *SupervisorBackend) AddL2RPC(ctx context.Context, rpc string, jwtSecret eth.Bytes32) error {
363
	setupSrc := &syncnode.RPCDialSetup{
364 365 366 367 368 369 370
		JWTSecret: jwtSecret,
		Endpoint:  rpc,
	}
	src, err := setupSrc.Setup(ctx, su.logger)
	if err != nil {
		return fmt.Errorf("failed to set up sync source from RPC: %w", err)
	}
371 372
	_, err = su.AttachSyncNode(ctx, src, false)
	return err
373 374
}

375 376 377 378 379 380 381
// Internal methods, for processors
// ----------------------------

// DependencySet returns the dependency set held by the backend.
func (su *SupervisorBackend) DependencySet() depset.DependencySet {
	return su.depSet
}

382 383 384
// Query methods
// ----------------------------

385 386
func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHash common.Hash) (types.SafetyLevel, error) {
	logHash := types.PayloadHashToLogHash(payloadHash, identifier.Origin)
387 388 389
	chainID := identifier.ChainID
	blockNum := identifier.BlockNumber
	logIdx := identifier.LogIndex
390
	_, err := su.chainDBs.Check(chainID, blockNum, identifier.Timestamp, logIdx, logHash)
391
	if errors.Is(err, types.ErrFuture) {
392
		su.logger.Debug("Future message", "identifier", identifier, "payloadHash", payloadHash, "err", err)
393
		return types.LocalUnsafe, nil
394
	}
395
	if errors.Is(err, types.ErrConflict) {
396
		su.logger.Debug("Conflicting message", "identifier", identifier, "payloadHash", payloadHash, "err", err)
397 398
		return types.Invalid, nil
	}
399 400 401
	if err != nil {
		return types.Invalid, fmt.Errorf("failed to check log: %w", err)
	}
402
	return su.chainDBs.Safest(chainID, blockNum, logIdx)
403 404
}

405 406 407
func (su *SupervisorBackend) CheckMessages(
	messages []types.Message,
	minSafety types.SafetyLevel) error {
408 409
	su.logger.Debug("Checking messages", "count", len(messages), "minSafety", minSafety)

410
	for _, msg := range messages {
411 412
		su.logger.Debug("Checking message",
			"identifier", msg.Identifier, "payloadHash", msg.PayloadHash.String())
413 414
		safety, err := su.CheckMessage(msg.Identifier, msg.PayloadHash)
		if err != nil {
415 416
			su.logger.Error("Check message failed", "err", err,
				"identifier", msg.Identifier, "payloadHash", msg.PayloadHash.String())
417 418 419
			return fmt.Errorf("failed to check message: %w", err)
		}
		if !safety.AtLeastAsSafe(minSafety) {
420 421 422
			su.logger.Error("Message is not sufficiently safe",
				"safety", safety, "minSafety", minSafety,
				"identifier", msg.Identifier, "payloadHash", msg.PayloadHash.String())
423 424 425 426 427 428 429 430 431
			return fmt.Errorf("message %v (safety level: %v) does not meet the minimum safety %v",
				msg.Identifier,
				safety,
				minSafety)
		}
	}
	return nil
}

432
func (su *SupervisorBackend) CrossSafe(ctx context.Context, chainID eth.ChainID) (types.DerivedIDPair, error) {
433
	p, err := su.chainDBs.CrossSafe(chainID)
434
	if err != nil {
435
		return types.DerivedIDPair{}, err
436
	}
437 438 439 440 441 442
	return types.DerivedIDPair{
		DerivedFrom: p.DerivedFrom.ID(),
		Derived:     p.Derived.ID(),
	}, nil
}

443
func (su *SupervisorBackend) LocalSafe(ctx context.Context, chainID eth.ChainID) (types.DerivedIDPair, error) {
444
	p, err := su.chainDBs.LocalSafe(chainID)
445
	if err != nil {
446
		return types.DerivedIDPair{}, err
447
	}
448 449 450
	return types.DerivedIDPair{
		DerivedFrom: p.DerivedFrom.ID(),
		Derived:     p.Derived.ID(),
451 452 453
	}, nil
}

454
func (su *SupervisorBackend) LocalUnsafe(ctx context.Context, chainID eth.ChainID) (eth.BlockID, error) {
455
	v, err := su.chainDBs.LocalUnsafe(chainID)
456
	if err != nil {
457
		return eth.BlockID{}, err
458
	}
459 460 461
	return v.ID(), nil
}

462
func (su *SupervisorBackend) CrossUnsafe(ctx context.Context, chainID eth.ChainID) (eth.BlockID, error) {
463
	v, err := su.chainDBs.CrossUnsafe(chainID)
464
	if err != nil {
465
		return eth.BlockID{}, err
466
	}
467 468
	return v.ID(), nil
}
469

470
func (su *SupervisorBackend) SafeDerivedAt(ctx context.Context, chainID eth.ChainID, derivedFrom eth.BlockID) (eth.BlockID, error) {
471 472 473 474 475
	v, err := su.chainDBs.SafeDerivedAt(chainID, derivedFrom)
	if err != nil {
		return eth.BlockID{}, err
	}
	return v.ID(), nil
476 477
}

478 479 480 481 482 483 484 485 486 487 488 489 490 491
// AllSafeDerivedAt returns, for each chain of the dependency set, the result of
// SafeDerivedAt for the given L1 block. If the lookup fails for any chain, no
// partial map is returned, only the wrapped error.
func (su *SupervisorBackend) AllSafeDerivedAt(ctx context.Context, derivedFrom eth.BlockID) (map[eth.ChainID]eth.BlockID, error) {
	chainIDs := su.depSet.Chains()
	perChain := make(map[eth.ChainID]eth.BlockID, len(chainIDs))
	for _, id := range chainIDs {
		blockID, err := su.SafeDerivedAt(ctx, id, derivedFrom)
		if err != nil {
			return nil, fmt.Errorf("failed to get last derived block for chain %v: %w", id, err)
		}
		perChain[id] = blockID
	}
	return perChain, nil
}

492
func (su *SupervisorBackend) Finalized(ctx context.Context, chainID eth.ChainID) (eth.BlockID, error) {
493 494 495 496 497
	v, err := su.chainDBs.Finalized(chainID)
	if err != nil {
		return eth.BlockID{}, err
	}
	return v.ID(), nil
498 499
}

500 501 502 503
// FinalizedL1 returns the finalized L1 block reference reported by the chain databases.
func (su *SupervisorBackend) FinalizedL1() eth.BlockRef {
	return su.chainDBs.FinalizedL1()
}

504
func (su *SupervisorBackend) CrossDerivedFrom(ctx context.Context, chainID eth.ChainID, derived eth.BlockID) (derivedFrom eth.BlockRef, err error) {
505
	v, err := su.chainDBs.CrossDerivedFromBlockRef(chainID, derived)
506
	if err != nil {
507
		return eth.BlockRef{}, err
508
	}
509
	return v, nil
510 511
}

512 513 514 515
// L1BlockRefByNumber asks the L1 accessor for the L1 block reference with the given number.
func (su *SupervisorBackend) L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error) {
	return su.l1Accessor.L1BlockRefByNumber(ctx, number)
}

516
func (su *SupervisorBackend) SuperRootAtTimestamp(ctx context.Context, timestamp hexutil.Uint64) (eth.SuperRootResponse, error) {
517
	chains := su.depSet.Chains()
518
	slices.SortFunc(chains, func(a, b eth.ChainID) int {
519 520
		return a.Cmp(b)
	})
521
	chainInfos := make([]eth.ChainRootInfo, len(chains))
522 523 524 525 526
	superRootChains := make([]eth.ChainIDAndOutput, len(chains))
	for i, chainID := range chains {
		src, ok := su.syncSources.Get(chainID)
		if !ok {
			su.logger.Error("bug: unknown chain %s, cannot get sync source", chainID)
527
			return eth.SuperRootResponse{}, fmt.Errorf("unknown chain %s, cannot get sync source", chainID)
528 529 530
		}
		output, err := src.OutputV0AtTimestamp(ctx, uint64(timestamp))
		if err != nil {
531
			return eth.SuperRootResponse{}, err
532 533 534
		}
		pending, err := src.PendingOutputV0AtTimestamp(ctx, uint64(timestamp))
		if err != nil {
535
			return eth.SuperRootResponse{}, err
536 537
		}
		canonicalRoot := eth.OutputRoot(output)
538
		chainInfos[i] = eth.ChainRootInfo{
539 540 541 542
			ChainID:   chainID,
			Canonical: canonicalRoot,
			Pending:   pending.Marshal(),
		}
543
		superRootChains[i] = eth.ChainIDAndOutput{ChainID: chainID, Output: canonicalRoot}
544 545 546 547 548
	}
	superRoot := eth.SuperRoot(&eth.SuperV1{
		Timestamp: uint64(timestamp),
		Chains:    superRootChains,
	})
549
	return eth.SuperRootResponse{
550 551 552 553 554 555
		Timestamp: uint64(timestamp),
		SuperRoot: superRoot,
		Chains:    chainInfos,
	}, nil
}

556 557 558
// PullLatestL1 makes the supervisor aware of the latest L1 block. Exposed for testing purposes.
func (su *SupervisorBackend) PullLatestL1() error {
	return su.l1Accessor.PullLatest()
559 560
}

561 562 563
// PullFinalizedL1 triggers the L1 accessor's PullFinalized and returns its
// result, making the supervisor aware of the finalized L1 block.
// Exposed for testing purposes.
func (su *SupervisorBackend) PullFinalizedL1() error {
	return su.l1Accessor.PullFinalized()
}
565

566 567 568
// SetConfDepthL1 forwards the given confirmation depth to the L1 accessor,
// changing the confirmation depth of the L1 chain that is accessible to the supervisor.
func (su *SupervisorBackend) SetConfDepthL1(depth uint64) {
	su.l1Accessor.SetConfDepth(depth)
}
570 571 572 573 574

// Rewind rolls back the state of the supervisor for the given chain.
// It delegates to the chain databases' Rewind with the given block and returns its error.
func (su *SupervisorBackend) Rewind(chain eth.ChainID, block eth.BlockID) error {
	return su.chainDBs.Rewind(chain, block)
}