Commit 55b67e01 authored by protolambda's avatar protolambda Committed by GitHub

op-supervisor: initial service draft (#10703)

* op-supervisor: initial service draft

* supervisor: Fix log capitalization

* op-supervisor: fix mockrun flag

* op-supervisor: CLI tests

---------
Co-authored-by: default avatarAdrian Sutton <adrian@oplabs.co>
parent 19d7b721
package main
import (
"context"
"os"
"github.com/urfave/cli/v2"
"github.com/ethereum/go-ethereum/log"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/metrics/doc"
"github.com/ethereum-optimism/optimism/op-service/opio"
"github.com/ethereum-optimism/optimism/op-supervisor/flags"
"github.com/ethereum-optimism/optimism/op-supervisor/metrics"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor"
)
var (
Version = "v0.0.1"
GitCommit = ""
GitDate = ""
)
func main() {
ctx := opio.WithInterruptBlocker(context.Background())
err := run(ctx, os.Args, fromConfig)
if err != nil {
log.Crit("Application failed", "message", err)
}
}
func run(ctx context.Context, args []string, fn supervisor.MainFn) error {
oplog.SetupDefaults()
app := cli.NewApp()
app.Flags = cliapp.ProtectFlags(flags.Flags)
app.Version = opservice.FormatVersion(Version, GitCommit, GitDate, "")
app.Name = "op-supervisor"
app.Usage = "op-supervisor monitors cross-L2 interop messaging"
app.Description = "The op-supervisor monitors cross-L2 interop messaging by pre-fetching events and then resolving the cross-L2 dependencies to answer safety queries."
app.Action = cliapp.LifecycleCmd(supervisor.Main(Version, fn))
app.Commands = []*cli.Command{
{
Name: "doc",
Subcommands: doc.NewSubcommands(metrics.NewMetrics("default")),
},
}
return app.RunContext(ctx, args)
}
func fromConfig(ctx context.Context, cfg *supervisor.CLIConfig, logger log.Logger) (cliapp.Lifecycle, error) {
return supervisor.SupervisorFromCLIConfig(ctx, cfg, logger)
}
package main
import (
"context"
"errors"
"fmt"
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor"
)
func TestLogLevel(t *testing.T) {
t.Run("RejectInvalid", func(t *testing.T) {
verifyArgsInvalid(t, "unknown level: foo", addRequiredArgs("--log.level=foo"))
})
for _, lvl := range []string{"trace", "debug", "info", "error", "crit"} {
lvl := lvl
t.Run("AcceptValid_"+lvl, func(t *testing.T) {
logger, _, err := dryRunWithArgs(addRequiredArgs("--log.level", lvl))
require.NoError(t, err)
require.NotNil(t, logger)
})
}
}
func TestDefaultCLIOptionsMatchDefaultConfig(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs())
defaultCfgTempl := supervisor.DefaultCLIConfig()
defaultCfg := *defaultCfgTempl
defaultCfg.Version = Version
require.Equal(t, defaultCfg, *cfg)
}
func TestL2RPCs(t *testing.T) {
t.Run("Required", func(t *testing.T) {
verifyArgsInvalid(t, "flag l2-rpcs is required", addRequiredArgsExcept("--l2-rpcs"))
})
t.Run("Valid", func(t *testing.T) {
url1 := "http://example.com:1234"
url2 := "http://foobar.com:1234"
cfg := configForArgs(t, addRequiredArgsExcept("--l2-rpcs", "--l2-rpcs="+url1+","+url2))
require.Equal(t, []string{url1, url2}, cfg.L2RPCs)
})
}
func TestMockRun(t *testing.T) {
t.Run("Valid", func(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs("--mock-run"))
require.Equal(t, true, cfg.MockRun)
})
}
func verifyArgsInvalid(t *testing.T, messageContains string, cliArgs []string) {
_, _, err := dryRunWithArgs(cliArgs)
require.ErrorContains(t, err, messageContains)
}
func configForArgs(t *testing.T, cliArgs []string) *supervisor.CLIConfig {
_, cfg, err := dryRunWithArgs(cliArgs)
require.NoError(t, err)
return cfg
}
func dryRunWithArgs(cliArgs []string) (log.Logger, *supervisor.CLIConfig, error) {
cfg := new(supervisor.CLIConfig)
var logger log.Logger
fullArgs := append([]string{"op-supervisor"}, cliArgs...)
testErr := errors.New("dry-run")
err := run(context.Background(), fullArgs, func(ctx context.Context, config *supervisor.CLIConfig, log log.Logger) (cliapp.Lifecycle, error) {
logger = log
cfg = config
return nil, testErr
})
if errors.Is(err, testErr) { // expected error
err = nil
}
return logger, cfg, err
}
func addRequiredArgs(args ...string) []string {
req := requiredArgs()
combined := toArgList(req)
return append(combined, args...)
}
func addRequiredArgsExcept(name string, optionalArgs ...string) []string {
req := requiredArgs()
delete(req, name)
return append(toArgList(req), optionalArgs...)
}
func toArgList(req map[string]string) []string {
var combined []string
for name, value := range req {
combined = append(combined, fmt.Sprintf("%s=%s", name, value))
}
return combined
}
func requiredArgs() map[string]string {
args := map[string]string{
"--l2-rpcs": "http://localhost:8545",
}
return args
}
package flags
import (
"fmt"
"github.com/urfave/cli/v2"
opservice "github.com/ethereum-optimism/optimism/op-service"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-service/oppprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
)
const EnvVarPrefix = "OP_SUPERVISOR"
func prefixEnvVars(name string) []string {
return opservice.PrefixEnvVar(EnvVarPrefix, name)
}
var (
L2RPCsFlag = &cli.StringSliceFlag{
Name: "l2-rpcs",
Usage: "L2 RPC sources.",
EnvVars: prefixEnvVars("L2_RPCS"),
Value: cli.NewStringSlice("http://localhost:8545"),
}
MockRunFlag = &cli.BoolFlag{
Name: "mock-run",
Usage: "Mock run, no actual backend used, just presenting the service",
EnvVars: prefixEnvVars("MOCK_RUN"),
Hidden: true, // this is for testing only
}
)
var requiredFlags = []cli.Flag{
L2RPCsFlag,
}
var optionalFlags = []cli.Flag{
MockRunFlag,
}
func init() {
optionalFlags = append(optionalFlags, oprpc.CLIFlags(EnvVarPrefix)...)
optionalFlags = append(optionalFlags, oplog.CLIFlags(EnvVarPrefix)...)
optionalFlags = append(optionalFlags, opmetrics.CLIFlags(EnvVarPrefix)...)
optionalFlags = append(optionalFlags, oppprof.CLIFlags(EnvVarPrefix)...)
Flags = append(Flags, requiredFlags...)
Flags = append(Flags, optionalFlags...)
}
// Flags contains the list of configuration options available to the binary.
var Flags []cli.Flag
func CheckRequired(ctx *cli.Context) error {
for _, f := range requiredFlags {
if !ctx.IsSet(f.Names()[0]) {
return fmt.Errorf("flag %s is required", f.Names()[0])
}
}
return nil
}
package flags
import (
"strings"
"testing"
"github.com/stretchr/testify/require"
"github.com/urfave/cli/v2"
opservice "github.com/ethereum-optimism/optimism/op-service"
)
// TestOptionalFlagsDontSetRequired asserts that all flags deemed optional set
// the Required field to false.
func TestOptionalFlagsDontSetRequired(t *testing.T) {
for _, flag := range optionalFlags {
reqFlag, ok := flag.(cli.RequiredFlag)
require.True(t, ok)
require.False(t, reqFlag.IsRequired())
}
}
// TestUniqueFlags asserts that all flag names are unique, to avoid accidental conflicts between the many flags.
func TestUniqueFlags(t *testing.T) {
seenCLI := make(map[string]struct{})
for _, flag := range Flags {
for _, name := range flag.Names() {
if _, ok := seenCLI[name]; ok {
t.Errorf("duplicate flag %s", name)
continue
}
seenCLI[name] = struct{}{}
}
}
}
// TestBetaFlags test that all flags starting with "beta." have "BETA_" in the env var, and vice versa.
func TestBetaFlags(t *testing.T) {
for _, flag := range Flags {
envFlag, ok := flag.(interface {
GetEnvVars() []string
})
if !ok || len(envFlag.GetEnvVars()) == 0 { // skip flags without env-var support
continue
}
name := flag.Names()[0]
envName := envFlag.GetEnvVars()[0]
if strings.HasPrefix(name, "beta.") {
require.Contains(t, envName, "BETA_", "%q flag must contain BETA in env var to match \"beta.\" flag name", name)
}
if strings.Contains(envName, "BETA_") {
require.True(t, strings.HasPrefix(name, "beta."), "%q flag must start with \"beta.\" in flag name to match \"BETA_\" env var", name)
}
}
}
func TestHasEnvVar(t *testing.T) {
for _, flag := range Flags {
flag := flag
flagName := flag.Names()[0]
t.Run(flagName, func(t *testing.T) {
envFlagGetter, ok := flag.(interface {
GetEnvVars() []string
})
envFlags := envFlagGetter.GetEnvVars()
require.True(t, ok, "must be able to cast the flag to an EnvVar interface")
require.Equal(t, 1, len(envFlags), "flags should have exactly one env var")
})
}
}
func TestEnvVarFormat(t *testing.T) {
for _, flag := range Flags {
flag := flag
flagName := flag.Names()[0]
t.Run(flagName, func(t *testing.T) {
envFlagGetter, ok := flag.(interface {
GetEnvVars() []string
})
envFlags := envFlagGetter.GetEnvVars()
require.True(t, ok, "must be able to cast the flag to an EnvVar interface")
require.Equal(t, 1, len(envFlags), "flags should have exactly one env var")
expectedEnvVar := opservice.FlagNameToEnvVarName(flagName, "OP_SUPERVISOR")
require.Equal(t, expectedEnvVar, envFlags[0])
})
}
}
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
)
const Namespace = "op_supervisor"
type Metricer interface {
RecordInfo(version string)
RecordUp()
opmetrics.RPCMetricer
Document() []opmetrics.DocumentedMetric
}
type Metrics struct {
ns string
registry *prometheus.Registry
factory opmetrics.Factory
opmetrics.RPCMetrics
info prometheus.GaugeVec
up prometheus.Gauge
}
var _ Metricer = (*Metrics)(nil)
// implements the Registry getter, for metrics HTTP server to hook into
var _ opmetrics.RegistryMetricer = (*Metrics)(nil)
func NewMetrics(procName string) *Metrics {
if procName == "" {
procName = "default"
}
ns := Namespace + "_" + procName
registry := opmetrics.NewRegistry()
factory := opmetrics.With(registry)
return &Metrics{
ns: ns,
registry: registry,
factory: factory,
RPCMetrics: opmetrics.MakeRPCMetrics(ns, factory),
info: *factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "info",
Help: "Pseudo-metric tracking version and config info",
}, []string{
"version",
}),
up: factory.NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Name: "up",
Help: "1 if the op-supervisor has finished starting up",
}),
}
}
func (m *Metrics) Registry() *prometheus.Registry {
return m.registry
}
func (m *Metrics) Document() []opmetrics.DocumentedMetric {
return m.factory.Document()
}
// RecordInfo sets a pseudo-metric that contains versioning and config info for the op-supervisor.
func (m *Metrics) RecordInfo(version string) {
m.info.WithLabelValues(version).Set(1)
}
// RecordUp sets the up metric to 1.
func (m *Metrics) RecordUp() {
prometheus.MustRegister()
m.up.Set(1)
}
package metrics
import (
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
)
type noopMetrics struct {
opmetrics.NoopRPCMetrics
}
var NoopMetrics Metricer = new(noopMetrics)
func (*noopMetrics) Document() []opmetrics.DocumentedMetric { return nil }
func (*noopMetrics) RecordInfo(version string) {}
func (*noopMetrics) RecordUp() {}
package backend
import (
"context"
"errors"
"io"
"sync/atomic"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
type SupervisorBackend struct {
started atomic.Bool
// TODO(protocol-quest#287): collection of logdbs per chain
// TODO(protocol-quest#288): collection of logdb updating services per chain
}
var _ frontend.Backend = (*SupervisorBackend)(nil)
var _ io.Closer = (*SupervisorBackend)(nil)
func NewSupervisorBackend() *SupervisorBackend {
return &SupervisorBackend{}
}
func (su *SupervisorBackend) Start(ctx context.Context) error {
if !su.started.CompareAndSwap(false, true) {
return errors.New("already started")
}
// TODO(protocol-quest#288): start logdb updating services of all chains
return nil
}
func (su *SupervisorBackend) Stop(ctx context.Context) error {
if !su.started.CompareAndSwap(true, false) {
return errors.New("already stopped")
}
// TODO(protocol-quest#288): stop logdb updating services of all chains
return nil
}
func (su *SupervisorBackend) Close() error {
// TODO(protocol-quest#288): close logdb of all chains
return nil
}
func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHash common.Hash) (types.SafetyLevel, error) {
// TODO(protocol-quest#288): hook up to logdb lookup
return types.CrossUnsafe, nil
}
func (su *SupervisorBackend) CheckBlock(chainID *hexutil.U256, blockHash common.Hash, blockNumber hexutil.Uint64) (types.SafetyLevel, error) {
// TODO(protocol-quest#288): hook up to logdb lookup
return types.CrossUnsafe, nil
}
package backend
type Config struct {
// TODO(protocol-quest#288): configure list of chains and their RPC endpoints / potential alternative data sources
}
package backend
import (
"context"
"errors"
"io"
"sync/atomic"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
type MockBackend struct {
started atomic.Bool
}
var _ frontend.Backend = (*MockBackend)(nil)
var _ io.Closer = (*MockBackend)(nil)
func NewMockBackend() *MockBackend {
return &MockBackend{}
}
func (m *MockBackend) Start(ctx context.Context) error {
if !m.started.CompareAndSwap(false, true) {
return errors.New("already started")
}
return nil
}
func (m *MockBackend) Stop(ctx context.Context) error {
if !m.started.CompareAndSwap(true, false) {
return errors.New("already stopped")
}
return nil
}
func (m *MockBackend) CheckMessage(identifier types.Identifier, payloadHash common.Hash) (types.SafetyLevel, error) {
return types.CrossUnsafe, nil
}
func (m *MockBackend) CheckBlock(chainID *hexutil.U256, blockHash common.Hash, blockNumber hexutil.Uint64) (types.SafetyLevel, error) {
return types.CrossUnsafe, nil
}
func (m *MockBackend) Close() error {
return nil
}
package supervisor
import (
"errors"
"github.com/urfave/cli/v2"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-service/oppprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-supervisor/flags"
)
type CLIConfig struct {
Version string
LogConfig oplog.CLIConfig
MetricsConfig opmetrics.CLIConfig
PprofConfig oppprof.CLIConfig
RPC oprpc.CLIConfig
// MockRun runs the service with a mock backend
MockRun bool
L2RPCs []string
}
func CLIConfigFromCLI(ctx *cli.Context, version string) *CLIConfig {
return &CLIConfig{
Version: version,
LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
RPC: oprpc.ReadCLIConfig(ctx),
MockRun: ctx.Bool(flags.MockRunFlag.Name),
L2RPCs: ctx.StringSlice(flags.L2RPCsFlag.Name),
}
}
func (c *CLIConfig) Check() error {
var result error
result = errors.Join(result, c.MetricsConfig.Check())
result = errors.Join(result, c.PprofConfig.Check())
result = errors.Join(result, c.RPC.Check())
return result
}
func DefaultCLIConfig() *CLIConfig {
return &CLIConfig{
Version: "",
LogConfig: oplog.DefaultCLIConfig(),
MetricsConfig: opmetrics.DefaultCLIConfig(),
PprofConfig: oppprof.DefaultCLIConfig(),
RPC: oprpc.DefaultCLIConfig(),
MockRun: false,
L2RPCs: flags.L2RPCsFlag.Value.Value(),
}
}
package supervisor
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestDefaultConfigIsValid(t *testing.T) {
cfg := DefaultCLIConfig()
require.NoError(t, cfg.Check())
}
package supervisor
import (
"context"
"fmt"
"github.com/urfave/cli/v2"
"github.com/ethereum/go-ethereum/log"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-supervisor/flags"
)
type MainFn func(ctx context.Context, cfg *CLIConfig, logger log.Logger) (cliapp.Lifecycle, error)
// Main is the entrypoint into the Supervisor.
// This method returns a cliapp.LifecycleAction, to create an op-service CLI-lifecycle-managed supervisor with.
func Main(version string, fn MainFn) cliapp.LifecycleAction {
return func(cliCtx *cli.Context, closeApp context.CancelCauseFunc) (cliapp.Lifecycle, error) {
if err := flags.CheckRequired(cliCtx); err != nil {
return nil, err
}
cfg := CLIConfigFromCLI(cliCtx, version)
if err := cfg.Check(); err != nil {
return nil, fmt.Errorf("invalid CLI flags: %w", err)
}
l := oplog.NewLogger(oplog.AppOut(cliCtx), cfg.LogConfig)
oplog.SetGlobalLogHandler(l.Handler())
opservice.ValidateEnvVars(flags.EnvVarPrefix, flags.Flags, l)
l.Info("Initializing Supervisor")
return fn(cliCtx.Context, cfg, l)
}
}
package frontend
import (
"context"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
type AdminBackend interface {
Start(ctx context.Context) error
Stop(ctx context.Context) error
}
type QueryBackend interface {
CheckMessage(identifier types.Identifier, payloadHash common.Hash) (types.SafetyLevel, error)
CheckBlock(chainID *hexutil.U256, blockHash common.Hash, blockNumber hexutil.Uint64) (types.SafetyLevel, error)
}
type Backend interface {
AdminBackend
QueryBackend
}
type QueryFrontend struct {
Supervisor QueryBackend
}
// CheckMessage checks the safety-level of an individual message.
// The payloadHash references the hash of the message-payload of the message.
func (q *QueryFrontend) CheckMessage(identifier types.Identifier, payloadHash common.Hash) (types.SafetyLevel, error) {
return q.Supervisor.CheckMessage(identifier, payloadHash)
}
// CheckBlock checks the safety-level of an L2 block as a whole.
func (q *QueryFrontend) CheckBlock(chainID *hexutil.U256, blockHash common.Hash, blockNumber hexutil.Uint64) (types.SafetyLevel, error) {
return q.Supervisor.CheckBlock(chainID, blockHash, blockNumber)
}
type AdminFrontend struct {
Supervisor Backend
}
// Start starts the service, if it was previously stopped.
func (a *AdminFrontend) Start(ctx context.Context) error {
return a.Supervisor.Start(ctx)
}
// Stop stops the service, if it was previously started.
func (a *AdminFrontend) Stop(ctx context.Context) error {
return a.Supervisor.Stop(ctx)
}
package supervisor
import (
"context"
"errors"
"fmt"
"io"
"sync/atomic"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/httputil"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-service/oppprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-supervisor/metrics"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
)
type Backend interface {
frontend.Backend
io.Closer
}
// SupervisorService implements the full-environment bells and whistles around the Supervisor.
// This includes the setup and teardown of metrics, pprof, admin RPC, regular RPC etc.
type SupervisorService struct {
closing atomic.Bool
log log.Logger
metrics metrics.Metricer
backend Backend
pprofService *oppprof.Service
metricsSrv *httputil.HTTPServer
rpcServer *oprpc.Server
}
var _ cliapp.Lifecycle = (*SupervisorService)(nil)
func SupervisorFromCLIConfig(ctx context.Context, cfg *CLIConfig, logger log.Logger) (*SupervisorService, error) {
su := &SupervisorService{log: logger}
if err := su.initFromCLIConfig(ctx, cfg); err != nil {
return nil, errors.Join(err, su.Stop(ctx)) // try to clean up our failed initialization attempt
}
return su, nil
}
func (su *SupervisorService) initFromCLIConfig(ctx context.Context, cfg *CLIConfig) error {
su.initMetrics(cfg)
if err := su.initPProf(cfg); err != nil {
return fmt.Errorf("failed to start PProf server: %w", err)
}
if err := su.initMetricsServer(cfg); err != nil {
return fmt.Errorf("failed to start Metrics server: %w", err)
}
su.initBackend(cfg)
if err := su.initRPCServer(cfg); err != nil {
return fmt.Errorf("failed to start RPC server: %w", err)
}
return nil
}
func (su *SupervisorService) initBackend(cfg *CLIConfig) {
if cfg.MockRun {
su.backend = backend.NewMockBackend()
} else {
su.backend = backend.NewSupervisorBackend()
}
}
func (su *SupervisorService) initMetrics(cfg *CLIConfig) {
if cfg.MetricsConfig.Enabled {
procName := "default"
su.metrics = metrics.NewMetrics(procName)
su.metrics.RecordInfo(cfg.Version)
} else {
su.metrics = metrics.NoopMetrics
}
}
func (su *SupervisorService) initPProf(cfg *CLIConfig) error {
su.pprofService = oppprof.New(
cfg.PprofConfig.ListenEnabled,
cfg.PprofConfig.ListenAddr,
cfg.PprofConfig.ListenPort,
cfg.PprofConfig.ProfileType,
cfg.PprofConfig.ProfileDir,
cfg.PprofConfig.ProfileFilename,
)
if err := su.pprofService.Start(); err != nil {
return fmt.Errorf("failed to start pprof service: %w", err)
}
return nil
}
func (su *SupervisorService) initMetricsServer(cfg *CLIConfig) error {
if !cfg.MetricsConfig.Enabled {
su.log.Info("Metrics disabled")
return nil
}
m, ok := su.metrics.(opmetrics.RegistryMetricer)
if !ok {
return fmt.Errorf("metrics were enabled, but metricer %T does not expose registry for metrics-server", su.metrics)
}
su.log.Debug("Starting metrics server", "addr", cfg.MetricsConfig.ListenAddr, "port", cfg.MetricsConfig.ListenPort)
metricsSrv, err := opmetrics.StartServer(m.Registry(), cfg.MetricsConfig.ListenAddr, cfg.MetricsConfig.ListenPort)
if err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
su.log.Info("Started metrics server", "addr", metricsSrv.Addr())
su.metricsSrv = metricsSrv
return nil
}
func (su *SupervisorService) initRPCServer(cfg *CLIConfig) error {
server := oprpc.NewServer(
cfg.RPC.ListenAddr,
cfg.RPC.ListenPort,
cfg.Version,
oprpc.WithLogger(su.log),
//oprpc.WithHTTPRecorder(su.metrics), // TODO(protocol-quest#286) hook up metrics to RPC server
)
if cfg.RPC.EnableAdmin {
su.log.Info("Admin RPC enabled")
server.AddAPI(rpc.API{
Namespace: "admin",
Service: &frontend.AdminFrontend{Supervisor: su.backend},
Authenticated: true, // TODO(protocol-quest#286): enforce auth on this or not?
})
}
server.AddAPI(rpc.API{
Namespace: "supervisor",
Service: &frontend.QueryFrontend{Supervisor: su.backend},
Authenticated: false,
})
su.rpcServer = server
return nil
}
func (su *SupervisorService) Start(ctx context.Context) error {
su.log.Info("Starting JSON-RPC server")
if err := su.rpcServer.Start(); err != nil {
return fmt.Errorf("unable to start RPC server: %w", err)
}
su.metrics.RecordUp()
return nil
}
func (su *SupervisorService) Stop(ctx context.Context) error {
if !su.closing.CompareAndSwap(false, true) {
return nil // already closing
}
var result error
if su.rpcServer != nil {
if err := su.rpcServer.Stop(); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop RPC server: %w", err))
}
}
if su.backend != nil {
if err := su.backend.Close(); err != nil {
result = errors.Join(result, fmt.Errorf("failed to close supervisor backend: %w", err))
}
}
if su.pprofService != nil {
if err := su.pprofService.Stop(ctx); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop PProf server: %w", err))
}
}
if su.metricsSrv != nil {
if err := su.metricsSrv.Stop(ctx); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop metrics server: %w", err))
}
}
return result
}
func (su *SupervisorService) Stopped() bool {
return su.closing.Load()
}
package supervisor
import (
"context"
"testing"
"time"
"github.com/holiman/uint256"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/dial"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-service/oppprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)
func TestSupervisorService(t *testing.T) {
cfg := &CLIConfig{
Version: "",
LogConfig: oplog.CLIConfig{
Level: log.LevelError,
Color: false,
Format: oplog.FormatLogFmt,
},
MetricsConfig: opmetrics.CLIConfig{
Enabled: true,
ListenAddr: "127.0.0.1",
ListenPort: 0, // pick a port automatically
},
PprofConfig: oppprof.CLIConfig{
ListenEnabled: true,
ListenAddr: "127.0.0.1",
ListenPort: 0, // pick a port automatically
ProfileType: "",
ProfileDir: "",
ProfileFilename: "",
},
RPC: oprpc.CLIConfig{
ListenAddr: "127.0.0.1",
ListenPort: 0, // pick a port automatically
EnableAdmin: true,
},
MockRun: true,
}
logger := testlog.Logger(t, log.LevelError)
supervisor, err := SupervisorFromCLIConfig(context.Background(), cfg, logger)
require.NoError(t, err)
require.NoError(t, supervisor.Start(context.Background()), "start service")
// run some RPC tests against the service with the mock backend
{
endpoint := "http://" + supervisor.rpcServer.Endpoint()
t.Logf("dialing %s", endpoint)
cl, err := dial.DialRPCClientWithTimeout(context.Background(), time.Second*5, logger, endpoint)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
var dest types.SafetyLevel
err = cl.CallContext(ctx, &dest, "supervisor_checkBlock",
(*hexutil.U256)(uint256.NewInt(1)), common.Hash{0xab}, hexutil.Uint64(123))
cancel()
require.NoError(t, err)
require.Equal(t, types.CrossUnsafe, dest, "expecting mock to return cross-unsafe")
cl.Close()
}
require.NoError(t, supervisor.Stop(context.Background()), "stop service")
}
package types
import (
"errors"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
)
type Identifier struct {
Origin common.Address `json:"origin"`
BlockNumber hexutil.Uint64 `json:"blockNumber"`
LogIndex hexutil.Uint64 `json:"logIndex"`
Timestamp hexutil.Uint64 `json:"timestamp"`
ChainID hexutil.U256 `json:"chainID"`
}
type SafetyLevel string
func (lvl SafetyLevel) String() string {
return string(lvl)
}
func (lvl SafetyLevel) Valid() bool {
switch lvl {
case Finalized, Safe, CrossUnsafe, Unsafe:
return true
default:
return false
}
}
func (lvl SafetyLevel) MarshalText() ([]byte, error) {
return []byte(lvl), nil
}
func (lvl *SafetyLevel) UnmarshalText(text []byte) error {
if lvl == nil {
return errors.New("cannot unmarshal into nil SafetyLevel")
}
x := SafetyLevel(text)
if !x.Valid() {
return fmt.Errorf("unrecognized safety level: %q", text)
}
*lvl = x
return nil
}
const (
Finalized SafetyLevel = "finalized"
Safe SafetyLevel = "safe"
CrossUnsafe SafetyLevel = "cross-unsafe"
Unsafe SafetyLevel = "unsafe"
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment