package backend

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/op-service/client"
	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum-optimism/optimism/op-service/locks"
	"github.com/ethereum-optimism/optimism/op-service/sources"
	"github.com/ethereum-optimism/optimism/op-supervisor/config"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/cross"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/sync"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/l1access"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/processors"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/syncnode"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)

type SupervisorBackend struct {
	started atomic.Bool
	logger  log.Logger
	m       Metrics
	dataDir string

	// depSet is the dependency set that the backend uses to know about the chains it is indexing
	depSet depset.DependencySet

	// chainDBs is the primary interface to the databases, including logs, derived-from information and L1 finalization
	chainDBs *db.ChainsDB

	// l1Accessor provides access to the L1 chain for the L1 processor and subscribes to new block events
	l1Accessor *l1access.L1Accessor

	// chainProcessors are notified of new unsafe blocks, and add the unsafe log events data into the events DB
	chainProcessors locks.RWMap[types.ChainID, *processors.ChainProcessor]

	// crossProcessors are used to index cross-chain dependency validity data once the log events are indexed
	crossSafeProcessors   locks.RWMap[types.ChainID, *cross.Worker]
	crossUnsafeProcessors locks.RWMap[types.ChainID, *cross.Worker]

	// syncNodesController controls the derivation or reset of the sync nodes
	syncNodesController *syncnode.SyncNodesController

	// synchronousProcessors disables background-workers,
	// requiring manual triggers for the backend to process L2 data.
	synchronousProcessors bool

	// chainMetrics are used to track metrics for each chain
	// they are reused for processors and databases of the same chain
	chainMetrics locks.RWMap[types.ChainID, *chainMetrics]
}

var _ frontend.Backend = (*SupervisorBackend)(nil)

var errAlreadyStopped = errors.New("already stopped")

func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) {
	// attempt to prepare the data directory
	if err := db.PrepDataDir(cfg.Datadir); err != nil {
		return nil, err
	}

	// Load the dependency set
	depSet, err := cfg.DependencySetSource.LoadDependencySet(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to load dependency set: %w", err)
	}

	// Sync the databases from the remote server if configured
	// We only attempt to sync a database if it doesn't exist; we don't update existing databases
	if cfg.DatadirSyncEndpoint != "" {
		syncCfg := sync.Config{DataDir: cfg.Datadir, Logger: logger}
		syncClient, err := sync.NewClient(syncCfg, cfg.DatadirSyncEndpoint)
		if err != nil {
			return nil, fmt.Errorf("failed to create db sync client: %w", err)
		}
		if err := syncClient.SyncAll(ctx, depSet.Chains(), false); err != nil {
			return nil, fmt.Errorf("failed to sync databases: %w", err)
		}
	}

	// create initial per-chain resources
	chainsDBs := db.NewChainsDB(logger, depSet)

	l1Accessor := l1access.NewL1Accessor(
		logger,
		nil,
		processors.MaybeUpdateFinalizedL1Fn(context.Background(), logger, chainsDBs),
	)

	// create the supervisor backend
	super := &SupervisorBackend{
		logger:     logger,
		m:          m,
		dataDir:    cfg.Datadir,
		depSet:     depSet,
		chainDBs:   chainsDBs,
		l1Accessor: l1Accessor,
		// For testing we can avoid running the processors.
		synchronousProcessors: cfg.SynchronousProcessors,
	}

	// create node controller
	super.syncNodesController = syncnode.NewSyncNodesController(logger, depSet, chainsDBs, super)

	// Initialize the resources of the supervisor backend.
	// Stop the supervisor if any of the resources fails to be initialized.
	if err := super.initResources(ctx, cfg); err != nil {
		err = fmt.Errorf("failed to init resources: %w", err)
		return nil, errors.Join(err, super.Stop(ctx))
	}

	return super, nil
}

// initResources initializes all the resources, such as DBs and processors for chains.
// An error may be returned without closing the thus-far initialized resources.
// Upon error the caller should call Stop() on the supervisor backend to clean up and release resources.
func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Config) error {
	chains := su.depSet.Chains()

	// for each chain known to the dependency set, create the necessary DB resources
	for _, chainID := range chains {
		if err := su.openChainDBs(chainID); err != nil {
			return fmt.Errorf("failed to open chain %s: %w", chainID, err)
		}
	}

	// initialize all cross-unsafe processors
	for _, chainID := range chains {
		worker := cross.NewCrossUnsafeWorker(su.logger, chainID, su.chainDBs)
		su.crossUnsafeProcessors.Set(chainID, worker)
	}
	// initialize all cross-safe processors
	for _, chainID := range chains {
		worker := cross.NewCrossSafeWorker(su.logger, chainID, su.chainDBs)
		su.crossSafeProcessors.Set(chainID, worker)
	}
	// For each chain initialize a chain processor service,
	// after cross-unsafe workers are ready to receive updates
	for _, chainID := range chains {
		logProcessor := processors.NewLogProcessor(chainID, su.chainDBs)
		chainProcessor := processors.NewChainProcessor(su.logger, chainID, logProcessor, su.chainDBs, su.onIndexedLocalUnsafeData)
		su.chainProcessors.Set(chainID, chainProcessor)
	}

	if cfg.L1RPC != "" {
		if err := su.attachL1RPC(ctx, cfg.L1RPC); err != nil {
			return fmt.Errorf("failed to create L1 processor: %w", err)
		}
	} else {
		su.logger.Warn("No L1 RPC configured, L1 processor will not be started")
	}

	setups, err := cfg.SyncSources.Load(ctx, su.logger)
	if err != nil {
		return fmt.Errorf("failed to load sync-source setups: %w", err)
	}
	// the config has some sync sources (RPC connections) to attach to the chain-processors
	for _, srcSetup := range setups {
		src, err := srcSetup.Setup(ctx, su.logger)
		if err != nil {
			return fmt.Errorf("failed to set up sync source: %w", err)
		}
		if _, err := su.AttachSyncNode(ctx, src, false); err != nil {
			return fmt.Errorf("failed to attach sync source %s: %w", src, err)
		}
	}
	return nil
}

// onIndexedLocalUnsafeData is called by the event indexing workers.
// This signals to cross-unsafe workers that there's data to index.
func (su *SupervisorBackend) onIndexedLocalUnsafeData() {
	// We signal all workers, since dependencies on a chain may be unblocked
	// by new data on other chains.
	// Busy workers don't block processing.
	// The signal is picked up only if the worker is running in the background.
	su.crossUnsafeProcessors.Range(func(_ types.ChainID, w *cross.Worker) bool {
		w.OnNewData()
		return true
	})
}

// onNewLocalSafeData is called by the safety-indexing.
// This signals to cross-safe workers that there's data to index.
func (su *SupervisorBackend) onNewLocalSafeData() {
	// We signal all workers, since dependencies on a chain may be unblocked
	// by new data on other chains.
	// Busy workers don't block processing.
	// The signal is picked up only if the worker is running in the background.
	su.crossSafeProcessors.Range(func(_ types.ChainID, w *cross.Worker) bool {
		w.OnNewData()
		return true
	})
}

// openChainDBs initializes all the DB resources of a specific chain.
// It is a sub-task of initResources.
func (su *SupervisorBackend) openChainDBs(chainID types.ChainID) error {
	cm := newChainMetrics(chainID, su.m)
	// create metrics and a logdb for the chain
	su.chainMetrics.Set(chainID, cm)

	logDB, err := db.OpenLogDB(su.logger, chainID, su.dataDir, cm)
	if err != nil {
		return fmt.Errorf("failed to open logDB of chain %s: %w", chainID, err)
	}
	su.chainDBs.AddLogDB(chainID, logDB)

	localDB, err := db.OpenLocalDerivedFromDB(su.logger, chainID, su.dataDir, cm)
	if err != nil {
		return fmt.Errorf("failed to open local derived-from DB of chain %s: %w", chainID, err)
	}
	su.chainDBs.AddLocalDerivedFromDB(chainID, localDB)

	crossDB, err := db.OpenCrossDerivedFromDB(su.logger, chainID, su.dataDir, cm)
	if err != nil {
		return fmt.Errorf("failed to open cross derived-from DB of chain %s: %w", chainID, err)
	}
	su.chainDBs.AddCrossDerivedFromDB(chainID, crossDB)

	su.chainDBs.AddCrossUnsafeTracker(chainID)

	su.chainDBs.AddSubscriptions(chainID)

	return nil
}

// AttachSyncNode attaches a node to be managed by the supervisor.
// If noSubscribe, the node is not actively polled/subscribed to, and requires manual Node.PullEvents calls.
func (su *SupervisorBackend) AttachSyncNode(ctx context.Context, src syncnode.SyncNode, noSubscribe bool) (syncnode.Node, error) {
	su.logger.Info("attaching sync source to chain processor", "source", src)

	chainID, err := src.ChainID(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to identify chain ID of sync source: %w", err)
	}
	if !su.depSet.HasChain(chainID) {
		return nil, fmt.Errorf("chain %s is not part of the interop dependency set: %w", chainID, types.ErrUnknownChain)
	}
	err = su.AttachProcessorSource(chainID, src)
	if err != nil {
		return nil, fmt.Errorf("failed to attach sync source to processor: %w", err)
	}
	return su.syncNodesController.AttachNodeController(chainID, src, noSubscribe)
}

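// AttachProcessorSource attaches src as the event source of the chain processor for the given chain.
// It errors if the chain has no registered chain processor.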
func (su *SupervisorBackend) AttachProcessorSource(chainID types.ChainID, src processors.Source) error {
	proc, ok := su.chainProcessors.Get(chainID)
	if !ok {
		return fmt.Errorf("unknown chain %s, cannot attach RPC to processor", chainID)
	}
	proc.SetSource(src)
	return nil
}

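// attachL1RPC dials the given L1 RPC endpoint, wraps it in an L1 client, and attaches it as the L1 source.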
func (su *SupervisorBackend) attachL1RPC(ctx context.Context, l1RPCAddr string) error {
	su.logger.Info("attaching L1 RPC to L1 processor", "rpc", l1RPCAddr)

	logger := su.logger.New("l1-rpc", l1RPCAddr)
	l1RPC, err := client.NewRPC(ctx, logger, l1RPCAddr)
	if err != nil {
		return fmt.Errorf("failed to setup L1 RPC: %w", err)
	}
	l1Client, err := sources.NewL1Client(
		l1RPC,
		su.logger,
		nil,
		// placeholder config for the L1
		sources.L1ClientSimpleConfig(true, sources.RPCKindBasic, 100))
	if err != nil {
		return fmt.Errorf("failed to setup L1 Client: %w", err)
	}
	su.AttachL1Source(l1Client)
	return nil
}

// AttachL1Source attaches an L1 source to the L1 accessor
// if the L1 accessor does not exist, it is created
// if an L1 source is already attached, it is replaced
func (su *SupervisorBackend) AttachL1Source(source l1access.L1Source) {
	su.l1Accessor.AttachClient(source)
}

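// Start starts the backend: it resumes the chain databases from the last fully recorded block,
// and starts the background processors unless synchronous processing is configured.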
func (su *SupervisorBackend) Start(ctx context.Context) error {
	// ensure we only start once
	if !su.started.CompareAndSwap(false, true) {
		return errors.New("already started")
	}

	// initiate "ResumeFromLastSealedBlock" on the chains db,
	// which rewinds the database to the last block that is guaranteed to have been fully recorded
	if err := su.chainDBs.ResumeFromLastSealedBlock(); err != nil {
		return fmt.Errorf("failed to resume chains db: %w", err)
	}

	if !su.synchronousProcessors {
		// Make all the chain-processors run automatic background processing
		su.chainProcessors.Range(func(_ types.ChainID, processor *processors.ChainProcessor) bool {
			processor.StartBackground()
			return true
		})
		su.crossUnsafeProcessors.Range(func(_ types.ChainID, worker *cross.Worker) bool {
			worker.StartBackground()
			return true
		})
		su.crossSafeProcessors.Range(func(_ types.ChainID, worker *cross.Worker) bool {
			worker.StartBackground()
			return true
		})
	}

	return nil
}

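// Stop stops the backend: it closes the chain processors, the cross-safety workers,
// the sync-nodes controller, and finally the databases.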
func (su *SupervisorBackend) Stop(ctx context.Context) error {
	if !su.started.CompareAndSwap(true, false) {
		return errAlreadyStopped
	}
	su.logger.Info("Closing supervisor backend")

	// close all processors
	su.chainProcessors.Range(func(id types.ChainID, processor *processors.ChainProcessor) bool {
		su.logger.Info("stopping chain processor", "chainID", id)
		processor.Close()
		return true
	})
	su.chainProcessors.Clear()

	su.crossUnsafeProcessors.Range(func(id types.ChainID, worker *cross.Worker) bool {
		su.logger.Info("stopping cross-unsafe processor", "chainID", id)
		worker.Close()
		return true
	})
	su.crossUnsafeProcessors.Clear()

	su.crossSafeProcessors.Range(func(id types.ChainID, worker *cross.Worker) bool {
		su.logger.Info("stopping cross-safe processor", "chainID", id)
		worker.Close()
		return true
	})
	su.crossSafeProcessors.Clear()

	su.syncNodesController.Close()

	// close the databases
	return su.chainDBs.Close()
}

// AddL2RPC attaches an RPC as the RPC for the given chain, overriding the previous RPC source, if any.
func (su *SupervisorBackend) AddL2RPC(ctx context.Context, rpc string, jwtSecret eth.Bytes32) error {
	setupSrc := &syncnode.RPCDialSetup{
		JWTSecret: jwtSecret,
		Endpoint:  rpc,
	}
	src, err := setupSrc.Setup(ctx, su.logger)
	if err != nil {
		return fmt.Errorf("failed to set up sync source from RPC: %w", err)
	}
	_, err = su.AttachSyncNode(ctx, src, false)
	return err
}

// Internal methods, for processors
// ----------------------------

func (su *SupervisorBackend) DependencySet() depset.DependencySet {
	return su.depSet
}

// Query methods
// ----------------------------

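// CheckMessage checks the safety level of an individual message, identified by its identifier and payload hash.
// Messages that are not indexed yet are reported as LocalUnsafe; conflicting messages are reported as Invalid.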
func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHash common.Hash) (types.SafetyLevel, error) {
	logHash := types.PayloadHashToLogHash(payloadHash, identifier.Origin)
	chainID := identifier.ChainID
	blockNum := identifier.BlockNumber
	logIdx := identifier.LogIndex
	_, err := su.chainDBs.Check(chainID, blockNum, identifier.Timestamp, logIdx, logHash)
	if errors.Is(err, types.ErrFuture) {
		su.logger.Debug("Future message", "identifier", identifier, "payloadHash", payloadHash, "err", err)
		return types.LocalUnsafe, nil
	}
	if errors.Is(err, types.ErrConflict) {
		su.logger.Debug("Conflicting message", "identifier", identifier, "payloadHash", payloadHash, "err", err)
		return types.Invalid, nil
	}
	if err != nil {
		return types.Invalid, fmt.Errorf("failed to check log: %w", err)
	}
	return su.chainDBs.Safest(chainID, blockNum, logIdx)
}

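// CheckMessages checks that each message in the list meets the given minimum safety level.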
func (su *SupervisorBackend) CheckMessages(
	messages []types.Message,
	minSafety types.SafetyLevel) error {
	su.logger.Debug("Checking messages", "count", len(messages), "minSafety", minSafety)

	for _, msg := range messages {
		su.logger.Debug("Checking message",
			"identifier", msg.Identifier, "payloadHash", msg.PayloadHash.String())
		safety, err := su.CheckMessage(msg.Identifier, msg.PayloadHash)
		if err != nil {
			su.logger.Error("Check message failed", "err", err,
				"identifier", msg.Identifier, "payloadHash", msg.PayloadHash.String())
			return fmt.Errorf("failed to check message: %w", err)
		}
		if !safety.AtLeastAsSafe(minSafety) {
			su.logger.Error("Message is not sufficiently safe",
				"safety", safety, "minSafety", minSafety,
				"identifier", msg.Identifier, "payloadHash", msg.PayloadHash.String())
			return fmt.Errorf("message %v (safety level: %v) does not meet the minimum safety %v",
				msg.Identifier,
				safety,
				minSafety)
		}
	}
	return nil
}

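// CrossSafe returns the cross-safe derived/derived-from block pair of the given chain.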
func (su *SupervisorBackend) CrossSafe(ctx context.Context, chainID types.ChainID) (types.DerivedIDPair, error) {
	p, err := su.chainDBs.CrossSafe(chainID)
	if err != nil {
		return types.DerivedIDPair{}, err
	}
	return types.DerivedIDPair{
		DerivedFrom: p.DerivedFrom.ID(),
		Derived:     p.Derived.ID(),
	}, nil
}

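// LocalSafe returns the local-safe derived/derived-from block pair of the given chain.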
func (su *SupervisorBackend) LocalSafe(ctx context.Context, chainID types.ChainID) (types.DerivedIDPair, error) {
	p, err := su.chainDBs.LocalSafe(chainID)
	if err != nil {
		return types.DerivedIDPair{}, err
	}
	return types.DerivedIDPair{
		DerivedFrom: p.DerivedFrom.ID(),
		Derived:     p.Derived.ID(),
	}, nil
}

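// LocalUnsafe returns the local-unsafe head of the given chain.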
func (su *SupervisorBackend) LocalUnsafe(ctx context.Context, chainID types.ChainID) (eth.BlockID, error) {
	v, err := su.chainDBs.LocalUnsafe(chainID)
	if err != nil {
		return eth.BlockID{}, err
	}
	return v.ID(), nil
}

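// CrossUnsafe returns the cross-unsafe head of the given chain.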
func (su *SupervisorBackend) CrossUnsafe(ctx context.Context, chainID types.ChainID) (eth.BlockID, error) {
	v, err := su.chainDBs.CrossUnsafe(chainID)
	if err != nil {
		return eth.BlockID{}, err
	}
	return v.ID(), nil
}

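// SafeDerivedAt returns the safe block of the given chain that is derived from the given L1 block, per the chain database.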
func (su *SupervisorBackend) SafeDerivedAt(ctx context.Context, chainID types.ChainID, derivedFrom eth.BlockID) (eth.BlockID, error) {
	v, err := su.chainDBs.SafeDerivedAt(chainID, derivedFrom)
	if err != nil {
		return eth.BlockID{}, err
	}
	return v.ID(), nil
}

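// Finalized returns the finalized block of the given chain.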
func (su *SupervisorBackend) Finalized(ctx context.Context, chainID types.ChainID) (eth.BlockID, error) {
	v, err := su.chainDBs.Finalized(chainID)
	if err != nil {
		return eth.BlockID{}, err
	}
	return v.ID(), nil
}

func (su *SupervisorBackend) FinalizedL1() eth.BlockRef {
	return su.chainDBs.FinalizedL1()
}

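// CrossDerivedFrom returns the L1 block that the given block of the chain was cross-derived from.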
func (su *SupervisorBackend) CrossDerivedFrom(ctx context.Context, chainID types.ChainID, derived eth.BlockID) (derivedFrom eth.BlockRef, err error) {
	v, err := su.chainDBs.CrossDerivedFromBlockRef(chainID, derived)
	if err != nil {
		return eth.BlockRef{}, err
	}
	return v, nil
}

func (su *SupervisorBackend) L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error) {
	return su.l1Accessor.L1BlockRefByNumber(ctx, number)
}

// Update methods
// ----------------------------

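// UpdateLocalUnsafe forwards a new local-unsafe head to the chain processor of the given chain.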
func (su *SupervisorBackend) UpdateLocalUnsafe(ctx context.Context, chainID types.ChainID, head eth.BlockRef) error {
	ch, ok := su.chainProcessors.Get(chainID)
	if !ok {
		return types.ErrUnknownChain
	}
	return ch.OnNewHead(head)
}

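// UpdateLocalSafe records a new local-safe derived block pair for the given chain,
// and signals the cross-safe workers that there is new data to process.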
func (su *SupervisorBackend) UpdateLocalSafe(ctx context.Context, chainID types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) error {
	err := su.chainDBs.UpdateLocalSafe(chainID, derivedFrom, lastDerived)
	if err != nil {
		return err
	}
	su.onNewLocalSafeData()
	return nil
}

func (su *SupervisorBackend) RecordNewL1(ctx context.Context, chain types.ChainID, ref eth.BlockRef) error {
	return su.chainDBs.RecordNewL1(chain, ref)
}

// Access to synchronous processing for tests
// ----------------------------

func (su *SupervisorBackend) SyncEvents(chainID types.ChainID) error {
	ch, ok := su.chainProcessors.Get(chainID)
	if !ok {
		return types.ErrUnknownChain
	}
	ch.ProcessToHead()
	return nil
}

func (su *SupervisorBackend) SyncCrossUnsafe(chainID types.ChainID) error {
	ch, ok := su.crossUnsafeProcessors.Get(chainID)
	if !ok {
		return types.ErrUnknownChain
	}
	return ch.ProcessWork()
}

func (su *SupervisorBackend) SyncCrossSafe(chainID types.ChainID) error {
	ch, ok := su.crossSafeProcessors.Get(chainID)
	if !ok {
		return types.ErrUnknownChain
	}
	return ch.ProcessWork()
}

// SyncFinalizedL1 is a test-only method to update the finalized L1 block without the use of a subscription
func (su *SupervisorBackend) SyncFinalizedL1(ref eth.BlockRef) {
	fn := processors.MaybeUpdateFinalizedL1Fn(context.Background(), su.logger, su.chainDBs)
	fn(context.Background(), ref)
}