service.go 6.74 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
package supervisor

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"

	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/rpc"

	"github.com/ethereum-optimism/optimism/op-service/cliapp"
	"github.com/ethereum-optimism/optimism/op-service/httputil"
	opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
	"github.com/ethereum-optimism/optimism/op-service/oppprof"
	oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
17
	"github.com/ethereum-optimism/optimism/op-supervisor/config"
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
	"github.com/ethereum-optimism/optimism/op-supervisor/metrics"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
)

// Backend is the backend that SupervisorService owns and exposes through its RPC frontends.
// It currently requires exactly the method set of frontend.Backend.
type Backend interface {
	frontend.Backend
}

// SupervisorService implements the full-environment bells and whistles around the Supervisor.
// This includes the setup and teardown of metrics, pprof, admin RPC, regular RPC etc.
type SupervisorService struct {
	// closing is flipped to true by the first call to Stop and is what Stopped reports.
	closing atomic.Bool

	// log is the logger handed to SupervisorFromConfig.
	log log.Logger

	// metrics is either a real metricer or metrics.NoopMetrics, as chosen by initMetrics.
	metrics metrics.Metricer

	// backend is the mock or real supervisor backend, as chosen by initBackend.
	backend Backend

	// Auxiliary servers. Each stays nil until its init step has run
	// (metricsSrv also stays nil when metrics are disabled); Stop skips nil entries.
	pprofService *oppprof.Service
	metricsSrv   *httputil.HTTPServer
	rpcServer    *oprpc.Server
}

// Compile-time assertion that *SupervisorService satisfies cliapp.Lifecycle.
var _ cliapp.Lifecycle = (*SupervisorService)(nil)

45
func SupervisorFromConfig(ctx context.Context, cfg *config.Config, logger log.Logger) (*SupervisorService, error) {
46 47 48 49 50 51 52
	su := &SupervisorService{log: logger}
	if err := su.initFromCLIConfig(ctx, cfg); err != nil {
		return nil, errors.Join(err, su.Stop(ctx)) // try to clean up our failed initialization attempt
	}
	return su, nil
}

53
func (su *SupervisorService) initFromCLIConfig(ctx context.Context, cfg *config.Config) error {
54 55 56 57 58 59 60
	su.initMetrics(cfg)
	if err := su.initPProf(cfg); err != nil {
		return fmt.Errorf("failed to start PProf server: %w", err)
	}
	if err := su.initMetricsServer(cfg); err != nil {
		return fmt.Errorf("failed to start Metrics server: %w", err)
	}
61 62 63
	if err := su.initBackend(ctx, cfg); err != nil {
		return fmt.Errorf("failed to start backend: %w", err)
	}
64 65 66 67 68 69
	if err := su.initRPCServer(cfg); err != nil {
		return fmt.Errorf("failed to start RPC server: %w", err)
	}
	return nil
}

70
func (su *SupervisorService) initBackend(ctx context.Context, cfg *config.Config) error {
71 72
	if cfg.MockRun {
		su.backend = backend.NewMockBackend()
73 74
		return nil
	}
75 76 77 78 79 80 81 82 83 84 85
	// the flag is a string slice, which has the potential to have empty strings
	filterBlank := func(in []string) []string {
		out := make([]string, 0, len(in))
		for _, s := range in {
			if s != "" {
				out = append(out, s)
			}
		}
		return out
	}
	cfg.L2RPCs = filterBlank(cfg.L2RPCs)
86 87 88
	be, err := backend.NewSupervisorBackend(ctx, su.log, su.metrics, cfg)
	if err != nil {
		return fmt.Errorf("failed to create supervisor backend: %w", err)
89
	}
90 91
	su.backend = be
	return nil
92 93
}

94
func (su *SupervisorService) initMetrics(cfg *config.Config) {
95 96 97 98 99 100 101 102 103
	if cfg.MetricsConfig.Enabled {
		procName := "default"
		su.metrics = metrics.NewMetrics(procName)
		su.metrics.RecordInfo(cfg.Version)
	} else {
		su.metrics = metrics.NoopMetrics
	}
}

104
func (su *SupervisorService) initPProf(cfg *config.Config) error {
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
	su.pprofService = oppprof.New(
		cfg.PprofConfig.ListenEnabled,
		cfg.PprofConfig.ListenAddr,
		cfg.PprofConfig.ListenPort,
		cfg.PprofConfig.ProfileType,
		cfg.PprofConfig.ProfileDir,
		cfg.PprofConfig.ProfileFilename,
	)

	if err := su.pprofService.Start(); err != nil {
		return fmt.Errorf("failed to start pprof service: %w", err)
	}

	return nil
}

121
func (su *SupervisorService) initMetricsServer(cfg *config.Config) error {
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
	if !cfg.MetricsConfig.Enabled {
		su.log.Info("Metrics disabled")
		return nil
	}
	m, ok := su.metrics.(opmetrics.RegistryMetricer)
	if !ok {
		return fmt.Errorf("metrics were enabled, but metricer %T does not expose registry for metrics-server", su.metrics)
	}
	su.log.Debug("Starting metrics server", "addr", cfg.MetricsConfig.ListenAddr, "port", cfg.MetricsConfig.ListenPort)
	metricsSrv, err := opmetrics.StartServer(m.Registry(), cfg.MetricsConfig.ListenAddr, cfg.MetricsConfig.ListenPort)
	if err != nil {
		return fmt.Errorf("failed to start metrics server: %w", err)
	}
	su.log.Info("Started metrics server", "addr", metricsSrv.Addr())
	su.metricsSrv = metricsSrv
	return nil
}

140
func (su *SupervisorService) initRPCServer(cfg *config.Config) error {
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
	server := oprpc.NewServer(
		cfg.RPC.ListenAddr,
		cfg.RPC.ListenPort,
		cfg.Version,
		oprpc.WithLogger(su.log),
		//oprpc.WithHTTPRecorder(su.metrics), // TODO(protocol-quest#286) hook up metrics to RPC server
	)
	if cfg.RPC.EnableAdmin {
		su.log.Info("Admin RPC enabled")
		server.AddAPI(rpc.API{
			Namespace:     "admin",
			Service:       &frontend.AdminFrontend{Supervisor: su.backend},
			Authenticated: true, // TODO(protocol-quest#286): enforce auth on this or not?
		})
	}
	server.AddAPI(rpc.API{
		Namespace:     "supervisor",
		Service:       &frontend.QueryFrontend{Supervisor: su.backend},
		Authenticated: false,
	})
161 162 163 164 165
	server.AddAPI(rpc.API{
		Namespace:     "supervisor",
		Service:       &frontend.UpdatesFrontend{Supervisor: su.backend},
		Authenticated: false,
	})
166 167 168 169 170 171 172 173 174 175
	su.rpcServer = server
	return nil
}

func (su *SupervisorService) Start(ctx context.Context) error {
	su.log.Info("Starting JSON-RPC server")
	if err := su.rpcServer.Start(); err != nil {
		return fmt.Errorf("unable to start RPC server: %w", err)
	}

176 177 178 179
	if err := su.backend.Start(ctx); err != nil {
		return fmt.Errorf("unable to start backend: %w", err)
	}

180
	su.metrics.RecordUp()
181
	su.log.Info("JSON-RPC Server started", "endpoint", su.rpcServer.Endpoint())
182 183 184 185 186
	return nil
}

func (su *SupervisorService) Stop(ctx context.Context) error {
	if !su.closing.CompareAndSwap(false, true) {
187
		su.log.Warn("Supervisor is already closing")
188 189
		return nil // already closing
	}
190
	su.log.Info("Stopping JSON-RPC server")
191 192 193 194 195 196
	var result error
	if su.rpcServer != nil {
		if err := su.rpcServer.Stop(); err != nil {
			result = errors.Join(result, fmt.Errorf("failed to stop RPC server: %w", err))
		}
	}
197
	su.log.Info("Stopped RPC Server")
198
	if su.backend != nil {
199
		if err := su.backend.Stop(ctx); err != nil {
200 201 202
			result = errors.Join(result, fmt.Errorf("failed to close supervisor backend: %w", err))
		}
	}
203
	su.log.Info("Stopped Backend")
204 205 206 207 208
	if su.pprofService != nil {
		if err := su.pprofService.Stop(ctx); err != nil {
			result = errors.Join(result, fmt.Errorf("failed to stop PProf server: %w", err))
		}
	}
209
	su.log.Info("Stopped PProf")
210 211 212 213 214
	if su.metricsSrv != nil {
		if err := su.metricsSrv.Stop(ctx); err != nil {
			result = errors.Join(result, fmt.Errorf("failed to stop metrics server: %w", err))
		}
	}
215
	su.log.Info("JSON-RPC server stopped")
216 217 218 219 220 221
	return result
}

// Stopped reports whether Stop has been called. The flag it reads is set at the very
// beginning of Stop, so a true result does not by itself mean shutdown has finished.
func (su *SupervisorService) Stopped() bool {
	return su.closing.Load()
}
222 223 224 225 226 227

// RPC returns the URL of the RPC server: its endpoint prefixed with the "http://" scheme.
func (su *SupervisorService) RPC() string {
	// the RPC endpoint is assumed to be HTTP
	// TODO(#11032): make this flexible for ws if the server supports it
	const scheme = "http://"
	endpoint := su.rpcServer.Endpoint()
	return scheme + endpoint
}