Commit 0c0b907b authored by Francis Li's avatar Francis Li Committed by GitHub

[op-conductor] state transition bug fix (#9248)

* Fix state transition bug

* Finish state transition tests

* Fix monitor bugs

* update upon suggestion
parent f0f6aa7c
......@@ -68,7 +68,7 @@ func NewOpConductor(
cons: cons,
hmon: hmon,
}
oc.actionFn = oc.action
oc.loopActionFn = oc.loopAction
// explicitly set all atomic.Bool values
oc.leader.Store(false) // upon start, it should not be the leader unless specified otherwise by raft bootstrap, in that case, it'll receive a leadership update from consensus.
......@@ -90,6 +90,7 @@ func NewOpConductor(
}
return nil, err
}
oc.prevState = NewState(oc.leader.Load(), oc.healthy.Load(), oc.seqActive.Load())
return oc, nil
}
......@@ -252,10 +253,11 @@ type OpConductor struct {
seqActive atomic.Bool
healthy atomic.Bool
hcerr error // error from health check
prevState *state
healthUpdateCh <-chan error
leaderUpdateCh <-chan bool
actionFn func() // actionFn defines the action to be executed to bring the sequencer to the desired state.
loopActionFn func() // loopActionFn defines the logic to be executed inside control loop.
wg sync.WaitGroup
pauseCh chan struct{}
......@@ -271,6 +273,23 @@ type OpConductor struct {
rpcServer *oprpc.Server
}
type state struct {
leader, healthy, active bool
}
// NewState creates a new state instance.
func NewState(leader, healthy, active bool) *state {
return &state{
leader: leader,
healthy: healthy,
active: active,
}
}
func (s *state) Equal(other *state) bool {
return s.leader == other.leader && s.healthy == other.healthy && s.active == other.active
}
var _ cliapp.Lifecycle = (*OpConductor)(nil)
// Start implements cliapp.Lifecycle.
......@@ -430,29 +449,35 @@ func (oc *OpConductor) loop() {
for {
select {
// We process status update (health, leadership) first regardless of the paused state.
// This way we could properly bring the sequencer to the desired state when resumed.
case healthy := <-oc.healthUpdateCh:
oc.handleHealthUpdate(healthy)
case leader := <-oc.leaderUpdateCh:
oc.handleLeaderUpdate(leader)
case <-oc.pauseCh:
oc.paused.Store(true)
oc.pauseDoneCh <- struct{}{}
case <-oc.resumeCh:
oc.paused.Store(false)
oc.resumeDoneCh <- struct{}{}
// queue an action to make sure sequencer is in the desired state after resume.
oc.queueAction()
case <-oc.shutdownCtx.Done():
return
// Handle control action last, so that when executing the action, we have the latest status and bring the sequencer to the desired state.
case <-oc.actionCh:
oc.actionFn()
default:
oc.loopActionFn()
}
}
}
func (oc *OpConductor) loopAction() {
select {
case healthy := <-oc.healthUpdateCh:
oc.handleHealthUpdate(healthy)
case leader := <-oc.leaderUpdateCh:
oc.handleLeaderUpdate(leader)
case <-oc.pauseCh:
oc.paused.Store(true)
oc.pauseDoneCh <- struct{}{}
case <-oc.resumeCh:
oc.paused.Store(false)
oc.resumeDoneCh <- struct{}{}
// queue an action to make sure sequencer is in the desired state after resume.
oc.queueAction()
case <-oc.shutdownCtx.Done():
return
case <-oc.actionCh:
oc.action()
}
}
func (oc *OpConductor) queueAction() {
select {
case oc.actionCh <- struct{}{}:
......@@ -472,6 +497,7 @@ func (oc *OpConductor) handleLeaderUpdate(leader bool) {
// handleHealthUpdate handles health update from health monitor.
func (oc *OpConductor) handleHealthUpdate(hcerr error) {
oc.log.Debug("received health update", "server", oc.cons.ServerID(), "error", hcerr)
healthy := hcerr == nil
if !healthy {
oc.log.Error("Sequencer is unhealthy", "server", oc.cons.ServerID(), "err", hcerr)
......@@ -495,8 +521,11 @@ func (oc *OpConductor) action() {
}
var err error
status := NewState(oc.leader.Load(), oc.healthy.Load(), oc.seqActive.Load())
oc.log.Debug("entering action with status", "status", status)
// exhaust all cases below for completeness, 3 state, 8 cases.
switch status := struct{ leader, healthy, active bool }{oc.leader.Load(), oc.healthy.Load(), oc.seqActive.Load()}; {
switch {
case !status.leader && !status.healthy && !status.active:
// if follower is not healthy and not sequencing, just log an error
oc.log.Error("server (follower) is not healthy", "server", oc.cons.ServerID())
......@@ -509,9 +538,35 @@ func (oc *OpConductor) action() {
// stop sequencer, this happens when current server steps down as leader.
err = oc.stopSequencer()
case status.leader && !status.healthy && !status.active:
// transfer leadership to another node
// There are 2 scenarios we need to handle:
// 1. current node is follower, active sequencer became unhealthy and started the leadership transfer process.
// however if leadership transfer took longer than the time for health monitor to treat the node as unhealthy,
// then basically the entire network is stalled and we need to start sequencing in this case.
if !oc.prevState.leader && !oc.prevState.active {
_, _, cerr := oc.compareUnsafeHead(oc.shutdownCtx)
if cerr == nil && !errors.Is(oc.hcerr, health.ErrSequencerConnectionDown) {
// if unsafe in consensus is the same as unsafe in op-node, then it is scenario #1 and we should start sequencer.
err = oc.startSequencer()
break
}
}
// 2. for other cases, we should try to transfer leader to another node.
// for example, if follower became a leader and unhealthy at the same time (just unhealthy itself), then we should transfer leadership.
err = oc.transferLeader()
case status.leader && !status.healthy && status.active:
// There are two scenarios we need to handle here:
// 1. we're transitioned from case status.leader && !status.healthy && !status.active, see description above
// then we should continue to sequence blocks and try to bring ourselves back to healthy state.
// note: we need to also make sure that the health error is not due to ErrSequencerConnectionDown
// because in this case, we should stop sequencing and transfer leadership to other nodes.
if oc.prevState.leader && !oc.prevState.healthy && !oc.prevState.active && !errors.Is(oc.hcerr, health.ErrSequencerConnectionDown) {
err = errors.New("waiting for sequencing to become healthy by itself")
break
}
// 2. we're here becasuse an healthy leader became unhealthy itself
// then we should try to stop sequencing locally and transfer leadership.
var result *multierror.Error
// Try to stop sequencer first, but since sequencer is not healthy, we may not be able to stop it.
// In this case, it's fine to continue to try to transfer leadership to another server. This is safe because
......@@ -536,17 +591,25 @@ func (oc *OpConductor) action() {
// normal leader, do nothing
}
oc.log.Debug("exiting action with status and error", "status", status, "err", err)
if err != nil {
oc.log.Error("failed to execute step, queueing another one to retry", "err", err)
// randomly sleep for 0-200ms to avoid excessive retry
time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
oc.queueAction()
return
}
if !status.Equal(oc.prevState) {
oc.log.Info("state changed", "prev_state", oc.prevState, "new_state", status)
oc.prevState = status
}
}
// transferLeader tries to transfer leadership to another server.
func (oc *OpConductor) transferLeader() error {
// TransferLeader here will do round robin to try to transfer leadership to the next healthy node.
oc.log.Info("transferring leadership", "server", oc.cons.ServerID())
err := oc.cons.TransferLeader()
if err == nil {
oc.leader.Store(false)
......@@ -567,15 +630,20 @@ func (oc *OpConductor) transferLeader() error {
func (oc *OpConductor) stopSequencer() error {
oc.log.Info("stopping sequencer", "server", oc.cons.ServerID(), "leader", oc.leader.Load(), "healthy", oc.healthy.Load(), "active", oc.seqActive.Load())
if _, err := oc.ctrl.StopSequencer(context.Background()); err != nil {
return errors.Wrap(err, "failed to stop sequencer")
_, err := oc.ctrl.StopSequencer(context.Background())
if err != nil {
if strings.Contains(err.Error(), driver.ErrSequencerAlreadyStopped.Error()) {
oc.log.Warn("sequencer already stopped.", "err", err)
} else {
return errors.Wrap(err, "failed to stop sequencer")
}
}
oc.seqActive.Store(false)
return nil
}
func (oc *OpConductor) startSequencer() error {
oc.log.Info("starting sequencer", "server", oc.cons.ServerID(), "leader", oc.leader.Load(), "healthy", oc.healthy.Load(), "active", oc.seqActive.Load())
ctx := context.Background()
// When starting sequencer, we need to make sure that the current node has the latest unsafe head from the consensus protocol
......@@ -592,6 +660,7 @@ func (oc *OpConductor) startSequencer() error {
return err
}
oc.log.Info("starting sequencer", "server", oc.cons.ServerID(), "leader", oc.leader.Load(), "healthy", oc.healthy.Load(), "active", oc.seqActive.Load())
if err = oc.ctrl.StartSequencer(ctx, unsafeInCons.ExecutionPayload.BlockHash); err != nil {
// cannot directly compare using Errors.Is because the error is returned from an JSON RPC server which lost its type.
if !strings.Contains(err.Error(), driver.ErrSequencerAlreadyStarted.Error()) {
......@@ -616,6 +685,7 @@ func (oc *OpConductor) compareUnsafeHead(ctx context.Context) (*eth.ExecutionPay
return unsafeInCons, nil, errors.Wrap(err, "failed to get latest unsafe block from EL during compareUnsafeHead phase")
}
oc.log.Debug("comparing unsafe head", "consensus", unsafeInCons.ExecutionPayload.BlockNumber, "node", unsafeInNode.NumberU64())
if unsafeInCons.ExecutionPayload.BlockHash != unsafeInNode.Hash() {
oc.log.Warn(
"latest unsafe block in consensus is not the same as the one in op-node",
......@@ -636,6 +706,7 @@ func (oc *OpConductor) updateSequencerActiveStatus() error {
if err != nil {
return errors.Wrap(err, "failed to get sequencer active status")
}
oc.log.Info("sequencer active status updated", "active", active)
oc.seqActive.Store(active)
return nil
}
......@@ -79,12 +79,12 @@ func mockConfig(t *testing.T) Config {
type OpConductorTestSuite struct {
suite.Suite
conductor *OpConductor
conductor *OpConductor
healthUpdateCh chan error
leaderUpdateCh chan bool
ctx context.Context
err error
log log.Logger
cfg Config
version string
......@@ -92,8 +92,9 @@ type OpConductorTestSuite struct {
cons *consensusmocks.Consensus
hmon *healthmocks.HealthMonitor
next chan struct{}
wg sync.WaitGroup
syncEnabled bool // syncEnabled controls whether synchronization is enabled for test actions.
next chan struct{} // next is used to signal when the next action in the test can proceed.
wg sync.WaitGroup // wg ensures that test actions are completed before moving on.
}
func (s *OpConductorTestSuite) SetupSuite() {
......@@ -115,57 +116,75 @@ func (s *OpConductorTestSuite) SetupTest() {
s.NoError(err)
s.conductor = conductor
s.healthUpdateCh = make(chan error)
s.healthUpdateCh = make(chan error, 1)
s.hmon.EXPECT().Start().Return(nil)
s.conductor.healthUpdateCh = s.healthUpdateCh
s.leaderUpdateCh = make(chan bool)
s.leaderUpdateCh = make(chan bool, 1)
s.conductor.leaderUpdateCh = s.leaderUpdateCh
err = s.conductor.Start(s.ctx)
s.NoError(err)
s.False(s.conductor.Stopped())
s.err = errors.New("error")
s.syncEnabled = false // default to no sync, turn it on by calling s.enableSynchronization()
}
func (s *OpConductorTestSuite) TearDownTest() {
s.hmon.EXPECT().Stop().Return(nil)
s.cons.EXPECT().Shutdown().Return(nil)
if s.syncEnabled {
s.wg.Add(1)
s.next <- struct{}{}
}
s.NoError(s.conductor.Stop(s.ctx))
s.True(s.conductor.Stopped())
}
func (s *OpConductorTestSuite) startConductor() {
err := s.conductor.Start(s.ctx)
s.NoError(err)
s.False(s.conductor.Stopped())
}
// enableSynchronization wraps conductor actionFn with extra synchronization logic
// so that we could control the execution of actionFn and observe the internal state transition in between.
func (s *OpConductorTestSuite) enableSynchronization() {
s.conductor.actionFn = func() {
s.syncEnabled = true
s.conductor.loopActionFn = func() {
<-s.next
s.conductor.action()
s.conductor.loopAction()
s.wg.Done()
}
s.startConductor()
}
func (s *OpConductorTestSuite) disableSynchronization() {
s.syncEnabled = false
s.startConductor()
}
func (s *OpConductorTestSuite) execute(fn func()) {
s.wg.Add(1)
s.next <- struct{}{}
if fn != nil {
fn()
}
s.next <- struct{}{}
s.wg.Wait()
}
func (s *OpConductorTestSuite) updateLeaderStatusAndExecuteAction(ch chan bool, status bool) {
func updateStatusAndExecuteAction[T any](s *OpConductorTestSuite, ch chan T, status T) {
fn := func() {
ch <- status
}
s.execute(fn)
s.execute(fn) // this executes status update
s.executeAction()
}
func (s *OpConductorTestSuite) updateHealthStatusAndExecuteAction(ch chan error, status error) {
fn := func() {
ch <- status
}
s.execute(fn)
func (s *OpConductorTestSuite) updateLeaderStatusAndExecuteAction(status bool) {
updateStatusAndExecuteAction[bool](s, s.leaderUpdateCh, status)
}
func (s *OpConductorTestSuite) updateHealthStatusAndExecuteAction(status error) {
updateStatusAndExecuteAction[error](s, s.healthUpdateCh, status)
}
func (s *OpConductorTestSuite) executeAction() {
......@@ -174,6 +193,8 @@ func (s *OpConductorTestSuite) executeAction() {
// Scenario 1: pause -> resume -> stop
func (s *OpConductorTestSuite) TestControlLoop1() {
s.disableSynchronization()
// Pause
err := s.conductor.Pause(s.ctx)
s.NoError(err)
......@@ -181,6 +202,7 @@ func (s *OpConductorTestSuite) TestControlLoop1() {
// Send health update, make sure it can still be consumed.
s.healthUpdateCh <- nil
s.healthUpdateCh <- nil
// Resume
s.ctrl.EXPECT().SequencerActive(mock.Anything).Return(false, nil)
......@@ -198,6 +220,8 @@ func (s *OpConductorTestSuite) TestControlLoop1() {
// Scenario 2: pause -> pause -> resume -> resume
func (s *OpConductorTestSuite) TestControlLoop2() {
s.disableSynchronization()
// Pause
err := s.conductor.Pause(s.ctx)
s.NoError(err)
......@@ -229,6 +253,8 @@ func (s *OpConductorTestSuite) TestControlLoop2() {
// Scenario 3: pause -> stop
func (s *OpConductorTestSuite) TestControlLoop3() {
s.disableSynchronization()
// Pause
err := s.conductor.Pause(s.ctx)
s.NoError(err)
......@@ -242,7 +268,8 @@ func (s *OpConductorTestSuite) TestControlLoop3() {
s.True(s.conductor.Stopped())
}
// In this test, we have a follower that is not healthy and not sequencing, it becomes leader through election and we expect it to transfer leadership to another node.
// In this test, we have a follower that is not healthy and not sequencing, it becomes leader through election.
// But since it does not have the same unsafe head as in consensus. We expect it to transfer leadership to another node.
// [follower, not healthy, not sequencing] -- become leader --> [leader, not healthy, not sequencing] -- transfer leadership --> [follower, not healthy, not sequencing]
func (s *OpConductorTestSuite) TestScenario1() {
s.enableSynchronization()
......@@ -251,17 +278,42 @@ func (s *OpConductorTestSuite) TestScenario1() {
s.conductor.leader.Store(false)
s.conductor.healthy.Store(false)
s.conductor.seqActive.Store(false)
s.conductor.hcerr = health.ErrSequencerNotHealthy
s.conductor.prevState = &state{
leader: false,
healthy: false,
active: false,
}
// unsafe in consensus is different than unsafe in node.
mockPayload := &eth.ExecutionPayloadEnvelope{
ExecutionPayload: &eth.ExecutionPayload{
BlockNumber: 2,
BlockHash: [32]byte{4, 5, 6},
},
}
mockBlockInfo := &testutils.MockBlockInfo{
InfoNum: 1,
InfoHash: [32]byte{1, 2, 3},
}
s.cons.EXPECT().TransferLeader().Return(nil)
s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload).Times(1)
s.ctrl.EXPECT().LatestUnsafeBlock(mock.Anything).Return(mockBlockInfo, nil).Times(1)
// become leader
s.updateLeaderStatusAndExecuteAction(s.leaderUpdateCh, true)
s.updateLeaderStatusAndExecuteAction(true)
// expect to transfer leadership, go back to [follower, not healthy, not sequencing]
s.False(s.conductor.leader.Load())
s.False(s.conductor.healthy.Load())
s.False(s.conductor.seqActive.Load())
s.cons.AssertCalled(s.T(), "TransferLeader")
s.Equal(health.ErrSequencerNotHealthy, s.conductor.hcerr)
s.Equal(&state{
leader: true,
healthy: false,
active: false,
}, s.conductor.prevState)
s.cons.AssertNumberOfCalls(s.T(), "TransferLeader", 1)
}
// In this test, we have a follower that is not healthy and not sequencing. it becomes healthy and we expect it to stay as follower and not start sequencing.
......@@ -275,7 +327,7 @@ func (s *OpConductorTestSuite) TestScenario2() {
s.conductor.seqActive.Store(false)
// become healthy
s.updateHealthStatusAndExecuteAction(s.healthUpdateCh, nil)
s.updateHealthStatusAndExecuteAction(nil)
// expect to stay as follower, go to [follower, healthy, not sequencing]
s.False(s.conductor.leader.Load())
......@@ -310,7 +362,7 @@ func (s *OpConductorTestSuite) TestScenario3() {
s.False(s.conductor.seqActive.Load())
// become leader
s.updateLeaderStatusAndExecuteAction(s.leaderUpdateCh, true)
s.updateLeaderStatusAndExecuteAction(true)
// [leader, healthy, sequencing]
s.True(s.conductor.leader.Load())
......@@ -343,7 +395,7 @@ func (s *OpConductorTestSuite) TestScenario4() {
s.ctrl.EXPECT().LatestUnsafeBlock(mock.Anything).Return(mockBlockInfo, nil).Times(1)
s.ctrl.EXPECT().PostUnsafePayload(mock.Anything, mock.Anything).Return(nil).Times(1)
s.updateLeaderStatusAndExecuteAction(s.leaderUpdateCh, true)
s.updateLeaderStatusAndExecuteAction(true)
// [leader, healthy, not sequencing]
s.True(s.conductor.leader.Load())
......@@ -384,7 +436,7 @@ func (s *OpConductorTestSuite) TestScenario5() {
s.conductor.seqActive.Store(false)
// become unhealthy
s.updateHealthStatusAndExecuteAction(s.healthUpdateCh, health.ErrSequencerNotHealthy)
s.updateHealthStatusAndExecuteAction(health.ErrSequencerNotHealthy)
// expect to stay as follower, go to [follower, not healthy, not sequencing]
s.False(s.conductor.leader.Load())
......@@ -405,7 +457,7 @@ func (s *OpConductorTestSuite) TestScenario6() {
s.ctrl.EXPECT().StopSequencer(mock.Anything).Return(common.Hash{}, nil).Times(1)
// step down as leader
s.updateLeaderStatusAndExecuteAction(s.leaderUpdateCh, false)
s.updateLeaderStatusAndExecuteAction(false)
// expect to stay as follower, go to [follower, healthy, not sequencing]
s.False(s.conductor.leader.Load())
......@@ -429,7 +481,7 @@ func (s *OpConductorTestSuite) TestScenario7() {
s.ctrl.EXPECT().StopSequencer(mock.Anything).Return(common.Hash{}, nil).Times(1)
// become unhealthy
s.updateHealthStatusAndExecuteAction(s.healthUpdateCh, health.ErrSequencerNotHealthy)
s.updateHealthStatusAndExecuteAction(health.ErrSequencerNotHealthy)
// expect to step down as leader and stop sequencing
s.False(s.conductor.leader.Load())
......@@ -448,34 +500,50 @@ func (s *OpConductorTestSuite) TestScenario7() {
// 5. [follower, unhealthy, not sequencing]
func (s *OpConductorTestSuite) TestFailureAndRetry1() {
s.enableSynchronization()
err := errors.New("failure")
// set initial state
s.conductor.leader.Store(true)
s.conductor.healthy.Store(true)
s.conductor.seqActive.Store(true)
s.conductor.prevState = &state{
leader: true,
healthy: true,
active: true,
}
// step 1 & 2: become unhealthy, stop sequencing failed, transfer leadership failed
s.cons.EXPECT().TransferLeader().Return(err).Times(1)
s.ctrl.EXPECT().StopSequencer(mock.Anything).Return(common.Hash{}, err).Times(1)
s.cons.EXPECT().TransferLeader().Return(s.err).Times(1)
s.ctrl.EXPECT().StopSequencer(mock.Anything).Return(common.Hash{}, s.err).Times(1)
s.updateHealthStatusAndExecuteAction(s.healthUpdateCh, health.ErrSequencerNotHealthy)
s.updateHealthStatusAndExecuteAction(health.ErrSequencerNotHealthy)
s.True(s.conductor.leader.Load())
s.False(s.conductor.healthy.Load())
s.True(s.conductor.seqActive.Load())
s.Equal(health.ErrSequencerNotHealthy, s.conductor.hcerr)
s.Equal(&state{
leader: true,
healthy: true,
active: true,
}, s.conductor.prevState)
s.ctrl.AssertNumberOfCalls(s.T(), "StopSequencer", 1)
s.cons.AssertNumberOfCalls(s.T(), "TransferLeader", 1)
// step 3: [leader, unhealthy, sequencing] -- stop sequencing succeeded, transfer leadership failed, retry
s.ctrl.EXPECT().StopSequencer(mock.Anything).Return(common.Hash{}, nil).Times(1)
s.cons.EXPECT().TransferLeader().Return(err).Times(1)
s.cons.EXPECT().TransferLeader().Return(s.err).Times(1)
s.executeAction()
s.True(s.conductor.leader.Load())
s.False(s.conductor.healthy.Load())
s.False(s.conductor.seqActive.Load())
s.Equal(health.ErrSequencerNotHealthy, s.conductor.hcerr)
s.Equal(&state{
leader: true,
healthy: true,
active: true,
}, s.conductor.prevState)
s.ctrl.AssertNumberOfCalls(s.T(), "StopSequencer", 2)
s.cons.AssertNumberOfCalls(s.T(), "TransferLeader", 2)
......@@ -488,6 +556,12 @@ func (s *OpConductorTestSuite) TestFailureAndRetry1() {
s.False(s.conductor.leader.Load())
s.False(s.conductor.healthy.Load())
s.False(s.conductor.seqActive.Load())
s.Equal(health.ErrSequencerNotHealthy, s.conductor.hcerr)
s.Equal(&state{
leader: true,
healthy: false,
active: false,
}, s.conductor.prevState)
s.ctrl.AssertNumberOfCalls(s.T(), "StopSequencer", 2)
s.cons.AssertNumberOfCalls(s.T(), "TransferLeader", 3)
}
......@@ -500,22 +574,32 @@ func (s *OpConductorTestSuite) TestFailureAndRetry1() {
// 4. [follower, unhealthy, not sequencing]
func (s *OpConductorTestSuite) TestFailureAndRetry2() {
s.enableSynchronization()
err := errors.New("failure")
// set initial state
s.conductor.leader.Store(true)
s.conductor.healthy.Store(true)
s.conductor.seqActive.Store(true)
s.conductor.prevState = &state{
leader: true,
healthy: true,
active: true,
}
// step 1 & 2: become unhealthy, stop sequencing failed, transfer leadership succeeded, retry
s.cons.EXPECT().TransferLeader().Return(nil).Times(1)
s.ctrl.EXPECT().StopSequencer(mock.Anything).Return(common.Hash{}, err).Times(1)
s.ctrl.EXPECT().StopSequencer(mock.Anything).Return(common.Hash{}, s.err).Times(1)
s.updateHealthStatusAndExecuteAction(s.healthUpdateCh, health.ErrSequencerNotHealthy)
s.updateHealthStatusAndExecuteAction(health.ErrSequencerNotHealthy)
s.False(s.conductor.leader.Load())
s.False(s.conductor.healthy.Load())
s.True(s.conductor.seqActive.Load())
s.Equal(health.ErrSequencerNotHealthy, s.conductor.hcerr)
s.Equal(&state{
leader: true,
healthy: true,
active: true,
}, s.conductor.prevState)
s.ctrl.AssertNumberOfCalls(s.T(), "StopSequencer", 1)
s.cons.AssertNumberOfCalls(s.T(), "TransferLeader", 1)
......@@ -527,10 +611,109 @@ func (s *OpConductorTestSuite) TestFailureAndRetry2() {
s.False(s.conductor.leader.Load())
s.False(s.conductor.healthy.Load())
s.False(s.conductor.seqActive.Load())
s.Equal(&state{
leader: false,
healthy: false,
active: true,
}, s.conductor.prevState)
s.ctrl.AssertNumberOfCalls(s.T(), "StopSequencer", 2)
s.cons.AssertNumberOfCalls(s.T(), "TransferLeader", 1)
}
// In this test, we have a follower that is unhealthy (due to active sequencer not producing blocks)
// Then leadership transfer happened, and the follower became leader. We expect it to start sequencing and catch up eventually.
// 1. [follower, healthy, not sequencing] -- become unhealthy -->
// 2. [follower, unhealthy, not sequencing] -- gained leadership -->
// 3. [leader, unhealthy, not sequencing] -- start sequencing -->
// 4. [leader, unhealthy, sequencing] -> become healthy again -->
// 5. [leader, healthy, sequencing]
func (s *OpConductorTestSuite) TestFailureAndRetry3() {
s.enableSynchronization()
// set initial state, healthy follower
s.conductor.leader.Store(false)
s.conductor.healthy.Store(true)
s.conductor.seqActive.Store(false)
s.conductor.prevState = &state{
leader: false,
healthy: true,
active: false,
}
s.log.Info("1. become unhealthy")
s.updateHealthStatusAndExecuteAction(health.ErrSequencerNotHealthy)
s.False(s.conductor.leader.Load())
s.False(s.conductor.healthy.Load())
s.False(s.conductor.seqActive.Load())
s.Equal(&state{
leader: false,
healthy: false,
active: false,
}, s.conductor.prevState)
s.log.Info("2 & 3. gained leadership, start sequencing")
mockPayload := &eth.ExecutionPayloadEnvelope{
ExecutionPayload: &eth.ExecutionPayload{
BlockNumber: 1,
BlockHash: [32]byte{1, 2, 3},
},
}
mockBlockInfo := &testutils.MockBlockInfo{
InfoNum: 1,
InfoHash: [32]byte{1, 2, 3},
}
s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload).Times(2)
s.ctrl.EXPECT().LatestUnsafeBlock(mock.Anything).Return(mockBlockInfo, nil).Times(2)
s.ctrl.EXPECT().StartSequencer(mock.Anything, mockBlockInfo.InfoHash).Return(nil).Times(1)
s.updateLeaderStatusAndExecuteAction(true)
s.True(s.conductor.leader.Load())
s.False(s.conductor.healthy.Load())
s.True(s.conductor.seqActive.Load())
s.Equal(&state{
leader: true,
healthy: false,
active: false,
}, s.conductor.prevState)
s.cons.AssertNumberOfCalls(s.T(), "LatestUnsafePayload", 2)
s.ctrl.AssertNumberOfCalls(s.T(), "LatestUnsafeBlock", 2)
s.ctrl.AssertNumberOfCalls(s.T(), "StartSequencer", 1)
s.log.Info("4. stay unhealthy for a bit while catching up")
s.updateHealthStatusAndExecuteAction(health.ErrSequencerNotHealthy)
s.True(s.conductor.leader.Load())
s.False(s.conductor.healthy.Load())
s.True(s.conductor.seqActive.Load())
s.Equal(&state{
leader: true,
healthy: false,
active: false,
}, s.conductor.prevState)
s.log.Info("5. become healthy again")
s.updateHealthStatusAndExecuteAction(nil)
// need to use eventually here because starting from step 4, the loop is gonna queue an action and retry until it became healthy again.
// use eventually here avoids the situation where health update is consumed after the action is executed.
s.Eventually(func() bool {
res := s.conductor.leader.Load() == true &&
s.conductor.healthy.Load() == true &&
s.conductor.seqActive.Load() == true &&
s.conductor.prevState.Equal(&state{
leader: true,
healthy: true,
active: true,
})
if !res {
s.executeAction()
}
return res
}, 2*time.Second, 100*time.Millisecond)
}
func (s *OpConductorTestSuite) TestHandleInitError() {
// This will cause an error in the init function, which should cause the conductor to stop successfully without issues.
_, err := New(s.ctx, &s.cfg, s.log, s.version)
......@@ -539,6 +722,6 @@ func (s *OpConductorTestSuite) TestHandleInitError() {
s.False(ok)
}
func TestHealthMonitor(t *testing.T) {
func TestControlLoop(t *testing.T) {
suite.Run(t, new(OpConductorTestSuite))
}
......@@ -44,6 +44,7 @@ func NewSequencerHealthMonitor(log log.Logger, interval, unsafeInterval, safeInt
unsafeInterval: unsafeInterval,
safeInterval: safeInterval,
minPeerCount: minPeerCount,
timeProviderFn: currentTimeProvicer,
node: node,
p2p: p2p,
}
......@@ -64,6 +65,8 @@ type SequencerHealthMonitor struct {
lastSeenUnsafeNum uint64
lastSeenUnsafeTime uint64
timeProviderFn func() uint64
node dial.RollupClientInterface
p2p p2p.API
}
......@@ -125,23 +128,17 @@ func (hm *SequencerHealthMonitor) healthCheck() error {
return ErrSequencerConnectionDown
}
now := uint64(time.Now().Unix())
now := hm.timeProviderFn()
var timeDiff, blockDiff, expectedBlocks uint64
if hm.lastSeenUnsafeNum != 0 {
diff := now - hm.lastSeenUnsafeTime
timeDiff = now - hm.lastSeenUnsafeTime
blockDiff = status.UnsafeL2.Number - hm.lastSeenUnsafeNum
// how many blocks do we expect to see, minus 1 to account for edge case with respect to time.
// for example, if diff = 2.001s and block time = 2s, expecting to see 1 block could potentially cause sequencer to be considered unhealthy.
blocks := diff/hm.rollupCfg.BlockTime - 1
if diff > hm.rollupCfg.BlockTime && blocks > status.UnsafeL2.Number-hm.lastSeenUnsafeNum {
hm.log.Error(
"unsafe head is not progressing as expected",
"now", now,
"unsafe_head_num", status.UnsafeL2.Number,
"last_seen_unsafe_num", hm.lastSeenUnsafeNum,
"last_seen_unsafe_time", hm.lastSeenUnsafeTime,
"unsafe_interval", hm.unsafeInterval,
)
return ErrSequencerNotHealthy
expectedBlocks = timeDiff / hm.rollupCfg.BlockTime
if expectedBlocks > 0 {
expectedBlocks--
}
}
if status.UnsafeL2.Number > hm.lastSeenUnsafeNum {
......@@ -149,6 +146,18 @@ func (hm *SequencerHealthMonitor) healthCheck() error {
hm.lastSeenUnsafeTime = now
}
if timeDiff > hm.rollupCfg.BlockTime && expectedBlocks > blockDiff {
hm.log.Error(
"unsafe head is not progressing as expected",
"now", now,
"unsafe_head_num", status.UnsafeL2.Number,
"last_seen_unsafe_num", hm.lastSeenUnsafeNum,
"last_seen_unsafe_time", hm.lastSeenUnsafeTime,
"unsafe_interval", hm.unsafeInterval,
)
return ErrSequencerNotHealthy
}
if now-status.UnsafeL2.Time > hm.unsafeInterval {
hm.log.Error(
"unsafe head is not progressing as expected",
......@@ -183,3 +192,7 @@ func (hm *SequencerHealthMonitor) healthCheck() error {
return nil
}
func currentTimeProvicer() uint64 {
return uint64(time.Now().Unix())
}
......@@ -26,130 +26,192 @@ const (
type HealthMonitorTestSuite struct {
suite.Suite
log log.Logger
rc *testutils.MockRollupClient
pc *p2pMocks.API
interval uint64
unsafeInterval uint64
safeInterval uint64
minPeerCount uint64
rollupCfg *rollup.Config
monitor HealthMonitor
log log.Logger
interval uint64
minPeerCount uint64
rollupCfg *rollup.Config
}
func (s *HealthMonitorTestSuite) SetupSuite() {
s.log = testlog.Logger(s.T(), log.LvlInfo)
s.rc = &testutils.MockRollupClient{}
s.pc = &p2pMocks.API{}
s.log = testlog.Logger(s.T(), log.LvlDebug)
s.interval = 1
s.unsafeInterval = 3
s.safeInterval = 5
s.minPeerCount = minPeerCount
s.rollupCfg = &rollup.Config{
BlockTime: blockTime,
}
}
func (s *HealthMonitorTestSuite) SetupTest() {
s.monitor = NewSequencerHealthMonitor(s.log, s.interval, s.unsafeInterval, s.safeInterval, s.minPeerCount, s.rollupCfg, s.rc, s.pc)
err := s.monitor.Start()
s.NoError(err)
}
func (s *HealthMonitorTestSuite) TearDownTest() {
err := s.monitor.Stop()
func (s *HealthMonitorTestSuite) SetupMonitor(
now, unsafeInterval, safeInterval uint64,
mockRollupClient *testutils.MockRollupClient,
mockP2P *p2pMocks.API,
) *SequencerHealthMonitor {
tp := &timeProvider{now: now}
if mockP2P == nil {
mockP2P = &p2pMocks.API{}
ps1 := &p2p.PeerStats{
Connected: healthyPeerCount,
}
mockP2P.EXPECT().PeerStats(context.Background()).Return(ps1, nil)
}
monitor := &SequencerHealthMonitor{
log: s.log,
done: make(chan struct{}),
interval: s.interval,
healthUpdateCh: make(chan error),
rollupCfg: s.rollupCfg,
unsafeInterval: unsafeInterval,
safeInterval: safeInterval,
minPeerCount: s.minPeerCount,
timeProviderFn: tp.Now,
node: mockRollupClient,
p2p: mockP2P,
}
err := monitor.Start()
s.NoError(err)
return monitor
}
func (s *HealthMonitorTestSuite) TestUnhealthyLowPeerCount() {
s.T().Parallel()
now := uint64(time.Now().Unix())
ss1 := &eth.SyncStatus{
UnsafeL2: eth.L2BlockRef{
Time: now - 1,
},
SafeL2: eth.L2BlockRef{
Time: now - 2,
},
}
s.rc.ExpectSyncStatus(ss1, nil)
rc := &testutils.MockRollupClient{}
ss1 := mockSyncStatus(now-1, 1, now-3, 0)
rc.ExpectSyncStatus(ss1, nil)
rc.ExpectSyncStatus(ss1, nil)
pc := &p2pMocks.API{}
ps1 := &p2p.PeerStats{
Connected: unhealthyPeerCount,
}
s.pc.EXPECT().PeerStats(context.Background()).Return(ps1, nil).Times(1)
pc.EXPECT().PeerStats(context.Background()).Return(ps1, nil).Times(1)
monitor := s.SetupMonitor(now, 60, 60, rc, pc)
healthUpdateCh := s.monitor.Subscribe()
healthUpdateCh := monitor.Subscribe()
healthy := <-healthUpdateCh
s.NotNil(healthy)
s.NoError(monitor.Stop())
}
func (s *HealthMonitorTestSuite) TestUnhealthyUnsafeHeadNotProgressing() {
ps1 := &p2p.PeerStats{
Connected: healthyPeerCount,
}
s.pc.EXPECT().PeerStats(context.Background()).Return(ps1, nil).Times(3)
s.T().Parallel()
now := uint64(time.Now().Unix())
ss1 := &eth.SyncStatus{
UnsafeL2: eth.L2BlockRef{
Number: 5,
Time: now - 1,
},
SafeL2: eth.L2BlockRef{
Number: 1,
Time: now - 2,
},
rc := &testutils.MockRollupClient{}
ss1 := mockSyncStatus(now, 5, now-8, 1)
for i := 0; i < 6; i++ {
rc.ExpectSyncStatus(ss1, nil)
}
s.rc.ExpectSyncStatus(ss1, nil)
s.rc.ExpectSyncStatus(ss1, nil)
s.rc.ExpectSyncStatus(ss1, nil)
healthUpdateCh := s.monitor.Subscribe()
for i := 0; i < 3; i++ {
monitor := s.SetupMonitor(now, 60, 60, rc, nil)
healthUpdateCh := monitor.Subscribe()
for i := 0; i < 5; i++ {
healthy := <-healthUpdateCh
if i < 2 {
if i < 4 {
s.Nil(healthy)
s.Equal(now, monitor.lastSeenUnsafeTime)
s.Equal(uint64(5), monitor.lastSeenUnsafeNum)
} else {
s.NotNil(healthy)
}
}
s.NoError(monitor.Stop())
}
func (s *HealthMonitorTestSuite) TestUnhealthySafeHeadNotProgressing() {
ps1 := &p2p.PeerStats{
Connected: healthyPeerCount,
}
s.pc.EXPECT().PeerStats(context.Background()).Return(ps1, nil).Times(6)
s.T().Parallel()
now := uint64(time.Now().Unix())
syncStatusGenerator := func(unsafeTime uint64) *eth.SyncStatus {
return &eth.SyncStatus{
UnsafeL2: eth.L2BlockRef{
Time: unsafeTime,
},
SafeL2: eth.L2BlockRef{
Time: now,
},
}
}
s.rc.ExpectSyncStatus(syncStatusGenerator(now), nil)
s.rc.ExpectSyncStatus(syncStatusGenerator(now), nil)
s.rc.ExpectSyncStatus(syncStatusGenerator(now+2), nil)
s.rc.ExpectSyncStatus(syncStatusGenerator(now+2), nil)
s.rc.ExpectSyncStatus(syncStatusGenerator(now+4), nil)
s.rc.ExpectSyncStatus(syncStatusGenerator(now+4), nil)
healthUpdateCh := s.monitor.Subscribe()
for i := 0; i < 6; i++ {
rc := &testutils.MockRollupClient{}
rc.ExpectSyncStatus(mockSyncStatus(now, 1, now, 1), nil)
rc.ExpectSyncStatus(mockSyncStatus(now, 1, now, 1), nil)
rc.ExpectSyncStatus(mockSyncStatus(now+2, 2, now, 1), nil)
rc.ExpectSyncStatus(mockSyncStatus(now+2, 2, now, 1), nil)
rc.ExpectSyncStatus(mockSyncStatus(now+4, 3, now, 1), nil)
rc.ExpectSyncStatus(mockSyncStatus(now+4, 3, now, 1), nil)
monitor := s.SetupMonitor(now, 60, 3, rc, nil)
healthUpdateCh := monitor.Subscribe()
for i := 0; i < 5; i++ {
healthy := <-healthUpdateCh
if i < 5 {
if i < 4 {
s.Nil(healthy)
} else {
s.NotNil(healthy)
}
}
s.NoError(monitor.Stop())
}
func (s *HealthMonitorTestSuite) TestHealthyWithUnsafeLag() {
s.T().Parallel()
now := uint64(time.Now().Unix())
rc := &testutils.MockRollupClient{}
// although unsafe has lag of 20 seconds, it's within the configured unsafe interval
// and it is advancing every block time, so it should be considered safe.
rc.ExpectSyncStatus(mockSyncStatus(now-10, 1, now, 1), nil)
rc.ExpectSyncStatus(mockSyncStatus(now-10, 1, now, 1), nil)
rc.ExpectSyncStatus(mockSyncStatus(now-8, 2, now, 1), nil)
rc.ExpectSyncStatus(mockSyncStatus(now-8, 2, now, 1), nil)
monitor := s.SetupMonitor(now, 60, 60, rc, nil)
healthUpdateCh := monitor.Subscribe()
// confirm initial state
s.Zero(monitor.lastSeenUnsafeNum)
s.Zero(monitor.lastSeenUnsafeTime)
// confirm state after first check
healthy := <-healthUpdateCh
s.Nil(healthy)
lastSeenUnsafeTime := monitor.lastSeenUnsafeTime
s.NotZero(monitor.lastSeenUnsafeTime)
s.Equal(uint64(1), monitor.lastSeenUnsafeNum)
healthy = <-healthUpdateCh
s.Nil(healthy)
s.Equal(lastSeenUnsafeTime, monitor.lastSeenUnsafeTime)
s.Equal(uint64(1), monitor.lastSeenUnsafeNum)
healthy = <-healthUpdateCh
s.Nil(healthy)
s.Equal(lastSeenUnsafeTime+2, monitor.lastSeenUnsafeTime)
s.Equal(uint64(2), monitor.lastSeenUnsafeNum)
s.NoError(monitor.Stop())
}
func mockSyncStatus(unsafeTime, unsafeNum, safeTime, safeNum uint64) *eth.SyncStatus {
return &eth.SyncStatus{
UnsafeL2: eth.L2BlockRef{
Time: unsafeTime,
Number: unsafeNum,
},
SafeL2: eth.L2BlockRef{
Time: safeTime,
Number: safeNum,
},
}
}
func TestHealthMonitor(t *testing.T) {
suite.Run(t, new(HealthMonitorTestSuite))
}
type timeProvider struct {
now uint64
}
func (tp *timeProvider) Now() uint64 {
now := tp.now
tp.now++
return now
}
set -e
for i in {1..100}; do
echo "======================="
echo "Running iteration $i"
gotestsum -- -run 'TestControlLoop' ./... --count=1 --timeout=5s -race
if [ $? -ne 0 ]; then
echo "Test failed"
exit 1
fi
done
......@@ -22,7 +22,10 @@ import (
"github.com/ethereum-optimism/optimism/op-service/retry"
)
var ErrSequencerAlreadyStarted = errors.New("sequencer already running")
var (
ErrSequencerAlreadyStarted = errors.New("sequencer already running")
ErrSequencerAlreadyStopped = errors.New("sequencer not running")
)
// Deprecated: use eth.SyncStatus instead.
type SyncStatus = eth.SyncStatus
......@@ -429,7 +432,7 @@ func (s *Driver) eventLoop() {
}
case respCh := <-s.stopSequencer:
if s.driverConfig.SequencerStopped {
respCh <- hashAndError{err: errors.New("sequencer not running")}
respCh <- hashAndError{err: ErrSequencerAlreadyStopped}
} else {
if err := s.sequencerNotifs.SequencerStopped(); err != nil {
respCh <- hashAndError{err: fmt.Errorf("sequencer start notification: %w", err)}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment