Commit 78d1ef1e authored by protolambda's avatar protolambda

op-node: implement new layered p2p connection gater

parent 17e98729
...@@ -70,6 +70,10 @@ type Metricer interface { ...@@ -70,6 +70,10 @@ type Metricer interface {
ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration) ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration)
ServerPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration) ServerPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration)
PayloadsQuarantineSize(n int) PayloadsQuarantineSize(n int)
RecordPeerUnban()
RecordIPUnban()
RecordDial(allow bool)
RecordAccept(allow bool)
} }
// Metrics tracks all the metrics for the op-node. // Metrics tracks all the metrics for the op-node.
...@@ -133,6 +137,10 @@ type Metrics struct { ...@@ -133,6 +137,10 @@ type Metrics struct {
PeerScores *prometheus.GaugeVec PeerScores *prometheus.GaugeVec
GossipEventsTotal *prometheus.CounterVec GossipEventsTotal *prometheus.CounterVec
BandwidthTotal *prometheus.GaugeVec BandwidthTotal *prometheus.GaugeVec
PeerUnbans prometheus.Counter
IPUnbans prometheus.Counter
Dials *prometheus.CounterVec
Accepts *prometheus.CounterVec
ChannelInputBytes prometheus.Counter ChannelInputBytes prometheus.Counter
...@@ -335,6 +343,30 @@ func NewMetrics(procName string) *Metrics { ...@@ -335,6 +343,30 @@ func NewMetrics(procName string) *Metrics {
}, []string{ }, []string{
"direction", "direction",
}), }),
PeerUnbans: factory.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Subsystem: "p2p",
Name: "peer_unbans",
Help: "Count of peer unbans",
}),
IPUnbans: factory.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Subsystem: "p2p",
Name: "ip_unbans",
Help: "Count of IP unbans",
}),
Dials: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Subsystem: "p2p",
Name: "dials",
Help: "Count of outgoing dial attempts, with label to filter to allowed attempts",
}, []string{"allow"}),
Accepts: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Subsystem: "p2p",
Name: "accepts",
Help: "Count of incoming dial attempts to accept, with label to filter to allowed attempts",
}, []string{"allow"}),
ChannelInputBytes: factory.NewCounter(prometheus.CounterOpts{ ChannelInputBytes: factory.NewCounter(prometheus.CounterOpts{
Namespace: ns, Namespace: ns,
...@@ -663,6 +695,30 @@ func (m *Metrics) RecordChannelInputBytes(inputCompressedBytes int) { ...@@ -663,6 +695,30 @@ func (m *Metrics) RecordChannelInputBytes(inputCompressedBytes int) {
m.ChannelInputBytes.Add(float64(inputCompressedBytes)) m.ChannelInputBytes.Add(float64(inputCompressedBytes))
} }
func (m *Metrics) RecordPeerUnban() {
m.PeerUnbans.Inc()
}
func (m *Metrics) RecordIPUnban() {
m.IPUnbans.Inc()
}
func (m *Metrics) RecordDial(allow bool) {
if allow {
m.Dials.WithLabelValues("true").Inc()
} else {
m.Dials.WithLabelValues("false").Inc()
}
}
func (m *Metrics) RecordAccept(allow bool) {
if allow {
m.Accepts.WithLabelValues("true").Inc()
} else {
m.Accepts.WithLabelValues("false").Inc()
}
}
type noopMetricer struct{} type noopMetricer struct{}
var NoopMetrics Metricer = new(noopMetricer) var NoopMetrics Metricer = new(noopMetricer)
...@@ -768,3 +824,15 @@ func (n *noopMetricer) PayloadsQuarantineSize(int) { ...@@ -768,3 +824,15 @@ func (n *noopMetricer) PayloadsQuarantineSize(int) {
func (n *noopMetricer) RecordChannelInputBytes(int) { func (n *noopMetricer) RecordChannelInputBytes(int) {
} }
func (n *noopMetricer) RecordPeerUnban() {
}
func (n *noopMetricer) RecordIPUnban() {
}
func (n *noopMetricer) RecordDial(allow bool) {
}
func (n *noopMetricer) RecordAccept(allow bool) {
}
package gating
import (
"net"
ds "github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
)
//go:generate mockery --name BlockingConnectionGater --output mocks/ --with-expecter=true
type BlockingConnectionGater interface {
connmgr.ConnectionGater
// BlockPeer adds a peer to the set of blocked peers.
// Note: active connections to the peer are not automatically closed.
BlockPeer(p peer.ID) error
UnblockPeer(p peer.ID) error
ListBlockedPeers() []peer.ID
// BlockAddr adds an IP address to the set of blocked addresses.
// Note: active connections to the IP address are not automatically closed.
BlockAddr(ip net.IP) error
UnblockAddr(ip net.IP) error
ListBlockedAddrs() []net.IP
// BlockSubnet adds an IP subnet to the set of blocked addresses.
// Note: active connections to the IP subnet are not automatically closed.
BlockSubnet(ipnet *net.IPNet) error
UnblockSubnet(ipnet *net.IPNet) error
ListBlockedSubnets() []*net.IPNet
}
func NewBlockingConnectionGater(store ds.Batching) (BlockingConnectionGater, error) {
return conngater.NewBasicConnectionGater(store)
}
package gating
import (
"errors"
"net"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
type UnbanMetrics interface {
RecordPeerUnban()
RecordIPUnban()
}
var UnknownExpiry = errors.New("unknown ban expiry")
//go:generate mockery --name ExpiryStore --output mocks/ --with-expecter=true
type ExpiryStore interface {
PeerBanExpiry(id peer.ID) (time.Time, error)
IPBanExpiry(ip net.IP) (time.Time, error)
}
// ExpiryConnectionGater enhances a BlockingConnectionGater by implementing ban-expiration
type ExpiryConnectionGater struct {
BlockingConnectionGater
store ExpiryStore
log log.Logger
clock clock.Clock
m UnbanMetrics
}
func AddBanExpiry(gater BlockingConnectionGater, store ExpiryStore, log log.Logger, clock clock.Clock, m UnbanMetrics) *ExpiryConnectionGater {
return &ExpiryConnectionGater{
BlockingConnectionGater: gater,
store: store,
log: log,
clock: clock,
m: m,
}
}
func (g *ExpiryConnectionGater) peerBanExpiryCheck(p peer.ID) (allow bool) {
// if the peer is blocked, check if it's time to unblock
expiry, err := g.store.PeerBanExpiry(p)
if err != nil {
if errors.Is(err, UnknownExpiry) {
return false // peer is permanently banned if no expiry time is set.
}
g.log.Warn("failed to load peer-ban expiry time", "peer_id", p, "err", err)
return false
}
if g.clock.Now().Before(expiry) {
return false
}
g.log.Info("peer-ban expired, unbanning peer", "peer_id", p, "expiry", expiry)
if err := g.BlockingConnectionGater.UnblockPeer(p); err != nil {
g.log.Warn("failed to unban peer", "peer_id", p, "err", err)
return false // if we ignored the error, then the inner connection-gater would drop them
}
g.m.RecordPeerUnban()
return true
}
func (g *ExpiryConnectionGater) addrBanExpiryCheck(ma multiaddr.Multiaddr) (allow bool) {
ip, err := manet.ToIP(ma)
if err != nil {
g.log.Error("tried to check multi-addr with bad IP", "addr", ma)
return false
}
// Check if it's a subnet-wide ban first. Subnet-bans do not expire.
for _, ipnet := range g.BlockingConnectionGater.ListBlockedSubnets() {
if ipnet.Contains(ip) {
return false // peer is still in banned subnet
}
}
// if just the IP is blocked, check if it's time to unblock
expiry, err := g.store.IPBanExpiry(ip)
if err != nil {
if errors.Is(err, UnknownExpiry) {
return false // IP is permanently banned if no expiry time is set.
}
g.log.Warn("failed to load IP-ban expiry time", "ip", ip, "err", err)
return false
}
if g.clock.Now().Before(expiry) {
return false
}
g.log.Info("IP-ban expired, unbanning IP", "ip", ip, "expiry", expiry)
if err := g.BlockingConnectionGater.UnblockAddr(ip); err != nil {
g.log.Warn("failed to unban IP", "ip", ip, "err", err)
return false // if we ignored the error, then the inner connection-gater would drop them
}
g.m.RecordIPUnban()
return true
}
func (g *ExpiryConnectionGater) InterceptPeerDial(p peer.ID) (allow bool) {
// if not allowed, and not expired, then do not allow the dial
return g.BlockingConnectionGater.InterceptPeerDial(p) || g.peerBanExpiryCheck(p)
}
func (g *ExpiryConnectionGater) InterceptAddrDial(id peer.ID, ma multiaddr.Multiaddr) (allow bool) {
if !g.BlockingConnectionGater.InterceptAddrDial(id, ma) {
// Check if it was intercepted because of a peer ban
if !g.BlockingConnectionGater.InterceptPeerDial(id) {
if !g.peerBanExpiryCheck(id) {
return false // peer is still peer-banned
}
if g.BlockingConnectionGater.InterceptAddrDial(id, ma) { // allow dial if peer-ban was everything
return true
}
}
// intercepted because of addr ban still, check if it is expired
if !g.addrBanExpiryCheck(ma) {
return false // peer is still addr-banned
}
}
return true
}
func (g *ExpiryConnectionGater) InterceptAccept(mas network.ConnMultiaddrs) (allow bool) {
return g.BlockingConnectionGater.InterceptAccept(mas) || g.addrBanExpiryCheck(mas.RemoteMultiaddr())
}
func (g *ExpiryConnectionGater) InterceptSecured(direction network.Direction, id peer.ID, mas network.ConnMultiaddrs) (allow bool) {
// Outbound dials are always accepted: the dial intercepts handle it before the connection is made.
if direction == network.DirOutbound {
return true
}
// InterceptSecured is called after InterceptAccept, we already checked the addrs.
// This leaves just the peer-ID expiry to check on inbound connections.
return g.BlockingConnectionGater.InterceptSecured(direction, id, mas) || g.peerBanExpiryCheck(id)
}
This diff is collapsed.
package gating
import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
)
type ConnectionGaterMetrics interface {
RecordDial(allow bool)
RecordAccept(allow bool)
}
type MeteredConnectionGater struct {
BlockingConnectionGater
m ConnectionGaterMetrics
}
func AddMetering(gater BlockingConnectionGater, m ConnectionGaterMetrics) *MeteredConnectionGater {
return &MeteredConnectionGater{BlockingConnectionGater: gater, m: m}
}
func (g *MeteredConnectionGater) InterceptPeerDial(p peer.ID) (allow bool) {
allow = g.BlockingConnectionGater.InterceptPeerDial(p)
g.m.RecordDial(allow)
return allow
}
func (g *MeteredConnectionGater) InterceptAddrDial(id peer.ID, ma multiaddr.Multiaddr) (allow bool) {
allow = g.BlockingConnectionGater.InterceptAddrDial(id, ma)
g.m.RecordDial(allow)
return allow
}
func (g *MeteredConnectionGater) InterceptAccept(mas network.ConnMultiaddrs) (allow bool) {
allow = g.BlockingConnectionGater.InterceptAccept(mas)
g.m.RecordAccept(allow)
return allow
}
func (g *MeteredConnectionGater) InterceptSecured(dir network.Direction, id peer.ID, mas network.ConnMultiaddrs) (allow bool) {
allow = g.BlockingConnectionGater.InterceptSecured(dir, id, mas)
g.m.RecordAccept(allow)
return allow
}
This diff is collapsed.
// Code generated by mockery v2.28.0. DO NOT EDIT.
package mocks
import (
net "net"
mock "github.com/stretchr/testify/mock"
peer "github.com/libp2p/go-libp2p/core/peer"
time "time"
)
// ExpiryStore is an autogenerated mock type for the ExpiryStore type
type ExpiryStore struct {
mock.Mock
}
type ExpiryStore_Expecter struct {
mock *mock.Mock
}
func (_m *ExpiryStore) EXPECT() *ExpiryStore_Expecter {
return &ExpiryStore_Expecter{mock: &_m.Mock}
}
// IPBanExpiry provides a mock function with given fields: ip
func (_m *ExpiryStore) IPBanExpiry(ip net.IP) (time.Time, error) {
ret := _m.Called(ip)
var r0 time.Time
var r1 error
if rf, ok := ret.Get(0).(func(net.IP) (time.Time, error)); ok {
return rf(ip)
}
if rf, ok := ret.Get(0).(func(net.IP) time.Time); ok {
r0 = rf(ip)
} else {
r0 = ret.Get(0).(time.Time)
}
if rf, ok := ret.Get(1).(func(net.IP) error); ok {
r1 = rf(ip)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// ExpiryStore_IPBanExpiry_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IPBanExpiry'
type ExpiryStore_IPBanExpiry_Call struct {
*mock.Call
}
// IPBanExpiry is a helper method to define mock.On call
// - ip net.IP
func (_e *ExpiryStore_Expecter) IPBanExpiry(ip interface{}) *ExpiryStore_IPBanExpiry_Call {
return &ExpiryStore_IPBanExpiry_Call{Call: _e.mock.On("IPBanExpiry", ip)}
}
func (_c *ExpiryStore_IPBanExpiry_Call) Run(run func(ip net.IP)) *ExpiryStore_IPBanExpiry_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(net.IP))
})
return _c
}
func (_c *ExpiryStore_IPBanExpiry_Call) Return(_a0 time.Time, _a1 error) *ExpiryStore_IPBanExpiry_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *ExpiryStore_IPBanExpiry_Call) RunAndReturn(run func(net.IP) (time.Time, error)) *ExpiryStore_IPBanExpiry_Call {
_c.Call.Return(run)
return _c
}
// PeerBanExpiry provides a mock function with given fields: id
func (_m *ExpiryStore) PeerBanExpiry(id peer.ID) (time.Time, error) {
ret := _m.Called(id)
var r0 time.Time
var r1 error
if rf, ok := ret.Get(0).(func(peer.ID) (time.Time, error)); ok {
return rf(id)
}
if rf, ok := ret.Get(0).(func(peer.ID) time.Time); ok {
r0 = rf(id)
} else {
r0 = ret.Get(0).(time.Time)
}
if rf, ok := ret.Get(1).(func(peer.ID) error); ok {
r1 = rf(id)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// ExpiryStore_PeerBanExpiry_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PeerBanExpiry'
type ExpiryStore_PeerBanExpiry_Call struct {
*mock.Call
}
// PeerBanExpiry is a helper method to define mock.On call
// - id peer.ID
func (_e *ExpiryStore_Expecter) PeerBanExpiry(id interface{}) *ExpiryStore_PeerBanExpiry_Call {
return &ExpiryStore_PeerBanExpiry_Call{Call: _e.mock.On("PeerBanExpiry", id)}
}
func (_c *ExpiryStore_PeerBanExpiry_Call) Run(run func(id peer.ID)) *ExpiryStore_PeerBanExpiry_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(peer.ID))
})
return _c
}
func (_c *ExpiryStore_PeerBanExpiry_Call) Return(_a0 time.Time, _a1 error) *ExpiryStore_PeerBanExpiry_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *ExpiryStore_PeerBanExpiry_Call) RunAndReturn(run func(peer.ID) (time.Time, error)) *ExpiryStore_PeerBanExpiry_Call {
_c.Call.Return(run)
return _c
}
type mockConstructorTestingTNewExpiryStore interface {
mock.TestingT
Cleanup(func())
}
// NewExpiryStore creates a new instance of ExpiryStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewExpiryStore(t mockConstructorTestingTNewExpiryStore) *ExpiryStore {
mock := &ExpiryStore{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
// Code generated by mockery v2.28.0. DO NOT EDIT.
package mocks
import (
peer "github.com/libp2p/go-libp2p/core/peer"
mock "github.com/stretchr/testify/mock"
)
// Scores is an autogenerated mock type for the Scores type
type Scores struct {
mock.Mock
}
type Scores_Expecter struct {
mock *mock.Mock
}
func (_m *Scores) EXPECT() *Scores_Expecter {
return &Scores_Expecter{mock: &_m.Mock}
}
// GetPeerScore provides a mock function with given fields: id
func (_m *Scores) GetPeerScore(id peer.ID) (float64, error) {
ret := _m.Called(id)
var r0 float64
var r1 error
if rf, ok := ret.Get(0).(func(peer.ID) (float64, error)); ok {
return rf(id)
}
if rf, ok := ret.Get(0).(func(peer.ID) float64); ok {
r0 = rf(id)
} else {
r0 = ret.Get(0).(float64)
}
if rf, ok := ret.Get(1).(func(peer.ID) error); ok {
r1 = rf(id)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Scores_GetPeerScore_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetPeerScore'
type Scores_GetPeerScore_Call struct {
*mock.Call
}
// GetPeerScore is a helper method to define mock.On call
// - id peer.ID
func (_e *Scores_Expecter) GetPeerScore(id interface{}) *Scores_GetPeerScore_Call {
return &Scores_GetPeerScore_Call{Call: _e.mock.On("GetPeerScore", id)}
}
func (_c *Scores_GetPeerScore_Call) Run(run func(id peer.ID)) *Scores_GetPeerScore_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(peer.ID))
})
return _c
}
func (_c *Scores_GetPeerScore_Call) Return(_a0 float64, _a1 error) *Scores_GetPeerScore_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *Scores_GetPeerScore_Call) RunAndReturn(run func(peer.ID) (float64, error)) *Scores_GetPeerScore_Call {
_c.Call.Return(run)
return _c
}
type mockConstructorTestingTNewScores interface {
mock.TestingT
Cleanup(func())
}
// NewScores creates a new instance of Scores. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewScores(t mockConstructorTestingTNewScores) *Scores {
mock := &Scores{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
package gating
import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
)
//go:generate mockery --name Scores --output mocks/ --with-expecter=true
type Scores interface {
GetPeerScore(id peer.ID) (float64, error)
}
// ScoringConnectionGater enhances a ConnectionGater by enforcing a minimum score for peer connections
type ScoringConnectionGater struct {
BlockingConnectionGater
scores Scores
minScore float64
}
func AddScoring(gater BlockingConnectionGater, scores Scores, minScore float64) *ScoringConnectionGater {
return &ScoringConnectionGater{BlockingConnectionGater: gater, scores: scores, minScore: minScore}
}
func (g *ScoringConnectionGater) checkScore(p peer.ID) (allow bool) {
score, err := g.scores.GetPeerScore(p)
if err != nil {
return false
}
return score >= g.minScore
}
func (g *ScoringConnectionGater) InterceptPeerDial(p peer.ID) (allow bool) {
return g.BlockingConnectionGater.InterceptPeerDial(p) && g.checkScore(p)
}
func (g *ScoringConnectionGater) InterceptAddrDial(id peer.ID, ma multiaddr.Multiaddr) (allow bool) {
return g.BlockingConnectionGater.InterceptAddrDial(id, ma) && g.checkScore(id)
}
func (g *ScoringConnectionGater) InterceptSecured(dir network.Direction, id peer.ID, mas network.ConnMultiaddrs) (allow bool) {
return g.BlockingConnectionGater.InterceptSecured(dir, id, mas) && g.checkScore(id)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment