Commit b03d22e7 authored by Adrian Sutton's avatar Adrian Sutton

op-node: Include topic params in the peer scoring params.

parent d4f5782d
...@@ -52,7 +52,7 @@ func Main(cliCtx *cli.Context) error { ...@@ -52,7 +52,7 @@ func Main(cliCtx *cli.Context) error {
return err return err
} }
p2pConfig, err := p2pcli.NewConfig(cliCtx, config.BlockTime) p2pConfig, err := p2pcli.NewConfig(cliCtx, config)
if err != nil { if err != nil {
return fmt.Errorf("failed to load p2p config: %w", err) return fmt.Errorf("failed to load p2p config: %w", err)
} }
......
...@@ -813,7 +813,7 @@ func TestSystemDenseTopology(t *testing.T) { ...@@ -813,7 +813,7 @@ func TestSystemDenseTopology(t *testing.T) {
// Set peer scoring for each node, but without banning // Set peer scoring for each node, but without banning
for _, node := range cfg.Nodes { for _, node := range cfg.Nodes {
params, err := p2p.GetScoringParams("light", 2) params, err := p2p.GetScoringParams("light", &node.Rollup)
require.NoError(t, err) require.NoError(t, err)
node.P2P = &p2p.Config{ node.P2P = &p2p.Config{
ScoringParams: params, ScoringParams: params,
......
...@@ -30,7 +30,6 @@ var ( ...@@ -30,7 +30,6 @@ var (
Name: "p2p.scoring", Name: "p2p.scoring",
Usage: "Sets the peer scoring strategy for the P2P stack. Can be one of: none or light.", Usage: "Sets the peer scoring strategy for the P2P stack. Can be one of: none or light.",
Required: false, Required: false,
Value: "none",
EnvVar: p2pEnv("PEER_SCORING"), EnvVar: p2pEnv("PEER_SCORING"),
} }
PeerScoring = cli.StringFlag{ PeerScoring = cli.StringFlag{
......
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"os" "os"
"strings" "strings"
"github.com/ethereum-optimism/optimism/op-node/rollup"
ds "github.com/ipfs/go-datastore" ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync" "github.com/ipfs/go-datastore/sync"
leveldb "github.com/ipfs/go-ds-leveldb" leveldb "github.com/ipfs/go-ds-leveldb"
...@@ -24,7 +25,7 @@ import ( ...@@ -24,7 +25,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
) )
func NewConfig(ctx *cli.Context, blockTime uint64) (*p2p.Config, error) { func NewConfig(ctx *cli.Context, rollupCfg *rollup.Config) (*p2p.Config, error) {
conf := &p2p.Config{} conf := &p2p.Config{}
if ctx.GlobalBool(flags.DisableP2P.Name) { if ctx.GlobalBool(flags.DisableP2P.Name) {
...@@ -54,7 +55,7 @@ func NewConfig(ctx *cli.Context, blockTime uint64) (*p2p.Config, error) { ...@@ -54,7 +55,7 @@ func NewConfig(ctx *cli.Context, blockTime uint64) (*p2p.Config, error) {
return nil, fmt.Errorf("failed to load p2p gossip options: %w", err) return nil, fmt.Errorf("failed to load p2p gossip options: %w", err)
} }
if err := loadScoringParams(conf, ctx, blockTime); err != nil { if err := loadScoringParams(conf, ctx, rollupCfg); err != nil {
return nil, fmt.Errorf("failed to load p2p peer scoring options: %w", err) return nil, fmt.Errorf("failed to load p2p peer scoring options: %w", err)
} }
...@@ -81,7 +82,7 @@ func validatePort(p uint) (uint16, error) { ...@@ -81,7 +82,7 @@ func validatePort(p uint) (uint16, error) {
} }
// loadScoringParams loads the peer scoring options from the CLI context. // loadScoringParams loads the peer scoring options from the CLI context.
func loadScoringParams(conf *p2p.Config, ctx *cli.Context, blockTime uint64) error { func loadScoringParams(conf *p2p.Config, ctx *cli.Context, rollupCfg *rollup.Config) error {
scoringLevel := ctx.GlobalString(flags.Scoring.Name) scoringLevel := ctx.GlobalString(flags.Scoring.Name)
// Check old names for backwards compatibility // Check old names for backwards compatibility
if scoringLevel == "" { if scoringLevel == "" {
...@@ -91,7 +92,7 @@ func loadScoringParams(conf *p2p.Config, ctx *cli.Context, blockTime uint64) err ...@@ -91,7 +92,7 @@ func loadScoringParams(conf *p2p.Config, ctx *cli.Context, blockTime uint64) err
scoringLevel = ctx.GlobalString(flags.TopicScoring.Name) scoringLevel = ctx.GlobalString(flags.TopicScoring.Name)
} }
if scoringLevel != "" { if scoringLevel != "" {
params, err := p2p.GetScoringParams(scoringLevel, blockTime) params, err := p2p.GetScoringParams(scoringLevel, rollupCfg)
if err != nil { if err != nil {
return err return err
} }
......
...@@ -53,8 +53,7 @@ type SetupP2P interface { ...@@ -53,8 +53,7 @@ type SetupP2P interface {
// ScoringParams defines the various types of peer scoring parameters. // ScoringParams defines the various types of peer scoring parameters.
type ScoringParams struct { type ScoringParams struct {
PeerScoring pubsub.PeerScoreParams PeerScoring pubsub.PeerScoreParams
TopicScoring pubsub.TopicScoreParams
} }
// Config sets up a p2p host and discv5 service from configuration. // Config sets up a p2p host and discv5 service from configuration.
...@@ -157,13 +156,6 @@ func (conf *Config) BanDuration() time.Duration { ...@@ -157,13 +156,6 @@ func (conf *Config) BanDuration() time.Duration {
return conf.BanningDuration return conf.BanningDuration
} }
func (conf *Config) TopicScoringParams() *pubsub.TopicScoreParams {
if conf.ScoringParams == nil {
return nil
}
return &conf.ScoringParams.TopicScoring
}
func (conf *Config) ReqRespSyncEnabled() bool { func (conf *Config) ReqRespSyncEnabled() bool {
return conf.EnableReqRespSync return conf.EnableReqRespSync
} }
......
...@@ -52,7 +52,6 @@ var MessageDomainValidSnappy = [4]byte{1, 0, 0, 0} ...@@ -52,7 +52,6 @@ var MessageDomainValidSnappy = [4]byte{1, 0, 0, 0}
type GossipSetupConfigurables interface { type GossipSetupConfigurables interface {
PeerScoringParams() *pubsub.PeerScoreParams PeerScoringParams() *pubsub.PeerScoreParams
TopicScoringParams() *pubsub.TopicScoreParams
// ConfigureGossip creates configuration options to apply to the GossipSub setup // ConfigureGossip creates configuration options to apply to the GossipSub setup
ConfigureGossip(rollupCfg *rollup.Config) []pubsub.Option ConfigureGossip(rollupCfg *rollup.Config) []pubsub.Option
} }
...@@ -423,7 +422,7 @@ func (p *publisher) Close() error { ...@@ -423,7 +422,7 @@ func (p *publisher) Close() error {
return p.blocksTopic.Close() return p.blocksTopic.Close()
} }
func JoinGossip(p2pCtx context.Context, self peer.ID, topicScoreParams *pubsub.TopicScoreParams, ps *pubsub.PubSub, log log.Logger, cfg *rollup.Config, runCfg GossipRuntimeConfig, gossipIn GossipIn) (GossipOut, error) { func JoinGossip(p2pCtx context.Context, self peer.ID, ps *pubsub.PubSub, log log.Logger, cfg *rollup.Config, runCfg GossipRuntimeConfig, gossipIn GossipIn) (GossipOut, error) {
val := guardGossipValidator(log, logValidationResult(self, "validated block", log, BuildBlocksValidator(log, cfg, runCfg))) val := guardGossipValidator(log, logValidationResult(self, "validated block", log, BuildBlocksValidator(log, cfg, runCfg)))
blocksTopicName := blocksTopicV1(cfg) blocksTopicName := blocksTopicV1(cfg)
err := ps.RegisterTopicValidator(blocksTopicName, err := ps.RegisterTopicValidator(blocksTopicName,
...@@ -443,12 +442,6 @@ func JoinGossip(p2pCtx context.Context, self peer.ID, topicScoreParams *pubsub.T ...@@ -443,12 +442,6 @@ func JoinGossip(p2pCtx context.Context, self peer.ID, topicScoreParams *pubsub.T
} }
go LogTopicEvents(p2pCtx, log.New("topic", "blocks"), blocksTopicEvents) go LogTopicEvents(p2pCtx, log.New("topic", "blocks"), blocksTopicEvents)
if topicScoreParams != nil {
if err = blocksTopic.SetScoreParams(topicScoreParams); err != nil {
return nil, fmt.Errorf("failed to set topic score params: %w", err)
}
}
subscription, err := blocksTopic.Subscribe() subscription, err := blocksTopic.Subscribe()
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to subscribe to blocks gossip topic: %w", err) return nil, fmt.Errorf("failed to subscribe to blocks gossip topic: %w", err)
......
...@@ -133,7 +133,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l ...@@ -133,7 +133,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
if err != nil { if err != nil {
return fmt.Errorf("failed to start gossipsub router: %w", err) return fmt.Errorf("failed to start gossipsub router: %w", err)
} }
n.gsOut, err = JoinGossip(resourcesCtx, n.host.ID(), setup.TopicScoringParams(), n.gs, log, rollupCfg, runCfg, gossipIn) n.gsOut, err = JoinGossip(resourcesCtx, n.host.ID(), n.gs, log, rollupCfg, runCfg, gossipIn)
if err != nil { if err != nil {
return fmt.Errorf("failed to join blocks gossip topic: %w", err) return fmt.Errorf("failed to join blocks gossip topic: %w", err)
} }
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"math" "math"
"time" "time"
"github.com/ethereum-optimism/optimism/op-node/rollup"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
) )
...@@ -12,6 +13,15 @@ import ( ...@@ -12,6 +13,15 @@ import (
// DecayToZero is the decay factor for a peer's score to zero. // DecayToZero is the decay factor for a peer's score to zero.
const DecayToZero = 0.01 const DecayToZero = 0.01
// MeshWeight is the weight of the mesh delivery topic.
const MeshWeight = -0.7
// MaxInMeshScore is the maximum score for being in the mesh.
const MaxInMeshScore = 10
// DecayEpoch is the number of epochs to decay the score over.
const DecayEpoch = time.Duration(5)
// ScoreDecay returns the decay factor for a given duration. // ScoreDecay returns the decay factor for a given duration.
func ScoreDecay(duration time.Duration, slot time.Duration) float64 { func ScoreDecay(duration time.Duration, slot time.Duration) float64 {
numOfTimes := duration / slot numOfTimes := duration / slot
...@@ -22,8 +32,8 @@ func ScoreDecay(duration time.Duration, slot time.Duration) float64 { ...@@ -22,8 +32,8 @@ func ScoreDecay(duration time.Duration, slot time.Duration) float64 {
// See [PeerScoreParams] for detailed documentation. // See [PeerScoreParams] for detailed documentation.
// //
// [PeerScoreParams]: https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub@v0.8.1#PeerScoreParams // [PeerScoreParams]: https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub@v0.8.1#PeerScoreParams
var LightPeerScoreParams = func(blockTime uint64) pubsub.PeerScoreParams { var LightPeerScoreParams = func(cfg *rollup.Config) pubsub.PeerScoreParams {
slot := time.Duration(blockTime) * time.Second slot := time.Duration(cfg.BlockTime) * time.Second
if slot == 0 { if slot == 0 {
slot = 2 * time.Second slot = 2 * time.Second
} }
...@@ -32,8 +42,29 @@ var LightPeerScoreParams = func(blockTime uint64) pubsub.PeerScoreParams { ...@@ -32,8 +42,29 @@ var LightPeerScoreParams = func(blockTime uint64) pubsub.PeerScoreParams {
epoch := 6 * slot epoch := 6 * slot
tenEpochs := 10 * epoch tenEpochs := 10 * epoch
oneHundredEpochs := 100 * epoch oneHundredEpochs := 100 * epoch
invalidDecayPeriod := 50 * epoch
return pubsub.PeerScoreParams{ return pubsub.PeerScoreParams{
Topics: make(map[string]*pubsub.TopicScoreParams), Topics: map[string]*pubsub.TopicScoreParams{
blocksTopicV1(cfg): {
TopicWeight: 0.8,
TimeInMeshWeight: MaxInMeshScore / inMeshCap(slot),
TimeInMeshQuantum: slot,
TimeInMeshCap: inMeshCap(slot),
FirstMessageDeliveriesWeight: 1,
FirstMessageDeliveriesDecay: ScoreDecay(20*epoch, slot),
FirstMessageDeliveriesCap: 23,
MeshMessageDeliveriesWeight: MeshWeight,
MeshMessageDeliveriesDecay: ScoreDecay(DecayEpoch*epoch, slot),
MeshMessageDeliveriesCap: float64(uint64(epoch/slot) * uint64(DecayEpoch)),
MeshMessageDeliveriesThreshold: float64(uint64(epoch/slot) * uint64(DecayEpoch) / 10),
MeshMessageDeliveriesWindow: 2 * time.Second,
MeshMessageDeliveriesActivation: 4 * epoch,
MeshFailurePenaltyWeight: MeshWeight,
MeshFailurePenaltyDecay: ScoreDecay(DecayEpoch*epoch, slot),
InvalidMessageDeliveriesWeight: -140.4475,
InvalidMessageDeliveriesDecay: ScoreDecay(invalidDecayPeriod, slot),
},
},
TopicScoreCap: 34, TopicScoreCap: 34,
AppSpecificScore: func(p peer.ID) float64 { AppSpecificScore: func(p peer.ID) float64 {
return 0 return 0
...@@ -51,12 +82,16 @@ var LightPeerScoreParams = func(blockTime uint64) pubsub.PeerScoreParams { ...@@ -51,12 +82,16 @@ var LightPeerScoreParams = func(blockTime uint64) pubsub.PeerScoreParams {
} }
} }
func GetScoringParams(name string, blockTime uint64) (*ScoringParams, error) { // the cap for `inMesh` time scoring.
func inMeshCap(slot time.Duration) float64 {
return float64((3600 * time.Second) / slot)
}
func GetScoringParams(name string, cfg *rollup.Config) (*ScoringParams, error) {
switch name { switch name {
case "light": case "light":
return &ScoringParams{ return &ScoringParams{
PeerScoring: LightPeerScoreParams(blockTime), PeerScoring: LightPeerScoreParams(cfg),
TopicScoring: LightTopicScoreParams(blockTime),
}, nil }, nil
case "none": case "none":
return nil, nil return nil, nil
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
) )
...@@ -42,15 +43,16 @@ func (testSuite *PeerParamsTestSuite) TestNewPeerScoreThresholds() { ...@@ -42,15 +43,16 @@ func (testSuite *PeerParamsTestSuite) TestNewPeerScoreThresholds() {
// TestGetPeerScoreParams validates the peer score parameters. // TestGetPeerScoreParams validates the peer score parameters.
func (testSuite *PeerParamsTestSuite) TestGetPeerScoreParams_None() { func (testSuite *PeerParamsTestSuite) TestGetPeerScoreParams_None() {
params, err := GetScoringParams("none", 1) params, err := GetScoringParams("none", &chaincfg.Goerli)
testSuite.NoError(err) testSuite.NoError(err)
testSuite.Nil(params) testSuite.Nil(params)
} }
// TestLightPeerScoreParams validates the light peer score params. // TestLightPeerScoreParams validates the light peer score params.
func (testSuite *PeerParamsTestSuite) TestGetPeerScoreParams_Light() { func (testSuite *PeerParamsTestSuite) TestGetPeerScoreParams_Light() {
blockTime := uint64(1) cfg := chaincfg.Goerli
slot := time.Duration(blockTime) * time.Second cfg.BlockTime = 1
slot := time.Duration(cfg.BlockTime) * time.Second
epoch := 6 * slot epoch := 6 * slot
oneHundredEpochs := 100 * epoch oneHundredEpochs := 100 * epoch
...@@ -60,10 +62,14 @@ func (testSuite *PeerParamsTestSuite) TestGetPeerScoreParams_Light() { ...@@ -60,10 +62,14 @@ func (testSuite *PeerParamsTestSuite) TestGetPeerScoreParams_Light() {
testSuite.Equal(0.9261187281287935, decay) testSuite.Equal(0.9261187281287935, decay)
// Test the params // Test the params
scoringParams, err := GetScoringParams("light", blockTime) scoringParams, err := GetScoringParams("light", &cfg)
peerParams := scoringParams.PeerScoring peerParams := scoringParams.PeerScoring
testSuite.NoError(err) testSuite.NoError(err)
testSuite.Equal(peerParams.Topics, make(map[string]*pubsub.TopicScoreParams)) // Topics should contain options for block topic
testSuite.Len(peerParams.Topics, 1)
topicParams, ok := peerParams.Topics[blocksTopicV1(&cfg)]
testSuite.True(ok, "should have block topic params")
testSuite.NotZero(topicParams.TimeInMeshQuantum)
testSuite.Equal(peerParams.TopicScoreCap, float64(34)) testSuite.Equal(peerParams.TopicScoreCap, float64(34))
testSuite.Equal(peerParams.AppSpecificWeight, float64(1)) testSuite.Equal(peerParams.AppSpecificWeight, float64(1))
testSuite.Equal(peerParams.IPColocationFactorWeight, float64(-35)) testSuite.Equal(peerParams.IPColocationFactorWeight, float64(-35))
...@@ -79,8 +85,10 @@ func (testSuite *PeerParamsTestSuite) TestGetPeerScoreParams_Light() { ...@@ -79,8 +85,10 @@ func (testSuite *PeerParamsTestSuite) TestGetPeerScoreParams_Light() {
// TestParamsZeroBlockTime validates peer score params use default slot for 0 block time. // TestParamsZeroBlockTime validates peer score params use default slot for 0 block time.
func (testSuite *PeerParamsTestSuite) TestParamsZeroBlockTime() { func (testSuite *PeerParamsTestSuite) TestParamsZeroBlockTime() {
cfg := chaincfg.Goerli
cfg.BlockTime = 0
slot := 2 * time.Second slot := 2 * time.Second
params, err := GetScoringParams("light", uint64(0)) params, err := GetScoringParams("light", &cfg)
testSuite.NoError(err) testSuite.NoError(err)
testSuite.Equal(params.PeerScoring.DecayInterval, slot) testSuite.Equal(params.PeerScoring.DecayInterval, slot)
} }
...@@ -115,9 +115,6 @@ func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts [] ...@@ -115,9 +115,6 @@ func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts []
DecayInterval: time.Second, DecayInterval: time.Second,
DecayToZero: 0.01, DecayToZero: 0.01,
}, },
TopicScoring: pubsub.TopicScoreParams{
TimeInMeshQuantum: time.Second,
},
}, },
}, scorer, logger)...) }, scorer, logger)...)
ps, err := pubsub.NewGossipSubWithRouter(ctx, h, rt, opts...) ps, err := pubsub.NewGossipSubWithRouter(ctx, h, rt, opts...)
......
...@@ -85,10 +85,6 @@ func (p *Prepared) BanDuration() time.Duration { ...@@ -85,10 +85,6 @@ func (p *Prepared) BanDuration() time.Duration {
return 1 * time.Hour return 1 * time.Hour
} }
func (p *Prepared) TopicScoringParams() *pubsub.TopicScoreParams {
return nil
}
func (p *Prepared) Disabled() bool { func (p *Prepared) Disabled() bool {
return false return false
} }
......
package p2p
import (
"time"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)
// MeshWeight is the weight of the mesh delivery topic.
const MeshWeight = -0.7
// MaxInMeshScore is the maximum score for being in the mesh.
const MaxInMeshScore = 10
// DecayEpoch is the number of epochs to decay the score over.
const DecayEpoch = time.Duration(5)
// LightTopicScoreParams is a default instantiation of [pubsub.TopicScoreParams].
// See [TopicScoreParams] for detailed documentation.
//
// [TopicScoreParams]: https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub@v0.8.1#TopicScoreParams
var LightTopicScoreParams = func(blockTime uint64) pubsub.TopicScoreParams {
slot := time.Duration(blockTime) * time.Second
if slot == 0 {
slot = 2 * time.Second
}
epoch := 6 * slot
invalidDecayPeriod := 50 * epoch
return pubsub.TopicScoreParams{
TopicWeight: 0.8,
TimeInMeshWeight: MaxInMeshScore / inMeshCap(slot),
TimeInMeshQuantum: slot,
TimeInMeshCap: inMeshCap(slot),
FirstMessageDeliveriesWeight: 1,
FirstMessageDeliveriesDecay: ScoreDecay(20*epoch, slot),
FirstMessageDeliveriesCap: 23,
MeshMessageDeliveriesWeight: MeshWeight,
MeshMessageDeliveriesDecay: ScoreDecay(DecayEpoch*epoch, slot),
MeshMessageDeliveriesCap: float64(uint64(epoch/slot) * uint64(DecayEpoch)),
MeshMessageDeliveriesThreshold: float64(uint64(epoch/slot) * uint64(DecayEpoch) / 10),
MeshMessageDeliveriesWindow: 2 * time.Second,
MeshMessageDeliveriesActivation: 4 * epoch,
MeshFailurePenaltyWeight: MeshWeight,
MeshFailurePenaltyDecay: ScoreDecay(DecayEpoch*epoch, slot),
InvalidMessageDeliveriesWeight: -140.4475,
InvalidMessageDeliveriesDecay: ScoreDecay(invalidDecayPeriod, slot),
}
}
// the cap for `inMesh` time scoring.
func inMeshCap(slot time.Duration) float64 {
return float64((3600 * time.Second) / slot)
}
...@@ -43,7 +43,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { ...@@ -43,7 +43,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
return nil, fmt.Errorf("failed to load p2p signer: %w", err) return nil, fmt.Errorf("failed to load p2p signer: %w", err)
} }
p2pConfig, err := p2pcli.NewConfig(ctx, rollupConfig.BlockTime) p2pConfig, err := p2pcli.NewConfig(ctx, rollupConfig)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to load p2p config: %w", err) return nil, fmt.Errorf("failed to load p2p config: %w", err)
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment