Commit f345b4aa authored by Andreas Bigger's avatar Andreas Bigger

fixed issues

parent 2055f3bc
......@@ -34,6 +34,15 @@ var (
Value: "none",
EnvVar: p2pEnv("PEER_SCORING"),
}
// Banning Flag - whether or not we want to act on the scoring
Banning = cli.BoolFlag{
Name: "p2p.ban.peers",
Usage: "Enables peer banning. This should ONLY be enabled once certain peer scoring is working correctly.",
Required: false,
EnvVar: p2pEnv("PEER_BANNING"),
}
TopicScoring = cli.StringFlag{
Name: "p2p.scoring.topics",
Usage: "Sets the topic scoring strategy. " +
......
......@@ -58,6 +58,10 @@ func NewConfig(ctx *cli.Context, blockTime uint64) (*p2p.Config, error) {
return nil, fmt.Errorf("failed to load p2p peer scoring options: %w", err)
}
if err := loadBanningOption(conf, ctx); err != nil {
return nil, fmt.Errorf("failed to load banning option: %w", err)
}
if err := loadTopicScoringParams(conf, ctx, blockTime); err != nil {
return nil, fmt.Errorf("failed to load p2p topic scoring options: %w", err)
}
......@@ -82,39 +86,45 @@ func validatePort(p uint) (uint16, error) {
}
// loadTopicScoringParams loads the topic scoring options from the CLI context.
//
// If the topic scoring options are not set, then the default topic scoring.
func loadTopicScoringParams(conf *p2p.Config, ctx *cli.Context, blockTime uint64) error {
scoringLevel := ctx.GlobalString(flags.TopicScoring.Name)
if scoringLevel == "" {
scoringLevel = "none"
}
// Set default block topic scoring parameters
// See prysm: https://github.com/prysmaticlabs/prysm/blob/develop/beacon-chain/p2p/gossip_scoring_params.go
// And research from lighthouse: https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c
// And docs: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#topic-parameter-calculation-and-decay
topicScoreParams, err := p2p.GetTopicScoreParams(scoringLevel, blockTime)
if err != nil {
return err
if scoringLevel != "" {
// Set default block topic scoring parameters
// See prysm: https://github.com/prysmaticlabs/prysm/blob/develop/beacon-chain/p2p/gossip_scoring_params.go
// And research from lighthouse: https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c
// And docs: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#topic-parameter-calculation-and-decay
topicScoreParams, err := p2p.GetTopicScoreParams(scoringLevel, blockTime)
if err != nil {
return err
}
conf.TopicScoring = topicScoreParams
}
conf.TopicScoring = topicScoreParams
return nil
}
// loadPeerScoringParams loads the scoring options from the CLI context.
//
// If the scoring level is not set, no scoring is enabled.
func loadPeerScoringParams(conf *p2p.Config, ctx *cli.Context, blockTime uint64) error {
scoringLevel := ctx.GlobalString(flags.PeerScoring.Name)
if scoringLevel == "" {
scoringLevel = "none"
if scoringLevel != "" {
peerScoreParams, err := p2p.GetPeerScoreParams(scoringLevel, blockTime)
if err != nil {
return err
}
conf.PeerScoring = peerScoreParams
}
peerScoreParams, err := p2p.GetPeerScoreParams(scoringLevel, blockTime)
if err != nil {
return err
}
return nil
}
conf.PeerScoring = peerScoreParams
// loadBanningOption loads whether or not to ban peers from the CLI context.
func loadBanningOption(conf *p2p.Config, ctx *cli.Context) error {
ban := ctx.GlobalBool(flags.Banning.Name)
conf.BanningEnabled = ban
return nil
}
......
......@@ -54,6 +54,9 @@ type Config struct {
PeerScoring pubsub.PeerScoreParams
TopicScoring pubsub.TopicScoreParams
// Whether to ban peers based on their [PeerScoring] score.
BanningEnabled bool
ListenIP net.IP
ListenTCPPort uint16
......@@ -148,6 +151,10 @@ func (conf *Config) PeerScoringParams() *pubsub.PeerScoreParams {
return &conf.PeerScoring
}
func (conf *Config) BanPeers() bool {
return conf.BanningEnabled
}
func (conf *Config) TopicScoringParams() *pubsub.TopicScoreParams {
return &conf.TopicScoring
}
......
......@@ -53,6 +53,7 @@ var MessageDomainValidSnappy = [4]byte{1, 0, 0, 0}
type GossipSetupConfigurables interface {
PeerScoringParams() *pubsub.PeerScoreParams
TopicScoringParams() *pubsub.TopicScoreParams
BanPeers() bool
ConfigureGossip(params *pubsub.GossipSubParams) []pubsub.Option
}
......@@ -457,8 +458,13 @@ func JoinGossip(p2pCtx context.Context, self peer.ID, topicScoreParams *pubsub.T
}
go LogTopicEvents(p2pCtx, log.New("topic", "blocks"), blocksTopicEvents)
if err = blocksTopic.SetScoreParams(topicScoreParams); err != nil {
return nil, fmt.Errorf("failed to set topic score params: %w", err)
// A [TimeInMeshQuantum] value of 0 means the topic score is disabled.
// If we passed a topicScoreParams with [TimeInMeshQuantum] set to 0,
// libp2p errors since the params will be rejected.
if topicScoreParams != nil && topicScoreParams.TimeInMeshQuantum != 0 {
if err = blocksTopic.SetScoreParams(topicScoreParams); err != nil {
return nil, fmt.Errorf("failed to set topic score params: %w", err)
}
}
subscription, err := blocksTopic.Subscribe()
......
// Code generated by mockery v2.14.0. DO NOT EDIT.
package mocks
import (
mock "github.com/stretchr/testify/mock"
peer "github.com/libp2p/go-libp2p/core/peer"
)
// PeerGater is an autogenerated mock type for the PeerGater type
type PeerGater struct {
mock.Mock
}
// Update provides a mock function with given fields: _a0, _a1
func (_m *PeerGater) Update(_a0 peer.ID, _a1 float64) {
_m.Called(_a0, _a1)
}
type mockConstructorTestingTNewPeerGater interface {
mock.TestingT
Cleanup(func())
}
// NewPeerGater creates a new instance of PeerGater. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewPeerGater(t mockConstructorTestingTNewPeerGater) *PeerGater {
mock := &PeerGater{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
......@@ -76,11 +76,11 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
// notify of any new connections/streams/etc.
n.host.Network().Notify(NewNetworkNotifier(log, metrics))
// note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled.
fmt.Printf("Constructing gossip sub...\n")
n.gs, err = NewGossipSub(resourcesCtx, n.host, n.gater, rollupCfg, setup, metrics, log)
if err != nil {
return fmt.Errorf("failed to start gossipsub router: %w", err)
}
n.gsOut, err = JoinGossip(resourcesCtx, n.host.ID(), setup.TopicScoringParams(), n.gs, log, rollupCfg, runCfg, gossipIn)
if err != nil {
return fmt.Errorf("failed to join blocks gossip topic: %w", err)
......
package p2p
import (
slices "golang.org/x/exp/slices"
log "github.com/ethereum/go-ethereum/log"
peer "github.com/libp2p/go-libp2p/core/peer"
)
// ConnectionFactor is the factor by which we multiply the connection score.
const ConnectionFactor = -10
// PeerScoreThreshold is the threshold at which we block a peer.
const PeerScoreThreshold = -100
// gater is an internal implementation of the [PeerGater] interface.
type gater struct {
connGater ConnectionGater
log log.Logger
banEnabled bool
}
// PeerGater manages the connection gating of peers.
//
//go:generate mockery --name PeerGater --output mocks/
type PeerGater interface {
// Update handles a peer score update and blocks/unblocks the peer if necessary.
Update(peer.ID, float64)
}
// NewPeerGater returns a new peer gater.
func NewPeerGater(connGater ConnectionGater, log log.Logger, banEnabled bool) PeerGater {
return &gater{
connGater: connGater,
log: log,
banEnabled: banEnabled,
}
}
// Update handles a peer score update and blocks/unblocks the peer if necessary.
func (s *gater) Update(id peer.ID, score float64) {
// Check if the peer score is below the threshold
// If so, we need to block the peer
if score < PeerScoreThreshold && s.banEnabled {
s.log.Warn("peer blocking enabled, blocking peer", "id", id.String(), "score", score)
err := s.connGater.BlockPeer(id)
s.log.Warn("connection gater failed to block peer", id.String(), "err", err)
}
// TODO: This will never unblock a peer since the peer will disconnect and the score update hook will not trigger this call.
// Unblock peers whose score has recovered to an acceptable level
if (score > PeerScoreThreshold) && slices.Contains(s.connGater.ListBlockedPeers(), id) {
err := s.connGater.UnblockPeer(id)
s.log.Warn("connection gater failed to unblock peer", id.String(), "err", err)
}
}
package p2p_test
import (
"testing"
peer "github.com/libp2p/go-libp2p/core/peer"
p2p "github.com/ethereum-optimism/optimism/op-node/p2p"
log "github.com/ethereum/go-ethereum/log"
node "github.com/ethereum-optimism/optimism/op-node/node"
suite "github.com/stretchr/testify/suite"
p2pMocks "github.com/ethereum-optimism/optimism/op-node/p2p/mocks"
)
// PeerGaterTestSuite tests peer parameterization.
type PeerGaterTestSuite struct {
suite.Suite
mockGater *p2pMocks.ConnectionGater
logger log.Logger
}
// SetupTest sets up the test suite.
func (testSuite *PeerGaterTestSuite) SetupTest() {
testSuite.mockGater = &p2pMocks.ConnectionGater{}
logger := node.DefaultLogConfig()
testSuite.logger = logger.NewLogger()
}
// TestPeerGater runs the PeerGaterTestSuite.
func TestPeerGater(t *testing.T) {
suite.Run(t, new(PeerGaterTestSuite))
}
// TestPeerScoreConstants validates the peer score constants.
func (testSuite *PeerGaterTestSuite) TestPeerScoreConstants() {
testSuite.Equal(-10, p2p.ConnectionFactor)
testSuite.Equal(-100, p2p.PeerScoreThreshold)
}
// TestPeerGaterUpdate tests the peer gater update hook.
func (testSuite *PeerGaterTestSuite) TestPeerGaterUpdate() {
gater := p2p.NewPeerGater(
testSuite.mockGater,
testSuite.logger,
true,
)
// Mock a connection gater peer block call
// Since the peer score is below the [PeerScoreThreshold] of -100,
// the [BlockPeer] method should be called
testSuite.mockGater.On("BlockPeer", peer.ID("peer1")).Return(nil)
// Apply the peer gater update
gater.Update(peer.ID("peer1"), float64(-100))
}
// TestPeerGaterUpdateNoBanning tests the peer gater update hook without banning set
func (testSuite *PeerGaterTestSuite) TestPeerGaterUpdateNoBanning() {
gater := p2p.NewPeerGater(
testSuite.mockGater,
testSuite.logger,
false,
)
// Notice: [BlockPeer] should not be called since banning is not enabled
gater.Update(peer.ID("peer1"), float64(-100000))
}
package p2p
import (
"github.com/ethereum/go-ethereum/log"
log "github.com/ethereum/go-ethereum/log"
peer "github.com/libp2p/go-libp2p/core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
// "github.com/libp2p/go-libp2p/core/peerstore"
"golang.org/x/exp/slices"
)
const ConnectionFactor = -10
const PeerScoreThreshold = -100
type scorer struct {
connGater ConnectionGater
peerStore Peerstore
metricer GossipMetricer
log log.Logger
gater PeerGater
}
// Peerstore is a subset of the libp2p peerstore.Peerstore interface.
......@@ -41,12 +34,12 @@ type Scorer interface {
}
// NewScorer returns a new peer scorer.
func NewScorer(connGater ConnectionGater, peerStore Peerstore, metricer GossipMetricer, log log.Logger) Scorer {
func NewScorer(peerGater PeerGater, peerStore Peerstore, metricer GossipMetricer, log log.Logger) Scorer {
return &scorer{
connGater: connGater,
peerStore: peerStore,
metricer: metricer,
log: log,
gater: peerGater,
}
}
......@@ -54,49 +47,25 @@ func NewScorer(connGater ConnectionGater, peerStore Peerstore, metricer GossipMe
// It is passed into the pubsub library as a [pubsub.ExtendedPeerScoreInspectFn] in the [pubsub.WithPeerScoreInspect] option.
// The returned [pubsub.ExtendedPeerScoreInspectFn] is called with a mapping of peer IDs to peer score snapshots.
func (s *scorer) SnapshotHook() pubsub.ExtendedPeerScoreInspectFn {
// peer := s.peerStore.Get(peer.ID)
// loop through each peer ID, get the score
// if the score < the configured threshold, ban the peer
// factor in the number of connections/disconnections into the score
// e.g., score = score - (s.peerConnections[peerID] * ConnectionFactor)
// s.connGater.BanAddr(peerID)
return func(m map[peer.ID]*pubsub.PeerScoreSnapshot) {
for id, snap := range m {
// Record peer score in the metricer
s.metricer.RecordPeerScoring(id, snap.Score)
// TODO: if we don't have to do this calculation here, we can push score updates to the metricer
// TODO: which would leave the scoring to the pubsub lib
// peer, err := s.peerStore.Get(id)
// if err != nil {
// }
// Check if the peer score is below the threshold
// If so, we need to block the peer
if snap.Score < PeerScoreThreshold {
err := s.connGater.BlockPeer(id)
s.log.Warn("connection gater failed to block peer", id.String(), "err", err)
}
// Unblock peers whose score has recovered to an acceptable level
if (snap.Score > PeerScoreThreshold) && slices.Contains(s.connGater.ListBlockedPeers(), id) {
err := s.connGater.UnblockPeer(id)
s.log.Warn("connection gater failed to unblock peer", id.String(), "err", err)
}
// Update with the peer gater
s.gater.Update(id, snap.Score)
}
}
}
// TODO: call the two methods below from the notifier
// OnConnect is called when a peer connects.
// See [p2p.NotificationsMetricer] for invocation.
func (s *scorer) OnConnect() {
// record a connection
// no-op
}
// OnDisconnect is called when a peer disconnects.
// See [p2p.NotificationsMetricer] for invocation.
func (s *scorer) OnDisconnect() {
// record a disconnection
// no-op
}
......@@ -16,7 +16,8 @@ import (
type PeerScorerTestSuite struct {
suite.Suite
mockGater *p2pMocks.ConnectionGater
// mockConnGater *p2pMocks.ConnectionGater
mockGater *p2pMocks.PeerGater
mockStore *p2pMocks.Peerstore
mockMetricer *p2pMocks.GossipMetricer
logger log.Logger
......@@ -24,7 +25,8 @@ type PeerScorerTestSuite struct {
// SetupTest sets up the test suite.
func (testSuite *PeerScorerTestSuite) SetupTest() {
testSuite.mockGater = &p2pMocks.ConnectionGater{}
testSuite.mockGater = &p2pMocks.PeerGater{}
// testSuite.mockConnGater = &p2pMocks.ConnectionGater{}
testSuite.mockStore = &p2pMocks.Peerstore{}
testSuite.mockMetricer = &p2pMocks.GossipMetricer{}
logger := node.DefaultLogConfig()
......@@ -36,12 +38,6 @@ func TestPeerScorer(t *testing.T) {
suite.Run(t, new(PeerScorerTestSuite))
}
// TestPeerScoreConstants validates the peer score constants.
func (testSuite *PeerScorerTestSuite) TestPeerScoreConstants() {
testSuite.Equal(-10, p2p.ConnectionFactor)
testSuite.Equal(-100, p2p.PeerScoreThreshold)
}
// TestPeerScorerOnConnect ensures we can call the OnConnect method on the peer scorer.
func (testSuite *PeerScorerTestSuite) TestPeerScorerOnConnect() {
scorer := p2p.NewScorer(
......@@ -78,8 +74,8 @@ func (testSuite *PeerScorerTestSuite) TestSnapshotHook() {
// This doesn't return anything
testSuite.mockMetricer.On("RecordPeerScoring", peer.ID("peer1"), float64(-100)).Return(nil)
// Since the peer score is not below the [PeerScoreThreshold] of -100,
// no connection gater method should be called since the peer isn't already blocked
// Mock the peer gater call
testSuite.mockGater.On("Update", peer.ID("peer1"), float64(-100)).Return(nil)
// Apply the snapshot
snapshotMap := map[peer.ID]*pubsub.PeerScoreSnapshot{
......@@ -105,10 +101,8 @@ func (testSuite *PeerScorerTestSuite) TestSnapshotHookBlockPeer() {
// This doesn't return anything
testSuite.mockMetricer.On("RecordPeerScoring", peer.ID("peer1"), float64(-101)).Return(nil)
// Mock a connection gater peer block call
// Since the peer score is below the [PeerScoreThreshold] of -100,
// the [BlockPeer] method should be called
testSuite.mockGater.On("BlockPeer", peer.ID("peer1")).Return(nil)
// Mock the peer gater call
testSuite.mockGater.On("Update", peer.ID("peer1"), float64(-101)).Return(nil)
// Apply the snapshot
snapshotMap := map[peer.ID]*pubsub.PeerScoreSnapshot{
......
......@@ -12,10 +12,18 @@ func ConfigurePeerScoring(h host.Host, g ConnectionGater, gossipConf GossipSetup
// to return early without returning any [pubsub.Option].
peerScoreParams := gossipConf.PeerScoringParams()
peerScoreThresholds := NewPeerScoreThresholds()
scorer := NewScorer(g, h.Peerstore(), m, log)
opts := []pubsub.Option{
pubsub.WithPeerScore(peerScoreParams, &peerScoreThresholds),
pubsub.WithPeerScoreInspect(scorer.SnapshotHook(), peerScoreInspectFrequency),
banEnabled := gossipConf.BanPeers()
peerGater := NewPeerGater(g, log, banEnabled)
scorer := NewScorer(peerGater, h.Peerstore(), m, log)
opts := []pubsub.Option{}
// Check the app specific score since libp2p doesn't export it's [validate] function :/
if peerScoreParams.AppSpecificScore != nil {
opts = []pubsub.Option{
pubsub.WithPeerScore(peerScoreParams, &peerScoreThresholds),
pubsub.WithPeerScoreInspect(scorer.SnapshotHook(), peerScoreInspectFrequency),
}
} else {
log.Warn("Proceeding with no peer scoring...\nMissing AppSpecificScore in peer scoring params")
}
return opts
}
package p2p_test
import (
"fmt"
"time"
"math/rand"
"context"
"testing"
p2p "github.com/ethereum-optimism/optimism/op-node/p2p"
node "github.com/ethereum-optimism/optimism/op-node/node"
testlog "github.com/ethereum-optimism/optimism/op-node/testlog"
p2pMocks "github.com/ethereum-optimism/optimism/op-node/p2p/mocks"
mock "github.com/stretchr/testify/mock"
suite "github.com/stretchr/testify/suite"
log "github.com/ethereum/go-ethereum/log"
peer "github.com/libp2p/go-libp2p/core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
host "github.com/libp2p/go-libp2p/core/host"
bhost "github.com/libp2p/go-libp2p/p2p/host/blank"
tswarm "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
)
// PeerScoresTestSuite tests peer parameterization.
type PeerScoresTestSuite struct {
suite.Suite
mockGater *p2pMocks.ConnectionGater
mockStore *p2pMocks.Peerstore
mockMetricer *p2pMocks.GossipMetricer
logger log.Logger
}
// SetupTest sets up the test suite.
func (testSuite *PeerScoresTestSuite) SetupTest() {
testSuite.mockGater = &p2pMocks.ConnectionGater{}
testSuite.mockStore = &p2pMocks.Peerstore{}
testSuite.mockMetricer = &p2pMocks.GossipMetricer{}
logger := node.DefaultLogConfig()
testSuite.logger = logger.NewLogger()
}
// TestPeerScores runs the PeerScoresTestSuite.
func TestPeerScores(t *testing.T) {
suite.Run(t, new(PeerScoresTestSuite))
}
// getNetHosts generates a slice of hosts using the [libp2p/go-libp2p] library.
func getNetHosts(testSuite *PeerScoresTestSuite, ctx context.Context, n int) []host.Host {
var out []host.Host
for i := 0; i < n; i++ {
netw := tswarm.GenSwarm(testSuite.T())
h := bhost.NewBlankHost(netw)
testSuite.T().Cleanup(func() { h.Close() })
out = append(out, h)
}
return out
}
func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts []host.Host) []*pubsub.PubSub {
var psubs []*pubsub.PubSub
logger := testlog.Logger(testSuite.T(), log.LvlCrit)
// For each host, create a default gossipsub router.
for _, h := range hosts {
rt := pubsub.DefaultGossipSubRouter(h)
opts := []pubsub.Option{}
opts = append(opts, p2p.ConfigurePeerScoring(h, testSuite.mockGater, &p2p.Config{
PeerScoring: pubsub.PeerScoreParams{
AppSpecificScore: func(p peer.ID) float64 {
if p == hosts[0].ID() {
return -1000
} else {
return 0
}
},
AppSpecificWeight: 1,
DecayInterval: time.Second,
DecayToZero: 0.01,
},
}, testSuite.mockMetricer, logger)...)
ps, err := pubsub.NewGossipSubWithRouter(ctx, h, rt, opts...)
if err != nil {
panic(err)
}
psubs = append(psubs, ps)
}
return psubs
}
func connectHosts(t *testing.T, hosts []host.Host, d int) {
for i, a := range hosts {
for j := 0; j < d; j++ {
n := rand.Intn(len(hosts))
if n == i {
j--
continue
}
b := hosts[n]
pinfo := a.Peerstore().PeerInfo(a.ID())
err := b.Connect(context.Background(), pinfo)
if err != nil {
t.Fatal(err)
}
}
}
}
// TestNegativeScores tests blocking peers with negative scores.
//
// This follows the testing done in libp2p's gossipsub_test.go [TestGossipsubNegativeScore] function.
func (testSuite *PeerScoresTestSuite) TestNegativeScores() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testSuite.mockMetricer.On("RecordPeerScoring", mock.Anything, float64(0)).Return(nil)
testSuite.mockMetricer.On("RecordPeerScoring", mock.Anything, float64(-1000)).Return(nil)
testSuite.mockGater.On("ListBlockedPeers").Return([]peer.ID{})
// Construct 20 hosts using the [getNetHosts] function.
hosts := getNetHosts(testSuite, ctx, 20)
testSuite.Equal(20, len(hosts))
// Construct 20 gossipsub routers using the [newGossipSubs] function.
pubsubs := newGossipSubs(testSuite, ctx, hosts)
testSuite.Equal(20, len(pubsubs))
// Connect the hosts in a dense network
connectHosts(testSuite.T(), hosts, 10)
// Create subscriptions
var subs []*pubsub.Subscription
for _, ps := range pubsubs {
sub, err := ps.Subscribe("test")
testSuite.NoError(err)
subs = append(subs, sub)
}
// Wait and then publish messages
time.Sleep(3 * time.Second)
for i := 0; i < 20; i++ {
msg := []byte(fmt.Sprintf("message %d", i))
pubsubs[i%20].Publish("test", msg)
time.Sleep(20 * time.Millisecond)
}
// Allow gossip to propagate
time.Sleep(2 * time.Second)
// Collects all messages from a subscription
collectAll := func(sub *pubsub.Subscription) []*pubsub.Message {
var res []*pubsub.Message
ctx, cancel := context.WithTimeout(context.Background(), 100 * time.Millisecond)
defer cancel()
for {
msg, err := sub.Next(ctx)
if err != nil {
break
}
res = append(res, msg)
}
return res
}
// Collect messages for the first host subscription
// This host should only receive 1 message from itself
count := len(collectAll(subs[0]))
testSuite.Equal(1, count)
// Validate that all messages were received from the first peer
for _, sub := range subs[1:] {
all := collectAll(sub)
for _, m := range all {
testSuite.NotEqual(hosts[0].ID(), m.ReceivedFrom)
}
}
}
......@@ -68,6 +68,10 @@ func (p *Prepared) PeerScoringParams() *pubsub.PeerScoreParams {
return nil
}
func (p *Prepared) BanPeers() bool {
return false
}
func (p *Prepared) TopicScoringParams() *pubsub.TopicScoreParams {
return nil
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment