Commit c785e356 authored by mergify[bot]'s avatar mergify[bot] Committed by GitHub

Merge pull request #4743 from ethereum-optimism/refcell/p2p_cli

feat(op-node): Peer Score Hardening
parents 962ac1d9 831f9f62
...@@ -53,7 +53,7 @@ func waitForL1OriginOnL2(l1BlockNum uint64, client *ethclient.Client, timeout ti ...@@ -53,7 +53,7 @@ func waitForL1OriginOnL2(l1BlockNum uint64, client *ethclient.Client, timeout ti
} }
case err := <-headSub.Err(): case err := <-headSub.Err():
return nil, fmt.Errorf("Error in head subscription: %w", err) return nil, fmt.Errorf("error in head subscription: %w", err)
case <-timeoutCh: case <-timeoutCh:
return nil, errors.New("timeout") return nil, errors.New("timeout")
} }
...@@ -101,7 +101,7 @@ func waitForBlock(number *big.Int, client *ethclient.Client, timeout time.Durati ...@@ -101,7 +101,7 @@ func waitForBlock(number *big.Int, client *ethclient.Client, timeout time.Durati
return client.BlockByNumber(ctx, number) return client.BlockByNumber(ctx, number)
} }
case err := <-headSub.Err(): case err := <-headSub.Err():
return nil, fmt.Errorf("Error in head subscription: %w", err) return nil, fmt.Errorf("error in head subscription: %w", err)
case <-timeoutCh: case <-timeoutCh:
return nil, errors.New("timeout") return nil, errors.New("timeout")
} }
......
...@@ -464,7 +464,7 @@ func (cfg SystemConfig) Start() (*System, error) { ...@@ -464,7 +464,7 @@ func (cfg SystemConfig) Start() (*System, error) {
if p, ok := p2pNodes[name]; ok { if p, ok := p2pNodes[name]; ok {
c.P2P = p c.P2P = p
if c.Driver.SequencerEnabled { if c.Driver.SequencerEnabled && c.P2PSigner == nil {
c.P2PSigner = &p2p.PreparedSigner{Signer: p2p.NewLocalSigner(cfg.Secrets.SequencerP2P)} c.P2PSigner = &p2p.PreparedSigner{Signer: p2p.NewLocalSigner(cfg.Secrets.SequencerP2P)}
} }
} }
......
...@@ -28,7 +28,10 @@ import ( ...@@ -28,7 +28,10 @@ import (
"github.com/ethereum-optimism/optimism/op-bindings/predeploys" "github.com/ethereum-optimism/optimism/op-bindings/predeploys"
"github.com/ethereum-optimism/optimism/op-node/client" "github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
rollupNode "github.com/ethereum-optimism/optimism/op-node/node"
"github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/withdrawals" "github.com/ethereum-optimism/optimism/op-node/withdrawals"
...@@ -590,7 +593,7 @@ func TestSystemMockP2P(t *testing.T) { ...@@ -590,7 +593,7 @@ func TestSystemMockP2P(t *testing.T) {
// connect the nodes // connect the nodes
cfg.P2PTopology = map[string][]string{ cfg.P2PTopology = map[string][]string{
"verifier": []string{"sequencer"}, "verifier": {"sequencer"},
} }
var published, received []common.Hash var published, received []common.Hash
...@@ -646,6 +649,189 @@ func TestSystemMockP2P(t *testing.T) { ...@@ -646,6 +649,189 @@ func TestSystemMockP2P(t *testing.T) {
require.Contains(t, received, receiptVerif.BlockHash) require.Contains(t, received, receiptVerif.BlockHash)
} }
// TestSystemMockPeerScoring sets up a L1 Geth node, a rollup node, and a L2 geth node and then confirms that
// the nodes can sync L2 blocks before they are confirmed on L1.
func TestSystemMockPeerScoring(t *testing.T) {
parallel(t)
if !verboseGethNodes {
log.Root().SetHandler(log.DiscardHandler())
}
cfg := DefaultSystemConfig(t)
// slow down L1 blocks so we can see the L2 blocks arrive well before the L1 blocks do.
// Keep the seq window small so the L2 chain is started quick
cfg.DeployConfig.L1BlockTime = 10
// Append additional nodes to the system to construct a dense p2p network
cfg.Nodes["verifier2"] = &rollupNode.Config{
Driver: driver.Config{
VerifierConfDepth: 0,
SequencerConfDepth: 0,
SequencerEnabled: false,
},
L1EpochPollInterval: time.Second * 4,
}
cfg.Nodes["verifier3"] = &rollupNode.Config{
Driver: driver.Config{
VerifierConfDepth: 0,
SequencerConfDepth: 0,
SequencerEnabled: false,
},
L1EpochPollInterval: time.Second * 4,
}
cfg.Loggers["verifier2"] = testlog.Logger(t, log.LvlInfo).New("role", "verifier")
cfg.Loggers["verifier3"] = testlog.Logger(t, log.LvlInfo).New("role", "verifier")
// Construct a new sequencer with an invalid privkey to produce invalid gossip
// We can then test that the peer scoring system will ban the node
sequencer2PrivateKey := cfg.Secrets.Mallory
cfg.Nodes["sequencer2"] = &rollupNode.Config{
Driver: driver.Config{
VerifierConfDepth: 0,
SequencerConfDepth: 0,
SequencerEnabled: true,
},
// Submitter PrivKey is set in system start for rollup nodes where sequencer = true
RPC: rollupNode.RPCConfig{
ListenAddr: "127.0.0.1",
ListenPort: 0,
EnableAdmin: true,
},
L1EpochPollInterval: time.Second * 4,
P2PSigner: &p2p.PreparedSigner{Signer: p2p.NewLocalSigner(sequencer2PrivateKey)},
}
cfg.Loggers["sequencer2"] = testlog.Logger(t, log.LvlInfo).New("role", "sequencer")
// connect the nodes
cfg.P2PTopology = map[string][]string{
"verifier": {"sequencer", "sequencer2", "verifier2", "verifier3"},
"verifier2": {"sequencer", "sequencer2", "verifier", "verifier3"},
"verifier3": {"sequencer", "sequencer2", "verifier", "verifier2"},
}
// Set peer scoring for each node, but without banning
for _, node := range cfg.Nodes {
params, err := p2p.GetPeerScoreParams("light", 2)
require.NoError(t, err)
node.P2P = &p2p.Config{
PeerScoring: params,
BanningEnabled: false,
}
}
var published, published2, received1, received2, received3 []common.Hash
seqTracer, verifTracer, verifTracer2, verifTracer3 := new(FnTracer), new(FnTracer), new(FnTracer), new(FnTracer)
seq2Tracer := new(FnTracer)
seqTracer.OnPublishL2PayloadFn = func(ctx context.Context, payload *eth.ExecutionPayload) {
published = append(published, payload.BlockHash)
}
seq2Tracer.OnPublishL2PayloadFn = func(ctx context.Context, payload *eth.ExecutionPayload) {
published2 = append(published2, payload.BlockHash)
}
verifTracer.OnUnsafeL2PayloadFn = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) {
received1 = append(received1, payload.BlockHash)
}
verifTracer2.OnUnsafeL2PayloadFn = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) {
received2 = append(received2, payload.BlockHash)
}
verifTracer3.OnUnsafeL2PayloadFn = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) {
received3 = append(received3, payload.BlockHash)
}
cfg.Nodes["sequencer"].Tracer = seqTracer
cfg.Nodes["sequencer2"].Tracer = seq2Tracer
cfg.Nodes["verifier"].Tracer = verifTracer
cfg.Nodes["verifier2"].Tracer = verifTracer2
cfg.Nodes["verifier3"].Tracer = verifTracer3
sys, err := cfg.Start()
require.Nil(t, err, "Error starting up system")
defer sys.Close()
l2Seq := sys.Clients["sequencer"]
// l2Seq2 := sys.Clients["sequencer2"]
l2Verif := sys.Clients["verifier"]
l2Verif2 := sys.Clients["verifier2"]
l2Verif3 := sys.Clients["verifier3"]
// Transactor Account
ethPrivKey := cfg.Secrets.Alice
// Submit TX to L2 sequencer node
toAddr := common.Address{0xff, 0xff}
tx := types.MustSignNewTx(ethPrivKey, types.LatestSignerForChainID(cfg.L2ChainIDBig()), &types.DynamicFeeTx{
ChainID: cfg.L2ChainIDBig(),
Nonce: 0,
To: &toAddr,
Value: big.NewInt(1_000_000_000),
GasTipCap: big.NewInt(10),
GasFeeCap: big.NewInt(200),
Gas: 21000,
})
err = l2Seq.SendTransaction(context.Background(), tx)
require.Nil(t, err, "Sending L2 tx to sequencer")
// Wait for tx to be mined on the L2 sequencer chain
receiptSeq, err := waitForTransaction(tx.Hash(), l2Seq, 6*time.Duration(sys.RollupConfig.BlockTime)*time.Second)
require.Nil(t, err, "Waiting for L2 tx on sequencer")
// Wait until the block it was first included in shows up in the safe chain on the verifier
receiptVerif, err := waitForTransaction(tx.Hash(), l2Verif, 6*time.Duration(sys.RollupConfig.BlockTime)*time.Second)
require.Nil(t, err, "Waiting for L2 tx on verifier")
require.Equal(t, receiptSeq, receiptVerif)
receiptVerif, err = waitForTransaction(tx.Hash(), l2Verif2, 6*time.Duration(sys.RollupConfig.BlockTime)*time.Second)
require.Nil(t, err, "Waiting for L2 tx on verifier2")
require.Equal(t, receiptSeq, receiptVerif)
receiptVerif, err = waitForTransaction(tx.Hash(), l2Verif3, 6*time.Duration(sys.RollupConfig.BlockTime)*time.Second)
require.Nil(t, err, "Waiting for L2 tx on verifier3")
require.Equal(t, receiptSeq, receiptVerif)
// Verify that everything that was received was published
require.GreaterOrEqual(t, len(published), len(received1))
require.GreaterOrEqual(t, len(published), len(received2))
require.GreaterOrEqual(t, len(published), len(received3))
require.ElementsMatch(t, published, received1[:len(published)])
require.ElementsMatch(t, published, received2[:len(published)])
require.ElementsMatch(t, published, received3[:len(published)])
// Verify that the tx was received via p2p
require.Contains(t, received1, receiptVerif.BlockHash)
require.Contains(t, received2, receiptVerif.BlockHash)
require.Contains(t, received3, receiptVerif.BlockHash)
// Submit TX to the second (malicious) sequencer node
// toAddr = common.Address{0xff, 0xff}
// maliciousTx := types.MustSignNewTx(ethPrivKey, types.LatestSignerForChainID(cfg.L2ChainIDBig()), &types.DynamicFeeTx{
// ChainID: cfg.L2ChainIDBig(),
// Nonce: 1,
// To: &toAddr,
// Value: big.NewInt(1_000_000_000),
// GasTipCap: big.NewInt(10),
// GasFeeCap: big.NewInt(200),
// Gas: 21000,
// })
// err = l2Seq2.SendTransaction(context.Background(), maliciousTx)
// require.Nil(t, err, "Sending L2 tx to sequencer")
// Wait for tx to be mined on the L2 sequencer chain
// receiptSeq, err = waitForTransaction(maliciousTx.Hash(), l2Seq2, 6*time.Duration(sys.RollupConfig.BlockTime)*time.Second)
// require.Nil(t, err, "Waiting for L2 tx on sequencer")
// Wait until the block it was first included in shows up in the safe chain on the verifier
// receiptVerif, err = waitForTransaction(maliciousTx.Hash(), l2Verif, 6*time.Duration(sys.RollupConfig.BlockTime)*time.Second)
// require.Nil(t, err, "Waiting for L2 tx on verifier")
// require.Equal(t, receiptSeq, receiptVerif)
// receiptVerif, err = waitForTransaction(tx.Hash(), l2Verif2, 6*time.Duration(sys.RollupConfig.BlockTime)*time.Second)
// require.Nil(t, err, "Waiting for L2 tx on verifier2")
// require.Equal(t, receiptSeq, receiptVerif)
// receiptVerif, err = waitForTransaction(tx.Hash(), l2Verif3, 6*time.Duration(sys.RollupConfig.BlockTime)*time.Second)
// require.Nil(t, err, "Waiting for L2 tx on verifier3")
// require.Equal(t, receiptSeq, receiptVerif)
}
func TestL1InfoContract(t *testing.T) { func TestL1InfoContract(t *testing.T) {
parallel(t) parallel(t)
if !verboseGethNodes { if !verboseGethNodes {
......
...@@ -25,6 +25,33 @@ var ( ...@@ -25,6 +25,33 @@ var (
Required: false, Required: false,
EnvVar: p2pEnv("NO_DISCOVERY"), EnvVar: p2pEnv("NO_DISCOVERY"),
} }
PeerScoring = cli.StringFlag{
Name: "p2p.scoring.peers",
Usage: "Sets the peer scoring strategy for the P2P stack. " +
"Can be one of: none or light." +
"Custom scoring strategies can be defined in the config file.",
Required: false,
Value: "none",
EnvVar: p2pEnv("PEER_SCORING"),
}
// Banning Flag - whether or not we want to act on the scoring
Banning = cli.BoolFlag{
Name: "p2p.ban.peers",
Usage: "Enables peer banning. This should ONLY be enabled once certain peer scoring is working correctly.",
Required: false,
EnvVar: p2pEnv("PEER_BANNING"),
}
TopicScoring = cli.StringFlag{
Name: "p2p.scoring.topics",
Usage: "Sets the topic scoring strategy. " +
"Can be one of: none or light." +
"Custom scoring strategies can be defined in the config file.",
Required: false,
Value: "none",
EnvVar: p2pEnv("TOPIC_SCORING"),
}
P2PPrivPath = cli.StringFlag{ P2PPrivPath = cli.StringFlag{
Name: "p2p.priv.path", Name: "p2p.priv.path",
Usage: "Read the hex-encoded 32-byte private key for the peer ID from this txt file. Created if not already exists." + Usage: "Read the hex-encoded 32-byte private key for the peer ID from this txt file. Created if not already exists." +
......
...@@ -15,6 +15,7 @@ import ( ...@@ -15,6 +15,7 @@ import (
pb "github.com/libp2p/go-libp2p-pubsub/pb" pb "github.com/libp2p/go-libp2p-pubsub/pb"
libp2pmetrics "github.com/libp2p/go-libp2p/core/metrics" libp2pmetrics "github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
...@@ -64,6 +65,8 @@ type Metricer interface { ...@@ -64,6 +65,8 @@ type Metricer interface {
RecordSequencerBuildingDiffTime(duration time.Duration) RecordSequencerBuildingDiffTime(duration time.Duration)
RecordSequencerSealingTime(duration time.Duration) RecordSequencerSealingTime(duration time.Duration)
Document() []metrics.DocumentedMetric Document() []metrics.DocumentedMetric
// P2P Metrics
RecordPeerScoring(peerID peer.ID, score float64)
} }
// Metrics tracks all the metrics for the op-node. // Metrics tracks all the metrics for the op-node.
...@@ -116,6 +119,7 @@ type Metrics struct { ...@@ -116,6 +119,7 @@ type Metrics struct {
// P2P Metrics // P2P Metrics
PeerCount prometheus.Gauge PeerCount prometheus.Gauge
StreamCount prometheus.Gauge StreamCount prometheus.Gauge
PeerScores *prometheus.GaugeVec
GossipEventsTotal *prometheus.CounterVec GossipEventsTotal *prometheus.CounterVec
BandwidthTotal *prometheus.GaugeVec BandwidthTotal *prometheus.GaugeVec
...@@ -289,6 +293,16 @@ func NewMetrics(procName string) *Metrics { ...@@ -289,6 +293,16 @@ func NewMetrics(procName string) *Metrics {
Name: "stream_count", Name: "stream_count",
Help: "Count of currently connected p2p streams", Help: "Count of currently connected p2p streams",
}), }),
PeerScores: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Subsystem: "p2p",
Name: "peer_scores",
Help: "Peer scoring",
}, []string{
// No label names here since peer ids would open a service attack vector.
// Each peer id would be a separate metric, flooding prometheus.
// See: https://prometheus.io/docs/practices/naming/#labels
}),
GossipEventsTotal: factory.NewCounterVec(prometheus.CounterOpts{ GossipEventsTotal: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns, Namespace: ns,
Subsystem: "p2p", Subsystem: "p2p",
...@@ -477,6 +491,10 @@ func (m *Metrics) RecordGossipEvent(evType int32) { ...@@ -477,6 +491,10 @@ func (m *Metrics) RecordGossipEvent(evType int32) {
m.GossipEventsTotal.WithLabelValues(pb.TraceEvent_Type_name[evType]).Inc() m.GossipEventsTotal.WithLabelValues(pb.TraceEvent_Type_name[evType]).Inc()
} }
func (m *Metrics) RecordPeerScoring(peerID peer.ID, score float64) {
m.PeerScores.WithLabelValues(peerID.String()).Set(score)
}
func (m *Metrics) IncPeerCount() { func (m *Metrics) IncPeerCount() {
m.PeerCount.Inc() m.PeerCount.Inc()
} }
...@@ -609,6 +627,9 @@ func (n *noopMetricer) RecordSequencerReset() { ...@@ -609,6 +627,9 @@ func (n *noopMetricer) RecordSequencerReset() {
func (n *noopMetricer) RecordGossipEvent(evType int32) { func (n *noopMetricer) RecordGossipEvent(evType int32) {
} }
func (n *noopMetricer) RecordPeerScoring(peerID peer.ID, score float64) {
}
func (n *noopMetricer) IncPeerCount() { func (n *noopMetricer) IncPeerCount() {
} }
......
...@@ -24,7 +24,7 @@ import ( ...@@ -24,7 +24,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
) )
func NewConfig(ctx *cli.Context) (*p2p.Config, error) { func NewConfig(ctx *cli.Context, blockTime uint64) (*p2p.Config, error) {
conf := &p2p.Config{} conf := &p2p.Config{}
if ctx.GlobalBool(flags.DisableP2P.Name) { if ctx.GlobalBool(flags.DisableP2P.Name) {
...@@ -54,6 +54,18 @@ func NewConfig(ctx *cli.Context) (*p2p.Config, error) { ...@@ -54,6 +54,18 @@ func NewConfig(ctx *cli.Context) (*p2p.Config, error) {
return nil, fmt.Errorf("failed to load p2p gossip options: %w", err) return nil, fmt.Errorf("failed to load p2p gossip options: %w", err)
} }
if err := loadPeerScoringParams(conf, ctx, blockTime); err != nil {
return nil, fmt.Errorf("failed to load p2p peer scoring options: %w", err)
}
if err := loadBanningOption(conf, ctx); err != nil {
return nil, fmt.Errorf("failed to load banning option: %w", err)
}
if err := loadTopicScoringParams(conf, ctx, blockTime); err != nil {
return nil, fmt.Errorf("failed to load p2p topic scoring options: %w", err)
}
conf.ConnGater = p2p.DefaultConnGater conf.ConnGater = p2p.DefaultConnGater
conf.ConnMngr = p2p.DefaultConnManager conf.ConnMngr = p2p.DefaultConnManager
...@@ -73,6 +85,49 @@ func validatePort(p uint) (uint16, error) { ...@@ -73,6 +85,49 @@ func validatePort(p uint) (uint16, error) {
return uint16(p), nil return uint16(p), nil
} }
// loadTopicScoringParams loads the topic scoring options from the CLI context.
//
// If the topic scoring options are not set, then the default topic scoring.
func loadTopicScoringParams(conf *p2p.Config, ctx *cli.Context, blockTime uint64) error {
scoringLevel := ctx.GlobalString(flags.TopicScoring.Name)
if scoringLevel != "" {
// Set default block topic scoring parameters
// See prysm: https://github.com/prysmaticlabs/prysm/blob/develop/beacon-chain/p2p/gossip_scoring_params.go
// And research from lighthouse: https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c
// And docs: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#topic-parameter-calculation-and-decay
topicScoreParams, err := p2p.GetTopicScoreParams(scoringLevel, blockTime)
if err != nil {
return err
}
conf.TopicScoring = topicScoreParams
}
return nil
}
// loadPeerScoringParams loads the scoring options from the CLI context.
//
// If the scoring level is not set, no scoring is enabled.
func loadPeerScoringParams(conf *p2p.Config, ctx *cli.Context, blockTime uint64) error {
scoringLevel := ctx.GlobalString(flags.PeerScoring.Name)
if scoringLevel != "" {
peerScoreParams, err := p2p.GetPeerScoreParams(scoringLevel, blockTime)
if err != nil {
return err
}
conf.PeerScoring = peerScoreParams
}
return nil
}
// loadBanningOption loads whether or not to ban peers from the CLI context.
func loadBanningOption(conf *p2p.Config, ctx *cli.Context) error {
ban := ctx.GlobalBool(flags.Banning.Name)
conf.BanningEnabled = ban
return nil
}
func loadListenOpts(conf *p2p.Config, ctx *cli.Context) error { func loadListenOpts(conf *p2p.Config, ctx *cli.Context) error {
listenIP := ctx.GlobalString(flags.ListenIP.Name) listenIP := ctx.GlobalString(flags.ListenIP.Name)
if listenIP != "" { // optional if listenIP != "" { // optional
......
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
ds "github.com/ipfs/go-datastore" ds "github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core" "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/crypto"
...@@ -49,6 +50,13 @@ type Config struct { ...@@ -49,6 +50,13 @@ type Config struct {
DisableP2P bool DisableP2P bool
NoDiscovery bool NoDiscovery bool
// Pubsub Scoring Parameters
PeerScoring pubsub.PeerScoreParams
TopicScoring pubsub.TopicScoreParams
// Whether to ban peers based on their [PeerScoring] score.
BanningEnabled bool
ListenIP net.IP ListenIP net.IP
ListenTCPPort uint16 ListenTCPPort uint16
...@@ -95,6 +103,7 @@ type Config struct { ...@@ -95,6 +103,7 @@ type Config struct {
ConnMngr func(conf *Config) (connmgr.ConnManager, error) ConnMngr func(conf *Config) (connmgr.ConnManager, error)
} }
//go:generate mockery --name ConnectionGater
type ConnectionGater interface { type ConnectionGater interface {
connmgr.ConnectionGater connmgr.ConnectionGater
...@@ -138,6 +147,18 @@ func (conf *Config) Disabled() bool { ...@@ -138,6 +147,18 @@ func (conf *Config) Disabled() bool {
return conf.DisableP2P return conf.DisableP2P
} }
func (conf *Config) PeerScoringParams() *pubsub.PeerScoreParams {
return &conf.PeerScoring
}
func (conf *Config) BanPeers() bool {
return conf.BanningEnabled
}
func (conf *Config) TopicScoringParams() *pubsub.TopicScoreParams {
return &conf.TopicScoring
}
const maxMeshParam = 1000 const maxMeshParam = 1000
func (conf *Config) Check() error { func (conf *Config) Check() error {
......
...@@ -40,6 +40,8 @@ const ( ...@@ -40,6 +40,8 @@ const (
DefaultMeshDlo = 6 // topic stable mesh low watermark DefaultMeshDlo = 6 // topic stable mesh low watermark
DefaultMeshDhi = 12 // topic stable mesh high watermark DefaultMeshDhi = 12 // topic stable mesh high watermark
DefaultMeshDlazy = 6 // gossip target DefaultMeshDlazy = 6 // gossip target
// peerScoreInspectFrequency is the frequency at which peer scores are inspected
peerScoreInspectFrequency = 15 * time.Second
) )
// Message domains, the msg id function uncompresses to keep data monomorphic, // Message domains, the msg id function uncompresses to keep data monomorphic,
...@@ -49,6 +51,9 @@ var MessageDomainInvalidSnappy = [4]byte{0, 0, 0, 0} ...@@ -49,6 +51,9 @@ var MessageDomainInvalidSnappy = [4]byte{0, 0, 0, 0}
var MessageDomainValidSnappy = [4]byte{1, 0, 0, 0} var MessageDomainValidSnappy = [4]byte{1, 0, 0, 0}
type GossipSetupConfigurables interface { type GossipSetupConfigurables interface {
PeerScoringParams() *pubsub.PeerScoreParams
TopicScoringParams() *pubsub.TopicScoreParams
BanPeers() bool
ConfigureGossip(params *pubsub.GossipSubParams) []pubsub.Option ConfigureGossip(params *pubsub.GossipSubParams) []pubsub.Option
} }
...@@ -56,8 +61,10 @@ type GossipRuntimeConfig interface { ...@@ -56,8 +61,10 @@ type GossipRuntimeConfig interface {
P2PSequencerAddress() common.Address P2PSequencerAddress() common.Address
} }
//go:generate mockery --name GossipMetricer
type GossipMetricer interface { type GossipMetricer interface {
RecordGossipEvent(evType int32) RecordGossipEvent(evType int32)
RecordPeerScoring(peerID peer.ID, score float64)
} }
func blocksTopicV1(cfg *rollup.Config) string { func blocksTopicV1(cfg *rollup.Config) string {
...@@ -143,7 +150,7 @@ func BuildGlobalGossipParams(cfg *rollup.Config) pubsub.GossipSubParams { ...@@ -143,7 +150,7 @@ func BuildGlobalGossipParams(cfg *rollup.Config) pubsub.GossipSubParams {
// NewGossipSub configures a new pubsub instance with the specified parameters. // NewGossipSub configures a new pubsub instance with the specified parameters.
// PubSub uses a GossipSubRouter as it's router under the hood. // PubSub uses a GossipSubRouter as it's router under the hood.
func NewGossipSub(p2pCtx context.Context, h host.Host, cfg *rollup.Config, gossipConf GossipSetupConfigurables, m GossipMetricer) (*pubsub.PubSub, error) { func NewGossipSub(p2pCtx context.Context, h host.Host, g ConnectionGater, cfg *rollup.Config, gossipConf GossipSetupConfigurables, m GossipMetricer, log log.Logger) (*pubsub.PubSub, error) {
denyList, err := pubsub.NewTimeCachedBlacklist(30 * time.Second) denyList, err := pubsub.NewTimeCachedBlacklist(30 * time.Second)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -164,9 +171,9 @@ func NewGossipSub(p2pCtx context.Context, h host.Host, cfg *rollup.Config, gossi ...@@ -164,9 +171,9 @@ func NewGossipSub(p2pCtx context.Context, h host.Host, cfg *rollup.Config, gossi
pubsub.WithGossipSubParams(params), pubsub.WithGossipSubParams(params),
pubsub.WithEventTracer(&gossipTracer{m: m}), pubsub.WithEventTracer(&gossipTracer{m: m}),
} }
gossipOpts = append(gossipOpts, ConfigurePeerScoring(h, g, gossipConf, m, log)...)
gossipOpts = append(gossipOpts, gossipConf.ConfigureGossip(&params)...) gossipOpts = append(gossipOpts, gossipConf.ConfigureGossip(&params)...)
return pubsub.NewGossipSub(p2pCtx, h, gossipOpts...) return pubsub.NewGossipSub(p2pCtx, h, gossipOpts...)
// TODO: pubsub.WithPeerScoreInspect(inspect, InspectInterval) to update peerstore scores with gossip scores
} }
func validationResultString(v pubsub.ValidationResult) string { func validationResultString(v pubsub.ValidationResult) string {
...@@ -431,7 +438,7 @@ func (p *publisher) Close() error { ...@@ -431,7 +438,7 @@ func (p *publisher) Close() error {
return p.blocksTopic.Close() return p.blocksTopic.Close()
} }
func JoinGossip(p2pCtx context.Context, self peer.ID, ps *pubsub.PubSub, log log.Logger, cfg *rollup.Config, runCfg GossipRuntimeConfig, gossipIn GossipIn) (GossipOut, error) { func JoinGossip(p2pCtx context.Context, self peer.ID, topicScoreParams *pubsub.TopicScoreParams, ps *pubsub.PubSub, log log.Logger, cfg *rollup.Config, runCfg GossipRuntimeConfig, gossipIn GossipIn) (GossipOut, error) {
val := guardGossipValidator(log, logValidationResult(self, "validated block", log, BuildBlocksValidator(log, cfg, runCfg))) val := guardGossipValidator(log, logValidationResult(self, "validated block", log, BuildBlocksValidator(log, cfg, runCfg)))
blocksTopicName := blocksTopicV1(cfg) blocksTopicName := blocksTopicV1(cfg)
err := ps.RegisterTopicValidator(blocksTopicName, err := ps.RegisterTopicValidator(blocksTopicName,
...@@ -451,11 +458,14 @@ func JoinGossip(p2pCtx context.Context, self peer.ID, ps *pubsub.PubSub, log log ...@@ -451,11 +458,14 @@ func JoinGossip(p2pCtx context.Context, self peer.ID, ps *pubsub.PubSub, log log
} }
go LogTopicEvents(p2pCtx, log.New("topic", "blocks"), blocksTopicEvents) go LogTopicEvents(p2pCtx, log.New("topic", "blocks"), blocksTopicEvents)
// TODO: block topic scoring parameters // A [TimeInMeshQuantum] value of 0 means the topic score is disabled.
// See prysm: https://github.com/prysmaticlabs/prysm/blob/develop/beacon-chain/p2p/gossip_scoring_params.go // If we passed a topicScoreParams with [TimeInMeshQuantum] set to 0,
// And research from lighthouse: https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c // libp2p errors since the params will be rejected.
// And docs: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#topic-parameter-calculation-and-decay if topicScoreParams != nil && topicScoreParams.TimeInMeshQuantum != 0 {
//err := blocksTopic.SetScoreParams(&pubsub.TopicScoreParams{......}) if err = blocksTopic.SetScoreParams(topicScoreParams); err != nil {
return nil, fmt.Errorf("failed to set topic score params: %w", err)
}
}
subscription, err := blocksTopic.Subscribe() subscription, err := blocksTopic.Subscribe()
if err != nil { if err != nil {
......
...@@ -168,7 +168,7 @@ func TestP2PFull(t *testing.T) { ...@@ -168,7 +168,7 @@ func TestP2PFull(t *testing.T) {
_, err = p2pClientA.DiscoveryTable(ctx) _, err = p2pClientA.DiscoveryTable(ctx)
// rpc does not preserve error type // rpc does not preserve error type
require.Equal(t, err.Error(), DisabledDiscovery.Error(), "expecting discv5 to be disabled") require.Equal(t, err.Error(), ErrDisabledDiscovery.Error(), "expecting discv5 to be disabled")
require.NoError(t, p2pClientA.BlockPeer(ctx, hostB.ID())) require.NoError(t, p2pClientA.BlockPeer(ctx, hostB.ID()))
blockedPeers, err := p2pClientA.ListBlockedPeers(ctx) blockedPeers, err := p2pClientA.ListBlockedPeers(ctx)
......
// Code generated by mockery v2.14.0. DO NOT EDIT.
package mocks
import (
control "github.com/libp2p/go-libp2p/core/control"
mock "github.com/stretchr/testify/mock"
multiaddr "github.com/multiformats/go-multiaddr"
net "net"
network "github.com/libp2p/go-libp2p/core/network"
peer "github.com/libp2p/go-libp2p/core/peer"
)
// ConnectionGater is an autogenerated mock type for the ConnectionGater type
type ConnectionGater struct {
mock.Mock
}
// BlockAddr provides a mock function with given fields: ip
func (_m *ConnectionGater) BlockAddr(ip net.IP) error {
ret := _m.Called(ip)
var r0 error
if rf, ok := ret.Get(0).(func(net.IP) error); ok {
r0 = rf(ip)
} else {
r0 = ret.Error(0)
}
return r0
}
// BlockPeer provides a mock function with given fields: p
func (_m *ConnectionGater) BlockPeer(p peer.ID) error {
ret := _m.Called(p)
var r0 error
if rf, ok := ret.Get(0).(func(peer.ID) error); ok {
r0 = rf(p)
} else {
r0 = ret.Error(0)
}
return r0
}
// BlockSubnet provides a mock function with given fields: ipnet
func (_m *ConnectionGater) BlockSubnet(ipnet *net.IPNet) error {
ret := _m.Called(ipnet)
var r0 error
if rf, ok := ret.Get(0).(func(*net.IPNet) error); ok {
r0 = rf(ipnet)
} else {
r0 = ret.Error(0)
}
return r0
}
// InterceptAccept provides a mock function with given fields: _a0
func (_m *ConnectionGater) InterceptAccept(_a0 network.ConnMultiaddrs) bool {
ret := _m.Called(_a0)
var r0 bool
if rf, ok := ret.Get(0).(func(network.ConnMultiaddrs) bool); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// InterceptAddrDial provides a mock function with given fields: _a0, _a1
func (_m *ConnectionGater) InterceptAddrDial(_a0 peer.ID, _a1 multiaddr.Multiaddr) bool {
ret := _m.Called(_a0, _a1)
var r0 bool
if rf, ok := ret.Get(0).(func(peer.ID, multiaddr.Multiaddr) bool); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// InterceptPeerDial provides a mock function with given fields: p
func (_m *ConnectionGater) InterceptPeerDial(p peer.ID) bool {
ret := _m.Called(p)
var r0 bool
if rf, ok := ret.Get(0).(func(peer.ID) bool); ok {
r0 = rf(p)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// InterceptSecured provides a mock function with given fields: _a0, _a1, _a2
func (_m *ConnectionGater) InterceptSecured(_a0 network.Direction, _a1 peer.ID, _a2 network.ConnMultiaddrs) bool {
ret := _m.Called(_a0, _a1, _a2)
var r0 bool
if rf, ok := ret.Get(0).(func(network.Direction, peer.ID, network.ConnMultiaddrs) bool); ok {
r0 = rf(_a0, _a1, _a2)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// InterceptUpgraded provides a mock function with given fields: _a0
func (_m *ConnectionGater) InterceptUpgraded(_a0 network.Conn) (bool, control.DisconnectReason) {
ret := _m.Called(_a0)
var r0 bool
if rf, ok := ret.Get(0).(func(network.Conn) bool); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(bool)
}
var r1 control.DisconnectReason
if rf, ok := ret.Get(1).(func(network.Conn) control.DisconnectReason); ok {
r1 = rf(_a0)
} else {
r1 = ret.Get(1).(control.DisconnectReason)
}
return r0, r1
}
// ListBlockedAddrs provides a mock function with given fields:
func (_m *ConnectionGater) ListBlockedAddrs() []net.IP {
ret := _m.Called()
var r0 []net.IP
if rf, ok := ret.Get(0).(func() []net.IP); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]net.IP)
}
}
return r0
}
// ListBlockedPeers provides a mock function with given fields:
func (_m *ConnectionGater) ListBlockedPeers() []peer.ID {
ret := _m.Called()
var r0 []peer.ID
if rf, ok := ret.Get(0).(func() []peer.ID); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]peer.ID)
}
}
return r0
}
// ListBlockedSubnets provides a mock function with given fields:
func (_m *ConnectionGater) ListBlockedSubnets() []*net.IPNet {
ret := _m.Called()
var r0 []*net.IPNet
if rf, ok := ret.Get(0).(func() []*net.IPNet); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*net.IPNet)
}
}
return r0
}
// UnblockAddr provides a mock function with given fields: ip
func (_m *ConnectionGater) UnblockAddr(ip net.IP) error {
ret := _m.Called(ip)
var r0 error
if rf, ok := ret.Get(0).(func(net.IP) error); ok {
r0 = rf(ip)
} else {
r0 = ret.Error(0)
}
return r0
}
// UnblockPeer provides a mock function with given fields: p
func (_m *ConnectionGater) UnblockPeer(p peer.ID) error {
ret := _m.Called(p)
var r0 error
if rf, ok := ret.Get(0).(func(peer.ID) error); ok {
r0 = rf(p)
} else {
r0 = ret.Error(0)
}
return r0
}
// UnblockSubnet provides a mock function with given fields: ipnet
func (_m *ConnectionGater) UnblockSubnet(ipnet *net.IPNet) error {
ret := _m.Called(ipnet)
var r0 error
if rf, ok := ret.Get(0).(func(*net.IPNet) error); ok {
r0 = rf(ipnet)
} else {
r0 = ret.Error(0)
}
return r0
}
type mockConstructorTestingTNewConnectionGater interface {
mock.TestingT
Cleanup(func())
}
// NewConnectionGater creates a new instance of ConnectionGater. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewConnectionGater(t mockConstructorTestingTNewConnectionGater) *ConnectionGater {
mock := &ConnectionGater{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
// Code generated by mockery v2.14.0. DO NOT EDIT.
package mocks
import (
mock "github.com/stretchr/testify/mock"
peer "github.com/libp2p/go-libp2p/core/peer"
)
// GossipMetricer is an autogenerated mock type for the GossipMetricer type
type GossipMetricer struct {
mock.Mock
}
// RecordGossipEvent provides a mock function with given fields: evType
func (_m *GossipMetricer) RecordGossipEvent(evType int32) {
_m.Called(evType)
}
// RecordPeerScoring provides a mock function with given fields: peerID, score
func (_m *GossipMetricer) RecordPeerScoring(peerID peer.ID, score float64) {
_m.Called(peerID, score)
}
type mockConstructorTestingTNewGossipMetricer interface {
mock.TestingT
Cleanup(func())
}
// NewGossipMetricer creates a new instance of GossipMetricer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewGossipMetricer(t mockConstructorTestingTNewGossipMetricer) *GossipMetricer {
mock := &GossipMetricer{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
// Code generated by mockery v2.14.0. DO NOT EDIT.
package mocks
import (
mock "github.com/stretchr/testify/mock"
peer "github.com/libp2p/go-libp2p/core/peer"
)
// PeerGater is an autogenerated mock type for the PeerGater type
type PeerGater struct {
mock.Mock
}
// Update provides a mock function with given fields: _a0, _a1
func (_m *PeerGater) Update(_a0 peer.ID, _a1 float64) {
_m.Called(_a0, _a1)
}
type mockConstructorTestingTNewPeerGater interface {
mock.TestingT
Cleanup(func())
}
// NewPeerGater creates a new instance of PeerGater. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewPeerGater(t mockConstructorTestingTNewPeerGater) *PeerGater {
mock := &PeerGater{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
// Code generated by mockery v2.14.0. DO NOT EDIT.
package mocks
import (
mock "github.com/stretchr/testify/mock"
peer "github.com/libp2p/go-libp2p/core/peer"
)
// Peerstore is an autogenerated mock type for the Peerstore type
type Peerstore struct {
mock.Mock
}
// PeerInfo provides a mock function with given fields: _a0
func (_m *Peerstore) PeerInfo(_a0 peer.ID) peer.AddrInfo {
ret := _m.Called(_a0)
var r0 peer.AddrInfo
if rf, ok := ret.Get(0).(func(peer.ID) peer.AddrInfo); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(peer.AddrInfo)
}
return r0
}
// Peers provides a mock function with given fields:
func (_m *Peerstore) Peers() peer.IDSlice {
ret := _m.Called()
var r0 peer.IDSlice
if rf, ok := ret.Get(0).(func() peer.IDSlice); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(peer.IDSlice)
}
}
return r0
}
type mockConstructorTestingTNewPeerstore interface {
mock.TestingT
Cleanup(func())
}
// NewPeerstore creates a new instance of Peerstore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewPeerstore(t mockConstructorTestingTNewPeerstore) *Peerstore {
mock := &Peerstore{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
...@@ -76,12 +76,11 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l ...@@ -76,12 +76,11 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
// notify of any new connections/streams/etc. // notify of any new connections/streams/etc.
n.host.Network().Notify(NewNetworkNotifier(log, metrics)) n.host.Network().Notify(NewNetworkNotifier(log, metrics))
// note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled. // note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled.
n.gs, err = NewGossipSub(resourcesCtx, n.host, rollupCfg, setup, metrics) n.gs, err = NewGossipSub(resourcesCtx, n.host, n.gater, rollupCfg, setup, metrics, log)
if err != nil { if err != nil {
return fmt.Errorf("failed to start gossipsub router: %w", err) return fmt.Errorf("failed to start gossipsub router: %w", err)
} }
n.gsOut, err = JoinGossip(resourcesCtx, n.host.ID(), setup.TopicScoringParams(), n.gs, log, rollupCfg, runCfg, gossipIn)
n.gsOut, err = JoinGossip(resourcesCtx, n.host.ID(), n.gs, log, rollupCfg, runCfg, gossipIn)
if err != nil { if err != nil {
return fmt.Errorf("failed to join blocks gossip topic: %w", err) return fmt.Errorf("failed to join blocks gossip topic: %w", err)
} }
......
package p2p
import (
log "github.com/ethereum/go-ethereum/log"
peer "github.com/libp2p/go-libp2p/core/peer"
slices "golang.org/x/exp/slices"
)
// ConnectionFactor is the factor by which we multiply the connection score.
const ConnectionFactor = -10
// PeerScoreThreshold is the threshold at which we block a peer.
const PeerScoreThreshold = -100
// gater is an internal implementation of the [PeerGater] interface.
type gater struct {
connGater ConnectionGater
log log.Logger
banEnabled bool
}
// PeerGater manages the connection gating of peers.
//
//go:generate mockery --name PeerGater --output mocks/
type PeerGater interface {
// Update handles a peer score update and blocks/unblocks the peer if necessary.
Update(peer.ID, float64)
}
// NewPeerGater returns a new peer gater.
func NewPeerGater(connGater ConnectionGater, log log.Logger, banEnabled bool) PeerGater {
return &gater{
connGater: connGater,
log: log,
banEnabled: banEnabled,
}
}
// Update handles a peer score update and blocks/unblocks the peer if necessary.
func (s *gater) Update(id peer.ID, score float64) {
// Check if the peer score is below the threshold
// If so, we need to block the peer
if score < PeerScoreThreshold && s.banEnabled {
s.log.Warn("peer blocking enabled, blocking peer", "id", id.String(), "score", score)
err := s.connGater.BlockPeer(id)
s.log.Warn("connection gater failed to block peer", id.String(), "err", err)
}
// Unblock peers whose score has recovered to an acceptable level
if (score > PeerScoreThreshold) && slices.Contains(s.connGater.ListBlockedPeers(), id) {
err := s.connGater.UnblockPeer(id)
s.log.Warn("connection gater failed to unblock peer", id.String(), "err", err)
}
}
package p2p_test
import (
"testing"
p2p "github.com/ethereum-optimism/optimism/op-node/p2p"
p2pMocks "github.com/ethereum-optimism/optimism/op-node/p2p/mocks"
testlog "github.com/ethereum-optimism/optimism/op-node/testlog"
log "github.com/ethereum/go-ethereum/log"
peer "github.com/libp2p/go-libp2p/core/peer"
suite "github.com/stretchr/testify/suite"
)
// PeerGaterTestSuite tests peer parameterization.
type PeerGaterTestSuite struct {
suite.Suite
mockGater *p2pMocks.ConnectionGater
logger log.Logger
}
// SetupTest sets up the test suite.
func (testSuite *PeerGaterTestSuite) SetupTest() {
testSuite.mockGater = &p2pMocks.ConnectionGater{}
testSuite.logger = testlog.Logger(testSuite.T(), log.LvlError)
}
// TestPeerGater runs the PeerGaterTestSuite.
func TestPeerGater(t *testing.T) {
suite.Run(t, new(PeerGaterTestSuite))
}
// TestPeerScoreConstants validates the peer score constants.
func (testSuite *PeerGaterTestSuite) TestPeerScoreConstants() {
testSuite.Equal(-10, p2p.ConnectionFactor)
testSuite.Equal(-100, p2p.PeerScoreThreshold)
}
// TestPeerGaterUpdate tests the peer gater update hook.
func (testSuite *PeerGaterTestSuite) TestPeerGaterUpdate() {
gater := p2p.NewPeerGater(
testSuite.mockGater,
testSuite.logger,
true,
)
// Mock a connection gater peer block call
// Since the peer score is below the [PeerScoreThreshold] of -100,
// the [BlockPeer] method should be called
testSuite.mockGater.On("BlockPeer", peer.ID("peer1")).Return(nil)
// Apply the peer gater update
gater.Update(peer.ID("peer1"), float64(-100))
}
// TestPeerGaterUpdateNoBanning tests the peer gater update hook without banning set
func (testSuite *PeerGaterTestSuite) TestPeerGaterUpdateNoBanning() {
gater := p2p.NewPeerGater(
testSuite.mockGater,
testSuite.logger,
false,
)
// Notice: [BlockPeer] should not be called since banning is not enabled
gater.Update(peer.ID("peer1"), float64(-100000))
}
package p2p
import (
"fmt"
"math"
"time"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
)
// DecayToZero is the decay factor for a peer's score to zero.
const DecayToZero = 0.01
// ScoreDecay returns the decay factor for a given duration.
func ScoreDecay(duration time.Duration, slot time.Duration) float64 {
numOfTimes := duration / slot
return math.Pow(DecayToZero, 1/float64(numOfTimes))
}
// LightPeerScoreParams is an instantiation of [pubsub.PeerScoreParams] with light penalties.
// See [PeerScoreParams] for detailed documentation.
//
// [PeerScoreParams]: https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub@v0.8.1#PeerScoreParams
var LightPeerScoreParams = func(blockTime uint64) pubsub.PeerScoreParams {
slot := time.Duration(blockTime) * time.Second
if slot == 0 {
slot = 2 * time.Second
}
// We initialize an "epoch" as 6 blocks suggesting 6 blocks,
// each taking ~ 2 seconds, is 12 seconds
epoch := 6 * slot
tenEpochs := 10 * epoch
oneHundredEpochs := 100 * epoch
return pubsub.PeerScoreParams{
Topics: make(map[string]*pubsub.TopicScoreParams),
TopicScoreCap: 34,
AppSpecificScore: func(p peer.ID) float64 {
return 0
},
AppSpecificWeight: 1,
IPColocationFactorWeight: -35,
IPColocationFactorThreshold: 10,
IPColocationFactorWhitelist: nil,
BehaviourPenaltyWeight: -16,
BehaviourPenaltyThreshold: 6,
BehaviourPenaltyDecay: ScoreDecay(tenEpochs, slot),
DecayInterval: slot,
DecayToZero: DecayToZero,
RetainScore: oneHundredEpochs,
}
}
// DisabledPeerScoreParams is an instantiation of [pubsub.PeerScoreParams] where all scoring is disabled.
// See [PeerScoreParams] for detailed documentation.
//
// [PeerScoreParams]: https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub@v0.8.1#PeerScoreParams
var DisabledPeerScoreParams = func(blockTime uint64) pubsub.PeerScoreParams {
slot := time.Duration(blockTime) * time.Second
if slot == 0 {
slot = 2 * time.Second
}
// We initialize an "epoch" as 6 blocks suggesting 6 blocks,
// each taking ~ 2 seconds, is 12 seconds
epoch := 6 * slot
tenEpochs := 10 * epoch
oneHundredEpochs := 100 * epoch
return pubsub.PeerScoreParams{
Topics: make(map[string]*pubsub.TopicScoreParams),
// 0 represent no cap
TopicScoreCap: 0,
AppSpecificScore: func(p peer.ID) float64 {
return 0
},
AppSpecificWeight: 1,
// ignore colocation scoring
IPColocationFactorWeight: 0,
IPColocationFactorWhitelist: nil,
// 0 disables the behaviour penalty
BehaviourPenaltyWeight: 0,
BehaviourPenaltyDecay: ScoreDecay(tenEpochs, slot),
DecayInterval: slot,
DecayToZero: DecayToZero,
RetainScore: oneHundredEpochs,
}
}
// PeerScoreParamsByName is a map of name to function that returns a [pubsub.PeerScoreParams] based on the provided [rollup.Config].
var PeerScoreParamsByName = map[string](func(blockTime uint64) pubsub.PeerScoreParams){
"light": LightPeerScoreParams,
"none": DisabledPeerScoreParams,
}
// AvailablePeerScoreParams returns a list of available peer score params.
// These can be used as an input to [GetPeerScoreParams] which returns the
// corresponding [pubsub.PeerScoreParams].
func AvailablePeerScoreParams() []string {
var params []string
for name := range PeerScoreParamsByName {
params = append(params, name)
}
return params
}
// GetPeerScoreParams returns the [pubsub.PeerScoreParams] for the given name.
func GetPeerScoreParams(name string, blockTime uint64) (pubsub.PeerScoreParams, error) {
params, ok := PeerScoreParamsByName[name]
if !ok {
return pubsub.PeerScoreParams{}, fmt.Errorf("invalid params %s", name)
}
return params(blockTime), nil
}
// NewPeerScoreThresholds returns a default [pubsub.PeerScoreThresholds].
// See [PeerScoreThresholds] for detailed documentation.
//
// [PeerScoreThresholds]: https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub@v0.8.1#PeerScoreThresholds
func NewPeerScoreThresholds() pubsub.PeerScoreThresholds {
return pubsub.PeerScoreThresholds{
SkipAtomicValidation: false,
GossipThreshold: -10,
PublishThreshold: -40,
GraylistThreshold: -40,
AcceptPXThreshold: 20,
OpportunisticGraftThreshold: 0.05,
}
}
package p2p
import (
"math"
"sort"
"testing"
"time"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/stretchr/testify/suite"
)
// PeerParamsTestSuite tests peer parameterization.
type PeerParamsTestSuite struct {
suite.Suite
}
// TestPeerParams runs the PeerParamsTestSuite.
func TestPeerParams(t *testing.T) {
suite.Run(t, new(PeerParamsTestSuite))
}
// TestPeerScoreConstants validates the peer score constants.
func (testSuite *PeerParamsTestSuite) TestPeerScoreConstants() {
testSuite.Equal(0.01, DecayToZero)
}
// TestAvailablePeerScoreParams validates the available peer score parameters.
func (testSuite *PeerParamsTestSuite) TestAvailablePeerScoreParams() {
available := AvailablePeerScoreParams()
sort.Strings(available)
expected := []string{"light", "none"}
testSuite.Equal(expected, available)
}
// TestNewPeerScoreThresholds validates the peer score thresholds.
//
// This is tested to ensure that the thresholds are not modified and missed in review.
func (testSuite *PeerParamsTestSuite) TestNewPeerScoreThresholds() {
thresholds := NewPeerScoreThresholds()
expected := pubsub.PeerScoreThresholds{
SkipAtomicValidation: false,
GossipThreshold: -10,
PublishThreshold: -40,
GraylistThreshold: -40,
AcceptPXThreshold: 20,
OpportunisticGraftThreshold: 0.05,
}
testSuite.Equal(expected, thresholds)
}
// TestGetPeerScoreParams validates the peer score parameters.
func (testSuite *PeerParamsTestSuite) TestGetPeerScoreParams() {
params, err := GetPeerScoreParams("light", 1)
testSuite.NoError(err)
expected := LightPeerScoreParams(1)
testSuite.Equal(expected.DecayInterval, params.DecayInterval)
testSuite.Equal(time.Duration(1)*time.Second, params.DecayInterval)
params, err = GetPeerScoreParams("none", 1)
testSuite.NoError(err)
expected = DisabledPeerScoreParams(1)
testSuite.Equal(expected.DecayInterval, params.DecayInterval)
testSuite.Equal(time.Duration(1)*time.Second, params.DecayInterval)
_, err = GetPeerScoreParams("invalid", 1)
testSuite.Error(err)
}
// TestLightPeerScoreParams validates the light peer score params.
func (testSuite *PeerParamsTestSuite) TestLightPeerScoreParams() {
blockTime := uint64(1)
slot := time.Duration(blockTime) * time.Second
epoch := 6 * slot
oneHundredEpochs := 100 * epoch
// calculate the behavior penalty decay
duration := 10 * epoch
decay := math.Pow(DecayToZero, 1/float64(duration/slot))
testSuite.Equal(0.9261187281287935, decay)
// Test the params
params, err := GetPeerScoreParams("light", blockTime)
testSuite.NoError(err)
testSuite.Equal(params.Topics, make(map[string]*pubsub.TopicScoreParams))
testSuite.Equal(params.TopicScoreCap, float64(34))
// testSuite.Equal(params.AppSpecificScore("alice"), float(0))
testSuite.Equal(params.AppSpecificWeight, float64(1))
testSuite.Equal(params.IPColocationFactorWeight, float64(-35))
testSuite.Equal(params.IPColocationFactorThreshold, int(10))
testSuite.Nil(params.IPColocationFactorWhitelist)
testSuite.Equal(params.BehaviourPenaltyWeight, float64(-16))
testSuite.Equal(params.BehaviourPenaltyThreshold, float64(6))
testSuite.Equal(params.BehaviourPenaltyDecay, decay)
testSuite.Equal(params.DecayInterval, slot)
testSuite.Equal(params.DecayToZero, DecayToZero)
testSuite.Equal(params.RetainScore, oneHundredEpochs)
}
// TestDisabledPeerScoreParams validates the disabled peer score params.
func (testSuite *PeerParamsTestSuite) TestDisabledPeerScoreParams() {
blockTime := uint64(1)
slot := time.Duration(blockTime) * time.Second
epoch := 6 * slot
oneHundredEpochs := 100 * epoch
// calculate the behavior penalty decay
duration := 10 * epoch
decay := math.Pow(DecayToZero, 1/float64(duration/slot))
testSuite.Equal(0.9261187281287935, decay)
// Test the params
params, err := GetPeerScoreParams("none", blockTime)
testSuite.NoError(err)
testSuite.Equal(params.Topics, make(map[string]*pubsub.TopicScoreParams))
testSuite.Equal(params.TopicScoreCap, float64(0))
testSuite.Equal(params.AppSpecificWeight, float64(1))
testSuite.Equal(params.IPColocationFactorWeight, float64(0))
testSuite.Nil(params.IPColocationFactorWhitelist)
testSuite.Equal(params.BehaviourPenaltyWeight, float64(0))
testSuite.Equal(params.BehaviourPenaltyDecay, decay)
testSuite.Equal(params.DecayInterval, slot)
testSuite.Equal(params.DecayToZero, DecayToZero)
testSuite.Equal(params.RetainScore, oneHundredEpochs)
}
// TestParamsZeroBlockTime validates peer score params use default slot for 0 block time.
func (testSuite *PeerParamsTestSuite) TestParamsZeroBlockTime() {
slot := 2 * time.Second
params, err := GetPeerScoreParams("none", uint64(0))
testSuite.NoError(err)
testSuite.Equal(params.DecayInterval, slot)
params, err = GetPeerScoreParams("light", uint64(0))
testSuite.NoError(err)
testSuite.Equal(params.DecayInterval, slot)
}
package p2p
import (
log "github.com/ethereum/go-ethereum/log"
pubsub "github.com/libp2p/go-libp2p-pubsub"
peer "github.com/libp2p/go-libp2p/core/peer"
)
type scorer struct {
peerStore Peerstore
metricer GossipMetricer
log log.Logger
gater PeerGater
}
// Peerstore is a subset of the libp2p peerstore.Peerstore interface.
//
//go:generate mockery --name Peerstore --output mocks/
type Peerstore interface {
// PeerInfo returns a peer.PeerInfo struct for given peer.ID.
// This is a small slice of the information Peerstore has on
// that peer, useful to other services.
PeerInfo(peer.ID) peer.AddrInfo
// Peers returns all of the peer IDs stored across all inner stores.
Peers() peer.IDSlice
}
// Scorer is a peer scorer that scores peers based on application-specific metrics.
type Scorer interface {
OnConnect()
OnDisconnect()
SnapshotHook() pubsub.ExtendedPeerScoreInspectFn
}
// NewScorer returns a new peer scorer.
func NewScorer(peerGater PeerGater, peerStore Peerstore, metricer GossipMetricer, log log.Logger) Scorer {
return &scorer{
peerStore: peerStore,
metricer: metricer,
log: log,
gater: peerGater,
}
}
// SnapshotHook returns a function that is called periodically by the pubsub library to inspect the peer scores.
// It is passed into the pubsub library as a [pubsub.ExtendedPeerScoreInspectFn] in the [pubsub.WithPeerScoreInspect] option.
// The returned [pubsub.ExtendedPeerScoreInspectFn] is called with a mapping of peer IDs to peer score snapshots.
func (s *scorer) SnapshotHook() pubsub.ExtendedPeerScoreInspectFn {
return func(m map[peer.ID]*pubsub.PeerScoreSnapshot) {
for id, snap := range m {
// Record peer score in the metricer
s.metricer.RecordPeerScoring(id, snap.Score)
// Update with the peer gater
s.gater.Update(id, snap.Score)
}
}
}
// OnConnect is called when a peer connects.
// See [p2p.NotificationsMetricer] for invocation.
func (s *scorer) OnConnect() {
// no-op
}
// OnDisconnect is called when a peer disconnects.
// See [p2p.NotificationsMetricer] for invocation.
func (s *scorer) OnDisconnect() {
// no-op
}
package p2p_test
import (
"testing"
p2p "github.com/ethereum-optimism/optimism/op-node/p2p"
p2pMocks "github.com/ethereum-optimism/optimism/op-node/p2p/mocks"
"github.com/ethereum-optimism/optimism/op-node/testlog"
log "github.com/ethereum/go-ethereum/log"
pubsub "github.com/libp2p/go-libp2p-pubsub"
peer "github.com/libp2p/go-libp2p/core/peer"
suite "github.com/stretchr/testify/suite"
)
// PeerScorerTestSuite tests peer parameterization.
type PeerScorerTestSuite struct {
suite.Suite
// mockConnGater *p2pMocks.ConnectionGater
mockGater *p2pMocks.PeerGater
mockStore *p2pMocks.Peerstore
mockMetricer *p2pMocks.GossipMetricer
logger log.Logger
}
// SetupTest sets up the test suite.
func (testSuite *PeerScorerTestSuite) SetupTest() {
testSuite.mockGater = &p2pMocks.PeerGater{}
// testSuite.mockConnGater = &p2pMocks.ConnectionGater{}
testSuite.mockStore = &p2pMocks.Peerstore{}
testSuite.mockMetricer = &p2pMocks.GossipMetricer{}
testSuite.logger = testlog.Logger(testSuite.T(), log.LvlError)
}
// TestPeerScorer runs the PeerScorerTestSuite.
func TestPeerScorer(t *testing.T) {
suite.Run(t, new(PeerScorerTestSuite))
}
// TestPeerScorerOnConnect ensures we can call the OnConnect method on the peer scorer.
func (testSuite *PeerScorerTestSuite) TestPeerScorerOnConnect() {
scorer := p2p.NewScorer(
testSuite.mockGater,
testSuite.mockStore,
testSuite.mockMetricer,
testSuite.logger,
)
scorer.OnConnect()
}
// TestPeerScorerOnDisconnect ensures we can call the OnDisconnect method on the peer scorer.
func (testSuite *PeerScorerTestSuite) TestPeerScorerOnDisconnect() {
scorer := p2p.NewScorer(
testSuite.mockGater,
testSuite.mockStore,
testSuite.mockMetricer,
testSuite.logger,
)
scorer.OnDisconnect()
}
// TestSnapshotHook tests running the snapshot hook on the peer scorer.
func (testSuite *PeerScorerTestSuite) TestSnapshotHook() {
scorer := p2p.NewScorer(
testSuite.mockGater,
testSuite.mockStore,
testSuite.mockMetricer,
testSuite.logger,
)
inspectFn := scorer.SnapshotHook()
// Mock the snapshot updates
// This doesn't return anything
testSuite.mockMetricer.On("RecordPeerScoring", peer.ID("peer1"), float64(-100)).Return(nil)
// Mock the peer gater call
testSuite.mockGater.On("Update", peer.ID("peer1"), float64(-100)).Return(nil)
// Apply the snapshot
snapshotMap := map[peer.ID]*pubsub.PeerScoreSnapshot{
peer.ID("peer1"): {
Score: -100,
},
}
inspectFn(snapshotMap)
}
// TestSnapshotHookBlockPeer tests running the snapshot hook on the peer scorer with a peer score below the threshold.
// This implies that the peer should be blocked.
func (testSuite *PeerScorerTestSuite) TestSnapshotHookBlockPeer() {
scorer := p2p.NewScorer(
testSuite.mockGater,
testSuite.mockStore,
testSuite.mockMetricer,
testSuite.logger,
)
inspectFn := scorer.SnapshotHook()
// Mock the snapshot updates
// This doesn't return anything
testSuite.mockMetricer.On("RecordPeerScoring", peer.ID("peer1"), float64(-101)).Return(nil)
// Mock the peer gater call
testSuite.mockGater.On("Update", peer.ID("peer1"), float64(-101)).Return(nil)
// Apply the snapshot
snapshotMap := map[peer.ID]*pubsub.PeerScoreSnapshot{
peer.ID("peer1"): {
Score: -101,
},
}
inspectFn(snapshotMap)
}
package p2p
import (
log "github.com/ethereum/go-ethereum/log"
pubsub "github.com/libp2p/go-libp2p-pubsub"
host "github.com/libp2p/go-libp2p/core/host"
)
// ConfigurePeerScoring configures the peer scoring parameters for the pubsub
func ConfigurePeerScoring(h host.Host, g ConnectionGater, gossipConf GossipSetupConfigurables, m GossipMetricer, log log.Logger) []pubsub.Option {
// If we want to completely disable scoring config here, we can use the [peerScoringParams]
// to return early without returning any [pubsub.Option].
peerScoreParams := gossipConf.PeerScoringParams()
peerScoreThresholds := NewPeerScoreThresholds()
banEnabled := gossipConf.BanPeers()
peerGater := NewPeerGater(g, log, banEnabled)
scorer := NewScorer(peerGater, h.Peerstore(), m, log)
opts := []pubsub.Option{}
// Check the app specific score since libp2p doesn't export it's [validate] function :/
if peerScoreParams != nil && peerScoreParams.AppSpecificScore != nil {
opts = []pubsub.Option{
pubsub.WithPeerScore(peerScoreParams, &peerScoreThresholds),
pubsub.WithPeerScoreInspect(scorer.SnapshotHook(), peerScoreInspectFrequency),
}
} else {
log.Warn("Proceeding with no peer scoring...\nMissing AppSpecificScore in peer scoring params")
}
return opts
}
package p2p_test
import (
"context"
"fmt"
"math/rand"
"testing"
"time"
p2p "github.com/ethereum-optimism/optimism/op-node/p2p"
p2pMocks "github.com/ethereum-optimism/optimism/op-node/p2p/mocks"
testlog "github.com/ethereum-optimism/optimism/op-node/testlog"
mock "github.com/stretchr/testify/mock"
suite "github.com/stretchr/testify/suite"
log "github.com/ethereum/go-ethereum/log"
pubsub "github.com/libp2p/go-libp2p-pubsub"
host "github.com/libp2p/go-libp2p/core/host"
peer "github.com/libp2p/go-libp2p/core/peer"
bhost "github.com/libp2p/go-libp2p/p2p/host/blank"
tswarm "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
)
// PeerScoresTestSuite tests peer parameterization.
type PeerScoresTestSuite struct {
suite.Suite
mockGater *p2pMocks.ConnectionGater
mockStore *p2pMocks.Peerstore
mockMetricer *p2pMocks.GossipMetricer
logger log.Logger
}
// SetupTest sets up the test suite.
func (testSuite *PeerScoresTestSuite) SetupTest() {
testSuite.mockGater = &p2pMocks.ConnectionGater{}
testSuite.mockStore = &p2pMocks.Peerstore{}
testSuite.mockMetricer = &p2pMocks.GossipMetricer{}
testSuite.logger = testlog.Logger(testSuite.T(), log.LvlError)
}
// TestPeerScores runs the PeerScoresTestSuite.
func TestPeerScores(t *testing.T) {
suite.Run(t, new(PeerScoresTestSuite))
}
// getNetHosts generates a slice of hosts using the [libp2p/go-libp2p] library.
func getNetHosts(testSuite *PeerScoresTestSuite, ctx context.Context, n int) []host.Host {
var out []host.Host
for i := 0; i < n; i++ {
netw := tswarm.GenSwarm(testSuite.T())
h := bhost.NewBlankHost(netw)
testSuite.T().Cleanup(func() { h.Close() })
out = append(out, h)
}
return out
}
func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts []host.Host) []*pubsub.PubSub {
var psubs []*pubsub.PubSub
logger := testlog.Logger(testSuite.T(), log.LvlCrit)
// For each host, create a default gossipsub router.
for _, h := range hosts {
rt := pubsub.DefaultGossipSubRouter(h)
opts := []pubsub.Option{}
opts = append(opts, p2p.ConfigurePeerScoring(h, testSuite.mockGater, &p2p.Config{
PeerScoring: pubsub.PeerScoreParams{
AppSpecificScore: func(p peer.ID) float64 {
if p == hosts[0].ID() {
return -1000
} else {
return 0
}
},
AppSpecificWeight: 1,
DecayInterval: time.Second,
DecayToZero: 0.01,
},
}, testSuite.mockMetricer, logger)...)
ps, err := pubsub.NewGossipSubWithRouter(ctx, h, rt, opts...)
if err != nil {
panic(err)
}
psubs = append(psubs, ps)
}
return psubs
}
func connectHosts(t *testing.T, hosts []host.Host, d int) {
for i, a := range hosts {
for j := 0; j < d; j++ {
n := rand.Intn(len(hosts))
if n == i {
j--
continue
}
b := hosts[n]
pinfo := a.Peerstore().PeerInfo(a.ID())
err := b.Connect(context.Background(), pinfo)
if err != nil {
t.Fatal(err)
}
}
}
}
// TestNegativeScores tests blocking peers with negative scores.
//
// This follows the testing done in libp2p's gossipsub_test.go [TestGossipsubNegativeScore] function.
func (testSuite *PeerScoresTestSuite) TestNegativeScores() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testSuite.mockMetricer.On("RecordPeerScoring", mock.Anything, float64(0)).Return(nil)
testSuite.mockMetricer.On("RecordPeerScoring", mock.Anything, float64(-1000)).Return(nil)
testSuite.mockGater.On("ListBlockedPeers").Return([]peer.ID{})
// Construct 20 hosts using the [getNetHosts] function.
hosts := getNetHosts(testSuite, ctx, 20)
testSuite.Equal(20, len(hosts))
// Construct 20 gossipsub routers using the [newGossipSubs] function.
pubsubs := newGossipSubs(testSuite, ctx, hosts)
testSuite.Equal(20, len(pubsubs))
// Connect the hosts in a dense network
connectHosts(testSuite.T(), hosts, 10)
// Create subscriptions
var subs []*pubsub.Subscription
var topics []*pubsub.Topic
for _, ps := range pubsubs {
topic, err := ps.Join("test")
testSuite.NoError(err)
sub, err := topic.Subscribe()
testSuite.NoError(err)
subs = append(subs, sub)
topics = append(topics, topic)
}
// Wait and then publish messages
time.Sleep(3 * time.Second)
for i := 0; i < 20; i++ {
msg := []byte(fmt.Sprintf("message %d", i))
topic := topics[i]
err := topic.Publish(ctx, msg)
testSuite.NoError(err)
time.Sleep(20 * time.Millisecond)
}
// Allow gossip to propagate
time.Sleep(2 * time.Second)
// Collects all messages from a subscription
collectAll := func(sub *pubsub.Subscription) []*pubsub.Message {
var res []*pubsub.Message
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
for {
msg, err := sub.Next(ctx)
if err != nil {
break
}
res = append(res, msg)
}
return res
}
// Collect messages for the first host subscription
// This host should only receive 1 message from itself
count := len(collectAll(subs[0]))
testSuite.Equal(1, count)
// Validate that all messages were received from the first peer
for _, sub := range subs[1:] {
all := collectAll(sub)
for _, m := range all {
testSuite.NotEqual(hosts[0].ID(), m.ReceivedFrom)
}
}
}
...@@ -64,6 +64,18 @@ func (p *Prepared) ConfigureGossip(params *pubsub.GossipSubParams) []pubsub.Opti ...@@ -64,6 +64,18 @@ func (p *Prepared) ConfigureGossip(params *pubsub.GossipSubParams) []pubsub.Opti
return nil return nil
} }
func (p *Prepared) PeerScoringParams() *pubsub.PeerScoreParams {
return nil
}
func (p *Prepared) BanPeers() bool {
return false
}
func (p *Prepared) TopicScoringParams() *pubsub.TopicScoreParams {
return nil
}
func (p *Prepared) Disabled() bool { func (p *Prepared) Disabled() bool {
return false return false
} }
...@@ -30,9 +30,9 @@ import ( ...@@ -30,9 +30,9 @@ import (
// - banning peers based on score // - banning peers based on score
var ( var (
DisabledDiscovery = errors.New("discovery disabled") ErrDisabledDiscovery = errors.New("discovery disabled")
NoConnectionManager = errors.New("no connection manager") ErrNoConnectionManager = errors.New("no connection manager")
NoConnectionGater = errors.New("no connection gater") ErrNoConnectionGater = errors.New("no connection gater")
) )
type Node interface { type Node interface {
...@@ -236,7 +236,7 @@ func (s *APIBackend) DiscoveryTable(_ context.Context) ([]*enode.Node, error) { ...@@ -236,7 +236,7 @@ func (s *APIBackend) DiscoveryTable(_ context.Context) ([]*enode.Node, error) {
if dv5 := s.node.Dv5Udp(); dv5 != nil { if dv5 := s.node.Dv5Udp(); dv5 != nil {
return dv5.AllNodes(), nil return dv5.AllNodes(), nil
} else { } else {
return nil, DisabledDiscovery return nil, ErrDisabledDiscovery
} }
} }
...@@ -244,7 +244,7 @@ func (s *APIBackend) BlockPeer(_ context.Context, p peer.ID) error { ...@@ -244,7 +244,7 @@ func (s *APIBackend) BlockPeer(_ context.Context, p peer.ID) error {
recordDur := s.m.RecordRPCServerRequest("opp2p_blockPeer") recordDur := s.m.RecordRPCServerRequest("opp2p_blockPeer")
defer recordDur() defer recordDur()
if gater := s.node.ConnectionGater(); gater == nil { if gater := s.node.ConnectionGater(); gater == nil {
return NoConnectionGater return ErrNoConnectionGater
} else { } else {
return gater.BlockPeer(p) return gater.BlockPeer(p)
} }
...@@ -254,7 +254,7 @@ func (s *APIBackend) UnblockPeer(_ context.Context, p peer.ID) error { ...@@ -254,7 +254,7 @@ func (s *APIBackend) UnblockPeer(_ context.Context, p peer.ID) error {
recordDur := s.m.RecordRPCServerRequest("opp2p_unblockPeer") recordDur := s.m.RecordRPCServerRequest("opp2p_unblockPeer")
defer recordDur() defer recordDur()
if gater := s.node.ConnectionGater(); gater == nil { if gater := s.node.ConnectionGater(); gater == nil {
return NoConnectionGater return ErrNoConnectionGater
} else { } else {
return gater.UnblockPeer(p) return gater.UnblockPeer(p)
} }
...@@ -264,7 +264,7 @@ func (s *APIBackend) ListBlockedPeers(_ context.Context) ([]peer.ID, error) { ...@@ -264,7 +264,7 @@ func (s *APIBackend) ListBlockedPeers(_ context.Context) ([]peer.ID, error) {
recordDur := s.m.RecordRPCServerRequest("opp2p_listBlockedPeers") recordDur := s.m.RecordRPCServerRequest("opp2p_listBlockedPeers")
defer recordDur() defer recordDur()
if gater := s.node.ConnectionGater(); gater == nil { if gater := s.node.ConnectionGater(); gater == nil {
return nil, NoConnectionGater return nil, ErrNoConnectionGater
} else { } else {
return gater.ListBlockedPeers(), nil return gater.ListBlockedPeers(), nil
} }
...@@ -276,7 +276,7 @@ func (s *APIBackend) BlockAddr(_ context.Context, ip net.IP) error { ...@@ -276,7 +276,7 @@ func (s *APIBackend) BlockAddr(_ context.Context, ip net.IP) error {
recordDur := s.m.RecordRPCServerRequest("opp2p_blockAddr") recordDur := s.m.RecordRPCServerRequest("opp2p_blockAddr")
defer recordDur() defer recordDur()
if gater := s.node.ConnectionGater(); gater == nil { if gater := s.node.ConnectionGater(); gater == nil {
return NoConnectionGater return ErrNoConnectionGater
} else { } else {
return gater.BlockAddr(ip) return gater.BlockAddr(ip)
} }
...@@ -286,7 +286,7 @@ func (s *APIBackend) UnblockAddr(_ context.Context, ip net.IP) error { ...@@ -286,7 +286,7 @@ func (s *APIBackend) UnblockAddr(_ context.Context, ip net.IP) error {
recordDur := s.m.RecordRPCServerRequest("opp2p_unblockAddr") recordDur := s.m.RecordRPCServerRequest("opp2p_unblockAddr")
defer recordDur() defer recordDur()
if gater := s.node.ConnectionGater(); gater == nil { if gater := s.node.ConnectionGater(); gater == nil {
return NoConnectionGater return ErrNoConnectionGater
} else { } else {
return gater.UnblockAddr(ip) return gater.UnblockAddr(ip)
} }
...@@ -296,7 +296,7 @@ func (s *APIBackend) ListBlockedAddrs(_ context.Context) ([]net.IP, error) { ...@@ -296,7 +296,7 @@ func (s *APIBackend) ListBlockedAddrs(_ context.Context) ([]net.IP, error) {
recordDur := s.m.RecordRPCServerRequest("opp2p_listBlockedAddrs") recordDur := s.m.RecordRPCServerRequest("opp2p_listBlockedAddrs")
defer recordDur() defer recordDur()
if gater := s.node.ConnectionGater(); gater == nil { if gater := s.node.ConnectionGater(); gater == nil {
return nil, NoConnectionGater return nil, ErrNoConnectionGater
} else { } else {
return gater.ListBlockedAddrs(), nil return gater.ListBlockedAddrs(), nil
} }
...@@ -308,7 +308,7 @@ func (s *APIBackend) BlockSubnet(_ context.Context, ipnet *net.IPNet) error { ...@@ -308,7 +308,7 @@ func (s *APIBackend) BlockSubnet(_ context.Context, ipnet *net.IPNet) error {
recordDur := s.m.RecordRPCServerRequest("opp2p_blockSubnet") recordDur := s.m.RecordRPCServerRequest("opp2p_blockSubnet")
defer recordDur() defer recordDur()
if gater := s.node.ConnectionGater(); gater == nil { if gater := s.node.ConnectionGater(); gater == nil {
return NoConnectionGater return ErrNoConnectionGater
} else { } else {
return gater.BlockSubnet(ipnet) return gater.BlockSubnet(ipnet)
} }
...@@ -318,7 +318,7 @@ func (s *APIBackend) UnblockSubnet(_ context.Context, ipnet *net.IPNet) error { ...@@ -318,7 +318,7 @@ func (s *APIBackend) UnblockSubnet(_ context.Context, ipnet *net.IPNet) error {
recordDur := s.m.RecordRPCServerRequest("opp2p_unblockSubnet") recordDur := s.m.RecordRPCServerRequest("opp2p_unblockSubnet")
defer recordDur() defer recordDur()
if gater := s.node.ConnectionGater(); gater == nil { if gater := s.node.ConnectionGater(); gater == nil {
return NoConnectionGater return ErrNoConnectionGater
} else { } else {
return gater.UnblockSubnet(ipnet) return gater.UnblockSubnet(ipnet)
} }
...@@ -328,7 +328,7 @@ func (s *APIBackend) ListBlockedSubnets(_ context.Context) ([]*net.IPNet, error) ...@@ -328,7 +328,7 @@ func (s *APIBackend) ListBlockedSubnets(_ context.Context) ([]*net.IPNet, error)
recordDur := s.m.RecordRPCServerRequest("opp2p_listBlockedSubnets") recordDur := s.m.RecordRPCServerRequest("opp2p_listBlockedSubnets")
defer recordDur() defer recordDur()
if gater := s.node.ConnectionGater(); gater == nil { if gater := s.node.ConnectionGater(); gater == nil {
return nil, NoConnectionGater return nil, ErrNoConnectionGater
} else { } else {
return gater.ListBlockedSubnets(), nil return gater.ListBlockedSubnets(), nil
} }
...@@ -338,7 +338,7 @@ func (s *APIBackend) ProtectPeer(_ context.Context, p peer.ID) error { ...@@ -338,7 +338,7 @@ func (s *APIBackend) ProtectPeer(_ context.Context, p peer.ID) error {
recordDur := s.m.RecordRPCServerRequest("opp2p_protectPeer") recordDur := s.m.RecordRPCServerRequest("opp2p_protectPeer")
defer recordDur() defer recordDur()
if manager := s.node.ConnectionManager(); manager == nil { if manager := s.node.ConnectionManager(); manager == nil {
return NoConnectionManager return ErrNoConnectionManager
} else { } else {
manager.Protect(p, "api-protected") manager.Protect(p, "api-protected")
return nil return nil
...@@ -349,7 +349,7 @@ func (s *APIBackend) UnprotectPeer(_ context.Context, p peer.ID) error { ...@@ -349,7 +349,7 @@ func (s *APIBackend) UnprotectPeer(_ context.Context, p peer.ID) error {
recordDur := s.m.RecordRPCServerRequest("opp2p_unprotectPeer") recordDur := s.m.RecordRPCServerRequest("opp2p_unprotectPeer")
defer recordDur() defer recordDur()
if manager := s.node.ConnectionManager(); manager == nil { if manager := s.node.ConnectionManager(); manager == nil {
return NoConnectionManager return ErrNoConnectionManager
} else { } else {
manager.Unprotect(p, "api-protected") manager.Unprotect(p, "api-protected")
return nil return nil
......
package p2p
import (
"fmt"
"time"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)
// MeshWeight is the weight of the mesh delivery topic.
const MeshWeight = -0.7
// MaxInMeshScore is the maximum score for being in the mesh.
const MaxInMeshScore = 10
// DecayEpoch is the number of epochs to decay the score over.
const DecayEpoch = time.Duration(5)
// LightTopicScoreParams is a default instantiation of [pubsub.TopicScoreParams].
// See [TopicScoreParams] for detailed documentation.
//
// [TopicScoreParams]: https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub@v0.8.1#TopicScoreParams
var LightTopicScoreParams = func(blockTime uint64) pubsub.TopicScoreParams {
slot := time.Duration(blockTime) * time.Second
if slot == 0 {
slot = 2 * time.Second
}
epoch := 6 * slot
invalidDecayPeriod := 50 * epoch
return pubsub.TopicScoreParams{
TopicWeight: 0.8,
TimeInMeshWeight: MaxInMeshScore / inMeshCap(slot),
TimeInMeshQuantum: slot,
TimeInMeshCap: inMeshCap(slot),
FirstMessageDeliveriesWeight: 1,
FirstMessageDeliveriesDecay: ScoreDecay(20*epoch, slot),
FirstMessageDeliveriesCap: 23,
MeshMessageDeliveriesWeight: MeshWeight,
MeshMessageDeliveriesDecay: ScoreDecay(DecayEpoch*epoch, slot),
MeshMessageDeliveriesCap: float64(uint64(epoch/slot) * uint64(DecayEpoch)),
MeshMessageDeliveriesThreshold: float64(uint64(epoch/slot) * uint64(DecayEpoch) / 10),
MeshMessageDeliveriesWindow: 2 * time.Second,
MeshMessageDeliveriesActivation: 4 * epoch,
MeshFailurePenaltyWeight: MeshWeight,
MeshFailurePenaltyDecay: ScoreDecay(DecayEpoch*epoch, slot),
InvalidMessageDeliveriesWeight: -140.4475,
InvalidMessageDeliveriesDecay: ScoreDecay(invalidDecayPeriod, slot),
}
}
// the cap for `inMesh` time scoring.
func inMeshCap(slot time.Duration) float64 {
return float64((3600 * time.Second) / slot)
}
// DisabledTopicScoreParams is an instantiation of [pubsub.TopicScoreParams] where all scoring is disabled.
// See [TopicScoreParams] for detailed documentation.
//
// [TopicScoreParams]: https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub@v0.8.1#TopicScoreParams
var DisabledTopicScoreParams = func(blockTime uint64) pubsub.TopicScoreParams {
slot := time.Duration(blockTime) * time.Second
if slot == 0 {
slot = 2 * time.Second
}
epoch := 6 * slot
invalidDecayPeriod := 50 * epoch
return pubsub.TopicScoreParams{
TopicWeight: 0, // disabled
TimeInMeshWeight: 0, // disabled
TimeInMeshQuantum: slot,
TimeInMeshCap: inMeshCap(slot),
FirstMessageDeliveriesWeight: 0, // disabled
FirstMessageDeliveriesDecay: ScoreDecay(20*epoch, slot),
FirstMessageDeliveriesCap: 23,
MeshMessageDeliveriesWeight: 0, // disabled
MeshMessageDeliveriesDecay: ScoreDecay(DecayEpoch*epoch, slot),
MeshMessageDeliveriesCap: float64(uint64(epoch/slot) * uint64(DecayEpoch)),
MeshMessageDeliveriesThreshold: float64(uint64(epoch/slot) * uint64(DecayEpoch) / 10),
MeshMessageDeliveriesWindow: 2 * time.Second,
MeshMessageDeliveriesActivation: 4 * epoch,
MeshFailurePenaltyWeight: 0, // disabled
MeshFailurePenaltyDecay: ScoreDecay(DecayEpoch*epoch, slot),
InvalidMessageDeliveriesWeight: 0, // disabled
InvalidMessageDeliveriesDecay: ScoreDecay(invalidDecayPeriod, slot),
}
}
// TopicScoreParamsByName is a map of name to [pubsub.TopicScoreParams].
var TopicScoreParamsByName = map[string](func(blockTime uint64) pubsub.TopicScoreParams){
"light": LightTopicScoreParams,
"none": DisabledTopicScoreParams,
}
// AvailableTopicScoreParams returns a list of available topic score params.
// These can be used as an input to [GetTopicScoreParams] which returns the
// corresponding [pubsub.TopicScoreParams].
func AvailableTopicScoreParams() []string {
var params []string
for name := range TopicScoreParamsByName {
params = append(params, name)
}
return params
}
// GetTopicScoreParams returns the [pubsub.TopicScoreParams] for the given name.
func GetTopicScoreParams(name string, blockTime uint64) (pubsub.TopicScoreParams, error) {
params, ok := TopicScoreParamsByName[name]
if !ok {
return pubsub.TopicScoreParams{}, fmt.Errorf("invalid topic params %s", name)
}
return params(blockTime), nil
}
...@@ -46,7 +46,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { ...@@ -46,7 +46,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
return nil, fmt.Errorf("failed to load p2p signer: %w", err) return nil, fmt.Errorf("failed to load p2p signer: %w", err)
} }
p2pConfig, err := p2pcli.NewConfig(ctx) p2pConfig, err := p2pcli.NewConfig(ctx, rollupConfig.BlockTime)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to load p2p config: %w", err) return nil, fmt.Errorf("failed to load p2p config: %w", err)
} }
......
...@@ -115,7 +115,6 @@ func (ibc *IterativeBatchCall[K, V]) Fetch(ctx context.Context) error { ...@@ -115,7 +115,6 @@ func (ibc *IterativeBatchCall[K, V]) Fetch(ctx context.Context) error {
} }
return ctx.Err() return ctx.Err()
default: default:
break
} }
break break
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment