Commit 037d5527 authored by metacertain's avatar metacertain Committed by GitHub

kademlia, libp2p: limit incoming connections when bins oversaturate (#1352)

Co-authored-by: default avataracud <12988138+acud@users.noreply.github.com>
parent 906c0bda
......@@ -5,6 +5,7 @@
package kademlia
var (
TimeToRetry = &timeToRetry
SaturationPeers = &saturationPeers
TimeToRetry = &timeToRetry
SaturationPeers = &saturationPeers
OverSaturationPeers = &overSaturationPeers
)
......@@ -34,9 +34,10 @@ var (
timeToRetry = 60 * time.Second
shortRetry = 30 * time.Second
saturationPeers = 4
overSaturationPeers = 16
)
type binSaturationFunc func(bin uint8, peers, connected *pslice.PSlice) bool
type binSaturationFunc func(bin uint8, peers, connected *pslice.PSlice) (saturated bool, oversaturated bool)
// Options for injecting services to Kademlia.
type Options struct {
......@@ -151,7 +152,7 @@ func (k *Kad) manage() {
k.waitNextMu.Unlock()
currentDepth := k.NeighborhoodDepth()
if saturated := k.saturationFunc(po, k.knownPeers, k.connectedPeers); saturated {
if saturated, _ := k.saturationFunc(po, k.knownPeers, k.connectedPeers); saturated {
return false, true, nil // bin is saturated, skip to next bin
}
......@@ -281,12 +282,12 @@ func (k *Kad) connectBootnodes(ctx context.Context) {
// binSaturated indicates whether a certain bin is saturated or not.
// when a bin is not saturated it means we would like to proactively
// initiate connections to other peers in the bin.
func binSaturated(bin uint8, peers, connected *pslice.PSlice) bool {
func binSaturated(bin uint8, peers, connected *pslice.PSlice) (bool, bool) {
potentialDepth := recalcDepth(peers)
// short circuit for bins which are >= depth
if bin >= potentialDepth {
return false
return false, false
}
// lets assume for now that the minimum number of peers in a bin
......@@ -304,7 +305,7 @@ func binSaturated(bin uint8, peers, connected *pslice.PSlice) bool {
return false, false, nil
})
return size >= saturationPeers
return size >= saturationPeers, size >= overSaturationPeers
}
// recalcDepth calculates and returns the kademlia depth.
......@@ -451,8 +452,20 @@ func (k *Kad) AddPeers(ctx context.Context, addrs ...swarm.Address) error {
return nil
}
func (k *Kad) Pick(peer p2p.Peer) bool {
po := swarm.Proximity(k.base.Bytes(), peer.Address.Bytes())
_, oversaturated := k.saturationFunc(po, k.knownPeers, k.connectedPeers)
// pick the peer if we are not oversaturated
return !oversaturated
}
// Connected is called when a peer has dialed in.
func (k *Kad) Connected(ctx context.Context, peer p2p.Peer) error {
po := swarm.Proximity(k.base.Bytes(), peer.Address.Bytes())
if _, overSaturated := k.saturationFunc(po, k.knownPeers, k.connectedPeers); overSaturated {
return topology.ErrOversaturated
}
if err := k.connected(ctx, peer.Address); err != nil {
return err
}
......@@ -471,6 +484,7 @@ func (k *Kad) connected(ctx context.Context, addr swarm.Address) error {
}
po := swarm.Proximity(k.base.Bytes(), addr.Bytes())
k.knownPeers.Add(addr, po)
k.connectedPeers.Add(addr, po)
......@@ -483,8 +497,8 @@ func (k *Kad) connected(ctx context.Context, addr swarm.Address) error {
k.depthMu.Unlock()
k.notifyPeerSig()
return nil
}
// Disconnected is called when peer disconnects.
......
......@@ -162,9 +162,10 @@ func TestManage(t *testing.T) {
var (
conns int32 // how many connect calls were made to the p2p mock
saturationVal = false
saturationFunc = func(bin uint8, peers, connected *pslice.PSlice) bool {
return saturationVal
saturationVal = false
overSaturationVal = false
saturationFunc = func(bin uint8, peers, connected *pslice.PSlice) (bool, bool) {
return saturationVal, overSaturationVal
}
base, kad, ab, _, signer = newTestKademlia(&conns, nil, saturationFunc, nil)
)
......@@ -262,6 +263,61 @@ func TestBinSaturation(t *testing.T) {
waitCounter(t, &conns, 1)
}
func TestOversaturation(t *testing.T) {
defer func(p int) {
*kademlia.OverSaturationPeers = p
}(*kademlia.OverSaturationPeers)
*kademlia.OverSaturationPeers = 8
var (
conns int32 // how many connect calls were made to the p2p mock
base, kad, ab, _, signer = newTestKademlia(&conns, nil, nil, nil)
)
if err := kad.Start(context.Background()); err != nil {
t.Fatal(err)
}
defer kad.Close()
// Add maximum accepted number of peers up until bin 5 without problems
for i := 0; i < 6; i++ {
for j := 0; j < *kademlia.OverSaturationPeers; j++ {
addr := test.RandomAddressAt(base, i)
// if error is not nil as specified, connectOne goes fatal
connectOne(t, signer, kad, ab, addr, nil)
}
// see depth is limited to currently added peers proximity
kDepth(t, kad, i)
}
// see depth is 5
kDepth(t, kad, 5)
for k := 0; k < 5; k++ {
// no further connections can be made
for l := 0; l < 3; l++ {
addr := test.RandomAddressAt(base, k)
// if error is not as specified, connectOne goes fatal
connectOne(t, signer, kad, ab, addr, topology.ErrOversaturated)
// check that pick works correctly
if kad.Pick(p2p.Peer{Address: addr}) {
t.Fatal("should not pick the peer")
}
}
// see depth is still as expected
kDepth(t, kad, 5)
}
// see we can still add / not limiting more peers in neighborhood depth
for m := 0; m < 12; m++ {
addr := test.RandomAddressAt(base, 5)
// if error is not nil as specified, connectOne goes fatal
connectOne(t, signer, kad, ab, addr, nil)
// see depth is still as expected
kDepth(t, kad, 5)
}
}
// TestNotifierHooks tests that the Connected/Disconnected hooks
// result in the correct behavior once called.
func TestNotifierHooks(t *testing.T) {
......@@ -276,7 +332,7 @@ func TestNotifierHooks(t *testing.T) {
}
defer kad.Close()
connectOne(t, signer, kad, ab, peer)
connectOne(t, signer, kad, ab, peer, nil)
p, err := kad.ClosestPeer(addr)
if err != nil {
......@@ -324,7 +380,7 @@ func TestDiscoveryHooks(t *testing.T) {
// add another peer that dialed in, check that all peers gossiped
// correctly to each other
connectOne(t, signer, kad, ab, p3)
connectOne(t, signer, kad, ab, p3, nil)
waitBcast(t, disc, p1, p3)
waitBcast(t, disc, p2, p3)
waitBcast(t, disc, p3, p1, p2)
......@@ -725,7 +781,7 @@ func TestStart(t *testing.T) {
})
}
func newTestKademlia(connCounter, failedConnCounter *int32, f func(bin uint8, peers, connected *pslice.PSlice) bool, bootnodes []ma.Multiaddr) (swarm.Address, *kademlia.Kad, addressbook.Interface, *mock.Discovery, beeCrypto.Signer) {
func newTestKademlia(connCounter, failedConnCounter *int32, f func(bin uint8, peers, connected *pslice.PSlice) (bool, bool), bootnodes []ma.Multiaddr) (swarm.Address, *kademlia.Kad, addressbook.Interface, *mock.Discovery, beeCrypto.Signer) {
pk, _ := crypto.GenerateSecp256k1Key()
var (
......@@ -784,7 +840,7 @@ func removeOne(k *kademlia.Kad, peer swarm.Address) {
const underlayBase = "/ip4/127.0.0.1/tcp/1634/dns/"
func connectOne(t *testing.T, signer beeCrypto.Signer, k *kademlia.Kad, ab addressbook.Putter, peer swarm.Address) {
func connectOne(t *testing.T, signer beeCrypto.Signer, k *kademlia.Kad, ab addressbook.Putter, peer swarm.Address, expErr error) {
t.Helper()
multiaddr, err := ma.NewMultiaddr(underlayBase + peer.String())
if err != nil {
......@@ -798,7 +854,12 @@ func connectOne(t *testing.T, signer beeCrypto.Signer, k *kademlia.Kad, ab addre
if err := ab.Put(peer, *bzzAddr); err != nil {
t.Fatal(err)
}
_ = k.Connected(context.Background(), p2p.Peer{Address: peer})
err = k.Connected(context.Background(), p2p.Peer{Address: peer})
if !errors.Is(err, expErr) {
t.Fatalf("expected error %v , got %v", expErr, err)
}
}
func addOne(t *testing.T, signer beeCrypto.Signer, k *kademlia.Kad, ab addressbook.Putter, peer swarm.Address) {
......
......@@ -364,7 +364,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
kad := kademlia.New(swarmAddress, addressbook, hive, p2ps, logger, kademlia.Options{Bootnodes: bootnodes, Standalone: o.Standalone})
b.topologyCloser = kad
hive.SetAddPeersHandler(kad.AddPeers)
p2ps.SetNotifier(kad)
p2ps.SetPickyNotifier(kad)
addrs, err := p2ps.Addresses()
if err != nil {
return nil, fmt.Errorf("get server addresses: %w", err)
......
......@@ -388,13 +388,13 @@ func TestTopologyNotifier(t *testing.T) {
n2disconnectedPeer = p
}
)
notifier1 := mockNotifier(n1c, n1d)
notifier1 := mockNotifier(n1c, n1d, true)
s1, overlay1 := newService(t, 1, libp2pServiceOpts{Addressbook: ab1})
s1.SetNotifier(notifier1)
s1.SetPickyNotifier(notifier1)
notifier2 := mockNotifier(n2c, n2d)
notifier2 := mockNotifier(n2c, n2d, true)
s2, overlay2 := newService(t, 1, libp2pServiceOpts{Addressbook: ab2})
s2.SetNotifier(notifier2)
s2.SetPickyNotifier(notifier2)
addr := serviceUnderlayAddress(t, s1)
......@@ -455,6 +455,62 @@ func TestTopologyNotifier(t *testing.T) {
waitAddrSet(t, &n2disconnectedPeer.Address, &mtx, overlay1)
}
func TestTopologyOverSaturated(t *testing.T) {
var (
mtx sync.Mutex
ctx = context.Background()
ab1, ab2 = addressbook.New(mock.NewStateStore()), addressbook.New(mock.NewStateStore())
n1connectedPeer p2p.Peer
n2connectedPeer p2p.Peer
n2disconnectedPeer p2p.Peer
n1c = func(_ context.Context, p p2p.Peer) error {
mtx.Lock()
defer mtx.Unlock()
expectZeroAddress(t, n1connectedPeer.Address) // fail if set more than once
n1connectedPeer = p
return nil
}
n1d = func(p p2p.Peer) {}
n2c = func(_ context.Context, p p2p.Peer) error {
mtx.Lock()
defer mtx.Unlock()
expectZeroAddress(t, n2connectedPeer.Address) // fail if set more than once
n2connectedPeer = p
return nil
}
n2d = func(p p2p.Peer) {
mtx.Lock()
defer mtx.Unlock()
n2disconnectedPeer = p
}
)
//this notifier will not pick the peer
notifier1 := mockNotifier(n1c, n1d, false)
s1, overlay1 := newService(t, 1, libp2pServiceOpts{Addressbook: ab1})
s1.SetPickyNotifier(notifier1)
notifier2 := mockNotifier(n2c, n2d, false)
s2, _ := newService(t, 1, libp2pServiceOpts{Addressbook: ab2})
s2.SetPickyNotifier(notifier2)
addr := serviceUnderlayAddress(t, s1)
// s2 connects to s1, thus the notifier on s1 should be called on Connect
_, err := s2.Connect(ctx, addr)
if err == nil {
t.Fatal("expected connect to fail but it didnt")
}
expectPeers(t, s1)
expectPeersEventually(t, s2)
waitAddrSet(t, &n2disconnectedPeer.Address, &mtx, overlay1)
}
func expectZeroAddress(t *testing.T, addrs ...swarm.Address) {
t.Helper()
for i, a := range addrs {
......@@ -496,6 +552,7 @@ func checkAddressbook(t *testing.T, ab addressbook.Getter, overlay swarm.Address
type notifiee struct {
connected func(context.Context, p2p.Peer) error
disconnected func(p2p.Peer)
pick bool
}
func (n *notifiee) Connected(c context.Context, p p2p.Peer) error {
......@@ -506,8 +563,12 @@ func (n *notifiee) Disconnected(p p2p.Peer) {
n.disconnected(p)
}
func mockNotifier(c cFunc, d dFunc) p2p.Notifier {
return &notifiee{connected: c, disconnected: d}
func (n *notifiee) Pick(p p2p.Peer) bool {
return n.pick
}
func mockNotifier(c cFunc, d dFunc, pick bool) p2p.PickyNotifier {
return &notifiee{connected: c, disconnected: d, pick: pick}
}
type cFunc func(context.Context, p2p.Peer) error
......
......@@ -61,7 +61,7 @@ type Service struct {
connectionBreaker breaker.Interface
blocklist *blocklist.Blocklist
protocols []p2p.ProtocolSpec
notifier p2p.Notifier
notifier p2p.PickyNotifier
logger logging.Logger
tracer *tracing.Tracer
ready chan struct{}
......@@ -269,6 +269,15 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
return
}
if s.notifier != nil {
if !s.notifier.Pick(p2p.Peer{Address: i.BzzAddress.Overlay}) {
s.logger.Errorf("don't want incoming peer %s. disconnecting", peerID)
_ = handshakeStream.Reset()
_ = s.host.Network().ClosePeer(peerID)
return
}
}
if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists {
if err = handshakeStream.FullClose(); err != nil {
s.logger.Debugf("handshake: could not close stream %s: %v", peerID, err)
......@@ -303,12 +312,22 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
}
}
}
s.protocolsmu.RUnlock()
if s.notifier != nil {
if err := s.notifier.Connected(ctx, peer); err != nil {
s.logger.Debugf("notifier.Connected: peer: %s: %v", i.BzzAddress.Overlay, err)
s.logger.Debugf("notifier.Connected: peer disconnected: %s: %v", i.BzzAddress.Overlay, err)
// note: this cannot be unit tested since the node
// waiting on handshakeStream.FullClose() on the other side
// might actually get a stream reset when we disconnect here
// resulting in a flaky response from the Connect method on
// the other side.
// that is why the Pick method has been added to the notifier
// interface, in addition to the possibility of deciding whether
// a peer connection is wanted prior to adding the peer to the
// peer registry and starting the protocols.
_ = s.Disconnect(i.BzzAddress.Overlay)
return
}
}
......@@ -327,7 +346,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
return s, nil
}
func (s *Service) SetNotifier(n p2p.Notifier) {
func (s *Service) SetPickyNotifier(n p2p.PickyNotifier) {
s.notifier = n
}
......@@ -500,12 +519,14 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
if err != nil {
s.logger.Debugf("blocklisting: exists %s: %v", info.ID, err)
s.logger.Errorf("internal error while connecting with peer %s", info.ID)
_ = handshakeStream.Reset()
_ = s.host.Network().ClosePeer(info.ID)
return nil, fmt.Errorf("peer blocklisted")
}
if blocked {
s.logger.Errorf("blocked connection from blocklisted peer %s", info.ID)
_ = handshakeStream.Reset()
_ = s.host.Network().ClosePeer(info.ID)
return nil, fmt.Errorf("peer blocklisted")
}
......
......@@ -23,7 +23,7 @@ type Service struct {
peersFunc func() []p2p.Peer
blocklistedPeersFunc func() ([]p2p.Peer, error)
addressesFunc func() ([]ma.Multiaddr, error)
setNotifierFunc func(p2p.Notifier)
setNotifierFunc func(p2p.PickyNotifier)
setWelcomeMessageFunc func(string) error
getWelcomeMessageFunc func() string
blocklistFunc func(swarm.Address, time.Duration) error
......@@ -38,7 +38,7 @@ func WithAddProtocolFunc(f func(p2p.ProtocolSpec) error) Option {
}
// WithSetNotifierFunc sets the mock implementation of the SetNotifier function
func WithSetNotifierFunc(f func(p2p.Notifier)) Option {
func WithSetPickyNotifierFunc(f func(p2p.PickyNotifier)) Option {
return optionFunc(func(s *Service) {
s.setNotifierFunc = f
})
......@@ -173,7 +173,7 @@ func (s *Service) Blocklist(overlay swarm.Address, duration time.Duration) error
return s.blocklistFunc(overlay, duration)
}
func (s *Service) SetNotifier(f p2p.Notifier) {
func (s *Service) SetPickyNotifier(f p2p.PickyNotifier) {
if s.setNotifierFunc == nil {
return
}
......
......@@ -25,7 +25,7 @@ type Service interface {
Peers() []Peer
BlocklistedPeers() ([]Peer, error)
Addresses() ([]ma.Multiaddr, error)
SetNotifier(Notifier)
SetPickyNotifier(PickyNotifier)
}
type Disconnecter interface {
......@@ -35,6 +35,12 @@ type Disconnecter interface {
Blocklist(overlay swarm.Address, duration time.Duration) error
}
// PickyNotifer can decide whether a peer should be picked
type PickyNotifier interface {
Pick(Peer) bool
Notifier
}
type Notifier interface {
Connected(context.Context, Peer) error
Disconnected(Peer)
......
......@@ -15,8 +15,9 @@ import (
)
var (
ErrNotFound = errors.New("no peer found")
ErrWantSelf = errors.New("node wants self")
ErrNotFound = errors.New("no peer found")
ErrWantSelf = errors.New("node wants self")
ErrOversaturated = errors.New("oversaturated")
)
type Driver interface {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment