Commit 99e04e6b authored by protolambda's avatar protolambda

op-node: refactor scorer to decouple connection-gater, implement...

op-node: refactor scorer to decouple connection-gater, implement connection-gater with layered functionality
parent 78d1ef1e
......@@ -3,8 +3,10 @@ package op_e2e
import (
"context"
"crypto/ecdsa"
"crypto/rand"
"fmt"
"math/big"
"net"
"os"
"path"
"sort"
......@@ -12,6 +14,18 @@ import (
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-service/clock"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
ic "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
......@@ -465,7 +479,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
if p, ok := p2pNodes[name]; ok {
return p, nil
}
h, err := sys.Mocknet.GenPeer()
h, err := sys.newMockNetPeer()
if err != nil {
return nil, fmt.Errorf("failed to init p2p host for node %s", name)
}
......@@ -627,6 +641,51 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
return sys, nil
}
// IP6 range that gets blackholed (in case our traffic ever makes it out onto
// the internet).
var blackholeIP6 = net.ParseIP("100::")
// mocknet doesn't allow us to add a peerstore without fully creating the peer ourselves
func (sys *System) newMockNetPeer() (host.Host, error) {
sk, _, err := ic.GenerateECDSAKeyPair(rand.Reader)
if err != nil {
return nil, err
}
id, err := peer.IDFromPrivateKey(sk)
if err != nil {
return nil, err
}
suffix := id
if len(id) > 8 {
suffix = id[len(id)-8:]
}
ip := append(net.IP{}, blackholeIP6...)
copy(ip[net.IPv6len-len(suffix):], suffix)
a, err := ma.NewMultiaddr(fmt.Sprintf("/ip6/%s/tcp/4242", ip))
if err != nil {
return nil, fmt.Errorf("failed to create test multiaddr: %w", err)
}
p, err := peer.IDFromPublicKey(sk.GetPublic())
if err != nil {
return nil, err
}
ps, err := pstoremem.NewPeerstore()
if err != nil {
return nil, err
}
ps.AddAddr(p, a, peerstore.PermanentAddrTTL)
_ = ps.AddPrivKey(p, sk)
_ = ps.AddPubKey(p, sk.GetPublic())
ds := sync.MutexWrap(ds.NewMapDatastore())
eps, err := store.NewExtendedPeerstore(context.Background(), log.Root(), clock.SystemClock, ps, ds)
if err != nil {
return nil, err
}
return sys.Mocknet.AddPeerWithPeerstore(p, eps)
}
func selectEndpoint(node *node.Node) string {
useHTTP := os.Getenv("OP_E2E_USE_HTTP") == "true"
if useHTTP {
......
......@@ -706,7 +706,7 @@ func TestSystemP2PAltSync(t *testing.T) {
snapLog.SetHandler(log.DiscardHandler())
// Create a peer, and hook up alice and bob
h, err := sys.Mocknet.GenPeer()
h, err := sys.newMockNetPeer()
require.NoError(t, err)
_, err = sys.Mocknet.LinkPeers(sys.RollupNodes["alice"].P2P().Host().ID(), h.ID())
require.NoError(t, err)
......
......@@ -6,6 +6,8 @@ import (
"net"
"time"
"github.com/ethereum-optimism/optimism/op-node/p2p/gating"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
......@@ -17,8 +19,6 @@ import (
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
cmgr "github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/ethereum-optimism/optimism/op-node/rollup"
......@@ -30,12 +30,17 @@ var DefaultBootnodes = []*enode.Node{
enode.MustParse("enode://9d7a3efefe442351217e73b3a593bcb8efffb55b4807699972145324eab5e6b382152f8d24f6301baebbfb5ecd4127bd3faab2842c04cd432bdf50ba092f6645@34.65.109.126:0?discport=30305"),
}
type HostMetrics interface {
gating.UnbanMetrics
gating.ConnectionGaterMetrics
}
// SetupP2P provides a host and discovery service for usage in the rollup node.
type SetupP2P interface {
Check() error
Disabled() bool
// Host creates a libp2p host service. Returns nil, nil if p2p is disabled.
Host(log log.Logger, reporter metrics.Reporter) (host.Host, error)
Host(log log.Logger, reporter metrics.Reporter, metrics HostMetrics) (host.Host, error)
// Discovery creates a disc-v5 service. Returns nil, nil, nil if discovery is disabled.
Discovery(log log.Logger, rollupCfg *rollup.Config, tcpPort uint16) (*enode.LocalNode, *discover.UDPv5, error)
TargetPeers() uint
......@@ -109,33 +114,6 @@ type Config struct {
EnableReqRespSync bool
}
//go:generate mockery --name ConnectionGater
type ConnectionGater interface {
connmgr.ConnectionGater
// BlockPeer adds a peer to the set of blocked peers.
// Note: active connections to the peer are not automatically closed.
BlockPeer(p peer.ID) error
UnblockPeer(p peer.ID) error
ListBlockedPeers() []peer.ID
// BlockAddr adds an IP address to the set of blocked addresses.
// Note: active connections to the IP address are not automatically closed.
BlockAddr(ip net.IP) error
UnblockAddr(ip net.IP) error
ListBlockedAddrs() []net.IP
// BlockSubnet adds an IP subnet to the set of blocked addresses.
// Note: active connections to the IP subnet are not automatically closed.
BlockSubnet(ipnet *net.IPNet) error
UnblockSubnet(ipnet *net.IPNet) error
ListBlockedSubnets() []*net.IPNet
}
func DefaultConnGater(conf *Config) (connmgr.ConnectionGater, error) {
return conngater.NewBasicConnectionGater(conf.Store)
}
func DefaultConnManager(conf *Config) (connmgr.ConnManager, error) {
return cmgr.NewConnManager(
int(conf.PeersLo),
......
......@@ -66,8 +66,6 @@ type GossipRuntimeConfig interface {
//go:generate mockery --name GossipMetricer
type GossipMetricer interface {
RecordGossipEvent(evType int32)
// Peer Scoring Metric Funcs
SetPeerScores(map[string]float64)
}
func blocksTopicV1(cfg *rollup.Config) string {
......@@ -157,7 +155,7 @@ func BuildGlobalGossipParams(cfg *rollup.Config) pubsub.GossipSubParams {
// NewGossipSub configures a new pubsub instance with the specified parameters.
// PubSub uses a GossipSubRouter as it's router under the hood.
func NewGossipSub(p2pCtx context.Context, h host.Host, g ConnectionGater, cfg *rollup.Config, gossipConf GossipSetupConfigurables, m GossipMetricer, log log.Logger) (*pubsub.PubSub, error) {
func NewGossipSub(p2pCtx context.Context, h host.Host, cfg *rollup.Config, gossipConf GossipSetupConfigurables, scorer Scorer, m GossipMetricer, log log.Logger) (*pubsub.PubSub, error) {
denyList, err := pubsub.NewTimeCachedBlacklist(30 * time.Second)
if err != nil {
return nil, err
......@@ -176,7 +174,7 @@ func NewGossipSub(p2pCtx context.Context, h host.Host, g ConnectionGater, cfg *r
pubsub.WithBlacklist(denyList),
pubsub.WithEventTracer(&gossipTracer{m: m}),
}
gossipOpts = append(gossipOpts, ConfigurePeerScoring(h, g, gossipConf, m, log)...)
gossipOpts = append(gossipOpts, ConfigurePeerScoring(gossipConf, scorer, log)...)
gossipOpts = append(gossipOpts, gossipConf.ConfigureGossip(cfg)...)
return pubsub.NewGossipSub(p2pCtx, h, gossipOpts...)
}
......
......@@ -7,9 +7,6 @@ import (
"sync"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
libp2p "github.com/libp2p/go-libp2p"
lconf "github.com/libp2p/go-libp2p/config"
"github.com/libp2p/go-libp2p/core/connmgr"
......@@ -29,17 +26,21 @@ import (
madns "github.com/multiformats/go-multiaddr-dns"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/p2p/gating"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-service/clock"
)
type ExtraHostFeatures interface {
host.Host
ConnectionGater() ConnectionGater
ConnectionGater() gating.BlockingConnectionGater
ConnectionManager() connmgr.ConnManager
}
type extraHost struct {
host.Host
gater ConnectionGater
gater gating.BlockingConnectionGater
connMgr connmgr.ConnManager
log log.Logger
......@@ -48,7 +49,7 @@ type extraHost struct {
quitC chan struct{}
}
func (e *extraHost) ConnectionGater() ConnectionGater {
func (e *extraHost) ConnectionGater() gating.BlockingConnectionGater {
return e.gater
}
......@@ -125,7 +126,7 @@ func (e *extraHost) monitorStaticPeers() {
var _ ExtraHostFeatures = (*extraHost)(nil)
func (conf *Config) Host(log log.Logger, reporter metrics.Reporter) (host.Host, error) {
func (conf *Config) Host(log log.Logger, reporter metrics.Reporter, metrics HostMetrics) (host.Host, error) {
if conf.DisableP2P {
return nil, nil
}
......@@ -152,10 +153,15 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter) (host.Host,
return nil, fmt.Errorf("failed to set up peerstore with pub key: %w", err)
}
connGtr, err := DefaultConnGater(conf)
var connGtr gating.BlockingConnectionGater
connGtr, err = gating.NewBlockingConnectionGater(conf.Store)
if err != nil {
return nil, fmt.Errorf("failed to open connection gater: %w", err)
}
// TODO(CLI-4015): apply connGtr enhancements
// connGtr = gating.AddBanExpiry(connGtr, ps, log, cl, reporter)
//connGtr = gating.AddScoring(connGtr, ps, 0)
connGtr = gating.AddMetering(connGtr, metrics)
connMngr, err := DefaultConnManager(conf)
if err != nil {
......@@ -234,10 +240,7 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter) (host.Host,
go out.monitorStaticPeers()
}
// Only add the connection gater if it offers the full interface we're looking for.
if g, ok := connGtr.(ConnectionGater); ok {
out.gater = g
}
out.gater = connGtr
return out, nil
}
......
......@@ -59,10 +59,10 @@ func TestingConfig(t *testing.T) *Config {
func TestP2PSimple(t *testing.T) {
confA := TestingConfig(t)
confB := TestingConfig(t)
hostA, err := confA.Host(testlog.Logger(t, log.LvlError).New("host", "A"), nil)
hostA, err := confA.Host(testlog.Logger(t, log.LvlError).New("host", "A"), nil, metrics.NoopMetrics)
require.NoError(t, err, "failed to launch host A")
defer hostA.Close()
hostB, err := confB.Host(testlog.Logger(t, log.LvlError).New("host", "B"), nil)
hostB, err := confB.Host(testlog.Logger(t, log.LvlError).New("host", "B"), nil, metrics.NoopMetrics)
require.NoError(t, err, "failed to launch host B")
defer hostB.Close()
err = hostA.Connect(context.Background(), peer.AddrInfo{ID: hostB.ID(), Addrs: hostB.Addrs()})
......
// Code generated by mockery v2.22.1. DO NOT EDIT.
package mocks
import (
control "github.com/libp2p/go-libp2p/core/control"
mock "github.com/stretchr/testify/mock"
multiaddr "github.com/multiformats/go-multiaddr"
net "net"
network "github.com/libp2p/go-libp2p/core/network"
peer "github.com/libp2p/go-libp2p/core/peer"
)
// ConnectionGater is an autogenerated mock type for the ConnectionGater type
type ConnectionGater struct {
mock.Mock
}
// BlockAddr provides a mock function with given fields: ip
func (_m *ConnectionGater) BlockAddr(ip net.IP) error {
ret := _m.Called(ip)
var r0 error
if rf, ok := ret.Get(0).(func(net.IP) error); ok {
r0 = rf(ip)
} else {
r0 = ret.Error(0)
}
return r0
}
// BlockPeer provides a mock function with given fields: p
func (_m *ConnectionGater) BlockPeer(p peer.ID) error {
ret := _m.Called(p)
var r0 error
if rf, ok := ret.Get(0).(func(peer.ID) error); ok {
r0 = rf(p)
} else {
r0 = ret.Error(0)
}
return r0
}
// BlockSubnet provides a mock function with given fields: ipnet
func (_m *ConnectionGater) BlockSubnet(ipnet *net.IPNet) error {
ret := _m.Called(ipnet)
var r0 error
if rf, ok := ret.Get(0).(func(*net.IPNet) error); ok {
r0 = rf(ipnet)
} else {
r0 = ret.Error(0)
}
return r0
}
// InterceptAccept provides a mock function with given fields: _a0
func (_m *ConnectionGater) InterceptAccept(_a0 network.ConnMultiaddrs) bool {
ret := _m.Called(_a0)
var r0 bool
if rf, ok := ret.Get(0).(func(network.ConnMultiaddrs) bool); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// InterceptAddrDial provides a mock function with given fields: _a0, _a1
func (_m *ConnectionGater) InterceptAddrDial(_a0 peer.ID, _a1 multiaddr.Multiaddr) bool {
ret := _m.Called(_a0, _a1)
var r0 bool
if rf, ok := ret.Get(0).(func(peer.ID, multiaddr.Multiaddr) bool); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// InterceptPeerDial provides a mock function with given fields: p
func (_m *ConnectionGater) InterceptPeerDial(p peer.ID) bool {
ret := _m.Called(p)
var r0 bool
if rf, ok := ret.Get(0).(func(peer.ID) bool); ok {
r0 = rf(p)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// InterceptSecured provides a mock function with given fields: _a0, _a1, _a2
func (_m *ConnectionGater) InterceptSecured(_a0 network.Direction, _a1 peer.ID, _a2 network.ConnMultiaddrs) bool {
ret := _m.Called(_a0, _a1, _a2)
var r0 bool
if rf, ok := ret.Get(0).(func(network.Direction, peer.ID, network.ConnMultiaddrs) bool); ok {
r0 = rf(_a0, _a1, _a2)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// InterceptUpgraded provides a mock function with given fields: _a0
func (_m *ConnectionGater) InterceptUpgraded(_a0 network.Conn) (bool, control.DisconnectReason) {
ret := _m.Called(_a0)
var r0 bool
var r1 control.DisconnectReason
if rf, ok := ret.Get(0).(func(network.Conn) (bool, control.DisconnectReason)); ok {
return rf(_a0)
}
if rf, ok := ret.Get(0).(func(network.Conn) bool); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(bool)
}
if rf, ok := ret.Get(1).(func(network.Conn) control.DisconnectReason); ok {
r1 = rf(_a0)
} else {
r1 = ret.Get(1).(control.DisconnectReason)
}
return r0, r1
}
// ListBlockedAddrs provides a mock function with given fields:
func (_m *ConnectionGater) ListBlockedAddrs() []net.IP {
ret := _m.Called()
var r0 []net.IP
if rf, ok := ret.Get(0).(func() []net.IP); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]net.IP)
}
}
return r0
}
// ListBlockedPeers provides a mock function with given fields:
func (_m *ConnectionGater) ListBlockedPeers() []peer.ID {
ret := _m.Called()
var r0 []peer.ID
if rf, ok := ret.Get(0).(func() []peer.ID); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]peer.ID)
}
}
return r0
}
// ListBlockedSubnets provides a mock function with given fields:
func (_m *ConnectionGater) ListBlockedSubnets() []*net.IPNet {
ret := _m.Called()
var r0 []*net.IPNet
if rf, ok := ret.Get(0).(func() []*net.IPNet); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*net.IPNet)
}
}
return r0
}
// UnblockAddr provides a mock function with given fields: ip
func (_m *ConnectionGater) UnblockAddr(ip net.IP) error {
ret := _m.Called(ip)
var r0 error
if rf, ok := ret.Get(0).(func(net.IP) error); ok {
r0 = rf(ip)
} else {
r0 = ret.Error(0)
}
return r0
}
// UnblockPeer provides a mock function with given fields: p
func (_m *ConnectionGater) UnblockPeer(p peer.ID) error {
ret := _m.Called(p)
var r0 error
if rf, ok := ret.Get(0).(func(peer.ID) error); ok {
r0 = rf(p)
} else {
r0 = ret.Error(0)
}
return r0
}
// UnblockSubnet provides a mock function with given fields: ipnet
func (_m *ConnectionGater) UnblockSubnet(ipnet *net.IPNet) error {
ret := _m.Called(ipnet)
var r0 error
if rf, ok := ret.Get(0).(func(*net.IPNet) error); ok {
r0 = rf(ipnet)
} else {
r0 = ret.Error(0)
}
return r0
}
type mockConstructorTestingTNewConnectionGater interface {
mock.TestingT
Cleanup(func())
}
// NewConnectionGater creates a new instance of ConnectionGater. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewConnectionGater(t mockConstructorTestingTNewConnectionGater) *ConnectionGater {
mock := &ConnectionGater{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
// Code generated by mockery v2.22.1. DO NOT EDIT.
// Code generated by mockery v2.28.0. DO NOT EDIT.
package mocks
......@@ -45,13 +45,13 @@ func (_m *Peerstore) Peers() peer.IDSlice {
return r0
}
// SetScore provides a mock function with given fields: _a0, _a1, _a2
func (_m *Peerstore) SetScore(_a0 peer.ID, _a1 store.ScoreType, _a2 float64) error {
ret := _m.Called(_a0, _a1, _a2)
// SetScore provides a mock function with given fields: id, diff
func (_m *Peerstore) SetScore(id peer.ID, diff store.ScoreDiff) error {
ret := _m.Called(id, diff)
var r0 error
if rf, ok := ret.Get(0).(func(peer.ID, store.ScoreType, float64) error); ok {
r0 = rf(_a0, _a1, _a2)
if rf, ok := ret.Get(0).(func(peer.ID, store.ScoreDiff) error); ok {
r0 = rf(id, diff)
} else {
r0 = ret.Error(0)
}
......
......@@ -6,6 +6,10 @@ import (
"fmt"
"strconv"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-node/p2p/gating"
"github.com/hashicorp/go-multierror"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/connmgr"
......@@ -26,9 +30,10 @@ import (
// NodeP2P is a p2p node, which can be used to gossip messages.
type NodeP2P struct {
host host.Host // p2p host (optional, may be nil)
gater ConnectionGater // p2p gater, to ban/unban peers with, may be nil even with p2p enabled
connMgr connmgr.ConnManager // p2p conn manager, to keep a reliable number of peers, may be nil even with p2p enabled
host host.Host // p2p host (optional, may be nil)
gater gating.BlockingConnectionGater // p2p gater, to ban/unban peers with, may be nil even with p2p enabled
scorer Scorer // writes score-updates to the peerstore and keeps metrics of score changes
connMgr connmgr.ConnManager // p2p conn manager, to keep a reliable number of peers, may be nil even with p2p enabled
// the below components are all optional, and may be nil. They require the host to not be nil.
dv5Local *enode.LocalNode // p2p discovery identity
dv5Udp *discover.UDPv5 // p2p discovery service
......@@ -63,7 +68,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
var err error
// nil if disabled.
n.host, err = setup.Host(log, bwc)
n.host, err = setup.Host(log, bwc, metrics)
if err != nil {
if n.dv5Udp != nil {
n.dv5Udp.Close()
......@@ -71,6 +76,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
return fmt.Errorf("failed to start p2p host: %w", err)
}
// TODO(CLI-4016): host is not optional, NodeP2P as a whole is. This if statement is wrong
if n.host != nil {
// Enable extra features, if any. During testing we don't setup the most advanced host all the time.
if extra, ok := n.host.(ExtraHostFeatures); ok {
......@@ -100,10 +106,23 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
n.host.SetStreamHandler(PayloadByNumberProtocolID(rollupCfg.L2ChainID), payloadByNumber)
}
}
eps, ok := n.host.Peerstore().(store.ExtendedPeerstore)
if !ok {
return fmt.Errorf("cannot init without extended peerstore: %w", err)
}
n.scorer = NewScorer(rollupCfg, eps, metrics, setup.PeerBandScorer(), log)
n.host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(_ network.Network, conn network.Conn) {
n.scorer.OnConnect(conn.RemotePeer())
},
DisconnectedF: func(_ network.Network, conn network.Conn) {
n.scorer.OnDisconnect(conn.RemotePeer())
},
})
// notify of any new connections/streams/etc.
n.host.Network().Notify(NewNetworkNotifier(log, metrics))
// note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled.
n.gs, err = NewGossipSub(resourcesCtx, n.host, n.gater, rollupCfg, setup, metrics, log)
n.gs, err = NewGossipSub(resourcesCtx, n.host, rollupCfg, setup, n.scorer, metrics, log)
if err != nil {
return fmt.Errorf("failed to start gossipsub router: %w", err)
}
......@@ -162,7 +181,7 @@ func (n *NodeP2P) GossipOut() GossipOut {
return n.gsOut
}
func (n *NodeP2P) ConnectionGater() ConnectionGater {
func (n *NodeP2P) ConnectionGater() gating.BlockingConnectionGater {
return n.gater
}
......
package p2p
import (
log "github.com/ethereum/go-ethereum/log"
peer "github.com/libp2p/go-libp2p/core/peer"
)
// ConnectionFactor is the factor by which we multiply the connection score.
const ConnectionFactor = -10
// PeerScoreThreshold is the threshold at which we block a peer.
const PeerScoreThreshold = -100
// gater is an internal implementation of the [PeerGater] interface.
type gater struct {
connGater ConnectionGater
blockedMap map[peer.ID]bool
log log.Logger
banEnabled bool
}
// PeerGater manages the connection gating of peers.
//
//go:generate mockery --name PeerGater --output mocks/
type PeerGater interface {
// Update handles a peer score update and blocks/unblocks the peer if necessary.
Update(peer.ID, float64)
// IsBlocked returns true if the given [peer.ID] is blocked.
IsBlocked(peer.ID) bool
}
// NewPeerGater returns a new peer gater.
func NewPeerGater(connGater ConnectionGater, log log.Logger, banEnabled bool) PeerGater {
return &gater{
connGater: connGater,
blockedMap: make(map[peer.ID]bool),
log: log,
banEnabled: banEnabled,
}
}
// IsBlocked returns true if the given [peer.ID] is blocked.
func (s *gater) IsBlocked(peerID peer.ID) bool {
return s.blockedMap[peerID]
}
// setBlocked sets the blocked status of the given [peer.ID].
func (s *gater) setBlocked(peerID peer.ID, blocked bool) {
s.blockedMap[peerID] = blocked
}
// Update handles a peer score update and blocks/unblocks the peer if necessary.
func (s *gater) Update(id peer.ID, score float64) {
// Check if the peer score is below the threshold
// If so, we need to block the peer
isAlreadyBlocked := s.IsBlocked(id)
if score < PeerScoreThreshold && s.banEnabled && !isAlreadyBlocked {
s.log.Warn("peer blocking enabled, blocking peer", "id", id.String(), "score", score)
err := s.connGater.BlockPeer(id)
if err != nil {
s.log.Warn("connection gater failed to block peer", "id", id.String(), "err", err)
}
// Set the peer as blocked in the blocked map
s.setBlocked(id, true)
}
// Unblock peers whose score has recovered to an acceptable level
if (score > PeerScoreThreshold) && isAlreadyBlocked {
err := s.connGater.UnblockPeer(id)
if err != nil {
s.log.Warn("connection gater failed to unblock peer", "id", id.String(), "err", err)
}
// Set the peer as unblocked in the blocked map
s.setBlocked(id, false)
}
}
package p2p_test
import (
"testing"
p2p "github.com/ethereum-optimism/optimism/op-node/p2p"
p2pMocks "github.com/ethereum-optimism/optimism/op-node/p2p/mocks"
testlog "github.com/ethereum-optimism/optimism/op-node/testlog"
log "github.com/ethereum/go-ethereum/log"
peer "github.com/libp2p/go-libp2p/core/peer"
suite "github.com/stretchr/testify/suite"
)
// PeerGaterTestSuite tests peer parameterization.
type PeerGaterTestSuite struct {
suite.Suite
mockGater *p2pMocks.ConnectionGater
logger log.Logger
}
// SetupTest sets up the test suite.
func (testSuite *PeerGaterTestSuite) SetupTest() {
testSuite.mockGater = &p2pMocks.ConnectionGater{}
testSuite.logger = testlog.Logger(testSuite.T(), log.LvlError)
}
// TestPeerGater runs the PeerGaterTestSuite.
func TestPeerGater(t *testing.T) {
suite.Run(t, new(PeerGaterTestSuite))
}
// TestPeerScoreConstants validates the peer score constants.
func (testSuite *PeerGaterTestSuite) TestPeerScoreConstants() {
testSuite.Equal(-10, p2p.ConnectionFactor)
testSuite.Equal(-100, p2p.PeerScoreThreshold)
}
// TestPeerGaterUpdate tests the peer gater update hook.
func (testSuite *PeerGaterTestSuite) TestPeerGater_UpdateBansPeers() {
gater := p2p.NewPeerGater(
testSuite.mockGater,
testSuite.logger,
true,
)
// Return an empty list of already blocked peers
testSuite.mockGater.On("ListBlockedPeers").Return([]peer.ID{}).Once()
// Mock a connection gater peer block call
// Since the peer score is below the [PeerScoreThreshold] of -100,
// the [BlockPeer] method should be called
testSuite.mockGater.On("BlockPeer", peer.ID("peer1")).Return(nil).Once()
// The peer should initially be unblocked
testSuite.False(gater.IsBlocked(peer.ID("peer1")))
// Apply the peer gater update
gater.Update(peer.ID("peer1"), float64(-101))
// The peer should be considered blocked
testSuite.True(gater.IsBlocked(peer.ID("peer1")))
// Now let's unblock the peer
testSuite.mockGater.On("UnblockPeer", peer.ID("peer1")).Return(nil).Once()
gater.Update(peer.ID("peer1"), float64(0))
// The peer should be considered unblocked
testSuite.False(gater.IsBlocked(peer.ID("peer1")))
}
// TestPeerGaterUpdateNoBanning tests the peer gater update hook without banning set
func (testSuite *PeerGaterTestSuite) TestPeerGater_UpdateNoBanning() {
gater := p2p.NewPeerGater(
testSuite.mockGater,
testSuite.logger,
false,
)
// Return an empty list of already blocked peers
testSuite.mockGater.On("ListBlockedPeers").Return([]peer.ID{})
// Notice: [BlockPeer] should not be called since banning is not enabled
// even though the peer score is way below the [PeerScoreThreshold] of -100
gater.Update(peer.ID("peer1"), float64(-100000))
// The peer should be unblocked
testSuite.False(gater.IsBlocked(peer.ID("peer1")))
// Make sure that if we then "unblock" the peer, nothing happens
gater.Update(peer.ID("peer1"), float64(0))
// The peer should still be unblocked
testSuite.False(gater.IsBlocked(peer.ID("peer1")))
}
......@@ -5,6 +5,9 @@ import (
"sort"
"strconv"
"strings"
"time"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
log "github.com/ethereum/go-ethereum/log"
......@@ -14,10 +17,10 @@ import (
type scorer struct {
peerStore Peerstore
metricer GossipMetricer
metricer ScoreMetrics
log log.Logger
gater PeerGater
bandScoreThresholds *BandScoreThresholds
cfg *rollup.Config
}
// scorePair holds a band and its corresponding threshold.
......@@ -93,31 +96,37 @@ type Peerstore interface {
// Peers returns all of the peer IDs stored across all inner stores.
Peers() peer.IDSlice
SetScore(peer.ID, store.ScoreType, float64) error
SetScore(id peer.ID, diff store.ScoreDiff) error
}
// Scorer is a peer scorer that scores peers based on application-specific metrics.
type Scorer interface {
OnConnect()
OnDisconnect()
OnConnect(id peer.ID)
OnDisconnect(id peer.ID)
SnapshotHook() pubsub.ExtendedPeerScoreInspectFn
}
type ScoreMetrics interface {
SetPeerScores(map[string]float64)
}
// NewScorer returns a new peer scorer.
func NewScorer(peerGater PeerGater, peerStore Peerstore, metricer GossipMetricer, bandScoreThresholds *BandScoreThresholds, log log.Logger) Scorer {
func NewScorer(cfg *rollup.Config, peerStore Peerstore, metricer ScoreMetrics, bandScoreThresholds *BandScoreThresholds, log log.Logger) Scorer {
return &scorer{
peerStore: peerStore,
metricer: metricer,
log: log,
gater: peerGater,
bandScoreThresholds: bandScoreThresholds,
cfg: cfg,
}
}
// SnapshotHook returns a function that is called periodically by the pubsub library to inspect the peer scores.
// SnapshotHook returns a function that is called periodically by the pubsub library to inspect the gossip peer scores.
// It is passed into the pubsub library as a [pubsub.ExtendedPeerScoreInspectFn] in the [pubsub.WithPeerScoreInspect] option.
// The returned [pubsub.ExtendedPeerScoreInspectFn] is called with a mapping of peer IDs to peer score snapshots.
// The incoming peer score snapshots only contain gossip-score components.
func (s *scorer) SnapshotHook() pubsub.ExtendedPeerScoreInspectFn {
blocksTopicName := blocksTopicV1(s.cfg)
return func(m map[peer.ID]*pubsub.PeerScoreSnapshot) {
scoreMap := make(map[string]float64)
// Zero out all bands.
......@@ -126,28 +135,36 @@ func (s *scorer) SnapshotHook() pubsub.ExtendedPeerScoreInspectFn {
}
// Now set the new scores.
for id, snap := range m {
scores := make(map[store.ScoreType]float64)
scores[store.TypeGossip] = snap.Score
if err := s.peerStore.SetScore(id, store.TypeGossip, snap.Score); err != nil {
diff := store.GossipScores{
Total: snap.Score,
Blocks: store.TopicScores{},
IPColocationFactor: snap.IPColocationFactor,
BehavioralPenalty: snap.BehaviourPenalty,
}
if topSnap, ok := snap.Topics[blocksTopicName]; ok {
diff.Blocks.TimeInMesh = float64(topSnap.TimeInMesh) / float64(time.Second)
diff.Blocks.MeshMessageDeliveries = uint64(topSnap.MeshMessageDeliveries)
diff.Blocks.FirstMessageDeliveries = uint64(topSnap.FirstMessageDeliveries)
diff.Blocks.InvalidMessageDeliveries = uint64(topSnap.InvalidMessageDeliveries)
}
if err := s.peerStore.SetScore(id, &diff); err != nil {
s.log.Warn("Unable to update peer gossip score", "err", err)
}
}
for _, snap := range m {
band := s.bandScoreThresholds.Bucket(snap.Score)
scoreMap[band] += 1
s.gater.Update(id, snap.Score)
}
s.metricer.SetPeerScores(scoreMap)
}
}
// OnConnect is called when a peer connects.
// See [p2p.NotificationsMetricer] for invocation.
func (s *scorer) OnConnect() {
// no-op
func (s *scorer) OnConnect(id peer.ID) {
// TODO(CLI-4003): apply decay to scores, based on last connection time
}
// OnDisconnect is called when a peer disconnects.
// See [p2p.NotificationsMetricer] for invocation.
func (s *scorer) OnDisconnect() {
// no-op
func (s *scorer) OnDisconnect(id peer.ID) {
// TODO(CLI-4003): persist disconnect-time
}
package p2p_test
import (
"math/big"
"testing"
pubsub "github.com/libp2p/go-libp2p-pubsub"
peer "github.com/libp2p/go-libp2p/core/peer"
suite "github.com/stretchr/testify/suite"
log "github.com/ethereum/go-ethereum/log"
p2p "github.com/ethereum-optimism/optimism/op-node/p2p"
p2pMocks "github.com/ethereum-optimism/optimism/op-node/p2p/mocks"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/testlog"
log "github.com/ethereum/go-ethereum/log"
pubsub "github.com/libp2p/go-libp2p-pubsub"
peer "github.com/libp2p/go-libp2p/core/peer"
suite "github.com/stretchr/testify/suite"
)
// PeerScorerTestSuite tests peer parameterization.
type PeerScorerTestSuite struct {
suite.Suite
// mockConnGater *p2pMocks.ConnectionGater
mockGater *p2pMocks.PeerGater
mockStore *p2pMocks.Peerstore
mockMetricer *p2pMocks.GossipMetricer
bandScorer *p2p.BandScoreThresholds
......@@ -27,7 +29,6 @@ type PeerScorerTestSuite struct {
// SetupTest sets up the test suite.
func (testSuite *PeerScorerTestSuite) SetupTest() {
testSuite.mockGater = &p2pMocks.PeerGater{}
testSuite.mockStore = &p2pMocks.Peerstore{}
testSuite.mockMetricer = &p2pMocks.GossipMetricer{}
bandScorer, err := p2p.NewBandScorer("-40:graylist;0:friend;")
......@@ -44,31 +45,31 @@ func TestPeerScorer(t *testing.T) {
// TestScorer_OnConnect ensures we can call the OnConnect method on the peer scorer.
func (testSuite *PeerScorerTestSuite) TestScorer_OnConnect() {
scorer := p2p.NewScorer(
testSuite.mockGater,
&rollup.Config{L2ChainID: big.NewInt(123)},
testSuite.mockStore,
testSuite.mockMetricer,
testSuite.bandScorer,
testSuite.logger,
)
scorer.OnConnect()
scorer.OnConnect(peer.ID("alice"))
}
// TestScorer_OnDisconnect ensures we can call the OnDisconnect method on the peer scorer.
func (testSuite *PeerScorerTestSuite) TestScorer_OnDisconnect() {
scorer := p2p.NewScorer(
testSuite.mockGater,
&rollup.Config{L2ChainID: big.NewInt(123)},
testSuite.mockStore,
testSuite.mockMetricer,
testSuite.bandScorer,
testSuite.logger,
)
scorer.OnDisconnect()
scorer.OnDisconnect(peer.ID("alice"))
}
// TestScorer_SnapshotHook tests running the snapshot hook on the peer scorer.
func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
scorer := p2p.NewScorer(
testSuite.mockGater,
&rollup.Config{L2ChainID: big.NewInt(123)},
testSuite.mockStore,
testSuite.mockMetricer,
testSuite.bandScorer,
......@@ -76,11 +77,8 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
)
inspectFn := scorer.SnapshotHook()
// Mock the peer gater call
testSuite.mockGater.On("Update", peer.ID("peer1"), float64(-100)).Return(nil).Once()
// Expect updating the peer store
testSuite.mockStore.On("SetScore", peer.ID("peer1"), store.TypeGossip, float64(-100)).Return(nil).Once()
testSuite.mockStore.On("SetScore", peer.ID("peer1"), &store.GossipScores{Total: float64(-100)}).Return(nil).Once()
// The metricer should then be called with the peer score band map
testSuite.mockMetricer.On("SetPeerScores", map[string]float64{
......@@ -96,10 +94,8 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
}
inspectFn(snapshotMap)
// Change the peer score now to a different band
testSuite.mockGater.On("Update", peer.ID("peer1"), float64(0)).Return(nil).Once()
// Expect updating the peer store
testSuite.mockStore.On("SetScore", peer.ID("peer1"), store.TypeGossip, float64(0)).Return(nil).Once()
testSuite.mockStore.On("SetScore", peer.ID("peer1"), &store.GossipScores{Total: 0}).Return(nil).Once()
// The metricer should then be called with the peer score band map
testSuite.mockMetricer.On("SetPeerScores", map[string]float64{
......@@ -120,7 +116,7 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
// This implies that the peer should be blocked.
func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHookBlocksPeer() {
scorer := p2p.NewScorer(
testSuite.mockGater,
&rollup.Config{L2ChainID: big.NewInt(123)},
testSuite.mockStore,
testSuite.mockMetricer,
testSuite.bandScorer,
......@@ -128,10 +124,8 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHookBlocksPeer() {
)
inspectFn := scorer.SnapshotHook()
// Mock the peer gater call
testSuite.mockGater.On("Update", peer.ID("peer1"), float64(-101)).Return(nil)
// Expect updating the peer store
testSuite.mockStore.On("SetScore", peer.ID("peer1"), store.TypeGossip, float64(-101)).Return(nil).Once()
testSuite.mockStore.On("SetScore", peer.ID("peer1"), &store.GossipScores{Total: float64(-101)}).Return(nil).Once()
// The metricer should then be called with the peer score band map
testSuite.mockMetricer.On("SetPeerScores", map[string]float64{
......
package p2p
import (
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
log "github.com/ethereum/go-ethereum/log"
pubsub "github.com/libp2p/go-libp2p-pubsub"
host "github.com/libp2p/go-libp2p/core/host"
)
// ConfigurePeerScoring configures the peer scoring parameters for the pubsub
func ConfigurePeerScoring(h host.Host, g ConnectionGater, gossipConf GossipSetupConfigurables, m GossipMetricer, log log.Logger) []pubsub.Option {
func ConfigurePeerScoring(gossipConf GossipSetupConfigurables, scorer Scorer, log log.Logger) []pubsub.Option {
// If we want to completely disable scoring config here, we can use the [peerScoringParams]
// to return early without returning any [pubsub.Option].
peerScoreParams := gossipConf.PeerScoringParams()
peerScoreThresholds := NewPeerScoreThresholds()
banEnabled := gossipConf.BanPeers()
peerGater := NewPeerGater(g, log, banEnabled)
opts := []pubsub.Option{}
eps, ok := h.Peerstore().(store.ExtendedPeerstore)
if !ok {
log.Warn("Disabling peer scoring. Peerstore does not support peer scores")
return opts
}
scorer := NewScorer(peerGater, eps, m, gossipConf.PeerBandScorer(), log)
// Check the app specific score since libp2p doesn't export it's [validate] function :/
if peerScoreParams != nil && peerScoreParams.AppSpecificScore != nil {
opts = []pubsub.Option{
......
package p2p_test
package p2p
import (
"context"
"fmt"
"math/big"
"math/rand"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum-optimism/optimism/op-node/rollup"
p2p "github.com/ethereum-optimism/optimism/op-node/p2p"
p2pMocks "github.com/ethereum-optimism/optimism/op-node/p2p/mocks"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
testlog "github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-service/clock"
log "github.com/ethereum/go-ethereum/log"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
suite "github.com/stretchr/testify/suite"
log "github.com/ethereum/go-ethereum/log"
pubsub "github.com/libp2p/go-libp2p-pubsub"
host "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
bhost "github.com/libp2p/go-libp2p/p2p/host/blank"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoreds"
tswarm "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
// PeerScoresTestSuite tests peer parameterization.
type PeerScoresTestSuite struct {
suite.Suite
mockGater *p2pMocks.ConnectionGater
mockStore *p2pMocks.Peerstore
mockMetricer *p2pMocks.GossipMetricer
bandScorer p2p.BandScoreThresholds
bandScorer BandScoreThresholds
logger log.Logger
}
// SetupTest sets up the test suite.
func (testSuite *PeerScoresTestSuite) SetupTest() {
testSuite.mockGater = &p2pMocks.ConnectionGater{}
testSuite.mockStore = &p2pMocks.Peerstore{}
testSuite.mockMetricer = &p2pMocks.GossipMetricer{}
bandScorer, err := p2p.NewBandScorer("0:graylist;")
bandScorer, err := NewBandScorer("0:graylist;")
testSuite.NoError(err)
testSuite.bandScorer = *bandScorer
testSuite.logger = testlog.Logger(testSuite.T(), log.LvlError)
......@@ -96,7 +95,17 @@ func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts []
for _, h := range hosts {
rt := pubsub.DefaultGossipSubRouter(h)
opts := []pubsub.Option{}
opts = append(opts, p2p.ConfigurePeerScoring(h, testSuite.mockGater, &p2p.Config{
dataStore := sync.MutexWrap(ds.NewMapDatastore())
peerStore, err := pstoreds.NewPeerstore(context.Background(), dataStore, pstoreds.DefaultOpts())
require.NoError(testSuite.T(), err)
extPeerStore, err := store.NewExtendedPeerstore(context.Background(), logger, clock.SystemClock, peerStore, dataStore)
require.NoError(testSuite.T(), err)
scorer := NewScorer(
&rollup.Config{L2ChainID: big.NewInt(123)},
extPeerStore, testSuite.mockMetricer, &testSuite.bandScorer, logger)
opts = append(opts, ConfigurePeerScoring(&Config{
BandScoreThresholds: testSuite.bandScorer,
PeerScoring: pubsub.PeerScoreParams{
AppSpecificScore: func(p peer.ID) float64 {
......@@ -110,7 +119,7 @@ func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts []
DecayInterval: time.Second,
DecayToZero: 0.01,
},
}, testSuite.mockMetricer, logger)...)
}, scorer, logger)...)
ps, err := pubsub.NewGossipSubWithRouter(ctx, h, rt, opts...)
if err != nil {
panic(err)
......@@ -150,8 +159,6 @@ func (testSuite *PeerScoresTestSuite) TestNegativeScores() {
testSuite.mockMetricer.On("SetPeerScores", mock.Anything).Return(nil)
testSuite.mockGater.On("ListBlockedPeers").Return([]peer.ID{})
// Construct 20 hosts using the [getNetHosts] function.
hosts := getNetHosts(testSuite, ctx, 20)
testSuite.Equal(20, len(hosts))
......
......@@ -43,7 +43,7 @@ func (p *Prepared) Check() error {
}
// Host creates a libp2p host service. Returns nil, nil if p2p is disabled.
func (p *Prepared) Host(log log.Logger, reporter metrics.Reporter) (host.Host, error) {
func (p *Prepared) Host(log log.Logger, reporter metrics.Reporter, metrics HostMetrics) (host.Host, error) {
return p.HostP2P, nil
}
......
......@@ -7,6 +7,8 @@ import (
"net"
"time"
"github.com/ethereum-optimism/optimism/op-node/p2p/gating"
decredSecp "github.com/decred/dcrd/dcrec/secp256k1/v4"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
pubsub "github.com/libp2p/go-libp2p-pubsub"
......@@ -48,7 +50,7 @@ type Node interface {
// GossipOut returns the gossip output/info control
GossipOut() GossipOut
// ConnectionGater returns the connection gater, to ban/unban peers with, may be nil
ConnectionGater() ConnectionGater
ConnectionGater() gating.BlockingConnectionGater
// ConnectionManager returns the connection manager, to protect peers with, may be nil
ConnectionManager() connmgr.ConnManager
}
......
......@@ -5,23 +5,43 @@ import (
"github.com/libp2p/go-libp2p/core/peerstore"
)
type PeerScores struct {
Gossip float64 `json:"gossip"`
type TopicScores struct {
TimeInMesh float64 `json:"timeInMesh"` // in seconds
FirstMessageDeliveries uint64 `json:"firstMessageDeliveries"`
MeshMessageDeliveries uint64 `json:"meshMessageDeliveries"`
InvalidMessageDeliveries uint64 `json:"invalidMessageDeliveries"`
}
type ScoreType int
type GossipScores struct {
Total float64 `json:"total"`
Blocks TopicScores `json:"blocks"` // fully zeroed if the peer has not been in the mesh on the topic
IPColocationFactor float64 `json:"IPColocationFactor"`
BehavioralPenalty float64 `json:"behavioralPenalty"`
}
const (
TypeGossip ScoreType = iota
)
func (g GossipScores) Apply(rec *scoreRecord) {
rec.PeerScores.Gossip = g
}
type PeerScores struct {
Gossip GossipScores `json:"gossip"`
ReqRespSync float64 `json:"reqRespSync"`
}
// ScoreDatastore defines a type-safe API for getting and setting libp2p peer score information
type ScoreDatastore interface {
// GetPeerScores returns the current scores for the specified peer
GetPeerScores(id peer.ID) (PeerScores, error)
// SetScore stores the latest score for the specified peer and score type
SetScore(id peer.ID, scoreType ScoreType, score float64) error
// SetScore applies the given store diff to the specified peer
SetScore(id peer.ID, diff ScoreDiff) error
}
// ScoreDiff defines a type-safe batch of changes to apply to the peer-scoring record of the peer.
// The scoreRecord the diff is applied to is private: diffs can only be defined in this package,
// to ensure changes to the record are non-breaking.
type ScoreDiff interface {
Apply(score *scoreRecord)
}
// ExtendedPeerstore defines a type-safe API to work with additional peer metadata based on a libp2p peerstore.Peerstore
......
......@@ -26,8 +26,8 @@ const (
var scoresBase = ds.NewKey("/peers/scores")
type scoreRecord struct {
PeerScores
lastUpdate time.Time
PeerScores PeerScores `json:"peerScores"`
LastUpdate int64 `json:"lastUpdate"` // unix timestamp in seconds
}
type scoreBook struct {
......@@ -91,21 +91,15 @@ func (d *scoreBook) getRecord(id peer.ID) (scoreRecord, error) {
return record, nil
}
func (d *scoreBook) SetScore(id peer.ID, scoreType ScoreType, score float64) error {
func (d *scoreBook) SetScore(id peer.ID, diff ScoreDiff) error {
d.Lock()
defer d.Unlock()
scores, err := d.getRecord(id)
if err != nil {
return err
}
scores.lastUpdate = d.clock.Now()
scores.Gossip = score
switch scoreType {
case TypeGossip:
scores.Gossip = score
default:
return fmt.Errorf("unknown score type: %v", scoreType)
}
scores.LastUpdate = d.clock.Now().Unix()
diff.Apply(&scores)
data, err := serializeScoresV0(scores)
if err != nil {
return fmt.Errorf("encode scores for peer %v: %w", id, err)
......@@ -145,7 +139,7 @@ func (d *scoreBook) prune() error {
if err != nil {
return err
}
if record.lastUpdate.Add(expiryPeriod).Before(d.clock.Now()) {
if time.Unix(record.LastUpdate, 0).Add(expiryPeriod).Before(d.clock.Now()) {
if pending > maxPruneBatchSize {
if err := batch.Commit(d.ctx); err != nil {
return err
......
......@@ -26,20 +26,20 @@ func TestRoundTripGossipScore(t *testing.T) {
id := peer.ID("aaaa")
store := createMemoryStore(t)
score := 123.45
err := store.SetScore(id, TypeGossip, score)
err := store.SetScore(id, &GossipScores{Total: score})
require.NoError(t, err)
assertPeerScores(t, store, id, PeerScores{Gossip: score})
assertPeerScores(t, store, id, PeerScores{Gossip: GossipScores{Total: score}})
}
func TestUpdateGossipScore(t *testing.T) {
id := peer.ID("aaaa")
store := createMemoryStore(t)
score := 123.45
require.NoError(t, store.SetScore(id, TypeGossip, 444.223))
require.NoError(t, store.SetScore(id, TypeGossip, score))
require.NoError(t, store.SetScore(id, &GossipScores{Total: 444.223}))
require.NoError(t, store.SetScore(id, &GossipScores{Total: score}))
assertPeerScores(t, store, id, PeerScores{Gossip: score})
assertPeerScores(t, store, id, PeerScores{Gossip: GossipScores{Total: score}})
}
func TestStoreScoresForMultiplePeers(t *testing.T) {
......@@ -48,11 +48,11 @@ func TestStoreScoresForMultiplePeers(t *testing.T) {
store := createMemoryStore(t)
score1 := 123.45
score2 := 453.22
require.NoError(t, store.SetScore(id1, TypeGossip, score1))
require.NoError(t, store.SetScore(id2, TypeGossip, score2))
require.NoError(t, store.SetScore(id1, &GossipScores{Total: score1}))
require.NoError(t, store.SetScore(id2, &GossipScores{Total: score2}))
assertPeerScores(t, store, id1, PeerScores{Gossip: score1})
assertPeerScores(t, store, id2, PeerScores{Gossip: score2})
assertPeerScores(t, store, id1, PeerScores{Gossip: GossipScores{Total: score1}})
assertPeerScores(t, store, id2, PeerScores{Gossip: GossipScores{Total: score2}})
}
func TestPersistData(t *testing.T) {
......@@ -61,19 +61,13 @@ func TestPersistData(t *testing.T) {
backingStore := sync.MutexWrap(ds.NewMapDatastore())
store := createPeerstoreWithBacking(t, backingStore)
require.NoError(t, store.SetScore(id, TypeGossip, score))
require.NoError(t, store.SetScore(id, &GossipScores{Total: score}))
// Close and recreate a new store from the same backing
require.NoError(t, store.Close())
store = createPeerstoreWithBacking(t, backingStore)
assertPeerScores(t, store, id, PeerScores{Gossip: score})
}
func TestUnknownScoreType(t *testing.T) {
store := createMemoryStore(t)
err := store.SetScore("aaaa", 92832, 244.24)
require.ErrorContains(t, err, "unknown score type")
assertPeerScores(t, store, id, PeerScores{Gossip: GossipScores{Total: score}})
}
func TestCloseCompletes(t *testing.T) {
......@@ -98,17 +92,17 @@ func TestPrune(t *testing.T) {
firstStore := clock.Now()
// Set some scores all 30 minutes apart so they have different expiry times
require.NoError(t, book.SetScore("aaaa", TypeGossip, 123.45))
require.NoError(t, book.SetScore("aaaa", &GossipScores{Total: 123.45}))
clock.AdvanceTime(30 * time.Minute)
require.NoError(t, book.SetScore("bbbb", TypeGossip, 123.45))
require.NoError(t, book.SetScore("bbbb", &GossipScores{Total: 123.45}))
clock.AdvanceTime(30 * time.Minute)
require.NoError(t, book.SetScore("cccc", TypeGossip, 123.45))
require.NoError(t, book.SetScore("cccc", &GossipScores{Total: 123.45}))
clock.AdvanceTime(30 * time.Minute)
require.NoError(t, book.SetScore("dddd", TypeGossip, 123.45))
require.NoError(t, book.SetScore("dddd", &GossipScores{Total: 123.45}))
clock.AdvanceTime(30 * time.Minute)
// Update bbbb again which should extend its expiry
require.NoError(t, book.SetScore("bbbb", TypeGossip, 123.45))
require.NoError(t, book.SetScore("bbbb", &GossipScores{Total: 123.45}))
require.True(t, hasScoreRecorded("aaaa"))
require.True(t, hasScoreRecorded("bbbb"))
......@@ -153,7 +147,7 @@ func TestPruneMultipleBatches(t *testing.T) {
// Set scores for more peers than the max batch size
peerCount := maxPruneBatchSize*3 + 5
for i := 0; i < peerCount; i++ {
require.NoError(t, book.SetScore(peer.ID(strconv.Itoa(i)), TypeGossip, 123.45))
require.NoError(t, book.SetScore(peer.ID(strconv.Itoa(i)), &GossipScores{Total: 123.45}))
}
clock.AdvanceTime(expiryPeriod + 1)
require.NoError(t, book.prune())
......
package store
import (
"bytes"
"encoding/binary"
"time"
)
import "encoding/json"
func serializeScoresV0(scores scoreRecord) ([]byte, error) {
var b bytes.Buffer
err := binary.Write(&b, binary.BigEndian, scores.lastUpdate.UnixMilli())
if err != nil {
return nil, err
}
err = binary.Write(&b, binary.BigEndian, scores.Gossip)
if err != nil {
return nil, err
}
return b.Bytes(), nil
// v0 just serializes to JSON. New/unrecognized values default to 0.
return json.Marshal(&scores)
}
func deserializeScoresV0(data []byte) (scoreRecord, error) {
var scores scoreRecord
r := bytes.NewReader(data)
var lastUpdate int64
err := binary.Read(r, binary.BigEndian, &lastUpdate)
if err != nil {
return scoreRecord{}, err
}
scores.lastUpdate = time.UnixMilli(lastUpdate)
err = binary.Read(r, binary.BigEndian, &scores.Gossip)
if err != nil {
return scoreRecord{}, err
}
return scores, nil
var out scoreRecord
err := json.Unmarshal(data, &out)
return out, err
}
package store
import (
"encoding/json"
"strconv"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
)
func TestRoundtripScoresV0(t *testing.T) {
scores := scoreRecord{
PeerScores: PeerScores{Gossip: 1234.52382},
lastUpdate: time.UnixMilli(1923841),
PeerScores: PeerScores{Gossip: GossipScores{Total: 1234.52382}},
LastUpdate: 1923841,
}
data, err := serializeScoresV0(scores)
require.NoError(t, err)
......@@ -23,25 +22,41 @@ func TestRoundtripScoresV0(t *testing.T) {
}
// TestParseHistoricSerializations checks that existing data can still be deserialized
// Adding new fields should not require bumping the version, only removing fields
// Adding new fields should not require bumping the version. Removing fields may require bumping.
// Scores should always default to 0.
// A new entry should be added to this test each time any fields are changed to ensure it can always be deserialized
func TestParseHistoricSerializationsV0(t *testing.T) {
tests := []struct {
data []byte
data string
expected scoreRecord
}{
{
data: common.Hex2Bytes("00000000001D5B0140934A18644523F6"),
data: `{"peerScores":{"gossip":{"total":1234.52382,"blocks":{"timeInMesh":1234,"firstMessageDeliveries":12,"meshMessageDeliveries":34,"invalidMessageDeliveries":56},"IPColocationFactor":12.34,"behavioralPenalty":56.78},"reqRespSync":123456},"lastUpdate":1923841}`,
expected: scoreRecord{
PeerScores: PeerScores{Gossip: 1234.52382},
lastUpdate: time.UnixMilli(1923841),
PeerScores: PeerScores{
Gossip: GossipScores{
Total: 1234.52382,
Blocks: TopicScores{
TimeInMesh: 1234,
FirstMessageDeliveries: 12,
MeshMessageDeliveries: 34,
InvalidMessageDeliveries: 56,
},
IPColocationFactor: 12.34,
BehavioralPenalty: 56.78,
},
ReqRespSync: 123456,
},
LastUpdate: 1923841,
},
},
}
for idx, test := range tests {
test := test
out, _ := json.Marshal(&test.expected)
t.Log(string(out))
t.Run(strconv.Itoa(idx), func(t *testing.T) {
result, err := deserializeScoresV0(test.data)
result, err := deserializeScoresV0([]byte(test.data))
require.NoError(t, err)
require.Equal(t, test.expected, result)
})
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment