Commit 1fe7e1ec authored by Henri Devieux, committed by GitHub

[op-conductor] e2e tests - conductor rpc test fixes (#9576)

* Fix a race condition in tests

* 2nd try with CI

* 3rd try with CI

* 4th try with CI

* 5th try

* Use existing wait util

* helper function for checking leader transfer

* bugfix

* Finish conductor rpc e2e tests

* reduce loop intervals

* increase timeouts to fix flake

---------
Co-authored-by: Francis Li <francis.li@coinbase.com>
parent 091e9c03
@@ -405,7 +405,7 @@ func (oc *OpConductor) Leader(_ context.Context) bool {
}
// LeaderWithID returns the current leader's server ID and address.
func (oc *OpConductor) LeaderWithID(_ context.Context) (string, string) {
func (oc *OpConductor) LeaderWithID(_ context.Context) *consensus.ServerInfo {
return oc.cons.LeaderWithID()
}
@@ -444,6 +444,16 @@ func (oc *OpConductor) SequencerHealthy(_ context.Context) bool {
return oc.healthy.Load()
}
// ClusterMembership returns the current cluster's membership information.
func (oc *OpConductor) ClusterMembership(_ context.Context) ([]*consensus.ServerInfo, error) {
return oc.cons.ClusterMembership()
}
// LatestUnsafePayload returns the latest unsafe payload envelope from the FSM.
func (oc *OpConductor) LatestUnsafePayload(_ context.Context) *eth.ExecutionPayloadEnvelope {
return oc.cons.LatestUnsafePayload()
}
func (oc *OpConductor) loop() {
defer oc.wg.Done()
......
@@ -4,6 +4,34 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth"
)
// ServerSuffrage determines whether a Server in a Configuration gets a vote.
type ServerSuffrage int
const (
// Voter is a server whose vote is counted in elections.
Voter ServerSuffrage = iota
// Nonvoter is a server that receives log entries but is not considered for
// elections or commitment purposes.
Nonvoter
)
func (s ServerSuffrage) String() string {
switch s {
case Voter:
return "Voter"
case Nonvoter:
return "Nonvoter"
}
return "ServerSuffrage"
}
// ServerInfo defines the server information.
type ServerInfo struct {
ID string `json:"id"`
Addr string `json:"addr"`
Suffrage ServerSuffrage `json:"suffrage"`
}
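Since ServerSuffrage is a plain int with no custom MarshalJSON, it serializes as a number in RPC responses (0 = Voter, 1 = Nonvoter). A minimal standalone sketch of the resulting wire shape (the ID and address values are hypothetical):

package main

import (
	"encoding/json"
	"fmt"

	"github.com/ethereum-optimism/optimism/op-conductor/consensus"
)

func main() {
	info := &consensus.ServerInfo{
		ID:       "sequencer-1", // hypothetical server ID
		Addr:     "127.0.0.1:50050",
		Suffrage: consensus.Voter,
	}
	out, _ := json.Marshal(info)
	fmt.Println(string(out)) // {"id":"sequencer-1","addr":"127.0.0.1:50050","suffrage":0}
}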
// Consensus defines the consensus interface for leadership election.
//
//go:generate mockery --name Consensus --output mocks/ --with-expecter=true
@@ -21,13 +49,15 @@ type Consensus interface {
// Leader returns if it is the leader of the cluster.
Leader() bool
// LeaderWithID returns the leader's server ID and address.
LeaderWithID() (string, string)
LeaderWithID() *ServerInfo
// ServerID returns the server ID of the consensus.
ServerID() string
// TransferLeader triggers leadership transfer to another member in the cluster.
TransferLeader() error
// TransferLeaderTo triggers leadership transfer to a specific member in the cluster.
TransferLeaderTo(id, addr string) error
// ClusterMembership returns the current cluster membership configuration.
ClusterMembership() ([]*ServerInfo, error)
// CommitUnsafePayload commits the latest unsafe payload to the FSM.
CommitUnsafePayload(payload *eth.ExecutionPayloadEnvelope) error
......
@@ -3,7 +3,9 @@
package mocks
import (
consensus "github.com/ethereum-optimism/optimism/op-conductor/consensus"
eth "github.com/ethereum-optimism/optimism/op-service/eth"
mock "github.com/stretchr/testify/mock"
)
@@ -114,6 +116,63 @@ func (_c *Consensus_AddVoter_Call) RunAndReturn(run func(string, string) error)
return _c
}
// ClusterMembership provides a mock function with given fields:
func (_m *Consensus) ClusterMembership() ([]*consensus.ServerInfo, error) {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for ClusterMembership")
}
var r0 []*consensus.ServerInfo
var r1 error
if rf, ok := ret.Get(0).(func() ([]*consensus.ServerInfo, error)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() []*consensus.ServerInfo); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*consensus.ServerInfo)
}
}
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Consensus_ClusterMembership_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ClusterMembership'
type Consensus_ClusterMembership_Call struct {
*mock.Call
}
// ClusterMembership is a helper method to define mock.On call
func (_e *Consensus_Expecter) ClusterMembership() *Consensus_ClusterMembership_Call {
return &Consensus_ClusterMembership_Call{Call: _e.mock.On("ClusterMembership")}
}
func (_c *Consensus_ClusterMembership_Call) Run(run func()) *Consensus_ClusterMembership_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *Consensus_ClusterMembership_Call) Return(_a0 []*consensus.ServerInfo, _a1 error) *Consensus_ClusterMembership_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *Consensus_ClusterMembership_Call) RunAndReturn(run func() ([]*consensus.ServerInfo, error)) *Consensus_ClusterMembership_Call {
_c.Call.Return(run)
return _c
}
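With the expecter enabled, the generated helpers let unit tests stub the new method like any other; a short sketch, assuming the usual mockery-generated NewConsensus constructor (the test name and member values are hypothetical):

func TestClusterMembershipMock(t *testing.T) {
	cons := mocks.NewConsensus(t) // generated constructor; asserts expectations on cleanup
	cons.EXPECT().ClusterMembership().Return([]*consensus.ServerInfo{
		{ID: "sequencer-1", Addr: "127.0.0.1:50050", Suffrage: consensus.Voter},
		{ID: "sequencer-2", Addr: "127.0.0.1:50051", Suffrage: consensus.Nonvoter},
	}, nil)
	members, err := cons.ClusterMembership()
	require.NoError(t, err)
	require.Len(t, members, 2)
}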
// CommitUnsafePayload provides a mock function with given fields: payload
func (_m *Consensus) CommitUnsafePayload(payload *eth.ExecutionPayloadEnvelope) error {
ret := _m.Called(payload)
@@ -346,31 +405,23 @@ func (_c *Consensus_LeaderCh_Call) RunAndReturn(run func() <-chan bool) *Consens
}
// LeaderWithID provides a mock function with given fields:
func (_m *Consensus) LeaderWithID() (string, string) {
func (_m *Consensus) LeaderWithID() *consensus.ServerInfo {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for LeaderWithID")
}
var r0 string
var r1 string
if rf, ok := ret.Get(0).(func() (string, string)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() string); ok {
var r0 *consensus.ServerInfo
if rf, ok := ret.Get(0).(func() *consensus.ServerInfo); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
if ret.Get(0) != nil {
r0 = ret.Get(0).(*consensus.ServerInfo)
}
if rf, ok := ret.Get(1).(func() string); ok {
r1 = rf()
} else {
r1 = ret.Get(1).(string)
}
return r0, r1
return r0
}
// Consensus_LeaderWithID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LeaderWithID'
@@ -390,12 +441,12 @@ func (_c *Consensus_LeaderWithID_Call) Run(run func()) *Consensus_LeaderWithID_C
return _c
}
func (_c *Consensus_LeaderWithID_Call) Return(_a0 string, _a1 string) *Consensus_LeaderWithID_Call {
_c.Call.Return(_a0, _a1)
func (_c *Consensus_LeaderWithID_Call) Return(_a0 *consensus.ServerInfo) *Consensus_LeaderWithID_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *Consensus_LeaderWithID_Call) RunAndReturn(run func() (string, string)) *Consensus_LeaderWithID_Call {
func (_c *Consensus_LeaderWithID_Call) RunAndReturn(run func() *consensus.ServerInfo) *Consensus_LeaderWithID_Call {
_c.Call.Return(run)
return _c
}
......
@@ -145,9 +145,13 @@ func (rc *RaftConsensus) Leader() bool {
}
// LeaderWithID implements Consensus, it returns the leader's server ID and address.
func (rc *RaftConsensus) LeaderWithID() (string, string) {
func (rc *RaftConsensus) LeaderWithID() *ServerInfo {
addr, id := rc.r.LeaderWithID()
return string(id), string(addr)
return &ServerInfo{
ID: string(id),
Addr: string(addr),
Suffrage: Voter, // leader will always be Voter
}
}
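One caveat: hashicorp/raft's LeaderWithID returns empty values when no leader is currently known, so this method then yields a ServerInfo with an empty ID and Addr rather than nil. A minimal defensive sketch for callers (the handling shown is illustrative, not part of this change):

if info := rc.LeaderWithID(); info.ID == "" {
	// raft reported no current leader; a caller might retry or surface an error here
	return nil, errors.New("no leader currently known")
}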
// LeaderCh implements Consensus, it returns a channel that will be notified when leadership status changes (true = leader, false = follower).
@@ -221,3 +225,21 @@ func (rc *RaftConsensus) LatestUnsafePayload() *eth.ExecutionPayloadEnvelope {
payload := rc.unsafeTracker.UnsafeHead()
return payload
}
// ClusterMembership implements Consensus, it returns the current cluster membership configuration.
func (rc *RaftConsensus) ClusterMembership() ([]*ServerInfo, error) {
var future raft.ConfigurationFuture
if future = rc.r.GetConfiguration(); future.Error() != nil {
return nil, future.Error()
}
var servers []*ServerInfo
for _, srv := range future.Configuration().Servers {
servers = append(servers, &ServerInfo{
ID: string(srv.ID),
Addr: string(srv.Address),
Suffrage: ServerSuffrage(srv.Suffrage),
})
}
return servers, nil
}
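The direct cast ServerSuffrage(srv.Suffrage) assumes the local enum stays aligned with hashicorp/raft's ordering (raft.Voter = 0, raft.Nonvoter = 1; raft also defines Staging = 2, which this enum does not model). A tiny sanity test could pin that assumption down; a sketch with a hypothetical test name:

func TestServerSuffrageAlignedWithRaft(t *testing.T) {
	// the cast in ClusterMembership is only valid while these values match
	require.EqualValues(t, raft.Voter, Voter)
	require.EqualValues(t, raft.Nonvoter, Nonvoter)
}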
@@ -6,17 +6,13 @@ import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-conductor/consensus"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
var ErrNotLeader = errors.New("refusing to proxy request to non-leader sequencer")
type ServerInfo struct {
ID string `json:"id"`
Addr string `json:"addr"`
}
// API defines the interface for the op-conductor API.
type API interface {
// Pause pauses op-conductor.
@@ -30,7 +26,7 @@ type API interface {
// Leader returns true if the server is the leader.
Leader(ctx context.Context) (bool, error)
// LeaderWithID returns the current leader's server info.
LeaderWithID(ctx context.Context) (*ServerInfo, error)
LeaderWithID(ctx context.Context) (*consensus.ServerInfo, error)
// AddServerAsVoter adds a server as a voter to the cluster.
AddServerAsVoter(ctx context.Context, id string, addr string) error
// AddServerAsNonvoter adds a server as a non-voter to the cluster. A non-voter does not participate in leader election.
@@ -41,6 +37,8 @@ type API interface {
TransferLeader(ctx context.Context) error
// TransferLeaderToServer transfers leadership to a specific server.
TransferLeaderToServer(ctx context.Context, id string, addr string) error
// ClusterMembership returns the current cluster membership configuration.
ClusterMembership(ctx context.Context) ([]*consensus.ServerInfo, error)
// APIs called by op-node
// Active returns true if op-conductor is active.
......
@@ -5,6 +5,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-conductor/consensus"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
@@ -16,13 +17,14 @@ type conductor interface {
SequencerHealthy(ctx context.Context) bool
Leader(ctx context.Context) bool
LeaderWithID(ctx context.Context) (string, string)
LeaderWithID(ctx context.Context) *consensus.ServerInfo
AddServerAsVoter(ctx context.Context, id string, addr string) error
AddServerAsNonvoter(ctx context.Context, id string, addr string) error
RemoveServer(ctx context.Context, id string) error
TransferLeader(ctx context.Context) error
TransferLeaderToServer(ctx context.Context, id string, addr string) error
CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error
ClusterMembership(ctx context.Context) ([]*consensus.ServerInfo, error)
}
// APIBackend is the backend implementation of the API.
@@ -69,12 +71,8 @@ func (api *APIBackend) Leader(ctx context.Context) (bool, error) {
}
// LeaderWithID implements API, returns the leader's server info (the leader is not necessarily this conductor).
func (api *APIBackend) LeaderWithID(ctx context.Context) (*ServerInfo, error) {
id, addr := api.con.LeaderWithID(ctx)
return &ServerInfo{
ID: id,
Addr: addr,
}, nil
func (api *APIBackend) LeaderWithID(ctx context.Context) (*consensus.ServerInfo, error) {
return api.con.LeaderWithID(ctx), nil
}
// Pause implements API.
@@ -92,12 +90,14 @@ func (api *APIBackend) Resume(ctx context.Context) error {
return api.con.Resume(ctx)
}
// TransferLeader implements API.
// TransferLeader implements API. With the Raft implementation, a successful call does not mean that leadership transfer is complete;
// it only means the transfer is in progress (the current leader has initiated a new election round and stepped down as leader).
func (api *APIBackend) TransferLeader(ctx context.Context) error {
return api.con.TransferLeader(ctx)
}
// TransferLeaderToServer implements API.
// TransferLeaderToServer implements API. With the Raft implementation, a successful call does not mean that leadership transfer is complete;
// it only means the transfer is in progress (the current leader has initiated a new election round and stepped down as leader).
func (api *APIBackend) TransferLeaderToServer(ctx context.Context, id string, addr string) error {
return api.con.TransferLeaderToServer(ctx, id, addr)
}
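Because both transfer calls return before a new leader is actually elected, a caller that needs to observe completion has to poll. A minimal sketch of that pattern, assuming an API client c and the previous leader's ID prevID (timeout and interval are illustrative):

deadline := time.Now().Add(30 * time.Second)
for time.Now().Before(deadline) {
	info, err := c.LeaderWithID(ctx)
	if err == nil && info != nil && info.ID != "" && info.ID != prevID {
		break // a new leader has been elected
	}
	time.Sleep(time.Second)
}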
@@ -106,3 +106,8 @@ func (api *APIBackend) TransferLeaderToServer(ctx context.Context, id string, ad
func (api *APIBackend) SequencerHealthy(ctx context.Context) (bool, error) {
return api.con.SequencerHealthy(ctx), nil
}
// ClusterMembership implements API.
func (api *APIBackend) ClusterMembership(ctx context.Context) ([]*consensus.ServerInfo, error) {
return api.con.ClusterMembership(ctx)
}
@@ -5,6 +5,7 @@ import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-conductor/consensus"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
@@ -61,8 +62,8 @@ func (c *APIClient) Leader(ctx context.Context) (bool, error) {
}
// LeaderWithID implements API.
func (c *APIClient) LeaderWithID(ctx context.Context) (*ServerInfo, error) {
var info *ServerInfo
func (c *APIClient) LeaderWithID(ctx context.Context) (*consensus.ServerInfo, error) {
var info *consensus.ServerInfo
err := c.c.CallContext(ctx, &info, prefixRPC("leaderWithID"))
return info, err
}
@@ -98,3 +99,10 @@ func (c *APIClient) SequencerHealthy(ctx context.Context) (bool, error) {
err := c.c.CallContext(ctx, &healthy, prefixRPC("sequencerHealthy"))
return healthy, err
}
// ClusterMembership implements API.
func (c *APIClient) ClusterMembership(ctx context.Context) ([]*consensus.ServerInfo, error) {
var info []*consensus.ServerInfo
err := c.c.CallContext(ctx, &info, prefixRPC("clusterMembership"))
return info, err
}
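End to end, the new RPC can be exercised with go-ethereum's rpc client; a sketch, assuming a hypothetical conductor endpoint and that this package exposes a NewAPIClient constructor wrapping *rpc.Client:

// printMembership is a hypothetical helper showing the client flow end to end.
func printMembership(ctx context.Context) error {
	rpcClient, err := rpc.DialContext(ctx, "http://127.0.0.1:8547") // hypothetical endpoint
	if err != nil {
		return err
	}
	defer rpcClient.Close()
	client := NewAPIClient(rpcClient) // assumed constructor for APIClient
	members, err := client.ClusterMembership(ctx)
	if err != nil {
		return err
	}
	for _, m := range members {
		fmt.Printf("%s @ %s (%s)\n", m.ID, m.Addr, m.Suffrage) // Suffrage prints via its String method
	}
	return nil
}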
@@ -95,7 +95,7 @@ func setupSequencerFailoverTest(t *testing.T) (*System, map[string]*conductor) {
c2 := conductors[Sequencer2Name]
c3 := conductors[Sequencer3Name]
require.NoError(t, waitForLeadershipChange(t, c1, true))
require.NoError(t, waitForLeadership(t, c1))
require.NoError(t, c1.client.AddServerAsVoter(ctx, Sequencer2Name, c2.ConsensusEndpoint()))
require.NoError(t, c1.client.AddServerAsVoter(ctx, Sequencer3Name, c3.ConsensusEndpoint()))
require.True(t, leader(t, ctx, c1))
@@ -301,24 +301,56 @@ func sequencerCfg(rpcPort int) *rollupNode.Config {
}
}
func waitForLeadershipChange(t *testing.T, c *conductor, leader bool) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
isLeader, err := c.client.Leader(ctx)
if err != nil {
return err
}
if isLeader == leader {
return nil
}
time.Sleep(500 * time.Millisecond)
}
}
}
func waitForLeadership(t *testing.T, c *conductor) error {
condition := func() (bool, error) {
isLeader, err := c.client.Leader(context.Background())
if err != nil {
return false, err
}
return isLeader, nil
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return wait.For(ctx, 1*time.Second, condition)
}
func waitForLeadershipChange(t *testing.T, prev *conductor, prevID string, conductors map[string]*conductor, sys *System) string {
condition := func() (bool, error) {
isLeader, err := prev.client.Leader(context.Background())
if err != nil {
return false, err
}
return !isLeader, nil
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
err := wait.For(ctx, 1*time.Second, condition)
require.NoError(t, err)
ensureOnlyOneLeader(t, sys, conductors)
newLeader, err := prev.client.LeaderWithID(ctx)
require.NoError(t, err)
require.NotEmpty(t, newLeader.ID)
require.NotEqual(t, prevID, newLeader.ID, "Expected a new leader")
require.NoError(t, waitForSequencerStatusChange(t, sys.RollupClient(newLeader.ID), true))
return newLeader.ID
}
func waitForSequencerStatusChange(t *testing.T, rollupClient *sources.RollupClient, active bool) error {
condition := func() (bool, error) {
isActive, err := rollupClient.SequencerActive(context.Background())
if err != nil {
return false, err
}
return isActive == active, nil
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return wait.For(ctx, 1*time.Second, condition)
}
func leader(t *testing.T, ctx context.Context, con *conductor) bool {
@@ -373,3 +405,38 @@ func findLeader(t *testing.T, conductors map[string]*conductor) (string, *conduc
}
return "", nil
}
func findFollower(t *testing.T, conductors map[string]*conductor) (string, *conductor) {
for id, con := range conductors {
if !leader(t, context.Background(), con) {
return id, con
}
}
return "", nil
}
func ensureOnlyOneLeader(t *testing.T, sys *System, conductors map[string]*conductor) {
condition := func() (bool, error) {
leaders := 0
ctx := context.Background()
for name, con := range conductors {
leader, err := con.client.Leader(ctx)
if err != nil {
continue
}
active, err := sys.RollupClient(name).SequencerActive(ctx)
if err != nil {
continue
}
if leader && active {
leaders++
}
}
return leaders == 1, nil
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
require.NoError(t, wait.For(ctx, 1*time.Second, condition))
}
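The helpers above all lean on wait.For from op-e2e's wait utilities, which polls a condition at a fixed interval until it reports true, returns an error, or the context expires. A minimal sketch of the underlying pattern (a stand-in, not the actual implementation):

// pollFor is a hypothetical stand-in illustrating the wait.For polling pattern.
func pollFor(ctx context.Context, interval time.Duration, cond func() (bool, error)) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		ok, err := cond()
		if err != nil {
			return err
		}
		if ok {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}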
package op_e2e
import (
"context"
"sort"
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-conductor/consensus"
)
// [Category: Initial Setup]
@@ -17,3 +21,118 @@ func TestSequencerFailover_SetupCluster(t *testing.T) {
require.NotNil(t, con, "Expected conductor to be non-nil")
}
}
// [Category: Conductor RPC]
// In this test, we exercise all RPCs exposed by the conductor.
func TestSequencerFailover_ConductorRPC(t *testing.T) {
ctx := context.Background()
sys, conductors := setupSequencerFailoverTest(t)
defer sys.Close()
// SequencerHealthy, Leader, AddServerAsVoter are used in setup already.
// Test ClusterMembership
t.Log("Testing ClusterMembership")
c1 := conductors[Sequencer1Name]
c2 := conductors[Sequencer2Name]
c3 := conductors[Sequencer3Name]
membership, err := c1.client.ClusterMembership(ctx)
require.NoError(t, err)
require.Equal(t, 3, len(membership), "Expected 3 members in cluster")
ids := make([]string, 0)
for _, member := range membership {
ids = append(ids, member.ID)
require.Equal(t, consensus.Voter, member.Suffrage, "Expected all members to be voters")
}
sort.Strings(ids)
require.Equal(t, []string{Sequencer1Name, Sequencer2Name, Sequencer3Name}, ids, "Expected all sequencers to be in cluster")
// Test Active & Pause & Resume
t.Log("Testing Active & Pause & Resume")
active, err := c1.client.Active(ctx)
require.NoError(t, err)
require.True(t, active, "Expected conductor to be active")
err = c1.client.Pause(ctx)
require.NoError(t, err)
active, err = c1.client.Active(ctx)
require.NoError(t, err)
require.False(t, active, "Expected conductor to be paused")
err = c1.client.Resume(ctx)
require.NoError(t, err)
active, err = c1.client.Active(ctx)
require.NoError(t, err)
require.True(t, active, "Expected conductor to be active")
t.Log("Testing LeaderWithID")
leader1, err := c1.client.LeaderWithID(ctx)
require.NoError(t, err)
leader2, err := c2.client.LeaderWithID(ctx)
require.NoError(t, err)
leader3, err := c3.client.LeaderWithID(ctx)
require.NoError(t, err)
require.Equal(t, leader1.ID, leader2.ID, "Expected leader ID to be the same")
require.Equal(t, leader1.ID, leader3.ID, "Expected leader ID to be the same")
t.Log("Testing TransferLeader")
lid, leader := findLeader(t, conductors)
err = leader.client.TransferLeader(ctx)
require.NoError(t, err, "Expected leader to transfer leadership to another node")
_ = waitForLeadershipChange(t, leader, lid, conductors, sys)
// The old leader has now become a follower; try transferring leadership directly back to it.
t.Log("Testing TransferLeaderToServer")
fid, follower := lid, leader
lid, leader = findLeader(t, conductors)
err = leader.client.TransferLeaderToServer(ctx, fid, follower.ConsensusEndpoint())
require.NoError(t, err, "Expected leader to transfer leadership to follower")
newID := waitForLeadershipChange(t, leader, lid, conductors, sys)
require.Equal(t, fid, newID, "Expected leader to transfer to %s", fid)
leader = follower
// Test AddServerAsNonvoter. Do not start a new sequencer just for this; reuse Sequencer3's RPC endpoints to start the conductor.
// This is fine since this mainly tests the conductor's ability to add itself to the raft consensus cluster as a nonvoter.
t.Log("Testing AddServerAsNonvoter")
nonvoter := setupConductor(
t, VerifierName, t.TempDir(),
sys.RollupEndpoint(Sequencer3Name),
sys.NodeEndpoint(Sequencer3Name),
findAvailablePort(t),
false,
*sys.RollupConfig,
)
err = leader.client.AddServerAsNonvoter(ctx, VerifierName, nonvoter.ConsensusEndpoint())
require.NoError(t, err, "Expected leader to add non-voter")
membership, err = leader.client.ClusterMembership(ctx)
require.NoError(t, err)
require.Equal(t, 4, len(membership), "Expected 4 members in cluster")
require.Equal(t, consensus.Nonvoter, membership[3].Suffrage, "Expected last member to be non-voter")
t.Log("Testing RemoveServer, call remove on follower, expected to fail")
lid, leader = findLeader(t, conductors)
fid, follower = findFollower(t, conductors)
err = follower.client.RemoveServer(ctx, lid)
require.ErrorContains(t, err, "node is not the leader", "Expected follower to fail to remove leader")
membership, err = c1.client.ClusterMembership(ctx)
require.NoError(t, err)
require.Equal(t, 4, len(membership), "Expected 4 members in cluster")
t.Log("Testing RemoveServer, call remove on leader, expect non-voter to be removed")
err = leader.client.RemoveServer(ctx, VerifierName)
require.NoError(t, err, "Expected leader to remove non-voter")
membership, err = c1.client.ClusterMembership(ctx)
require.NoError(t, err)
require.Equal(t, 3, len(membership), "Expected 2 members in cluster after removal")
require.NotContains(t, membership, VerifierName, "Expected follower to be removed from cluster")
t.Log("Testing RemoveServer, call remove on leader, expect voter to be removed")
err = leader.client.RemoveServer(ctx, fid)
require.NoError(t, err, "Expected leader to remove follower")
membership, err = c1.client.ClusterMembership(ctx)
require.NoError(t, err)
require.Equal(t, 2, len(membership), "Expected 2 members in cluster after removal")
for _, m := range membership {
require.NotEqual(t, fid, m.ID, "Expected follower to be removed from cluster")
}
}