Commit 0bb839cf authored by Brian Bland's avatar Brian Bland Committed by GitHub

op-conductor: Add strongly consistent cluster membership APIs (#10823)

* op-conductor: Add optional version parameter to cluster membership changes

* Fix tests

* Add API backwards compatibility for conductor changes

* Clean up some boilerplate
parent 5bbe7170
......@@ -453,18 +453,18 @@ func (oc *OpConductor) LeaderWithID(_ context.Context) *consensus.ServerInfo {
}
// AddServerAsVoter adds a server as a voter to the cluster.
func (oc *OpConductor) AddServerAsVoter(_ context.Context, id string, addr string) error {
return oc.cons.AddVoter(id, addr)
func (oc *OpConductor) AddServerAsVoter(_ context.Context, id string, addr string, version uint64) error {
return oc.cons.AddVoter(id, addr, version)
}
// AddServerAsNonvoter adds a server as a non-voter to the cluster. non-voter will not participate in leader election.
func (oc *OpConductor) AddServerAsNonvoter(_ context.Context, id string, addr string) error {
return oc.cons.AddNonVoter(id, addr)
func (oc *OpConductor) AddServerAsNonvoter(_ context.Context, id string, addr string, version uint64) error {
return oc.cons.AddNonVoter(id, addr, version)
}
// RemoveServer removes a server from the cluster.
func (oc *OpConductor) RemoveServer(_ context.Context, id string) error {
return oc.cons.RemoveServer(id)
func (oc *OpConductor) RemoveServer(_ context.Context, id string, version uint64) error {
return oc.cons.RemoveServer(id, version)
}
// TransferLeader transfers leadership to another server.
......@@ -488,7 +488,7 @@ func (oc *OpConductor) SequencerHealthy(_ context.Context) bool {
}
// ClusterMembership returns current cluster's membership information.
func (oc *OpConductor) ClusterMembership(_ context.Context) ([]*consensus.ServerInfo, error) {
func (oc *OpConductor) ClusterMembership(_ context.Context) (*consensus.ClusterMembership, error) {
return oc.cons.ClusterMembership()
}
......
......@@ -25,6 +25,12 @@ func (s ServerSuffrage) String() string {
return "ServerSuffrage"
}
// ClusterMembership defines a versioned list of servers in the cluster.
type ClusterMembership struct {
Servers []ServerInfo `json:"servers"`
Version uint64 `json:"version"`
}
// ServerInfo defines the server information.
type ServerInfo struct {
ID string `json:"id"`
......@@ -37,13 +43,17 @@ type ServerInfo struct {
//go:generate mockery --name Consensus --output mocks/ --with-expecter=true
type Consensus interface {
// AddVoter adds a voting member into the cluster, voter is eligible to become leader.
AddVoter(id, addr string) error
// If version is non-zero, this will only be applied if the current cluster version matches the expected version.
AddVoter(id, addr string, version uint64) error
// AddNonVoter adds a non-voting member into the cluster, non-voter is not eligible to become leader.
AddNonVoter(id, addr string) error
// If version is non-zero, this will only be applied if the current cluster version matches the expected version.
AddNonVoter(id, addr string, version uint64) error
// DemoteVoter demotes a voting member into a non-voting member, if leader is being demoted, it will cause a new leader election.
DemoteVoter(id string) error
// If version is non-zero, this will only be applied if the current cluster version matches the expected version.
DemoteVoter(id string, version uint64) error
// RemoveServer removes a member (both voter or non-voter) from the cluster, if leader is being removed, it will cause a new leader election.
RemoveServer(id string) error
// If version is non-zero, this will only be applied if the current cluster version matches the expected version.
RemoveServer(id string, version uint64) error
// LeaderCh returns a channel that will be notified when leadership status changes (true = leader, false = follower)
LeaderCh() <-chan bool
// Leader returns if it is the leader of the cluster.
......@@ -56,8 +66,8 @@ type Consensus interface {
TransferLeader() error
// TransferLeaderTo triggers leadership transfer to a specific member in the cluster.
TransferLeaderTo(id, addr string) error
// ClusterMembership returns the current cluster membership configuration.
ClusterMembership() ([]*ServerInfo, error)
// ClusterMembership returns the current cluster membership configuration and associated version.
ClusterMembership() (*ClusterMembership, error)
// CommitPayload commits latest unsafe payload to the FSM in a strongly consistent fashion.
CommitUnsafePayload(payload *eth.ExecutionPayloadEnvelope) error
......
......@@ -22,17 +22,17 @@ func (_m *Consensus) EXPECT() *Consensus_Expecter {
return &Consensus_Expecter{mock: &_m.Mock}
}
// AddNonVoter provides a mock function with given fields: id, addr
func (_m *Consensus) AddNonVoter(id string, addr string) error {
ret := _m.Called(id, addr)
// AddNonVoter provides a mock function with given fields: id, addr, version
func (_m *Consensus) AddNonVoter(id string, addr string, version uint64) error {
ret := _m.Called(id, addr, version)
if len(ret) == 0 {
panic("no return value specified for AddNonVoter")
}
var r0 error
if rf, ok := ret.Get(0).(func(string, string) error); ok {
r0 = rf(id, addr)
if rf, ok := ret.Get(0).(func(string, string, uint64) error); ok {
r0 = rf(id, addr, version)
} else {
r0 = ret.Error(0)
}
......@@ -48,13 +48,14 @@ type Consensus_AddNonVoter_Call struct {
// AddNonVoter is a helper method to define mock.On call
// - id string
// - addr string
func (_e *Consensus_Expecter) AddNonVoter(id interface{}, addr interface{}) *Consensus_AddNonVoter_Call {
return &Consensus_AddNonVoter_Call{Call: _e.mock.On("AddNonVoter", id, addr)}
// - version uint64
func (_e *Consensus_Expecter) AddNonVoter(id interface{}, addr interface{}, version interface{}) *Consensus_AddNonVoter_Call {
return &Consensus_AddNonVoter_Call{Call: _e.mock.On("AddNonVoter", id, addr, version)}
}
func (_c *Consensus_AddNonVoter_Call) Run(run func(id string, addr string)) *Consensus_AddNonVoter_Call {
func (_c *Consensus_AddNonVoter_Call) Run(run func(id string, addr string, version uint64)) *Consensus_AddNonVoter_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string), args[1].(string))
run(args[0].(string), args[1].(string), args[2].(uint64))
})
return _c
}
......@@ -64,22 +65,22 @@ func (_c *Consensus_AddNonVoter_Call) Return(_a0 error) *Consensus_AddNonVoter_C
return _c
}
func (_c *Consensus_AddNonVoter_Call) RunAndReturn(run func(string, string) error) *Consensus_AddNonVoter_Call {
func (_c *Consensus_AddNonVoter_Call) RunAndReturn(run func(string, string, uint64) error) *Consensus_AddNonVoter_Call {
_c.Call.Return(run)
return _c
}
// AddVoter provides a mock function with given fields: id, addr
func (_m *Consensus) AddVoter(id string, addr string) error {
ret := _m.Called(id, addr)
// AddVoter provides a mock function with given fields: id, addr, version
func (_m *Consensus) AddVoter(id string, addr string, version uint64) error {
ret := _m.Called(id, addr, version)
if len(ret) == 0 {
panic("no return value specified for AddVoter")
}
var r0 error
if rf, ok := ret.Get(0).(func(string, string) error); ok {
r0 = rf(id, addr)
if rf, ok := ret.Get(0).(func(string, string, uint64) error); ok {
r0 = rf(id, addr, version)
} else {
r0 = ret.Error(0)
}
......@@ -95,13 +96,14 @@ type Consensus_AddVoter_Call struct {
// AddVoter is a helper method to define mock.On call
// - id string
// - addr string
func (_e *Consensus_Expecter) AddVoter(id interface{}, addr interface{}) *Consensus_AddVoter_Call {
return &Consensus_AddVoter_Call{Call: _e.mock.On("AddVoter", id, addr)}
// - version uint64
func (_e *Consensus_Expecter) AddVoter(id interface{}, addr interface{}, version interface{}) *Consensus_AddVoter_Call {
return &Consensus_AddVoter_Call{Call: _e.mock.On("AddVoter", id, addr, version)}
}
func (_c *Consensus_AddVoter_Call) Run(run func(id string, addr string)) *Consensus_AddVoter_Call {
func (_c *Consensus_AddVoter_Call) Run(run func(id string, addr string, version uint64)) *Consensus_AddVoter_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string), args[1].(string))
run(args[0].(string), args[1].(string), args[2].(uint64))
})
return _c
}
......@@ -111,29 +113,29 @@ func (_c *Consensus_AddVoter_Call) Return(_a0 error) *Consensus_AddVoter_Call {
return _c
}
func (_c *Consensus_AddVoter_Call) RunAndReturn(run func(string, string) error) *Consensus_AddVoter_Call {
func (_c *Consensus_AddVoter_Call) RunAndReturn(run func(string, string, uint64) error) *Consensus_AddVoter_Call {
_c.Call.Return(run)
return _c
}
// ClusterMembership provides a mock function with given fields:
func (_m *Consensus) ClusterMembership() ([]*consensus.ServerInfo, error) {
func (_m *Consensus) ClusterMembership() (*consensus.ClusterMembership, error) {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for ClusterMembership")
}
var r0 []*consensus.ServerInfo
var r0 *consensus.ClusterMembership
var r1 error
if rf, ok := ret.Get(0).(func() ([]*consensus.ServerInfo, error)); ok {
if rf, ok := ret.Get(0).(func() (*consensus.ClusterMembership, error)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() []*consensus.ServerInfo); ok {
if rf, ok := ret.Get(0).(func() *consensus.ClusterMembership); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*consensus.ServerInfo)
r0 = ret.Get(0).(*consensus.ClusterMembership)
}
}
......@@ -163,12 +165,12 @@ func (_c *Consensus_ClusterMembership_Call) Run(run func()) *Consensus_ClusterMe
return _c
}
func (_c *Consensus_ClusterMembership_Call) Return(_a0 []*consensus.ServerInfo, _a1 error) *Consensus_ClusterMembership_Call {
func (_c *Consensus_ClusterMembership_Call) Return(_a0 *consensus.ClusterMembership, _a1 error) *Consensus_ClusterMembership_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *Consensus_ClusterMembership_Call) RunAndReturn(run func() ([]*consensus.ServerInfo, error)) *Consensus_ClusterMembership_Call {
func (_c *Consensus_ClusterMembership_Call) RunAndReturn(run func() (*consensus.ClusterMembership, error)) *Consensus_ClusterMembership_Call {
_c.Call.Return(run)
return _c
}
......@@ -219,17 +221,17 @@ func (_c *Consensus_CommitUnsafePayload_Call) RunAndReturn(run func(*eth.Executi
return _c
}
// DemoteVoter provides a mock function with given fields: id
func (_m *Consensus) DemoteVoter(id string) error {
ret := _m.Called(id)
// DemoteVoter provides a mock function with given fields: id, version
func (_m *Consensus) DemoteVoter(id string, version uint64) error {
ret := _m.Called(id, version)
if len(ret) == 0 {
panic("no return value specified for DemoteVoter")
}
var r0 error
if rf, ok := ret.Get(0).(func(string) error); ok {
r0 = rf(id)
if rf, ok := ret.Get(0).(func(string, uint64) error); ok {
r0 = rf(id, version)
} else {
r0 = ret.Error(0)
}
......@@ -244,13 +246,14 @@ type Consensus_DemoteVoter_Call struct {
// DemoteVoter is a helper method to define mock.On call
// - id string
func (_e *Consensus_Expecter) DemoteVoter(id interface{}) *Consensus_DemoteVoter_Call {
return &Consensus_DemoteVoter_Call{Call: _e.mock.On("DemoteVoter", id)}
// - version uint64
func (_e *Consensus_Expecter) DemoteVoter(id interface{}, version interface{}) *Consensus_DemoteVoter_Call {
return &Consensus_DemoteVoter_Call{Call: _e.mock.On("DemoteVoter", id, version)}
}
func (_c *Consensus_DemoteVoter_Call) Run(run func(id string)) *Consensus_DemoteVoter_Call {
func (_c *Consensus_DemoteVoter_Call) Run(run func(id string, version uint64)) *Consensus_DemoteVoter_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
run(args[0].(string), args[1].(uint64))
})
return _c
}
......@@ -260,7 +263,7 @@ func (_c *Consensus_DemoteVoter_Call) Return(_a0 error) *Consensus_DemoteVoter_C
return _c
}
func (_c *Consensus_DemoteVoter_Call) RunAndReturn(run func(string) error) *Consensus_DemoteVoter_Call {
func (_c *Consensus_DemoteVoter_Call) RunAndReturn(run func(string, uint64) error) *Consensus_DemoteVoter_Call {
_c.Call.Return(run)
return _c
}
......@@ -461,17 +464,17 @@ func (_c *Consensus_LeaderWithID_Call) RunAndReturn(run func() *consensus.Server
return _c
}
// RemoveServer provides a mock function with given fields: id
func (_m *Consensus) RemoveServer(id string) error {
ret := _m.Called(id)
// RemoveServer provides a mock function with given fields: id, version
func (_m *Consensus) RemoveServer(id string, version uint64) error {
ret := _m.Called(id, version)
if len(ret) == 0 {
panic("no return value specified for RemoveServer")
}
var r0 error
if rf, ok := ret.Get(0).(func(string) error); ok {
r0 = rf(id)
if rf, ok := ret.Get(0).(func(string, uint64) error); ok {
r0 = rf(id, version)
} else {
r0 = ret.Error(0)
}
......@@ -486,13 +489,14 @@ type Consensus_RemoveServer_Call struct {
// RemoveServer is a helper method to define mock.On call
// - id string
func (_e *Consensus_Expecter) RemoveServer(id interface{}) *Consensus_RemoveServer_Call {
return &Consensus_RemoveServer_Call{Call: _e.mock.On("RemoveServer", id)}
// - version uint64
func (_e *Consensus_Expecter) RemoveServer(id interface{}, version interface{}) *Consensus_RemoveServer_Call {
return &Consensus_RemoveServer_Call{Call: _e.mock.On("RemoveServer", id, version)}
}
func (_c *Consensus_RemoveServer_Call) Run(run func(id string)) *Consensus_RemoveServer_Call {
func (_c *Consensus_RemoveServer_Call) Run(run func(id string, version uint64)) *Consensus_RemoveServer_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
run(args[0].(string), args[1].(uint64))
})
return _c
}
......@@ -502,7 +506,7 @@ func (_c *Consensus_RemoveServer_Call) Return(_a0 error) *Consensus_RemoveServer
return _c
}
func (_c *Consensus_RemoveServer_Call) RunAndReturn(run func(string) error) *Consensus_RemoveServer_Call {
func (_c *Consensus_RemoveServer_Call) RunAndReturn(run func(string, uint64) error) *Consensus_RemoveServer_Call {
_c.Call.Return(run)
return _c
}
......
......@@ -112,27 +112,36 @@ func NewRaftConsensus(log log.Logger, serverID, serverAddr, storageDir string, b
}
// AddNonVoter implements Consensus, it tries to add a non-voting member into the cluster.
func (rc *RaftConsensus) AddNonVoter(id string, addr string) error {
if err := rc.r.AddNonvoter(raft.ServerID(id), raft.ServerAddress(addr), 0, defaultTimeout).Error(); err != nil {
rc.log.Error("failed to add non-voter", "id", id, "addr", addr, "err", err)
func (rc *RaftConsensus) AddNonVoter(id string, addr string, version uint64) error {
if err := rc.r.AddNonvoter(raft.ServerID(id), raft.ServerAddress(addr), version, defaultTimeout).Error(); err != nil {
rc.log.Error("failed to add non-voter", "id", id, "addr", addr, "version", version, "err", err)
return err
}
return nil
}
// AddVoter implements Consensus, it tries to add a voting member into the cluster.
func (rc *RaftConsensus) AddVoter(id string, addr string) error {
if err := rc.r.AddVoter(raft.ServerID(id), raft.ServerAddress(addr), 0, defaultTimeout).Error(); err != nil {
rc.log.Error("failed to add voter", "id", id, "addr", addr, "err", err)
func (rc *RaftConsensus) AddVoter(id string, addr string, version uint64) error {
if err := rc.r.AddVoter(raft.ServerID(id), raft.ServerAddress(addr), version, defaultTimeout).Error(); err != nil {
rc.log.Error("failed to add voter", "id", id, "addr", addr, "version", version, "err", err)
return err
}
return nil
}
// DemoteVoter implements Consensus, it tries to demote a voting member into a non-voting member in the cluster.
func (rc *RaftConsensus) DemoteVoter(id string) error {
if err := rc.r.DemoteVoter(raft.ServerID(id), 0, defaultTimeout).Error(); err != nil {
rc.log.Error("failed to demote voter", "id", id, "err", err)
func (rc *RaftConsensus) DemoteVoter(id string, version uint64) error {
if err := rc.r.DemoteVoter(raft.ServerID(id), version, defaultTimeout).Error(); err != nil {
rc.log.Error("failed to demote voter", "id", id, "version", version, "err", err)
return err
}
return nil
}
// RemoveServer implements Consensus, it tries to remove a member (both voter or non-voter) from the cluster, if leader is being removed, it will cause a new leader election.
func (rc *RaftConsensus) RemoveServer(id string, version uint64) error {
if err := rc.r.RemoveServer(raft.ServerID(id), version, defaultTimeout).Error(); err != nil {
rc.log.Error("failed to remove voter", "id", id, "version", version, "err", err)
return err
}
return nil
......@@ -158,15 +167,6 @@ func (rc *RaftConsensus) LeaderCh() <-chan bool {
return rc.r.LeaderCh()
}
// RemoveServer implements Consensus, it tries to remove a member (both voter or non-voter) from the cluster, if leader is being removed, it will cause a new leader election.
func (rc *RaftConsensus) RemoveServer(id string) error {
if err := rc.r.RemoveServer(raft.ServerID(id), 0, defaultTimeout).Error(); err != nil {
rc.log.Error("failed to remove voter", "id", id, "err", err)
return err
}
return nil
}
// ServerID implements Consensus, it returns the server ID of the current server.
func (rc *RaftConsensus) ServerID() string {
return string(rc.serverID)
......@@ -232,19 +232,22 @@ func (rc *RaftConsensus) LatestUnsafePayload() (*eth.ExecutionPayloadEnvelope, e
}
// ClusterMembership implements Consensus, it returns the current cluster membership configuration.
func (rc *RaftConsensus) ClusterMembership() ([]*ServerInfo, error) {
func (rc *RaftConsensus) ClusterMembership() (*ClusterMembership, error) {
var future raft.ConfigurationFuture
if future = rc.r.GetConfiguration(); future.Error() != nil {
return nil, future.Error()
}
var servers []*ServerInfo
var servers []ServerInfo
for _, srv := range future.Configuration().Servers {
servers = append(servers, &ServerInfo{
servers = append(servers, ServerInfo{
ID: string(srv.ID),
Addr: string(srv.Address),
Suffrage: ServerSuffrage(srv.Suffrage),
})
}
return servers, nil
return &ClusterMembership{
Servers: servers,
Version: future.Index(),
}, nil
}
......@@ -32,17 +32,17 @@ type API interface {
// LeaderWithID returns the current leader's server info.
LeaderWithID(ctx context.Context) (*consensus.ServerInfo, error)
// AddServerAsVoter adds a server as a voter to the cluster.
AddServerAsVoter(ctx context.Context, id string, addr string) error
AddServerAsVoter(ctx context.Context, id string, addr string, version uint64) error
// AddServerAsNonvoter adds a server as a non-voter to the cluster. non-voter will not participate in leader election.
AddServerAsNonvoter(ctx context.Context, id string, addr string) error
AddServerAsNonvoter(ctx context.Context, id string, addr string, version uint64) error
// RemoveServer removes a server from the cluster.
RemoveServer(ctx context.Context, id string) error
RemoveServer(ctx context.Context, id string, version uint64) error
// TransferLeader transfers leadership to another server.
TransferLeader(ctx context.Context) error
// TransferLeaderToServer transfers leadership to a specific server.
TransferLeaderToServer(ctx context.Context, id string, addr string) error
// ClusterMembership returns the current cluster membership configuration.
ClusterMembership(ctx context.Context) ([]*consensus.ServerInfo, error)
ClusterMembership(ctx context.Context) (*consensus.ClusterMembership, error)
// APIs called by op-node
// Active returns true if op-conductor is active (not paused or stopped).
......
......@@ -18,13 +18,13 @@ type conductor interface {
Leader(ctx context.Context) bool
LeaderWithID(ctx context.Context) *consensus.ServerInfo
AddServerAsVoter(ctx context.Context, id string, addr string) error
AddServerAsNonvoter(ctx context.Context, id string, addr string) error
RemoveServer(ctx context.Context, id string) error
AddServerAsVoter(ctx context.Context, id string, addr string, version uint64) error
AddServerAsNonvoter(ctx context.Context, id string, addr string, version uint64) error
RemoveServer(ctx context.Context, id string, version uint64) error
TransferLeader(ctx context.Context) error
TransferLeaderToServer(ctx context.Context, id string, addr string) error
CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error
ClusterMembership(ctx context.Context) ([]*consensus.ServerInfo, error)
ClusterMembership(ctx context.Context) (*consensus.ClusterMembership, error)
}
// APIBackend is the backend implementation of the API.
......@@ -61,13 +61,18 @@ func (api *APIBackend) Active(_ context.Context) (bool, error) {
}
// AddServerAsNonvoter implements API.
func (api *APIBackend) AddServerAsNonvoter(ctx context.Context, id string, addr string) error {
return api.con.AddServerAsNonvoter(ctx, id, addr)
func (api *APIBackend) AddServerAsNonvoter(ctx context.Context, id string, addr string, version uint64) error {
return api.con.AddServerAsNonvoter(ctx, id, addr, version)
}
// AddServerAsVoter implements API.
func (api *APIBackend) AddServerAsVoter(ctx context.Context, id string, addr string) error {
return api.con.AddServerAsVoter(ctx, id, addr)
func (api *APIBackend) AddServerAsVoter(ctx context.Context, id string, addr string, version uint64) error {
return api.con.AddServerAsVoter(ctx, id, addr, version)
}
// RemoveServer implements API.
func (api *APIBackend) RemoveServer(ctx context.Context, id string, version uint64) error {
return api.con.RemoveServer(ctx, id, version)
}
// CommitUnsafePayload implements API.
......@@ -90,11 +95,6 @@ func (api *APIBackend) Pause(ctx context.Context) error {
return api.con.Pause(ctx)
}
// RemoveServer implements API.
func (api *APIBackend) RemoveServer(ctx context.Context, id string) error {
return api.con.RemoveServer(ctx, id)
}
// Resume implements API.
func (api *APIBackend) Resume(ctx context.Context) error {
return api.con.Resume(ctx)
......@@ -118,6 +118,6 @@ func (api *APIBackend) SequencerHealthy(ctx context.Context) (bool, error) {
}
// ClusterMembership implements API.
func (api *APIBackend) ClusterMembership(ctx context.Context) ([]*consensus.ServerInfo, error) {
func (api *APIBackend) ClusterMembership(ctx context.Context) (*consensus.ClusterMembership, error) {
return api.con.ClusterMembership(ctx)
}
......@@ -49,13 +49,18 @@ func (c *APIClient) Active(ctx context.Context) (bool, error) {
}
// AddServerAsNonvoter implements API.
func (c *APIClient) AddServerAsNonvoter(ctx context.Context, id string, addr string) error {
return c.c.CallContext(ctx, nil, prefixRPC("addServerAsNonvoter"), id, addr)
func (c *APIClient) AddServerAsNonvoter(ctx context.Context, id string, addr string, version uint64) error {
return c.c.CallContext(ctx, nil, prefixRPC("addServerAsNonvoter"), id, addr, version)
}
// AddServerAsVoter implements API.
func (c *APIClient) AddServerAsVoter(ctx context.Context, id string, addr string) error {
return c.c.CallContext(ctx, nil, prefixRPC("addServerAsVoter"), id, addr)
func (c *APIClient) AddServerAsVoter(ctx context.Context, id string, addr string, version uint64) error {
return c.c.CallContext(ctx, nil, prefixRPC("addServerAsVoter"), id, addr, version)
}
// RemoveServer implements API.
func (c *APIClient) RemoveServer(ctx context.Context, id string, version uint64) error {
return c.c.CallContext(ctx, nil, prefixRPC("removeServer"), id, version)
}
// Close closes the underlying RPC client.
......@@ -87,11 +92,6 @@ func (c *APIClient) Pause(ctx context.Context) error {
return c.c.CallContext(ctx, nil, prefixRPC("pause"))
}
// RemoveServer implements API.
func (c *APIClient) RemoveServer(ctx context.Context, id string) error {
return c.c.CallContext(ctx, nil, prefixRPC("removeServer"), id)
}
// Resume implements API.
func (c *APIClient) Resume(ctx context.Context) error {
return c.c.CallContext(ctx, nil, prefixRPC("resume"))
......@@ -115,8 +115,8 @@ func (c *APIClient) SequencerHealthy(ctx context.Context) (bool, error) {
}
// ClusterMembership implements API.
func (c *APIClient) ClusterMembership(ctx context.Context) ([]*consensus.ServerInfo, error) {
var info []*consensus.ServerInfo
err := c.c.CallContext(ctx, &info, prefixRPC("clusterMembership"))
return info, err
func (c *APIClient) ClusterMembership(ctx context.Context) (*consensus.ClusterMembership, error) {
var clusterMembership consensus.ClusterMembership
err := c.c.CallContext(ctx, &clusterMembership, prefixRPC("clusterMembership"))
return &clusterMembership, err
}
......@@ -17,6 +17,7 @@ import (
bss "github.com/ethereum-optimism/optimism/op-batcher/batcher"
batcherFlags "github.com/ethereum-optimism/optimism/op-batcher/flags"
con "github.com/ethereum-optimism/optimism/op-conductor/conductor"
"github.com/ethereum-optimism/optimism/op-conductor/consensus"
conrpc "github.com/ethereum-optimism/optimism/op-conductor/rpc"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/wait"
rollupNode "github.com/ethereum-optimism/optimism/op-node/node"
......@@ -74,8 +75,8 @@ func setupSequencerFailoverTest(t *testing.T) (*System, map[string]*conductor, f
c3 := conductors[Sequencer3Name]
require.NoError(t, waitForLeadership(t, c1))
require.NoError(t, c1.client.AddServerAsVoter(ctx, Sequencer2Name, c2.ConsensusEndpoint()))
require.NoError(t, c1.client.AddServerAsVoter(ctx, Sequencer3Name, c3.ConsensusEndpoint()))
require.NoError(t, c1.client.AddServerAsVoter(ctx, Sequencer2Name, c2.ConsensusEndpoint(), 0))
require.NoError(t, c1.client.AddServerAsVoter(ctx, Sequencer3Name, c3.ConsensusEndpoint(), 0))
require.True(t, leader(t, ctx, c1))
require.False(t, leader(t, ctx, c2))
require.False(t, leader(t, ctx, c3))
......@@ -508,3 +509,11 @@ func ensureOnlyOneLeader(t *testing.T, sys *System, conductors map[string]*condu
}
require.NoError(t, wait.For(ctx, 1*time.Second, condition))
}
func memberIDs(membership *consensus.ClusterMembership) []string {
ids := make([]string, len(membership.Servers))
for _, member := range membership.Servers {
ids = append(ids, member.ID)
}
return ids
}
......@@ -39,9 +39,9 @@ func TestSequencerFailover_ConductorRPC(t *testing.T) {
c3 := conductors[Sequencer3Name]
membership, err := c1.client.ClusterMembership(ctx)
require.NoError(t, err)
require.Equal(t, 3, len(membership), "Expected 3 members in cluster")
require.Equal(t, 3, len(membership.Servers), "Expected 3 members in cluster")
ids := make([]string, 0)
for _, member := range membership {
for _, member := range membership.Servers {
ids = append(ids, member.ID)
require.Equal(t, consensus.Voter, member.Suffrage, "Expected all members to be voters")
}
......@@ -112,37 +112,54 @@ func TestSequencerFailover_ConductorRPC(t *testing.T) {
require.NoError(t, err)
}()
err = leader.client.AddServerAsNonvoter(ctx, VerifierName, nonvoter.ConsensusEndpoint())
membership, err = leader.client.ClusterMembership(ctx)
require.NoError(t, err)
err = leader.client.AddServerAsNonvoter(ctx, VerifierName, nonvoter.ConsensusEndpoint(), membership.Version-1)
require.ErrorContains(t, err, "configuration changed since", "Expected leader to fail to add nonvoter due to version mismatch")
membership, err = leader.client.ClusterMembership(ctx)
require.NoError(t, err)
require.Equal(t, 3, len(membership.Servers), "Expected 3 members in cluster")
err = leader.client.AddServerAsNonvoter(ctx, VerifierName, nonvoter.ConsensusEndpoint(), 0)
require.NoError(t, err, "Expected leader to add non-voter")
membership, err = leader.client.ClusterMembership(ctx)
require.NoError(t, err)
require.Equal(t, 4, len(membership), "Expected 4 members in cluster")
require.Equal(t, consensus.Nonvoter, membership[3].Suffrage, "Expected last member to be non-voter")
require.Equal(t, 4, len(membership.Servers), "Expected 4 members in cluster")
require.Equal(t, consensus.Nonvoter, membership.Servers[3].Suffrage, "Expected last member to be non-voter")
t.Log("Testing RemoveServer, call remove on follower, expected to fail")
lid, leader = findLeader(t, conductors)
fid, follower = findFollower(t, conductors)
err = follower.client.RemoveServer(ctx, lid)
err = follower.client.RemoveServer(ctx, lid, membership.Version)
require.ErrorContains(t, err, "node is not the leader", "Expected follower to fail to remove leader")
membership, err = c1.client.ClusterMembership(ctx)
require.NoError(t, err)
require.Equal(t, 4, len(membership), "Expected 4 members in cluster")
require.Equal(t, 4, len(membership.Servers), "Expected 4 members in cluster")
t.Log("Testing RemoveServer, call remove on leader, expect non-voter to be removed")
err = leader.client.RemoveServer(ctx, VerifierName)
err = leader.client.RemoveServer(ctx, VerifierName, membership.Version)
require.NoError(t, err, "Expected leader to remove non-voter")
membership, err = c1.client.ClusterMembership(ctx)
require.NoError(t, err)
require.Equal(t, 3, len(membership), "Expected 2 members in cluster after removal")
require.NotContains(t, membership, VerifierName, "Expected follower to be removed from cluster")
require.Equal(t, 3, len(membership.Servers), "Expected 2 members in cluster after removal")
require.NotContains(t, memberIDs(membership), VerifierName, "Expected follower to be removed from cluster")
t.Log("Testing RemoveServer, call remove on leader with incorrect version, expect voter not to be removed")
err = leader.client.RemoveServer(ctx, fid, membership.Version-1)
require.ErrorContains(t, err, "configuration changed since", "Expected leader to fail to remove follower due to version mismatch")
membership, err = c1.client.ClusterMembership(ctx)
require.NoError(t, err)
require.Equal(t, 3, len(membership.Servers), "Expected 3 members in cluster after failed removal")
require.Contains(t, memberIDs(membership), fid, "Expected follower to not be removed from cluster")
t.Log("Testing RemoveServer, call remove on leader, expect voter to be removed")
err = leader.client.RemoveServer(ctx, fid)
err = leader.client.RemoveServer(ctx, fid, membership.Version)
require.NoError(t, err, "Expected leader to remove follower")
membership, err = c1.client.ClusterMembership(ctx)
require.NoError(t, err)
require.Equal(t, 2, len(membership), "Expected 2 members in cluster after removal")
require.NotContains(t, membership, fid, "Expected follower to be removed from cluster")
require.Equal(t, 2, len(membership.Servers), "Expected 2 members in cluster after removal")
require.NotContains(t, memberIDs(membership), fid, "Expected follower to be removed from cluster")
}
// [Category: Sequencer Failover]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment