1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package monitor
import (
"context"
"errors"
"fmt"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/p2p/monitor/mocks"
"github.com/ethereum-optimism/optimism/op-node/testlog"
clock2 "github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
)
const testBanDuration = 2 * time.Hour
func peerMonitorSetup(t *testing.T) (*PeerMonitor, *clock2.DeterministicClock, *mocks.PeerManager) {
l := testlog.Logger(t, log.LvlInfo)
clock := clock2.NewDeterministicClock(time.UnixMilli(10000))
manager := mocks.NewPeerManager(t)
monitor := NewPeerMonitor(context.Background(), l, clock, manager, -100, testBanDuration)
return monitor, clock, manager
}
func TestPeriodicallyCheckNextPeer(t *testing.T) {
monitor, clock, _ := peerMonitorSetup(t)
// Each time a step is performed, it calls Done on the wait group so we can wait for it to be performed
stepCh := make(chan struct{}, 10)
monitor.bgTasks.Add(1)
var actionErr error
go monitor.background(func() error {
stepCh <- struct{}{}
return actionErr
})
defer monitor.Stop()
// Wait for the step ticker to be started
clock.WaitForNewPendingTaskWithTimeout(30 * time.Second)
// Should perform another step after each interval
for i := 0; i < 5; i++ {
clock.AdvanceTime(checkInterval)
waitForChan(t, stepCh, fmt.Sprintf("Did not perform step %v", i))
require.Len(t, stepCh, 0)
}
// Should continue executing periodically even after an error
actionErr = errors.New("boom")
for i := 0; i < 5; i++ {
clock.AdvanceTime(checkInterval)
waitForChan(t, stepCh, fmt.Sprintf("Did not perform step %v", i))
require.Len(t, stepCh, 0)
}
}
func TestCheckNextPeer(t *testing.T) {
peerIDs := []peer.ID{
peer.ID("a"),
peer.ID("b"),
peer.ID("c"),
}
t.Run("No peers", func(t *testing.T) {
monitor, _, manager := peerMonitorSetup(t)
manager.EXPECT().Peers().Return(nil).Once()
require.NoError(t, monitor.checkNextPeer())
})
t.Run("Check each peer then refresh list", func(t *testing.T) {
monitor, _, manager := peerMonitorSetup(t)
manager.EXPECT().Peers().Return(peerIDs).Once()
for _, id := range peerIDs {
manager.EXPECT().GetPeerScore(id).Return(1, nil).Once()
require.NoError(t, monitor.checkNextPeer())
}
updatedPeers := []peer.ID{
peer.ID("x"),
peer.ID("y"),
peer.ID("z"),
peer.ID("a"),
}
manager.EXPECT().Peers().Return(updatedPeers).Once()
for _, id := range updatedPeers {
manager.EXPECT().GetPeerScore(id).Return(1, nil).Once()
require.NoError(t, monitor.checkNextPeer())
}
})
t.Run("Close and ban peer when below min score", func(t *testing.T) {
monitor, clock, manager := peerMonitorSetup(t)
id := peerIDs[0]
manager.EXPECT().Peers().Return(peerIDs).Once()
manager.EXPECT().GetPeerScore(id).Return(-101, nil).Once()
manager.EXPECT().IsStatic(id).Return(false).Once()
manager.EXPECT().BanPeer(id, clock.Now().Add(testBanDuration)).Return(nil).Once()
require.NoError(t, monitor.checkNextPeer())
})
t.Run("Do not close protected peer when below min score", func(t *testing.T) {
monitor, _, manager := peerMonitorSetup(t)
id := peerIDs[0]
manager.EXPECT().Peers().Return(peerIDs).Once()
manager.EXPECT().GetPeerScore(id).Return(-101, nil).Once()
manager.EXPECT().IsStatic(id).Return(true)
require.NoError(t, monitor.checkNextPeer())
})
}
func waitForChan(t *testing.T, ch chan struct{}, msg string) {
ctx, cancelFn := context.WithTimeout(context.Background(), 30*time.Second)
defer cancelFn()
select {
case <-ctx.Done():
t.Fatal(msg)
case <-ch:
// Ok
}
}