Commit aa28bcfd authored by Francis Li, committed by GitHub

change health monitor to return error instead of bool (#9198)

parent 6ffd27fe
......@@ -30,9 +30,10 @@ import (
)
var (
ErrResumeTimeout = errors.New("timeout to resume conductor")
ErrPauseTimeout = errors.New("timeout to pause conductor")
ErrUnsafeHeadMismarch = errors.New("unsafe head mismatch")
ErrResumeTimeout = errors.New("timeout to resume conductor")
ErrPauseTimeout = errors.New("timeout to pause conductor")
ErrUnsafeHeadMismarch = errors.New("unsafe head mismatch")
ErrUnableToRetrieveUnsafeHeadFromConsensus = errors.New("unable to retrieve unsafe head from consensus")
)
// New creates a new OpConductor instance.
......@@ -248,10 +249,11 @@ type OpConductor struct {
hmon health.HealthMonitor
leader atomic.Bool
healthy atomic.Bool
seqActive atomic.Bool
healthy atomic.Bool
hcerr error // error from health check
healthUpdateCh <-chan bool
healthUpdateCh <-chan error
leaderUpdateCh <-chan bool
actionFn func() // actionFn defines the action to be executed to bring the sequencer to the desired state.
......@@ -469,15 +471,21 @@ func (oc *OpConductor) handleLeaderUpdate(leader bool) {
}
// handleHealthUpdate handles health update from health monitor.
func (oc *OpConductor) handleHealthUpdate(healthy bool) {
func (oc *OpConductor) handleHealthUpdate(hcerr error) {
healthy := hcerr == nil
if !healthy {
oc.log.Error("Sequencer is unhealthy", "server", oc.cons.ServerID())
oc.log.Error("Sequencer is unhealthy", "server", oc.cons.ServerID(), "err", hcerr)
// always queue an action if it's unhealthy; it could be a no-op in the handler.
oc.queueAction()
}
if healthy != oc.healthy.Load() {
oc.healthy.Store(healthy)
// queue an action if health status changed.
oc.queueAction()
}
oc.healthy.Store(healthy)
oc.hcerr = hcerr
}
// action tries to bring the sequencer to the desired state, a retry will be queued if any action failed.
......@@ -572,13 +580,40 @@ func (oc *OpConductor) startSequencer() error {
// When starting sequencer, we need to make sure that the current node has the latest unsafe head from the consensus protocol
// If not, then we wait for the unsafe head to catch up or gossip it to op-node manually from op-conductor.
unsafeInCons, unsafeInNode, err := oc.compareUnsafeHead(ctx)
// if there's a mismatch, try to post the unsafe head to op-node
if err != nil {
if errors.Is(err, ErrUnsafeHeadMismarch) && uint64(unsafeInCons.ExecutionPayload.BlockNumber)-unsafeInNode.NumberU64() == 1 {
// tries to post the unsafe head to op-node when head is only 1 block behind (most likely due to gossip delay)
if innerErr := oc.ctrl.PostUnsafePayload(ctx, unsafeInCons); innerErr != nil {
oc.log.Error("failed to post unsafe head payload envelope to op-node", "err", innerErr)
}
}
return err
}
if err = oc.ctrl.StartSequencer(ctx, unsafeInCons.ExecutionPayload.BlockHash); err != nil {
// cannot directly compare using errors.Is because the error is returned from a JSON-RPC server, which loses the error's type.
if !strings.Contains(err.Error(), driver.ErrSequencerAlreadyStarted.Error()) {
return fmt.Errorf("failed to start sequencer: %w", err)
} else {
oc.log.Warn("sequencer already started.", "err", err)
}
}
oc.seqActive.Store(true)
return nil
}
func (oc *OpConductor) compareUnsafeHead(ctx context.Context) (*eth.ExecutionPayloadEnvelope, eth.BlockInfo, error) {
unsafeInCons := oc.cons.LatestUnsafePayload()
if unsafeInCons == nil {
return errors.New("failed to get latest unsafe block from consensus")
return nil, nil, ErrUnableToRetrieveUnsafeHeadFromConsensus
}
unsafeInNode, err := oc.ctrl.LatestUnsafeBlock(ctx)
if err != nil {
return errors.Wrap(err, "failed to get latest unsafe block from EL during startSequencer phase")
return unsafeInCons, nil, errors.Wrap(err, "failed to get latest unsafe block from EL during compareUnsafeHead phase")
}
if unsafeInCons.ExecutionPayload.BlockHash != unsafeInNode.Hash() {
......@@ -590,26 +625,10 @@ func (oc *OpConductor) startSequencer() error {
"node_block_num", unsafeInNode.NumberU64(),
)
if uint64(unsafeInCons.ExecutionPayload.BlockNumber)-unsafeInNode.NumberU64() == 1 {
// tries to post the unsafe head to op-node when head is only 1 block behind (most likely due to gossip delay)
if err = oc.ctrl.PostUnsafePayload(ctx, unsafeInCons); err != nil {
oc.log.Error("failed to post unsafe head payload envelope to op-node", "err", err)
}
}
return ErrUnsafeHeadMismarch // return error to allow retry
return unsafeInCons, unsafeInNode, ErrUnsafeHeadMismarch
}
if err = oc.ctrl.StartSequencer(ctx, unsafeInCons.ExecutionPayload.BlockHash); err != nil {
// cannot directly compare using errors.Is because the error is returned from a JSON-RPC server, which loses the error's type.
if !strings.Contains(err.Error(), driver.ErrSequencerAlreadyStarted.Error()) {
return fmt.Errorf("failed to start sequencer: %w", err)
} else {
oc.log.Warn("sequencer already started.", "err", err)
}
}
oc.seqActive.Store(true)
return nil
return unsafeInCons, unsafeInNode, nil
}
func (oc *OpConductor) updateSequencerActiveStatus() error {
......
......@@ -17,6 +17,7 @@ import (
clientmocks "github.com/ethereum-optimism/optimism/op-conductor/client/mocks"
consensusmocks "github.com/ethereum-optimism/optimism/op-conductor/consensus/mocks"
"github.com/ethereum-optimism/optimism/op-conductor/health"
healthmocks "github.com/ethereum-optimism/optimism/op-conductor/health/mocks"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
......@@ -80,7 +81,7 @@ type OpConductorTestSuite struct {
conductor *OpConductor
healthUpdateCh chan bool
healthUpdateCh chan error
leaderUpdateCh chan bool
ctx context.Context
......@@ -114,7 +115,7 @@ func (s *OpConductorTestSuite) SetupTest() {
s.NoError(err)
s.conductor = conductor
s.healthUpdateCh = make(chan bool)
s.healthUpdateCh = make(chan error)
s.hmon.EXPECT().Start().Return(nil)
s.conductor.healthUpdateCh = s.healthUpdateCh
......@@ -153,7 +154,14 @@ func (s *OpConductorTestSuite) execute(fn func()) {
s.wg.Wait()
}
func (s *OpConductorTestSuite) updateStatusAndExecuteAction(ch chan bool, status bool) {
func (s *OpConductorTestSuite) updateLeaderStatusAndExecuteAction(ch chan bool, status bool) {
fn := func() {
ch <- status
}
s.execute(fn)
}
func (s *OpConductorTestSuite) updateHealthStatusAndExecuteAction(ch chan error, status error) {
fn := func() {
ch <- status
}
......@@ -172,7 +180,7 @@ func (s *OpConductorTestSuite) TestControlLoop1() {
s.True(s.conductor.Paused())
// Send health update, make sure it can still be consumed.
s.healthUpdateCh <- true
s.healthUpdateCh <- nil
// Resume
s.ctrl.EXPECT().SequencerActive(mock.Anything).Return(false, nil)
......@@ -247,7 +255,7 @@ func (s *OpConductorTestSuite) TestScenario1() {
s.cons.EXPECT().TransferLeader().Return(nil)
// become leader
s.updateStatusAndExecuteAction(s.leaderUpdateCh, true)
s.updateLeaderStatusAndExecuteAction(s.leaderUpdateCh, true)
// expect to transfer leadership, go back to [follower, not healthy, not sequencing]
s.False(s.conductor.leader.Load())
......@@ -267,7 +275,7 @@ func (s *OpConductorTestSuite) TestScenario2() {
s.conductor.seqActive.Store(false)
// become healthy
s.updateStatusAndExecuteAction(s.healthUpdateCh, true)
s.updateHealthStatusAndExecuteAction(s.healthUpdateCh, nil)
// expect to stay as follower, go to [follower, healthy, not sequencing]
s.False(s.conductor.leader.Load())
......@@ -302,7 +310,7 @@ func (s *OpConductorTestSuite) TestScenario3() {
s.False(s.conductor.seqActive.Load())
// become leader
s.updateStatusAndExecuteAction(s.leaderUpdateCh, true)
s.updateLeaderStatusAndExecuteAction(s.leaderUpdateCh, true)
// [leader, healthy, sequencing]
s.True(s.conductor.leader.Load())
......@@ -335,7 +343,7 @@ func (s *OpConductorTestSuite) TestScenario4() {
s.ctrl.EXPECT().LatestUnsafeBlock(mock.Anything).Return(mockBlockInfo, nil).Times(1)
s.ctrl.EXPECT().PostUnsafePayload(mock.Anything, mock.Anything).Return(nil).Times(1)
s.updateStatusAndExecuteAction(s.leaderUpdateCh, true)
s.updateLeaderStatusAndExecuteAction(s.leaderUpdateCh, true)
// [leader, healthy, not sequencing]
s.True(s.conductor.leader.Load())
......@@ -376,7 +384,7 @@ func (s *OpConductorTestSuite) TestScenario5() {
s.conductor.seqActive.Store(false)
// become unhealthy
s.updateStatusAndExecuteAction(s.healthUpdateCh, false)
s.updateHealthStatusAndExecuteAction(s.healthUpdateCh, health.ErrSequencerNotHealthy)
// expect to stay as follower, go to [follower, not healthy, not sequencing]
s.False(s.conductor.leader.Load())
......@@ -397,7 +405,7 @@ func (s *OpConductorTestSuite) TestScenario6() {
s.ctrl.EXPECT().StopSequencer(mock.Anything).Return(common.Hash{}, nil).Times(1)
// step down as leader
s.updateStatusAndExecuteAction(s.leaderUpdateCh, false)
s.updateLeaderStatusAndExecuteAction(s.leaderUpdateCh, false)
// expect to stay as follower, go to [follower, healthy, not sequencing]
s.False(s.conductor.leader.Load())
......@@ -421,7 +429,7 @@ func (s *OpConductorTestSuite) TestScenario7() {
s.ctrl.EXPECT().StopSequencer(mock.Anything).Return(common.Hash{}, nil).Times(1)
// become unhealthy
s.updateStatusAndExecuteAction(s.healthUpdateCh, false)
s.updateHealthStatusAndExecuteAction(s.healthUpdateCh, health.ErrSequencerNotHealthy)
// expect to step down as leader and stop sequencing
s.False(s.conductor.leader.Load())
......@@ -451,7 +459,7 @@ func (s *OpConductorTestSuite) TestFailureAndRetry1() {
s.cons.EXPECT().TransferLeader().Return(err).Times(1)
s.ctrl.EXPECT().StopSequencer(mock.Anything).Return(common.Hash{}, err).Times(1)
s.updateStatusAndExecuteAction(s.healthUpdateCh, false)
s.updateHealthStatusAndExecuteAction(s.healthUpdateCh, health.ErrSequencerNotHealthy)
s.True(s.conductor.leader.Load())
s.False(s.conductor.healthy.Load())
......@@ -503,7 +511,7 @@ func (s *OpConductorTestSuite) TestFailureAndRetry2() {
s.cons.EXPECT().TransferLeader().Return(nil).Times(1)
s.ctrl.EXPECT().StopSequencer(mock.Anything).Return(common.Hash{}, err).Times(1)
s.updateStatusAndExecuteAction(s.healthUpdateCh, false)
s.updateHealthStatusAndExecuteAction(s.healthUpdateCh, health.ErrSequencerNotHealthy)
s.False(s.conductor.leader.Load())
s.False(s.conductor.healthy.Load())
......
......@@ -108,19 +108,19 @@ func (_c *HealthMonitor_Stop_Call) RunAndReturn(run func() error) *HealthMonitor
}
// Subscribe provides a mock function with given fields:
func (_m *HealthMonitor) Subscribe() <-chan bool {
func (_m *HealthMonitor) Subscribe() <-chan error {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for Subscribe")
}
var r0 <-chan bool
if rf, ok := ret.Get(0).(func() <-chan bool); ok {
var r0 <-chan error
if rf, ok := ret.Get(0).(func() <-chan error); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan bool)
r0 = ret.Get(0).(<-chan error)
}
}
......@@ -144,12 +144,12 @@ func (_c *HealthMonitor_Subscribe_Call) Run(run func()) *HealthMonitor_Subscribe
return _c
}
func (_c *HealthMonitor_Subscribe_Call) Return(_a0 <-chan bool) *HealthMonitor_Subscribe_Call {
func (_c *HealthMonitor_Subscribe_Call) Return(_a0 <-chan error) *HealthMonitor_Subscribe_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *HealthMonitor_Subscribe_Call) RunAndReturn(run func() <-chan bool) *HealthMonitor_Subscribe_Call {
func (_c *HealthMonitor_Subscribe_Call) RunAndReturn(run func() <-chan error) *HealthMonitor_Subscribe_Call {
_c.Call.Return(run)
return _c
}
......
......@@ -2,6 +2,7 @@ package health
import (
"context"
"errors"
"sync"
"time"
......@@ -12,12 +13,17 @@ import (
"github.com/ethereum-optimism/optimism/op-service/dial"
)
var (
ErrSequencerNotHealthy = errors.New("sequencer is not healthy")
ErrSequencerConnectionDown = errors.New("cannot connect to sequencer rpc endpoints")
)
// HealthMonitor defines the interface for monitoring the health of the sequencer.
//
//go:generate mockery --name HealthMonitor --output mocks/ --with-expecter=true
type HealthMonitor interface {
// Subscribe returns a channel that will be notified for every health check.
Subscribe() <-chan bool
Subscribe() <-chan error
// Start starts the health check.
Start() error
// Stop stops the health check.
......@@ -33,7 +39,7 @@ func NewSequencerHealthMonitor(log log.Logger, interval, unsafeInterval, safeInt
log: log,
done: make(chan struct{}),
interval: interval,
healthUpdateCh: make(chan bool),
healthUpdateCh: make(chan error),
rollupCfg: rollupCfg,
unsafeInterval: unsafeInterval,
safeInterval: safeInterval,
......@@ -54,7 +60,7 @@ type SequencerHealthMonitor struct {
safeInterval uint64
minPeerCount uint64
interval uint64
healthUpdateCh chan bool
healthUpdateCh chan error
lastSeenUnsafeNum uint64
lastSeenUnsafeTime uint64
......@@ -85,7 +91,7 @@ func (hm *SequencerHealthMonitor) Stop() error {
}
// Subscribe implements HealthMonitor.
func (hm *SequencerHealthMonitor) Subscribe() <-chan bool {
func (hm *SequencerHealthMonitor) Subscribe() <-chan error {
return hm.healthUpdateCh
}
......@@ -111,12 +117,12 @@ func (hm *SequencerHealthMonitor) loop() {
// 2. unsafe head is not too far behind now (measured by unsafeInterval)
// 3. safe head is progressing every configured batch submission interval
// 4. peer count is above the configured minimum
func (hm *SequencerHealthMonitor) healthCheck() bool {
func (hm *SequencerHealthMonitor) healthCheck() error {
ctx := context.Background()
status, err := hm.node.SyncStatus(ctx)
if err != nil {
hm.log.Error("health monitor failed to get sync status", "err", err)
return false
return ErrSequencerConnectionDown
}
now := uint64(time.Now().Unix())
......@@ -135,7 +141,7 @@ func (hm *SequencerHealthMonitor) healthCheck() bool {
"last_seen_unsafe_time", hm.lastSeenUnsafeTime,
"unsafe_interval", hm.unsafeInterval,
)
return false
return ErrSequencerNotHealthy
}
}
if status.UnsafeL2.Number > hm.lastSeenUnsafeNum {
......@@ -151,7 +157,7 @@ func (hm *SequencerHealthMonitor) healthCheck() bool {
"unsafe_head_time", status.UnsafeL2.Time,
"unsafe_interval", hm.unsafeInterval,
)
return false
return ErrSequencerNotHealthy
}
if now-status.SafeL2.Time > hm.safeInterval {
......@@ -162,18 +168,18 @@ func (hm *SequencerHealthMonitor) healthCheck() bool {
"safe_head_time", status.SafeL2.Time,
"safe_interval", hm.safeInterval,
)
return false
return ErrSequencerNotHealthy
}
stats, err := hm.p2p.PeerStats(ctx)
if err != nil {
hm.log.Error("health monitor failed to get peer stats", "err", err)
return false
return ErrSequencerConnectionDown
}
if uint64(stats.Connected) < hm.minPeerCount {
hm.log.Error("peer count is below minimum", "connected", stats.Connected, "minPeerCount", hm.minPeerCount)
return false
return ErrSequencerNotHealthy
}
return true
return nil
}
......@@ -80,7 +80,7 @@ func (s *HealthMonitorTestSuite) TestUnhealthyLowPeerCount() {
healthUpdateCh := s.monitor.Subscribe()
healthy := <-healthUpdateCh
s.False(healthy)
s.NotNil(healthy)
}
func (s *HealthMonitorTestSuite) TestUnhealthyUnsafeHeadNotProgressing() {
......@@ -108,9 +108,9 @@ func (s *HealthMonitorTestSuite) TestUnhealthyUnsafeHeadNotProgressing() {
for i := 0; i < 3; i++ {
healthy := <-healthUpdateCh
if i < 2 {
s.True(healthy)
s.Nil(healthy)
} else {
s.False(healthy)
s.NotNil(healthy)
}
}
}
......@@ -143,9 +143,9 @@ func (s *HealthMonitorTestSuite) TestUnhealthySafeHeadNotProgressing() {
for i := 0; i < 6; i++ {
healthy := <-healthUpdateCh
if i < 5 {
s.True(healthy)
s.Nil(healthy)
} else {
s.False(healthy)
s.NotNil(healthy)
}
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment