Commit 87722247 authored by protolambda's avatar protolambda Committed by GitHub

Op-supervisor: dependency set improvements (#12623)

* op-supervisor: DB improvements for cross-safe updates
Co-authored-by: default avataraxelKingsley <axel.kingsley@gmail.com>
Co-authored-by: default avatarTyler Smith <mail@tcry.pt>

* op-supervisor: dependency-set improvements
Co-authored-by: default avataraxelKingsley <axel.kingsley@gmail.com>
Co-authored-by: default avatarTyler Smith <mail@tcry.pt>

---------
Co-authored-by: default avataraxelKingsley <axel.kingsley@gmail.com>
Co-authored-by: default avatarTyler Smith <mail@tcry.pt>
parent a71c4926
...@@ -438,18 +438,23 @@ func (s *interopE2ESystem) prepareSupervisor() *supervisor.SupervisorService { ...@@ -438,18 +438,23 @@ func (s *interopE2ESystem) prepareSupervisor() *supervisor.SupervisorService {
L2RPCs: []string{}, L2RPCs: []string{},
Datadir: path.Join(s.t.TempDir(), "supervisor"), Datadir: path.Join(s.t.TempDir(), "supervisor"),
} }
depSet := &depset.StaticConfigDependencySet{ depSet := make(map[supervisortypes.ChainID]*depset.StaticConfigDependency)
Dependencies: make(map[supervisortypes.ChainID]*depset.StaticConfigDependency),
}
// Iterate over the L2 chain configs. The L2 nodes don't exist yet. // Iterate over the L2 chain configs. The L2 nodes don't exist yet.
for _, l2Out := range s.worldOutput.L2s { for _, l2Out := range s.worldOutput.L2s {
chainID := supervisortypes.ChainIDFromBig(l2Out.Genesis.Config.ChainID) chainID := supervisortypes.ChainIDFromBig(l2Out.Genesis.Config.ChainID)
depSet.Dependencies[chainID] = &depset.StaticConfigDependency{ index, err := chainID.ToUInt32()
require.NoError(s.t, err)
depSet[chainID] = &depset.StaticConfigDependency{
ChainIndex: supervisortypes.ChainIndex(index),
ActivationTime: 0, ActivationTime: 0,
HistoryMinTime: 0, HistoryMinTime: 0,
} }
} }
cfg.DependencySetSource = depSet stDepSet, err := depset.NewStaticConfigDependencySet(depSet)
require.NoError(s.t, err)
cfg.DependencySetSource = stDepSet
// Create the supervisor with the configuration // Create the supervisor with the configuration
super, err := supervisor.SupervisorFromConfig(context.Background(), cfg, logger) super, err := supervisor.SupervisorFromConfig(context.Background(), cfg, logger)
require.NoError(s.t, err) require.NoError(s.t, err)
......
...@@ -56,13 +56,15 @@ func TestValidateRPCConfig(t *testing.T) { ...@@ -56,13 +56,15 @@ func TestValidateRPCConfig(t *testing.T) {
} }
func validConfig() *Config { func validConfig() *Config {
depSet := &depset.StaticConfigDependencySet{ depSet, err := depset.NewStaticConfigDependencySet(map[types.ChainID]*depset.StaticConfigDependency{
Dependencies: map[types.ChainID]*depset.StaticConfigDependency{ types.ChainIDFromUInt64(900): &depset.StaticConfigDependency{
types.ChainIDFromUInt64(900): &depset.StaticConfigDependency{ ChainIndex: 900,
ActivationTime: 0, ActivationTime: 0,
HistoryMinTime: 0, HistoryMinTime: 0,
},
}, },
})
if err != nil {
panic(err)
} }
// Should be valid using only the required arguments passed in via the constructor. // Should be valid using only the required arguments passed in via the constructor.
return NewConfig([]string{"http://localhost:8545"}, depSet, "./supervisor_testdir") return NewConfig([]string{"http://localhost:8545"}, depSet, "./supervisor_testdir")
......
...@@ -261,6 +261,13 @@ func (su *SupervisorBackend) AddL2RPC(ctx context.Context, rpc string) error { ...@@ -261,6 +261,13 @@ func (su *SupervisorBackend) AddL2RPC(ctx context.Context, rpc string) error {
return su.attachRPC(ctx, rpc) return su.attachRPC(ctx, rpc)
} }
// Internal methods, for processors
// ----------------------------
func (su *SupervisorBackend) DependencySet() depset.DependencySet {
return su.depSet
}
// Query methods // Query methods
// ---------------------------- // ----------------------------
......
...@@ -31,24 +31,27 @@ func TestBackendLifetime(t *testing.T) { ...@@ -31,24 +31,27 @@ func TestBackendLifetime(t *testing.T) {
dataDir := t.TempDir() dataDir := t.TempDir()
chainA := types.ChainIDFromUInt64(900) chainA := types.ChainIDFromUInt64(900)
chainB := types.ChainIDFromUInt64(901) chainB := types.ChainIDFromUInt64(901)
cfg := &config.Config{ depSet, err := depset.NewStaticConfigDependencySet(
Version: "test", map[types.ChainID]*depset.StaticConfigDependency{
LogConfig: oplog.CLIConfig{}, chainA: {
MetricsConfig: opmetrics.CLIConfig{}, ChainIndex: 900,
PprofConfig: oppprof.CLIConfig{}, ActivationTime: 42,
RPC: oprpc.CLIConfig{}, HistoryMinTime: 100,
DependencySetSource: &depset.StaticConfigDependencySet{ },
Dependencies: map[types.ChainID]*depset.StaticConfigDependency{ chainB: {
chainA: { ChainIndex: 901,
ActivationTime: 42, ActivationTime: 30,
HistoryMinTime: 100, HistoryMinTime: 20,
},
chainB: {
ActivationTime: 30,
HistoryMinTime: 20,
},
}, },
}, })
require.NoError(t, err)
cfg := &config.Config{
Version: "test",
LogConfig: oplog.CLIConfig{},
MetricsConfig: opmetrics.CLIConfig{},
PprofConfig: oppprof.CLIConfig{},
RPC: oprpc.CLIConfig{},
DependencySetSource: depSet,
SynchronousProcessors: true, SynchronousProcessors: true,
MockRun: false, MockRun: false,
L2RPCs: nil, L2RPCs: nil,
......
...@@ -32,4 +32,10 @@ type DependencySet interface { ...@@ -32,4 +32,10 @@ type DependencySet interface {
// HasChain determines if a chain is being tracked for interop purposes. // HasChain determines if a chain is being tracked for interop purposes.
// See CanExecuteAt and CanInitiateAt to check if a chain may message at a given time. // See CanExecuteAt and CanInitiateAt to check if a chain may message at a given time.
HasChain(chainID types.ChainID) bool HasChain(chainID types.ChainID) bool
// ChainIndexFromID converts a ChainID to a ChainIndex.
ChainIndexFromID(id types.ChainID) (types.ChainIndex, error)
// ChainIDFromIndex converts a ChainIndex to a ChainID.
ChainIDFromIndex(index types.ChainIndex) (types.ChainID, error)
} }
...@@ -15,18 +15,20 @@ import ( ...@@ -15,18 +15,20 @@ import (
func TestDependencySet(t *testing.T) { func TestDependencySet(t *testing.T) {
d := path.Join(t.TempDir(), "tmp_dep_set.json") d := path.Join(t.TempDir(), "tmp_dep_set.json")
depSet := &StaticConfigDependencySet{ depSet, err := NewStaticConfigDependencySet(
Dependencies: map[types.ChainID]*StaticConfigDependency{ map[types.ChainID]*StaticConfigDependency{
types.ChainIDFromUInt64(900): { types.ChainIDFromUInt64(900): {
ChainIndex: 900,
ActivationTime: 42, ActivationTime: 42,
HistoryMinTime: 100, HistoryMinTime: 100,
}, },
types.ChainIDFromUInt64(901): { types.ChainIDFromUInt64(901): {
ChainIndex: 901,
ActivationTime: 30, ActivationTime: 30,
HistoryMinTime: 20, HistoryMinTime: 20,
}, },
}, })
} require.NoError(t, err)
data, err := json.Marshal(depSet) data, err := json.Marshal(depSet)
require.NoError(t, err) require.NoError(t, err)
......
...@@ -2,14 +2,18 @@ package depset ...@@ -2,14 +2,18 @@ package depset
import ( import (
"context" "context"
"encoding/json"
"fmt"
"slices"
"sort" "sort"
"golang.org/x/exp/maps"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
) )
type StaticConfigDependency struct { type StaticConfigDependency struct {
// ChainIndex is the unique short identifier of this chain.
ChainIndex types.ChainIndex `json:"chainIndex"`
// ActivationTime is when the chain becomes part of the dependency set. // ActivationTime is when the chain becomes part of the dependency set.
// This is the minimum timestamp of the inclusion of an executing message. // This is the minimum timestamp of the inclusion of an executing message.
ActivationTime uint64 `json:"activationTime"` ActivationTime uint64 `json:"activationTime"`
...@@ -23,9 +27,62 @@ type StaticConfigDependency struct { ...@@ -23,9 +27,62 @@ type StaticConfigDependency struct {
// StaticConfigDependencySet statically declares a DependencySet. // StaticConfigDependencySet statically declares a DependencySet.
// It can be used as a DependencySetSource itself, by simply returning the itself when loading the set. // It can be used as a DependencySetSource itself, by simply returning the itself when loading the set.
type StaticConfigDependencySet struct { type StaticConfigDependencySet struct {
// dependency info per chain
dependencies map[types.ChainID]*StaticConfigDependency
// cached mapping of chain index to chain ID
indexToID map[types.ChainIndex]types.ChainID
// cached list of chain IDs, sorted by ID value
chainIDs []types.ChainID
}
func NewStaticConfigDependencySet(dependencies map[types.ChainID]*StaticConfigDependency) (*StaticConfigDependencySet, error) {
out := &StaticConfigDependencySet{dependencies: dependencies}
if err := out.hydrate(); err != nil {
return nil, err
}
return out, nil
}
// jsonStaticConfigDependencySet is a util for JSON encoding/decoding,
// to encode/decode just the attributes that matter,
// while wrapping the decoding functionality with additional hydration step.
type jsonStaticConfigDependencySet struct {
Dependencies map[types.ChainID]*StaticConfigDependency `json:"dependencies"` Dependencies map[types.ChainID]*StaticConfigDependency `json:"dependencies"`
} }
func (ds *StaticConfigDependencySet) MarshalJSON() ([]byte, error) {
out := &jsonStaticConfigDependencySet{
Dependencies: ds.dependencies,
}
return json.Marshal(out)
}
func (ds *StaticConfigDependencySet) UnmarshalJSON(data []byte) error {
var v jsonStaticConfigDependencySet
if err := json.Unmarshal(data, &v); err != nil {
return err
}
ds.dependencies = v.Dependencies
return ds.hydrate()
}
// hydrate sets all the cached values, based on the dependencies attribute
func (ds *StaticConfigDependencySet) hydrate() error {
ds.indexToID = make(map[types.ChainIndex]types.ChainID)
ds.chainIDs = make([]types.ChainID, 0, len(ds.dependencies))
for id, dep := range ds.dependencies {
if existing, ok := ds.indexToID[dep.ChainIndex]; ok {
return fmt.Errorf("chain %s cannot have the same index (%d) as chain %s", id, dep.ChainIndex, existing)
}
ds.indexToID[dep.ChainIndex] = id
ds.chainIDs = append(ds.chainIDs, id)
}
sort.Slice(ds.chainIDs, func(i, j int) bool {
return ds.chainIDs[i].Cmp(ds.chainIDs[j]) < 0
})
return nil
}
var _ DependencySetSource = (*StaticConfigDependencySet)(nil) var _ DependencySetSource = (*StaticConfigDependencySet)(nil)
var _ DependencySet = (*StaticConfigDependencySet)(nil) var _ DependencySet = (*StaticConfigDependencySet)(nil)
...@@ -35,7 +92,7 @@ func (ds *StaticConfigDependencySet) LoadDependencySet(ctx context.Context) (Dep ...@@ -35,7 +92,7 @@ func (ds *StaticConfigDependencySet) LoadDependencySet(ctx context.Context) (Dep
} }
func (ds *StaticConfigDependencySet) CanExecuteAt(chainID types.ChainID, execTimestamp uint64) (bool, error) { func (ds *StaticConfigDependencySet) CanExecuteAt(chainID types.ChainID, execTimestamp uint64) (bool, error) {
dep, ok := ds.Dependencies[chainID] dep, ok := ds.dependencies[chainID]
if !ok { if !ok {
return false, nil return false, nil
} }
...@@ -43,7 +100,7 @@ func (ds *StaticConfigDependencySet) CanExecuteAt(chainID types.ChainID, execTim ...@@ -43,7 +100,7 @@ func (ds *StaticConfigDependencySet) CanExecuteAt(chainID types.ChainID, execTim
} }
func (ds *StaticConfigDependencySet) CanInitiateAt(chainID types.ChainID, initTimestamp uint64) (bool, error) { func (ds *StaticConfigDependencySet) CanInitiateAt(chainID types.ChainID, initTimestamp uint64) (bool, error) {
dep, ok := ds.Dependencies[chainID] dep, ok := ds.dependencies[chainID]
if !ok { if !ok {
return false, nil return false, nil
} }
...@@ -51,14 +108,26 @@ func (ds *StaticConfigDependencySet) CanInitiateAt(chainID types.ChainID, initTi ...@@ -51,14 +108,26 @@ func (ds *StaticConfigDependencySet) CanInitiateAt(chainID types.ChainID, initTi
} }
func (ds *StaticConfigDependencySet) Chains() []types.ChainID { func (ds *StaticConfigDependencySet) Chains() []types.ChainID {
out := maps.Keys(ds.Dependencies) return slices.Clone(ds.chainIDs)
sort.Slice(out, func(i, j int) bool {
return out[i].Cmp(out[j]) < 0
})
return out
} }
func (ds *StaticConfigDependencySet) HasChain(chainID types.ChainID) bool { func (ds *StaticConfigDependencySet) HasChain(chainID types.ChainID) bool {
_, ok := ds.Dependencies[chainID] _, ok := ds.dependencies[chainID]
return ok return ok
} }
func (ds *StaticConfigDependencySet) ChainIndexFromID(id types.ChainID) (types.ChainIndex, error) {
dep, ok := ds.dependencies[id]
if !ok {
return 0, types.ErrUnknownChain
}
return dep.ChainIndex, nil
}
func (ds *StaticConfigDependencySet) ChainIDFromIndex(index types.ChainIndex) (types.ChainID, error) {
id, ok := ds.indexToID[index]
if !ok {
return types.ChainID{}, types.ErrUnknownChain
}
return id, nil
}
...@@ -22,6 +22,9 @@ import ( ...@@ -22,6 +22,9 @@ import (
) )
func TestSupervisorService(t *testing.T) { func TestSupervisorService(t *testing.T) {
depSet, err := depset.NewStaticConfigDependencySet(make(map[types.ChainID]*depset.StaticConfigDependency))
require.NoError(t, err)
cfg := &config.Config{ cfg := &config.Config{
Version: "", Version: "",
LogConfig: oplog.CLIConfig{ LogConfig: oplog.CLIConfig{
...@@ -47,10 +50,8 @@ func TestSupervisorService(t *testing.T) { ...@@ -47,10 +50,8 @@ func TestSupervisorService(t *testing.T) {
ListenPort: 0, // pick a port automatically ListenPort: 0, // pick a port automatically
EnableAdmin: true, EnableAdmin: true,
}, },
DependencySetSource: &depset.StaticConfigDependencySet{ DependencySetSource: depSet,
Dependencies: make(map[types.ChainID]*depset.StaticConfigDependency), MockRun: true,
},
MockRun: true,
} }
logger := testlog.Logger(t, log.LevelError) logger := testlog.Logger(t, log.LevelError)
supervisor, err := SupervisorFromConfig(context.Background(), cfg, logger) supervisor, err := SupervisorFromConfig(context.Background(), cfg, logger)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment