Commit 1ebbd291 authored by Axel Kingsley's avatar Axel Kingsley Committed by GitHub

use RWMap for Supervisor Backend (#12785)

* use RWMap for Supervisor Backend

* adjust Clear method
parent 39dc0794
......@@ -46,3 +46,10 @@ func (m *RWMap[K, V]) Range(f func(key K, value V) bool) {
}
}
}
// Clear removes all key-value pairs from the map.
func (m *RWMap[K, V]) Clear() {
m.mu.Lock()
defer m.mu.Unlock()
clear(m.inner)
}
......@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
......@@ -14,6 +13,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/locks"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/cross"
......@@ -30,11 +30,6 @@ type SupervisorBackend struct {
m Metrics
dataDir string
// RW lock to avoid concurrent map mutations.
// Read = any chain may be used and mutated.
// Write = set of chains is changing.
mu sync.RWMutex
// depSet is the dependency set that the backend uses to know about the chains it is indexing
depSet depset.DependencySet
......@@ -42,21 +37,21 @@ type SupervisorBackend struct {
chainDBs *db.ChainsDB
// chainProcessors are notified of new unsafe blocks, and add the unsafe log events data into the events DB
chainProcessors map[types.ChainID]*processors.ChainProcessor
chainProcessors locks.RWMap[types.ChainID, *processors.ChainProcessor]
// crossSafeProcessors take local-safe data and promote it to cross-safe when verified
crossSafeProcessors map[types.ChainID]*cross.Worker
crossSafeProcessors locks.RWMap[types.ChainID, *cross.Worker]
// crossUnsafeProcessors take local-unsafe data and promote it to cross-unsafe when verified
crossUnsafeProcessors map[types.ChainID]*cross.Worker
crossUnsafeProcessors locks.RWMap[types.ChainID, *cross.Worker]
// chainMetrics are used to track metrics for each chain
// they are reused for processors and databases of the same chain
chainMetrics locks.RWMap[types.ChainID, *chainMetrics]
// synchronousProcessors disables background-workers,
// requiring manual triggers for the backend to process anything.
synchronousProcessors bool
// chainMetrics are used to track metrics for each chain
// they are reused for processors and databases of the same chain
chainMetrics map[types.ChainID]*chainMetrics
}
var _ frontend.Backend = (*SupervisorBackend)(nil)
......@@ -74,7 +69,6 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg
if err != nil {
return nil, fmt.Errorf("failed to load dependency set: %w", err)
}
chains := depSet.Chains()
// create initial per-chain resources
chainsDBs := db.NewChainsDB(logger, depSet)
......@@ -86,10 +80,6 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg
dataDir: cfg.Datadir,
depSet: depSet,
chainDBs: chainsDBs,
chainProcessors: make(map[types.ChainID]*processors.ChainProcessor, len(chains)),
chainMetrics: make(map[types.ChainID]*chainMetrics, len(chains)),
crossUnsafeProcessors: make(map[types.ChainID]*cross.Worker, len(chains)),
crossSafeProcessors: make(map[types.ChainID]*cross.Worker, len(chains)),
// For testing we can avoid running the processors.
synchronousProcessors: cfg.SynchronousProcessors,
}
......@@ -120,19 +110,19 @@ func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Conf
// initialize all cross-unsafe processors
for _, chainID := range chains {
worker := cross.NewCrossUnsafeWorker(su.logger, chainID, su.chainDBs)
su.crossUnsafeProcessors[chainID] = worker
su.crossUnsafeProcessors.Set(chainID, worker)
}
// initialize all cross-safe processors
for _, chainID := range chains {
worker := cross.NewCrossSafeWorker(su.logger, chainID, su.chainDBs)
su.crossSafeProcessors[chainID] = worker
su.crossSafeProcessors.Set(chainID, worker)
}
// For each chain initialize a chain processor service,
// after cross-unsafe workers are ready to receive updates
for _, chainID := range chains {
logProcessor := processors.NewLogProcessor(chainID, su.chainDBs)
chainProcessor := processors.NewChainProcessor(su.logger, chainID, logProcessor, su.chainDBs, su.onIndexedLocalUnsafeData)
su.chainProcessors[chainID] = chainProcessor
su.chainProcessors.Set(chainID, chainProcessor)
}
// the config has some RPC connections to attach to the chain-processors
......@@ -148,31 +138,27 @@ func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Conf
// onIndexedLocalUnsafeData is called by the event indexing workers.
// This signals to cross-unsafe workers that there's data to index.
func (su *SupervisorBackend) onIndexedLocalUnsafeData() {
su.mu.RLock()
defer su.mu.RUnlock()
// We signal all workers, since dependencies on a chain may be unblocked
// by new data on other chains.
// Busy workers don't block processing.
// The signal is picked up only if the worker is running in the background.
for _, w := range su.crossUnsafeProcessors {
su.crossUnsafeProcessors.Range(func(_ types.ChainID, w *cross.Worker) bool {
w.OnNewData()
}
return true
})
}
// onNewLocalSafeData is called by the safety-indexing.
// This signals to cross-safe workers that there's data to index.
func (su *SupervisorBackend) onNewLocalSafeData() {
su.mu.RLock()
defer su.mu.RUnlock()
// We signal all workers, since dependencies on a chain may be unblocked
// by new data on other chains.
// Busy workers don't block processing.
// The signal is picked up only if the worker is running in the background.
for _, w := range su.crossSafeProcessors {
su.crossSafeProcessors.Range(func(_ types.ChainID, w *cross.Worker) bool {
w.OnNewData()
}
return true
})
}
// openChainDBs initializes all the DB resources of a specific chain.
......@@ -180,7 +166,7 @@ func (su *SupervisorBackend) onNewLocalSafeData() {
func (su *SupervisorBackend) openChainDBs(chainID types.ChainID) error {
cm := newChainMetrics(chainID, su.m)
// create metrics and a logdb for the chain
su.chainMetrics[chainID] = cm
su.chainMetrics.Set(chainID, cm)
logDB, err := db.OpenLogDB(su.logger, chainID, su.dataDir, cm)
if err != nil {
......@@ -216,7 +202,7 @@ func (su *SupervisorBackend) attachRPC(ctx context.Context, rpc string) error {
if !su.depSet.HasChain(chainID) {
return fmt.Errorf("chain %s is not part of the interop dependency set: %w", chainID, types.ErrUnknownChain)
}
cm, ok := su.chainMetrics[chainID]
cm, ok := su.chainMetrics.Get(chainID)
if !ok {
return fmt.Errorf("failed to find metrics for chain %v", chainID)
}
......@@ -236,10 +222,7 @@ func (su *SupervisorBackend) attachRPC(ctx context.Context, rpc string) error {
}
func (su *SupervisorBackend) AttachProcessorSource(chainID types.ChainID, src processors.Source) error {
su.mu.RLock()
defer su.mu.RUnlock()
proc, ok := su.chainProcessors[chainID]
proc, ok := su.chainProcessors.Get(chainID)
if !ok {
return fmt.Errorf("unknown chain %s, cannot attach RPC to processor", chainID)
}
......@@ -260,9 +243,6 @@ func clientForL2(ctx context.Context, logger log.Logger, rpc string) (client.RPC
}
func (su *SupervisorBackend) Start(ctx context.Context) error {
su.mu.Lock()
defer su.mu.Unlock()
// ensure we only start once
if !su.started.CompareAndSwap(false, true) {
return errors.New("already started")
......@@ -276,46 +256,49 @@ func (su *SupervisorBackend) Start(ctx context.Context) error {
if !su.synchronousProcessors {
// Make all the chain-processors run automatic background processing
for _, processor := range su.chainProcessors {
su.chainProcessors.Range(func(_ types.ChainID, processor *processors.ChainProcessor) bool {
processor.StartBackground()
}
for _, worker := range su.crossUnsafeProcessors {
return true
})
su.crossUnsafeProcessors.Range(func(_ types.ChainID, worker *cross.Worker) bool {
worker.StartBackground()
}
for _, worker := range su.crossSafeProcessors {
return true
})
su.crossSafeProcessors.Range(func(_ types.ChainID, worker *cross.Worker) bool {
worker.StartBackground()
}
return true
})
}
return nil
}
func (su *SupervisorBackend) Stop(ctx context.Context) error {
su.mu.Lock()
defer su.mu.Unlock()
if !su.started.CompareAndSwap(true, false) {
return errAlreadyStopped
}
su.logger.Info("Closing supervisor backend")
// close all processors
for id, processor := range su.chainProcessors {
su.chainProcessors.Range(func(id types.ChainID, processor *processors.ChainProcessor) bool {
su.logger.Info("stopping chain processor", "chainID", id)
processor.Close()
}
clear(su.chainProcessors)
return true
})
su.chainProcessors.Clear()
for id, worker := range su.crossUnsafeProcessors {
su.crossUnsafeProcessors.Range(func(id types.ChainID, worker *cross.Worker) bool {
su.logger.Info("stopping cross-unsafe processor", "chainID", id)
worker.Close()
}
clear(su.crossUnsafeProcessors)
return true
})
su.crossUnsafeProcessors.Clear()
for id, worker := range su.crossSafeProcessors {
su.crossSafeProcessors.Range(func(id types.ChainID, worker *cross.Worker) bool {
su.logger.Info("stopping cross-safe processor", "chainID", id)
worker.Close()
}
clear(su.crossSafeProcessors)
return true
})
su.crossSafeProcessors.Clear()
// close the databases
return su.chainDBs.Close()
......@@ -323,9 +306,6 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error {
// AddL2RPC attaches an RPC as the RPC for the given chain, overriding the previous RPC source, if any.
func (su *SupervisorBackend) AddL2RPC(ctx context.Context, rpc string) error {
su.mu.RLock() // read-lock: we only modify an existing chain, we don't add/remove chains
defer su.mu.RUnlock()
return su.attachRPC(ctx, rpc)
}
......@@ -340,9 +320,6 @@ func (su *SupervisorBackend) DependencySet() depset.DependencySet {
// ----------------------------
func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHash common.Hash) (types.SafetyLevel, error) {
su.mu.RLock()
defer su.mu.RUnlock()
logHash := types.PayloadHashToLogHash(payloadHash, identifier.Origin)
chainID := identifier.ChainID
blockNum := identifier.BlockNumber
......@@ -365,9 +342,6 @@ func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHa
func (su *SupervisorBackend) CheckMessages(
messages []types.Message,
minSafety types.SafetyLevel) error {
su.mu.RLock()
defer su.mu.RUnlock()
su.logger.Debug("Checking messages", "count", len(messages), "minSafety", minSafety)
for _, msg := range messages {
......@@ -393,9 +367,6 @@ func (su *SupervisorBackend) CheckMessages(
}
func (su *SupervisorBackend) UnsafeView(ctx context.Context, chainID types.ChainID, unsafe types.ReferenceView) (types.ReferenceView, error) {
su.mu.RLock()
defer su.mu.RUnlock()
head, err := su.chainDBs.LocalUnsafe(chainID)
if err != nil {
return types.ReferenceView{}, fmt.Errorf("failed to get local-unsafe head: %w", err)
......@@ -414,9 +385,6 @@ func (su *SupervisorBackend) UnsafeView(ctx context.Context, chainID types.Chain
}
func (su *SupervisorBackend) SafeView(ctx context.Context, chainID types.ChainID, safe types.ReferenceView) (types.ReferenceView, error) {
su.mu.RLock()
defer su.mu.RUnlock()
_, localSafe, err := su.chainDBs.LocalSafe(chainID)
if err != nil {
return types.ReferenceView{}, fmt.Errorf("failed to get local-safe head: %w", err)
......@@ -435,9 +403,6 @@ func (su *SupervisorBackend) SafeView(ctx context.Context, chainID types.ChainID
}
func (su *SupervisorBackend) Finalized(ctx context.Context, chainID types.ChainID) (eth.BlockID, error) {
su.mu.RLock()
defer su.mu.RUnlock()
v, err := su.chainDBs.Finalized(chainID)
if err != nil {
return eth.BlockID{}, err
......@@ -446,9 +411,6 @@ func (su *SupervisorBackend) Finalized(ctx context.Context, chainID types.ChainI
}
func (su *SupervisorBackend) CrossDerivedFrom(ctx context.Context, chainID types.ChainID, derived eth.BlockID) (derivedFrom eth.BlockRef, err error) {
su.mu.RLock()
defer su.mu.RUnlock()
v, err := su.chainDBs.CrossDerivedFromBlockRef(chainID, derived)
if err != nil {
return eth.BlockRef{}, err
......@@ -460,9 +422,7 @@ func (su *SupervisorBackend) CrossDerivedFrom(ctx context.Context, chainID types
// ----------------------------
func (su *SupervisorBackend) UpdateLocalUnsafe(ctx context.Context, chainID types.ChainID, head eth.BlockRef) error {
su.mu.RLock()
defer su.mu.RUnlock()
ch, ok := su.chainProcessors[chainID]
ch, ok := su.chainProcessors.Get(chainID)
if !ok {
return types.ErrUnknownChain
}
......@@ -470,9 +430,6 @@ func (su *SupervisorBackend) UpdateLocalUnsafe(ctx context.Context, chainID type
}
func (su *SupervisorBackend) UpdateLocalSafe(ctx context.Context, chainID types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) error {
su.mu.RLock()
defer su.mu.RUnlock()
err := su.chainDBs.UpdateLocalSafe(chainID, derivedFrom, lastDerived)
if err != nil {
return err
......@@ -482,9 +439,6 @@ func (su *SupervisorBackend) UpdateLocalSafe(ctx context.Context, chainID types.
}
func (su *SupervisorBackend) UpdateFinalizedL1(ctx context.Context, chainID types.ChainID, finalized eth.BlockRef) error {
su.mu.RLock()
defer su.mu.RUnlock()
return su.chainDBs.UpdateFinalizedL1(finalized)
}
......@@ -492,9 +446,7 @@ func (su *SupervisorBackend) UpdateFinalizedL1(ctx context.Context, chainID type
// ----------------------------
func (su *SupervisorBackend) SyncEvents(chainID types.ChainID) error {
su.mu.RLock()
defer su.mu.RUnlock()
ch, ok := su.chainProcessors[chainID]
ch, ok := su.chainProcessors.Get(chainID)
if !ok {
return types.ErrUnknownChain
}
......@@ -503,9 +455,7 @@ func (su *SupervisorBackend) SyncEvents(chainID types.ChainID) error {
}
func (su *SupervisorBackend) SyncCrossUnsafe(chainID types.ChainID) error {
su.mu.RLock()
defer su.mu.RUnlock()
ch, ok := su.crossUnsafeProcessors[chainID]
ch, ok := su.crossUnsafeProcessors.Get(chainID)
if !ok {
return types.ErrUnknownChain
}
......@@ -513,9 +463,7 @@ func (su *SupervisorBackend) SyncCrossUnsafe(chainID types.ChainID) error {
}
func (su *SupervisorBackend) SyncCrossSafe(chainID types.ChainID) error {
su.mu.RLock()
defer su.mu.RUnlock()
ch, ok := su.crossSafeProcessors[chainID]
ch, ok := su.crossSafeProcessors.Get(chainID)
if !ok {
return types.ErrUnknownChain
}
......
......@@ -117,7 +117,8 @@ func TestBackendLifetime(t *testing.T) {
require.NoError(t, err)
// Make the processing happen, so we can rely on the new chain information,
// and not run into errors for future data that isn't mocked at this time.
b.chainProcessors[chainA].ProcessToHead()
proc, _ := b.chainProcessors.Get(chainA)
proc.ProcessToHead()
_, err = b.UnsafeView(context.Background(), chainA, types.ReferenceView{})
require.ErrorIs(t, err, types.ErrFuture, "still no data yet, need cross-unsafe")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment