Commit c26ab41e authored by Axel Kingsley's avatar Axel Kingsley Committed by GitHub

Initialize using Dependency Set Configuration (#12495)

* Initialize using Dependency Set Configuration

* op-supervisor: init fromda, route fromda metrics, handle cross-unsafe, improve backend resource initialization

* op-supervisor: attach RPC, create processors upfront, implement backend test

* op-supervisor: fix dependency set configuration and test setup

* Update op-supervisor/supervisor/backend/backend.go
Co-authored-by: default avatarAxel Kingsley <axel.kingsley@gmail.com>

---------
Co-authored-by: default avatarprotolambda <proto@protolambda.com>
parent 7ce91656
......@@ -419,7 +419,7 @@ func (s *interopE2ESystem) newL2(id string, l2Out *interopgen.L2Output) l2Set {
func (s *interopE2ESystem) prepareSupervisor() *supervisor.SupervisorService {
// Be verbose with op-supervisor, it's in early test phase
logger := testlog.Logger(s.t, log.LevelDebug).New("role", "supervisor")
cfg := supervisorConfig.Config{
cfg := &supervisorConfig.Config{
MetricsConfig: metrics.CLIConfig{
Enabled: false,
},
......@@ -441,9 +441,9 @@ func (s *interopE2ESystem) prepareSupervisor() *supervisor.SupervisorService {
depSet := &depset.StaticConfigDependencySet{
Dependencies: make(map[supervisortypes.ChainID]*depset.StaticConfigDependency),
}
for id := range s.l2s {
cfg.L2RPCs = append(cfg.L2RPCs, s.l2s[id].l2Geth.UserRPC().RPC())
chainID := supervisortypes.ChainIDFromBig(s.l2s[id].chainID)
// Iterate over the L2 chain configs. The L2 nodes don't exist yet.
for _, l2Out := range s.worldOutput.L2s {
chainID := supervisortypes.ChainIDFromBig(l2Out.Genesis.Config.ChainID)
depSet.Dependencies[chainID] = &depset.StaticConfigDependency{
ActivationTime: 0,
HistoryMinTime: 0,
......@@ -451,7 +451,7 @@ func (s *interopE2ESystem) prepareSupervisor() *supervisor.SupervisorService {
}
cfg.DependencySetSource = depSet
// Create the supervisor with the configuration
super, err := supervisor.SupervisorFromConfig(context.Background(), &cfg, logger)
super, err := supervisor.SupervisorFromConfig(context.Background(), cfg, logger)
require.NoError(s.t, err)
// Start the supervisor
err = super.Start(context.Background())
......@@ -495,7 +495,7 @@ func (s *interopE2ESystem) prepare(t *testing.T, w worldResourcePaths) {
ctx := context.Background()
for _, l2 := range s.l2s {
err := s.SupervisorClient().AddL2RPC(ctx, l2.l2Geth.UserRPC().RPC())
require.NoError(s.t, err, "failed to add L2 RPC to supervisor", "error", err)
require.NoError(s.t, err, "failed to add L2 RPC to supervisor")
}
}
......
......@@ -29,6 +29,10 @@ type Config struct {
// MockRun runs the service with a mock backend
MockRun bool
// SynchronousProcessors disables background-workers,
// requiring manual triggers for the backend to process anything.
SynchronousProcessors bool
L2RPCs []string
Datadir string
}
......
package metrics
import (
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/prometheus/client_golang/prometheus"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
const Namespace = "op_supervisor"
......@@ -18,7 +18,7 @@ type Metricer interface {
CacheAdd(chainID types.ChainID, label string, cacheSize int, evicted bool)
CacheGet(chainID types.ChainID, label string, hit bool)
RecordDBEntryCount(chainID types.ChainID, count int64)
RecordDBEntryCount(chainID types.ChainID, kind string, count int64)
RecordDBSearchEntriesRead(chainID types.ChainID, count int64)
Document() []opmetrics.DocumentedMetric
......@@ -106,9 +106,10 @@ func NewMetrics(procName string) *Metrics {
DBEntryCountVec: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "logdb_entries_current",
Help: "Current number of entries in the log database by chain ID",
Help: "Current number of entries in the database of specified kind and chain ID",
}, []string{
"chain",
"kind",
}),
DBSearchEntriesReadVec: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: ns,
......@@ -159,8 +160,8 @@ func (m *Metrics) CacheGet(chainID types.ChainID, label string, hit bool) {
}
}
func (m *Metrics) RecordDBEntryCount(chainID types.ChainID, count int64) {
m.DBEntryCountVec.WithLabelValues(chainIDLabel(chainID)).Set(float64(count))
func (m *Metrics) RecordDBEntryCount(chainID types.ChainID, kind string, count int64) {
m.DBEntryCountVec.WithLabelValues(chainIDLabel(chainID), kind).Set(float64(count))
}
func (m *Metrics) RecordDBSearchEntriesRead(chainID types.ChainID, count int64) {
......
......@@ -19,5 +19,5 @@ func (*noopMetrics) RecordUp() {}
func (m *noopMetrics) CacheAdd(_ types.ChainID, _ string, _ int, _ bool) {}
func (m *noopMetrics) CacheGet(_ types.ChainID, _ string, _ bool) {}
func (m *noopMetrics) RecordDBEntryCount(_ types.ChainID, _ int64) {}
func (m *noopMetrics) RecordDBSearchEntriesRead(_ types.ChainID, _ int64) {}
func (m *noopMetrics) RecordDBEntryCount(_ types.ChainID, _ string, _ int64) {}
func (m *noopMetrics) RecordDBSearchEntriesRead(_ types.ChainID, _ int64) {}
......@@ -18,7 +18,6 @@ import (
"github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/processors"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
......@@ -36,13 +35,22 @@ type SupervisorBackend struct {
// Write = set of chains is changing.
mu sync.RWMutex
// depSet is the dependency set that the backend uses to know about the chains it is indexing
depSet depset.DependencySet
// db holds on to the DB indices for each chain
db *db.ChainsDB
// chainDBs holds on to the DB indices for each chain
chainDBs *db.ChainsDB
// chainProcessors are notified of new unsafe blocks, and add the unsafe log events data into the events DB
chainProcessors map[types.ChainID]*processors.ChainProcessor
// synchronousProcessors disables background-workers,
// requiring manual triggers for the backend to process anything.
synchronousProcessors bool
// chainMetrics are used to track metrics for each chain
// they are reused for processors and databases of the same chain
chainMetrics map[types.ChainID]*chainMetrics
}
var _ frontend.Backend = (*SupervisorBackend)(nil)
......@@ -51,7 +59,7 @@ var errAlreadyStopped = errors.New("already stopped")
func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) {
// attempt to prepare the data directory
if err := prepDataDir(cfg.Datadir); err != nil {
if err := db.PrepDataDir(cfg.Datadir); err != nil {
return nil, err
}
......@@ -60,12 +68,12 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg
if err != nil {
return nil, fmt.Errorf("failed to load dependency set: %w", err)
}
chains := depSet.Chains()
// create the chains db
chainsDB := db.NewChainsDB(logger)
// create an empty map of chain monitors
chainProcessors := make(map[types.ChainID]*processors.ChainProcessor, len(cfg.L2RPCs))
// create initial per-chain resources
chainsDBs := db.NewChainsDB(logger)
chainProcessors := make(map[types.ChainID]*processors.ChainProcessor, len(chains))
chainMetrics := make(map[types.ChainID]*chainMetrics, len(chains))
// create the supervisor backend
super := &SupervisorBackend{
......@@ -73,48 +81,102 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg
m: m,
dataDir: cfg.Datadir,
depSet: depSet,
chainDBs: chainsDBs,
chainProcessors: chainProcessors,
db: chainsDB,
chainMetrics: chainMetrics,
// For testing we can avoid running the processors.
synchronousProcessors: cfg.SynchronousProcessors,
}
// Initialize the resources of the supervisor backend.
// Stop the supervisor if any of the resources fails to be initialized.
if err := super.initResources(ctx, cfg); err != nil {
err = fmt.Errorf("failed to init resources: %w", err)
return nil, errors.Join(err, super.Stop(ctx))
}
// from the RPC strings, have the supervisor backend create a chain monitor
// don't start the monitor yet, as we will start all monitors at once when Start is called
return super, nil
}
// initResources initializes all the resources, such as DBs and processors for chains.
// An error may returned, without closing the thus-far initialized resources.
// Upon error the caller should call Stop() on the supervisor backend to clean up and release resources.
func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Config) error {
chains := su.depSet.Chains()
// for each chain known to the dependency set, create the necessary DB resources
for _, chainID := range chains {
if err := su.openChainDBs(chainID); err != nil {
return fmt.Errorf("failed to open chain %s: %w", chainID, err)
}
}
// for each chain initialize a chain processor service
for _, chainID := range chains {
logProcessor := processors.NewLogProcessor(chainID, su.chainDBs)
chainProcessor := processors.NewChainProcessor(su.logger, chainID, logProcessor, su.chainDBs)
su.chainProcessors[chainID] = chainProcessor
}
// the config has some RPC connections to attach to the chain-processors
for _, rpc := range cfg.L2RPCs {
err := super.addFromRPC(ctx, logger, rpc, false)
err := su.attachRPC(ctx, rpc)
if err != nil {
return nil, fmt.Errorf("failed to add chain monitor for rpc %v: %w", rpc, err)
return fmt.Errorf("failed to add chain monitor for rpc %v: %w", rpc, err)
}
}
return super, nil
return nil
}
// addFromRPC adds a chain monitor to the supervisor backend from an rpc endpoint
// it does not expect to be called after the backend has been started
// it will start the monitor if shouldStart is true
func (su *SupervisorBackend) addFromRPC(ctx context.Context, logger log.Logger, rpc string, _ bool) error {
// create the rpc client, which yields the chain id
rpcClient, chainID, err := clientForL2(ctx, logger, rpc)
// openChainDBs initializes all the DB resources of a specific chain.
// It is a sub-task of initResources.
func (su *SupervisorBackend) openChainDBs(chainID types.ChainID) error {
cm := newChainMetrics(chainID, su.m)
// create metrics and a logdb for the chain
su.chainMetrics[chainID] = cm
logDB, err := db.OpenLogDB(su.logger, chainID, su.dataDir, cm)
if err != nil {
return err
return fmt.Errorf("failed to open logDB of chain %s: %w", chainID, err)
}
su.logger.Info("adding from rpc connection", "rpc", rpc, "chainID", chainID)
// create metrics and a logdb for the chain
cm := newChainMetrics(chainID, su.m)
path, err := prepLogDBPath(chainID, su.dataDir)
su.chainDBs.AddLogDB(chainID, logDB)
localDB, err := db.OpenLocalDerivedFromDB(su.logger, chainID, su.dataDir, cm)
if err != nil {
return fmt.Errorf("failed to create datadir for chain %v: %w", chainID, err)
return fmt.Errorf("failed to open local derived-from DB of chain %s: %w", chainID, err)
}
logDB, err := logs.NewFromFile(logger, cm, path, true)
su.chainDBs.AddLocalDerivedFromDB(chainID, localDB)
crossDB, err := db.OpenCrossDerivedFromDB(su.logger, chainID, su.dataDir, cm)
if err != nil {
return fmt.Errorf("failed to create logdb for chain %v at %v: %w", chainID, path, err)
return fmt.Errorf("failed to open cross derived-from DB of chain %s: %w", chainID, err)
}
if su.chainProcessors[chainID] != nil {
return fmt.Errorf("chain monitor for chain %v already exists", chainID)
su.chainDBs.AddCrossDerivedFromDB(chainID, crossDB)
su.chainDBs.AddCrossUnsafeTracker(chainID)
return nil
}
func (su *SupervisorBackend) attachRPC(ctx context.Context, rpc string) error {
su.logger.Info("attaching RPC to chain processor", "rpc", rpc)
logger := su.logger.New("rpc", rpc)
// create the rpc client, which yields the chain id
rpcClient, chainID, err := clientForL2(ctx, logger, rpc)
if err != nil {
return err
}
if !su.depSet.HasChain(chainID) {
return fmt.Errorf("chain %s is not part of the interop dependency set: %w", chainID, db.ErrUnknownChain)
}
// create a client like the monitor would have
cm, ok := su.chainMetrics[chainID]
if !ok {
return fmt.Errorf("failed to find metrics for chain %v", chainID)
}
// create an RPC client that the processor can use
cl, err := processors.NewEthClient(
ctx,
logger,
logger.New("chain", chainID),
cm,
rpc,
rpcClient, 2*time.Second,
......@@ -123,10 +185,18 @@ func (su *SupervisorBackend) addFromRPC(ctx context.Context, logger log.Logger,
if err != nil {
return err
}
logProcessor := processors.NewLogProcessor(chainID, su.db)
chainProcessor := processors.NewChainProcessor(logger, cl, chainID, logProcessor, su.db)
su.chainProcessors[chainID] = chainProcessor
su.db.AddLogDB(chainID, logDB)
return su.AttachProcessorSource(chainID, cl)
}
func (su *SupervisorBackend) AttachProcessorSource(chainID types.ChainID, src processors.Source) error {
su.mu.RLock()
defer su.mu.RUnlock()
proc, ok := su.chainProcessors[chainID]
if !ok {
return fmt.Errorf("unknown chain %s, cannot attach RPC to processor", chainID)
}
proc.SetSource(src)
return nil
}
......@@ -150,12 +220,20 @@ func (su *SupervisorBackend) Start(ctx context.Context) error {
if !su.started.CompareAndSwap(false, true) {
return errors.New("already started")
}
// initiate "ResumeFromLastSealedBlock" on the chains db,
// which rewinds the database to the last block that is guaranteed to have been fully recorded
if err := su.db.ResumeFromLastSealedBlock(); err != nil {
if err := su.chainDBs.ResumeFromLastSealedBlock(); err != nil {
return fmt.Errorf("failed to resume chains db: %w", err)
}
// TODO(#12423): init background processors, de-dup with constructor
if !su.synchronousProcessors {
// Make all the chain-processors run automatic background processing
for _, processor := range su.chainProcessors {
processor.StartBackground()
}
}
return nil
}
......@@ -173,17 +251,15 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error {
}
clear(su.chainProcessors)
// close the databases
return su.db.Close()
return su.chainDBs.Close()
}
// AddL2RPC adds a new L2 chain to the supervisor backend
// it stops and restarts the backend to add the new chain
// AddL2RPC attaches an RPC as the RPC for the given chain, overriding the previous RPC source, if any.
func (su *SupervisorBackend) AddL2RPC(ctx context.Context, rpc string) error {
su.mu.Lock()
defer su.mu.Unlock()
su.mu.RLock() // read-lock: we only modify an existing chain, we don't add/remove chains
defer su.mu.RUnlock()
// start the monitor immediately, as the backend is assumed to already be running
return su.addFromRPC(ctx, su.logger, rpc, true)
return su.attachRPC(ctx, rpc)
}
// Query methods
......@@ -196,7 +272,7 @@ func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHa
chainID := identifier.ChainID
blockNum := identifier.BlockNumber
logIdx := identifier.LogIndex
_, err := su.db.Check(chainID, blockNum, uint32(logIdx), payloadHash)
_, err := su.chainDBs.Check(chainID, blockNum, uint32(logIdx), payloadHash)
if errors.Is(err, entrydb.ErrFuture) {
return types.LocalUnsafe, nil
}
......@@ -206,7 +282,7 @@ func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHa
if err != nil {
return types.Invalid, fmt.Errorf("failed to check log: %w", err)
}
return su.db.Safest(chainID, blockNum, uint32(logIdx))
return su.chainDBs.Safest(chainID, blockNum, uint32(logIdx))
}
func (su *SupervisorBackend) CheckMessages(
......@@ -234,11 +310,11 @@ func (su *SupervisorBackend) UnsafeView(ctx context.Context, chainID types.Chain
su.mu.RLock()
defer su.mu.RUnlock()
head, err := su.db.LocalUnsafe(chainID)
head, err := su.chainDBs.LocalUnsafe(chainID)
if err != nil {
return types.ReferenceView{}, fmt.Errorf("failed to get local-unsafe head: %w", err)
}
cross, err := su.db.CrossUnsafe(chainID)
cross, err := su.chainDBs.CrossUnsafe(chainID)
if err != nil {
return types.ReferenceView{}, fmt.Errorf("failed to get cross-unsafe head: %w", err)
}
......@@ -255,11 +331,11 @@ func (su *SupervisorBackend) SafeView(ctx context.Context, chainID types.ChainID
su.mu.RLock()
defer su.mu.RUnlock()
_, localSafe, err := su.db.LocalSafe(chainID)
_, localSafe, err := su.chainDBs.LocalSafe(chainID)
if err != nil {
return types.ReferenceView{}, fmt.Errorf("failed to get local-safe head: %w", err)
}
_, crossSafe, err := su.db.CrossSafe(chainID)
_, crossSafe, err := su.chainDBs.CrossSafe(chainID)
if err != nil {
return types.ReferenceView{}, fmt.Errorf("failed to get cross-safe head: %w", err)
}
......@@ -276,14 +352,22 @@ func (su *SupervisorBackend) Finalized(ctx context.Context, chainID types.ChainI
su.mu.RLock()
defer su.mu.RUnlock()
return su.db.Finalized(chainID)
v, err := su.chainDBs.Finalized(chainID)
if err != nil {
return eth.BlockID{}, err
}
return v.ID(), nil
}
func (su *SupervisorBackend) DerivedFrom(ctx context.Context, chainID types.ChainID, derived eth.BlockID) (derivedFrom eth.BlockID, err error) {
su.mu.RLock()
defer su.mu.RUnlock()
return su.db.DerivedFrom(chainID, derived)
v, err := su.chainDBs.DerivedFrom(chainID, derived)
if err != nil {
return eth.BlockID{}, err
}
return v.ID(), nil
}
// Update methods
......@@ -303,12 +387,12 @@ func (su *SupervisorBackend) UpdateLocalSafe(chainID types.ChainID, derivedFrom
su.mu.RLock()
defer su.mu.RUnlock()
return su.db.UpdateLocalSafe(chainID, derivedFrom, lastDerived)
return su.chainDBs.UpdateLocalSafe(chainID, derivedFrom, lastDerived)
}
func (su *SupervisorBackend) UpdateFinalizedL1(chainID types.ChainID, finalized eth.BlockRef) error {
su.mu.RLock()
defer su.mu.RUnlock()
return su.db.UpdateFinalizedL1(finalized)
return su.chainDBs.UpdateFinalizedL1(finalized)
}
package backend
import (
"context"
"path/filepath"
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
types2 "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/eth"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-service/oppprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum-optimism/optimism/op-supervisor/metrics"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
func TestBackendLifetime(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
m := metrics.NoopMetrics
dataDir := t.TempDir()
chainA := types.ChainIDFromUInt64(900)
chainB := types.ChainIDFromUInt64(901)
cfg := &config.Config{
Version: "test",
LogConfig: oplog.CLIConfig{},
MetricsConfig: opmetrics.CLIConfig{},
PprofConfig: oppprof.CLIConfig{},
RPC: oprpc.CLIConfig{},
DependencySetSource: &depset.StaticConfigDependencySet{
Dependencies: map[types.ChainID]*depset.StaticConfigDependency{
chainA: {
ActivationTime: 42,
HistoryMinTime: 100,
},
chainB: {
ActivationTime: 30,
HistoryMinTime: 20,
},
},
},
SynchronousProcessors: true,
MockRun: false,
L2RPCs: nil,
Datadir: dataDir,
}
b, err := NewSupervisorBackend(context.Background(), logger, m, cfg)
require.NoError(t, err)
t.Log("initialized!")
src := &testutils.MockL1Source{}
blockX := eth.BlockRef{
Hash: common.Hash{0xaa},
Number: 0,
ParentHash: common.Hash{}, // genesis has no parent hash
Time: 10000,
}
blockY := eth.BlockRef{
Hash: common.Hash{0xbb},
Number: blockX.Number + 1,
ParentHash: blockX.Hash,
Time: blockX.Time + 2,
}
require.NoError(t, b.AttachProcessorSource(chainA, src))
require.FileExists(t, filepath.Join(cfg.Datadir, "900", "log.db"), "must have logs DB 900")
require.FileExists(t, filepath.Join(cfg.Datadir, "901", "log.db"), "must have logs DB 901")
require.FileExists(t, filepath.Join(cfg.Datadir, "900", "local_safe.db"), "must have local safe DB 900")
require.FileExists(t, filepath.Join(cfg.Datadir, "901", "local_safe.db"), "must have local safe DB 901")
require.FileExists(t, filepath.Join(cfg.Datadir, "900", "cross_safe.db"), "must have cross safe DB 900")
require.FileExists(t, filepath.Join(cfg.Datadir, "901", "cross_safe.db"), "must have cross safe DB 901")
err = b.Start(context.Background())
require.NoError(t, err)
t.Log("started!")
_, err = b.UnsafeView(context.Background(), chainA, types.ReferenceView{})
require.ErrorIs(t, err, entrydb.ErrFuture, "no data yet, need local-unsafe")
src.ExpectL1BlockRefByNumber(0, blockX, nil)
src.ExpectFetchReceipts(blockX.Hash, &testutils.MockBlockInfo{
InfoHash: blockX.Hash,
InfoParentHash: blockX.ParentHash,
InfoNum: blockX.Number,
InfoTime: blockX.Time,
InfoReceiptRoot: types2.EmptyReceiptsHash,
}, nil, nil)
src.ExpectL1BlockRefByNumber(1, blockY, nil)
src.ExpectFetchReceipts(blockY.Hash, &testutils.MockBlockInfo{
InfoHash: blockY.Hash,
InfoParentHash: blockY.ParentHash,
InfoNum: blockY.Number,
InfoTime: blockY.Time,
InfoReceiptRoot: types2.EmptyReceiptsHash,
}, nil, nil)
src.ExpectL1BlockRefByNumber(2, eth.L1BlockRef{}, ethereum.NotFound)
err = b.UpdateLocalUnsafe(chainA, blockY)
require.NoError(t, err)
// Make the processing happen, so we can rely on the new chain information,
// and not run into errors for future data that isn't mocked at this time.
b.chainProcessors[chainA].ProcessToHead()
_, err = b.UnsafeView(context.Background(), chainA, types.ReferenceView{})
require.ErrorIs(t, err, entrydb.ErrFuture, "still no data yet, need cross-unsafe")
err = b.chainDBs.UpdateCrossUnsafe(chainA, types.BlockSeal{
Hash: blockX.Hash,
Number: blockX.Number,
Timestamp: blockX.Time,
})
require.NoError(t, err)
v, err := b.UnsafeView(context.Background(), chainA, types.ReferenceView{})
require.NoError(t, err, "have a functioning cross/local unsafe view now")
require.Equal(t, blockX.ID(), v.Cross)
require.Equal(t, blockY.ID(), v.Local)
err = b.Stop(context.Background())
require.NoError(t, err)
t.Log("stopped!")
}
......@@ -10,7 +10,7 @@ type Metrics interface {
CacheAdd(chainID types.ChainID, label string, cacheSize int, evicted bool)
CacheGet(chainID types.ChainID, label string, hit bool)
RecordDBEntryCount(chainID types.ChainID, count int64)
RecordDBEntryCount(chainID types.ChainID, kind string, count int64)
RecordDBSearchEntriesRead(chainID types.ChainID, count int64)
}
......@@ -36,8 +36,8 @@ func (c *chainMetrics) CacheGet(label string, hit bool) {
c.delegate.CacheGet(c.chainID, label, hit)
}
func (c *chainMetrics) RecordDBEntryCount(count int64) {
c.delegate.RecordDBEntryCount(c.chainID, count)
func (c *chainMetrics) RecordDBEntryCount(kind string, count int64) {
c.delegate.RecordDBEntryCount(c.chainID, kind, count)
}
func (c *chainMetrics) RecordDBSearchEntriesRead(count int64) {
......
......@@ -10,11 +10,14 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/fromda"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
var ErrUnknownChain = errors.New("unknown chain")
var (
ErrUnknownChain = errors.New("unknown chain")
)
type LogStorage interface {
io.Closer
......@@ -46,12 +49,14 @@ type LogStorage interface {
}
type LocalDerivedFromStorage interface {
Last() (derivedFrom eth.BlockRef, derived eth.BlockRef, err error)
Latest() (derivedFrom types.BlockSeal, derived types.BlockSeal, err error)
AddDerived(derivedFrom eth.BlockRef, derived eth.BlockRef) error
LastDerived(derivedFrom eth.BlockID) (derived eth.BlockID, err error)
DerivedFrom(derived eth.BlockID) (derivedFrom eth.BlockID, err error)
LastDerivedAt(derivedFrom eth.BlockID) (derived types.BlockSeal, err error)
DerivedFrom(derived eth.BlockID) (derivedFrom types.BlockSeal, err error)
}
var _ LocalDerivedFromStorage = (*fromda.DB)(nil)
type CrossDerivedFromStorage interface {
LocalDerivedFromStorage
// This will start to differ with reorg support
......@@ -71,6 +76,7 @@ type ChainsDB struct {
logDBs map[types.ChainID]LogStorage
// cross-unsafe: how far we have processed the unsafe data.
// If present but set to a zeroed value the cross-unsafe will fallback to cross-safe.
crossUnsafe map[types.ChainID]types.BlockSeal
// local-safe: index of what we optimistically know about L2 blocks being derived from L1
......@@ -97,14 +103,47 @@ func NewChainsDB(l log.Logger) *ChainsDB {
}
}
func (db *ChainsDB) AddLogDB(chain types.ChainID, logDB LogStorage) {
func (db *ChainsDB) AddLogDB(chainID types.ChainID, logDB LogStorage) {
db.mu.Lock()
defer db.mu.Unlock()
if _, ok := db.logDBs[chainID]; ok {
db.logger.Warn("overwriting existing log DB for chain", "chain", chainID)
}
db.logDBs[chainID] = logDB
}
func (db *ChainsDB) AddLocalDerivedFromDB(chainID types.ChainID, dfDB LocalDerivedFromStorage) {
db.mu.Lock()
defer db.mu.Unlock()
if _, ok := db.localDBs[chainID]; ok {
db.logger.Warn("overwriting existing local derived-from DB for chain", "chain", chainID)
}
db.localDBs[chainID] = dfDB
}
func (db *ChainsDB) AddCrossDerivedFromDB(chainID types.ChainID, dfDB CrossDerivedFromStorage) {
db.mu.Lock()
defer db.mu.Unlock()
if _, ok := db.crossDBs[chainID]; ok {
db.logger.Warn("overwriting existing cross derived-from DB for chain", "chain", chainID)
}
db.crossDBs[chainID] = dfDB
}
func (db *ChainsDB) AddCrossUnsafeTracker(chainID types.ChainID) {
db.mu.Lock()
defer db.mu.Unlock()
if db.logDBs[chain] != nil {
log.Warn("overwriting existing logDB for chain", "chain", chain)
if _, ok := db.crossUnsafe[chainID]; ok {
db.logger.Warn("overwriting existing cross-unsafe tracker for chain", "chain", chainID)
}
db.logDBs[chain] = logDB
db.crossUnsafe[chainID] = types.BlockSeal{}
}
// ResumeFromLastSealedBlock prepares the chains db to resume recording events after a restart.
......
package backend
package db
import (
"fmt"
......@@ -8,6 +8,22 @@ import (
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
func prepLocalDerivedFromDBPath(chainID types.ChainID, datadir string) (string, error) {
dir, err := prepChainDir(chainID, datadir)
if err != nil {
return "", err
}
return filepath.Join(dir, "local_safe.db"), nil
}
func prepCrossDerivedFromDBPath(chainID types.ChainID, datadir string) (string, error) {
dir, err := prepChainDir(chainID, datadir)
if err != nil {
return "", err
}
return filepath.Join(dir, "cross_safe.db"), nil
}
func prepLogDBPath(chainID types.ChainID, datadir string) (string, error) {
dir, err := prepChainDir(chainID, datadir)
if err != nil {
......@@ -24,7 +40,7 @@ func prepChainDir(chainID types.ChainID, datadir string) (string, error) {
return dir, nil
}
func prepDataDir(datadir string) error {
func PrepDataDir(datadir string) error {
if err := os.MkdirAll(datadir, 0755); err != nil {
return fmt.Errorf("failed to create data directory %v: %w", datadir, err)
}
......
......@@ -13,10 +13,6 @@ import (
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
type Metrics interface {
RecordDBDerivedEntryCount(count int64)
}
type EntryStore interface {
Size() int64
LastEntryIdx() entrydb.EntryIdx
......
package fromda
type Metrics interface {
RecordDBDerivedEntryCount(count int64)
}
type ChainMetrics interface {
RecordDBEntryCount(kind string, count int64)
}
type delegate struct {
inner ChainMetrics
kind string
}
func (d *delegate) RecordDBDerivedEntryCount(count int64) {
d.inner.RecordDBEntryCount(d.kind, count)
}
func AdaptMetrics(chainMetrics ChainMetrics, kind string) Metrics {
return &delegate{
kind: kind,
inner: chainMetrics,
}
}
......@@ -21,7 +21,7 @@ const (
)
type Metrics interface {
RecordDBEntryCount(count int64)
RecordDBEntryCount(kind string, count int64)
RecordDBSearchEntriesRead(count int64)
}
......@@ -122,7 +122,7 @@ func (db *DB) trimToLastSealed() error {
}
func (db *DB) updateEntryCountMetric() {
db.m.RecordDBEntryCount(db.store.Size())
db.m.RecordDBEntryCount("log", db.store.Size())
}
func (db *DB) IteratorStartingAt(sealedNum uint64, logsSince uint32) (Iterator, error) {
......@@ -295,7 +295,7 @@ func (db *DB) newIteratorAt(blockNum uint64, logIndex uint32) (*iterator, error)
}()
// First walk up to the block that we are sealed up to (incl.)
for {
if _, n, _ := iter.SealedBlock(); n == blockNum { // we may already have it exactly
if _, n, ok := iter.SealedBlock(); ok && n == blockNum { // we may already have it exactly
break
}
if err := iter.NextBlock(); errors.Is(err, entrydb.ErrFuture) {
......
......@@ -1133,7 +1133,7 @@ type stubMetrics struct {
entriesReadForSearch int64
}
func (s *stubMetrics) RecordDBEntryCount(count int64) {
func (s *stubMetrics) RecordDBEntryCount(kind string, count int64) {
s.entryCount = count
}
......
package db
import (
"fmt"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/fromda"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
func OpenLogDB(logger log.Logger, chainID types.ChainID, dataDir string, m logs.Metrics) (*logs.DB, error) {
path, err := prepLogDBPath(chainID, dataDir)
if err != nil {
return nil, fmt.Errorf("failed to create datadir for chain %s: %w", chainID, err)
}
logDB, err := logs.NewFromFile(logger, m, path, true)
if err != nil {
return nil, fmt.Errorf("failed to create logdb for chain %s at %v: %w", chainID, path, err)
}
return logDB, nil
}
func OpenLocalDerivedFromDB(logger log.Logger, chainID types.ChainID, dataDir string, m fromda.ChainMetrics) (*fromda.DB, error) {
path, err := prepLocalDerivedFromDBPath(chainID, dataDir)
if err != nil {
return nil, fmt.Errorf("failed to prepare datadir for chain %s: %w", chainID, err)
}
db, err := fromda.NewFromFile(logger, fromda.AdaptMetrics(m, "local_derived"), path)
if err != nil {
return nil, fmt.Errorf("failed to create local-derived for chain %s at %q: %w", chainID, path, err)
}
return db, nil
}
func OpenCrossDerivedFromDB(logger log.Logger, chainID types.ChainID, dataDir string, m fromda.ChainMetrics) (*fromda.DB, error) {
path, err := prepCrossDerivedFromDBPath(chainID, dataDir)
if err != nil {
return nil, fmt.Errorf("failed to prepare datadir for chain %s: %w", chainID, err)
}
db, err := fromda.NewFromFile(logger, fromda.AdaptMetrics(m, "cross_derived"), path)
if err != nil {
return nil, fmt.Errorf("failed to create cross-derived for chain %s at %q: %w", chainID, path, err)
}
return db, nil
}
......@@ -60,61 +60,69 @@ func (db *ChainsDB) CrossUnsafe(chainID types.ChainID) (types.BlockSeal, error)
if !ok {
return types.BlockSeal{}, ErrUnknownChain
}
// Fall back to cross-safe if cross-unsafe is not known yet
if result == (types.BlockSeal{}) {
_, crossSafe, err := db.CrossSafe(chainID)
if err != nil {
return types.BlockSeal{}, fmt.Errorf("no cross-unsafe known for chain %s, and failed to fall back to cross-safe value: %w", chainID, err)
}
return crossSafe, nil
}
return result, nil
}
func (db *ChainsDB) LocalSafe(chainID types.ChainID) (derivedFrom eth.BlockRef, derived eth.BlockRef, err error) {
func (db *ChainsDB) LocalSafe(chainID types.ChainID) (derivedFrom types.BlockSeal, derived types.BlockSeal, err error) {
db.mu.RLock()
defer db.mu.RUnlock()
localDB, ok := db.localDBs[chainID]
if !ok {
return eth.BlockRef{}, eth.BlockRef{}, ErrUnknownChain
return types.BlockSeal{}, types.BlockSeal{}, ErrUnknownChain
}
return localDB.Last()
return localDB.Latest()
}
func (db *ChainsDB) CrossSafe(chainID types.ChainID) (derivedFrom eth.BlockRef, derived eth.BlockRef, err error) {
func (db *ChainsDB) CrossSafe(chainID types.ChainID) (derivedFrom types.BlockSeal, derived types.BlockSeal, err error) {
db.mu.RLock()
defer db.mu.RUnlock()
crossDB, ok := db.crossDBs[chainID]
if !ok {
return eth.BlockRef{}, eth.BlockRef{}, ErrUnknownChain
return types.BlockSeal{}, types.BlockSeal{}, ErrUnknownChain
}
return crossDB.Last()
return crossDB.Latest()
}
func (db *ChainsDB) Finalized(chainID types.ChainID) (eth.BlockID, error) {
func (db *ChainsDB) Finalized(chainID types.ChainID) (types.BlockSeal, error) {
db.mu.RLock()
defer db.mu.RUnlock()
finalizedL1 := db.finalizedL1
if finalizedL1 == (eth.L1BlockRef{}) {
return eth.BlockID{}, errors.New("no finalized L1 signal, cannot determine L2 finality yet")
return types.BlockSeal{}, errors.New("no finalized L1 signal, cannot determine L2 finality yet")
}
derived, err := db.LastDerivedFrom(chainID, finalizedL1.ID())
if err != nil {
return eth.BlockID{}, errors.New("could not find what was last derived from the finalized L1 block")
return types.BlockSeal{}, errors.New("could not find what was last derived from the finalized L1 block")
}
return derived, nil
}
func (db *ChainsDB) LastDerivedFrom(chainID types.ChainID, derivedFrom eth.BlockID) (derived eth.BlockID, err error) {
func (db *ChainsDB) LastDerivedFrom(chainID types.ChainID, derivedFrom eth.BlockID) (derived types.BlockSeal, err error) {
crossDB, ok := db.crossDBs[chainID]
if !ok {
return eth.BlockID{}, ErrUnknownChain
return types.BlockSeal{}, ErrUnknownChain
}
return crossDB.LastDerived(derivedFrom)
return crossDB.LastDerivedAt(derivedFrom)
}
func (db *ChainsDB) DerivedFrom(chainID types.ChainID, derived eth.BlockID) (derivedFrom eth.BlockID, err error) {
func (db *ChainsDB) DerivedFrom(chainID types.ChainID, derived eth.BlockID) (derivedFrom types.BlockSeal, err error) {
db.mu.RLock()
defer db.mu.RUnlock()
localDB, ok := db.localDBs[chainID]
if !ok {
return eth.BlockID{}, ErrUnknownChain
return types.BlockSeal{}, ErrUnknownChain
}
return localDB.DerivedFrom(derived)
}
......
......@@ -25,4 +25,11 @@ type DependencySet interface {
// This may return an error if the query temporarily cannot be answered.
// E.g. if the DependencySet is syncing new changes.
CanInitiateAt(chainID types.ChainID, initTimestamp uint64) (bool, error)
// Chains returns the list of chains that are part of the dependency set.
Chains() []types.ChainID
// HasChain determines if a chain is being tracked for interop purposes.
// See CanExecuteAt and CanInitiateAt to check if a chain may message at a given time.
HasChain(chainID types.ChainID) bool
}
......@@ -36,6 +36,12 @@ func TestDependencySet(t *testing.T) {
result, err := loader.LoadDependencySet(context.Background())
require.NoError(t, err)
chainIDs := result.Chains()
require.Equal(t, []types.ChainID{
types.ChainIDFromUInt64(900),
types.ChainIDFromUInt64(901),
}, chainIDs)
v, err := result.CanExecuteAt(types.ChainIDFromUInt64(900), 42)
require.NoError(t, err)
require.True(t, v)
......
......@@ -2,6 +2,9 @@ package depset
import (
"context"
"sort"
"golang.org/x/exp/maps"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
......@@ -46,3 +49,16 @@ func (ds *StaticConfigDependencySet) CanInitiateAt(chainID types.ChainID, initTi
}
return initTimestamp >= dep.HistoryMinTime, nil
}
func (ds *StaticConfigDependencySet) Chains() []types.ChainID {
out := maps.Keys(ds.Dependencies)
sort.Slice(out, func(i, j int) bool {
return out[i].Cmp(out[j]) < 0
})
return out
}
func (ds *StaticConfigDependencySet) HasChain(chainID types.ChainID) bool {
_, ok := ds.Dependencies[chainID]
return ok
}
......@@ -2,11 +2,13 @@ package processors
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
gethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
......@@ -15,6 +17,8 @@ import (
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
var ErrNoRPCSource = errors.New("no RPC client configured")
type Source interface {
L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error)
FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, gethtypes.Receipts, error)
......@@ -38,8 +42,10 @@ func (fn BlockProcessorFn) ProcessBlock(ctx context.Context, block eth.BlockRef)
// ChainProcessor is a HeadProcessor that fills in any skipped blocks between head update events.
// It ensures that, absent reorgs, every block in the chain is processed even if some head advancements are skipped.
type ChainProcessor struct {
log log.Logger
client Source
log log.Logger
client Source
clientLock sync.Mutex
chain types.ChainID
......@@ -51,8 +57,6 @@ type ChainProcessor struct {
// channel with capacity of 1, full if there is work to do
newHead chan struct{}
// bool to indicate if calls are synchronous
synchronous bool
// channel with capacity of 1, to signal work complete if running in synchroneous mode
out chan struct{}
......@@ -62,27 +66,37 @@ type ChainProcessor struct {
wg sync.WaitGroup
}
func NewChainProcessor(log log.Logger, client Source, chain types.ChainID, processor LogProcessor, rewinder DatabaseRewinder) *ChainProcessor {
func NewChainProcessor(log log.Logger, chain types.ChainID, processor LogProcessor, rewinder DatabaseRewinder) *ChainProcessor {
ctx, cancel := context.WithCancel(context.Background())
out := &ChainProcessor{
log: log,
client: client,
log: log.New("chain", chain),
client: nil,
chain: chain,
processor: processor,
rewinder: rewinder,
newHead: make(chan struct{}, 1),
// default to synchronous because we want other processors to wait for this
// in the future we could make this async and have a separate mechanism which forwards the work signal to other processors
synchronous: true,
out: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
out: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
}
out.wg.Add(1)
go out.worker()
return out
}
func (s *ChainProcessor) SetSource(cl Source) {
s.clientLock.Lock()
defer s.clientLock.Unlock()
s.client = cl
}
func (s *ChainProcessor) StartBackground() {
s.wg.Add(1)
go s.worker()
}
func (s *ChainProcessor) ProcessToHead() {
s.work()
}
func (s *ChainProcessor) nextNum() uint64 {
headNum, ok := s.rewinder.LatestBlockNum(s.chain)
if !ok {
......@@ -106,11 +120,6 @@ func (s *ChainProcessor) worker() {
case <-s.newHead:
s.log.Debug("Responding to new head signal")
s.work()
// if this chain processor is synchronous, signal completion
// to be picked up by the caller (ChainProcessor.OnNewHead)
if s.synchronous {
s.out <- struct{}{}
}
case <-delay.C:
s.log.Debug("Checking for updates")
s.work()
......@@ -126,8 +135,14 @@ func (s *ChainProcessor) work() {
}
target := s.nextNum()
if err := s.update(target); err != nil {
s.log.Error("Failed to process new block", "err", err)
// idle until next update trigger
if errors.Is(err, ethereum.NotFound) {
s.log.Info("Cannot find next block yet", "target", target)
} else if errors.Is(err, ErrNoRPCSource) {
s.log.Warn("No RPC source configured, cannot process new blocks")
} else {
s.log.Error("Failed to process new block", "err", err)
// idle until next update trigger
}
} else if x := s.lastHead.Load(); target+1 <= x {
s.log.Debug("Continuing with next block", "newTarget", target+1, "lastHead", x)
continue // instantly continue processing, no need to idle
......@@ -139,6 +154,13 @@ func (s *ChainProcessor) work() {
}
func (s *ChainProcessor) update(nextNum uint64) error {
s.clientLock.Lock()
defer s.clientLock.Unlock()
if s.client == nil {
return ErrNoRPCSource
}
ctx, cancel := context.WithTimeout(s.ctx, time.Second*10)
nextL1, err := s.client.L1BlockRefByNumber(ctx, nextNum)
next := eth.BlockRef{
......@@ -185,10 +207,6 @@ func (s *ChainProcessor) OnNewHead(head eth.BlockRef) error {
default:
// already requested an update
}
// if we are running synchronously, wait for the work to complete
if s.synchronous {
<-s.out
}
return nil
}
......
......@@ -182,6 +182,10 @@ func (id *ChainID) UnmarshalText(data []byte) error {
return nil
}
func (id ChainID) Cmp(other ChainID) int {
return (*uint256.Int)(&id).Cmp((*uint256.Int)(&other))
}
type ReferenceView struct {
Local eth.BlockID `json:"local"`
Cross eth.BlockID `json:"cross"`
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment