Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
N
nebula
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
exchain
nebula
Commits
95978376
Unverified
Commit
95978376
authored
May 23, 2023
by
OptimismBot
Committed by
GitHub
May 23, 2023
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #5772 from ethereum-optimism/p2p-scorer-refactor
op-node: P2P scorer refactor
parents
e643a258
bf2bdf75
Changes
22
Hide whitespace changes
Inline
Side-by-side
Showing
22 changed files
with
289 additions
and
644 deletions
+289
-644
setup.go
op-e2e/setup.go
+60
-1
system_test.go
op-e2e/system_test.go
+1
-1
config.go
op-node/p2p/config.go
+8
-30
gossip.go
op-node/p2p/gossip.go
+2
-4
host.go
op-node/p2p/host.go
+15
-12
host_test.go
op-node/p2p/host_test.go
+2
-2
ConnectionGater.go
op-node/p2p/mocks/ConnectionGater.go
+0
-248
Peerstore.go
op-node/p2p/mocks/Peerstore.go
+6
-6
node.go
op-node/p2p/node.go
+29
-15
peer_gater.go
op-node/p2p/peer_gater.go
+0
-75
peer_gater_test.go
op-node/p2p/peer_gater_test.go
+0
-95
peer_scorer.go
op-node/p2p/peer_scorer.go
+36
-19
peer_scorer_test.go
op-node/p2p/peer_scorer_test.go
+17
-23
peer_scores.go
op-node/p2p/peer_scores.go
+1
-11
peer_scores_test.go
op-node/p2p/peer_scores_test.go
+26
-19
prepared.go
op-node/p2p/prepared.go
+1
-1
rpc_server.go
op-node/p2p/rpc_server.go
+3
-1
iface.go
op-node/p2p/store/iface.go
+28
-8
scorebook.go
op-node/p2p/store/scorebook.go
+6
-12
scorebook_test.go
op-node/p2p/store/scorebook_test.go
+17
-23
serialize.go
op-node/p2p/store/serialize.go
+6
-28
serialize_test.go
op-node/p2p/store/serialize_test.go
+25
-10
No files found.
op-e2e/setup.go
View file @
95978376
...
@@ -3,8 +3,10 @@ package op_e2e
...
@@ -3,8 +3,10 @@ package op_e2e
import
(
import
(
"context"
"context"
"crypto/ecdsa"
"crypto/ecdsa"
"crypto/rand"
"fmt"
"fmt"
"math/big"
"math/big"
"net"
"os"
"os"
"path"
"path"
"sort"
"sort"
...
@@ -12,6 +14,18 @@ import (
...
@@ -12,6 +14,18 @@ import (
"testing"
"testing"
"time"
"time"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-service/clock"
ds
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
ic
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
ma
"github.com/multiformats/go-multiaddr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core"
...
@@ -465,7 +479,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
...
@@ -465,7 +479,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
if
p
,
ok
:=
p2pNodes
[
name
];
ok
{
if
p
,
ok
:=
p2pNodes
[
name
];
ok
{
return
p
,
nil
return
p
,
nil
}
}
h
,
err
:=
sys
.
Mocknet
.
Gen
Peer
()
h
,
err
:=
sys
.
newMockNet
Peer
()
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"failed to init p2p host for node %s"
,
name
)
return
nil
,
fmt
.
Errorf
(
"failed to init p2p host for node %s"
,
name
)
}
}
...
@@ -627,6 +641,51 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
...
@@ -627,6 +641,51 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
return
sys
,
nil
return
sys
,
nil
}
}
// IP6 range that gets blackholed (in case our traffic ever makes it out onto
// the internet).
var
blackholeIP6
=
net
.
ParseIP
(
"100::"
)
// mocknet doesn't allow us to add a peerstore without fully creating the peer ourselves
func
(
sys
*
System
)
newMockNetPeer
()
(
host
.
Host
,
error
)
{
sk
,
_
,
err
:=
ic
.
GenerateECDSAKeyPair
(
rand
.
Reader
)
if
err
!=
nil
{
return
nil
,
err
}
id
,
err
:=
peer
.
IDFromPrivateKey
(
sk
)
if
err
!=
nil
{
return
nil
,
err
}
suffix
:=
id
if
len
(
id
)
>
8
{
suffix
=
id
[
len
(
id
)
-
8
:
]
}
ip
:=
append
(
net
.
IP
{},
blackholeIP6
...
)
copy
(
ip
[
net
.
IPv6len
-
len
(
suffix
)
:
],
suffix
)
a
,
err
:=
ma
.
NewMultiaddr
(
fmt
.
Sprintf
(
"/ip6/%s/tcp/4242"
,
ip
))
if
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"failed to create test multiaddr: %w"
,
err
)
}
p
,
err
:=
peer
.
IDFromPublicKey
(
sk
.
GetPublic
())
if
err
!=
nil
{
return
nil
,
err
}
ps
,
err
:=
pstoremem
.
NewPeerstore
()
if
err
!=
nil
{
return
nil
,
err
}
ps
.
AddAddr
(
p
,
a
,
peerstore
.
PermanentAddrTTL
)
_
=
ps
.
AddPrivKey
(
p
,
sk
)
_
=
ps
.
AddPubKey
(
p
,
sk
.
GetPublic
())
ds
:=
sync
.
MutexWrap
(
ds
.
NewMapDatastore
())
eps
,
err
:=
store
.
NewExtendedPeerstore
(
context
.
Background
(),
log
.
Root
(),
clock
.
SystemClock
,
ps
,
ds
)
if
err
!=
nil
{
return
nil
,
err
}
return
sys
.
Mocknet
.
AddPeerWithPeerstore
(
p
,
eps
)
}
func
selectEndpoint
(
node
*
node
.
Node
)
string
{
func
selectEndpoint
(
node
*
node
.
Node
)
string
{
useHTTP
:=
os
.
Getenv
(
"OP_E2E_USE_HTTP"
)
==
"true"
useHTTP
:=
os
.
Getenv
(
"OP_E2E_USE_HTTP"
)
==
"true"
if
useHTTP
{
if
useHTTP
{
...
...
op-e2e/system_test.go
View file @
95978376
...
@@ -706,7 +706,7 @@ func TestSystemP2PAltSync(t *testing.T) {
...
@@ -706,7 +706,7 @@ func TestSystemP2PAltSync(t *testing.T) {
snapLog
.
SetHandler
(
log
.
DiscardHandler
())
snapLog
.
SetHandler
(
log
.
DiscardHandler
())
// Create a peer, and hook up alice and bob
// Create a peer, and hook up alice and bob
h
,
err
:=
sys
.
Mocknet
.
Gen
Peer
()
h
,
err
:=
sys
.
newMockNet
Peer
()
require
.
NoError
(
t
,
err
)
require
.
NoError
(
t
,
err
)
_
,
err
=
sys
.
Mocknet
.
LinkPeers
(
sys
.
RollupNodes
[
"alice"
]
.
P2P
()
.
Host
()
.
ID
(),
h
.
ID
())
_
,
err
=
sys
.
Mocknet
.
LinkPeers
(
sys
.
RollupNodes
[
"alice"
]
.
P2P
()
.
Host
()
.
ID
(),
h
.
ID
())
require
.
NoError
(
t
,
err
)
require
.
NoError
(
t
,
err
)
...
...
op-node/p2p/config.go
View file @
95978376
...
@@ -6,6 +6,8 @@ import (
...
@@ -6,6 +6,8 @@ import (
"net"
"net"
"time"
"time"
"github.com/ethereum-optimism/optimism/op-node/p2p/gating"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enode"
...
@@ -17,8 +19,6 @@ import (
...
@@ -17,8 +19,6 @@ import (
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
cmgr
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
cmgr
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup"
...
@@ -30,12 +30,17 @@ var DefaultBootnodes = []*enode.Node{
...
@@ -30,12 +30,17 @@ var DefaultBootnodes = []*enode.Node{
enode
.
MustParse
(
"enode://9d7a3efefe442351217e73b3a593bcb8efffb55b4807699972145324eab5e6b382152f8d24f6301baebbfb5ecd4127bd3faab2842c04cd432bdf50ba092f6645@34.65.109.126:0?discport=30305"
),
enode
.
MustParse
(
"enode://9d7a3efefe442351217e73b3a593bcb8efffb55b4807699972145324eab5e6b382152f8d24f6301baebbfb5ecd4127bd3faab2842c04cd432bdf50ba092f6645@34.65.109.126:0?discport=30305"
),
}
}
type
HostMetrics
interface
{
gating
.
UnbanMetrics
gating
.
ConnectionGaterMetrics
}
// SetupP2P provides a host and discovery service for usage in the rollup node.
// SetupP2P provides a host and discovery service for usage in the rollup node.
type
SetupP2P
interface
{
type
SetupP2P
interface
{
Check
()
error
Check
()
error
Disabled
()
bool
Disabled
()
bool
// Host creates a libp2p host service. Returns nil, nil if p2p is disabled.
// Host creates a libp2p host service. Returns nil, nil if p2p is disabled.
Host
(
log
log
.
Logger
,
reporter
metrics
.
Reporter
)
(
host
.
Host
,
error
)
Host
(
log
log
.
Logger
,
reporter
metrics
.
Reporter
,
metrics
HostMetrics
)
(
host
.
Host
,
error
)
// Discovery creates a disc-v5 service. Returns nil, nil, nil if discovery is disabled.
// Discovery creates a disc-v5 service. Returns nil, nil, nil if discovery is disabled.
Discovery
(
log
log
.
Logger
,
rollupCfg
*
rollup
.
Config
,
tcpPort
uint16
)
(
*
enode
.
LocalNode
,
*
discover
.
UDPv5
,
error
)
Discovery
(
log
log
.
Logger
,
rollupCfg
*
rollup
.
Config
,
tcpPort
uint16
)
(
*
enode
.
LocalNode
,
*
discover
.
UDPv5
,
error
)
TargetPeers
()
uint
TargetPeers
()
uint
...
@@ -109,33 +114,6 @@ type Config struct {
...
@@ -109,33 +114,6 @@ type Config struct {
EnableReqRespSync
bool
EnableReqRespSync
bool
}
}
//go:generate mockery --name ConnectionGater
type
ConnectionGater
interface
{
connmgr
.
ConnectionGater
// BlockPeer adds a peer to the set of blocked peers.
// Note: active connections to the peer are not automatically closed.
BlockPeer
(
p
peer
.
ID
)
error
UnblockPeer
(
p
peer
.
ID
)
error
ListBlockedPeers
()
[]
peer
.
ID
// BlockAddr adds an IP address to the set of blocked addresses.
// Note: active connections to the IP address are not automatically closed.
BlockAddr
(
ip
net
.
IP
)
error
UnblockAddr
(
ip
net
.
IP
)
error
ListBlockedAddrs
()
[]
net
.
IP
// BlockSubnet adds an IP subnet to the set of blocked addresses.
// Note: active connections to the IP subnet are not automatically closed.
BlockSubnet
(
ipnet
*
net
.
IPNet
)
error
UnblockSubnet
(
ipnet
*
net
.
IPNet
)
error
ListBlockedSubnets
()
[]
*
net
.
IPNet
}
func
DefaultConnGater
(
conf
*
Config
)
(
connmgr
.
ConnectionGater
,
error
)
{
return
conngater
.
NewBasicConnectionGater
(
conf
.
Store
)
}
func
DefaultConnManager
(
conf
*
Config
)
(
connmgr
.
ConnManager
,
error
)
{
func
DefaultConnManager
(
conf
*
Config
)
(
connmgr
.
ConnManager
,
error
)
{
return
cmgr
.
NewConnManager
(
return
cmgr
.
NewConnManager
(
int
(
conf
.
PeersLo
),
int
(
conf
.
PeersLo
),
...
...
op-node/p2p/gossip.go
View file @
95978376
...
@@ -66,8 +66,6 @@ type GossipRuntimeConfig interface {
...
@@ -66,8 +66,6 @@ type GossipRuntimeConfig interface {
//go:generate mockery --name GossipMetricer
//go:generate mockery --name GossipMetricer
type
GossipMetricer
interface
{
type
GossipMetricer
interface
{
RecordGossipEvent
(
evType
int32
)
RecordGossipEvent
(
evType
int32
)
// Peer Scoring Metric Funcs
SetPeerScores
(
map
[
string
]
float64
)
}
}
func
blocksTopicV1
(
cfg
*
rollup
.
Config
)
string
{
func
blocksTopicV1
(
cfg
*
rollup
.
Config
)
string
{
...
@@ -157,7 +155,7 @@ func BuildGlobalGossipParams(cfg *rollup.Config) pubsub.GossipSubParams {
...
@@ -157,7 +155,7 @@ func BuildGlobalGossipParams(cfg *rollup.Config) pubsub.GossipSubParams {
// NewGossipSub configures a new pubsub instance with the specified parameters.
// NewGossipSub configures a new pubsub instance with the specified parameters.
// PubSub uses a GossipSubRouter as it's router under the hood.
// PubSub uses a GossipSubRouter as it's router under the hood.
func
NewGossipSub
(
p2pCtx
context
.
Context
,
h
host
.
Host
,
g
ConnectionGater
,
cfg
*
rollup
.
Config
,
gossipConf
GossipSetupConfigurables
,
m
GossipMetricer
,
log
log
.
Logger
)
(
*
pubsub
.
PubSub
,
error
)
{
func
NewGossipSub
(
p2pCtx
context
.
Context
,
h
host
.
Host
,
cfg
*
rollup
.
Config
,
gossipConf
GossipSetupConfigurables
,
scorer
Scorer
,
m
GossipMetricer
,
log
log
.
Logger
)
(
*
pubsub
.
PubSub
,
error
)
{
denyList
,
err
:=
pubsub
.
NewTimeCachedBlacklist
(
30
*
time
.
Second
)
denyList
,
err
:=
pubsub
.
NewTimeCachedBlacklist
(
30
*
time
.
Second
)
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
err
return
nil
,
err
...
@@ -176,7 +174,7 @@ func NewGossipSub(p2pCtx context.Context, h host.Host, g ConnectionGater, cfg *r
...
@@ -176,7 +174,7 @@ func NewGossipSub(p2pCtx context.Context, h host.Host, g ConnectionGater, cfg *r
pubsub
.
WithBlacklist
(
denyList
),
pubsub
.
WithBlacklist
(
denyList
),
pubsub
.
WithEventTracer
(
&
gossipTracer
{
m
:
m
}),
pubsub
.
WithEventTracer
(
&
gossipTracer
{
m
:
m
}),
}
}
gossipOpts
=
append
(
gossipOpts
,
ConfigurePeerScoring
(
h
,
g
,
gossipConf
,
m
,
log
)
...
)
gossipOpts
=
append
(
gossipOpts
,
ConfigurePeerScoring
(
gossipConf
,
scorer
,
log
)
...
)
gossipOpts
=
append
(
gossipOpts
,
gossipConf
.
ConfigureGossip
(
cfg
)
...
)
gossipOpts
=
append
(
gossipOpts
,
gossipConf
.
ConfigureGossip
(
cfg
)
...
)
return
pubsub
.
NewGossipSub
(
p2pCtx
,
h
,
gossipOpts
...
)
return
pubsub
.
NewGossipSub
(
p2pCtx
,
h
,
gossipOpts
...
)
}
}
...
...
op-node/p2p/host.go
View file @
95978376
...
@@ -7,9 +7,6 @@ import (
...
@@ -7,9 +7,6 @@ import (
"sync"
"sync"
"time"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
libp2p
"github.com/libp2p/go-libp2p"
libp2p
"github.com/libp2p/go-libp2p"
lconf
"github.com/libp2p/go-libp2p/config"
lconf
"github.com/libp2p/go-libp2p/config"
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/connmgr"
...
@@ -29,17 +26,21 @@ import (
...
@@ -29,17 +26,21 @@ import (
madns
"github.com/multiformats/go-multiaddr-dns"
madns
"github.com/multiformats/go-multiaddr-dns"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/p2p/gating"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-service/clock"
)
)
type
ExtraHostFeatures
interface
{
type
ExtraHostFeatures
interface
{
host
.
Host
host
.
Host
ConnectionGater
()
ConnectionGater
ConnectionGater
()
gating
.
Blocking
ConnectionGater
ConnectionManager
()
connmgr
.
ConnManager
ConnectionManager
()
connmgr
.
ConnManager
}
}
type
extraHost
struct
{
type
extraHost
struct
{
host
.
Host
host
.
Host
gater
ConnectionGater
gater
gating
.
Blocking
ConnectionGater
connMgr
connmgr
.
ConnManager
connMgr
connmgr
.
ConnManager
log
log
.
Logger
log
log
.
Logger
...
@@ -48,7 +49,7 @@ type extraHost struct {
...
@@ -48,7 +49,7 @@ type extraHost struct {
quitC
chan
struct
{}
quitC
chan
struct
{}
}
}
func
(
e
*
extraHost
)
ConnectionGater
()
ConnectionGater
{
func
(
e
*
extraHost
)
ConnectionGater
()
gating
.
Blocking
ConnectionGater
{
return
e
.
gater
return
e
.
gater
}
}
...
@@ -125,7 +126,7 @@ func (e *extraHost) monitorStaticPeers() {
...
@@ -125,7 +126,7 @@ func (e *extraHost) monitorStaticPeers() {
var
_
ExtraHostFeatures
=
(
*
extraHost
)(
nil
)
var
_
ExtraHostFeatures
=
(
*
extraHost
)(
nil
)
func
(
conf
*
Config
)
Host
(
log
log
.
Logger
,
reporter
metrics
.
Reporter
)
(
host
.
Host
,
error
)
{
func
(
conf
*
Config
)
Host
(
log
log
.
Logger
,
reporter
metrics
.
Reporter
,
metrics
HostMetrics
)
(
host
.
Host
,
error
)
{
if
conf
.
DisableP2P
{
if
conf
.
DisableP2P
{
return
nil
,
nil
return
nil
,
nil
}
}
...
@@ -152,10 +153,15 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter) (host.Host,
...
@@ -152,10 +153,15 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter) (host.Host,
return
nil
,
fmt
.
Errorf
(
"failed to set up peerstore with pub key: %w"
,
err
)
return
nil
,
fmt
.
Errorf
(
"failed to set up peerstore with pub key: %w"
,
err
)
}
}
connGtr
,
err
:=
DefaultConnGater
(
conf
)
var
connGtr
gating
.
BlockingConnectionGater
connGtr
,
err
=
gating
.
NewBlockingConnectionGater
(
conf
.
Store
)
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"failed to open connection gater: %w"
,
err
)
return
nil
,
fmt
.
Errorf
(
"failed to open connection gater: %w"
,
err
)
}
}
// TODO(CLI-4015): apply connGtr enhancements
// connGtr = gating.AddBanExpiry(connGtr, ps, log, cl, reporter)
//connGtr = gating.AddScoring(connGtr, ps, 0)
connGtr
=
gating
.
AddMetering
(
connGtr
,
metrics
)
connMngr
,
err
:=
DefaultConnManager
(
conf
)
connMngr
,
err
:=
DefaultConnManager
(
conf
)
if
err
!=
nil
{
if
err
!=
nil
{
...
@@ -234,10 +240,7 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter) (host.Host,
...
@@ -234,10 +240,7 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter) (host.Host,
go
out
.
monitorStaticPeers
()
go
out
.
monitorStaticPeers
()
}
}
// Only add the connection gater if it offers the full interface we're looking for.
out
.
gater
=
connGtr
if
g
,
ok
:=
connGtr
.
(
ConnectionGater
);
ok
{
out
.
gater
=
g
}
return
out
,
nil
return
out
,
nil
}
}
...
...
op-node/p2p/host_test.go
View file @
95978376
...
@@ -59,10 +59,10 @@ func TestingConfig(t *testing.T) *Config {
...
@@ -59,10 +59,10 @@ func TestingConfig(t *testing.T) *Config {
func
TestP2PSimple
(
t
*
testing
.
T
)
{
func
TestP2PSimple
(
t
*
testing
.
T
)
{
confA
:=
TestingConfig
(
t
)
confA
:=
TestingConfig
(
t
)
confB
:=
TestingConfig
(
t
)
confB
:=
TestingConfig
(
t
)
hostA
,
err
:=
confA
.
Host
(
testlog
.
Logger
(
t
,
log
.
LvlError
)
.
New
(
"host"
,
"A"
),
nil
)
hostA
,
err
:=
confA
.
Host
(
testlog
.
Logger
(
t
,
log
.
LvlError
)
.
New
(
"host"
,
"A"
),
nil
,
metrics
.
NoopMetrics
)
require
.
NoError
(
t
,
err
,
"failed to launch host A"
)
require
.
NoError
(
t
,
err
,
"failed to launch host A"
)
defer
hostA
.
Close
()
defer
hostA
.
Close
()
hostB
,
err
:=
confB
.
Host
(
testlog
.
Logger
(
t
,
log
.
LvlError
)
.
New
(
"host"
,
"B"
),
nil
)
hostB
,
err
:=
confB
.
Host
(
testlog
.
Logger
(
t
,
log
.
LvlError
)
.
New
(
"host"
,
"B"
),
nil
,
metrics
.
NoopMetrics
)
require
.
NoError
(
t
,
err
,
"failed to launch host B"
)
require
.
NoError
(
t
,
err
,
"failed to launch host B"
)
defer
hostB
.
Close
()
defer
hostB
.
Close
()
err
=
hostA
.
Connect
(
context
.
Background
(),
peer
.
AddrInfo
{
ID
:
hostB
.
ID
(),
Addrs
:
hostB
.
Addrs
()})
err
=
hostA
.
Connect
(
context
.
Background
(),
peer
.
AddrInfo
{
ID
:
hostB
.
ID
(),
Addrs
:
hostB
.
Addrs
()})
...
...
op-node/p2p/mocks/ConnectionGater.go
deleted
100644 → 0
View file @
e643a258
// Code generated by mockery v2.22.1. DO NOT EDIT.
package
mocks
import
(
control
"github.com/libp2p/go-libp2p/core/control"
mock
"github.com/stretchr/testify/mock"
multiaddr
"github.com/multiformats/go-multiaddr"
net
"net"
network
"github.com/libp2p/go-libp2p/core/network"
peer
"github.com/libp2p/go-libp2p/core/peer"
)
// ConnectionGater is an autogenerated mock type for the ConnectionGater type
type
ConnectionGater
struct
{
mock
.
Mock
}
// BlockAddr provides a mock function with given fields: ip
func
(
_m
*
ConnectionGater
)
BlockAddr
(
ip
net
.
IP
)
error
{
ret
:=
_m
.
Called
(
ip
)
var
r0
error
if
rf
,
ok
:=
ret
.
Get
(
0
)
.
(
func
(
net
.
IP
)
error
);
ok
{
r0
=
rf
(
ip
)
}
else
{
r0
=
ret
.
Error
(
0
)
}
return
r0
}
// BlockPeer provides a mock function with given fields: p
func
(
_m
*
ConnectionGater
)
BlockPeer
(
p
peer
.
ID
)
error
{
ret
:=
_m
.
Called
(
p
)
var
r0
error
if
rf
,
ok
:=
ret
.
Get
(
0
)
.
(
func
(
peer
.
ID
)
error
);
ok
{
r0
=
rf
(
p
)
}
else
{
r0
=
ret
.
Error
(
0
)
}
return
r0
}
// BlockSubnet provides a mock function with given fields: ipnet
func
(
_m
*
ConnectionGater
)
BlockSubnet
(
ipnet
*
net
.
IPNet
)
error
{
ret
:=
_m
.
Called
(
ipnet
)
var
r0
error
if
rf
,
ok
:=
ret
.
Get
(
0
)
.
(
func
(
*
net
.
IPNet
)
error
);
ok
{
r0
=
rf
(
ipnet
)
}
else
{
r0
=
ret
.
Error
(
0
)
}
return
r0
}
// InterceptAccept provides a mock function with given fields: _a0
func
(
_m
*
ConnectionGater
)
InterceptAccept
(
_a0
network
.
ConnMultiaddrs
)
bool
{
ret
:=
_m
.
Called
(
_a0
)
var
r0
bool
if
rf
,
ok
:=
ret
.
Get
(
0
)
.
(
func
(
network
.
ConnMultiaddrs
)
bool
);
ok
{
r0
=
rf
(
_a0
)
}
else
{
r0
=
ret
.
Get
(
0
)
.
(
bool
)
}
return
r0
}
// InterceptAddrDial provides a mock function with given fields: _a0, _a1
func
(
_m
*
ConnectionGater
)
InterceptAddrDial
(
_a0
peer
.
ID
,
_a1
multiaddr
.
Multiaddr
)
bool
{
ret
:=
_m
.
Called
(
_a0
,
_a1
)
var
r0
bool
if
rf
,
ok
:=
ret
.
Get
(
0
)
.
(
func
(
peer
.
ID
,
multiaddr
.
Multiaddr
)
bool
);
ok
{
r0
=
rf
(
_a0
,
_a1
)
}
else
{
r0
=
ret
.
Get
(
0
)
.
(
bool
)
}
return
r0
}
// InterceptPeerDial provides a mock function with given fields: p
func
(
_m
*
ConnectionGater
)
InterceptPeerDial
(
p
peer
.
ID
)
bool
{
ret
:=
_m
.
Called
(
p
)
var
r0
bool
if
rf
,
ok
:=
ret
.
Get
(
0
)
.
(
func
(
peer
.
ID
)
bool
);
ok
{
r0
=
rf
(
p
)
}
else
{
r0
=
ret
.
Get
(
0
)
.
(
bool
)
}
return
r0
}
// InterceptSecured provides a mock function with given fields: _a0, _a1, _a2
func
(
_m
*
ConnectionGater
)
InterceptSecured
(
_a0
network
.
Direction
,
_a1
peer
.
ID
,
_a2
network
.
ConnMultiaddrs
)
bool
{
ret
:=
_m
.
Called
(
_a0
,
_a1
,
_a2
)
var
r0
bool
if
rf
,
ok
:=
ret
.
Get
(
0
)
.
(
func
(
network
.
Direction
,
peer
.
ID
,
network
.
ConnMultiaddrs
)
bool
);
ok
{
r0
=
rf
(
_a0
,
_a1
,
_a2
)
}
else
{
r0
=
ret
.
Get
(
0
)
.
(
bool
)
}
return
r0
}
// InterceptUpgraded provides a mock function with given fields: _a0
func
(
_m
*
ConnectionGater
)
InterceptUpgraded
(
_a0
network
.
Conn
)
(
bool
,
control
.
DisconnectReason
)
{
ret
:=
_m
.
Called
(
_a0
)
var
r0
bool
var
r1
control
.
DisconnectReason
if
rf
,
ok
:=
ret
.
Get
(
0
)
.
(
func
(
network
.
Conn
)
(
bool
,
control
.
DisconnectReason
));
ok
{
return
rf
(
_a0
)
}
if
rf
,
ok
:=
ret
.
Get
(
0
)
.
(
func
(
network
.
Conn
)
bool
);
ok
{
r0
=
rf
(
_a0
)
}
else
{
r0
=
ret
.
Get
(
0
)
.
(
bool
)
}
if
rf
,
ok
:=
ret
.
Get
(
1
)
.
(
func
(
network
.
Conn
)
control
.
DisconnectReason
);
ok
{
r1
=
rf
(
_a0
)
}
else
{
r1
=
ret
.
Get
(
1
)
.
(
control
.
DisconnectReason
)
}
return
r0
,
r1
}
// ListBlockedAddrs provides a mock function with given fields:
func
(
_m
*
ConnectionGater
)
ListBlockedAddrs
()
[]
net
.
IP
{
ret
:=
_m
.
Called
()
var
r0
[]
net
.
IP
if
rf
,
ok
:=
ret
.
Get
(
0
)
.
(
func
()
[]
net
.
IP
);
ok
{
r0
=
rf
()
}
else
{
if
ret
.
Get
(
0
)
!=
nil
{
r0
=
ret
.
Get
(
0
)
.
([]
net
.
IP
)
}
}
return
r0
}
// ListBlockedPeers provides a mock function with given fields:
func
(
_m
*
ConnectionGater
)
ListBlockedPeers
()
[]
peer
.
ID
{
ret
:=
_m
.
Called
()
var
r0
[]
peer
.
ID
if
rf
,
ok
:=
ret
.
Get
(
0
)
.
(
func
()
[]
peer
.
ID
);
ok
{
r0
=
rf
()
}
else
{
if
ret
.
Get
(
0
)
!=
nil
{
r0
=
ret
.
Get
(
0
)
.
([]
peer
.
ID
)
}
}
return
r0
}
// ListBlockedSubnets provides a mock function with given fields:
func
(
_m
*
ConnectionGater
)
ListBlockedSubnets
()
[]
*
net
.
IPNet
{
ret
:=
_m
.
Called
()
var
r0
[]
*
net
.
IPNet
if
rf
,
ok
:=
ret
.
Get
(
0
)
.
(
func
()
[]
*
net
.
IPNet
);
ok
{
r0
=
rf
()
}
else
{
if
ret
.
Get
(
0
)
!=
nil
{
r0
=
ret
.
Get
(
0
)
.
([]
*
net
.
IPNet
)
}
}
return
r0
}
// UnblockAddr provides a mock function with given fields: ip
func
(
_m
*
ConnectionGater
)
UnblockAddr
(
ip
net
.
IP
)
error
{
ret
:=
_m
.
Called
(
ip
)
var
r0
error
if
rf
,
ok
:=
ret
.
Get
(
0
)
.
(
func
(
net
.
IP
)
error
);
ok
{
r0
=
rf
(
ip
)
}
else
{
r0
=
ret
.
Error
(
0
)
}
return
r0
}
// UnblockPeer provides a mock function with given fields: p
func
(
_m
*
ConnectionGater
)
UnblockPeer
(
p
peer
.
ID
)
error
{
ret
:=
_m
.
Called
(
p
)
var
r0
error
if
rf
,
ok
:=
ret
.
Get
(
0
)
.
(
func
(
peer
.
ID
)
error
);
ok
{
r0
=
rf
(
p
)
}
else
{
r0
=
ret
.
Error
(
0
)
}
return
r0
}
// UnblockSubnet provides a mock function with given fields: ipnet
func
(
_m
*
ConnectionGater
)
UnblockSubnet
(
ipnet
*
net
.
IPNet
)
error
{
ret
:=
_m
.
Called
(
ipnet
)
var
r0
error
if
rf
,
ok
:=
ret
.
Get
(
0
)
.
(
func
(
*
net
.
IPNet
)
error
);
ok
{
r0
=
rf
(
ipnet
)
}
else
{
r0
=
ret
.
Error
(
0
)
}
return
r0
}
type
mockConstructorTestingTNewConnectionGater
interface
{
mock
.
TestingT
Cleanup
(
func
())
}
// NewConnectionGater creates a new instance of ConnectionGater. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func
NewConnectionGater
(
t
mockConstructorTestingTNewConnectionGater
)
*
ConnectionGater
{
mock
:=
&
ConnectionGater
{}
mock
.
Mock
.
Test
(
t
)
t
.
Cleanup
(
func
()
{
mock
.
AssertExpectations
(
t
)
})
return
mock
}
op-node/p2p/mocks/Peerstore.go
View file @
95978376
// Code generated by mockery v2.2
2.1
. DO NOT EDIT.
// Code generated by mockery v2.2
8.0
. DO NOT EDIT.
package
mocks
package
mocks
...
@@ -45,13 +45,13 @@ func (_m *Peerstore) Peers() peer.IDSlice {
...
@@ -45,13 +45,13 @@ func (_m *Peerstore) Peers() peer.IDSlice {
return
r0
return
r0
}
}
// SetScore provides a mock function with given fields:
_a0, _a1, _a2
// SetScore provides a mock function with given fields:
id, diff
func
(
_m
*
Peerstore
)
SetScore
(
_a0
peer
.
ID
,
_a1
store
.
ScoreType
,
_a2
float64
)
error
{
func
(
_m
*
Peerstore
)
SetScore
(
id
peer
.
ID
,
diff
store
.
ScoreDiff
)
error
{
ret
:=
_m
.
Called
(
_a0
,
_a1
,
_a2
)
ret
:=
_m
.
Called
(
id
,
diff
)
var
r0
error
var
r0
error
if
rf
,
ok
:=
ret
.
Get
(
0
)
.
(
func
(
peer
.
ID
,
store
.
Score
Type
,
float64
)
error
);
ok
{
if
rf
,
ok
:=
ret
.
Get
(
0
)
.
(
func
(
peer
.
ID
,
store
.
Score
Diff
)
error
);
ok
{
r0
=
rf
(
_a0
,
_a1
,
_a2
)
r0
=
rf
(
id
,
diff
)
}
else
{
}
else
{
r0
=
ret
.
Error
(
0
)
r0
=
ret
.
Error
(
0
)
}
}
...
...
op-node/p2p/node.go
View file @
95978376
...
@@ -6,6 +6,14 @@ import (
...
@@ -6,6 +6,14 @@ import (
"fmt"
"fmt"
"strconv"
"strconv"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/p2p/gating"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-multierror"
pubsub
"github.com/libp2p/go-libp2p-pubsub"
pubsub
"github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/connmgr"
...
@@ -13,22 +21,14 @@ import (
...
@@ -13,22 +21,14 @@ import (
p2pmetrics
"github.com/libp2p/go-libp2p/core/metrics"
p2pmetrics
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/network"
ma
"github.com/multiformats/go-multiaddr"
ma
"github.com/multiformats/go-multiaddr"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum-optimism/optimism/op-node/rollup"
)
)
// NodeP2P is a p2p node, which can be used to gossip messages.
// NodeP2P is a p2p node, which can be used to gossip messages.
type
NodeP2P
struct
{
type
NodeP2P
struct
{
host
host
.
Host
// p2p host (optional, may be nil)
host
host
.
Host
// p2p host (optional, may be nil)
gater
ConnectionGater
// p2p gater, to ban/unban peers with, may be nil even with p2p enabled
gater
gating
.
BlockingConnectionGater
// p2p gater, to ban/unban peers with, may be nil even with p2p enabled
connMgr
connmgr
.
ConnManager
// p2p conn manager, to keep a reliable number of peers, may be nil even with p2p enabled
scorer
Scorer
// writes score-updates to the peerstore and keeps metrics of score changes
connMgr
connmgr
.
ConnManager
// p2p conn manager, to keep a reliable number of peers, may be nil even with p2p enabled
// the below components are all optional, and may be nil. They require the host to not be nil.
// the below components are all optional, and may be nil. They require the host to not be nil.
dv5Local
*
enode
.
LocalNode
// p2p discovery identity
dv5Local
*
enode
.
LocalNode
// p2p discovery identity
dv5Udp
*
discover
.
UDPv5
// p2p discovery service
dv5Udp
*
discover
.
UDPv5
// p2p discovery service
...
@@ -63,7 +63,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
...
@@ -63,7 +63,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
var
err
error
var
err
error
// nil if disabled.
// nil if disabled.
n
.
host
,
err
=
setup
.
Host
(
log
,
bwc
)
n
.
host
,
err
=
setup
.
Host
(
log
,
bwc
,
metrics
)
if
err
!=
nil
{
if
err
!=
nil
{
if
n
.
dv5Udp
!=
nil
{
if
n
.
dv5Udp
!=
nil
{
n
.
dv5Udp
.
Close
()
n
.
dv5Udp
.
Close
()
...
@@ -71,6 +71,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
...
@@ -71,6 +71,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
return
fmt
.
Errorf
(
"failed to start p2p host: %w"
,
err
)
return
fmt
.
Errorf
(
"failed to start p2p host: %w"
,
err
)
}
}
// TODO(CLI-4016): host is not optional, NodeP2P as a whole is. This if statement is wrong
if
n
.
host
!=
nil
{
if
n
.
host
!=
nil
{
// Enable extra features, if any. During testing we don't setup the most advanced host all the time.
// Enable extra features, if any. During testing we don't setup the most advanced host all the time.
if
extra
,
ok
:=
n
.
host
.
(
ExtraHostFeatures
);
ok
{
if
extra
,
ok
:=
n
.
host
.
(
ExtraHostFeatures
);
ok
{
...
@@ -100,10 +101,23 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
...
@@ -100,10 +101,23 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
n
.
host
.
SetStreamHandler
(
PayloadByNumberProtocolID
(
rollupCfg
.
L2ChainID
),
payloadByNumber
)
n
.
host
.
SetStreamHandler
(
PayloadByNumberProtocolID
(
rollupCfg
.
L2ChainID
),
payloadByNumber
)
}
}
}
}
eps
,
ok
:=
n
.
host
.
Peerstore
()
.
(
store
.
ExtendedPeerstore
)
if
!
ok
{
return
fmt
.
Errorf
(
"cannot init without extended peerstore: %w"
,
err
)
}
n
.
scorer
=
NewScorer
(
rollupCfg
,
eps
,
metrics
,
setup
.
PeerBandScorer
(),
log
)
n
.
host
.
Network
()
.
Notify
(
&
network
.
NotifyBundle
{
ConnectedF
:
func
(
_
network
.
Network
,
conn
network
.
Conn
)
{
n
.
scorer
.
OnConnect
(
conn
.
RemotePeer
())
},
DisconnectedF
:
func
(
_
network
.
Network
,
conn
network
.
Conn
)
{
n
.
scorer
.
OnDisconnect
(
conn
.
RemotePeer
())
},
})
// notify of any new connections/streams/etc.
// notify of any new connections/streams/etc.
n
.
host
.
Network
()
.
Notify
(
NewNetworkNotifier
(
log
,
metrics
))
n
.
host
.
Network
()
.
Notify
(
NewNetworkNotifier
(
log
,
metrics
))
// note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled.
// note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled.
n
.
gs
,
err
=
NewGossipSub
(
resourcesCtx
,
n
.
host
,
n
.
gater
,
rollupCfg
,
setup
,
metrics
,
log
)
n
.
gs
,
err
=
NewGossipSub
(
resourcesCtx
,
n
.
host
,
rollupCfg
,
setup
,
n
.
scorer
,
metrics
,
log
)
if
err
!=
nil
{
if
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to start gossipsub router: %w"
,
err
)
return
fmt
.
Errorf
(
"failed to start gossipsub router: %w"
,
err
)
}
}
...
@@ -162,7 +176,7 @@ func (n *NodeP2P) GossipOut() GossipOut {
...
@@ -162,7 +176,7 @@ func (n *NodeP2P) GossipOut() GossipOut {
return
n
.
gsOut
return
n
.
gsOut
}
}
func
(
n
*
NodeP2P
)
ConnectionGater
()
ConnectionGater
{
func
(
n
*
NodeP2P
)
ConnectionGater
()
gating
.
Blocking
ConnectionGater
{
return
n
.
gater
return
n
.
gater
}
}
...
...
op-node/p2p/peer_gater.go
deleted
100644 → 0
View file @
e643a258
package
p2p
import
(
log
"github.com/ethereum/go-ethereum/log"
peer
"github.com/libp2p/go-libp2p/core/peer"
)
// ConnectionFactor is the factor by which we multiply the connection score.
const
ConnectionFactor
=
-
10
// PeerScoreThreshold is the threshold at which we block a peer.
const
PeerScoreThreshold
=
-
100
// gater is an internal implementation of the [PeerGater] interface.
type
gater
struct
{
connGater
ConnectionGater
blockedMap
map
[
peer
.
ID
]
bool
log
log
.
Logger
banEnabled
bool
}
// PeerGater manages the connection gating of peers.
//
//go:generate mockery --name PeerGater --output mocks/
type
PeerGater
interface
{
// Update handles a peer score update and blocks/unblocks the peer if necessary.
Update
(
peer
.
ID
,
float64
)
// IsBlocked returns true if the given [peer.ID] is blocked.
IsBlocked
(
peer
.
ID
)
bool
}
// NewPeerGater returns a new peer gater.
func
NewPeerGater
(
connGater
ConnectionGater
,
log
log
.
Logger
,
banEnabled
bool
)
PeerGater
{
return
&
gater
{
connGater
:
connGater
,
blockedMap
:
make
(
map
[
peer
.
ID
]
bool
),
log
:
log
,
banEnabled
:
banEnabled
,
}
}
// IsBlocked returns true if the given [peer.ID] is blocked.
func
(
s
*
gater
)
IsBlocked
(
peerID
peer
.
ID
)
bool
{
return
s
.
blockedMap
[
peerID
]
}
// setBlocked sets the blocked status of the given [peer.ID].
func
(
s
*
gater
)
setBlocked
(
peerID
peer
.
ID
,
blocked
bool
)
{
s
.
blockedMap
[
peerID
]
=
blocked
}
// Update handles a peer score update and blocks/unblocks the peer if necessary.
func
(
s
*
gater
)
Update
(
id
peer
.
ID
,
score
float64
)
{
// Check if the peer score is below the threshold
// If so, we need to block the peer
isAlreadyBlocked
:=
s
.
IsBlocked
(
id
)
if
score
<
PeerScoreThreshold
&&
s
.
banEnabled
&&
!
isAlreadyBlocked
{
s
.
log
.
Warn
(
"peer blocking enabled, blocking peer"
,
"id"
,
id
.
String
(),
"score"
,
score
)
err
:=
s
.
connGater
.
BlockPeer
(
id
)
if
err
!=
nil
{
s
.
log
.
Warn
(
"connection gater failed to block peer"
,
"id"
,
id
.
String
(),
"err"
,
err
)
}
// Set the peer as blocked in the blocked map
s
.
setBlocked
(
id
,
true
)
}
// Unblock peers whose score has recovered to an acceptable level
if
(
score
>
PeerScoreThreshold
)
&&
isAlreadyBlocked
{
err
:=
s
.
connGater
.
UnblockPeer
(
id
)
if
err
!=
nil
{
s
.
log
.
Warn
(
"connection gater failed to unblock peer"
,
"id"
,
id
.
String
(),
"err"
,
err
)
}
// Set the peer as unblocked in the blocked map
s
.
setBlocked
(
id
,
false
)
}
}
op-node/p2p/peer_gater_test.go
deleted
100644 → 0
View file @
e643a258
package
p2p_test
import
(
"testing"
p2p
"github.com/ethereum-optimism/optimism/op-node/p2p"
p2pMocks
"github.com/ethereum-optimism/optimism/op-node/p2p/mocks"
testlog
"github.com/ethereum-optimism/optimism/op-node/testlog"
log
"github.com/ethereum/go-ethereum/log"
peer
"github.com/libp2p/go-libp2p/core/peer"
suite
"github.com/stretchr/testify/suite"
)
// PeerGaterTestSuite tests peer parameterization.
type
PeerGaterTestSuite
struct
{
suite
.
Suite
mockGater
*
p2pMocks
.
ConnectionGater
logger
log
.
Logger
}
// SetupTest sets up the test suite.
func
(
testSuite
*
PeerGaterTestSuite
)
SetupTest
()
{
testSuite
.
mockGater
=
&
p2pMocks
.
ConnectionGater
{}
testSuite
.
logger
=
testlog
.
Logger
(
testSuite
.
T
(),
log
.
LvlError
)
}
// TestPeerGater runs the PeerGaterTestSuite.
func
TestPeerGater
(
t
*
testing
.
T
)
{
suite
.
Run
(
t
,
new
(
PeerGaterTestSuite
))
}
// TestPeerScoreConstants validates the peer score constants.
func
(
testSuite
*
PeerGaterTestSuite
)
TestPeerScoreConstants
()
{
testSuite
.
Equal
(
-
10
,
p2p
.
ConnectionFactor
)
testSuite
.
Equal
(
-
100
,
p2p
.
PeerScoreThreshold
)
}
// TestPeerGaterUpdate tests the peer gater update hook.
func
(
testSuite
*
PeerGaterTestSuite
)
TestPeerGater_UpdateBansPeers
()
{
gater
:=
p2p
.
NewPeerGater
(
testSuite
.
mockGater
,
testSuite
.
logger
,
true
,
)
// Return an empty list of already blocked peers
testSuite
.
mockGater
.
On
(
"ListBlockedPeers"
)
.
Return
([]
peer
.
ID
{})
.
Once
()
// Mock a connection gater peer block call
// Since the peer score is below the [PeerScoreThreshold] of -100,
// the [BlockPeer] method should be called
testSuite
.
mockGater
.
On
(
"BlockPeer"
,
peer
.
ID
(
"peer1"
))
.
Return
(
nil
)
.
Once
()
// The peer should initially be unblocked
testSuite
.
False
(
gater
.
IsBlocked
(
peer
.
ID
(
"peer1"
)))
// Apply the peer gater update
gater
.
Update
(
peer
.
ID
(
"peer1"
),
float64
(
-
101
))
// The peer should be considered blocked
testSuite
.
True
(
gater
.
IsBlocked
(
peer
.
ID
(
"peer1"
)))
// Now let's unblock the peer
testSuite
.
mockGater
.
On
(
"UnblockPeer"
,
peer
.
ID
(
"peer1"
))
.
Return
(
nil
)
.
Once
()
gater
.
Update
(
peer
.
ID
(
"peer1"
),
float64
(
0
))
// The peer should be considered unblocked
testSuite
.
False
(
gater
.
IsBlocked
(
peer
.
ID
(
"peer1"
)))
}
// TestPeerGaterUpdateNoBanning tests the peer gater update hook without banning set
func
(
testSuite
*
PeerGaterTestSuite
)
TestPeerGater_UpdateNoBanning
()
{
gater
:=
p2p
.
NewPeerGater
(
testSuite
.
mockGater
,
testSuite
.
logger
,
false
,
)
// Return an empty list of already blocked peers
testSuite
.
mockGater
.
On
(
"ListBlockedPeers"
)
.
Return
([]
peer
.
ID
{})
// Notice: [BlockPeer] should not be called since banning is not enabled
// even though the peer score is way below the [PeerScoreThreshold] of -100
gater
.
Update
(
peer
.
ID
(
"peer1"
),
float64
(
-
100000
))
// The peer should be unblocked
testSuite
.
False
(
gater
.
IsBlocked
(
peer
.
ID
(
"peer1"
)))
// Make sure that if we then "unblock" the peer, nothing happens
gater
.
Update
(
peer
.
ID
(
"peer1"
),
float64
(
0
))
// The peer should still be unblocked
testSuite
.
False
(
gater
.
IsBlocked
(
peer
.
ID
(
"peer1"
)))
}
op-node/p2p/peer_scorer.go
View file @
95978376
...
@@ -5,6 +5,9 @@ import (
...
@@ -5,6 +5,9 @@ import (
"sort"
"sort"
"strconv"
"strconv"
"strings"
"strings"
"time"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
log
"github.com/ethereum/go-ethereum/log"
log
"github.com/ethereum/go-ethereum/log"
...
@@ -14,10 +17,10 @@ import (
...
@@ -14,10 +17,10 @@ import (
type
scorer
struct
{
type
scorer
struct
{
peerStore
Peerstore
peerStore
Peerstore
metricer
GossipMetricer
metricer
ScoreMetrics
log
log
.
Logger
log
log
.
Logger
gater
PeerGater
bandScoreThresholds
*
BandScoreThresholds
bandScoreThresholds
*
BandScoreThresholds
cfg
*
rollup
.
Config
}
}
// scorePair holds a band and its corresponding threshold.
// scorePair holds a band and its corresponding threshold.
...
@@ -93,31 +96,37 @@ type Peerstore interface {
...
@@ -93,31 +96,37 @@ type Peerstore interface {
// Peers returns all of the peer IDs stored across all inner stores.
// Peers returns all of the peer IDs stored across all inner stores.
Peers
()
peer
.
IDSlice
Peers
()
peer
.
IDSlice
SetScore
(
peer
.
ID
,
store
.
ScoreType
,
float64
)
error
SetScore
(
id
peer
.
ID
,
diff
store
.
ScoreDiff
)
error
}
}
// Scorer is a peer scorer that scores peers based on application-specific metrics.
// Scorer is a peer scorer that scores peers based on application-specific metrics.
type
Scorer
interface
{
type
Scorer
interface
{
OnConnect
()
OnConnect
(
id
peer
.
ID
)
OnDisconnect
()
OnDisconnect
(
id
peer
.
ID
)
SnapshotHook
()
pubsub
.
ExtendedPeerScoreInspectFn
SnapshotHook
()
pubsub
.
ExtendedPeerScoreInspectFn
}
}
type
ScoreMetrics
interface
{
SetPeerScores
(
map
[
string
]
float64
)
}
// NewScorer returns a new peer scorer.
// NewScorer returns a new peer scorer.
func
NewScorer
(
peerGater
PeerGater
,
peerStore
Peerstore
,
metricer
GossipMetricer
,
bandScoreThresholds
*
BandScoreThresholds
,
log
log
.
Logger
)
Scorer
{
func
NewScorer
(
cfg
*
rollup
.
Config
,
peerStore
Peerstore
,
metricer
ScoreMetrics
,
bandScoreThresholds
*
BandScoreThresholds
,
log
log
.
Logger
)
Scorer
{
return
&
scorer
{
return
&
scorer
{
peerStore
:
peerStore
,
peerStore
:
peerStore
,
metricer
:
metricer
,
metricer
:
metricer
,
log
:
log
,
log
:
log
,
gater
:
peerGater
,
bandScoreThresholds
:
bandScoreThresholds
,
bandScoreThresholds
:
bandScoreThresholds
,
cfg
:
cfg
,
}
}
}
}
// SnapshotHook returns a function that is called periodically by the pubsub library to inspect the peer scores.
// SnapshotHook returns a function that is called periodically by the pubsub library to inspect the
gossip
peer scores.
// It is passed into the pubsub library as a [pubsub.ExtendedPeerScoreInspectFn] in the [pubsub.WithPeerScoreInspect] option.
// It is passed into the pubsub library as a [pubsub.ExtendedPeerScoreInspectFn] in the [pubsub.WithPeerScoreInspect] option.
// The returned [pubsub.ExtendedPeerScoreInspectFn] is called with a mapping of peer IDs to peer score snapshots.
// The returned [pubsub.ExtendedPeerScoreInspectFn] is called with a mapping of peer IDs to peer score snapshots.
// The incoming peer score snapshots only contain gossip-score components.
func
(
s
*
scorer
)
SnapshotHook
()
pubsub
.
ExtendedPeerScoreInspectFn
{
func
(
s
*
scorer
)
SnapshotHook
()
pubsub
.
ExtendedPeerScoreInspectFn
{
blocksTopicName
:=
blocksTopicV1
(
s
.
cfg
)
return
func
(
m
map
[
peer
.
ID
]
*
pubsub
.
PeerScoreSnapshot
)
{
return
func
(
m
map
[
peer
.
ID
]
*
pubsub
.
PeerScoreSnapshot
)
{
scoreMap
:=
make
(
map
[
string
]
float64
)
scoreMap
:=
make
(
map
[
string
]
float64
)
// Zero out all bands.
// Zero out all bands.
...
@@ -126,28 +135,36 @@ func (s *scorer) SnapshotHook() pubsub.ExtendedPeerScoreInspectFn {
...
@@ -126,28 +135,36 @@ func (s *scorer) SnapshotHook() pubsub.ExtendedPeerScoreInspectFn {
}
}
// Now set the new scores.
// Now set the new scores.
for
id
,
snap
:=
range
m
{
for
id
,
snap
:=
range
m
{
scores
:=
make
(
map
[
store
.
ScoreType
]
float64
)
diff
:=
store
.
GossipScores
{
scores
[
store
.
TypeGossip
]
=
snap
.
Score
Total
:
snap
.
Score
,
Blocks
:
store
.
TopicScores
{},
if
err
:=
s
.
peerStore
.
SetScore
(
id
,
store
.
TypeGossip
,
snap
.
Score
);
err
!=
nil
{
IPColocationFactor
:
snap
.
IPColocationFactor
,
BehavioralPenalty
:
snap
.
BehaviourPenalty
,
}
if
topSnap
,
ok
:=
snap
.
Topics
[
blocksTopicName
];
ok
{
diff
.
Blocks
.
TimeInMesh
=
float64
(
topSnap
.
TimeInMesh
)
/
float64
(
time
.
Second
)
diff
.
Blocks
.
MeshMessageDeliveries
=
uint64
(
topSnap
.
MeshMessageDeliveries
)
diff
.
Blocks
.
FirstMessageDeliveries
=
uint64
(
topSnap
.
FirstMessageDeliveries
)
diff
.
Blocks
.
InvalidMessageDeliveries
=
uint64
(
topSnap
.
InvalidMessageDeliveries
)
}
if
err
:=
s
.
peerStore
.
SetScore
(
id
,
&
diff
);
err
!=
nil
{
s
.
log
.
Warn
(
"Unable to update peer gossip score"
,
"err"
,
err
)
s
.
log
.
Warn
(
"Unable to update peer gossip score"
,
"err"
,
err
)
}
}
}
for
_
,
snap
:=
range
m
{
band
:=
s
.
bandScoreThresholds
.
Bucket
(
snap
.
Score
)
band
:=
s
.
bandScoreThresholds
.
Bucket
(
snap
.
Score
)
scoreMap
[
band
]
+=
1
scoreMap
[
band
]
+=
1
s
.
gater
.
Update
(
id
,
snap
.
Score
)
}
}
s
.
metricer
.
SetPeerScores
(
scoreMap
)
s
.
metricer
.
SetPeerScores
(
scoreMap
)
}
}
}
}
// OnConnect is called when a peer connects.
// OnConnect is called when a peer connects.
// See [p2p.NotificationsMetricer] for invocation.
func
(
s
*
scorer
)
OnConnect
(
id
peer
.
ID
)
{
func
(
s
*
scorer
)
OnConnect
()
{
// TODO(CLI-4003): apply decay to scores, based on last connection time
// no-op
}
}
// OnDisconnect is called when a peer disconnects.
// OnDisconnect is called when a peer disconnects.
// See [p2p.NotificationsMetricer] for invocation.
func
(
s
*
scorer
)
OnDisconnect
(
id
peer
.
ID
)
{
func
(
s
*
scorer
)
OnDisconnect
()
{
// TODO(CLI-4003): persist disconnect-time
// no-op
}
}
op-node/p2p/peer_scorer_test.go
View file @
95978376
package
p2p_test
package
p2p_test
import
(
import
(
"math/big"
"testing"
"testing"
pubsub
"github.com/libp2p/go-libp2p-pubsub"
peer
"github.com/libp2p/go-libp2p/core/peer"
suite
"github.com/stretchr/testify/suite"
log
"github.com/ethereum/go-ethereum/log"
p2p
"github.com/ethereum-optimism/optimism/op-node/p2p"
p2p
"github.com/ethereum-optimism/optimism/op-node/p2p"
p2pMocks
"github.com/ethereum-optimism/optimism/op-node/p2p/mocks"
p2pMocks
"github.com/ethereum-optimism/optimism/op-node/p2p/mocks"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-node/testlog"
log
"github.com/ethereum/go-ethereum/log"
pubsub
"github.com/libp2p/go-libp2p-pubsub"
peer
"github.com/libp2p/go-libp2p/core/peer"
suite
"github.com/stretchr/testify/suite"
)
)
// PeerScorerTestSuite tests peer parameterization.
// PeerScorerTestSuite tests peer parameterization.
type
PeerScorerTestSuite
struct
{
type
PeerScorerTestSuite
struct
{
suite
.
Suite
suite
.
Suite
// mockConnGater *p2pMocks.ConnectionGater
mockGater
*
p2pMocks
.
PeerGater
mockStore
*
p2pMocks
.
Peerstore
mockStore
*
p2pMocks
.
Peerstore
mockMetricer
*
p2pMocks
.
GossipMetricer
mockMetricer
*
p2pMocks
.
GossipMetricer
bandScorer
*
p2p
.
BandScoreThresholds
bandScorer
*
p2p
.
BandScoreThresholds
...
@@ -27,7 +29,6 @@ type PeerScorerTestSuite struct {
...
@@ -27,7 +29,6 @@ type PeerScorerTestSuite struct {
// SetupTest sets up the test suite.
// SetupTest sets up the test suite.
func
(
testSuite
*
PeerScorerTestSuite
)
SetupTest
()
{
func
(
testSuite
*
PeerScorerTestSuite
)
SetupTest
()
{
testSuite
.
mockGater
=
&
p2pMocks
.
PeerGater
{}
testSuite
.
mockStore
=
&
p2pMocks
.
Peerstore
{}
testSuite
.
mockStore
=
&
p2pMocks
.
Peerstore
{}
testSuite
.
mockMetricer
=
&
p2pMocks
.
GossipMetricer
{}
testSuite
.
mockMetricer
=
&
p2pMocks
.
GossipMetricer
{}
bandScorer
,
err
:=
p2p
.
NewBandScorer
(
"-40:graylist;0:friend;"
)
bandScorer
,
err
:=
p2p
.
NewBandScorer
(
"-40:graylist;0:friend;"
)
...
@@ -44,31 +45,31 @@ func TestPeerScorer(t *testing.T) {
...
@@ -44,31 +45,31 @@ func TestPeerScorer(t *testing.T) {
// TestScorer_OnConnect ensures we can call the OnConnect method on the peer scorer.
// TestScorer_OnConnect ensures we can call the OnConnect method on the peer scorer.
func
(
testSuite
*
PeerScorerTestSuite
)
TestScorer_OnConnect
()
{
func
(
testSuite
*
PeerScorerTestSuite
)
TestScorer_OnConnect
()
{
scorer
:=
p2p
.
NewScorer
(
scorer
:=
p2p
.
NewScorer
(
testSuite
.
mockGater
,
&
rollup
.
Config
{
L2ChainID
:
big
.
NewInt
(
123
)}
,
testSuite
.
mockStore
,
testSuite
.
mockStore
,
testSuite
.
mockMetricer
,
testSuite
.
mockMetricer
,
testSuite
.
bandScorer
,
testSuite
.
bandScorer
,
testSuite
.
logger
,
testSuite
.
logger
,
)
)
scorer
.
OnConnect
()
scorer
.
OnConnect
(
peer
.
ID
(
"alice"
)
)
}
}
// TestScorer_OnDisconnect ensures we can call the OnDisconnect method on the peer scorer.
// TestScorer_OnDisconnect ensures we can call the OnDisconnect method on the peer scorer.
func
(
testSuite
*
PeerScorerTestSuite
)
TestScorer_OnDisconnect
()
{
func
(
testSuite
*
PeerScorerTestSuite
)
TestScorer_OnDisconnect
()
{
scorer
:=
p2p
.
NewScorer
(
scorer
:=
p2p
.
NewScorer
(
testSuite
.
mockGater
,
&
rollup
.
Config
{
L2ChainID
:
big
.
NewInt
(
123
)}
,
testSuite
.
mockStore
,
testSuite
.
mockStore
,
testSuite
.
mockMetricer
,
testSuite
.
mockMetricer
,
testSuite
.
bandScorer
,
testSuite
.
bandScorer
,
testSuite
.
logger
,
testSuite
.
logger
,
)
)
scorer
.
OnDisconnect
()
scorer
.
OnDisconnect
(
peer
.
ID
(
"alice"
)
)
}
}
// TestScorer_SnapshotHook tests running the snapshot hook on the peer scorer.
// TestScorer_SnapshotHook tests running the snapshot hook on the peer scorer.
func
(
testSuite
*
PeerScorerTestSuite
)
TestScorer_SnapshotHook
()
{
func
(
testSuite
*
PeerScorerTestSuite
)
TestScorer_SnapshotHook
()
{
scorer
:=
p2p
.
NewScorer
(
scorer
:=
p2p
.
NewScorer
(
testSuite
.
mockGater
,
&
rollup
.
Config
{
L2ChainID
:
big
.
NewInt
(
123
)}
,
testSuite
.
mockStore
,
testSuite
.
mockStore
,
testSuite
.
mockMetricer
,
testSuite
.
mockMetricer
,
testSuite
.
bandScorer
,
testSuite
.
bandScorer
,
...
@@ -76,11 +77,8 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
...
@@ -76,11 +77,8 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
)
)
inspectFn
:=
scorer
.
SnapshotHook
()
inspectFn
:=
scorer
.
SnapshotHook
()
// Mock the peer gater call
testSuite
.
mockGater
.
On
(
"Update"
,
peer
.
ID
(
"peer1"
),
float64
(
-
100
))
.
Return
(
nil
)
.
Once
()
// Expect updating the peer store
// Expect updating the peer store
testSuite
.
mockStore
.
On
(
"SetScore"
,
peer
.
ID
(
"peer1"
),
store
.
TypeGossip
,
float64
(
-
100
)
)
.
Return
(
nil
)
.
Once
()
testSuite
.
mockStore
.
On
(
"SetScore"
,
peer
.
ID
(
"peer1"
),
&
store
.
GossipScores
{
Total
:
float64
(
-
100
)}
)
.
Return
(
nil
)
.
Once
()
// The metricer should then be called with the peer score band map
// The metricer should then be called with the peer score band map
testSuite
.
mockMetricer
.
On
(
"SetPeerScores"
,
map
[
string
]
float64
{
testSuite
.
mockMetricer
.
On
(
"SetPeerScores"
,
map
[
string
]
float64
{
...
@@ -96,10 +94,8 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
...
@@ -96,10 +94,8 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
}
}
inspectFn
(
snapshotMap
)
inspectFn
(
snapshotMap
)
// Change the peer score now to a different band
testSuite
.
mockGater
.
On
(
"Update"
,
peer
.
ID
(
"peer1"
),
float64
(
0
))
.
Return
(
nil
)
.
Once
()
// Expect updating the peer store
// Expect updating the peer store
testSuite
.
mockStore
.
On
(
"SetScore"
,
peer
.
ID
(
"peer1"
),
store
.
TypeGossip
,
float64
(
0
)
)
.
Return
(
nil
)
.
Once
()
testSuite
.
mockStore
.
On
(
"SetScore"
,
peer
.
ID
(
"peer1"
),
&
store
.
GossipScores
{
Total
:
0
}
)
.
Return
(
nil
)
.
Once
()
// The metricer should then be called with the peer score band map
// The metricer should then be called with the peer score band map
testSuite
.
mockMetricer
.
On
(
"SetPeerScores"
,
map
[
string
]
float64
{
testSuite
.
mockMetricer
.
On
(
"SetPeerScores"
,
map
[
string
]
float64
{
...
@@ -120,7 +116,7 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
...
@@ -120,7 +116,7 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
// This implies that the peer should be blocked.
// This implies that the peer should be blocked.
func
(
testSuite
*
PeerScorerTestSuite
)
TestScorer_SnapshotHookBlocksPeer
()
{
func
(
testSuite
*
PeerScorerTestSuite
)
TestScorer_SnapshotHookBlocksPeer
()
{
scorer
:=
p2p
.
NewScorer
(
scorer
:=
p2p
.
NewScorer
(
testSuite
.
mockGater
,
&
rollup
.
Config
{
L2ChainID
:
big
.
NewInt
(
123
)}
,
testSuite
.
mockStore
,
testSuite
.
mockStore
,
testSuite
.
mockMetricer
,
testSuite
.
mockMetricer
,
testSuite
.
bandScorer
,
testSuite
.
bandScorer
,
...
@@ -128,10 +124,8 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHookBlocksPeer() {
...
@@ -128,10 +124,8 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHookBlocksPeer() {
)
)
inspectFn
:=
scorer
.
SnapshotHook
()
inspectFn
:=
scorer
.
SnapshotHook
()
// Mock the peer gater call
testSuite
.
mockGater
.
On
(
"Update"
,
peer
.
ID
(
"peer1"
),
float64
(
-
101
))
.
Return
(
nil
)
// Expect updating the peer store
// Expect updating the peer store
testSuite
.
mockStore
.
On
(
"SetScore"
,
peer
.
ID
(
"peer1"
),
store
.
TypeGossip
,
float64
(
-
101
)
)
.
Return
(
nil
)
.
Once
()
testSuite
.
mockStore
.
On
(
"SetScore"
,
peer
.
ID
(
"peer1"
),
&
store
.
GossipScores
{
Total
:
float64
(
-
101
)}
)
.
Return
(
nil
)
.
Once
()
// The metricer should then be called with the peer score band map
// The metricer should then be called with the peer score band map
testSuite
.
mockMetricer
.
On
(
"SetPeerScores"
,
map
[
string
]
float64
{
testSuite
.
mockMetricer
.
On
(
"SetPeerScores"
,
map
[
string
]
float64
{
...
...
op-node/p2p/peer_scores.go
View file @
95978376
package
p2p
package
p2p
import
(
import
(
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
log
"github.com/ethereum/go-ethereum/log"
log
"github.com/ethereum/go-ethereum/log"
pubsub
"github.com/libp2p/go-libp2p-pubsub"
pubsub
"github.com/libp2p/go-libp2p-pubsub"
host
"github.com/libp2p/go-libp2p/core/host"
)
)
// ConfigurePeerScoring configures the peer scoring parameters for the pubsub
// ConfigurePeerScoring configures the peer scoring parameters for the pubsub
func
ConfigurePeerScoring
(
h
host
.
Host
,
g
ConnectionGater
,
gossipConf
GossipSetupConfigurables
,
m
GossipMetric
er
,
log
log
.
Logger
)
[]
pubsub
.
Option
{
func
ConfigurePeerScoring
(
gossipConf
GossipSetupConfigurables
,
scorer
Scor
er
,
log
log
.
Logger
)
[]
pubsub
.
Option
{
// If we want to completely disable scoring config here, we can use the [peerScoringParams]
// If we want to completely disable scoring config here, we can use the [peerScoringParams]
// to return early without returning any [pubsub.Option].
// to return early without returning any [pubsub.Option].
peerScoreParams
:=
gossipConf
.
PeerScoringParams
()
peerScoreParams
:=
gossipConf
.
PeerScoringParams
()
peerScoreThresholds
:=
NewPeerScoreThresholds
()
peerScoreThresholds
:=
NewPeerScoreThresholds
()
banEnabled
:=
gossipConf
.
BanPeers
()
peerGater
:=
NewPeerGater
(
g
,
log
,
banEnabled
)
opts
:=
[]
pubsub
.
Option
{}
opts
:=
[]
pubsub
.
Option
{}
eps
,
ok
:=
h
.
Peerstore
()
.
(
store
.
ExtendedPeerstore
)
if
!
ok
{
log
.
Warn
(
"Disabling peer scoring. Peerstore does not support peer scores"
)
return
opts
}
scorer
:=
NewScorer
(
peerGater
,
eps
,
m
,
gossipConf
.
PeerBandScorer
(),
log
)
// Check the app specific score since libp2p doesn't export it's [validate] function :/
// Check the app specific score since libp2p doesn't export it's [validate] function :/
if
peerScoreParams
!=
nil
&&
peerScoreParams
.
AppSpecificScore
!=
nil
{
if
peerScoreParams
!=
nil
&&
peerScoreParams
.
AppSpecificScore
!=
nil
{
opts
=
[]
pubsub
.
Option
{
opts
=
[]
pubsub
.
Option
{
...
...
op-node/p2p/peer_scores_test.go
View file @
95978376
package
p2p
_test
package
p2p
import
(
import
(
"context"
"context"
"fmt"
"fmt"
"math/big"
"math/rand"
"math/rand"
"testing"
"testing"
"time"
"time"
"github.com/ethereum-optimism/optimism/op-
service/clock
"
"github.com/ethereum-optimism/optimism/op-
node/rollup
"
p2p
"github.com/ethereum-optimism/optimism/op-node/p2p"
p2pMocks
"github.com/ethereum-optimism/optimism/op-node/p2p/mocks"
p2pMocks
"github.com/ethereum-optimism/optimism/op-node/p2p/mocks"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
testlog
"github.com/ethereum-optimism/optimism/op-node/testlog"
testlog
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-service/clock"
log
"github.com/ethereum/go-ethereum/log"
ds
"github.com/ipfs/go-datastore"
ds
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
suite
"github.com/stretchr/testify/suite"
log
"github.com/ethereum/go-ethereum/log"
pubsub
"github.com/libp2p/go-libp2p-pubsub"
pubsub
"github.com/libp2p/go-libp2p-pubsub"
host
"github.com/libp2p/go-libp2p/core/host"
host
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
peer
"github.com/libp2p/go-libp2p/core/peer"
peer
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
bhost
"github.com/libp2p/go-libp2p/p2p/host/blank"
bhost
"github.com/libp2p/go-libp2p/p2p/host/blank"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoreds"
tswarm
"github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
tswarm
"github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
)
// PeerScoresTestSuite tests peer parameterization.
// PeerScoresTestSuite tests peer parameterization.
type
PeerScoresTestSuite
struct
{
type
PeerScoresTestSuite
struct
{
suite
.
Suite
suite
.
Suite
mockGater
*
p2pMocks
.
ConnectionGater
mockStore
*
p2pMocks
.
Peerstore
mockStore
*
p2pMocks
.
Peerstore
mockMetricer
*
p2pMocks
.
GossipMetricer
mockMetricer
*
p2pMocks
.
GossipMetricer
bandScorer
p2p
.
BandScoreThresholds
bandScorer
BandScoreThresholds
logger
log
.
Logger
logger
log
.
Logger
}
}
// SetupTest sets up the test suite.
// SetupTest sets up the test suite.
func
(
testSuite
*
PeerScoresTestSuite
)
SetupTest
()
{
func
(
testSuite
*
PeerScoresTestSuite
)
SetupTest
()
{
testSuite
.
mockGater
=
&
p2pMocks
.
ConnectionGater
{}
testSuite
.
mockStore
=
&
p2pMocks
.
Peerstore
{}
testSuite
.
mockStore
=
&
p2pMocks
.
Peerstore
{}
testSuite
.
mockMetricer
=
&
p2pMocks
.
GossipMetricer
{}
testSuite
.
mockMetricer
=
&
p2pMocks
.
GossipMetricer
{}
bandScorer
,
err
:=
p2p
.
NewBandScorer
(
"0:graylist;"
)
bandScorer
,
err
:=
NewBandScorer
(
"0:graylist;"
)
testSuite
.
NoError
(
err
)
testSuite
.
NoError
(
err
)
testSuite
.
bandScorer
=
*
bandScorer
testSuite
.
bandScorer
=
*
bandScorer
testSuite
.
logger
=
testlog
.
Logger
(
testSuite
.
T
(),
log
.
LvlError
)
testSuite
.
logger
=
testlog
.
Logger
(
testSuite
.
T
(),
log
.
LvlError
)
...
@@ -96,7 +95,17 @@ func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts []
...
@@ -96,7 +95,17 @@ func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts []
for
_
,
h
:=
range
hosts
{
for
_
,
h
:=
range
hosts
{
rt
:=
pubsub
.
DefaultGossipSubRouter
(
h
)
rt
:=
pubsub
.
DefaultGossipSubRouter
(
h
)
opts
:=
[]
pubsub
.
Option
{}
opts
:=
[]
pubsub
.
Option
{}
opts
=
append
(
opts
,
p2p
.
ConfigurePeerScoring
(
h
,
testSuite
.
mockGater
,
&
p2p
.
Config
{
dataStore
:=
sync
.
MutexWrap
(
ds
.
NewMapDatastore
())
peerStore
,
err
:=
pstoreds
.
NewPeerstore
(
context
.
Background
(),
dataStore
,
pstoreds
.
DefaultOpts
())
require
.
NoError
(
testSuite
.
T
(),
err
)
extPeerStore
,
err
:=
store
.
NewExtendedPeerstore
(
context
.
Background
(),
logger
,
clock
.
SystemClock
,
peerStore
,
dataStore
)
require
.
NoError
(
testSuite
.
T
(),
err
)
scorer
:=
NewScorer
(
&
rollup
.
Config
{
L2ChainID
:
big
.
NewInt
(
123
)},
extPeerStore
,
testSuite
.
mockMetricer
,
&
testSuite
.
bandScorer
,
logger
)
opts
=
append
(
opts
,
ConfigurePeerScoring
(
&
Config
{
BandScoreThresholds
:
testSuite
.
bandScorer
,
BandScoreThresholds
:
testSuite
.
bandScorer
,
PeerScoring
:
pubsub
.
PeerScoreParams
{
PeerScoring
:
pubsub
.
PeerScoreParams
{
AppSpecificScore
:
func
(
p
peer
.
ID
)
float64
{
AppSpecificScore
:
func
(
p
peer
.
ID
)
float64
{
...
@@ -110,7 +119,7 @@ func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts []
...
@@ -110,7 +119,7 @@ func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts []
DecayInterval
:
time
.
Second
,
DecayInterval
:
time
.
Second
,
DecayToZero
:
0.01
,
DecayToZero
:
0.01
,
},
},
},
testSuite
.
mockMetric
er
,
logger
)
...
)
},
scor
er
,
logger
)
...
)
ps
,
err
:=
pubsub
.
NewGossipSubWithRouter
(
ctx
,
h
,
rt
,
opts
...
)
ps
,
err
:=
pubsub
.
NewGossipSubWithRouter
(
ctx
,
h
,
rt
,
opts
...
)
if
err
!=
nil
{
if
err
!=
nil
{
panic
(
err
)
panic
(
err
)
...
@@ -150,8 +159,6 @@ func (testSuite *PeerScoresTestSuite) TestNegativeScores() {
...
@@ -150,8 +159,6 @@ func (testSuite *PeerScoresTestSuite) TestNegativeScores() {
testSuite
.
mockMetricer
.
On
(
"SetPeerScores"
,
mock
.
Anything
)
.
Return
(
nil
)
testSuite
.
mockMetricer
.
On
(
"SetPeerScores"
,
mock
.
Anything
)
.
Return
(
nil
)
testSuite
.
mockGater
.
On
(
"ListBlockedPeers"
)
.
Return
([]
peer
.
ID
{})
// Construct 20 hosts using the [getNetHosts] function.
// Construct 20 hosts using the [getNetHosts] function.
hosts
:=
getNetHosts
(
testSuite
,
ctx
,
20
)
hosts
:=
getNetHosts
(
testSuite
,
ctx
,
20
)
testSuite
.
Equal
(
20
,
len
(
hosts
))
testSuite
.
Equal
(
20
,
len
(
hosts
))
...
...
op-node/p2p/prepared.go
View file @
95978376
...
@@ -43,7 +43,7 @@ func (p *Prepared) Check() error {
...
@@ -43,7 +43,7 @@ func (p *Prepared) Check() error {
}
}
// Host creates a libp2p host service. Returns nil, nil if p2p is disabled.
// Host creates a libp2p host service. Returns nil, nil if p2p is disabled.
func
(
p
*
Prepared
)
Host
(
log
log
.
Logger
,
reporter
metrics
.
Reporter
)
(
host
.
Host
,
error
)
{
func
(
p
*
Prepared
)
Host
(
log
log
.
Logger
,
reporter
metrics
.
Reporter
,
metrics
HostMetrics
)
(
host
.
Host
,
error
)
{
return
p
.
HostP2P
,
nil
return
p
.
HostP2P
,
nil
}
}
...
...
op-node/p2p/rpc_server.go
View file @
95978376
...
@@ -7,6 +7,8 @@ import (
...
@@ -7,6 +7,8 @@ import (
"net"
"net"
"time"
"time"
"github.com/ethereum-optimism/optimism/op-node/p2p/gating"
decredSecp
"github.com/decred/dcrd/dcrec/secp256k1/v4"
decredSecp
"github.com/decred/dcrd/dcrec/secp256k1/v4"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
pubsub
"github.com/libp2p/go-libp2p-pubsub"
pubsub
"github.com/libp2p/go-libp2p-pubsub"
...
@@ -48,7 +50,7 @@ type Node interface {
...
@@ -48,7 +50,7 @@ type Node interface {
// GossipOut returns the gossip output/info control
// GossipOut returns the gossip output/info control
GossipOut
()
GossipOut
GossipOut
()
GossipOut
// ConnectionGater returns the connection gater, to ban/unban peers with, may be nil
// ConnectionGater returns the connection gater, to ban/unban peers with, may be nil
ConnectionGater
()
ConnectionGater
ConnectionGater
()
gating
.
Blocking
ConnectionGater
// ConnectionManager returns the connection manager, to protect peers with, may be nil
// ConnectionManager returns the connection manager, to protect peers with, may be nil
ConnectionManager
()
connmgr
.
ConnManager
ConnectionManager
()
connmgr
.
ConnManager
}
}
...
...
op-node/p2p/store/iface.go
View file @
95978376
...
@@ -5,23 +5,43 @@ import (
...
@@ -5,23 +5,43 @@ import (
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/peerstore"
)
)
type
PeerScores
struct
{
type
TopicScores
struct
{
Gossip
float64
`json:"gossip"`
TimeInMesh
float64
`json:"timeInMesh"`
// in seconds
FirstMessageDeliveries
uint64
`json:"firstMessageDeliveries"`
MeshMessageDeliveries
uint64
`json:"meshMessageDeliveries"`
InvalidMessageDeliveries
uint64
`json:"invalidMessageDeliveries"`
}
}
type
ScoreType
int
type
GossipScores
struct
{
Total
float64
`json:"total"`
Blocks
TopicScores
`json:"blocks"`
// fully zeroed if the peer has not been in the mesh on the topic
IPColocationFactor
float64
`json:"IPColocationFactor"`
BehavioralPenalty
float64
`json:"behavioralPenalty"`
}
const
(
func
(
g
GossipScores
)
Apply
(
rec
*
scoreRecord
)
{
TypeGossip
ScoreType
=
iota
rec
.
PeerScores
.
Gossip
=
g
)
}
type
PeerScores
struct
{
Gossip
GossipScores
`json:"gossip"`
ReqRespSync
float64
`json:"reqRespSync"`
}
// ScoreDatastore defines a type-safe API for getting and setting libp2p peer score information
// ScoreDatastore defines a type-safe API for getting and setting libp2p peer score information
type
ScoreDatastore
interface
{
type
ScoreDatastore
interface
{
// GetPeerScores returns the current scores for the specified peer
// GetPeerScores returns the current scores for the specified peer
GetPeerScores
(
id
peer
.
ID
)
(
PeerScores
,
error
)
GetPeerScores
(
id
peer
.
ID
)
(
PeerScores
,
error
)
// SetScore stores the latest score for the specified peer and score type
// SetScore applies the given store diff to the specified peer
SetScore
(
id
peer
.
ID
,
scoreType
ScoreType
,
score
float64
)
error
SetScore
(
id
peer
.
ID
,
diff
ScoreDiff
)
error
}
// ScoreDiff defines a type-safe batch of changes to apply to the peer-scoring record of the peer.
// The scoreRecord the diff is applied to is private: diffs can only be defined in this package,
// to ensure changes to the record are non-breaking.
type
ScoreDiff
interface
{
Apply
(
score
*
scoreRecord
)
}
}
// ExtendedPeerstore defines a type-safe API to work with additional peer metadata based on a libp2p peerstore.Peerstore
// ExtendedPeerstore defines a type-safe API to work with additional peer metadata based on a libp2p peerstore.Peerstore
...
...
op-node/p2p/store/scorebook.go
View file @
95978376
...
@@ -26,8 +26,8 @@ const (
...
@@ -26,8 +26,8 @@ const (
var
scoresBase
=
ds
.
NewKey
(
"/peers/scores"
)
var
scoresBase
=
ds
.
NewKey
(
"/peers/scores"
)
type
scoreRecord
struct
{
type
scoreRecord
struct
{
PeerScores
PeerScores
PeerScores
`json:"peerScores"`
lastUpdate
time
.
Time
LastUpdate
int64
`json:"lastUpdate"`
// unix timestamp in seconds
}
}
type
scoreBook
struct
{
type
scoreBook
struct
{
...
@@ -91,21 +91,15 @@ func (d *scoreBook) getRecord(id peer.ID) (scoreRecord, error) {
...
@@ -91,21 +91,15 @@ func (d *scoreBook) getRecord(id peer.ID) (scoreRecord, error) {
return
record
,
nil
return
record
,
nil
}
}
func
(
d
*
scoreBook
)
SetScore
(
id
peer
.
ID
,
scoreType
ScoreType
,
score
float64
)
error
{
func
(
d
*
scoreBook
)
SetScore
(
id
peer
.
ID
,
diff
ScoreDiff
)
error
{
d
.
Lock
()
d
.
Lock
()
defer
d
.
Unlock
()
defer
d
.
Unlock
()
scores
,
err
:=
d
.
getRecord
(
id
)
scores
,
err
:=
d
.
getRecord
(
id
)
if
err
!=
nil
{
if
err
!=
nil
{
return
err
return
err
}
}
scores
.
lastUpdate
=
d
.
clock
.
Now
()
scores
.
LastUpdate
=
d
.
clock
.
Now
()
.
Unix
()
scores
.
Gossip
=
score
diff
.
Apply
(
&
scores
)
switch
scoreType
{
case
TypeGossip
:
scores
.
Gossip
=
score
default
:
return
fmt
.
Errorf
(
"unknown score type: %v"
,
scoreType
)
}
data
,
err
:=
serializeScoresV0
(
scores
)
data
,
err
:=
serializeScoresV0
(
scores
)
if
err
!=
nil
{
if
err
!=
nil
{
return
fmt
.
Errorf
(
"encode scores for peer %v: %w"
,
id
,
err
)
return
fmt
.
Errorf
(
"encode scores for peer %v: %w"
,
id
,
err
)
...
@@ -145,7 +139,7 @@ func (d *scoreBook) prune() error {
...
@@ -145,7 +139,7 @@ func (d *scoreBook) prune() error {
if
err
!=
nil
{
if
err
!=
nil
{
return
err
return
err
}
}
if
record
.
lastUpdate
.
Add
(
expiryPeriod
)
.
Before
(
d
.
clock
.
Now
())
{
if
time
.
Unix
(
record
.
LastUpdate
,
0
)
.
Add
(
expiryPeriod
)
.
Before
(
d
.
clock
.
Now
())
{
if
pending
>
maxPruneBatchSize
{
if
pending
>
maxPruneBatchSize
{
if
err
:=
batch
.
Commit
(
d
.
ctx
);
err
!=
nil
{
if
err
:=
batch
.
Commit
(
d
.
ctx
);
err
!=
nil
{
return
err
return
err
...
...
op-node/p2p/store/scorebook_test.go
View file @
95978376
...
@@ -26,20 +26,20 @@ func TestRoundTripGossipScore(t *testing.T) {
...
@@ -26,20 +26,20 @@ func TestRoundTripGossipScore(t *testing.T) {
id
:=
peer
.
ID
(
"aaaa"
)
id
:=
peer
.
ID
(
"aaaa"
)
store
:=
createMemoryStore
(
t
)
store
:=
createMemoryStore
(
t
)
score
:=
123.45
score
:=
123.45
err
:=
store
.
SetScore
(
id
,
TypeGossip
,
score
)
err
:=
store
.
SetScore
(
id
,
&
GossipScores
{
Total
:
score
}
)
require
.
NoError
(
t
,
err
)
require
.
NoError
(
t
,
err
)
assertPeerScores
(
t
,
store
,
id
,
PeerScores
{
Gossip
:
score
})
assertPeerScores
(
t
,
store
,
id
,
PeerScores
{
Gossip
:
GossipScores
{
Total
:
score
}
})
}
}
func
TestUpdateGossipScore
(
t
*
testing
.
T
)
{
func
TestUpdateGossipScore
(
t
*
testing
.
T
)
{
id
:=
peer
.
ID
(
"aaaa"
)
id
:=
peer
.
ID
(
"aaaa"
)
store
:=
createMemoryStore
(
t
)
store
:=
createMemoryStore
(
t
)
score
:=
123.45
score
:=
123.45
require
.
NoError
(
t
,
store
.
SetScore
(
id
,
TypeGossip
,
444.223
))
require
.
NoError
(
t
,
store
.
SetScore
(
id
,
&
GossipScores
{
Total
:
444.223
}
))
require
.
NoError
(
t
,
store
.
SetScore
(
id
,
TypeGossip
,
score
))
require
.
NoError
(
t
,
store
.
SetScore
(
id
,
&
GossipScores
{
Total
:
score
}
))
assertPeerScores
(
t
,
store
,
id
,
PeerScores
{
Gossip
:
score
})
assertPeerScores
(
t
,
store
,
id
,
PeerScores
{
Gossip
:
GossipScores
{
Total
:
score
}
})
}
}
func
TestStoreScoresForMultiplePeers
(
t
*
testing
.
T
)
{
func
TestStoreScoresForMultiplePeers
(
t
*
testing
.
T
)
{
...
@@ -48,11 +48,11 @@ func TestStoreScoresForMultiplePeers(t *testing.T) {
...
@@ -48,11 +48,11 @@ func TestStoreScoresForMultiplePeers(t *testing.T) {
store
:=
createMemoryStore
(
t
)
store
:=
createMemoryStore
(
t
)
score1
:=
123.45
score1
:=
123.45
score2
:=
453.22
score2
:=
453.22
require
.
NoError
(
t
,
store
.
SetScore
(
id1
,
TypeGossip
,
score1
))
require
.
NoError
(
t
,
store
.
SetScore
(
id1
,
&
GossipScores
{
Total
:
score1
}
))
require
.
NoError
(
t
,
store
.
SetScore
(
id2
,
TypeGossip
,
score2
))
require
.
NoError
(
t
,
store
.
SetScore
(
id2
,
&
GossipScores
{
Total
:
score2
}
))
assertPeerScores
(
t
,
store
,
id1
,
PeerScores
{
Gossip
:
score1
})
assertPeerScores
(
t
,
store
,
id1
,
PeerScores
{
Gossip
:
GossipScores
{
Total
:
score1
}
})
assertPeerScores
(
t
,
store
,
id2
,
PeerScores
{
Gossip
:
score2
})
assertPeerScores
(
t
,
store
,
id2
,
PeerScores
{
Gossip
:
GossipScores
{
Total
:
score2
}
})
}
}
func
TestPersistData
(
t
*
testing
.
T
)
{
func
TestPersistData
(
t
*
testing
.
T
)
{
...
@@ -61,19 +61,13 @@ func TestPersistData(t *testing.T) {
...
@@ -61,19 +61,13 @@ func TestPersistData(t *testing.T) {
backingStore
:=
sync
.
MutexWrap
(
ds
.
NewMapDatastore
())
backingStore
:=
sync
.
MutexWrap
(
ds
.
NewMapDatastore
())
store
:=
createPeerstoreWithBacking
(
t
,
backingStore
)
store
:=
createPeerstoreWithBacking
(
t
,
backingStore
)
require
.
NoError
(
t
,
store
.
SetScore
(
id
,
TypeGossip
,
score
))
require
.
NoError
(
t
,
store
.
SetScore
(
id
,
&
GossipScores
{
Total
:
score
}
))
// Close and recreate a new store from the same backing
// Close and recreate a new store from the same backing
require
.
NoError
(
t
,
store
.
Close
())
require
.
NoError
(
t
,
store
.
Close
())
store
=
createPeerstoreWithBacking
(
t
,
backingStore
)
store
=
createPeerstoreWithBacking
(
t
,
backingStore
)
assertPeerScores
(
t
,
store
,
id
,
PeerScores
{
Gossip
:
score
})
assertPeerScores
(
t
,
store
,
id
,
PeerScores
{
Gossip
:
GossipScores
{
Total
:
score
}})
}
func
TestUnknownScoreType
(
t
*
testing
.
T
)
{
store
:=
createMemoryStore
(
t
)
err
:=
store
.
SetScore
(
"aaaa"
,
92832
,
244.24
)
require
.
ErrorContains
(
t
,
err
,
"unknown score type"
)
}
}
func
TestCloseCompletes
(
t
*
testing
.
T
)
{
func
TestCloseCompletes
(
t
*
testing
.
T
)
{
...
@@ -98,17 +92,17 @@ func TestPrune(t *testing.T) {
...
@@ -98,17 +92,17 @@ func TestPrune(t *testing.T) {
firstStore
:=
clock
.
Now
()
firstStore
:=
clock
.
Now
()
// Set some scores all 30 minutes apart so they have different expiry times
// Set some scores all 30 minutes apart so they have different expiry times
require
.
NoError
(
t
,
book
.
SetScore
(
"aaaa"
,
TypeGossip
,
123.45
))
require
.
NoError
(
t
,
book
.
SetScore
(
"aaaa"
,
&
GossipScores
{
Total
:
123.45
}
))
clock
.
AdvanceTime
(
30
*
time
.
Minute
)
clock
.
AdvanceTime
(
30
*
time
.
Minute
)
require
.
NoError
(
t
,
book
.
SetScore
(
"bbbb"
,
TypeGossip
,
123.45
))
require
.
NoError
(
t
,
book
.
SetScore
(
"bbbb"
,
&
GossipScores
{
Total
:
123.45
}
))
clock
.
AdvanceTime
(
30
*
time
.
Minute
)
clock
.
AdvanceTime
(
30
*
time
.
Minute
)
require
.
NoError
(
t
,
book
.
SetScore
(
"cccc"
,
TypeGossip
,
123.45
))
require
.
NoError
(
t
,
book
.
SetScore
(
"cccc"
,
&
GossipScores
{
Total
:
123.45
}
))
clock
.
AdvanceTime
(
30
*
time
.
Minute
)
clock
.
AdvanceTime
(
30
*
time
.
Minute
)
require
.
NoError
(
t
,
book
.
SetScore
(
"dddd"
,
TypeGossip
,
123.45
))
require
.
NoError
(
t
,
book
.
SetScore
(
"dddd"
,
&
GossipScores
{
Total
:
123.45
}
))
clock
.
AdvanceTime
(
30
*
time
.
Minute
)
clock
.
AdvanceTime
(
30
*
time
.
Minute
)
// Update bbbb again which should extend its expiry
// Update bbbb again which should extend its expiry
require
.
NoError
(
t
,
book
.
SetScore
(
"bbbb"
,
TypeGossip
,
123.45
))
require
.
NoError
(
t
,
book
.
SetScore
(
"bbbb"
,
&
GossipScores
{
Total
:
123.45
}
))
require
.
True
(
t
,
hasScoreRecorded
(
"aaaa"
))
require
.
True
(
t
,
hasScoreRecorded
(
"aaaa"
))
require
.
True
(
t
,
hasScoreRecorded
(
"bbbb"
))
require
.
True
(
t
,
hasScoreRecorded
(
"bbbb"
))
...
@@ -153,7 +147,7 @@ func TestPruneMultipleBatches(t *testing.T) {
...
@@ -153,7 +147,7 @@ func TestPruneMultipleBatches(t *testing.T) {
// Set scores for more peers than the max batch size
// Set scores for more peers than the max batch size
peerCount
:=
maxPruneBatchSize
*
3
+
5
peerCount
:=
maxPruneBatchSize
*
3
+
5
for
i
:=
0
;
i
<
peerCount
;
i
++
{
for
i
:=
0
;
i
<
peerCount
;
i
++
{
require
.
NoError
(
t
,
book
.
SetScore
(
peer
.
ID
(
strconv
.
Itoa
(
i
)),
TypeGossip
,
123.45
))
require
.
NoError
(
t
,
book
.
SetScore
(
peer
.
ID
(
strconv
.
Itoa
(
i
)),
&
GossipScores
{
Total
:
123.45
}
))
}
}
clock
.
AdvanceTime
(
expiryPeriod
+
1
)
clock
.
AdvanceTime
(
expiryPeriod
+
1
)
require
.
NoError
(
t
,
book
.
prune
())
require
.
NoError
(
t
,
book
.
prune
())
...
...
op-node/p2p/store/serialize.go
View file @
95978376
package
store
package
store
import
(
import
"encoding/json"
"bytes"
"encoding/binary"
"time"
)
func
serializeScoresV0
(
scores
scoreRecord
)
([]
byte
,
error
)
{
func
serializeScoresV0
(
scores
scoreRecord
)
([]
byte
,
error
)
{
var
b
bytes
.
Buffer
// v0 just serializes to JSON. New/unrecognized values default to 0.
err
:=
binary
.
Write
(
&
b
,
binary
.
BigEndian
,
scores
.
lastUpdate
.
UnixMilli
())
return
json
.
Marshal
(
&
scores
)
if
err
!=
nil
{
return
nil
,
err
}
err
=
binary
.
Write
(
&
b
,
binary
.
BigEndian
,
scores
.
Gossip
)
if
err
!=
nil
{
return
nil
,
err
}
return
b
.
Bytes
(),
nil
}
}
func
deserializeScoresV0
(
data
[]
byte
)
(
scoreRecord
,
error
)
{
func
deserializeScoresV0
(
data
[]
byte
)
(
scoreRecord
,
error
)
{
var
scores
scoreRecord
var
out
scoreRecord
r
:=
bytes
.
NewReader
(
data
)
err
:=
json
.
Unmarshal
(
data
,
&
out
)
var
lastUpdate
int64
return
out
,
err
err
:=
binary
.
Read
(
r
,
binary
.
BigEndian
,
&
lastUpdate
)
if
err
!=
nil
{
return
scoreRecord
{},
err
}
scores
.
lastUpdate
=
time
.
UnixMilli
(
lastUpdate
)
err
=
binary
.
Read
(
r
,
binary
.
BigEndian
,
&
scores
.
Gossip
)
if
err
!=
nil
{
return
scoreRecord
{},
err
}
return
scores
,
nil
}
}
op-node/p2p/store/serialize_test.go
View file @
95978376
package
store
package
store
import
(
import
(
"encoding/json"
"strconv"
"strconv"
"testing"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/require"
)
)
func
TestRoundtripScoresV0
(
t
*
testing
.
T
)
{
func
TestRoundtripScoresV0
(
t
*
testing
.
T
)
{
scores
:=
scoreRecord
{
scores
:=
scoreRecord
{
PeerScores
:
PeerScores
{
Gossip
:
1234.52382
},
PeerScores
:
PeerScores
{
Gossip
:
GossipScores
{
Total
:
1234.52382
}
},
lastUpdate
:
time
.
UnixMilli
(
1923841
)
,
LastUpdate
:
1923841
,
}
}
data
,
err
:=
serializeScoresV0
(
scores
)
data
,
err
:=
serializeScoresV0
(
scores
)
require
.
NoError
(
t
,
err
)
require
.
NoError
(
t
,
err
)
...
@@ -23,25 +22,41 @@ func TestRoundtripScoresV0(t *testing.T) {
...
@@ -23,25 +22,41 @@ func TestRoundtripScoresV0(t *testing.T) {
}
}
// TestParseHistoricSerializations checks that existing data can still be deserialized
// TestParseHistoricSerializations checks that existing data can still be deserialized
// Adding new fields should not require bumping the version, only removing fields
// Adding new fields should not require bumping the version. Removing fields may require bumping.
// Scores should always default to 0.
// A new entry should be added to this test each time any fields are changed to ensure it can always be deserialized
// A new entry should be added to this test each time any fields are changed to ensure it can always be deserialized
func
TestParseHistoricSerializationsV0
(
t
*
testing
.
T
)
{
func
TestParseHistoricSerializationsV0
(
t
*
testing
.
T
)
{
tests
:=
[]
struct
{
tests
:=
[]
struct
{
data
[]
byte
data
string
expected
scoreRecord
expected
scoreRecord
}{
}{
{
{
data
:
common
.
Hex2Bytes
(
"00000000001D5B0140934A18644523F6"
)
,
data
:
`{"peerScores":{"gossip":{"total":1234.52382,"blocks":{"timeInMesh":1234,"firstMessageDeliveries":12,"meshMessageDeliveries":34,"invalidMessageDeliveries":56},"IPColocationFactor":12.34,"behavioralPenalty":56.78},"reqRespSync":123456},"lastUpdate":1923841}`
,
expected
:
scoreRecord
{
expected
:
scoreRecord
{
PeerScores
:
PeerScores
{
Gossip
:
1234.52382
},
PeerScores
:
PeerScores
{
lastUpdate
:
time
.
UnixMilli
(
1923841
),
Gossip
:
GossipScores
{
Total
:
1234.52382
,
Blocks
:
TopicScores
{
TimeInMesh
:
1234
,
FirstMessageDeliveries
:
12
,
MeshMessageDeliveries
:
34
,
InvalidMessageDeliveries
:
56
,
},
IPColocationFactor
:
12.34
,
BehavioralPenalty
:
56.78
,
},
ReqRespSync
:
123456
,
},
LastUpdate
:
1923841
,
},
},
},
},
}
}
for
idx
,
test
:=
range
tests
{
for
idx
,
test
:=
range
tests
{
test
:=
test
test
:=
test
out
,
_
:=
json
.
Marshal
(
&
test
.
expected
)
t
.
Log
(
string
(
out
))
t
.
Run
(
strconv
.
Itoa
(
idx
),
func
(
t
*
testing
.
T
)
{
t
.
Run
(
strconv
.
Itoa
(
idx
),
func
(
t
*
testing
.
T
)
{
result
,
err
:=
deserializeScoresV0
(
test
.
data
)
result
,
err
:=
deserializeScoresV0
(
[]
byte
(
test
.
data
)
)
require
.
NoError
(
t
,
err
)
require
.
NoError
(
t
,
err
)
require
.
Equal
(
t
,
test
.
expected
,
result
)
require
.
Equal
(
t
,
test
.
expected
,
result
)
})
})
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment