Commit e66df071 authored by Peter Mrekaj's avatar Peter Mrekaj Committed by GitHub

fix: move metrics collector finalize at the end of the close call (#1765)

The Finalize collector method is called after all the waits and the
timeouts for finishing the ongoing connection attempts have finished.
parent 083e4739
...@@ -7,7 +7,6 @@ ...@@ -7,7 +7,6 @@
package metrics package metrics
import ( import (
"errors"
"fmt" "fmt"
"sync" "sync"
"time" "time"
...@@ -17,8 +16,6 @@ import ( ...@@ -17,8 +16,6 @@ import (
"github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
) )
var ErrPeerNotFound = errors.New("peer no found")
const ( const (
peerLastSeenTimestamp string = "peer-last-seen-timestamp" peerLastSeenTimestamp string = "peer-last-seen-timestamp"
peerTotalConnectionDuration string = "peer-total-connection-duration" peerTotalConnectionDuration string = "peer-total-connection-duration"
...@@ -211,16 +208,42 @@ func (c *Collector) Record(addr swarm.Address, rop ...RecordOp) error { ...@@ -211,16 +208,42 @@ func (c *Collector) Record(addr swarm.Address, rop ...RecordOp) error {
// error, it rather continues and all the execution errors are returned together // error, it rather continues and all the execution errors are returned together
// with the successful metrics snapshots. // with the successful metrics snapshots.
func (c *Collector) Snapshot(t time.Time, addresses ...swarm.Address) (map[string]*Snapshot, error) { func (c *Collector) Snapshot(t time.Time, addresses ...swarm.Address) (map[string]*Snapshot, error) {
c.mu.RLock()
defer c.mu.RUnlock()
var mErr error var mErr error
snapshot := make(map[string]*Snapshot) snapshot := make(map[string]*Snapshot)
take := func(addr string) { take := func(addr string) {
peerSnapshot, err := c.peer(t, addr) cs := c.counters[addr]
if cs == nil {
return
}
ls, err := cs.lastSeenTimestamp.Get()
if err != nil {
mErr = multierror.Append(mErr, fmt.Errorf("unable to take last seen snapshot for %q: %w", addr, err))
}
lastSeenTimestamp := int64(ls)
cn, err := cs.connTotalDuration.Get()
if err != nil { if err != nil {
mErr = multierror.Append(err) mErr = multierror.Append(mErr, fmt.Errorf("unable to take connection duration snapshot for %q: %w", addr, err))
} else { }
snapshot[addr] = peerSnapshot connTotalDuration := time.Duration(cn)
sessionConnDuration := cs.sessionConnDuration
if cs.loggedIn {
sessionConnDuration = t.Sub(time.Unix(0, lastSeenTimestamp))
connTotalDuration += sessionConnDuration
}
snapshot[addr] = &Snapshot{
LastSeenTimestamp: lastSeenTimestamp,
SessionConnectionRetry: cs.sessionConnRetry,
ConnectionTotalDuration: connTotalDuration,
SessionConnectionDuration: sessionConnDuration,
SessionConnectionDirection: cs.sessionConnDirection,
} }
} }
...@@ -228,58 +251,14 @@ func (c *Collector) Snapshot(t time.Time, addresses ...swarm.Address) (map[strin ...@@ -228,58 +251,14 @@ func (c *Collector) Snapshot(t time.Time, addresses ...swarm.Address) (map[strin
take(addr.String()) take(addr.String())
} }
if len(addresses) == 0 { if len(addresses) == 0 {
c.mu.RLock()
for addr := range c.counters { for addr := range c.counters {
take(addr) take(addr)
} }
c.mu.RUnlock()
} }
return snapshot, mErr return snapshot, mErr
} }
func (c *Collector) Peer(t time.Time, addr swarm.Address) (*Snapshot, error) {
return c.peer(t, addr.String())
}
func (c *Collector) peer(t time.Time, addr string) (*Snapshot, error) {
c.mu.RLock()
defer c.mu.RUnlock()
var mErr error
cs := c.counters[addr]
if cs == nil {
return nil, ErrPeerNotFound
}
ls, err := cs.lastSeenTimestamp.Get()
if err != nil {
mErr = multierror.Append(mErr, fmt.Errorf("unable to take last seen snapshot for %q: %w", addr, err))
}
lastSeenTimestamp := int64(ls)
cn, err := cs.connTotalDuration.Get()
if err != nil {
mErr = multierror.Append(mErr, fmt.Errorf("unable to take connection duration snapshot for %q: %w", addr, err))
}
connTotalDuration := time.Duration(cn)
sessionConnDuration := cs.sessionConnDuration
if cs.loggedIn {
sessionConnDuration = t.Sub(time.Unix(0, lastSeenTimestamp))
connTotalDuration += sessionConnDuration
}
return &Snapshot{
LastSeenTimestamp: lastSeenTimestamp,
ConnectionTotalDuration: connTotalDuration,
SessionConnectionRetry: cs.sessionConnRetry,
SessionConnectionDuration: sessionConnDuration,
SessionConnectionDirection: cs.sessionConnDirection,
}, mErr
}
// Finalize logs out all ongoing peer sessions // Finalize logs out all ongoing peer sessions
// and flushes all in-memory metrics counters. // and flushes all in-memory metrics counters.
func (c *Collector) Finalize(t time.Time) error { func (c *Collector) Finalize(t time.Time) error {
......
...@@ -5,7 +5,6 @@ ...@@ -5,7 +5,6 @@
package metrics_test package metrics_test
import ( import (
"errors"
"testing" "testing"
"time" "time"
...@@ -138,7 +137,7 @@ func TestPeerMetricsCollector(t *testing.T) { ...@@ -138,7 +137,7 @@ func TestPeerMetricsCollector(t *testing.T) {
t.Fatalf("Finalize(%s): unexpected error: %v", t3, err) t.Fatalf("Finalize(%s): unexpected error: %v", t3, err)
} }
sss, err := mc.Snapshot(t2, addr) sss, err := mc.Snapshot(t2, addr)
if !errors.Is(err, metrics.ErrPeerNotFound) { if err != nil {
t.Fatalf("Snapshot(%q, ...): unexpected error: %v", addr, err) t.Fatalf("Snapshot(%q, ...): unexpected error: %v", addr, err)
} }
if have, want := len(sss), 0; have != want { if have, want := len(sss), 0; have != want {
......
...@@ -33,6 +33,8 @@ const ( ...@@ -33,6 +33,8 @@ const (
maxConnAttempts = 3 // when there is maxConnAttempts failed connect calls for a given peer it is considered non-connectable maxConnAttempts = 3 // when there is maxConnAttempts failed connect calls for a given peer it is considered non-connectable
maxBootnodeAttempts = 3 // how many attempts to dial to bootnodes before giving up maxBootnodeAttempts = 3 // how many attempts to dial to bootnodes before giving up
defaultBitSuffixLength = 2 // the number of bits used to create pseudo addresses for balancing defaultBitSuffixLength = 2 // the number of bits used to create pseudo addresses for balancing
peerConnectionAttemptTimeout = 5 * time.Second // Timeout for establishing a new connection with peer.
) )
var ( var (
...@@ -653,35 +655,34 @@ func recalcDepth(peers *pslice.PSlice, radius uint8) uint8 { ...@@ -653,35 +655,34 @@ func recalcDepth(peers *pslice.PSlice, radius uint8) uint8 {
// connect connects to a peer and gossips its address to our connected peers, // connect connects to a peer and gossips its address to our connected peers,
// as well as sends the peers we are connected to to the newly connected peer // as well as sends the peers we are connected to to the newly connected peer
func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr) error { func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr) error {
k.logger.Infof("attempting to connect to peer %s", peer) k.logger.Infof("attempting to connect to peer %q", peer)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
ctx, cancel := context.WithTimeout(ctx, peerConnectionAttemptTimeout)
defer cancel() defer cancel()
i, err := k.p2p.Connect(ctx, ma)
if err != nil {
if errors.Is(err, p2p.ErrDialLightNode) {
return errPruneEntry
}
if errors.Is(err, p2p.ErrAlreadyConnected) {
if !i.Overlay.Equal(peer) {
return errOverlayMismatch
}
return nil switch i, err := k.p2p.Connect(ctx, ma); {
case errors.Is(err, p2p.ErrDialLightNode):
return errPruneEntry
case errors.Is(err, p2p.ErrAlreadyConnected):
if !i.Overlay.Equal(peer) {
return errOverlayMismatch
} }
return nil
case errors.Is(err, context.Canceled):
return err
case err != nil:
k.logger.Debugf("could not connect to peer %q: %v", peer, err)
k.logger.Debugf("could not connect to peer %s: %v", peer, err) k.waitNextMu.Lock()
retryTime := time.Now().Add(timeToRetry) retryTime := time.Now().Add(timeToRetry)
var e *p2p.ConnectionBackoffError var e *p2p.ConnectionBackoffError
k.waitNextMu.Lock()
failedAttempts := 0 failedAttempts := 0
if errors.As(err, &e) { if errors.As(err, &e) {
retryTime = e.TryAfter() retryTime = e.TryAfter()
} else { } else {
info, ok := k.waitNext[peer.String()] if info, ok := k.waitNext[peer.String()]; ok {
if ok {
failedAttempts = info.failedAttempts failedAttempts = info.failedAttempts
} }
failedAttempts++ failedAttempts++
} }
...@@ -692,19 +693,19 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr) ...@@ -692,19 +693,19 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
if k.quickPrune(peer) || failedAttempts > maxConnAttempts { if k.quickPrune(peer) || failedAttempts > maxConnAttempts {
delete(k.waitNext, peer.String()) delete(k.waitNext, peer.String())
if err := k.addressBook.Remove(peer); err != nil { if err := k.addressBook.Remove(peer); err != nil {
k.logger.Debugf("could not remove peer from addressbook: %s", peer.String()) k.logger.Debugf("could not remove peer from addressbook: %q", peer)
} }
k.logger.Debugf("kademlia pruned peer from address book %s", peer.String()) k.logger.Debugf("kademlia pruned peer from address book %q", peer)
} else { } else {
k.waitNext[peer.String()] = retryInfo{tryAfter: retryTime, failedAttempts: failedAttempts} k.waitNext[peer.String()] = retryInfo{
tryAfter: retryTime,
failedAttempts: failedAttempts,
}
} }
k.waitNextMu.Unlock() k.waitNextMu.Unlock()
return err return err
} case !i.Overlay.Equal(peer):
if !i.Overlay.Equal(peer) {
_ = k.p2p.Disconnect(peer) _ = k.p2p.Disconnect(peer)
_ = k.p2p.Disconnect(i.Overlay) _ = k.p2p.Disconnect(i.Overlay)
return errOverlayMismatch return errOverlayMismatch
...@@ -714,19 +715,20 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr) ...@@ -714,19 +715,20 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
} }
// quickPrune will return true for cases where: // quickPrune will return true for cases where:
// there are other connected peers, the addr has never been seen before, AND it's the first failed attempt // - there are other connected peers
// - the addr has never been seen before and it's the first failed attempt
func (k *Kad) quickPrune(addr swarm.Address) bool { func (k *Kad) quickPrune(addr swarm.Address) bool {
if k.connectedPeers.Length() == 0 { if k.connectedPeers.Length() == 0 {
return false return false
} }
snapshot, _ := k.collector.Peer(time.Now(), addr) sss, err := k.collector.Snapshot(time.Now(), addr)
if snapshot == nil || (snapshot.LastSeenTimestamp == 0 && snapshot.SessionConnectionRetry <= 1) { if err != nil {
return true k.logger.Debugf("kademlia: quickPrune: unable to take snapshot for %q: %v", addr, err)
} }
snapshot := sss[addr.String()]
return false return snapshot == nil ||
(snapshot.LastSeenTimestamp == 0 && snapshot.SessionConnectionRetry <= 1)
} }
// announce a newly connected peer to our connected peers, but also // announce a newly connected peer to our connected peers, but also
...@@ -1252,17 +1254,13 @@ func (k *Kad) Close() error { ...@@ -1252,17 +1254,13 @@ func (k *Kad) Close() error {
cc := make(chan struct{}) cc := make(chan struct{})
go func() { go func() {
defer close(cc)
k.wg.Wait() k.wg.Wait()
close(cc)
}() }()
if err := k.collector.Finalize(time.Now()); err != nil {
k.logger.Debugf("kademlia: unable to finalize open sessions: %v", err)
}
select { select {
case <-cc: case <-cc:
case <-time.After(10 * time.Second): case <-time.After(peerConnectionAttemptTimeout):
k.logger.Warning("kademlia shutting down with announce goroutines") k.logger.Warning("kademlia shutting down with announce goroutines")
} }
...@@ -1272,6 +1270,10 @@ func (k *Kad) Close() error { ...@@ -1272,6 +1270,10 @@ func (k *Kad) Close() error {
k.logger.Warning("kademlia manage loop did not shut down properly") k.logger.Warning("kademlia manage loop did not shut down properly")
} }
if err := k.collector.Finalize(time.Now()); err != nil {
k.logger.Debugf("kademlia: unable to finalize open sessions: %v", err)
}
return nil return nil
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment