package backend

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/op-service/client"
	"github.com/ethereum-optimism/optimism/op-service/dial"
	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum-optimism/optimism/op-service/locks"
	"github.com/ethereum-optimism/optimism/op-service/sources"
	"github.com/ethereum-optimism/optimism/op-supervisor/config"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/cross"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/processors"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)

type SupervisorBackend struct {
	started atomic.Bool
	logger  log.Logger
	m       Metrics
	dataDir string

	// depSet is the dependency set that the backend uses to know about the chains it is indexing
	depSet depset.DependencySet

	// chainDBs holds on to the DB indices for each chain
	chainDBs *db.ChainsDB

	// chainProcessors are notified of new unsafe blocks, and add the unsafe log events data into the events DB
	chainProcessors locks.RWMap[types.ChainID, *processors.ChainProcessor]

	// crossSafeProcessors take local-safe data and promote it to cross-safe when verified
	crossSafeProcessors locks.RWMap[types.ChainID, *cross.Worker]

	// crossUnsafeProcessors take local-unsafe data and promote it to cross-unsafe when verified
	crossUnsafeProcessors locks.RWMap[types.ChainID, *cross.Worker]

	// chainMetrics are used to track metrics for each chain
	// they are reused for processors and databases of the same chain
	chainMetrics locks.RWMap[types.ChainID, *chainMetrics]

	// synchronousProcessors disables background-workers,
	// requiring manual triggers for the backend to process anything.
	synchronousProcessors bool
}

var _ frontend.Backend = (*SupervisorBackend)(nil)

var errAlreadyStopped = errors.New("already stopped")

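// NewSupervisorBackend prepares the data directory, loads the dependency set from the config,
// and constructs a SupervisorBackend with its per-chain DBs and processors.
// The backend is not started yet; call Start to begin processing.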
func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) {
	// attempt to prepare the data directory
	if err := db.PrepDataDir(cfg.Datadir); err != nil {
		return nil, err
	}

	// Load the dependency set
	depSet, err := cfg.DependencySetSource.LoadDependencySet(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to load dependency set: %w", err)
	}

	// create initial per-chain resources
	chainsDBs := db.NewChainsDB(logger, depSet)

	// create the supervisor backend
	super := &SupervisorBackend{
		logger:   logger,
		m:        m,
		dataDir:  cfg.Datadir,
		depSet:   depSet,
		chainDBs: chainsDBs,
		// For testing we can avoid running the processors.
		synchronousProcessors: cfg.SynchronousProcessors,
	}

	// Initialize the resources of the supervisor backend.
	// Stop the supervisor if any of the resources fails to be initialized.
	if err := super.initResources(ctx, cfg); err != nil {
		err = fmt.Errorf("failed to init resources: %w", err)
		return nil, errors.Join(err, super.Stop(ctx))
	}

	return super, nil
}

// initResources initializes all the resources, such as DBs and processors for chains.
// An error may be returned, without closing the thus-far initialized resources.
// Upon error the caller should call Stop() on the supervisor backend to clean up and release resources.
func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Config) error {
	chains := su.depSet.Chains()

	// for each chain known to the dependency set, create the necessary DB resources
	for _, chainID := range chains {
		if err := su.openChainDBs(chainID); err != nil {
			return fmt.Errorf("failed to open chain %s: %w", chainID, err)
		}
	}

	// initialize all cross-unsafe processors
	for _, chainID := range chains {
		worker := cross.NewCrossUnsafeWorker(su.logger, chainID, su.chainDBs)
		su.crossUnsafeProcessors.Set(chainID, worker)
	}
	// initialize all cross-safe processors
	for _, chainID := range chains {
		worker := cross.NewCrossSafeWorker(su.logger, chainID, su.chainDBs)
		su.crossSafeProcessors.Set(chainID, worker)
	}
	// For each chain initialize a chain processor service,
	// after cross-unsafe workers are ready to receive updates
	for _, chainID := range chains {
		logProcessor := processors.NewLogProcessor(chainID, su.chainDBs)
		chainProcessor := processors.NewChainProcessor(su.logger, chainID, logProcessor, su.chainDBs, su.onIndexedLocalUnsafeData)
		su.chainProcessors.Set(chainID, chainProcessor)
	}

	// the config has some RPC connections to attach to the chain-processors
	for _, rpc := range cfg.L2RPCs {
		err := su.attachRPC(ctx, rpc)
		if err != nil {
			return fmt.Errorf("failed to add chain monitor for rpc %v: %w", rpc, err)
		}
	}
	return nil
}

// onIndexedLocalUnsafeData is called by the event indexing workers.
// This signals to cross-unsafe workers that there's data to index.
func (su *SupervisorBackend) onIndexedLocalUnsafeData() {
	// We signal all workers, since dependencies on a chain may be unblocked
	// by new data on other chains.
	// Busy workers don't block processing.
	// The signal is picked up only if the worker is running in the background.
	su.crossUnsafeProcessors.Range(func(_ types.ChainID, w *cross.Worker) bool {
		w.OnNewData()
		return true
	})
}

// onNewLocalSafeData is called by the safety-indexing.
// This signals to cross-safe workers that there's data to index.
func (su *SupervisorBackend) onNewLocalSafeData() {
	// We signal all workers, since dependencies on a chain may be unblocked
	// by new data on other chains.
	// Busy workers don't block processing.
	// The signal is picked up only if the worker is running in the background.
	su.crossSafeProcessors.Range(func(_ types.ChainID, w *cross.Worker) bool {
		w.OnNewData()
		return true
	})
}

// openChainDBs initializes all the DB resources of a specific chain.
// It is a sub-task of initResources.
func (su *SupervisorBackend) openChainDBs(chainID types.ChainID) error {
	cm := newChainMetrics(chainID, su.m)
	// create metrics and a logdb for the chain
	su.chainMetrics.Set(chainID, cm)

	logDB, err := db.OpenLogDB(su.logger, chainID, su.dataDir, cm)
	if err != nil {
		return fmt.Errorf("failed to open logDB of chain %s: %w", chainID, err)
	}
	su.chainDBs.AddLogDB(chainID, logDB)

	localDB, err := db.OpenLocalDerivedFromDB(su.logger, chainID, su.dataDir, cm)
	if err != nil {
		return fmt.Errorf("failed to open local derived-from DB of chain %s: %w", chainID, err)
	}
	su.chainDBs.AddLocalDerivedFromDB(chainID, localDB)

	crossDB, err := db.OpenCrossDerivedFromDB(su.logger, chainID, su.dataDir, cm)
	if err != nil {
		return fmt.Errorf("failed to open cross derived-from DB of chain %s: %w", chainID, err)
	}
	su.chainDBs.AddCrossDerivedFromDB(chainID, crossDB)

	su.chainDBs.AddCrossUnsafeTracker(chainID)
	return nil
}

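// attachRPC dials the given L2 RPC endpoint, checks that the reported chain ID is part of the
// dependency set, and attaches an RPC-backed source to the processor of that chain.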
func (su *SupervisorBackend) attachRPC(ctx context.Context, rpc string) error {
	su.logger.Info("attaching RPC to chain processor", "rpc", rpc)

	logger := su.logger.New("rpc", rpc)
	// create the rpc client, which yields the chain id
	rpcClient, chainID, err := clientForL2(ctx, logger, rpc)
	if err != nil {
		return err
	}
	if !su.depSet.HasChain(chainID) {
		return fmt.Errorf("chain %s is not part of the interop dependency set: %w", chainID, types.ErrUnknownChain)
	}
	cm, ok := su.chainMetrics.Get(chainID)
	if !ok {
		return fmt.Errorf("failed to find metrics for chain %v", chainID)
	}
	// create an RPC client that the processor can use
	cl, err := processors.NewEthClient(
		ctx,
		logger.New("chain", chainID),
		cm,
		rpc,
		rpcClient, 2*time.Second,
		false,
		sources.RPCKindStandard)
	if err != nil {
		return err
	}
	return su.AttachProcessorSource(chainID, cl)
}

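// AttachProcessorSource attaches src as the data source of the chain's processor.
// It returns an error if the chain is not known to the backend.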
func (su *SupervisorBackend) AttachProcessorSource(chainID types.ChainID, src processors.Source) error {
	proc, ok := su.chainProcessors.Get(chainID)
	if !ok {
		return fmt.Errorf("unknown chain %s, cannot attach RPC to processor", chainID)
	}
	proc.SetSource(src)
	return nil
}

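// clientForL2 dials the given RPC endpoint and returns an RPC client together with the chain ID reported by the endpoint.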
func clientForL2(ctx context.Context, logger log.Logger, rpc string) (client.RPC, types.ChainID, error) {
	ethClient, err := dial.DialEthClientWithTimeout(ctx, 10*time.Second, logger, rpc)
	if err != nil {
		return nil, types.ChainID{}, fmt.Errorf("failed to connect to rpc %v: %w", rpc, err)
	}
	chainID, err := ethClient.ChainID(ctx)
	if err != nil {
		return nil, types.ChainID{}, fmt.Errorf("failed to load chain id for rpc %v: %w", rpc, err)
	}
	return client.NewBaseRPCClient(ethClient.Client()), types.ChainIDFromBig(chainID), nil
}

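// Start resumes the chain databases from their last sealed block and, unless synchronous
// processing is configured, starts the background workers. Starting twice is an error.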
func (su *SupervisorBackend) Start(ctx context.Context) error {
	// ensure we only start once
	if !su.started.CompareAndSwap(false, true) {
		return errors.New("already started")
	}

	// initiate "ResumeFromLastSealedBlock" on the chains db,
	// which rewinds the database to the last block that is guaranteed to have been fully recorded
	if err := su.chainDBs.ResumeFromLastSealedBlock(); err != nil {
		return fmt.Errorf("failed to resume chains db: %w", err)
	}

	if !su.synchronousProcessors {
		// Make all the chain-processors run automatic background processing
		su.chainProcessors.Range(func(_ types.ChainID, processor *processors.ChainProcessor) bool {
			processor.StartBackground()
			return true
		})
		su.crossUnsafeProcessors.Range(func(_ types.ChainID, worker *cross.Worker) bool {
			worker.StartBackground()
			return true
		})
		su.crossSafeProcessors.Range(func(_ types.ChainID, worker *cross.Worker) bool {
			worker.StartBackground()
			return true
		})
	}

	return nil
}

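// Stop closes all chain processors and cross-safety workers, and then closes the chain databases.
// It returns errAlreadyStopped if the backend is not running.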
func (su *SupervisorBackend) Stop(ctx context.Context) error {
	if !su.started.CompareAndSwap(true, false) {
		return errAlreadyStopped
	}
	su.logger.Info("Closing supervisor backend")
	// close all processors
	su.chainProcessors.Range(func(id types.ChainID, processor *processors.ChainProcessor) bool {
		su.logger.Info("stopping chain processor", "chainID", id)
		processor.Close()
		return true
	})
	su.chainProcessors.Clear()

	su.crossUnsafeProcessors.Range(func(id types.ChainID, worker *cross.Worker) bool {
		su.logger.Info("stopping cross-unsafe processor", "chainID", id)
		worker.Close()
		return true
	})
	su.crossUnsafeProcessors.Clear()

	su.crossSafeProcessors.Range(func(id types.ChainID, worker *cross.Worker) bool {
		su.logger.Info("stopping cross-safe processor", "chainID", id)
		worker.Close()
		return true
	})
	su.crossSafeProcessors.Clear()

	// close the databases
	return su.chainDBs.Close()
}

// AddL2RPC attaches an RPC as the RPC for the given chain, overriding the previous RPC source, if any.
func (su *SupervisorBackend) AddL2RPC(ctx context.Context, rpc string) error {
	return su.attachRPC(ctx, rpc)
}

// Internal methods, for processors
// ----------------------------

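// DependencySet returns the dependency set this backend was configured with.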
func (su *SupervisorBackend) DependencySet() depset.DependencySet {
	return su.depSet
}

// Query methods
// ----------------------------

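// CheckMessage checks the log database for the given message and returns its current safety level.
// Messages ahead of the indexed data are reported as LocalUnsafe; conflicting messages are reported as Invalid.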
func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHash common.Hash) (types.SafetyLevel, error) {
	logHash := types.PayloadHashToLogHash(payloadHash, identifier.Origin)
	chainID := identifier.ChainID
	blockNum := identifier.BlockNumber
	logIdx := identifier.LogIndex
	_, err := su.chainDBs.Check(chainID, blockNum, logIdx, logHash)
	if errors.Is(err, types.ErrFuture) {
		su.logger.Debug("Future message", "identifier", identifier, "payloadHash", payloadHash, "err", err)
		return types.LocalUnsafe, nil
	}
	if errors.Is(err, types.ErrConflict) {
		su.logger.Debug("Conflicting message", "identifier", identifier, "payloadHash", payloadHash, "err", err)
		return types.Invalid, nil
	}
	if err != nil {
		return types.Invalid, fmt.Errorf("failed to check log: %w", err)
	}
	return su.chainDBs.Safest(chainID, blockNum, logIdx)
}

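// CheckMessages checks each message and errors if any check fails or if a message does not meet minSafety.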
func (su *SupervisorBackend) CheckMessages(
	messages []types.Message,
	minSafety types.SafetyLevel) error {
	su.logger.Debug("Checking messages", "count", len(messages), "minSafety", minSafety)

	for _, msg := range messages {
		su.logger.Debug("Checking message",
			"identifier", msg.Identifier, "payloadHash", msg.PayloadHash.String())
		safety, err := su.CheckMessage(msg.Identifier, msg.PayloadHash)
		if err != nil {
			su.logger.Error("Check message failed", "err", err,
				"identifier", msg.Identifier, "payloadHash", msg.PayloadHash.String())
			return fmt.Errorf("failed to check message: %w", err)
		}
		if !safety.AtLeastAsSafe(minSafety) {
			su.logger.Error("Message is not sufficiently safe",
				"safety", safety, "minSafety", minSafety,
				"identifier", msg.Identifier, "payloadHash", msg.PayloadHash.String())
			return fmt.Errorf("message %v (safety level: %v) does not meet the minimum safety %v",
				msg.Identifier,
				safety,
				minSafety)
		}
	}
	return nil
}

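// UnsafeView returns the local-unsafe and cross-unsafe heads of the given chain.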
func (su *SupervisorBackend) UnsafeView(ctx context.Context, chainID types.ChainID, unsafe types.ReferenceView) (types.ReferenceView, error) {
	head, err := su.chainDBs.LocalUnsafe(chainID)
	if err != nil {
		return types.ReferenceView{}, fmt.Errorf("failed to get local-unsafe head: %w", err)
	}
	cross, err := su.chainDBs.CrossUnsafe(chainID)
	if err != nil {
		return types.ReferenceView{}, fmt.Errorf("failed to get cross-unsafe head: %w", err)
	}

	// TODO(#11693): check `unsafe` input to detect reorg conflicts

	return types.ReferenceView{
		Local: head.ID(),
		Cross: cross.ID(),
	}, nil
}

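// SafeView returns the local-safe and cross-safe heads of the given chain.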
func (su *SupervisorBackend) SafeView(ctx context.Context, chainID types.ChainID, safe types.ReferenceView) (types.ReferenceView, error) {
	_, localSafe, err := su.chainDBs.LocalSafe(chainID)
	if err != nil {
		return types.ReferenceView{}, fmt.Errorf("failed to get local-safe head: %w", err)
	}
	_, crossSafe, err := su.chainDBs.CrossSafe(chainID)
	if err != nil {
		return types.ReferenceView{}, fmt.Errorf("failed to get cross-safe head: %w", err)
	}

	// TODO(#11693): check `safe` input to detect reorg conflicts

	return types.ReferenceView{
		Local: localSafe.ID(),
		Cross: crossSafe.ID(),
	}, nil
}

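// Finalized returns the finalized block of the given chain, as tracked by the chain databases.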
func (su *SupervisorBackend) Finalized(ctx context.Context, chainID types.ChainID) (eth.BlockID, error) {
	v, err := su.chainDBs.Finalized(chainID)
	if err != nil {
		return eth.BlockID{}, err
	}
	return v.ID(), nil
}

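// CrossDerivedFrom returns the block that the given block was derived from,
// as recorded in the cross derived-from DB of the chain.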
func (su *SupervisorBackend) CrossDerivedFrom(ctx context.Context, chainID types.ChainID, derived eth.BlockID) (derivedFrom eth.BlockRef, err error) {
	v, err := su.chainDBs.CrossDerivedFromBlockRef(chainID, derived)
	if err != nil {
		return eth.BlockRef{}, err
	}
	return v, nil
}

// Update methods
// ----------------------------

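// UpdateLocalUnsafe signals a new local-unsafe head to the chain processor of the given chain.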
func (su *SupervisorBackend) UpdateLocalUnsafe(ctx context.Context, chainID types.ChainID, head eth.BlockRef) error {
	ch, ok := su.chainProcessors.Get(chainID)
	if !ok {
		return types.ErrUnknownChain
	}
	return ch.OnNewHead(head)
}

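// UpdateLocalSafe records that lastDerived was derived from derivedFrom in the local-safe DB,
// and signals the cross-safe workers that there is new data to process.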
func (su *SupervisorBackend) UpdateLocalSafe(ctx context.Context, chainID types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) error {
	err := su.chainDBs.UpdateLocalSafe(chainID, derivedFrom, lastDerived)
	if err != nil {
		return err
	}
	su.onNewLocalSafeData()
	return nil
}

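// UpdateFinalizedL1 records a new finalized L1 block in the chain databases.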
func (su *SupervisorBackend) UpdateFinalizedL1(ctx context.Context, chainID types.ChainID, finalized eth.BlockRef) error {
	return su.chainDBs.UpdateFinalizedL1(finalized)
}

// Access to synchronous processing for tests
// ----------------------------

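// SyncEvents synchronously processes the chain's events up to the current head.
// It is intended for tests that run with synchronous processors.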
func (su *SupervisorBackend) SyncEvents(chainID types.ChainID) error {
	ch, ok := su.chainProcessors.Get(chainID)
	if !ok {
		return types.ErrUnknownChain
	}
	ch.ProcessToHead()
	return nil
}

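// SyncCrossUnsafe synchronously runs the cross-unsafe worker of the given chain.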
func (su *SupervisorBackend) SyncCrossUnsafe(chainID types.ChainID) error {
	ch, ok := su.crossUnsafeProcessors.Get(chainID)
	if !ok {
		return types.ErrUnknownChain
	}
	return ch.ProcessWork()
}

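// SyncCrossSafe synchronously runs the cross-safe worker of the given chain.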
func (su *SupervisorBackend) SyncCrossSafe(chainID types.ChainID) error {
	ch, ok := su.crossSafeProcessors.Get(chainID)
	if !ok {
		return types.ErrUnknownChain
	}
	return ch.ProcessWork()
}