Commit 04757a06 authored by angel-ding-cb's avatar angel-ding-cb Committed by GitHub

Added Stop API in the conductor (#11463)

* add stop api in the conductor

* add tests

* fix test cases

* fix conductor stop api test

* re-trigger ci tests
parent 389d2511
...@@ -23,6 +23,8 @@ type API interface { ...@@ -23,6 +23,8 @@ type API interface {
Pause(ctx context.Context) error Pause(ctx context.Context) error
// Resume resumes op-conductor. // Resume resumes op-conductor.
Resume(ctx context.Context) error Resume(ctx context.Context) error
// Stop stops op-conductor.
Stop(ctx context.Context) error
// Paused returns true if op-conductor is paused. // Paused returns true if op-conductor is paused.
Paused(ctx context.Context) (bool, error) Paused(ctx context.Context) (bool, error)
// Stopped returns true if op-conductor is stopped. // Stopped returns true if op-conductor is stopped.
......
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
type conductor interface { type conductor interface {
Pause(ctx context.Context) error Pause(ctx context.Context) error
Resume(ctx context.Context) error Resume(ctx context.Context) error
Stop(ctx context.Context) error
Paused() bool Paused() bool
Stopped() bool Stopped() bool
SequencerHealthy(ctx context.Context) bool SequencerHealthy(ctx context.Context) bool
...@@ -115,6 +116,11 @@ func (api *APIBackend) Resume(ctx context.Context) error { ...@@ -115,6 +116,11 @@ func (api *APIBackend) Resume(ctx context.Context) error {
return api.con.Resume(ctx) return api.con.Resume(ctx)
} }
// Stop implements API.
func (api *APIBackend) Stop(ctx context.Context) error {
return api.con.Stop(ctx)
}
// TransferLeader implements API. With Raft implementation, a successful call does not mean that leadership transfer is complete // TransferLeader implements API. With Raft implementation, a successful call does not mean that leadership transfer is complete
// It just means that leadership transfer is in progress (current leader has initiated a new leader election round and stepped down as leader) // It just means that leadership transfer is in progress (current leader has initiated a new leader election round and stepped down as leader)
func (api *APIBackend) TransferLeader(ctx context.Context) error { func (api *APIBackend) TransferLeader(ctx context.Context) error {
......
...@@ -102,6 +102,11 @@ func (c *APIClient) Resume(ctx context.Context) error { ...@@ -102,6 +102,11 @@ func (c *APIClient) Resume(ctx context.Context) error {
return c.c.CallContext(ctx, nil, prefixRPC("resume")) return c.c.CallContext(ctx, nil, prefixRPC("resume"))
} }
// Stop implements API.
func (c *APIClient) Stop(ctx context.Context) error {
return c.c.CallContext(ctx, nil, prefixRPC("stop"))
}
// TransferLeader implements API. // TransferLeader implements API.
func (c *APIClient) TransferLeader(ctx context.Context) error { func (c *APIClient) TransferLeader(ctx context.Context) error {
return c.c.CallContext(ctx, nil, prefixRPC("transferLeader")) return c.c.CallContext(ctx, nil, prefixRPC("transferLeader"))
......
...@@ -49,7 +49,7 @@ func TestSequencerFailover_ConductorRPC(t *testing.T) { ...@@ -49,7 +49,7 @@ func TestSequencerFailover_ConductorRPC(t *testing.T) {
sort.Strings(ids) sort.Strings(ids)
require.Equal(t, []string{Sequencer1Name, Sequencer2Name, Sequencer3Name}, ids, "Expected all sequencers to be in cluster") require.Equal(t, []string{Sequencer1Name, Sequencer2Name, Sequencer3Name}, ids, "Expected all sequencers to be in cluster")
// Test Active & Pause & Resume // Test Active & Pause & Resume & Stop
t.Log("Testing Active & Pause & Resume") t.Log("Testing Active & Pause & Resume")
active, err := c1.client.Active(ctx) active, err := c1.client.Active(ctx)
require.NoError(t, err) require.NoError(t, err)
...@@ -161,6 +161,13 @@ func TestSequencerFailover_ConductorRPC(t *testing.T) { ...@@ -161,6 +161,13 @@ func TestSequencerFailover_ConductorRPC(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 2, len(membership.Servers), "Expected 2 members in cluster after removal") require.Equal(t, 2, len(membership.Servers), "Expected 2 members in cluster after removal")
require.NotContains(t, memberIDs(membership), fid, "Expected follower to be removed from cluster") require.NotContains(t, memberIDs(membership), fid, "Expected follower to be removed from cluster")
// Testing the stop api
t.Log("Testing Stop API")
err = c1.client.Stop(ctx)
require.NoError(t, err)
_, err = c1.client.Stopped(ctx)
require.Error(t, err, "Expected no connection to the conductor since it's stopped")
} }
// [Category: Sequencer Failover] // [Category: Sequencer Failover]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment