Commit 7de5042c authored by Adrian Sutton's avatar Adrian Sutton

op-node: Add a histogram to report current peer scores

parent 6a474e36
package metrics
import (
"sync/atomic"
"github.com/prometheus/client_golang/prometheus"
)
type ReplaceableHistogramVec struct {
current *atomic.Value
opts prometheus.HistogramOpts
variableLabels []string
}
func NewReplaceableHistogramVec(registry *prometheus.Registry, opts prometheus.HistogramOpts, variableLabels []string) *ReplaceableHistogramVec {
metric := &ReplaceableHistogramVec{
current: &atomic.Value{},
opts: opts,
variableLabels: variableLabels,
}
metric.current.Store(prometheus.NewHistogramVec(opts, variableLabels))
registry.MustRegister(metric)
return metric
}
func (c *ReplaceableHistogramVec) Replace(updater func(h *prometheus.HistogramVec)) {
h := prometheus.NewHistogramVec(c.opts, c.variableLabels)
updater(h)
c.current.Store(h)
}
func (c *ReplaceableHistogramVec) Describe(ch chan<- *prometheus.Desc) {
collector, ok := c.current.Load().(prometheus.Collector)
if ok {
collector.Describe(ch)
}
}
func (c *ReplaceableHistogramVec) Collect(ch chan<- prometheus.Metric) {
collector, ok := c.current.Load().(prometheus.Collector)
if ok {
collector.Collect(ch)
}
}
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
"time" "time"
ophttp "github.com/ethereum-optimism/optimism/op-node/http" ophttp "github.com/ethereum-optimism/optimism/op-node/http"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-service/metrics" "github.com/ethereum-optimism/optimism/op-service/metrics"
pb "github.com/libp2p/go-libp2p-pubsub/pb" pb "github.com/libp2p/go-libp2p-pubsub/pb"
...@@ -66,7 +67,7 @@ type Metricer interface { ...@@ -66,7 +67,7 @@ type Metricer interface {
Document() []metrics.DocumentedMetric Document() []metrics.DocumentedMetric
RecordChannelInputBytes(num int) RecordChannelInputBytes(num int)
// P2P Metrics // P2P Metrics
SetPeerScores(scores map[string]float64) SetPeerScores(scores map[string]float64, allScores []store.PeerScores)
ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration) ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration)
ServerPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration) ServerPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration)
PayloadsQuarantineSize(n int) PayloadsQuarantineSize(n int)
...@@ -132,15 +133,16 @@ type Metrics struct { ...@@ -132,15 +133,16 @@ type Metrics struct {
TransactionsSequencedTotal prometheus.Counter TransactionsSequencedTotal prometheus.Counter
// P2P Metrics // P2P Metrics
PeerCount prometheus.Gauge PeerCount prometheus.Gauge
StreamCount prometheus.Gauge StreamCount prometheus.Gauge
PeerScores *prometheus.GaugeVec PeerScores *prometheus.GaugeVec
GossipEventsTotal *prometheus.CounterVec GossipEventsTotal *prometheus.CounterVec
BandwidthTotal *prometheus.GaugeVec BandwidthTotal *prometheus.GaugeVec
PeerUnbans prometheus.Counter PeerUnbans prometheus.Counter
IPUnbans prometheus.Counter IPUnbans prometheus.Counter
Dials *prometheus.CounterVec Dials *prometheus.CounterVec
Accepts *prometheus.CounterVec Accepts *prometheus.CounterVec
PeerScoresHistogram *ReplaceableHistogramVec
ChannelInputBytes prometheus.Counter ChannelInputBytes prometheus.Counter
...@@ -161,6 +163,7 @@ func NewMetrics(procName string) *Metrics { ...@@ -161,6 +163,7 @@ func NewMetrics(procName string) *Metrics {
registry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) registry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
registry.MustRegister(collectors.NewGoCollector()) registry.MustRegister(collectors.NewGoCollector())
factory := metrics.With(registry) factory := metrics.With(registry)
return &Metrics{ return &Metrics{
Info: factory.NewGaugeVec(prometheus.GaugeOpts{ Info: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Namespace: ns,
...@@ -321,6 +324,12 @@ func NewMetrics(procName string) *Metrics { ...@@ -321,6 +324,12 @@ func NewMetrics(procName string) *Metrics {
}, []string{ }, []string{
"band", "band",
}), }),
PeerScoresHistogram: NewReplaceableHistogramVec(registry, prometheus.HistogramOpts{
Namespace: ns,
Name: "peer_scores_histogram",
Help: "Histogram of currrently connected peer scores",
Buckets: []float64{-100, -40, -20, -10, -5, -2, -1, -0.5, -0.05, 0, 0.05, 0.5, 1, 2, 5, 10, 20, 40},
}, []string{"type"}),
StreamCount: factory.NewGauge(prometheus.GaugeOpts{ StreamCount: factory.NewGauge(prometheus.GaugeOpts{
Namespace: ns, Namespace: ns,
Subsystem: "p2p", Subsystem: "p2p",
...@@ -452,7 +461,18 @@ func NewMetrics(procName string) *Metrics { ...@@ -452,7 +461,18 @@ func NewMetrics(procName string) *Metrics {
// SetPeerScores updates the peer score [prometheus.GaugeVec]. // SetPeerScores updates the peer score [prometheus.GaugeVec].
// This takes a map of labels to scores. // This takes a map of labels to scores.
func (m *Metrics) SetPeerScores(scores map[string]float64) { func (m *Metrics) SetPeerScores(scores map[string]float64, allScores []store.PeerScores) {
m.PeerScoresHistogram.Replace(func(h *prometheus.HistogramVec) {
for _, scores := range allScores {
h.WithLabelValues("total").Observe(scores.Gossip.Total)
h.WithLabelValues("ipColocation").Observe(scores.Gossip.IPColocationFactor)
h.WithLabelValues("behavioralPenalty").Observe(scores.Gossip.BehavioralPenalty)
h.WithLabelValues("blocksFirstMessage").Observe(scores.Gossip.Blocks.FirstMessageDeliveries)
h.WithLabelValues("blocksTimeInMesh").Observe(scores.Gossip.Blocks.TimeInMesh)
h.WithLabelValues("blocksMessageDeliveries").Observe(scores.Gossip.Blocks.MeshMessageDeliveries)
h.WithLabelValues("blocksInvalidMessageDeliveries").Observe(scores.Gossip.Blocks.InvalidMessageDeliveries)
}
})
for label, score := range scores { for label, score := range scores {
m.PeerScores.WithLabelValues(label).Set(score) m.PeerScores.WithLabelValues(label).Set(score)
} }
...@@ -785,7 +805,7 @@ func (n *noopMetricer) RecordSequencerReset() { ...@@ -785,7 +805,7 @@ func (n *noopMetricer) RecordSequencerReset() {
func (n *noopMetricer) RecordGossipEvent(evType int32) { func (n *noopMetricer) RecordGossipEvent(evType int32) {
} }
func (n *noopMetricer) SetPeerScores(scores map[string]float64) { func (n *noopMetricer) SetPeerScores(scores map[string]float64, allScores []store.PeerScores) {
} }
func (n *noopMetricer) IncPeerCount() { func (n *noopMetricer) IncPeerCount() {
......
// Code generated by mockery v2.22.1. DO NOT EDIT. // Code generated by mockery v2.28.0. DO NOT EDIT.
package mocks package mocks
...@@ -14,11 +14,6 @@ func (_m *GossipMetricer) RecordGossipEvent(evType int32) { ...@@ -14,11 +14,6 @@ func (_m *GossipMetricer) RecordGossipEvent(evType int32) {
_m.Called(evType) _m.Called(evType)
} }
// SetPeerScores provides a mock function with given fields: _a0
func (_m *GossipMetricer) SetPeerScores(_a0 map[string]float64) {
_m.Called(_a0)
}
type mockConstructorTestingTNewGossipMetricer interface { type mockConstructorTestingTNewGossipMetricer interface {
mock.TestingT mock.TestingT
Cleanup(func()) Cleanup(func())
......
...@@ -46,17 +46,27 @@ func (_m *Peerstore) Peers() peer.IDSlice { ...@@ -46,17 +46,27 @@ func (_m *Peerstore) Peers() peer.IDSlice {
} }
// SetScore provides a mock function with given fields: id, diff // SetScore provides a mock function with given fields: id, diff
func (_m *Peerstore) SetScore(id peer.ID, diff store.ScoreDiff) error { func (_m *Peerstore) SetScore(id peer.ID, diff store.ScoreDiff) (store.PeerScores, error) {
ret := _m.Called(id, diff) ret := _m.Called(id, diff)
var r0 error var r0 store.PeerScores
if rf, ok := ret.Get(0).(func(peer.ID, store.ScoreDiff) error); ok { var r1 error
if rf, ok := ret.Get(0).(func(peer.ID, store.ScoreDiff) (store.PeerScores, error)); ok {
return rf(id, diff)
}
if rf, ok := ret.Get(0).(func(peer.ID, store.ScoreDiff) store.PeerScores); ok {
r0 = rf(id, diff) r0 = rf(id, diff)
} else { } else {
r0 = ret.Error(0) r0 = ret.Get(0).(store.PeerScores)
} }
return r0 if rf, ok := ret.Get(1).(func(peer.ID, store.ScoreDiff) error); ok {
r1 = rf(id, diff)
} else {
r1 = ret.Error(1)
}
return r0, r1
} }
type mockConstructorTestingTNewPeerstore interface { type mockConstructorTestingTNewPeerstore interface {
......
// Code generated by mockery v2.28.0. DO NOT EDIT.
package mocks
import (
mock "github.com/stretchr/testify/mock"
store "github.com/ethereum-optimism/optimism/op-node/p2p/store"
)
// ScoreMetrics is an autogenerated mock type for the ScoreMetrics type
type ScoreMetrics struct {
mock.Mock
}
// SetPeerScores provides a mock function with given fields: _a0, _a1
func (_m *ScoreMetrics) SetPeerScores(_a0 map[string]float64, _a1 []store.PeerScores) {
_m.Called(_a0, _a1)
}
type mockConstructorTestingTNewScoreMetrics interface {
mock.TestingT
Cleanup(func())
}
// NewScoreMetrics creates a new instance of ScoreMetrics. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewScoreMetrics(t mockConstructorTestingTNewScoreMetrics) *ScoreMetrics {
mock := &ScoreMetrics{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
...@@ -96,7 +96,7 @@ type Peerstore interface { ...@@ -96,7 +96,7 @@ type Peerstore interface {
// Peers returns all of the peer IDs stored across all inner stores. // Peers returns all of the peer IDs stored across all inner stores.
Peers() peer.IDSlice Peers() peer.IDSlice
SetScore(id peer.ID, diff store.ScoreDiff) error SetScore(id peer.ID, diff store.ScoreDiff) (store.PeerScores, error)
} }
// Scorer is a peer scorer that scores peers based on application-specific metrics. // Scorer is a peer scorer that scores peers based on application-specific metrics.
...@@ -106,8 +106,9 @@ type Scorer interface { ...@@ -106,8 +106,9 @@ type Scorer interface {
SnapshotHook() pubsub.ExtendedPeerScoreInspectFn SnapshotHook() pubsub.ExtendedPeerScoreInspectFn
} }
//go:generate mockery --name ScoreMetrics --output mocks/
type ScoreMetrics interface { type ScoreMetrics interface {
SetPeerScores(map[string]float64) SetPeerScores(map[string]float64, []store.PeerScores)
} }
// NewScorer returns a new peer scorer. // NewScorer returns a new peer scorer.
...@@ -129,6 +130,7 @@ func (s *scorer) SnapshotHook() pubsub.ExtendedPeerScoreInspectFn { ...@@ -129,6 +130,7 @@ func (s *scorer) SnapshotHook() pubsub.ExtendedPeerScoreInspectFn {
blocksTopicName := blocksTopicV1(s.cfg) blocksTopicName := blocksTopicV1(s.cfg)
return func(m map[peer.ID]*pubsub.PeerScoreSnapshot) { return func(m map[peer.ID]*pubsub.PeerScoreSnapshot) {
scoreMap := make(map[string]float64) scoreMap := make(map[string]float64)
allScores := make([]store.PeerScores, 0, len(m))
// Zero out all bands. // Zero out all bands.
for _, b := range s.bandScoreThresholds.bands { for _, b := range s.bandScoreThresholds.bands {
scoreMap[b.band] = 0 scoreMap[b.band] = 0
...@@ -147,15 +149,17 @@ func (s *scorer) SnapshotHook() pubsub.ExtendedPeerScoreInspectFn { ...@@ -147,15 +149,17 @@ func (s *scorer) SnapshotHook() pubsub.ExtendedPeerScoreInspectFn {
diff.Blocks.FirstMessageDeliveries = topSnap.FirstMessageDeliveries diff.Blocks.FirstMessageDeliveries = topSnap.FirstMessageDeliveries
diff.Blocks.InvalidMessageDeliveries = topSnap.InvalidMessageDeliveries diff.Blocks.InvalidMessageDeliveries = topSnap.InvalidMessageDeliveries
} }
if err := s.peerStore.SetScore(id, &diff); err != nil { if peerScores, err := s.peerStore.SetScore(id, &diff); err != nil {
s.log.Warn("Unable to update peer gossip score", "err", err) s.log.Warn("Unable to update peer gossip score", "err", err)
} else {
allScores = append(allScores, peerScores)
} }
} }
for _, snap := range m { for _, snap := range m {
band := s.bandScoreThresholds.Bucket(snap.Score) band := s.bandScoreThresholds.Bucket(snap.Score)
scoreMap[band] += 1 scoreMap[band] += 1
} }
s.metricer.SetPeerScores(scoreMap) s.metricer.SetPeerScores(scoreMap, allScores)
} }
} }
......
...@@ -22,7 +22,7 @@ type PeerScorerTestSuite struct { ...@@ -22,7 +22,7 @@ type PeerScorerTestSuite struct {
suite.Suite suite.Suite
mockStore *p2pMocks.Peerstore mockStore *p2pMocks.Peerstore
mockMetricer *p2pMocks.GossipMetricer mockMetricer *p2pMocks.ScoreMetrics
bandScorer *p2p.BandScoreThresholds bandScorer *p2p.BandScoreThresholds
logger log.Logger logger log.Logger
} }
...@@ -30,7 +30,7 @@ type PeerScorerTestSuite struct { ...@@ -30,7 +30,7 @@ type PeerScorerTestSuite struct {
// SetupTest sets up the test suite. // SetupTest sets up the test suite.
func (testSuite *PeerScorerTestSuite) SetupTest() { func (testSuite *PeerScorerTestSuite) SetupTest() {
testSuite.mockStore = &p2pMocks.Peerstore{} testSuite.mockStore = &p2pMocks.Peerstore{}
testSuite.mockMetricer = &p2pMocks.GossipMetricer{} testSuite.mockMetricer = &p2pMocks.ScoreMetrics{}
bandScorer, err := p2p.NewBandScorer("-40:graylist;0:friend;") bandScorer, err := p2p.NewBandScorer("-40:graylist;0:friend;")
testSuite.NoError(err) testSuite.NoError(err)
testSuite.bandScorer = bandScorer testSuite.bandScorer = bandScorer
...@@ -77,14 +77,15 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() { ...@@ -77,14 +77,15 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
) )
inspectFn := scorer.SnapshotHook() inspectFn := scorer.SnapshotHook()
scores := store.PeerScores{}
// Expect updating the peer store // Expect updating the peer store
testSuite.mockStore.On("SetScore", peer.ID("peer1"), &store.GossipScores{Total: float64(-100)}).Return(nil).Once() testSuite.mockStore.On("SetScore", peer.ID("peer1"), &store.GossipScores{Total: float64(-100)}).Return(scores, nil).Once()
// The metricer should then be called with the peer score band map // The metricer should then be called with the peer score band map
testSuite.mockMetricer.On("SetPeerScores", map[string]float64{ testSuite.mockMetricer.On("SetPeerScores", map[string]float64{
"friend": 0, "friend": 0,
"graylist": 1, "graylist": 1,
}).Return(nil).Once() }, []store.PeerScores{scores}).Return(nil).Once()
// Apply the snapshot // Apply the snapshot
snapshotMap := map[peer.ID]*pubsub.PeerScoreSnapshot{ snapshotMap := map[peer.ID]*pubsub.PeerScoreSnapshot{
...@@ -95,13 +96,13 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() { ...@@ -95,13 +96,13 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
inspectFn(snapshotMap) inspectFn(snapshotMap)
// Expect updating the peer store // Expect updating the peer store
testSuite.mockStore.On("SetScore", peer.ID("peer1"), &store.GossipScores{Total: 0}).Return(nil).Once() testSuite.mockStore.On("SetScore", peer.ID("peer1"), &store.GossipScores{Total: 0}).Return(scores, nil).Once()
// The metricer should then be called with the peer score band map // The metricer should then be called with the peer score band map
testSuite.mockMetricer.On("SetPeerScores", map[string]float64{ testSuite.mockMetricer.On("SetPeerScores", map[string]float64{
"friend": 1, "friend": 1,
"graylist": 0, "graylist": 0,
}).Return(nil).Once() }, []store.PeerScores{scores}).Return(nil).Once()
// Apply the snapshot // Apply the snapshot
snapshotMap = map[peer.ID]*pubsub.PeerScoreSnapshot{ snapshotMap = map[peer.ID]*pubsub.PeerScoreSnapshot{
...@@ -111,33 +112,3 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() { ...@@ -111,33 +112,3 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
} }
inspectFn(snapshotMap) inspectFn(snapshotMap)
} }
// TestScorer_SnapshotHookBlocksPeer tests running the snapshot hook on the peer scorer with a peer score below the threshold.
// This implies that the peer should be blocked.
func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHookBlocksPeer() {
scorer := p2p.NewScorer(
&rollup.Config{L2ChainID: big.NewInt(123)},
testSuite.mockStore,
testSuite.mockMetricer,
testSuite.bandScorer,
testSuite.logger,
)
inspectFn := scorer.SnapshotHook()
// Expect updating the peer store
testSuite.mockStore.On("SetScore", peer.ID("peer1"), &store.GossipScores{Total: float64(-101)}).Return(nil).Once()
// The metricer should then be called with the peer score band map
testSuite.mockMetricer.On("SetPeerScores", map[string]float64{
"friend": 0,
"graylist": 1,
}).Return(nil)
// Apply the snapshot
snapshotMap := map[peer.ID]*pubsub.PeerScoreSnapshot{
peer.ID("peer1"): {
Score: -101,
},
}
inspectFn(snapshotMap)
}
...@@ -36,7 +36,7 @@ type PeerScoresTestSuite struct { ...@@ -36,7 +36,7 @@ type PeerScoresTestSuite struct {
suite.Suite suite.Suite
mockStore *p2pMocks.Peerstore mockStore *p2pMocks.Peerstore
mockMetricer *p2pMocks.GossipMetricer mockMetricer *p2pMocks.ScoreMetrics
bandScorer BandScoreThresholds bandScorer BandScoreThresholds
logger log.Logger logger log.Logger
} }
...@@ -44,7 +44,7 @@ type PeerScoresTestSuite struct { ...@@ -44,7 +44,7 @@ type PeerScoresTestSuite struct {
// SetupTest sets up the test suite. // SetupTest sets up the test suite.
func (testSuite *PeerScoresTestSuite) SetupTest() { func (testSuite *PeerScoresTestSuite) SetupTest() {
testSuite.mockStore = &p2pMocks.Peerstore{} testSuite.mockStore = &p2pMocks.Peerstore{}
testSuite.mockMetricer = &p2pMocks.GossipMetricer{} testSuite.mockMetricer = &p2pMocks.ScoreMetrics{}
bandScorer, err := NewBandScorer("0:graylist;") bandScorer, err := NewBandScorer("0:graylist;")
testSuite.NoError(err) testSuite.NoError(err)
testSuite.bandScorer = *bandScorer testSuite.bandScorer = *bandScorer
...@@ -157,7 +157,7 @@ func (testSuite *PeerScoresTestSuite) TestNegativeScores() { ...@@ -157,7 +157,7 @@ func (testSuite *PeerScoresTestSuite) TestNegativeScores() {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
testSuite.mockMetricer.On("SetPeerScores", mock.Anything).Return(nil) testSuite.mockMetricer.On("SetPeerScores", mock.Anything, mock.Anything).Return(nil)
// Construct 20 hosts using the [getNetHosts] function. // Construct 20 hosts using the [getNetHosts] function.
hosts := getNetHosts(testSuite, ctx, 20) hosts := getNetHosts(testSuite, ctx, 20)
......
...@@ -41,7 +41,7 @@ type ScoreDatastore interface { ...@@ -41,7 +41,7 @@ type ScoreDatastore interface {
GetPeerScore(id peer.ID) (float64, error) GetPeerScore(id peer.ID) (float64, error)
// SetScore applies the given store diff to the specified peer // SetScore applies the given store diff to the specified peer
SetScore(id peer.ID, diff ScoreDiff) error SetScore(id peer.ID, diff ScoreDiff) (PeerScores, error)
} }
// ScoreDiff defines a type-safe batch of changes to apply to the peer-scoring record of the peer. // ScoreDiff defines a type-safe batch of changes to apply to the peer-scoring record of the peer.
......
...@@ -84,7 +84,8 @@ func (d *ipBanBook) SetIPBanExpiration(ip net.IP, expirationTime time.Time) erro ...@@ -84,7 +84,8 @@ func (d *ipBanBook) SetIPBanExpiration(ip net.IP, expirationTime time.Time) erro
if expirationTime == (time.Time{}) { if expirationTime == (time.Time{}) {
return d.book.deleteRecord(ip.To16().String()) return d.book.deleteRecord(ip.To16().String())
} }
return d.book.SetRecord(ip.To16().String(), ipBanUpdate(expirationTime)) _, err := d.book.SetRecord(ip.To16().String(), ipBanUpdate(expirationTime))
return err
} }
func (d *ipBanBook) Close() { func (d *ipBanBook) Close() {
......
...@@ -80,7 +80,8 @@ func (d *peerBanBook) SetPeerBanExpiration(id peer.ID, expirationTime time.Time) ...@@ -80,7 +80,8 @@ func (d *peerBanBook) SetPeerBanExpiration(id peer.ID, expirationTime time.Time)
if expirationTime == (time.Time{}) { if expirationTime == (time.Time{}) {
return d.book.deleteRecord(id) return d.book.deleteRecord(id)
} }
return d.book.SetRecord(id, peerBanUpdate(expirationTime)) _, err := d.book.SetRecord(id, peerBanUpdate(expirationTime))
return err
} }
func (d *peerBanBook) Close() { func (d *peerBanBook) Close() {
......
...@@ -124,27 +124,27 @@ func (d *recordsBook[K, V]) getRecord(key K) (v V, err error) { ...@@ -124,27 +124,27 @@ func (d *recordsBook[K, V]) getRecord(key K) (v V, err error) {
return v, nil return v, nil
} }
func (d *recordsBook[K, V]) SetRecord(key K, diff recordDiff[V]) error { func (d *recordsBook[K, V]) SetRecord(key K, diff recordDiff[V]) (V, error) {
d.Lock() d.Lock()
defer d.Unlock() defer d.Unlock()
rec, err := d.getRecord(key) rec, err := d.getRecord(key)
if err == UnknownRecordErr { // instantiate new record if it does not exist yet if err == UnknownRecordErr { // instantiate new record if it does not exist yet
rec = d.newRecord() rec = d.newRecord()
} else if err != nil { } else if err != nil {
return err return d.newRecord(), err
} }
rec.SetLastUpdated(d.clock.Now()) rec.SetLastUpdated(d.clock.Now())
diff.Apply(rec) diff.Apply(rec)
data, err := rec.MarshalBinary() data, err := rec.MarshalBinary()
if err != nil { if err != nil {
return fmt.Errorf("failed to encode record for key %v: %w", key, err) return d.newRecord(), fmt.Errorf("failed to encode record for key %v: %w", key, err)
} }
err = d.store.Put(d.ctx, d.dsKey(key), data) err = d.store.Put(d.ctx, d.dsKey(key), data)
if err != nil { if err != nil {
return fmt.Errorf("storing updated record for key %v: %w", key, err) return d.newRecord(), fmt.Errorf("storing updated record for key %v: %w", key, err)
} }
d.cache.Add(key, rec) d.cache.Add(key, rec)
return nil return rec, nil
} }
// prune deletes entries from the store that are older than the configured prune expiration. // prune deletes entries from the store that are older than the configured prune expiration.
......
...@@ -86,8 +86,9 @@ func (d *scoreBook) GetPeerScore(id peer.ID) (float64, error) { ...@@ -86,8 +86,9 @@ func (d *scoreBook) GetPeerScore(id peer.ID) (float64, error) {
return scores.Gossip.Total, nil return scores.Gossip.Total, nil
} }
func (d *scoreBook) SetScore(id peer.ID, diff ScoreDiff) error { func (d *scoreBook) SetScore(id peer.ID, diff ScoreDiff) (PeerScores, error) {
return d.book.SetRecord(id, diff) v, err := d.book.SetRecord(id, diff)
return v.PeerScores, err
} }
func (d *scoreBook) Close() { func (d *scoreBook) Close() {
......
...@@ -26,18 +26,21 @@ func TestRoundTripGossipScore(t *testing.T) { ...@@ -26,18 +26,21 @@ func TestRoundTripGossipScore(t *testing.T) {
id := peer.ID("aaaa") id := peer.ID("aaaa")
store := createMemoryStore(t) store := createMemoryStore(t)
score := 123.45 score := 123.45
err := store.SetScore(id, &GossipScores{Total: score}) res, err := store.SetScore(id, &GossipScores{Total: score})
require.NoError(t, err) require.NoError(t, err)
assertPeerScores(t, store, id, PeerScores{Gossip: GossipScores{Total: score}}) expected := PeerScores{Gossip: GossipScores{Total: score}}
require.Equal(t, expected, res)
assertPeerScores(t, store, id, expected)
} }
func TestUpdateGossipScore(t *testing.T) { func TestUpdateGossipScore(t *testing.T) {
id := peer.ID("aaaa") id := peer.ID("aaaa")
store := createMemoryStore(t) store := createMemoryStore(t)
score := 123.45 score := 123.45
require.NoError(t, store.SetScore(id, &GossipScores{Total: 444.223})) setScoreRequired(t, store, id, &GossipScores{Total: 444.223})
require.NoError(t, store.SetScore(id, &GossipScores{Total: score})) setScoreRequired(t, store, id, &GossipScores{Total: score})
assertPeerScores(t, store, id, PeerScores{Gossip: GossipScores{Total: score}}) assertPeerScores(t, store, id, PeerScores{Gossip: GossipScores{Total: score}})
} }
...@@ -48,8 +51,8 @@ func TestStoreScoresForMultiplePeers(t *testing.T) { ...@@ -48,8 +51,8 @@ func TestStoreScoresForMultiplePeers(t *testing.T) {
store := createMemoryStore(t) store := createMemoryStore(t)
score1 := 123.45 score1 := 123.45
score2 := 453.22 score2 := 453.22
require.NoError(t, store.SetScore(id1, &GossipScores{Total: score1})) setScoreRequired(t, store, id1, &GossipScores{Total: score1})
require.NoError(t, store.SetScore(id2, &GossipScores{Total: score2})) setScoreRequired(t, store, id2, &GossipScores{Total: score2})
assertPeerScores(t, store, id1, PeerScores{Gossip: GossipScores{Total: score1}}) assertPeerScores(t, store, id1, PeerScores{Gossip: GossipScores{Total: score1}})
assertPeerScores(t, store, id2, PeerScores{Gossip: GossipScores{Total: score2}}) assertPeerScores(t, store, id2, PeerScores{Gossip: GossipScores{Total: score2}})
...@@ -61,7 +64,7 @@ func TestPersistData(t *testing.T) { ...@@ -61,7 +64,7 @@ func TestPersistData(t *testing.T) {
backingStore := sync.MutexWrap(ds.NewMapDatastore()) backingStore := sync.MutexWrap(ds.NewMapDatastore())
store := createPeerstoreWithBacking(t, backingStore) store := createPeerstoreWithBacking(t, backingStore)
require.NoError(t, store.SetScore(id, &GossipScores{Total: score})) setScoreRequired(t, store, id, &GossipScores{Total: score})
// Close and recreate a new store from the same backing // Close and recreate a new store from the same backing
require.NoError(t, store.Close()) require.NoError(t, store.Close())
...@@ -92,17 +95,17 @@ func TestPrune(t *testing.T) { ...@@ -92,17 +95,17 @@ func TestPrune(t *testing.T) {
firstStore := clock.Now() firstStore := clock.Now()
// Set some scores all 30 minutes apart so they have different expiry times // Set some scores all 30 minutes apart so they have different expiry times
require.NoError(t, book.SetScore("aaaa", &GossipScores{Total: 123.45})) setScoreRequired(t, book, "aaaa", &GossipScores{Total: 123.45})
clock.AdvanceTime(30 * time.Minute) clock.AdvanceTime(30 * time.Minute)
require.NoError(t, book.SetScore("bbbb", &GossipScores{Total: 123.45})) setScoreRequired(t, book, "bbbb", &GossipScores{Total: 123.45})
clock.AdvanceTime(30 * time.Minute) clock.AdvanceTime(30 * time.Minute)
require.NoError(t, book.SetScore("cccc", &GossipScores{Total: 123.45})) setScoreRequired(t, book, "cccc", &GossipScores{Total: 123.45})
clock.AdvanceTime(30 * time.Minute) clock.AdvanceTime(30 * time.Minute)
require.NoError(t, book.SetScore("dddd", &GossipScores{Total: 123.45})) setScoreRequired(t, book, "dddd", &GossipScores{Total: 123.45})
clock.AdvanceTime(30 * time.Minute) clock.AdvanceTime(30 * time.Minute)
// Update bbbb again which should extend its expiry // Update bbbb again which should extend its expiry
require.NoError(t, book.SetScore("bbbb", &GossipScores{Total: 123.45})) setScoreRequired(t, book, "bbbb", &GossipScores{Total: 123.45})
require.True(t, hasScoreRecorded("aaaa")) require.True(t, hasScoreRecorded("aaaa"))
require.True(t, hasScoreRecorded("bbbb")) require.True(t, hasScoreRecorded("bbbb"))
...@@ -147,7 +150,7 @@ func TestPruneMultipleBatches(t *testing.T) { ...@@ -147,7 +150,7 @@ func TestPruneMultipleBatches(t *testing.T) {
// Set scores for more peers than the max batch size // Set scores for more peers than the max batch size
peerCount := maxPruneBatchSize*3 + 5 peerCount := maxPruneBatchSize*3 + 5
for i := 0; i < peerCount; i++ { for i := 0; i < peerCount; i++ {
require.NoError(t, book.SetScore(peer.ID(strconv.Itoa(i)), &GossipScores{Total: 123.45})) setScoreRequired(t, book, peer.ID(strconv.Itoa(i)), &GossipScores{Total: 123.45})
} }
clock.AdvanceTime(book.book.recordExpiry + 1) clock.AdvanceTime(book.book.recordExpiry + 1)
require.NoError(t, book.book.prune()) require.NoError(t, book.book.prune())
...@@ -169,7 +172,7 @@ func TestIgnoreOutdatedScores(t *testing.T) { ...@@ -169,7 +172,7 @@ func TestIgnoreOutdatedScores(t *testing.T) {
book, err := newScoreBook(ctx, logger, clock, sync.MutexWrap(ds.NewMapDatastore()), retentionPeriod) book, err := newScoreBook(ctx, logger, clock, sync.MutexWrap(ds.NewMapDatastore()), retentionPeriod)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, book.SetScore("a", &GossipScores{Total: 123.45})) setScoreRequired(t, book, "a", &GossipScores{Total: 123.45})
clock.AdvanceTime(retentionPeriod + 1) clock.AdvanceTime(retentionPeriod + 1)
// Not available from cache // Not available from cache
...@@ -211,3 +214,8 @@ func createPeerstoreWithBacking(t *testing.T, store *sync.MutexDatastore) Extend ...@@ -211,3 +214,8 @@ func createPeerstoreWithBacking(t *testing.T, store *sync.MutexDatastore) Extend
}) })
return eps return eps
} }
func setScoreRequired(t *testing.T, store ScoreDatastore, id peer.ID, diff *GossipScores) {
_, err := store.SetScore(id, diff)
require.NoError(t, err)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment