Commit 9759ad46 authored by Adrian Sutton

op-e2e: Automatically close System at the end of the test

parent 65d58130
@@ -27,6 +27,10 @@ import (
 	"github.com/ethereum-optimism/optimism/op-service/txmgr"
 )
 
+var (
+	ErrAlreadyStopped = errors.New("already stopped")
+)
+
 type BatcherConfig struct {
 	NetworkTimeout time.Duration
 	PollInterval   time.Duration
@@ -285,7 +289,7 @@ func (bs *BatcherService) Kill() error {
 // If the provided ctx is cancelled, the stopping is forced, i.e. the batching work is killed non-gracefully.
 func (bs *BatcherService) Stop(ctx context.Context) error {
 	if bs.stopped.Load() {
-		return errors.New("already stopped")
+		return ErrAlreadyStopped
 	}
 
 	bs.Log.Info("Stopping batcher")
...
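The point of the hunk above is to replace an inline errors.New with an exported sentinel, so that callers (the e2e System.Close further down) can use errors.Is to treat a second Stop or Kill as benign rather than as a test failure. A minimal sketch of that pattern, using a hypothetical Worker type rather than the real BatcherService:

// Sketch only: a hypothetical Worker illustrating the exported-sentinel
// pattern; it is not the real BatcherService.
package worker

import (
	"errors"
	"sync/atomic"
)

var ErrAlreadyStopped = errors.New("already stopped")

type Worker struct {
	stopped atomic.Bool
}

func (w *Worker) Stop() error {
	if w.stopped.Load() {
		return ErrAlreadyStopped
	}
	w.stopped.Store(true)
	// ... release resources here ...
	return nil
}

A caller can then ignore the benign double-stop with errors.Is(err, worker.ErrAlreadyStopped) and surface everything else, which is exactly the shape System.Close uses for the batcher, proposer, and rollup nodes below.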
@@ -12,6 +12,7 @@ import (
 	"path"
 	"sort"
 	"strings"
+	"sync/atomic"
 	"testing"
 	"time"
@@ -274,6 +275,9 @@ type System struct {
 	// Note that this time travel may occur in a single block, creating a very large difference in the Time
 	// on sequential blocks.
 	TimeTravelClock *clock.AdvancingClock
+
+	t      *testing.T
+	closed atomic.Bool
 }
 
 func (sys *System) NodeEndpoint(name string) string {
@@ -281,23 +285,41 @@ func (sys *System) NodeEndpoint(name string) string {
 }
 
 func (sys *System) Close() {
+	if !sys.closed.CompareAndSwap(false, true) {
+		// Already closed.
+		return
+	}
 	postCtx, postCancel := context.WithCancel(context.Background())
 	postCancel() // immediate shutdown, no allowance for idling
 
+	var combinedErr error
 	if sys.L2OutputSubmitter != nil {
-		_ = sys.L2OutputSubmitter.Kill()
+		if err := sys.L2OutputSubmitter.Kill(); err != nil && !errors.Is(err, l2os.ErrAlreadyStopped) {
+			combinedErr = errors.Join(combinedErr, fmt.Errorf("stop L2OutputSubmitter: %w", err))
+		}
 	}
 	if sys.BatchSubmitter != nil {
-		_ = sys.BatchSubmitter.Kill()
+		if err := sys.BatchSubmitter.Kill(); err != nil && !errors.Is(err, bss.ErrAlreadyStopped) {
+			combinedErr = errors.Join(combinedErr, fmt.Errorf("stop BatchSubmitter: %w", err))
+		}
 	}
 
-	for _, node := range sys.RollupNodes {
-		_ = node.Stop(postCtx)
+	for name, node := range sys.RollupNodes {
+		if err := node.Stop(postCtx); err != nil && !errors.Is(err, rollupNode.ErrAlreadyClosed) {
+			combinedErr = errors.Join(combinedErr, fmt.Errorf("stop rollup node %v: %w", name, err))
+		}
 	}
-	for _, ei := range sys.EthInstances {
-		ei.Close()
+	for name, ei := range sys.EthInstances {
+		if err := ei.Close(); err != nil && !errors.Is(err, node.ErrNodeStopped) {
+			combinedErr = errors.Join(combinedErr, fmt.Errorf("stop EthInstance %v: %w", name, err))
+		}
	}
-	sys.Mocknet.Close()
+	if sys.Mocknet != nil {
+		if err := sys.Mocknet.Close(); err != nil {
+			combinedErr = errors.Join(combinedErr, fmt.Errorf("stop Mocknet: %w", err))
+		}
+	}
+	require.NoError(sys.t, combinedErr, "Failed to stop system")
 }
 
 type systemConfigHook func(sCfg *SystemConfig, s *System)
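Close now follows a collect, filter, assert shape: every component is shut down, the expected "already stopped/closed" sentinels are filtered out with errors.Is, any remaining failure is accumulated with errors.Join (Go 1.20+), and the combined error fails the test once at the end via require.NoError. A condensed, hypothetical version of that shape (closer and closeAll are illustrative names, not op-e2e APIs):

// Illustrative helper, not op-e2e code: close several components, ignore an
// expected sentinel, and join the remaining failures into a single error.
package e2eutil

import (
	"errors"
	"fmt"
)

type closer struct {
	name  string
	close func() error
}

func closeAll(expected error, components []closer) error {
	var combined error
	for _, c := range components {
		if err := c.close(); err != nil && !errors.Is(err, expected) {
			combined = errors.Join(combined, fmt.Errorf("stop %s: %w", c.name, err))
		}
	}
	return combined // nil if everything stopped cleanly or was already stopped
}

Accumulating instead of returning early matters here: a failure in one component should not prevent the remaining ones from being shut down, or the test process would leak goroutines and ports.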
@@ -343,20 +365,10 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*System, error) {
 		Clients:     make(map[string]*ethclient.Client),
 		RawClients:  make(map[string]*rpc.Client),
 		RollupNodes: make(map[string]*rollupNode.OpNode),
+		t:           t,
 	}
-	didErrAfterStart := false
-	defer func() {
-		if didErrAfterStart {
-			postCtx, postCancel := context.WithCancel(context.Background())
-			postCancel() // immediate shutdown, no allowance for idling
-			for _, node := range sys.RollupNodes {
-				_ = node.Stop(postCtx)
-			}
-			for _, ei := range sys.EthInstances {
-				ei.Close()
-			}
-		}
-	}()
+	// Automatically stop the system at the end of the test
+	t.Cleanup(sys.Close)
 
 	c := clock.SystemClock
 	if cfg.SupportL1TimeTravel {
@@ -471,7 +483,6 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*System, error) {
 	}
 	err = l1Node.Start()
 	if err != nil {
-		didErrAfterStart = true
 		return nil, err
 	}
@@ -488,7 +499,6 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*System, error) {
 	}
 	err = gethInst.Node.Start()
 	if err != nil {
-		didErrAfterStart = true
 		return nil, err
 	}
 	ethClient = gethInst
@@ -524,7 +534,6 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*System, error) {
 	defer cancel()
 	l1Srv, err := l1Node.RPCHandler()
 	if err != nil {
-		didErrAfterStart = true
 		return nil, err
 	}
 	rawL1Client := rpc.DialInProc(l1Srv)
@@ -534,7 +543,6 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*System, error) {
 	for name, ethInst := range sys.EthInstances {
 		rawClient, err := rpc.DialContext(ctx, ethInst.WSEndpoint())
 		if err != nil {
-			didErrAfterStart = true
 			return nil, err
 		}
 		client := ethclient.NewClient(rawClient)
@@ -641,13 +649,11 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*System, error) {
 	}
 	node, err := rollupNode.New(context.Background(), &c, l, snapLog, "", metrics.NewMetrics(""))
 	if err != nil {
-		didErrAfterStart = true
 		return nil, err
 	}
 	cycle = node
 	err = node.Start(context.Background())
 	if err != nil {
-		didErrAfterStart = true
 		return nil, err
 	}
 	sys.RollupNodes[name] = node
...
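Because Start registers t.Cleanup(sys.Close) as soon as the System struct exists, every early "return nil, err" path is covered and the per-call-site didErrAfterStart bookkeeping can be deleted; the CompareAndSwap guard keeps an explicit Close in a test body harmless. A hedged sketch of the resulting test shape (DefaultSystemConfig is assumed to be the usual op-e2e config helper, and op_e2e the package name):

// Sketch of a test relying on the automatic cleanup; not taken from the diff.
package op_e2e

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestSystemStartsAndStops(t *testing.T) {
	cfg := DefaultSystemConfig(t)
	sys, err := cfg.Start(t)
	require.NoError(t, err)

	// ... exercise sys.Clients, sys.RollupNodes, etc. ...

	// Optional: an explicit Close stays safe; the closed flag makes the
	// later t.Cleanup(sys.Close) invocation a no-op.
	sys.Close()
}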
@@ -30,6 +30,10 @@ import (
 	"github.com/ethereum-optimism/optimism/op-service/sources"
 )
 
+var (
+	ErrAlreadyClosed = errors.New("node is already closed")
+)
+
 type OpNode struct {
 	log        log.Logger
 	appVersion string
@@ -553,7 +557,7 @@ func (n *OpNode) RuntimeConfig() ReadonlyRuntimeConfig {
 // If the provided ctx is expired, the node will accelerate the stop where possible, but still fully close.
 func (n *OpNode) Stop(ctx context.Context) error {
 	if n.closed.Load() {
-		return errors.New("node is already closed")
+		return ErrAlreadyClosed
 	}
 
 	var result *multierror.Error
...
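One detail worth noting: System.Close applies errors.Is to the raw error before wrapping it, but the sentinel would remain detectable even after fmt.Errorf("...: %w", err) or errors.Join, because errors.Is traverses the wrap chain. A small standalone illustration (not part of the diff):

// Standalone demo: errors.Is still finds a sentinel through a %w wrap.
package main

import (
	"errors"
	"fmt"
)

var ErrAlreadyClosed = errors.New("node is already closed")

func main() {
	wrapped := fmt.Errorf("stop rollup node %v: %w", "verifier", ErrAlreadyClosed)
	fmt.Println(errors.Is(wrapped, ErrAlreadyClosed)) // prints: true
}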
@@ -26,6 +26,10 @@ import (
 	"github.com/ethereum/go-ethereum/log"
 )
 
+var (
+	ErrAlreadyStopped = errors.New("already stopped")
+)
+
 type ProposerConfig struct {
 	// How frequently to poll L2 for new finalized outputs
 	PollInterval time.Duration
@@ -252,7 +256,7 @@ func (ps *ProposerService) Kill() error {
 // See driver.StopL2OutputSubmitting to temporarily stop the L2Output submitter.
 func (ps *ProposerService) Stop(ctx context.Context) error {
 	if ps.stopped.Load() {
-		return errors.New("already stopped")
+		return ErrAlreadyStopped
 	}
 
 	ps.Log.Info("Stopping Proposer")
...