Commit b58f6b44 authored by protolambda's avatar protolambda

op-service: interrupt handling improvement

parent 3be62ed7
...@@ -4,9 +4,10 @@ import ( ...@@ -4,9 +4,10 @@ import (
"context" "context"
"os" "os"
"github.com/ethereum/go-ethereum/log"
oplog "github.com/ethereum-optimism/optimism/op-service/log" oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/opio" "github.com/ethereum-optimism/optimism/op-service/opio"
"github.com/ethereum/go-ethereum/log"
) )
var ( var (
...@@ -15,16 +16,10 @@ var ( ...@@ -15,16 +16,10 @@ var (
) )
func main() { func main() {
// This is the most root context, used to propagate
// cancellations to all spawned application-level goroutines
ctx, cancel := context.WithCancel(context.Background())
go func() {
opio.BlockOnInterrupts()
cancel()
}()
oplog.SetupDefaults() oplog.SetupDefaults()
app := newCli(GitCommit, GitDate) app := newCli(GitCommit, GitDate)
// sub-commands set up their individual interrupt lifecycles, which can block on the given interrupt as needed.
ctx := opio.WithInterruptBlocker(context.Background())
if err := app.RunContext(ctx, os.Args); err != nil { if err := app.RunContext(ctx, os.Args); err != nil {
log.Error("application failed", "err", err) log.Error("application failed", "err", err)
os.Exit(1) os.Exit(1)
......
package main package main
import ( import (
"context"
"os" "os"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"github.com/ethereum-optimism/optimism/op-batcher/batcher" "github.com/ethereum-optimism/optimism/op-batcher/batcher"
"github.com/ethereum-optimism/optimism/op-batcher/flags" "github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-batcher/metrics"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/cliapp" "github.com/ethereum-optimism/optimism/op-service/cliapp"
oplog "github.com/ethereum-optimism/optimism/op-service/log" oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/metrics/doc" "github.com/ethereum-optimism/optimism/op-service/metrics/doc"
"github.com/ethereum-optimism/optimism/op-service/opio"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
...@@ -38,7 +40,8 @@ func main() { ...@@ -38,7 +40,8 @@ func main() {
}, },
} }
err := app.Run(os.Args) ctx := opio.WithInterruptBlocker(context.Background())
err := app.RunContext(ctx, os.Args)
if err != nil { if err != nil {
log.Crit("Application failed", "message", err) log.Crit("Application failed", "message", err)
} }
......
...@@ -21,6 +21,7 @@ import ( ...@@ -21,6 +21,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/cliapp" "github.com/ethereum-optimism/optimism/op-service/cliapp"
oplog "github.com/ethereum-optimism/optimism/op-service/log" oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/metrics/doc" "github.com/ethereum-optimism/optimism/op-service/metrics/doc"
"github.com/ethereum-optimism/optimism/op-service/opio"
) )
var ( var (
...@@ -58,7 +59,8 @@ func main() { ...@@ -58,7 +59,8 @@ func main() {
}, },
} }
err := app.Run(os.Args) ctx := opio.WithInterruptBlocker(context.Background())
err := app.RunContext(ctx, os.Args)
if err != nil { if err != nil {
log.Crit("Application failed", "message", err) log.Crit("Application failed", "message", err)
} }
......
...@@ -4,7 +4,6 @@ import ( ...@@ -4,7 +4,6 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"os"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
...@@ -30,21 +29,22 @@ type Lifecycle interface { ...@@ -30,21 +29,22 @@ type Lifecycle interface {
// a shutdown when the Stop context is not expired. // a shutdown when the Stop context is not expired.
type LifecycleAction func(ctx *cli.Context, close context.CancelCauseFunc) (Lifecycle, error) type LifecycleAction func(ctx *cli.Context, close context.CancelCauseFunc) (Lifecycle, error)
var interruptErr = errors.New("interrupt signal")
// LifecycleCmd turns a LifecycleAction into an CLI action, // LifecycleCmd turns a LifecycleAction into an CLI action,
// by instrumenting it with CLI context and signal based termination. // by instrumenting it with CLI context and signal based termination.
// The signals are catched with the opio.BlockFn attached to the context, if any.
// If no block function is provided, it adds default interrupt handling.
// The app may continue to run post-processing until fully shutting down. // The app may continue to run post-processing until fully shutting down.
// The user can force an early shut-down during post-processing by sending a second interruption signal. // The user can force an early shut-down during post-processing by sending a second interruption signal.
func LifecycleCmd(fn LifecycleAction) cli.ActionFunc { func LifecycleCmd(fn LifecycleAction) cli.ActionFunc {
return lifecycleCmd(fn, opio.BlockOnInterruptsContext)
}
type waitSignalFn func(ctx context.Context, signals ...os.Signal)
var interruptErr = errors.New("interrupt signal")
func lifecycleCmd(fn LifecycleAction, blockOnInterrupt waitSignalFn) cli.ActionFunc {
return func(ctx *cli.Context) error { return func(ctx *cli.Context) error {
hostCtx := ctx.Context hostCtx := ctx.Context
blockOnInterrupt := opio.BlockerFromContext(hostCtx)
if blockOnInterrupt == nil { // add default interrupt blocker to context if none is set.
hostCtx = opio.WithInterruptBlocker(hostCtx)
blockOnInterrupt = opio.BlockerFromContext(hostCtx)
}
appCtx, appCancel := context.WithCancelCause(hostCtx) appCtx, appCancel := context.WithCancelCause(hostCtx)
ctx.Context = appCtx ctx.Context = appCtx
......
...@@ -3,12 +3,13 @@ package cliapp ...@@ -3,12 +3,13 @@ package cliapp
import ( import (
"context" "context"
"errors" "errors"
"os"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"github.com/ethereum-optimism/optimism/op-service/opio"
) )
type fakeLifecycle struct { type fakeLifecycle struct {
...@@ -77,19 +78,19 @@ func TestLifecycleCmd(t *testing.T) { ...@@ -77,19 +78,19 @@ func TestLifecycleCmd(t *testing.T) {
return app, nil return app, nil
} }
// puppeteer a system signal waiter with a test signal channel
fakeSignalWaiter := func(ctx context.Context, signals ...os.Signal) {
select {
case <-ctx.Done():
case <-signalCh:
}
}
// turn our mock app and system signal into a lifecycle-managed command // turn our mock app and system signal into a lifecycle-managed command
actionFn := lifecycleCmd(mockAppFn, fakeSignalWaiter) actionFn := LifecycleCmd(mockAppFn)
// try to shut the test down after being locked more than a minute // try to shut the test down after being locked more than a minute
ctx, cancel := context.WithTimeout(context.Background(), time.Minute) ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
// puppeteer system signal interrupts by hooking up the test signal channel as "blocker" for the app to use.
ctx = opio.WithBlocker(ctx, func(ctx context.Context) {
select {
case <-ctx.Done():
case <-signalCh:
}
})
t.Cleanup(cancel) t.Cleanup(cancel)
// create a fake CLI context to run our command with // create a fake CLI context to run our command with
......
...@@ -41,3 +41,74 @@ func BlockOnInterruptsContext(ctx context.Context, signals ...os.Signal) { ...@@ -41,3 +41,74 @@ func BlockOnInterruptsContext(ctx context.Context, signals ...os.Signal) {
signal.Stop(interruptChannel) signal.Stop(interruptChannel)
} }
} }
type interruptContextKeyType struct{}
var blockerContextKey = interruptContextKeyType{}
type interruptCatcher struct {
incoming chan os.Signal
}
// Block blocks until either an interrupt signal is received, or the context is cancelled.
// No error is returned on interrupt.
func (c *interruptCatcher) Block(ctx context.Context) {
select {
case <-c.incoming:
case <-ctx.Done():
}
}
// WithInterruptBlocker attaches an interrupt handler to the context,
// which continues to receive signals after every block.
// This helps functions block on individual consecutive interrupts.
func WithInterruptBlocker(ctx context.Context) context.Context {
if ctx.Value(blockerContextKey) != nil { // already has an interrupt handler
return ctx
}
catcher := &interruptCatcher{
incoming: make(chan os.Signal, 10),
}
signal.Notify(catcher.incoming, DefaultInterruptSignals...)
return context.WithValue(ctx, blockerContextKey, BlockFn(catcher.Block))
}
// WithBlocker overrides the interrupt blocker value,
// e.g. to insert a block-function for testing CLI shutdown without actual process signals.
func WithBlocker(ctx context.Context, fn BlockFn) context.Context {
return context.WithValue(ctx, blockerContextKey, fn)
}
// BlockFn simply blocks until the implementation of the blocker interrupts it, or till the given context is cancelled.
type BlockFn func(ctx context.Context)
// BlockerFromContext returns a BlockFn that blocks on interrupts when called.
func BlockerFromContext(ctx context.Context) BlockFn {
v := ctx.Value(blockerContextKey)
if v == nil {
return nil
}
return v.(BlockFn)
}
// CancelOnInterrupt cancels the given context on interrupt.
// If a BlockFn is attached to the context, this is used as interrupt-blocking.
// If not, then the context blocks on a manually handled interrupt signal.
func CancelOnInterrupt(ctx context.Context) context.Context {
inner, cancel := context.WithCancel(ctx)
blockOnInterrupt := BlockerFromContext(ctx)
if blockOnInterrupt == nil {
blockOnInterrupt = func(ctx context.Context) {
BlockOnInterruptsContext(ctx) // default signals
}
}
go func() {
blockOnInterrupt(ctx)
cancel()
}()
return inner
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment