Commit 2055f3bc authored by Andreas Bigger's avatar Andreas Bigger

pull topic scoring into cli

parent 4db32f78
......@@ -25,14 +25,23 @@ var (
Required: false,
EnvVar: p2pEnv("NO_DISCOVERY"),
}
P2PScoring = cli.StringFlag{
Name: "p2p.scoring",
Usage: "Sets the scoring strategy for the P2P stack. " +
PeerScoring = cli.StringFlag{
Name: "p2p.scoring.peers",
Usage: "Sets the peer scoring strategy for the P2P stack. " +
"Can be one of: none, light, full." +
"Custom scoring strategies can be defined in the config file.",
Required: false,
Value: "none",
EnvVar: p2pEnv("SCORING"),
EnvVar: p2pEnv("PEER_SCORING"),
}
TopicScoring = cli.StringFlag{
Name: "p2p.scoring.topics",
Usage: "Sets the topic scoring strategy. " +
"Can be one of: none, light, full." +
"Custom scoring strategies can be defined in the config file.",
Required: false,
Value: "none",
EnvVar: p2pEnv("TOPIC_SCORING"),
}
P2PPrivPath = cli.StringFlag{
Name: "p2p.priv.path",
......
......@@ -54,8 +54,12 @@ func NewConfig(ctx *cli.Context, blockTime uint64) (*p2p.Config, error) {
return nil, fmt.Errorf("failed to load p2p gossip options: %w", err)
}
if err := loadScoringOpts(conf, ctx, blockTime); err != nil {
return nil, fmt.Errorf("failed to load p2p scoring options: %w", err)
if err := loadPeerScoringParams(conf, ctx, blockTime); err != nil {
return nil, fmt.Errorf("failed to load p2p peer scoring options: %w", err)
}
if err := loadTopicScoringParams(conf, ctx, blockTime); err != nil {
return nil, fmt.Errorf("failed to load p2p topic scoring options: %w", err)
}
conf.ConnGater = p2p.DefaultConnGater
......@@ -77,9 +81,30 @@ func validatePort(p uint) (uint16, error) {
return uint16(p), nil
}
// loadScoringOpts loads the scoring options from the CLI context.
func loadScoringOpts(conf *p2p.Config, ctx *cli.Context, blockTime uint64) error {
scoringLevel := ctx.GlobalString(flags.P2PScoring.Name)
// loadTopicScoringParams loads the topic scoring options from the CLI context.
func loadTopicScoringParams(conf *p2p.Config, ctx *cli.Context, blockTime uint64) error {
scoringLevel := ctx.GlobalString(flags.TopicScoring.Name)
if scoringLevel == "" {
scoringLevel = "none"
}
// Set default block topic scoring parameters
// See prysm: https://github.com/prysmaticlabs/prysm/blob/develop/beacon-chain/p2p/gossip_scoring_params.go
// And research from lighthouse: https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c
// And docs: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#topic-parameter-calculation-and-decay
topicScoreParams, err := p2p.GetTopicScoreParams(scoringLevel, blockTime)
if err != nil {
return err
}
conf.TopicScoring = topicScoreParams
return nil
}
// loadPeerScoringParams loads the scoring options from the CLI context.
func loadPeerScoringParams(conf *p2p.Config, ctx *cli.Context, blockTime uint64) error {
scoringLevel := ctx.GlobalString(flags.PeerScoring.Name)
if scoringLevel == "" {
scoringLevel = "none"
}
......@@ -301,6 +326,8 @@ func parsePriv(data string) (*crypto.Secp256k1PrivateKey, error) {
return (p).(*crypto.Secp256k1PrivateKey), nil
}
func loadGossipOptions(conf *p2p.Config, ctx *cli.Context) error {
conf.MeshD = ctx.GlobalInt(flags.GossipMeshDFlag.Name)
conf.MeshDLo = ctx.GlobalInt(flags.GossipMeshDloFlag.Name)
......
......@@ -50,8 +50,9 @@ type Config struct {
DisableP2P bool
NoDiscovery bool
// Pubsub P2P Scoring Configurations
// Pubsub Scoring Parameters
PeerScoring pubsub.PeerScoreParams
TopicScoring pubsub.TopicScoreParams
ListenIP net.IP
ListenTCPPort uint16
......@@ -143,10 +144,14 @@ func (conf *Config) Disabled() bool {
return conf.DisableP2P
}
func (conf *Config) Scoring() *pubsub.PeerScoreParams {
func (conf *Config) PeerScoringParams() *pubsub.PeerScoreParams {
return &conf.PeerScoring
}
func (conf *Config) TopicScoringParams() *pubsub.TopicScoreParams {
return &conf.TopicScoring
}
const maxMeshParam = 1000
func (conf *Config) Check() error {
......
......@@ -41,7 +41,7 @@ const (
DefaultMeshDhi = 12 // topic stable mesh high watermark
DefaultMeshDlazy = 6 // gossip target
// peerScoreInspectFrequency is the frequency at which peer scores are inspected
peerScoreInspectFrequency = 1 * time.Minute
peerScoreInspectFrequency = 15 * time.Second
)
// Message domains, the msg id function uncompresses to keep data monomorphic,
......@@ -51,7 +51,8 @@ var MessageDomainInvalidSnappy = [4]byte{0, 0, 0, 0}
var MessageDomainValidSnappy = [4]byte{1, 0, 0, 0}
type GossipSetupConfigurables interface {
Scoring() *pubsub.PeerScoreParams
PeerScoringParams() *pubsub.PeerScoreParams
TopicScoringParams() *pubsub.TopicScoreParams
ConfigureGossip(params *pubsub.GossipSubParams) []pubsub.Option
}
......@@ -154,8 +155,6 @@ func NewGossipSub(p2pCtx context.Context, h host.Host, g ConnectionGater, cfg *r
return nil, err
}
params := BuildGlobalGossipParams(cfg)
peerScoreThresholds := NewPeerScoreThresholds()
scorer := NewScorer(g, h.Peerstore(), m, log)
gossipOpts := []pubsub.Option{
pubsub.WithMaxMessageSize(maxGossipSize),
pubsub.WithMessageIdFn(BuildMsgIdFn(cfg)),
......@@ -170,9 +169,8 @@ func NewGossipSub(p2pCtx context.Context, h host.Host, g ConnectionGater, cfg *r
pubsub.WithBlacklist(denyList),
pubsub.WithGossipSubParams(params),
pubsub.WithEventTracer(&gossipTracer{m: m}),
pubsub.WithPeerScore(gossipConf.Scoring(), &peerScoreThresholds),
pubsub.WithPeerScoreInspect(scorer.SnapshotHook(), peerScoreInspectFrequency),
}
gossipOpts = append(gossipOpts, ConfigurePeerScoring(h, g, gossipConf, m, log)...)
gossipOpts = append(gossipOpts, gossipConf.ConfigureGossip(&params)...)
return pubsub.NewGossipSub(p2pCtx, h, gossipOpts...)
}
......@@ -439,7 +437,7 @@ func (p *publisher) Close() error {
return p.blocksTopic.Close()
}
func JoinGossip(p2pCtx context.Context, self peer.ID, ps *pubsub.PubSub, log log.Logger, cfg *rollup.Config, runCfg GossipRuntimeConfig, gossipIn GossipIn) (GossipOut, error) {
func JoinGossip(p2pCtx context.Context, self peer.ID, topicScoreParams *pubsub.TopicScoreParams, ps *pubsub.PubSub, log log.Logger, cfg *rollup.Config, runCfg GossipRuntimeConfig, gossipIn GossipIn) (GossipOut, error) {
val := guardGossipValidator(log, logValidationResult(self, "validated block", log, BuildBlocksValidator(log, cfg, runCfg)))
blocksTopicName := blocksTopicV1(cfg)
err := ps.RegisterTopicValidator(blocksTopicName,
......@@ -459,16 +457,7 @@ func JoinGossip(p2pCtx context.Context, self peer.ID, ps *pubsub.PubSub, log log
}
go LogTopicEvents(p2pCtx, log.New("topic", "blocks"), blocksTopicEvents)
// TODO: make this configurable behind a cli flag - disabled or default
// Set default block topic scoring parameters
// See prysm: https://github.com/prysmaticlabs/prysm/blob/develop/beacon-chain/p2p/gossip_scoring_params.go
// And research from lighthouse: https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c
// And docs: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#topic-parameter-calculation-and-decay
defaultTopicScoreParams, err := GetTopicScoreParams("default", cfg)
if err != nil {
return nil, fmt.Errorf("failed to create default topic score params: %w", err)
}
if err = blocksTopic.SetScoreParams(&defaultTopicScoreParams); err != nil {
if err = blocksTopic.SetScoreParams(topicScoreParams); err != nil {
return nil, fmt.Errorf("failed to set topic score params: %w", err)
}
......
......@@ -81,7 +81,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
return fmt.Errorf("failed to start gossipsub router: %w", err)
}
n.gsOut, err = JoinGossip(resourcesCtx, n.host.ID(), n.gs, log, rollupCfg, runCfg, gossipIn)
n.gsOut, err = JoinGossip(resourcesCtx, n.host.ID(), setup.TopicScoringParams(), n.gs, log, rollupCfg, runCfg, gossipIn)
if err != nil {
return fmt.Errorf("failed to join blocks gossip topic: %w", err)
}
......
......@@ -33,7 +33,6 @@ type Peerstore interface {
Peers() peer.IDSlice
}
// Scorer is a peer scorer that scores peers based on application-specific metrics.
type Scorer interface {
OnConnect()
......
......@@ -3,6 +3,8 @@ package p2p_test
import (
"testing"
peer "github.com/libp2p/go-libp2p/core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
p2p "github.com/ethereum-optimism/optimism/op-node/p2p"
log "github.com/ethereum/go-ethereum/log"
node "github.com/ethereum-optimism/optimism/op-node/node"
......@@ -61,3 +63,58 @@ func (testSuite *PeerScorerTestSuite) TestPeerScorerOnDisconnect() {
)
scorer.OnDisconnect()
}
// TestSnapshotHook tests running the snapshot hook on the peer scorer.
func (testSuite *PeerScorerTestSuite) TestSnapshotHook() {
scorer := p2p.NewScorer(
testSuite.mockGater,
testSuite.mockStore,
testSuite.mockMetricer,
testSuite.logger,
)
inspectFn := scorer.SnapshotHook()
// Mock the snapshot updates
// This doesn't return anything
testSuite.mockMetricer.On("RecordPeerScoring", peer.ID("peer1"), float64(-100)).Return(nil)
// Since the peer score is not below the [PeerScoreThreshold] of -100,
// no connection gater method should be called since the peer isn't already blocked
// Apply the snapshot
snapshotMap := map[peer.ID]*pubsub.PeerScoreSnapshot{
peer.ID("peer1"): &pubsub.PeerScoreSnapshot{
Score: -100,
},
}
inspectFn(snapshotMap)
}
// TestSnapshotHookBlockPeer tests running the snapshot hook on the peer scorer with a peer score below the threshold.
// This implies that the peer should be blocked.
func (testSuite *PeerScorerTestSuite) TestSnapshotHookBlockPeer() {
scorer := p2p.NewScorer(
testSuite.mockGater,
testSuite.mockStore,
testSuite.mockMetricer,
testSuite.logger,
)
inspectFn := scorer.SnapshotHook()
// Mock the snapshot updates
// This doesn't return anything
testSuite.mockMetricer.On("RecordPeerScoring", peer.ID("peer1"), float64(-101)).Return(nil)
// Mock a connection gater peer block call
// Since the peer score is below the [PeerScoreThreshold] of -100,
// the [BlockPeer] method should be called
testSuite.mockGater.On("BlockPeer", peer.ID("peer1")).Return(nil)
// Apply the snapshot
snapshotMap := map[peer.ID]*pubsub.PeerScoreSnapshot{
peer.ID("peer1"): &pubsub.PeerScoreSnapshot{
Score: -101,
},
}
inspectFn(snapshotMap)
}
package p2p
import (
log "github.com/ethereum/go-ethereum/log"
pubsub "github.com/libp2p/go-libp2p-pubsub"
host "github.com/libp2p/go-libp2p/core/host"
)
// ConfigurePeerScoring configures the peer scoring parameters for the pubsub
func ConfigurePeerScoring(h host.Host, g ConnectionGater, gossipConf GossipSetupConfigurables, m GossipMetricer, log log.Logger) []pubsub.Option {
// If we want to completely disable scoring config here, we can use the [peerScoringParams]
// to return early without returning any [pubsub.Option].
peerScoreParams := gossipConf.PeerScoringParams()
peerScoreThresholds := NewPeerScoreThresholds()
scorer := NewScorer(g, h.Peerstore(), m, log)
opts := []pubsub.Option{
pubsub.WithPeerScore(peerScoreParams, &peerScoreThresholds),
pubsub.WithPeerScoreInspect(scorer.SnapshotHook(), peerScoreInspectFrequency),
}
return opts
}
......@@ -64,7 +64,11 @@ func (p *Prepared) ConfigureGossip(params *pubsub.GossipSubParams) []pubsub.Opti
return nil
}
func (p *Prepared) Scoring() *pubsub.PeerScoreParams {
func (p *Prepared) PeerScoringParams() *pubsub.PeerScoreParams {
return nil
}
func (p *Prepared) TopicScoringParams() *pubsub.TopicScoreParams {
return nil
}
......
......@@ -4,7 +4,6 @@ import (
"fmt"
"time"
"github.com/ethereum-optimism/optimism/op-node/rollup"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)
......@@ -21,13 +20,11 @@ const DecayEpoch = time.Duration(5)
// See [TopicScoreParams] for detailed documentation.
//
// [TopicScoreParams]: https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub@v0.8.1#TopicScoreParams
var DefaultTopicScoreParams = func(cfg *rollup.Config) pubsub.TopicScoreParams {
slot := time.Duration(cfg.BlockTime) * time.Second
var DefaultTopicScoreParams = func(blockTime uint64) pubsub.TopicScoreParams {
slot := time.Duration(blockTime) * time.Second
if slot == 0 {
slot = 2 * time.Second
}
// TODO: tune these params
// TODO: we initialize an "epoch" as 6 blocks suggesting 6 blocks, each taking ~ 2 seconds, is 12 seconds
epoch := 6 * slot
invalidDecayPeriod := 50 * epoch
return pubsub.TopicScoreParams{
......@@ -60,10 +57,11 @@ func inMeshCap(slot time.Duration) float64 {
// See [TopicScoreParams] for detailed documentation.
//
// [TopicScoreParams]: https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub@v0.8.1#TopicScoreParams
var DisabledTopicScoreParams = func(cfg *rollup.Config) pubsub.TopicScoreParams {
slot := time.Duration(cfg.BlockTime) * time.Second
// TODO: tune these params
// TODO: we initialize an "epoch" as 6 blocks suggesting 6 blocks, each taking ~ 2 seconds, is 12 seconds
var DisabledTopicScoreParams = func(blockTime uint64) pubsub.TopicScoreParams {
slot := time.Duration(blockTime) * time.Second
if slot == 0 {
slot = 2 * time.Second
}
epoch := 6 * slot
invalidDecayPeriod := 50 * epoch
return pubsub.TopicScoreParams{
......@@ -88,7 +86,7 @@ var DisabledTopicScoreParams = func(cfg *rollup.Config) pubsub.TopicScoreParams
}
// TopicScoreParamsByName is a map of name to [pubsub.TopicScoreParams].
var TopicScoreParamsByName = map[string](func(*rollup.Config) pubsub.TopicScoreParams){
var TopicScoreParamsByName = map[string](func(blockTime uint64) pubsub.TopicScoreParams){
"default": DefaultTopicScoreParams,
"disabled": DisabledTopicScoreParams,
}
......@@ -105,11 +103,11 @@ func AvailableTopicScoreParams() []string {
}
// GetTopicScoreParams returns the [pubsub.TopicScoreParams] for the given name.
func GetTopicScoreParams(name string, cfg *rollup.Config) (pubsub.TopicScoreParams, error) {
func GetTopicScoreParams(name string, blockTime uint64) (pubsub.TopicScoreParams, error) {
params, ok := TopicScoreParamsByName[name]
if !ok {
return pubsub.TopicScoreParams{}, fmt.Errorf("invalid topic params %s", name)
}
return params(cfg), nil
return params(blockTime), nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment