Commit f080fa3f authored by Andreas Bigger's avatar Andreas Bigger

feat: initial peer and topic scoring configs

parent 4e19f015
......@@ -145,14 +145,19 @@ func BuildGlobalGossipParams(cfg *rollup.Config) pubsub.GossipSubParams {
// NewGossipSub configures a new pubsub instance with the specified parameters.
// PubSub uses a GossipSubRouter as it's router under the hood.
func NewGossipSub(p2pCtx context.Context, h host.Host, cfg *rollup.Config, gossipConf GossipSetupConfigurables, m GossipMetricer) (*pubsub.PubSub, error) {
func NewGossipSub(p2pCtx context.Context, h host.Host, g ConnectionGater, cfg *rollup.Config, gossipConf GossipSetupConfigurables, m GossipMetricer) (*pubsub.PubSub, error) {
denyList, err := pubsub.NewTimeCachedBlacklist(30 * time.Second)
if err != nil {
return nil, err
}
params := BuildGlobalGossipParams(cfg)
peerScoreParams := NewPeerScoreParams()
// TODO: make this configurable behind a cli flag - disabled or default
peerScoreParams, err := GetPeerScoreParams("default")
if err != nil {
return nil, err
}
peerScoreThresholds := NewPeerScoreThresholds()
scorer := NewScorer(g, h.Peerstore(), m)
gossipOpts := []pubsub.Option{
pubsub.WithMaxMessageSize(maxGossipSize),
pubsub.WithMessageIdFn(BuildMsgIdFn(cfg)),
......@@ -168,23 +173,12 @@ func NewGossipSub(p2pCtx context.Context, h host.Host, cfg *rollup.Config, gossi
pubsub.WithGossipSubParams(params),
pubsub.WithEventTracer(&gossipTracer{m: m}),
pubsub.WithPeerScore(&peerScoreParams, &peerScoreThresholds),
pubsub.WithPeerScoreInspect(BuildPeerScoreInspector(m), peerScoreInspectFrequency),
pubsub.WithPeerScoreInspect(scorer.SnapshotHook(), peerScoreInspectFrequency),
}
gossipOpts = append(gossipOpts, gossipConf.ConfigureGossip(&params)...)
return pubsub.NewGossipSub(p2pCtx, h, gossipOpts...)
}
// BuildPeerScoreInspector returns a function that is called periodically by the pubsub library to inspect the peer scores.
// It is passed into the pubsub library as a [pubsub.ExtendedPeerScoreInspectFn] in the [pubsub.WithPeerScoreInspect] option.
// The returned [pubsub.ExtendedPeerScoreInspectFn] is called with a mapping of peer IDs to peer score snapshots.
func BuildPeerScoreInspector(metricer GossipMetricer) pubsub.ExtendedPeerScoreInspectFn {
return func(m map[peer.ID]*pubsub.PeerScoreSnapshot) {
for id, s := range m {
metricer.RecordPeerScoring(id, s.Score)
}
}
}
func validationResultString(v pubsub.ValidationResult) string {
switch v {
case pubsub.ValidationAccept:
......@@ -443,11 +437,18 @@ func JoinGossip(p2pCtx context.Context, self peer.ID, ps *pubsub.PubSub, log log
}
go LogTopicEvents(p2pCtx, log.New("topic", "blocks"), blocksTopicEvents)
// TODO: block topic scoring parameters
// TODO: make this configurable behind a cli flag - disabled or default
// Set default block topic scoring parameters
// See prysm: https://github.com/prysmaticlabs/prysm/blob/develop/beacon-chain/p2p/gossip_scoring_params.go
// And research from lighthouse: https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c
// And docs: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#topic-parameter-calculation-and-decay
//err := blocksTopic.SetScoreParams(&pubsub.TopicScoreParams{......})
defaultTopicScoreParams, err := GetTopicScoreParams("default")
if err != nil {
return nil, fmt.Errorf("failed to create default topic score params: %w", err)
}
if err = blocksTopic.SetScoreParams(&defaultTopicScoreParams); err != nil {
return nil, fmt.Errorf("failed to set topic score params: %w", err)
}
subscription, err := blocksTopic.Subscribe()
if err != nil {
......
......@@ -32,6 +32,7 @@ type NodeP2P struct {
dv5Udp *discover.UDPv5 // p2p discovery service
gs *pubsub.PubSub // p2p gossip router
gsOut GossipOut // p2p gossip application interface for publishing
scorer Scorer // p2p peer scorer
}
// NewNodeP2P creates a new p2p node, and returns a reference to it. If the p2p is disabled, it returns nil.
......@@ -77,7 +78,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
n.host.Network().Notify(NewNetworkNotifier(log, metrics))
// unregister identify-push handler. Only identifying on dial is fine, and more robust against spam
n.host.RemoveStreamHandler(identify.IDDelta)
n.gs, err = NewGossipSub(resourcesCtx, n.host, rollupCfg, setup, metrics)
n.gs, err = NewGossipSub(resourcesCtx, n.host, n.gater, rollupCfg, setup, metrics)
if err != nil {
return fmt.Errorf("failed to start gossipsub router: %w", err)
}
......
package p2p
import (
"fmt"
"math"
"time"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
)
// TODO: Update these parameters.
const (
slot = 2 * time.Second
epoch = 12 * time.Second
tenEpochs = 10 * epoch
oneHundredEpochs = 100 * epoch
decayToZero = 0.01
)
// ScoreDecay returns the decay factor for a given duration.
func ScoreDecay(duration time.Duration) float64 {
numOfTimes := duration / slot
return math.Pow(decayToZero, 1/float64(numOfTimes))
}
// DefaultPeerScoreParams is a default instantiation of [pubsub.PeerScoreParams].
// See [PeerScoreParams] for detailed documentation.
// Default parameters are loosely based on prysm's peer scoring parameters.
// See [PrysmPeerScoringParams] for more details.
//
// [PeerScoreParams]: https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub@v0.8.1#PeerScoreParams
// [PrysmPeerScoringParams]: https://github.com/prysmaticlabs/prysm/blob/develop/beacon-chain/p2p/gossip_scoring_params.go#L72
var DefaultPeerScoreParams = pubsub.PeerScoreParams{
Topics: make(map[string]*pubsub.TopicScoreParams),
TopicScoreCap: 32.72,
AppSpecificScore: func(p peer.ID) float64 {
return 0
},
AppSpecificWeight: 1,
IPColocationFactorWeight: -35.11,
IPColocationFactorThreshold: 10,
IPColocationFactorWhitelist: nil,
BehaviourPenaltyWeight: -15.92,
BehaviourPenaltyThreshold: 6,
BehaviourPenaltyDecay: ScoreDecay(tenEpochs),
DecayInterval: slot,
DecayToZero: decayToZero,
RetainScore: oneHundredEpochs,
}
// DisabledPeerScoreParams is an instantiation of [pubsub.PeerScoreParams] where all scoring is disabled.
// See [PeerScoreParams] for detailed documentation.
//
// [PeerScoreParams]: https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub@v0.8.1#PeerScoreParams
var DisabledPeerScoreParams = pubsub.PeerScoreParams{
Topics: make(map[string]*pubsub.TopicScoreParams),
TopicScoreCap: 0, // 0 represent no cap
AppSpecificScore: func(p peer.ID) float64 {
return 0
},
AppSpecificWeight: 1,
// ignore colocation scoring
IPColocationFactorWeight: 0,
IPColocationFactorWhitelist: nil,
// 0 disables the behaviour penalty
BehaviourPenaltyWeight: 0,
BehaviourPenaltyDecay: ScoreDecay(tenEpochs),
DecayInterval: slot,
DecayToZero: decayToZero,
RetainScore: oneHundredEpochs,
}
// PeerScoreParamsByName is a map of name to [pubsub.PeerScoreParams].
var PeerScoreParamsByName = map[string]pubsub.PeerScoreParams{
"default": DefaultPeerScoreParams,
"disabled": DisabledPeerScoreParams,
}
// AvailablePeerScoreParams returns a list of available peer score params.
// These can be used as an input to [GetPeerScoreParams] which returns the
// corresponding [pubsub.PeerScoreParams].
func AvailablePeerScoreParams() []string {
var params []string
for name := range PeerScoreParamsByName {
params = append(params, name)
}
return params
}
// GetPeerScoreParams returns the [pubsub.PeerScoreParams] for the given name.
func GetPeerScoreParams(name string) (pubsub.PeerScoreParams, error) {
params, ok := PeerScoreParamsByName[name]
if !ok {
return pubsub.PeerScoreParams{}, fmt.Errorf("invalid params %s", name)
}
return params, nil
}
// NewPeerScoreThresholds returns a default [pubsub.PeerScoreThresholds].
// See [PeerScoreThresholds] for detailed documentation.
//
// [PeerScoreThresholds]: https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub@v0.8.1#PeerScoreThresholds
func NewPeerScoreThresholds() pubsub.PeerScoreThresholds {
return pubsub.PeerScoreThresholds{
SkipAtomicValidation: false,
GossipThreshold: -10,
PublishThreshold: -40,
GraylistThreshold: -40,
AcceptPXThreshold: 20,
OpportunisticGraftThreshold: 0.05,
}
}
package p2p
import (
"math"
"time"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
)
// AppScoring scores peers based on application-specific metrics.
func AppScoring(p peer.ID) float64 {
return 0
}
// NewPeerScoreParams returns a default [pubsub.PeerScoreParams].
// See [PeerScoreParams] for detailed documentation.
//
// [PeerScoreParams]: https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub@v0.8.1#PeerScoreParams
func NewPeerScoreParams() pubsub.PeerScoreParams {
return pubsub.PeerScoreParams{
SkipAtomicValidation: false,
Topics: make(map[string]*pubsub.TopicScoreParams),
TopicScoreCap: 100, // Aggregate topic score cap (0 for no cap).
AppSpecificScore: AppScoring,
AppSpecificWeight: 1,
IPColocationFactorWeight: -1,
IPColocationFactorThreshold: 1,
BehaviourPenaltyWeight: -1,
BehaviourPenaltyDecay: 0.999,
DecayInterval: 24 * time.Hour,
DecayToZero: 0.001,
RetainScore: math.MaxInt64, // We want to keep scores indefinitely - don't refresh on connect/disconnect
SeenMsgTTL: 0, // Defaults to global TimeCacheDuration when 0
}
}
// NewPeerScoreThresholds returns a default [pubsub.PeerScoreThresholds].
// See [PeerScoreThresholds] for detailed documentation.
//
// [PeerScoreThresholds]: https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub@v0.8.1#PeerScoreThresholds
func NewPeerScoreThresholds() pubsub.PeerScoreThresholds {
return pubsub.PeerScoreThresholds{
SkipAtomicValidation: false,
GossipThreshold: -10,
PublishThreshold: -40,
GraylistThreshold: -40,
AcceptPXThreshold: 20,
OpportunisticGraftThreshold: 0.05,
}
}
package p2p
import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
)
const ConnectionFactor = -10
const PeerScoreThreshold = -100
type scorer struct {
connGater ConnectionGater
peerStore peerstore.Peerstore
metricer GossipMetricer
}
// Scorer is a peer scorer that scores peers based on application-specific metrics.
type Scorer interface {
OnConnect()
OnDisconnect()
SnapshotHook() pubsub.ExtendedPeerScoreInspectFn
}
// NewScorer returns a new peer scorer.
func NewScorer(connGater ConnectionGater, peerStore peerstore.Peerstore, metricer GossipMetricer) Scorer {
return &scorer{
connGater: connGater,
peerStore: peerStore,
metricer: metricer,
}
}
// SnapshotHook returns a function that is called periodically by the pubsub library to inspect the peer scores.
// It is passed into the pubsub library as a [pubsub.ExtendedPeerScoreInspectFn] in the [pubsub.WithPeerScoreInspect] option.
// The returned [pubsub.ExtendedPeerScoreInspectFn] is called with a mapping of peer IDs to peer score snapshots.
func (s *scorer) SnapshotHook() pubsub.ExtendedPeerScoreInspectFn {
return func(m map[peer.ID]*pubsub.PeerScoreSnapshot) {
for id, snap := range m {
// Record peer score in the metricer
s.metricer.RecordPeerScoring(id, snap.Score)
// Check if the peer score is below the threshold
// If so, we need to block the peer
if snap.Score < PeerScoreThreshold {
s.connGater.BlockPeer(id)
}
// Unblock peers whose score has recovered to an acceptable level
if (snap.Score > PeerScoreThreshold) && contains(s.connGater.ListBlockedPeers(), id) {
s.connGater.UnblockPeer(id)
}
}
}
}
func contains(peers []peer.ID, id peer.ID) bool {
for _, v := range peers {
if v == id {
return true
}
}
return false
}
// call the two methods below from the notifier
// OnConnect is called when a peer connects.
// See [p2p.NotificationsMetricer] for invocation.
func (s *scorer) OnConnect() {
// record a connection
}
// OnDisconnect is called when a peer disconnects.
// See [p2p.NotificationsMetricer] for invocation.
func (s *scorer) OnDisconnect() {
// record a disconnection
}
func (s *scorer) inspectPeers(peersMap map[peer.ID]*pubsub.PeerScoreSnapshot) {
// peer := s.peerStore.Get(peer.ID)
// loop through each peer ID, get the score
// if the score < the configured threshold, ban the peer
// factor in the number of connections/disconnections into the score
// e.g., score = score - (s.peerConnections[peerID] * ConnectionFactor)
// s.connGater.BanAddr(peerID)
}
package p2p
import (
"fmt"
"math"
"time"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)
// TODO: Update these parameters.
const (
beaconBlockWeight = 0.8
meshWeight = -0.717
invalidDecayPeriod = 50 * epoch
maxInMeshScore = 10
decayEpoch = time.Duration(5)
)
// DefaultTopicScoreParams is a default instantiation of [pubsub.TopicScoreParams].
// See [TopicScoreParams] for detailed documentation.
// Default parameters are loosely based on prysm's default block topic scoring parameters.
// See [PrysmTopicScoringParams] for more details.
//
// [TopicScoreParams]: https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub@v0.8.1#TopicScoreParams
// [PrysmTopicScoringParams]: https://github.com/prysmaticlabs/prysm/blob/develop/beacon-chain/p2p/gossip_scoring_params.go#L169
var DefaultTopicScoreParams = pubsub.TopicScoreParams{
TopicWeight: beaconBlockWeight,
TimeInMeshWeight: maxInMeshScore / inMeshCap(),
TimeInMeshQuantum: inMeshTime(),
TimeInMeshCap: inMeshCap(),
FirstMessageDeliveriesWeight: 1,
FirstMessageDeliveriesDecay: scoreDecay(20 * epoch),
FirstMessageDeliveriesCap: 23,
MeshMessageDeliveriesWeight: meshWeight,
MeshMessageDeliveriesDecay: scoreDecay(decayEpoch * epoch),
MeshMessageDeliveriesCap: float64(uint64(epoch/slot) * uint64(decayEpoch)),
MeshMessageDeliveriesThreshold: float64(uint64(epoch/slot) * uint64(decayEpoch) / 10),
MeshMessageDeliveriesWindow: 2 * time.Second,
MeshMessageDeliveriesActivation: 4 * epoch,
MeshFailurePenaltyWeight: meshWeight,
MeshFailurePenaltyDecay: scoreDecay(decayEpoch * epoch),
InvalidMessageDeliveriesWeight: -140.4475,
InvalidMessageDeliveriesDecay: scoreDecay(invalidDecayPeriod),
}
// determines the decay rate from the provided time period till
// the decayToZero value. Ex: ( 1 -> 0.01)
func scoreDecay(duration time.Duration) float64 {
numOfTimes := duration / slot
return math.Pow(decayToZero, 1/float64(numOfTimes))
}
// denotes the unit time in mesh for scoring tallying.
func inMeshTime() time.Duration {
return 1 * slot
}
// the cap for `inMesh` time scoring.
func inMeshCap() float64 {
return float64((3600 * time.Second) / inMeshTime())
}
// DisabledTopicScoreParams is an instantiation of [pubsub.TopicScoreParams] where all scoring is disabled.
// See [TopicScoreParams] for detailed documentation.
//
// [TopicScoreParams]: https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub@v0.8.1#TopicScoreParams
var DisabledTopicScoreParams = pubsub.TopicScoreParams{
TopicWeight: 0, // disabled
TimeInMeshWeight: 0, // disabled
TimeInMeshQuantum: inMeshTime(),
TimeInMeshCap: inMeshCap(),
FirstMessageDeliveriesWeight: 0, // disabled
FirstMessageDeliveriesDecay: scoreDecay(20 * epoch),
FirstMessageDeliveriesCap: 23,
MeshMessageDeliveriesWeight: 0, // disabled
MeshMessageDeliveriesDecay: scoreDecay(decayEpoch * epoch),
MeshMessageDeliveriesCap: float64(uint64(epoch/slot) * uint64(decayEpoch)),
MeshMessageDeliveriesThreshold: float64(uint64(epoch/slot) * uint64(decayEpoch) / 10),
MeshMessageDeliveriesWindow: 2 * time.Second,
MeshMessageDeliveriesActivation: 4 * epoch,
MeshFailurePenaltyWeight: 0, // disabled
MeshFailurePenaltyDecay: scoreDecay(decayEpoch * epoch),
InvalidMessageDeliveriesWeight: 0, // disabled
InvalidMessageDeliveriesDecay: scoreDecay(invalidDecayPeriod),
}
// TopicScoreParamsByName is a map of name to [pubsub.TopicScoreParams].
var TopicScoreParamsByName = map[string]pubsub.TopicScoreParams{
"default": DefaultTopicScoreParams,
"disabled": DisabledTopicScoreParams,
}
// AvailableTopicScoreParams returns a list of available topic score params.
// These can be used as an input to [GetTopicScoreParams] which returns the
// corresponding [pubsub.TopicScoreParams].
func AvailableTopicScoreParams() []string {
var params []string
for name := range TopicScoreParamsByName {
params = append(params, name)
}
return params
}
// GetTopicScoreParams returns the [pubsub.TopicScoreParams] for the given name.
func GetTopicScoreParams(name string) (pubsub.TopicScoreParams, error) {
params, ok := TopicScoreParamsByName[name]
if !ok {
return pubsub.TopicScoreParams{}, fmt.Errorf("invalid topic params %s", name)
}
return params, nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment