Commit 9147c6e9 authored by Brian Bland's avatar Brian Bland Committed by GitHub

op-conductor: Better context management, graceful shutdown (#10804)

parent 71b93116
......@@ -311,7 +311,7 @@ var _ cliapp.Lifecycle = (*OpConductor)(nil)
func (oc *OpConductor) Start(ctx context.Context) error {
oc.log.Info("starting OpConductor")
if err := oc.hmon.Start(); err != nil {
if err := oc.hmon.Start(ctx); err != nil {
return errors.Wrap(err, "failed to start health monitor")
}
......
......@@ -122,7 +122,7 @@ func (s *OpConductorTestSuite) SetupTest() {
s.conductor = conductor
s.healthUpdateCh = make(chan error, 1)
s.hmon.EXPECT().Start().Return(nil)
s.hmon.EXPECT().Start(mock.Anything).Return(nil)
s.conductor.healthUpdateCh = s.healthUpdateCh
s.leaderUpdateCh = make(chan bool, 1)
......
......@@ -2,7 +2,11 @@
package mocks
import mock "github.com/stretchr/testify/mock"
import (
context "context"
mock "github.com/stretchr/testify/mock"
)
// HealthMonitor is an autogenerated mock type for the HealthMonitor type
type HealthMonitor struct {
......@@ -17,17 +21,17 @@ func (_m *HealthMonitor) EXPECT() *HealthMonitor_Expecter {
return &HealthMonitor_Expecter{mock: &_m.Mock}
}
// Start provides a mock function with given fields:
func (_m *HealthMonitor) Start() error {
ret := _m.Called()
// Start provides a mock function with given fields: ctx
func (_m *HealthMonitor) Start(ctx context.Context) error {
ret := _m.Called(ctx)
if len(ret) == 0 {
panic("no return value specified for Start")
}
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(ctx)
} else {
r0 = ret.Error(0)
}
......@@ -41,13 +45,14 @@ type HealthMonitor_Start_Call struct {
}
// Start is a helper method to define mock.On call
func (_e *HealthMonitor_Expecter) Start() *HealthMonitor_Start_Call {
return &HealthMonitor_Start_Call{Call: _e.mock.On("Start")}
// - ctx context.Context
func (_e *HealthMonitor_Expecter) Start(ctx interface{}) *HealthMonitor_Start_Call {
return &HealthMonitor_Start_Call{Call: _e.mock.On("Start", ctx)}
}
func (_c *HealthMonitor_Start_Call) Run(run func()) *HealthMonitor_Start_Call {
func (_c *HealthMonitor_Start_Call) Run(run func(ctx context.Context)) *HealthMonitor_Start_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
run(args[0].(context.Context))
})
return _c
}
......@@ -57,7 +62,7 @@ func (_c *HealthMonitor_Start_Call) Return(_a0 error) *HealthMonitor_Start_Call
return _c
}
func (_c *HealthMonitor_Start_Call) RunAndReturn(run func() error) *HealthMonitor_Start_Call {
func (_c *HealthMonitor_Start_Call) RunAndReturn(run func(context.Context) error) *HealthMonitor_Start_Call {
_c.Call.Return(run)
return _c
}
......
......@@ -26,7 +26,7 @@ type HealthMonitor interface {
// Subscribe returns a channel that will be notified for every health check.
Subscribe() <-chan error
// Start starts the health check.
Start() error
Start(ctx context.Context) error
// Stop stops the health check.
Stop() error
}
......@@ -39,7 +39,6 @@ func NewSequencerHealthMonitor(log log.Logger, metrics metrics.Metricer, interva
return &SequencerHealthMonitor{
log: log,
metrics: metrics,
done: make(chan struct{}),
interval: interval,
healthUpdateCh: make(chan error),
rollupCfg: rollupCfg,
......@@ -57,7 +56,7 @@ func NewSequencerHealthMonitor(log log.Logger, metrics metrics.Metricer, interva
type SequencerHealthMonitor struct {
log log.Logger
metrics metrics.Metricer
done chan struct{}
cancel context.CancelFunc
wg sync.WaitGroup
rollupCfg *rollup.Config
......@@ -79,10 +78,13 @@ type SequencerHealthMonitor struct {
var _ HealthMonitor = (*SequencerHealthMonitor)(nil)
// Start implements HealthMonitor.
func (hm *SequencerHealthMonitor) Start() error {
func (hm *SequencerHealthMonitor) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
hm.cancel = cancel
hm.log.Info("starting health monitor")
hm.wg.Add(1)
go hm.loop()
go hm.loop(ctx)
hm.log.Info("health monitor started")
return nil
......@@ -91,7 +93,7 @@ func (hm *SequencerHealthMonitor) Start() error {
// Stop implements HealthMonitor.
func (hm *SequencerHealthMonitor) Stop() error {
hm.log.Info("stopping health monitor")
close(hm.done)
hm.cancel()
hm.wg.Wait()
hm.log.Info("health monitor stopped")
......@@ -103,7 +105,7 @@ func (hm *SequencerHealthMonitor) Subscribe() <-chan error {
return hm.healthUpdateCh
}
func (hm *SequencerHealthMonitor) loop() {
func (hm *SequencerHealthMonitor) loop(ctx context.Context) {
defer hm.wg.Done()
duration := time.Duration(hm.interval) * time.Second
......@@ -112,16 +114,16 @@ func (hm *SequencerHealthMonitor) loop() {
for {
select {
case <-hm.done:
case <-ctx.Done():
return
case <-ticker.C:
err := hm.healthCheck()
err := hm.healthCheck(ctx)
hm.metrics.RecordHealthCheck(err == nil, err)
// Ensure that we exit cleanly if told to shutdown while still waiting to publish the health update
select {
case hm.healthUpdateCh <- err:
continue
case <-hm.done:
case <-ctx.Done():
return
}
}
......@@ -133,8 +135,7 @@ func (hm *SequencerHealthMonitor) loop() {
// 2. unsafe head is not too far behind now (measured by unsafeInterval)
// 3. safe head is progressing every configured batch submission interval
// 4. peer count is above the configured minimum
func (hm *SequencerHealthMonitor) healthCheck() error {
ctx := context.Background()
func (hm *SequencerHealthMonitor) healthCheck(ctx context.Context) error {
status, err := hm.node.SyncStatus(ctx)
if err != nil {
hm.log.Error("health monitor failed to get sync status", "err", err)
......
......@@ -6,6 +6,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/ethereum-optimism/optimism/op-conductor/metrics"
......@@ -53,11 +54,10 @@ func (s *HealthMonitorTestSuite) SetupMonitor(
ps1 := &p2p.PeerStats{
Connected: healthyPeerCount,
}
mockP2P.EXPECT().PeerStats(context.Background()).Return(ps1, nil)
mockP2P.EXPECT().PeerStats(mock.Anything).Return(ps1, nil)
}
monitor := &SequencerHealthMonitor{
log: s.log,
done: make(chan struct{}),
interval: s.interval,
metrics: &metrics.NoopMetricsImpl{},
healthUpdateCh: make(chan error),
......@@ -70,7 +70,7 @@ func (s *HealthMonitorTestSuite) SetupMonitor(
node: mockRollupClient,
p2p: mockP2P,
}
err := monitor.Start()
err := monitor.Start(context.Background())
s.NoError(err)
return monitor
}
......@@ -88,7 +88,7 @@ func (s *HealthMonitorTestSuite) TestUnhealthyLowPeerCount() {
ps1 := &p2p.PeerStats{
Connected: unhealthyPeerCount,
}
pc.EXPECT().PeerStats(context.Background()).Return(ps1, nil).Times(1)
pc.EXPECT().PeerStats(mock.Anything).Return(ps1, nil).Times(1)
monitor := s.SetupMonitor(now, 60, 60, rc, pc)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment