Commit 747c9e7d authored by Axel Kingsley's avatar Axel Kingsley Committed by GitHub

Add Admin RPC for Adding New Chain Monitors (#11792)

* Add Admin RPC for Adding New Chain Monitors

* Update op-supervisor/supervisor/backend/db/db.go

---------
Co-authored-by: default avatarprotolambda <proto@protolambda.com>
parent fb735d6d
...@@ -25,11 +25,16 @@ import ( ...@@ -25,11 +25,16 @@ import (
) )
type SupervisorBackend struct { type SupervisorBackend struct {
ctx context.Context
started atomic.Bool started atomic.Bool
logger log.Logger logger log.Logger
m Metrics
dataDir string
chainMonitors []*source.ChainMonitor chainMonitors map[types.ChainID]*source.ChainMonitor
db *db.ChainsDB db *db.ChainsDB
maintenanceCancel context.CancelFunc
} }
var _ frontend.Backend = (*SupervisorBackend)(nil) var _ frontend.Backend = (*SupervisorBackend)(nil)
...@@ -37,53 +42,70 @@ var _ frontend.Backend = (*SupervisorBackend)(nil) ...@@ -37,53 +42,70 @@ var _ frontend.Backend = (*SupervisorBackend)(nil)
var _ io.Closer = (*SupervisorBackend)(nil) var _ io.Closer = (*SupervisorBackend)(nil)
func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) { func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) {
// attempt to prepare the data directory
if err := prepDataDir(cfg.Datadir); err != nil { if err := prepDataDir(cfg.Datadir); err != nil {
return nil, err return nil, err
} }
// create the head tracker
headTracker, err := heads.NewHeadTracker(filepath.Join(cfg.Datadir, "heads.json")) headTracker, err := heads.NewHeadTracker(filepath.Join(cfg.Datadir, "heads.json"))
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to load existing heads: %w", err) return nil, fmt.Errorf("failed to load existing heads: %w", err)
} }
logDBs := make(map[types.ChainID]db.LogStorage)
chainRPCs := make(map[types.ChainID]string) // create the chains db
chainClients := make(map[types.ChainID]client.RPC) db := db.NewChainsDB(map[types.ChainID]db.LogStorage{}, headTracker)
// create an empty map of chain monitors
chainMonitors := make(map[types.ChainID]*source.ChainMonitor, len(cfg.L2RPCs))
// create the supervisor backend
super := &SupervisorBackend{
logger: logger,
m: m,
dataDir: cfg.Datadir,
chainMonitors: chainMonitors,
db: db,
}
// from the RPC strings, have the supervisor backend create a chain monitor
for _, rpc := range cfg.L2RPCs { for _, rpc := range cfg.L2RPCs {
rpcClient, chainID, err := createRpcClient(ctx, logger, rpc) err := super.addFromRPC(ctx, logger, rpc)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("failed to add chain monitor for rpc %v: %w", rpc, err)
} }
cm := newChainMetrics(chainID, m)
path, err := prepLogDBPath(chainID, cfg.Datadir)
if err != nil {
return nil, fmt.Errorf("failed to create datadir for chain %v: %w", chainID, err)
}
logDB, err := logs.NewFromFile(logger, cm, path)
if err != nil {
return nil, fmt.Errorf("failed to create logdb for chain %v at %v: %w", chainID, path, err)
}
logDBs[chainID] = logDB
chainRPCs[chainID] = rpc
chainClients[chainID] = rpcClient
}
chainsDB := db.NewChainsDB(logDBs, headTracker)
if err := chainsDB.Resume(); err != nil {
return nil, fmt.Errorf("failed to resume chains db: %w", err)
} }
return super, nil
}
chainMonitors := make([]*source.ChainMonitor, 0, len(cfg.L2RPCs)) // addFromRPC adds a chain monitor to the supervisor backend from an rpc endpoint
for chainID, rpc := range chainRPCs { // it does not expect to be called after the backend has been started
cm := newChainMetrics(chainID, m) func (su *SupervisorBackend) addFromRPC(ctx context.Context, logger log.Logger, rpc string) error {
monitor, err := source.NewChainMonitor(ctx, logger, cm, chainID, rpc, chainClients[chainID], chainsDB) // create the rpc client, which yields the chain id
if err != nil { rpcClient, chainID, err := createRpcClient(su.ctx, logger, rpc)
return nil, fmt.Errorf("failed to create monitor for rpc %v: %w", rpc, err) if err != nil {
} return err
chainMonitors = append(chainMonitors, monitor)
} }
return &SupervisorBackend{ // create metrics and a logdb for the chain
logger: logger, cm := newChainMetrics(chainID, su.m)
chainMonitors: chainMonitors, path, err := prepLogDBPath(chainID, su.dataDir)
db: chainsDB, if err != nil {
}, nil return fmt.Errorf("failed to create datadir for chain %v: %w", chainID, err)
}
logDB, err := logs.NewFromFile(logger, cm, path)
if err != nil {
return fmt.Errorf("failed to create logdb for chain %v at %v: %w", chainID, path, err)
}
monitor, err := source.NewChainMonitor(ctx, logger, cm, chainID, rpc, rpcClient, su.db)
if err != nil {
return fmt.Errorf("failed to create monitor for rpc %v: %w", rpc, err)
}
if su.chainMonitors[chainID] != nil {
return fmt.Errorf("chain monitor for chain %v already exists", chainID)
}
su.chainMonitors[chainID] = monitor
su.db.AddLogDB(chainID, logDB)
return nil
} }
func createRpcClient(ctx context.Context, logger log.Logger, rpc string) (client.RPC, types.ChainID, error) { func createRpcClient(ctx context.Context, logger log.Logger, rpc string) (client.RPC, types.ChainID, error) {
...@@ -99,9 +121,14 @@ func createRpcClient(ctx context.Context, logger log.Logger, rpc string) (client ...@@ -99,9 +121,14 @@ func createRpcClient(ctx context.Context, logger log.Logger, rpc string) (client
} }
func (su *SupervisorBackend) Start(ctx context.Context) error { func (su *SupervisorBackend) Start(ctx context.Context) error {
// ensure we only start once
if !su.started.CompareAndSwap(false, true) { if !su.started.CompareAndSwap(false, true) {
return errors.New("already started") return errors.New("already started")
} }
// initiate "Resume" on the chains db, which rewinds the database to the last block that is guaranteed to have been fully recorded
if err := su.db.Resume(); err != nil {
return fmt.Errorf("failed to resume chains db: %w", err)
}
// start chain monitors // start chain monitors
for _, monitor := range su.chainMonitors { for _, monitor := range su.chainMonitors {
if err := monitor.Start(); err != nil { if err := monitor.Start(); err != nil {
...@@ -109,7 +136,9 @@ func (su *SupervisorBackend) Start(ctx context.Context) error { ...@@ -109,7 +136,9 @@ func (su *SupervisorBackend) Start(ctx context.Context) error {
} }
} }
// start db maintenance loop // start db maintenance loop
su.db.StartCrossHeadMaintenance(ctx) maintinenceCtx, cancel := context.WithCancel(ctx)
su.db.StartCrossHeadMaintenance(maintinenceCtx)
su.maintenanceCancel = cancel
return nil return nil
} }
...@@ -117,12 +146,16 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error { ...@@ -117,12 +146,16 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error {
if !su.started.CompareAndSwap(true, false) { if !su.started.CompareAndSwap(true, false) {
return errors.New("already stopped") return errors.New("already stopped")
} }
// signal the maintenance loop to stop
su.maintenanceCancel()
// collect errors from stopping chain monitors
var errs error var errs error
for _, monitor := range su.chainMonitors { for _, monitor := range su.chainMonitors {
if err := monitor.Stop(); err != nil { if err := monitor.Stop(); err != nil {
errs = errors.Join(errs, fmt.Errorf("failed to stop chain monitor: %w", err)) errs = errors.Join(errs, fmt.Errorf("failed to stop chain monitor: %w", err))
} }
} }
// close the database
if err := su.db.Close(); err != nil { if err := su.db.Close(); err != nil {
errs = errors.Join(errs, fmt.Errorf("failed to close database: %w", err)) errs = errors.Join(errs, fmt.Errorf("failed to close database: %w", err))
} }
...@@ -134,6 +167,18 @@ func (su *SupervisorBackend) Close() error { ...@@ -134,6 +167,18 @@ func (su *SupervisorBackend) Close() error {
return nil return nil
} }
// AddL2RPC adds a new L2 chain to the supervisor backend
// it stops and restarts the backend to add the new chain
func (su *SupervisorBackend) AddL2RPC(ctx context.Context, rpc string) error {
if err := su.Stop(ctx); err != nil {
return fmt.Errorf("failed to stop backend: %w", err)
}
if err := su.addFromRPC(ctx, su.logger, rpc); err != nil {
return fmt.Errorf("failed to add chain monitor: %w", err)
}
return su.Start(ctx)
}
func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHash common.Hash) (types.SafetyLevel, error) { func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHash common.Hash) (types.SafetyLevel, error) {
chainID := identifier.ChainID chainID := identifier.ChainID
blockNum := identifier.BlockNumber blockNum := identifier.BlockNumber
......
...@@ -52,9 +52,17 @@ func NewChainsDB(logDBs map[types.ChainID]LogStorage, heads HeadsStorage) *Chain ...@@ -52,9 +52,17 @@ func NewChainsDB(logDBs map[types.ChainID]LogStorage, heads HeadsStorage) *Chain
} }
} }
func (db *ChainsDB) AddLogDB(chain types.ChainID, logDB LogStorage) {
if db.logDBs[chain] != nil {
log.Warn("overwriting existing logDB for chain", "chain", chain)
}
db.logDBs[chain] = logDB
}
// Resume prepares the chains db to resume recording events after a restart. // Resume prepares the chains db to resume recording events after a restart.
// It rewinds the database to the last block that is guaranteed to have been fully recorded to the database // It rewinds the database to the last block that is guaranteed to have been fully recorded to the database
// to ensure it can resume recording from the first log of the next block. // to ensure it can resume recording from the first log of the next block.
// TODO(#11793): we can rename this to something more descriptive like "PrepareWithRollback"
func (db *ChainsDB) Resume() error { func (db *ChainsDB) Resume() error {
for chain, logStore := range db.logDBs { for chain, logStore := range db.logDBs {
if err := Resume(logStore); err != nil { if err := Resume(logStore); err != nil {
......
...@@ -39,6 +39,10 @@ func (m *MockBackend) Stop(ctx context.Context) error { ...@@ -39,6 +39,10 @@ func (m *MockBackend) Stop(ctx context.Context) error {
return nil return nil
} }
func (m *MockBackend) AddL2RPC(ctx context.Context, rpc string) error {
return nil
}
func (m *MockBackend) CheckMessage(identifier types.Identifier, payloadHash common.Hash) (types.SafetyLevel, error) { func (m *MockBackend) CheckMessage(identifier types.Identifier, payloadHash common.Hash) (types.SafetyLevel, error) {
return types.CrossUnsafe, nil return types.CrossUnsafe, nil
} }
......
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
type AdminBackend interface { type AdminBackend interface {
Start(ctx context.Context) error Start(ctx context.Context) error
Stop(ctx context.Context) error Stop(ctx context.Context) error
AddL2RPC(ctx context.Context, rpc string) error
} }
type QueryBackend interface { type QueryBackend interface {
...@@ -61,3 +62,8 @@ func (a *AdminFrontend) Start(ctx context.Context) error { ...@@ -61,3 +62,8 @@ func (a *AdminFrontend) Start(ctx context.Context) error {
func (a *AdminFrontend) Stop(ctx context.Context) error { func (a *AdminFrontend) Stop(ctx context.Context) error {
return a.Supervisor.Stop(ctx) return a.Supervisor.Stop(ctx)
} }
// AddL2RPC adds a new L2 chain to the supervisor backend
func (a *AdminFrontend) AddL2RPC(ctx context.Context, rpc string) error {
return a.Supervisor.AddL2RPC(ctx, rpc)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment