Commit c7b91ab2 authored by Matt Joiner, committed by GitHub

Conductor and sequencer p2p refactoring (#11455)

* Shutdown sequencer before stopping p2p

* Check p2p isn't also disabled
Co-authored-by: Sebastian Stammler <seb@oplabs.co>

* Remove missed time.Sleep

* Fix up use of SetupP2P.Disabled

* Revert error check after RPC boundary

* Add comment about context for StopSequencer

* Add Config.p2pEnabled

* op-node: Make Config.P2PEnabled public

---------
Co-authored-by: Sebastian Stammler <seb@oplabs.co>
parent e53a86ac
@@ -1511,4 +1511,3 @@ func (c *ConsolePrecompile) Log_59cfcbe3(p0 *big.Int, p1 *big.Int, p2 *big.Int,

func (c *ConsolePrecompile) Log_193fb800(p0 *big.Int, p1 *big.Int, p2 *big.Int, p3 *big.Int) {
    c.log(p0, p1, p2, p3)
}
@@ -647,9 +647,12 @@ func (oc *OpConductor) action() {
    oc.log.Debug("exiting action with status and error", "status", status, "err", err)
    if err != nil {
-        oc.log.Error("failed to execute step, queueing another one to retry", "err", err, "status", status)
-        time.Sleep(oc.retryBackoff())
-        oc.queueAction()
+        select {
+        case <-oc.shutdownCtx.Done():
+        case <-time.After(oc.retryBackoff()):
+            oc.log.Error("failed to execute step, queueing another one to retry", "err", err, "status", status)
+            oc.queueAction()
+        }
        return
    }
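The old path slept unconditionally, so a shutdown request could stall for a full backoff period; the `select` makes the wait cancellable. A minimal, self-contained sketch of the same pattern (names like `retryAfterBackoff` are illustrative, not the repo's code):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// retryAfterBackoff waits out the backoff and then queues a retry, unless
// the context is cancelled first, in which case the retry is dropped.
func retryAfterBackoff(ctx context.Context, backoff time.Duration, queueAction func()) {
	select {
	case <-ctx.Done():
		// Shutting down: don't sleep through the backoff, don't retry.
	case <-time.After(backoff):
		queueAction()
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		time.Sleep(50 * time.Millisecond)
		cancel() // simulated shutdown signal
	}()
	start := time.Now()
	retryAfterBackoff(ctx, 10*time.Second, func() { fmt.Println("retrying") })
	fmt.Println("unblocked after", time.Since(start).Round(time.Millisecond))
}
```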
@@ -683,18 +686,33 @@ func (oc *OpConductor) transferLeader() error {
}

func (oc *OpConductor) stopSequencer() error {
-    oc.log.Info("stopping sequencer", "server", oc.cons.ServerID(), "leader", oc.leader.Load(), "healthy", oc.healthy.Load(), "active", oc.seqActive.Load())
-    _, err := oc.ctrl.StopSequencer(context.Background())
-    if err != nil {
+    oc.log.Info(
+        "stopping sequencer",
+        "server", oc.cons.ServerID(),
+        "leader", oc.leader.Load(),
+        "healthy", oc.healthy.Load(),
+        "active", oc.seqActive.Load())
+
+    // Quoting (@zhwrd): StopSequencer is called after conductor loses leadership. In the event that
+    // the StopSequencer call fails, it actually has little real consequence, because the sequencer
+    // can't produce a block and gossip / commit it to the raft log (that requires leadership). Once
+    // conductor comes back up it will check its leader and sequencer state and attempt to stop the
+    // sequencer again. So it is "okay" to fail to stop a sequencer; the state will eventually be
+    // rectified and we won't have two active sequencers that are actually producing blocks.
+    //
+    // To that end, we allow the StopSequencer call to be cancelled if we're shutting down.
+    latestHead, err := oc.ctrl.StopSequencer(oc.shutdownCtx)
+    if err == nil {
+        // None of the consensus state should have changed here so don't log it again.
+        oc.log.Info("stopped sequencer", "latestHead", latestHead)
+    } else {
        if strings.Contains(err.Error(), driver.ErrSequencerAlreadyStopped.Error()) {
-            oc.log.Warn("sequencer already stopped.", "err", err)
+            oc.log.Warn("sequencer already stopped", "err", err)
        } else {
            return errors.Wrap(err, "failed to stop sequencer")
        }
    }
    oc.metrics.RecordStopSequencer(err == nil)
    oc.seqActive.Store(false)
    return nil
}
......
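One detail worth calling out (the "Revert error check after RPC boundary" item in the commit message): the conductor matches `driver.ErrSequencerAlreadyStopped` by substring rather than `errors.Is`, because the sentinel's identity does not survive serialization over RPC. A minimal sketch of why, with a stand-in sentinel; `remote` plays the role of the error an RPC client reconstructs:

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

var ErrSequencerAlreadyStopped = errors.New("sequencer already stopped")

func main() {
	// An RPC client only gets the error text back, so it rebuilds a fresh
	// error value with the same message but a different identity.
	remote := errors.New("sequencer already stopped")

	fmt.Println(errors.Is(remote, ErrSequencerAlreadyStopped))                        // false: identity lost
	fmt.Println(strings.Contains(remote.Error(), ErrSequencerAlreadyStopped.Error())) // true: text survives
}
```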
@@ -415,7 +415,7 @@ func (sys *System) Close() {
    }
    for name, node := range sys.RollupNodes {
-        if err := node.Stop(postCtx); err != nil && !errors.Is(err, rollupNode.ErrAlreadyClosed) {
+        if err := node.Stop(postCtx); err != nil && !errors.Is(err, rollupNode.ErrAlreadyClosed) && !errors.Is(err, postCtx.Err()) {
            combinedErr = errors.Join(combinedErr, fmt.Errorf("stop rollup node %v: %w", name, err))
        }
    }
......
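The e2e teardown now also tolerates errors that merely wrap the already-expired teardown context: hitting our own shutdown deadline is not a node failure. A small sketch of the check, assuming standard `context` semantics:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

func main() {
	postCtx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	<-postCtx.Done() // teardown budget exhausted

	err := fmt.Errorf("stop rollup node: %w", postCtx.Err())
	if err != nil && !errors.Is(err, postCtx.Err()) {
		fmt.Println("recorded:", err)
	} else {
		// context.DeadlineExceeded from our own teardown deadline is not
		// joined into the combined error.
		fmt.Println("ignored:", err)
	}
}
```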
@@ -172,3 +172,7 @@ func (cfg *Config) Check() error {
    }
    return nil
}
+
+func (cfg *Config) P2PEnabled() bool {
+    return cfg.P2P != nil && !cfg.P2P.Disabled()
+}
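The new helper centralizes the "configured and not explicitly disabled" test so call sites stop repeating the nil check. A self-contained sketch of the same nil-safe guard; `p2pConfig` is illustrative and `SetupP2P` is cut down to the one method the check needs:

```go
package main

import "fmt"

type SetupP2P interface {
	Disabled() bool
}

type p2pConfig struct{ disabled bool }

func (p *p2pConfig) Disabled() bool { return p.disabled }

type Config struct {
	P2P SetupP2P // nil when no p2p config was given at all
}

func (cfg *Config) P2PEnabled() bool {
	return cfg.P2P != nil && !cfg.P2P.Disabled()
}

func main() {
	fmt.Println((&Config{}).P2PEnabled())                                // false: nothing configured
	fmt.Println((&Config{P2P: &p2pConfig{disabled: true}}).P2PEnabled()) // false: configured but disabled
	fmt.Println((&Config{P2P: &p2pConfig{}}).P2PEnabled())               // true
}
```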
@@ -8,6 +8,8 @@ import (
    "sync/atomic"
    "time"

+    "github.com/ethereum-optimism/optimism/op-node/rollup/sequencing"
+
    "github.com/hashicorp/go-multierror"
    "github.com/libp2p/go-libp2p/core/peer"
@@ -40,6 +42,8 @@ type closableSafeDB interface {
}

type OpNode struct {
+    // Retain the config to test for active features rather than test for runtime state.
+    cfg        *Config
    log        log.Logger
    appVersion string
    metrics    *metrics.Metrics
@@ -93,6 +97,7 @@ func New(ctx context.Context, cfg *Config, log log.Logger, appVersion string, m
    }

    n := &OpNode{
+        cfg:        cfg,
        log:        log,
        appVersion: appVersion,
        metrics:    m,
@@ -134,7 +139,7 @@ func (n *OpNode) init(ctx context.Context, cfg *Config) error {
    if err := n.initP2PSigner(ctx, cfg); err != nil {
        return fmt.Errorf("failed to init the P2P signer: %w", err)
    }
-    if err := n.initP2P(ctx, cfg); err != nil {
+    if err := n.initP2P(cfg); err != nil {
        return fmt.Errorf("failed to init the P2P stack: %w", err)
    }
    // Only expose the server at the end, ensuring all RPC backend components are initialized.
@@ -407,7 +412,7 @@ func (n *OpNode) initRPCServer(cfg *Config) error {
    if err != nil {
        return err
    }
-    if n.p2pNode != nil {
+    if n.p2pEnabled() {
        server.EnableP2P(p2p.NewP2PAPIBackend(n.p2pNode, n.log, n.metrics))
    }
    if cfg.RPC.EnableAdmin {
@@ -454,14 +459,20 @@ func (n *OpNode) initPProf(cfg *Config) error {
    return nil
}

-func (n *OpNode) initP2P(ctx context.Context, cfg *Config) error {
-    if cfg.P2P != nil {
+func (n *OpNode) p2pEnabled() bool {
+    return n.cfg.P2PEnabled()
+}
+
+func (n *OpNode) initP2P(cfg *Config) (err error) {
+    if n.p2pNode != nil {
+        panic("p2p node already initialized")
+    }
+    if n.p2pEnabled() {
        // TODO(protocol-quest/97): Use EL Sync instead of CL Alt sync for fetching missing blocks in the payload queue.
-        p2pNode, err := p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.l2Source, n.runCfg, n.metrics, false)
-        if err != nil || p2pNode == nil {
-            return err
+        n.p2pNode, err = p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.l2Source, n.runCfg, n.metrics, false)
+        if err != nil {
+            return
        }
-        n.p2pNode = p2pNode
        if n.p2pNode.Dv5Udp() != nil {
            go n.p2pNode.DiscoveryProcess(n.resourcesCtx, n.log, &cfg.Rollup, cfg.P2P.TargetPeers())
        }
@@ -469,15 +480,14 @@ func (n *OpNode) initP2P(ctx context.Context, cfg *Config) error {
    return nil
}

-func (n *OpNode) initP2PSigner(ctx context.Context, cfg *Config) error {
+func (n *OpNode) initP2PSigner(ctx context.Context, cfg *Config) (err error) {
    // the p2p signer setup is optional
    if cfg.P2PSigner == nil {
-        return nil
+        return
    }
    // p2pSigner may still be nil, the signer setup may not create any signer, the signer is optional
-    var err error
    n.p2pSigner, err = cfg.P2PSigner.SetupSigner(ctx)
-    return err
+    return
}

func (n *OpNode) Start(ctx context.Context) error {
@@ -533,7 +543,7 @@ func (n *OpNode) PublishL2Payload(ctx context.Context, envelope *eth.ExecutionPa
    n.tracer.OnPublishL2Payload(ctx, envelope)

    // publish to p2p, if we are running p2p at all
-    if n.p2pNode != nil {
+    if n.p2pEnabled() {
        payload := envelope.ExecutionPayload
        if n.p2pSigner == nil {
            return fmt.Errorf("node has no p2p signer, payload %s cannot be published", payload.ID())
@@ -547,7 +557,7 @@ func (n *OpNode) PublishL2Payload(ctx context.Context, envelope *eth.ExecutionPa

func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, envelope *eth.ExecutionPayloadEnvelope) error {
    // ignore if it's from ourselves
-    if n.p2pNode != nil && from == n.p2pNode.Host().ID() {
+    if n.p2pEnabled() && from == n.p2pNode.Host().ID() {
        return nil
    }
@@ -568,9 +578,13 @@ func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, envelope *
}

func (n *OpNode) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error {
-    if n.p2pNode != nil && n.p2pNode.AltSyncEnabled() {
+    if n.p2pEnabled() && n.p2pNode.AltSyncEnabled() {
        if unixTimeStale(start.Time, 12*time.Hour) {
-            n.log.Debug("ignoring request to sync L2 range, timestamp is too old for p2p", "start", start, "end", end, "start_time", start.Time)
+            n.log.Debug(
+                "ignoring request to sync L2 range, timestamp is too old for p2p",
+                "start", start,
+                "end", end,
+                "start_time", start.Time)
            return nil
        }
        return n.p2pNode.RequestL2Range(ctx, start, end)
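For reference, the staleness guard above only consults the block timestamp. Here is a plausible implementation of `unixTimeStale`, written as an assumption rather than copied from the repo:

```go
package main

import (
	"fmt"
	"time"
)

// unixTimeStale (assumed implementation): a unix timestamp is stale if it
// lies more than `duration` in the past.
func unixTimeStale(timestamp uint64, duration time.Duration) bool {
	return time.Unix(int64(timestamp), 0).Before(time.Now().Add(-duration))
}

func main() {
	now := uint64(time.Now().Unix())
	fmt.Println(unixTimeStale(now, 12*time.Hour))          // false: fresh enough for p2p alt-sync
	fmt.Println(unixTimeStale(now-13*60*60, 12*time.Hour)) // true: too old, skip the p2p request
}
```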
@@ -606,10 +620,26 @@ func (n *OpNode) Stop(ctx context.Context) error {
            result = multierror.Append(result, fmt.Errorf("failed to close RPC server: %w", err))
        }
    }
+
+    // Stop sequencer and report last hash. l2Driver can be nil if we're cleaning up a failed init.
+    if n.l2Driver != nil {
+        latestHead, err := n.l2Driver.StopSequencer(ctx)
+        switch {
+        case errors.Is(err, sequencing.ErrSequencerNotEnabled):
+        case errors.Is(err, driver.ErrSequencerAlreadyStopped):
+            n.log.Info("stopping node: sequencer already stopped", "latestHead", latestHead)
+        case err == nil:
+            n.log.Info("stopped sequencer", "latestHead", latestHead)
+        default:
+            result = multierror.Append(result, fmt.Errorf("error stopping sequencer: %w", err))
+        }
+    }
+
    if n.p2pNode != nil {
        if err := n.p2pNode.Close(); err != nil {
            result = multierror.Append(result, fmt.Errorf("failed to close p2p node: %w", err))
        }
+        // Prevent further use of p2p.
+        n.p2pNode = nil
    }
    if n.p2pSigner != nil {
        if err := n.p2pSigner.Close(); err != nil {
......
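The new shutdown block distinguishes four outcomes and only accumulates genuine failures — this is also what puts the sequencer stop ahead of the p2p teardown, per the first commit message bullet. A sketch of the classification with stand-in sentinels (the real ones live in the `sequencing` and `driver` packages):

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errSequencerNotEnabled     = errors.New("sequencer is not enabled")
	errSequencerAlreadyStopped = errors.New("sequencer already stopped")
)

func classifyStop(err error) string {
	switch {
	case errors.Is(err, errSequencerNotEnabled):
		return "nothing to stop" // silently ignored in the real code
	case errors.Is(err, errSequencerAlreadyStopped):
		return "already stopped; just log it"
	case err == nil:
		return "stopped cleanly"
	default:
		return "real failure; append to the multierror result"
	}
}

func main() {
	for _, err := range []error{
		nil,
		errSequencerNotEnabled,
		fmt.Errorf("wrapped: %w", errSequencerAlreadyStopped),
		errors.New("rpc failure"),
	} {
		fmt.Println(classifyStop(err))
	}
}
```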
@@ -48,6 +48,7 @@ type HostMetrics interface {

// SetupP2P provides a host and discovery service for usage in the rollup node.
type SetupP2P interface {
    Check() error
+    // Looks like this was started to prevent partially inited p2p.
    Disabled() bool
    // Host creates a libp2p host service. Returns nil, nil if p2p is disabled.
    Host(log log.Logger, reporter metrics.Reporter, metrics HostMetrics) (host.Host, error)
......
@@ -52,10 +52,23 @@ type NodeP2P struct {
// NewNodeP2P creates a new p2p node, and returns a reference to it. If the p2p is disabled, it returns nil.
// If metrics are configured, a bandwidth monitor will be spawned in a goroutine.
-func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, l2Chain L2Chain, runCfg GossipRuntimeConfig, metrics metrics.Metricer, elSyncEnabled bool) (*NodeP2P, error) {
+func NewNodeP2P(
+    resourcesCtx context.Context,
+    rollupCfg *rollup.Config,
+    log log.Logger,
+    setup SetupP2P,
+    gossipIn GossipIn,
+    l2Chain L2Chain,
+    runCfg GossipRuntimeConfig,
+    metrics metrics.Metricer,
+    elSyncEnabled bool,
+) (*NodeP2P, error) {
    if setup == nil {
        return nil, errors.New("p2p node cannot be created without setup")
    }
+    if setup.Disabled() {
+        return nil, errors.New("SetupP2P.Disabled is true")
+    }
    var n NodeP2P
    if err := n.init(resourcesCtx, rollupCfg, log, setup, gossipIn, l2Chain, runCfg, metrics, elSyncEnabled); err != nil {
        closeErr := n.Close()
@@ -65,12 +78,24 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.
        return nil, err
    }
    if n.host == nil {
-        return nil, nil
+        // See prior comment about n.host optionality:
+        // TODO(CLI-4016): host is not optional, NodeP2P as a whole is.
+        panic("host is not optional if p2p is enabled")
    }
    return &n, nil
}

-func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, l2Chain L2Chain, runCfg GossipRuntimeConfig, metrics metrics.Metricer, elSyncEnabled bool) error {
+func (n *NodeP2P) init(
+    resourcesCtx context.Context,
+    rollupCfg *rollup.Config,
+    log log.Logger,
+    setup SetupP2P,
+    gossipIn GossipIn,
+    l2Chain L2Chain,
+    runCfg GossipRuntimeConfig,
+    metrics metrics.Metricer,
+    elSyncEnabled bool,
+) error {
    bwc := p2pmetrics.NewBandwidthCounter()

    n.log = log
@@ -85,86 +110,83 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
        return fmt.Errorf("failed to start p2p host: %w", err)
    }
-    // TODO(CLI-4016): host is not optional, NodeP2P as a whole is. This if statement is wrong
-    if n.host != nil {
    // Enable extra features, if any. During testing we don't setup the most advanced host all the time.
    if extra, ok := n.host.(ExtraHostFeatures); ok {
        n.gater = extra.ConnectionGater()
        n.connMgr = extra.ConnectionManager()
    }
    eps, ok := n.host.Peerstore().(store.ExtendedPeerstore)
    if !ok {
        return fmt.Errorf("cannot init without extended peerstore: %w", err)
    }
    n.store = eps
    scoreParams := setup.PeerScoringParams()
    if scoreParams != nil {
        n.appScorer = newPeerApplicationScorer(resourcesCtx, log, clock.SystemClock, &scoreParams.ApplicationScoring, eps, n.host.Network().Peers)
    } else {
        n.appScorer = &NoopApplicationScorer{}
    }
    // Activate the P2P req-resp sync if enabled by feature-flag.
    if setup.ReqRespSyncEnabled() && !elSyncEnabled {
        n.syncCl = NewSyncClient(log, rollupCfg, n.host, gossipIn.OnUnsafeL2Payload, metrics, n.appScorer)
        n.host.Network().Notify(&network.NotifyBundle{
            ConnectedF: func(nw network.Network, conn network.Conn) {
                n.syncCl.AddPeer(conn.RemotePeer())
            },
            DisconnectedF: func(nw network.Network, conn network.Conn) {
                // only when no connection is available, we can remove the peer
                if nw.Connectedness(conn.RemotePeer()) == network.NotConnected {
                    n.syncCl.RemovePeer(conn.RemotePeer())
                }
            },
        })
        n.syncCl.Start()
        // the host may already be connected to peers, add them all to the sync client
        for _, peerID := range n.host.Network().Peers() {
            n.syncCl.AddPeer(peerID)
        }
        if l2Chain != nil { // Only enable serving side of req-resp sync if we have a data-source, to make minimal P2P testing easy
            n.syncSrv = NewReqRespServer(rollupCfg, l2Chain, metrics)
            // register the sync protocol with libp2p host
            payloadByNumber := MakeStreamHandler(resourcesCtx, log.New("serve", "payloads_by_number"), n.syncSrv.HandleSyncRequest)
            n.host.SetStreamHandler(PayloadByNumberProtocolID(rollupCfg.L2ChainID), payloadByNumber)
        }
    }
    n.scorer = NewScorer(rollupCfg, eps, metrics, n.appScorer, log)
    // notify of any new connections/streams/etc.
    n.host.Network().Notify(NewNetworkNotifier(log, metrics))
    // note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled.
    n.gs, err = NewGossipSub(resourcesCtx, n.host, rollupCfg, setup, n.scorer, metrics, log)
    if err != nil {
        return fmt.Errorf("failed to start gossipsub router: %w", err)
    }
    n.gsOut, err = JoinGossip(n.host.ID(), n.gs, log, rollupCfg, runCfg, gossipIn)
    if err != nil {
        return fmt.Errorf("failed to join blocks gossip topic: %w", err)
    }
    log.Info("started p2p host", "addrs", n.host.Addrs(), "peerID", n.host.ID().String())
    tcpPort, err := FindActiveTCPPort(n.host)
    if err != nil {
        log.Warn("failed to find what TCP port p2p is binded to", "err", err)
    }
    // All nil if disabled.
    n.dv5Local, n.dv5Udp, err = setup.Discovery(log.New("p2p", "discv5"), rollupCfg, tcpPort)
    if err != nil {
        return fmt.Errorf("failed to start discv5: %w", err)
    }
    if metrics != nil {
        go metrics.RecordBandwidth(resourcesCtx, bwc)
    }
    if setup.BanPeers() {
        n.peerMonitor = monitor.NewPeerMonitor(resourcesCtx, log, clock.SystemClock, n, setup.BanThreshold(), setup.BanDuration())
        n.peerMonitor.Start()
    }
    n.appScorer.start()
-    }
    return nil
}
......
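Taken together, the NodeP2P changes flip an invariant: instead of `NewNodeP2P` returning `(nil, nil)` when p2p is disabled and every caller nil-checking the result, construction is refused up front and callers gate on `Config.P2PEnabled` before calling it. A reduced sketch of the resulting contract; all names here are illustrative, not the repo's:

```go
package main

import (
	"errors"
	"fmt"
)

type setup struct{ disabled bool }

func (s *setup) Disabled() bool { return s.disabled }

type node struct{}

// After the refactor: refuse to construct rather than return (nil, nil).
func newNode(s *setup) (*node, error) {
	if s == nil {
		return nil, errors.New("node cannot be created without setup")
	}
	if s.Disabled() {
		return nil, errors.New("setup is disabled")
	}
	return &node{}, nil
}

func main() {
	s := &setup{disabled: true}
	// Callers gate on the enabled check, so a successful call always
	// yields a usable, non-nil node.
	if s != nil && !s.Disabled() {
		n, err := newNode(s)
		fmt.Println(n, err)
	} else {
		fmt.Println("p2p disabled: constructor never called")
	}
}
```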