Commit 0eb68833 authored by Adrian Sutton's avatar Adrian Sutton

op-node: Integrate application scores into overall peer score.

parent ad9305f6
...@@ -127,25 +127,25 @@ func (s *peerApplicationScorer) stop() { ...@@ -127,25 +127,25 @@ func (s *peerApplicationScorer) stop() {
s.done.Wait() s.done.Wait()
} }
type noopApplicationScorer struct{} type NoopApplicationScorer struct{}
func (n *noopApplicationScorer) ApplicationScore(_ peer.ID) float64 { func (n *NoopApplicationScorer) ApplicationScore(_ peer.ID) float64 {
return 0 return 0
} }
func (n *noopApplicationScorer) onValidResponse(_ peer.ID) { func (n *NoopApplicationScorer) onValidResponse(_ peer.ID) {
} }
func (n *noopApplicationScorer) onResponseError(_ peer.ID) { func (n *NoopApplicationScorer) onResponseError(_ peer.ID) {
} }
func (n *noopApplicationScorer) onRejectedPayload(_ peer.ID) { func (n *NoopApplicationScorer) onRejectedPayload(_ peer.ID) {
} }
func (n *noopApplicationScorer) start() { func (n *NoopApplicationScorer) start() {
} }
func (n *noopApplicationScorer) stop() { func (n *NoopApplicationScorer) stop() {
} }
var _ ApplicationScorer = (*noopApplicationScorer)(nil) var _ ApplicationScorer = (*NoopApplicationScorer)(nil)
...@@ -100,7 +100,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l ...@@ -100,7 +100,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
if scoreParams != nil { if scoreParams != nil {
n.appScorer = newPeerApplicationScorer(resourcesCtx, log, clock.SystemClock, &scoreParams.ApplicationScoring, eps, n.host.Network().Peers) n.appScorer = newPeerApplicationScorer(resourcesCtx, log, clock.SystemClock, &scoreParams.ApplicationScoring, eps, n.host.Network().Peers)
} else { } else {
n.appScorer = &noopApplicationScorer{} n.appScorer = &NoopApplicationScorer{}
} }
// Activate the P2P req-resp sync if enabled by feature-flag. // Activate the P2P req-resp sync if enabled by feature-flag.
if setup.ReqRespSyncEnabled() { if setup.ReqRespSyncEnabled() {
...@@ -125,7 +125,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l ...@@ -125,7 +125,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
n.host.SetStreamHandler(PayloadByNumberProtocolID(rollupCfg.L2ChainID), payloadByNumber) n.host.SetStreamHandler(PayloadByNumberProtocolID(rollupCfg.L2ChainID), payloadByNumber)
} }
} }
n.scorer = NewScorer(rollupCfg, eps, metrics, log) n.scorer = NewScorer(rollupCfg, eps, metrics, n.appScorer, log)
// notify of any new connections/streams/etc. // notify of any new connections/streams/etc.
n.host.Network().Notify(NewNetworkNotifier(log, metrics)) n.host.Network().Notify(NewNetworkNotifier(log, metrics))
// note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled. // note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled.
......
...@@ -14,6 +14,7 @@ import ( ...@@ -14,6 +14,7 @@ import (
type scorer struct { type scorer struct {
peerStore Peerstore peerStore Peerstore
metricer ScoreMetrics metricer ScoreMetrics
appScorer ApplicationScorer
log log.Logger log log.Logger
cfg *rollup.Config cfg *rollup.Config
} }
...@@ -36,6 +37,7 @@ type Peerstore interface { ...@@ -36,6 +37,7 @@ type Peerstore interface {
// Scorer is a peer scorer that scores peers based on application-specific metrics. // Scorer is a peer scorer that scores peers based on application-specific metrics.
type Scorer interface { type Scorer interface {
SnapshotHook() pubsub.ExtendedPeerScoreInspectFn SnapshotHook() pubsub.ExtendedPeerScoreInspectFn
ApplicationScore(peer.ID) float64
} }
//go:generate mockery --name ScoreMetrics --output mocks/ //go:generate mockery --name ScoreMetrics --output mocks/
...@@ -44,10 +46,11 @@ type ScoreMetrics interface { ...@@ -44,10 +46,11 @@ type ScoreMetrics interface {
} }
// NewScorer returns a new peer scorer. // NewScorer returns a new peer scorer.
func NewScorer(cfg *rollup.Config, peerStore Peerstore, metricer ScoreMetrics, log log.Logger) Scorer { func NewScorer(cfg *rollup.Config, peerStore Peerstore, metricer ScoreMetrics, appScorer ApplicationScorer, log log.Logger) Scorer {
return &scorer{ return &scorer{
peerStore: peerStore, peerStore: peerStore,
metricer: metricer, metricer: metricer,
appScorer: appScorer,
log: log, log: log,
cfg: cfg, cfg: cfg,
} }
...@@ -84,3 +87,7 @@ func (s *scorer) SnapshotHook() pubsub.ExtendedPeerScoreInspectFn { ...@@ -84,3 +87,7 @@ func (s *scorer) SnapshotHook() pubsub.ExtendedPeerScoreInspectFn {
s.metricer.SetPeerScores(allScores) s.metricer.SetPeerScores(allScores)
} }
} }
func (s *scorer) ApplicationScore(id peer.ID) float64 {
return s.appScorer.ApplicationScore(id)
}
...@@ -44,6 +44,7 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() { ...@@ -44,6 +44,7 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
&rollup.Config{L2ChainID: big.NewInt(123)}, &rollup.Config{L2ChainID: big.NewInt(123)},
testSuite.mockStore, testSuite.mockStore,
testSuite.mockMetricer, testSuite.mockMetricer,
&p2p.NoopApplicationScorer{},
testSuite.logger, testSuite.logger,
) )
inspectFn := scorer.SnapshotHook() inspectFn := scorer.SnapshotHook()
......
...@@ -13,8 +13,11 @@ func ConfigurePeerScoring(gossipConf GossipSetupConfigurables, scorer Scorer, lo ...@@ -13,8 +13,11 @@ func ConfigurePeerScoring(gossipConf GossipSetupConfigurables, scorer Scorer, lo
opts := []pubsub.Option{} opts := []pubsub.Option{}
if scoreParams != nil { if scoreParams != nil {
peerScoreThresholds := NewPeerScoreThresholds() peerScoreThresholds := NewPeerScoreThresholds()
// Create copy of params before modifying the AppSpecificScore
params := scoreParams.PeerScoring
params.AppSpecificScore = scorer.ApplicationScore
opts = []pubsub.Option{ opts = []pubsub.Option{
pubsub.WithPeerScore(&scoreParams.PeerScoring, &peerScoreThresholds), pubsub.WithPeerScore(&params, &peerScoreThresholds),
pubsub.WithPeerScoreInspect(scorer.SnapshotHook(), peerScoreInspectFrequency), pubsub.WithPeerScoreInspect(scorer.SnapshotHook(), peerScoreInspectFrequency),
} }
} else { } else {
......
...@@ -82,6 +82,18 @@ func getNetHosts(testSuite *PeerScoresTestSuite, ctx context.Context, n int) []h ...@@ -82,6 +82,18 @@ func getNetHosts(testSuite *PeerScoresTestSuite, ctx context.Context, n int) []h
return out return out
} }
type discriminatingAppScorer struct {
badPeer peer.ID
NoopApplicationScorer
}
func (d *discriminatingAppScorer) ApplicationScore(id peer.ID) float64 {
if id == d.badPeer {
return -1000
}
return 0
}
func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts []host.Host) []*pubsub.PubSub { func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts []host.Host) []*pubsub.PubSub {
var psubs []*pubsub.PubSub var psubs []*pubsub.PubSub
...@@ -100,17 +112,10 @@ func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts [] ...@@ -100,17 +112,10 @@ func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts []
scorer := NewScorer( scorer := NewScorer(
&rollup.Config{L2ChainID: big.NewInt(123)}, &rollup.Config{L2ChainID: big.NewInt(123)},
extPeerStore, testSuite.mockMetricer, logger) extPeerStore, testSuite.mockMetricer, &discriminatingAppScorer{badPeer: hosts[0].ID()}, logger)
opts = append(opts, ConfigurePeerScoring(&Config{ opts = append(opts, ConfigurePeerScoring(&Config{
ScoringParams: &ScoringParams{ ScoringParams: &ScoringParams{
PeerScoring: pubsub.PeerScoreParams{ PeerScoring: pubsub.PeerScoreParams{
AppSpecificScore: func(p peer.ID) float64 {
if p == hosts[0].ID() {
return -1000
} else {
return 0
}
},
AppSpecificWeight: 1, AppSpecificWeight: 1,
DecayInterval: time.Second, DecayInterval: time.Second,
DecayToZero: 0.01, DecayToZero: 0.01,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment