Commit 85dcabe2 authored by Sebastian Stammler, committed by GitHub

Merge pull request #7538 from ethereum-optimism/op-node-shutdown

op-node: Cleanup shutdown process and support idle after halt
parents ca62018a 9ec88925
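For orientation, the core of this change: `RollupNodeMain` no longer blocks on interrupts and tears the node down itself; it now returns a `cliapp.Lifecycle` (the `*node.OpNode`), and `cliapp.LifecycleCmd` drives `Start`/`Stop`, interrupt handling, and an optional self-requested shutdown. A minimal sketch of that wiring, assuming the `cliapp` package as added in this PR; the `service` type and the app name are illustrative stand-ins, not part of the change:

```go
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/urfave/cli/v2"

	"github.com/ethereum-optimism/optimism/op-service/cliapp"
)

// service is a hypothetical minimal Lifecycle; op-node returns its *node.OpNode here.
type service struct{ stopped bool }

func (s *service) Start(ctx context.Context) error { fmt.Println("started"); return nil }
func (s *service) Stop(ctx context.Context) error  { s.stopped = true; fmt.Println("stopped"); return nil }
func (s *service) Stopped() bool                   { return s.stopped }

func main() {
	app := cli.NewApp()
	app.Name = "lifecycle-example"
	// LifecycleCmd turns the setup function into a CLI action: it builds the
	// service, calls Start, blocks until an interrupt or a self-requested
	// close (via closeApp), then calls Stop; a second interrupt accelerates Stop.
	app.Action = cliapp.LifecycleCmd(func(ctx *cli.Context, closeApp context.CancelCauseFunc) (cliapp.Lifecycle, error) {
		return &service{}, nil
	})
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
```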
......@@ -14,18 +14,16 @@ import (
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/geth"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-service/clock"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
ic "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
ic "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
......@@ -36,8 +34,6 @@ import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"
bss "github.com/ethereum-optimism/optimism/op-batcher/batcher"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
......@@ -46,15 +42,19 @@ import (
"github.com/ethereum-optimism/optimism/op-chain-ops/genesis"
"github.com/ethereum-optimism/optimism/op-e2e/config"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/geth"
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/metrics"
rollupNode "github.com/ethereum-optimism/optimism/op-node/node"
"github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/sources"
proposermetrics "github.com/ethereum-optimism/optimism/op-proposer/metrics"
l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum-optimism/optimism/op-service/eth"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/testlog"
......@@ -277,8 +277,11 @@ func (sys *System) Close() {
sys.BatchSubmitter.StopIfRunning(ctx)
}
postCtx, postCancel := context.WithCancel(context.Background())
postCancel() // immediate shutdown, no allowance for idling
for _, node := range sys.RollupNodes {
node.Close()
_ = node.Stop(postCtx)
}
for _, ei := range sys.EthInstances {
ei.Close()
......@@ -333,8 +336,10 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
didErrAfterStart := false
defer func() {
if didErrAfterStart {
postCtx, postCancel := context.WithCancel(context.Background())
postCancel() // immediate shutdown, no allowance for idling
for _, node := range sys.RollupNodes {
node.Close()
_ = node.Stop(postCtx)
}
for _, ei := range sys.EthInstances {
ei.Close()
......@@ -595,12 +600,25 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
}
c.Rollup.LogDescription(cfg.Loggers[name], chaincfg.L2ChainIDToNetworkDisplayName)
node, err := rollupNode.New(context.Background(), &c, cfg.Loggers[name], snapLog, "", metrics.NewMetrics(""))
l := cfg.Loggers[name]
var cycle cliapp.Lifecycle
c.Cancel = func(errCause error) {
l.Warn("node requested early shutdown!", "err", errCause)
go func() {
postCtx, postCancel := context.WithCancel(context.Background())
postCancel() // don't allow the stopping to continue for longer than needed
if err := cycle.Stop(postCtx); err != nil {
t.Error(err)
}
l.Warn("closed op-node!")
}()
}
node, err := rollupNode.New(context.Background(), &c, l, snapLog, "", metrics.NewMetrics(""))
if err != nil {
didErrAfterStart = true
return nil, err
}
cycle = node
err = node.Start(context.Background())
if err != nil {
didErrAfterStart = true
......
......@@ -1528,7 +1528,7 @@ func TestRequiredProtocolVersionChangeAndHalt(t *testing.T) {
// wait for the required protocol version to take effect by halting the verifier that opted in, and halting the op-geth node that opted in.
_, err = retry.Do(context.Background(), 10, retry.Fixed(time.Second*10), func() (struct{}, error) {
if !sys.RollupNodes["verifier"].Closed() {
if !sys.RollupNodes["verifier"].Stopped() {
return struct{}{}, errors.New("verifier rollup node is not closed yet")
}
return struct{}{}, nil
......
......@@ -2,29 +2,25 @@ package main
import (
"context"
"net"
"fmt"
"os"
"strconv"
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/cmd/doc"
"github.com/urfave/cli/v2"
"github.com/ethereum/go-ethereum/log"
opnode "github.com/ethereum-optimism/optimism/op-node"
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/cmd/doc"
"github.com/ethereum-optimism/optimism/op-node/cmd/genesis"
"github.com/ethereum-optimism/optimism/op-node/cmd/p2p"
"github.com/ethereum-optimism/optimism/op-node/flags"
"github.com/ethereum-optimism/optimism/op-node/heartbeat"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/node"
"github.com/ethereum-optimism/optimism/op-node/version"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/opio"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
)
var (
......@@ -58,7 +54,7 @@ func main() {
app.Name = "op-node"
app.Usage = "Optimism Rollup Node"
app.Description = "The Optimism Rollup Node derives L2 block inputs from L1 data and drives an external L2 Execution Engine to build a L2 chain."
app.Action = RollupNodeMain
app.Action = cliapp.LifecycleCmd(RollupNodeMain)
app.Commands = []*cli.Command{
{
Name: "p2p",
......@@ -80,8 +76,7 @@ func main() {
}
}
func RollupNodeMain(ctx *cli.Context) error {
log.Info("Initializing Rollup Node")
func RollupNodeMain(ctx *cli.Context, closeApp context.CancelCauseFunc) (cliapp.Lifecycle, error) {
logCfg := oplog.ReadCLIConfig(ctx)
log := oplog.NewLogger(oplog.AppOut(ctx), logCfg)
oplog.SetGlobalLogHandler(log.GetHandler())
......@@ -90,13 +85,13 @@ func RollupNodeMain(ctx *cli.Context) error {
cfg, err := opnode.NewConfig(ctx, log)
if err != nil {
log.Error("Unable to create the rollup node config", "error", err)
return err
return nil, fmt.Errorf("unable to create the rollup node config: %w", err)
}
cfg.Cancel = closeApp
snapshotLog, err := opnode.NewSnapshotLogger(ctx)
if err != nil {
log.Error("Unable to create snapshot root logger", "error", err)
return err
return nil, fmt.Errorf("unable to create snapshot root logger: %w", err)
}
// Only pretty-print the banner if it is a terminal log. Other log it as key-value pairs.
......@@ -106,60 +101,10 @@ func RollupNodeMain(ctx *cli.Context) error {
cfg.Rollup.LogDescription(log, chaincfg.L2ChainIDToNetworkDisplayName)
}
n, err := node.New(context.Background(), cfg, log, snapshotLog, VersionWithMeta, m)
n, err := node.New(ctx.Context, cfg, log, snapshotLog, VersionWithMeta, m)
if err != nil {
log.Error("Unable to create the rollup node", "error", err)
return err
}
log.Info("Starting rollup node", "version", VersionWithMeta)
if err := n.Start(context.Background()); err != nil {
log.Error("Unable to start rollup node", "error", err)
return err
return nil, fmt.Errorf("unable to create the rollup node: %w", err)
}
defer n.Close()
m.RecordInfo(VersionWithMeta)
m.RecordUp()
log.Info("Rollup node started")
if cfg.Heartbeat.Enabled {
var peerID string
if cfg.P2P.Disabled() {
peerID = "disabled"
} else {
peerID = n.P2P().Host().ID().String()
}
beatCtx, beatCtxCancel := context.WithCancel(context.Background())
payload := &heartbeat.Payload{
Version: version.Version,
Meta: version.Meta,
Moniker: cfg.Heartbeat.Moniker,
PeerID: peerID,
ChainID: cfg.Rollup.L2ChainID.Uint64(),
}
go func() {
if err := heartbeat.Beat(beatCtx, log, cfg.Heartbeat.URL, payload); err != nil {
log.Error("heartbeat goroutine crashed", "err", err)
}
}()
defer beatCtxCancel()
}
if cfg.Pprof.Enabled {
pprofCtx, pprofCancel := context.WithCancel(context.Background())
go func() {
log.Info("pprof server started", "addr", net.JoinHostPort(cfg.Pprof.ListenAddr, strconv.Itoa(cfg.Pprof.ListenPort)))
if err := oppprof.ListenAndServe(pprofCtx, cfg.Pprof.ListenAddr, cfg.Pprof.ListenPort); err != nil {
log.Error("error starting pprof", "err", err)
}
}()
defer pprofCancel()
}
opio.BlockOnInterrupts()
return nil
return n, nil
}
package node
import (
"context"
"errors"
"fmt"
"math"
......@@ -56,6 +57,9 @@ type Config struct {
// To halt when detecting the node does not support a signaled protocol version
// change of the given severity (major/minor/patch). Disabled if empty.
RollupHalt string
// Cancel to request a premature shutdown of the node itself, e.g. when halting. This may be nil.
Cancel context.CancelCauseFunc
}
type RPCConfig struct {
......
......@@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"net"
"strconv"
"sync/atomic"
"time"
......@@ -15,11 +17,14 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/heartbeat"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-node/version"
"github.com/ethereum-optimism/optimism/op-service/eth"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
"github.com/ethereum-optimism/optimism/op-service/retry"
)
......@@ -49,15 +54,26 @@ type OpNode struct {
resourcesCtx context.Context
resourcesClose context.CancelFunc
// resource context for anything that stays around for the post-processing phase: e.g. metrics.
postResourcesCtx context.Context
postResourcesClose context.CancelFunc
// Indicates when it's safe to close data sources used by the runtimeConfig bg loader
runtimeConfigReloaderDone chan struct{}
closed atomic.Bool
// cancels execution prematurely, e.g. to halt. This may be nil.
cancel context.CancelCauseFunc
halted bool
}
// The OpNode handles incoming gossip
var _ p2p.GossipIn = (*OpNode)(nil)
// New creates a new OpNode instance.
// The provided ctx argument is for the span of initialization only;
// the node will immediately Stop(ctx) before finishing initialization if the context is canceled during initialization.
func New(ctx context.Context, cfg *Config, log log.Logger, snapshotLog log.Logger, appVersion string, m *metrics.Metrics) (*OpNode, error) {
if err := cfg.Check(); err != nil {
return nil, err
......@@ -68,15 +84,18 @@ func New(ctx context.Context, cfg *Config, log log.Logger, snapshotLog log.Logge
appVersion: appVersion,
metrics: m,
rollupHalt: cfg.RollupHalt,
cancel: cfg.Cancel,
}
// not a context leak, gossipsub is closed with a context.
n.resourcesCtx, n.resourcesClose = context.WithCancel(context.Background())
n.postResourcesCtx, n.postResourcesClose = context.WithCancel(context.Background())
err := n.init(ctx, cfg, snapshotLog)
if err != nil {
log.Error("Error initializing the rollup node", "err", err)
// ensure we always close the node resources if we fail to initialize the node.
if closeErr := n.Close(); closeErr != nil {
if closeErr := n.Stop(ctx); closeErr != nil {
return nil, multierror.Append(err, closeErr)
}
return nil, err
......@@ -85,6 +104,7 @@ func New(ctx context.Context, cfg *Config, log log.Logger, snapshotLog log.Logge
}
func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger) error {
n.log.Info("Initializing rollup node", "version", n.appVersion)
if err := n.initTracer(ctx, cfg); err != nil {
return fmt.Errorf("failed to init the trace: %w", err)
}
......@@ -113,6 +133,10 @@ func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger)
if err := n.initMetricsServer(ctx, cfg); err != nil {
return fmt.Errorf("failed to init the metrics server: %w", err)
}
n.metrics.RecordInfo(n.appVersion)
n.metrics.RecordUp()
n.initHeartbeat(cfg)
n.initPProf(cfg)
return nil
}
......@@ -206,16 +230,20 @@ func (n *OpNode) initRuntimeConfig(ctx context.Context, cfg *Config) error {
// initialize the runtime config before unblocking
if _, err := retry.Do(ctx, 5, retry.Fixed(time.Second*10), func() (eth.L1BlockRef, error) {
return reload(ctx)
ref, err := reload(ctx)
if errors.Is(err, errNodeHalt) { // don't retry on halt error
err = nil
}
return ref, err
}); err != nil {
return fmt.Errorf("failed to load runtime configuration repeatedly, last error: %w", err)
}
// start a background loop, to keep reloading it at the configured reload interval
reloader := func(ctx context.Context, reloadInterval time.Duration) bool {
reloader := func(ctx context.Context, reloadInterval time.Duration) {
if reloadInterval <= 0 {
n.log.Debug("not running runtime-config reloading background loop")
return false
return
}
ticker := time.NewTicker(reloadInterval)
defer ticker.Stop()
......@@ -225,17 +253,23 @@ func (n *OpNode) initRuntimeConfig(ctx context.Context, cfg *Config) error {
// If the reload fails, we will try again the next interval.
// Missing a runtime-config update is not critical, and we do not want to overwhelm the L1 RPC.
l1Head, err := reload(ctx)
switch err {
case errNodeHalt, nil:
n.log.Debug("reloaded runtime config", "l1_head", l1Head)
if err == errNodeHalt {
return true
if err != nil {
if errors.Is(err, errNodeHalt) {
n.halted = true
if n.cancel != nil { // node cancellation is always available when started as CLI app
n.cancel(errNodeHalt)
return
} else {
n.log.Debug("opted to halt, but cannot halt node", "l1_head", l1Head)
}
} else {
n.log.Warn("failed to reload runtime config", "err", err)
}
default:
n.log.Warn("failed to reload runtime config", "err", err)
} else {
n.log.Debug("reloaded runtime config", "l1_head", l1Head)
}
case <-ctx.Done():
return false
return
}
}
}
......@@ -243,13 +277,8 @@ func (n *OpNode) initRuntimeConfig(ctx context.Context, cfg *Config) error {
n.runtimeConfigReloaderDone = make(chan struct{})
// Manages the lifetime of reloader. In order to safely Close the OpNode
go func(ctx context.Context, reloadInterval time.Duration) {
halt := reloader(ctx, reloadInterval)
reloader(ctx, reloadInterval)
close(n.runtimeConfigReloaderDone)
if halt {
if err := n.Close(); err != nil {
n.log.Error("Failed to halt rollup", "err", err)
}
}
}(n.resourcesCtx, cfg.RuntimeConfigReloadInterval) // this keeps running after initialization
return nil
}
......@@ -319,13 +348,51 @@ func (n *OpNode) initMetricsServer(ctx context.Context, cfg *Config) error {
}
n.log.Info("starting metrics server", "addr", cfg.Metrics.ListenAddr, "port", cfg.Metrics.ListenPort)
go func() {
if err := n.metrics.Serve(ctx, cfg.Metrics.ListenAddr, cfg.Metrics.ListenPort); err != nil {
if err := n.metrics.Serve(n.postResourcesCtx, cfg.Metrics.ListenAddr, cfg.Metrics.ListenPort); err != nil {
log.Crit("error starting metrics server", "err", err)
}
}()
return nil
}
func (n *OpNode) initHeartbeat(cfg *Config) {
if !cfg.Heartbeat.Enabled {
return
}
var peerID string
if cfg.P2P.Disabled() {
peerID = "disabled"
} else {
peerID = n.P2P().Host().ID().String()
}
payload := &heartbeat.Payload{
Version: version.Version,
Meta: version.Meta,
Moniker: cfg.Heartbeat.Moniker,
PeerID: peerID,
ChainID: cfg.Rollup.L2ChainID.Uint64(),
}
go func(url string) {
if err := heartbeat.Beat(n.resourcesCtx, n.log, url, payload); err != nil {
log.Error("heartbeat goroutine crashed", "err", err)
}
}(cfg.Heartbeat.URL)
}
func (n *OpNode) initPProf(cfg *Config) {
if !cfg.Pprof.Enabled {
return
}
log.Info("pprof server started", "addr", net.JoinHostPort(cfg.Pprof.ListenAddr, strconv.Itoa(cfg.Pprof.ListenPort)))
go func(listenAddr string, listenPort int) {
if err := oppprof.ListenAndServe(n.resourcesCtx, listenAddr, listenPort); err != nil {
log.Error("error starting pprof", "err", err)
}
}(cfg.Pprof.ListenAddr, cfg.Pprof.ListenPort)
}
func (n *OpNode) initP2P(ctx context.Context, cfg *Config) error {
if cfg.P2P != nil {
p2pNode, err := p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.l2Source, n.runCfg, n.metrics)
......@@ -369,6 +436,7 @@ func (n *OpNode) Start(ctx context.Context) error {
n.log.Info("Started L2-RPC sync service")
}
log.Info("Rollup node started")
return nil
}
......@@ -473,8 +541,9 @@ func (n *OpNode) RuntimeConfig() ReadonlyRuntimeConfig {
return n.runCfg
}
// Close closes all resources.
func (n *OpNode) Close() error {
// Stop stops the node and closes all resources.
// If the provided ctx is expired, the node will accelerate the stop where possible, but still fully close.
func (n *OpNode) Stop(ctx context.Context) error {
if n.closed.Load() {
return errors.New("node is already closed")
}
......@@ -537,10 +606,25 @@ func (n *OpNode) Close() error {
n.closed.Store(true)
}
if n.halted {
// if we had a halt upon initialization, idle for a while, with open metrics, to prevent a rapid restart-loop
tim := time.NewTimer(time.Minute * 5)
n.log.Warn("halted, idling to avoid immediate shutdown repeats")
defer tim.Stop()
select {
case <-tim.C:
case <-ctx.Done():
}
}
// Close metrics only after we are done idling
// TODO(7534): This should be refactored to a series of Close() calls to the respective resources.
n.postResourcesClose()
return result.ErrorOrNil()
}
func (n *OpNode) Closed() bool {
func (n *OpNode) Stopped() bool {
return n.closed.Load()
}
......
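Reviewer note on the new shutdown semantics: `Stop(ctx)` bounds the post-halt idle window (the five-minute timer above) with the caller's context. Passing an already-expired context skips the idling entirely, which is exactly what the e2e `System.Close` change does; a live context lets a halted node keep serving metrics until the deadline. A hedged sketch of both call patterns; `stopFast`, `stopGraceful`, and the narrowed `stopper` interface are illustrative only:

```go
package example

import (
	"context"
	"time"
)

// stopper is the slice of the OpNode surface these helpers need.
type stopper interface {
	Stop(ctx context.Context) error
}

// stopFast skips the post-halt idle window by handing Stop an already
// expired context, mirroring System.Close in the e2e changes above.
func stopFast(n stopper) error {
	postCtx, postCancel := context.WithCancel(context.Background())
	postCancel() // immediate shutdown, no allowance for idling
	return n.Stop(postCtx)
}

// stopGraceful gives the node a budget; if it halted, it may idle
// (keeping metrics up) until the budget expires, then fully closes.
func stopGraceful(n stopper, budget time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), budget)
	defer cancel()
	return n.Stop(ctx)
}
```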
......@@ -5,6 +5,7 @@ import (
"context"
"crypto/sha256"
"encoding/binary"
"errors"
"fmt"
"sync"
"time"
......@@ -379,10 +380,22 @@ type GossipOut interface {
}
type publisher struct {
log log.Logger
cfg *rollup.Config
log log.Logger
cfg *rollup.Config
// p2pCancel cancels the downstream gossip event-handling functions, independent of the sources.
// A closed gossip event source (event handler or subscription) does not stop any open event iteration,
// thus we have to stop it ourselves this way.
p2pCancel context.CancelFunc
// blocks topic, main handle on block gossip
blocksTopic *pubsub.Topic
runCfg GossipRuntimeConfig
// block events handler, to be cancelled before closing the blocks topic.
blocksEvents *pubsub.TopicEventHandler
// block subscriptions, to be cancelled before closing blocks topic.
blocksSub *pubsub.Subscription
runCfg GossipRuntimeConfig
}
var _ GossipOut = (*publisher)(nil)
......@@ -419,10 +432,13 @@ func (p *publisher) PublishL2Payload(ctx context.Context, payload *eth.Execution
}
func (p *publisher) Close() error {
p.p2pCancel()
p.blocksEvents.Cancel()
p.blocksSub.Cancel()
return p.blocksTopic.Close()
}
func JoinGossip(p2pCtx context.Context, self peer.ID, ps *pubsub.PubSub, log log.Logger, cfg *rollup.Config, runCfg GossipRuntimeConfig, gossipIn GossipIn) (GossipOut, error) {
func JoinGossip(self peer.ID, ps *pubsub.PubSub, log log.Logger, cfg *rollup.Config, runCfg GossipRuntimeConfig, gossipIn GossipIn) (GossipOut, error) {
val := guardGossipValidator(log, logValidationResult(self, "validated block", log, BuildBlocksValidator(log, cfg, runCfg)))
blocksTopicName := blocksTopicV1(cfg)
err := ps.RegisterTopicValidator(blocksTopicName,
......@@ -440,17 +456,28 @@ func JoinGossip(p2pCtx context.Context, self peer.ID, ps *pubsub.PubSub, log log
if err != nil {
return nil, fmt.Errorf("failed to create blocks gossip topic handler: %w", err)
}
p2pCtx, p2pCancel := context.WithCancel(context.Background())
go LogTopicEvents(p2pCtx, log.New("topic", "blocks"), blocksTopicEvents)
subscription, err := blocksTopic.Subscribe()
if err != nil {
p2pCancel()
err = errors.Join(err, blocksTopic.Close())
return nil, fmt.Errorf("failed to subscribe to blocks gossip topic: %w", err)
}
subscriber := MakeSubscriber(log, BlocksHandler(gossipIn.OnUnsafeL2Payload))
go subscriber(p2pCtx, subscription)
return &publisher{log: log, cfg: cfg, blocksTopic: blocksTopic, runCfg: runCfg}, nil
return &publisher{
log: log,
cfg: cfg,
blocksTopic: blocksTopic,
blocksEvents: blocksTopicEvents,
blocksSub: subscription,
p2pCancel: p2pCancel,
runCfg: runCfg,
}, nil
}
type TopicSubscriber func(ctx context.Context, sub *pubsub.Subscription)
......
......@@ -135,7 +135,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
if err != nil {
return fmt.Errorf("failed to start gossipsub router: %w", err)
}
n.gsOut, err = JoinGossip(resourcesCtx, n.host.ID(), n.gs, log, rollupCfg, runCfg, gossipIn)
n.gsOut, err = JoinGossip(n.host.ID(), n.gs, log, rollupCfg, runCfg, gossipIn)
if err != nil {
return fmt.Errorf("failed to join blocks gossip topic: %w", err)
}
......
package cliapp
import (
"context"
"errors"
"fmt"
"os"
"github.com/urfave/cli/v2"
"github.com/ethereum-optimism/optimism/op-service/opio"
)
type Lifecycle interface {
// Start starts a service. A service only fully starts once. Subsequent starts may return an error.
// A context is provided to end the service during setup.
// The caller should call Stop to clean up after failing to start.
Start(ctx context.Context) error
// Stop stops a service gracefully.
// The provided ctx can force an accelerated shutdown,
// but the node still has to completely stop.
Stop(ctx context.Context) error
// Stopped determines if the service was stopped with Stop.
Stopped() bool
}
// LifecycleAction instantiates a Lifecycle based on a CLI context.
// With the close argument a lifecycle may choose to shut itself down.
// A service may choose to idle, dump debug information or otherwise delay
// a shutdown when the Stop context is not expired.
type LifecycleAction func(ctx *cli.Context, close context.CancelCauseFunc) (Lifecycle, error)
// LifecycleCmd turns a LifecycleAction into a CLI action,
// by instrumenting it with CLI context and signal based termination.
// The app may continue to run post-processing until fully shutting down.
// The user can force an early shut-down during post-processing by sending a second interruption signal.
func LifecycleCmd(fn LifecycleAction) cli.ActionFunc {
return lifecycleCmd(fn, opio.BlockOnInterruptsContext)
}
type waitSignalFn func(ctx context.Context, signals ...os.Signal)
var interruptErr = errors.New("interrupt signal")
func lifecycleCmd(fn LifecycleAction, blockOnInterrupt waitSignalFn) cli.ActionFunc {
return func(ctx *cli.Context) error {
hostCtx := ctx.Context
appCtx, appCancel := context.WithCancelCause(hostCtx)
ctx.Context = appCtx
go func() {
blockOnInterrupt(appCtx)
appCancel(interruptErr)
}()
appLifecycle, err := fn(ctx, appCancel)
if err != nil {
// join errors to include context cause (nil errors are dropped)
return errors.Join(
fmt.Errorf("failed to setup: %w", err),
context.Cause(appCtx),
)
}
if err := appLifecycle.Start(appCtx); err != nil {
// join errors to include context cause (nil errors are dropped)
return errors.Join(
fmt.Errorf("failed to start: %w", err),
context.Cause(appCtx),
)
}
// wait for app to be closed (through interrupt, or app requests to be stopped by closing the context)
<-appCtx.Done()
// Graceful stop context.
// This allows the service to idle before shutdown, if halted. User may interrupt.
stopCtx, stopCancel := context.WithCancelCause(hostCtx)
go func() {
blockOnInterrupt(stopCtx)
stopCancel(interruptErr)
}()
// Execute graceful stop.
stopErr := appLifecycle.Stop(stopCtx)
stopCancel(nil)
// note: Stop implementation may choose to suppress a context error,
// if it handles it well (e.g. stop idling after a halt).
if stopErr != nil {
// join errors to include context cause (nil errors are dropped)
return errors.Join(
fmt.Errorf("failed to stop: %w", stopErr),
context.Cause(stopCtx),
)
}
return nil
}
}
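Before the test file: the case that ties back to op-node is self-close, where the service asks the app to shut down through the `context.CancelCauseFunc` it received at setup (this is how the halt path reaches `Stop` via `cfg.Cancel`). A minimal sketch under that assumption; `haltingService` and `errHalt` are hypothetical names:

```go
package example

import (
	"context"
	"errors"
	"time"

	"github.com/urfave/cli/v2"

	"github.com/ethereum-optimism/optimism/op-service/cliapp"
)

var errHalt = errors.New("halt requested")

type haltingService struct {
	closeApp context.CancelCauseFunc
	stopped  bool
}

func (s *haltingService) Start(ctx context.Context) error {
	// Simulate a condition (e.g. an unsupported protocol version) that makes
	// the running service request a premature shutdown of the whole app.
	go func() {
		time.Sleep(time.Second)
		s.closeApp(errHalt)
	}()
	return nil
}

func (s *haltingService) Stop(ctx context.Context) error { s.stopped = true; return nil }
func (s *haltingService) Stopped() bool                  { return s.stopped }

// action wires the service up the same way op-node's main.go assigns
// app.Action = cliapp.LifecycleCmd(RollupNodeMain). Self-close is not
// treated as an error: only a failing Stop is returned afterwards.
var action cli.ActionFunc = cliapp.LifecycleCmd(
	func(ctx *cli.Context, closeApp context.CancelCauseFunc) (cliapp.Lifecycle, error) {
		return &haltingService{closeApp: closeApp}, nil
	},
)
```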
package cliapp
import (
"context"
"errors"
"os"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/urfave/cli/v2"
)
type fakeLifecycle struct {
startCh, stopCh chan error
stopped bool
selfClose context.CancelCauseFunc
}
func (f *fakeLifecycle) Start(ctx context.Context) error {
select {
case err := <-f.startCh:
f.stopped = true
return err
case <-ctx.Done():
f.stopped = true
return ctx.Err()
}
}
func (f *fakeLifecycle) Stop(ctx context.Context) error {
select {
case err := <-f.stopCh:
f.stopped = true
return err
case <-ctx.Done():
f.stopped = true
return ctx.Err()
}
}
func (f *fakeLifecycle) Stopped() bool {
return f.stopped
}
var _ Lifecycle = (*fakeLifecycle)(nil)
func TestLifecycleCmd(t *testing.T) {
appSetup := func(t *testing.T, shareApp **fakeLifecycle) (signalCh chan struct{}, initCh, startCh, stopCh, resultCh chan error) {
signalCh = make(chan struct{})
initCh = make(chan error)
startCh = make(chan error)
stopCh = make(chan error)
resultCh = make(chan error)
// mock an application that may fail at different stages of its lifecycle
mockAppFn := func(ctx *cli.Context, close context.CancelCauseFunc) (Lifecycle, error) {
select {
case <-ctx.Context.Done():
return nil, ctx.Context.Err()
case err := <-initCh:
if err != nil {
return nil, err
}
}
app := &fakeLifecycle{
startCh: startCh,
stopCh: stopCh,
stopped: false,
selfClose: close,
}
if shareApp != nil {
*shareApp = app
}
return app, nil
}
// puppeteer a system signal waiter with a test signal channel
fakeSignalWaiter := func(ctx context.Context, signals ...os.Signal) {
select {
case <-ctx.Done():
case <-signalCh:
}
}
// turn our mock app and system signal into a lifecycle-managed command
actionFn := lifecycleCmd(mockAppFn, fakeSignalWaiter)
// try to shut the test down after being locked more than a minute
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
t.Cleanup(cancel)
// create a fake CLI context to run our command with
cliCtx := &cli.Context{
Context: ctx,
App: &cli.App{
Name: "test-app",
Action: actionFn,
},
Command: nil,
}
// run the command async, it may block etc. The result will be sent back to the tester.
go func() {
result := actionFn(cliCtx)
require.NoError(t, ctx.Err(), "expecting test context to be alive after end still")
// collect the result
resultCh <- result
}()
t.Cleanup(func() {
close(signalCh)
close(initCh)
close(startCh)
close(stopCh)
close(resultCh)
})
return
}
t.Run("interrupt int", func(t *testing.T) {
signalCh, _, _, _, resultCh := appSetup(t, nil)
signalCh <- struct{}{}
res := <-resultCh
require.ErrorIs(t, res, interruptErr)
require.ErrorContains(t, res, "failed to setup")
})
t.Run("failed init", func(t *testing.T) {
_, initCh, _, _, resultCh := appSetup(t, nil)
v := errors.New("TEST INIT ERRROR")
initCh <- v
res := <-resultCh
require.ErrorIs(t, res, v)
require.ErrorContains(t, res, "failed to setup")
})
t.Run("interrupt start", func(t *testing.T) {
var app *fakeLifecycle
signalCh, initCh, _, _, resultCh := appSetup(t, &app)
initCh <- nil
require.False(t, app.Stopped())
signalCh <- struct{}{}
res := <-resultCh
require.ErrorIs(t, res, interruptErr)
require.ErrorContains(t, res, "failed to start")
require.True(t, app.Stopped())
})
t.Run("failed start", func(t *testing.T) {
var app *fakeLifecycle
_, initCh, startCh, _, resultCh := appSetup(t, &app)
initCh <- nil
require.False(t, app.Stopped())
v := errors.New("TEST START ERROR")
startCh <- v
res := <-resultCh
require.ErrorIs(t, res, v)
require.ErrorContains(t, res, "failed to start")
require.True(t, app.Stopped())
})
t.Run("graceful shutdown", func(t *testing.T) {
var app *fakeLifecycle
signalCh, initCh, startCh, stopCh, resultCh := appSetup(t, &app)
initCh <- nil
require.False(t, app.Stopped())
startCh <- nil
signalCh <- struct{}{} // interrupt, but at an expected time
stopCh <- nil // graceful shutdown after interrupt
require.NoError(t, <-resultCh, nil)
require.True(t, app.Stopped())
})
t.Run("interrupted shutdown", func(t *testing.T) {
var app *fakeLifecycle
signalCh, initCh, startCh, _, resultCh := appSetup(t, &app)
initCh <- nil
require.False(t, app.Stopped())
startCh <- nil
signalCh <- struct{}{} // start graceful shutdown
signalCh <- struct{}{} // interrupt before the shutdown process is allowed to complete
res := <-resultCh
require.ErrorIs(t, res, interruptErr)
require.ErrorContains(t, res, "failed to stop")
require.True(t, app.Stopped()) // still fully closes, interrupts only accelerate shutdown where possible.
})
t.Run("failed shutdown", func(t *testing.T) {
var app *fakeLifecycle
signalCh, initCh, startCh, stopCh, resultCh := appSetup(t, &app)
initCh <- nil
require.False(t, app.Stopped())
startCh <- nil
signalCh <- struct{}{} // start graceful shutdown
v := errors.New("TEST STOP ERROR")
stopCh <- v
res := <-resultCh
require.ErrorIs(t, res, v)
require.ErrorContains(t, res, "failed to stop")
require.True(t, app.Stopped())
})
t.Run("app self-close", func(t *testing.T) {
var app *fakeLifecycle
_, initCh, startCh, stopCh, resultCh := appSetup(t, &app)
initCh <- nil
require.False(t, app.Stopped())
startCh <- nil
v := errors.New("TEST SELF CLOSE ERROR")
app.selfClose(v)
stopCh <- nil
require.NoError(t, <-resultCh, "self-close is not considered an error")
require.True(t, app.Stopped())
})
}
package opio
import (
"context"
"os"
"os/signal"
"syscall"
......@@ -24,3 +25,19 @@ func BlockOnInterrupts(signals ...os.Signal) {
signal.Notify(interruptChannel, signals...)
<-interruptChannel
}
// BlockOnInterruptsContext blocks until a SIGTERM is received.
// Passing in signals will override the default signals.
// The function will stop blocking if the context is closed.
func BlockOnInterruptsContext(ctx context.Context, signals ...os.Signal) {
if len(signals) == 0 {
signals = DefaultInterruptSignals
}
interruptChannel := make(chan os.Signal, 1)
signal.Notify(interruptChannel, signals...)
select {
case <-interruptChannel:
case <-ctx.Done():
signal.Stop(interruptChannel)
}
}
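A small usage sketch for the new helper, assuming only what is shown above: wait for one of the default interrupt signals, but give up once a deadline passes so the caller can proceed either way (`waitForInterruptOrTimeout` is an illustrative name):

```go
package example

import (
	"context"
	"time"

	"github.com/ethereum-optimism/optimism/op-service/opio"
)

// waitForInterruptOrTimeout blocks until an interrupt signal arrives or
// the timeout elapses, whichever comes first.
func waitForInterruptOrTimeout(d time.Duration) {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	opio.BlockOnInterruptsContext(ctx)
}
```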