backend.go 17.3 KB
Newer Older
1 2 3 4 5
package backend

import (
	"context"
	"errors"
6
	"fmt"
7
	"sync/atomic"
8
	"time"
9

10 11 12
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/log"

13 14
	"github.com/ethereum-optimism/optimism/op-service/client"
	"github.com/ethereum-optimism/optimism/op-service/dial"
15
	"github.com/ethereum-optimism/optimism/op-service/eth"
16
	"github.com/ethereum-optimism/optimism/op-service/locks"
17
	"github.com/ethereum-optimism/optimism/op-service/sources"
18
	"github.com/ethereum-optimism/optimism/op-supervisor/config"
19
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/cross"
20
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
21
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
22
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/processors"
23 24
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
25 26 27 28
)

type SupervisorBackend struct {
	started atomic.Bool
29
	logger  log.Logger
30 31
	m       Metrics
	dataDir string
32

33
	// depSet is the dependency set that the backend uses to know about the chains it is indexing
34 35
	depSet depset.DependencySet

36 37
	// chainDBs holds on to the DB indices for each chain
	chainDBs *db.ChainsDB
38

39 40 41
	// l1Processor watches for new L1 blocks, updates the local-safe DB, and kicks off derivation orchestration
	l1Processor *processors.L1Processor

42
	// chainProcessors are notified of new unsafe blocks, and add the unsafe log events data into the events DB
43
	chainProcessors locks.RWMap[types.ChainID, *processors.ChainProcessor]
44

45
	// crossSafeProcessors take local-safe data and promote it to cross-safe when verified
46
	crossSafeProcessors locks.RWMap[types.ChainID, *cross.Worker]
47 48

	// crossUnsafeProcessors take local-unsafe data and promote it to cross-unsafe when verified
49 50 51 52 53
	crossUnsafeProcessors locks.RWMap[types.ChainID, *cross.Worker]

	// chainMetrics are used to track metrics for each chain
	// they are reused for processors and databases of the same chain
	chainMetrics locks.RWMap[types.ChainID, *chainMetrics]
54

55 56 57
	// synchronousProcessors disables background-workers,
	// requiring manual triggers for the backend to process anything.
	synchronousProcessors bool
58 59 60 61
}

var _ frontend.Backend = (*SupervisorBackend)(nil)

62
var errAlreadyStopped = errors.New("already stopped")
63

64
func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) {
65
	// attempt to prepare the data directory
66
	if err := db.PrepDataDir(cfg.Datadir); err != nil {
67 68
		return nil, err
	}
69

70 71 72 73 74 75
	// Load the dependency set
	depSet, err := cfg.DependencySetSource.LoadDependencySet(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to load dependency set: %w", err)
	}

76
	// create initial per-chain resources
77
	chainsDBs := db.NewChainsDB(logger, depSet)
78 79 80

	// create the supervisor backend
	super := &SupervisorBackend{
81 82 83 84 85
		logger:   logger,
		m:        m,
		dataDir:  cfg.Datadir,
		depSet:   depSet,
		chainDBs: chainsDBs,
86 87 88 89 90 91 92 93 94
		// For testing we can avoid running the processors.
		synchronousProcessors: cfg.SynchronousProcessors,
	}

	// Initialize the resources of the supervisor backend.
	// Stop the supervisor if any of the resources fails to be initialized.
	if err := super.initResources(ctx, cfg); err != nil {
		err = fmt.Errorf("failed to init resources: %w", err)
		return nil, errors.Join(err, super.Stop(ctx))
95 96
	}

97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
	return super, nil
}

// initResources initializes all the resources, such as DBs and processors for chains.
// An error may returned, without closing the thus-far initialized resources.
// Upon error the caller should call Stop() on the supervisor backend to clean up and release resources.
func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Config) error {
	chains := su.depSet.Chains()

	// for each chain known to the dependency set, create the necessary DB resources
	for _, chainID := range chains {
		if err := su.openChainDBs(chainID); err != nil {
			return fmt.Errorf("failed to open chain %s: %w", chainID, err)
		}
	}

113 114 115
	// initialize all cross-unsafe processors
	for _, chainID := range chains {
		worker := cross.NewCrossUnsafeWorker(su.logger, chainID, su.chainDBs)
116
		su.crossUnsafeProcessors.Set(chainID, worker)
117 118 119 120
	}
	// initialize all cross-safe processors
	for _, chainID := range chains {
		worker := cross.NewCrossSafeWorker(su.logger, chainID, su.chainDBs)
121
		su.crossSafeProcessors.Set(chainID, worker)
122
	}
123 124 125 126 127
	// For each chain initialize a chain processor service,
	// after cross-unsafe workers are ready to receive updates
	for _, chainID := range chains {
		logProcessor := processors.NewLogProcessor(chainID, su.chainDBs)
		chainProcessor := processors.NewChainProcessor(su.logger, chainID, logProcessor, su.chainDBs, su.onIndexedLocalUnsafeData)
128
		su.chainProcessors.Set(chainID, chainProcessor)
129
	}
130

131 132 133 134 135 136 137 138
	if cfg.L1RPC != "" {
		if err := su.attachL1RPC(ctx, cfg.L1RPC); err != nil {
			return fmt.Errorf("failed to create L1 processor: %w", err)
		}
	} else {
		su.logger.Warn("No L1 RPC configured, L1 processor will not be started")
	}

139
	// the config has some RPC connections to attach to the chain-processors
140
	for _, rpc := range cfg.L2RPCs {
141
		err := su.attachRPC(ctx, rpc)
142
		if err != nil {
143
			return fmt.Errorf("failed to add chain monitor for rpc %v: %w", rpc, err)
144
		}
145
	}
146
	return nil
147
}
148

149 150 151 152 153 154 155
// onIndexedLocalUnsafeData is called by the event indexing workers.
// This signals to cross-unsafe workers that there's data to index.
func (su *SupervisorBackend) onIndexedLocalUnsafeData() {
	// We signal all workers, since dependencies on a chain may be unblocked
	// by new data on other chains.
	// Busy workers don't block processing.
	// The signal is picked up only if the worker is running in the background.
156
	su.crossUnsafeProcessors.Range(func(_ types.ChainID, w *cross.Worker) bool {
157
		w.OnNewData()
158 159
		return true
	})
160 161 162 163 164 165 166 167 168
}

// onNewLocalSafeData is called by the safety-indexing.
// This signals to cross-safe workers that there's data to index.
func (su *SupervisorBackend) onNewLocalSafeData() {
	// We signal all workers, since dependencies on a chain may be unblocked
	// by new data on other chains.
	// Busy workers don't block processing.
	// The signal is picked up only if the worker is running in the background.
169
	su.crossSafeProcessors.Range(func(_ types.ChainID, w *cross.Worker) bool {
170
		w.OnNewData()
171 172
		return true
	})
173 174
}

175 176 177 178 179
// openChainDBs initializes all the DB resources of a specific chain.
// It is a sub-task of initResources.
func (su *SupervisorBackend) openChainDBs(chainID types.ChainID) error {
	cm := newChainMetrics(chainID, su.m)
	// create metrics and a logdb for the chain
180
	su.chainMetrics.Set(chainID, cm)
181 182

	logDB, err := db.OpenLogDB(su.logger, chainID, su.dataDir, cm)
183
	if err != nil {
184
		return fmt.Errorf("failed to open logDB of chain %s: %w", chainID, err)
185
	}
186 187 188
	su.chainDBs.AddLogDB(chainID, logDB)

	localDB, err := db.OpenLocalDerivedFromDB(su.logger, chainID, su.dataDir, cm)
189
	if err != nil {
190
		return fmt.Errorf("failed to open local derived-from DB of chain %s: %w", chainID, err)
191
	}
192 193 194
	su.chainDBs.AddLocalDerivedFromDB(chainID, localDB)

	crossDB, err := db.OpenCrossDerivedFromDB(su.logger, chainID, su.dataDir, cm)
195
	if err != nil {
196
		return fmt.Errorf("failed to open cross derived-from DB of chain %s: %w", chainID, err)
197
	}
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213
	su.chainDBs.AddCrossDerivedFromDB(chainID, crossDB)

	su.chainDBs.AddCrossUnsafeTracker(chainID)
	return nil
}

func (su *SupervisorBackend) attachRPC(ctx context.Context, rpc string) error {
	su.logger.Info("attaching RPC to chain processor", "rpc", rpc)

	logger := su.logger.New("rpc", rpc)
	// create the rpc client, which yields the chain id
	rpcClient, chainID, err := clientForL2(ctx, logger, rpc)
	if err != nil {
		return err
	}
	if !su.depSet.HasChain(chainID) {
214
		return fmt.Errorf("chain %s is not part of the interop dependency set: %w", chainID, types.ErrUnknownChain)
215
	}
216
	cm, ok := su.chainMetrics.Get(chainID)
217 218 219 220
	if !ok {
		return fmt.Errorf("failed to find metrics for chain %v", chainID)
	}
	// create an RPC client that the processor can use
221 222
	cl, err := processors.NewEthClient(
		ctx,
223
		logger.New("chain", chainID),
224 225 226 227 228
		cm,
		rpc,
		rpcClient, 2*time.Second,
		false,
		sources.RPCKindStandard)
229
	if err != nil {
230
		return err
231
	}
232 233 234 235
	return su.AttachProcessorSource(chainID, cl)
}

func (su *SupervisorBackend) AttachProcessorSource(chainID types.ChainID, src processors.Source) error {
236
	proc, ok := su.chainProcessors.Get(chainID)
237 238 239 240
	if !ok {
		return fmt.Errorf("unknown chain %s, cannot attach RPC to processor", chainID)
	}
	proc.SetSource(src)
241
	return nil
242 243
}

244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275
// attachL1RPC connects to the L1 RPC endpoint at l1RPCAddr, wraps it in an L1 client
// and hands that client to AttachL1Source.
// If the L1 client cannot be created, the RPC connection is closed again.
func (su *SupervisorBackend) attachL1RPC(ctx context.Context, l1RPCAddr string) error {
	su.logger.Info("attaching L1 RPC to L1 processor", "rpc", l1RPCAddr)

	logger := su.logger.New("l1-rpc", l1RPCAddr)
	l1RPC, err := client.NewRPC(ctx, logger, l1RPCAddr)
	if err != nil {
		return fmt.Errorf("failed to setup L1 RPC: %w", err)
	}
	l1Client, err := sources.NewL1Client(
		l1RPC,
		su.logger,
		nil,
		// placeholder config for the L1
		sources.L1ClientSimpleConfig(true, sources.RPCKindBasic, 100))
	if err != nil {
		// No client took ownership of the connection, so release it here.
		l1RPC.Close()
		return fmt.Errorf("failed to setup L1 Client: %w", err)
	}
	su.AttachL1Source(l1Client)
	return nil
}

// AttachL1Source attaches an L1 source to the L1 processor.
// If the L1 processor does not exist yet, it is created with source and started;
// otherwise source is handed to the existing processor via AttachClient.
//
// NOTE(review): Start() on the backend calls l1Processor.Start() again, so a processor
// created here during initialization is started twice. This assumes
// L1Processor.Start tolerates being called on a running processor — confirm.
func (su *SupervisorBackend) AttachL1Source(source processors.L1Source) {
	if su.l1Processor != nil {
		su.l1Processor.AttachClient(source)
		return
	}
	su.l1Processor = processors.NewL1Processor(su.logger, su.chainDBs, source)
	su.l1Processor.Start()
}

276
func clientForL2(ctx context.Context, logger log.Logger, rpc string) (client.RPC, types.ChainID, error) {
277 278
	ethClient, err := dial.DialEthClientWithTimeout(ctx, 10*time.Second, logger, rpc)
	if err != nil {
279
		return nil, types.ChainID{}, fmt.Errorf("failed to connect to rpc %v: %w", rpc, err)
280 281 282
	}
	chainID, err := ethClient.ChainID(ctx)
	if err != nil {
283
		return nil, types.ChainID{}, fmt.Errorf("failed to load chain id for rpc %v: %w", rpc, err)
284
	}
285
	return client.NewBaseRPCClient(ethClient.Client()), types.ChainIDFromBig(chainID), nil
286 287
}

288
func (su *SupervisorBackend) Start(ctx context.Context) error {
289
	// ensure we only start once
290 291 292
	if !su.started.CompareAndSwap(false, true) {
		return errors.New("already started")
	}
293

294 295
	// initiate "ResumeFromLastSealedBlock" on the chains db,
	// which rewinds the database to the last block that is guaranteed to have been fully recorded
296
	if err := su.chainDBs.ResumeFromLastSealedBlock(); err != nil {
297 298
		return fmt.Errorf("failed to resume chains db: %w", err)
	}
299

300 301 302 303 304
	// start the L1 processor if it exists
	if su.l1Processor != nil {
		su.l1Processor.Start()
	}

305 306
	if !su.synchronousProcessors {
		// Make all the chain-processors run automatic background processing
307
		su.chainProcessors.Range(func(_ types.ChainID, processor *processors.ChainProcessor) bool {
308
			processor.StartBackground()
309 310 311
			return true
		})
		su.crossUnsafeProcessors.Range(func(_ types.ChainID, worker *cross.Worker) bool {
312
			worker.StartBackground()
313 314 315
			return true
		})
		su.crossSafeProcessors.Range(func(_ types.ChainID, worker *cross.Worker) bool {
316
			worker.StartBackground()
317 318
			return true
		})
319 320
	}

321 322 323 324 325
	return nil
}

func (su *SupervisorBackend) Stop(ctx context.Context) error {
	if !su.started.CompareAndSwap(true, false) {
326
		return errAlreadyStopped
327
	}
328
	su.logger.Info("Closing supervisor backend")
329 330 331 332 333 334

	// stop the L1 processor
	if su.l1Processor != nil {
		su.l1Processor.Stop()
	}

335
	// close all processors
336
	su.chainProcessors.Range(func(id types.ChainID, processor *processors.ChainProcessor) bool {
337 338
		su.logger.Info("stopping chain processor", "chainID", id)
		processor.Close()
339 340 341
		return true
	})
	su.chainProcessors.Clear()
342

343
	su.crossUnsafeProcessors.Range(func(id types.ChainID, worker *cross.Worker) bool {
344 345
		su.logger.Info("stopping cross-unsafe processor", "chainID", id)
		worker.Close()
346 347 348
		return true
	})
	su.crossUnsafeProcessors.Clear()
349

350
	su.crossSafeProcessors.Range(func(id types.ChainID, worker *cross.Worker) bool {
351 352
		su.logger.Info("stopping cross-safe processor", "chainID", id)
		worker.Close()
353 354 355
		return true
	})
	su.crossSafeProcessors.Clear()
356

357
	// close the databases
358
	return su.chainDBs.Close()
359 360
}

361
// AddL2RPC attaches an RPC as the RPC for the given chain, overriding the previous RPC source, if any.
362
func (su *SupervisorBackend) AddL2RPC(ctx context.Context, rpc string) error {
363
	return su.attachRPC(ctx, rpc)
364 365
}

366 367 368 369 370 371 372
// Internal methods, for processors
// ----------------------------

// DependencySet returns the dependency set loaded when the backend was constructed.
func (su *SupervisorBackend) DependencySet() depset.DependencySet {
	set := su.depSet
	return set
}

373 374 375
// Query methods
// ----------------------------

376 377
func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHash common.Hash) (types.SafetyLevel, error) {
	logHash := types.PayloadHashToLogHash(payloadHash, identifier.Origin)
378 379 380
	chainID := identifier.ChainID
	blockNum := identifier.BlockNumber
	logIdx := identifier.LogIndex
381
	_, err := su.chainDBs.Check(chainID, blockNum, logIdx, logHash)
382
	if errors.Is(err, types.ErrFuture) {
383
		su.logger.Debug("Future message", "identifier", identifier, "payloadHash", payloadHash, "err", err)
384
		return types.LocalUnsafe, nil
385
	}
386
	if errors.Is(err, types.ErrConflict) {
387
		su.logger.Debug("Conflicting message", "identifier", identifier, "payloadHash", payloadHash, "err", err)
388 389
		return types.Invalid, nil
	}
390 391 392
	if err != nil {
		return types.Invalid, fmt.Errorf("failed to check log: %w", err)
	}
393
	return su.chainDBs.Safest(chainID, blockNum, logIdx)
394 395
}

396 397 398
func (su *SupervisorBackend) CheckMessages(
	messages []types.Message,
	minSafety types.SafetyLevel) error {
399 400
	su.logger.Debug("Checking messages", "count", len(messages), "minSafety", minSafety)

401
	for _, msg := range messages {
402 403
		su.logger.Debug("Checking message",
			"identifier", msg.Identifier, "payloadHash", msg.PayloadHash.String())
404 405
		safety, err := su.CheckMessage(msg.Identifier, msg.PayloadHash)
		if err != nil {
406 407
			su.logger.Error("Check message failed", "err", err,
				"identifier", msg.Identifier, "payloadHash", msg.PayloadHash.String())
408 409 410
			return fmt.Errorf("failed to check message: %w", err)
		}
		if !safety.AtLeastAsSafe(minSafety) {
411 412 413
			su.logger.Error("Message is not sufficiently safe",
				"safety", safety, "minSafety", minSafety,
				"identifier", msg.Identifier, "payloadHash", msg.PayloadHash.String())
414 415 416 417 418 419 420 421 422
			return fmt.Errorf("message %v (safety level: %v) does not meet the minimum safety %v",
				msg.Identifier,
				safety,
				minSafety)
		}
	}
	return nil
}

423
func (su *SupervisorBackend) UnsafeView(ctx context.Context, chainID types.ChainID, unsafe types.ReferenceView) (types.ReferenceView, error) {
424
	head, err := su.chainDBs.LocalUnsafe(chainID)
425 426
	if err != nil {
		return types.ReferenceView{}, fmt.Errorf("failed to get local-unsafe head: %w", err)
427
	}
428
	cross, err := su.chainDBs.CrossUnsafe(chainID)
429 430 431 432 433 434 435 436 437 438 439 440 441
	if err != nil {
		return types.ReferenceView{}, fmt.Errorf("failed to get cross-unsafe head: %w", err)
	}

	// TODO(#11693): check `unsafe` input to detect reorg conflicts

	return types.ReferenceView{
		Local: head.ID(),
		Cross: cross.ID(),
	}, nil
}

func (su *SupervisorBackend) SafeView(ctx context.Context, chainID types.ChainID, safe types.ReferenceView) (types.ReferenceView, error) {
442
	_, localSafe, err := su.chainDBs.LocalSafe(chainID)
443 444
	if err != nil {
		return types.ReferenceView{}, fmt.Errorf("failed to get local-safe head: %w", err)
445
	}
446
	_, crossSafe, err := su.chainDBs.CrossSafe(chainID)
447
	if err != nil {
448 449 450 451 452 453 454 455 456 457 458 459
		return types.ReferenceView{}, fmt.Errorf("failed to get cross-safe head: %w", err)
	}

	// TODO(#11693): check `safe` input to detect reorg conflicts

	return types.ReferenceView{
		Local: localSafe.ID(),
		Cross: crossSafe.ID(),
	}, nil
}

func (su *SupervisorBackend) Finalized(ctx context.Context, chainID types.ChainID) (eth.BlockID, error) {
460 461 462 463 464
	v, err := su.chainDBs.Finalized(chainID)
	if err != nil {
		return eth.BlockID{}, err
	}
	return v.ID(), nil
465 466
}

467 468
func (su *SupervisorBackend) CrossDerivedFrom(ctx context.Context, chainID types.ChainID, derived eth.BlockID) (derivedFrom eth.BlockRef, err error) {
	v, err := su.chainDBs.CrossDerivedFromBlockRef(chainID, derived)
469
	if err != nil {
470
		return eth.BlockRef{}, err
471
	}
472
	return v, nil
473 474 475 476 477
}

// Update methods
// ----------------------------

478
func (su *SupervisorBackend) UpdateLocalUnsafe(ctx context.Context, chainID types.ChainID, head eth.BlockRef) error {
479
	ch, ok := su.chainProcessors.Get(chainID)
480
	if !ok {
481
		return types.ErrUnknownChain
482
	}
483 484 485
	return ch.OnNewHead(head)
}

486
func (su *SupervisorBackend) UpdateLocalSafe(ctx context.Context, chainID types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) error {
487 488 489 490 491 492
	err := su.chainDBs.UpdateLocalSafe(chainID, derivedFrom, lastDerived)
	if err != nil {
		return err
	}
	su.onNewLocalSafeData()
	return nil
493
}
494

495
// UpdateFinalizedL1 passes the finalized L1 block to chainDBs.
// NOTE(review): chainID is not used; chainDBs.UpdateFinalizedL1 takes no chain, so the
// update presumably applies to all chains at once — confirm this is intended.
func (su *SupervisorBackend) UpdateFinalizedL1(ctx context.Context, chainID types.ChainID, finalized eth.BlockRef) error {
	err := su.chainDBs.UpdateFinalizedL1(finalized)
	return err
}
498 499 500 501 502

// Access to synchronous processing for tests
// ----------------------------

func (su *SupervisorBackend) SyncEvents(chainID types.ChainID) error {
503
	ch, ok := su.chainProcessors.Get(chainID)
504 505 506 507 508 509 510 511
	if !ok {
		return types.ErrUnknownChain
	}
	ch.ProcessToHead()
	return nil
}

func (su *SupervisorBackend) SyncCrossUnsafe(chainID types.ChainID) error {
512
	ch, ok := su.crossUnsafeProcessors.Get(chainID)
513 514 515 516 517 518 519
	if !ok {
		return types.ErrUnknownChain
	}
	return ch.ProcessWork()
}

func (su *SupervisorBackend) SyncCrossSafe(chainID types.ChainID) error {
520
	ch, ok := su.crossSafeProcessors.Get(chainID)
521 522 523 524 525
	if !ok {
		return types.ErrUnknownChain
	}
	return ch.ProcessWork()
}