1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package metrics provides service for collecting various metrics about peers.
// It is intended to be used with the kademlia where the metrics are collected.
package metrics
import (
"fmt"
"sync"
"time"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/hashicorp/go-multierror"
"github.com/syndtr/goleveldb/leveldb"
"go.uber.org/atomic"
)
const (
	// Prefixes used to build the keys under which peer counters
	// are persisted in the underlying store; see peerKey.
	peerLastSeenTimestamp       string = "peer-last-seen-timestamp"
	peerTotalConnectionDuration string = "peer-total-connection-duration"
)
// PeerConnectionDirection represents peer connection direction.
type PeerConnectionDirection string

// Possible values of a session connection direction.
const (
	PeerConnectionDirectionInbound  PeerConnectionDirection = "inbound"
	PeerConnectionDirectionOutbound PeerConnectionDirection = "outbound"
)
// peerKey is used to store peers' persistent metrics counters.
// It combines a counter prefix (one of the constants above) with
// the peer's address into a single store key; see String.
type peerKey struct {
	prefix  string // counter name prefix, e.g. peerLastSeenTimestamp
	address string // string form of the peer's swarm address
}
// String implements Stringer.String method.
// The resulting key has the form "<prefix>-<address>".
func (pk peerKey) String() string {
	return pk.prefix + "-" + pk.address
}
// newPeerKey is a convenient constructor for creating new peerKey.
func newPeerKey(p, a string) *peerKey {
	pk := peerKey{prefix: p, address: a}
	return &pk
}
// RecordOp is a definition of a peer metrics Record
// operation whose execution modifies a specific metrics.
// Operations are constructed by PeerLogIn, PeerLogOut and
// IncSessionConnectionRetry and applied via Collector.Record.
type RecordOp func(*Counters)
// PeerLogIn will first update the current last seen to the given time t and
// as the second it'll set the direction of the session connection to the
// given value. If the peer is already logged in the operation is a no-op.
// The time is set as Unix timestamp ignoring the timezone. The operation
// will panic if the given time is before the Unix epoch.
func PeerLogIn(t time.Time, dir PeerConnectionDirection) RecordOp {
	return func(cs *Counters) {
		cs.Lock()
		defer cs.Unlock()

		if cs.loggedIn {
			return // Ignore when the peer is already logged in.
		}

		// Validate the timestamp before mutating any state so that a
		// panic cannot leave the counters half-updated (e.g. loggedIn
		// set with a stale lastSeenTimestamp).
		ls := t.UnixNano()
		if ls < 0 {
			panic(fmt.Errorf("time before unix epoch: %s", t))
		}

		cs.loggedIn = true
		cs.sessionConnDirection = dir
		cs.lastSeenTimestamp = ls
		cs.dirty.Store(3) // Mark in-memory counters dirty for the next flush.
	}
}
// PeerLogOut will first update the connection session and total duration with
// the difference of the given time t and the current last seen value. As the
// second it'll also update the last seen peer metrics to the given time t.
// If the peer is not logged in the operation is a no-op. The time is set as
// Unix timestamp ignoring the timezone. The operation will panic if the
// given time is before the Unix epoch.
func PeerLogOut(t time.Time) RecordOp {
	return func(cs *Counters) {
		cs.Lock()
		defer cs.Unlock()

		if !cs.loggedIn {
			return // Ignore when the peer is not logged in.
		}

		// Validate the timestamp before mutating any state so that a
		// panic cannot leave the counters half-updated.
		newLs := t.UnixNano()
		if newLs < 0 {
			panic(fmt.Errorf("time before unix epoch: %s", t))
		}

		cs.loggedIn = false
		// The session lasted from the login time (current last seen
		// value) until now; accumulate it into the total.
		cs.sessionConnDuration = time.Duration(newLs - cs.lastSeenTimestamp)
		cs.connTotalDuration += cs.sessionConnDuration
		cs.lastSeenTimestamp = newLs
		cs.dirty.Store(3) // Mark in-memory counters dirty for the next flush.
	}
}
// IncSessionConnectionRetry increments the session connection retry
// counter by 1.
func IncSessionConnectionRetry() RecordOp {
	return func(cs *Counters) {
		cs.Lock()
		cs.sessionConnRetry++
		cs.Unlock()
	}
}
// Snapshot represents a snapshot of peers' metrics counters.
type Snapshot struct {
	LastSeenTimestamp          int64                   // Unix nanoseconds of last login/logout; 0 for a never-seen peer
	SessionConnectionRetry     uint64                  // connection attempts in the current session
	ConnectionTotalDuration    time.Duration           // total connected time across all sessions
	SessionConnectionDuration  time.Duration           // duration of the last (or ongoing) session
	SessionConnectionDirection PeerConnectionDirection // direction of the session connection
}
// HasAtMaxOneConnectionAttempt returns true if the snapshot represents a new
// peer which has at maximum one session connection attempt but it still isn't
// logged in.
func (ss *Snapshot) HasAtMaxOneConnectionAttempt() bool {
	if ss.LastSeenTimestamp != 0 {
		return false // The peer has been seen before.
	}
	return ss.SessionConnectionRetry <= 1
}
// Counters represents a collection of peer metrics
// mainly collected for statistics and debugging.
// All in-memory counters are guarded by the embedded mutex.
type Counters struct {
	sync.Mutex

	// Bookkeeping.
	peer     *swarm.Address // address of the peer these counters belong to
	loggedIn bool           // whether the peer currently has an active session

	// Watches in-memory counters which has to be persisted.
	// 3 - dirty, need to be persisted
	// 2 - snapshot of counters in progress
	// 1 - batched for persistent write
	// 0 - persisted
	dirty atomic.Int32

	// In-memory counters.
	lastSeenTimestamp    int64                   // Unix nanoseconds of last login/logout
	connTotalDuration    time.Duration           // total connected time across all sessions
	sessionConnRetry     uint64                  // connection attempts in the current session
	sessionConnDuration  time.Duration           // duration of the last (or ongoing) session
	sessionConnDirection PeerConnectionDirection // direction of the session connection

	// Persistent counters. Both hold lazily initialized
	// *shed.Uint64Field values; see flush.
	persistentLastSeenTimestamp atomic.Value
	persistentConnTotalDuration atomic.Value
}
// flush batches the current state of the in-memory counters for writing into
// the given db. The caller is responsible for writing the batch. The
// persistent field handles are lazily initialized on the first flush.
func (cs *Counters) flush(db *shed.DB, batch *leveldb.Batch) error {
	// Nothing to do unless the counters are dirty (3) or a previous
	// snapshot attempt was left in progress (2). The original condition
	// (> 1) was inverted: it skipped dirty counters and re-batched
	// already-persisted ones.
	if cs.dirty.Load() <= 1 {
		return nil
	}
	cs.dirty.CAS(3, 2) // Mark snapshot of counters in progress.

	cs.Lock()
	var (
		key                             = cs.peer.String()
		lastSeenTimestampSnapshot       = cs.lastSeenTimestamp
		connectionTotalDurationSnapshot = cs.connTotalDuration
	)
	cs.Unlock()

	ls, ok := cs.persistentLastSeenTimestamp.Load().(*shed.Uint64Field)
	if !ok {
		// First flush for this peer; create the persistent field.
		mk := newPeerKey(peerLastSeenTimestamp, key)
		field, err := db.NewUint64Field(mk.String())
		if err != nil {
			return fmt.Errorf("field initialization for %q failed: %w", mk, err)
		}
		ls = &field
		cs.persistentLastSeenTimestamp.Store(ls)
	}
	cd, ok := cs.persistentConnTotalDuration.Load().(*shed.Uint64Field)
	if !ok {
		// First flush for this peer; create the persistent field.
		mk := newPeerKey(peerTotalConnectionDuration, key)
		field, err := db.NewUint64Field(mk.String())
		if err != nil {
			return fmt.Errorf("field initialization for %q failed: %w", mk, err)
		}
		cd = &field
		cs.persistentConnTotalDuration.Store(cd)
	}

	ls.PutInBatch(batch, uint64(lastSeenTimestampSnapshot))
	cd.PutInBatch(batch, uint64(connectionTotalDurationSnapshot))

	cs.dirty.CAS(2, 1) // Mark as batched for persistent write.
	return nil
}
// snapshot returns current snapshot of counters referenced to the given t.
// For a logged-in peer, the session and total durations are extended up to
// t, measured from the login time (the current last seen value).
func (cs *Counters) snapshot(t time.Time) *Snapshot {
	cs.Lock()
	defer cs.Unlock()

	total, session := cs.connTotalDuration, cs.sessionConnDuration
	if cs.loggedIn {
		session = t.Sub(time.Unix(0, cs.lastSeenTimestamp))
		total += session
	}

	return &Snapshot{
		LastSeenTimestamp:          cs.lastSeenTimestamp,
		SessionConnectionRetry:     cs.sessionConnRetry,
		ConnectionTotalDuration:    total,
		SessionConnectionDuration:  session,
		SessionConnectionDirection: cs.sessionConnDirection,
	}
}
// NewCollector is a convenient constructor for creating new Collector.
func NewCollector(db *shed.DB) *Collector {
	c := Collector{db: db}
	return &c
}
// Collector collects various metrics about
// peers specified by the swarm.Address.
type Collector struct {
	db       *shed.DB // backing store for persisted counters
	counters sync.Map // maps peer byte-string address -> *Counters
}
// Record records a set of metrics for peer specified by the given address.
// A fresh Counters entry is created on first use of the address.
func (c *Collector) Record(addr swarm.Address, rop ...RecordOp) {
	val, _ := c.counters.LoadOrStore(addr.ByteString(), &Counters{peer: &addr})
	cs := val.(*Counters)
	for _, apply := range rop {
		apply(cs)
	}
}
// Snapshot returns the current state of the metrics collector for peer(s).
// The given time t is used to calculate the duration of the current session,
// if any. If an address or a set of addresses is specified then only metrics
// related to them will be returned, otherwise metrics for all peers will be
// returned. If the peer is still logged in, the session-related counters will
// be evaluated against the last seen time, which equals to the login time. If
// the peer is logged out, then the session counters will reflect its last
// session.
func (c *Collector) Snapshot(t time.Time, addresses ...swarm.Address) map[string]*Snapshot {
	result := make(map[string]*Snapshot)

	if len(addresses) == 0 {
		// No filter given; snapshot every known peer.
		c.counters.Range(func(_, val interface{}) bool {
			cs := val.(*Counters)
			result[cs.peer.ByteString()] = cs.snapshot(t)
			return true
		})
		return result
	}

	for _, addr := range addresses {
		if val, ok := c.counters.Load(addr.ByteString()); ok {
			result[addr.ByteString()] = val.(*Counters).snapshot(t)
		}
	}
	return result
}
// Inspect allows to inspect current snapshot for the given
// peer address by executing the inspection function.
// Note: fn receives nil when the address has no recorded metrics.
func (c *Collector) Inspect(addr swarm.Address, fn func(ss *Snapshot)) {
	fn(c.Snapshot(time.Now(), addr)[addr.ByteString()])
}
// Flush syncs the dirty in-memory counters by flushing their values to the
// underlying storage. If an address or a set of addresses is specified then
// only counters related to them will be flushed, otherwise counters for all
// peers will be flushed. Per-peer batching errors are accumulated and
// returned together; a batching error for one peer does not prevent the
// others from being flushed.
func (c *Collector) Flush(addresses ...swarm.Address) error {
	var (
		mErr  error
		dirty []string // addresses successfully batched; their dirty flag is cleared after the write
		batch = new(leveldb.Batch)
	)

	for _, addr := range addresses {
		val, ok := c.counters.Load(addr.ByteString())
		if !ok {
			continue
		}
		cs := val.(*Counters)
		if err := cs.flush(c.db, batch); err != nil {
			// Typo fix: the original message said "for flash".
			mErr = multierror.Append(mErr, fmt.Errorf("unable to batch the counters of peer %q for flush: %w", addr, err))
			continue
		}
		dirty = append(dirty, addr.ByteString())
	}
	if len(addresses) == 0 {
		// No filter given; flush every known peer.
		c.counters.Range(func(_, val interface{}) bool {
			cs := val.(*Counters)
			if err := cs.flush(c.db, batch); err != nil {
				mErr = multierror.Append(mErr, fmt.Errorf("unable to batch the counters of peer %q for flush: %w", cs.peer, err))
				return true
			}
			dirty = append(dirty, cs.peer.ByteString())
			return true
		})
	}

	if batch.Len() == 0 {
		return mErr
	}

	if err := c.db.WriteBatch(batch); err != nil {
		mErr = multierror.Append(mErr, fmt.Errorf("unable to persist counters in batch: %w", err))
	}

	for _, addr := range dirty {
		val, ok := c.counters.Load(addr)
		if !ok {
			continue
		}
		cs := val.(*Counters)
		cs.dirty.CAS(1, 0) // Mark the batched counters as persisted.
	}
	return mErr
}
// Finalize logs out all ongoing peer sessions
// and flushes all in-memory metrics counters.
// All counters are removed from the collector afterwards.
func (c *Collector) Finalize(t time.Time) error {
	var (
		mErr  error
		batch = new(leveldb.Batch)
	)

	// Build the logout operation once; it is applied to every peer.
	logOut := PeerLogOut(t)
	c.counters.Range(func(_, val interface{}) bool {
		cs := val.(*Counters)
		logOut(cs)
		if err := cs.flush(c.db, batch); err != nil {
			mErr = multierror.Append(mErr, fmt.Errorf("unable to flush counters for peer %q: %w", cs.peer, err))
		}
		return true
	})

	if batch.Len() > 0 {
		if err := c.db.WriteBatch(batch); err != nil {
			mErr = multierror.Append(mErr, fmt.Errorf("unable to persist counters in batch: %w", err))
		}
	}

	// Drop all counters from the in-memory map.
	c.counters.Range(func(_, val interface{}) bool {
		c.counters.Delete(val.(*Counters).peer.ByteString())
		return true
	})

	return mErr
}