Commit 425891bb authored by Peter Mrekaj's avatar Peter Mrekaj Committed by GitHub

refactor(kademlia): build topology peer connections concurrently (#1647)

Decreases the time for building the topology by doing the connection attempts to peers concurrently.
parent f3cb3f2e
......@@ -6,6 +6,7 @@ package kademlia
import (
"context"
random "crypto/rand"
"encoding/json"
"errors"
"fmt"
......@@ -15,8 +16,6 @@ import (
"sync"
"time"
random "crypto/rand"
"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/discovery"
"github.com/ethersphere/bee/pkg/logging"
......@@ -35,15 +34,15 @@ const (
)
var (
	// errMissingAddressBookEntry is returned when the address book
	// has no underlay entry for a known overlay address.
	errMissingAddressBookEntry = errors.New("addressbook underlay entry not found")
	// errOverlayMismatch is returned when a peer's advertised overlay
	// does not match the overlay derived during the handshake.
	errOverlayMismatch = errors.New("overlay mismatch")

	saturationPeers     = 4
	overSaturationPeers = 16
	shortRetry          = 30 * time.Second
	timeToRetry         = 2 * shortRetry // penalized peers wait twice the short retry window
	broadcastBinSize    = 4
)
// binSaturationFunc reports, for the given bin and the known/connected peer
// sets, whether the bin is saturated and whether it is oversaturated.
type binSaturationFunc func(bin uint8, peers, connected *pslice.PSlice) (saturated bool, oversaturated bool)

// sanctionedPeerFunc reports whether the given peer should be skipped
// (sanctioned) during peer selection.
type sanctionedPeerFunc func(peer swarm.Address) bool
......@@ -75,7 +74,7 @@ type Kad struct {
depthMu sync.RWMutex // protect depth changes
manageC chan struct{} // trigger the manage forever loop to connect to new peers
waitNext map[string]retryInfo // sanction connections to a peer, key is overlay string and value is a retry information
waitNextMu sync.Mutex // synchronize map
waitNextMu sync.Mutex // guards waitNext map
peerSig []chan struct{}
peerSigMtx sync.Mutex
logger logging.Logger // logger
......@@ -199,271 +198,271 @@ func (k *Kad) generateCommonBinPrefixes() {
// clearBit returns n with the bit at position pos cleared.
func clearBit(n, pos uint8) uint8 {
	mask := ^(uint8(1) << pos)
	return n & mask
}
// setBit returns n with the bit at position pos set.
func setBit(n, pos uint8) uint8 {
	return n | 1<<pos
}
// hasBit reports whether the bit at position pos in n is set.
func hasBit(n, pos uint8) bool {
	return n&(1<<pos) > 0
}
// manage is a forever loop that manages the connection to new peers
// once they get added or once others leave.
func (k *Kad) manage() {
var (
peerToRemove swarm.Address
start time.Time
spf = func(peer swarm.Address) bool {
k.waitNextMu.Lock()
defer k.waitNextMu.Unlock()
if next, ok := k.waitNext[peer.String()]; ok && time.Now().Before(next.tryAfter) {
return true
// peerConnInfo groups necessary fields needed to create a connection.
type peerConnInfo struct {
po uint8
addr swarm.Address
}
// connectBalanced attempts to connect to the balanced peers first.
func (k *Kad) connectBalanced(wg *sync.WaitGroup, peerConnChan chan<- *peerConnInfo) {
skipPeers := func(peer swarm.Address) bool {
k.waitNextMu.Lock()
defer k.waitNextMu.Unlock()
next, ok := k.waitNext[peer.String()]
return ok && time.Now().Before(next.tryAfter)
}
for i := range k.commonBinPrefixes {
for j := range k.commonBinPrefixes[i] {
pseudoAddr := k.commonBinPrefixes[i][j]
closestConnectedPeer, err := closestPeer(k.connectedPeers, pseudoAddr, noopSanctionedPeerFn)
if err != nil {
if errors.Is(err, topology.ErrNotFound) {
break
}
k.logger.Errorf("closest connected peer: %v", err)
continue
}
return false
}
)
defer k.wg.Done()
defer close(k.done)
closestConnectedPO := swarm.ExtendedProximity(closestConnectedPeer.Bytes(), pseudoAddr.Bytes())
if int(closestConnectedPO) >= i+k.bitSuffixLength+1 {
continue
}
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-k.quit
cancel()
}()
for {
select {
case <-k.quit:
return
case <-time.After(30 * time.Second):
// periodically try to connect to new peers
select {
case k.manageC <- struct{}{}:
default:
// Connect to closest known peer which we haven't tried connecting to recently.
closestKnownPeer, err := closestPeer(k.knownPeers, pseudoAddr, skipPeers)
if err != nil {
if errors.Is(err, topology.ErrNotFound) {
break
}
k.logger.Errorf("closest known peer: %v", err)
continue
}
case <-k.manageC:
start = time.Now()
if k.connectedPeers.Exists(closestKnownPeer) {
continue
}
closestKnownPeerPO := swarm.ExtendedProximity(closestKnownPeer.Bytes(), pseudoAddr.Bytes())
if int(closestKnownPeerPO) < i+k.bitSuffixLength+1 {
continue
}
select {
case <-k.quit:
return
default:
wg.Add(1)
peerConnChan <- &peerConnInfo{
po: swarm.Proximity(k.base.Bytes(), closestKnownPeer.Bytes()),
addr: closestKnownPeer,
}
}
if k.standalone {
continue
break
}
}
}
// connectNeighbours attempts to connect to the neighbours
// which were not considered by the connectBalanced method.
// Each candidate is enqueued on peerConnChan with a matching wg.Add(1)
// so the caller can wait for the attempts to be handled.
func (k *Kad) connectNeighbours(wg *sync.WaitGroup, peerConnChan chan<- *peerConnInfo) {
	// The topology.EachPeerFunc doesn't return an error
	// so we ignore the error returned from EachBinRev.
	_ = k.knownPeers.EachBinRev(func(addr swarm.Address, po uint8) (bool, bool, error) {
		if k.connectedPeers.Exists(addr) {
			return false, false, nil
		}

		// Skip peers that are still inside their retry-wait window.
		k.waitNextMu.Lock()
		if next, ok := k.waitNext[addr.String()]; ok && time.Now().Before(next.tryAfter) {
			k.waitNextMu.Unlock()
			return false, false, nil
		}
		k.waitNextMu.Unlock()

		if saturated, _ := k.saturationFunc(po, k.knownPeers, k.connectedPeers); saturated {
			return false, true, nil // Bin is saturated, skip to next bin.
		}

		select {
		case <-k.quit:
			return true, false, nil
		default:
			wg.Add(1)
			peerConnChan <- &peerConnInfo{
				po:   po,
				addr: addr,
			}
		}

		// The bin could be saturated or not, so a decision cannot
		// be made before checking the next peer, so we iterate to next.
		return false, true, nil
	})
}
if err != nil {
if errors.Is(err, errMissingAddressBookEntry) {
po := swarm.Proximity(k.base.Bytes(), peerToRemove.Bytes())
k.knownPeers.Remove(peerToRemove, po)
} else {
k.logger.Errorf("kademlia manage loop iterator: %v", err)
}
// connectionAttemptsHandler handles the connection attempts
// to peers sent by the producers to the peerConnChan.
func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup, peerConnChan <-chan *peerConnInfo) {
connect := func(peer *peerConnInfo) {
bzzAddr, err := k.addressBook.Get(peer.addr)
switch {
case errors.Is(err, addressbook.ErrNotFound):
k.logger.Debugf("empty address book entry for peer %q", peer.addr)
po := swarm.Proximity(k.base.Bytes(), peer.addr.Bytes())
k.knownPeers.Remove(peer.addr, po)
return
case err != nil:
k.logger.Debugf("failed to get address book entry for peer %q: %v", peer.addr, err)
return
}
switch err = k.connect(ctx, peer.addr, bzzAddr.Underlay); {
case errors.Is(err, errOverlayMismatch):
k.logger.Debugf("overlay mismatch has occurred to an overlay %q with underlay %q", peer.addr, bzzAddr.Underlay)
k.waitNextMu.Lock()
delete(k.waitNext, peer.addr.String())
k.waitNextMu.Unlock()
k.knownPeers.Remove(peer.addr, peer.po)
if err := k.addressBook.Remove(peer.addr); err != nil {
k.logger.Debugf("could not remove peer %q from addressbook", peer.addr)
}
fallthrough
case err != nil:
k.logger.Debugf("peer not reachable from kademlia %q: %v", bzzAddr, err)
k.logger.Warningf("peer not reachable when attempting to connect")
return
}
err = k.knownPeers.EachBinRev(func(peer swarm.Address, po uint8) (bool, bool, error) {
k.waitNextMu.Lock()
k.waitNext[peer.addr.String()] = retryInfo{tryAfter: time.Now().Add(shortRetry)}
k.waitNextMu.Unlock()
if k.connectedPeers.Exists(peer) {
return false, false, nil
}
k.connectedPeers.Add(peer.addr, peer.po)
k.waitNextMu.Lock()
if next, ok := k.waitNext[peer.String()]; ok && time.Now().Before(next.tryAfter) {
k.waitNextMu.Unlock()
return false, false, nil
}
k.waitNextMu.Unlock()
k.depthMu.Lock()
k.depth = recalcDepth(k.connectedPeers, k.radius)
k.depthMu.Unlock()
currentDepth := k.NeighborhoodDepth()
if saturated, _ := k.saturationFunc(po, k.knownPeers, k.connectedPeers); saturated {
return false, true, nil // bin is saturated, skip to next bin
}
select {
case k.manageC <- struct{}{}:
default:
}
bzzAddr, err := k.addressBook.Get(peer)
if err != nil {
if err == addressbook.ErrNotFound {
k.logger.Debugf("failed to get address book entry for peer: %s", peer.String())
peerToRemove = peer
return false, false, errMissingAddressBookEntry
}
// either a peer is not known in the address book, in which case it
// should be removed, or that some severe I/O problem is at hand
return false, false, err
}
k.logger.Debugf("connected to peer: %q for bin: %d", peer.addr, peer.po)
k.notifyPeerSig()
}
err = k.connect(ctx, peer, bzzAddr.Underlay, po)
if err != nil {
if errors.Is(err, errOverlayMismatch) {
k.knownPeers.Remove(peer, po)
if err := k.addressBook.Remove(peer); err != nil {
k.logger.Debugf("could not remove peer from addressbook: %s", peer.String())
}
}
k.logger.Debugf("peer not reachable from kademlia %s: %v", bzzAddr.String(), err)
k.logger.Warningf("peer not reachable when attempting to connect")
var (
// The inProgress helps to avoid making a connection
// to a peer who has the connection already in progress.
inProgress = make(map[string]bool)
inProgressMu sync.Mutex
)
for i := 0; i < int(swarm.MaxBins); i++ {
go func() {
for {
select {
case <-k.quit:
return
case peer := <-peerConnChan:
addr := peer.addr.String()
// Check if the peer was penalized.
k.waitNextMu.Lock()
if _, ok := k.waitNext[peer.String()]; !ok {
// don't override existing data in the map
k.waitNext[peer.String()] = retryInfo{tryAfter: time.Now().Add(timeToRetry)}
next, ok := k.waitNext[addr]
if ok && time.Now().Before(next.tryAfter) {
k.waitNextMu.Unlock()
wg.Done()
continue
}
k.waitNextMu.Unlock()
// continue to next
return false, false, nil
inProgressMu.Lock()
if !inProgress[addr] {
inProgress[addr] = true
inProgressMu.Unlock()
connect(peer)
inProgressMu.Lock()
delete(inProgress, addr)
}
inProgressMu.Unlock()
wg.Done()
}
}
}()
}
}
k.waitNextMu.Lock()
k.waitNext[peer.String()] = retryInfo{tryAfter: time.Now().Add(shortRetry)}
k.waitNextMu.Unlock()
k.connectedPeers.Add(peer, po)
k.depthMu.Lock()
k.depth = recalcDepth(k.connectedPeers, k.radius)
k.depthMu.Unlock()
// manage is a forever loop that manages the connection to new peers
// once they get added or once others leave.
func (k *Kad) manage() {
defer k.wg.Done()
defer close(k.done)
k.logger.Debugf("connected to peer: %s old depth: %d new depth: %d", peer, currentDepth, k.NeighborhoodDepth())
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-k.quit
cancel()
}()
k.notifyPeerSig()
// The wg makes sure that we wait for all the connection attempts,
// spun up by goroutines, to finish before we try the boot-nodes.
var wg sync.WaitGroup
var peerConnChan = make(chan *peerConnInfo)
go k.connectionAttemptsHandler(ctx, &wg, peerConnChan)
select {
case <-k.quit:
return true, false, nil
default:
}
for {
select {
case <-k.quit:
return
case <-time.After(30 * time.Second):
select {
case k.manageC <- struct{}{}:
default:
}
case <-k.manageC:
start := time.Now()
// the bin could be saturated or not, so a decision cannot
// be made before checking the next peer, so we iterate to next
return false, false, nil
})
k.logger.Tracef("kademlia iterator took %s to finish", time.Since(start))
select {
case <-k.quit:
return
default:
}
if err != nil {
if errors.Is(err, errMissingAddressBookEntry) {
po := swarm.Proximity(k.base.Bytes(), peerToRemove.Bytes())
k.knownPeers.Remove(peerToRemove, po)
} else {
k.logger.Errorf("kademlia manage loop iterator: %v", err)
}
if k.standalone {
continue
}
oldDepth := k.NeighborhoodDepth()
k.connectBalanced(&wg, peerConnChan)
k.connectNeighbours(&wg, peerConnChan)
wg.Wait()
k.logger.Tracef(
"kademlia: connector took %s to finish: old depth %d; new depth %d",
time.Since(start),
oldDepth,
k.NeighborhoodDepth(),
)
if k.connectedPeers.Length() == 0 {
k.logger.Debug("kademlia has no connected peers, trying bootnodes")
k.logger.Debug("kademlia: no connected peers, trying bootnodes")
k.connectBootnodes(ctx)
}
}
}
}
......@@ -614,7 +613,7 @@ func recalcDepth(peers *pslice.PSlice, radius uint8) uint8 {
// connect connects to a peer and gossips its address to our connected peers,
// as well as sends the peers we are connected to to the newly connected peer
func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr, po uint8) error {
func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr) error {
k.logger.Infof("attempting to connect to peer %s", peer)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
......@@ -821,14 +820,9 @@ func (k *Kad) notifyPeerSig() {
}
}
func closestPeer(peers *pslice.PSlice, addr swarm.Address, spf sanctionedPeerFunc, skipPeers ...swarm.Address) (swarm.Address, error) {
closest := swarm.Address{}
func closestPeer(peers *pslice.PSlice, addr swarm.Address, spf sanctionedPeerFunc) (swarm.Address, error) {
closest := swarm.ZeroAddress
err := peers.EachBinRev(func(peer swarm.Address, po uint8) (bool, bool, error) {
for _, a := range skipPeers {
if a.Equal(peer) {
return false, false, nil
}
}
// check whether peer is sanctioned
if spf(peer) {
return false, false, nil
......@@ -854,12 +848,12 @@ func closestPeer(peers *pslice.PSlice, addr swarm.Address, spf sanctionedPeerFun
return false, false, nil
})
if err != nil {
return swarm.Address{}, err
return swarm.ZeroAddress, err
}
// check if found
if closest.IsZero() {
return swarm.Address{}, topology.ErrNotFound
return swarm.ZeroAddress, topology.ErrNotFound
}
return closest, nil
......@@ -1014,10 +1008,6 @@ func (k *Kad) NeighborhoodDepth() uint8 {
k.depthMu.RLock()
defer k.depthMu.RUnlock()
return k.neighborhoodDepth()
}
func (k *Kad) neighborhoodDepth() uint8 {
return k.depth
}
......@@ -1033,7 +1023,7 @@ func (k *Kad) IsBalanced(bin uint8) bool {
// for each pseudo address
for i := range k.commonBinPrefixes[bin] {
pseudoAddr := k.commonBinPrefixes[bin][i]
closestConnectedPeer, err := closestPeer(k.connectedPeers, pseudoAddr, noopSanctionedPeerFn, swarm.ZeroAddress)
closestConnectedPeer, err := closestPeer(k.connectedPeers, pseudoAddr, noopSanctionedPeerFn)
if err != nil {
return false
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment