Commit d520441b authored by Matt Joiner's avatar Matt Joiner Committed by GitHub

Use context interrupts consistently in more places (#11511)

* Use context interrupts consistently in more places

* Fix CI lint errors

(cherry picked from commit 0410b7e448e063fa9e30295fbe423ff1c0171d12)

* op-service/ctxinterrupt: address review comments

---------
Co-authored-by: default avatarprotolambda <proto@protolambda.com>
parent db8154b0
......@@ -5,9 +5,8 @@ import (
"errors"
"fmt"
"os"
"os/signal"
"syscall"
"github.com/ethereum-optimism/optimism/op-service/ctxinterrupt"
"github.com/urfave/cli/v2"
"github.com/ethereum-optimism/optimism/cannon/cmd"
......@@ -23,18 +22,7 @@ func main() {
cmd.WitnessCommand,
cmd.RunCommand,
}
ctx, cancel := context.WithCancel(context.Background())
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
go func() {
for {
<-c
cancel()
fmt.Println("\r\nExiting...")
}
}()
ctx := ctxinterrupt.WithSignalWaiterMain(context.Background())
err := app.RunContext(ctx, os.Args)
if err != nil {
if errors.Is(err, ctx.Err()) {
......
......@@ -6,8 +6,8 @@ import (
"github.com/urfave/cli/v2"
altda "github.com/ethereum-optimism/optimism/op-alt-da"
"github.com/ethereum-optimism/optimism/op-service/ctxinterrupt"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/opio"
)
func StartDAServer(cliCtx *cli.Context) error {
......@@ -55,7 +55,5 @@ func StartDAServer(cliCtx *cli.Context) error {
}
}()
opio.BlockOnInterrupts()
return nil
return ctxinterrupt.Wait(cliCtx.Context)
}
......@@ -9,8 +9,8 @@ import (
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/ctxinterrupt"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/opio"
)
var Version = "v0.0.1"
......@@ -26,7 +26,7 @@ func main() {
app.Description = "Service for storing AltDA inputs"
app.Action = StartDAServer
ctx := opio.WithInterruptBlocker(context.Background())
ctx := ctxinterrupt.WithSignalWaiterMain(context.Background())
err := app.RunContext(ctx, os.Args)
if err != nil {
log.Crit("Application failed", "message", err)
......
......@@ -11,9 +11,9 @@ import (
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/ctxinterrupt"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/metrics/doc"
"github.com/ethereum-optimism/optimism/op-service/opio"
"github.com/ethereum/go-ethereum/log"
)
......@@ -40,7 +40,7 @@ func main() {
},
}
ctx := opio.WithInterruptBlocker(context.Background())
ctx := ctxinterrupt.WithSignalWaiterMain(context.Background())
err := app.RunContext(ctx, os.Args)
if err != nil {
log.Crit("Application failed", "message", err)
......
......@@ -17,10 +17,10 @@ import (
"github.com/ethereum-optimism/optimism/op-node/p2p"
p2pcli "github.com/ethereum-optimism/optimism/op-node/p2p/cli"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/ctxinterrupt"
"github.com/ethereum-optimism/optimism/op-service/eth"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum-optimism/optimism/op-service/opio"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
)
......@@ -116,9 +116,7 @@ func Main(cliCtx *cli.Context) error {
m.RecordUp()
}
opio.BlockOnInterrupts()
return nil
return ctxinterrupt.Wait(ctx)
}
// validateConfig ensures the minimal config required to run a bootnode
......
......@@ -32,10 +32,10 @@ import (
op_service "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/ctxinterrupt"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/opio"
"github.com/ethereum-optimism/optimism/op-service/predeploys"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum-optimism/optimism/op-service/sources"
......@@ -168,7 +168,7 @@ func makeCommandAction(fn CheckAction) func(c *cli.Context) error {
logCfg := oplog.ReadCLIConfig(c)
logger := oplog.NewLogger(c.App.Writer, logCfg)
c.Context = opio.CancelOnInterrupt(c.Context)
c.Context = ctxinterrupt.WithCancelOnInterrupt(c.Context)
l1Cl, err := ethclient.DialContext(c.Context, c.String(EndpointL1.Name))
if err != nil {
return fmt.Errorf("failed to dial L1 RPC: %w", err)
......
......@@ -9,8 +9,8 @@ import (
"github.com/ethereum-optimism/optimism/op-chain-ops/cmd/check-fjord/checks"
op_service "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/ctxinterrupt"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/opio"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/urfave/cli/v2"
......@@ -54,7 +54,7 @@ func makeCommandAction(fn CheckAction) func(c *cli.Context) error {
logCfg := oplog.ReadCLIConfig(c)
logger := oplog.NewLogger(c.App.Writer, logCfg)
c.Context = opio.CancelOnInterrupt(c.Context)
c.Context = ctxinterrupt.WithCancelOnInterrupt(c.Context)
l2Cl, err := ethclient.DialContext(c.Context, c.String(EndpointL2.Name))
if err != nil {
return fmt.Errorf("failed to dial L2 RPC: %w", err)
......
......@@ -33,8 +33,8 @@ import (
op_service "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/ctxinterrupt"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/opio"
)
var EnvPrefix = "OP_SIMULATE"
......@@ -82,7 +82,7 @@ func main() {
}
func mainAction(c *cli.Context) error {
ctx := opio.CancelOnInterrupt(c.Context)
ctx := ctxinterrupt.WithCancelOnInterrupt(c.Context)
logCfg := oplog.ReadCLIConfig(c)
logger := oplog.NewLogger(c.App.Writer, logCfg)
......
......@@ -15,8 +15,8 @@ import (
"github.com/ethereum-optimism/optimism/op-challenger/version"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/ctxinterrupt"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/opio"
)
var (
......@@ -29,7 +29,7 @@ var VersionWithMeta = opservice.FormatVersion(version.Version, GitCommit, GitDat
func main() {
args := os.Args
ctx := opio.WithInterruptBlocker(context.Background())
ctx := ctxinterrupt.WithSignalWaiterMain(context.Background())
if err := run(ctx, args, func(ctx context.Context, l log.Logger, config *config.Config) (cliapp.Lifecycle, error) {
return challenger.Main(ctx, l, config, metrics.NewMetrics())
}); err != nil {
......
......@@ -7,8 +7,8 @@ import (
"github.com/ethereum-optimism/optimism/op-challenger/flags"
contractMetrics "github.com/ethereum-optimism/optimism/op-challenger/game/fault/contracts/metrics"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/ctxinterrupt"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/opio"
"github.com/ethereum-optimism/optimism/op-service/sources/batching"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum-optimism/optimism/op-service/txmgr/metrics"
......@@ -20,7 +20,7 @@ type ContractCreator[T any] func(context.Context, contractMetrics.ContractMetric
func Interruptible(action cli.ActionFunc) cli.ActionFunc {
return func(ctx *cli.Context) error {
ctx.Context = opio.CancelOnInterrupt(ctx.Context)
ctx.Context = ctxinterrupt.WithCancelOnInterrupt(ctx.Context)
return action(ctx)
}
}
......
......@@ -12,8 +12,8 @@ import (
"github.com/ethereum-optimism/optimism/op-conductor/flags"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/ctxinterrupt"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/opio"
)
var (
......@@ -34,7 +34,7 @@ func main() {
app.Action = cliapp.LifecycleCmd(OpConductorMain)
app.Commands = []*cli.Command{}
ctx := opio.WithInterruptBlocker(context.Background())
ctx := ctxinterrupt.WithSignalWaiterMain(context.Background())
err := app.RunContext(ctx, os.Args)
if err != nil {
log.Crit("Application failed", "message", err)
......
......@@ -29,7 +29,7 @@ func NewUnsafeHeadTracker(log log.Logger) *unsafeHeadTracker {
// Apply implements raft.FSM, it applies the latest change (latest unsafe head payload) to FSM.
func (t *unsafeHeadTracker) Apply(l *raft.Log) interface{} {
if l.Data == nil || len(l.Data) == 0 {
if len(l.Data) == 0 {
return fmt.Errorf("log data is nil or empty")
}
......
......@@ -14,8 +14,8 @@ import (
"github.com/ethereum-optimism/optimism/op-dispute-mon/version"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/ctxinterrupt"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/opio"
)
var (
......@@ -28,7 +28,7 @@ var VersionWithMeta = opservice.FormatVersion(version.Version, GitCommit, GitDat
func main() {
args := os.Args
ctx := opio.WithInterruptBlocker(context.Background())
ctx := ctxinterrupt.WithSignalWaiterMain(context.Background())
if err := run(ctx, args, monitor.Main); err != nil {
log.Crit("Application failed", "err", err)
}
......
......@@ -73,7 +73,8 @@ func run(configPath string) error {
fmt.Printf("================== op-geth shim awaiting termination ==========================\n")
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
defer signal.Stop(sigs)
signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
select {
case <-sigs:
......
......@@ -20,9 +20,9 @@ import (
"github.com/ethereum-optimism/optimism/op-node/version"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/ctxinterrupt"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/metrics/doc"
"github.com/ethereum-optimism/optimism/op-service/opio"
)
var (
......@@ -64,7 +64,7 @@ func main() {
},
}
ctx := opio.WithInterruptBlocker(context.Background())
ctx := ctxinterrupt.WithSignalWaiterMain(context.Background())
err := app.RunContext(ctx, os.Args)
if err != nil {
log.Crit("Application failed", "message", err)
......
package main
import (
"context"
"os"
"github.com/ethereum-optimism/optimism/op-service/ctxinterrupt"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/urfave/cli/v2"
......@@ -38,7 +41,8 @@ func main() {
},
}
err := app.Run(os.Args)
ctx := ctxinterrupt.WithSignalWaiterMain(context.Background())
err := app.RunContext(ctx, os.Args)
if err != nil {
log.Crit("Application failed", "message", err)
}
......
......@@ -7,7 +7,7 @@ import (
"github.com/urfave/cli/v2"
"github.com/ethereum-optimism/optimism/op-service/opio"
"github.com/ethereum-optimism/optimism/op-service/ctxinterrupt"
)
type Lifecycle interface {
......@@ -29,36 +29,26 @@ type Lifecycle interface {
// a shutdown when the Stop context is not expired.
type LifecycleAction func(ctx *cli.Context, close context.CancelCauseFunc) (Lifecycle, error)
var interruptErr = errors.New("interrupt signal")
// LifecycleCmd turns a LifecycleAction into an CLI action,
// by instrumenting it with CLI context and signal based termination.
// The signals are caught with the opio.BlockFn attached to the context, if any.
// If no block function is provided, it adds default interrupt handling.
// by instrumenting it with CLI context and signal based cancellation.
// The signals are caught with the ctxinterrupt.waiter attached to the context, or default
// interrupt signal handling if not already provided.
// The app may continue to run post-processing until fully shutting down.
// The user can force an early shut-down during post-processing by sending a second interruption signal.
func LifecycleCmd(fn LifecycleAction) cli.ActionFunc {
return func(ctx *cli.Context) error {
hostCtx := ctx.Context
blockOnInterrupt := opio.BlockerFromContext(hostCtx)
if blockOnInterrupt == nil { // add default interrupt blocker to context if none is set.
hostCtx = opio.WithInterruptBlocker(hostCtx)
blockOnInterrupt = opio.BlockerFromContext(hostCtx)
}
appCtx, appCancel := context.WithCancelCause(hostCtx)
hostCtx, stop := ctxinterrupt.WithSignalWaiter(ctx.Context)
defer stop()
appCtx, appCancel := context.WithCancelCause(ctxinterrupt.WithCancelOnInterrupt(hostCtx))
// This is updated so the fn callback cli.Context uses the appCtx we just made.
ctx.Context = appCtx
go func() {
blockOnInterrupt(appCtx)
appCancel(interruptErr)
}()
appLifecycle, err := fn(ctx, appCancel)
if err != nil {
// join errors to include context cause (nil errors are dropped)
return errors.Join(
fmt.Errorf("failed to setup: %w", err),
context.Cause(appCtx),
context.Cause(ctx.Context),
)
}
......@@ -75,15 +65,10 @@ func LifecycleCmd(fn LifecycleAction) cli.ActionFunc {
// Graceful stop context.
// This allows the service to idle before shutdown, if halted. User may interrupt.
stopCtx, stopCancel := context.WithCancelCause(hostCtx)
go func() {
blockOnInterrupt(stopCtx)
stopCancel(interruptErr)
}()
stopCtx := ctxinterrupt.WithCancelOnInterrupt(hostCtx)
// Execute graceful stop.
stopErr := appLifecycle.Stop(stopCtx)
stopCancel(nil)
// note: Stop implementation may choose to suppress a context error,
// if it handles it well (e.g. stop idling after a halt).
if stopErr != nil {
......
......@@ -9,9 +9,11 @@ import (
"github.com/stretchr/testify/require"
"github.com/urfave/cli/v2"
"github.com/ethereum-optimism/optimism/op-service/opio"
"github.com/ethereum-optimism/optimism/op-service/ctxinterrupt"
)
var mockInterruptErr = errors.New("mock interrupt")
type fakeLifecycle struct {
startCh, stopCh chan error
stopped bool
......@@ -85,11 +87,14 @@ func TestLifecycleCmd(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
// puppeteer system signal interrupts by hooking up the test signal channel as "blocker" for the app to use.
ctx = opio.WithBlocker(ctx, func(ctx context.Context) {
ctx = ctxinterrupt.WithWaiterFunc(ctx, func(ctx context.Context) (interrupt, ctxErr error) {
select {
case <-ctx.Done():
ctxErr = context.Cause(ctx)
case <-signalCh:
interrupt = mockInterruptErr
}
return
})
t.Cleanup(cancel)
......@@ -124,7 +129,7 @@ func TestLifecycleCmd(t *testing.T) {
signalCh, _, _, _, resultCh, _ := appSetup(t)
signalCh <- struct{}{}
res := <-resultCh
require.ErrorIs(t, res, interruptErr)
require.ErrorIs(t, res, mockInterruptErr)
require.ErrorContains(t, res, "failed to setup")
})
t.Run("failed init", func(t *testing.T) {
......@@ -142,7 +147,7 @@ func TestLifecycleCmd(t *testing.T) {
require.False(t, app.Stopped())
signalCh <- struct{}{}
res := <-resultCh
require.ErrorIs(t, res, interruptErr)
require.ErrorIs(t, res, mockInterruptErr)
require.ErrorContains(t, res, "failed to start")
require.True(t, app.Stopped())
})
......@@ -178,7 +183,7 @@ func TestLifecycleCmd(t *testing.T) {
signalCh <- struct{}{} // start graceful shutdown
signalCh <- struct{}{} // interrupt before the shutdown process is allowed to complete
res := <-resultCh
require.ErrorIs(t, res, interruptErr)
require.ErrorIs(t, res, mockInterruptErr)
require.ErrorContains(t, res, "failed to stop")
require.True(t, app.Stopped()) // still fully closes, interrupts only accelerate shutdown where possible.
})
......
package ctxinterrupt
import (
"context"
)
// Newtyping empty struct prevents collision with other empty struct keys in the Context.
type interruptWaiterContextKeyType struct{}
var waiterContextKey = interruptWaiterContextKeyType{}
// WithInterruptWaiter overrides the interrupt waiter value, e.g. to insert a function that mocks
// interrupt signals for testing CLI shutdown without actual process signals.
func WithWaiterFunc(ctx context.Context, fn WaiterFunc) context.Context {
return withInterruptWaiter(ctx, fn)
}
func withInterruptWaiter(ctx context.Context, value waiter) context.Context {
return context.WithValue(ctx, waiterContextKey, value)
}
// contextInterruptWaiter returns a interruptWaiter that blocks on interrupts when called.
func contextInterruptWaiter(ctx context.Context) waiter {
v := ctx.Value(waiterContextKey)
if v == nil {
return nil
}
return v.(waiter)
}
package ctxinterrupt
import (
"context"
"testing"
"github.com/stretchr/testify/require"
)
func TestContextKeyIsUnique(t *testing.T) {
ass := require.New(t)
ctx := context.Background()
ass.Nil(ctx.Value(waiterContextKey))
ctx = context.WithValue(ctx, waiterContextKey, 1)
ass.Equal(ctx.Value(waiterContextKey), 1)
ctx = context.WithValue(ctx, waiterContextKey, 2)
ass.Equal(ctx.Value(waiterContextKey), 2)
ass.Nil(ctx.Value(struct{}{}))
}
// Implements interrupts: events that normally signal intent to cancel a Context, but may be
// repeated to encourage closure of new Contexts used to clean up resources.
package ctxinterrupt
package ctxinterrupt
import (
"context"
)
// Wait blocks until an interrupt is received, defaulting to interrupting on the default
// signals if no interrupt blocker is present in the Context. Returns nil if an interrupt occurs,
// else the Context error when it's done.
func Wait(ctx context.Context) error {
iw := contextInterruptWaiter(ctx)
if iw == nil {
catcher := newSignalWaiter()
defer catcher.Stop()
iw = catcher
}
return iw.waitForInterrupt(ctx).CtxError
}
// WithSignalWaiter attaches an interrupt signal handler to the context which continues to receive
// signals after every wait, and also prevents the interrupt signals being handled before we're
// ready to wait for them. This helps functions wait on individual consecutive interrupts.
func WithSignalWaiter(ctx context.Context) (_ context.Context, stop func()) {
if ctx.Value(waiterContextKey) != nil { // already has an interrupt waiter
return ctx, func() {}
}
catcher := newSignalWaiter()
return withInterruptWaiter(ctx, catcher), catcher.Stop
}
// WithSignalWaiterMain returns a Context with a signal interrupt blocker and leaks the destructor. Intended for use in
// main functions where we exit right after using the returned context anyway.
func WithSignalWaiterMain(ctx context.Context) context.Context {
ctx, _ = WithSignalWaiter(ctx)
return ctx
}
// WithCancelOnInterrupt returns a Context that is cancelled when Wait returns on the waiter in ctx.
// If there's no waiter, the default interrupt signals are used: In this case the signal hooking is
// not stopped until the original ctx is cancelled.
func WithCancelOnInterrupt(ctx context.Context) context.Context {
interruptWaiter := contextInterruptWaiter(ctx)
ctx, cancel := context.WithCancelCause(ctx)
stop := func() {}
if interruptWaiter == nil {
catcher := newSignalWaiter()
stop = catcher.Stop
interruptWaiter = catcher
}
go func() {
defer stop()
cancel(interruptWaiter.waitForInterrupt(ctx).Cause())
}()
return ctx
}
package ctxinterrupt
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
)
// defaultSignals is a set of default interrupt signals.
var defaultSignals = []os.Signal{
// Let's not catch SIGQUIT as it's expected to terminate with a stack trace in Go. os.Kill
// should not/cannot be caught on most systems.
os.Interrupt,
syscall.SIGTERM,
}
type signalWaiter struct {
incoming chan os.Signal
}
func newSignalWaiter() signalWaiter {
catcher := signalWaiter{
// Buffer, in case we are slow to act on older signals,
// but still want to handle repeat-signals as special case (e.g. to force shutdown)
incoming: make(chan os.Signal, 10),
}
signal.Notify(catcher.incoming, defaultSignals...)
return catcher
}
func (me signalWaiter) Stop() {
signal.Stop(me.incoming)
}
// Block blocks until either an interrupt signal is received, or the context is cancelled.
// No error is returned on interrupt.
func (me signalWaiter) waitForInterrupt(ctx context.Context) waitResult {
select {
case signalValue, ok := <-me.incoming:
if !ok {
// Signal channels are not closed.
panic("signal channel closed")
}
return waitResult{Interrupt: fmt.Errorf("received interrupt signal %v", signalValue)}
case <-ctx.Done():
return waitResult{CtxError: context.Cause(ctx)}
}
}
package ctxinterrupt
import (
"context"
"fmt"
)
// waiter describes a value that can wait for interrupts and context cancellation at the same time.
type waiter interface {
waitForInterrupt(ctx context.Context) waitResult
}
// Waits for an interrupt or context cancellation. ctxErr should be the context.Cause of ctx when it
// is done. interrupt is only inspected if ctxErr is nil, and is not required to be set.
type WaiterFunc func(ctx context.Context) (interrupt, ctxErr error)
func (me WaiterFunc) waitForInterrupt(ctx context.Context) (res waitResult) {
res.Interrupt, res.CtxError = me(ctx)
return
}
// Either CtxError is not nil and is set to the context error cause, or the wait was interrupted.
type waitResult struct {
// Not required to be non-nil on an interrupt.
Interrupt error
// Maybe set this using context.Cause.
CtxError error
}
func (me waitResult) Cause() error {
if me.CtxError != nil {
return me.CtxError
}
if me.Interrupt != nil {
return fmt.Errorf("interrupted: %w", me.Interrupt)
}
return nil
}
package opio
import (
"context"
"os"
"os/signal"
"syscall"
)
// DefaultInterruptSignals is a set of default interrupt signals.
var DefaultInterruptSignals = []os.Signal{
os.Interrupt,
os.Kill,
syscall.SIGTERM,
syscall.SIGQUIT,
}
// BlockOnInterrupts blocks until a SIGTERM is received.
// Passing in signals will override the default signals.
func BlockOnInterrupts(signals ...os.Signal) {
if len(signals) == 0 {
signals = DefaultInterruptSignals
}
interruptChannel := make(chan os.Signal, 1)
signal.Notify(interruptChannel, signals...)
<-interruptChannel
}
// BlockOnInterruptsContext blocks until a SIGTERM is received.
// Passing in signals will override the default signals.
// The function will stop blocking if the context is closed.
func BlockOnInterruptsContext(ctx context.Context, signals ...os.Signal) {
if len(signals) == 0 {
signals = DefaultInterruptSignals
}
interruptChannel := make(chan os.Signal, 1)
signal.Notify(interruptChannel, signals...)
select {
case <-interruptChannel:
case <-ctx.Done():
signal.Stop(interruptChannel)
}
}
type interruptContextKeyType struct{}
var blockerContextKey = interruptContextKeyType{}
type interruptCatcher struct {
incoming chan os.Signal
}
// Block blocks until either an interrupt signal is received, or the context is cancelled.
// No error is returned on interrupt.
func (c *interruptCatcher) Block(ctx context.Context) {
select {
case <-c.incoming:
case <-ctx.Done():
}
}
// WithInterruptBlocker attaches an interrupt handler to the context,
// which continues to receive signals after every block.
// This helps functions block on individual consecutive interrupts.
func WithInterruptBlocker(ctx context.Context) context.Context {
if ctx.Value(blockerContextKey) != nil { // already has an interrupt handler
return ctx
}
catcher := &interruptCatcher{
incoming: make(chan os.Signal, 10),
}
signal.Notify(catcher.incoming, DefaultInterruptSignals...)
return context.WithValue(ctx, blockerContextKey, BlockFn(catcher.Block))
}
// WithBlocker overrides the interrupt blocker value,
// e.g. to insert a block-function for testing CLI shutdown without actual process signals.
func WithBlocker(ctx context.Context, fn BlockFn) context.Context {
return context.WithValue(ctx, blockerContextKey, fn)
}
// BlockFn simply blocks until the implementation of the blocker interrupts it, or till the given context is cancelled.
type BlockFn func(ctx context.Context)
// BlockerFromContext returns a BlockFn that blocks on interrupts when called.
func BlockerFromContext(ctx context.Context) BlockFn {
v := ctx.Value(blockerContextKey)
if v == nil {
return nil
}
return v.(BlockFn)
}
// CancelOnInterrupt cancels the given context on interrupt.
// If a BlockFn is attached to the context, this is used as interrupt-blocking.
// If not, then the context blocks on a manually handled interrupt signal.
func CancelOnInterrupt(ctx context.Context) context.Context {
inner, cancel := context.WithCancel(ctx)
blockOnInterrupt := BlockerFromContext(ctx)
if blockOnInterrupt == nil {
blockOnInterrupt = func(ctx context.Context) {
BlockOnInterruptsContext(ctx) // default signals
}
}
go func() {
blockOnInterrupt(ctx)
cancel()
}()
return inner
}
......@@ -5,13 +5,13 @@ import (
"errors"
"fmt"
"os"
"os/signal"
"path/filepath"
"reflect"
"strings"
"syscall"
"time"
"github.com/ethereum-optimism/optimism/op-service/ctxinterrupt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli/v2"
......@@ -86,37 +86,27 @@ func ParseAddress(address string) (common.Address, error) {
return common.Address{}, fmt.Errorf("invalid address: %v", address)
}
// CloseAction runs the function in the background, until it finishes or until it is closed by the user with an interrupt.
func CloseAction(fn func(ctx context.Context, shutdown <-chan struct{}) error) error {
stopped := make(chan error, 1)
shutdown := make(chan struct{}, 1)
ctx, cancel := context.WithCancel(context.Background())
// CloseAction runs the function in the background, until it finishes or until it is closed by the
// user with an interrupt.
func CloseAction(ctx context.Context, fn func(ctx context.Context) error) error {
ctx, stop := ctxinterrupt.WithSignalWaiter(ctx)
defer stop()
finished := make(chan error, 1)
go func() {
stopped <- fn(ctx, shutdown)
finished <- fn(ctx)
}()
doneCh := make(chan os.Signal, 1)
signal.Notify(doneCh, []os.Signal{
os.Interrupt,
os.Kill,
syscall.SIGTERM,
syscall.SIGQUIT,
}...)
select {
case <-doneCh:
cancel()
shutdown <- struct{}{}
case <-ctx.Done():
// Stop catching interrupts.
stop()
select {
case err := <-stopped:
case err := <-finished:
return err
case <-time.After(time.Second * 10):
return errors.New("command action is unresponsive for more than 10 seconds... shutting down")
}
case err := <-stopped:
cancel()
case err := <-finished:
return err
}
}
......
......@@ -11,9 +11,9 @@ import (
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/ctxinterrupt"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/metrics/doc"
"github.com/ethereum-optimism/optimism/op-service/opio"
"github.com/ethereum-optimism/optimism/op-supervisor/flags"
"github.com/ethereum-optimism/optimism/op-supervisor/metrics"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor"
......@@ -26,7 +26,7 @@ var (
)
func main() {
ctx := opio.WithInterruptBlocker(context.Background())
ctx := ctxinterrupt.WithSignalWaiterMain(context.Background())
err := run(ctx, os.Args, fromConfig)
if err != nil {
log.Crit("Application failed", "message", err)
......
......@@ -504,7 +504,7 @@ var (
metricsCfg := opmetrics.ReadCLIConfig(ctx)
return opservice.CloseAction(func(ctx context.Context, shutdown <-chan struct{}) error {
return opservice.CloseAction(ctx.Context, func(ctx context.Context) error {
registry := opmetrics.NewRegistry()
metrics := engine.NewMetrics("wheel", registry)
if metricsCfg.Enabled {
......@@ -519,7 +519,7 @@ var (
}
}()
}
return engine.Auto(ctx, metrics, client, l, shutdown, settings)
return engine.Auto(ctx, metrics, client, l, settings)
})
}),
}
......
......@@ -189,7 +189,13 @@ func newPayloadAttributes(evp sources.EngineVersionProvider, timestamp uint64, p
return pa
}
func Auto(ctx context.Context, metrics Metricer, client *sources.EngineAPIClient, log log.Logger, shutdown <-chan struct{}, settings *BlockBuildingSettings) error {
func Auto(
ctx context.Context,
metrics Metricer,
client *sources.EngineAPIClient,
log log.Logger,
settings *BlockBuildingSettings,
) error {
ticker := time.NewTicker(time.Millisecond * 100)
defer ticker.Stop()
......@@ -197,9 +203,6 @@ func Auto(ctx context.Context, metrics Metricer, client *sources.EngineAPIClient
var buildErr error
for {
select {
case <-shutdown:
log.Info("shutting down")
return nil
case <-ctx.Done():
log.Info("context closed", "err", ctx.Err())
return ctx.Err()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment