Commit ef009e3b authored by Francis Li's avatar Francis Li Committed by GitHub

Implement OverrideLeader & LeaderOverriden method for conductor (#12451)

parent ea60a8a8
...@@ -78,9 +78,10 @@ func NewOpConductor( ...@@ -78,9 +78,10 @@ func NewOpConductor(
oc.loopActionFn = oc.loopAction oc.loopActionFn = oc.loopAction
// explicitly set all atomic.Bool values // explicitly set all atomic.Bool values
oc.leader.Store(false) // upon start, it should not be the leader unless specified otherwise by raft bootstrap, in that case, it'll receive a leadership update from consensus. oc.leader.Store(false) // upon start, it should not be the leader unless specified otherwise by raft bootstrap, in that case, it'll receive a leadership update from consensus.
oc.healthy.Store(true) // default to healthy unless reported otherwise by health monitor. oc.leaderOverride.Store(false) // default to no override.
oc.seqActive.Store(false) // explicitly set to false by default, the real value will be reported after sequencer control initialization. oc.healthy.Store(true) // default to healthy unless reported otherwise by health monitor.
oc.seqActive.Store(false) // explicitly set to false by default, the real value will be reported after sequencer control initialization.
oc.paused.Store(cfg.Paused) oc.paused.Store(cfg.Paused)
oc.stopped.Store(false) oc.stopped.Store(false)
...@@ -287,11 +288,12 @@ type OpConductor struct { ...@@ -287,11 +288,12 @@ type OpConductor struct {
cons consensus.Consensus cons consensus.Consensus
hmon health.HealthMonitor hmon health.HealthMonitor
leader atomic.Bool leader atomic.Bool
seqActive atomic.Bool leaderOverride atomic.Bool
healthy atomic.Bool seqActive atomic.Bool
hcerr error // error from health check healthy atomic.Bool
prevState *state hcerr error // error from health check
prevState *state
healthUpdateCh <-chan error healthUpdateCh <-chan error
leaderUpdateCh <-chan bool leaderUpdateCh <-chan bool
...@@ -472,13 +474,29 @@ func (oc *OpConductor) HTTPEndpoint() string { ...@@ -472,13 +474,29 @@ func (oc *OpConductor) HTTPEndpoint() string {
return fmt.Sprintf("http://%s", oc.rpcServer.Endpoint()) return fmt.Sprintf("http://%s", oc.rpcServer.Endpoint())
} }
func (oc *OpConductor) OverrideLeader(override bool) {
oc.leaderOverride.Store(override)
}
func (oc *OpConductor) LeaderOverridden() bool {
return oc.leaderOverride.Load()
}
// Leader returns true if OpConductor is the leader. // Leader returns true if OpConductor is the leader.
func (oc *OpConductor) Leader(_ context.Context) bool { func (oc *OpConductor) Leader(ctx context.Context) bool {
return oc.cons.Leader() return oc.LeaderOverridden() || oc.cons.Leader()
} }
// LeaderWithID returns the current leader's server ID and address. // LeaderWithID returns the current leader's server ID and address.
func (oc *OpConductor) LeaderWithID(_ context.Context) *consensus.ServerInfo { func (oc *OpConductor) LeaderWithID(ctx context.Context) *consensus.ServerInfo {
if oc.LeaderOverridden() {
return &consensus.ServerInfo{
ID: "N/A (Leader overridden)",
Addr: "N/A",
Suffrage: 0,
}
}
return oc.cons.LeaderWithID() return oc.cons.LeaderWithID()
} }
......
...@@ -15,10 +15,14 @@ var ErrNotLeader = errors.New("refusing to proxy request to non-leader sequencer ...@@ -15,10 +15,14 @@ var ErrNotLeader = errors.New("refusing to proxy request to non-leader sequencer
// API defines the interface for the op-conductor API. // API defines the interface for the op-conductor API.
type API interface { type API interface {
// OverrideLeader is used to override the leader status, this is only used to return true for Leader() & LeaderWithID() calls. // OverrideLeader is used to override or clear override for the leader status.
// It does not impact the actual raft consensus leadership status. It is supposed to be used when the cluster is unhealthy // It does not impact the actual raft consensus leadership status. It is supposed to be used when the cluster is unhealthy
// and the node is the only one up, to allow batcher to be able to connect to the node, so that it could download blocks from the manually started sequencer. // and the node is the only one up, to allow batcher to be able to connect to the node, so that it could download blocks from the manually started sequencer.
OverrideLeader(ctx context.Context) error // override: true => force current conductor to be treated as leader regardless of the actual leadership status in raft.
// override: false => clear the override, return the actual leadership status in raft.
OverrideLeader(ctx context.Context, override bool) error
// LeaderOverridden returns true if the leader status is overridden.
LeaderOverridden(ctx context.Context) (bool, error)
// Pause pauses op-conductor. // Pause pauses op-conductor.
Pause(ctx context.Context) error Pause(ctx context.Context) error
// Resume resumes op-conductor. // Resume resumes op-conductor.
......
...@@ -2,7 +2,6 @@ package rpc ...@@ -2,7 +2,6 @@ package rpc
import ( import (
"context" "context"
"sync/atomic"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -11,6 +10,8 @@ import ( ...@@ -11,6 +10,8 @@ import (
) )
type conductor interface { type conductor interface {
OverrideLeader(override bool)
LeaderOverridden() bool
Pause(ctx context.Context) error Pause(ctx context.Context) error
Resume(ctx context.Context) error Resume(ctx context.Context) error
Stop(ctx context.Context) error Stop(ctx context.Context) error
...@@ -32,9 +33,8 @@ type conductor interface { ...@@ -32,9 +33,8 @@ type conductor interface {
// APIBackend is the backend implementation of the API. // APIBackend is the backend implementation of the API.
// TODO: (https://github.com/ethereum-optimism/protocol-quest/issues/45) Add metrics tracer here. // TODO: (https://github.com/ethereum-optimism/protocol-quest/issues/45) Add metrics tracer here.
type APIBackend struct { type APIBackend struct {
log log.Logger log log.Logger
con conductor con conductor
leaderOverride atomic.Bool
} }
// NewAPIBackend creates a new APIBackend instance. // NewAPIBackend creates a new APIBackend instance.
...@@ -48,11 +48,16 @@ func NewAPIBackend(log log.Logger, con conductor) *APIBackend { ...@@ -48,11 +48,16 @@ func NewAPIBackend(log log.Logger, con conductor) *APIBackend {
var _ API = (*APIBackend)(nil) var _ API = (*APIBackend)(nil)
// OverrideLeader implements API. // OverrideLeader implements API.
func (api *APIBackend) OverrideLeader(ctx context.Context) error { func (api *APIBackend) OverrideLeader(_ context.Context, override bool) error {
api.leaderOverride.Store(true) api.con.OverrideLeader(override)
return nil return nil
} }
// LeaderOverridden implements API.
func (api *APIBackend) LeaderOverridden(_ context.Context) (bool, error) {
return api.con.LeaderOverridden(), nil
}
// Paused implements API. // Paused implements API.
func (api *APIBackend) Paused(ctx context.Context) (bool, error) { func (api *APIBackend) Paused(ctx context.Context) (bool, error) {
return api.con.Paused(), nil return api.con.Paused(), nil
...@@ -90,19 +95,11 @@ func (api *APIBackend) CommitUnsafePayload(ctx context.Context, payload *eth.Exe ...@@ -90,19 +95,11 @@ func (api *APIBackend) CommitUnsafePayload(ctx context.Context, payload *eth.Exe
// Leader implements API, returns true if current conductor is leader of the cluster. // Leader implements API, returns true if current conductor is leader of the cluster.
func (api *APIBackend) Leader(ctx context.Context) (bool, error) { func (api *APIBackend) Leader(ctx context.Context) (bool, error) {
return api.leaderOverride.Load() || api.con.Leader(ctx), nil return api.con.Leader(ctx), nil
} }
// LeaderWithID implements API, returns the leader's server ID and address (not necessarily the current conductor). // LeaderWithID implements API, returns the leader's server ID and address (not necessarily the current conductor).
func (api *APIBackend) LeaderWithID(ctx context.Context) (*consensus.ServerInfo, error) { func (api *APIBackend) LeaderWithID(ctx context.Context) (*consensus.ServerInfo, error) {
if api.leaderOverride.Load() {
return &consensus.ServerInfo{
ID: "N/A (Leader overridden)",
Addr: "N/A",
Suffrage: 0,
}, nil
}
return api.con.LeaderWithID(ctx), nil return api.con.LeaderWithID(ctx), nil
} }
......
...@@ -28,8 +28,15 @@ func prefixRPC(method string) string { ...@@ -28,8 +28,15 @@ func prefixRPC(method string) string {
} }
// OverrideLeader implements API. // OverrideLeader implements API.
func (c *APIClient) OverrideLeader(ctx context.Context) error { func (c *APIClient) OverrideLeader(ctx context.Context, override bool) error {
return c.c.CallContext(ctx, nil, prefixRPC("overrideLeader")) return c.c.CallContext(ctx, nil, prefixRPC("overrideLeader"), override)
}
// LeaderOverridden implements API.
func (c *APIClient) LeaderOverridden(ctx context.Context) (bool, error) {
var overridden bool
err := c.c.CallContext(ctx, &overridden, prefixRPC("leaderOverridden"))
return overridden, err
} }
// Paused implements API. // Paused implements API.
......
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"testing" "testing"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-conductor/consensus" "github.com/ethereum-optimism/optimism/op-conductor/consensus"
...@@ -232,9 +233,45 @@ func TestSequencerFailover_DisasterRecovery_OverrideLeader(t *testing.T) { ...@@ -232,9 +233,45 @@ func TestSequencerFailover_DisasterRecovery_OverrideLeader(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.True(t, active, "Expected sequencer to be active") require.True(t, active, "Expected sequencer to be active")
err = conductors[Sequencer3Name].client.OverrideLeader(ctx) err = conductors[Sequencer3Name].client.OverrideLeader(ctx, true)
require.NoError(t, err) require.NoError(t, err)
leader, err := conductors[Sequencer3Name].client.Leader(ctx) leader, err := conductors[Sequencer3Name].client.Leader(ctx)
require.NoError(t, err) require.NoError(t, err)
require.True(t, leader, "Expected conductor to return leader true after override") require.True(t, leader, "Expected conductor to return leader true after override")
overridden, err := conductors[Sequencer3Name].client.LeaderOverridden(ctx)
require.NoError(t, err)
require.True(t, overridden, "Expected conductor to return leader overridden true after override")
// make sure all proxied method are working correctly.
proxy, err := rpc.DialContext(ctx, conductors[Sequencer3Name].RPCEndpoint())
require.NoError(t, err)
err = proxy.CallContext(ctx, &active, "admin_sequencerActive")
require.NoError(t, err)
require.True(t, active, "Expected sequencer to be active")
err = proxy.CallContext(ctx, nil, "optimism_syncStatus")
require.NoError(t, err)
var block map[string]any
err = proxy.CallContext(ctx, &block, "eth_getBlockByNumber", "latest", false)
require.NoError(t, err)
err = proxy.CallContext(ctx, nil, "optimism_outputAtBlock", block["number"])
require.NoError(t, err)
err = proxy.CallContext(ctx, nil, "optimism_rollupConfig")
require.NoError(t, err)
err = conductors[Sequencer3Name].client.OverrideLeader(ctx, false)
require.NoError(t, err)
overridden, err = conductors[Sequencer3Name].client.LeaderOverridden(ctx)
require.NoError(t, err)
require.False(t, overridden, "Expected conductor to return leader overridden false after override")
err = proxy.CallContext(ctx, &active, "admin_sequencerActive")
require.ErrorContains(t, err, "refusing to proxy request to non-leader sequencer", "Expected sequencer to fail to get active status")
err = proxy.CallContext(ctx, nil, "optimism_syncStatus")
require.ErrorContains(t, err, "refusing to proxy request to non-leader sequencer", "Expected sequencer to fail to get sync status")
err = proxy.CallContext(ctx, nil, "eth_getBlockByNumber", "latest", false)
require.ErrorContains(t, err, "refusing to proxy request to non-leader sequencer", "Expected sequencer to fail to get block by number")
err = proxy.CallContext(ctx, nil, "optimism_outputAtBlock", block["number"])
require.ErrorContains(t, err, "refusing to proxy request to non-leader sequencer", "Expected sequencer to fail to get output at block")
err = proxy.CallContext(ctx, nil, "optimism_rollupConfig")
require.ErrorContains(t, err, "refusing to proxy request to non-leader sequencer", "Expected sequencer to fail to get rollup config")
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment