Commit 17e187f0 authored by Esad Akar, committed by GitHub

refactor: waitNext map as a type (#1863)

parent ce1889d3
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package waitnext keeps, per peer address, the time before which a connection
// should not be retried and the number of failed connection attempts.
// It is intended to be used with the kademlia, which consults it before connecting to peers.
package waitnext
import (
"sync"
"time"
"github.com/ethersphere/bee/pkg/swarm"
)
// next is the retry record kept for a single peer.
type next struct {
// tryAfter is the instant before which the peer is reported as waiting.
tryAfter time.Time
// failedAttempts is the attempt count stored by Set and returned by Attempts.
failedAttempts int
}
// WaitNext is a mutex-guarded set of retry records, keyed by the byte-string
// form of a peer address. Use New to obtain a usable value: the map is only
// allocated there.
type WaitNext struct {
// next holds one retry record per peer, keyed by addr.ByteString().
next map[string]*next
// The embedded mutex is locked by every method around access to the map.
// NOTE(review): embedding promotes Lock/Unlock into WaitNext's method set;
// a named field would keep them out of it, but the methods call r.Lock().
sync.Mutex
}
// New returns an empty WaitNext whose record map is allocated and ready
// for use.
func New() *WaitNext {
	w := new(WaitNext)
	w.next = make(map[string]*next)
	return w
}
// Set stores a fresh retry record for addr, replacing any record already
// present. The record carries the given tryAfter deadline and the given
// number of failed attempts.
func (r *WaitNext) Set(addr swarm.Address, tryAfter time.Time, attempts int) {
	r.Lock()
	defer r.Unlock()

	entry := &next{
		tryAfter:       tryAfter,
		failedAttempts: attempts,
	}
	r.next[addr.ByteString()] = entry
}
// SetTryAfter updates only the tryAfter deadline of the record for addr,
// leaving its failed-attempt count untouched. When addr has no record yet,
// one is created with the deadline set and a zero failed-attempt count.
func (r *WaitNext) SetTryAfter(addr swarm.Address, tryAfter time.Time) {
	r.Lock()
	defer r.Unlock()

	key := addr.ByteString()
	entry, found := r.next[key]
	if !found {
		// No record yet: start from the zero value so failedAttempts is 0.
		entry = &next{}
		r.next[key] = entry
	}
	entry.tryAfter = tryAfter
}
// Waiting reports whether addr has a record whose tryAfter deadline is still
// in the future. It returns false when addr has no record.
func (r *WaitNext) Waiting(addr swarm.Address) bool {
	r.Lock()
	defer r.Unlock()

	entry, found := r.next[addr.ByteString()]
	if !found {
		return false
	}
	return time.Now().Before(entry.tryAfter)
}
// Attempts returns the failed-attempt count stored for addr, or 0 when addr
// has no record.
func (r *WaitNext) Attempts(addr swarm.Address) int {
	r.Lock()
	defer r.Unlock()

	entry, found := r.next[addr.ByteString()]
	if !found {
		return 0
	}
	return entry.failedAttempts
}
// Remove deletes the record for addr. It is a no-op when addr has no record.
func (r *WaitNext) Remove(addr swarm.Address) {
	r.Lock()
	defer r.Unlock()

	key := addr.ByteString()
	delete(r.next, key)
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package waitnext_test contains the tests for the waitnext package.
package waitnext_test
import (
"testing"
"time"
"github.com/ethersphere/bee/pkg/swarm/test"
"github.com/ethersphere/bee/pkg/topology/kademlia/internal/waitnext"
)
// TestSet checks that a record created with Set makes the peer wait until the
// deadline passes, and that SetTryAfter renews the deadline while keeping the
// stored failed-attempt count.
func TestSet(t *testing.T) {
	const (
		window       = 10 * time.Millisecond // how long the peer must wait
		pause        = 11 * time.Millisecond // sleep just past the window
		wantAttempts = 2
	)

	wn := waitnext.New()
	peer := test.RandomAddress()

	// verify asserts that the peer is waiting now, stops waiting once the
	// window has elapsed, and still reports the expected attempt count.
	verify := func() {
		t.Helper()
		if !wn.Waiting(peer) {
			t.Fatal("should be waiting")
		}
		time.Sleep(pause)
		if wn.Waiting(peer) {
			t.Fatal("should not be waiting")
		}
		if got := wn.Attempts(peer); got != wantAttempts {
			t.Fatalf("want 2, got %d", got)
		}
	}

	wn.Set(peer, time.Now().Add(window), wantAttempts)
	verify()

	// Renewing the deadline must not reset the attempt count.
	wn.SetTryAfter(peer, time.Now().Add(window))
	verify()
}
......@@ -24,6 +24,7 @@ import (
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology/kademlia/internal/metrics"
"github.com/ethersphere/bee/pkg/topology/kademlia/internal/waitnext"
"github.com/ethersphere/bee/pkg/topology/pslice"
ma "github.com/multiformats/go-multiaddr"
)
......@@ -78,12 +79,10 @@ type Kad struct {
connectedPeers *pslice.PSlice // a slice of peers sorted and indexed by po, indexes kept in `bins`
knownPeers *pslice.PSlice // both are po aware slice of addresses
bootnodes []ma.Multiaddr
depth uint8 // current neighborhood depth
radius uint8 // storage area of responsibility
depthMu sync.RWMutex // protect depth changes
manageC chan struct{} // trigger the manage forever loop to connect to new peers
waitNext map[string]retryInfo // sanction connections to a peer, key is overlay string and value is a retry information
waitNextMu sync.Mutex // guards waitNext map
depth uint8 // current neighborhood depth
radius uint8 // storage area of responsibility
depthMu sync.RWMutex // protect depth changes
manageC chan struct{} // trigger the manage forever loop to connect to new peers
peerSig []chan struct{}
peerSigMtx sync.Mutex
logger logging.Logger // logger
......@@ -93,11 +92,7 @@ type Kad struct {
quit chan struct{} // quit channel
done chan struct{} // signal that `manage` has quit
wg sync.WaitGroup
}
type retryInfo struct {
tryAfter time.Time
failedAttempts int
waitNext *waitnext.WaitNext
}
// New returns a new Kademlia.
......@@ -133,7 +128,7 @@ func New(
knownPeers: pslice.New(int(swarm.MaxBins)),
bootnodes: o.Bootnodes,
manageC: make(chan struct{}, 1),
waitNext: make(map[string]retryInfo),
waitNext: waitnext.New(),
logger: logger,
standalone: o.StandaloneMode,
bootnode: o.BootnodeMode,
......@@ -237,10 +232,7 @@ type peerConnInfo struct {
// connectBalanced attempts to connect to the balanced peers first.
func (k *Kad) connectBalanced(wg *sync.WaitGroup, peerConnChan chan<- *peerConnInfo) {
skipPeers := func(peer swarm.Address) bool {
k.waitNextMu.Lock()
defer k.waitNextMu.Unlock()
next, ok := k.waitNext[peer.ByteString()]
return ok && time.Now().Before(next.tryAfter)
return k.waitNext.Waiting(peer)
}
for i := range k.commonBinPrefixes {
......@@ -305,12 +297,9 @@ func (k *Kad) connectNeighbours(wg *sync.WaitGroup, peerConnChan chan<- *peerCon
return false, false, nil
}
k.waitNextMu.Lock()
if next, ok := k.waitNext[addr.ByteString()]; ok && time.Now().Before(next.tryAfter) {
k.waitNextMu.Unlock()
if k.waitNext.Waiting(addr) {
return false, false, nil
}
k.waitNextMu.Unlock()
if saturated, _ := k.saturationFunc(po, k.knownPeers, k.connectedPeers); saturated {
return false, true, nil // Bin is saturated, skip to next bin.
......@@ -349,9 +338,7 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup,
}
remove := func(peer *peerConnInfo) {
k.waitNextMu.Lock()
delete(k.waitNext, peer.addr.ByteString())
k.waitNextMu.Unlock()
k.waitNext.Remove(peer.addr)
k.knownPeers.Remove(peer.addr, peer.po)
if err := k.addressBook.Remove(peer.addr); err != nil {
k.logger.Debugf("kademlia: could not remove peer %q from addressbook", peer.addr)
......@@ -373,9 +360,7 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup,
return
}
k.waitNextMu.Lock()
k.waitNext[peer.addr.ByteString()] = retryInfo{tryAfter: time.Now().Add(shortRetry)}
k.waitNextMu.Unlock()
k.waitNext.Set(peer.addr, time.Now().Add(shortRetry), 0)
k.connectedPeers.Add(peer.addr, peer.po)
......@@ -410,15 +395,10 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup,
case peer := <-peerConnChan:
addr := peer.addr.String()
// Check if the peer was penalized.
k.waitNextMu.Lock()
next, ok := k.waitNext[peer.addr.ByteString()]
if ok && time.Now().Before(next.tryAfter) {
k.waitNextMu.Unlock()
if k.waitNext.Waiting(peer.addr) {
wg.Done()
continue
}
k.waitNextMu.Unlock()
inProgressMu.Lock()
if !inProgress[addr] {
......@@ -679,12 +659,8 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
if errors.As(err, &e) {
retryTime = e.TryAfter()
} else {
k.waitNextMu.Lock()
if info, ok := k.waitNext[peer.ByteString()]; ok {
failedAttempts = info.failedAttempts
}
failedAttempts = k.waitNext.Attempts(peer)
failedAttempts++
k.waitNextMu.Unlock()
}
if err := k.collector.Record(peer, metrics.IncSessionConnectionRetry()); err != nil {
......@@ -694,21 +670,16 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
if err := k.collector.Inspect(peer, func(ss *metrics.Snapshot) {
quickPrune := ss == nil || ss.HasAtMaxOneConnectionAttempt()
k.waitNextMu.Lock()
if (k.connectedPeers.Length() > 0 && quickPrune) || failedAttempts > maxConnAttempts {
delete(k.waitNext, peer.ByteString())
k.waitNext.Remove(peer)
k.knownPeers.Remove(peer, swarm.Proximity(k.base.Bytes(), peer.Bytes()))
if err := k.addressBook.Remove(peer); err != nil {
k.logger.Debugf("could not remove peer from addressbook: %q", peer)
}
k.logger.Debugf("kademlia pruned peer from address book %q", peer)
} else {
k.waitNext[peer.ByteString()] = retryInfo{
tryAfter: retryTime,
failedAttempts: failedAttempts,
}
k.waitNext.Set(peer, retryTime, failedAttempts)
}
k.waitNextMu.Unlock()
}); err != nil {
k.logger.Debugf("kademlia: connect: unable to inspect snapshot for %q: %v", peer, err)
}
......@@ -838,9 +809,7 @@ func (k *Kad) connected(ctx context.Context, addr swarm.Address) error {
k.logger.Debugf("kademlia: unable to record login inbound metrics for %q: %v", addr, err)
}
k.waitNextMu.Lock()
delete(k.waitNext, addr.ByteString())
k.waitNextMu.Unlock()
k.waitNext.Remove(addr)
k.depthMu.Lock()
k.depth = recalcDepth(k.connectedPeers, k.radius)
......@@ -859,13 +828,7 @@ func (k *Kad) Disconnected(peer p2p.Peer) {
po := swarm.Proximity(k.base.Bytes(), peer.Address.Bytes())
k.connectedPeers.Remove(peer.Address, po)
k.waitNextMu.Lock()
newInfo := retryInfo{tryAfter: time.Now().Add(timeToRetry), failedAttempts: 0}
if info, ok := k.waitNext[peer.Address.ByteString()]; ok {
newInfo.failedAttempts = info.failedAttempts
}
k.waitNext[peer.Address.ByteString()] = newInfo
k.waitNextMu.Unlock()
k.waitNext.SetTryAfter(peer.Address, time.Now().Add(timeToRetry))
if err := k.collector.Record(
peer.Address,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment