Commit 1e157a72 authored by protolambda

op-challenger: cleanup service lifecycle, improve subscription ctx usage

parent 2e35a8f6
......@@ -312,6 +312,10 @@ func (bs *BatcherService) Stop(ctx context.Context) error {
result = errors.Join(result, fmt.Errorf("failed to close balance metricer: %w", err))
}
}
if bs.TxManager != nil {
bs.TxManager.Close()
}
if bs.metricsSrv != nil {
if err := bs.metricsSrv.Stop(ctx); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop metrics server: %w", err))
......
......@@ -2,22 +2,19 @@ package op_challenger
import (
"context"
"fmt"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-challenger/config"
"github.com/ethereum-optimism/optimism/op-challenger/game"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
)
// Main is the programmatic entry-point for running op-challenger
func Main(ctx context.Context, logger log.Logger, cfg *config.Config) error {
// Main is the programmatic entry-point for running op-challenger with a given configuration.
func Main(ctx context.Context, logger log.Logger, cfg *config.Config) (cliapp.Lifecycle, error) {
if err := cfg.Check(); err != nil {
return err
return nil, err
}
service, err := game.NewService(ctx, logger, cfg)
if err != nil {
return fmt.Errorf("failed to create the fault service: %w", err)
}
return service.MonitorGame(ctx)
srv, err := game.NewService(ctx, logger, cfg)
return srv, err
}
......@@ -12,6 +12,7 @@ import (
func TestMainShouldReturnErrorWhenConfigInvalid(t *testing.T) {
cfg := &config.Config{}
err := Main(context.Background(), testlog.Logger(t, log.LvlInfo), cfg)
app, err := Main(context.Background(), testlog.Logger(t, log.LvlInfo), cfg)
require.ErrorIs(t, err, cfg.Check())
require.Nil(t, app)
}
......@@ -4,16 +4,18 @@ import (
"context"
"os"
op_challenger "github.com/ethereum-optimism/optimism/op-challenger"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli/v2"
"github.com/ethereum/go-ethereum/log"
challenger "github.com/ethereum-optimism/optimism/op-challenger"
"github.com/ethereum-optimism/optimism/op-challenger/config"
"github.com/ethereum-optimism/optimism/op-challenger/flags"
"github.com/ethereum-optimism/optimism/op-challenger/version"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/opio"
)
var (
......@@ -26,14 +28,15 @@ var VersionWithMeta = opservice.FormatVersion(version.Version, GitCommit, GitDat
func main() {
args := os.Args
if err := run(args, op_challenger.Main); err != nil {
ctx := opio.WithInterruptBlocker(context.Background())
if err := run(ctx, args, challenger.Main); err != nil {
log.Crit("Application failed", "err", err)
}
}
type ConfigAction func(ctx context.Context, log log.Logger, config *config.Config) error
type ConfiguredLifecycle func(ctx context.Context, log log.Logger, config *config.Config) (cliapp.Lifecycle, error)
func run(args []string, action ConfigAction) error {
func run(ctx context.Context, args []string, action ConfiguredLifecycle) error {
oplog.SetupDefaults()
app := cli.NewApp()
......@@ -42,20 +45,20 @@ func run(args []string, action ConfigAction) error {
app.Name = "op-challenger"
app.Usage = "Challenge outputs"
app.Description = "Ensures that on chain outputs are correct."
app.Action = func(ctx *cli.Context) error {
app.Action = cliapp.LifecycleCmd(func(ctx *cli.Context, close context.CancelCauseFunc) (cliapp.Lifecycle, error) {
logger, err := setupLogging(ctx)
if err != nil {
return err
return nil, err
}
logger.Info("Starting op-challenger", "version", VersionWithMeta)
cfg, err := flags.NewConfigFromCLI(ctx)
if err != nil {
return err
return nil, err
}
return action(ctx.Context, logger, cfg)
}
return app.Run(args)
})
return app.RunContext(ctx, args)
}
func setupLogging(ctx *cli.Context) (log.Logger, error) {
......
......@@ -2,15 +2,19 @@ package main
import (
"context"
"errors"
"fmt"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-challenger/config"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-challenger/config"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
)
var (
......@@ -36,7 +40,7 @@ func TestLogLevel(t *testing.T) {
for _, lvl := range []string{"trace", "debug", "info", "error", "crit"} {
lvl := lvl
t.Run("AcceptValid_"+lvl, func(t *testing.T) {
logger, _, err := runWithArgs(addRequiredArgs(config.TraceTypeAlphabet, "--log.level", lvl))
logger, _, err := dryRunWithArgs(addRequiredArgs(config.TraceTypeAlphabet, "--log.level", lvl))
require.NoError(t, err)
require.NotNil(t, logger)
})
......@@ -431,25 +435,29 @@ func TestCannonL2Genesis(t *testing.T) {
}
func verifyArgsInvalid(t *testing.T, messageContains string, cliArgs []string) {
_, _, err := runWithArgs(cliArgs)
_, _, err := dryRunWithArgs(cliArgs)
require.ErrorContains(t, err, messageContains)
}
func configForArgs(t *testing.T, cliArgs []string) config.Config {
_, cfg, err := runWithArgs(cliArgs)
_, cfg, err := dryRunWithArgs(cliArgs)
require.NoError(t, err)
return cfg
}
func runWithArgs(cliArgs []string) (log.Logger, config.Config, error) {
func dryRunWithArgs(cliArgs []string) (log.Logger, config.Config, error) {
cfg := new(config.Config)
var logger log.Logger
fullArgs := append([]string{"op-challenger"}, cliArgs...)
err := run(fullArgs, func(ctx context.Context, log log.Logger, config *config.Config) error {
testErr := errors.New("dry-run")
err := run(context.Background(), fullArgs, func(ctx context.Context, log log.Logger, config *config.Config) (cliapp.Lifecycle, error) {
logger = log
cfg = config
return nil
return nil, testErr
})
if errors.Is(err, testErr) { // expected error
err = nil
}
return logger, *cfg, err
}
......
......@@ -232,6 +232,9 @@ func (m *mockTxManager) From() common.Address {
return m.from
}
// Close does nothing on this mock.
// NOTE(review): presumably exists so mockTxManager keeps satisfying a
// transaction-manager interface that requires Close — confirm against the interface.
func (m *mockTxManager) Close() {
}
type mockContract struct {
calls int
callFails bool
......
......@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/ethereum-optimism/optimism/op-challenger/game/scheduler"
......@@ -39,6 +40,7 @@ type gameMonitor struct {
allowedGames []common.Address
l1HeadsSub ethereum.Subscription
l1Source *headSource
runState sync.Mutex
}
type MinimalSubscriber interface {
......@@ -126,8 +128,10 @@ func (m *gameMonitor) onNewL1Head(ctx context.Context, sig eth.L1BlockRef) {
}
}
func (m *gameMonitor) resubscribeFunction(ctx context.Context) event.ResubscribeErrFunc {
return func(innerCtx context.Context, err error) (event.Subscription, error) {
func (m *gameMonitor) resubscribeFunction() event.ResubscribeErrFunc {
// The ctx is cancelled as soon as the subscription is returned,
// but is only used to create the subscription, and does not affect the returned subscription.
return func(ctx context.Context, err error) (event.Subscription, error) {
if err != nil {
m.logger.Warn("resubscribing after failed L1 subscription", "err", err)
}
......@@ -135,18 +139,21 @@ func (m *gameMonitor) resubscribeFunction(ctx context.Context) event.Resubscribe
}
}
func (m *gameMonitor) MonitorGames(ctx context.Context) error {
m.l1HeadsSub = event.ResubscribeErr(time.Second*10, m.resubscribeFunction(ctx))
for {
select {
case <-ctx.Done():
m.l1HeadsSub.Unsubscribe()
return nil
case err, ok := <-m.l1HeadsSub.Err():
if !ok {
return err
}
m.logger.Error("L1 subscription error", "err", err)
}
// StartMonitoring creates the L1 heads subscription if the monitor does not
// already hold one. The check and the assignment happen under runState, so a
// repeated call while a subscription is held is a no-op.
func (m *gameMonitor) StartMonitoring() {
	m.runState.Lock()
	defer m.runState.Unlock()

	if m.l1HeadsSub == nil {
		// Not started yet: set up the resubscribing subscription, passing a
		// 10 second duration to event.ResubscribeErr.
		m.l1HeadsSub = event.ResubscribeErr(10*time.Second, m.resubscribeFunction())
	}
}
// StopMonitoring unsubscribes from the L1 heads subscription, if one is held,
// and clears the field so that a later StartMonitoring creates a fresh one.
// The whole operation runs under runState; calling it when no subscription is
// held is a no-op.
func (m *gameMonitor) StopMonitoring() {
	m.runState.Lock()
	defer m.runState.Unlock()

	if sub := m.l1HeadsSub; sub != nil {
		sub.Unsubscribe()
		m.l1HeadsSub = nil
	}
}
......@@ -84,8 +84,9 @@ func TestMonitorGames(t *testing.T) {
cancel()
}()
err := monitor.MonitorGames(ctx)
require.NoError(t, err)
monitor.StartMonitoring()
<-ctx.Done()
monitor.StopMonitoring()
require.Len(t, sched.scheduled, 1)
require.Equal(t, []common.Address{addr1, addr2}, sched.scheduled[0])
})
......@@ -129,8 +130,9 @@ func TestMonitorGames(t *testing.T) {
cancel()
}()
err := monitor.MonitorGames(ctx)
require.NoError(t, err)
monitor.StartMonitoring()
<-ctx.Done()
monitor.StopMonitoring()
require.NotEmpty(t, sched.scheduled) // We might get more than one update scheduled.
require.Equal(t, []common.Address{addr1, addr2}, sched.scheduled[0])
})
......
This diff is collapsed.
package metrics
import (
"context"
"io"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
......@@ -19,6 +19,8 @@ type Metricer interface {
RecordInfo(version string)
RecordUp()
StartBalanceMetrics(l log.Logger, client *ethclient.Client, account common.Address) io.Closer
// Record Tx metrics
txmetrics.TxMetricer
......@@ -128,17 +130,11 @@ func (m *Metrics) Start(host string, port int) (*httputil.HTTPServer, error) {
}
func (m *Metrics) StartBalanceMetrics(
ctx context.Context,
l log.Logger,
client *ethclient.Client,
account common.Address,
) {
// TODO(7684): util was refactored to close, but ctx is still being used by caller for shutdown
balanceMetric := opmetrics.LaunchBalanceMetrics(l, m.registry, m.ns, client, account)
go func() {
<-ctx.Done()
_ = balanceMetric.Close()
}()
) io.Closer {
return opmetrics.LaunchBalanceMetrics(l, m.registry, m.ns, client, account)
}
// RecordInfo sets a pseudo-metric that contains versioning and
......
package metrics
import (
"io"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
txmetrics "github.com/ethereum-optimism/optimism/op-service/txmgr/metrics"
)
......@@ -8,6 +14,10 @@ type NoopMetricsImpl struct {
txmetrics.NoopTxMetrics
}
// StartBalanceMetrics is the no-op variant: it ignores all of its arguments,
// starts nothing, and returns a nil io.Closer. Callers must therefore check
// the result for nil before calling Close on it.
func (*NoopMetricsImpl) StartBalanceMetrics(l log.Logger, client *ethclient.Client, account common.Address) io.Closer {
	return nil
}
var NoopMetrics Metricer = new(NoopMetricsImpl)
func (*NoopMetricsImpl) RecordInfo(version string) {}
......
......@@ -54,6 +54,8 @@ func (f fakeTxMgr) BlockNumber(_ context.Context) (uint64, error) {
// Send is not supported by this fake: it ignores both arguments and always
// panics with "unimplemented" instead of returning a receipt or an error.
func (f fakeTxMgr) Send(_ context.Context, _ txmgr.TxCandidate) (*types.Receipt, error) {
panic("unimplemented")
}
// Close does nothing on this fake.
// NOTE(review): presumably present so fakeTxMgr satisfies a transaction-manager
// interface that requires Close — confirm against the interface.
func (f fakeTxMgr) Close() {
}
func NewL2Proposer(t Testing, log log.Logger, cfg *ProposerCfg, l1 *ethclient.Client, rollupCl *sources.RollupClient) *L2Proposer {
proposerConfig := proposer.ProposerConfig{
......
......@@ -11,16 +11,19 @@ import (
"testing"
"time"
op_challenger "github.com/ethereum-optimism/optimism/op-challenger"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/log"
challenger "github.com/ethereum-optimism/optimism/op-challenger"
"github.com/ethereum-optimism/optimism/op-challenger/config"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/wait"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
type Helper struct {
......@@ -28,8 +31,7 @@ type Helper struct {
t *testing.T
require *require.Assertions
dir string
cancel func()
errors chan error
chl cliapp.Lifecycle
}
type Option func(config2 *config.Config)
......@@ -127,20 +129,16 @@ func NewChallenger(t *testing.T, ctx context.Context, l1Endpoint string, name st
log := testlog.Logger(t, log.LvlDebug).New("role", name)
log.Info("Creating challenger", "l1", l1Endpoint)
cfg := NewChallengerConfig(t, l1Endpoint, options...)
chl, err := challenger.Main(ctx, log, cfg)
require.NoError(t, err, "must init challenger")
require.NoError(t, chl.Start(ctx), "must start challenger")
errCh := make(chan error, 1)
ctx, cancel := context.WithCancel(ctx)
go func() {
defer close(errCh)
errCh <- op_challenger.Main(ctx, log, cfg)
}()
return &Helper{
log: log,
t: t,
require: require.New(t),
dir: cfg.Datadir,
cancel: cancel,
errors: errCh,
chl: chl,
}
}
......@@ -179,16 +177,9 @@ func NewChallengerConfig(t *testing.T, l1Endpoint string, options ...Option) *co
}
func (h *Helper) Close() error {
h.cancel()
select {
case <-time.After(1 * time.Minute):
return errors.New("timed out while stopping challenger")
case err := <-h.errors:
if !errors.Is(err, context.Canceled) {
return err
}
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
return h.chl.Stop(ctx)
}
type GameAddr interface {
......
......@@ -174,7 +174,7 @@ func (n *OpNode) initL1(ctx context.Context, cfg *Config) error {
if err != nil {
n.log.Warn("resubscribing after failed L1 subscription", "err", err)
}
return eth.WatchHeadChanges(n.resourcesCtx, n.l1Source, n.OnNewL1Head)
return eth.WatchHeadChanges(ctx, n.l1Source, n.OnNewL1Head)
})
go func() {
err, ok := <-n.l1HeadsSub.Err()
......@@ -186,9 +186,9 @@ func (n *OpNode) initL1(ctx context.Context, cfg *Config) error {
// Poll for the safe L1 block and finalized block,
// which only change once per epoch at most and may be delayed.
n.l1SafeSub = eth.PollBlockChanges(n.resourcesCtx, n.log, n.l1Source, n.OnNewL1Safe, eth.Safe,
n.l1SafeSub = eth.PollBlockChanges(n.log, n.l1Source, n.OnNewL1Safe, eth.Safe,
cfg.L1EpochPollInterval, time.Second*10)
n.l1FinalizedSub = eth.PollBlockChanges(n.resourcesCtx, n.log, n.l1Source, n.OnNewL1Finalized, eth.Finalized,
n.l1FinalizedSub = eth.PollBlockChanges(n.log, n.l1Source, n.OnNewL1Finalized, eth.Finalized,
cfg.L1EpochPollInterval, time.Second*10)
return nil
}
......@@ -582,6 +582,14 @@ func (n *OpNode) Stop(ctx context.Context) error {
if n.l1HeadsSub != nil {
n.l1HeadsSub.Unsubscribe()
}
// stop polling for L1 safe-head changes
if n.l1SafeSub != nil {
n.l1SafeSub.Unsubscribe()
}
// stop polling for L1 finalized-head changes
if n.l1FinalizedSub != nil {
n.l1FinalizedSub.Unsubscribe()
}
// close L2 driver
if n.l2Driver != nil {
......
......@@ -279,6 +279,11 @@ func (ps *ProposerService) Stop(ctx context.Context) error {
result = errors.Join(result, fmt.Errorf("failed to close balance metricer: %w", err))
}
}
if ps.TxManager != nil {
ps.TxManager.Close()
}
if ps.metricsSrv != nil {
if err := ps.metricsSrv.Stop(ctx); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop metrics server: %w", err))
......
......@@ -17,7 +17,8 @@ type NewHeadSource interface {
SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error)
}
// WatchHeadChanges wraps a new-head subscription from NewHeadSource to feed the given Tracker
// WatchHeadChanges wraps a new-head subscription from NewHeadSource to feed the given Tracker.
// The ctx is only used to create the subscription, and does not affect the returned subscription.
func WatchHeadChanges(ctx context.Context, src NewHeadSource, fn HeadSignalFn) (ethereum.Subscription, error) {
headChanges := make(chan *types.Header, 10)
sub, err := src.SubscribeNewHead(ctx, headChanges)
......@@ -25,22 +26,33 @@ func WatchHeadChanges(ctx context.Context, src NewHeadSource, fn HeadSignalFn) (
return nil, err
}
return event.NewSubscription(func(quit <-chan struct{}) error {
eventsCtx, eventsCancel := context.WithCancel(context.Background())
defer sub.Unsubscribe()
defer eventsCancel()
// We can handle a quit signal while fn is running, by closing the ctx.
go func() {
select {
case <-quit:
eventsCancel()
case <-eventsCtx.Done(): // don't wait for quit signal if we closed for other reasons.
return
}
}()
for {
select {
case header := <-headChanges:
fn(ctx, L1BlockRef{
fn(eventsCtx, L1BlockRef{
Hash: header.Hash(),
Number: header.Number.Uint64(),
ParentHash: header.ParentHash,
Time: header.Time,
})
case err := <-sub.Err():
return err
case <-ctx.Done():
return ctx.Err()
case <-quit:
case <-eventsCtx.Done():
return nil
case err := <-sub.Err(): // if the underlying subscription fails, stop
return err
}
}
}), nil
......@@ -53,7 +65,7 @@ type L1BlockRefsSource interface {
// PollBlockChanges opens a polling loop to fetch the L1 block reference with the given label,
// on provided interval and with request timeout. Results are returned with provided callback fn,
// which may block to pause/back-pressure polling.
func PollBlockChanges(ctx context.Context, log log.Logger, src L1BlockRefsSource, fn HeadSignalFn,
func PollBlockChanges(log log.Logger, src L1BlockRefsSource, fn HeadSignalFn,
label BlockLabel, interval time.Duration, timeout time.Duration) ethereum.Subscription {
return event.NewSubscription(func(quit <-chan struct{}) error {
if interval <= 0 {
......@@ -61,22 +73,32 @@ func PollBlockChanges(ctx context.Context, log log.Logger, src L1BlockRefsSource
<-quit
return nil
}
eventsCtx, eventsCancel := context.WithCancel(context.Background())
defer eventsCancel()
// We can handle a quit signal while fn is running, by closing the ctx.
go func() {
select {
case <-quit:
eventsCancel()
case <-eventsCtx.Done(): // don't wait for quit signal if we closed for other reasons.
return
}
}()
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
reqCtx, reqCancel := context.WithTimeout(ctx, timeout)
reqCtx, reqCancel := context.WithTimeout(eventsCtx, timeout)
ref, err := src.L1BlockRefByLabel(reqCtx, label)
reqCancel()
if err != nil {
log.Warn("failed to poll L1 block", "label", label, "err", err)
} else {
fn(ctx, ref)
fn(eventsCtx, ref)
}
case <-ctx.Done():
return ctx.Err()
case <-quit:
case <-eventsCtx.Done():
return nil
}
}
......
......@@ -43,6 +43,11 @@ func (_m *TxManager) BlockNumber(ctx context.Context) (uint64, error) {
return r0, r1
}
// Close provides a mock function with given fields:
// It takes no arguments and returns nothing; the call is forwarded to
// _m.Called() with no arguments.
// NOTE(review): this looks like generated mock code — if so, regenerate rather
// than hand-edit.
func (_m *TxManager) Close() {
_m.Called()
}
// From provides a mock function with given fields:
func (_m *TxManager) From() common.Address {
ret := _m.Called()
......
......@@ -49,6 +49,9 @@ type TxManager interface {
// BlockNumber returns the most recent block number from the underlying network.
BlockNumber(ctx context.Context) (uint64, error)
// Close the underlying connection
Close()
}
// ETHBackend is the set of methods that the transaction manager uses to resubmit gas & determine
......@@ -80,6 +83,8 @@ type ETHBackend interface {
// EstimateGas returns an estimate of the amount of gas needed to execute the given
// transaction against the current pending block.
EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error)
// Close the underlying eth connection
Close()
}
// SimpleTxManager is a implementation of TxManager that performs linear fee
......@@ -131,6 +136,10 @@ func (m *SimpleTxManager) BlockNumber(ctx context.Context) (uint64, error) {
return m.backend.BlockNumber(ctx)
}
// Close closes the transaction manager by calling Close on its backend.
// It returns nothing, so any failure inside the backend's Close is not
// reported to the caller.
func (m *SimpleTxManager) Close() {
m.backend.Close()
}
// TxCandidate is a transaction candidate that can be submitted to ask the
// [TxManager] to construct a transaction with gas price bounds.
type TxCandidate struct {
......
......@@ -261,6 +261,9 @@ func (b *mockBackend) TransactionReceipt(ctx context.Context, txHash common.Hash
}, nil
}
// Close does nothing on this mock backend.
// NOTE(review): presumably present so mockBackend satisfies a backend
// interface that requires Close — confirm against the interface.
func (b *mockBackend) Close() {
}
// TestTxMgrConfirmAtMinGasPrice asserts that Send returns the min gas price tx
// if the tx is mined instantly.
func TestTxMgrConfirmAtMinGasPrice(t *testing.T) {
......@@ -755,6 +758,9 @@ func (b *failingBackend) ChainID(ctx context.Context) (*big.Int, error) {
return nil, errors.New("unimplemented")
}
// Close does nothing on this failing backend.
// NOTE(review): presumably present so failingBackend satisfies a backend
// interface that requires Close — confirm against the interface.
func (b *failingBackend) Close() {
}
// TestWaitMinedReturnsReceiptAfterFailure asserts that WaitMined is able to
// recover from failed calls to the backend. It uses the failedBackend to
// simulate an rpc call failure, followed by the successful return of a receipt.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment