Commit 098d3aee authored by acud's avatar acud Committed by GitHub

kademlia: saturate based on known peers (#374)

* saturate based on connected and known peers
parent 5ac5a02f
...@@ -4,5 +4,8 @@ ...@@ -4,5 +4,8 @@
package kademlia package kademlia
var MaxBins = maxBins var (
var TimeToRetry = &timeToRetry MaxBins = maxBins
TimeToRetry = &timeToRetry
SaturationPeers = &saturationPeers
)
...@@ -32,9 +32,10 @@ var ( ...@@ -32,9 +32,10 @@ var (
errOverlayMismatch = errors.New("overlay mismatch") errOverlayMismatch = errors.New("overlay mismatch")
timeToRetry = 60 * time.Second timeToRetry = 60 * time.Second
shortRetry = 30 * time.Second shortRetry = 30 * time.Second
saturationPeers = 4
) )
type binSaturationFunc func(bin, depth uint8, peers *pslice.PSlice) bool type binSaturationFunc func(bin uint8, peers, connected *pslice.PSlice) bool
// Options for injecting services to Kademlia. // Options for injecting services to Kademlia.
type Options struct { type Options struct {
...@@ -127,7 +128,11 @@ func (k *Kad) manage() { ...@@ -127,7 +128,11 @@ func (k *Kad) manage() {
} }
case <-k.manageC: case <-k.manageC:
start = time.Now() start = time.Now()
select {
case <-k.quit:
return
default:
}
err := k.knownPeers.EachBinRev(func(peer swarm.Address, po uint8) (bool, bool, error) { err := k.knownPeers.EachBinRev(func(peer swarm.Address, po uint8) (bool, bool, error) {
if k.connectedPeers.Exists(peer) { if k.connectedPeers.Exists(peer) {
return false, false, nil return false, false, nil
...@@ -141,7 +146,7 @@ func (k *Kad) manage() { ...@@ -141,7 +146,7 @@ func (k *Kad) manage() {
k.waitNextMu.Unlock() k.waitNextMu.Unlock()
currentDepth := k.NeighborhoodDepth() currentDepth := k.NeighborhoodDepth()
if saturated := k.saturationFunc(po, currentDepth, k.connectedPeers); saturated { if saturated := k.saturationFunc(po, k.knownPeers, k.connectedPeers); saturated {
return false, true, nil // bin is saturated, skip to next bin return false, true, nil // bin is saturated, skip to next bin
} }
...@@ -180,7 +185,7 @@ func (k *Kad) manage() { ...@@ -180,7 +185,7 @@ func (k *Kad) manage() {
k.connectedPeers.Add(peer, po) k.connectedPeers.Add(peer, po)
k.depthMu.Lock() k.depthMu.Lock()
k.depth = k.recalcDepth() k.depth = recalcDepth(k.connectedPeers)
k.depthMu.Unlock() k.depthMu.Unlock()
k.logger.Debugf("connected to peer: %s old depth: %d new depth: %d", peer, currentDepth, k.NeighborhoodDepth()) k.logger.Debugf("connected to peer: %s old depth: %d new depth: %d", peer, currentDepth, k.NeighborhoodDepth())
...@@ -214,9 +219,11 @@ func (k *Kad) manage() { ...@@ -214,9 +219,11 @@ func (k *Kad) manage() {
// binSaturated indicates whether a certain bin is saturated or not. // binSaturated indicates whether a certain bin is saturated or not.
// when a bin is not saturated it means we would like to proactively // when a bin is not saturated it means we would like to proactively
// initiate connections to other peers in the bin. // initiate connections to other peers in the bin.
func binSaturated(bin, depth uint8, peers *pslice.PSlice) bool { func binSaturated(bin uint8, peers, connected *pslice.PSlice) bool {
potentialDepth := recalcDepth(peers)
// short circuit for bins which are >= depth // short circuit for bins which are >= depth
if bin >= depth { if bin >= potentialDepth {
return false return false
} }
...@@ -228,31 +235,31 @@ func binSaturated(bin, depth uint8, peers *pslice.PSlice) bool { ...@@ -228,31 +235,31 @@ func binSaturated(bin, depth uint8, peers *pslice.PSlice) bool {
// gaps measurement) // gaps measurement)
size := 0 size := 0
_ = peers.EachBin(func(_ swarm.Address, po uint8) (bool, bool, error) { _ = connected.EachBin(func(_ swarm.Address, po uint8) (bool, bool, error) {
if po == bin { if po == bin {
size++ size++
} }
return false, false, nil return false, false, nil
}) })
return size >= 2 return size >= saturationPeers
} }
// recalcDepth calculates and returns the kademlia depth. // recalcDepth calculates and returns the kademlia depth.
func (k *Kad) recalcDepth() uint8 { func recalcDepth(peers *pslice.PSlice) uint8 {
// handle edge case separately // handle edge case separately
if k.connectedPeers.Length() <= nnLowWatermark { if peers.Length() <= nnLowWatermark {
return 0 return 0
} }
var ( var (
peers = uint(0) peersCtr = uint(0)
candidate = uint8(0) candidate = uint8(0)
shallowestEmpty, noEmptyBins = k.connectedPeers.ShallowestEmpty() shallowestEmpty, noEmptyBins = peers.ShallowestEmpty()
) )
_ = k.connectedPeers.EachBin(func(_ swarm.Address, po uint8) (bool, bool, error) { _ = peers.EachBin(func(_ swarm.Address, po uint8) (bool, bool, error) {
peers++ peersCtr++
if peers >= nnLowWatermark { if peersCtr >= nnLowWatermark {
candidate = po candidate = po
return true, false, nil return true, false, nil
} }
...@@ -390,7 +397,7 @@ func (k *Kad) Connected(ctx context.Context, addr swarm.Address) error { ...@@ -390,7 +397,7 @@ func (k *Kad) Connected(ctx context.Context, addr swarm.Address) error {
k.waitNextMu.Unlock() k.waitNextMu.Unlock()
k.depthMu.Lock() k.depthMu.Lock()
k.depth = k.recalcDepth() k.depth = recalcDepth(k.connectedPeers)
k.depthMu.Unlock() k.depthMu.Unlock()
k.notifyPeerSig() k.notifyPeerSig()
...@@ -413,7 +420,7 @@ func (k *Kad) Disconnected(addr swarm.Address) { ...@@ -413,7 +420,7 @@ func (k *Kad) Disconnected(addr swarm.Address) {
k.waitNextMu.Unlock() k.waitNextMu.Unlock()
k.depthMu.Lock() k.depthMu.Lock()
k.depth = k.recalcDepth() k.depth = recalcDepth(k.connectedPeers)
k.depthMu.Unlock() k.depthMu.Unlock()
select { select {
case k.manageC <- struct{}{}: case k.manageC <- struct{}{}:
......
...@@ -160,11 +160,12 @@ func TestManage(t *testing.T) { ...@@ -160,11 +160,12 @@ func TestManage(t *testing.T) {
conns int32 // how many connect calls were made to the p2p mock conns int32 // how many connect calls were made to the p2p mock
saturationVal = false saturationVal = false
saturationFunc = func(bin, depth uint8, peers *pslice.PSlice) bool { saturationFunc = func(bin uint8, peers, connected *pslice.PSlice) bool {
return saturationVal return saturationVal
} }
base, kad, ab, _, signer = newTestKademlia(&conns, nil, saturationFunc) base, kad, ab, _, signer = newTestKademlia(&conns, nil, saturationFunc)
) )
defer kad.Close()
// first, saturationFunc returns always false, this means that the bin is not saturated, // first, saturationFunc returns always false, this means that the bin is not saturated,
// hence we expect that every peer we add to kademlia will be connected to // hence we expect that every peer we add to kademlia will be connected to
for i := 0; i < 50; i++ { for i := 0; i < 50; i++ {
...@@ -200,12 +201,19 @@ func TestManage(t *testing.T) { ...@@ -200,12 +201,19 @@ func TestManage(t *testing.T) {
// be true since depth is increasingly moving deeper, but then we add more peers // be true since depth is increasingly moving deeper, but then we add more peers
// in shallower depth for the rest of the function to be executed // in shallower depth for the rest of the function to be executed
func TestBinSaturation(t *testing.T) { func TestBinSaturation(t *testing.T) {
defer func(p int) {
*kademlia.SaturationPeers = p
}(*kademlia.SaturationPeers)
*kademlia.SaturationPeers = 2
var ( var (
conns int32 // how many connect calls were made to the p2p mock conns int32 // how many connect calls were made to the p2p mock
base, kad, ab, _, signer = newTestKademlia(&conns, nil, nil) base, kad, ab, _, signer = newTestKademlia(&conns, nil, nil)
peers []swarm.Address peers []swarm.Address
) )
defer kad.Close()
// add two peers in a few bins to generate some depth >= 0, this will // add two peers in a few bins to generate some depth >= 0, this will
// make the next iteration result in binSaturated==true, causing no new // make the next iteration result in binSaturated==true, causing no new
// connections to be made // connections to be made
...@@ -610,7 +618,7 @@ func TestMarshal(t *testing.T) { ...@@ -610,7 +618,7 @@ func TestMarshal(t *testing.T) {
} }
} }
func newTestKademlia(connCounter, failedConnCounter *int32, f func(bin, depth uint8, peers *pslice.PSlice) bool) (swarm.Address, *kademlia.Kad, addressbook.Interface, *mock.Discovery, beeCrypto.Signer) { func newTestKademlia(connCounter, failedConnCounter *int32, f func(bin uint8, peers, connected *pslice.PSlice) bool) (swarm.Address, *kademlia.Kad, addressbook.Interface, *mock.Discovery, beeCrypto.Signer) {
var ( var (
base = test.RandomAddress() // base address base = test.RandomAddress() // base address
ab = addressbook.New(mockstate.NewStateStore()) // address book ab = addressbook.New(mockstate.NewStateStore()) // address book
......
...@@ -81,7 +81,10 @@ func (r *peerRegistry) Disconnected(_ network.Network, c network.Conn) { ...@@ -81,7 +81,10 @@ func (r *peerRegistry) Disconnected(_ network.Network, c network.Conn) {
func (r *peerRegistry) addStream(peerID libp2ppeer.ID, stream network.Stream, cancel context.CancelFunc) { func (r *peerRegistry) addStream(peerID libp2ppeer.ID, stream network.Stream, cancel context.CancelFunc) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
if _, ok := r.streams[peerID]; !ok {
// it is possible that an addStream will be called after a disconnect
return
}
r.streams[peerID][stream] = cancel r.streams[peerID][stream] = cancel
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment