Commit 60b81b39 authored by Adrian Sutton's avatar Adrian Sutton

op-node: Implement peer manager requirement methods on P2PNode and remove adapter.

parent 9f10c3e5
package monitor
import (
"fmt"
"time"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
)
// PeerManagerAdapter implements the PeerManager interface by delegating to a variety of different p2p components
type PeerManagerAdapter struct {
n network.Network
connMgr connmgr.ConnManager
scores store.ScoreDatastore
// TODO: something to do banning but its not merged yet...
}
func NewPeerManagerAdapter(n network.Network, connMgr connmgr.ConnManager, scores store.ScoreDatastore) *PeerManagerAdapter {
return &PeerManagerAdapter{
n: n,
connMgr: connMgr,
scores: scores,
}
}
func (p *PeerManagerAdapter) Peers() []peer.ID {
return p.n.Peers()
}
func (p *PeerManagerAdapter) GetPeerScore(id peer.ID) (float64, error) {
scores, err := p.scores.GetPeerScores(id)
if err != nil {
return 0, err
}
return scores.Gossip.Total, nil
}
func (p *PeerManagerAdapter) IsProtected(id peer.ID) bool {
if p.connMgr == nil {
return false
}
// TODO: Need a constant for the tag somewhere
return p.connMgr.IsProtected(id, "static")
}
func (p *PeerManagerAdapter) ClosePeer(id peer.ID) error {
return p.n.ClosePeer(id)
}
func (p *PeerManagerAdapter) BanPeer(id peer.ID, banDuration time.Time) error {
//TODO implement me
return fmt.Errorf("peer banning not implemented")
}
var _ PeerManager = (*PeerManagerAdapter)(nil)
...@@ -66,48 +66,6 @@ func (_c *PeerManager_BanPeer_Call) RunAndReturn(run func(peer.ID, time.Time) er ...@@ -66,48 +66,6 @@ func (_c *PeerManager_BanPeer_Call) RunAndReturn(run func(peer.ID, time.Time) er
return _c return _c
} }
// ClosePeer provides a mock function with given fields: _a0
func (_m *PeerManager) ClosePeer(_a0 peer.ID) error {
ret := _m.Called(_a0)
var r0 error
if rf, ok := ret.Get(0).(func(peer.ID) error); ok {
r0 = rf(_a0)
} else {
r0 = ret.Error(0)
}
return r0
}
// PeerManager_ClosePeer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ClosePeer'
type PeerManager_ClosePeer_Call struct {
*mock.Call
}
// ClosePeer is a helper method to define mock.On call
// - _a0 peer.ID
func (_e *PeerManager_Expecter) ClosePeer(_a0 interface{}) *PeerManager_ClosePeer_Call {
return &PeerManager_ClosePeer_Call{Call: _e.mock.On("ClosePeer", _a0)}
}
func (_c *PeerManager_ClosePeer_Call) Run(run func(_a0 peer.ID)) *PeerManager_ClosePeer_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(peer.ID))
})
return _c
}
func (_c *PeerManager_ClosePeer_Call) Return(_a0 error) *PeerManager_ClosePeer_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *PeerManager_ClosePeer_Call) RunAndReturn(run func(peer.ID) error) *PeerManager_ClosePeer_Call {
_c.Call.Return(run)
return _c
}
// GetPeerScore provides a mock function with given fields: id // GetPeerScore provides a mock function with given fields: id
func (_m *PeerManager) GetPeerScore(id peer.ID) (float64, error) { func (_m *PeerManager) GetPeerScore(id peer.ID) (float64, error) {
ret := _m.Called(id) ret := _m.Called(id)
...@@ -161,7 +119,7 @@ func (_c *PeerManager_GetPeerScore_Call) RunAndReturn(run func(peer.ID) (float64 ...@@ -161,7 +119,7 @@ func (_c *PeerManager_GetPeerScore_Call) RunAndReturn(run func(peer.ID) (float64
} }
// IsProtected provides a mock function with given fields: _a0 // IsProtected provides a mock function with given fields: _a0
func (_m *PeerManager) IsProtected(_a0 peer.ID) bool { func (_m *PeerManager) IsStatic(_a0 peer.ID) bool {
ret := _m.Called(_a0) ret := _m.Called(_a0)
var r0 bool var r0 bool
...@@ -174,7 +132,7 @@ func (_m *PeerManager) IsProtected(_a0 peer.ID) bool { ...@@ -174,7 +132,7 @@ func (_m *PeerManager) IsProtected(_a0 peer.ID) bool {
return r0 return r0
} }
// PeerManager_IsProtected_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsProtected' // PeerManager_IsProtected_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsStatic'
type PeerManager_IsProtected_Call struct { type PeerManager_IsProtected_Call struct {
*mock.Call *mock.Call
} }
...@@ -182,7 +140,7 @@ type PeerManager_IsProtected_Call struct { ...@@ -182,7 +140,7 @@ type PeerManager_IsProtected_Call struct {
// IsProtected is a helper method to define mock.On call // IsProtected is a helper method to define mock.On call
// - _a0 peer.ID // - _a0 peer.ID
func (_e *PeerManager_Expecter) IsProtected(_a0 interface{}) *PeerManager_IsProtected_Call { func (_e *PeerManager_Expecter) IsProtected(_a0 interface{}) *PeerManager_IsProtected_Call {
return &PeerManager_IsProtected_Call{Call: _e.mock.On("IsProtected", _a0)} return &PeerManager_IsProtected_Call{Call: _e.mock.On("IsStatic", _a0)}
} }
func (_c *PeerManager_IsProtected_Call) Run(run func(_a0 peer.ID)) *PeerManager_IsProtected_Call { func (_c *PeerManager_IsProtected_Call) Run(run func(_a0 peer.ID)) *PeerManager_IsProtected_Call {
......
...@@ -20,9 +20,8 @@ const ( ...@@ -20,9 +20,8 @@ const (
type PeerManager interface { type PeerManager interface {
Peers() []peer.ID Peers() []peer.ID
GetPeerScore(id peer.ID) (float64, error) GetPeerScore(id peer.ID) (float64, error)
IsProtected(peer.ID) bool IsStatic(peer.ID) bool
// TODO: Consider combining Close and Ban into a single call and have the adapter deal with the two calls // BanPeer bans the peer until the specified time and disconnects any existing connections.
ClosePeer(peer.ID) error
BanPeer(peer.ID, time.Time) error BanPeer(peer.ID, time.Time) error
} }
...@@ -77,6 +76,10 @@ func (k *PeerMonitor) checkNextPeer() error { ...@@ -77,6 +76,10 @@ func (k *PeerMonitor) checkNextPeer() error {
k.peerList = k.manager.Peers() k.peerList = k.manager.Peers()
k.nextPeerIdx = 0 k.nextPeerIdx = 0
} }
if len(k.peerList) == 0 {
// No peers to check
return nil
}
id := k.peerList[k.nextPeerIdx] id := k.peerList[k.nextPeerIdx]
k.nextPeerIdx++ k.nextPeerIdx++
score, err := k.manager.GetPeerScore(id) score, err := k.manager.GetPeerScore(id)
...@@ -86,12 +89,9 @@ func (k *PeerMonitor) checkNextPeer() error { ...@@ -86,12 +89,9 @@ func (k *PeerMonitor) checkNextPeer() error {
if score > k.minScore { if score > k.minScore {
return nil return nil
} }
if k.manager.IsProtected(id) { if k.manager.IsStatic(id) {
return nil return nil
} }
if err := k.manager.ClosePeer(id); err != nil {
return fmt.Errorf("disconnecting peer %v: %w", id, err)
}
if err := k.manager.BanPeer(id, k.clock.Now().Add(k.banDuration)); err != nil { if err := k.manager.BanPeer(id, k.clock.Now().Add(k.banDuration)); err != nil {
return fmt.Errorf("banning peer %v: %w", id, err) return fmt.Errorf("banning peer %v: %w", id, err)
} }
......
...@@ -15,27 +15,27 @@ import ( ...@@ -15,27 +15,27 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
const kickerBanDuration = 10 * time.Minute const monitorBanDuration = 10 * time.Minute
func peerKickerSetup(t *testing.T) (*PeerMonitor, *clock2.DeterministicClock, *mocks.PeerManager) { func peerMonitorSetup(t *testing.T) (*PeerMonitor, *clock2.DeterministicClock, *mocks.PeerManager) {
l := testlog.Logger(t, log.LvlInfo) l := testlog.Logger(t, log.LvlInfo)
clock := clock2.NewDeterministicClock(time.UnixMilli(10000)) clock := clock2.NewDeterministicClock(time.UnixMilli(10000))
manager := mocks.NewPeerManager(t) manager := mocks.NewPeerManager(t)
kicker := NewPeerMonitor(context.Background(), l, clock, manager, -100, kickerBanDuration) monitor := NewPeerMonitor(context.Background(), l, clock, manager, -100, monitorBanDuration)
return kicker, clock, manager return monitor, clock, manager
} }
func TestPeriodicallyCheckNextPeer(t *testing.T) { func TestPeriodicallyCheckNextPeer(t *testing.T) {
kicker, clock, _ := peerKickerSetup(t) monitor, clock, _ := peerMonitorSetup(t)
// Each time a step is performed, it calls Done on the wait group so we can wait for it to be performed // Each time a step is performed, it calls Done on the wait group so we can wait for it to be performed
stepCh := make(chan struct{}, 10) stepCh := make(chan struct{}, 10)
kicker.bgTasks.Add(1) monitor.bgTasks.Add(1)
var actionErr error var actionErr error
go kicker.background(func() error { go monitor.background(func() error {
stepCh <- struct{}{} stepCh <- struct{}{}
return actionErr return actionErr
}) })
defer kicker.Stop() defer monitor.Stop()
// Wait for the step ticker to be started // Wait for the step ticker to be started
clock.WaitForNewPendingTaskWithTimeout(30 * time.Second) clock.WaitForNewPendingTaskWithTimeout(30 * time.Second)
...@@ -62,13 +62,19 @@ func TestCheckNextPeer(t *testing.T) { ...@@ -62,13 +62,19 @@ func TestCheckNextPeer(t *testing.T) {
peer.ID("c"), peer.ID("c"),
} }
t.Run("No peers", func(t *testing.T) {
monitor, _, manager := peerMonitorSetup(t)
manager.EXPECT().Peers().Return(nil).Once()
require.NoError(t, monitor.checkNextPeer())
})
t.Run("Check each peer then refresh list", func(t *testing.T) { t.Run("Check each peer then refresh list", func(t *testing.T) {
kicker, _, manager := peerKickerSetup(t) monitor, _, manager := peerMonitorSetup(t)
manager.EXPECT().Peers().Return(peerIDs).Once() manager.EXPECT().Peers().Return(peerIDs).Once()
for _, id := range peerIDs { for _, id := range peerIDs {
manager.EXPECT().GetPeerScore(id).Return(1, nil).Once() manager.EXPECT().GetPeerScore(id).Return(1, nil).Once()
require.NoError(t, kicker.checkNextPeer()) require.NoError(t, monitor.checkNextPeer())
} }
updatedPeers := []peer.ID{ updatedPeers := []peer.ID{
...@@ -81,30 +87,29 @@ func TestCheckNextPeer(t *testing.T) { ...@@ -81,30 +87,29 @@ func TestCheckNextPeer(t *testing.T) {
for _, id := range updatedPeers { for _, id := range updatedPeers {
manager.EXPECT().GetPeerScore(id).Return(1, nil).Once() manager.EXPECT().GetPeerScore(id).Return(1, nil).Once()
require.NoError(t, kicker.checkNextPeer()) require.NoError(t, monitor.checkNextPeer())
} }
}) })
t.Run("Close and ban peer when below min score", func(t *testing.T) { t.Run("Close and ban peer when below min score", func(t *testing.T) {
kicker, clock, manager := peerKickerSetup(t) monitor, clock, manager := peerMonitorSetup(t)
id := peerIDs[0] id := peerIDs[0]
manager.EXPECT().Peers().Return(peerIDs).Once() manager.EXPECT().Peers().Return(peerIDs).Once()
manager.EXPECT().GetPeerScore(id).Return(-101, nil).Once() manager.EXPECT().GetPeerScore(id).Return(-101, nil).Once()
manager.EXPECT().IsProtected(id).Return(false).Once() manager.EXPECT().IsProtected(id).Return(false).Once()
manager.EXPECT().ClosePeer(id).Return(nil).Once() manager.EXPECT().BanPeer(id, clock.Now().Add(monitorBanDuration)).Return(nil).Once()
manager.EXPECT().BanPeer(id, clock.Now().Add(kickerBanDuration)).Return(nil).Once()
require.NoError(t, kicker.checkNextPeer()) require.NoError(t, monitor.checkNextPeer())
}) })
t.Run("Do not close protected peer when below min score", func(t *testing.T) { t.Run("Do not close protected peer when below min score", func(t *testing.T) {
kicker, _, manager := peerKickerSetup(t) monitor, _, manager := peerMonitorSetup(t)
id := peerIDs[0] id := peerIDs[0]
manager.EXPECT().Peers().Return(peerIDs).Once() manager.EXPECT().Peers().Return(peerIDs).Once()
manager.EXPECT().GetPeerScore(id).Return(-101, nil).Once() manager.EXPECT().GetPeerScore(id).Return(-101, nil).Once()
manager.EXPECT().IsProtected(id).Return(true) manager.EXPECT().IsProtected(id).Return(true)
require.NoError(t, kicker.checkNextPeer()) require.NoError(t, monitor.checkNextPeer())
}) })
} }
......
...@@ -126,11 +126,9 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l ...@@ -126,11 +126,9 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
n.scorer.OnDisconnect(conn.RemotePeer()) n.scorer.OnDisconnect(conn.RemotePeer())
}, },
}) })
peerManager := monitor.NewPeerManagerAdapter(n.host.Network(), n.connMgr, eps)
// TODO: minScore shouldn't just be hard coded here // TODO: minScore shouldn't just be hard coded here
// TODO: Ban duration needs to be set sensibly and probably should be a magic number here // TODO: Ban duration needs to be set sensibly and probably shouldn't be a magic number here
n.peerMonitor = monitor.NewPeerMonitor(resourcesCtx, log, clock.SystemClock, peerManager, -100, 1*time.Hour) n.peerMonitor = monitor.NewPeerMonitor(resourcesCtx, log, clock.SystemClock, n, -100, 1*time.Hour)
n.peerMonitor.Start()
// notify of any new connections/streams/etc. // notify of any new connections/streams/etc.
n.host.Network().Notify(NewNetworkNotifier(log, metrics)) n.host.Network().Notify(NewNetworkNotifier(log, metrics))
// note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled. // note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled.
...@@ -158,6 +156,8 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l ...@@ -158,6 +156,8 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
if metrics != nil { if metrics != nil {
go metrics.RecordBandwidth(resourcesCtx, bwc) go metrics.RecordBandwidth(resourcesCtx, bwc)
} }
n.peerMonitor.Start()
} }
return nil return nil
} }
...@@ -201,6 +201,23 @@ func (n *NodeP2P) ConnectionManager() connmgr.ConnManager { ...@@ -201,6 +201,23 @@ func (n *NodeP2P) ConnectionManager() connmgr.ConnManager {
return n.connMgr return n.connMgr
} }
func (n *NodeP2P) Peers() []peer.ID {
return n.host.Network().Peers()
}
func (n *NodeP2P) GetPeerScore(id peer.ID) (float64, error) {
scores, err := n.store.GetPeerScores(id)
if err != nil {
return 0, err
}
return scores.Gossip.Total, nil
}
func (n *NodeP2P) IsStatic(id peer.ID) bool {
// TODO: "static" constant should be shared with host layer rather than hard-coded here
return n.connMgr != nil && n.connMgr.IsProtected(id, "static")
}
func (n *NodeP2P) BanPeer(id peer.ID, expiration time.Time) error { func (n *NodeP2P) BanPeer(id peer.ID, expiration time.Time) error {
if err := n.store.SetPeerBanExpiration(id, expiration); err != nil { if err := n.store.SetPeerBanExpiration(id, expiration); err != nil {
return fmt.Errorf("failed to set peer ban expiry: %w", err) return fmt.Errorf("failed to set peer ban expiry: %w", err)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment