Commit 8ba2e1e1 authored by Francis Li, committed by GitHub

[op-conductor] part 2 - core control logic (#8854)

* Implement main control logic

* Add more tests
parent fe6dfa6f
@@ -3,12 +3,15 @@ package conductor
import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/raft"
"github.com/pkg/errors"

"github.com/ethereum-optimism/optimism/op-conductor/client"
@@ -136,6 +139,7 @@ func (c *OpConductor) initConsensus(ctx context.Context) error {
return errors.Wrap(err, "failed to create raft consensus")
}
c.cons = cons
c.leaderUpdateCh = c.cons.LeaderCh()
return nil
}
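Note: `LeaderCh` comes straight from hashicorp/raft, which surfaces leadership changes as a `<-chan bool`. A minimal sketch of the wrapper this hunk assumes (the `RaftConsensus` name is illustrative; only `raft.Raft.LeaderCh` is the real raft API):

package consensus

import "github.com/hashicorp/raft"

// RaftConsensus is an illustrative wrapper; only raft.Raft.LeaderCh below
// is the real hashicorp/raft API.
type RaftConsensus struct {
	r *raft.Raft
}

// LeaderCh surfaces raft's leadership notifications: true when this server
// becomes leader, false when it steps down. hashicorp/raft documents that
// this channel is not guaranteed to receive every transition.
func (c *RaftConsensus) LeaderCh() <-chan bool {
	return c.r.LeaderCh()
}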
@@ -165,6 +169,7 @@ func (c *OpConductor) initHealthMonitor(ctx context.Context) error {
node,
p2p,
)
c.healthUpdateCh = c.hmon.Subscribe()
return nil
}
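Similarly, `Subscribe` suggests a fan-out publisher inside the health monitor. A hedged sketch of that pattern (names are illustrative, not the actual op-conductor/health implementation):

package health

import "sync"

// monitor is an illustrative publisher; the real HealthMonitor may differ.
type monitor struct {
	mu   sync.Mutex
	subs []chan bool
}

// Subscribe registers a new listener for health updates.
func (m *monitor) Subscribe() <-chan bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	ch := make(chan bool, 1)
	m.subs = append(m.subs, ch)
	return ch
}

// publish sends the latest health status to every subscriber without
// blocking; a slow subscriber simply misses the intermediate update.
func (m *monitor) publish(healthy bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for _, ch := range m.subs {
		select {
		case ch <- healthy:
		default:
		}
	}
}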
@@ -191,7 +196,9 @@ type OpConductor struct {
healthy atomic.Bool
seqActive atomic.Bool
healthUpdateCh <-chan bool
leaderUpdateCh <-chan bool
actionFn func() // actionFn defines the action to be executed to bring the sequencer to the desired state.
wg sync.WaitGroup
pauseCh chan struct{}
@@ -225,8 +232,12 @@ func (oc *OpConductor) Start(ctx context.Context) error {
// Stop implements cliapp.Lifecycle.
func (oc *OpConductor) Stop(ctx context.Context) error {
oc.log.Info("stopping OpConductor")
if oc.Stopped() {
oc.log.Info("OpConductor already stopped")
return nil
}
oc.log.Info("stopping OpConductor")
var result *multierror.Error
// close control loop
@@ -286,16 +297,14 @@ func (oc *OpConductor) Paused() bool {
func (oc *OpConductor) loop() {
defer oc.wg.Done()
for {
select {
// We process status updates (health, leadership) first regardless of the paused state.
// This way we can properly bring the sequencer to the desired state when resumed.
case healthy := <-oc.healthUpdateCh:
oc.handleHealthUpdate(healthy)
case leader := <-oc.leaderUpdateCh:
oc.handleLeaderUpdate(leader)
case <-oc.pauseCh:
oc.paused.Store(true)
@@ -349,5 +358,119 @@ func (oc *OpConductor) action() {
return
}
var err error
// Exhaust all cases below for completeness: 3 booleans, 8 cases.
switch status := struct{ leader, healthy, active bool }{oc.leader.Load(), oc.healthy.Load(), oc.seqActive.Load()}; {
case !status.leader && !status.healthy && !status.active:
// if follower is not healthy and not sequencing, just log an error
oc.log.Error("server (follower) is not healthy", "server", oc.cons.ServerID())
case !status.leader && !status.healthy && status.active:
// sequencer is not leader, not healthy, but it is sequencing, stop it
err = oc.stopSequencer()
case !status.leader && status.healthy && !status.active:
// normal follower, do nothing
case !status.leader && status.healthy && status.active:
// stop sequencer, this happens when current server steps down as leader.
err = oc.stopSequencer()
case status.leader && !status.healthy && !status.active:
// transfer leadership to another node
err = oc.transferLeader()
case status.leader && !status.healthy && status.active:
var result *multierror.Error
// Try to stop the sequencer first; since it is unhealthy, the stop call may fail.
// In that case it is still fine to try to transfer leadership to another server. This is safe because:
// 1. if the leadership transfer succeeds, we'll retry and enter case !status.leader && status.healthy && status.active, which stops the sequencer.
// 2. even if retries keep failing and this server stays in active sequencing mode, our hook in op-node prevents it from committing any new blocks to the network via p2p once it is no longer the leader.
if e := oc.stopSequencer(); e != nil {
result = multierror.Append(result, e)
}
// Try to transfer leadership to another server regardless of whether the sequencer was stopped. There are 4 scenarios here:
// 1. [sequencer stopped, leadership transfer succeeded] which is the happy case and we handed over sequencing to another server.
// 2. [sequencer stopped, leadership transfer failed] we'll enter into case status.leader && !status.healthy && !status.active and retry transfer leadership.
// 3. [sequencer active, leadership transfer succeeded] we'll enter into case !status.leader && status.healthy && status.active and retry stop sequencer.
// 4. [sequencer active, leadership transfer failed] we're in the same state and will retry here again.
if e := oc.transferLeader(); e != nil {
result = multierror.Append(result, e)
}
err = result.ErrorOrNil()
case status.leader && status.healthy && !status.active:
// start sequencer
err = oc.startSequencer()
case status.leader && status.healthy && status.active:
// normal leader, do nothing
}
if err != nil {
oc.log.Error("failed to execute step, queueing another one to retry", "err", err)
// sleep for a random duration under 200ms to avoid retrying in a tight loop
time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
oc.queueAction()
}
}
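Since the switch enumerates every (leader, healthy, active) combination, the decision table is easy to pin down. A hypothetical table-driven sketch of that mapping (informal documentation, not a test from this PR; assumes the standard testing package):

func TestActionMatrix(t *testing.T) {
	cases := []struct {
		leader, healthy, active bool
		want                    string // expected action, informal
	}{
		{false, false, false, "log error only"},
		{false, false, true, "stop sequencer"},
		{false, true, false, "no-op (normal follower)"},
		{false, true, true, "stop sequencer"},
		{true, false, false, "transfer leadership"},
		{true, false, true, "stop sequencer, then transfer leadership"},
		{true, true, false, "start sequencer"},
		{true, true, true, "no-op (normal leader)"},
	}
	for _, tc := range cases {
		t.Logf("leader=%v healthy=%v active=%v => %s",
			tc.leader, tc.healthy, tc.active, tc.want)
	}
}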
// transferLeader tries to transfer leadership to another server.
func (oc *OpConductor) transferLeader() error {
// TransferLeader round-robins through the cluster, attempting to hand leadership to the next healthy node.
err := oc.cons.TransferLeader()
if err == nil {
oc.leader.Store(false)
return nil // success
}
switch {
case errors.Is(err, raft.ErrNotLeader):
// This node is not the leader, do nothing.
oc.log.Warn("cannot transfer leadership since current server is not the leader")
return nil
default:
oc.log.Error("failed to transfer leadership", "err", err)
return err
}
}
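The round-robin behavior mentioned above would look roughly like the following against the raw hashicorp/raft API (`GetConfiguration` and `LeadershipTransferToServer` are real raft calls; the helper itself is an illustrative sketch, not the Consensus implementation):

func transferToNextServer(r *raft.Raft, self raft.ServerID) error {
	cfg := r.GetConfiguration()
	if err := cfg.Error(); err != nil {
		return errors.Wrap(err, "failed to read raft configuration")
	}
	for _, srv := range cfg.Configuration().Servers {
		if srv.ID == self {
			continue // never hand leadership to ourselves
		}
		// Try each peer in turn until one accepts the transfer.
		if err := r.LeadershipTransferToServer(srv.ID, srv.Address).Error(); err == nil {
			return nil
		}
	}
	return errors.New("no peer accepted leadership")
}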
func (oc *OpConductor) stopSequencer() error {
oc.log.Info("stopping sequencer", "server", oc.cons.ServerID(), "leader", oc.leader.Load(), "healthy", oc.healthy.Load(), "active", oc.seqActive.Load())
if _, err := oc.ctrl.StopSequencer(context.Background()); err != nil {
return errors.Wrap(err, "failed to stop sequencer")
}
oc.seqActive.Store(false)
return nil
}
func (oc *OpConductor) startSequencer() error {
oc.log.Info("starting sequencer", "server", oc.cons.ServerID(), "leader", oc.leader.Load(), "healthy", oc.healthy.Load(), "active", oc.seqActive.Load())
// When starting sequencer, we need to make sure that the current node has the latest unsafe head from the consensus protocol
// If not, then we wait for the unsafe head to catch up or gossip it to op-node manually from op-conductor.
unsafeInCons := oc.cons.LatestUnsafePayload()
unsafeInNode, err := oc.ctrl.LatestUnsafeBlock(context.Background())
if err != nil {
return errors.Wrap(err, "failed to get latest unsafe block from EL during startSequencer phase")
}
if unsafeInCons.BlockHash != unsafeInNode.Hash() {
oc.log.Warn(
"latest unsafe block in consensus is not the same as the one in op-node",
"consensus_hash", unsafeInCons.BlockHash,
"consensus_block_num", unsafeInCons.BlockNumber,
"node_hash", unsafeInNode.Hash(),
"node_block_num", unsafeInNode.NumberU64(),
)
if uint64(unsafeInCons.BlockNumber)-unsafeInNode.NumberU64() == 1 {
// try to post the unsafe head to op-node when the node is only 1 block behind (most likely due to gossip delay)
if err = oc.ctrl.PostUnsafePayload(context.Background(), &unsafeInCons); err != nil {
oc.log.Error("failed to post unsafe head payload to op-node", "err", err)
}
}
return ErrUnsafeHeadMismatch // return error to allow retry
}
if err := oc.ctrl.StartSequencer(context.Background(), unsafeInCons.BlockHash); err != nil {
return errors.Wrap(err, "failed to start sequencer")
}
oc.seqActive.Store(true)
return nil
}
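The catch-up logic above boils down to a three-way decision. A hypothetical helper isolating it (not part of this PR; common.Hash is go-ethereum's, and like the PR this assumes the consensus head is at or ahead of the node's head):

type catchUpAction int

const (
	actionStart catchUpAction = iota // heads match: safe to start sequencing
	actionPost                       // node is exactly 1 block behind: post the payload
	actionRetry                      // node is further behind: wait and retry
)

func decideCatchUp(consNum, nodeNum uint64, consHash, nodeHash common.Hash) catchUpAction {
	if consHash == nodeHash {
		return actionStart
	}
	if consNum-nodeNum == 1 {
		return actionPost
	}
	return actionRetry
}

Note that in the PR, the post-payload path still returns ErrUnsafeHeadMismatch, so the action loop keeps retrying until the two heads converge.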