Commit dd605e23 authored by Janoš Guljaš, committed by GitHub

perf(kademlia): optimize wait next map keys (#1817)

parent 48f0a205
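The change swaps every `waitNext` map key from the hex-encoded `swarm.Address.String()` form to `ByteString()`. A minimal sketch of the difference, assuming `swarm.Address` wraps the raw overlay bytes (the method bodies below are illustrative stand-ins, not the bee source): the raw byte string is half the length of the hex form and skips an encoding pass on every map lookup, insert, and delete.

	package main

	import (
		"encoding/hex"
		"fmt"
	)

	// Address mirrors the shape of swarm.Address for illustration:
	// a thin wrapper around the raw overlay bytes.
	type Address struct{ b []byte }

	// String hex-encodes the address, allocating a fresh 2*len(b)
	// string on every call.
	func (a Address) String() string { return hex.EncodeToString(a.b) }

	// ByteString converts the raw bytes directly to a string: one
	// copy, no encoding step, half the key size.
	func (a Address) ByteString() string { return string(a.b) }

	func main() {
		waitNext := make(map[string]struct{})
		addr := Address{b: []byte{0xde, 0xad, 0xbe, 0xef}}

		// Both forms are valid map keys; ByteString avoids the hex pass.
		waitNext[addr.ByteString()] = struct{}{}
		fmt.Println(len(addr.String()), len(addr.ByteString())) // 8 4
	}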
@@ -239,7 +239,7 @@ func (k *Kad) connectBalanced(wg *sync.WaitGroup, peerConnChan chan<- *peerConnInfo)
 	skipPeers := func(peer swarm.Address) bool {
 		k.waitNextMu.Lock()
 		defer k.waitNextMu.Unlock()
-		next, ok := k.waitNext[peer.String()]
+		next, ok := k.waitNext[peer.ByteString()]
 		return ok && time.Now().Before(next.tryAfter)
 	}
@@ -306,7 +306,7 @@ func (k *Kad) connectNeighbours(wg *sync.WaitGroup, peerConnChan chan<- *peerConnInfo)
 		}
 		k.waitNextMu.Lock()
-		if next, ok := k.waitNext[addr.String()]; ok && time.Now().Before(next.tryAfter) {
+		if next, ok := k.waitNext[addr.ByteString()]; ok && time.Now().Before(next.tryAfter) {
 			k.waitNextMu.Unlock()
 			return false, false, nil
 		}
@@ -351,7 +351,7 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup,
 	remove := func(peer *peerConnInfo) {
 		k.waitNextMu.Lock()
-		delete(k.waitNext, peer.addr.String())
+		delete(k.waitNext, peer.addr.ByteString())
 		k.waitNextMu.Unlock()
 		k.knownPeers.Remove(peer.addr, peer.po)
 		if err := k.addressBook.Remove(peer.addr); err != nil {
@@ -375,7 +375,7 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup,
 		}
 		k.waitNextMu.Lock()
-		k.waitNext[peer.addr.String()] = retryInfo{tryAfter: time.Now().Add(shortRetry)}
+		k.waitNext[peer.addr.ByteString()] = retryInfo{tryAfter: time.Now().Add(shortRetry)}
 		k.waitNextMu.Unlock()
 		k.connectedPeers.Add(peer.addr, peer.po)
@@ -413,7 +413,7 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup,
 		// Check if the peer was penalized.
 		k.waitNextMu.Lock()
-		next, ok := k.waitNext[addr]
+		next, ok := k.waitNext[peer.addr.ByteString()]
 		if ok && time.Now().Before(next.tryAfter) {
 			k.waitNextMu.Unlock()
 			wg.Done()
@@ -681,7 +681,7 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
 	if errors.As(err, &e) {
 		retryTime = e.TryAfter()
 	} else {
-		if info, ok := k.waitNext[peer.String()]; ok {
+		if info, ok := k.waitNext[peer.ByteString()]; ok {
 			failedAttempts = info.failedAttempts
 		}
 		failedAttempts++
@@ -694,13 +694,13 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
 	if err := k.collector.Inspect(peer, func(ss *metrics.Snapshot) {
 		quickPrune := ss == nil || ss.HasAtMaxOneConnectionAttempt()
 		if (k.connectedPeers.Length() > 0 && quickPrune) || failedAttempts > maxConnAttempts {
-			delete(k.waitNext, peer.String())
+			delete(k.waitNext, peer.ByteString())
 			if err := k.addressBook.Remove(peer); err != nil {
 				k.logger.Debugf("could not remove peer from addressbook: %q", peer)
 			}
 			k.logger.Debugf("kademlia pruned peer from address book %q", peer)
 		} else {
-			k.waitNext[peer.String()] = retryInfo{
+			k.waitNext[peer.ByteString()] = retryInfo{
 				tryAfter:       retryTime,
 				failedAttempts: failedAttempts,
 			}
@@ -836,7 +836,7 @@ func (k *Kad) connected(ctx context.Context, addr swarm.Address) error {
 	}
 	k.waitNextMu.Lock()
-	delete(k.waitNext, addr.String())
+	delete(k.waitNext, addr.ByteString())
 	k.waitNextMu.Unlock()
 	k.depthMu.Lock()
@@ -857,7 +857,7 @@ func (k *Kad) Disconnected(peer p2p.Peer) {
 	k.connectedPeers.Remove(peer.Address, po)
 	k.waitNextMu.Lock()
-	k.waitNext[peer.Address.String()] = retryInfo{tryAfter: time.Now().Add(timeToRetry), failedAttempts: 0}
+	k.waitNext[peer.Address.ByteString()] = retryInfo{tryAfter: time.Now().Add(timeToRetry), failedAttempts: 0}
 	k.waitNextMu.Unlock()
 	if err := k.collector.Record(
...
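Not part of the commit, but a hypothetical micro-benchmark along these lines (assumed file name key_bench_test.go, 32-byte overlay address assumed; run with go test -bench=. -benchmem) is one way to see what the key change saves:

	// key_bench_test.go — a hypothetical micro-benchmark, not part of the
	// commit, comparing the old hex-encoded key with the raw byte-string key.
	package main

	import (
		"crypto/rand"
		"encoding/hex"
		"testing"
	)

	// overlay stands in for a 32-byte Swarm overlay address.
	var overlay = make([]byte, 32)

	func init() {
		if _, err := rand.Read(overlay); err != nil {
			panic(err)
		}
	}

	// BenchmarkHexKey models the old behavior: every map operation pays
	// for hex-encoding the address into a 64-byte string.
	func BenchmarkHexKey(b *testing.B) {
		m := make(map[string]struct{})
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			m[hex.EncodeToString(overlay)] = struct{}{}
		}
	}

	// BenchmarkByteKey models the new behavior: a direct []byte-to-string
	// conversion, half the key size and no encoding pass.
	func BenchmarkByteKey(b *testing.B) {
		m := make(map[string]struct{})
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			m[string(overlay)] = struct{}{}
		}
	}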