Commit bd995d87 authored by Esad Akar's avatar Esad Akar Committed by GitHub

refactor: kademlia quicker bin saturation (#1878)

parent f856cf27
...@@ -6,6 +6,7 @@ package kademlia ...@@ -6,6 +6,7 @@ package kademlia
var ( var (
TimeToRetry = &timeToRetry TimeToRetry = &timeToRetry
QuickSaturationPeers = &quickSaturationPeers
SaturationPeers = &saturationPeers SaturationPeers = &saturationPeers
OverSaturationPeers = &overSaturationPeers OverSaturationPeers = &overSaturationPeers
BootnodeOverSaturationPeers = &bootnodeOverSaturationPeers BootnodeOverSaturationPeers = &bootnodeOverSaturationPeers
......
...@@ -39,8 +39,9 @@ const ( ...@@ -39,8 +39,9 @@ const (
) )
var ( var (
quickSaturationPeers = 4
saturationPeers = 8 saturationPeers = 8
overSaturationPeers = 16 overSaturationPeers = 20
bootnodeOverSaturationPeers = 20 bootnodeOverSaturationPeers = 20
shortRetry = 30 * time.Second shortRetry = 30 * time.Second
timeToRetry = 2 * shortRetry timeToRetry = 2 * shortRetry
...@@ -292,7 +293,15 @@ func (k *Kad) connectBalanced(wg *sync.WaitGroup, peerConnChan chan<- *peerConnI ...@@ -292,7 +293,15 @@ func (k *Kad) connectBalanced(wg *sync.WaitGroup, peerConnChan chan<- *peerConnI
func (k *Kad) connectNeighbours(wg *sync.WaitGroup, peerConnChan chan<- *peerConnInfo) { func (k *Kad) connectNeighbours(wg *sync.WaitGroup, peerConnChan chan<- *peerConnInfo) {
// The topology.EachPeerFunc doesn't return an error // The topology.EachPeerFunc doesn't return an error
// so we ignore the error returned from EachBinRev. // so we ignore the error returned from EachBinRev.
depth := k.NeighborhoodDepth()
_ = k.knownPeers.EachBinRev(func(addr swarm.Address, po uint8) (bool, bool, error) { _ = k.knownPeers.EachBinRev(func(addr swarm.Address, po uint8) (bool, bool, error) {
if po < depth {
return false, true, nil
}
if k.connectedPeers.Exists(addr) { if k.connectedPeers.Exists(addr) {
return false, false, nil return false, false, nil
} }
...@@ -301,10 +310,6 @@ func (k *Kad) connectNeighbours(wg *sync.WaitGroup, peerConnChan chan<- *peerCon ...@@ -301,10 +310,6 @@ func (k *Kad) connectNeighbours(wg *sync.WaitGroup, peerConnChan chan<- *peerCon
return false, false, nil return false, false, nil
} }
if saturated, _ := k.saturationFunc(po, k.knownPeers, k.connectedPeers); saturated {
return false, true, nil // Bin is saturated, skip to next bin.
}
select { select {
case <-k.quit: case <-k.quit:
return true, false, nil return true, false, nil
...@@ -590,12 +595,11 @@ func recalcDepth(peers *pslice.PSlice, radius uint8) uint8 { ...@@ -590,12 +595,11 @@ func recalcDepth(peers *pslice.PSlice, radius uint8) uint8 {
binCount++ binCount++
return false, false, nil return false, false, nil
} }
if bin > shallowestUnsaturated && binCount < saturationPeers { if bin > shallowestUnsaturated && binCount < quickSaturationPeers {
// this means we have less than saturationPeers in the previous bin // this means we have less than quickSaturation in the previous bin
// therefore we can return assuming that bin is the unsaturated one. // therefore we can return assuming that bin is the unsaturated one.
return true, false, nil return true, false, nil
} }
// bin > shallowestUnsaturated && binCount >= saturationPeers
shallowestUnsaturated = bin shallowestUnsaturated = bin
binCount = 1 binCount = 1
......
...@@ -268,44 +268,39 @@ func TestEachNeighbor(t *testing.T) { ...@@ -268,44 +268,39 @@ func TestEachNeighbor(t *testing.T) {
func TestManage(t *testing.T) { func TestManage(t *testing.T) {
var ( var (
conns int32 // how many connect calls were made to the p2p mock conns int32 // how many connect calls were made to the p2p mock
saturation = *kademlia.QuickSaturationPeers
saturationVal = false base, kad, ab, _, signer = newTestKademlia(t, &conns, nil, kademlia.Options{BitSuffixLength: -1})
overSaturationVal = false
saturationFunc = func(bin uint8, peers, connected *pslice.PSlice) (bool, bool) {
return saturationVal, overSaturationVal
}
base, kad, ab, _, signer = newTestKademlia(t, &conns, nil, kademlia.Options{BitSuffixLength: -1, SaturationFunc: saturationFunc})
) )
if err := kad.Start(context.Background()); err != nil { if err := kad.Start(context.Background()); err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer kad.Close() defer kad.Close()
// first, saturationFunc returns always false, this means that the bin is not saturated,
// hence we expect that every peer we add to kademlia will be connected to kad.SetRadius(6)
for i := 0; i < 50; i++ {
// first, we add peers to bin 0
for i := 0; i < saturation; i++ {
addr := test.RandomAddressAt(base, 0) addr := test.RandomAddressAt(base, 0)
addOne(t, signer, kad, ab, addr) addOne(t, signer, kad, ab, addr)
} }
waitCounter(t, &conns, 50) waitCounter(t, &conns, 4)
saturationVal = true
// now since the bin is "saturated", no new connections should be made // next, we add peers to the next bin
for i := 0; i < 50; i++ { for i := 0; i < saturation; i++ {
addr := test.RandomAddressAt(base, 0) addr := test.RandomAddressAt(base, 1)
addOne(t, signer, kad, ab, addr) addOne(t, signer, kad, ab, addr)
} }
waitCounter(t, &conns, 0) waitCounter(t, &conns, 4)
// check other bins just for fun // here, we attempt to add to bin 0, but bin is saturated, so no new peers should connect to it
for i := 0; i < 16; i++ { for i := 0; i < saturation; i++ {
for j := 0; j < 10; j++ { addr := test.RandomAddressAt(base, 0)
addr := test.RandomAddressAt(base, i)
addOne(t, signer, kad, ab, addr) addOne(t, signer, kad, ab, addr)
} }
}
waitCounter(t, &conns, 0) waitCounter(t, &conns, 0)
} }
...@@ -382,14 +377,13 @@ func TestManageWithBalancing(t *testing.T) { ...@@ -382,14 +377,13 @@ func TestManageWithBalancing(t *testing.T) {
// in shallower depth for the rest of the function to be executed // in shallower depth for the rest of the function to be executed
func TestBinSaturation(t *testing.T) { func TestBinSaturation(t *testing.T) {
defer func(p int) { defer func(p int) {
*kademlia.SaturationPeers = p *kademlia.QuickSaturationPeers = p
}(*kademlia.SaturationPeers) }(*kademlia.QuickSaturationPeers)
*kademlia.SaturationPeers = 2 *kademlia.QuickSaturationPeers = 2
var ( var (
conns int32 // how many connect calls were made to the p2p mock conns int32 // how many connect calls were made to the p2p mock
base, kad, ab, _, signer = newTestKademlia(t, &conns, nil, kademlia.Options{BitSuffixLength: -1}) base, kad, ab, _, signer = newTestKademlia(t, &conns, nil, kademlia.Options{BitSuffixLength: -1})
peers []swarm.Address
) )
if err := kad.Start(context.Background()); err != nil { if err := kad.Start(context.Background()); err != nil {
...@@ -397,6 +391,8 @@ func TestBinSaturation(t *testing.T) { ...@@ -397,6 +391,8 @@ func TestBinSaturation(t *testing.T) {
} }
defer kad.Close() defer kad.Close()
kad.SetRadius(6)
// add two peers in a few bins to generate some depth >= 0, this will // add two peers in a few bins to generate some depth >= 0, this will
// make the next iteration result in binSaturated==true, causing no new // make the next iteration result in binSaturated==true, causing no new
// connections to be made // connections to be made
...@@ -404,7 +400,6 @@ func TestBinSaturation(t *testing.T) { ...@@ -404,7 +400,6 @@ func TestBinSaturation(t *testing.T) {
for j := 0; j < 2; j++ { for j := 0; j < 2; j++ {
addr := test.RandomAddressAt(base, i) addr := test.RandomAddressAt(base, i)
addOne(t, signer, kad, ab, addr) addOne(t, signer, kad, ab, addr)
peers = append(peers, addr)
} }
} }
waitCounter(t, &conns, 10) waitCounter(t, &conns, 10)
...@@ -430,9 +425,6 @@ func TestBinSaturation(t *testing.T) { ...@@ -430,9 +425,6 @@ func TestBinSaturation(t *testing.T) {
waitCounter(t, &conns, 1) waitCounter(t, &conns, 1)
// this is in order to hit the `if size < 2` in the saturation func
removeOne(kad, peers[2])
waitCounter(t, &conns, 1)
} }
func TestOversaturation(t *testing.T) { func TestOversaturation(t *testing.T) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment