Commit 246d7dbe authored by protolambda

op-service: http-server util, Close explicitly to wait for resource cleanup before shutdown

parent a9f62bb3
......@@ -6,15 +6,17 @@ import (
"strings"
"time"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum/go-ethereum/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/urfave/cli/v2"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-service/opio"
)
var (
......@@ -40,16 +42,18 @@ func Main(version string) func(cliCtx *cli.Context) error {
l.Info(fmt.Sprintf("starting endpoint monitor with checkInterval=%s checkDuration=%s", cfg.CheckInterval, cfg.CheckDuration))
endpointMonitor.Start()
ctx := context.Background()
registry := opmetrics.NewRegistry()
registry.MustRegister(MetricWsSubscribeStatus)
metricsCfg := cfg.MetricsConfig
l.Info("starting metrics server", "addr", metricsCfg.ListenAddr, "port", metricsCfg.ListenPort)
if err := opmetrics.ListenAndServe(ctx, registry, metricsCfg.ListenAddr, metricsCfg.ListenPort); err != nil {
srv, err := opmetrics.StartServer(registry, metricsCfg.ListenAddr, metricsCfg.ListenPort)
if err != nil {
l.Error("error starting metrics server", err)
return err
}
defer srv.Close()
opio.BlockOnInterrupts()
return nil
}
......
......@@ -150,11 +150,11 @@ func (a *API) startServer(ctx context.Context) error {
// startMetricsServer ... Starts the metrics server
func (a *API) startMetricsServer(ctx context.Context) error {
a.log.Info("starting metrics server...", "port", a.metricsConfig.Port)
err := metrics.ListenAndServe(ctx, a.metricsRegistry, a.metricsConfig.Host, a.metricsConfig.Port)
srv, err := metrics.StartServer(a.metricsRegistry, a.metricsConfig.Host, a.metricsConfig.Port)
if err != nil {
a.log.Error("metrics server stopped", "err", err)
} else {
a.log.Info("metrics server stopped")
return fmt.Errorf("failed to start metrics server: %w", err)
}
return err
<-ctx.Done()
defer a.log.Info("metrics server stopped")
return srv.Close()
}
......@@ -4,8 +4,9 @@ import (
"context"
"fmt"
"math/big"
"net/http"
"net"
"runtime/debug"
"strconv"
"sync"
"github.com/ethereum/go-ethereum/log"
......@@ -110,27 +111,27 @@ func (i *Indexer) startHttpServer(ctx context.Context) error {
r := chi.NewRouter()
r.Use(middleware.Heartbeat("/healthz"))
server := http.Server{Addr: fmt.Sprintf("%s:%d", i.httpConfig.Host, i.httpConfig.Port), Handler: r}
err := httputil.ListenAndServeContext(ctx, &server)
addr := net.JoinHostPort(i.httpConfig.Host, strconv.Itoa(i.httpConfig.Port))
srv, err := httputil.StartHTTPServer(addr, r)
if err != nil {
i.log.Error("http server stopped", "err", err)
} else {
i.log.Info("http server stopped")
return fmt.Errorf("http server failed to start: %w", err)
}
return err
i.log.Info("http server started", "addr", srv.Addr())
<-ctx.Done()
defer i.log.Info("http server stopped")
return srv.Close()
}
func (i *Indexer) startMetricsServer(ctx context.Context) error {
i.log.Info("starting metrics server...", "port", i.metricsConfig.Port)
err := metrics.ListenAndServe(ctx, i.metricsRegistry, i.metricsConfig.Host, i.metricsConfig.Port)
srv, err := metrics.StartServer(i.metricsRegistry, i.metricsConfig.Host, i.metricsConfig.Port)
if err != nil {
i.log.Error("metrics server stopped", "err", err)
} else {
i.log.Info("metrics server stopped")
return fmt.Errorf("metrics server failed to start: %w", err)
}
return err
i.log.Info("metrics server started", "addr", srv.Addr())
<-ctx.Done()
defer i.log.Info("metrics server stopped")
return srv.Close()
}
// Start starts the indexing service on L1 and L2 chains
......
......@@ -50,28 +50,31 @@ func Main(version string, cliCtx *cli.Context) error {
}
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // Stop pprof and metrics only after main loop returns
defer batchSubmitter.StopIfRunning(context.Background())
pprofConfig := cfg.PprofConfig
if pprofConfig.Enabled {
l.Info("starting pprof", "addr", pprofConfig.ListenAddr, "port", pprofConfig.ListenPort)
go func() {
if err := oppprof.ListenAndServe(ctx, pprofConfig.ListenAddr, pprofConfig.ListenPort); err != nil {
l.Error("error starting pprof", "err", err)
}
}()
l.Debug("starting pprof", "addr", pprofConfig.ListenAddr, "port", pprofConfig.ListenPort)
pprofSrv, err := oppprof.StartServer(pprofConfig.ListenAddr, pprofConfig.ListenPort)
if err != nil {
l.Error("failed to start pprof server", "err", err)
return err
}
l.Info("started pprof server", "addr", pprofSrv.Addr())
defer pprofSrv.Close()
}
metricsCfg := cfg.MetricsConfig
if metricsCfg.Enabled {
l.Info("starting metrics server", "addr", metricsCfg.ListenAddr, "port", metricsCfg.ListenPort)
go func() {
if err := m.Serve(ctx, metricsCfg.ListenAddr, metricsCfg.ListenPort); err != nil {
l.Error("error starting metrics server", "err", err)
}
}()
l.Debug("starting metrics server", "addr", metricsCfg.ListenAddr, "port", metricsCfg.ListenPort)
metricsSrv, err := m.Start(metricsCfg.ListenAddr, metricsCfg.ListenPort)
if err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
l.Info("started metrics server", "addr", metricsSrv.Addr())
defer metricsSrv.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
m.StartBalanceMetrics(ctx, l, batchSubmitter.L1Client, batchSubmitter.TxManager.From())
}
......@@ -87,7 +90,6 @@ func Main(version string, cliCtx *cli.Context) error {
l.Info("Admin RPC enabled")
}
if err := server.Start(); err != nil {
cancel()
return fmt.Errorf("error starting RPC server: %w", err)
}
......
......@@ -3,6 +3,8 @@ package metrics
import (
"context"
"github.com/ethereum-optimism/optimism/op-service/httputil"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
......@@ -178,8 +180,8 @@ func NewMetrics(procName string) *Metrics {
}
}
func (m *Metrics) Serve(ctx context.Context, host string, port int) error {
return opmetrics.ListenAndServe(ctx, m.registry, host, port)
func (m *Metrics) Start(host string, port int) (*httputil.HTTPServer, error) {
return opmetrics.StartServer(m.registry, host, port)
}
func (m *Metrics) Document() []opmetrics.DocumentedMetric {
......
......@@ -73,12 +73,13 @@ func Main(cliCtx *cli.Context) error {
metricsCfg := opmetrics.ReadCLIConfig(cliCtx)
if metricsCfg.Enabled {
log.Info("starting metrics server", "addr", metricsCfg.ListenAddr, "port", metricsCfg.ListenPort)
go func() {
if err := m.Serve(ctx, metricsCfg.ListenAddr, metricsCfg.ListenPort); err != nil {
log.Error("error starting metrics server", err)
}
}()
log.Debug("starting metrics server", "addr", metricsCfg.ListenAddr, "port", metricsCfg.ListenPort)
metricsSrv, err := m.StartServer(metricsCfg.ListenAddr, metricsCfg.ListenPort)
if err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
defer metricsSrv.Close()
log.Info("started metrics server", "addr", metricsSrv.Addr())
m.RecordUp()
}
......
......@@ -2,6 +2,7 @@ package game
import (
"context"
"errors"
"fmt"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
......@@ -13,6 +14,7 @@ import (
opClient "github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/httputil"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/common"
......@@ -24,6 +26,20 @@ type Service struct {
metrics metrics.Metricer
monitor *gameMonitor
sched *scheduler.Scheduler
pprofSrv *httputil.HTTPServer
metricsSrv *httputil.HTTPServer
}
// Close shuts down the pprof and metrics HTTP servers held by the service.
// Servers that were never started (nil fields) are skipped. The close errors
// are accumulated with errors.Join, so the result is nil when every close
// succeeds.
func (s *Service) Close() error {
	var combined error
	// pprof is closed first, then metrics.
	if srv := s.pprofSrv; srv != nil {
		combined = errors.Join(combined, srv.Close())
	}
	if srv := s.metricsSrv; srv != nil {
		combined = errors.Join(combined, srv.Close())
	}
	return combined
}
// NewService creates a new Service.
......@@ -40,35 +56,42 @@ func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*Se
return nil, fmt.Errorf("failed to dial L1: %w", err)
}
s := &Service{
logger: logger,
metrics: m,
}
pprofConfig := cfg.PprofConfig
if pprofConfig.Enabled {
logger.Info("starting pprof", "addr", pprofConfig.ListenAddr, "port", pprofConfig.ListenPort)
go func() {
if err := oppprof.ListenAndServe(ctx, pprofConfig.ListenAddr, pprofConfig.ListenPort); err != nil {
logger.Error("error starting pprof", "err", err)
}
}()
logger.Debug("starting pprof", "addr", pprofConfig.ListenAddr, "port", pprofConfig.ListenPort)
pprofSrv, err := oppprof.StartServer(pprofConfig.ListenAddr, pprofConfig.ListenPort)
if err != nil {
return nil, errors.Join(fmt.Errorf("failed to start pprof server: %w", err), s.Close())
}
s.pprofSrv = pprofSrv
logger.Info("started pprof server", "addr", pprofSrv.Addr())
}
metricsCfg := cfg.MetricsConfig
if metricsCfg.Enabled {
logger.Info("starting metrics server", "addr", metricsCfg.ListenAddr, "port", metricsCfg.ListenPort)
go func() {
if err := m.Serve(ctx, metricsCfg.ListenAddr, metricsCfg.ListenPort); err != nil {
logger.Error("error starting metrics server", "err", err)
}
}()
logger.Debug("starting metrics server", "addr", metricsCfg.ListenAddr, "port", metricsCfg.ListenPort)
metricsSrv, err := m.Start(metricsCfg.ListenAddr, metricsCfg.ListenPort)
if err != nil {
return nil, errors.Join(fmt.Errorf("failed to start metrics server: %w", err), s.Close())
}
logger.Info("started metrics server", "addr", metricsSrv.Addr())
s.metricsSrv = metricsSrv
m.StartBalanceMetrics(ctx, logger, l1Client, txMgr.From())
}
factory, err := bindings.NewDisputeGameFactory(cfg.GameFactoryAddress, l1Client)
if err != nil {
return nil, fmt.Errorf("failed to bind the fault dispute game factory contract: %w", err)
return nil, errors.Join(fmt.Errorf("failed to bind the fault dispute game factory contract: %w", err), s.Close())
}
loader := NewGameLoader(factory)
disk := newDiskManager(cfg.Datadir)
sched := scheduler.NewScheduler(
s.sched = scheduler.NewScheduler(
logger,
m,
disk,
......@@ -79,24 +102,20 @@ func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*Se
pollClient, err := opClient.NewRPCWithClient(ctx, logger, cfg.L1EthRpc, opClient.NewBaseRPCClient(l1Client.Client()), cfg.PollInterval)
if err != nil {
return nil, fmt.Errorf("failed to create RPC client: %w", err)
return nil, errors.Join(fmt.Errorf("failed to create RPC client: %w", err), s.Close())
}
monitor := newGameMonitor(logger, cl, loader, sched, cfg.GameWindow, l1Client.BlockNumber, cfg.GameAllowlist, pollClient)
s.monitor = newGameMonitor(logger, cl, loader, s.sched, cfg.GameWindow, l1Client.BlockNumber, cfg.GameAllowlist, pollClient)
m.RecordInfo(version.SimpleWithMeta)
m.RecordUp()
return &Service{
logger: logger,
metrics: m,
monitor: monitor,
sched: sched,
}, nil
return s, nil
}
// MonitorGame monitors the fault dispute game and attempts to progress it.
func (s *Service) MonitorGame(ctx context.Context) error {
s.sched.Start(ctx)
defer s.sched.Close()
defer s.Close()
return s.monitor.MonitorGames(ctx)
}
......@@ -8,6 +8,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/ethereum-optimism/optimism/op-service/httputil"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
txmetrics "github.com/ethereum-optimism/optimism/op-service/txmgr/metrics"
)
......@@ -122,8 +123,8 @@ func NewMetrics() *Metrics {
}
}
func (m *Metrics) Serve(ctx context.Context, host string, port int) error {
return opmetrics.ListenAndServe(ctx, m.registry, host, port)
func (m *Metrics) Start(host string, port int) (*httputil.HTTPServer, error) {
return opmetrics.StartServer(m.registry, host, port)
}
func (m *Metrics) StartBalanceMetrics(
......
package op_heartbeat
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
......@@ -41,12 +41,10 @@ func Main(version string) func(ctx *cli.Context) error {
oplog.SetGlobalLogHandler(l.GetHandler())
l.Info("starting heartbeat monitor", "version", version)
ctx, cancel := context.WithCancel(context.Background())
go func() {
if err := Start(ctx, l, cfg, version); err != nil {
l.Crit("error starting application", "err", err)
}
}()
srv, err := Start(l, cfg, version)
if err != nil {
l.Crit("error starting application", "err", err)
}
doneCh := make(chan os.Signal, 1)
signal.Notify(doneCh, []os.Signal{
......@@ -56,32 +54,52 @@ func Main(version string) func(ctx *cli.Context) error {
syscall.SIGQUIT,
}...)
<-doneCh
cancel()
return nil
return srv.Close()
}
}
func Start(ctx context.Context, l log.Logger, cfg Config, version string) error {
registry := opmetrics.NewRegistry()
// HeartbeatService groups the HTTP servers started for the heartbeat monitor
// so that they can be shut down together. A nil field means the corresponding
// server was not started.
type HeartbeatService struct {
	// pprof is the profiling server.
	pprof *httputil.HTTPServer
	// metrics is the metrics server.
	metrics *httputil.HTTPServer
	// http is the main heartbeat HTTP server.
	http *httputil.HTTPServer
}
// Close shuts down every server that was started, in the order pprof,
// metrics, http. Nil servers are skipped. The close errors are accumulated
// with errors.Join, so the result is nil when every close succeeds.
func (hs *HeartbeatService) Close() error {
	var combined error
	if srv := hs.pprof; srv != nil {
		combined = errors.Join(combined, srv.Close())
	}
	if srv := hs.metrics; srv != nil {
		combined = errors.Join(combined, srv.Close())
	}
	if srv := hs.http; srv != nil {
		combined = errors.Join(combined, srv.Close())
	}
	return combined
}
func Start(l log.Logger, cfg Config, version string) (*HeartbeatService, error) {
hs := &HeartbeatService{}
registry := opmetrics.NewRegistry()
metricsCfg := cfg.Metrics
if metricsCfg.Enabled {
l.Info("starting metrics server", "addr", metricsCfg.ListenAddr, "port", metricsCfg.ListenPort)
go func() {
if err := opmetrics.ListenAndServe(ctx, registry, metricsCfg.ListenAddr, metricsCfg.ListenPort); err != nil {
l.Error("error starting metrics server", err)
}
}()
l.Debug("starting metrics server", "addr", metricsCfg.ListenAddr, "port", metricsCfg.ListenPort)
metricsSrv, err := opmetrics.StartServer(registry, metricsCfg.ListenAddr, metricsCfg.ListenPort)
if err != nil {
return nil, errors.Join(fmt.Errorf("failed to start metrics server: %w", err), hs.Close())
}
hs.metrics = metricsSrv
l.Info("started metrics server", "addr", metricsSrv.Addr())
}
pprofCfg := cfg.Pprof
if pprofCfg.Enabled {
l.Info("starting pprof server", "addr", pprofCfg.ListenAddr, "port", pprofCfg.ListenPort)
go func() {
if err := oppprof.ListenAndServe(ctx, pprofCfg.ListenAddr, pprofCfg.ListenPort); err != nil {
l.Error("error starting pprof server", err)
}
}()
l.Debug("starting pprof", "addr", pprofCfg.ListenAddr, "port", pprofCfg.ListenPort)
pprofSrv, err := oppprof.StartServer(pprofCfg.ListenAddr, pprofCfg.ListenPort)
if err != nil {
return nil, errors.Join(fmt.Errorf("failed to start pprof server: %w", err), hs.Close())
}
l.Info("started pprof server", "addr", pprofSrv.Addr())
hs.pprof = pprofSrv
}
metrics := NewMetrics(registry)
......@@ -92,16 +110,22 @@ func Start(ctx context.Context, l log.Logger, cfg Config, version string) error
recorder := opmetrics.NewPromHTTPRecorder(registry, MetricsNamespace)
mw := opmetrics.NewHTTPRecordingMiddleware(recorder, mux)
server := &http.Server{
Addr: net.JoinHostPort(cfg.HTTPAddr, strconv.Itoa(cfg.HTTPPort)),
MaxHeaderBytes: HTTPMaxHeaderSize,
Handler: mw,
WriteTimeout: 30 * time.Second,
IdleTimeout: time.Minute,
ReadTimeout: 30 * time.Second,
srv, err := httputil.StartHTTPServer(
net.JoinHostPort(cfg.HTTPAddr, strconv.Itoa(cfg.HTTPPort)),
mw,
httputil.WithTimeouts(httputil.HTTPTimeouts{
ReadTimeout: 30 * time.Second,
ReadHeaderTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
IdleTimeout: time.Minute,
}),
httputil.WithMaxHeaderBytes(HTTPMaxHeaderSize))
if err != nil {
return nil, errors.Join(fmt.Errorf("failed to start HTTP server: %w", err), hs.Close())
}
hs.http = srv
return httputil.ListenAndServeContext(ctx, server)
return hs, nil
}
func Handler(l log.Logger, metrics Metrics) http.HandlerFunc {
......
......@@ -9,7 +9,6 @@ import (
"net"
"net/http"
"testing"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
......@@ -32,18 +31,14 @@ func TestService(t *testing.T) {
}
ctx, cancel := context.WithCancel(context.Background())
exitC := make(chan error, 1)
go func() {
exitC <- Start(ctx, log.New(), cfg, "foobar")
}()
srv, err := Start(log.New(), cfg, "foobar")
// Make sure that the service properly starts
select {
case <-time.NewTimer(100 * time.Millisecond).C:
// pass
case err := <-exitC:
t.Fatalf("unexpected error on startup: %v", err)
}
require.NoError(t, err)
defer cancel()
defer func() {
require.NoError(t, srv.Close(), "close heartbeat server")
}()
tests := []struct {
name string
......@@ -132,9 +127,6 @@ func TestService(t *testing.T) {
require.Contains(t, string(metricsBody), tt.metric)
})
}
cancel()
require.NoError(t, <-exitC)
}
func freePort(t *testing.T) int {
......
......@@ -523,19 +523,13 @@ func (m *Metrics) RecordSequencerSealingTime(duration time.Duration) {
m.SequencerSealingDurationSeconds.Observe(float64(duration) / float64(time.Second))
}
// Serve starts the metrics server on the given hostname and port.
// The server will be closed when the passed-in context is cancelled.
func (m *Metrics) Serve(ctx context.Context, hostname string, port int) error {
// StartServer starts the metrics server on the given hostname and port.
func (m *Metrics) StartServer(hostname string, port int) (*ophttp.HTTPServer, error) {
addr := net.JoinHostPort(hostname, strconv.Itoa(port))
server := ophttp.NewHttpServer(promhttp.InstrumentMetricHandler(
h := promhttp.InstrumentMetricHandler(
m.registry, promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{}),
))
server.Addr = addr
go func() {
<-ctx.Done()
server.Close()
}()
return server.ListenAndServe()
)
return ophttp.StartHTTPServer(addr, h)
}
func (m *Metrics) Document() []metrics.DocumentedMetric {
......
......@@ -9,6 +9,8 @@ import (
"sync/atomic"
"time"
"github.com/ethereum-optimism/optimism/op-service/httputil"
"github.com/hashicorp/go-multierror"
"github.com/libp2p/go-libp2p/core/peer"
......@@ -49,15 +51,14 @@ type OpNode struct {
rollupHalt string // when to halt the rollup, disabled if empty
pprofSrv *httputil.HTTPServer
metricsSrv *httputil.HTTPServer
// some resources cannot be stopped directly, like the p2p gossipsub router (not our design),
// and depend on this ctx to be closed.
resourcesCtx context.Context
resourcesClose context.CancelFunc
// resource context for anything that stays around for the post-processing phase: e.g. metrics.
postResourcesCtx context.Context
postResourcesClose context.CancelFunc
// Indicates when it's safe to close data sources used by the runtimeConfig bg loader
runtimeConfigReloaderDone chan struct{}
......@@ -89,8 +90,6 @@ func New(ctx context.Context, cfg *Config, log log.Logger, snapshotLog log.Logge
// not a context leak, gossipsub is closed with a context.
n.resourcesCtx, n.resourcesClose = context.WithCancel(context.Background())
n.postResourcesCtx, n.postResourcesClose = context.WithCancel(context.Background())
err := n.init(ctx, cfg, snapshotLog)
if err != nil {
log.Error("Error initializing the rollup node", "err", err)
......@@ -130,13 +129,15 @@ func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger)
if err := n.initRPCServer(ctx, cfg); err != nil {
return fmt.Errorf("failed to init the RPC server: %w", err)
}
if err := n.initMetricsServer(ctx, cfg); err != nil {
if err := n.initMetricsServer(cfg); err != nil {
return fmt.Errorf("failed to init the metrics server: %w", err)
}
n.metrics.RecordInfo(n.appVersion)
n.metrics.RecordUp()
n.initHeartbeat(cfg)
n.initPProf(cfg)
if err := n.initPProf(cfg); err != nil {
return fmt.Errorf("failed to init pprof server: %w", err)
}
return nil
}
......@@ -341,17 +342,18 @@ func (n *OpNode) initRPCServer(ctx context.Context, cfg *Config) error {
return nil
}
func (n *OpNode) initMetricsServer(ctx context.Context, cfg *Config) error {
func (n *OpNode) initMetricsServer(cfg *Config) error {
if !cfg.Metrics.Enabled {
n.log.Info("metrics disabled")
return nil
}
n.log.Info("starting metrics server", "addr", cfg.Metrics.ListenAddr, "port", cfg.Metrics.ListenPort)
go func() {
if err := n.metrics.Serve(n.postResourcesCtx, cfg.Metrics.ListenAddr, cfg.Metrics.ListenPort); err != nil {
log.Crit("error starting metrics server", "err", err)
}
}()
n.log.Debug("starting metrics server", "addr", cfg.Metrics.ListenAddr, "port", cfg.Metrics.ListenPort)
metricsSrv, err := n.metrics.StartServer(cfg.Metrics.ListenAddr, cfg.Metrics.ListenPort)
if err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
n.log.Info("started metrics server", "addr", metricsSrv.Addr())
n.metricsSrv = metricsSrv
return nil
}
......@@ -381,16 +383,18 @@ func (n *OpNode) initHeartbeat(cfg *Config) {
}(cfg.Heartbeat.URL)
}
func (n *OpNode) initPProf(cfg *Config) {
func (n *OpNode) initPProf(cfg *Config) error {
if !cfg.Pprof.Enabled {
return
return nil
}
log.Info("pprof server started", "addr", net.JoinHostPort(cfg.Pprof.ListenAddr, strconv.Itoa(cfg.Pprof.ListenPort)))
go func(listenAddr string, listenPort int) {
if err := oppprof.ListenAndServe(n.resourcesCtx, listenAddr, listenPort); err != nil {
log.Error("error starting pprof", "err", err)
}
}(cfg.Pprof.ListenAddr, cfg.Pprof.ListenPort)
log.Debug("starting pprof server", "addr", net.JoinHostPort(cfg.Pprof.ListenAddr, strconv.Itoa(cfg.Pprof.ListenPort)))
srv, err := oppprof.StartServer(cfg.Pprof.ListenAddr, cfg.Pprof.ListenPort)
if err != nil {
return err
}
n.pprofSrv = srv
log.Info("started pprof server", "addr", srv.Addr())
return nil
}
func (n *OpNode) initP2P(ctx context.Context, cfg *Config) error {
......@@ -617,9 +621,17 @@ func (n *OpNode) Stop(ctx context.Context) error {
}
}
// Close metrics only after we are done idling
// TODO(7534): This should be refactored to a series of Close() calls to the respective resources.
n.postResourcesClose()
// Close metrics and pprof only after we are done idling
if n.pprofSrv != nil {
if err := n.pprofSrv.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close pprof server: %w", err))
}
}
if n.metricsSrv != nil {
if err := n.metricsSrv.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close metrics server: %w", err))
}
}
return result.ErrorOrNil()
}
......
......@@ -3,13 +3,14 @@ package metrics
import (
"context"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/prometheus/client_golang/prometheus"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/httputil"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
txmetrics "github.com/ethereum-optimism/optimism/op-service/txmgr/metrics"
)
......@@ -75,8 +76,8 @@ func NewMetrics(procName string) *Metrics {
}
}
func (m *Metrics) Serve(ctx context.Context, host string, port int) error {
return opmetrics.ListenAndServe(ctx, m.registry, host, port)
func (m *Metrics) Start(host string, port int) (*httputil.HTTPServer, error) {
return opmetrics.StartServer(m.registry, host, port)
}
func (m *Metrics) StartBalanceMetrics(ctx context.Context,
......
......@@ -9,12 +9,13 @@ import (
"sync"
"time"
"github.com/urfave/cli/v2"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli/v2"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-proposer/flags"
......@@ -62,9 +63,7 @@ func Main(version string, cliCtx *cli.Context) error {
}
l.Info("Starting L2 Output Submitter")
ctx, cancel := context.WithCancel(context.Background())
if err := l2OutputSubmitter.Start(); err != nil {
cancel()
l.Error("Unable to start L2 Output Submitter", "error", err)
return err
}
......@@ -73,29 +72,33 @@ func Main(version string, cliCtx *cli.Context) error {
l.Info("L2 Output Submitter started")
pprofConfig := cfg.PprofConfig
if pprofConfig.Enabled {
l.Info("starting pprof", "addr", pprofConfig.ListenAddr, "port", pprofConfig.ListenPort)
go func() {
if err := oppprof.ListenAndServe(ctx, pprofConfig.ListenAddr, pprofConfig.ListenPort); err != nil {
l.Error("error starting pprof", "err", err)
}
}()
l.Debug("starting pprof", "addr", pprofConfig.ListenAddr, "port", pprofConfig.ListenPort)
pprofSrv, err := oppprof.StartServer(pprofConfig.ListenAddr, pprofConfig.ListenPort)
if err != nil {
l.Error("failed to start pprof server", "err", err)
return err
}
l.Info("started pprof server", "addr", pprofSrv.Addr())
defer pprofSrv.Close()
}
metricsCfg := cfg.MetricsConfig
if metricsCfg.Enabled {
l.Info("starting metrics server", "addr", metricsCfg.ListenAddr, "port", metricsCfg.ListenPort)
go func() {
if err := m.Serve(ctx, metricsCfg.ListenAddr, metricsCfg.ListenPort); err != nil {
l.Error("error starting metrics server", "err", err)
}
}()
l.Debug("starting metrics server", "addr", metricsCfg.ListenAddr, "port", metricsCfg.ListenPort)
metricsSrv, err := m.Start(metricsCfg.ListenAddr, metricsCfg.ListenPort)
if err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
l.Info("started metrics server", "addr", metricsSrv.Addr())
defer metricsSrv.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
m.StartBalanceMetrics(ctx, l, proposerConfig.L1Client, proposerConfig.TxManager.From())
}
rpcCfg := cfg.RPCConfig
server := oprpc.NewServer(rpcCfg.ListenAddr, rpcCfg.ListenPort, version, oprpc.WithLogger(l))
if err := server.Start(); err != nil {
cancel()
return fmt.Errorf("error starting RPC server: %w", err)
}
......@@ -103,7 +106,6 @@ func Main(version string, cliCtx *cli.Context) error {
m.RecordUp()
opio.BlockOnInterrupts()
cancel()
return nil
}
......
package httputil
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"sync/atomic"
)
func ListenAndServeContext(ctx context.Context, server *http.Server) error {
errCh := make(chan error, 1)
go func() {
errCh <- server.ListenAndServe()
}()
// HTTPServer wraps a http.Server, while providing conveniences
// like exposing the running state and address.
type HTTPServer struct {
// listener is the bound listener the server accepts connections on;
// Addr reports its address.
listener net.Listener
// srv is the wrapped standard-library server; HTTPOption functions mutate it.
srv *http.Server
// closed is set to true once the background Serve call has returned
// http.ErrServerClosed.
closed atomic.Bool
}
// HTTPOption applies a change to an HTTP server.
// StartHTTPServer runs the options after the listener is bound and before
// serving begins; a non-nil error aborts startup and closes the listener.
type HTTPOption func(srv *HTTPServer) error
func StartHTTPServer(addr string, handler http.Handler, opts ...HTTPOption) (*HTTPServer, error) {
listener, err := net.Listen("tcp", addr)
if err != nil {
return nil, fmt.Errorf("failed to bind to address %q: %w", addr, err)
}
select {
case err := <-errCh:
srv := &http.Server{
Handler: handler,
ReadTimeout: DefaultTimeouts.ReadTimeout,
ReadHeaderTimeout: DefaultTimeouts.ReadHeaderTimeout,
WriteTimeout: DefaultTimeouts.WriteTimeout,
IdleTimeout: DefaultTimeouts.IdleTimeout,
}
out := &HTTPServer{listener: listener, srv: srv}
for _, opt := range opts {
if err := opt(out); err != nil {
return nil, errors.Join(fmt.Errorf("failed to apply HTTP option: %w", err), listener.Close())
}
}
go func() {
err := out.srv.Serve(listener)
// no error, unless ErrServerClosed (or unused base context closes, or unused http2 config error)
if errors.Is(err, http.ErrServerClosed) {
return nil
out.closed.Store(true)
} else {
panic(fmt.Errorf("unexpected serve error: %w", err))
}
return err
case <-ctx.Done():
_ = server.Shutdown(context.Background())
}()
return out, nil
}
err := ctx.Err()
if errors.Is(err, context.Canceled) {
return nil
}
return err
// Closed reports whether the server's closed flag has been set, which happens
// once the background Serve call has returned http.ErrServerClosed.
func (s *HTTPServer) Closed() bool {
	isClosed := s.closed.Load()
	return isClosed
}
// Close closes the wrapped http.Server, which also closes the underlying
// listener. An http.ErrServerClosed result is treated as success and reported
// as nil; any other error is returned unchanged.
func (s *HTTPServer) Close() error {
	if err := s.srv.Close(); err != nil && !errors.Is(err, http.ErrServerClosed) {
		return err
	}
	return nil
}
// Addr returns the string form of the address the listener is bound to.
func (s *HTTPServer) Addr() string {
	bound := s.listener.Addr()
	return bound.String()
}
// WithMaxHeaderBytes returns an HTTPOption that sets MaxHeaderBytes on the
// wrapped http.Server to max. The option itself never fails.
func WithMaxHeaderBytes(max int) HTTPOption {
	apply := func(s *HTTPServer) error {
		s.srv.MaxHeaderBytes = max
		return nil
	}
	return apply
}
package httputil
import "time"
// DefaultTimeouts are the timeouts StartHTTPServer applies unless overridden
// by an option. The values are based on the RPC timeouts that geth uses.
var DefaultTimeouts = HTTPTimeouts{
	ReadTimeout:       30 * time.Second,
	ReadHeaderTimeout: 30 * time.Second,
	WriteTimeout:      30 * time.Second,
	IdleTimeout:       2 * time.Minute,
}
// HTTPTimeouts holds the timeout settings for an HTTPServer.
// WithTimeouts copies each field onto the http.Server field of the same name.
type HTTPTimeouts struct {
// ReadTimeout is the maximum duration for reading the entire
// request, including the body. A zero or negative value means
// there will be no timeout.
//
// Because ReadTimeout does not let Handlers make per-request
// decisions on each request body's acceptable deadline or
// upload rate, most users will prefer to use
// ReadHeaderTimeout. It is valid to use them both.
ReadTimeout time.Duration
// ReadHeaderTimeout is the amount of time allowed to read
// request headers. The connection's read deadline is reset
// after reading the headers and the Handler can decide what
// is considered too slow for the body. If ReadHeaderTimeout
// is zero, the value of ReadTimeout is used. If both are
// zero, there is no timeout.
ReadHeaderTimeout time.Duration
// WriteTimeout is the maximum duration before timing out
// writes of the response. It is reset whenever a new
// request's header is read. Like ReadTimeout, it does not
// let Handlers make decisions on a per-request basis.
// A zero or negative value means there will be no timeout.
WriteTimeout time.Duration
// IdleTimeout is the maximum amount of time to wait for the
// next request when keep-alives are enabled. If IdleTimeout
// is zero, the value of ReadTimeout is used. If both are
// zero, there is no timeout.
IdleTimeout time.Duration
}
// WithTimeouts returns an HTTPOption that overwrites all four timeouts of the
// wrapped http.Server (read, read-header, write, idle) with the values in
// timeouts. The option itself never fails.
func WithTimeouts(timeouts HTTPTimeouts) HTTPOption {
	return func(s *HTTPServer) error {
		// target aliases the wrapped server; assignments go through the pointer.
		target := s.srv
		target.ReadTimeout = timeouts.ReadTimeout
		target.ReadHeaderTimeout = timeouts.ReadHeaderTimeout
		target.WriteTimeout = timeouts.WriteTimeout
		target.IdleTimeout = timeouts.IdleTimeout
		return nil
	}
}
package metrics
import (
"context"
"net"
"net/http"
"strconv"
"github.com/ethereum-optimism/optimism/op-service/httputil"
......@@ -11,13 +9,10 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)
func ListenAndServe(ctx context.Context, r *prometheus.Registry, hostname string, port int) error {
func StartServer(r *prometheus.Registry, hostname string, port int) (*httputil.HTTPServer, error) {
addr := net.JoinHostPort(hostname, strconv.Itoa(port))
server := &http.Server{
Addr: addr,
Handler: promhttp.InstrumentMetricHandler(
r, promhttp.HandlerFor(r, promhttp.HandlerOpts{}),
),
}
return httputil.ListenAndServeContext(ctx, server)
h := promhttp.InstrumentMetricHandler(
r, promhttp.HandlerFor(r, promhttp.HandlerOpts{}),
)
return httputil.StartHTTPServer(addr, h)
}
package pprof
import (
"context"
"net"
"net/http"
"net/http/pprof"
......@@ -10,7 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/httputil"
)
func ListenAndServe(ctx context.Context, hostname string, port int) error {
func StartServer(hostname string, port int) (*httputil.HTTPServer, error) {
mux := http.NewServeMux()
// have to do below to support multiple servers, since the
......@@ -22,9 +21,5 @@ func ListenAndServe(ctx context.Context, hostname string, port int) error {
mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
addr := net.JoinHostPort(hostname, strconv.Itoa(port))
server := &http.Server{
Addr: addr,
Handler: mux,
}
return httputil.ListenAndServeContext(ctx, server)
return httputil.StartHTTPServer(addr, mux)
}
......@@ -419,11 +419,11 @@ var (
metrics := engine.NewMetrics("wheel", registry)
if metricsCfg.Enabled {
l.Info("starting metrics server", "addr", metricsCfg.ListenAddr, "port", metricsCfg.ListenPort)
go func() {
if err := opmetrics.ListenAndServe(ctx, registry, metricsCfg.ListenAddr, metricsCfg.ListenPort); err != nil {
l.Error("error starting metrics server", err)
}
}()
metricsSrv, err := opmetrics.StartServer(registry, metricsCfg.ListenAddr, metricsCfg.ListenPort)
if err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
defer metricsSrv.Close()
}
return engine.Auto(ctx, metrics, client, l, shutdown, settings)
})
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment