Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
N
nebula
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
exchain
nebula
Commits
6a3cc606
Unverified
Commit
6a3cc606
authored
Jun 02, 2023
by
OptimismBot
Committed by
GitHub
Jun 02, 2023
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #5779 from ethereum-optimism/aj/kick-bad-peers
op-node: Add peer monitor to kick/ban peers with low scores
parents
a48e53c1
14c4c50b
Changes
11
Hide whitespace changes
Inline
Side-by-side
Showing
11 changed files
with
555 additions
and
18 deletions
+555
-18
p2p_flags.go
op-node/flags/p2p_flags.go
+16
-0
load_config.go
op-node/p2p/cli/load_config.go
+6
-5
config.go
op-node/p2p/config.go
+16
-1
gossip.go
op-node/p2p/gossip.go
+0
-2
host.go
op-node/p2p/host.go
+5
-1
PeerManager.go
op-node/p2p/monitor/mocks/PeerManager.go
+219
-0
peer_monitor.go
op-node/p2p/monitor/peer_monitor.go
+118
-0
peer_monitor_test.go
op-node/p2p/monitor/peer_monitor_test.go
+125
-0
node.go
op-node/p2p/node.go
+35
-9
prepared.go
op-node/p2p/prepared.go
+9
-0
deterministic.go
op-service/clock/deterministic.go
+6
-0
No files found.
op-node/flags/p2p_flags.go
View file @
6a3cc606
...
...
@@ -51,6 +51,20 @@ var (
Required
:
false
,
EnvVar
:
p2pEnv
(
"PEER_BANNING"
),
}
BanningThreshold
=
cli
.
Float64Flag
{
Name
:
"p2p.ban.threshold"
,
Usage
:
"The minimum score below which peers are disconnected and banned."
,
Required
:
false
,
Value
:
-
100
,
EnvVar
:
p2pEnv
(
"PEER_BANNING_THRESHOLD"
),
}
BanningDuration
=
cli
.
DurationFlag
{
Name
:
"p2p.ban.duration"
,
Usage
:
"The duration that peers are banned for."
,
Required
:
false
,
Value
:
1
*
time
.
Hour
,
EnvVar
:
p2pEnv
(
"PEER_BANNING_DURATION"
),
}
TopicScoring
=
cli
.
StringFlag
{
Name
:
"p2p.scoring.topics"
,
...
...
@@ -294,6 +308,8 @@ var p2pFlags = []cli.Flag{
PeerScoring
,
PeerScoreBands
,
Banning
,
BanningThreshold
,
BanningDuration
,
TopicScoring
,
ListenIP
,
ListenTCPPort
,
...
...
op-node/p2p/cli/load_config.go
View file @
6a3cc606
...
...
@@ -62,7 +62,7 @@ func NewConfig(ctx *cli.Context, blockTime uint64) (*p2p.Config, error) {
return
nil
,
fmt
.
Errorf
(
"failed to load p2p peer score bands: %w"
,
err
)
}
if
err
:=
loadBanningOption
(
conf
,
ctx
);
err
!=
nil
{
if
err
:=
loadBanningOption
s
(
conf
,
ctx
);
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"failed to load banning option: %w"
,
err
)
}
...
...
@@ -135,10 +135,11 @@ func loadPeerScoreBands(conf *p2p.Config, ctx *cli.Context) error {
return
nil
}
// loadBanningOption loads whether or not to ban peers from the CLI context.
func
loadBanningOption
(
conf
*
p2p
.
Config
,
ctx
*
cli
.
Context
)
error
{
ban
:=
ctx
.
GlobalBool
(
flags
.
Banning
.
Name
)
conf
.
BanningEnabled
=
ban
// loadBanningOptions loads whether or not to ban peers from the CLI context.
func
loadBanningOptions
(
conf
*
p2p
.
Config
,
ctx
*
cli
.
Context
)
error
{
conf
.
BanningEnabled
=
ctx
.
GlobalBool
(
flags
.
Banning
.
Name
)
conf
.
BanningThreshold
=
ctx
.
GlobalFloat64
(
flags
.
BanningThreshold
.
Name
)
conf
.
BanningDuration
=
ctx
.
GlobalDuration
(
flags
.
BanningDuration
.
Name
)
return
nil
}
...
...
op-node/p2p/config.go
View file @
6a3cc606
...
...
@@ -44,6 +44,10 @@ type SetupP2P interface {
// Discovery creates a disc-v5 service. Returns nil, nil, nil if discovery is disabled.
Discovery
(
log
log
.
Logger
,
rollupCfg
*
rollup
.
Config
,
tcpPort
uint16
)
(
*
enode
.
LocalNode
,
*
discover
.
UDPv5
,
error
)
TargetPeers
()
uint
BanPeers
()
bool
BanThreshold
()
float64
BanDuration
()
time
.
Duration
PeerBandScorer
()
*
BandScoreThresholds
GossipSetupConfigurables
ReqRespSyncEnabled
()
bool
}
...
...
@@ -66,8 +70,11 @@ type Config struct {
// Peer Score Band Thresholds
BandScoreThresholds
BandScoreThresholds
// Whether to ban peers based on their [PeerScoring] score.
// Whether to ban peers based on their [PeerScoring] score.
Should be negative.
BanningEnabled
bool
// Minimum score before peers are disconnected and banned
BanningThreshold
float64
BanningDuration
time
.
Duration
ListenIP
net
.
IP
ListenTCPPort
uint16
...
...
@@ -143,6 +150,14 @@ func (conf *Config) BanPeers() bool {
return
conf
.
BanningEnabled
}
func
(
conf
*
Config
)
BanThreshold
()
float64
{
return
conf
.
BanningThreshold
}
func
(
conf
*
Config
)
BanDuration
()
time
.
Duration
{
return
conf
.
BanningDuration
}
func
(
conf
*
Config
)
TopicScoringParams
()
*
pubsub
.
TopicScoreParams
{
return
&
conf
.
TopicScoring
}
...
...
op-node/p2p/gossip.go
View file @
6a3cc606
...
...
@@ -53,10 +53,8 @@ var MessageDomainValidSnappy = [4]byte{1, 0, 0, 0}
type
GossipSetupConfigurables
interface
{
PeerScoringParams
()
*
pubsub
.
PeerScoreParams
TopicScoringParams
()
*
pubsub
.
TopicScoreParams
BanPeers
()
bool
// ConfigureGossip creates configuration options to apply to the GossipSub setup
ConfigureGossip
(
rollupCfg
*
rollup
.
Config
)
[]
pubsub
.
Option
PeerBandScorer
()
*
BandScoreThresholds
}
type
GossipRuntimeConfig
interface
{
...
...
op-node/p2p/host.go
View file @
6a3cc606
...
...
@@ -32,6 +32,10 @@ import (
"github.com/ethereum-optimism/optimism/op-service/clock"
)
const
(
staticPeerTag
=
"static"
)
type
ExtraHostFeatures
interface
{
host
.
Host
ConnectionGater
()
gating
.
BlockingConnectionGater
...
...
@@ -67,7 +71,7 @@ func (e *extraHost) initStaticPeers() {
e
.
Peerstore
()
.
AddAddrs
(
addr
.
ID
,
addr
.
Addrs
,
time
.
Hour
*
24
*
7
)
// We protect the peer, so the connection manager doesn't decide to prune it.
// We tag it with "static" so other protects/unprotects with different tags don't affect this protection.
e
.
connMgr
.
Protect
(
addr
.
ID
,
"static"
)
e
.
connMgr
.
Protect
(
addr
.
ID
,
staticPeerTag
)
// Try to dial the node in the background
go
func
(
addr
*
peer
.
AddrInfo
)
{
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
time
.
Second
*
30
)
...
...
op-node/p2p/monitor/mocks/PeerManager.go
0 → 100644
View file @
6a3cc606
// Code generated by mockery v2.28.0. DO NOT EDIT.
package
mocks
import
(
mock
"github.com/stretchr/testify/mock"
peer
"github.com/libp2p/go-libp2p/core/peer"
time
"time"
)
// PeerManager is an autogenerated mock type for the PeerManager type
type
PeerManager
struct
{
mock
.
Mock
}
type
PeerManager_Expecter
struct
{
mock
*
mock
.
Mock
}
func
(
_m
*
PeerManager
)
EXPECT
()
*
PeerManager_Expecter
{
return
&
PeerManager_Expecter
{
mock
:
&
_m
.
Mock
}
}
// BanPeer provides a mock function with given fields: _a0, _a1
func
(
_m
*
PeerManager
)
BanPeer
(
_a0
peer
.
ID
,
_a1
time
.
Time
)
error
{
ret
:=
_m
.
Called
(
_a0
,
_a1
)
var
r0
error
if
rf
,
ok
:=
ret
.
Get
(
0
)
.
(
func
(
peer
.
ID
,
time
.
Time
)
error
);
ok
{
r0
=
rf
(
_a0
,
_a1
)
}
else
{
r0
=
ret
.
Error
(
0
)
}
return
r0
}
// PeerManager_BanPeer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'BanPeer'
type
PeerManager_BanPeer_Call
struct
{
*
mock
.
Call
}
// BanPeer is a helper method to define mock.On call
// - _a0 peer.ID
// - _a1 time.Time
func
(
_e
*
PeerManager_Expecter
)
BanPeer
(
_a0
interface
{},
_a1
interface
{})
*
PeerManager_BanPeer_Call
{
return
&
PeerManager_BanPeer_Call
{
Call
:
_e
.
mock
.
On
(
"BanPeer"
,
_a0
,
_a1
)}
}
func
(
_c
*
PeerManager_BanPeer_Call
)
Run
(
run
func
(
_a0
peer
.
ID
,
_a1
time
.
Time
))
*
PeerManager_BanPeer_Call
{
_c
.
Call
.
Run
(
func
(
args
mock
.
Arguments
)
{
run
(
args
[
0
]
.
(
peer
.
ID
),
args
[
1
]
.
(
time
.
Time
))
})
return
_c
}
func
(
_c
*
PeerManager_BanPeer_Call
)
Return
(
_a0
error
)
*
PeerManager_BanPeer_Call
{
_c
.
Call
.
Return
(
_a0
)
return
_c
}
func
(
_c
*
PeerManager_BanPeer_Call
)
RunAndReturn
(
run
func
(
peer
.
ID
,
time
.
Time
)
error
)
*
PeerManager_BanPeer_Call
{
_c
.
Call
.
Return
(
run
)
return
_c
}
// GetPeerScore provides a mock function with given fields: id
func
(
_m
*
PeerManager
)
GetPeerScore
(
id
peer
.
ID
)
(
float64
,
error
)
{
ret
:=
_m
.
Called
(
id
)
var
r0
float64
var
r1
error
if
rf
,
ok
:=
ret
.
Get
(
0
)
.
(
func
(
peer
.
ID
)
(
float64
,
error
));
ok
{
return
rf
(
id
)
}
if
rf
,
ok
:=
ret
.
Get
(
0
)
.
(
func
(
peer
.
ID
)
float64
);
ok
{
r0
=
rf
(
id
)
}
else
{
r0
=
ret
.
Get
(
0
)
.
(
float64
)
}
if
rf
,
ok
:=
ret
.
Get
(
1
)
.
(
func
(
peer
.
ID
)
error
);
ok
{
r1
=
rf
(
id
)
}
else
{
r1
=
ret
.
Error
(
1
)
}
return
r0
,
r1
}
// PeerManager_GetPeerScore_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetPeerScore'
type
PeerManager_GetPeerScore_Call
struct
{
*
mock
.
Call
}
// GetPeerScore is a helper method to define mock.On call
// - id peer.ID
func
(
_e
*
PeerManager_Expecter
)
GetPeerScore
(
id
interface
{})
*
PeerManager_GetPeerScore_Call
{
return
&
PeerManager_GetPeerScore_Call
{
Call
:
_e
.
mock
.
On
(
"GetPeerScore"
,
id
)}
}
func
(
_c
*
PeerManager_GetPeerScore_Call
)
Run
(
run
func
(
id
peer
.
ID
))
*
PeerManager_GetPeerScore_Call
{
_c
.
Call
.
Run
(
func
(
args
mock
.
Arguments
)
{
run
(
args
[
0
]
.
(
peer
.
ID
))
})
return
_c
}
func
(
_c
*
PeerManager_GetPeerScore_Call
)
Return
(
_a0
float64
,
_a1
error
)
*
PeerManager_GetPeerScore_Call
{
_c
.
Call
.
Return
(
_a0
,
_a1
)
return
_c
}
func
(
_c
*
PeerManager_GetPeerScore_Call
)
RunAndReturn
(
run
func
(
peer
.
ID
)
(
float64
,
error
))
*
PeerManager_GetPeerScore_Call
{
_c
.
Call
.
Return
(
run
)
return
_c
}
// IsProtected provides a mock function with given fields: _a0
func
(
_m
*
PeerManager
)
IsStatic
(
_a0
peer
.
ID
)
bool
{
ret
:=
_m
.
Called
(
_a0
)
var
r0
bool
if
rf
,
ok
:=
ret
.
Get
(
0
)
.
(
func
(
peer
.
ID
)
bool
);
ok
{
r0
=
rf
(
_a0
)
}
else
{
r0
=
ret
.
Get
(
0
)
.
(
bool
)
}
return
r0
}
// PeerManager_IsProtected_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsStatic'
type
PeerManager_IsProtected_Call
struct
{
*
mock
.
Call
}
// IsProtected is a helper method to define mock.On call
// - _a0 peer.ID
func
(
_e
*
PeerManager_Expecter
)
IsProtected
(
_a0
interface
{})
*
PeerManager_IsProtected_Call
{
return
&
PeerManager_IsProtected_Call
{
Call
:
_e
.
mock
.
On
(
"IsStatic"
,
_a0
)}
}
func
(
_c
*
PeerManager_IsProtected_Call
)
Run
(
run
func
(
_a0
peer
.
ID
))
*
PeerManager_IsProtected_Call
{
_c
.
Call
.
Run
(
func
(
args
mock
.
Arguments
)
{
run
(
args
[
0
]
.
(
peer
.
ID
))
})
return
_c
}
func
(
_c
*
PeerManager_IsProtected_Call
)
Return
(
_a0
bool
)
*
PeerManager_IsProtected_Call
{
_c
.
Call
.
Return
(
_a0
)
return
_c
}
func
(
_c
*
PeerManager_IsProtected_Call
)
RunAndReturn
(
run
func
(
peer
.
ID
)
bool
)
*
PeerManager_IsProtected_Call
{
_c
.
Call
.
Return
(
run
)
return
_c
}
// Peers provides a mock function with given fields:
func
(
_m
*
PeerManager
)
Peers
()
[]
peer
.
ID
{
ret
:=
_m
.
Called
()
var
r0
[]
peer
.
ID
if
rf
,
ok
:=
ret
.
Get
(
0
)
.
(
func
()
[]
peer
.
ID
);
ok
{
r0
=
rf
()
}
else
{
if
ret
.
Get
(
0
)
!=
nil
{
r0
=
ret
.
Get
(
0
)
.
([]
peer
.
ID
)
}
}
return
r0
}
// PeerManager_Peers_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Peers'
type
PeerManager_Peers_Call
struct
{
*
mock
.
Call
}
// Peers is a helper method to define mock.On call
func
(
_e
*
PeerManager_Expecter
)
Peers
()
*
PeerManager_Peers_Call
{
return
&
PeerManager_Peers_Call
{
Call
:
_e
.
mock
.
On
(
"Peers"
)}
}
func
(
_c
*
PeerManager_Peers_Call
)
Run
(
run
func
())
*
PeerManager_Peers_Call
{
_c
.
Call
.
Run
(
func
(
args
mock
.
Arguments
)
{
run
()
})
return
_c
}
func
(
_c
*
PeerManager_Peers_Call
)
Return
(
_a0
[]
peer
.
ID
)
*
PeerManager_Peers_Call
{
_c
.
Call
.
Return
(
_a0
)
return
_c
}
func
(
_c
*
PeerManager_Peers_Call
)
RunAndReturn
(
run
func
()
[]
peer
.
ID
)
*
PeerManager_Peers_Call
{
_c
.
Call
.
Return
(
run
)
return
_c
}
type
mockConstructorTestingTNewPeerManager
interface
{
mock
.
TestingT
Cleanup
(
func
())
}
// NewPeerManager creates a new instance of PeerManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func
NewPeerManager
(
t
mockConstructorTestingTNewPeerManager
)
*
PeerManager
{
mock
:=
&
PeerManager
{}
mock
.
Mock
.
Test
(
t
)
t
.
Cleanup
(
func
()
{
mock
.
AssertExpectations
(
t
)
})
return
mock
}
op-node/p2p/monitor/peer_monitor.go
0 → 100644
View file @
6a3cc606
package
monitor
import
(
"context"
"fmt"
"sync"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
"github.com/libp2p/go-libp2p/core/peer"
)
const
(
// Time delay between checking the score of each peer to avoid activity spikes
checkInterval
=
1
*
time
.
Second
)
//go:generate mockery --name PeerManager --output mocks/ --with-expecter=true
type
PeerManager
interface
{
Peers
()
[]
peer
.
ID
GetPeerScore
(
id
peer
.
ID
)
(
float64
,
error
)
IsStatic
(
peer
.
ID
)
bool
// BanPeer bans the peer until the specified time and disconnects any existing connections.
BanPeer
(
peer
.
ID
,
time
.
Time
)
error
}
// PeerMonitor runs a background process to periodically check for peers with scores below a minimum.
// When it finds bad peers, it disconnects and bans them.
// A delay is introduced between each peer being checked to avoid spikes in system load.
type
PeerMonitor
struct
{
ctx
context
.
Context
cancelFn
context
.
CancelFunc
l
log
.
Logger
clock
clock
.
Clock
manager
PeerManager
minScore
float64
banDuration
time
.
Duration
bgTasks
sync
.
WaitGroup
// Used by checkNextPeer and must only be accessed from the background thread
peerList
[]
peer
.
ID
nextPeerIdx
int
}
func
NewPeerMonitor
(
ctx
context
.
Context
,
l
log
.
Logger
,
clock
clock
.
Clock
,
manager
PeerManager
,
minScore
float64
,
banDuration
time
.
Duration
)
*
PeerMonitor
{
ctx
,
cancelFn
:=
context
.
WithCancel
(
ctx
)
return
&
PeerMonitor
{
ctx
:
ctx
,
cancelFn
:
cancelFn
,
l
:
l
,
clock
:
clock
,
manager
:
manager
,
minScore
:
minScore
,
banDuration
:
banDuration
,
}
}
func
(
p
*
PeerMonitor
)
Start
()
{
p
.
bgTasks
.
Add
(
1
)
go
p
.
background
(
p
.
checkNextPeer
)
}
func
(
p
*
PeerMonitor
)
Stop
()
{
p
.
cancelFn
()
p
.
bgTasks
.
Wait
()
}
// checkNextPeer checks the next peer and disconnects and bans it if its score is too low and its not protected.
// The first call gets the list of current peers and checks the first one, then each subsequent call checks the next
// peer in the list. When the end of the list is reached, an updated list of connected peers is retrieved and the process
// starts again.
func
(
p
*
PeerMonitor
)
checkNextPeer
()
error
{
// Get a new list of peers to check if we've checked all peers in the previous list
if
p
.
nextPeerIdx
>=
len
(
p
.
peerList
)
{
p
.
peerList
=
p
.
manager
.
Peers
()
p
.
nextPeerIdx
=
0
}
if
len
(
p
.
peerList
)
==
0
{
// No peers to check
return
nil
}
id
:=
p
.
peerList
[
p
.
nextPeerIdx
]
p
.
nextPeerIdx
++
score
,
err
:=
p
.
manager
.
GetPeerScore
(
id
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"retrieve score for peer %v: %w"
,
id
,
err
)
}
if
score
>=
p
.
minScore
{
return
nil
}
if
p
.
manager
.
IsStatic
(
id
)
{
return
nil
}
if
err
:=
p
.
manager
.
BanPeer
(
id
,
p
.
clock
.
Now
()
.
Add
(
p
.
banDuration
));
err
!=
nil
{
return
fmt
.
Errorf
(
"banning peer %v: %w"
,
id
,
err
)
}
return
nil
}
// background is intended to run as a separate go routine. It will call the supplied action function every checkInterval
// until the context is done.
func
(
p
*
PeerMonitor
)
background
(
action
func
()
error
)
{
defer
p
.
bgTasks
.
Done
()
ticker
:=
p
.
clock
.
NewTicker
(
checkInterval
)
defer
ticker
.
Stop
()
for
{
select
{
case
<-
p
.
ctx
.
Done
()
:
return
case
<-
ticker
.
Ch
()
:
if
err
:=
action
();
err
!=
nil
{
p
.
l
.
Warn
(
"Error while checking connected peer score"
,
"err"
,
err
)
}
}
}
}
op-node/p2p/monitor/peer_monitor_test.go
0 → 100644
View file @
6a3cc606
package
monitor
import
(
"context"
"errors"
"fmt"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/p2p/monitor/mocks"
"github.com/ethereum-optimism/optimism/op-node/testlog"
clock2
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
)
const
testBanDuration
=
2
*
time
.
Hour
func
peerMonitorSetup
(
t
*
testing
.
T
)
(
*
PeerMonitor
,
*
clock2
.
DeterministicClock
,
*
mocks
.
PeerManager
)
{
l
:=
testlog
.
Logger
(
t
,
log
.
LvlInfo
)
clock
:=
clock2
.
NewDeterministicClock
(
time
.
UnixMilli
(
10000
))
manager
:=
mocks
.
NewPeerManager
(
t
)
monitor
:=
NewPeerMonitor
(
context
.
Background
(),
l
,
clock
,
manager
,
-
100
,
testBanDuration
)
return
monitor
,
clock
,
manager
}
func
TestPeriodicallyCheckNextPeer
(
t
*
testing
.
T
)
{
monitor
,
clock
,
_
:=
peerMonitorSetup
(
t
)
// Each time a step is performed, it calls Done on the wait group so we can wait for it to be performed
stepCh
:=
make
(
chan
struct
{},
10
)
monitor
.
bgTasks
.
Add
(
1
)
var
actionErr
error
go
monitor
.
background
(
func
()
error
{
stepCh
<-
struct
{}{}
return
actionErr
})
defer
monitor
.
Stop
()
// Wait for the step ticker to be started
clock
.
WaitForNewPendingTaskWithTimeout
(
30
*
time
.
Second
)
// Should perform another step after each interval
for
i
:=
0
;
i
<
5
;
i
++
{
clock
.
AdvanceTime
(
checkInterval
)
waitForChan
(
t
,
stepCh
,
fmt
.
Sprintf
(
"Did not perform step %v"
,
i
))
require
.
Len
(
t
,
stepCh
,
0
)
}
// Should continue executing periodically even after an error
actionErr
=
errors
.
New
(
"boom"
)
for
i
:=
0
;
i
<
5
;
i
++
{
clock
.
AdvanceTime
(
checkInterval
)
waitForChan
(
t
,
stepCh
,
fmt
.
Sprintf
(
"Did not perform step %v"
,
i
))
require
.
Len
(
t
,
stepCh
,
0
)
}
}
func
TestCheckNextPeer
(
t
*
testing
.
T
)
{
peerIDs
:=
[]
peer
.
ID
{
peer
.
ID
(
"a"
),
peer
.
ID
(
"b"
),
peer
.
ID
(
"c"
),
}
t
.
Run
(
"No peers"
,
func
(
t
*
testing
.
T
)
{
monitor
,
_
,
manager
:=
peerMonitorSetup
(
t
)
manager
.
EXPECT
()
.
Peers
()
.
Return
(
nil
)
.
Once
()
require
.
NoError
(
t
,
monitor
.
checkNextPeer
())
})
t
.
Run
(
"Check each peer then refresh list"
,
func
(
t
*
testing
.
T
)
{
monitor
,
_
,
manager
:=
peerMonitorSetup
(
t
)
manager
.
EXPECT
()
.
Peers
()
.
Return
(
peerIDs
)
.
Once
()
for
_
,
id
:=
range
peerIDs
{
manager
.
EXPECT
()
.
GetPeerScore
(
id
)
.
Return
(
1
,
nil
)
.
Once
()
require
.
NoError
(
t
,
monitor
.
checkNextPeer
())
}
updatedPeers
:=
[]
peer
.
ID
{
peer
.
ID
(
"x"
),
peer
.
ID
(
"y"
),
peer
.
ID
(
"z"
),
peer
.
ID
(
"a"
),
}
manager
.
EXPECT
()
.
Peers
()
.
Return
(
updatedPeers
)
.
Once
()
for
_
,
id
:=
range
updatedPeers
{
manager
.
EXPECT
()
.
GetPeerScore
(
id
)
.
Return
(
1
,
nil
)
.
Once
()
require
.
NoError
(
t
,
monitor
.
checkNextPeer
())
}
})
t
.
Run
(
"Close and ban peer when below min score"
,
func
(
t
*
testing
.
T
)
{
monitor
,
clock
,
manager
:=
peerMonitorSetup
(
t
)
id
:=
peerIDs
[
0
]
manager
.
EXPECT
()
.
Peers
()
.
Return
(
peerIDs
)
.
Once
()
manager
.
EXPECT
()
.
GetPeerScore
(
id
)
.
Return
(
-
101
,
nil
)
.
Once
()
manager
.
EXPECT
()
.
IsProtected
(
id
)
.
Return
(
false
)
.
Once
()
manager
.
EXPECT
()
.
BanPeer
(
id
,
clock
.
Now
()
.
Add
(
testBanDuration
))
.
Return
(
nil
)
.
Once
()
require
.
NoError
(
t
,
monitor
.
checkNextPeer
())
})
t
.
Run
(
"Do not close protected peer when below min score"
,
func
(
t
*
testing
.
T
)
{
monitor
,
_
,
manager
:=
peerMonitorSetup
(
t
)
id
:=
peerIDs
[
0
]
manager
.
EXPECT
()
.
Peers
()
.
Return
(
peerIDs
)
.
Once
()
manager
.
EXPECT
()
.
GetPeerScore
(
id
)
.
Return
(
-
101
,
nil
)
.
Once
()
manager
.
EXPECT
()
.
IsProtected
(
id
)
.
Return
(
true
)
require
.
NoError
(
t
,
monitor
.
checkNextPeer
())
})
}
func
waitForChan
(
t
*
testing
.
T
,
ch
chan
struct
{},
msg
string
)
{
ctx
,
cancelFn
:=
context
.
WithTimeout
(
context
.
Background
(),
30
*
time
.
Second
)
defer
cancelFn
()
select
{
case
<-
ctx
.
Done
()
:
t
.
Fatal
(
msg
)
case
<-
ch
:
// Ok
}
}
op-node/p2p/node.go
View file @
6a3cc606
...
...
@@ -8,14 +8,13 @@ import (
"strconv"
"time"
"github.com/libp2p/go-libp2p/core/peer"
manet
"github.com/multiformats/go-multiaddr/net"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/p2p/gating"
"github.com/ethereum-optimism/optimism/op-node/p2p/monitor"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
...
...
@@ -25,17 +24,20 @@ import (
"github.com/libp2p/go-libp2p/core/host"
p2pmetrics
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
ma
"github.com/multiformats/go-multiaddr"
manet
"github.com/multiformats/go-multiaddr/net"
)
// NodeP2P is a p2p node, which can be used to gossip messages.
type
NodeP2P
struct
{
host
host
.
Host
// p2p host (optional, may be nil)
gater
gating
.
BlockingConnectionGater
// p2p gater, to ban/unban peers with, may be nil even with p2p enabled
scorer
Scorer
// writes score-updates to the peerstore and keeps metrics of score changes
connMgr
connmgr
.
ConnManager
// p2p conn manager, to keep a reliable number of peers, may be nil even with p2p enabled
store
store
.
ExtendedPeerstore
// peerstore of host, with extra bindings for scoring and banning
log
log
.
Logger
host
host
.
Host
// p2p host (optional, may be nil)
gater
gating
.
BlockingConnectionGater
// p2p gater, to ban/unban peers with, may be nil even with p2p enabled
scorer
Scorer
// writes score-updates to the peerstore and keeps metrics of score changes
connMgr
connmgr
.
ConnManager
// p2p conn manager, to keep a reliable number of peers, may be nil even with p2p enabled
peerMonitor
*
monitor
.
PeerMonitor
// peer monitor to disconnect bad peers, may be nil even with p2p enabled
store
store
.
ExtendedPeerstore
// peerstore of host, with extra bindings for scoring and banning
log
log
.
Logger
// the below components are all optional, and may be nil. They require the host to not be nil.
dv5Local
*
enode
.
LocalNode
// p2p discovery identity
dv5Udp
*
discover
.
UDPv5
// p2p discovery service
...
...
@@ -151,6 +153,11 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
if
metrics
!=
nil
{
go
metrics
.
RecordBandwidth
(
resourcesCtx
,
bwc
)
}
if
setup
.
BanPeers
()
{
n
.
peerMonitor
=
monitor
.
NewPeerMonitor
(
resourcesCtx
,
log
,
clock
.
SystemClock
,
n
,
setup
.
BanThreshold
(),
setup
.
BanDuration
())
n
.
peerMonitor
.
Start
()
}
}
return
nil
}
...
...
@@ -194,6 +201,22 @@ func (n *NodeP2P) ConnectionManager() connmgr.ConnManager {
return
n
.
connMgr
}
func
(
n
*
NodeP2P
)
Peers
()
[]
peer
.
ID
{
return
n
.
host
.
Network
()
.
Peers
()
}
func
(
n
*
NodeP2P
)
GetPeerScore
(
id
peer
.
ID
)
(
float64
,
error
)
{
scores
,
err
:=
n
.
store
.
GetPeerScores
(
id
)
if
err
!=
nil
{
return
0
,
err
}
return
scores
.
Gossip
.
Total
,
nil
}
func
(
n
*
NodeP2P
)
IsStatic
(
id
peer
.
ID
)
bool
{
return
n
.
connMgr
!=
nil
&&
n
.
connMgr
.
IsProtected
(
id
,
staticPeerTag
)
}
func
(
n
*
NodeP2P
)
BanPeer
(
id
peer
.
ID
,
expiration
time
.
Time
)
error
{
if
err
:=
n
.
store
.
SetPeerBanExpiration
(
id
,
expiration
);
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to set peer ban expiry: %w"
,
err
)
...
...
@@ -226,6 +249,9 @@ func (n *NodeP2P) BanIP(ip net.IP, expiration time.Time) error {
func
(
n
*
NodeP2P
)
Close
()
error
{
var
result
*
multierror
.
Error
if
n
.
peerMonitor
!=
nil
{
n
.
peerMonitor
.
Stop
()
}
if
n
.
dv5Udp
!=
nil
{
n
.
dv5Udp
.
Close
()
}
...
...
op-node/p2p/prepared.go
View file @
6a3cc606
...
...
@@ -3,6 +3,7 @@ package p2p
import
(
"errors"
"fmt"
"time"
pubsub
"github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
...
...
@@ -80,6 +81,14 @@ func (p *Prepared) BanPeers() bool {
return
false
}
func
(
p
*
Prepared
)
BanThreshold
()
float64
{
return
-
100
}
func
(
p
*
Prepared
)
BanDuration
()
time
.
Duration
{
return
1
*
time
.
Hour
}
func
(
p
*
Prepared
)
TopicScoringParams
()
*
pubsub
.
TopicScoreParams
{
return
nil
}
...
...
op-service/clock/deterministic.go
View file @
6a3cc606
...
...
@@ -136,6 +136,12 @@ func (s *DeterministicClock) addPending(t action) {
}
}
func
(
s
*
DeterministicClock
)
WaitForNewPendingTaskWithTimeout
(
timeout
time
.
Duration
)
bool
{
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
timeout
)
defer
cancel
()
return
s
.
WaitForNewPendingTask
(
ctx
)
}
// WaitForNewPendingTask blocks until a new task is scheduled since the last time this method was called.
// true is returned if a new task was scheduled, false if the context completed before a new task was added.
func
(
s
*
DeterministicClock
)
WaitForNewPendingTask
(
ctx
context
.
Context
)
bool
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment