Commit c79a1dec authored by OptimismBot's avatar OptimismBot Committed by GitHub

Merge pull request #5870 from ethereum-optimism/aj/score-histogram

op-node: Add a histogram to report current peer scores
parents 0638daf5 74419cc3
...@@ -35,13 +35,11 @@ var ( ...@@ -35,13 +35,11 @@ var (
EnvVar: p2pEnv("PEER_SCORING"), EnvVar: p2pEnv("PEER_SCORING"),
} }
PeerScoreBands = cli.StringFlag{ PeerScoreBands = cli.StringFlag{
Name: "p2p.score.bands", Name: "p2p.score.bands",
Usage: "Sets the peer score bands used primarily for peer score metrics. " + Usage: "Deprecated. This option is ignored and is only present for backwards compatibility.",
"Should be provided in following format: <threshold>:<label>;<threshold>:<label>;..." +
"For example: -40:graylist;-20:restricted;0:nopx;20:friend;",
Required: false, Required: false,
Value: "-40:<=-40;-10:<=-10;-5:<=-05;-0.05:<=-00.05;0:<=0;0.05:<=00.05;5:<=05;10:<=10;20:<=20;100:>20;", Value: "",
EnvVar: p2pEnv("SCORE_BANDS"), Hidden: true,
} }
// Banning Flag - whether or not we want to act on the scoring // Banning Flag - whether or not we want to act on the scoring
......
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
"time" "time"
ophttp "github.com/ethereum-optimism/optimism/op-node/http" ophttp "github.com/ethereum-optimism/optimism/op-node/http"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-service/metrics" "github.com/ethereum-optimism/optimism/op-service/metrics"
pb "github.com/libp2p/go-libp2p-pubsub/pb" pb "github.com/libp2p/go-libp2p-pubsub/pb"
...@@ -66,7 +67,7 @@ type Metricer interface { ...@@ -66,7 +67,7 @@ type Metricer interface {
Document() []metrics.DocumentedMetric Document() []metrics.DocumentedMetric
RecordChannelInputBytes(num int) RecordChannelInputBytes(num int)
// P2P Metrics // P2P Metrics
SetPeerScores(scores map[string]float64) SetPeerScores(allScores []store.PeerScores)
ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration) ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration)
ServerPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration) ServerPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration)
PayloadsQuarantineSize(n int) PayloadsQuarantineSize(n int)
...@@ -134,13 +135,13 @@ type Metrics struct { ...@@ -134,13 +135,13 @@ type Metrics struct {
// P2P Metrics // P2P Metrics
PeerCount prometheus.Gauge PeerCount prometheus.Gauge
StreamCount prometheus.Gauge StreamCount prometheus.Gauge
PeerScores *prometheus.GaugeVec
GossipEventsTotal *prometheus.CounterVec GossipEventsTotal *prometheus.CounterVec
BandwidthTotal *prometheus.GaugeVec BandwidthTotal *prometheus.GaugeVec
PeerUnbans prometheus.Counter PeerUnbans prometheus.Counter
IPUnbans prometheus.Counter IPUnbans prometheus.Counter
Dials *prometheus.CounterVec Dials *prometheus.CounterVec
Accepts *prometheus.CounterVec Accepts *prometheus.CounterVec
PeerScores *prometheus.HistogramVec
ChannelInputBytes prometheus.Counter ChannelInputBytes prometheus.Counter
...@@ -161,6 +162,7 @@ func NewMetrics(procName string) *Metrics { ...@@ -161,6 +162,7 @@ func NewMetrics(procName string) *Metrics {
registry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) registry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
registry.MustRegister(collectors.NewGoCollector()) registry.MustRegister(collectors.NewGoCollector())
factory := metrics.With(registry) factory := metrics.With(registry)
return &Metrics{ return &Metrics{
Info: factory.NewGaugeVec(prometheus.GaugeOpts{ Info: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Namespace: ns,
...@@ -308,19 +310,12 @@ func NewMetrics(procName string) *Metrics { ...@@ -308,19 +310,12 @@ func NewMetrics(procName string) *Metrics {
Name: "peer_count", Name: "peer_count",
Help: "Count of currently connected p2p peers", Help: "Count of currently connected p2p peers",
}), }),
// Notice: We cannot use peer ids as [Labels] in the GaugeVec PeerScores: factory.NewHistogramVec(prometheus.HistogramOpts{
// since peer ids would open a service attack vector.
// Each peer id would be a separate metric, flooding prometheus.
//
// [Labels]: https://prometheus.io/docs/practices/naming/#labels
PeerScores: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Namespace: ns,
Subsystem: "p2p",
Name: "peer_scores", Name: "peer_scores",
Help: "Count of peer scores grouped by score", Help: "Histogram of currrently connected peer scores",
}, []string{ Buckets: []float64{-100, -40, -20, -10, -5, -2, -1, -0.5, -0.05, 0, 0.05, 0.5, 1, 2, 5, 10, 20, 40},
"band", }, []string{"type"}),
}),
StreamCount: factory.NewGauge(prometheus.GaugeOpts{ StreamCount: factory.NewGauge(prometheus.GaugeOpts{
Namespace: ns, Namespace: ns,
Subsystem: "p2p", Subsystem: "p2p",
...@@ -450,11 +445,17 @@ func NewMetrics(procName string) *Metrics { ...@@ -450,11 +445,17 @@ func NewMetrics(procName string) *Metrics {
} }
} }
// SetPeerScores updates the peer score [prometheus.GaugeVec]. // SetPeerScores updates the peer score metrics.
// This takes a map of labels to scores. // Accepts a slice of peer scores in any order.
func (m *Metrics) SetPeerScores(scores map[string]float64) { func (m *Metrics) SetPeerScores(allScores []store.PeerScores) {
for label, score := range scores { for _, scores := range allScores {
m.PeerScores.WithLabelValues(label).Set(score) m.PeerScores.WithLabelValues("total").Observe(scores.Gossip.Total)
m.PeerScores.WithLabelValues("ipColocation").Observe(scores.Gossip.IPColocationFactor)
m.PeerScores.WithLabelValues("behavioralPenalty").Observe(scores.Gossip.BehavioralPenalty)
m.PeerScores.WithLabelValues("blocksFirstMessage").Observe(scores.Gossip.Blocks.FirstMessageDeliveries)
m.PeerScores.WithLabelValues("blocksTimeInMesh").Observe(scores.Gossip.Blocks.TimeInMesh)
m.PeerScores.WithLabelValues("blocksMessageDeliveries").Observe(scores.Gossip.Blocks.MeshMessageDeliveries)
m.PeerScores.WithLabelValues("blocksInvalidMessageDeliveries").Observe(scores.Gossip.Blocks.InvalidMessageDeliveries)
} }
} }
...@@ -785,7 +786,7 @@ func (n *noopMetricer) RecordSequencerReset() { ...@@ -785,7 +786,7 @@ func (n *noopMetricer) RecordSequencerReset() {
func (n *noopMetricer) RecordGossipEvent(evType int32) { func (n *noopMetricer) RecordGossipEvent(evType int32) {
} }
func (n *noopMetricer) SetPeerScores(scores map[string]float64) { func (n *noopMetricer) SetPeerScores(allScores []store.PeerScores) {
} }
func (n *noopMetricer) IncPeerCount() { func (n *noopMetricer) IncPeerCount() {
......
package p2p
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestBandScorer_ParseDefault tests the [BandScorer.Parse] function
// on the default band scores cli flag value.
func TestBandScorer_ParseDefault(t *testing.T) {
// Create a new band scorer.
bandScorer, err := NewBandScorer("-40:graylist;-20:restricted;0:nopx;20:friend;")
require.NoError(t, err)
// Validate the [BandScorer] internals.
require.ElementsMatch(t, bandScorer.bands, []scorePair{
{band: "graylist", threshold: -40},
{band: "restricted", threshold: -20},
{band: "nopx", threshold: 0},
{band: "friend", threshold: 20},
})
}
// TestBandScorer_BucketCorrectly tests the [BandScorer.Bucket] function
// on a variety of scores.
func TestBandScorer_BucketCorrectly(t *testing.T) {
// Create a new band scorer.
bandScorer, err := NewBandScorer("-40:graylist;-20:restricted;0:nopx;20:friend;")
require.NoError(t, err)
// Validate the [BandScorer] internals.
require.Equal(t, bandScorer.Bucket(-100), "graylist")
require.Equal(t, bandScorer.Bucket(-40), "graylist")
require.Equal(t, bandScorer.Bucket(-39), "restricted")
require.Equal(t, bandScorer.Bucket(-20), "restricted")
require.Equal(t, bandScorer.Bucket(-19), "nopx")
require.Equal(t, bandScorer.Bucket(0), "nopx")
require.Equal(t, bandScorer.Bucket(1), "friend")
require.Equal(t, bandScorer.Bucket(20), "friend")
require.Equal(t, bandScorer.Bucket(21), "friend")
}
// TestBandScorer_BucketInverted tests the [BandScorer.Bucket] function
// on a variety of scores, in descending order.
func TestBandScorer_BucketInverted(t *testing.T) {
// Create a new band scorer.
bandScorer, err := NewBandScorer("20:friend;0:nopx;-20:restricted;-40:graylist;")
require.NoError(t, err)
// Validate the [BandScorer] internals.
require.Equal(t, bandScorer.Bucket(-100), "graylist")
require.Equal(t, bandScorer.Bucket(-40), "graylist")
require.Equal(t, bandScorer.Bucket(-39), "restricted")
require.Equal(t, bandScorer.Bucket(-20), "restricted")
require.Equal(t, bandScorer.Bucket(-19), "nopx")
require.Equal(t, bandScorer.Bucket(0), "nopx")
require.Equal(t, bandScorer.Bucket(1), "friend")
require.Equal(t, bandScorer.Bucket(20), "friend")
require.Equal(t, bandScorer.Bucket(21), "friend")
}
// TestBandScorer_ParseEmpty tests the [BandScorer.Parse] function
// on an empty string.
func TestBandScorer_ParseEmpty(t *testing.T) {
// Create a band scorer on an empty string.
bandScorer, err := NewBandScorer("")
require.NoError(t, err)
// Validate the [BandScorer] internals.
require.Len(t, bandScorer.bands, 0)
}
// TestBandScorer_ParseWhitespace tests the [BandScorer.Parse] function
// on a variety of whitespaced strings.
func TestBandScorer_ParseWhitespace(t *testing.T) {
// Create a band scorer on an empty string.
bandScorer, err := NewBandScorer(" ; ; ; ")
require.NoError(t, err)
// Validate the [BandScorer] internals.
require.Len(t, bandScorer.bands, 0)
}
...@@ -58,10 +58,6 @@ func NewConfig(ctx *cli.Context, blockTime uint64) (*p2p.Config, error) { ...@@ -58,10 +58,6 @@ func NewConfig(ctx *cli.Context, blockTime uint64) (*p2p.Config, error) {
return nil, fmt.Errorf("failed to load p2p peer scoring options: %w", err) return nil, fmt.Errorf("failed to load p2p peer scoring options: %w", err)
} }
if err := loadPeerScoreBands(conf, ctx); err != nil {
return nil, fmt.Errorf("failed to load p2p peer score bands: %w", err)
}
if err := loadBanningOptions(conf, ctx); err != nil { if err := loadBanningOptions(conf, ctx); err != nil {
return nil, fmt.Errorf("failed to load banning option: %w", err) return nil, fmt.Errorf("failed to load banning option: %w", err)
} }
...@@ -124,17 +120,6 @@ func loadPeerScoringParams(conf *p2p.Config, ctx *cli.Context, blockTime uint64) ...@@ -124,17 +120,6 @@ func loadPeerScoringParams(conf *p2p.Config, ctx *cli.Context, blockTime uint64)
return nil return nil
} }
// loadPeerScoreBands loads [p2p.BandScorer] from the CLI context.
func loadPeerScoreBands(conf *p2p.Config, ctx *cli.Context) error {
scoreBands := ctx.GlobalString(flags.PeerScoreBands.Name)
bandScorer, err := p2p.NewBandScorer(scoreBands)
if err != nil {
return err
}
conf.BandScoreThresholds = *bandScorer
return nil
}
// loadBanningOptions loads whether or not to ban peers from the CLI context. // loadBanningOptions loads whether or not to ban peers from the CLI context.
func loadBanningOptions(conf *p2p.Config, ctx *cli.Context) error { func loadBanningOptions(conf *p2p.Config, ctx *cli.Context) error {
conf.BanningEnabled = ctx.GlobalBool(flags.Banning.Name) conf.BanningEnabled = ctx.GlobalBool(flags.Banning.Name)
......
...@@ -47,7 +47,6 @@ type SetupP2P interface { ...@@ -47,7 +47,6 @@ type SetupP2P interface {
BanPeers() bool BanPeers() bool
BanThreshold() float64 BanThreshold() float64
BanDuration() time.Duration BanDuration() time.Duration
PeerBandScorer() *BandScoreThresholds
GossipSetupConfigurables GossipSetupConfigurables
ReqRespSyncEnabled() bool ReqRespSyncEnabled() bool
} }
...@@ -67,9 +66,6 @@ type Config struct { ...@@ -67,9 +66,6 @@ type Config struct {
PeerScoring pubsub.PeerScoreParams PeerScoring pubsub.PeerScoreParams
TopicScoring pubsub.TopicScoreParams TopicScoring pubsub.TopicScoreParams
// Peer Score Band Thresholds
BandScoreThresholds BandScoreThresholds
// Whether to ban peers based on their [PeerScoring] score. Should be negative. // Whether to ban peers based on their [PeerScoring] score. Should be negative.
BanningEnabled bool BanningEnabled bool
// Minimum score before peers are disconnected and banned // Minimum score before peers are disconnected and banned
...@@ -142,10 +138,6 @@ func (conf *Config) PeerScoringParams() *pubsub.PeerScoreParams { ...@@ -142,10 +138,6 @@ func (conf *Config) PeerScoringParams() *pubsub.PeerScoreParams {
return &conf.PeerScoring return &conf.PeerScoring
} }
func (conf *Config) PeerBandScorer() *BandScoreThresholds {
return &conf.BandScoreThresholds
}
func (conf *Config) BanPeers() bool { func (conf *Config) BanPeers() bool {
return conf.BanningEnabled return conf.BanningEnabled
} }
......
// Code generated by mockery v2.22.1. DO NOT EDIT. // Code generated by mockery v2.28.0. DO NOT EDIT.
package mocks package mocks
...@@ -14,11 +14,6 @@ func (_m *GossipMetricer) RecordGossipEvent(evType int32) { ...@@ -14,11 +14,6 @@ func (_m *GossipMetricer) RecordGossipEvent(evType int32) {
_m.Called(evType) _m.Called(evType)
} }
// SetPeerScores provides a mock function with given fields: _a0
func (_m *GossipMetricer) SetPeerScores(_a0 map[string]float64) {
_m.Called(_a0)
}
type mockConstructorTestingTNewGossipMetricer interface { type mockConstructorTestingTNewGossipMetricer interface {
mock.TestingT mock.TestingT
Cleanup(func()) Cleanup(func())
......
...@@ -46,17 +46,27 @@ func (_m *Peerstore) Peers() peer.IDSlice { ...@@ -46,17 +46,27 @@ func (_m *Peerstore) Peers() peer.IDSlice {
} }
// SetScore provides a mock function with given fields: id, diff // SetScore provides a mock function with given fields: id, diff
func (_m *Peerstore) SetScore(id peer.ID, diff store.ScoreDiff) error { func (_m *Peerstore) SetScore(id peer.ID, diff store.ScoreDiff) (store.PeerScores, error) {
ret := _m.Called(id, diff) ret := _m.Called(id, diff)
var r0 error var r0 store.PeerScores
if rf, ok := ret.Get(0).(func(peer.ID, store.ScoreDiff) error); ok { var r1 error
if rf, ok := ret.Get(0).(func(peer.ID, store.ScoreDiff) (store.PeerScores, error)); ok {
return rf(id, diff)
}
if rf, ok := ret.Get(0).(func(peer.ID, store.ScoreDiff) store.PeerScores); ok {
r0 = rf(id, diff) r0 = rf(id, diff)
} else { } else {
r0 = ret.Error(0) r0 = ret.Get(0).(store.PeerScores)
} }
return r0 if rf, ok := ret.Get(1).(func(peer.ID, store.ScoreDiff) error); ok {
r1 = rf(id, diff)
} else {
r1 = ret.Error(1)
}
return r0, r1
} }
type mockConstructorTestingTNewPeerstore interface { type mockConstructorTestingTNewPeerstore interface {
......
// Code generated by mockery v2.28.0. DO NOT EDIT.
package mocks
import (
mock "github.com/stretchr/testify/mock"
store "github.com/ethereum-optimism/optimism/op-node/p2p/store"
)
// ScoreMetrics is an autogenerated mock type for the ScoreMetrics type
type ScoreMetrics struct {
mock.Mock
}
// SetPeerScores provides a mock function with given fields: _a0
func (_m *ScoreMetrics) SetPeerScores(_a0 []store.PeerScores) {
_m.Called(_a0)
}
type mockConstructorTestingTNewScoreMetrics interface {
mock.TestingT
Cleanup(func())
}
// NewScoreMetrics creates a new instance of ScoreMetrics. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewScoreMetrics(t mockConstructorTestingTNewScoreMetrics) *ScoreMetrics {
mock := &ScoreMetrics{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
...@@ -118,7 +118,7 @@ func (_c *PeerManager_GetPeerScore_Call) RunAndReturn(run func(peer.ID) (float64 ...@@ -118,7 +118,7 @@ func (_c *PeerManager_GetPeerScore_Call) RunAndReturn(run func(peer.ID) (float64
return _c return _c
} }
// IsProtected provides a mock function with given fields: _a0 // IsStatic provides a mock function with given fields: _a0
func (_m *PeerManager) IsStatic(_a0 peer.ID) bool { func (_m *PeerManager) IsStatic(_a0 peer.ID) bool {
ret := _m.Called(_a0) ret := _m.Called(_a0)
...@@ -132,30 +132,30 @@ func (_m *PeerManager) IsStatic(_a0 peer.ID) bool { ...@@ -132,30 +132,30 @@ func (_m *PeerManager) IsStatic(_a0 peer.ID) bool {
return r0 return r0
} }
// PeerManager_IsProtected_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsStatic' // PeerManager_IsStatic_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsStatic'
type PeerManager_IsProtected_Call struct { type PeerManager_IsStatic_Call struct {
*mock.Call *mock.Call
} }
// IsProtected is a helper method to define mock.On call // IsStatic is a helper method to define mock.On call
// - _a0 peer.ID // - _a0 peer.ID
func (_e *PeerManager_Expecter) IsProtected(_a0 interface{}) *PeerManager_IsProtected_Call { func (_e *PeerManager_Expecter) IsStatic(_a0 interface{}) *PeerManager_IsStatic_Call {
return &PeerManager_IsProtected_Call{Call: _e.mock.On("IsStatic", _a0)} return &PeerManager_IsStatic_Call{Call: _e.mock.On("IsStatic", _a0)}
} }
func (_c *PeerManager_IsProtected_Call) Run(run func(_a0 peer.ID)) *PeerManager_IsProtected_Call { func (_c *PeerManager_IsStatic_Call) Run(run func(_a0 peer.ID)) *PeerManager_IsStatic_Call {
_c.Call.Run(func(args mock.Arguments) { _c.Call.Run(func(args mock.Arguments) {
run(args[0].(peer.ID)) run(args[0].(peer.ID))
}) })
return _c return _c
} }
func (_c *PeerManager_IsProtected_Call) Return(_a0 bool) *PeerManager_IsProtected_Call { func (_c *PeerManager_IsStatic_Call) Return(_a0 bool) *PeerManager_IsStatic_Call {
_c.Call.Return(_a0) _c.Call.Return(_a0)
return _c return _c
} }
func (_c *PeerManager_IsProtected_Call) RunAndReturn(run func(peer.ID) bool) *PeerManager_IsProtected_Call { func (_c *PeerManager_IsStatic_Call) RunAndReturn(run func(peer.ID) bool) *PeerManager_IsStatic_Call {
_c.Call.Return(run) _c.Call.Return(run)
return _c return _c
} }
......
...@@ -96,7 +96,7 @@ func TestCheckNextPeer(t *testing.T) { ...@@ -96,7 +96,7 @@ func TestCheckNextPeer(t *testing.T) {
id := peerIDs[0] id := peerIDs[0]
manager.EXPECT().Peers().Return(peerIDs).Once() manager.EXPECT().Peers().Return(peerIDs).Once()
manager.EXPECT().GetPeerScore(id).Return(-101, nil).Once() manager.EXPECT().GetPeerScore(id).Return(-101, nil).Once()
manager.EXPECT().IsProtected(id).Return(false).Once() manager.EXPECT().IsStatic(id).Return(false).Once()
manager.EXPECT().BanPeer(id, clock.Now().Add(testBanDuration)).Return(nil).Once() manager.EXPECT().BanPeer(id, clock.Now().Add(testBanDuration)).Return(nil).Once()
require.NoError(t, monitor.checkNextPeer()) require.NoError(t, monitor.checkNextPeer())
...@@ -107,7 +107,7 @@ func TestCheckNextPeer(t *testing.T) { ...@@ -107,7 +107,7 @@ func TestCheckNextPeer(t *testing.T) {
id := peerIDs[0] id := peerIDs[0]
manager.EXPECT().Peers().Return(peerIDs).Once() manager.EXPECT().Peers().Return(peerIDs).Once()
manager.EXPECT().GetPeerScore(id).Return(-101, nil).Once() manager.EXPECT().GetPeerScore(id).Return(-101, nil).Once()
manager.EXPECT().IsProtected(id).Return(true) manager.EXPECT().IsStatic(id).Return(true)
require.NoError(t, monitor.checkNextPeer()) require.NoError(t, monitor.checkNextPeer())
}) })
......
...@@ -117,7 +117,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l ...@@ -117,7 +117,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
return fmt.Errorf("cannot init without extended peerstore: %w", err) return fmt.Errorf("cannot init without extended peerstore: %w", err)
} }
n.store = eps n.store = eps
n.scorer = NewScorer(rollupCfg, eps, metrics, setup.PeerBandScorer(), log) n.scorer = NewScorer(rollupCfg, eps, metrics, log)
n.host.Network().Notify(&network.NotifyBundle{ n.host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(_ network.Network, conn network.Conn) { ConnectedF: func(_ network.Network, conn network.Conn) {
n.scorer.OnConnect(conn.RemotePeer()) n.scorer.OnConnect(conn.RemotePeer())
......
package p2p package p2p
import ( import (
"fmt"
"sort"
"strconv"
"strings"
"time" "time"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
...@@ -16,72 +12,10 @@ import ( ...@@ -16,72 +12,10 @@ import (
) )
type scorer struct { type scorer struct {
peerStore Peerstore peerStore Peerstore
metricer ScoreMetrics metricer ScoreMetrics
log log.Logger log log.Logger
bandScoreThresholds *BandScoreThresholds cfg *rollup.Config
cfg *rollup.Config
}
// scorePair holds a band and its corresponding threshold.
type scorePair struct {
band string
threshold float64
}
// BandScoreThresholds holds the thresholds for classifying peers
// into different score bands.
type BandScoreThresholds struct {
bands []scorePair
}
// NewBandScorer constructs a new [BandScoreThresholds] instance.
func NewBandScorer(str string) (*BandScoreThresholds, error) {
s := &BandScoreThresholds{
bands: make([]scorePair, 0),
}
for _, band := range strings.Split(str, ";") {
// Skip empty band strings.
band := strings.TrimSpace(band)
if band == "" {
continue
}
split := strings.Split(band, ":")
if len(split) != 2 {
return nil, fmt.Errorf("invalid score band: %s", band)
}
threshold, err := strconv.ParseFloat(split[0], 64)
if err != nil {
return nil, err
}
s.bands = append(s.bands, scorePair{
band: split[1],
threshold: threshold,
})
}
// Order the bands by threshold in ascending order.
sort.Slice(s.bands, func(i, j int) bool {
return s.bands[i].threshold < s.bands[j].threshold
})
return s, nil
}
// Bucket returns the appropriate band for a given score.
func (s *BandScoreThresholds) Bucket(score float64) string {
for _, pair := range s.bands {
if score <= pair.threshold {
return pair.band
}
}
// If there is no band threshold higher than the score,
// the peer must be placed in the highest bucket.
if len(s.bands) > 0 {
return s.bands[len(s.bands)-1].band
}
return ""
} }
// Peerstore is a subset of the libp2p peerstore.Peerstore interface. // Peerstore is a subset of the libp2p peerstore.Peerstore interface.
...@@ -96,7 +30,7 @@ type Peerstore interface { ...@@ -96,7 +30,7 @@ type Peerstore interface {
// Peers returns all of the peer IDs stored across all inner stores. // Peers returns all of the peer IDs stored across all inner stores.
Peers() peer.IDSlice Peers() peer.IDSlice
SetScore(id peer.ID, diff store.ScoreDiff) error SetScore(id peer.ID, diff store.ScoreDiff) (store.PeerScores, error)
} }
// Scorer is a peer scorer that scores peers based on application-specific metrics. // Scorer is a peer scorer that scores peers based on application-specific metrics.
...@@ -106,18 +40,18 @@ type Scorer interface { ...@@ -106,18 +40,18 @@ type Scorer interface {
SnapshotHook() pubsub.ExtendedPeerScoreInspectFn SnapshotHook() pubsub.ExtendedPeerScoreInspectFn
} }
//go:generate mockery --name ScoreMetrics --output mocks/
type ScoreMetrics interface { type ScoreMetrics interface {
SetPeerScores(map[string]float64) SetPeerScores([]store.PeerScores)
} }
// NewScorer returns a new peer scorer. // NewScorer returns a new peer scorer.
func NewScorer(cfg *rollup.Config, peerStore Peerstore, metricer ScoreMetrics, bandScoreThresholds *BandScoreThresholds, log log.Logger) Scorer { func NewScorer(cfg *rollup.Config, peerStore Peerstore, metricer ScoreMetrics, log log.Logger) Scorer {
return &scorer{ return &scorer{
peerStore: peerStore, peerStore: peerStore,
metricer: metricer, metricer: metricer,
log: log, log: log,
bandScoreThresholds: bandScoreThresholds, cfg: cfg,
cfg: cfg,
} }
} }
...@@ -128,11 +62,7 @@ func NewScorer(cfg *rollup.Config, peerStore Peerstore, metricer ScoreMetrics, b ...@@ -128,11 +62,7 @@ func NewScorer(cfg *rollup.Config, peerStore Peerstore, metricer ScoreMetrics, b
func (s *scorer) SnapshotHook() pubsub.ExtendedPeerScoreInspectFn { func (s *scorer) SnapshotHook() pubsub.ExtendedPeerScoreInspectFn {
blocksTopicName := blocksTopicV1(s.cfg) blocksTopicName := blocksTopicV1(s.cfg)
return func(m map[peer.ID]*pubsub.PeerScoreSnapshot) { return func(m map[peer.ID]*pubsub.PeerScoreSnapshot) {
scoreMap := make(map[string]float64) allScores := make([]store.PeerScores, 0, len(m))
// Zero out all bands.
for _, b := range s.bandScoreThresholds.bands {
scoreMap[b.band] = 0
}
// Now set the new scores. // Now set the new scores.
for id, snap := range m { for id, snap := range m {
diff := store.GossipScores{ diff := store.GossipScores{
...@@ -147,15 +77,13 @@ func (s *scorer) SnapshotHook() pubsub.ExtendedPeerScoreInspectFn { ...@@ -147,15 +77,13 @@ func (s *scorer) SnapshotHook() pubsub.ExtendedPeerScoreInspectFn {
diff.Blocks.FirstMessageDeliveries = topSnap.FirstMessageDeliveries diff.Blocks.FirstMessageDeliveries = topSnap.FirstMessageDeliveries
diff.Blocks.InvalidMessageDeliveries = topSnap.InvalidMessageDeliveries diff.Blocks.InvalidMessageDeliveries = topSnap.InvalidMessageDeliveries
} }
if err := s.peerStore.SetScore(id, &diff); err != nil { if peerScores, err := s.peerStore.SetScore(id, &diff); err != nil {
s.log.Warn("Unable to update peer gossip score", "err", err) s.log.Warn("Unable to update peer gossip score", "err", err)
} else {
allScores = append(allScores, peerScores)
} }
} }
for _, snap := range m { s.metricer.SetPeerScores(allScores)
band := s.bandScoreThresholds.Bucket(snap.Score)
scoreMap[band] += 1
}
s.metricer.SetPeerScores(scoreMap)
} }
} }
......
...@@ -22,18 +22,14 @@ type PeerScorerTestSuite struct { ...@@ -22,18 +22,14 @@ type PeerScorerTestSuite struct {
suite.Suite suite.Suite
mockStore *p2pMocks.Peerstore mockStore *p2pMocks.Peerstore
mockMetricer *p2pMocks.GossipMetricer mockMetricer *p2pMocks.ScoreMetrics
bandScorer *p2p.BandScoreThresholds
logger log.Logger logger log.Logger
} }
// SetupTest sets up the test suite. // SetupTest sets up the test suite.
func (testSuite *PeerScorerTestSuite) SetupTest() { func (testSuite *PeerScorerTestSuite) SetupTest() {
testSuite.mockStore = &p2pMocks.Peerstore{} testSuite.mockStore = &p2pMocks.Peerstore{}
testSuite.mockMetricer = &p2pMocks.GossipMetricer{} testSuite.mockMetricer = &p2pMocks.ScoreMetrics{}
bandScorer, err := p2p.NewBandScorer("-40:graylist;0:friend;")
testSuite.NoError(err)
testSuite.bandScorer = bandScorer
testSuite.logger = testlog.Logger(testSuite.T(), log.LvlError) testSuite.logger = testlog.Logger(testSuite.T(), log.LvlError)
} }
...@@ -48,7 +44,6 @@ func (testSuite *PeerScorerTestSuite) TestScorer_OnConnect() { ...@@ -48,7 +44,6 @@ func (testSuite *PeerScorerTestSuite) TestScorer_OnConnect() {
&rollup.Config{L2ChainID: big.NewInt(123)}, &rollup.Config{L2ChainID: big.NewInt(123)},
testSuite.mockStore, testSuite.mockStore,
testSuite.mockMetricer, testSuite.mockMetricer,
testSuite.bandScorer,
testSuite.logger, testSuite.logger,
) )
scorer.OnConnect(peer.ID("alice")) scorer.OnConnect(peer.ID("alice"))
...@@ -60,7 +55,6 @@ func (testSuite *PeerScorerTestSuite) TestScorer_OnDisconnect() { ...@@ -60,7 +55,6 @@ func (testSuite *PeerScorerTestSuite) TestScorer_OnDisconnect() {
&rollup.Config{L2ChainID: big.NewInt(123)}, &rollup.Config{L2ChainID: big.NewInt(123)},
testSuite.mockStore, testSuite.mockStore,
testSuite.mockMetricer, testSuite.mockMetricer,
testSuite.bandScorer,
testSuite.logger, testSuite.logger,
) )
scorer.OnDisconnect(peer.ID("alice")) scorer.OnDisconnect(peer.ID("alice"))
...@@ -72,19 +66,16 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() { ...@@ -72,19 +66,16 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
&rollup.Config{L2ChainID: big.NewInt(123)}, &rollup.Config{L2ChainID: big.NewInt(123)},
testSuite.mockStore, testSuite.mockStore,
testSuite.mockMetricer, testSuite.mockMetricer,
testSuite.bandScorer,
testSuite.logger, testSuite.logger,
) )
inspectFn := scorer.SnapshotHook() inspectFn := scorer.SnapshotHook()
scores := store.PeerScores{Gossip: store.GossipScores{Total: 3}}
// Expect updating the peer store // Expect updating the peer store
testSuite.mockStore.On("SetScore", peer.ID("peer1"), &store.GossipScores{Total: float64(-100)}).Return(nil).Once() testSuite.mockStore.On("SetScore", peer.ID("peer1"), &store.GossipScores{Total: float64(-100)}).Return(scores, nil).Once()
// The metricer should then be called with the peer score band map // The metricer should then be called with the peer score band map
testSuite.mockMetricer.On("SetPeerScores", map[string]float64{ testSuite.mockMetricer.On("SetPeerScores", []store.PeerScores{scores}).Return(nil).Once()
"friend": 0,
"graylist": 1,
}).Return(nil).Once()
// Apply the snapshot // Apply the snapshot
snapshotMap := map[peer.ID]*pubsub.PeerScoreSnapshot{ snapshotMap := map[peer.ID]*pubsub.PeerScoreSnapshot{
...@@ -95,13 +86,10 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() { ...@@ -95,13 +86,10 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
inspectFn(snapshotMap) inspectFn(snapshotMap)
// Expect updating the peer store // Expect updating the peer store
testSuite.mockStore.On("SetScore", peer.ID("peer1"), &store.GossipScores{Total: 0}).Return(nil).Once() testSuite.mockStore.On("SetScore", peer.ID("peer1"), &store.GossipScores{Total: 0}).Return(scores, nil).Once()
// The metricer should then be called with the peer score band map // The metricer should then be called with the peer score band map
testSuite.mockMetricer.On("SetPeerScores", map[string]float64{ testSuite.mockMetricer.On("SetPeerScores", []store.PeerScores{scores}).Return(nil).Once()
"friend": 1,
"graylist": 0,
}).Return(nil).Once()
// Apply the snapshot // Apply the snapshot
snapshotMap = map[peer.ID]*pubsub.PeerScoreSnapshot{ snapshotMap = map[peer.ID]*pubsub.PeerScoreSnapshot{
...@@ -111,33 +99,3 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() { ...@@ -111,33 +99,3 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
} }
inspectFn(snapshotMap) inspectFn(snapshotMap)
} }
// TestScorer_SnapshotHookBlocksPeer tests running the snapshot hook on the peer scorer with a peer score below the threshold.
// This implies that the peer should be blocked.
func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHookBlocksPeer() {
scorer := p2p.NewScorer(
&rollup.Config{L2ChainID: big.NewInt(123)},
testSuite.mockStore,
testSuite.mockMetricer,
testSuite.bandScorer,
testSuite.logger,
)
inspectFn := scorer.SnapshotHook()
// Expect updating the peer store
testSuite.mockStore.On("SetScore", peer.ID("peer1"), &store.GossipScores{Total: float64(-101)}).Return(nil).Once()
// The metricer should then be called with the peer score band map
testSuite.mockMetricer.On("SetPeerScores", map[string]float64{
"friend": 0,
"graylist": 1,
}).Return(nil)
// Apply the snapshot
snapshotMap := map[peer.ID]*pubsub.PeerScoreSnapshot{
peer.ID("peer1"): {
Score: -101,
},
}
inspectFn(snapshotMap)
}
...@@ -36,18 +36,14 @@ type PeerScoresTestSuite struct { ...@@ -36,18 +36,14 @@ type PeerScoresTestSuite struct {
suite.Suite suite.Suite
mockStore *p2pMocks.Peerstore mockStore *p2pMocks.Peerstore
mockMetricer *p2pMocks.GossipMetricer mockMetricer *p2pMocks.ScoreMetrics
bandScorer BandScoreThresholds
logger log.Logger logger log.Logger
} }
// SetupTest sets up the test suite. // SetupTest sets up the test suite.
func (testSuite *PeerScoresTestSuite) SetupTest() { func (testSuite *PeerScoresTestSuite) SetupTest() {
testSuite.mockStore = &p2pMocks.Peerstore{} testSuite.mockStore = &p2pMocks.Peerstore{}
testSuite.mockMetricer = &p2pMocks.GossipMetricer{} testSuite.mockMetricer = &p2pMocks.ScoreMetrics{}
bandScorer, err := NewBandScorer("0:graylist;")
testSuite.NoError(err)
testSuite.bandScorer = *bandScorer
testSuite.logger = testlog.Logger(testSuite.T(), log.LvlError) testSuite.logger = testlog.Logger(testSuite.T(), log.LvlError)
} }
...@@ -104,9 +100,8 @@ func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts [] ...@@ -104,9 +100,8 @@ func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts []
scorer := NewScorer( scorer := NewScorer(
&rollup.Config{L2ChainID: big.NewInt(123)}, &rollup.Config{L2ChainID: big.NewInt(123)},
extPeerStore, testSuite.mockMetricer, &testSuite.bandScorer, logger) extPeerStore, testSuite.mockMetricer, logger)
opts = append(opts, ConfigurePeerScoring(&Config{ opts = append(opts, ConfigurePeerScoring(&Config{
BandScoreThresholds: testSuite.bandScorer,
PeerScoring: pubsub.PeerScoreParams{ PeerScoring: pubsub.PeerScoreParams{
AppSpecificScore: func(p peer.ID) float64 { AppSpecificScore: func(p peer.ID) float64 {
if p == hosts[0].ID() { if p == hosts[0].ID() {
...@@ -157,7 +152,7 @@ func (testSuite *PeerScoresTestSuite) TestNegativeScores() { ...@@ -157,7 +152,7 @@ func (testSuite *PeerScoresTestSuite) TestNegativeScores() {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
testSuite.mockMetricer.On("SetPeerScores", mock.Anything).Return(nil) testSuite.mockMetricer.On("SetPeerScores", mock.Anything, mock.Anything).Return(nil)
// Construct 20 hosts using the [getNetHosts] function. // Construct 20 hosts using the [getNetHosts] function.
hosts := getNetHosts(testSuite, ctx, 20) hosts := getNetHosts(testSuite, ctx, 20)
......
...@@ -73,10 +73,6 @@ func (p *Prepared) PeerScoringParams() *pubsub.PeerScoreParams { ...@@ -73,10 +73,6 @@ func (p *Prepared) PeerScoringParams() *pubsub.PeerScoreParams {
return nil return nil
} }
func (p *Prepared) PeerBandScorer() *BandScoreThresholds {
return nil
}
func (p *Prepared) BanPeers() bool { func (p *Prepared) BanPeers() bool {
return false return false
} }
......
...@@ -41,7 +41,7 @@ type ScoreDatastore interface { ...@@ -41,7 +41,7 @@ type ScoreDatastore interface {
GetPeerScore(id peer.ID) (float64, error) GetPeerScore(id peer.ID) (float64, error)
// SetScore applies the given store diff to the specified peer // SetScore applies the given store diff to the specified peer
SetScore(id peer.ID, diff ScoreDiff) error SetScore(id peer.ID, diff ScoreDiff) (PeerScores, error)
} }
// ScoreDiff defines a type-safe batch of changes to apply to the peer-scoring record of the peer. // ScoreDiff defines a type-safe batch of changes to apply to the peer-scoring record of the peer.
......
...@@ -84,7 +84,8 @@ func (d *ipBanBook) SetIPBanExpiration(ip net.IP, expirationTime time.Time) erro ...@@ -84,7 +84,8 @@ func (d *ipBanBook) SetIPBanExpiration(ip net.IP, expirationTime time.Time) erro
if expirationTime == (time.Time{}) { if expirationTime == (time.Time{}) {
return d.book.deleteRecord(ip.To16().String()) return d.book.deleteRecord(ip.To16().String())
} }
return d.book.SetRecord(ip.To16().String(), ipBanUpdate(expirationTime)) _, err := d.book.SetRecord(ip.To16().String(), ipBanUpdate(expirationTime))
return err
} }
func (d *ipBanBook) Close() { func (d *ipBanBook) Close() {
......
...@@ -80,7 +80,8 @@ func (d *peerBanBook) SetPeerBanExpiration(id peer.ID, expirationTime time.Time) ...@@ -80,7 +80,8 @@ func (d *peerBanBook) SetPeerBanExpiration(id peer.ID, expirationTime time.Time)
if expirationTime == (time.Time{}) { if expirationTime == (time.Time{}) {
return d.book.deleteRecord(id) return d.book.deleteRecord(id)
} }
return d.book.SetRecord(id, peerBanUpdate(expirationTime)) _, err := d.book.SetRecord(id, peerBanUpdate(expirationTime))
return err
} }
func (d *peerBanBook) Close() { func (d *peerBanBook) Close() {
......
...@@ -124,27 +124,27 @@ func (d *recordsBook[K, V]) getRecord(key K) (v V, err error) { ...@@ -124,27 +124,27 @@ func (d *recordsBook[K, V]) getRecord(key K) (v V, err error) {
return v, nil return v, nil
} }
func (d *recordsBook[K, V]) SetRecord(key K, diff recordDiff[V]) error { func (d *recordsBook[K, V]) SetRecord(key K, diff recordDiff[V]) (V, error) {
d.Lock() d.Lock()
defer d.Unlock() defer d.Unlock()
rec, err := d.getRecord(key) rec, err := d.getRecord(key)
if err == UnknownRecordErr { // instantiate new record if it does not exist yet if err == UnknownRecordErr { // instantiate new record if it does not exist yet
rec = d.newRecord() rec = d.newRecord()
} else if err != nil { } else if err != nil {
return err return d.newRecord(), err
} }
rec.SetLastUpdated(d.clock.Now()) rec.SetLastUpdated(d.clock.Now())
diff.Apply(rec) diff.Apply(rec)
data, err := rec.MarshalBinary() data, err := rec.MarshalBinary()
if err != nil { if err != nil {
return fmt.Errorf("failed to encode record for key %v: %w", key, err) return d.newRecord(), fmt.Errorf("failed to encode record for key %v: %w", key, err)
} }
err = d.store.Put(d.ctx, d.dsKey(key), data) err = d.store.Put(d.ctx, d.dsKey(key), data)
if err != nil { if err != nil {
return fmt.Errorf("storing updated record for key %v: %w", key, err) return d.newRecord(), fmt.Errorf("storing updated record for key %v: %w", key, err)
} }
d.cache.Add(key, rec) d.cache.Add(key, rec)
return nil return rec, nil
} }
// prune deletes entries from the store that are older than the configured prune expiration. // prune deletes entries from the store that are older than the configured prune expiration.
......
...@@ -86,8 +86,9 @@ func (d *scoreBook) GetPeerScore(id peer.ID) (float64, error) { ...@@ -86,8 +86,9 @@ func (d *scoreBook) GetPeerScore(id peer.ID) (float64, error) {
return scores.Gossip.Total, nil return scores.Gossip.Total, nil
} }
func (d *scoreBook) SetScore(id peer.ID, diff ScoreDiff) error { func (d *scoreBook) SetScore(id peer.ID, diff ScoreDiff) (PeerScores, error) {
return d.book.SetRecord(id, diff) v, err := d.book.SetRecord(id, diff)
return v.PeerScores, err
} }
func (d *scoreBook) Close() { func (d *scoreBook) Close() {
......
...@@ -26,18 +26,21 @@ func TestRoundTripGossipScore(t *testing.T) { ...@@ -26,18 +26,21 @@ func TestRoundTripGossipScore(t *testing.T) {
id := peer.ID("aaaa") id := peer.ID("aaaa")
store := createMemoryStore(t) store := createMemoryStore(t)
score := 123.45 score := 123.45
err := store.SetScore(id, &GossipScores{Total: score}) res, err := store.SetScore(id, &GossipScores{Total: score})
require.NoError(t, err) require.NoError(t, err)
assertPeerScores(t, store, id, PeerScores{Gossip: GossipScores{Total: score}}) expected := PeerScores{Gossip: GossipScores{Total: score}}
require.Equal(t, expected, res)
assertPeerScores(t, store, id, expected)
} }
func TestUpdateGossipScore(t *testing.T) { func TestUpdateGossipScore(t *testing.T) {
id := peer.ID("aaaa") id := peer.ID("aaaa")
store := createMemoryStore(t) store := createMemoryStore(t)
score := 123.45 score := 123.45
require.NoError(t, store.SetScore(id, &GossipScores{Total: 444.223})) setScoreRequired(t, store, id, &GossipScores{Total: 444.223})
require.NoError(t, store.SetScore(id, &GossipScores{Total: score})) setScoreRequired(t, store, id, &GossipScores{Total: score})
assertPeerScores(t, store, id, PeerScores{Gossip: GossipScores{Total: score}}) assertPeerScores(t, store, id, PeerScores{Gossip: GossipScores{Total: score}})
} }
...@@ -48,8 +51,8 @@ func TestStoreScoresForMultiplePeers(t *testing.T) { ...@@ -48,8 +51,8 @@ func TestStoreScoresForMultiplePeers(t *testing.T) {
store := createMemoryStore(t) store := createMemoryStore(t)
score1 := 123.45 score1 := 123.45
score2 := 453.22 score2 := 453.22
require.NoError(t, store.SetScore(id1, &GossipScores{Total: score1})) setScoreRequired(t, store, id1, &GossipScores{Total: score1})
require.NoError(t, store.SetScore(id2, &GossipScores{Total: score2})) setScoreRequired(t, store, id2, &GossipScores{Total: score2})
assertPeerScores(t, store, id1, PeerScores{Gossip: GossipScores{Total: score1}}) assertPeerScores(t, store, id1, PeerScores{Gossip: GossipScores{Total: score1}})
assertPeerScores(t, store, id2, PeerScores{Gossip: GossipScores{Total: score2}}) assertPeerScores(t, store, id2, PeerScores{Gossip: GossipScores{Total: score2}})
...@@ -61,7 +64,7 @@ func TestPersistData(t *testing.T) { ...@@ -61,7 +64,7 @@ func TestPersistData(t *testing.T) {
backingStore := sync.MutexWrap(ds.NewMapDatastore()) backingStore := sync.MutexWrap(ds.NewMapDatastore())
store := createPeerstoreWithBacking(t, backingStore) store := createPeerstoreWithBacking(t, backingStore)
require.NoError(t, store.SetScore(id, &GossipScores{Total: score})) setScoreRequired(t, store, id, &GossipScores{Total: score})
// Close and recreate a new store from the same backing // Close and recreate a new store from the same backing
require.NoError(t, store.Close()) require.NoError(t, store.Close())
...@@ -92,17 +95,17 @@ func TestPrune(t *testing.T) { ...@@ -92,17 +95,17 @@ func TestPrune(t *testing.T) {
firstStore := clock.Now() firstStore := clock.Now()
// Set some scores all 30 minutes apart so they have different expiry times // Set some scores all 30 minutes apart so they have different expiry times
require.NoError(t, book.SetScore("aaaa", &GossipScores{Total: 123.45})) setScoreRequired(t, book, "aaaa", &GossipScores{Total: 123.45})
clock.AdvanceTime(30 * time.Minute) clock.AdvanceTime(30 * time.Minute)
require.NoError(t, book.SetScore("bbbb", &GossipScores{Total: 123.45})) setScoreRequired(t, book, "bbbb", &GossipScores{Total: 123.45})
clock.AdvanceTime(30 * time.Minute) clock.AdvanceTime(30 * time.Minute)
require.NoError(t, book.SetScore("cccc", &GossipScores{Total: 123.45})) setScoreRequired(t, book, "cccc", &GossipScores{Total: 123.45})
clock.AdvanceTime(30 * time.Minute) clock.AdvanceTime(30 * time.Minute)
require.NoError(t, book.SetScore("dddd", &GossipScores{Total: 123.45})) setScoreRequired(t, book, "dddd", &GossipScores{Total: 123.45})
clock.AdvanceTime(30 * time.Minute) clock.AdvanceTime(30 * time.Minute)
// Update bbbb again which should extend its expiry // Update bbbb again which should extend its expiry
require.NoError(t, book.SetScore("bbbb", &GossipScores{Total: 123.45})) setScoreRequired(t, book, "bbbb", &GossipScores{Total: 123.45})
require.True(t, hasScoreRecorded("aaaa")) require.True(t, hasScoreRecorded("aaaa"))
require.True(t, hasScoreRecorded("bbbb")) require.True(t, hasScoreRecorded("bbbb"))
...@@ -147,7 +150,7 @@ func TestPruneMultipleBatches(t *testing.T) { ...@@ -147,7 +150,7 @@ func TestPruneMultipleBatches(t *testing.T) {
// Set scores for more peers than the max batch size // Set scores for more peers than the max batch size
peerCount := maxPruneBatchSize*3 + 5 peerCount := maxPruneBatchSize*3 + 5
for i := 0; i < peerCount; i++ { for i := 0; i < peerCount; i++ {
require.NoError(t, book.SetScore(peer.ID(strconv.Itoa(i)), &GossipScores{Total: 123.45})) setScoreRequired(t, book, peer.ID(strconv.Itoa(i)), &GossipScores{Total: 123.45})
} }
clock.AdvanceTime(book.book.recordExpiry + 1) clock.AdvanceTime(book.book.recordExpiry + 1)
require.NoError(t, book.book.prune()) require.NoError(t, book.book.prune())
...@@ -169,7 +172,7 @@ func TestIgnoreOutdatedScores(t *testing.T) { ...@@ -169,7 +172,7 @@ func TestIgnoreOutdatedScores(t *testing.T) {
book, err := newScoreBook(ctx, logger, clock, sync.MutexWrap(ds.NewMapDatastore()), retentionPeriod) book, err := newScoreBook(ctx, logger, clock, sync.MutexWrap(ds.NewMapDatastore()), retentionPeriod)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, book.SetScore("a", &GossipScores{Total: 123.45})) setScoreRequired(t, book, "a", &GossipScores{Total: 123.45})
clock.AdvanceTime(retentionPeriod + 1) clock.AdvanceTime(retentionPeriod + 1)
// Not available from cache // Not available from cache
...@@ -211,3 +214,8 @@ func createPeerstoreWithBacking(t *testing.T, store *sync.MutexDatastore) Extend ...@@ -211,3 +214,8 @@ func createPeerstoreWithBacking(t *testing.T, store *sync.MutexDatastore) Extend
}) })
return eps return eps
} }
func setScoreRequired(t *testing.T, store ScoreDatastore, id peer.ID, diff *GossipScores) {
_, err := store.SetScore(id, diff)
require.NoError(t, err)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment