Commit 2c9e1223 authored by Esad Akar, committed by GitHub

fix: kademlia metrics mutex optimize (#1861)

parent b71302a0
...@@ -59,6 +59,10 @@ type RecordOp func(*Counters) error ...@@ -59,6 +59,10 @@ type RecordOp func(*Counters) error
// panics if the given time is before the Unix epoch. // panics if the given time is before the Unix epoch.
func PeerLogIn(t time.Time, dir PeerConnectionDirection) RecordOp { func PeerLogIn(t time.Time, dir PeerConnectionDirection) RecordOp {
return func(cs *Counters) error { return func(cs *Counters) error {
cs.Lock()
defer cs.Unlock()
if cs.loggedIn { if cs.loggedIn {
return nil // Ignore when the peer is already logged in. return nil // Ignore when the peer is already logged in.
} }
...@@ -80,6 +84,10 @@ func PeerLogIn(t time.Time, dir PeerConnectionDirection) RecordOp { ...@@ -80,6 +84,10 @@ func PeerLogIn(t time.Time, dir PeerConnectionDirection) RecordOp {
// panics if the given time is before the Unix epoch. // panics if the given time is before the Unix epoch.
func PeerLogOut(t time.Time) RecordOp { func PeerLogOut(t time.Time) RecordOp {
return func(cs *Counters) error { return func(cs *Counters) error {
cs.Lock()
defer cs.Unlock()
if !cs.loggedIn { if !cs.loggedIn {
return nil // Ignore when the peer is not logged in. return nil // Ignore when the peer is not logged in.
} }
...@@ -116,6 +124,9 @@ func PeerLogOut(t time.Time) RecordOp { ...@@ -116,6 +124,9 @@ func PeerLogOut(t time.Time) RecordOp {
// counter by 1. // counter by 1.
func IncSessionConnectionRetry() RecordOp { func IncSessionConnectionRetry() RecordOp {
return func(cs *Counters) error { return func(cs *Counters) error {
cs.Lock()
defer cs.Unlock()
cs.sessionConnRetry++ cs.sessionConnRetry++
return nil return nil
} }
...@@ -148,6 +159,7 @@ type Counters struct { ...@@ -148,6 +159,7 @@ type Counters struct {
sessionConnRetry uint sessionConnRetry uint
sessionConnDuration time.Duration sessionConnDuration time.Duration
sessionConnDirection PeerConnectionDirection sessionConnDirection PeerConnectionDirection
sync.Mutex
} }
// NewCollector is a convenient constructor for creating new Collector. // NewCollector is a convenient constructor for creating new Collector.
...@@ -170,11 +182,13 @@ type Collector struct { ...@@ -170,11 +182,13 @@ type Collector struct {
// The execution doesn't stop if some metric operation returns an error, it // The execution doesn't stop if some metric operation returns an error, it
// rather continues and all the execution errors are returned. // rather continues and all the execution errors are returned.
func (c *Collector) Record(addr swarm.Address, rop ...RecordOp) error { func (c *Collector) Record(addr swarm.Address, rop ...RecordOp) error {
c.mu.Lock()
defer c.mu.Unlock()
key := addr.String() key := addr.String()
c.mu.RLock()
cs, ok := c.counters[key] cs, ok := c.counters[key]
c.mu.RUnlock()
if !ok { if !ok {
mk := newPeerKey(peerLastSeenTimestamp, key) mk := newPeerKey(peerLastSeenTimestamp, key)
ls, err := c.db.NewUint64Field(mk.String()) ls, err := c.db.NewUint64Field(mk.String())
...@@ -193,7 +207,10 @@ func (c *Collector) Record(addr swarm.Address, rop ...RecordOp) error { ...@@ -193,7 +207,10 @@ func (c *Collector) Record(addr swarm.Address, rop ...RecordOp) error {
connTotalDuration: &cd, connTotalDuration: &cd,
} }
} }
c.mu.Lock()
c.counters[key] = cs c.counters[key] = cs
c.mu.Unlock()
var err error var err error
for i, op := range rop { for i, op := range rop {
...@@ -215,14 +232,16 @@ func (c *Collector) Record(addr swarm.Address, rop ...RecordOp) error { ...@@ -215,14 +232,16 @@ func (c *Collector) Record(addr swarm.Address, rop ...RecordOp) error {
// error, it rather continues and all the execution errors are returned together // error, it rather continues and all the execution errors are returned together
// with the successful metrics snapshots. // with the successful metrics snapshots.
func (c *Collector) Snapshot(t time.Time, addresses ...swarm.Address) (map[string]*Snapshot, error) { func (c *Collector) Snapshot(t time.Time, addresses ...swarm.Address) (map[string]*Snapshot, error) {
c.mu.RLock()
defer c.mu.RUnlock()
var mErr error var mErr error
snapshot := make(map[string]*Snapshot) snapshot := make(map[string]*Snapshot)
take := func(addr string) { take := func(addr string) {
c.mu.RLock()
cs := c.counters[addr] cs := c.counters[addr]
c.mu.RUnlock()
if cs == nil { if cs == nil {
return return
} }
...@@ -239,6 +258,7 @@ func (c *Collector) Snapshot(t time.Time, addresses ...swarm.Address) (map[strin ...@@ -239,6 +258,7 @@ func (c *Collector) Snapshot(t time.Time, addresses ...swarm.Address) (map[strin
} }
connTotalDuration := time.Duration(cn) connTotalDuration := time.Duration(cn)
cs.Lock()
sessionConnDuration := cs.sessionConnDuration sessionConnDuration := cs.sessionConnDuration
if cs.loggedIn { if cs.loggedIn {
sessionConnDuration = t.Sub(time.Unix(0, lastSeenTimestamp)) sessionConnDuration = t.Sub(time.Unix(0, lastSeenTimestamp))
...@@ -252,13 +272,15 @@ func (c *Collector) Snapshot(t time.Time, addresses ...swarm.Address) (map[strin ...@@ -252,13 +272,15 @@ func (c *Collector) Snapshot(t time.Time, addresses ...swarm.Address) (map[strin
SessionConnectionDuration: sessionConnDuration, SessionConnectionDuration: sessionConnDuration,
SessionConnectionDirection: cs.sessionConnDirection, SessionConnectionDirection: cs.sessionConnDirection,
} }
cs.Unlock()
} }
for _, addr := range addresses { for _, addr := range addresses {
take(addr.String()) take(addr.String())
} }
if len(addresses) == 0 { if len(addresses) == 0 {
for addr := range c.counters { for _, addr := range c.keys() {
take(addr) take(addr)
} }
} }
...@@ -266,13 +288,24 @@ func (c *Collector) Snapshot(t time.Time, addresses ...swarm.Address) (map[strin ...@@ -266,13 +288,24 @@ func (c *Collector) Snapshot(t time.Time, addresses ...swarm.Address) (map[strin
return snapshot, mErr return snapshot, mErr
} }
// keys returns a copy of all peer-address keys currently present in the
// counters map. The copy is taken under the read lock so callers may
// iterate it without holding c.mu.
func (c *Collector) keys() []string {
	c.mu.RLock()
	defer c.mu.RUnlock()

	out := make([]string, 0, len(c.counters))
	for key := range c.counters {
		out = append(out, key)
	}
	return out
}
// Inspect allows to inspect current snapshot for the given peer address by // Inspect allows to inspect current snapshot for the given peer address by
// executing the given fn in a safe manner when write to the counters is // executing the given fn in a safe manner when write to the counters is
// blocked while the performing inspection function is executed. // blocked while the performing inspection function is executed.
func (c *Collector) Inspect(addr swarm.Address, fn func(ss *Snapshot)) error { func (c *Collector) Inspect(addr swarm.Address, fn func(ss *Snapshot)) error {
c.mu.RLock()
defer c.mu.RUnlock()
snapshots, err := c.Snapshot(time.Now(), addr) snapshots, err := c.Snapshot(time.Now(), addr)
if err != nil { if err != nil {
return err return err
......
...@@ -673,17 +673,18 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr) ...@@ -673,17 +673,18 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
case err != nil: case err != nil:
k.logger.Debugf("could not connect to peer %q: %v", peer, err) k.logger.Debugf("could not connect to peer %q: %v", peer, err)
k.waitNextMu.Lock()
retryTime := time.Now().Add(timeToRetry) retryTime := time.Now().Add(timeToRetry)
var e *p2p.ConnectionBackoffError var e *p2p.ConnectionBackoffError
failedAttempts := 0 failedAttempts := 0
if errors.As(err, &e) { if errors.As(err, &e) {
retryTime = e.TryAfter() retryTime = e.TryAfter()
} else { } else {
k.waitNextMu.Lock()
if info, ok := k.waitNext[peer.ByteString()]; ok { if info, ok := k.waitNext[peer.ByteString()]; ok {
failedAttempts = info.failedAttempts failedAttempts = info.failedAttempts
} }
failedAttempts++ failedAttempts++
k.waitNextMu.Unlock()
} }
if err := k.collector.Record(peer, metrics.IncSessionConnectionRetry()); err != nil { if err := k.collector.Record(peer, metrics.IncSessionConnectionRetry()); err != nil {
...@@ -692,6 +693,8 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr) ...@@ -692,6 +693,8 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
if err := k.collector.Inspect(peer, func(ss *metrics.Snapshot) { if err := k.collector.Inspect(peer, func(ss *metrics.Snapshot) {
quickPrune := ss == nil || ss.HasAtMaxOneConnectionAttempt() quickPrune := ss == nil || ss.HasAtMaxOneConnectionAttempt()
k.waitNextMu.Lock()
if (k.connectedPeers.Length() > 0 && quickPrune) || failedAttempts > maxConnAttempts { if (k.connectedPeers.Length() > 0 && quickPrune) || failedAttempts > maxConnAttempts {
delete(k.waitNext, peer.ByteString()) delete(k.waitNext, peer.ByteString())
k.knownPeers.Remove(peer, swarm.Proximity(k.base.Bytes(), peer.Bytes())) k.knownPeers.Remove(peer, swarm.Proximity(k.base.Bytes(), peer.Bytes()))
...@@ -705,10 +708,10 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr) ...@@ -705,10 +708,10 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
failedAttempts: failedAttempts, failedAttempts: failedAttempts,
} }
} }
k.waitNextMu.Unlock()
}); err != nil { }); err != nil {
k.logger.Debugf("kademlia: connect: unable to inspect snapshot for %q: %v", peer, err) k.logger.Debugf("kademlia: connect: unable to inspect snapshot for %q: %v", peer, err)
} }
k.waitNextMu.Unlock()
return err return err
case !i.Overlay.Equal(peer): case !i.Overlay.Equal(peer):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment