Commit 8be4a933 authored by mergify[bot]'s avatar mergify[bot] Committed by GitHub

Merge branch 'develop' into willc/dai

parents abb4fff2 6066c16b
---
'@eth-optimism/contracts-bedrock': patch
---
contracts-bedrock was exporting hardhat when it didn't need to be
......@@ -198,13 +198,15 @@ The ability to pause the bridge in case of emergency means that in the worst cas
#### Unfreezing the bridge via L1 soft fork
In order to address the frozen funds, there is a potential final recovery mechanism we call the “L1 Soft Fork Upgrade Recovery” mechanism. This mechanism enables L1 to initiate a bridge upgrade with a soft fork, bypassing all other permissions within the Superchain bridge contracts. The mechanism is as follows:
In order to address the frozen funds, there is a potential final recovery mechanism which has been discussed by the L2 community which we call the “L1 Soft Fork Upgrade Recovery” mechanism. This mechanism enables L1 to initiate a bridge upgrade with a soft fork, bypassing all other permissions within the Superchain bridge contracts. This approach may [introduce systemic risk](https://vitalik.ca/general/2023/05/21/dont_overload.html) to Ethereum and requires research and community buy-in before implementation. It is not required for implementing the Superchain and is being documented for research completeness. Without further research into the implications and safety, it is not an approach the team currently endorses.
*Anyone* may propose an upgrade by submitting a transaction to a special bridge contract, along with a very large bond. This begins a two week challenge period. During this challenge period, *anyone* may submit a challenge which immediately *cancels* the upgrade and claims the bond. Under normal circumstances, it is impossible that an upgrade would go uncancelled for the required two weeks due to the large incentive provided for anyone to cancel the upgrade. However, if the upgrade is accompanied by a modification to Ethereum L1 validator software (the L1 soft fork), which ignores blocks that contain the cancellation transaction then it may succeed.
The mechanism is as follows:
While a successful upgrade of this type would represent a soft fork of Ethereum L1, it would not incur long term technical debt to the Ethereum codebase because the soft fork logic can be removed once the upgrade has completed.
*Anyone* may propose an upgrade by submitting a transaction to a special bridge contract, along with a very large bond. This begins a two week challenge period. During this challenge period, anyone may submit a challenge which immediately cancels the upgrade and claims the bond. Under normal circumstances, it is impossible that an upgrade would go uncancelled for the required two weeks due to the large incentive provided for anyone to cancel the upgrade. However, if the upgrade is accompanied by a modification to Ethereum L1 validator software (the L1 soft fork), which ignores blocks that contain the cancellation transaction then it may succeed.
We expect this escape hatch will never be used, but its very existence should deter malicious behavior.
While a successful upgrade of this type would represent a soft fork of Ethereum L1, it would not incur long term technical debt to the Ethereum codebase because the soft fork logic can be removed once the upgrade has completed.
We expect this escape hatch will never be used, but its very existence could deter malicious behavior.
### The combination of these features results in a system satisfying the core Superchain properties
......
......@@ -24,6 +24,7 @@ require (
github.com/libp2p/go-libp2p-pubsub v0.9.0
github.com/libp2p/go-libp2p-testing v0.12.0
github.com/mattn/go-isatty v0.0.17
github.com/multiformats/go-base32 v0.1.0
github.com/multiformats/go-multiaddr v0.8.0
github.com/multiformats/go-multiaddr-dns v0.3.1
github.com/olekukonko/tablewriter v0.0.5
......@@ -128,7 +129,6 @@ require (
github.com/moby/term v0.0.0-20221105221325-4eb28fa6025c // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect
github.com/multiformats/go-multibase v0.1.1 // indirect
......
package challenger
import (
"errors"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
)
var ErrMissingEvent = errors.New("missing event")
// BuildOutputLogFilter creates a filter query for the L2OutputOracle contract.
//
// The `OutputProposed` event is encoded as:
// 0: bytes32 indexed outputRoot,
// 1: uint256 indexed l2OutputIndex,
// 2: uint256 indexed l2BlockNumber,
// 3: uint256 l1Timestamp
func BuildOutputLogFilter(l2ooABI *abi.ABI) (ethereum.FilterQuery, error) {
// Get the L2OutputOracle contract `OutputProposed` event
event := l2ooABI.Events["OutputProposed"]
// Sanity check that the `OutputProposed` event is defined
if event.ID == (common.Hash{}) {
return ethereum.FilterQuery{}, ErrMissingEvent
}
query := ethereum.FilterQuery{
Topics: [][]common.Hash{
{event.ID},
},
}
return query, nil
}
package challenger
import (
"testing"
"github.com/stretchr/testify/require"
eth "github.com/ethereum/go-ethereum"
abi "github.com/ethereum/go-ethereum/accounts/abi"
common "github.com/ethereum/go-ethereum/common"
)
// TestBuildOutputLogFilter_Succeeds tests that the Output
// Log Filter is built correctly.
func TestBuildOutputLogFilter_Succeeds(t *testing.T) {
// Create a mock event id
event := abi.Event{
ID: [32]byte{0x01},
}
filterQuery := eth.FilterQuery{
Topics: [][]common.Hash{
{event.ID},
},
}
// Mock the ABI
l2ooABI := abi.ABI{
Events: map[string]abi.Event{
"OutputProposed": event,
},
}
// Build the filter
query, err := BuildOutputLogFilter(&l2ooABI)
require.Equal(t, filterQuery, query)
require.NoError(t, err)
}
// TestBuildOutputLogFilter_Fails tests that the Output
// Log Filter fails when the event definition is missing.
func TestBuildOutputLogFilter_Fails(t *testing.T) {
// Mock the ABI
l2ooABI := abi.ABI{
Events: map[string]abi.Event{},
}
// Build the filter
_, err := BuildOutputLogFilter(&l2ooABI)
require.Error(t, err)
require.Equal(t, ErrMissingEvent, err)
}
......@@ -3,8 +3,10 @@ package op_e2e
import (
"context"
"crypto/ecdsa"
"crypto/rand"
"fmt"
"math/big"
"net"
"os"
"path"
"sort"
......@@ -12,6 +14,18 @@ import (
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-service/clock"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
ic "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
......@@ -465,7 +479,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
if p, ok := p2pNodes[name]; ok {
return p, nil
}
h, err := sys.Mocknet.GenPeer()
h, err := sys.newMockNetPeer()
if err != nil {
return nil, fmt.Errorf("failed to init p2p host for node %s", name)
}
......@@ -627,6 +641,51 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
return sys, nil
}
// IP6 range that gets blackholed (in case our traffic ever makes it out onto
// the internet).
var blackholeIP6 = net.ParseIP("100::")
// mocknet doesn't allow us to add a peerstore without fully creating the peer ourselves
func (sys *System) newMockNetPeer() (host.Host, error) {
sk, _, err := ic.GenerateECDSAKeyPair(rand.Reader)
if err != nil {
return nil, err
}
id, err := peer.IDFromPrivateKey(sk)
if err != nil {
return nil, err
}
suffix := id
if len(id) > 8 {
suffix = id[len(id)-8:]
}
ip := append(net.IP{}, blackholeIP6...)
copy(ip[net.IPv6len-len(suffix):], suffix)
a, err := ma.NewMultiaddr(fmt.Sprintf("/ip6/%s/tcp/4242", ip))
if err != nil {
return nil, fmt.Errorf("failed to create test multiaddr: %w", err)
}
p, err := peer.IDFromPublicKey(sk.GetPublic())
if err != nil {
return nil, err
}
ps, err := pstoremem.NewPeerstore()
if err != nil {
return nil, err
}
ps.AddAddr(p, a, peerstore.PermanentAddrTTL)
_ = ps.AddPrivKey(p, sk)
_ = ps.AddPubKey(p, sk.GetPublic())
ds := sync.MutexWrap(ds.NewMapDatastore())
eps, err := store.NewExtendedPeerstore(context.Background(), log.Root(), clock.SystemClock, ps, ds)
if err != nil {
return nil, err
}
return sys.Mocknet.AddPeerWithPeerstore(p, eps)
}
func selectEndpoint(node *node.Node) string {
useHTTP := os.Getenv("OP_E2E_USE_HTTP") == "true"
if useHTTP {
......
......@@ -706,7 +706,7 @@ func TestSystemP2PAltSync(t *testing.T) {
snapLog.SetHandler(log.DiscardHandler())
// Create a peer, and hook up alice and bob
h, err := sys.Mocknet.GenPeer()
h, err := sys.newMockNetPeer()
require.NoError(t, err)
_, err = sys.Mocknet.LinkPeers(sys.RollupNodes["alice"].P2P().Host().ID(), h.ID())
require.NoError(t, err)
......
......@@ -70,6 +70,10 @@ type Metricer interface {
ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration)
ServerPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration)
PayloadsQuarantineSize(n int)
RecordPeerUnban()
RecordIPUnban()
RecordDial(allow bool)
RecordAccept(allow bool)
}
// Metrics tracks all the metrics for the op-node.
......@@ -133,6 +137,10 @@ type Metrics struct {
PeerScores *prometheus.GaugeVec
GossipEventsTotal *prometheus.CounterVec
BandwidthTotal *prometheus.GaugeVec
PeerUnbans prometheus.Counter
IPUnbans prometheus.Counter
Dials *prometheus.CounterVec
Accepts *prometheus.CounterVec
ChannelInputBytes prometheus.Counter
......@@ -335,6 +343,30 @@ func NewMetrics(procName string) *Metrics {
}, []string{
"direction",
}),
PeerUnbans: factory.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Subsystem: "p2p",
Name: "peer_unbans",
Help: "Count of peer unbans",
}),
IPUnbans: factory.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Subsystem: "p2p",
Name: "ip_unbans",
Help: "Count of IP unbans",
}),
Dials: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Subsystem: "p2p",
Name: "dials",
Help: "Count of outgoing dial attempts, with label to filter to allowed attempts",
}, []string{"allow"}),
Accepts: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Subsystem: "p2p",
Name: "accepts",
Help: "Count of incoming dial attempts to accept, with label to filter to allowed attempts",
}, []string{"allow"}),
ChannelInputBytes: factory.NewCounter(prometheus.CounterOpts{
Namespace: ns,
......@@ -663,6 +695,30 @@ func (m *Metrics) RecordChannelInputBytes(inputCompressedBytes int) {
m.ChannelInputBytes.Add(float64(inputCompressedBytes))
}
func (m *Metrics) RecordPeerUnban() {
m.PeerUnbans.Inc()
}
func (m *Metrics) RecordIPUnban() {
m.IPUnbans.Inc()
}
func (m *Metrics) RecordDial(allow bool) {
if allow {
m.Dials.WithLabelValues("true").Inc()
} else {
m.Dials.WithLabelValues("false").Inc()
}
}
func (m *Metrics) RecordAccept(allow bool) {
if allow {
m.Accepts.WithLabelValues("true").Inc()
} else {
m.Accepts.WithLabelValues("false").Inc()
}
}
type noopMetricer struct{}
var NoopMetrics Metricer = new(noopMetricer)
......@@ -768,3 +824,15 @@ func (n *noopMetricer) PayloadsQuarantineSize(int) {
func (n *noopMetricer) RecordChannelInputBytes(int) {
}
func (n *noopMetricer) RecordPeerUnban() {
}
func (n *noopMetricer) RecordIPUnban() {
}
func (n *noopMetricer) RecordDial(allow bool) {
}
func (n *noopMetricer) RecordAccept(allow bool) {
}
......@@ -6,6 +6,8 @@ import (
"net"
"time"
"github.com/ethereum-optimism/optimism/op-node/p2p/gating"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
......@@ -17,8 +19,6 @@ import (
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
cmgr "github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/ethereum-optimism/optimism/op-node/rollup"
......@@ -30,12 +30,17 @@ var DefaultBootnodes = []*enode.Node{
enode.MustParse("enode://9d7a3efefe442351217e73b3a593bcb8efffb55b4807699972145324eab5e6b382152f8d24f6301baebbfb5ecd4127bd3faab2842c04cd432bdf50ba092f6645@34.65.109.126:0?discport=30305"),
}
type HostMetrics interface {
gating.UnbanMetrics
gating.ConnectionGaterMetrics
}
// SetupP2P provides a host and discovery service for usage in the rollup node.
type SetupP2P interface {
Check() error
Disabled() bool
// Host creates a libp2p host service. Returns nil, nil if p2p is disabled.
Host(log log.Logger, reporter metrics.Reporter) (host.Host, error)
Host(log log.Logger, reporter metrics.Reporter, metrics HostMetrics) (host.Host, error)
// Discovery creates a disc-v5 service. Returns nil, nil, nil if discovery is disabled.
Discovery(log log.Logger, rollupCfg *rollup.Config, tcpPort uint16) (*enode.LocalNode, *discover.UDPv5, error)
TargetPeers() uint
......@@ -109,33 +114,6 @@ type Config struct {
EnableReqRespSync bool
}
//go:generate mockery --name ConnectionGater
type ConnectionGater interface {
connmgr.ConnectionGater
// BlockPeer adds a peer to the set of blocked peers.
// Note: active connections to the peer are not automatically closed.
BlockPeer(p peer.ID) error
UnblockPeer(p peer.ID) error
ListBlockedPeers() []peer.ID
// BlockAddr adds an IP address to the set of blocked addresses.
// Note: active connections to the IP address are not automatically closed.
BlockAddr(ip net.IP) error
UnblockAddr(ip net.IP) error
ListBlockedAddrs() []net.IP
// BlockSubnet adds an IP subnet to the set of blocked addresses.
// Note: active connections to the IP subnet are not automatically closed.
BlockSubnet(ipnet *net.IPNet) error
UnblockSubnet(ipnet *net.IPNet) error
ListBlockedSubnets() []*net.IPNet
}
func DefaultConnGater(conf *Config) (connmgr.ConnectionGater, error) {
return conngater.NewBasicConnectionGater(conf.Store)
}
func DefaultConnManager(conf *Config) (connmgr.ConnManager, error) {
return cmgr.NewConnManager(
int(conf.PeersLo),
......
package gating
import (
"net"
ds "github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
)
//go:generate mockery --name BlockingConnectionGater --output mocks/ --with-expecter=true
type BlockingConnectionGater interface {
connmgr.ConnectionGater
// BlockPeer adds a peer to the set of blocked peers.
// Note: active connections to the peer are not automatically closed.
BlockPeer(p peer.ID) error
UnblockPeer(p peer.ID) error
ListBlockedPeers() []peer.ID
// BlockAddr adds an IP address to the set of blocked addresses.
// Note: active connections to the IP address are not automatically closed.
BlockAddr(ip net.IP) error
UnblockAddr(ip net.IP) error
ListBlockedAddrs() []net.IP
// BlockSubnet adds an IP subnet to the set of blocked addresses.
// Note: active connections to the IP subnet are not automatically closed.
BlockSubnet(ipnet *net.IPNet) error
UnblockSubnet(ipnet *net.IPNet) error
ListBlockedSubnets() []*net.IPNet
}
func NewBlockingConnectionGater(store ds.Batching) (BlockingConnectionGater, error) {
return conngater.NewBasicConnectionGater(store)
}
package gating
import (
"errors"
"net"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
type UnbanMetrics interface {
RecordPeerUnban()
RecordIPUnban()
}
var UnknownExpiry = errors.New("unknown ban expiry")
//go:generate mockery --name ExpiryStore --output mocks/ --with-expecter=true
type ExpiryStore interface {
PeerBanExpiry(id peer.ID) (time.Time, error)
IPBanExpiry(ip net.IP) (time.Time, error)
}
// ExpiryConnectionGater enhances a BlockingConnectionGater by implementing ban-expiration
type ExpiryConnectionGater struct {
BlockingConnectionGater
store ExpiryStore
log log.Logger
clock clock.Clock
m UnbanMetrics
}
func AddBanExpiry(gater BlockingConnectionGater, store ExpiryStore, log log.Logger, clock clock.Clock, m UnbanMetrics) *ExpiryConnectionGater {
return &ExpiryConnectionGater{
BlockingConnectionGater: gater,
store: store,
log: log,
clock: clock,
m: m,
}
}
func (g *ExpiryConnectionGater) peerBanExpiryCheck(p peer.ID) (allow bool) {
// if the peer is blocked, check if it's time to unblock
expiry, err := g.store.PeerBanExpiry(p)
if err != nil {
if errors.Is(err, UnknownExpiry) {
return false // peer is permanently banned if no expiry time is set.
}
g.log.Warn("failed to load peer-ban expiry time", "peer_id", p, "err", err)
return false
}
if g.clock.Now().Before(expiry) {
return false
}
g.log.Info("peer-ban expired, unbanning peer", "peer_id", p, "expiry", expiry)
if err := g.BlockingConnectionGater.UnblockPeer(p); err != nil {
g.log.Warn("failed to unban peer", "peer_id", p, "err", err)
return false // if we ignored the error, then the inner connection-gater would drop them
}
g.m.RecordPeerUnban()
return true
}
func (g *ExpiryConnectionGater) addrBanExpiryCheck(ma multiaddr.Multiaddr) (allow bool) {
ip, err := manet.ToIP(ma)
if err != nil {
g.log.Error("tried to check multi-addr with bad IP", "addr", ma)
return false
}
// Check if it's a subnet-wide ban first. Subnet-bans do not expire.
for _, ipnet := range g.BlockingConnectionGater.ListBlockedSubnets() {
if ipnet.Contains(ip) {
return false // peer is still in banned subnet
}
}
// if just the IP is blocked, check if it's time to unblock
expiry, err := g.store.IPBanExpiry(ip)
if err != nil {
if errors.Is(err, UnknownExpiry) {
return false // IP is permanently banned if no expiry time is set.
}
g.log.Warn("failed to load IP-ban expiry time", "ip", ip, "err", err)
return false
}
if g.clock.Now().Before(expiry) {
return false
}
g.log.Info("IP-ban expired, unbanning IP", "ip", ip, "expiry", expiry)
if err := g.BlockingConnectionGater.UnblockAddr(ip); err != nil {
g.log.Warn("failed to unban IP", "ip", ip, "err", err)
return false // if we ignored the error, then the inner connection-gater would drop them
}
g.m.RecordIPUnban()
return true
}
func (g *ExpiryConnectionGater) InterceptPeerDial(p peer.ID) (allow bool) {
// if not allowed, and not expired, then do not allow the dial
return g.BlockingConnectionGater.InterceptPeerDial(p) || g.peerBanExpiryCheck(p)
}
func (g *ExpiryConnectionGater) InterceptAddrDial(id peer.ID, ma multiaddr.Multiaddr) (allow bool) {
if !g.BlockingConnectionGater.InterceptAddrDial(id, ma) {
// Check if it was intercepted because of a peer ban
if !g.BlockingConnectionGater.InterceptPeerDial(id) {
if !g.peerBanExpiryCheck(id) {
return false // peer is still peer-banned
}
if g.BlockingConnectionGater.InterceptAddrDial(id, ma) { // allow dial if peer-ban was everything
return true
}
}
// intercepted because of addr ban still, check if it is expired
if !g.addrBanExpiryCheck(ma) {
return false // peer is still addr-banned
}
}
return true
}
func (g *ExpiryConnectionGater) InterceptAccept(mas network.ConnMultiaddrs) (allow bool) {
return g.BlockingConnectionGater.InterceptAccept(mas) || g.addrBanExpiryCheck(mas.RemoteMultiaddr())
}
func (g *ExpiryConnectionGater) InterceptSecured(direction network.Direction, id peer.ID, mas network.ConnMultiaddrs) (allow bool) {
// Outbound dials are always accepted: the dial intercepts handle it before the connection is made.
if direction == network.DirOutbound {
return true
}
// InterceptSecured is called after InterceptAccept, we already checked the addrs.
// This leaves just the peer-ID expiry to check on inbound connections.
return g.BlockingConnectionGater.InterceptSecured(direction, id, mas) || g.peerBanExpiryCheck(id)
}
package gating
import (
"net"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/p2p/gating/mocks"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-service/clock"
log "github.com/ethereum/go-ethereum/log"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)
func expiryTestSetup(t *testing.T) (*clock.DeterministicClock, *mocks.ExpiryStore, *mocks.BlockingConnectionGater, *ExpiryConnectionGater) {
mockGater := mocks.NewBlockingConnectionGater(t)
log := testlog.Logger(t, log.LvlError)
cl := clock.NewDeterministicClock(time.Now())
mockExpiryStore := mocks.NewExpiryStore(t)
gater := AddBanExpiry(mockGater, mockExpiryStore, log, cl, metrics.NoopMetrics)
return cl, mockExpiryStore, mockGater, gater
}
func TestExpiryConnectionGater_InterceptPeerDial(t *testing.T) {
mallory := peer.ID("malllory")
t.Run("expired peer ban", func(t *testing.T) {
cl, mockExpiryStore, mockGater, gater := expiryTestSetup(t)
mockGater.EXPECT().InterceptPeerDial(mallory).Return(false)
mockExpiryStore.EXPECT().PeerBanExpiry(mallory).Return(cl.Now().Add(-time.Second), nil)
mockGater.EXPECT().UnblockPeer(mallory).Return(nil)
allow := gater.InterceptPeerDial(mallory)
require.True(t, allow)
})
t.Run("active peer ban", func(t *testing.T) {
cl, mockExpiryStore, mockGater, gater := expiryTestSetup(t)
mockGater.EXPECT().InterceptPeerDial(mallory).Return(false)
mockExpiryStore.EXPECT().PeerBanExpiry(mallory).Return(cl.Now().Add(time.Second), nil)
allow := gater.InterceptPeerDial(mallory)
require.False(t, allow)
})
t.Run("unknown expiry", func(t *testing.T) {
_, mockExpiryStore, mockGater, gater := expiryTestSetup(t)
mockGater.EXPECT().InterceptPeerDial(mallory).Return(false)
mockExpiryStore.EXPECT().PeerBanExpiry(mallory).Return(time.Time{}, UnknownExpiry)
allow := gater.InterceptPeerDial(mallory)
require.False(t, allow)
})
t.Run("no ban", func(t *testing.T) {
_, _, mockGater, gater := expiryTestSetup(t)
mockGater.EXPECT().InterceptPeerDial(mallory).Return(true)
allow := gater.InterceptPeerDial(mallory)
require.True(t, allow)
})
}
func TestExpiryConnectionGater_InterceptAddrDial(t *testing.T) {
ip := net.IPv4(1, 2, 3, 4)
mallory := peer.ID("7y9Qv7mG2h6fnzcDkeqVsEvW2rU9PdybSZ8y1dCrB9p")
addr, err := multiaddr.NewMultiaddr("/ip4/1.2.3.4/tcp/9000")
require.NoError(t, err)
t.Run("expired IP ban", func(t *testing.T) {
cl, mockExpiryStore, mockGater, gater := expiryTestSetup(t)
mockGater.EXPECT().InterceptAddrDial(mallory, addr).Return(false)
mockGater.EXPECT().InterceptPeerDial(mallory).Return(true)
mockGater.EXPECT().ListBlockedSubnets().Return(nil)
mockExpiryStore.EXPECT().IPBanExpiry(ip.To4()).Return(cl.Now().Add(-time.Second), nil)
mockGater.EXPECT().UnblockAddr(ip.To4()).Return(nil)
allow := gater.InterceptAddrDial(mallory, addr)
require.True(t, allow)
})
t.Run("active IP ban", func(t *testing.T) {
cl, mockExpiryStore, mockGater, gater := expiryTestSetup(t)
mockGater.EXPECT().InterceptAddrDial(mallory, addr).Return(false)
mockGater.EXPECT().InterceptPeerDial(mallory).Return(true)
mockGater.EXPECT().ListBlockedSubnets().Return(nil)
mockExpiryStore.EXPECT().IPBanExpiry(ip.To4()).Return(cl.Now().Add(time.Second), nil)
allow := gater.InterceptAddrDial(mallory, addr)
require.False(t, allow)
})
t.Run("unknown expiry", func(t *testing.T) {
_, mockExpiryStore, mockGater, gater := expiryTestSetup(t)
mockGater.EXPECT().InterceptAddrDial(mallory, addr).Return(false)
mockGater.EXPECT().InterceptPeerDial(mallory).Return(true)
mockGater.EXPECT().ListBlockedSubnets().Return(nil)
mockExpiryStore.EXPECT().IPBanExpiry(ip.To4()).Return(time.Time{}, UnknownExpiry)
allow := gater.InterceptAddrDial(mallory, addr)
require.False(t, allow)
})
t.Run("no ban", func(t *testing.T) {
_, _, mockGater, gater := expiryTestSetup(t)
mockGater.EXPECT().InterceptAddrDial(mallory, addr).Return(true)
allow := gater.InterceptAddrDial(mallory, addr)
require.True(t, allow)
})
t.Run("subnet ban", func(t *testing.T) {
_, _, mockGater, gater := expiryTestSetup(t)
mockGater.EXPECT().InterceptAddrDial(mallory, addr).Return(false)
mockGater.EXPECT().InterceptPeerDial(mallory).Return(true)
mockGater.EXPECT().ListBlockedSubnets().Return([]*net.IPNet{
{IP: net.IPv4(1, 2, 0, 0), Mask: net.IPv4Mask(0xff, 0xff, 0, 0)},
})
allow := gater.InterceptAddrDial(mallory, addr)
require.False(t, allow)
})
t.Run("expired peer ban but active ip ban", func(t *testing.T) {
cl, mockExpiryStore, mockGater, gater := expiryTestSetup(t)
mockGater.EXPECT().InterceptAddrDial(mallory, addr).Return(false)
mockGater.EXPECT().InterceptPeerDial(mallory).Return(false)
mockExpiryStore.EXPECT().PeerBanExpiry(mallory).Return(cl.Now().Add(-time.Second), nil)
mockGater.EXPECT().UnblockPeer(mallory).Return(nil)
mockGater.EXPECT().InterceptAddrDial(mallory, addr).Return(false)
mockGater.EXPECT().ListBlockedSubnets().Return(nil)
mockExpiryStore.EXPECT().IPBanExpiry(ip.To4()).Return(cl.Now().Add(time.Second), nil)
allow := gater.InterceptAddrDial(mallory, addr)
require.False(t, allow)
})
t.Run("active peer ban", func(t *testing.T) {
cl, mockExpiryStore, mockGater, gater := expiryTestSetup(t)
mockGater.EXPECT().InterceptAddrDial(mallory, addr).Return(false)
mockGater.EXPECT().InterceptPeerDial(mallory).Return(false)
mockExpiryStore.EXPECT().PeerBanExpiry(mallory).Return(cl.Now().Add(time.Second), nil)
allow := gater.InterceptAddrDial(mallory, addr)
require.False(t, allow)
})
t.Run("expired peer ban and expired ip ban", func(t *testing.T) {
cl, mockExpiryStore, mockGater, gater := expiryTestSetup(t)
mockGater.EXPECT().InterceptAddrDial(mallory, addr).Return(false)
mockGater.EXPECT().InterceptPeerDial(mallory).Return(false)
mockExpiryStore.EXPECT().PeerBanExpiry(mallory).Return(cl.Now().Add(-time.Second), nil)
mockGater.EXPECT().UnblockPeer(mallory).Return(nil)
mockGater.EXPECT().InterceptAddrDial(mallory, addr).Return(false)
mockGater.EXPECT().ListBlockedSubnets().Return(nil)
mockExpiryStore.EXPECT().IPBanExpiry(ip.To4()).Return(cl.Now().Add(-time.Second), nil)
mockGater.EXPECT().UnblockAddr(ip.To4()).Return(nil)
allow := gater.InterceptAddrDial(mallory, addr)
require.True(t, allow)
})
}
type localRemoteAddrs struct {
local multiaddr.Multiaddr
remote multiaddr.Multiaddr
}
func (l localRemoteAddrs) LocalMultiaddr() multiaddr.Multiaddr {
return l.local
}
func (l localRemoteAddrs) RemoteMultiaddr() multiaddr.Multiaddr {
return l.remote
}
var _ network.ConnMultiaddrs = localRemoteAddrs{}
func TestExpiryConnectionGater_InterceptAccept(t *testing.T) {
ip := net.IPv4(1, 2, 3, 4)
addr, err := multiaddr.NewMultiaddr("/ip4/1.2.3.4/tcp/9000")
require.NoError(t, err)
mas := localRemoteAddrs{remote: addr}
t.Run("expired IP ban", func(t *testing.T) {
cl, mockExpiryStore, mockGater, gater := expiryTestSetup(t)
mockGater.EXPECT().InterceptAccept(mas).Return(false)
mockGater.EXPECT().ListBlockedSubnets().Return(nil)
mockExpiryStore.EXPECT().IPBanExpiry(ip.To4()).Return(cl.Now().Add(-time.Second), nil)
mockGater.EXPECT().UnblockAddr(ip.To4()).Return(nil)
allow := gater.InterceptAccept(mas)
require.True(t, allow)
})
t.Run("active IP ban", func(t *testing.T) {
cl, mockExpiryStore, mockGater, gater := expiryTestSetup(t)
mockGater.EXPECT().InterceptAccept(mas).Return(false)
mockGater.EXPECT().ListBlockedSubnets().Return(nil)
mockExpiryStore.EXPECT().IPBanExpiry(ip.To4()).Return(cl.Now().Add(time.Second), nil)
allow := gater.InterceptAccept(mas)
require.False(t, allow)
})
t.Run("unknown expiry", func(t *testing.T) {
_, mockExpiryStore, mockGater, gater := expiryTestSetup(t)
mockGater.EXPECT().InterceptAccept(mas).Return(false)
mockGater.EXPECT().ListBlockedSubnets().Return(nil)
mockExpiryStore.EXPECT().IPBanExpiry(ip.To4()).Return(time.Time{}, UnknownExpiry)
allow := gater.InterceptAccept(mas)
require.False(t, allow)
})
t.Run("no ban", func(t *testing.T) {
_, _, mockGater, gater := expiryTestSetup(t)
mockGater.EXPECT().InterceptAccept(mas).Return(true)
allow := gater.InterceptAccept(mas)
require.True(t, allow)
})
t.Run("subnet ban", func(t *testing.T) {
_, _, mockGater, gater := expiryTestSetup(t)
mockGater.EXPECT().InterceptAccept(mas).Return(false)
mockGater.EXPECT().ListBlockedSubnets().Return([]*net.IPNet{
{IP: net.IPv4(1, 2, 0, 0), Mask: net.IPv4Mask(0xff, 0xff, 0, 0)},
})
allow := gater.InterceptAccept(mas)
require.False(t, allow)
})
}
func TestExpiryConnectionGater_InterceptSecured(t *testing.T) {
mallory := peer.ID("7y9Qv7mG2h6fnzcDkeqVsEvW2rU9PdybSZ8y1dCrB9p")
addr, err := multiaddr.NewMultiaddr("/ip4/1.2.3.4/tcp/9000")
require.NoError(t, err)
mas := localRemoteAddrs{remote: addr}
t.Run("expired peer ban", func(t *testing.T) {
cl, mockExpiryStore, mockGater, gater := expiryTestSetup(t)
mockGater.EXPECT().InterceptSecured(network.DirInbound, mallory, mas).Return(false)
mockExpiryStore.EXPECT().PeerBanExpiry(mallory).Return(cl.Now().Add(-time.Second), nil)
mockGater.EXPECT().UnblockPeer(mallory).Return(nil)
allow := gater.InterceptSecured(network.DirInbound, mallory, mas)
require.True(t, allow)
})
t.Run("active peer ban", func(t *testing.T) {
cl, mockExpiryStore, mockGater, gater := expiryTestSetup(t)
mockGater.EXPECT().InterceptSecured(network.DirInbound, mallory, mas).Return(false)
mockExpiryStore.EXPECT().PeerBanExpiry(mallory).Return(cl.Now().Add(time.Second), nil)
allow := gater.InterceptSecured(network.DirInbound, mallory, mas)
require.False(t, allow)
})
t.Run("unknown expiry", func(t *testing.T) {
_, mockExpiryStore, mockGater, gater := expiryTestSetup(t)
mockGater.EXPECT().InterceptSecured(network.DirInbound, mallory, mas).Return(false)
mockExpiryStore.EXPECT().PeerBanExpiry(mallory).Return(time.Time{}, UnknownExpiry)
allow := gater.InterceptSecured(network.DirInbound, mallory, mas)
require.False(t, allow)
})
t.Run("no ban", func(t *testing.T) {
_, _, mockGater, gater := expiryTestSetup(t)
mockGater.EXPECT().InterceptSecured(network.DirInbound, mallory, mas).Return(true)
allow := gater.InterceptSecured(network.DirInbound, mallory, mas)
require.True(t, allow)
})
t.Run("accept outbound", func(t *testing.T) {
_, _, _, gater := expiryTestSetup(t)
allow := gater.InterceptSecured(network.DirOutbound, mallory, mas)
require.True(t, allow)
})
}
package gating
import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
)
type ConnectionGaterMetrics interface {
RecordDial(allow bool)
RecordAccept(allow bool)
}
type MeteredConnectionGater struct {
BlockingConnectionGater
m ConnectionGaterMetrics
}
func AddMetering(gater BlockingConnectionGater, m ConnectionGaterMetrics) *MeteredConnectionGater {
return &MeteredConnectionGater{BlockingConnectionGater: gater, m: m}
}
func (g *MeteredConnectionGater) InterceptPeerDial(p peer.ID) (allow bool) {
allow = g.BlockingConnectionGater.InterceptPeerDial(p)
g.m.RecordDial(allow)
return allow
}
func (g *MeteredConnectionGater) InterceptAddrDial(id peer.ID, ma multiaddr.Multiaddr) (allow bool) {
allow = g.BlockingConnectionGater.InterceptAddrDial(id, ma)
g.m.RecordDial(allow)
return allow
}
func (g *MeteredConnectionGater) InterceptAccept(mas network.ConnMultiaddrs) (allow bool) {
allow = g.BlockingConnectionGater.InterceptAccept(mas)
g.m.RecordAccept(allow)
return allow
}
func (g *MeteredConnectionGater) InterceptSecured(dir network.Direction, id peer.ID, mas network.ConnMultiaddrs) (allow bool) {
allow = g.BlockingConnectionGater.InterceptSecured(dir, id, mas)
g.m.RecordAccept(allow)
return allow
}
// Code generated by mockery v2.28.0. DO NOT EDIT.
package mocks
import (
control "github.com/libp2p/go-libp2p/core/control"
mock "github.com/stretchr/testify/mock"
multiaddr "github.com/multiformats/go-multiaddr"
net "net"
network "github.com/libp2p/go-libp2p/core/network"
peer "github.com/libp2p/go-libp2p/core/peer"
)
// BlockingConnectionGater is an autogenerated mock type for the BlockingConnectionGater type
type BlockingConnectionGater struct {
mock.Mock
}
type BlockingConnectionGater_Expecter struct {
mock *mock.Mock
}
func (_m *BlockingConnectionGater) EXPECT() *BlockingConnectionGater_Expecter {
return &BlockingConnectionGater_Expecter{mock: &_m.Mock}
}
// BlockAddr provides a mock function with given fields: ip
func (_m *BlockingConnectionGater) BlockAddr(ip net.IP) error {
ret := _m.Called(ip)
var r0 error
if rf, ok := ret.Get(0).(func(net.IP) error); ok {
r0 = rf(ip)
} else {
r0 = ret.Error(0)
}
return r0
}
// BlockingConnectionGater_BlockAddr_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'BlockAddr'
type BlockingConnectionGater_BlockAddr_Call struct {
*mock.Call
}
// BlockAddr is a helper method to define mock.On call
// - ip net.IP
func (_e *BlockingConnectionGater_Expecter) BlockAddr(ip interface{}) *BlockingConnectionGater_BlockAddr_Call {
return &BlockingConnectionGater_BlockAddr_Call{Call: _e.mock.On("BlockAddr", ip)}
}
func (_c *BlockingConnectionGater_BlockAddr_Call) Run(run func(ip net.IP)) *BlockingConnectionGater_BlockAddr_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(net.IP))
})
return _c
}
func (_c *BlockingConnectionGater_BlockAddr_Call) Return(_a0 error) *BlockingConnectionGater_BlockAddr_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *BlockingConnectionGater_BlockAddr_Call) RunAndReturn(run func(net.IP) error) *BlockingConnectionGater_BlockAddr_Call {
_c.Call.Return(run)
return _c
}
// BlockPeer provides a mock function with given fields: p
func (_m *BlockingConnectionGater) BlockPeer(p peer.ID) error {
ret := _m.Called(p)
var r0 error
if rf, ok := ret.Get(0).(func(peer.ID) error); ok {
r0 = rf(p)
} else {
r0 = ret.Error(0)
}
return r0
}
// BlockingConnectionGater_BlockPeer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'BlockPeer'
type BlockingConnectionGater_BlockPeer_Call struct {
*mock.Call
}
// BlockPeer is a helper method to define mock.On call
// - p peer.ID
func (_e *BlockingConnectionGater_Expecter) BlockPeer(p interface{}) *BlockingConnectionGater_BlockPeer_Call {
return &BlockingConnectionGater_BlockPeer_Call{Call: _e.mock.On("BlockPeer", p)}
}
func (_c *BlockingConnectionGater_BlockPeer_Call) Run(run func(p peer.ID)) *BlockingConnectionGater_BlockPeer_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(peer.ID))
})
return _c
}
func (_c *BlockingConnectionGater_BlockPeer_Call) Return(_a0 error) *BlockingConnectionGater_BlockPeer_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *BlockingConnectionGater_BlockPeer_Call) RunAndReturn(run func(peer.ID) error) *BlockingConnectionGater_BlockPeer_Call {
_c.Call.Return(run)
return _c
}
// BlockSubnet provides a mock function with given fields: ipnet
func (_m *BlockingConnectionGater) BlockSubnet(ipnet *net.IPNet) error {
ret := _m.Called(ipnet)
var r0 error
if rf, ok := ret.Get(0).(func(*net.IPNet) error); ok {
r0 = rf(ipnet)
} else {
r0 = ret.Error(0)
}
return r0
}
// BlockingConnectionGater_BlockSubnet_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'BlockSubnet'
type BlockingConnectionGater_BlockSubnet_Call struct {
*mock.Call
}
// BlockSubnet is a helper method to define mock.On call
// - ipnet *net.IPNet
func (_e *BlockingConnectionGater_Expecter) BlockSubnet(ipnet interface{}) *BlockingConnectionGater_BlockSubnet_Call {
return &BlockingConnectionGater_BlockSubnet_Call{Call: _e.mock.On("BlockSubnet", ipnet)}
}
func (_c *BlockingConnectionGater_BlockSubnet_Call) Run(run func(ipnet *net.IPNet)) *BlockingConnectionGater_BlockSubnet_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*net.IPNet))
})
return _c
}
func (_c *BlockingConnectionGater_BlockSubnet_Call) Return(_a0 error) *BlockingConnectionGater_BlockSubnet_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *BlockingConnectionGater_BlockSubnet_Call) RunAndReturn(run func(*net.IPNet) error) *BlockingConnectionGater_BlockSubnet_Call {
_c.Call.Return(run)
return _c
}
// InterceptAccept provides a mock function with given fields: _a0
func (_m *BlockingConnectionGater) InterceptAccept(_a0 network.ConnMultiaddrs) bool {
ret := _m.Called(_a0)
var r0 bool
if rf, ok := ret.Get(0).(func(network.ConnMultiaddrs) bool); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// BlockingConnectionGater_InterceptAccept_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InterceptAccept'
type BlockingConnectionGater_InterceptAccept_Call struct {
*mock.Call
}
// InterceptAccept is a helper method to define mock.On call
// - _a0 network.ConnMultiaddrs
func (_e *BlockingConnectionGater_Expecter) InterceptAccept(_a0 interface{}) *BlockingConnectionGater_InterceptAccept_Call {
return &BlockingConnectionGater_InterceptAccept_Call{Call: _e.mock.On("InterceptAccept", _a0)}
}
func (_c *BlockingConnectionGater_InterceptAccept_Call) Run(run func(_a0 network.ConnMultiaddrs)) *BlockingConnectionGater_InterceptAccept_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(network.ConnMultiaddrs))
})
return _c
}
func (_c *BlockingConnectionGater_InterceptAccept_Call) Return(allow bool) *BlockingConnectionGater_InterceptAccept_Call {
_c.Call.Return(allow)
return _c
}
func (_c *BlockingConnectionGater_InterceptAccept_Call) RunAndReturn(run func(network.ConnMultiaddrs) bool) *BlockingConnectionGater_InterceptAccept_Call {
_c.Call.Return(run)
return _c
}
// InterceptAddrDial provides a mock function with given fields: _a0, _a1
func (_m *BlockingConnectionGater) InterceptAddrDial(_a0 peer.ID, _a1 multiaddr.Multiaddr) bool {
ret := _m.Called(_a0, _a1)
var r0 bool
if rf, ok := ret.Get(0).(func(peer.ID, multiaddr.Multiaddr) bool); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// BlockingConnectionGater_InterceptAddrDial_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InterceptAddrDial'
type BlockingConnectionGater_InterceptAddrDial_Call struct {
*mock.Call
}
// InterceptAddrDial is a helper method to define mock.On call
// - _a0 peer.ID
// - _a1 multiaddr.Multiaddr
func (_e *BlockingConnectionGater_Expecter) InterceptAddrDial(_a0 interface{}, _a1 interface{}) *BlockingConnectionGater_InterceptAddrDial_Call {
return &BlockingConnectionGater_InterceptAddrDial_Call{Call: _e.mock.On("InterceptAddrDial", _a0, _a1)}
}
func (_c *BlockingConnectionGater_InterceptAddrDial_Call) Run(run func(_a0 peer.ID, _a1 multiaddr.Multiaddr)) *BlockingConnectionGater_InterceptAddrDial_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(peer.ID), args[1].(multiaddr.Multiaddr))
})
return _c
}
func (_c *BlockingConnectionGater_InterceptAddrDial_Call) Return(allow bool) *BlockingConnectionGater_InterceptAddrDial_Call {
_c.Call.Return(allow)
return _c
}
func (_c *BlockingConnectionGater_InterceptAddrDial_Call) RunAndReturn(run func(peer.ID, multiaddr.Multiaddr) bool) *BlockingConnectionGater_InterceptAddrDial_Call {
_c.Call.Return(run)
return _c
}
// InterceptPeerDial provides a mock function with given fields: p
func (_m *BlockingConnectionGater) InterceptPeerDial(p peer.ID) bool {
ret := _m.Called(p)
var r0 bool
if rf, ok := ret.Get(0).(func(peer.ID) bool); ok {
r0 = rf(p)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// BlockingConnectionGater_InterceptPeerDial_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InterceptPeerDial'
type BlockingConnectionGater_InterceptPeerDial_Call struct {
*mock.Call
}
// InterceptPeerDial is a helper method to define mock.On call
// - p peer.ID
func (_e *BlockingConnectionGater_Expecter) InterceptPeerDial(p interface{}) *BlockingConnectionGater_InterceptPeerDial_Call {
return &BlockingConnectionGater_InterceptPeerDial_Call{Call: _e.mock.On("InterceptPeerDial", p)}
}
func (_c *BlockingConnectionGater_InterceptPeerDial_Call) Run(run func(p peer.ID)) *BlockingConnectionGater_InterceptPeerDial_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(peer.ID))
})
return _c
}
func (_c *BlockingConnectionGater_InterceptPeerDial_Call) Return(allow bool) *BlockingConnectionGater_InterceptPeerDial_Call {
_c.Call.Return(allow)
return _c
}
func (_c *BlockingConnectionGater_InterceptPeerDial_Call) RunAndReturn(run func(peer.ID) bool) *BlockingConnectionGater_InterceptPeerDial_Call {
_c.Call.Return(run)
return _c
}
// InterceptSecured provides a mock function with given fields: _a0, _a1, _a2
func (_m *BlockingConnectionGater) InterceptSecured(_a0 network.Direction, _a1 peer.ID, _a2 network.ConnMultiaddrs) bool {
ret := _m.Called(_a0, _a1, _a2)
var r0 bool
if rf, ok := ret.Get(0).(func(network.Direction, peer.ID, network.ConnMultiaddrs) bool); ok {
r0 = rf(_a0, _a1, _a2)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// BlockingConnectionGater_InterceptSecured_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InterceptSecured'
type BlockingConnectionGater_InterceptSecured_Call struct {
*mock.Call
}
// InterceptSecured is a helper method to define mock.On call
// - _a0 network.Direction
// - _a1 peer.ID
// - _a2 network.ConnMultiaddrs
func (_e *BlockingConnectionGater_Expecter) InterceptSecured(_a0 interface{}, _a1 interface{}, _a2 interface{}) *BlockingConnectionGater_InterceptSecured_Call {
return &BlockingConnectionGater_InterceptSecured_Call{Call: _e.mock.On("InterceptSecured", _a0, _a1, _a2)}
}
func (_c *BlockingConnectionGater_InterceptSecured_Call) Run(run func(_a0 network.Direction, _a1 peer.ID, _a2 network.ConnMultiaddrs)) *BlockingConnectionGater_InterceptSecured_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(network.Direction), args[1].(peer.ID), args[2].(network.ConnMultiaddrs))
})
return _c
}
func (_c *BlockingConnectionGater_InterceptSecured_Call) Return(allow bool) *BlockingConnectionGater_InterceptSecured_Call {
_c.Call.Return(allow)
return _c
}
func (_c *BlockingConnectionGater_InterceptSecured_Call) RunAndReturn(run func(network.Direction, peer.ID, network.ConnMultiaddrs) bool) *BlockingConnectionGater_InterceptSecured_Call {
_c.Call.Return(run)
return _c
}
// InterceptUpgraded provides a mock function with given fields: _a0
func (_m *BlockingConnectionGater) InterceptUpgraded(_a0 network.Conn) (bool, control.DisconnectReason) {
ret := _m.Called(_a0)
var r0 bool
var r1 control.DisconnectReason
if rf, ok := ret.Get(0).(func(network.Conn) (bool, control.DisconnectReason)); ok {
return rf(_a0)
}
if rf, ok := ret.Get(0).(func(network.Conn) bool); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(bool)
}
if rf, ok := ret.Get(1).(func(network.Conn) control.DisconnectReason); ok {
r1 = rf(_a0)
} else {
r1 = ret.Get(1).(control.DisconnectReason)
}
return r0, r1
}
// BlockingConnectionGater_InterceptUpgraded_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InterceptUpgraded'
type BlockingConnectionGater_InterceptUpgraded_Call struct {
*mock.Call
}
// InterceptUpgraded is a helper method to define mock.On call
// - _a0 network.Conn
func (_e *BlockingConnectionGater_Expecter) InterceptUpgraded(_a0 interface{}) *BlockingConnectionGater_InterceptUpgraded_Call {
return &BlockingConnectionGater_InterceptUpgraded_Call{Call: _e.mock.On("InterceptUpgraded", _a0)}
}
func (_c *BlockingConnectionGater_InterceptUpgraded_Call) Run(run func(_a0 network.Conn)) *BlockingConnectionGater_InterceptUpgraded_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(network.Conn))
})
return _c
}
func (_c *BlockingConnectionGater_InterceptUpgraded_Call) Return(allow bool, reason control.DisconnectReason) *BlockingConnectionGater_InterceptUpgraded_Call {
_c.Call.Return(allow, reason)
return _c
}
func (_c *BlockingConnectionGater_InterceptUpgraded_Call) RunAndReturn(run func(network.Conn) (bool, control.DisconnectReason)) *BlockingConnectionGater_InterceptUpgraded_Call {
_c.Call.Return(run)
return _c
}
// ListBlockedAddrs provides a mock function with given fields:
func (_m *BlockingConnectionGater) ListBlockedAddrs() []net.IP {
ret := _m.Called()
var r0 []net.IP
if rf, ok := ret.Get(0).(func() []net.IP); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]net.IP)
}
}
return r0
}
// BlockingConnectionGater_ListBlockedAddrs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListBlockedAddrs'
type BlockingConnectionGater_ListBlockedAddrs_Call struct {
*mock.Call
}
// ListBlockedAddrs is a helper method to define mock.On call
func (_e *BlockingConnectionGater_Expecter) ListBlockedAddrs() *BlockingConnectionGater_ListBlockedAddrs_Call {
return &BlockingConnectionGater_ListBlockedAddrs_Call{Call: _e.mock.On("ListBlockedAddrs")}
}
func (_c *BlockingConnectionGater_ListBlockedAddrs_Call) Run(run func()) *BlockingConnectionGater_ListBlockedAddrs_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *BlockingConnectionGater_ListBlockedAddrs_Call) Return(_a0 []net.IP) *BlockingConnectionGater_ListBlockedAddrs_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *BlockingConnectionGater_ListBlockedAddrs_Call) RunAndReturn(run func() []net.IP) *BlockingConnectionGater_ListBlockedAddrs_Call {
_c.Call.Return(run)
return _c
}
// ListBlockedPeers provides a mock function with given fields:
func (_m *BlockingConnectionGater) ListBlockedPeers() []peer.ID {
ret := _m.Called()
var r0 []peer.ID
if rf, ok := ret.Get(0).(func() []peer.ID); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]peer.ID)
}
}
return r0
}
// BlockingConnectionGater_ListBlockedPeers_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListBlockedPeers'
type BlockingConnectionGater_ListBlockedPeers_Call struct {
*mock.Call
}
// ListBlockedPeers is a helper method to define mock.On call
func (_e *BlockingConnectionGater_Expecter) ListBlockedPeers() *BlockingConnectionGater_ListBlockedPeers_Call {
return &BlockingConnectionGater_ListBlockedPeers_Call{Call: _e.mock.On("ListBlockedPeers")}
}
func (_c *BlockingConnectionGater_ListBlockedPeers_Call) Run(run func()) *BlockingConnectionGater_ListBlockedPeers_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *BlockingConnectionGater_ListBlockedPeers_Call) Return(_a0 []peer.ID) *BlockingConnectionGater_ListBlockedPeers_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *BlockingConnectionGater_ListBlockedPeers_Call) RunAndReturn(run func() []peer.ID) *BlockingConnectionGater_ListBlockedPeers_Call {
_c.Call.Return(run)
return _c
}
// ListBlockedSubnets provides a mock function with given fields:
func (_m *BlockingConnectionGater) ListBlockedSubnets() []*net.IPNet {
ret := _m.Called()
var r0 []*net.IPNet
if rf, ok := ret.Get(0).(func() []*net.IPNet); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*net.IPNet)
}
}
return r0
}
// BlockingConnectionGater_ListBlockedSubnets_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListBlockedSubnets'
type BlockingConnectionGater_ListBlockedSubnets_Call struct {
*mock.Call
}
// ListBlockedSubnets is a helper method to define mock.On call
func (_e *BlockingConnectionGater_Expecter) ListBlockedSubnets() *BlockingConnectionGater_ListBlockedSubnets_Call {
return &BlockingConnectionGater_ListBlockedSubnets_Call{Call: _e.mock.On("ListBlockedSubnets")}
}
func (_c *BlockingConnectionGater_ListBlockedSubnets_Call) Run(run func()) *BlockingConnectionGater_ListBlockedSubnets_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *BlockingConnectionGater_ListBlockedSubnets_Call) Return(_a0 []*net.IPNet) *BlockingConnectionGater_ListBlockedSubnets_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *BlockingConnectionGater_ListBlockedSubnets_Call) RunAndReturn(run func() []*net.IPNet) *BlockingConnectionGater_ListBlockedSubnets_Call {
_c.Call.Return(run)
return _c
}
// UnblockAddr provides a mock function with given fields: ip
func (_m *BlockingConnectionGater) UnblockAddr(ip net.IP) error {
ret := _m.Called(ip)
var r0 error
if rf, ok := ret.Get(0).(func(net.IP) error); ok {
r0 = rf(ip)
} else {
r0 = ret.Error(0)
}
return r0
}
// BlockingConnectionGater_UnblockAddr_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UnblockAddr'
type BlockingConnectionGater_UnblockAddr_Call struct {
*mock.Call
}
// UnblockAddr is a helper method to define mock.On call
// - ip net.IP
func (_e *BlockingConnectionGater_Expecter) UnblockAddr(ip interface{}) *BlockingConnectionGater_UnblockAddr_Call {
return &BlockingConnectionGater_UnblockAddr_Call{Call: _e.mock.On("UnblockAddr", ip)}
}
func (_c *BlockingConnectionGater_UnblockAddr_Call) Run(run func(ip net.IP)) *BlockingConnectionGater_UnblockAddr_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(net.IP))
})
return _c
}
func (_c *BlockingConnectionGater_UnblockAddr_Call) Return(_a0 error) *BlockingConnectionGater_UnblockAddr_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *BlockingConnectionGater_UnblockAddr_Call) RunAndReturn(run func(net.IP) error) *BlockingConnectionGater_UnblockAddr_Call {
_c.Call.Return(run)
return _c
}
// UnblockPeer provides a mock function with given fields: p
func (_m *BlockingConnectionGater) UnblockPeer(p peer.ID) error {
ret := _m.Called(p)
var r0 error
if rf, ok := ret.Get(0).(func(peer.ID) error); ok {
r0 = rf(p)
} else {
r0 = ret.Error(0)
}
return r0
}
// BlockingConnectionGater_UnblockPeer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UnblockPeer'
type BlockingConnectionGater_UnblockPeer_Call struct {
*mock.Call
}
// UnblockPeer is a helper method to define mock.On call
// - p peer.ID
func (_e *BlockingConnectionGater_Expecter) UnblockPeer(p interface{}) *BlockingConnectionGater_UnblockPeer_Call {
return &BlockingConnectionGater_UnblockPeer_Call{Call: _e.mock.On("UnblockPeer", p)}
}
func (_c *BlockingConnectionGater_UnblockPeer_Call) Run(run func(p peer.ID)) *BlockingConnectionGater_UnblockPeer_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(peer.ID))
})
return _c
}
func (_c *BlockingConnectionGater_UnblockPeer_Call) Return(_a0 error) *BlockingConnectionGater_UnblockPeer_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *BlockingConnectionGater_UnblockPeer_Call) RunAndReturn(run func(peer.ID) error) *BlockingConnectionGater_UnblockPeer_Call {
_c.Call.Return(run)
return _c
}
// UnblockSubnet provides a mock function with given fields: ipnet
func (_m *BlockingConnectionGater) UnblockSubnet(ipnet *net.IPNet) error {
ret := _m.Called(ipnet)
var r0 error
if rf, ok := ret.Get(0).(func(*net.IPNet) error); ok {
r0 = rf(ipnet)
} else {
r0 = ret.Error(0)
}
return r0
}
// BlockingConnectionGater_UnblockSubnet_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UnblockSubnet'
type BlockingConnectionGater_UnblockSubnet_Call struct {
*mock.Call
}
// UnblockSubnet is a helper method to define mock.On call
// - ipnet *net.IPNet
func (_e *BlockingConnectionGater_Expecter) UnblockSubnet(ipnet interface{}) *BlockingConnectionGater_UnblockSubnet_Call {
return &BlockingConnectionGater_UnblockSubnet_Call{Call: _e.mock.On("UnblockSubnet", ipnet)}
}
func (_c *BlockingConnectionGater_UnblockSubnet_Call) Run(run func(ipnet *net.IPNet)) *BlockingConnectionGater_UnblockSubnet_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*net.IPNet))
})
return _c
}
func (_c *BlockingConnectionGater_UnblockSubnet_Call) Return(_a0 error) *BlockingConnectionGater_UnblockSubnet_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *BlockingConnectionGater_UnblockSubnet_Call) RunAndReturn(run func(*net.IPNet) error) *BlockingConnectionGater_UnblockSubnet_Call {
_c.Call.Return(run)
return _c
}
type mockConstructorTestingTNewBlockingConnectionGater interface {
mock.TestingT
Cleanup(func())
}
// NewBlockingConnectionGater creates a new instance of BlockingConnectionGater. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewBlockingConnectionGater(t mockConstructorTestingTNewBlockingConnectionGater) *BlockingConnectionGater {
mock := &BlockingConnectionGater{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
// Code generated by mockery v2.28.0. DO NOT EDIT.
package mocks
import (
net "net"
mock "github.com/stretchr/testify/mock"
peer "github.com/libp2p/go-libp2p/core/peer"
time "time"
)
// ExpiryStore is an autogenerated mock type for the ExpiryStore type
type ExpiryStore struct {
mock.Mock
}
type ExpiryStore_Expecter struct {
mock *mock.Mock
}
func (_m *ExpiryStore) EXPECT() *ExpiryStore_Expecter {
return &ExpiryStore_Expecter{mock: &_m.Mock}
}
// IPBanExpiry provides a mock function with given fields: ip
func (_m *ExpiryStore) IPBanExpiry(ip net.IP) (time.Time, error) {
ret := _m.Called(ip)
var r0 time.Time
var r1 error
if rf, ok := ret.Get(0).(func(net.IP) (time.Time, error)); ok {
return rf(ip)
}
if rf, ok := ret.Get(0).(func(net.IP) time.Time); ok {
r0 = rf(ip)
} else {
r0 = ret.Get(0).(time.Time)
}
if rf, ok := ret.Get(1).(func(net.IP) error); ok {
r1 = rf(ip)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// ExpiryStore_IPBanExpiry_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IPBanExpiry'
type ExpiryStore_IPBanExpiry_Call struct {
*mock.Call
}
// IPBanExpiry is a helper method to define mock.On call
// - ip net.IP
func (_e *ExpiryStore_Expecter) IPBanExpiry(ip interface{}) *ExpiryStore_IPBanExpiry_Call {
return &ExpiryStore_IPBanExpiry_Call{Call: _e.mock.On("IPBanExpiry", ip)}
}
func (_c *ExpiryStore_IPBanExpiry_Call) Run(run func(ip net.IP)) *ExpiryStore_IPBanExpiry_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(net.IP))
})
return _c
}
func (_c *ExpiryStore_IPBanExpiry_Call) Return(_a0 time.Time, _a1 error) *ExpiryStore_IPBanExpiry_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *ExpiryStore_IPBanExpiry_Call) RunAndReturn(run func(net.IP) (time.Time, error)) *ExpiryStore_IPBanExpiry_Call {
_c.Call.Return(run)
return _c
}
// PeerBanExpiry provides a mock function with given fields: id
func (_m *ExpiryStore) PeerBanExpiry(id peer.ID) (time.Time, error) {
ret := _m.Called(id)
var r0 time.Time
var r1 error
if rf, ok := ret.Get(0).(func(peer.ID) (time.Time, error)); ok {
return rf(id)
}
if rf, ok := ret.Get(0).(func(peer.ID) time.Time); ok {
r0 = rf(id)
} else {
r0 = ret.Get(0).(time.Time)
}
if rf, ok := ret.Get(1).(func(peer.ID) error); ok {
r1 = rf(id)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// ExpiryStore_PeerBanExpiry_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PeerBanExpiry'
type ExpiryStore_PeerBanExpiry_Call struct {
*mock.Call
}
// PeerBanExpiry is a helper method to define mock.On call
// - id peer.ID
func (_e *ExpiryStore_Expecter) PeerBanExpiry(id interface{}) *ExpiryStore_PeerBanExpiry_Call {
return &ExpiryStore_PeerBanExpiry_Call{Call: _e.mock.On("PeerBanExpiry", id)}
}
func (_c *ExpiryStore_PeerBanExpiry_Call) Run(run func(id peer.ID)) *ExpiryStore_PeerBanExpiry_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(peer.ID))
})
return _c
}
func (_c *ExpiryStore_PeerBanExpiry_Call) Return(_a0 time.Time, _a1 error) *ExpiryStore_PeerBanExpiry_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *ExpiryStore_PeerBanExpiry_Call) RunAndReturn(run func(peer.ID) (time.Time, error)) *ExpiryStore_PeerBanExpiry_Call {
_c.Call.Return(run)
return _c
}
type mockConstructorTestingTNewExpiryStore interface {
mock.TestingT
Cleanup(func())
}
// NewExpiryStore creates a new instance of ExpiryStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewExpiryStore(t mockConstructorTestingTNewExpiryStore) *ExpiryStore {
mock := &ExpiryStore{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
// Code generated by mockery v2.28.0. DO NOT EDIT.
package mocks
import (
peer "github.com/libp2p/go-libp2p/core/peer"
mock "github.com/stretchr/testify/mock"
)
// Scores is an autogenerated mock type for the Scores type
type Scores struct {
mock.Mock
}
type Scores_Expecter struct {
mock *mock.Mock
}
func (_m *Scores) EXPECT() *Scores_Expecter {
return &Scores_Expecter{mock: &_m.Mock}
}
// GetPeerScore provides a mock function with given fields: id
func (_m *Scores) GetPeerScore(id peer.ID) (float64, error) {
ret := _m.Called(id)
var r0 float64
var r1 error
if rf, ok := ret.Get(0).(func(peer.ID) (float64, error)); ok {
return rf(id)
}
if rf, ok := ret.Get(0).(func(peer.ID) float64); ok {
r0 = rf(id)
} else {
r0 = ret.Get(0).(float64)
}
if rf, ok := ret.Get(1).(func(peer.ID) error); ok {
r1 = rf(id)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Scores_GetPeerScore_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetPeerScore'
type Scores_GetPeerScore_Call struct {
*mock.Call
}
// GetPeerScore is a helper method to define mock.On call
// - id peer.ID
func (_e *Scores_Expecter) GetPeerScore(id interface{}) *Scores_GetPeerScore_Call {
return &Scores_GetPeerScore_Call{Call: _e.mock.On("GetPeerScore", id)}
}
func (_c *Scores_GetPeerScore_Call) Run(run func(id peer.ID)) *Scores_GetPeerScore_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(peer.ID))
})
return _c
}
func (_c *Scores_GetPeerScore_Call) Return(_a0 float64, _a1 error) *Scores_GetPeerScore_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *Scores_GetPeerScore_Call) RunAndReturn(run func(peer.ID) (float64, error)) *Scores_GetPeerScore_Call {
_c.Call.Return(run)
return _c
}
type mockConstructorTestingTNewScores interface {
mock.TestingT
Cleanup(func())
}
// NewScores creates a new instance of Scores. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewScores(t mockConstructorTestingTNewScores) *Scores {
mock := &Scores{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
package gating
import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
)
//go:generate mockery --name Scores --output mocks/ --with-expecter=true
type Scores interface {
GetPeerScore(id peer.ID) (float64, error)
}
// ScoringConnectionGater enhances a ConnectionGater by enforcing a minimum score for peer connections
type ScoringConnectionGater struct {
BlockingConnectionGater
scores Scores
minScore float64
}
func AddScoring(gater BlockingConnectionGater, scores Scores, minScore float64) *ScoringConnectionGater {
return &ScoringConnectionGater{BlockingConnectionGater: gater, scores: scores, minScore: minScore}
}
func (g *ScoringConnectionGater) checkScore(p peer.ID) (allow bool) {
score, err := g.scores.GetPeerScore(p)
if err != nil {
return false
}
return score >= g.minScore
}
func (g *ScoringConnectionGater) InterceptPeerDial(p peer.ID) (allow bool) {
return g.BlockingConnectionGater.InterceptPeerDial(p) && g.checkScore(p)
}
func (g *ScoringConnectionGater) InterceptAddrDial(id peer.ID, ma multiaddr.Multiaddr) (allow bool) {
return g.BlockingConnectionGater.InterceptAddrDial(id, ma) && g.checkScore(id)
}
func (g *ScoringConnectionGater) InterceptSecured(dir network.Direction, id peer.ID, mas network.ConnMultiaddrs) (allow bool) {
return g.BlockingConnectionGater.InterceptSecured(dir, id, mas) && g.checkScore(id)
}
......@@ -66,8 +66,6 @@ type GossipRuntimeConfig interface {
//go:generate mockery --name GossipMetricer
type GossipMetricer interface {
RecordGossipEvent(evType int32)
// Peer Scoring Metric Funcs
SetPeerScores(map[string]float64)
}
func blocksTopicV1(cfg *rollup.Config) string {
......@@ -157,7 +155,7 @@ func BuildGlobalGossipParams(cfg *rollup.Config) pubsub.GossipSubParams {
// NewGossipSub configures a new pubsub instance with the specified parameters.
// PubSub uses a GossipSubRouter as it's router under the hood.
func NewGossipSub(p2pCtx context.Context, h host.Host, g ConnectionGater, cfg *rollup.Config, gossipConf GossipSetupConfigurables, m GossipMetricer, log log.Logger) (*pubsub.PubSub, error) {
func NewGossipSub(p2pCtx context.Context, h host.Host, cfg *rollup.Config, gossipConf GossipSetupConfigurables, scorer Scorer, m GossipMetricer, log log.Logger) (*pubsub.PubSub, error) {
denyList, err := pubsub.NewTimeCachedBlacklist(30 * time.Second)
if err != nil {
return nil, err
......@@ -176,7 +174,7 @@ func NewGossipSub(p2pCtx context.Context, h host.Host, g ConnectionGater, cfg *r
pubsub.WithBlacklist(denyList),
pubsub.WithEventTracer(&gossipTracer{m: m}),
}
gossipOpts = append(gossipOpts, ConfigurePeerScoring(h, g, gossipConf, m, log)...)
gossipOpts = append(gossipOpts, ConfigurePeerScoring(gossipConf, scorer, log)...)
gossipOpts = append(gossipOpts, gossipConf.ConfigureGossip(cfg)...)
return pubsub.NewGossipSub(p2pCtx, h, gossipOpts...)
}
......
......@@ -26,17 +26,21 @@ import (
madns "github.com/multiformats/go-multiaddr-dns"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/p2p/gating"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-service/clock"
)
type ExtraHostFeatures interface {
host.Host
ConnectionGater() ConnectionGater
ConnectionGater() gating.BlockingConnectionGater
ConnectionManager() connmgr.ConnManager
}
type extraHost struct {
host.Host
gater ConnectionGater
gater gating.BlockingConnectionGater
connMgr connmgr.ConnManager
log log.Logger
......@@ -45,7 +49,7 @@ type extraHost struct {
quitC chan struct{}
}
func (e *extraHost) ConnectionGater() ConnectionGater {
func (e *extraHost) ConnectionGater() gating.BlockingConnectionGater {
return e.gater
}
......@@ -122,7 +126,7 @@ func (e *extraHost) monitorStaticPeers() {
var _ ExtraHostFeatures = (*extraHost)(nil)
func (conf *Config) Host(log log.Logger, reporter metrics.Reporter) (host.Host, error) {
func (conf *Config) Host(log log.Logger, reporter metrics.Reporter, metrics HostMetrics) (host.Host, error) {
if conf.DisableP2P {
return nil, nil
}
......@@ -132,11 +136,16 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter) (host.Host,
return nil, fmt.Errorf("failed to derive pubkey from network priv key: %w", err)
}
ps, err := pstoreds.NewPeerstore(context.Background(), conf.Store, pstoreds.DefaultOpts())
basePs, err := pstoreds.NewPeerstore(context.Background(), conf.Store, pstoreds.DefaultOpts())
if err != nil {
return nil, fmt.Errorf("failed to open peerstore: %w", err)
}
ps, err := store.NewExtendedPeerstore(context.Background(), log, clock.SystemClock, basePs, conf.Store)
if err != nil {
return nil, fmt.Errorf("failed to open extended peerstore: %w", err)
}
if err := ps.AddPrivKey(pid, conf.Priv); err != nil {
return nil, fmt.Errorf("failed to set up peerstore with priv key: %w", err)
}
......@@ -144,10 +153,15 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter) (host.Host,
return nil, fmt.Errorf("failed to set up peerstore with pub key: %w", err)
}
connGtr, err := DefaultConnGater(conf)
var connGtr gating.BlockingConnectionGater
connGtr, err = gating.NewBlockingConnectionGater(conf.Store)
if err != nil {
return nil, fmt.Errorf("failed to open connection gater: %w", err)
}
// TODO(CLI-4015): apply connGtr enhancements
// connGtr = gating.AddBanExpiry(connGtr, ps, log, cl, reporter)
//connGtr = gating.AddScoring(connGtr, ps, 0)
connGtr = gating.AddMetering(connGtr, metrics)
connMngr, err := DefaultConnManager(conf)
if err != nil {
......@@ -226,10 +240,7 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter) (host.Host,
go out.monitorStaticPeers()
}
// Only add the connection gater if it offers the full interface we're looking for.
if g, ok := connGtr.(ConnectionGater); ok {
out.gater = g
}
out.gater = connGtr
return out, nil
}
......
......@@ -59,10 +59,10 @@ func TestingConfig(t *testing.T) *Config {
func TestP2PSimple(t *testing.T) {
confA := TestingConfig(t)
confB := TestingConfig(t)
hostA, err := confA.Host(testlog.Logger(t, log.LvlError).New("host", "A"), nil)
hostA, err := confA.Host(testlog.Logger(t, log.LvlError).New("host", "A"), nil, metrics.NoopMetrics)
require.NoError(t, err, "failed to launch host A")
defer hostA.Close()
hostB, err := confB.Host(testlog.Logger(t, log.LvlError).New("host", "B"), nil)
hostB, err := confB.Host(testlog.Logger(t, log.LvlError).New("host", "B"), nil, metrics.NoopMetrics)
require.NoError(t, err, "failed to launch host B")
defer hostB.Close()
err = hostA.Connect(context.Background(), peer.AddrInfo{ID: hostB.ID(), Addrs: hostB.Addrs()})
......
// Code generated by mockery v2.22.1. DO NOT EDIT.
package mocks
import (
control "github.com/libp2p/go-libp2p/core/control"
mock "github.com/stretchr/testify/mock"
multiaddr "github.com/multiformats/go-multiaddr"
net "net"
network "github.com/libp2p/go-libp2p/core/network"
peer "github.com/libp2p/go-libp2p/core/peer"
)
// ConnectionGater is an autogenerated mock type for the ConnectionGater type
type ConnectionGater struct {
mock.Mock
}
// BlockAddr provides a mock function with given fields: ip
func (_m *ConnectionGater) BlockAddr(ip net.IP) error {
ret := _m.Called(ip)
var r0 error
if rf, ok := ret.Get(0).(func(net.IP) error); ok {
r0 = rf(ip)
} else {
r0 = ret.Error(0)
}
return r0
}
// BlockPeer provides a mock function with given fields: p
func (_m *ConnectionGater) BlockPeer(p peer.ID) error {
ret := _m.Called(p)
var r0 error
if rf, ok := ret.Get(0).(func(peer.ID) error); ok {
r0 = rf(p)
} else {
r0 = ret.Error(0)
}
return r0
}
// BlockSubnet provides a mock function with given fields: ipnet
func (_m *ConnectionGater) BlockSubnet(ipnet *net.IPNet) error {
ret := _m.Called(ipnet)
var r0 error
if rf, ok := ret.Get(0).(func(*net.IPNet) error); ok {
r0 = rf(ipnet)
} else {
r0 = ret.Error(0)
}
return r0
}
// InterceptAccept provides a mock function with given fields: _a0
func (_m *ConnectionGater) InterceptAccept(_a0 network.ConnMultiaddrs) bool {
ret := _m.Called(_a0)
var r0 bool
if rf, ok := ret.Get(0).(func(network.ConnMultiaddrs) bool); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// InterceptAddrDial provides a mock function with given fields: _a0, _a1
func (_m *ConnectionGater) InterceptAddrDial(_a0 peer.ID, _a1 multiaddr.Multiaddr) bool {
ret := _m.Called(_a0, _a1)
var r0 bool
if rf, ok := ret.Get(0).(func(peer.ID, multiaddr.Multiaddr) bool); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// InterceptPeerDial provides a mock function with given fields: p
func (_m *ConnectionGater) InterceptPeerDial(p peer.ID) bool {
ret := _m.Called(p)
var r0 bool
if rf, ok := ret.Get(0).(func(peer.ID) bool); ok {
r0 = rf(p)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// InterceptSecured provides a mock function with given fields: _a0, _a1, _a2
func (_m *ConnectionGater) InterceptSecured(_a0 network.Direction, _a1 peer.ID, _a2 network.ConnMultiaddrs) bool {
ret := _m.Called(_a0, _a1, _a2)
var r0 bool
if rf, ok := ret.Get(0).(func(network.Direction, peer.ID, network.ConnMultiaddrs) bool); ok {
r0 = rf(_a0, _a1, _a2)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// InterceptUpgraded provides a mock function with given fields: _a0
func (_m *ConnectionGater) InterceptUpgraded(_a0 network.Conn) (bool, control.DisconnectReason) {
ret := _m.Called(_a0)
var r0 bool
var r1 control.DisconnectReason
if rf, ok := ret.Get(0).(func(network.Conn) (bool, control.DisconnectReason)); ok {
return rf(_a0)
}
if rf, ok := ret.Get(0).(func(network.Conn) bool); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(bool)
}
if rf, ok := ret.Get(1).(func(network.Conn) control.DisconnectReason); ok {
r1 = rf(_a0)
} else {
r1 = ret.Get(1).(control.DisconnectReason)
}
return r0, r1
}
// IsBlocked provides a mock function with given fields: p
func (_m *ConnectionGater) IsBlocked(p peer.ID) bool {
ret := _m.Called(p)
var r0 bool
if rf, ok := ret.Get(0).(func(peer.ID) bool); ok {
r0 = rf(p)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// ListBlockedAddrs provides a mock function with given fields:
func (_m *ConnectionGater) ListBlockedAddrs() []net.IP {
ret := _m.Called()
var r0 []net.IP
if rf, ok := ret.Get(0).(func() []net.IP); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]net.IP)
}
}
return r0
}
// ListBlockedPeers provides a mock function with given fields:
func (_m *ConnectionGater) ListBlockedPeers() []peer.ID {
ret := _m.Called()
var r0 []peer.ID
if rf, ok := ret.Get(0).(func() []peer.ID); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]peer.ID)
}
}
return r0
}
// ListBlockedSubnets provides a mock function with given fields:
func (_m *ConnectionGater) ListBlockedSubnets() []*net.IPNet {
ret := _m.Called()
var r0 []*net.IPNet
if rf, ok := ret.Get(0).(func() []*net.IPNet); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*net.IPNet)
}
}
return r0
}
// UnblockAddr provides a mock function with given fields: ip
func (_m *ConnectionGater) UnblockAddr(ip net.IP) error {
ret := _m.Called(ip)
var r0 error
if rf, ok := ret.Get(0).(func(net.IP) error); ok {
r0 = rf(ip)
} else {
r0 = ret.Error(0)
}
return r0
}
// UnblockPeer provides a mock function with given fields: p
func (_m *ConnectionGater) UnblockPeer(p peer.ID) error {
ret := _m.Called(p)
var r0 error
if rf, ok := ret.Get(0).(func(peer.ID) error); ok {
r0 = rf(p)
} else {
r0 = ret.Error(0)
}
return r0
}
// UnblockSubnet provides a mock function with given fields: ipnet
func (_m *ConnectionGater) UnblockSubnet(ipnet *net.IPNet) error {
ret := _m.Called(ipnet)
var r0 error
if rf, ok := ret.Get(0).(func(*net.IPNet) error); ok {
r0 = rf(ipnet)
} else {
r0 = ret.Error(0)
}
return r0
}
type mockConstructorTestingTNewConnectionGater interface {
mock.TestingT
Cleanup(func())
}
// NewConnectionGater creates a new instance of ConnectionGater. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewConnectionGater(t mockConstructorTestingTNewConnectionGater) *ConnectionGater {
mock := &ConnectionGater{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
// Code generated by mockery v2.22.1. DO NOT EDIT.
// Code generated by mockery v2.28.0. DO NOT EDIT.
package mocks
......@@ -6,6 +6,8 @@ import (
mock "github.com/stretchr/testify/mock"
peer "github.com/libp2p/go-libp2p/core/peer"
store "github.com/ethereum-optimism/optimism/op-node/p2p/store"
)
// Peerstore is an autogenerated mock type for the Peerstore type
......@@ -43,6 +45,20 @@ func (_m *Peerstore) Peers() peer.IDSlice {
return r0
}
// SetScore provides a mock function with given fields: id, diff
func (_m *Peerstore) SetScore(id peer.ID, diff store.ScoreDiff) error {
ret := _m.Called(id, diff)
var r0 error
if rf, ok := ret.Get(0).(func(peer.ID, store.ScoreDiff) error); ok {
r0 = rf(id, diff)
} else {
r0 = ret.Error(0)
}
return r0
}
type mockConstructorTestingTNewPeerstore interface {
mock.TestingT
Cleanup(func())
......
......@@ -6,6 +6,14 @@ import (
"fmt"
"strconv"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/p2p/gating"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/hashicorp/go-multierror"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/connmgr"
......@@ -13,22 +21,14 @@ import (
p2pmetrics "github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/network"
ma "github.com/multiformats/go-multiaddr"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum-optimism/optimism/op-node/rollup"
)
// NodeP2P is a p2p node, which can be used to gossip messages.
type NodeP2P struct {
host host.Host // p2p host (optional, may be nil)
gater ConnectionGater // p2p gater, to ban/unban peers with, may be nil even with p2p enabled
connMgr connmgr.ConnManager // p2p conn manager, to keep a reliable number of peers, may be nil even with p2p enabled
host host.Host // p2p host (optional, may be nil)
gater gating.BlockingConnectionGater // p2p gater, to ban/unban peers with, may be nil even with p2p enabled
scorer Scorer // writes score-updates to the peerstore and keeps metrics of score changes
connMgr connmgr.ConnManager // p2p conn manager, to keep a reliable number of peers, may be nil even with p2p enabled
// the below components are all optional, and may be nil. They require the host to not be nil.
dv5Local *enode.LocalNode // p2p discovery identity
dv5Udp *discover.UDPv5 // p2p discovery service
......@@ -63,7 +63,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
var err error
// nil if disabled.
n.host, err = setup.Host(log, bwc)
n.host, err = setup.Host(log, bwc, metrics)
if err != nil {
if n.dv5Udp != nil {
n.dv5Udp.Close()
......@@ -71,6 +71,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
return fmt.Errorf("failed to start p2p host: %w", err)
}
// TODO(CLI-4016): host is not optional, NodeP2P as a whole is. This if statement is wrong
if n.host != nil {
// Enable extra features, if any. During testing we don't setup the most advanced host all the time.
if extra, ok := n.host.(ExtraHostFeatures); ok {
......@@ -100,10 +101,23 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
n.host.SetStreamHandler(PayloadByNumberProtocolID(rollupCfg.L2ChainID), payloadByNumber)
}
}
eps, ok := n.host.Peerstore().(store.ExtendedPeerstore)
if !ok {
return fmt.Errorf("cannot init without extended peerstore: %w", err)
}
n.scorer = NewScorer(rollupCfg, eps, metrics, setup.PeerBandScorer(), log)
n.host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(_ network.Network, conn network.Conn) {
n.scorer.OnConnect(conn.RemotePeer())
},
DisconnectedF: func(_ network.Network, conn network.Conn) {
n.scorer.OnDisconnect(conn.RemotePeer())
},
})
// notify of any new connections/streams/etc.
n.host.Network().Notify(NewNetworkNotifier(log, metrics))
// note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled.
n.gs, err = NewGossipSub(resourcesCtx, n.host, n.gater, rollupCfg, setup, metrics, log)
n.gs, err = NewGossipSub(resourcesCtx, n.host, rollupCfg, setup, n.scorer, metrics, log)
if err != nil {
return fmt.Errorf("failed to start gossipsub router: %w", err)
}
......@@ -162,7 +176,7 @@ func (n *NodeP2P) GossipOut() GossipOut {
return n.gsOut
}
func (n *NodeP2P) ConnectionGater() ConnectionGater {
func (n *NodeP2P) ConnectionGater() gating.BlockingConnectionGater {
return n.gater
}
......
package p2p
import (
log "github.com/ethereum/go-ethereum/log"
peer "github.com/libp2p/go-libp2p/core/peer"
)
// ConnectionFactor is the factor by which we multiply the connection score.
const ConnectionFactor = -10
// PeerScoreThreshold is the threshold at which we block a peer.
const PeerScoreThreshold = -100
// gater is an internal implementation of the [PeerGater] interface.
type gater struct {
connGater ConnectionGater
blockedMap map[peer.ID]bool
log log.Logger
banEnabled bool
}
// PeerGater manages the connection gating of peers.
//
//go:generate mockery --name PeerGater --output mocks/
type PeerGater interface {
// Update handles a peer score update and blocks/unblocks the peer if necessary.
Update(peer.ID, float64)
// IsBlocked returns true if the given [peer.ID] is blocked.
IsBlocked(peer.ID) bool
}
// NewPeerGater returns a new peer gater.
func NewPeerGater(connGater ConnectionGater, log log.Logger, banEnabled bool) PeerGater {
return &gater{
connGater: connGater,
blockedMap: make(map[peer.ID]bool),
log: log,
banEnabled: banEnabled,
}
}
// IsBlocked returns true if the given [peer.ID] is blocked.
func (s *gater) IsBlocked(peerID peer.ID) bool {
return s.blockedMap[peerID]
}
// setBlocked sets the blocked status of the given [peer.ID].
func (s *gater) setBlocked(peerID peer.ID, blocked bool) {
s.blockedMap[peerID] = blocked
}
// Update handles a peer score update and blocks/unblocks the peer if necessary.
func (s *gater) Update(id peer.ID, score float64) {
// Check if the peer score is below the threshold
// If so, we need to block the peer
isAlreadyBlocked := s.IsBlocked(id)
if score < PeerScoreThreshold && s.banEnabled && !isAlreadyBlocked {
s.log.Warn("peer blocking enabled, blocking peer", "id", id.String(), "score", score)
err := s.connGater.BlockPeer(id)
if err != nil {
s.log.Warn("connection gater failed to block peer", "id", id.String(), "err", err)
}
// Set the peer as blocked in the blocked map
s.setBlocked(id, true)
}
// Unblock peers whose score has recovered to an acceptable level
if (score > PeerScoreThreshold) && isAlreadyBlocked {
err := s.connGater.UnblockPeer(id)
if err != nil {
s.log.Warn("connection gater failed to unblock peer", "id", id.String(), "err", err)
}
// Set the peer as unblocked in the blocked map
s.setBlocked(id, false)
}
}
package p2p_test
import (
"testing"
p2p "github.com/ethereum-optimism/optimism/op-node/p2p"
p2pMocks "github.com/ethereum-optimism/optimism/op-node/p2p/mocks"
testlog "github.com/ethereum-optimism/optimism/op-node/testlog"
log "github.com/ethereum/go-ethereum/log"
peer "github.com/libp2p/go-libp2p/core/peer"
suite "github.com/stretchr/testify/suite"
)
// PeerGaterTestSuite tests peer parameterization.
type PeerGaterTestSuite struct {
suite.Suite
mockGater *p2pMocks.ConnectionGater
logger log.Logger
}
// SetupTest sets up the test suite.
func (testSuite *PeerGaterTestSuite) SetupTest() {
testSuite.mockGater = &p2pMocks.ConnectionGater{}
testSuite.logger = testlog.Logger(testSuite.T(), log.LvlError)
}
// TestPeerGater runs the PeerGaterTestSuite.
func TestPeerGater(t *testing.T) {
suite.Run(t, new(PeerGaterTestSuite))
}
// TestPeerScoreConstants validates the peer score constants.
func (testSuite *PeerGaterTestSuite) TestPeerScoreConstants() {
testSuite.Equal(-10, p2p.ConnectionFactor)
testSuite.Equal(-100, p2p.PeerScoreThreshold)
}
// TestPeerGaterUpdate tests the peer gater update hook.
func (testSuite *PeerGaterTestSuite) TestPeerGater_UpdateBansPeers() {
gater := p2p.NewPeerGater(
testSuite.mockGater,
testSuite.logger,
true,
)
// Return an empty list of already blocked peers
testSuite.mockGater.On("ListBlockedPeers").Return([]peer.ID{}).Once()
// Mock a connection gater peer block call
// Since the peer score is below the [PeerScoreThreshold] of -100,
// the [BlockPeer] method should be called
testSuite.mockGater.On("BlockPeer", peer.ID("peer1")).Return(nil).Once()
// The peer should initially be unblocked
testSuite.False(gater.IsBlocked(peer.ID("peer1")))
// Apply the peer gater update
gater.Update(peer.ID("peer1"), float64(-101))
// The peer should be considered blocked
testSuite.True(gater.IsBlocked(peer.ID("peer1")))
// Now let's unblock the peer
testSuite.mockGater.On("UnblockPeer", peer.ID("peer1")).Return(nil).Once()
gater.Update(peer.ID("peer1"), float64(0))
// The peer should be considered unblocked
testSuite.False(gater.IsBlocked(peer.ID("peer1")))
}
// TestPeerGaterUpdateNoBanning tests the peer gater update hook without banning set
func (testSuite *PeerGaterTestSuite) TestPeerGater_UpdateNoBanning() {
gater := p2p.NewPeerGater(
testSuite.mockGater,
testSuite.logger,
false,
)
// Return an empty list of already blocked peers
testSuite.mockGater.On("ListBlockedPeers").Return([]peer.ID{})
// Notice: [BlockPeer] should not be called since banning is not enabled
// even though the peer score is way below the [PeerScoreThreshold] of -100
gater.Update(peer.ID("peer1"), float64(-100000))
// The peer should be unblocked
testSuite.False(gater.IsBlocked(peer.ID("peer1")))
// Make sure that if we then "unblock" the peer, nothing happens
gater.Update(peer.ID("peer1"), float64(0))
// The peer should still be unblocked
testSuite.False(gater.IsBlocked(peer.ID("peer1")))
}
......@@ -5,7 +5,11 @@ import (
"sort"
"strconv"
"strings"
"time"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
log "github.com/ethereum/go-ethereum/log"
pubsub "github.com/libp2p/go-libp2p-pubsub"
peer "github.com/libp2p/go-libp2p/core/peer"
......@@ -13,10 +17,10 @@ import (
type scorer struct {
peerStore Peerstore
metricer GossipMetricer
metricer ScoreMetrics
log log.Logger
gater PeerGater
bandScoreThresholds *BandScoreThresholds
cfg *rollup.Config
}
// scorePair holds a band and its corresponding threshold.
......@@ -91,30 +95,38 @@ type Peerstore interface {
// Peers returns all of the peer IDs stored across all inner stores.
Peers() peer.IDSlice
SetScore(id peer.ID, diff store.ScoreDiff) error
}
// Scorer is a peer scorer that scores peers based on application-specific metrics.
type Scorer interface {
OnConnect()
OnDisconnect()
OnConnect(id peer.ID)
OnDisconnect(id peer.ID)
SnapshotHook() pubsub.ExtendedPeerScoreInspectFn
}
type ScoreMetrics interface {
SetPeerScores(map[string]float64)
}
// NewScorer returns a new peer scorer.
func NewScorer(peerGater PeerGater, peerStore Peerstore, metricer GossipMetricer, bandScoreThresholds *BandScoreThresholds, log log.Logger) Scorer {
func NewScorer(cfg *rollup.Config, peerStore Peerstore, metricer ScoreMetrics, bandScoreThresholds *BandScoreThresholds, log log.Logger) Scorer {
return &scorer{
peerStore: peerStore,
metricer: metricer,
log: log,
gater: peerGater,
bandScoreThresholds: bandScoreThresholds,
cfg: cfg,
}
}
// SnapshotHook returns a function that is called periodically by the pubsub library to inspect the peer scores.
// SnapshotHook returns a function that is called periodically by the pubsub library to inspect the gossip peer scores.
// It is passed into the pubsub library as a [pubsub.ExtendedPeerScoreInspectFn] in the [pubsub.WithPeerScoreInspect] option.
// The returned [pubsub.ExtendedPeerScoreInspectFn] is called with a mapping of peer IDs to peer score snapshots.
// The incoming peer score snapshots only contain gossip-score components.
func (s *scorer) SnapshotHook() pubsub.ExtendedPeerScoreInspectFn {
blocksTopicName := blocksTopicV1(s.cfg)
return func(m map[peer.ID]*pubsub.PeerScoreSnapshot) {
scoreMap := make(map[string]float64)
// Zero out all bands.
......@@ -123,22 +135,36 @@ func (s *scorer) SnapshotHook() pubsub.ExtendedPeerScoreInspectFn {
}
// Now set the new scores.
for id, snap := range m {
diff := store.GossipScores{
Total: snap.Score,
Blocks: store.TopicScores{},
IPColocationFactor: snap.IPColocationFactor,
BehavioralPenalty: snap.BehaviourPenalty,
}
if topSnap, ok := snap.Topics[blocksTopicName]; ok {
diff.Blocks.TimeInMesh = float64(topSnap.TimeInMesh) / float64(time.Second)
diff.Blocks.MeshMessageDeliveries = uint64(topSnap.MeshMessageDeliveries)
diff.Blocks.FirstMessageDeliveries = uint64(topSnap.FirstMessageDeliveries)
diff.Blocks.InvalidMessageDeliveries = uint64(topSnap.InvalidMessageDeliveries)
}
if err := s.peerStore.SetScore(id, &diff); err != nil {
s.log.Warn("Unable to update peer gossip score", "err", err)
}
}
for _, snap := range m {
band := s.bandScoreThresholds.Bucket(snap.Score)
scoreMap[band] += 1
s.gater.Update(id, snap.Score)
}
s.metricer.SetPeerScores(scoreMap)
}
}
// OnConnect is called when a peer connects.
// See [p2p.NotificationsMetricer] for invocation.
func (s *scorer) OnConnect() {
// no-op
func (s *scorer) OnConnect(id peer.ID) {
// TODO(CLI-4003): apply decay to scores, based on last connection time
}
// OnDisconnect is called when a peer disconnects.
// See [p2p.NotificationsMetricer] for invocation.
func (s *scorer) OnDisconnect() {
// no-op
func (s *scorer) OnDisconnect(id peer.ID) {
// TODO(CLI-4003): persist disconnect-time
}
package p2p_test
import (
"math/big"
"testing"
p2p "github.com/ethereum-optimism/optimism/op-node/p2p"
p2pMocks "github.com/ethereum-optimism/optimism/op-node/p2p/mocks"
"github.com/ethereum-optimism/optimism/op-node/testlog"
log "github.com/ethereum/go-ethereum/log"
pubsub "github.com/libp2p/go-libp2p-pubsub"
peer "github.com/libp2p/go-libp2p/core/peer"
suite "github.com/stretchr/testify/suite"
log "github.com/ethereum/go-ethereum/log"
p2p "github.com/ethereum-optimism/optimism/op-node/p2p"
p2pMocks "github.com/ethereum-optimism/optimism/op-node/p2p/mocks"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/testlog"
)
// PeerScorerTestSuite tests peer parameterization.
type PeerScorerTestSuite struct {
suite.Suite
// mockConnGater *p2pMocks.ConnectionGater
mockGater *p2pMocks.PeerGater
mockStore *p2pMocks.Peerstore
mockMetricer *p2pMocks.GossipMetricer
bandScorer *p2p.BandScoreThresholds
......@@ -26,7 +29,6 @@ type PeerScorerTestSuite struct {
// SetupTest sets up the test suite.
func (testSuite *PeerScorerTestSuite) SetupTest() {
testSuite.mockGater = &p2pMocks.PeerGater{}
testSuite.mockStore = &p2pMocks.Peerstore{}
testSuite.mockMetricer = &p2pMocks.GossipMetricer{}
bandScorer, err := p2p.NewBandScorer("-40:graylist;0:friend;")
......@@ -43,31 +45,31 @@ func TestPeerScorer(t *testing.T) {
// TestScorer_OnConnect ensures we can call the OnConnect method on the peer scorer.
func (testSuite *PeerScorerTestSuite) TestScorer_OnConnect() {
scorer := p2p.NewScorer(
testSuite.mockGater,
&rollup.Config{L2ChainID: big.NewInt(123)},
testSuite.mockStore,
testSuite.mockMetricer,
testSuite.bandScorer,
testSuite.logger,
)
scorer.OnConnect()
scorer.OnConnect(peer.ID("alice"))
}
// TestScorer_OnDisconnect ensures we can call the OnDisconnect method on the peer scorer.
func (testSuite *PeerScorerTestSuite) TestScorer_OnDisconnect() {
scorer := p2p.NewScorer(
testSuite.mockGater,
&rollup.Config{L2ChainID: big.NewInt(123)},
testSuite.mockStore,
testSuite.mockMetricer,
testSuite.bandScorer,
testSuite.logger,
)
scorer.OnDisconnect()
scorer.OnDisconnect(peer.ID("alice"))
}
// TestScorer_SnapshotHook tests running the snapshot hook on the peer scorer.
func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
scorer := p2p.NewScorer(
testSuite.mockGater,
&rollup.Config{L2ChainID: big.NewInt(123)},
testSuite.mockStore,
testSuite.mockMetricer,
testSuite.bandScorer,
......@@ -75,8 +77,8 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
)
inspectFn := scorer.SnapshotHook()
// Mock the peer gater call
testSuite.mockGater.On("Update", peer.ID("peer1"), float64(-100)).Return(nil).Once()
// Expect updating the peer store
testSuite.mockStore.On("SetScore", peer.ID("peer1"), &store.GossipScores{Total: float64(-100)}).Return(nil).Once()
// The metricer should then be called with the peer score band map
testSuite.mockMetricer.On("SetPeerScores", map[string]float64{
......@@ -92,8 +94,8 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
}
inspectFn(snapshotMap)
// Change the peer score now to a different band
testSuite.mockGater.On("Update", peer.ID("peer1"), float64(0)).Return(nil).Once()
// Expect updating the peer store
testSuite.mockStore.On("SetScore", peer.ID("peer1"), &store.GossipScores{Total: 0}).Return(nil).Once()
// The metricer should then be called with the peer score band map
testSuite.mockMetricer.On("SetPeerScores", map[string]float64{
......@@ -114,7 +116,7 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
// This implies that the peer should be blocked.
func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHookBlocksPeer() {
scorer := p2p.NewScorer(
testSuite.mockGater,
&rollup.Config{L2ChainID: big.NewInt(123)},
testSuite.mockStore,
testSuite.mockMetricer,
testSuite.bandScorer,
......@@ -122,8 +124,8 @@ func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHookBlocksPeer() {
)
inspectFn := scorer.SnapshotHook()
// Mock the peer gater call
testSuite.mockGater.On("Update", peer.ID("peer1"), float64(-101)).Return(nil)
// Expect updating the peer store
testSuite.mockStore.On("SetScore", peer.ID("peer1"), &store.GossipScores{Total: float64(-101)}).Return(nil).Once()
// The metricer should then be called with the peer score band map
testSuite.mockMetricer.On("SetPeerScores", map[string]float64{
......
......@@ -3,18 +3,14 @@ package p2p
import (
log "github.com/ethereum/go-ethereum/log"
pubsub "github.com/libp2p/go-libp2p-pubsub"
host "github.com/libp2p/go-libp2p/core/host"
)
// ConfigurePeerScoring configures the peer scoring parameters for the pubsub
func ConfigurePeerScoring(h host.Host, g ConnectionGater, gossipConf GossipSetupConfigurables, m GossipMetricer, log log.Logger) []pubsub.Option {
func ConfigurePeerScoring(gossipConf GossipSetupConfigurables, scorer Scorer, log log.Logger) []pubsub.Option {
// If we want to completely disable scoring config here, we can use the [peerScoringParams]
// to return early without returning any [pubsub.Option].
peerScoreParams := gossipConf.PeerScoringParams()
peerScoreThresholds := NewPeerScoreThresholds()
banEnabled := gossipConf.BanPeers()
peerGater := NewPeerGater(g, log, banEnabled)
scorer := NewScorer(peerGater, h.Peerstore(), m, gossipConf.PeerBandScorer(), log)
opts := []pubsub.Option{}
// Check the app specific score since libp2p doesn't export it's [validate] function :/
if peerScoreParams != nil && peerScoreParams.AppSpecificScore != nil {
......
package p2p_test
package p2p
import (
"context"
"fmt"
"math/big"
"math/rand"
"testing"
"time"
p2p "github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup"
p2pMocks "github.com/ethereum-optimism/optimism/op-node/p2p/mocks"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
testlog "github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/stretchr/testify/mock"
suite "github.com/stretchr/testify/suite"
"github.com/ethereum-optimism/optimism/op-service/clock"
log "github.com/ethereum/go-ethereum/log"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
pubsub "github.com/libp2p/go-libp2p-pubsub"
host "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
bhost "github.com/libp2p/go-libp2p/p2p/host/blank"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoreds"
tswarm "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
// PeerScoresTestSuite tests peer parameterization.
type PeerScoresTestSuite struct {
suite.Suite
mockGater *p2pMocks.ConnectionGater
mockStore *p2pMocks.Peerstore
mockMetricer *p2pMocks.GossipMetricer
bandScorer p2p.BandScoreThresholds
bandScorer BandScoreThresholds
logger log.Logger
}
// SetupTest sets up the test suite.
func (testSuite *PeerScoresTestSuite) SetupTest() {
testSuite.mockGater = &p2pMocks.ConnectionGater{}
testSuite.mockStore = &p2pMocks.Peerstore{}
testSuite.mockMetricer = &p2pMocks.GossipMetricer{}
bandScorer, err := p2p.NewBandScorer("0:graylist;")
bandScorer, err := NewBandScorer("0:graylist;")
testSuite.NoError(err)
testSuite.bandScorer = *bandScorer
testSuite.logger = testlog.Logger(testSuite.T(), log.LvlError)
......@@ -50,11 +56,29 @@ func TestPeerScores(t *testing.T) {
suite.Run(t, new(PeerScoresTestSuite))
}
type customPeerstoreNetwork struct {
network.Network
ps peerstore.Peerstore
}
func (c *customPeerstoreNetwork) Peerstore() peerstore.Peerstore {
return c.ps
}
func (c *customPeerstoreNetwork) Close() error {
_ = c.ps.Close()
return c.Network.Close()
}
// getNetHosts generates a slice of hosts using the [libp2p/go-libp2p] library.
func getNetHosts(testSuite *PeerScoresTestSuite, ctx context.Context, n int) []host.Host {
var out []host.Host
log := testlog.Logger(testSuite.T(), log.LvlError)
for i := 0; i < n; i++ {
netw := tswarm.GenSwarm(testSuite.T())
swarm := tswarm.GenSwarm(testSuite.T())
eps, err := store.NewExtendedPeerstore(ctx, log, clock.SystemClock, swarm.Peerstore(), sync.MutexWrap(ds.NewMapDatastore()))
netw := &customPeerstoreNetwork{swarm, eps}
require.NoError(testSuite.T(), err)
h := bhost.NewBlankHost(netw)
testSuite.T().Cleanup(func() { h.Close() })
out = append(out, h)
......@@ -71,7 +95,17 @@ func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts []
for _, h := range hosts {
rt := pubsub.DefaultGossipSubRouter(h)
opts := []pubsub.Option{}
opts = append(opts, p2p.ConfigurePeerScoring(h, testSuite.mockGater, &p2p.Config{
dataStore := sync.MutexWrap(ds.NewMapDatastore())
peerStore, err := pstoreds.NewPeerstore(context.Background(), dataStore, pstoreds.DefaultOpts())
require.NoError(testSuite.T(), err)
extPeerStore, err := store.NewExtendedPeerstore(context.Background(), logger, clock.SystemClock, peerStore, dataStore)
require.NoError(testSuite.T(), err)
scorer := NewScorer(
&rollup.Config{L2ChainID: big.NewInt(123)},
extPeerStore, testSuite.mockMetricer, &testSuite.bandScorer, logger)
opts = append(opts, ConfigurePeerScoring(&Config{
BandScoreThresholds: testSuite.bandScorer,
PeerScoring: pubsub.PeerScoreParams{
AppSpecificScore: func(p peer.ID) float64 {
......@@ -85,7 +119,7 @@ func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts []
DecayInterval: time.Second,
DecayToZero: 0.01,
},
}, testSuite.mockMetricer, logger)...)
}, scorer, logger)...)
ps, err := pubsub.NewGossipSubWithRouter(ctx, h, rt, opts...)
if err != nil {
panic(err)
......@@ -125,8 +159,6 @@ func (testSuite *PeerScoresTestSuite) TestNegativeScores() {
testSuite.mockMetricer.On("SetPeerScores", mock.Anything).Return(nil)
testSuite.mockGater.On("ListBlockedPeers").Return([]peer.ID{})
// Construct 20 hosts using the [getNetHosts] function.
hosts := getNetHosts(testSuite, ctx, 20)
testSuite.Equal(20, len(hosts))
......
......@@ -43,7 +43,7 @@ func (p *Prepared) Check() error {
}
// Host creates a libp2p host service. Returns nil, nil if p2p is disabled.
func (p *Prepared) Host(log log.Logger, reporter metrics.Reporter) (host.Host, error) {
func (p *Prepared) Host(log log.Logger, reporter metrics.Reporter, metrics HostMetrics) (host.Host, error) {
return p.HostP2P, nil
}
......
......@@ -5,6 +5,7 @@ import (
"net"
"time"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
......@@ -28,6 +29,8 @@ type PeerInfo struct {
Latency time.Duration `json:"latency"`
GossipBlocks bool `json:"gossipBlocks"` // if the peer is in our gossip topic
PeerScores store.PeerScores `json:"scores"`
}
type PeerDump struct {
......
......@@ -7,7 +7,10 @@ import (
"net"
"time"
"github.com/ethereum-optimism/optimism/op-node/p2p/gating"
decredSecp "github.com/decred/dcrd/dcrec/secp256k1/v4"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p-testing/netutil"
"github.com/libp2p/go-libp2p/core/connmgr"
......@@ -47,7 +50,7 @@ type Node interface {
// GossipOut returns the gossip output/info control
GossipOut() GossipOut
// ConnectionGater returns the connection gater, to ban/unban peers with, may be nil
ConnectionGater() ConnectionGater
ConnectionGater() gating.BlockingConnectionGater
// ConnectionManager returns the connection manager, to protect peers with, may be nil
ConnectionManager() connmgr.ConnManager
}
......@@ -108,6 +111,11 @@ func dumpPeer(id peer.ID, nw network.Network, pstore peerstore.Peerstore, connMg
info.NodeID = enode.PubkeyToIDV4((*decredSecp.PublicKey)(typedPub).ToECDSA())
}
}
if eps, ok := pstore.(store.ExtendedPeerstore); ok {
if dat, err := eps.GetPeerScores(id); err == nil {
info.PeerScores = dat
}
}
if dat, err := pstore.Get(id, "ProtocolVersion"); err == nil {
protocolVersion, ok := dat.(string)
if ok {
......
package store
import (
"context"
"errors"
"fmt"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
ds "github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p/core/peerstore"
)
type extendedStore struct {
peerstore.Peerstore
peerstore.CertifiedAddrBook
*scoreBook
}
func NewExtendedPeerstore(ctx context.Context, logger log.Logger, clock clock.Clock, ps peerstore.Peerstore, store ds.Batching) (ExtendedPeerstore, error) {
cab, ok := peerstore.GetCertifiedAddrBook(ps)
if !ok {
return nil, errors.New("peerstore should also be a certified address book")
}
sb, err := newScoreBook(ctx, logger, clock, store)
if err != nil {
return nil, fmt.Errorf("create scorebook: %w", err)
}
sb.startGC()
return &extendedStore{
Peerstore: ps,
CertifiedAddrBook: cab,
scoreBook: sb,
}, nil
}
func (s *extendedStore) Close() error {
s.scoreBook.Close()
return s.Peerstore.Close()
}
var _ ExtendedPeerstore = (*extendedStore)(nil)
package store
import (
"context"
"sync"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
)
const (
gcPeriod = 2 * time.Hour
)
type gcAction func() error
func startGc(ctx context.Context, logger log.Logger, clock clock.Clock, bgTasks *sync.WaitGroup, action gcAction) {
bgTasks.Add(1)
go func() {
defer bgTasks.Done()
gcTimer := clock.NewTicker(gcPeriod)
defer gcTimer.Stop()
for {
select {
case <-gcTimer.Ch():
if err := action(); err != nil {
logger.Warn("GC failed", "err", err)
}
case <-ctx.Done():
return
}
}
}()
}
package store
import (
"context"
"sync"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
func TestScheduleGcPeriodically(t *testing.T) {
var bgTasks sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
defer func() {
cancel()
// Wait for the gc background process to complete after cancelling the context
bgTasks.Wait()
}()
logger := testlog.Logger(t, log.LvlInfo)
clock := clock.NewDeterministicClock(time.UnixMilli(5000))
called := make(chan struct{}, 10)
action := func() error {
called <- struct{}{}
return nil
}
waitForGc := func(failMsg string) {
timeout, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
select {
case <-timeout.Done():
t.Fatal(failMsg)
case <-called:
require.Len(t, called, 0, "should only run once after gc period")
}
}
startGc(ctx, logger, clock, &bgTasks, action)
timeout, tCancel := context.WithTimeout(ctx, 10*time.Second)
defer tCancel()
require.True(t, clock.WaitForNewPendingTask(timeout), "did not schedule pending GC")
require.Len(t, called, 0, "should not run immediately")
clock.AdvanceTime(gcPeriod)
waitForGc("should run gc after first time period")
clock.AdvanceTime(gcPeriod)
waitForGc("should run gc again after second time period")
}
package store
import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
)
type TopicScores struct {
TimeInMesh float64 `json:"timeInMesh"` // in seconds
FirstMessageDeliveries uint64 `json:"firstMessageDeliveries"`
MeshMessageDeliveries uint64 `json:"meshMessageDeliveries"`
InvalidMessageDeliveries uint64 `json:"invalidMessageDeliveries"`
}
type GossipScores struct {
Total float64 `json:"total"`
Blocks TopicScores `json:"blocks"` // fully zeroed if the peer has not been in the mesh on the topic
IPColocationFactor float64 `json:"IPColocationFactor"`
BehavioralPenalty float64 `json:"behavioralPenalty"`
}
func (g GossipScores) Apply(rec *scoreRecord) {
rec.PeerScores.Gossip = g
}
type PeerScores struct {
Gossip GossipScores `json:"gossip"`
ReqRespSync float64 `json:"reqRespSync"`
}
// ScoreDatastore defines a type-safe API for getting and setting libp2p peer score information
type ScoreDatastore interface {
// GetPeerScores returns the current scores for the specified peer
GetPeerScores(id peer.ID) (PeerScores, error)
// SetScore applies the given store diff to the specified peer
SetScore(id peer.ID, diff ScoreDiff) error
}
// ScoreDiff defines a type-safe batch of changes to apply to the peer-scoring record of the peer.
// The scoreRecord the diff is applied to is private: diffs can only be defined in this package,
// to ensure changes to the record are non-breaking.
type ScoreDiff interface {
Apply(score *scoreRecord)
}
// ExtendedPeerstore defines a type-safe API to work with additional peer metadata based on a libp2p peerstore.Peerstore
type ExtendedPeerstore interface {
peerstore.Peerstore
ScoreDatastore
peerstore.CertifiedAddrBook
}
package store
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
lru "github.com/hashicorp/golang-lru/v2"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-base32"
)
const (
scoreDataV0 = "0"
scoreCacheSize = 100
expiryPeriod = 24 * time.Hour
maxPruneBatchSize = 20
)
var scoresBase = ds.NewKey("/peers/scores")
type scoreRecord struct {
PeerScores PeerScores `json:"peerScores"`
LastUpdate int64 `json:"lastUpdate"` // unix timestamp in seconds
}
type scoreBook struct {
ctx context.Context
cancelFn context.CancelFunc
clock clock.Clock
log log.Logger
bgTasks sync.WaitGroup
store ds.Batching
cache *lru.Cache[peer.ID, scoreRecord]
sync.RWMutex
}
func newScoreBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching) (*scoreBook, error) {
cache, err := lru.New[peer.ID, scoreRecord](scoreCacheSize)
if err != nil {
return nil, fmt.Errorf("creating cache: %w", err)
}
ctx, cancelFn := context.WithCancel(ctx)
book := scoreBook{
ctx: ctx,
cancelFn: cancelFn,
clock: clock,
log: logger,
store: store,
cache: cache,
}
return &book, nil
}
func (d *scoreBook) startGC() {
startGc(d.ctx, d.log, d.clock, &d.bgTasks, d.prune)
}
func (d *scoreBook) GetPeerScores(id peer.ID) (PeerScores, error) {
d.RLock()
defer d.RUnlock()
record, err := d.getRecord(id)
if err != nil {
return PeerScores{}, nil
}
return record.PeerScores, err
}
func (d *scoreBook) getRecord(id peer.ID) (scoreRecord, error) {
if scores, ok := d.cache.Get(id); ok {
return scores, nil
}
data, err := d.store.Get(d.ctx, scoreKey(id))
if errors.Is(err, ds.ErrNotFound) {
return scoreRecord{}, nil
} else if err != nil {
return scoreRecord{}, fmt.Errorf("load scores for peer %v: %w", id, err)
}
record, err := deserializeScoresV0(data)
if err != nil {
return scoreRecord{}, fmt.Errorf("invalid score data for peer %v: %w", id, err)
}
d.cache.Add(id, record)
return record, nil
}
func (d *scoreBook) SetScore(id peer.ID, diff ScoreDiff) error {
d.Lock()
defer d.Unlock()
scores, err := d.getRecord(id)
if err != nil {
return err
}
scores.LastUpdate = d.clock.Now().Unix()
diff.Apply(&scores)
data, err := serializeScoresV0(scores)
if err != nil {
return fmt.Errorf("encode scores for peer %v: %w", id, err)
}
err = d.store.Put(d.ctx, scoreKey(id), data)
if err != nil {
return fmt.Errorf("storing updated scores for peer %v: %w", id, err)
}
d.cache.Add(id, scores)
return nil
}
// prune deletes entries from the store that are older than expiryPeriod.
// Note that the expiry period is not a strict TTL. Entries that are eligible for deletion may still be present
// either because the prune function hasn't yet run or because they are still preserved in the in-memory cache after
// having been deleted from the database.
func (d *scoreBook) prune() error {
results, err := d.store.Query(d.ctx, query.Query{
Prefix: scoresBase.String(),
})
if err != nil {
return err
}
pending := 0
batch, err := d.store.Batch(d.ctx)
if err != nil {
return err
}
for result := range results.Next() {
// Bail out if the context is done
select {
case <-d.ctx.Done():
return d.ctx.Err()
default:
}
record, err := deserializeScoresV0(result.Value)
if err != nil {
return err
}
if time.Unix(record.LastUpdate, 0).Add(expiryPeriod).Before(d.clock.Now()) {
if pending > maxPruneBatchSize {
if err := batch.Commit(d.ctx); err != nil {
return err
}
batch, err = d.store.Batch(d.ctx)
if err != nil {
return err
}
pending = 0
}
pending++
if err := batch.Delete(d.ctx, ds.NewKey(result.Key)); err != nil {
return err
}
}
}
if err := batch.Commit(d.ctx); err != nil {
return err
}
return nil
}
func (d *scoreBook) Close() {
d.cancelFn()
d.bgTasks.Wait()
}
func scoreKey(id peer.ID) ds.Key {
return scoresBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(id))).ChildString(scoreDataV0)
}
package store
import (
"context"
"strconv"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoreds"
"github.com/stretchr/testify/require"
)
func TestGetEmptyScoreComponents(t *testing.T) {
id := peer.ID("aaaa")
store := createMemoryStore(t)
assertPeerScores(t, store, id, PeerScores{})
}
func TestRoundTripGossipScore(t *testing.T) {
id := peer.ID("aaaa")
store := createMemoryStore(t)
score := 123.45
err := store.SetScore(id, &GossipScores{Total: score})
require.NoError(t, err)
assertPeerScores(t, store, id, PeerScores{Gossip: GossipScores{Total: score}})
}
func TestUpdateGossipScore(t *testing.T) {
id := peer.ID("aaaa")
store := createMemoryStore(t)
score := 123.45
require.NoError(t, store.SetScore(id, &GossipScores{Total: 444.223}))
require.NoError(t, store.SetScore(id, &GossipScores{Total: score}))
assertPeerScores(t, store, id, PeerScores{Gossip: GossipScores{Total: score}})
}
func TestStoreScoresForMultiplePeers(t *testing.T) {
id1 := peer.ID("aaaa")
id2 := peer.ID("bbbb")
store := createMemoryStore(t)
score1 := 123.45
score2 := 453.22
require.NoError(t, store.SetScore(id1, &GossipScores{Total: score1}))
require.NoError(t, store.SetScore(id2, &GossipScores{Total: score2}))
assertPeerScores(t, store, id1, PeerScores{Gossip: GossipScores{Total: score1}})
assertPeerScores(t, store, id2, PeerScores{Gossip: GossipScores{Total: score2}})
}
func TestPersistData(t *testing.T) {
id := peer.ID("aaaa")
score := 123.45
backingStore := sync.MutexWrap(ds.NewMapDatastore())
store := createPeerstoreWithBacking(t, backingStore)
require.NoError(t, store.SetScore(id, &GossipScores{Total: score}))
// Close and recreate a new store from the same backing
require.NoError(t, store.Close())
store = createPeerstoreWithBacking(t, backingStore)
assertPeerScores(t, store, id, PeerScores{Gossip: GossipScores{Total: score}})
}
func TestCloseCompletes(t *testing.T) {
store := createMemoryStore(t)
require.NoError(t, store.Close())
}
func TestPrune(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
logger := testlog.Logger(t, log.LvlInfo)
store := sync.MutexWrap(ds.NewMapDatastore())
clock := clock.NewDeterministicClock(time.UnixMilli(1000))
book, err := newScoreBook(ctx, logger, clock, store)
require.NoError(t, err)
hasScoreRecorded := func(id peer.ID) bool {
scores, err := book.GetPeerScores(id)
require.NoError(t, err)
return scores != PeerScores{}
}
firstStore := clock.Now()
// Set some scores all 30 minutes apart so they have different expiry times
require.NoError(t, book.SetScore("aaaa", &GossipScores{Total: 123.45}))
clock.AdvanceTime(30 * time.Minute)
require.NoError(t, book.SetScore("bbbb", &GossipScores{Total: 123.45}))
clock.AdvanceTime(30 * time.Minute)
require.NoError(t, book.SetScore("cccc", &GossipScores{Total: 123.45}))
clock.AdvanceTime(30 * time.Minute)
require.NoError(t, book.SetScore("dddd", &GossipScores{Total: 123.45}))
clock.AdvanceTime(30 * time.Minute)
// Update bbbb again which should extend its expiry
require.NoError(t, book.SetScore("bbbb", &GossipScores{Total: 123.45}))
require.True(t, hasScoreRecorded("aaaa"))
require.True(t, hasScoreRecorded("bbbb"))
require.True(t, hasScoreRecorded("cccc"))
require.True(t, hasScoreRecorded("dddd"))
elapsedTime := clock.Now().Sub(firstStore)
timeToFirstExpiry := expiryPeriod - elapsedTime
// Advance time until the score for aaaa should be pruned.
clock.AdvanceTime(timeToFirstExpiry + 1)
require.NoError(t, book.prune())
// Clear the cache so reads have to come from the database
book.cache.Purge()
require.False(t, hasScoreRecorded("aaaa"), "should have pruned aaaa record")
// Advance time so cccc, dddd and the original bbbb entry should be pruned
clock.AdvanceTime(90 * time.Minute)
require.NoError(t, book.prune())
// Clear the cache so reads have to come from the database
book.cache.Purge()
require.False(t, hasScoreRecorded("cccc"), "should have pruned cccc record")
require.False(t, hasScoreRecorded("dddd"), "should have pruned cccc record")
require.True(t, hasScoreRecorded("bbbb"), "should not prune bbbb record")
}
func TestPruneMultipleBatches(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
logger := testlog.Logger(t, log.LvlInfo)
clock := clock.NewDeterministicClock(time.UnixMilli(1000))
book, err := newScoreBook(ctx, logger, clock, sync.MutexWrap(ds.NewMapDatastore()))
require.NoError(t, err)
hasScoreRecorded := func(id peer.ID) bool {
scores, err := book.GetPeerScores(id)
require.NoError(t, err)
return scores != PeerScores{}
}
// Set scores for more peers than the max batch size
peerCount := maxPruneBatchSize*3 + 5
for i := 0; i < peerCount; i++ {
require.NoError(t, book.SetScore(peer.ID(strconv.Itoa(i)), &GossipScores{Total: 123.45}))
}
clock.AdvanceTime(expiryPeriod + 1)
require.NoError(t, book.prune())
// Clear the cache so reads have to come from the database
book.cache.Purge()
for i := 0; i < peerCount; i++ {
require.Falsef(t, hasScoreRecorded(peer.ID(strconv.Itoa(i))), "Should prune record peer %v", i)
}
}
func assertPeerScores(t *testing.T, store ExtendedPeerstore, id peer.ID, expected PeerScores) {
result, err := store.GetPeerScores(id)
require.NoError(t, err)
require.Equal(t, result, expected)
}
func createMemoryStore(t *testing.T) ExtendedPeerstore {
store := sync.MutexWrap(ds.NewMapDatastore())
return createPeerstoreWithBacking(t, store)
}
func createPeerstoreWithBacking(t *testing.T, store *sync.MutexDatastore) ExtendedPeerstore {
ps, err := pstoreds.NewPeerstore(context.Background(), store, pstoreds.DefaultOpts())
require.NoError(t, err, "Failed to create peerstore")
logger := testlog.Logger(t, log.LvlInfo)
clock := clock.NewDeterministicClock(time.UnixMilli(100))
eps, err := NewExtendedPeerstore(context.Background(), logger, clock, ps, store)
require.NoError(t, err)
t.Cleanup(func() {
_ = eps.Close()
})
return eps
}
package store
import "encoding/json"
func serializeScoresV0(scores scoreRecord) ([]byte, error) {
// v0 just serializes to JSON. New/unrecognized values default to 0.
return json.Marshal(&scores)
}
func deserializeScoresV0(data []byte) (scoreRecord, error) {
var out scoreRecord
err := json.Unmarshal(data, &out)
return out, err
}
package store
import (
"encoding/json"
"strconv"
"testing"
"github.com/stretchr/testify/require"
)
func TestRoundtripScoresV0(t *testing.T) {
scores := scoreRecord{
PeerScores: PeerScores{Gossip: GossipScores{Total: 1234.52382}},
LastUpdate: 1923841,
}
data, err := serializeScoresV0(scores)
require.NoError(t, err)
result, err := deserializeScoresV0(data)
require.NoError(t, err)
require.Equal(t, scores, result)
}
// TestParseHistoricSerializations checks that existing data can still be deserialized
// Adding new fields should not require bumping the version. Removing fields may require bumping.
// Scores should always default to 0.
// A new entry should be added to this test each time any fields are changed to ensure it can always be deserialized
func TestParseHistoricSerializationsV0(t *testing.T) {
tests := []struct {
data string
expected scoreRecord
}{
{
data: `{"peerScores":{"gossip":{"total":1234.52382,"blocks":{"timeInMesh":1234,"firstMessageDeliveries":12,"meshMessageDeliveries":34,"invalidMessageDeliveries":56},"IPColocationFactor":12.34,"behavioralPenalty":56.78},"reqRespSync":123456},"lastUpdate":1923841}`,
expected: scoreRecord{
PeerScores: PeerScores{
Gossip: GossipScores{
Total: 1234.52382,
Blocks: TopicScores{
TimeInMesh: 1234,
FirstMessageDeliveries: 12,
MeshMessageDeliveries: 34,
InvalidMessageDeliveries: 56,
},
IPColocationFactor: 12.34,
BehavioralPenalty: 56.78,
},
ReqRespSync: 123456,
},
LastUpdate: 1923841,
},
},
}
for idx, test := range tests {
test := test
out, _ := json.Marshal(&test.expected)
t.Log(string(out))
t.Run(strconv.Itoa(idx), func(t *testing.T) {
result, err := deserializeScoresV0([]byte(test.data))
require.NoError(t, err)
require.Equal(t, test.expected, result)
})
}
}
// Package clock provides an abstraction for time to enable testing of functionality that uses time as an input.
package clock
import "time"
// Clock represents time in a way that can be provided by varying implementations.
// Methods are designed to be direct replacements for methods in the time package.
type Clock interface {
// Now provides the current local time. Equivalent to time.Now
Now() time.Time
// After waits for the duration to elapse and then sends the current time on the returned channel.
// It is equivalent to time.After
After(d time.Duration) <-chan time.Time
// NewTicker returns a new Ticker containing a channel that will send
// the current time on the channel after each tick. The period of the
// ticks is specified by the duration argument. The ticker will adjust
// the time interval or drop ticks to make up for slow receivers.
// The duration d must be greater than zero; if not, NewTicker will
// panic. Stop the ticker to release associated resources.
NewTicker(d time.Duration) Ticker
}
// A Ticker holds a channel that delivers "ticks" of a clock at intervals
type Ticker interface {
// Ch returns the channel for the ticker. Equivalent to time.Ticker.C
Ch() <-chan time.Time
// Stop turns off a ticker. After Stop, no more ticks will be sent.
// Stop does not close the channel, to prevent a concurrent goroutine
// reading from the channel from seeing an erroneous "tick".
Stop()
// Reset stops a ticker and resets its period to the specified duration.
// The next tick will arrive after the new period elapses. The duration d
// must be greater than zero; if not, Reset will panic.
Reset(d time.Duration)
}
// SystemClock provides an instance of Clock that uses the system clock via methods in the time package.
var SystemClock Clock = systemClock{}
type systemClock struct {
}
func (s systemClock) Now() time.Time {
return time.Now()
}
func (s systemClock) After(d time.Duration) <-chan time.Time {
return time.After(d)
}
type SystemTicker struct {
*time.Ticker
}
func (t *SystemTicker) Ch() <-chan time.Time {
return t.C
}
func (s systemClock) NewTicker(d time.Duration) Ticker {
return &SystemTicker{time.NewTicker(d)}
}
package clock
import (
"context"
"sync"
"time"
)
type action interface {
// Return true if the action is due to fire
isDue(time.Time) bool
// fire triggers the action. Returns true if the action needs to fire again in the future
fire(time.Time) bool
}
type task struct {
ch chan time.Time
due time.Time
}
func (t task) isDue(now time.Time) bool {
return !t.due.After(now)
}
func (t task) fire(now time.Time) bool {
t.ch <- now
close(t.ch)
return false
}
type ticker struct {
c Clock
ch chan time.Time
nextDue time.Time
period time.Duration
stopped bool
sync.Mutex
}
func (t *ticker) Ch() <-chan time.Time {
return t.ch
}
func (t *ticker) Stop() {
t.Lock()
defer t.Unlock()
t.stopped = true
}
func (t *ticker) Reset(d time.Duration) {
if d <= 0 {
panic("Continuously firing tickers are a really bad idea")
}
t.Lock()
defer t.Unlock()
t.period = d
t.nextDue = t.c.Now().Add(d)
}
func (t *ticker) isDue(now time.Time) bool {
t.Lock()
defer t.Unlock()
return !t.nextDue.After(now)
}
func (t *ticker) fire(now time.Time) bool {
t.Lock()
defer t.Unlock()
if t.stopped {
return false
}
t.ch <- now
t.nextDue = now.Add(t.period)
return true
}
type DeterministicClock struct {
now time.Time
pending []action
newPendingCh chan struct{}
lock sync.Mutex
}
// NewDeterministicClock creates a new clock where time only advances when the DeterministicClock.AdvanceTime method is called.
// This is intended for use in situations where a deterministic clock is required, such as testing or event driven systems.
func NewDeterministicClock(now time.Time) *DeterministicClock {
return &DeterministicClock{
now: now,
newPendingCh: make(chan struct{}, 1),
}
}
func (s *DeterministicClock) Now() time.Time {
s.lock.Lock()
defer s.lock.Unlock()
return s.now
}
func (s *DeterministicClock) After(d time.Duration) <-chan time.Time {
s.lock.Lock()
defer s.lock.Unlock()
ch := make(chan time.Time, 1)
if d.Nanoseconds() == 0 {
ch <- s.now
close(ch)
} else {
s.addPending(&task{ch: ch, due: s.now.Add(d)})
}
return ch
}
func (s *DeterministicClock) NewTicker(d time.Duration) Ticker {
if d <= 0 {
panic("Continuously firing tickers are a really bad idea")
}
s.lock.Lock()
defer s.lock.Unlock()
ch := make(chan time.Time, 1)
t := &ticker{
c: s,
ch: ch,
nextDue: s.now.Add(d),
period: d,
}
s.addPending(t)
return t
}
func (s *DeterministicClock) addPending(t action) {
s.pending = append(s.pending, t)
select {
case s.newPendingCh <- struct{}{}:
default:
// Must already have a new pending task flagged, do nothing
}
}
// WaitForNewPendingTask blocks until a new task is scheduled since the last time this method was called.
// true is returned if a new task was scheduled, false if the context completed before a new task was added.
func (s *DeterministicClock) WaitForNewPendingTask(ctx context.Context) bool {
select {
case <-ctx.Done():
return false
case <-s.newPendingCh:
return true
}
}
// AdvanceTime moves the time forward by the specific duration
func (s *DeterministicClock) AdvanceTime(d time.Duration) {
s.lock.Lock()
defer s.lock.Unlock()
s.now = s.now.Add(d)
var remaining []action
for _, a := range s.pending {
if !a.isDue(s.now) || a.fire(s.now) {
remaining = append(remaining, a)
}
}
s.pending = remaining
}
var _ Clock = (*DeterministicClock)(nil)
package clock
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestNowReturnsCurrentTime(t *testing.T) {
now := time.UnixMilli(23829382)
clock := NewDeterministicClock(now)
require.Equal(t, now, clock.Now())
}
func TestAdvanceTime(t *testing.T) {
start := time.UnixMilli(1000)
clock := NewDeterministicClock(start)
clock.AdvanceTime(500 * time.Millisecond)
require.Equal(t, start.Add(500*time.Millisecond), clock.Now())
}
func TestAfter(t *testing.T) {
t.Run("ZeroCompletesImmediately", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ch := clock.After(0)
require.Len(t, ch, 1, "duration should already have been reached")
})
t.Run("CompletesWhenTimeAdvances", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ch := clock.After(500 * time.Millisecond)
require.Len(t, ch, 0, "should not complete immediately")
clock.AdvanceTime(499 * time.Millisecond)
require.Len(t, ch, 0, "should not complete before time is due")
clock.AdvanceTime(1 * time.Millisecond)
require.Len(t, ch, 1, "should complete when time is reached")
require.Equal(t, clock.Now(), <-ch)
})
t.Run("CompletesWhenTimeAdvancesPastDue", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ch := clock.After(500 * time.Millisecond)
require.Len(t, ch, 0, "should not complete immediately")
clock.AdvanceTime(9000 * time.Millisecond)
require.Len(t, ch, 1, "should complete when time is past")
require.Equal(t, clock.Now(), <-ch)
})
t.Run("RegisterAsPending", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
_ = clock.After(500 * time.Millisecond)
ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFunc()
require.True(t, clock.WaitForNewPendingTask(ctx), "should have added a new pending task")
})
}
func TestNewTicker(t *testing.T) {
t.Run("FiresAfterEachDuration", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ticker := clock.NewTicker(5 * time.Second)
require.Len(t, ticker.Ch(), 0, "should not fire immediately")
clock.AdvanceTime(4 * time.Second)
require.Len(t, ticker.Ch(), 0, "should not fire before due")
clock.AdvanceTime(1 * time.Second)
require.Len(t, ticker.Ch(), 1, "should fire when due")
require.Equal(t, clock.Now(), <-ticker.Ch(), "should post current time")
clock.AdvanceTime(4 * time.Second)
require.Len(t, ticker.Ch(), 0, "should not re-fire before due")
clock.AdvanceTime(1 * time.Second)
require.Len(t, ticker.Ch(), 1, "should fire when due")
require.Equal(t, clock.Now(), <-ticker.Ch(), "should post current time")
})
t.Run("SkipsFiringWhenAdvancedMultipleDurations", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ticker := clock.NewTicker(5 * time.Second)
require.Len(t, ticker.Ch(), 0, "should not fire immediately")
// Advance more than three periods, but should still only fire once
clock.AdvanceTime(16 * time.Second)
require.Len(t, ticker.Ch(), 1, "should fire when due")
require.Equal(t, clock.Now(), <-ticker.Ch(), "should post current time")
clock.AdvanceTime(1 * time.Second)
require.Len(t, ticker.Ch(), 0, "should not fire until due again")
})
t.Run("StopFiring", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ticker := clock.NewTicker(5 * time.Second)
ticker.Stop()
clock.AdvanceTime(10 * time.Second)
require.Len(t, ticker.Ch(), 0, "should not fire after stop")
})
t.Run("ResetPanicWhenLessNotPositive", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ticker := clock.NewTicker(5 * time.Second)
require.Panics(t, func() {
ticker.Reset(0)
}, "reset to 0 should panic")
require.Panics(t, func() {
ticker.Reset(-1)
}, "reset to negative duration should panic")
})
t.Run("ResetWithShorterPeriod", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ticker := clock.NewTicker(5 * time.Second)
require.Len(t, ticker.Ch(), 0, "should not fire immediately")
ticker.Reset(1 * time.Second)
require.Len(t, ticker.Ch(), 0, "should not fire immediately after reset")
clock.AdvanceTime(1 * time.Second)
require.Len(t, ticker.Ch(), 1, "should fire when new duration reached")
require.Equal(t, clock.Now(), <-ticker.Ch(), "should post current time")
})
t.Run("ResetWithLongerPeriod", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ticker := clock.NewTicker(5 * time.Second)
require.Len(t, ticker.Ch(), 0, "should not fire immediately")
ticker.Reset(7 * time.Second)
require.Len(t, ticker.Ch(), 0, "should not fire immediately after reset")
clock.AdvanceTime(5 * time.Second)
require.Len(t, ticker.Ch(), 0, "should not fire when old duration reached")
clock.AdvanceTime(2 * time.Second)
require.Len(t, ticker.Ch(), 1, "should fire when new duration reached")
require.Equal(t, clock.Now(), <-ticker.Ch(), "should post current time")
})
t.Run("RegisterAsPending", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ticker := clock.NewTicker(5 * time.Second)
defer ticker.Stop()
ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFunc()
require.True(t, clock.WaitForNewPendingTask(ctx), "should have added a new pending task")
})
}
func TestWaitForPending(t *testing.T) {
t.Run("DoNotBlockWhenAlreadyPending", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
_ = clock.After(5 * time.Minute)
_ = clock.After(5 * time.Minute)
ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFunc()
require.True(t, clock.WaitForNewPendingTask(ctx), "should have added a new pending task")
})
t.Run("ResetNewPendingFlagAfterWaiting", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
_ = clock.After(5 * time.Minute)
_ = clock.After(5 * time.Minute)
ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFunc()
require.True(t, clock.WaitForNewPendingTask(ctx), "should have added a new pending task")
ctx, cancelFunc = context.WithTimeout(context.Background(), 250*time.Millisecond)
defer cancelFunc()
require.False(t, clock.WaitForNewPendingTask(ctx), "should have reset new pending task flag")
})
}
......@@ -10,6 +10,8 @@ import (
"path/filepath"
"strings"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/beacon"
......@@ -104,7 +106,7 @@ func (ch *Cheater) RunAndClose(fn HeadFn) error {
_ = ch.Close()
return fmt.Errorf("failed to commit state change: %w", err)
}
header := preHeader // copy the header
header := types.CopyHeader(preHeader) // copy the header
header.Root = stateRoot
blockHash := header.Hash()
......
......@@ -29,7 +29,7 @@ DeployerWhitelist_Test:test_owner_succeeds() (gas: 7582)
DeployerWhitelist_Test:test_storageSlots_succeeds() (gas: 33395)
DisputeGameFactory_Test:test_owner_succeeds() (gas: 7582)
DisputeGameFactory_Test:test_setImplementation_notOwner_reverts() (gas: 11191)
DisputeGameFactory_Test:test_setImplementation_succeeds() (gas: 32635)
DisputeGameFactory_Test:test_setImplementation_succeeds() (gas: 38765)
DisputeGameFactory_Test:test_transferOwnership_notOwner_reverts() (gas: 10979)
DisputeGameFactory_Test:test_transferOwnership_succeeds() (gas: 13180)
FeeVault_Test:test_constructor_succeeds() (gas: 10736)
......
......@@ -13,7 +13,9 @@ import { IDisputeGameFactory } from "./IDisputeGameFactory.sol";
* @notice The Bond Manager serves as an escrow for permissionless output proposal bonds.
*/
contract BondManager {
// The Bond Type
/**
* @notice The Bond Type
*/
struct Bond {
address owner;
uint256 expiration;
......@@ -58,6 +60,13 @@ contract BondManager {
*/
IDisputeGameFactory public immutable DISPUTE_GAME_FACTORY;
/**
* @notice Amount of gas used to transfer ether when splitting the bond.
* This is a reasonable amount of gas for a transfer, even to a smart contract.
* The number of participants is bound of by the block gas limit.
*/
uint256 private constant TRANSFER_GAS = 30_000;
/**
* @notice Instantiates the bond maanger with the registered dispute game factory.
* @param _disputeGameFactory is the dispute game factory.
......@@ -147,13 +156,14 @@ contract BondManager {
uint256 len = _claimRecipients.length;
uint256 proportionalAmount = b.amount / len;
for (uint256 i = 0; i < len; i++) {
bool success = SafeCall.send(
payable(_claimRecipients[i]),
gasleft() / len,
proportionalAmount
);
require(success, "BondManager: Failed to send Ether.");
// Send the proportional amount to each recipient. Do not revert if a send fails as that
// will prevent other recipients from receiving their share.
for (uint256 i; i < len; i++) {
SafeCall.send({
_target: payable(_claimRecipients[i]),
_gas: TRANSFER_GAS,
_value: proportionalAmount
});
}
}
......
......@@ -113,6 +113,7 @@ contract DisputeGameFactory is Ownable, IDisputeGameFactory {
*/
function setImplementation(GameType gameType, IDisputeGame impl) external onlyOwner {
gameImpls[gameType] = impl;
emit ImplementationSet(address(impl), gameType);
}
/**
......
......@@ -23,6 +23,13 @@ interface IDisputeGameFactory {
Claim indexed rootClaim
);
/**
* @notice Emitted when a new game implementation added to the factory
* @param impl The implementation contract for the given `GameType`.
* @param gameType The type of the DisputeGame.
*/
event ImplementationSet(address indexed impl, GameType indexed gameType);
/**
* @notice `games` queries an internal a mapping that maps the hash of
* `gameType ++ rootClaim ++ extraData` to the deployed `DisputeGame` clone.
......
......@@ -18,6 +18,8 @@ contract DisputeGameFactory_Test is Test {
Claim indexed rootClaim
);
event ImplementationSet(address indexed impl, GameType indexed gameType);
function setUp() public {
factory = new DisputeGameFactory(address(this));
fakeClone = new FakeClone();
......@@ -105,6 +107,9 @@ contract DisputeGameFactory_Test is Test {
// There should be no implementation for the `GameType.FAULT` enum value, it has not been set.
assertEq(address(factory.gameImpls(GameType.FAULT)), address(0));
vm.expectEmit(true, true, true, true, address(factory));
emit ImplementationSet(address(1), GameType.FAULT);
// Set the implementation for the `GameType.FAULT` enum value.
factory.setImplementation(GameType.FAULT, IDisputeGame(address(1)));
......
......@@ -56,8 +56,7 @@
"@eth-optimism/core-utils": "^0.12.0",
"@openzeppelin/contracts": "4.7.3",
"@openzeppelin/contracts-upgradeable": "4.7.3",
"ethers": "^5.7.0",
"hardhat": "^2.9.6"
"ethers": "^5.7.0"
},
"devDependencies": {
"@eth-optimism/hardhat-deploy-config": "^0.2.6",
......@@ -82,6 +81,7 @@
"ethereum-waffle": "^3.0.0",
"forge-std": "https://github.com/foundry-rs/forge-std.git#46264e9788017fc74f9f58b7efa0bc6e1df6d410",
"glob": "^7.1.6",
"hardhat": "^2.9.6",
"hardhat-deploy": "^0.11.4",
"solhint": "^3.3.7",
"solhint-plugin-prettier": "^0.0.5",
......
......@@ -130,10 +130,8 @@ func newRPCCache(cache Cache) RPCCache {
"eth_getBlockTransactionCountByHash": staticHandler,
"eth_getUncleCountByBlockHash": staticHandler,
"eth_getBlockByHash": staticHandler,
"eth_getTransactionByHash": staticHandler,
"eth_getTransactionByBlockHashAndIndex": staticHandler,
"eth_getUncleByBlockHashAndIndex": staticHandler,
"eth_getTransactionReceipt": staticHandler,
}
return &rpcCache{
cache: cache,
......
......@@ -87,34 +87,6 @@ func TestRPCCacheImmutableRPCs(t *testing.T) {
},
name: "eth_getBlockByHash",
},
{
req: &RPCReq{
JSONRPC: "2.0",
Method: "eth_getTransactionByHash",
Params: mustMarshalJSON([]string{"0x88df016429689c079f3b2f6ad39fa052532c56795b733da78a91ebe6a713944b"}),
ID: ID,
},
res: &RPCRes{
JSONRPC: "2.0",
Result: `{"eth_getTransactionByHash":"!"}`,
ID: ID,
},
name: "eth_getTransactionByHash",
},
{
req: &RPCReq{
JSONRPC: "2.0",
Method: "eth_getTransactionByBlockHashAndIndex",
Params: mustMarshalJSON([]string{"0xe670ec64341771606e55d6b4ca35a1a6b75ee3d5145a99d05921026d1527331", "0x55"}),
ID: ID,
},
res: &RPCRes{
JSONRPC: "2.0",
Result: `{"eth_getTransactionByBlockHashAndIndex":"!"}`,
ID: ID,
},
name: "eth_getTransactionByBlockHashAndIndex",
},
{
req: &RPCReq{
JSONRPC: "2.0",
......@@ -129,20 +101,6 @@ func TestRPCCacheImmutableRPCs(t *testing.T) {
},
name: "eth_getUncleByBlockHashAndIndex",
},
{
req: &RPCReq{
JSONRPC: "2.0",
Method: "eth_getTransactionReceipt",
Params: mustMarshalJSON([]string{"0x85d995eba9763907fdf35cd2034144dd9d53ce32cbec21349d4b12823c6860c5"}),
ID: ID,
},
res: &RPCRes{
JSONRPC: "2.0",
Result: `{"eth_getTransactionReceipt":"!"}`,
ID: ID,
},
name: "eth_getTransactionReceipt",
},
}
for _, rpc := range rpcs {
......
......@@ -26,9 +26,7 @@ type ServerConfig struct {
}
type CacheConfig struct {
Enabled bool `toml:"enabled"`
BlockSyncRPCURL string `toml:"block_sync_rpc_url"`
NumBlockConfirmations int `toml:"num_block_confirmations"`
Enabled bool `toml:"enabled"`
}
type RedisConfig struct {
......
......@@ -77,12 +77,6 @@ func TestCaching(t *testing.T) {
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_getBlockByHash\", \"id\": 999}",
1,
},
{
"eth_getTransactionByHash",
[]interface{}{"0x88df016429689c079f3b2f6ad39fa052532c56795b733da78a91ebe6a713944b"},
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_getTransactionByHash\", \"id\": 999}",
1,
},
{
"eth_getTransactionByBlockHashAndIndex",
[]interface{}{"0xe670ec64341771606e55d6b4ca35a1a6b75ee3d5145a99d05921026d1527331", "0x55"},
......@@ -95,12 +89,6 @@ func TestCaching(t *testing.T) {
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_getUncleByBlockHashAndIndex\", \"id\": 999}",
1,
},
{
"eth_getTransactionReceipt",
[]interface{}{"0x85d995eba9763907fdf35cd2034144dd9d53ce32cbec21349d4b12823c6860c5"},
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_getTransactionReceipt\", \"id\": 999}",
1,
},
/* not cacheable */
{
"eth_getBlockByNumber",
......@@ -111,6 +99,18 @@ func TestCaching(t *testing.T) {
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_getBlockByNumber\", \"id\": 999}",
2,
},
{
"eth_getTransactionReceipt",
[]interface{}{"0x85d995eba9763907fdf35cd2034144dd9d53ce32cbec21349d4b12823c6860c5"},
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_getTransactionReceipt\", \"id\": 999}",
2,
},
{
"eth_getTransactionByHash",
[]interface{}{"0x88df016429689c079f3b2f6ad39fa052532c56795b733da78a91ebe6a713944b"},
"{\"jsonrpc\": \"2.0\", \"result\": \"eth_getTransactionByHash\", \"id\": 999}",
2,
},
{
"eth_call",
[]interface{}{
......
......@@ -10,8 +10,6 @@ namespace = "proxyd"
[cache]
enabled = true
block_sync_rpc_url = "$GOOD_BACKEND_RPC_URL"
[backends]
[backends.good]
......
......@@ -9,7 +9,6 @@ import (
"time"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/go-redis/redis/v8"
"github.com/prometheus/client_golang/prometheus/promhttp"
......@@ -206,27 +205,12 @@ func Start(config *Config) (*Server, func(), error) {
rpcCache RPCCache
)
if config.Cache.Enabled {
if config.Cache.BlockSyncRPCURL == "" {
return nil, nil, fmt.Errorf("block sync node required for caching")
}
blockSyncRPCURL, err := ReadFromEnvOrConfig(config.Cache.BlockSyncRPCURL)
if err != nil {
return nil, nil, err
}
if redisClient == nil {
log.Warn("redis is not configured, using in-memory cache")
cache = newMemoryCache()
} else {
cache = newRedisCache(redisClient, config.Redis.Namespace)
}
// Ideally, the BlocKSyncRPCURL should be the sequencer or a HA replica that's not far behind
ethClient, err := ethclient.Dial(blockSyncRPCURL)
if err != nil {
return nil, nil, err
}
defer ethClient.Close()
rpcCache = newRPCCache(newCacheWithCompression(cache))
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment