Commit c54b656b authored by Francis Li's avatar Francis Li Committed by GitHub

op-conductor: add override to disable HA mode in disaster recovery scenarios (#10976)

* op-conductor: add override to disable HA mode in disaster recovery scenarios

* op-conductor: add leader override in conductor API

* Change leaderOverride to atomic.Bool
parent d74848c2
......@@ -15,6 +15,10 @@ var ErrNotLeader = errors.New("refusing to proxy request to non-leader sequencer
// API defines the interface for the op-conductor API.
type API interface {
// OverrideLeader is used to override the leader status, this is only used to return true for Leader() & LeaderWithID() calls.
// It does not impact the actual raft consensus leadership status. It is supposed to be used when the cluster is unhealthy
// and the node is the only one up, to allow batcher to be able to connect to the node, so that it could download blocks from the manually started sequencer.
OverrideLeader(ctx context.Context) error
// Pause pauses op-conductor.
Pause(ctx context.Context) error
// Resume resumes op-conductor.
......
......@@ -2,6 +2,7 @@ package rpc
import (
"context"
"sync/atomic"
"github.com/ethereum/go-ethereum/log"
......@@ -29,10 +30,10 @@ type conductor interface {
// APIBackend is the backend implementation of the API.
// TODO: (https://github.com/ethereum-optimism/protocol-quest/issues/45) Add metrics tracer here.
// TODO: (https://github.com/ethereum-optimism/protocol-quest/issues/44) add tests after e2e setup.
type APIBackend struct {
log log.Logger
con conductor
leaderOverride atomic.Bool
}
// NewAPIBackend creates a new APIBackend instance.
......@@ -45,6 +46,12 @@ func NewAPIBackend(log log.Logger, con conductor) *APIBackend {
var _ API = (*APIBackend)(nil)
// OverrideLeader implements API.
func (api *APIBackend) OverrideLeader(ctx context.Context) error {
api.leaderOverride.Store(true)
return nil
}
// Paused implements API.
func (api *APIBackend) Paused(ctx context.Context) (bool, error) {
return api.con.Paused(), nil
......@@ -82,11 +89,19 @@ func (api *APIBackend) CommitUnsafePayload(ctx context.Context, payload *eth.Exe
// Leader implements API, returns true if current conductor is leader of the cluster.
func (api *APIBackend) Leader(ctx context.Context) (bool, error) {
return api.con.Leader(ctx), nil
return api.leaderOverride.Load() || api.con.Leader(ctx), nil
}
// LeaderWithID implements API, returns the leader's server ID and address (not necessarily the current conductor).
func (api *APIBackend) LeaderWithID(ctx context.Context) (*consensus.ServerInfo, error) {
if api.leaderOverride.Load() {
return &consensus.ServerInfo{
ID: "N/A (Leader overridden)",
Addr: "N/A",
Suffrage: 0,
}, nil
}
return api.con.LeaderWithID(ctx), nil
}
......
......@@ -27,6 +27,11 @@ func prefixRPC(method string) string {
return RPCNamespace + "_" + method
}
// OverrideLeader implements API.
func (c *APIClient) OverrideLeader(ctx context.Context) error {
return c.c.CallContext(ctx, nil, prefixRPC("overrideLeader"))
}
// Paused implements API.
func (c *APIClient) Paused(ctx context.Context) (bool, error) {
var paused bool
......
......@@ -6,12 +6,11 @@ import (
"fmt"
"io"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
gnode "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/node"
"github.com/ethereum-optimism/optimism/op-node/rollup"
......@@ -210,6 +209,10 @@ func (s *l2VerifierBackend) SequencerActive(ctx context.Context) (bool, error) {
return false, nil
}
func (s *l2VerifierBackend) OverrideLeader(ctx context.Context) error {
return nil
}
func (s *l2VerifierBackend) OnUnsafeL2Payload(ctx context.Context, envelope *eth.ExecutionPayloadEnvelope) error {
return nil
}
......
......@@ -5,6 +5,7 @@ import (
"sort"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-conductor/consensus"
......@@ -189,3 +190,44 @@ func TestSequencerFailover_ActiveSequencerDown(t *testing.T) {
require.NoError(t, err)
require.True(t, active, "Expected new leader to be sequencing")
}
// [Category: Disaster Recovery]
// Test that sequencer can successfully be started with the overrideLeader flag set to true.
func TestSequencerFailover_DisasterRecovery_OverrideLeader(t *testing.T) {
sys, conductors, cleanup := setupSequencerFailoverTest(t)
defer cleanup()
// randomly stop 2 nodes in the cluster to simulate a disaster.
ctx := context.Background()
err := conductors[Sequencer1Name].service.Stop(ctx)
require.NoError(t, err)
err = conductors[Sequencer2Name].service.Stop(ctx)
require.NoError(t, err)
require.False(t, conductors[Sequencer3Name].service.Leader(ctx), "Expected sequencer to not be the leader")
active, err := sys.RollupClient(Sequencer3Name).SequencerActive(ctx)
require.NoError(t, err)
require.False(t, active, "Expected sequencer to be inactive")
// Start sequencer without the overrideLeader flag set to true, should fail
err = sys.RollupClient(Sequencer3Name).StartSequencer(ctx, common.Hash{1, 2, 3})
require.ErrorContains(t, err, "sequencer is not the leader, aborting.", "Expected sequencer to fail to start")
// Start sequencer with the overrideLeader flag set to true, should succeed
err = sys.RollupClient(Sequencer3Name).OverrideLeader(ctx)
require.NoError(t, err)
blk, err := sys.NodeClient(Sequencer3Name).BlockByNumber(ctx, nil)
require.NoError(t, err)
err = sys.RollupClient(Sequencer3Name).StartSequencer(ctx, blk.Hash())
require.NoError(t, err)
active, err = sys.RollupClient(Sequencer3Name).SequencerActive(ctx)
require.NoError(t, err)
require.True(t, active, "Expected sequencer to be active")
err = conductors[Sequencer3Name].client.OverrideLeader(ctx)
require.NoError(t, err)
leader, err := conductors[Sequencer3Name].client.Leader(ctx)
require.NoError(t, err)
require.True(t, leader, "Expected conductor to return leader true after override")
}
......@@ -5,11 +5,11 @@ import (
"errors"
"fmt"
"github.com/ethereum-optimism/optimism/op-node/node/safedb"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/node/safedb"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/version"
"github.com/ethereum-optimism/optimism/op-service/eth"
......@@ -33,6 +33,7 @@ type driverClient interface {
StopSequencer(context.Context) (common.Hash, error)
SequencerActive(context.Context) (bool, error)
OnUnsafeL2Payload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error
OverrideLeader(ctx context.Context) error
}
type SafeDBReader interface {
......@@ -77,7 +78,6 @@ func (n *adminAPI) SequencerActive(ctx context.Context) (bool, error) {
// PostUnsafePayload is a special API that allows posting an unsafe payload to the L2 derivation pipeline.
// It should only be used by op-conductor for sequencer failover scenarios.
// TODO(ethereum-optimism/optimism#9064): op-conductor Dencun changes.
func (n *adminAPI) PostUnsafePayload(ctx context.Context, envelope *eth.ExecutionPayloadEnvelope) error {
recordDur := n.M.RecordRPCServerRequest("admin_postUnsafePayload")
defer recordDur()
......@@ -91,6 +91,13 @@ func (n *adminAPI) PostUnsafePayload(ctx context.Context, envelope *eth.Executio
return n.dr.OnUnsafeL2Payload(ctx, envelope)
}
// OverrideLeader disables sequencer conductor interactions and allow sequencer to run in non-HA mode during disaster recovery scenarios.
func (n *adminAPI) OverrideLeader(ctx context.Context) error {
recordDur := n.M.RecordRPCServerRequest("admin_overrideLeader")
defer recordDur()
return n.dr.OverrideLeader(ctx)
}
type nodeAPI struct {
config *rollup.Config
client l2EthClient
......
......@@ -3,16 +3,17 @@ package node
import (
"context"
"fmt"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/log"
conductorRpc "github.com/ethereum-optimism/optimism/op-conductor/rpc"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum/go-ethereum/log"
conductorRpc "github.com/ethereum-optimism/optimism/op-conductor/rpc"
)
// ConductorClient is a client for the op-conductor RPC service.
......@@ -21,13 +22,22 @@ type ConductorClient struct {
metrics *metrics.Metrics
log log.Logger
apiClient *conductorRpc.APIClient
// overrideLeader is used to override the leader check for disaster recovery purposes.
// During disaster situations where the cluster is unhealthy (no leader, only 1 or less nodes up),
// set this to true to allow the node to assume sequencing responsibilities without being the leader.
overrideLeader atomic.Bool
}
var _ conductor.SequencerConductor = &ConductorClient{}
// NewConductorClient returns a new conductor client for the op-conductor RPC service.
func NewConductorClient(cfg *Config, log log.Logger, metrics *metrics.Metrics) *ConductorClient {
return &ConductorClient{cfg: cfg, metrics: metrics, log: log}
return &ConductorClient{
cfg: cfg,
metrics: metrics,
log: log,
}
}
// Initialize initializes the conductor client.
......@@ -45,6 +55,10 @@ func (c *ConductorClient) initialize() error {
// Leader returns true if this node is the leader sequencer.
func (c *ConductorClient) Leader(ctx context.Context) (bool, error) {
if c.overrideLeader.Load() {
return true, nil
}
if err := c.initialize(); err != nil {
return false, err
}
......@@ -62,6 +76,10 @@ func (c *ConductorClient) Leader(ctx context.Context) (bool, error) {
// CommitUnsafePayload commits an unsafe payload to the conductor log.
func (c *ConductorClient) CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error {
if c.overrideLeader.Load() {
return nil
}
if err := c.initialize(); err != nil {
return err
}
......@@ -78,6 +96,12 @@ func (c *ConductorClient) CommitUnsafePayload(ctx context.Context, payload *eth.
return err
}
// OverrideLeader implements conductor.SequencerConductor.
func (c *ConductorClient) OverrideLeader(ctx context.Context) error {
c.overrideLeader.Store(true)
return nil
}
func (c *ConductorClient) Close() {
if c.apiClient == nil {
return
......
......@@ -283,6 +283,10 @@ func (c *mockDriverClient) OnUnsafeL2Payload(ctx context.Context, payload *eth.E
return c.Mock.MethodCalled("OnUnsafeL2Payload").Get(0).(error)
}
func (c *mockDriverClient) OverrideLeader(ctx context.Context) error {
return c.Mock.MethodCalled("OverrideLeader").Get(0).(error)
}
type mockSafeDBReader struct {
mock.Mock
}
......
......@@ -9,14 +9,21 @@ import (
// SequencerConductor is an interface for the driver to communicate with the sequencer conductor.
// It is used to determine if the current node is the active sequencer, and to commit unsafe payloads to the conductor log.
type SequencerConductor interface {
// Leader returns true if this node is the leader sequencer.
Leader(ctx context.Context) (bool, error)
// CommitUnsafePayload commits an unsafe payload to the conductor FSM.
CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error
// OverrideLeader forces current node to be considered leader and be able to start sequencing during disaster situations in HA mode.
OverrideLeader(ctx context.Context) error
// Close closes the conductor client.
Close()
}
// NoOpConductor is a no-op conductor that assumes this node is the leader sequencer.
type NoOpConductor struct{}
var _ SequencerConductor = &NoOpConductor{}
// Leader returns true if this node is the leader sequencer. NoOpConductor always returns true.
func (c *NoOpConductor) Leader(ctx context.Context) (bool, error) {
return true, nil
......@@ -27,5 +34,10 @@ func (c *NoOpConductor) CommitUnsafePayload(ctx context.Context, payload *eth.Ex
return nil
}
// OverrideLeader implements SequencerConductor.
func (c *NoOpConductor) OverrideLeader(ctx context.Context) error {
return nil
}
// Close closes the conductor client.
func (c *NoOpConductor) Close() {}
......@@ -636,6 +636,10 @@ func (s *Driver) SequencerActive(ctx context.Context) (bool, error) {
}
}
func (s *Driver) OverrideLeader(ctx context.Context) error {
return s.sequencerConductor.OverrideLeader(ctx)
}
// syncStatus returns the current sync status, and should only be called synchronously with
// the driver event loop to avoid retrieval of an inconsistent status.
func (s *Driver) syncStatus() *eth.SyncStatus {
......
......@@ -3,10 +3,9 @@ package sources
import (
"context"
"golang.org/x/exp/slog"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"golang.org/x/exp/slog"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/client"
......@@ -71,6 +70,10 @@ func (r *RollupClient) PostUnsafePayload(ctx context.Context, payload *eth.Execu
return r.rpc.CallContext(ctx, nil, "admin_postUnsafePayload", payload)
}
func (r *RollupClient) OverrideLeader(ctx context.Context) error {
return r.rpc.CallContext(ctx, nil, "admin_overrideLeader")
}
func (r *RollupClient) SetLogLevel(ctx context.Context, lvl slog.Level) error {
return r.rpc.CallContext(ctx, nil, "admin_setLogLevel", lvl.String())
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment