Commit 9b9ba1ea authored by Francis Li, committed by GitHub

[op-conductor] improve start sequencer logic (#9145)

* op-conductor: improve start sequencer logic

* update

* update

* updated based on suggestion
parent 5e0dc463
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"math/rand" "math/rand"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
...@@ -19,6 +20,7 @@ import ( ...@@ -19,6 +20,7 @@ import (
"github.com/ethereum-optimism/optimism/op-conductor/health" "github.com/ethereum-optimism/optimism/op-conductor/health"
conductorrpc "github.com/ethereum-optimism/optimism/op-conductor/rpc" conductorrpc "github.com/ethereum-optimism/optimism/op-conductor/rpc"
opp2p "github.com/ethereum-optimism/optimism/op-node/p2p" opp2p "github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-service/cliapp" "github.com/ethereum-optimism/optimism/op-service/cliapp"
opclient "github.com/ethereum-optimism/optimism/op-service/client" opclient "github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/dial" "github.com/ethereum-optimism/optimism/op-service/dial"
...@@ -567,6 +569,7 @@ func (oc *OpConductor) stopSequencer() error { ...@@ -567,6 +569,7 @@ func (oc *OpConductor) stopSequencer() error {
func (oc *OpConductor) startSequencer() error { func (oc *OpConductor) startSequencer() error {
oc.log.Info("starting sequencer", "server", oc.cons.ServerID(), "leader", oc.leader.Load(), "healthy", oc.healthy.Load(), "active", oc.seqActive.Load()) oc.log.Info("starting sequencer", "server", oc.cons.ServerID(), "leader", oc.leader.Load(), "healthy", oc.healthy.Load(), "active", oc.seqActive.Load())
ctx := context.Background()
// When starting sequencer, we need to make sure that the current node has the latest unsafe head from the consensus protocol // When starting sequencer, we need to make sure that the current node has the latest unsafe head from the consensus protocol
// If not, then we wait for the unsafe head to catch up or gossip it to op-node manually from op-conductor. // If not, then we wait for the unsafe head to catch up or gossip it to op-node manually from op-conductor.
...@@ -574,7 +577,7 @@ func (oc *OpConductor) startSequencer() error { ...@@ -574,7 +577,7 @@ func (oc *OpConductor) startSequencer() error {
if unsafeInCons == nil { if unsafeInCons == nil {
return errors.New("failed to get latest unsafe block from consensus") return errors.New("failed to get latest unsafe block from consensus")
} }
unsafeInNode, err := oc.ctrl.LatestUnsafeBlock(context.Background()) unsafeInNode, err := oc.ctrl.LatestUnsafeBlock(ctx)
if err != nil { if err != nil {
return errors.Wrap(err, "failed to get latest unsafe block from EL during startSequencer phase") return errors.Wrap(err, "failed to get latest unsafe block from EL during startSequencer phase")
} }
...@@ -590,15 +593,20 @@ func (oc *OpConductor) startSequencer() error { ...@@ -590,15 +593,20 @@ func (oc *OpConductor) startSequencer() error {
if uint64(unsafeInCons.ExecutionPayload.BlockNumber)-unsafeInNode.NumberU64() == 1 { if uint64(unsafeInCons.ExecutionPayload.BlockNumber)-unsafeInNode.NumberU64() == 1 {
// tries to post the unsafe head to op-node when head is only 1 block behind (most likely due to gossip delay) // tries to post the unsafe head to op-node when head is only 1 block behind (most likely due to gossip delay)
if err = oc.ctrl.PostUnsafePayload(context.Background(), unsafeInCons); err != nil { if err = oc.ctrl.PostUnsafePayload(ctx, unsafeInCons); err != nil {
oc.log.Error("failed to post unsafe head payload envelope to op-node", "err", err) oc.log.Error("failed to post unsafe head payload envelope to op-node", "err", err)
} }
} }
return ErrUnsafeHeadMismarch // return error to allow retry return ErrUnsafeHeadMismarch // return error to allow retry
} }
if err := oc.ctrl.StartSequencer(context.Background(), unsafeInCons.ExecutionPayload.BlockHash); err != nil { if err = oc.ctrl.StartSequencer(ctx, unsafeInCons.ExecutionPayload.BlockHash); err != nil {
return errors.Wrap(err, "failed to start sequencer") // cannot compare directly using errors.Is because the error is returned from a JSON-RPC server, which loses its type.
if !strings.Contains(err.Error(), driver.ErrSequencerAlreadyStarted.Error()) {
return fmt.Errorf("failed to start sequencer: %w", err)
} else {
oc.log.Warn("sequencer already started.", "err", err)
}
} }
oc.seqActive.Store(true) oc.seqActive.Store(true)
......
...@@ -22,6 +22,8 @@ import ( ...@@ -22,6 +22,8 @@ import (
"github.com/ethereum-optimism/optimism/op-service/retry" "github.com/ethereum-optimism/optimism/op-service/retry"
) )
// ErrSequencerAlreadyStarted is the error sent back on a start-sequencer request
// when the driver's sequencer is not in the stopped state. op-conductor detects
// this condition by matching the message text, so the wording must stay stable.
var ErrSequencerAlreadyStarted = errors.New("sequencer already running")
// Deprecated: use eth.SyncStatus instead. // Deprecated: use eth.SyncStatus instead.
type SyncStatus = eth.SyncStatus type SyncStatus = eth.SyncStatus
...@@ -412,7 +414,7 @@ func (s *Driver) eventLoop() { ...@@ -412,7 +414,7 @@ func (s *Driver) eventLoop() {
case resp := <-s.startSequencer: case resp := <-s.startSequencer:
unsafeHead := s.engineController.UnsafeL2Head().Hash unsafeHead := s.engineController.UnsafeL2Head().Hash
if !s.driverConfig.SequencerStopped { if !s.driverConfig.SequencerStopped {
resp.err <- errors.New("sequencer already running") resp.err <- ErrSequencerAlreadyStarted
} else if !bytes.Equal(unsafeHead[:], resp.hash[:]) { } else if !bytes.Equal(unsafeHead[:], resp.hash[:]) {
resp.err <- fmt.Errorf("block hash does not match: head %s, received %s", unsafeHead.String(), resp.hash.String()) resp.err <- fmt.Errorf("block hash does not match: head %s, received %s", unsafeHead.String(), resp.hash.String())
} else { } else {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment