Commit ab8a7abe authored by protolambda

op-node: shutdown cleanup, idle after halt

parent 546fb2c7
@@ -14,18 +14,16 @@ import (
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/geth"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-service/clock"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
ic "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
ic "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
@@ -36,8 +34,6 @@ import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"
bss "github.com/ethereum-optimism/optimism/op-batcher/batcher"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
@@ -46,15 +42,19 @@ import (
"github.com/ethereum-optimism/optimism/op-chain-ops/genesis"
"github.com/ethereum-optimism/optimism/op-e2e/config"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/geth"
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/metrics"
rollupNode "github.com/ethereum-optimism/optimism/op-node/node"
"github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/sources"
proposermetrics "github.com/ethereum-optimism/optimism/op-proposer/metrics"
l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum-optimism/optimism/op-service/eth"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/testlog"
@@ -277,8 +277,11 @@ func (sys *System) Close() {
sys.BatchSubmitter.StopIfRunning(ctx)
}
postCtx, postCancel := context.WithCancel(context.Background())
postCancel() // immediate shutdown, no allowance for idling
for _, node := range sys.RollupNodes {
node.Close()
node.Stop(postCtx)
}
for _, ei := range sys.EthInstances {
ei.Close()
@@ -333,8 +336,10 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
didErrAfterStart := false
defer func() {
if didErrAfterStart {
postCtx, postCancel := context.WithCancel(context.Background())
postCancel() // immediate shutdown, no allowance for idling
for _, node := range sys.RollupNodes {
node.Close()
node.Stop(postCtx)
}
for _, ei := range sys.EthInstances {
ei.Close()
@@ -595,12 +600,25 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
}
c.Rollup.LogDescription(cfg.Loggers[name], chaincfg.L2ChainIDToNetworkDisplayName)
node, err := rollupNode.New(context.Background(), &c, cfg.Loggers[name], snapLog, "", metrics.NewMetrics(""))
l := cfg.Loggers[name]
var cycle cliapp.Lifecycle
c.Cancel = func(errCause error) {
l.Warn("node requested early shutdown!", "err", errCause)
go func() {
postCtx, postCancel := context.WithCancel(context.Background())
postCancel() // don't allow the stopping to continue for longer than needed
if err := cycle.Stop(postCtx); err != nil {
t.Error(err)
}
l.Warn("closed op-node!")
}()
}
node, err := rollupNode.New(context.Background(), &c, l, snapLog, "", metrics.NewMetrics(""))
if err != nil {
didErrAfterStart = true
return nil, err
}
cycle = node
err = node.Start(context.Background())
if err != nil {
didErrAfterStart = true
......
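Aside on the pattern above: Stop treats its ctx as the allowance for graceful work, so a context that is canceled before the call requests an immediate, non-idling shutdown. A runnable sketch of the shape (the stop func here is a stand-in, not the real OpNode.Stop):

package main

import (
	"context"
	"fmt"
)

// stop is a stand-in for OpNode.Stop: ctx bounds how long shutdown may take,
// so an already-canceled ctx means "stop now, skip any idle/grace period".
func stop(ctx context.Context) error {
	select {
	case <-ctx.Done():
		fmt.Println("ctx expired: fast shutdown, no idling")
	default:
		fmt.Println("ctx live: may idle before closing")
	}
	return nil
}

func main() {
	postCtx, postCancel := context.WithCancel(context.Background())
	postCancel() // immediate shutdown, no allowance for idling
	_ = stop(postCtx)
}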
@@ -1528,7 +1528,7 @@ func TestRequiredProtocolVersionChangeAndHalt(t *testing.T) {
// wait for the required protocol version to take effect by halting the verifier that opted in, and halting the op-geth node that opted in.
_, err = retry.Do(context.Background(), 10, retry.Fixed(time.Second*10), func() (struct{}, error) {
if !sys.RollupNodes["verifier"].Closed() {
if !sys.RollupNodes["verifier"].Stopped() {
return struct{}{}, errors.New("verifier rollup node is not closed yet")
}
return struct{}{}, nil
......
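The wait above is a plain fixed-interval poll via op-service/retry until Stopped() reports true. A compressed, runnable sketch of the same shape, with the node swapped for a fake stop flag (retry.Do/retry.Fixed used exactly as in the test above):

package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"

	"github.com/ethereum-optimism/optimism/op-service/retry"
)

func main() {
	var stopped atomic.Bool
	go func() { time.Sleep(30 * time.Millisecond); stopped.Store(true) }()

	// poll on a fixed interval until the stop condition is observed
	_, err := retry.Do(context.Background(), 10, retry.Fixed(10*time.Millisecond), func() (struct{}, error) {
		if !stopped.Load() {
			return struct{}{}, errors.New("not stopped yet")
		}
		return struct{}{}, nil
	})
	fmt.Println("wait result:", err)
}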
@@ -2,29 +2,25 @@ package main
import (
"context"
"net"
"fmt"
"os"
"strconv"
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/cmd/doc"
"github.com/urfave/cli/v2"
"github.com/ethereum/go-ethereum/log"
opnode "github.com/ethereum-optimism/optimism/op-node"
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/cmd/doc"
"github.com/ethereum-optimism/optimism/op-node/cmd/genesis"
"github.com/ethereum-optimism/optimism/op-node/cmd/p2p"
"github.com/ethereum-optimism/optimism/op-node/flags"
"github.com/ethereum-optimism/optimism/op-node/heartbeat"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/node"
"github.com/ethereum-optimism/optimism/op-node/version"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/opio"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
)
var (
@@ -58,7 +54,7 @@ func main() {
app.Name = "op-node"
app.Usage = "Optimism Rollup Node"
app.Description = "The Optimism Rollup Node derives L2 block inputs from L1 data and drives an external L2 Execution Engine to build an L2 chain."
app.Action = RollupNodeMain
app.Action = cliapp.LifecycleCmd(RollupNodeMain)
app.Commands = []*cli.Command{
{
Name: "p2p",
@@ -80,8 +76,7 @@
}
}
func RollupNodeMain(ctx *cli.Context) error {
log.Info("Initializing Rollup Node")
func RollupNodeMain(ctx *cli.Context, closeApp context.CancelCauseFunc) (cliapp.Lifecycle, error) {
logCfg := oplog.ReadCLIConfig(ctx)
log := oplog.NewLogger(oplog.AppOut(ctx), logCfg)
oplog.SetGlobalLogHandler(log.GetHandler())
@@ -90,13 +85,13 @@ func RollupNodeMain(ctx *cli.Context) error {
cfg, err := opnode.NewConfig(ctx, log)
if err != nil {
log.Error("Unable to create the rollup node config", "error", err)
return err
return nil, fmt.Errorf("unable to create the rollup node config: %w", err)
}
cfg.Cancel = closeApp
snapshotLog, err := opnode.NewSnapshotLogger(ctx)
if err != nil {
log.Error("Unable to create snapshot root logger", "error", err)
return err
return nil, fmt.Errorf("unable to create snapshot root logger: %w", err)
}
// Only pretty-print the banner if it is a terminal log. Otherwise log it as key-value pairs.
@@ -106,60 +101,10 @@ func RollupNodeMain(ctx *cli.Context) error {
cfg.Rollup.LogDescription(log, chaincfg.L2ChainIDToNetworkDisplayName)
}
n, err := node.New(context.Background(), cfg, log, snapshotLog, VersionWithMeta, m)
n, err := node.New(ctx.Context, cfg, log, snapshotLog, VersionWithMeta, m)
if err != nil {
log.Error("Unable to create the rollup node", "error", err)
return err
return nil, fmt.Errorf("unable to create the rollup node: %w", err)
}
log.Info("Starting rollup node", "version", VersionWithMeta)
if err := n.Start(context.Background()); err != nil {
log.Error("Unable to start rollup node", "error", err)
return err
}
defer n.Close()
m.RecordInfo(VersionWithMeta)
m.RecordUp()
log.Info("Rollup node started")
if cfg.Heartbeat.Enabled {
var peerID string
if cfg.P2P.Disabled() {
peerID = "disabled"
} else {
peerID = n.P2P().Host().ID().String()
}
beatCtx, beatCtxCancel := context.WithCancel(context.Background())
payload := &heartbeat.Payload{
Version: version.Version,
Meta: version.Meta,
Moniker: cfg.Heartbeat.Moniker,
PeerID: peerID,
ChainID: cfg.Rollup.L2ChainID.Uint64(),
}
go func() {
if err := heartbeat.Beat(beatCtx, log, cfg.Heartbeat.URL, payload); err != nil {
log.Error("heartbeat goroutine crashed", "err", err)
}
}()
defer beatCtxCancel()
}
if cfg.Pprof.Enabled {
pprofCtx, pprofCancel := context.WithCancel(context.Background())
go func() {
log.Info("pprof server started", "addr", net.JoinHostPort(cfg.Pprof.ListenAddr, strconv.Itoa(cfg.Pprof.ListenPort)))
if err := oppprof.ListenAndServe(pprofCtx, cfg.Pprof.ListenAddr, cfg.Pprof.ListenPort); err != nil {
log.Error("error starting pprof", "err", err)
}
}()
defer pprofCancel()
}
opio.BlockOnInterrupts()
return nil
return n, nil
}
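The rewrite changes the action's job: it builds the node and returns it as a cliapp.Lifecycle instead of starting, blocking, and tearing down inline, and cfg.Cancel = closeApp lets components deep in the node request an app-level shutdown. A runnable sketch of that Cancel wiring (toyConfig is invented; needs Go 1.20+ for CancelCause):

package main

import (
	"context"
	"errors"
	"fmt"
)

type toyConfig struct {
	Cancel context.CancelCauseFunc // mirrors node.Config.Cancel; may be nil
}

func main() {
	ctx, closeApp := context.WithCancelCause(context.Background())
	cfg := &toyConfig{Cancel: closeApp} // as RollupNodeMain does: cfg.Cancel = closeApp

	// deep inside the service, some condition (e.g. a protocol halt) fires:
	if cfg.Cancel != nil {
		cfg.Cancel(errors.New("halt: unsupported protocol version"))
	}

	<-ctx.Done()
	fmt.Println("app closing, cause:", context.Cause(ctx))
}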
package node
import (
"context"
"errors"
"fmt"
"math"
@@ -56,6 +57,9 @@ type Config struct {
// To halt when detecting the node does not support a signaled protocol version
// change of the given severity (major/minor/patch). Disabled if empty.
RollupHalt string
// Cancel to request a premature shutdown of the node itself, e.g. when halting. This may be nil.
Cancel context.CancelCauseFunc
}
type RPCConfig struct {
......
@@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"net"
"strconv"
"sync/atomic"
"time"
@@ -15,11 +17,14 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/heartbeat"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-node/version"
"github.com/ethereum-optimism/optimism/op-service/eth"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
"github.com/ethereum-optimism/optimism/op-service/retry"
)
@@ -49,15 +54,26 @@ type OpNode struct {
resourcesCtx context.Context
resourcesClose context.CancelFunc
// resource context for anything that stays around for the post-processing phase: e.g. metrics.
postResourcesCtx context.Context
postResourcesClose context.CancelFunc
// Indicates when it's safe to close data sources used by the runtimeConfig bg loader
runtimeConfigReloaderDone chan struct{}
closed atomic.Bool
// cancels execution prematurely, e.g. to halt. This may be nil.
cancel context.CancelCauseFunc
halted bool
}
// The OpNode handles incoming gossip
var _ p2p.GossipIn = (*OpNode)(nil)
// New creates a new OpNode instance.
// The provided ctx argument is for the span of initialization only;
// the node will immediately Stop(ctx) before finishing initialization if the context is canceled during initialization.
func New(ctx context.Context, cfg *Config, log log.Logger, snapshotLog log.Logger, appVersion string, m *metrics.Metrics) (*OpNode, error) {
if err := cfg.Check(); err != nil {
return nil, err
@@ -68,15 +84,18 @@ func New(ctx context.Context, cfg *Config, log log.Logger, snapshotLog log.Logge
appVersion: appVersion,
metrics: m,
rollupHalt: cfg.RollupHalt,
cancel: cfg.Cancel,
}
// not a context leak, gossipsub is closed with a context.
n.resourcesCtx, n.resourcesClose = context.WithCancel(context.Background())
n.postResourcesCtx, n.postResourcesClose = context.WithCancel(context.Background())
err := n.init(ctx, cfg, snapshotLog)
if err != nil {
log.Error("Error initializing the rollup node", "err", err)
// ensure we always close the node resources if we fail to initialize the node.
if closeErr := n.Close(); closeErr != nil {
if closeErr := n.Stop(ctx); closeErr != nil {
return nil, multierror.Append(err, closeErr)
}
return nil, err
@@ -85,6 +104,7 @@ func New(ctx context.Context, cfg *Config, log log.Logger, snapshotLog log.Logge
}
func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger) error {
n.log.Info("Initializing rollup node", "version", n.appVersion)
if err := n.initTracer(ctx, cfg); err != nil {
return fmt.Errorf("failed to init the trace: %w", err)
}
@@ -113,6 +133,14 @@ func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger)
if err := n.initMetricsServer(ctx, cfg); err != nil {
return fmt.Errorf("failed to init the metrics server: %w", err)
}
n.metrics.RecordInfo(n.appVersion)
n.metrics.RecordUp()
if err := n.initHeartbeat(ctx, cfg); err != nil {
return fmt.Errorf("failed to init the heartbeat service: %w", err)
}
if err := n.initPProf(ctx, cfg); err != nil {
return fmt.Errorf("failed to init pprof server: %w", err)
}
return nil
}
@@ -206,16 +234,20 @@ func (n *OpNode) initRuntimeConfig(ctx context.Context, cfg *Config) error {
// initialize the runtime config before unblocking
if _, err := retry.Do(ctx, 5, retry.Fixed(time.Second*10), func() (eth.L1BlockRef, error) {
return reload(ctx)
ref, err := reload(ctx)
if errors.Is(err, errNodeHalt) { // don't retry on halt error
err = nil
}
return ref, err
}); err != nil {
return fmt.Errorf("failed to load runtime configuration repeatedly, last error: %w", err)
}
// start a background loop, to keep reloading it at the configured reload interval
reloader := func(ctx context.Context, reloadInterval time.Duration) bool {
reloader := func(ctx context.Context, reloadInterval time.Duration) {
if reloadInterval <= 0 {
n.log.Debug("not running runtime-config reloading background loop")
return false
return
}
ticker := time.NewTicker(reloadInterval)
defer ticker.Stop()
@@ -229,13 +261,17 @@ func (n *OpNode) initRuntimeConfig(ctx context.Context, cfg *Config) error {
case errNodeHalt, nil:
n.log.Debug("reloaded runtime config", "l1_head", l1Head)
if err == errNodeHalt {
return true
n.halted = true
if n.cancel != nil {
n.cancel(errNodeHalt)
}
return
}
default:
n.log.Warn("failed to reload runtime config", "err", err)
}
case <-ctx.Done():
return false
return
}
}
}
@@ -243,13 +279,8 @@ func (n *OpNode) initRuntimeConfig(ctx context.Context, cfg *Config) error {
n.runtimeConfigReloaderDone = make(chan struct{})
// Manages the lifetime of the reloader, so the OpNode can be closed safely once it is done.
go func(ctx context.Context, reloadInterval time.Duration) {
halt := reloader(ctx, reloadInterval)
reloader(ctx, reloadInterval)
close(n.runtimeConfigReloaderDone)
if halt {
if err := n.Close(); err != nil {
n.log.Error("Failed to halt rollup", "err", err)
}
}
}(n.resourcesCtx, cfg.RuntimeConfigReloadInterval) // this keeps running after initialization
return nil
}
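Note the inversion here: the reloader no longer returns a halt flag and closes the node itself; it records the halt and cancels the app through the CancelCauseFunc, leaving the owner to drive Stop. A runnable sketch of that shape (all names invented):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errHalt = errors.New("halt requested")

// runLoop mimics the runtime-config reloader: on detecting a halt condition
// it asks the owner to shut down via cancel, instead of closing itself.
func runLoop(ctx context.Context, cancel context.CancelCauseFunc) {
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	for i := 0; ; i++ {
		select {
		case <-ticker.C:
			if i == 2 { // pretend a reload returned errNodeHalt
				cancel(errHalt)
				return
			}
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancelCause(context.Background())
	go runLoop(ctx, cancel)
	<-ctx.Done()
	fmt.Println("stopping, cause:", context.Cause(ctx))
}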
@@ -319,13 +350,53 @@ func (n *OpNode) initMetricsServer(ctx context.Context, cfg *Config) error {
}
n.log.Info("starting metrics server", "addr", cfg.Metrics.ListenAddr, "port", cfg.Metrics.ListenPort)
go func() {
if err := n.metrics.Serve(ctx, cfg.Metrics.ListenAddr, cfg.Metrics.ListenPort); err != nil {
if err := n.metrics.Serve(n.postResourcesCtx, cfg.Metrics.ListenAddr, cfg.Metrics.ListenPort); err != nil {
log.Crit("error starting metrics server", "err", err)
}
}()
return nil
}
func (n *OpNode) initHeartbeat(_ context.Context, cfg *Config) error {
if !cfg.Heartbeat.Enabled {
return nil
}
var peerID string
if cfg.P2P.Disabled() {
peerID = "disabled"
} else {
peerID = n.P2P().Host().ID().String()
}
payload := &heartbeat.Payload{
Version: version.Version,
Meta: version.Meta,
Moniker: cfg.Heartbeat.Moniker,
PeerID: peerID,
ChainID: cfg.Rollup.L2ChainID.Uint64(),
}
go func(url string) {
if err := heartbeat.Beat(n.resourcesCtx, n.log, url, payload); err != nil {
log.Error("heartbeat goroutine crashed", "err", err)
}
}(cfg.Heartbeat.URL)
return nil
}
func (n *OpNode) initPProf(_ context.Context, cfg *Config) error {
if !cfg.Pprof.Enabled {
return nil
}
log.Info("pprof server started", "addr", net.JoinHostPort(cfg.Pprof.ListenAddr, strconv.Itoa(cfg.Pprof.ListenPort)))
go func(listenAddr string, listenPort int) {
if err := oppprof.ListenAndServe(n.resourcesCtx, listenAddr, listenPort); err != nil {
log.Error("error starting pprof", "err", err)
}
}(cfg.Pprof.ListenAddr, cfg.Pprof.ListenPort)
return nil
}
func (n *OpNode) initP2P(ctx context.Context, cfg *Config) error {
if cfg.P2P != nil {
p2pNode, err := p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.l2Source, n.runCfg, n.metrics)
@@ -369,6 +440,7 @@ func (n *OpNode) Start(ctx context.Context) error {
n.log.Info("Started L2-RPC sync service")
}
log.Info("Rollup node started")
return nil
}
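Both initHeartbeat and initPProf above ignore the passed-in init context and bind their goroutines to n.resourcesCtx, so the services live exactly as long as the node's resources. The pattern in isolation (toy node type, runnable):

package main

import (
	"context"
	"fmt"
	"time"
)

type node struct {
	resourcesCtx   context.Context
	resourcesClose context.CancelFunc
}

// initBackground stands in for initHeartbeat/initPProf: the goroutine takes
// the node-owned resources context, not the short-lived init context.
func (n *node) initBackground() {
	go func() {
		<-n.resourcesCtx.Done() // a real service would be heartbeat.Beat(n.resourcesCtx, ...)
		fmt.Println("background service stopped with node resources")
	}()
}

func main() {
	n := &node{}
	n.resourcesCtx, n.resourcesClose = context.WithCancel(context.Background())
	n.initBackground()
	n.resourcesClose() // stopping the node's resources ends the service too
	time.Sleep(10 * time.Millisecond)
}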
@@ -473,8 +545,9 @@ func (n *OpNode) RuntimeConfig() ReadonlyRuntimeConfig {
return n.runCfg
}
// Close closes all resources.
func (n *OpNode) Close() error {
// Stop stops the node and closes all resources.
// If the provided ctx is expired, the node will accelerate the stop where possible, but still fully close.
func (n *OpNode) Stop(ctx context.Context) error {
if n.closed.Load() {
return errors.New("node is already closed")
}
@@ -537,10 +610,25 @@ func (n *OpNode) Close() error {
n.closed.Store(true)
}
if n.halted {
// if we had a halt upon initialization, idle for a while, with open metrics, to prevent a rapid restart-loop
tim := time.NewTimer(time.Minute * 5)
n.log.Warn("halted, idling to avoid immediate shutdown repeats")
defer tim.Stop()
select {
case <-tim.C:
case <-ctx.Done():
}
}
// Close metrics only after we are done idling
// TODO(7534): This should be refactored to a series of Close() calls to the respective resources.
n.postResourcesClose()
return result.ErrorOrNil()
}
func (n *OpNode) Closed() bool {
func (n *OpNode) Stopped() bool {
return n.closed.Load()
}
......
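The idle-after-halt block in Stop is a timer-vs-ctx select: idle up to five minutes to dampen restart loops, unless the caller's context is already expired. In isolation (assumed names, runnable):

package main

import (
	"context"
	"fmt"
	"time"
)

// idle waits up to d before returning, but an expired ctx cuts it short.
func idle(ctx context.Context, d time.Duration) {
	tim := time.NewTimer(d)
	defer tim.Stop()
	select {
	case <-tim.C:
		fmt.Println("idled the full period")
	case <-ctx.Done():
		fmt.Println("idle cut short:", ctx.Err())
	}
}

func main() {
	postCtx, postCancel := context.WithCancel(context.Background())
	postCancel() // already expired: immediate shutdown, no allowance for idling
	idle(postCtx, 5*time.Minute)
}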
package cliapp
import (
"context"
"errors"
"fmt"
"github.com/urfave/cli/v2"
"github.com/ethereum-optimism/optimism/op-service/opio"
)
type Lifecycle interface {
// Start starts a service. A service only fully starts once. Subsequent starts may return an error.
// A context is provided to end the service during setup.
// The caller should call Stop to clean up after failing to start.
Start(ctx context.Context) error
// Stop stops a service gracefully.
// The provided ctx can force an accelerated shutdown,
// but the node still has to completely stop.
Stop(ctx context.Context) error
// Stopped determines if the service was stopped with Stop.
Stopped() bool
}
// LifecycleAction instantiates a Lifecycle based on a CLI context.
// With the close argument a lifecycle may choose to shut itself down.
// A service may choose to idle, dump debug information or otherwise delay
// a shutdown when the Stop context is not expired.
type LifecycleAction func(ctx *cli.Context, close context.CancelCauseFunc) (Lifecycle, error)
// LifecycleCmd turns a LifecycleAction into a CLI action,
// by instrumenting it with CLI context and signal based termination.
// The app may continue to run post-processing until fully shutting down.
// The user can force an early shut-down during post-processing by sending a second interruption signal.
func LifecycleCmd(fn LifecycleAction) cli.ActionFunc {
return func(ctx *cli.Context) error {
hostCtx := ctx.Context
appCtx, appCancel := context.WithCancelCause(hostCtx)
ctx.Context = appCtx
go func() {
opio.BlockOnInterruptsContext(appCtx)
appCancel(errors.New("interrupt signal"))
}()
appLifecycle, err := fn(ctx, appCancel)
if err != nil {
return fmt.Errorf("failed to setup: %w", err)
}
if err := appLifecycle.Start(appCtx); err != nil {
return fmt.Errorf("failed to start: %w", err)
}
// wait for the app to be closed (by an interrupt, or by the app itself requesting a stop via the context)
<-appCtx.Done()
// Graceful stop context.
// This allows the service to idle before shutdown, if halted. User may interrupt.
stopCtx, stopCancel := context.WithCancel(hostCtx)
go func() {
opio.BlockOnInterruptsContext(stopCtx)
stopCancel()
}()
// Execute graceful stop.
stopErr := appLifecycle.Stop(stopCtx)
stopCancel()
if stopErr != nil {
return fmt.Errorf("failed to stop app: %w", stopErr)
}
return nil
}
}
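A hypothetical end-to-end use of this interface and LifecycleCmd (toyService and the wiring are invented; module paths as in this repo):

package main

import (
	"context"
	"os"
	"sync/atomic"

	"github.com/urfave/cli/v2"

	"github.com/ethereum-optimism/optimism/op-service/cliapp"
)

// toyService is a minimal Lifecycle: Start returns immediately, Stop flips a flag.
type toyService struct {
	stopped atomic.Bool
}

func (s *toyService) Start(ctx context.Context) error { return nil }

func (s *toyService) Stop(ctx context.Context) error {
	s.stopped.Store(true)
	return nil
}

func (s *toyService) Stopped() bool { return s.stopped.Load() }

func main() {
	app := cli.NewApp()
	// closeApp lets the service request its own shutdown later, e.g. on halt.
	app.Action = cliapp.LifecycleCmd(func(ctx *cli.Context, closeApp context.CancelCauseFunc) (cliapp.Lifecycle, error) {
		return &toyService{}, nil
	})
	// runs until interrupted; LifecycleCmd then calls Stop with a graceful ctx
	_ = app.Run(os.Args)
}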
package opio
import (
"context"
"os"
"os/signal"
"syscall"
@@ -24,3 +25,19 @@ func BlockOnInterrupts(signals ...os.Signal) {
signal.Notify(interruptChannel, signals...)
<-interruptChannel
}
// BlockOnInterruptsContext blocks until an interrupt signal is received.
// Passing in signals will override the default signals.
// The function will stop blocking if the context is closed.
func BlockOnInterruptsContext(ctx context.Context, signals ...os.Signal) {
if len(signals) == 0 {
signals = DefaultInterruptSignals
}
interruptChannel := make(chan os.Signal, 1)
signal.Notify(interruptChannel, signals...)
select {
case <-interruptChannel:
case <-ctx.Done():
signal.Stop(interruptChannel)
}
}
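A small usage sketch for BlockOnInterruptsContext: either an OS interrupt or an upstream cancellation unblocks the caller (here a 2-second timeout stands in for the interrupt):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ethereum-optimism/optimism/op-service/opio"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	opio.BlockOnInterruptsContext(ctx) // returns on Ctrl-C or when ctx expires
	fmt.Println("shutting down")
}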