package supervisor

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sync/atomic"

	"github.com/ethereum-optimism/optimism/op-supervisor/config"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/rpc"

	"github.com/ethereum-optimism/optimism/op-service/cliapp"
	"github.com/ethereum-optimism/optimism/op-service/httputil"
	opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
	"github.com/ethereum-optimism/optimism/op-service/oppprof"
	oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
	"github.com/ethereum-optimism/optimism/op-supervisor/metrics"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
)

// Backend is the supervisor backend as used by SupervisorService: it combines
// the frontend.Backend API surface (served over RPC) with io.Closer, so the
// service can close the backend during Stop.
type Backend interface {
	frontend.Backend
	io.Closer
}

// SupervisorService implements the full-environment bells and whistles around the Supervisor.
// This includes the setup and teardown of metrics, pprof, admin RPC, regular RPC etc.
type SupervisorService struct {
	// closing is flipped to true by the first call to Stop; Stopped reports its value.
	closing atomic.Bool

	log log.Logger

	// metrics is set by initMetrics: a real metricer when metrics are enabled,
	// metrics.NoopMetrics otherwise.
	metrics metrics.Metricer

	// backend is set by initBackend, started in Start and closed in Stop.
	backend Backend

	// Each of the services below is stopped in Stop if it was set.
	// metricsSrv stays nil when metrics are disabled.
	pprofService *oppprof.Service
	metricsSrv   *httputil.HTTPServer
	rpcServer    *oprpc.Server
}

// Compile-time assertion that *SupervisorService satisfies cliapp.Lifecycle.
var _ cliapp.Lifecycle = (*SupervisorService)(nil)

func SupervisorFromConfig(ctx context.Context, cfg *config.Config, logger log.Logger) (*SupervisorService, error) {
48 49 50 51 52 53 54
	su := &SupervisorService{log: logger}
	if err := su.initFromCLIConfig(ctx, cfg); err != nil {
		return nil, errors.Join(err, su.Stop(ctx)) // try to clean up our failed initialization attempt
	}
	return su, nil
}

func (su *SupervisorService) initFromCLIConfig(ctx context.Context, cfg *config.Config) error {
56 57 58 59 60 61 62
	su.initMetrics(cfg)
	if err := su.initPProf(cfg); err != nil {
		return fmt.Errorf("failed to start PProf server: %w", err)
	}
	if err := su.initMetricsServer(cfg); err != nil {
		return fmt.Errorf("failed to start Metrics server: %w", err)
	}
63 64 65
	if err := su.initBackend(ctx, cfg); err != nil {
		return fmt.Errorf("failed to start backend: %w", err)
	}
66 67 68 69 70 71
	if err := su.initRPCServer(cfg); err != nil {
		return fmt.Errorf("failed to start RPC server: %w", err)
	}
	return nil
}

func (su *SupervisorService) initBackend(ctx context.Context, cfg *config.Config) error {
73 74
	if cfg.MockRun {
		su.backend = backend.NewMockBackend()
75 76 77 78 79
		return nil
	}
	be, err := backend.NewSupervisorBackend(ctx, su.log, su.metrics, cfg)
	if err != nil {
		return fmt.Errorf("failed to create supervisor backend: %w", err)
80
	}
81 82
	su.backend = be
	return nil
83 84
}

func (su *SupervisorService) initMetrics(cfg *config.Config) {
86 87 88 89 90 91 92 93 94
	if cfg.MetricsConfig.Enabled {
		procName := "default"
		su.metrics = metrics.NewMetrics(procName)
		su.metrics.RecordInfo(cfg.Version)
	} else {
		su.metrics = metrics.NoopMetrics
	}
}

func (su *SupervisorService) initPProf(cfg *config.Config) error {
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
	su.pprofService = oppprof.New(
		cfg.PprofConfig.ListenEnabled,
		cfg.PprofConfig.ListenAddr,
		cfg.PprofConfig.ListenPort,
		cfg.PprofConfig.ProfileType,
		cfg.PprofConfig.ProfileDir,
		cfg.PprofConfig.ProfileFilename,
	)

	if err := su.pprofService.Start(); err != nil {
		return fmt.Errorf("failed to start pprof service: %w", err)
	}

	return nil
}

func (su *SupervisorService) initMetricsServer(cfg *config.Config) error {
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
	if !cfg.MetricsConfig.Enabled {
		su.log.Info("Metrics disabled")
		return nil
	}
	m, ok := su.metrics.(opmetrics.RegistryMetricer)
	if !ok {
		return fmt.Errorf("metrics were enabled, but metricer %T does not expose registry for metrics-server", su.metrics)
	}
	su.log.Debug("Starting metrics server", "addr", cfg.MetricsConfig.ListenAddr, "port", cfg.MetricsConfig.ListenPort)
	metricsSrv, err := opmetrics.StartServer(m.Registry(), cfg.MetricsConfig.ListenAddr, cfg.MetricsConfig.ListenPort)
	if err != nil {
		return fmt.Errorf("failed to start metrics server: %w", err)
	}
	su.log.Info("Started metrics server", "addr", metricsSrv.Addr())
	su.metricsSrv = metricsSrv
	return nil
}

func (su *SupervisorService) initRPCServer(cfg *config.Config) error {
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
	server := oprpc.NewServer(
		cfg.RPC.ListenAddr,
		cfg.RPC.ListenPort,
		cfg.Version,
		oprpc.WithLogger(su.log),
		//oprpc.WithHTTPRecorder(su.metrics), // TODO(protocol-quest#286) hook up metrics to RPC server
	)
	if cfg.RPC.EnableAdmin {
		su.log.Info("Admin RPC enabled")
		server.AddAPI(rpc.API{
			Namespace:     "admin",
			Service:       &frontend.AdminFrontend{Supervisor: su.backend},
			Authenticated: true, // TODO(protocol-quest#286): enforce auth on this or not?
		})
	}
	server.AddAPI(rpc.API{
		Namespace:     "supervisor",
		Service:       &frontend.QueryFrontend{Supervisor: su.backend},
		Authenticated: false,
	})
	su.rpcServer = server
	return nil
}

func (su *SupervisorService) Start(ctx context.Context) error {
	su.log.Info("Starting JSON-RPC server")
	if err := su.rpcServer.Start(); err != nil {
		return fmt.Errorf("unable to start RPC server: %w", err)
	}

162 163 164 165
	if err := su.backend.Start(ctx); err != nil {
		return fmt.Errorf("unable to start backend: %w", err)
	}

166
	su.metrics.RecordUp()
167
	su.log.Info("JSON-RPC Server started", "endpoint", su.rpcServer.Endpoint())
168 169 170 171 172 173 174
	return nil
}

func (su *SupervisorService) Stop(ctx context.Context) error {
	if !su.closing.CompareAndSwap(false, true) {
		return nil // already closing
	}
175
	su.log.Info("Stopping JSON-RPC server")
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
	var result error
	if su.rpcServer != nil {
		if err := su.rpcServer.Stop(); err != nil {
			result = errors.Join(result, fmt.Errorf("failed to stop RPC server: %w", err))
		}
	}
	if su.backend != nil {
		if err := su.backend.Close(); err != nil {
			result = errors.Join(result, fmt.Errorf("failed to close supervisor backend: %w", err))
		}
	}
	if su.pprofService != nil {
		if err := su.pprofService.Stop(ctx); err != nil {
			result = errors.Join(result, fmt.Errorf("failed to stop PProf server: %w", err))
		}
	}
	if su.metricsSrv != nil {
		if err := su.metricsSrv.Stop(ctx); err != nil {
			result = errors.Join(result, fmt.Errorf("failed to stop metrics server: %w", err))
		}
	}
197
	su.log.Info("JSON-RPC server stopped")
198 199 200 201 202 203
	return result
}

// Stopped reports whether Stop has been called on this service. The flag is
// set at the very beginning of Stop, so this returns true as soon as shutdown
// has begun.
func (su *SupervisorService) Stopped() bool {
	isClosing := su.closing.Load()
	return isClosing
}

// RPC returns the URL of the service's RPC server, formed by prefixing the
// server's endpoint with the "http://" scheme.
func (su *SupervisorService) RPC() string {
	// the RPC endpoint is assumed to be HTTP
	// TODO(#11032): make this flexible for ws if the server supports it
	endpoint := su.rpcServer.Endpoint()
	return "http://" + endpoint
}