Commit c4a50034 authored by Brian Bland, committed by GitHub

op-conductor: Fix leader promotion when node is behind consensus by 1 block (#10707)

* op-conductor: Fix leader promotion when node is behind consensus by 1 block

* Do not transfer leadership when posting unsafe head

* Fix related log line
parent b92dc8ff
...@@ -593,11 +593,11 @@ func (oc *OpConductor) action() { ...@@ -593,11 +593,11 @@ func (oc *OpConductor) action() {
// 1. current node is follower, active sequencer became unhealthy and started the leadership transfer process. // 1. current node is follower, active sequencer became unhealthy and started the leadership transfer process.
// however if leadership transfer took longer than the time for health monitor to treat the node as unhealthy, // however if leadership transfer took longer than the time for health monitor to treat the node as unhealthy,
// then basically the entire network is stalled and we need to start sequencing in this case. // then basically the entire network is stalled and we need to start sequencing in this case.
if !oc.prevState.leader && !oc.prevState.active { if !oc.prevState.leader && !oc.prevState.active && !errors.Is(oc.hcerr, health.ErrSequencerConnectionDown) {
_, _, cerr := oc.compareUnsafeHead(oc.shutdownCtx)
if cerr == nil && !errors.Is(oc.hcerr, health.ErrSequencerConnectionDown) {
// if unsafe in consensus is the same as unsafe in op-node, then it is scenario #1 and we should start sequencer.
err = oc.startSequencer() err = oc.startSequencer()
if err != nil {
oc.log.Error("failed to start sequencer, transferring leadership instead", "server", oc.cons.ServerID(), "err", err)
} else {
break break
} }
} }
...@@ -703,7 +703,6 @@ func (oc *OpConductor) startSequencer() error { ...@@ -703,7 +703,6 @@ func (oc *OpConductor) startSequencer() error {
// If not, then we wait for the unsafe head to catch up or gossip it to op-node manually from op-conductor. // If not, then we wait for the unsafe head to catch up or gossip it to op-node manually from op-conductor.
unsafeInCons, unsafeInNode, err := oc.compareUnsafeHead(ctx) unsafeInCons, unsafeInNode, err := oc.compareUnsafeHead(ctx)
// if there's a mismatch, try to post the unsafe head to op-node // if there's a mismatch, try to post the unsafe head to op-node
if err != nil {
if errors.Is(err, ErrUnsafeHeadMismatch) && uint64(unsafeInCons.ExecutionPayload.BlockNumber)-unsafeInNode.NumberU64() == 1 { if errors.Is(err, ErrUnsafeHeadMismatch) && uint64(unsafeInCons.ExecutionPayload.BlockNumber)-unsafeInNode.NumberU64() == 1 {
// tries to post the unsafe head to op-node when head is only 1 block behind (most likely due to gossip delay) // tries to post the unsafe head to op-node when head is only 1 block behind (most likely due to gossip delay)
oc.log.Debug( oc.log.Debug(
...@@ -713,10 +712,11 @@ func (oc *OpConductor) startSequencer() error { ...@@ -713,10 +712,11 @@ func (oc *OpConductor) startSequencer() error {
"node_num", unsafeInNode.NumberU64(), "node_num", unsafeInNode.NumberU64(),
"node_hash", unsafeInNode.Hash().Hex(), "node_hash", unsafeInNode.Hash().Hex(),
) )
if innerErr := oc.ctrl.PostUnsafePayload(ctx, unsafeInCons); innerErr != nil { if err := oc.ctrl.PostUnsafePayload(ctx, unsafeInCons); err != nil {
oc.log.Error("failed to post unsafe head payload envelope to op-node", "err", innerErr) oc.log.Error("failed to post unsafe head payload envelope to op-node", "err", err)
} return err
} }
} else if err != nil {
return err return err
} }
......
...@@ -293,7 +293,7 @@ func (s *OpConductorTestSuite) TestScenario1() { ...@@ -293,7 +293,7 @@ func (s *OpConductorTestSuite) TestScenario1() {
// unsafe in consensus is different than unsafe in node. // unsafe in consensus is different than unsafe in node.
mockPayload := &eth.ExecutionPayloadEnvelope{ mockPayload := &eth.ExecutionPayloadEnvelope{
ExecutionPayload: &eth.ExecutionPayload{ ExecutionPayload: &eth.ExecutionPayload{
BlockNumber: 2, BlockNumber: 3,
BlockHash: [32]byte{4, 5, 6}, BlockHash: [32]byte{4, 5, 6},
}, },
} }
...@@ -434,7 +434,8 @@ func (s *OpConductorTestSuite) TestScenario4() { ...@@ -434,7 +434,8 @@ func (s *OpConductorTestSuite) TestScenario4() {
} }
s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload, nil).Times(1) s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload, nil).Times(1)
s.ctrl.EXPECT().LatestUnsafeBlock(mock.Anything).Return(mockBlockInfo, nil).Times(1) s.ctrl.EXPECT().LatestUnsafeBlock(mock.Anything).Return(mockBlockInfo, nil).Times(1)
s.ctrl.EXPECT().PostUnsafePayload(mock.Anything, mock.Anything).Return(nil).Times(1) s.ctrl.EXPECT().PostUnsafePayload(mock.Anything, mockPayload).Return(errors.New("simulated PostUnsafePayload failure")).Times(1)
s.ctrl.EXPECT().StartSequencer(mock.Anything, mockPayload.ExecutionPayload.BlockHash).Return(nil).Times(1)
s.updateLeaderStatusAndExecuteAction(true) s.updateLeaderStatusAndExecuteAction(true)
...@@ -442,16 +443,14 @@ func (s *OpConductorTestSuite) TestScenario4() { ...@@ -442,16 +443,14 @@ func (s *OpConductorTestSuite) TestScenario4() {
s.True(s.conductor.leader.Load()) s.True(s.conductor.leader.Load())
s.True(s.conductor.healthy.Load()) s.True(s.conductor.healthy.Load())
s.False(s.conductor.seqActive.Load()) s.False(s.conductor.seqActive.Load())
s.ctrl.AssertNotCalled(s.T(), "StartSequencer", mock.Anything, mock.Anything) s.cons.AssertNumberOfCalls(s.T(), "LatestUnsafePayload", 1)
s.ctrl.AssertNumberOfCalls(s.T(), "LatestUnsafeBlock", 1) s.ctrl.AssertNumberOfCalls(s.T(), "LatestUnsafeBlock", 1)
s.ctrl.AssertNumberOfCalls(s.T(), "PostUnsafePayload", 1) s.ctrl.AssertNumberOfCalls(s.T(), "PostUnsafePayload", 1)
s.cons.AssertNumberOfCalls(s.T(), "LatestUnsafePayload", 1) s.ctrl.AssertNotCalled(s.T(), "StartSequencer", mock.Anything, mock.Anything)
// unsafe caught up, we try to start sequencer at specified block and succeeds
mockBlockInfo.InfoNum = 2
mockBlockInfo.InfoHash = [32]byte{1, 2, 3}
s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload, nil).Times(1) s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload, nil).Times(1)
s.ctrl.EXPECT().LatestUnsafeBlock(mock.Anything).Return(mockBlockInfo, nil).Times(1) s.ctrl.EXPECT().LatestUnsafeBlock(mock.Anything).Return(mockBlockInfo, nil).Times(1)
s.ctrl.EXPECT().PostUnsafePayload(mock.Anything, mockPayload).Return(nil).Times(1)
s.ctrl.EXPECT().StartSequencer(mock.Anything, mockBlockInfo.InfoHash).Return(nil).Times(1) s.ctrl.EXPECT().StartSequencer(mock.Anything, mockBlockInfo.InfoHash).Return(nil).Times(1)
s.executeAction() s.executeAction()
...@@ -460,10 +459,10 @@ func (s *OpConductorTestSuite) TestScenario4() { ...@@ -460,10 +459,10 @@ func (s *OpConductorTestSuite) TestScenario4() {
s.True(s.conductor.leader.Load()) s.True(s.conductor.leader.Load())
s.True(s.conductor.healthy.Load()) s.True(s.conductor.healthy.Load())
s.True(s.conductor.seqActive.Load()) s.True(s.conductor.seqActive.Load())
s.cons.AssertNumberOfCalls(s.T(), "LatestUnsafePayload", 2)
s.ctrl.AssertNumberOfCalls(s.T(), "LatestUnsafeBlock", 2) s.ctrl.AssertNumberOfCalls(s.T(), "LatestUnsafeBlock", 2)
s.ctrl.AssertNumberOfCalls(s.T(), "PostUnsafePayload", 1) s.ctrl.AssertNumberOfCalls(s.T(), "PostUnsafePayload", 2)
s.ctrl.AssertNumberOfCalls(s.T(), "StartSequencer", 1) s.ctrl.AssertNumberOfCalls(s.T(), "StartSequencer", 1)
s.cons.AssertNumberOfCalls(s.T(), "LatestUnsafePayload", 2)
} }
// In this test, we have a follower that is healthy and not sequencing, we send a unhealthy update to it and expect it to stay as follower and not start sequencing. // In this test, we have a follower that is healthy and not sequencing, we send a unhealthy update to it and expect it to stay as follower and not start sequencing.
...@@ -718,8 +717,8 @@ func (s *OpConductorTestSuite) TestFailureAndRetry3() { ...@@ -718,8 +717,8 @@ func (s *OpConductorTestSuite) TestFailureAndRetry3() {
healthy: false, healthy: false,
active: false, active: false,
}, s.conductor.prevState) }, s.conductor.prevState)
s.cons.AssertNumberOfCalls(s.T(), "LatestUnsafePayload", 2) s.cons.AssertNumberOfCalls(s.T(), "LatestUnsafePayload", 1)
s.ctrl.AssertNumberOfCalls(s.T(), "LatestUnsafeBlock", 2) s.ctrl.AssertNumberOfCalls(s.T(), "LatestUnsafeBlock", 1)
s.ctrl.AssertNumberOfCalls(s.T(), "StartSequencer", 1) s.ctrl.AssertNumberOfCalls(s.T(), "StartSequencer", 1)
s.log.Info("4. stay unhealthy for a bit while catching up") s.log.Info("4. stay unhealthy for a bit while catching up")
...@@ -755,6 +754,102 @@ func (s *OpConductorTestSuite) TestFailureAndRetry3() { ...@@ -755,6 +754,102 @@ func (s *OpConductorTestSuite) TestFailureAndRetry3() {
}, 2*time.Second, time.Millisecond) }, 2*time.Second, time.Millisecond)
} }
// TestFailureAndRetry4 mirrors TestFailureAndRetry3, except that the latest unsafe payload held by
// consensus is one block ahead of the unsafe head of the node that is about to become leader.
// Once leadership is transferred, the former follower is expected to post that payload to op-node,
// start sequencing, and eventually become healthy again:
//  1. [follower, healthy, not sequencing] -- become unhealthy -->
//  2. [follower, unhealthy, not sequencing] -- gained leadership -->
//  3. [leader, unhealthy, not sequencing] -- start sequencing -->
//  4. [leader, unhealthy, sequencing] -- become healthy again -->
//  5. [leader, healthy, sequencing]
func (s *OpConductorTestSuite) TestFailureAndRetry4() {
	s.enableSynchronization()

	// Expected snapshots of prevState at the checkpoints below.
	unhealthyFollower := &state{leader: false, healthy: false, active: false}
	unhealthyLeader := &state{leader: true, healthy: false, active: false}

	// Initial state: a healthy follower that is not sequencing.
	s.conductor.leader.Store(false)
	s.conductor.healthy.Store(true)
	s.conductor.seqActive.Store(false)
	s.conductor.prevState = &state{leader: false, healthy: true, active: false}

	s.log.Info("1. become unhealthy")
	s.updateHealthStatusAndExecuteAction(health.ErrSequencerNotHealthy)

	s.False(s.conductor.leader.Load())
	s.False(s.conductor.healthy.Load())
	s.False(s.conductor.seqActive.Load())
	s.Equal(unhealthyFollower, s.conductor.prevState)

	s.log.Info("2 & 3. gained leadership, post unsafe payload and start sequencing")
	// Consensus reports block 2 while the node's unsafe head is still at block 1.
	consPayload := &eth.ExecutionPayloadEnvelope{
		ExecutionPayload: &eth.ExecutionPayload{
			BlockNumber: 2,
			BlockHash:   [32]byte{4, 5, 6},
		},
	}
	nodeHead := &testutils.MockBlockInfo{
		InfoNum:  1,
		InfoHash: [32]byte{1, 2, 3},
	}
	s.cons.EXPECT().LatestUnsafePayload().Return(consPayload, nil).Times(2)
	s.ctrl.EXPECT().LatestUnsafeBlock(mock.Anything).Return(nodeHead, nil).Times(2)
	s.ctrl.EXPECT().PostUnsafePayload(mock.Anything, consPayload).Return(nil).Times(1)
	s.ctrl.EXPECT().StartSequencer(mock.Anything, consPayload.ExecutionPayload.BlockHash).Return(nil).Times(1)

	s.updateLeaderStatusAndExecuteAction(true)

	s.True(s.conductor.leader.Load())
	s.False(s.conductor.healthy.Load())
	s.True(s.conductor.seqActive.Load())
	// prevState is asserted with active: false here even though seqActive is already true.
	s.Equal(unhealthyLeader, s.conductor.prevState)
	s.cons.AssertNumberOfCalls(s.T(), "LatestUnsafePayload", 1)
	s.ctrl.AssertNumberOfCalls(s.T(), "LatestUnsafeBlock", 1)
	s.ctrl.AssertNumberOfCalls(s.T(), "PostUnsafePayload", 1)
	s.ctrl.AssertNumberOfCalls(s.T(), "StartSequencer", 1)

	s.log.Info("4. stay unhealthy for a bit while catching up")
	s.updateHealthStatusAndExecuteAction(health.ErrSequencerNotHealthy)

	s.True(s.conductor.leader.Load())
	s.False(s.conductor.healthy.Load())
	s.True(s.conductor.seqActive.Load())
	s.Equal(unhealthyLeader, s.conductor.prevState)

	s.log.Info("5. become healthy again")
	s.updateHealthStatusAndExecuteAction(nil)

	// From step 4 onwards the loop keeps queueing an action and retrying until the node is healthy
	// again, so the health update may be consumed only after an action has already run.
	// Poll with Eventually, nudging the conductor with another action until the final state is reached.
	s.Eventually(func() bool {
		healthyActiveLeader := &state{leader: true, healthy: true, active: true}
		if s.conductor.leader.Load() &&
			s.conductor.healthy.Load() &&
			s.conductor.seqActive.Load() &&
			s.conductor.prevState.Equal(healthyActiveLeader) {
			return true
		}
		s.executeAction()
		return false
	}, 2*time.Second, 100*time.Millisecond)
}
func (s *OpConductorTestSuite) TestHandleInitError() { func (s *OpConductorTestSuite) TestHandleInitError() {
// This will cause an error in the init function, which should cause the conductor to stop successfully without issues. // This will cause an error in the init function, which should cause the conductor to stop successfully without issues.
_, err := New(s.ctx, &s.cfg, s.log, s.version) _, err := New(s.ctx, &s.cfg, s.log, s.version)
......
...@@ -114,7 +114,7 @@ func (p *ActiveL2RollupProvider) findActiveEndpoints(ctx context.Context) error ...@@ -114,7 +114,7 @@ func (p *ActiveL2RollupProvider) findActiveEndpoints(ctx context.Context) error
if offset != 0 || p.currentRollupClient == nil { if offset != 0 || p.currentRollupClient == nil {
if err := p.dialSequencer(ctx, idx); err != nil { if err := p.dialSequencer(ctx, idx); err != nil {
errs = errors.Join(errs, err) errs = errors.Join(errs, err)
p.log.Warn("Error dialing next sequencer.", "err", err, "index", p.rollupIndex) p.log.Warn("Error dialing next sequencer.", "err", err, "index", idx)
continue continue
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment