Commit 4a487b89 authored by Brian Bland, committed by GitHub

feat: op-conductor strongly consistent reads (#10619)

* op-conductor: add more logs for raft debugging

* Add barrier

* LatestUnsafePayload reads in a strongly consistent fashion

* Atomic swap OpConductor.healthy

* Fix conductor/service_test

* Add test for when LatestUnsafePayload returns an error

* Update some method comments

---------
Co-authored-by: Francis Li <francis.li@coinbase.com>
parent df2aebaf
......@@ -30,10 +30,10 @@ import (
)
var (
ErrResumeTimeout = errors.New("timeout to resume conductor")
ErrPauseTimeout = errors.New("timeout to pause conductor")
ErrUnsafeHeadMismarch = errors.New("unsafe head mismatch")
ErrUnableToRetrieveUnsafeHeadFromConsensus = errors.New("unable to retrieve unsafe head from consensus")
ErrResumeTimeout = errors.New("timeout to resume conductor")
ErrPauseTimeout = errors.New("timeout to pause conductor")
ErrUnsafeHeadMismatch = errors.New("unsafe head mismatch")
ErrNoUnsafeHead = errors.New("no unsafe head")
)
// New creates a new OpConductor instance.
......@@ -441,7 +441,7 @@ func (oc *OpConductor) TransferLeaderToServer(_ context.Context, id string, addr
return oc.cons.TransferLeaderTo(id, addr)
}
// CommitUnsafePayload commits a unsafe payload (latest head) to the cluster FSM.
// CommitUnsafePayload commits an unsafe payload (latest head) to the cluster FSM ensuring strong consistency by leveraging Raft consensus mechanisms.
func (oc *OpConductor) CommitUnsafePayload(_ context.Context, payload *eth.ExecutionPayloadEnvelope) error {
// The context argument is ignored; the call is forwarded unchanged to the consensus client and its error is returned as-is.
return oc.cons.CommitUnsafePayload(payload)
}
......@@ -456,8 +456,8 @@ func (oc *OpConductor) ClusterMembership(_ context.Context) ([]*consensus.Server
return oc.cons.ClusterMembership()
}
// LatestUnsafePayload returns the latest unsafe payload envelope from FSM.
func (oc *OpConductor) LatestUnsafePayload(_ context.Context) *eth.ExecutionPayloadEnvelope {
// LatestUnsafePayload returns the latest unsafe payload envelope from FSM in a strongly consistent fashion.
func (oc *OpConductor) LatestUnsafePayload(_ context.Context) (*eth.ExecutionPayloadEnvelope, error) {
return oc.cons.LatestUnsafePayload()
}
......@@ -522,12 +522,11 @@ func (oc *OpConductor) handleHealthUpdate(hcerr error) {
oc.queueAction()
}
if healthy != oc.healthy.Load() {
if oc.healthy.Swap(healthy) != healthy {
// queue an action if health status changed.
oc.queueAction()
}
oc.healthy.Store(healthy)
oc.hcerr = hcerr
}
......@@ -668,8 +667,15 @@ func (oc *OpConductor) startSequencer() error {
unsafeInCons, unsafeInNode, err := oc.compareUnsafeHead(ctx)
// if there's a mismatch, try to post the unsafe head to op-node
if err != nil {
if errors.Is(err, ErrUnsafeHeadMismarch) && uint64(unsafeInCons.ExecutionPayload.BlockNumber)-unsafeInNode.NumberU64() == 1 {
if errors.Is(err, ErrUnsafeHeadMismatch) && uint64(unsafeInCons.ExecutionPayload.BlockNumber)-unsafeInNode.NumberU64() == 1 {
// tries to post the unsafe head to op-node when head is only 1 block behind (most likely due to gossip delay)
oc.log.Debug(
"posting unsafe head to op-node",
"consensus_num", uint64(unsafeInCons.ExecutionPayload.BlockNumber),
"consensus_hash", unsafeInCons.ExecutionPayload.BlockHash.Hex(),
"node_num", unsafeInNode.NumberU64(),
"node_hash", unsafeInNode.Hash().Hex(),
)
if innerErr := oc.ctrl.PostUnsafePayload(ctx, unsafeInCons); innerErr != nil {
oc.log.Error("failed to post unsafe head payload envelope to op-node", "err", innerErr)
}
......@@ -692,9 +698,12 @@ func (oc *OpConductor) startSequencer() error {
}
func (oc *OpConductor) compareUnsafeHead(ctx context.Context) (*eth.ExecutionPayloadEnvelope, eth.BlockInfo, error) {
unsafeInCons := oc.cons.LatestUnsafePayload()
unsafeInCons, err := oc.cons.LatestUnsafePayload()
if err != nil {
return nil, nil, errors.Wrap(err, "unable to retrieve unsafe head from consensus")
}
if unsafeInCons == nil {
return nil, nil, ErrUnableToRetrieveUnsafeHeadFromConsensus
return nil, nil, ErrNoUnsafeHead
}
unsafeInNode, err := oc.ctrl.LatestUnsafeBlock(ctx)
......@@ -702,17 +711,17 @@ func (oc *OpConductor) compareUnsafeHead(ctx context.Context) (*eth.ExecutionPay
return unsafeInCons, nil, errors.Wrap(err, "failed to get latest unsafe block from EL during compareUnsafeHead phase")
}
oc.log.Debug("comparing unsafe head", "consensus", unsafeInCons.ExecutionPayload.BlockNumber, "node", unsafeInNode.NumberU64())
oc.log.Debug("comparing unsafe head", "consensus", uint64(unsafeInCons.ExecutionPayload.BlockNumber), "node", unsafeInNode.NumberU64())
if unsafeInCons.ExecutionPayload.BlockHash != unsafeInNode.Hash() {
oc.log.Warn(
"latest unsafe block in consensus is not the same as the one in op-node",
"consensus_hash", unsafeInCons.ExecutionPayload.BlockHash,
"consensus_block_num", unsafeInCons.ExecutionPayload.BlockNumber,
"consensus_num", uint64(unsafeInCons.ExecutionPayload.BlockNumber),
"node_hash", unsafeInNode.Hash(),
"node_block_num", unsafeInNode.NumberU64(),
"node_num", unsafeInNode.NumberU64(),
)
return unsafeInCons, unsafeInNode, ErrUnsafeHeadMismarch
return unsafeInCons, unsafeInNode, ErrUnsafeHeadMismatch
}
return unsafeInCons, unsafeInNode, nil
......
......@@ -298,7 +298,7 @@ func (s *OpConductorTestSuite) TestScenario1() {
InfoHash: [32]byte{1, 2, 3},
}
s.cons.EXPECT().TransferLeader().Return(nil)
s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload).Times(1)
s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload, nil).Times(1)
s.ctrl.EXPECT().LatestUnsafeBlock(mock.Anything).Return(mockBlockInfo, nil).Times(1)
// become leader
......@@ -317,6 +317,42 @@ func (s *OpConductorTestSuite) TestScenario1() {
s.cons.AssertNumberOfCalls(s.T(), "TransferLeader", 1)
}
// TestScenario1Err covers a follower that is not healthy and not sequencing and then becomes leader through election.
// The consensus mock returns an error from LatestUnsafePayload, so the unsafe head cannot be compared, and the test expects leadership to be transferred to another node.
// [follower, not healthy, not sequencing] -- become leader --> [leader, not healthy, not sequencing] -- transfer leadership --> [follower, not healthy, not sequencing]
func (s *OpConductorTestSuite) TestScenario1Err() {
s.enableSynchronization()
// set initial state
s.conductor.leader.Store(false)
s.conductor.healthy.Store(false)
s.conductor.seqActive.Store(false)
s.conductor.hcerr = health.ErrSequencerNotHealthy
s.conductor.prevState = &state{
leader: false,
healthy: false,
active: false,
}
// the consensus read is expected exactly once and fails with a fake error; a TransferLeader call is expected afterwards
s.cons.EXPECT().LatestUnsafePayload().Return(nil, errors.New("fake connection error")).Times(1)
s.cons.EXPECT().TransferLeader().Return(nil)
// become leader
s.updateLeaderStatusAndExecuteAction(true)
// expect to transfer leadership, go back to [follower, not healthy, not sequencing]
s.False(s.conductor.leader.Load())
s.False(s.conductor.healthy.Load())
s.False(s.conductor.seqActive.Load())
s.Equal(health.ErrSequencerNotHealthy, s.conductor.hcerr)
// prevState is expected to record leader: true even though the live leader flag asserted above is already false
s.Equal(&state{
leader: true,
healthy: false,
active: false,
}, s.conductor.prevState)
// leadership must have been handed off exactly once
s.cons.AssertNumberOfCalls(s.T(), "TransferLeader", 1)
}
// In this test, we have a follower that is not healthy and not sequencing. It becomes healthy and we expect it to stay as follower and not start sequencing.
// [follower, not healthy, not sequencing] -- become healthy --> [follower, healthy, not sequencing]
func (s *OpConductorTestSuite) TestScenario2() {
......@@ -353,7 +389,7 @@ func (s *OpConductorTestSuite) TestScenario3() {
InfoNum: 1,
InfoHash: [32]byte{1, 2, 3},
}
s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload).Times(1)
s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload, nil).Times(1)
s.ctrl.EXPECT().LatestUnsafeBlock(mock.Anything).Return(mockBlockInfo, nil).Times(1)
s.ctrl.EXPECT().StartSequencer(mock.Anything, mock.Anything).Return(nil).Times(1)
......@@ -392,7 +428,7 @@ func (s *OpConductorTestSuite) TestScenario4() {
InfoNum: 1,
InfoHash: [32]byte{2, 3, 4},
}
s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload).Times(1)
s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload, nil).Times(1)
s.ctrl.EXPECT().LatestUnsafeBlock(mock.Anything).Return(mockBlockInfo, nil).Times(1)
s.ctrl.EXPECT().PostUnsafePayload(mock.Anything, mock.Anything).Return(nil).Times(1)
......@@ -410,7 +446,7 @@ func (s *OpConductorTestSuite) TestScenario4() {
// unsafe caught up, we try to start sequencer at specified block and succeeds
mockBlockInfo.InfoNum = 2
mockBlockInfo.InfoHash = [32]byte{1, 2, 3}
s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload).Times(1)
s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload, nil).Times(1)
s.ctrl.EXPECT().LatestUnsafeBlock(mock.Anything).Return(mockBlockInfo, nil).Times(1)
s.ctrl.EXPECT().StartSequencer(mock.Anything, mockBlockInfo.InfoHash).Return(nil).Times(1)
......@@ -664,7 +700,7 @@ func (s *OpConductorTestSuite) TestFailureAndRetry3() {
InfoNum: 1,
InfoHash: [32]byte{1, 2, 3},
}
s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload).Times(2)
s.cons.EXPECT().LatestUnsafePayload().Return(mockPayload, nil).Times(2)
s.ctrl.EXPECT().LatestUnsafeBlock(mock.Anything).Return(mockBlockInfo, nil).Times(2)
s.ctrl.EXPECT().StartSequencer(mock.Anything, mockBlockInfo.InfoHash).Return(nil).Times(1)
......
......@@ -59,10 +59,10 @@ type Consensus interface {
// ClusterMembership returns the current cluster membership configuration.
ClusterMembership() ([]*ServerInfo, error)
// CommitPayload commits latest unsafe payload to the FSM.
// CommitUnsafePayload commits the latest unsafe payload to the FSM in a strongly consistent fashion.
CommitUnsafePayload(payload *eth.ExecutionPayloadEnvelope) error
// LatestUnsafeBlock returns the latest unsafe payload from FSM.
LatestUnsafePayload() *eth.ExecutionPayloadEnvelope
// LatestUnsafePayload returns the latest unsafe payload from the FSM in a strongly consistent fashion.
LatestUnsafePayload() (*eth.ExecutionPayloadEnvelope, error)
// Shutdown shuts down the consensus protocol client.
Shutdown() error
......
......@@ -266,7 +266,7 @@ func (_c *Consensus_DemoteVoter_Call) RunAndReturn(run func(string) error) *Cons
}
// LatestUnsafePayload provides a mock function with given fields:
func (_m *Consensus) LatestUnsafePayload() *eth.ExecutionPayloadEnvelope {
func (_m *Consensus) LatestUnsafePayload() (*eth.ExecutionPayloadEnvelope, error) {
ret := _m.Called()
if len(ret) == 0 {
......@@ -274,6 +274,10 @@ func (_m *Consensus) LatestUnsafePayload() *eth.ExecutionPayloadEnvelope {
}
var r0 *eth.ExecutionPayloadEnvelope
var r1 error
if rf, ok := ret.Get(0).(func() (*eth.ExecutionPayloadEnvelope, error)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() *eth.ExecutionPayloadEnvelope); ok {
r0 = rf()
} else {
......@@ -282,7 +286,13 @@ func (_m *Consensus) LatestUnsafePayload() *eth.ExecutionPayloadEnvelope {
}
}
return r0
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Consensus_LatestUnsafePayload_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LatestUnsafePayload'
......@@ -302,12 +312,12 @@ func (_c *Consensus_LatestUnsafePayload_Call) Run(run func()) *Consensus_LatestU
return _c
}
func (_c *Consensus_LatestUnsafePayload_Call) Return(_a0 *eth.ExecutionPayloadEnvelope) *Consensus_LatestUnsafePayload_Call {
_c.Call.Return(_a0)
func (_c *Consensus_LatestUnsafePayload_Call) Return(_a0 *eth.ExecutionPayloadEnvelope, _a1 error) *Consensus_LatestUnsafePayload_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *Consensus_LatestUnsafePayload_Call) RunAndReturn(run func() *eth.ExecutionPayloadEnvelope) *Consensus_LatestUnsafePayload_Call {
func (_c *Consensus_LatestUnsafePayload_Call) RunAndReturn(run func() (*eth.ExecutionPayloadEnvelope, error)) *Consensus_LatestUnsafePayload_Call {
_c.Call.Return(run)
return _c
}
......
......@@ -75,7 +75,7 @@ func NewRaftConsensus(log log.Logger, serverID, serverAddr, storageDir string, b
return nil, errors.Wrap(err, "failed to create raft tcp transport")
}
fsm := &unsafeHeadTracker{}
fsm := NewUnsafeHeadTracker(log)
r, err := raft.NewRaft(rc, fsm, logStore, stableStore, snapshotStore, transport)
if err != nil {
......@@ -140,8 +140,7 @@ func (rc *RaftConsensus) DemoteVoter(id string) error {
// Leader implements Consensus, it returns true if it is the leader of the cluster.
func (rc *RaftConsensus) Leader() bool {
_, id := rc.r.LeaderWithID()
return id == rc.serverID
return rc.r.State() == raft.Leader
}
// LeaderWithID implements Consensus, it returns the leader's server ID and address.
......@@ -205,8 +204,10 @@ func (rc *RaftConsensus) Shutdown() error {
return nil
}
// CommitUnsafePayload implements Consensus, it commits latest unsafe payload to the cluster FSM.
// CommitUnsafePayload implements Consensus, it commits latest unsafe payload to the cluster FSM in a strongly consistent fashion.
func (rc *RaftConsensus) CommitUnsafePayload(payload *eth.ExecutionPayloadEnvelope) error {
rc.log.Debug("committing unsafe payload", "number", uint64(payload.ExecutionPayload.BlockNumber), "hash", payload.ExecutionPayload.BlockHash.Hex())
var buf bytes.Buffer
if _, err := payload.MarshalSSZ(&buf); err != nil {
return errors.Wrap(err, "failed to marshal payload envelope")
......@@ -216,14 +217,18 @@ func (rc *RaftConsensus) CommitUnsafePayload(payload *eth.ExecutionPayloadEnvelo
if err := f.Error(); err != nil {
return errors.Wrap(err, "failed to apply payload envelope")
}
rc.log.Debug("unsafe payload committed", "number", uint64(payload.ExecutionPayload.BlockNumber), "hash", payload.ExecutionPayload.BlockHash.Hex())
return nil
}
// LatestUnsafePayload implements Consensus, it returns the latest unsafe payload from FSM.
func (rc *RaftConsensus) LatestUnsafePayload() *eth.ExecutionPayloadEnvelope {
payload := rc.unsafeTracker.UnsafeHead()
return payload
// LatestUnsafePayload implements Consensus, it returns the latest unsafe payload from FSM in a strongly consistent fashion.
// It returns a nil payload and a wrapped error when the raft barrier returns an error; otherwise it returns
// whatever the unsafe head tracker currently holds, with a nil error.
func (rc *RaftConsensus) LatestUnsafePayload() (*eth.ExecutionPayloadEnvelope, error) {
// NOTE(review): per the hashicorp/raft documentation, Barrier blocks until preceding operations have been
// applied to the FSM, which is presumably what makes the read below strongly consistent — confirm against
// the raft version in use.
if err := rc.r.Barrier(defaultTimeout).Error(); err != nil {
return nil, errors.Wrap(err, "failed to apply barrier")
}
return rc.unsafeTracker.UnsafeHead(), nil
}
// ClusterMembership implements Consensus, it returns the current cluster membership configuration.
......
......@@ -16,10 +16,17 @@ var _ raft.FSM = (*unsafeHeadTracker)(nil)
// unsafeHeadTracker implements raft.FSM for storing unsafe head payload into raft consensus layer.
type unsafeHeadTracker struct {
// log receives a debug entry from Apply for each payload it processes.
log log.Logger
// mtx is locked by Apply while it inspects and updates unsafeHead.
mtx sync.RWMutex
// unsafeHead is the tracked payload envelope; Apply replaces it only when it is nil or has a lower
// block number than the incoming payload.
unsafeHead *eth.ExecutionPayloadEnvelope
}
// NewUnsafeHeadTracker returns an unsafeHeadTracker that logs through the given logger.
// The returned tracker has no unsafe head recorded yet (the field is left at its zero value).
func NewUnsafeHeadTracker(log log.Logger) *unsafeHeadTracker {
return &unsafeHeadTracker{
log: log,
}
}
// Apply implements raft.FSM, it applies the latest change (latest unsafe head payload) to FSM.
func (t *unsafeHeadTracker) Apply(l *raft.Log) interface{} {
if l.Data == nil || len(l.Data) == 0 {
......@@ -33,6 +40,7 @@ func (t *unsafeHeadTracker) Apply(l *raft.Log) interface{} {
t.mtx.Lock()
defer t.mtx.Unlock()
t.log.Debug("applying new unsafe head", "number", uint64(data.ExecutionPayload.BlockNumber), "hash", data.ExecutionPayload.BlockHash.Hex())
if t.unsafeHead == nil || t.unsafeHead.ExecutionPayload.BlockNumber < data.ExecutionPayload.BlockNumber {
t.unsafeHead = data
}
......
......@@ -8,22 +8,24 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/hashicorp/raft"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
)
type Bytes32 [32]byte
func createPayloadEnvelope() *eth.ExecutionPayloadEnvelope {
func createPayloadEnvelope(blockNum uint64) *eth.ExecutionPayloadEnvelope {
hash := common.HexToHash("0x12345")
one := hexutil.Uint64(1)
return &eth.ExecutionPayloadEnvelope{
ParentBeaconBlockRoot: &hash,
ExecutionPayload: &eth.ExecutionPayload{
BlockNumber: eth.Uint64Quantity(222),
BlockNumber: eth.Uint64Quantity(blockNum),
BlockHash: common.HexToHash("0x888"),
Withdrawals: &types.Withdrawals{{Index: 1, Validator: 2, Address: common.HexToAddress("0x123"), Amount: 3}},
ExcessBlobGas: &one,
......@@ -32,11 +34,12 @@ func createPayloadEnvelope() *eth.ExecutionPayloadEnvelope {
}
func TestUnsafeHeadTracker(t *testing.T) {
tracker := &unsafeHeadTracker{
unsafeHead: createPayloadEnvelope(),
log: testlog.Logger(t, log.LevelDebug),
unsafeHead: createPayloadEnvelope(222),
}
t.Run("Apply", func(t *testing.T) {
data := createPayloadEnvelope()
data := createPayloadEnvelope(333)
var buf bytes.Buffer
_, err := data.MarshalSSZ(&buf)
......@@ -44,17 +47,27 @@ func TestUnsafeHeadTracker(t *testing.T) {
l := raft.Log{Data: buf.Bytes()}
require.Nil(t, tracker.Apply(&l))
require.Equal(t, hexutil.Uint64(222), tracker.unsafeHead.ExecutionPayload.BlockNumber)
require.Equal(t, hexutil.Uint64(333), tracker.unsafeHead.ExecutionPayload.BlockNumber)
})
t.Run("Snapshot", func(t *testing.T) {
snapshot, err := tracker.Snapshot()
require.NoError(t, err)
sink := new(raft.DiscardSnapshotSink)
err = snapshot.Persist(sink)
require.NoError(t, err)
})
t.Run("Restore", func(t *testing.T) {
data := createPayloadEnvelope()
data := createPayloadEnvelope(333)
mrc, err := NewMockReadCloser(data)
require.NoError(t, err)
err = tracker.Restore(mrc)
require.NoError(t, err)
require.Equal(t, hexutil.Uint64(222), tracker.unsafeHead.ExecutionPayload.BlockNumber)
require.Equal(t, hexutil.Uint64(333), tracker.unsafeHead.ExecutionPayload.BlockNumber)
})
}
......
......@@ -70,6 +70,7 @@ func TestCommitAndRead(t *testing.T) {
// ExecutionPayloadEnvelope is expected to succeed when unmarshalling a blockV3
require.NoError(t, err)
unsafeHead := cons.LatestUnsafePayload()
unsafeHead, err := cons.LatestUnsafePayload()
require.NoError(t, err)
require.Equal(t, payload, unsafeHead)
}
......@@ -47,7 +47,7 @@ type API interface {
// APIs called by op-node
// Active returns true if op-conductor is active (not paused or stopped).
Active(ctx context.Context) (bool, error)
// CommitUnsafePayload commits a unsafe payload (latest head) to the consensus layer.
// CommitUnsafePayload commits an unsafe payload (latest head) to the consensus layer.
CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment