package backend

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/op-service/client"
	"github.com/ethereum-optimism/optimism/op-service/dial"
	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum-optimism/optimism/op-service/sources"
	"github.com/ethereum-optimism/optimism/op-supervisor/config"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/processors"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)

type SupervisorBackend struct {
	started atomic.Bool
	logger  log.Logger
	m       Metrics
	dataDir string

	// RW lock to avoid concurrent map mutations.
	// Read = any chain may be used and mutated.
	// Write = set of chains is changing.
	mu sync.RWMutex

	// depSet is the dependency set that the backend uses to know about the chains it is indexing
	depSet depset.DependencySet

	// chainDBs holds on to the DB indices for each chain
	chainDBs *db.ChainsDB

	// chainProcessors are notified of new unsafe blocks, and add the unsafe log events data into the events DB
	chainProcessors map[types.ChainID]*processors.ChainProcessor

	// synchronousProcessors disables background-workers,
	// requiring manual triggers for the backend to process anything.
	synchronousProcessors bool

	// chainMetrics are used to track metrics for each chain
	// they are reused for processors and databases of the same chain
	chainMetrics map[types.ChainID]*chainMetrics
}

var _ frontend.Backend = (*SupervisorBackend)(nil)

var errAlreadyStopped = errors.New("already stopped")

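// NewSupervisorBackend prepares the data directory, loads the dependency set,
// sets up the per-chain databases and processors, and attaches the configured L2 RPCs.
// The returned backend still needs to be started with Start.
//
// A minimal usage sketch (hypothetical caller code, assuming a populated config.Config
// and a Metrics implementation are available):
//
//	super, err := NewSupervisorBackend(ctx, logger, metrics, cfg)
//	if err != nil {
//		return err
//	}
//	if err := super.Start(ctx); err != nil {
//		return err
//	}
//	defer super.Stop(ctx)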
func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) {
	// attempt to prepare the data directory
	if err := db.PrepDataDir(cfg.Datadir); err != nil {
		return nil, err
	}

	// Load the dependency set
	depSet, err := cfg.DependencySetSource.LoadDependencySet(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to load dependency set: %w", err)
	}
	chains := depSet.Chains()

	// create initial per-chain resources
	chainsDBs := db.NewChainsDB(logger)
	chainProcessors := make(map[types.ChainID]*processors.ChainProcessor, len(chains))
	chainMetrics := make(map[types.ChainID]*chainMetrics, len(chains))

	// create the supervisor backend
	super := &SupervisorBackend{
		logger:          logger,
		m:               m,
		dataDir:         cfg.Datadir,
		depSet:          depSet,
		chainDBs:        chainsDBs,
		chainProcessors: chainProcessors,
		chainMetrics:    chainMetrics,
		// For testing we can avoid running the processors.
		synchronousProcessors: cfg.SynchronousProcessors,
	}

	// Initialize the resources of the supervisor backend.
	// Stop the supervisor if any of the resources fails to be initialized.
	if err := super.initResources(ctx, cfg); err != nil {
		err = fmt.Errorf("failed to init resources: %w", err)
		return nil, errors.Join(err, super.Stop(ctx))
	}

	return super, nil
}

// initResources initializes all the resources, such as DBs and processors for chains.
// An error may be returned without closing the thus-far initialized resources.
// Upon error the caller should call Stop() on the supervisor backend to clean up and release resources.
func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Config) error {
	chains := su.depSet.Chains()

	// for each chain known to the dependency set, create the necessary DB resources
	for _, chainID := range chains {
		if err := su.openChainDBs(chainID); err != nil {
			return fmt.Errorf("failed to open chain %s: %w", chainID, err)
		}
	}

	// for each chain initialize a chain processor service
	for _, chainID := range chains {
		logProcessor := processors.NewLogProcessor(chainID, su.chainDBs)
		chainProcessor := processors.NewChainProcessor(su.logger, chainID, logProcessor, su.chainDBs)
		su.chainProcessors[chainID] = chainProcessor
	}

	// the config has some RPC connections to attach to the chain-processors
	for _, rpc := range cfg.L2RPCs {
		err := su.attachRPC(ctx, rpc)
		if err != nil {
			return fmt.Errorf("failed to add chain monitor for rpc %v: %w", rpc, err)
		}
	}
	return nil
}

// openChainDBs initializes all the DB resources of a specific chain.
// It is a sub-task of initResources.
func (su *SupervisorBackend) openChainDBs(chainID types.ChainID) error {
	cm := newChainMetrics(chainID, su.m)
	// create metrics and a logdb for the chain
	su.chainMetrics[chainID] = cm

	logDB, err := db.OpenLogDB(su.logger, chainID, su.dataDir, cm)
	if err != nil {
		return fmt.Errorf("failed to open logDB of chain %s: %w", chainID, err)
	}
	su.chainDBs.AddLogDB(chainID, logDB)

	localDB, err := db.OpenLocalDerivedFromDB(su.logger, chainID, su.dataDir, cm)
	if err != nil {
		return fmt.Errorf("failed to open local derived-from DB of chain %s: %w", chainID, err)
	}
	su.chainDBs.AddLocalDerivedFromDB(chainID, localDB)

	crossDB, err := db.OpenCrossDerivedFromDB(su.logger, chainID, su.dataDir, cm)
	if err != nil {
		return fmt.Errorf("failed to open cross derived-from DB of chain %s: %w", chainID, err)
	}
	su.chainDBs.AddCrossDerivedFromDB(chainID, crossDB)

	su.chainDBs.AddCrossUnsafeTracker(chainID)
	return nil
}

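// attachRPC dials the given L2 RPC endpoint, verifies that its chain is part of the
// dependency set, and attaches an RPC-backed client as the source of that chain's processor.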
func (su *SupervisorBackend) attachRPC(ctx context.Context, rpc string) error {
	su.logger.Info("attaching RPC to chain processor", "rpc", rpc)

	logger := su.logger.New("rpc", rpc)
	// create the rpc client, which yields the chain id
	rpcClient, chainID, err := clientForL2(ctx, logger, rpc)
	if err != nil {
		return err
	}
	if !su.depSet.HasChain(chainID) {
		return fmt.Errorf("chain %s is not part of the interop dependency set: %w", chainID, db.ErrUnknownChain)
	}
	cm, ok := su.chainMetrics[chainID]
	if !ok {
		return fmt.Errorf("failed to find metrics for chain %v", chainID)
	}
	// create an RPC client that the processor can use
	cl, err := processors.NewEthClient(
		ctx,
		logger.New("chain", chainID),
		cm,
		rpc,
		rpcClient, 2*time.Second,
		false,
		sources.RPCKindStandard)
	if err != nil {
		return err
	}
	return su.AttachProcessorSource(chainID, cl)
}

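// AttachProcessorSource sets the given source on the processor of the given chain,
// so the processor has a data source to pull block and log data from.
// It errors if the chain is not known to the backend.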
func (su *SupervisorBackend) AttachProcessorSource(chainID types.ChainID, src processors.Source) error {
	su.mu.RLock()
	defer su.mu.RUnlock()

	proc, ok := su.chainProcessors[chainID]
	if !ok {
		return fmt.Errorf("unknown chain %s, cannot attach RPC to processor", chainID)
	}
	proc.SetSource(src)
	return nil
}

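// clientForL2 dials the given RPC endpoint and returns an RPC client,
// together with the chain ID reported by the endpoint.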
func clientForL2(ctx context.Context, logger log.Logger, rpc string) (client.RPC, types.ChainID, error) {
	ethClient, err := dial.DialEthClientWithTimeout(ctx, 10*time.Second, logger, rpc)
	if err != nil {
		return nil, types.ChainID{}, fmt.Errorf("failed to connect to rpc %v: %w", rpc, err)
	}
	chainID, err := ethClient.ChainID(ctx)
	if err != nil {
		return nil, types.ChainID{}, fmt.Errorf("failed to load chain id for rpc %v: %w", rpc, err)
	}
	return client.NewBaseRPCClient(ethClient.Client()), types.ChainIDFromBig(chainID), nil
}

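// Start resumes the chain databases from the last fully recorded block and,
// unless synchronous processing is configured, starts background processing
// on all chain processors. It errors if the backend was already started.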
func (su *SupervisorBackend) Start(ctx context.Context) error {
	su.mu.Lock()
	defer su.mu.Unlock()

	// ensure we only start once
	if !su.started.CompareAndSwap(false, true) {
		return errors.New("already started")
	}

	// initiate "ResumeFromLastSealedBlock" on the chains db,
	// which rewinds the database to the last block that is guaranteed to have been fully recorded
	if err := su.chainDBs.ResumeFromLastSealedBlock(); err != nil {
		return fmt.Errorf("failed to resume chains db: %w", err)
	}

	if !su.synchronousProcessors {
		// Make all the chain-processors run automatic background processing
		for _, processor := range su.chainProcessors {
			processor.StartBackground()
		}
	}

	return nil
}

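// Stop closes all chain processors and then the chain databases.
// It returns errAlreadyStopped if the backend is not running.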
func (su *SupervisorBackend) Stop(ctx context.Context) error {
	su.mu.Lock()
	defer su.mu.Unlock()

	if !su.started.CompareAndSwap(true, false) {
		return errAlreadyStopped
	}
	// close all processors
	for id, processor := range su.chainProcessors {
		su.logger.Info("stopping chain processor", "chainID", id)
		processor.Close()
	}
	clear(su.chainProcessors)
	// close the databases
	return su.chainDBs.Close()
}

// AddL2RPC attaches the given RPC endpoint as the RPC source of its chain, overriding the previous RPC source, if any.
func (su *SupervisorBackend) AddL2RPC(ctx context.Context, rpc string) error {
	su.mu.RLock() // read-lock: we only modify an existing chain, we don't add/remove chains
	defer su.mu.RUnlock()

	return su.attachRPC(ctx, rpc)
}

// Query methods
// ----------------------------

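// CheckMessage looks up the log referenced by the identifier and verifies it against the payload hash.
// Logs that are not yet known are reported as LocalUnsafe, and conflicting logs as Invalid;
// otherwise the safest known level of the message is returned.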
func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHash common.Hash) (types.SafetyLevel, error) {
	su.mu.RLock()
	defer su.mu.RUnlock()

	chainID := identifier.ChainID
	blockNum := identifier.BlockNumber
	logIdx := identifier.LogIndex
	_, err := su.chainDBs.Check(chainID, blockNum, uint32(logIdx), payloadHash)
	if errors.Is(err, entrydb.ErrFuture) {
		return types.LocalUnsafe, nil
	}
	if errors.Is(err, entrydb.ErrConflict) {
		return types.Invalid, nil
	}
	if err != nil {
		return types.Invalid, fmt.Errorf("failed to check log: %w", err)
	}
	return su.chainDBs.Safest(chainID, blockNum, uint32(logIdx))
}

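// CheckMessages checks each message and returns an error
// if any of them does not meet the given minimum safety level.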
func (su *SupervisorBackend) CheckMessages(
	messages []types.Message,
	minSafety types.SafetyLevel) error {
	su.mu.RLock()
	defer su.mu.RUnlock()

	for _, msg := range messages {
		safety, err := su.CheckMessage(msg.Identifier, msg.PayloadHash)
		if err != nil {
			return fmt.Errorf("failed to check message: %w", err)
		}
		if !safety.AtLeastAsSafe(minSafety) {
			return fmt.Errorf("message %v (safety level: %v) does not meet the minimum safety %v",
				msg.Identifier,
				safety,
				minSafety)
		}
	}
	return nil
}

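// UnsafeView returns the local-unsafe and cross-unsafe heads of the given chain.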
func (su *SupervisorBackend) UnsafeView(ctx context.Context, chainID types.ChainID, unsafe types.ReferenceView) (types.ReferenceView, error) {
	su.mu.RLock()
	defer su.mu.RUnlock()

	head, err := su.chainDBs.LocalUnsafe(chainID)
	if err != nil {
		return types.ReferenceView{}, fmt.Errorf("failed to get local-unsafe head: %w", err)
	}
	cross, err := su.chainDBs.CrossUnsafe(chainID)
	if err != nil {
		return types.ReferenceView{}, fmt.Errorf("failed to get cross-unsafe head: %w", err)
	}

	// TODO(#11693): check `unsafe` input to detect reorg conflicts

	return types.ReferenceView{
		Local: head.ID(),
		Cross: cross.ID(),
	}, nil
}

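// SafeView returns the local-safe and cross-safe heads of the given chain.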
func (su *SupervisorBackend) SafeView(ctx context.Context, chainID types.ChainID, safe types.ReferenceView) (types.ReferenceView, error) {
	su.mu.RLock()
	defer su.mu.RUnlock()

	_, localSafe, err := su.chainDBs.LocalSafe(chainID)
	if err != nil {
		return types.ReferenceView{}, fmt.Errorf("failed to get local-safe head: %w", err)
	}
	_, crossSafe, err := su.chainDBs.CrossSafe(chainID)
	if err != nil {
		return types.ReferenceView{}, fmt.Errorf("failed to get cross-safe head: %w", err)
	}

	// TODO(#11693): check `safe` input to detect reorg conflicts

	return types.ReferenceView{
		Local: localSafe.ID(),
		Cross: crossSafe.ID(),
	}, nil
}

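// Finalized returns the finalized block of the given chain.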
func (su *SupervisorBackend) Finalized(ctx context.Context, chainID types.ChainID) (eth.BlockID, error) {
	su.mu.RLock()
	defer su.mu.RUnlock()

	v, err := su.chainDBs.Finalized(chainID)
	if err != nil {
		return eth.BlockID{}, err
	}
	return v.ID(), nil
}

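// DerivedFrom returns the block that the given derived block was derived from,
// as recorded for the given chain.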
func (su *SupervisorBackend) DerivedFrom(ctx context.Context, chainID types.ChainID, derived eth.BlockID) (derivedFrom eth.BlockID, err error) {
	su.mu.RLock()
	defer su.mu.RUnlock()

	v, err := su.chainDBs.DerivedFrom(chainID, derived)
	if err != nil {
		return eth.BlockID{}, err
	}
	return v.ID(), nil
}

// Update methods
// ----------------------------

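// UpdateLocalUnsafe notifies the processor of the given chain of a new local-unsafe head.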
func (su *SupervisorBackend) UpdateLocalUnsafe(chainID types.ChainID, head eth.BlockRef) error {
	su.mu.RLock()
	defer su.mu.RUnlock()
	ch, ok := su.chainProcessors[chainID]
	if !ok {
		return db.ErrUnknownChain
	}
	return ch.OnNewHead(head)
}

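// UpdateLocalSafe records in the chain databases that lastDerived was derived from derivedFrom.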
func (su *SupervisorBackend) UpdateLocalSafe(chainID types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) error {
	su.mu.RLock()
	defer su.mu.RUnlock()

	return su.chainDBs.UpdateLocalSafe(chainID, derivedFrom, lastDerived)
}

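// UpdateFinalizedL1 records the given finalized L1 block in the chain databases.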
func (su *SupervisorBackend) UpdateFinalizedL1(chainID types.ChainID, finalized eth.BlockRef) error {
	su.mu.RLock()
	defer su.mu.RUnlock()

	return su.chainDBs.UpdateFinalizedL1(finalized)
}