1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package monitor
import (
"context"
"fmt"
"sync"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
"github.com/libp2p/go-libp2p/core/peer"
)
const (
// Time delay between checking the score of each peer to avoid activity spikes
checkInterval = 1 * time.Second
)
//go:generate mockery --name PeerManager --output mocks/ --with-expecter=true
type PeerManager interface {
Peers() []peer.ID
GetPeerScore(id peer.ID) (float64, error)
IsStatic(peer.ID) bool
// BanPeer bans the peer until the specified time and disconnects any existing connections.
BanPeer(peer.ID, time.Time) error
}
// PeerMonitor runs a background process to periodically check for peers with scores below a minimum.
// When it finds bad peers, it disconnects and bans them.
// A delay is introduced between each peer being checked to avoid spikes in system load.
type PeerMonitor struct {
ctx context.Context
cancelFn context.CancelFunc
l log.Logger
clock clock.Clock
manager PeerManager
minScore float64
banDuration time.Duration
bgTasks sync.WaitGroup
// Used by checkNextPeer and must only be accessed from the background thread
peerList []peer.ID
nextPeerIdx int
}
func NewPeerMonitor(ctx context.Context, l log.Logger, clock clock.Clock, manager PeerManager, minScore float64, banDuration time.Duration) *PeerMonitor {
ctx, cancelFn := context.WithCancel(ctx)
return &PeerMonitor{
ctx: ctx,
cancelFn: cancelFn,
l: l,
clock: clock,
manager: manager,
minScore: minScore,
banDuration: banDuration,
}
}
func (p *PeerMonitor) Start() {
p.bgTasks.Add(1)
go p.background(p.checkNextPeer)
}
func (p *PeerMonitor) Stop() {
p.cancelFn()
p.bgTasks.Wait()
}
// checkNextPeer checks the next peer and disconnects and bans it if its score is too low and its not protected.
// The first call gets the list of current peers and checks the first one, then each subsequent call checks the next
// peer in the list. When the end of the list is reached, an updated list of connected peers is retrieved and the process
// starts again.
func (p *PeerMonitor) checkNextPeer() error {
// Get a new list of peers to check if we've checked all peers in the previous list
if p.nextPeerIdx >= len(p.peerList) {
p.peerList = p.manager.Peers()
p.nextPeerIdx = 0
}
if len(p.peerList) == 0 {
// No peers to check
return nil
}
id := p.peerList[p.nextPeerIdx]
p.nextPeerIdx++
score, err := p.manager.GetPeerScore(id)
if err != nil {
return fmt.Errorf("retrieve score for peer %v: %w", id, err)
}
if score >= p.minScore {
return nil
}
if p.manager.IsStatic(id) {
return nil
}
if err := p.manager.BanPeer(id, p.clock.Now().Add(p.banDuration)); err != nil {
return fmt.Errorf("banning peer %v: %w", id, err)
}
return nil
}
// background is intended to run as a separate go routine. It will call the supplied action function every checkInterval
// until the context is done.
func (p *PeerMonitor) background(action func() error) {
defer p.bgTasks.Done()
ticker := p.clock.NewTicker(checkInterval)
defer ticker.Stop()
for {
select {
case <-p.ctx.Done():
return
case <-ticker.Ch():
if err := action(); err != nil {
p.l.Warn("Error while checking connected peer score", "err", err)
}
}
}
}