Commit 388256b1 authored by Peter Mrekaj's avatar Peter Mrekaj Committed by GitHub

refactor(kademlia): speeding up topology build (#2028)

Speeds up the topology build by dividing the connect neighbor
method into two separate phases executed in sequential order.
The first phase makes a number of concurrent connections not
bigger than what is considered for the bin to be saturated. The
second phase takes care of the rest of the connection in a
regular manner.
parent 7383d3dc
......@@ -9,5 +9,5 @@ var (
QuickSaturationPeers = &quickSaturationPeers
SaturationPeers = &saturationPeers
OverSaturationPeers = &overSaturationPeers
BootnodeOverSaturationPeers = &bootnodeOverSaturationPeers
BootnodeOverSaturationPeers = &bootNodeOverSaturationPeers
)
......@@ -31,8 +31,8 @@ import (
const (
nnLowWatermark = 2 // the number of peers in consecutive deepest bins that constitute as nearest neighbours
maxConnAttempts = 3 // when there is maxConnAttempts failed connect calls for a given peer it is considered non-connectable
maxBootnodeAttempts = 3 // how many attempts to dial to bootnodes before giving up
maxConnAttempts = 1 // when there is maxConnAttempts failed connect calls for a given peer it is considered non-connectable
maxBootNodeAttempts = 3 // how many attempts to dial to boot-nodes before giving up
defaultBitSuffixLength = 3 // the number of bits used to create pseudo addresses for balancing
peerConnectionAttemptTimeout = 5 * time.Second // Timeout for establishing a new connection with peer.
......@@ -42,7 +42,7 @@ var (
quickSaturationPeers = 4
saturationPeers = 8
overSaturationPeers = 20
bootnodeOverSaturationPeers = 20
bootNodeOverSaturationPeers = 20
shortRetry = 30 * time.Second
timeToRetry = 2 * shortRetry
broadcastBinSize = 4
......@@ -111,7 +111,7 @@ func New(
if o.SaturationFunc == nil {
os := overSaturationPeers
if o.BootnodeMode {
os = bootnodeOverSaturationPeers
os = bootNodeOverSaturationPeers
}
o.SaturationFunc = binSaturated(os)
}
......@@ -245,6 +245,9 @@ func (k *Kad) connectBalanced(wg *sync.WaitGroup, peerConnChan chan<- *peerConnI
}
for i := range k.commonBinPrefixes {
if i >= int(k.NeighborhoodDepth()) {
continue
}
for j := range k.commonBinPrefixes[i] {
pseudoAddr := k.commonBinPrefixes[i][j]
......@@ -298,15 +301,18 @@ func (k *Kad) connectBalanced(wg *sync.WaitGroup, peerConnChan chan<- *peerConnI
// connectNeighbours attempts to connect to the neighbours
// which were not considered by the connectBalanced method.
func (k *Kad) connectNeighbours(wg *sync.WaitGroup, peerConnChan chan<- *peerConnInfo) {
// The topology.EachPeerFunc doesn't return an error
// so we ignore the error returned from EachBinRev.
func (k *Kad) connectNeighbours(wg *sync.WaitGroup, peerConnChan, peerConnChan2 chan<- *peerConnInfo) {
const multiplePeerThreshold = 8
sent := 0
_ = k.knownPeers.EachBinRev(func(addr swarm.Address, po uint8) (bool, bool, error) {
depth := k.NeighborhoodDepth()
if po < depth {
if depth > po || po >= depth+multiplePeerThreshold {
return false, true, nil
}
if len(k.connectedPeers.BinPeers(po)) >= overSaturationPeers-1 {
return false, true, nil
}
......@@ -328,6 +334,43 @@ func (k *Kad) connectNeighbours(wg *sync.WaitGroup, peerConnChan chan<- *peerCon
po: po,
addr: addr,
}
sent++
}
// We want to sent number of attempts equal to saturationPeers
// in order to speed up the topology build.
next := sent == saturationPeers
if next {
sent = 0
}
return false, next, nil
})
_ = k.knownPeers.EachBinRev(func(addr swarm.Address, po uint8) (bool, bool, error) {
depth := k.NeighborhoodDepth()
if po < depth+multiplePeerThreshold {
return false, true, nil
}
if k.connectedPeers.Exists(addr) {
return false, false, nil
}
if k.waitNext.Waiting(addr) {
k.metrics.TotalBeforeExpireWaits.Inc()
return false, false, nil
}
select {
case <-k.quit:
return true, false, nil
default:
wg.Add(1)
peerConnChan2 <- &peerConnInfo{
po: po,
addr: addr,
}
}
// The bin could be saturated or not, so a decision cannot
......@@ -338,7 +381,7 @@ func (k *Kad) connectNeighbours(wg *sync.WaitGroup, peerConnChan chan<- *peerCon
// connectionAttemptsHandler handles the connection attempts
// to peers sent by the producers to the peerConnChan.
func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup, peerConnChan <-chan *peerConnInfo) {
func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup, peerConnChan, peerConnChan2 <-chan *peerConnInfo) {
connect := func(peer *peerConnInfo) {
bzzAddr, err := k.addressBook.Get(peer.addr)
switch {
......@@ -396,34 +439,38 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup,
inProgress = make(map[string]bool)
inProgressMu sync.Mutex
)
for i := 0; i < int(swarm.MaxBins); i++ {
go func() {
for {
select {
case <-k.quit:
return
case peer := <-peerConnChan:
addr := peer.addr.String()
connAttempt := func(peerConnChan <-chan *peerConnInfo) {
for {
select {
case <-k.quit:
return
case peer := <-peerConnChan:
addr := peer.addr.String()
if k.waitNext.Waiting(peer.addr) {
k.metrics.TotalBeforeExpireWaits.Inc()
wg.Done()
continue
}
if k.waitNext.Waiting(peer.addr) {
k.metrics.TotalBeforeExpireWaits.Inc()
wg.Done()
continue
}
inProgressMu.Lock()
if !inProgress[addr] {
inProgress[addr] = true
inProgressMu.Unlock()
connect(peer)
inProgressMu.Lock()
delete(inProgress, addr)
}
inProgressMu.Lock()
if !inProgress[addr] {
inProgress[addr] = true
inProgressMu.Unlock()
wg.Done()
connect(peer)
inProgressMu.Lock()
delete(inProgress, addr)
}
inProgressMu.Unlock()
wg.Done()
}
}()
}
}
for i := 0; i < 64; i++ {
go connAttempt(peerConnChan)
}
for i := 0; i < 8; i++ {
go connAttempt(peerConnChan2)
}
}
......@@ -452,7 +499,8 @@ func (k *Kad) manage() {
// spun up by goroutines, to finish before we try the boot-nodes.
var wg sync.WaitGroup
var peerConnChan = make(chan *peerConnInfo)
go k.connectionAttemptsHandler(ctx, &wg, peerConnChan)
var peerConnChan2 = make(chan *peerConnInfo)
go k.connectionAttemptsHandler(ctx, &wg, peerConnChan, peerConnChan2)
for {
select {
......@@ -484,8 +532,8 @@ func (k *Kad) manage() {
}
oldDepth := k.NeighborhoodDepth()
k.connectBalanced(&wg, peerConnChan)
k.connectNeighbours(&wg, peerConnChan)
k.connectNeighbours(&wg, peerConnChan, peerConnChan2)
k.connectBalanced(&wg, peerConnChan2)
wg.Wait()
k.depthMu.Lock()
......@@ -512,13 +560,13 @@ func (k *Kad) manage() {
default:
}
k.logger.Debug("kademlia: no connected peers, trying bootnodes")
k.connectBootnodes(ctx)
k.connectBootNodes(ctx)
}
}
}
}
func (k *Kad) Start(ctx context.Context) error {
func (k *Kad) Start(_ context.Context) error {
k.wg.Add(1)
go k.manage()
......@@ -531,9 +579,9 @@ func (k *Kad) Start(ctx context.Context) error {
return nil
}
func (k *Kad) connectBootnodes(ctx context.Context) {
func (k *Kad) connectBootNodes(ctx context.Context) {
var attempts, connected int
var totalAttempts = maxBootnodeAttempts * len(k.bootnodes)
var totalAttempts = maxBootNodeAttempts * len(k.bootnodes)
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
......@@ -545,7 +593,7 @@ func (k *Kad) connectBootnodes(ctx context.Context) {
if _, err := p2p.Discover(ctx, addr, func(addr ma.Multiaddr) (stop bool, err error) {
k.logger.Tracef("connecting to bootnode %s", addr)
if attempts >= maxBootnodeAttempts {
if attempts >= maxBootNodeAttempts {
return true, nil
}
bzzAddress, err := k.p2p.Connect(ctx, addr)
......@@ -710,8 +758,7 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
k.collector.Inspect(peer, func(ss *im.Snapshot) {
quickPrune := ss == nil || ss.HasAtMaxOneConnectionAttempt()
if (k.connectedPeers.Length() > 0 && quickPrune) || failedAttempts > maxConnAttempts {
if (k.connectedPeers.Length() > 0 && quickPrune) || failedAttempts >= maxConnAttempts {
k.waitNext.Remove(peer)
k.knownPeers.Remove(peer)
if err := k.addressBook.Remove(peer); err != nil {
......@@ -736,7 +783,7 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
// Announce a newly connected peer to our connected peers, but also
// notify the peer about our already connected peers
func (k *Kad) Announce(ctx context.Context, peer swarm.Address, fullnode bool) error {
addrs := []swarm.Address{}
var addrs []swarm.Address
for bin := uint8(0); bin < swarm.MaxBins; bin++ {
......@@ -805,30 +852,22 @@ func (k *Kad) Pick(peer p2p.Peer) bool {
// Connected is called when a peer has dialed in.
func (k *Kad) Connected(ctx context.Context, peer p2p.Peer) error {
address := peer.Address
po := swarm.Proximity(k.base.Bytes(), address.Bytes())
if _, overSaturated := k.saturationFunc(po, k.knownPeers, k.connectedPeers); overSaturated {
if k.bootnode {
randPeer, err := k.randomPeer(po)
if err != nil {
return err
}
_ = k.p2p.Disconnect(randPeer)
goto connected
return k.connected(ctx, address)
}
return topology.ErrOversaturated
}
connected:
if err := k.connected(ctx, address); err != nil {
return err
}
k.notifyManageLoop()
return nil
return k.connected(ctx, address)
}
func (k *Kad) connected(ctx context.Context, addr swarm.Address) error {
......@@ -848,6 +887,7 @@ func (k *Kad) connected(ctx context.Context, addr swarm.Address) error {
k.depth = recalcDepth(k.connectedPeers, k.radius)
k.depthMu.Unlock()
k.notifyManageLoop()
k.notifyPeerSig()
return nil
......@@ -1009,7 +1049,7 @@ func (k *Kad) IsWithinDepth(addr swarm.Address) bool {
return swarm.Proximity(k.base.Bytes(), addr.Bytes()) >= k.NeighborhoodDepth()
}
// // EachNeighbor iterates from closest bin to farthest of the neighborhood peers.
// EachNeighbor iterates from closest bin to farthest of the neighborhood peers.
func (k *Kad) EachNeighbor(f topology.EachPeerFunc) error {
depth := k.NeighborhoodDepth()
fn := func(a swarm.Address, po uint8) (bool, bool, error) {
......
......@@ -174,7 +174,7 @@ func TestNeighborhoodDepth(t *testing.T) {
kDepth(t, kad, i)
}
// add a whole bunch of peers in bin 15, expect depth to stay at 15
// add a whole bunch of peers in the last bin, expect depth to stay at 31
for i := 0; i < 15; i++ {
addr = test.RandomAddressAt(base, int(swarm.MaxPO))
addOne(t, signer, kad, ab, addr)
......@@ -746,13 +746,10 @@ func TestAddressBookPrune(t *testing.T) {
waitCounter(t, &conns, 0)
waitCounter(t, &failedConns, 1)
p, err := ab.Get(nonConnPeer.Overlay)
if err != nil {
_, err = ab.Get(nonConnPeer.Overlay)
if err != addressbook.ErrNotFound {
t.Fatal(err)
}
if !nonConnPeer.Equal(p) {
t.Fatalf("expected %+v, got %+v", nonConnPeer, p)
}
addr := test.RandomAddressAt(base, 1)
addr1 := test.RandomAddressAt(base, 1)
......@@ -761,37 +758,29 @@ func TestAddressBookPrune(t *testing.T) {
// add one valid peer to initiate the retry, check connection and failed connection counters
addOne(t, signer, kad, ab, addr)
waitCounter(t, &conns, 1)
waitCounter(t, &failedConns, 1)
waitCounter(t, &failedConns, 0)
p, err = ab.Get(nonConnPeer.Overlay)
if err != nil {
_, err = ab.Get(nonConnPeer.Overlay)
if err != addressbook.ErrNotFound {
t.Fatal(err)
}
if !nonConnPeer.Equal(p) {
t.Fatalf("expected %+v, got %+v", nonConnPeer, p)
}
time.Sleep(50 * time.Millisecond)
// add one valid peer to initiate the retry, check connection and failed connection counters
addOne(t, signer, kad, ab, addr1)
waitCounter(t, &conns, 1)
waitCounter(t, &failedConns, 1)
waitCounter(t, &failedConns, 0)
p, err = ab.Get(nonConnPeer.Overlay)
if err != nil {
_, err = ab.Get(nonConnPeer.Overlay)
if err != addressbook.ErrNotFound {
t.Fatal(err)
}
if !nonConnPeer.Equal(p) {
t.Fatalf("expected %+v, got %+v", nonConnPeer, p)
}
time.Sleep(50 * time.Millisecond)
// add one valid peer to initiate the retry, check connection and failed connection counters
addOne(t, signer, kad, ab, addr2)
waitCounter(t, &conns, 1)
waitCounter(t, &failedConns, 1)
waitCounter(t, &failedConns, 0)
_, err = ab.Get(nonConnPeer.Overlay)
if err != addressbook.ErrNotFound {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment