Commit dc9a129a authored by Peter Mrekaj's avatar Peter Mrekaj Committed by GitHub

refactor(kademlia): add manage loop notification as a separate method (#1796)

parent d87c48f8
...@@ -340,12 +340,12 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup, ...@@ -340,12 +340,12 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup,
bzzAddr, err := k.addressBook.Get(peer.addr) bzzAddr, err := k.addressBook.Get(peer.addr)
switch { switch {
case errors.Is(err, addressbook.ErrNotFound): case errors.Is(err, addressbook.ErrNotFound):
k.logger.Debugf("empty address book entry for peer %q", peer.addr) k.logger.Debugf("kademlia: empty address book entry for peer %q", peer.addr)
po := swarm.Proximity(k.base.Bytes(), peer.addr.Bytes()) po := swarm.Proximity(k.base.Bytes(), peer.addr.Bytes())
k.knownPeers.Remove(peer.addr, po) k.knownPeers.Remove(peer.addr, po)
return return
case err != nil: case err != nil:
k.logger.Debugf("failed to get address book entry for peer %q: %v", peer.addr, err) k.logger.Debugf("kademlia: failed to get address book entry for peer %q: %v", peer.addr, err)
return return
} }
...@@ -355,21 +355,21 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup, ...@@ -355,21 +355,21 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup,
k.waitNextMu.Unlock() k.waitNextMu.Unlock()
k.knownPeers.Remove(peer.addr, peer.po) k.knownPeers.Remove(peer.addr, peer.po)
if err := k.addressBook.Remove(peer.addr); err != nil { if err := k.addressBook.Remove(peer.addr); err != nil {
k.logger.Debugf("could not remove peer %q from addressbook", peer.addr) k.logger.Debugf("kademlia: could not remove peer %q from addressbook", peer.addr)
} }
} }
switch err = k.connect(ctx, peer.addr, bzzAddr.Underlay); { switch err = k.connect(ctx, peer.addr, bzzAddr.Underlay); {
case errors.Is(err, errPruneEntry): case errors.Is(err, errPruneEntry):
k.logger.Debugf("dial to light node with overlay %q and underlay %q", peer.addr, bzzAddr.Underlay) k.logger.Debugf("kademlia: dial to light node with overlay %q and underlay %q", peer.addr, bzzAddr.Underlay)
remove(peer) remove(peer)
return return
case errors.Is(err, errOverlayMismatch): case errors.Is(err, errOverlayMismatch):
k.logger.Debugf("overlay mismatch has occurred to an overlay %q with underlay %q", peer.addr, bzzAddr.Underlay) k.logger.Debugf("kademlia: overlay mismatch has occurred to an overlay %q with underlay %q", peer.addr, bzzAddr.Underlay)
remove(peer) remove(peer)
return return
case err != nil: case err != nil:
k.logger.Debugf("peer not reachable from kademlia %q: %v", bzzAddr, err) k.logger.Debugf("kademlia: peer not reachable from kademlia %q: %v", bzzAddr, err)
k.logger.Warningf("peer not reachable when attempting to connect") k.logger.Warningf("peer not reachable when attempting to connect")
return return
} }
...@@ -391,12 +391,8 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup, ...@@ -391,12 +391,8 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup,
k.depth = recalcDepth(k.connectedPeers, k.radius) k.depth = recalcDepth(k.connectedPeers, k.radius)
k.depthMu.Unlock() k.depthMu.Unlock()
select { k.logger.Debugf("kademlia: connected to peer: %q in bin: %d", peer.addr, peer.po)
case k.manageC <- struct{}{}: k.notifyManageLoop()
default:
}
k.logger.Debugf("connected to peer: %q for bin: %d", peer.addr, peer.po)
k.notifyPeerSig() k.notifyPeerSig()
} }
...@@ -441,6 +437,14 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup, ...@@ -441,6 +437,14 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup,
} }
} }
// notifyManageLoop notifies kademlia manage loop.
func (k *Kad) notifyManageLoop() {
select {
case k.manageC <- struct{}{}:
default:
}
}
// manage is a forever loop that manages the connection to new peers // manage is a forever loop that manages the connection to new peers
// once they get added or once others leave. // once they get added or once others leave.
func (k *Kad) manage() { func (k *Kad) manage() {
...@@ -464,10 +468,7 @@ func (k *Kad) manage() { ...@@ -464,10 +468,7 @@ func (k *Kad) manage() {
case <-k.quit: case <-k.quit:
return return
case <-time.After(30 * time.Second): case <-time.After(30 * time.Second):
select { k.notifyManageLoop()
case k.manageC <- struct{}{}:
default:
}
case <-k.manageC: case <-k.manageC:
start := time.Now() start := time.Now()
...@@ -785,12 +786,7 @@ func (k *Kad) AddPeers(ctx context.Context, addrs ...swarm.Address) error { ...@@ -785,12 +786,7 @@ func (k *Kad) AddPeers(ctx context.Context, addrs ...swarm.Address) error {
po := swarm.Proximity(k.base.Bytes(), addr.Bytes()) po := swarm.Proximity(k.base.Bytes(), addr.Bytes())
k.knownPeers.Add(addr, po) k.knownPeers.Add(addr, po)
} }
k.notifyManageLoop()
select {
case k.manageC <- struct{}{}:
default:
}
return nil return nil
} }
...@@ -830,12 +826,7 @@ connected: ...@@ -830,12 +826,7 @@ connected:
if err := k.connected(ctx, address); err != nil { if err := k.connected(ctx, address); err != nil {
return err return err
} }
k.notifyManageLoop()
select {
case k.manageC <- struct{}{}:
default:
}
return nil return nil
} }
...@@ -892,10 +883,7 @@ func (k *Kad) Disconnected(peer p2p.Peer) { ...@@ -892,10 +883,7 @@ func (k *Kad) Disconnected(peer p2p.Peer) {
k.depth = recalcDepth(k.connectedPeers, k.radius) k.depth = recalcDepth(k.connectedPeers, k.radius)
k.depthMu.Unlock() k.depthMu.Unlock()
select { k.notifyManageLoop()
case k.manageC <- struct{}{}:
default:
}
k.notifyPeerSig() k.notifyPeerSig()
} }
...@@ -1141,10 +1129,7 @@ func (k *Kad) SetRadius(r uint8) { ...@@ -1141,10 +1129,7 @@ func (k *Kad) SetRadius(r uint8) {
oldD := k.depth oldD := k.depth
k.depth = recalcDepth(k.connectedPeers, k.radius) k.depth = recalcDepth(k.connectedPeers, k.radius)
if k.depth != oldD { if k.depth != oldD {
select { k.notifyManageLoop()
case k.manageC <- struct{}{}:
default:
}
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment