Commit bd55b36f authored by Adrian Sutton's avatar Adrian Sutton

op-node: Make ban threshold and duration configurable.

parent 78700767
...@@ -51,6 +51,20 @@ var ( ...@@ -51,6 +51,20 @@ var (
Required: false, Required: false,
EnvVar: p2pEnv("PEER_BANNING"), EnvVar: p2pEnv("PEER_BANNING"),
} }
BanningThreshold = cli.Float64Flag{
Name: "p2p.ban.threshold",
Usage: "The minimum score below which peers are disconnected and banned.",
Required: false,
Value: -100,
EnvVar: p2pEnv("PEER_BANNING_THRESHOLD"),
}
BanningDuration = cli.DurationFlag{
Name: "p2p.ban.duration",
Usage: "The duration that peers are banned for.",
Required: false,
Value: 1 * time.Hour,
EnvVar: p2pEnv("PEER_BANNING_DURATION"),
}
TopicScoring = cli.StringFlag{ TopicScoring = cli.StringFlag{
Name: "p2p.scoring.topics", Name: "p2p.scoring.topics",
...@@ -294,6 +308,8 @@ var p2pFlags = []cli.Flag{ ...@@ -294,6 +308,8 @@ var p2pFlags = []cli.Flag{
PeerScoring, PeerScoring,
PeerScoreBands, PeerScoreBands,
Banning, Banning,
BanningThreshold,
BanningDuration,
TopicScoring, TopicScoring,
ListenIP, ListenIP,
ListenTCPPort, ListenTCPPort,
......
...@@ -62,7 +62,7 @@ func NewConfig(ctx *cli.Context, blockTime uint64) (*p2p.Config, error) { ...@@ -62,7 +62,7 @@ func NewConfig(ctx *cli.Context, blockTime uint64) (*p2p.Config, error) {
return nil, fmt.Errorf("failed to load p2p peer score bands: %w", err) return nil, fmt.Errorf("failed to load p2p peer score bands: %w", err)
} }
if err := loadBanningOption(conf, ctx); err != nil { if err := loadBanningOptions(conf, ctx); err != nil {
return nil, fmt.Errorf("failed to load banning option: %w", err) return nil, fmt.Errorf("failed to load banning option: %w", err)
} }
...@@ -135,10 +135,11 @@ func loadPeerScoreBands(conf *p2p.Config, ctx *cli.Context) error { ...@@ -135,10 +135,11 @@ func loadPeerScoreBands(conf *p2p.Config, ctx *cli.Context) error {
return nil return nil
} }
// loadBanningOption loads whether or not to ban peers from the CLI context. // loadBanningOptions loads whether or not to ban peers from the CLI context.
func loadBanningOption(conf *p2p.Config, ctx *cli.Context) error { func loadBanningOptions(conf *p2p.Config, ctx *cli.Context) error {
ban := ctx.GlobalBool(flags.Banning.Name) conf.BanningEnabled = ctx.GlobalBool(flags.Banning.Name)
conf.BanningEnabled = ban conf.BanningThreshold = ctx.GlobalFloat64(flags.BanningThreshold.Name)
conf.BanningDuration = ctx.GlobalDuration(flags.BanningDuration.Name)
return nil return nil
} }
......
...@@ -44,6 +44,10 @@ type SetupP2P interface { ...@@ -44,6 +44,10 @@ type SetupP2P interface {
// Discovery creates a disc-v5 service. Returns nil, nil, nil if discovery is disabled. // Discovery creates a disc-v5 service. Returns nil, nil, nil if discovery is disabled.
Discovery(log log.Logger, rollupCfg *rollup.Config, tcpPort uint16) (*enode.LocalNode, *discover.UDPv5, error) Discovery(log log.Logger, rollupCfg *rollup.Config, tcpPort uint16) (*enode.LocalNode, *discover.UDPv5, error)
TargetPeers() uint TargetPeers() uint
BanPeers() bool
BanThreshold() float64
BanDuration() time.Duration
PeerBandScorer() *BandScoreThresholds
GossipSetupConfigurables GossipSetupConfigurables
ReqRespSyncEnabled() bool ReqRespSyncEnabled() bool
} }
...@@ -66,8 +70,11 @@ type Config struct { ...@@ -66,8 +70,11 @@ type Config struct {
// Peer Score Band Thresholds // Peer Score Band Thresholds
BandScoreThresholds BandScoreThresholds BandScoreThresholds BandScoreThresholds
// Whether to ban peers based on their [PeerScoring] score. // Whether to ban peers based on their [PeerScoring] score. Should be negative.
BanningEnabled bool BanningEnabled bool
// Minimum score before peers are disconnected and banned
BanningThreshold float64
BanningDuration time.Duration
ListenIP net.IP ListenIP net.IP
ListenTCPPort uint16 ListenTCPPort uint16
...@@ -143,6 +150,14 @@ func (conf *Config) BanPeers() bool { ...@@ -143,6 +150,14 @@ func (conf *Config) BanPeers() bool {
return conf.BanningEnabled return conf.BanningEnabled
} }
func (conf *Config) BanThreshold() float64 {
return conf.BanningThreshold
}
func (conf *Config) BanDuration() time.Duration {
return conf.BanningDuration
}
func (conf *Config) TopicScoringParams() *pubsub.TopicScoreParams { func (conf *Config) TopicScoringParams() *pubsub.TopicScoreParams {
return &conf.TopicScoring return &conf.TopicScoring
} }
......
...@@ -53,10 +53,8 @@ var MessageDomainValidSnappy = [4]byte{1, 0, 0, 0} ...@@ -53,10 +53,8 @@ var MessageDomainValidSnappy = [4]byte{1, 0, 0, 0}
type GossipSetupConfigurables interface { type GossipSetupConfigurables interface {
PeerScoringParams() *pubsub.PeerScoreParams PeerScoringParams() *pubsub.PeerScoreParams
TopicScoringParams() *pubsub.TopicScoreParams TopicScoringParams() *pubsub.TopicScoreParams
BanPeers() bool
// ConfigureGossip creates configuration options to apply to the GossipSub setup // ConfigureGossip creates configuration options to apply to the GossipSub setup
ConfigureGossip(rollupCfg *rollup.Config) []pubsub.Option ConfigureGossip(rollupCfg *rollup.Config) []pubsub.Option
PeerBandScorer() *BandScoreThresholds
} }
type GossipRuntimeConfig interface { type GossipRuntimeConfig interface {
......
...@@ -33,8 +33,7 @@ import ( ...@@ -33,8 +33,7 @@ import (
) )
const ( const (
staticPeerTag = "static" staticPeerTag = "static"
minAcceptedPeerScore = -100
) )
type ExtraHostFeatures interface { type ExtraHostFeatures interface {
......
...@@ -14,7 +14,6 @@ import ( ...@@ -14,7 +14,6 @@ import (
const ( const (
// Time delay between checking the score of each peer to avoid activity spikes // Time delay between checking the score of each peer to avoid activity spikes
checkInterval = 1 * time.Second checkInterval = 1 * time.Second
banDuration = 1 * time.Hour
) )
//go:generate mockery --name PeerManager --output mocks/ --with-expecter=true //go:generate mockery --name PeerManager --output mocks/ --with-expecter=true
...@@ -30,12 +29,13 @@ type PeerManager interface { ...@@ -30,12 +29,13 @@ type PeerManager interface {
// When it finds bad peers, it disconnects and bans them. // When it finds bad peers, it disconnects and bans them.
// A delay is introduced between each peer being checked to avoid spikes in system load. // A delay is introduced between each peer being checked to avoid spikes in system load.
type PeerMonitor struct { type PeerMonitor struct {
ctx context.Context ctx context.Context
cancelFn context.CancelFunc cancelFn context.CancelFunc
l log.Logger l log.Logger
clock clock.Clock clock clock.Clock
manager PeerManager manager PeerManager
minScore float64 minScore float64
banDuration time.Duration
bgTasks sync.WaitGroup bgTasks sync.WaitGroup
...@@ -44,54 +44,55 @@ type PeerMonitor struct { ...@@ -44,54 +44,55 @@ type PeerMonitor struct {
nextPeerIdx int nextPeerIdx int
} }
func NewPeerMonitor(ctx context.Context, l log.Logger, clock clock.Clock, manager PeerManager, minScore float64) *PeerMonitor { func NewPeerMonitor(ctx context.Context, l log.Logger, clock clock.Clock, manager PeerManager, minScore float64, banDuration time.Duration) *PeerMonitor {
ctx, cancelFn := context.WithCancel(ctx) ctx, cancelFn := context.WithCancel(ctx)
return &PeerMonitor{ return &PeerMonitor{
ctx: ctx, ctx: ctx,
cancelFn: cancelFn, cancelFn: cancelFn,
l: l, l: l,
clock: clock, clock: clock,
manager: manager, manager: manager,
minScore: minScore, minScore: minScore,
banDuration: banDuration,
} }
} }
func (k *PeerMonitor) Start() { func (p *PeerMonitor) Start() {
k.bgTasks.Add(1) p.bgTasks.Add(1)
go k.background(k.checkNextPeer) go p.background(p.checkNextPeer)
} }
func (k *PeerMonitor) Stop() { func (p *PeerMonitor) Stop() {
k.cancelFn() p.cancelFn()
k.bgTasks.Wait() p.bgTasks.Wait()
} }
// checkNextPeer checks the next peer and disconnects and bans it if its score is too low and its not protected. // checkNextPeer checks the next peer and disconnects and bans it if its score is too low and its not protected.
// The first call gets the list of current peers and checks the first one, then each subsequent call checks the next // The first call gets the list of current peers and checks the first one, then each subsequent call checks the next
// peer in the list. When the end of the list is reached, an updated list of connected peers is retrieved and the process // peer in the list. When the end of the list is reached, an updated list of connected peers is retrieved and the process
// starts again. // starts again.
func (k *PeerMonitor) checkNextPeer() error { func (p *PeerMonitor) checkNextPeer() error {
// Get a new list of peers to check if we've checked all peers in the previous list // Get a new list of peers to check if we've checked all peers in the previous list
if k.nextPeerIdx >= len(k.peerList) { if p.nextPeerIdx >= len(p.peerList) {
k.peerList = k.manager.Peers() p.peerList = p.manager.Peers()
k.nextPeerIdx = 0 p.nextPeerIdx = 0
} }
if len(k.peerList) == 0 { if len(p.peerList) == 0 {
// No peers to check // No peers to check
return nil return nil
} }
id := k.peerList[k.nextPeerIdx] id := p.peerList[p.nextPeerIdx]
k.nextPeerIdx++ p.nextPeerIdx++
score, err := k.manager.GetPeerScore(id) score, err := p.manager.GetPeerScore(id)
if err != nil { if err != nil {
return fmt.Errorf("retrieve score for peer %v: %w", id, err) return fmt.Errorf("retrieve score for peer %v: %w", id, err)
} }
if score > k.minScore { if score > p.minScore {
return nil return nil
} }
if k.manager.IsStatic(id) { if p.manager.IsStatic(id) {
return nil return nil
} }
if err := k.manager.BanPeer(id, k.clock.Now().Add(banDuration)); err != nil { if err := p.manager.BanPeer(id, p.clock.Now().Add(p.banDuration)); err != nil {
return fmt.Errorf("banning peer %v: %w", id, err) return fmt.Errorf("banning peer %v: %w", id, err)
} }
...@@ -100,17 +101,17 @@ func (k *PeerMonitor) checkNextPeer() error { ...@@ -100,17 +101,17 @@ func (k *PeerMonitor) checkNextPeer() error {
// background is intended to run as a separate go routine. It will call the supplied action function every checkInterval // background is intended to run as a separate go routine. It will call the supplied action function every checkInterval
// until the context is done. // until the context is done.
func (k *PeerMonitor) background(action func() error) { func (p *PeerMonitor) background(action func() error) {
defer k.bgTasks.Done() defer p.bgTasks.Done()
ticker := k.clock.NewTicker(checkInterval) ticker := p.clock.NewTicker(checkInterval)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
case <-k.ctx.Done(): case <-p.ctx.Done():
return return
case <-ticker.Ch(): case <-ticker.Ch():
if err := action(); err != nil { if err := action(); err != nil {
k.l.Warn("Error while checking connected peer score", "err", err) p.l.Warn("Error while checking connected peer score", "err", err)
} }
} }
} }
......
...@@ -15,11 +15,13 @@ import ( ...@@ -15,11 +15,13 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
const testBanDuration = 2 * time.Hour
func peerMonitorSetup(t *testing.T) (*PeerMonitor, *clock2.DeterministicClock, *mocks.PeerManager) { func peerMonitorSetup(t *testing.T) (*PeerMonitor, *clock2.DeterministicClock, *mocks.PeerManager) {
l := testlog.Logger(t, log.LvlInfo) l := testlog.Logger(t, log.LvlInfo)
clock := clock2.NewDeterministicClock(time.UnixMilli(10000)) clock := clock2.NewDeterministicClock(time.UnixMilli(10000))
manager := mocks.NewPeerManager(t) manager := mocks.NewPeerManager(t)
monitor := NewPeerMonitor(context.Background(), l, clock, manager, -100) monitor := NewPeerMonitor(context.Background(), l, clock, manager, -100, testBanDuration)
return monitor, clock, manager return monitor, clock, manager
} }
...@@ -95,7 +97,7 @@ func TestCheckNextPeer(t *testing.T) { ...@@ -95,7 +97,7 @@ func TestCheckNextPeer(t *testing.T) {
manager.EXPECT().Peers().Return(peerIDs).Once() manager.EXPECT().Peers().Return(peerIDs).Once()
manager.EXPECT().GetPeerScore(id).Return(-101, nil).Once() manager.EXPECT().GetPeerScore(id).Return(-101, nil).Once()
manager.EXPECT().IsProtected(id).Return(false).Once() manager.EXPECT().IsProtected(id).Return(false).Once()
manager.EXPECT().BanPeer(id, clock.Now().Add(banDuration)).Return(nil).Once() manager.EXPECT().BanPeer(id, clock.Now().Add(testBanDuration)).Return(nil).Once()
require.NoError(t, monitor.checkNextPeer()) require.NoError(t, monitor.checkNextPeer())
}) })
......
...@@ -155,7 +155,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l ...@@ -155,7 +155,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
} }
if setup.BanPeers() { if setup.BanPeers() {
n.peerMonitor = monitor.NewPeerMonitor(resourcesCtx, log, clock.SystemClock, n, minAcceptedPeerScore) n.peerMonitor = monitor.NewPeerMonitor(resourcesCtx, log, clock.SystemClock, n, setup.BanThreshold(), setup.BanDuration())
n.peerMonitor.Start() n.peerMonitor.Start()
} }
} }
......
...@@ -3,6 +3,7 @@ package p2p ...@@ -3,6 +3,7 @@ package p2p
import ( import (
"errors" "errors"
"fmt" "fmt"
"time"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/host"
...@@ -80,6 +81,14 @@ func (p *Prepared) BanPeers() bool { ...@@ -80,6 +81,14 @@ func (p *Prepared) BanPeers() bool {
return false return false
} }
func (p *Prepared) BanThreshold() float64 {
return -100
}
func (p *Prepared) BanDuration() time.Duration {
return 1 * time.Hour
}
func (p *Prepared) TopicScoringParams() *pubsub.TopicScoreParams { func (p *Prepared) TopicScoringParams() *pubsub.TopicScoreParams {
return nil return nil
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment