Commit dcb24286 authored by Adrian Sutton's avatar Adrian Sutton

op-node: Add initial take on PeerMonitor

parent ce94c644
// Code generated by mockery v2.28.0. DO NOT EDIT.
package mocks
import (
mock "github.com/stretchr/testify/mock"
peer "github.com/libp2p/go-libp2p/core/peer"
)
// ConnectedPeers is an autogenerated mock type for the ConnectedPeers type
type ConnectedPeers struct {
mock.Mock
}
type ConnectedPeers_Expecter struct {
mock *mock.Mock
}
func (_m *ConnectedPeers) EXPECT() *ConnectedPeers_Expecter {
return &ConnectedPeers_Expecter{mock: &_m.Mock}
}
// ClosePeer provides a mock function with given fields: _a0
func (_m *ConnectedPeers) ClosePeer(_a0 peer.ID) error {
ret := _m.Called(_a0)
var r0 error
if rf, ok := ret.Get(0).(func(peer.ID) error); ok {
r0 = rf(_a0)
} else {
r0 = ret.Error(0)
}
return r0
}
// ConnectedPeers_ClosePeer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ClosePeer'
type ConnectedPeers_ClosePeer_Call struct {
*mock.Call
}
// ClosePeer is a helper method to define mock.On call
// - _a0 peer.ID
func (_e *ConnectedPeers_Expecter) ClosePeer(_a0 interface{}) *ConnectedPeers_ClosePeer_Call {
return &ConnectedPeers_ClosePeer_Call{Call: _e.mock.On("ClosePeer", _a0)}
}
func (_c *ConnectedPeers_ClosePeer_Call) Run(run func(_a0 peer.ID)) *ConnectedPeers_ClosePeer_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(peer.ID))
})
return _c
}
func (_c *ConnectedPeers_ClosePeer_Call) Return(_a0 error) *ConnectedPeers_ClosePeer_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *ConnectedPeers_ClosePeer_Call) RunAndReturn(run func(peer.ID) error) *ConnectedPeers_ClosePeer_Call {
_c.Call.Return(run)
return _c
}
// Peers provides a mock function with given fields:
func (_m *ConnectedPeers) Peers() []peer.ID {
ret := _m.Called()
var r0 []peer.ID
if rf, ok := ret.Get(0).(func() []peer.ID); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]peer.ID)
}
}
return r0
}
// ConnectedPeers_Peers_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Peers'
type ConnectedPeers_Peers_Call struct {
*mock.Call
}
// Peers is a helper method to define mock.On call
func (_e *ConnectedPeers_Expecter) Peers() *ConnectedPeers_Peers_Call {
return &ConnectedPeers_Peers_Call{Call: _e.mock.On("Peers")}
}
func (_c *ConnectedPeers_Peers_Call) Run(run func()) *ConnectedPeers_Peers_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *ConnectedPeers_Peers_Call) Return(_a0 []peer.ID) *ConnectedPeers_Peers_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *ConnectedPeers_Peers_Call) RunAndReturn(run func() []peer.ID) *ConnectedPeers_Peers_Call {
_c.Call.Return(run)
return _c
}
type mockConstructorTestingTNewConnectedPeers interface {
mock.TestingT
Cleanup(func())
}
// NewConnectedPeers creates a new instance of ConnectedPeers. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewConnectedPeers(t mockConstructorTestingTNewConnectedPeers) *ConnectedPeers {
mock := &ConnectedPeers{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
// Code generated by mockery v2.28.0. DO NOT EDIT.
package mocks
import (
mock "github.com/stretchr/testify/mock"
peer "github.com/libp2p/go-libp2p/core/peer"
time "time"
)
// PeerBlocker is an autogenerated mock type for the PeerBlocker type
type PeerBlocker struct {
mock.Mock
}
type PeerBlocker_Expecter struct {
mock *mock.Mock
}
func (_m *PeerBlocker) EXPECT() *PeerBlocker_Expecter {
return &PeerBlocker_Expecter{mock: &_m.Mock}
}
// BanPeer provides a mock function with given fields: _a0, _a1
func (_m *PeerBlocker) BanPeer(_a0 peer.ID, _a1 time.Time) error {
ret := _m.Called(_a0, _a1)
var r0 error
if rf, ok := ret.Get(0).(func(peer.ID, time.Time) error); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Error(0)
}
return r0
}
// PeerBlocker_BanPeer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'BanPeer'
type PeerBlocker_BanPeer_Call struct {
*mock.Call
}
// BanPeer is a helper method to define mock.On call
// - _a0 peer.ID
// - _a1 time.Time
func (_e *PeerBlocker_Expecter) BanPeer(_a0 interface{}, _a1 interface{}) *PeerBlocker_BanPeer_Call {
return &PeerBlocker_BanPeer_Call{Call: _e.mock.On("BanPeer", _a0, _a1)}
}
func (_c *PeerBlocker_BanPeer_Call) Run(run func(_a0 peer.ID, _a1 time.Time)) *PeerBlocker_BanPeer_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(peer.ID), args[1].(time.Time))
})
return _c
}
func (_c *PeerBlocker_BanPeer_Call) Return(_a0 error) *PeerBlocker_BanPeer_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *PeerBlocker_BanPeer_Call) RunAndReturn(run func(peer.ID, time.Time) error) *PeerBlocker_BanPeer_Call {
_c.Call.Return(run)
return _c
}
type mockConstructorTestingTNewPeerBlocker interface {
mock.TestingT
Cleanup(func())
}
// NewPeerBlocker creates a new instance of PeerBlocker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewPeerBlocker(t mockConstructorTestingTNewPeerBlocker) *PeerBlocker {
mock := &PeerBlocker{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
// Code generated by mockery v2.28.0. DO NOT EDIT.
package mocks
import (
mock "github.com/stretchr/testify/mock"
peer "github.com/libp2p/go-libp2p/core/peer"
)
// PeerProtector is an autogenerated mock type for the PeerProtector type
type PeerProtector struct {
mock.Mock
}
type PeerProtector_Expecter struct {
mock *mock.Mock
}
func (_m *PeerProtector) EXPECT() *PeerProtector_Expecter {
return &PeerProtector_Expecter{mock: &_m.Mock}
}
// IsProtected provides a mock function with given fields: _a0
func (_m *PeerProtector) IsProtected(_a0 peer.ID) bool {
ret := _m.Called(_a0)
var r0 bool
if rf, ok := ret.Get(0).(func(peer.ID) bool); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// PeerProtector_IsProtected_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsProtected'
type PeerProtector_IsProtected_Call struct {
*mock.Call
}
// IsProtected is a helper method to define mock.On call
// - _a0 peer.ID
func (_e *PeerProtector_Expecter) IsProtected(_a0 interface{}) *PeerProtector_IsProtected_Call {
return &PeerProtector_IsProtected_Call{Call: _e.mock.On("IsProtected", _a0)}
}
func (_c *PeerProtector_IsProtected_Call) Run(run func(_a0 peer.ID)) *PeerProtector_IsProtected_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(peer.ID))
})
return _c
}
func (_c *PeerProtector_IsProtected_Call) Return(_a0 bool) *PeerProtector_IsProtected_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *PeerProtector_IsProtected_Call) RunAndReturn(run func(peer.ID) bool) *PeerProtector_IsProtected_Call {
_c.Call.Return(run)
return _c
}
type mockConstructorTestingTNewPeerProtector interface {
mock.TestingT
Cleanup(func())
}
// NewPeerProtector creates a new instance of PeerProtector. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewPeerProtector(t mockConstructorTestingTNewPeerProtector) *PeerProtector {
mock := &PeerProtector{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
// Code generated by mockery v2.28.0. DO NOT EDIT.
package mocks
import (
mock "github.com/stretchr/testify/mock"
peer "github.com/libp2p/go-libp2p/core/peer"
)
// Scores is an autogenerated mock type for the Scores type
type Scores struct {
mock.Mock
}
type Scores_Expecter struct {
mock *mock.Mock
}
func (_m *Scores) EXPECT() *Scores_Expecter {
return &Scores_Expecter{mock: &_m.Mock}
}
// GetPeerScore provides a mock function with given fields: id
func (_m *Scores) GetPeerScore(id peer.ID) (float64, error) {
ret := _m.Called(id)
var r0 float64
var r1 error
if rf, ok := ret.Get(0).(func(peer.ID) (float64, error)); ok {
return rf(id)
}
if rf, ok := ret.Get(0).(func(peer.ID) float64); ok {
r0 = rf(id)
} else {
r0 = ret.Get(0).(float64)
}
if rf, ok := ret.Get(1).(func(peer.ID) error); ok {
r1 = rf(id)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Scores_GetPeerScore_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetPeerScore'
type Scores_GetPeerScore_Call struct {
*mock.Call
}
// GetPeerScore is a helper method to define mock.On call
// - id peer.ID
func (_e *Scores_Expecter) GetPeerScore(id interface{}) *Scores_GetPeerScore_Call {
return &Scores_GetPeerScore_Call{Call: _e.mock.On("GetPeerScore", id)}
}
func (_c *Scores_GetPeerScore_Call) Run(run func(id peer.ID)) *Scores_GetPeerScore_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(peer.ID))
})
return _c
}
func (_c *Scores_GetPeerScore_Call) Return(_a0 float64, _a1 error) *Scores_GetPeerScore_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *Scores_GetPeerScore_Call) RunAndReturn(run func(peer.ID) (float64, error)) *Scores_GetPeerScore_Call {
_c.Call.Return(run)
return _c
}
type mockConstructorTestingTNewScores interface {
mock.TestingT
Cleanup(func())
}
// NewScores creates a new instance of Scores. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewScores(t mockConstructorTestingTNewScores) *Scores {
mock := &Scores{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
package monitor
import (
"context"
"fmt"
"sync"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
"github.com/libp2p/go-libp2p/core/peer"
)
const (
// Time delay between checking the score of each peer to avoid
checkInterval = 1 * time.Second
)
//go:generate mockery --name Scores --output mocks/ --with-expecter=true
type Scores interface {
GetPeerScore(id peer.ID) (float64, error)
}
// ConnectedPeers enables querying the set of currently connected peers and disconnecting peers
//
//go:generate mockery --name ConnectedPeers --output mocks/ --with-expecter=true
type ConnectedPeers interface {
Peers() []peer.ID
ClosePeer(peer.ID) error
}
//go:generate mockery --name PeerProtector --output mocks/ --with-expecter=true
type PeerProtector interface {
IsProtected(peer.ID) bool
}
//go:generate mockery --name PeerBlocker --output mocks/ --with-expecter=true
type PeerBlocker interface {
BanPeer(peer.ID, time.Time) error
}
type PeerMonitor struct {
ctx context.Context
cancelFn context.CancelFunc
l log.Logger
clock clock.Clock
peers ConnectedPeers
protector PeerProtector
scores Scores
blocker PeerBlocker
minScore float64
banDuration time.Duration
bgTasks sync.WaitGroup
// Used by checkNextPeer and must only be accessed from the background thread
peerList []peer.ID
nextPeerIdx int
}
func NewPeerMonitor(ctx context.Context, l log.Logger, clock clock.Clock, peers ConnectedPeers, protector PeerProtector, scores Scores, blocker PeerBlocker, minScore float64, banDuration time.Duration) *PeerMonitor {
ctx, cancelFn := context.WithCancel(ctx)
return &PeerMonitor{
ctx: ctx,
cancelFn: cancelFn,
l: l,
clock: clock,
peers: peers,
protector: protector,
scores: scores,
blocker: blocker,
minScore: minScore,
banDuration: banDuration,
}
}
func (k *PeerMonitor) Start() {
k.bgTasks.Add(1)
go k.background(k.checkNextPeer)
}
func (k *PeerMonitor) Stop() {
k.cancelFn()
k.bgTasks.Wait()
}
// checkNextPeer checks the next peer and disconnects and bans it if its score is too low and its not protected.
// The first call gets the list of current peers and checks the first one, then each subsequent call checks the next
// peer in the list. When the end of the list is reached, an updated list of connected peers is retrieved and the process
// starts again.
func (k *PeerMonitor) checkNextPeer() error {
// Get a new list of peers to check if we've checked all peers in the previous list
if k.nextPeerIdx >= len(k.peerList) {
k.peerList = k.peers.Peers()
k.nextPeerIdx = 0
}
id := k.peerList[k.nextPeerIdx]
k.nextPeerIdx++
score, err := k.scores.GetPeerScore(id)
if err != nil {
return fmt.Errorf("retrieve score for peer %v: %w", id, err)
}
if score > k.minScore {
return nil
}
if k.protector.IsProtected(id) {
return nil
}
if err := k.peers.ClosePeer(id); err != nil {
return fmt.Errorf("disconnecting peer %v: %w", id, err)
}
if err := k.blocker.BanPeer(id, k.clock.Now().Add(k.banDuration)); err != nil {
return fmt.Errorf("banning peer %v: %w", id, err)
}
return nil
}
// background is intended to run as a separate go routine. It will call the supplied action function every checkInterval
// until the context is done.
func (k *PeerMonitor) background(action func() error) {
defer k.bgTasks.Done()
ticker := k.clock.NewTicker(checkInterval)
defer ticker.Stop()
for {
select {
case <-k.ctx.Done():
return
case <-ticker.Ch():
if err := action(); err != nil {
k.l.Warn("Error while checking connected peer score", "err", err)
}
}
}
}
package monitor
import (
"context"
"errors"
"fmt"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/p2p/monitor/mocks"
"github.com/ethereum-optimism/optimism/op-node/testlog"
clock2 "github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
)
const kickerBanDuration = 10 * time.Minute
func peerKickerSetup(t *testing.T) (*PeerMonitor, *clock2.DeterministicClock, *mocks.ConnectedPeers, *mocks.PeerProtector, *mocks.Scores, *mocks.PeerBlocker) {
l := testlog.Logger(t, log.LvlInfo)
clock := clock2.NewDeterministicClock(time.UnixMilli(10000))
peers := mocks.NewConnectedPeers(t)
protector := mocks.NewPeerProtector(t)
blocker := mocks.NewPeerBlocker(t)
scores := mocks.NewScores(t)
kicker := NewPeerMonitor(context.Background(), l, clock, peers, protector, scores, blocker, -100, kickerBanDuration)
return kicker, clock, peers, protector, scores, blocker
}
func TestPeriodicallyCheckNextPeer(t *testing.T) {
kicker, clock, _, _, _, _ := peerKickerSetup(t)
// Each time a step is performed, it calls Done on the wait group so we can wait for it to be performed
stepCh := make(chan struct{}, 10)
kicker.bgTasks.Add(1)
var actionErr error
go kicker.background(func() error {
stepCh <- struct{}{}
return actionErr
})
defer kicker.Stop()
// Wait for the step ticker to be started
clock.WaitForNewPendingTaskWithTimeout(30 * time.Second)
// Should perform another step after each interval
for i := 0; i < 5; i++ {
clock.AdvanceTime(checkInterval)
waitForChan(t, stepCh, fmt.Sprintf("Did not perform step %v", i))
require.Len(t, stepCh, 0)
}
// Should continue executing periodically even after an error
actionErr = errors.New("boom")
for i := 0; i < 5; i++ {
clock.AdvanceTime(checkInterval)
waitForChan(t, stepCh, fmt.Sprintf("Did not perform step %v", i))
require.Len(t, stepCh, 0)
}
}
func TestCheckNextPeer(t *testing.T) {
peerIDs := []peer.ID{
peer.ID("a"),
peer.ID("b"),
peer.ID("c"),
}
t.Run("Check each peer then refresh list", func(t *testing.T) {
kicker, _, peers, _, scores, _ := peerKickerSetup(t)
peers.EXPECT().Peers().Return(peerIDs).Once()
for _, id := range peerIDs {
scores.EXPECT().GetPeerScore(id).Return(1, nil).Once()
require.NoError(t, kicker.checkNextPeer())
}
updatedPeers := []peer.ID{
peer.ID("x"),
peer.ID("y"),
peer.ID("z"),
peer.ID("a"),
}
peers.EXPECT().Peers().Return(updatedPeers).Once()
for _, id := range updatedPeers {
scores.EXPECT().GetPeerScore(id).Return(1, nil).Once()
require.NoError(t, kicker.checkNextPeer())
}
})
t.Run("Close and ban peer when below min score", func(t *testing.T) {
kicker, clock, peers, protector, scores, blocker := peerKickerSetup(t)
id := peerIDs[0]
peers.EXPECT().Peers().Return(peerIDs).Once()
scores.EXPECT().GetPeerScore(id).Return(-101, nil).Once()
protector.EXPECT().IsProtected(id).Return(false).Once()
peers.EXPECT().ClosePeer(id).Return(nil).Once()
blocker.EXPECT().BanPeer(id, clock.Now().Add(kickerBanDuration)).Return(nil).Once()
require.NoError(t, kicker.checkNextPeer())
})
t.Run("Do not close protected peer when below min score", func(t *testing.T) {
kicker, _, peers, protector, scores, _ := peerKickerSetup(t)
id := peerIDs[0]
peers.EXPECT().Peers().Return(peerIDs).Once()
scores.EXPECT().GetPeerScore(id).Return(-101, nil).Once()
protector.EXPECT().IsProtected(id).Return(true)
require.NoError(t, kicker.checkNextPeer())
})
}
func waitForChan(t *testing.T, ch chan struct{}, msg string) {
ctx, cancelFn := context.WithTimeout(context.Background(), 30*time.Second)
defer cancelFn()
select {
case <-ctx.Done():
t.Fatal(msg)
case <-ch:
// Ok
}
}
...@@ -8,14 +8,13 @@ import ( ...@@ -8,14 +8,13 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/libp2p/go-libp2p/core/peer"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/p2p/gating" "github.com/ethereum-optimism/optimism/op-node/p2p/gating"
"github.com/ethereum-optimism/optimism/op-node/p2p/monitor"
"github.com/ethereum-optimism/optimism/op-node/p2p/store" "github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
...@@ -25,17 +24,20 @@ import ( ...@@ -25,17 +24,20 @@ import (
"github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/host"
p2pmetrics "github.com/libp2p/go-libp2p/core/metrics" p2pmetrics "github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
) )
// NodeP2P is a p2p node, which can be used to gossip messages. // NodeP2P is a p2p node, which can be used to gossip messages.
type NodeP2P struct { type NodeP2P struct {
host host.Host // p2p host (optional, may be nil) host host.Host // p2p host (optional, may be nil)
gater gating.BlockingConnectionGater // p2p gater, to ban/unban peers with, may be nil even with p2p enabled gater gating.BlockingConnectionGater // p2p gater, to ban/unban peers with, may be nil even with p2p enabled
scorer Scorer // writes score-updates to the peerstore and keeps metrics of score changes scorer Scorer // writes score-updates to the peerstore and keeps metrics of score changes
connMgr connmgr.ConnManager // p2p conn manager, to keep a reliable number of peers, may be nil even with p2p enabled connMgr connmgr.ConnManager // p2p conn manager, to keep a reliable number of peers, may be nil even with p2p enabled
store store.ExtendedPeerstore // peerstore of host, with extra bindings for scoring and banning store store.ExtendedPeerstore // peerstore of host, with extra bindings for scoring and banning
log log.Logger peerMonitor *monitor.PeerMonitor // peer monitor to disconnect bad peers
log log.Logger
// the below components are all optional, and may be nil. They require the host to not be nil. // the below components are all optional, and may be nil. They require the host to not be nil.
dv5Local *enode.LocalNode // p2p discovery identity dv5Local *enode.LocalNode // p2p discovery identity
dv5Udp *discover.UDPv5 // p2p discovery service dv5Udp *discover.UDPv5 // p2p discovery service
...@@ -124,6 +126,8 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l ...@@ -124,6 +126,8 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
n.scorer.OnDisconnect(conn.RemotePeer()) n.scorer.OnDisconnect(conn.RemotePeer())
}, },
}) })
n.peerMonitor = monitor.NewPeerMonitor(resourcesCtx, log, clock.SystemClock, n.host.Network(), n.connMgr, eps, n, -100, 1*time.Hour)
n.peerMonitor.Start()
// notify of any new connections/streams/etc. // notify of any new connections/streams/etc.
n.host.Network().Notify(NewNetworkNotifier(log, metrics)) n.host.Network().Notify(NewNetworkNotifier(log, metrics))
// note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled. // note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled.
...@@ -226,6 +230,7 @@ func (n *NodeP2P) BanIP(ip net.IP, expiration time.Time) error { ...@@ -226,6 +230,7 @@ func (n *NodeP2P) BanIP(ip net.IP, expiration time.Time) error {
func (n *NodeP2P) Close() error { func (n *NodeP2P) Close() error {
var result *multierror.Error var result *multierror.Error
n.peerMonitor.Stop()
if n.dv5Udp != nil { if n.dv5Udp != nil {
n.dv5Udp.Close() n.dv5Udp.Close()
} }
......
...@@ -136,6 +136,12 @@ func (s *DeterministicClock) addPending(t action) { ...@@ -136,6 +136,12 @@ func (s *DeterministicClock) addPending(t action) {
} }
} }
func (s *DeterministicClock) WaitForNewPendingTaskWithTimeout(timeout time.Duration) bool {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return s.WaitForNewPendingTask(ctx)
}
// WaitForNewPendingTask blocks until a new task is scheduled since the last time this method was called. // WaitForNewPendingTask blocks until a new task is scheduled since the last time this method was called.
// true is returned if a new task was scheduled, false if the context completed before a new task was added. // true is returned if a new task was scheduled, false if the context completed before a new task was added.
func (s *DeterministicClock) WaitForNewPendingTask(ctx context.Context) bool { func (s *DeterministicClock) WaitForNewPendingTask(ctx context.Context) bool {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment