Commit f37f7967 authored by Esad Akar's avatar Esad Akar Committed by GitHub

feat: bootnodes drop peers from oversaturated bins (#1715)

parent 04e90294
...@@ -5,7 +5,8 @@ ...@@ -5,7 +5,8 @@
package kademlia package kademlia
var ( var (
TimeToRetry = &timeToRetry TimeToRetry = &timeToRetry
SaturationPeers = &saturationPeers SaturationPeers = &saturationPeers
OverSaturationPeers = &overSaturationPeers OverSaturationPeers = &overSaturationPeers
BootnodeOverSaturationPeers = &bootnodeOverSaturationPeers
) )
...@@ -34,16 +34,18 @@ const ( ...@@ -34,16 +34,18 @@ const (
) )
var ( var (
saturationPeers = 4 saturationPeers = 4
overSaturationPeers = 16 overSaturationPeers = 16
shortRetry = 30 * time.Second bootnodeOverSaturationPeers = 64
timeToRetry = 2 * shortRetry shortRetry = 30 * time.Second
broadcastBinSize = 4 timeToRetry = 2 * shortRetry
broadcastBinSize = 4
) )
var ( var (
errOverlayMismatch = errors.New("overlay mismatch") errOverlayMismatch = errors.New("overlay mismatch")
errPruneEntry = errors.New("prune entry") errPruneEntry = errors.New("prune entry")
errEmptyBin = errors.New("empty bin")
) )
type binSaturationFunc func(bin uint8, peers, connected *pslice.PSlice) (saturated bool, oversaturated bool) type binSaturationFunc func(bin uint8, peers, connected *pslice.PSlice) (saturated bool, oversaturated bool)
...@@ -101,7 +103,11 @@ func New(base swarm.Address, ...@@ -101,7 +103,11 @@ func New(base swarm.Address,
logger logging.Logger, logger logging.Logger,
o Options) *Kad { o Options) *Kad {
if o.SaturationFunc == nil { if o.SaturationFunc == nil {
o.SaturationFunc = binSaturated os := overSaturationPeers
if o.BootnodeMode {
os = bootnodeOverSaturationPeers
}
o.SaturationFunc = binSaturated(os)
} }
if o.BitSuffixLength == 0 { if o.BitSuffixLength == 0 {
o.BitSuffixLength = defaultBitSuffixLength o.BitSuffixLength = defaultBitSuffixLength
...@@ -543,30 +549,32 @@ func (k *Kad) connectBootnodes(ctx context.Context) { ...@@ -543,30 +549,32 @@ func (k *Kad) connectBootnodes(ctx context.Context) {
// binSaturated indicates whether a certain bin is saturated or not. // binSaturated indicates whether a certain bin is saturated or not.
// when a bin is not saturated it means we would like to proactively // when a bin is not saturated it means we would like to proactively
// initiate connections to other peers in the bin. // initiate connections to other peers in the bin.
func binSaturated(bin uint8, peers, connected *pslice.PSlice) (bool, bool) { func binSaturated(oversaturationAmount int) binSaturationFunc {
potentialDepth := recalcDepth(peers, swarm.MaxPO) return func(bin uint8, peers, connected *pslice.PSlice) (bool, bool) {
potentialDepth := recalcDepth(peers, swarm.MaxPO)
// short circuit for bins which are >= depth
if bin >= potentialDepth {
return false, false
}
// lets assume for now that the minimum number of peers in a bin // short circuit for bins which are >= depth
// would be 2, under which we would always want to connect to new peers if bin >= potentialDepth {
// obviously this should be replaced with a better optimization return false, false
// the iterator is used here since when we check if a bin is saturated,
// the plain number of size of bin might not suffice (for example for squared
// gaps measurement)
size := 0
_ = connected.EachBin(func(_ swarm.Address, po uint8) (bool, bool, error) {
if po == bin {
size++
} }
return false, false, nil
})
return size >= saturationPeers, size >= overSaturationPeers // lets assume for now that the minimum number of peers in a bin
// would be 2, under which we would always want to connect to new peers
// obviously this should be replaced with a better optimization
// the iterator is used here since when we check if a bin is saturated,
// the plain number of size of bin might not suffice (for example for squared
// gaps measurement)
size := 0
_ = connected.EachBin(func(_ swarm.Address, po uint8) (bool, bool, error) {
if po == bin {
size++
}
return false, false, nil
})
return size >= saturationPeers, size >= oversaturationAmount
}
} }
// recalcDepth calculates and returns the kademlia depth. // recalcDepth calculates and returns the kademlia depth.
...@@ -763,15 +771,26 @@ func (k *Kad) Pick(peer p2p.Peer) bool { ...@@ -763,15 +771,26 @@ func (k *Kad) Pick(peer p2p.Peer) bool {
// Connected is called when a peer has dialed in. // Connected is called when a peer has dialed in.
func (k *Kad) Connected(ctx context.Context, peer p2p.Peer) error { func (k *Kad) Connected(ctx context.Context, peer p2p.Peer) error {
if !k.bootnode {
// don't run this check if we're a bootnode address := peer.Address
po := swarm.Proximity(k.base.Bytes(), peer.Address.Bytes()) po := swarm.Proximity(k.base.Bytes(), address.Bytes())
if _, overSaturated := k.saturationFunc(po, k.knownPeers, k.connectedPeers); overSaturated {
return topology.ErrOversaturated if _, overSaturated := k.saturationFunc(po, k.knownPeers, k.connectedPeers); overSaturated {
if k.bootnode {
randPeer, err := k.randomPeer(po)
if err != nil {
return err
}
_ = k.p2p.Disconnect(randPeer)
goto connected
} }
return topology.ErrOversaturated
} }
if err := k.connected(ctx, peer.Address); err != nil { connected:
if err := k.connected(ctx, address); err != nil {
return err return err
} }
...@@ -1203,3 +1222,19 @@ func randomSubset(addrs []swarm.Address, count int) ([]swarm.Address, error) { ...@@ -1203,3 +1222,19 @@ func randomSubset(addrs []swarm.Address, count int) ([]swarm.Address, error) {
return addrs[:count], nil return addrs[:count], nil
} }
func (k *Kad) randomPeer(bin uint8) (swarm.Address, error) {
peers := k.connectedPeers.BinPeers(bin)
if len(peers) == 0 {
return swarm.ZeroAddress, errEmptyBin
}
rndIndx, err := random.Int(random.Reader, big.NewInt(int64(len(peers))))
if err != nil {
return swarm.ZeroAddress, err
}
return peers[rndIndx.Int64()], nil
}
...@@ -537,6 +537,60 @@ func TestOversaturationBootnode(t *testing.T) { ...@@ -537,6 +537,60 @@ func TestOversaturationBootnode(t *testing.T) {
} }
} }
func TestBootnodeMaxConnections(t *testing.T) {
defer func(p int) {
*kademlia.BootnodeOverSaturationPeers = p
}(*kademlia.BootnodeOverSaturationPeers)
*kademlia.BootnodeOverSaturationPeers = 4
var (
conns int32 // how many connect calls were made to the p2p mock
base, kad, ab, _, signer = newTestKademlia(&conns, nil, kademlia.Options{BootnodeMode: true})
)
kad.SetRadius(swarm.MaxPO) // don't use radius for checks
if err := kad.Start(context.Background()); err != nil {
t.Fatal(err)
}
defer kad.Close()
// Add maximum accepted number of peers up until bin 5 without problems
for i := 0; i < 6; i++ {
for j := 0; j < *kademlia.BootnodeOverSaturationPeers; j++ {
addr := test.RandomAddressAt(base, i)
// if error is not nil as specified, connectOne goes fatal
connectOne(t, signer, kad, ab, addr, nil)
}
// see depth is limited to currently added peers proximity
kDepth(t, kad, i)
}
// see depth is 5
kDepth(t, kad, 5)
depth := 5
outSideDepthPeers := 5
for k := 0; k < depth; k++ {
// further connections should succeed outside of depth
for l := 0; l < outSideDepthPeers; l++ {
addr := test.RandomAddressAt(base, k)
// if error is not as specified, connectOne goes fatal
connectOne(t, signer, kad, ab, addr, nil)
// check that pick works correctly
if !kad.Pick(p2p.Peer{Address: addr}) {
t.Fatal("should pick the peer but didnt")
}
}
}
got := atomic.LoadInt32(&conns)
want := -int32(depth * outSideDepthPeers)
if got != want {
t.Fatalf("got %d, want %d", got, want)
}
}
// TestNotifierHooks tests that the Connected/Disconnected hooks // TestNotifierHooks tests that the Connected/Disconnected hooks
// result in the correct behavior once called. // result in the correct behavior once called.
func TestNotifierHooks(t *testing.T) { func TestNotifierHooks(t *testing.T) {
...@@ -1055,38 +1109,46 @@ func newTestKademlia(connCounter, failedConnCounter *int32, kadOpts kademlia.Opt ...@@ -1055,38 +1109,46 @@ func newTestKademlia(connCounter, failedConnCounter *int32, kadOpts kademlia.Opt
} }
func p2pMock(ab addressbook.Interface, signer beeCrypto.Signer, counter, failedCounter *int32) p2p.Service { func p2pMock(ab addressbook.Interface, signer beeCrypto.Signer, counter, failedCounter *int32) p2p.Service {
p2ps := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (*bzz.Address, error) { p2ps := p2pmock.New(
if addr.Equal(nonConnectableAddress) { p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (*bzz.Address, error) {
_ = atomic.AddInt32(failedCounter, 1) if addr.Equal(nonConnectableAddress) {
return nil, errors.New("non reachable node") _ = atomic.AddInt32(failedCounter, 1)
} return nil, errors.New("non reachable node")
if counter != nil { }
_ = atomic.AddInt32(counter, 1) if counter != nil {
} _ = atomic.AddInt32(counter, 1)
}
addresses, err := ab.Addresses() addresses, err := ab.Addresses()
if err != nil { if err != nil {
return nil, errors.New("could not fetch addresbook addresses") return nil, errors.New("could not fetch addresbook addresses")
} }
for _, a := range addresses { for _, a := range addresses {
if a.Underlay.Equal(addr) { if a.Underlay.Equal(addr) {
return &a, nil return &a, nil
}
} }
}
address := test.RandomAddress() address := test.RandomAddress()
bzzAddr, err := bzz.NewAddress(signer, addr, address, 0) bzzAddr, err := bzz.NewAddress(signer, addr, address, 0)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := ab.Put(address, *bzzAddr); err != nil { if err := ab.Put(address, *bzzAddr); err != nil {
return nil, err return nil, err
} }
return bzzAddr, nil return bzzAddr, nil
})) }),
p2pmock.WithDisconnectFunc(func(swarm.Address) error {
if counter != nil {
_ = atomic.AddInt32(counter, -1)
}
return nil
}),
)
return p2ps return p2ps
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment