// peer_monitor_test.go

package monitor

import (
	"context"
	"errors"
	"fmt"
	"testing"
	"time"

	"github.com/ethereum-optimism/optimism/op-node/p2p/monitor/mocks"
	clock2 "github.com/ethereum-optimism/optimism/op-service/clock"
	"github.com/ethereum-optimism/optimism/op-service/testlog"
	"github.com/ethereum/go-ethereum/log"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/stretchr/testify/require"
)

// testBanDuration is the ban duration handed to NewPeerMonitor by
// peerMonitorSetup; tests use it to compute the expected ban expiry.
const testBanDuration = 2 * time.Hour

20
func peerMonitorSetup(t *testing.T) (*PeerMonitor, *clock2.DeterministicClock, *mocks.PeerManager) {
21
	l := testlog.Logger(t, log.LevelInfo)
22
	clock := clock2.NewDeterministicClock(time.UnixMilli(10000))
23
	manager := mocks.NewPeerManager(t)
24
	monitor := NewPeerMonitor(context.Background(), l, clock, manager, -100, testBanDuration)
25
	return monitor, clock, manager
26 27 28
}

func TestPeriodicallyCheckNextPeer(t *testing.T) {
29
	monitor, clock, _ := peerMonitorSetup(t)
30 31
	// Each time a step is performed, it calls Done on the wait group so we can wait for it to be performed
	stepCh := make(chan struct{}, 10)
32
	monitor.bgTasks.Add(1)
33
	actionErr := make(chan error, 1)
34
	go monitor.background(func() error {
35
		stepCh <- struct{}{}
36 37 38 39 40 41
		select {
		case err := <-actionErr:
			return err
		default:
			return nil
		}
42
	})
43
	defer monitor.Stop()
44 45 46 47 48 49 50 51 52 53 54
	// Wait for the step ticker to be started
	clock.WaitForNewPendingTaskWithTimeout(30 * time.Second)

	// Should perform another step after each interval
	for i := 0; i < 5; i++ {
		clock.AdvanceTime(checkInterval)
		waitForChan(t, stepCh, fmt.Sprintf("Did not perform step %v", i))
		require.Len(t, stepCh, 0)
	}

	// Should continue executing periodically even after an error
55
	actionErr <- errors.New("boom")
56 57 58 59 60 61 62 63 64 65 66 67 68 69
	for i := 0; i < 5; i++ {
		clock.AdvanceTime(checkInterval)
		waitForChan(t, stepCh, fmt.Sprintf("Did not perform step %v", i))
		require.Len(t, stepCh, 0)
	}
}

func TestCheckNextPeer(t *testing.T) {
	peerIDs := []peer.ID{
		peer.ID("a"),
		peer.ID("b"),
		peer.ID("c"),
	}

70 71 72 73 74 75
	t.Run("No peers", func(t *testing.T) {
		monitor, _, manager := peerMonitorSetup(t)
		manager.EXPECT().Peers().Return(nil).Once()
		require.NoError(t, monitor.checkNextPeer())
	})

76
	t.Run("Check each peer then refresh list", func(t *testing.T) {
77
		monitor, _, manager := peerMonitorSetup(t)
78
		manager.EXPECT().Peers().Return(peerIDs).Once()
79
		for _, id := range peerIDs {
80
			manager.EXPECT().GetPeerScore(id).Return(1, nil).Once()
81

82
			require.NoError(t, monitor.checkNextPeer())
83 84 85 86 87 88 89 90
		}

		updatedPeers := []peer.ID{
			peer.ID("x"),
			peer.ID("y"),
			peer.ID("z"),
			peer.ID("a"),
		}
91
		manager.EXPECT().Peers().Return(updatedPeers).Once()
92
		for _, id := range updatedPeers {
93
			manager.EXPECT().GetPeerScore(id).Return(1, nil).Once()
94

95
			require.NoError(t, monitor.checkNextPeer())
96 97 98 99
		}
	})

	t.Run("Close and ban peer when below min score", func(t *testing.T) {
100
		monitor, clock, manager := peerMonitorSetup(t)
101
		id := peerIDs[0]
102 103
		manager.EXPECT().Peers().Return(peerIDs).Once()
		manager.EXPECT().GetPeerScore(id).Return(-101, nil).Once()
104
		manager.EXPECT().IsStatic(id).Return(false).Once()
105
		manager.EXPECT().BanPeer(id, clock.Now().Add(testBanDuration)).Return(nil).Once()
106

107
		require.NoError(t, monitor.checkNextPeer())
108 109 110
	})

	t.Run("Do not close protected peer when below min score", func(t *testing.T) {
111
		monitor, _, manager := peerMonitorSetup(t)
112
		id := peerIDs[0]
113 114
		manager.EXPECT().Peers().Return(peerIDs).Once()
		manager.EXPECT().GetPeerScore(id).Return(-101, nil).Once()
115
		manager.EXPECT().IsStatic(id).Return(true)
116

117
		require.NoError(t, monitor.checkNextPeer())
118 119 120 121 122 123 124 125 126 127 128 129 130
	})
}

// waitForChan blocks until a value is received from ch. If nothing arrives
// within 30 seconds it fails the test immediately with msg.
//
// It is marked as a test helper so a failure is reported at the caller's line
// rather than inside this function.
func waitForChan(t *testing.T, ch chan struct{}, msg string) {
	t.Helper()
	timeout := time.NewTimer(30 * time.Second)
	// Release the timer as soon as we return on the success path.
	defer timeout.Stop()
	select {
	case <-timeout.C:
		t.Fatal(msg)
	case <-ch:
		// Ok
	}
}