Commit ad9305f6 authored by Adrian Sutton's avatar Adrian Sutton

op-node: Hook peerApplicationScorer into p2p/node

parent e31c3dab
...@@ -15,6 +15,15 @@ type ScoreBook interface { ...@@ -15,6 +15,15 @@ type ScoreBook interface {
SetScore(id peer.ID, diff store.ScoreDiff) (store.PeerScores, error) SetScore(id peer.ID, diff store.ScoreDiff) (store.PeerScores, error)
} }
type ApplicationScorer interface {
ApplicationScore(id peer.ID) float64
onValidResponse(id peer.ID)
onResponseError(id peer.ID)
onRejectedPayload(id peer.ID)
start()
stop()
}
type peerApplicationScorer struct { type peerApplicationScorer struct {
ctx context.Context ctx context.Context
cancelFunc context.CancelFunc cancelFunc context.CancelFunc
...@@ -27,6 +36,8 @@ type peerApplicationScorer struct { ...@@ -27,6 +36,8 @@ type peerApplicationScorer struct {
done sync.WaitGroup done sync.WaitGroup
} }
var _ ApplicationScorer = (*peerApplicationScorer)(nil)
func newPeerApplicationScorer(ctx context.Context, logger log.Logger, clock clock.Clock, params *ApplicationScoreParams, scorebook ScoreBook, connectedPeers func() []peer.ID) *peerApplicationScorer { func newPeerApplicationScorer(ctx context.Context, logger log.Logger, clock clock.Clock, params *ApplicationScoreParams, scorebook ScoreBook, connectedPeers func() []peer.ID) *peerApplicationScorer {
ctx, cancelFunc := context.WithCancel(ctx) ctx, cancelFunc := context.WithCancel(ctx)
return &peerApplicationScorer{ return &peerApplicationScorer{
...@@ -115,3 +126,26 @@ func (s *peerApplicationScorer) stop() { ...@@ -115,3 +126,26 @@ func (s *peerApplicationScorer) stop() {
s.cancelFunc() s.cancelFunc()
s.done.Wait() s.done.Wait()
} }
type noopApplicationScorer struct{}
func (n *noopApplicationScorer) ApplicationScore(_ peer.ID) float64 {
return 0
}
func (n *noopApplicationScorer) onValidResponse(_ peer.ID) {
}
func (n *noopApplicationScorer) onResponseError(_ peer.ID) {
}
func (n *noopApplicationScorer) onRejectedPayload(_ peer.ID) {
}
func (n *noopApplicationScorer) start() {
}
func (n *noopApplicationScorer) stop() {
}
var _ ApplicationScorer = (*noopApplicationScorer)(nil)
...@@ -138,11 +138,11 @@ func (conf *Config) Disabled() bool { ...@@ -138,11 +138,11 @@ func (conf *Config) Disabled() bool {
return conf.DisableP2P return conf.DisableP2P
} }
func (conf *Config) PeerScoringParams() *pubsub.PeerScoreParams { func (conf *Config) PeerScoringParams() *ScoringParams {
if conf.ScoringParams == nil { if conf.ScoringParams == nil {
return nil return nil
} }
return &conf.ScoringParams.PeerScoring return conf.ScoringParams
} }
func (conf *Config) BanPeers() bool { func (conf *Config) BanPeers() bool {
......
...@@ -51,7 +51,7 @@ var MessageDomainInvalidSnappy = [4]byte{0, 0, 0, 0} ...@@ -51,7 +51,7 @@ var MessageDomainInvalidSnappy = [4]byte{0, 0, 0, 0}
var MessageDomainValidSnappy = [4]byte{1, 0, 0, 0} var MessageDomainValidSnappy = [4]byte{1, 0, 0, 0}
type GossipSetupConfigurables interface { type GossipSetupConfigurables interface {
PeerScoringParams() *pubsub.PeerScoreParams PeerScoringParams() *ScoringParams
// ConfigureGossip creates configuration options to apply to the GossipSub setup // ConfigureGossip creates configuration options to apply to the GossipSub setup
ConfigureGossip(rollupCfg *rollup.Config) []pubsub.Option ConfigureGossip(rollupCfg *rollup.Config) []pubsub.Option
} }
......
...@@ -149,7 +149,7 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter, metrics Host ...@@ -149,7 +149,7 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter, metrics Host
var scoreRetention time.Duration var scoreRetention time.Duration
if peerScoreParams != nil { if peerScoreParams != nil {
// Use the same retention period as gossip will if available // Use the same retention period as gossip will if available
scoreRetention = peerScoreParams.RetainScore scoreRetention = peerScoreParams.PeerScoring.RetainScore
} else { } else {
// Disable score GC if peer scoring is disabled // Disable score GC if peer scoring is disabled
scoreRetention = 0 scoreRetention = 0
......
...@@ -37,6 +37,7 @@ type NodeP2P struct { ...@@ -37,6 +37,7 @@ type NodeP2P struct {
connMgr connmgr.ConnManager // p2p conn manager, to keep a reliable number of peers, may be nil even with p2p enabled connMgr connmgr.ConnManager // p2p conn manager, to keep a reliable number of peers, may be nil even with p2p enabled
peerMonitor *monitor.PeerMonitor // peer monitor to disconnect bad peers, may be nil even with p2p enabled peerMonitor *monitor.PeerMonitor // peer monitor to disconnect bad peers, may be nil even with p2p enabled
store store.ExtendedPeerstore // peerstore of host, with extra bindings for scoring and banning store store.ExtendedPeerstore // peerstore of host, with extra bindings for scoring and banning
appScorer ApplicationScorer
log log.Logger log log.Logger
// the below components are all optional, and may be nil. They require the host to not be nil. // the below components are all optional, and may be nil. They require the host to not be nil.
dv5Local *enode.LocalNode // p2p discovery identity dv5Local *enode.LocalNode // p2p discovery identity
...@@ -89,6 +90,18 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l ...@@ -89,6 +90,18 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
n.gater = extra.ConnectionGater() n.gater = extra.ConnectionGater()
n.connMgr = extra.ConnectionManager() n.connMgr = extra.ConnectionManager()
} }
eps, ok := n.host.Peerstore().(store.ExtendedPeerstore)
if !ok {
return fmt.Errorf("cannot init without extended peerstore: %w", err)
}
n.store = eps
scoreParams := setup.PeerScoringParams()
if scoreParams != nil {
n.appScorer = newPeerApplicationScorer(resourcesCtx, log, clock.SystemClock, &scoreParams.ApplicationScoring, eps, n.host.Network().Peers)
} else {
n.appScorer = &noopApplicationScorer{}
}
// Activate the P2P req-resp sync if enabled by feature-flag. // Activate the P2P req-resp sync if enabled by feature-flag.
if setup.ReqRespSyncEnabled() { if setup.ReqRespSyncEnabled() {
n.syncCl = NewSyncClient(log, rollupCfg, n.host.NewStream, gossipIn.OnUnsafeL2Payload, metrics) n.syncCl = NewSyncClient(log, rollupCfg, n.host.NewStream, gossipIn.OnUnsafeL2Payload, metrics)
...@@ -112,11 +125,6 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l ...@@ -112,11 +125,6 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
n.host.SetStreamHandler(PayloadByNumberProtocolID(rollupCfg.L2ChainID), payloadByNumber) n.host.SetStreamHandler(PayloadByNumberProtocolID(rollupCfg.L2ChainID), payloadByNumber)
} }
} }
eps, ok := n.host.Peerstore().(store.ExtendedPeerstore)
if !ok {
return fmt.Errorf("cannot init without extended peerstore: %w", err)
}
n.store = eps
n.scorer = NewScorer(rollupCfg, eps, metrics, log) n.scorer = NewScorer(rollupCfg, eps, metrics, log)
// notify of any new connections/streams/etc. // notify of any new connections/streams/etc.
n.host.Network().Notify(NewNetworkNotifier(log, metrics)) n.host.Network().Notify(NewNetworkNotifier(log, metrics))
...@@ -150,6 +158,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l ...@@ -150,6 +158,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
n.peerMonitor = monitor.NewPeerMonitor(resourcesCtx, log, clock.SystemClock, n, setup.BanThreshold(), setup.BanDuration()) n.peerMonitor = monitor.NewPeerMonitor(resourcesCtx, log, clock.SystemClock, n, setup.BanThreshold(), setup.BanDuration())
n.peerMonitor.Start() n.peerMonitor.Start()
} }
n.appScorer.start()
} }
return nil return nil
} }
...@@ -258,6 +267,9 @@ func (n *NodeP2P) Close() error { ...@@ -258,6 +267,9 @@ func (n *NodeP2P) Close() error {
} }
} }
} }
if n.appScorer != nil {
n.appScorer.stop()
}
return result.ErrorOrNil() return result.ErrorOrNil()
} }
......
...@@ -9,12 +9,12 @@ import ( ...@@ -9,12 +9,12 @@ import (
func ConfigurePeerScoring(gossipConf GossipSetupConfigurables, scorer Scorer, log log.Logger) []pubsub.Option { func ConfigurePeerScoring(gossipConf GossipSetupConfigurables, scorer Scorer, log log.Logger) []pubsub.Option {
// If we want to completely disable scoring config here, we can use the [peerScoringParams] // If we want to completely disable scoring config here, we can use the [peerScoringParams]
// to return early without returning any [pubsub.Option]. // to return early without returning any [pubsub.Option].
peerScoreParams := gossipConf.PeerScoringParams() scoreParams := gossipConf.PeerScoringParams()
peerScoreThresholds := NewPeerScoreThresholds()
opts := []pubsub.Option{} opts := []pubsub.Option{}
if peerScoreParams != nil { if scoreParams != nil {
peerScoreThresholds := NewPeerScoreThresholds()
opts = []pubsub.Option{ opts = []pubsub.Option{
pubsub.WithPeerScore(peerScoreParams, &peerScoreThresholds), pubsub.WithPeerScore(&scoreParams.PeerScoring, &peerScoreThresholds),
pubsub.WithPeerScoreInspect(scorer.SnapshotHook(), peerScoreInspectFrequency), pubsub.WithPeerScoreInspect(scorer.SnapshotHook(), peerScoreInspectFrequency),
} }
} else { } else {
......
...@@ -69,7 +69,7 @@ func (p *Prepared) ConfigureGossip(rollupCfg *rollup.Config) []pubsub.Option { ...@@ -69,7 +69,7 @@ func (p *Prepared) ConfigureGossip(rollupCfg *rollup.Config) []pubsub.Option {
} }
} }
func (p *Prepared) PeerScoringParams() *pubsub.PeerScoreParams { func (p *Prepared) PeerScoringParams() *ScoringParams {
return nil return nil
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment