Commit 4a525b59 authored by zhiqiangxu's avatar zhiqiangxu Committed by GitHub

add `p2p.sync.onlyreqtostatic` flag to p2p flags (#11011)

* add p2p.sync.onlyreqtostatic flag to p2p flags

* fix for review
parent d283e9be
......@@ -14,46 +14,47 @@ func p2pEnv(envprefix, v string) []string {
}
var (
DisableP2PName = "p2p.disable"
NoDiscoveryName = "p2p.no-discovery"
ScoringName = "p2p.scoring"
PeerScoringName = "p2p.scoring.peers"
PeerScoreBandsName = "p2p.score.bands"
BanningName = "p2p.ban.peers"
BanningThresholdName = "p2p.ban.threshold"
BanningDurationName = "p2p.ban.duration"
TopicScoringName = "p2p.scoring.topics"
P2PPrivPathName = "p2p.priv.path"
P2PPrivRawName = "p2p.priv.raw"
ListenIPName = "p2p.listen.ip"
ListenTCPPortName = "p2p.listen.tcp"
ListenUDPPortName = "p2p.listen.udp"
AdvertiseIPName = "p2p.advertise.ip"
AdvertiseTCPPortName = "p2p.advertise.tcp"
AdvertiseUDPPortName = "p2p.advertise.udp"
BootnodesName = "p2p.bootnodes"
StaticPeersName = "p2p.static"
NetRestrictName = "p2p.netrestrict"
HostMuxName = "p2p.mux"
HostSecurityName = "p2p.security"
PeersLoName = "p2p.peers.lo"
PeersHiName = "p2p.peers.hi"
PeersGraceName = "p2p.peers.grace"
NATName = "p2p.nat"
UserAgentName = "p2p.useragent"
TimeoutNegotiationName = "p2p.timeout.negotiation"
TimeoutAcceptName = "p2p.timeout.accept"
TimeoutDialName = "p2p.timeout.dial"
PeerstorePathName = "p2p.peerstore.path"
DiscoveryPathName = "p2p.discovery.path"
SequencerP2PKeyName = "p2p.sequencer.key"
GossipMeshDName = "p2p.gossip.mesh.d"
GossipMeshDloName = "p2p.gossip.mesh.lo"
GossipMeshDhiName = "p2p.gossip.mesh.dhi"
GossipMeshDlazyName = "p2p.gossip.mesh.dlazy"
GossipFloodPublishName = "p2p.gossip.mesh.floodpublish"
SyncReqRespName = "p2p.sync.req-resp"
P2PPingName = "p2p.ping"
DisableP2PName = "p2p.disable"
NoDiscoveryName = "p2p.no-discovery"
ScoringName = "p2p.scoring"
PeerScoringName = "p2p.scoring.peers"
PeerScoreBandsName = "p2p.score.bands"
BanningName = "p2p.ban.peers"
BanningThresholdName = "p2p.ban.threshold"
BanningDurationName = "p2p.ban.duration"
TopicScoringName = "p2p.scoring.topics"
P2PPrivPathName = "p2p.priv.path"
P2PPrivRawName = "p2p.priv.raw"
ListenIPName = "p2p.listen.ip"
ListenTCPPortName = "p2p.listen.tcp"
ListenUDPPortName = "p2p.listen.udp"
AdvertiseIPName = "p2p.advertise.ip"
AdvertiseTCPPortName = "p2p.advertise.tcp"
AdvertiseUDPPortName = "p2p.advertise.udp"
BootnodesName = "p2p.bootnodes"
StaticPeersName = "p2p.static"
NetRestrictName = "p2p.netrestrict"
HostMuxName = "p2p.mux"
HostSecurityName = "p2p.security"
PeersLoName = "p2p.peers.lo"
PeersHiName = "p2p.peers.hi"
PeersGraceName = "p2p.peers.grace"
NATName = "p2p.nat"
UserAgentName = "p2p.useragent"
TimeoutNegotiationName = "p2p.timeout.negotiation"
TimeoutAcceptName = "p2p.timeout.accept"
TimeoutDialName = "p2p.timeout.dial"
PeerstorePathName = "p2p.peerstore.path"
DiscoveryPathName = "p2p.discovery.path"
SequencerP2PKeyName = "p2p.sequencer.key"
GossipMeshDName = "p2p.gossip.mesh.d"
GossipMeshDloName = "p2p.gossip.mesh.lo"
GossipMeshDhiName = "p2p.gossip.mesh.dhi"
GossipMeshDlazyName = "p2p.gossip.mesh.dlazy"
GossipFloodPublishName = "p2p.gossip.mesh.floodpublish"
SyncReqRespName = "p2p.sync.req-resp"
SyncOnlyReqToStaticName = "p2p.sync.onlyreqtostatic"
P2PPingName = "p2p.ping"
)
func deprecatedP2PFlags(envPrefix string) []cli.Flag {
......@@ -393,6 +394,14 @@ func P2PFlags(envPrefix string) []cli.Flag {
EnvVars: p2pEnv(envPrefix, "SYNC_REQ_RESP"),
Category: P2PCategory,
},
&cli.BoolFlag{
Name: SyncOnlyReqToStaticName,
Usage: "Configure P2P to forward RequestL2Range requests to static peers only.",
Value: false,
Required: false,
EnvVars: p2pEnv(envPrefix, "SYNC_ONLYREQTOSTATIC"),
Category: P2PCategory,
},
&cli.BoolFlag{
Name: P2PPingName,
Usage: "Enables P2P ping-pong background service",
......
......@@ -66,6 +66,7 @@ func NewConfig(ctx *cli.Context, rollupCfg *rollup.Config) (*p2p.Config, error)
conf.EnableReqRespSync = ctx.Bool(flags.SyncReqRespName)
conf.EnablePingService = ctx.Bool(flags.P2PPingName)
conf.SyncOnlyReqToStatic = ctx.Bool(flags.SyncOnlyReqToStaticName)
return conf, nil
}
......
......@@ -126,7 +126,8 @@ type Config struct {
// Underlying store that hosts connection-gater and peerstore data.
Store ds.Batching
EnableReqRespSync bool
EnableReqRespSync bool
SyncOnlyReqToStatic bool
EnablePingService bool
}
......
......@@ -18,6 +18,7 @@ import (
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/sec/insecure"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
......@@ -39,10 +40,16 @@ const (
staticPeerTag = "static"
)
type HostNewStream interface {
NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (network.Stream, error)
}
type ExtraHostFeatures interface {
host.Host
ConnectionGater() gating.BlockingConnectionGater
ConnectionManager() connmgr.ConnManager
IsStatic(peerID peer.ID) bool
SyncOnlyReqToStatic() bool
}
type extraHost struct {
......@@ -51,11 +58,14 @@ type extraHost struct {
connMgr connmgr.ConnManager
log log.Logger
staticPeers []*peer.AddrInfo
staticPeers []*peer.AddrInfo
staticPeerIDs map[peer.ID]struct{}
pinging *PingService
quitC chan struct{}
syncOnlyReqToStatic bool
}
func (e *extraHost) ConnectionGater() gating.BlockingConnectionGater {
......@@ -66,6 +76,15 @@ func (e *extraHost) ConnectionManager() connmgr.ConnManager {
return e.connMgr
}
func (e *extraHost) IsStatic(peerID peer.ID) bool {
_, exists := e.staticPeerIDs[peerID]
return exists
}
func (e *extraHost) SyncOnlyReqToStatic() bool {
return e.syncOnlyReqToStatic
}
func (e *extraHost) Close() error {
close(e.quitC)
if e.pinging != nil {
......@@ -236,6 +255,7 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter, metrics Host
}
staticPeers := make([]*peer.AddrInfo, 0, len(conf.StaticPeers))
staticPeerIDs := make(map[peer.ID]struct{})
for _, peerAddr := range conf.StaticPeers {
addr, err := peer.AddrInfoFromP2pAddr(peerAddr)
if err != nil {
......@@ -246,14 +266,17 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter, metrics Host
continue
}
staticPeers = append(staticPeers, addr)
staticPeerIDs[addr.ID] = struct{}{}
}
out := &extraHost{
Host: h,
connMgr: connMngr,
log: log,
staticPeers: staticPeers,
quitC: make(chan struct{}),
Host: h,
connMgr: connMngr,
log: log,
staticPeers: staticPeers,
staticPeerIDs: staticPeerIDs,
quitC: make(chan struct{}),
syncOnlyReqToStatic: conf.SyncOnlyReqToStatic,
}
if conf.EnablePingService {
......
......@@ -106,7 +106,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
}
// Activate the P2P req-resp sync if enabled by feature-flag.
if setup.ReqRespSyncEnabled() && !elSyncEnabled {
n.syncCl = NewSyncClient(log, rollupCfg, n.host.NewStream, gossipIn.OnUnsafeL2Payload, metrics, n.appScorer)
n.syncCl = NewSyncClient(log, rollupCfg, n.host, gossipIn.OnUnsafeL2Payload, metrics, n.appScorer)
n.host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(nw network.Network, conn network.Conn) {
n.syncCl.AddPeer(conn.RemotePeer())
......
......@@ -276,9 +276,12 @@ type SyncClient struct {
// Don't allow anything to be added to the wait-group while, or after, we are shutting down.
// This is protected by peersLock.
closingPeers bool
extra ExtraHostFeatures
syncOnlyReqToStatic bool
}
func NewSyncClient(log log.Logger, cfg *rollup.Config, newStream newStreamFn, rcv receivePayloadFn, metrics SyncClientMetrics, appScorer SyncPeerScorer) *SyncClient {
func NewSyncClient(log log.Logger, cfg *rollup.Config, host HostNewStream, rcv receivePayloadFn, metrics SyncClientMetrics, appScorer SyncPeerScorer) *SyncClient {
ctx, cancel := context.WithCancel(context.Background())
c := &SyncClient{
......@@ -286,7 +289,7 @@ func NewSyncClient(log log.Logger, cfg *rollup.Config, newStream newStreamFn, rc
cfg: cfg,
metrics: metrics,
appScorer: appScorer,
newStreamFn: newStream,
newStreamFn: host.NewStream,
payloadByNumber: PayloadByNumberProtocolID(cfg.L2ChainID),
peers: make(map[peer.ID]context.CancelFunc),
quarantineByNum: make(map[uint64]common.Hash),
......@@ -301,6 +304,10 @@ func NewSyncClient(log log.Logger, cfg *rollup.Config, newStream newStreamFn, rc
resCancel: cancel,
receivePayload: rcv,
}
if extra, ok := host.(ExtraHostFeatures); ok && extra.SyncOnlyReqToStatic() {
c.extra = extra
c.syncOnlyReqToStatic = true
}
// never errors with positive LRU cache size
// TODO(CLI-3733): if we had an LRU based on on total payloads size, instead of payload count,
......@@ -556,6 +563,15 @@ func (s *SyncClient) peerLoop(ctx context.Context, id peer.ID) {
// so we don't be too aggressive to the server.
rl := rate.NewLimiter(peerServerBlocksRateLimit, peerServerBlocksBurst)
// if onlyReqToStatic is on, ensure that only static peers are dealing with the request
peerRequests := s.peerRequests
if s.syncOnlyReqToStatic && !s.extra.IsStatic(id) {
// for non-static peers, set peerRequests to nil
// this will effectively make the peer loop not perform outgoing sync-requests.
// while sync-requests will block, the loop may still process other events (if added in the future).
peerRequests = nil
}
for {
// wait for a global allocation to be available
if err := s.globalRL.Wait(ctx); err != nil {
......@@ -568,7 +584,7 @@ func (s *SyncClient) peerLoop(ctx context.Context, id peer.ID) {
// once the peer is available, wait for a sync request.
select {
case pr := <-s.peerRequests:
case pr := <-peerRequests:
if !s.activeRangeRequests.get(pr.rangeReqId) {
log.Debug("dropping cancelled p2p sync request", "num", pr.num)
s.inFlight.delete(pr.num)
......
......@@ -163,7 +163,7 @@ func TestSinglePeerSync(t *testing.T) {
hostA.SetStreamHandler(PayloadByNumberProtocolID(cfg.L2ChainID), payloadByNumber)
// Setup host B as the client
cl := NewSyncClient(log.New("role", "client"), cfg, hostB.NewStream, receivePayload, metrics.NoopMetrics, &NoopApplicationScorer{})
cl := NewSyncClient(log.New("role", "client"), cfg, hostB, receivePayload, metrics.NoopMetrics, &NoopApplicationScorer{})
// Setup host B (client) to sync from its peer Host A (server)
cl.AddPeer(hostA.ID())
......@@ -224,7 +224,7 @@ func TestMultiPeerSync(t *testing.T) {
payloadByNumber := MakeStreamHandler(ctx, log.New("serve", "payloads_by_number"), srv.HandleSyncRequest)
h.SetStreamHandler(PayloadByNumberProtocolID(cfg.L2ChainID), payloadByNumber)
cl := NewSyncClient(log.New("role", "client"), cfg, h.NewStream, receivePayload, metrics.NoopMetrics, &NoopApplicationScorer{})
cl := NewSyncClient(log.New("role", "client"), cfg, h, receivePayload, metrics.NoopMetrics, &NoopApplicationScorer{})
return cl, received
}
......@@ -356,7 +356,7 @@ func TestNetworkNotifyAddPeerAndRemovePeer(t *testing.T) {
require.NoError(t, err, "failed to launch host B")
defer hostB.Close()
syncCl := NewSyncClient(log, cfg, hostA.NewStream, func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayloadEnvelope) error {
syncCl := NewSyncClient(log, cfg, hostA, func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayloadEnvelope) error {
return nil
}, metrics.NoopMetrics, &NoopApplicationScorer{})
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment