Commit 9a885234 authored by acud's avatar acud Committed by GitHub

hive: add metrics (#1373)

parent 229c33f6
...@@ -38,6 +38,7 @@ type Service struct { ...@@ -38,6 +38,7 @@ type Service struct {
addPeersHandler func(context.Context, ...swarm.Address) error addPeersHandler func(context.Context, ...swarm.Address) error
networkID uint64 networkID uint64
logger logging.Logger logger logging.Logger
metrics metrics
} }
func New(streamer p2p.Streamer, addressbook addressbook.GetPutter, networkID uint64, logger logging.Logger) *Service { func New(streamer p2p.Streamer, addressbook addressbook.GetPutter, networkID uint64, logger logging.Logger) *Service {
...@@ -46,6 +47,7 @@ func New(streamer p2p.Streamer, addressbook addressbook.GetPutter, networkID uin ...@@ -46,6 +47,7 @@ func New(streamer p2p.Streamer, addressbook addressbook.GetPutter, networkID uin
logger: logger, logger: logger,
addressBook: addressbook, addressBook: addressbook,
networkID: networkID, networkID: networkID,
metrics: newMetrics(),
} }
} }
...@@ -64,6 +66,9 @@ func (s *Service) Protocol() p2p.ProtocolSpec { ...@@ -64,6 +66,9 @@ func (s *Service) Protocol() p2p.ProtocolSpec {
func (s *Service) BroadcastPeers(ctx context.Context, addressee swarm.Address, peers ...swarm.Address) error { func (s *Service) BroadcastPeers(ctx context.Context, addressee swarm.Address, peers ...swarm.Address) error {
max := maxBatchSize max := maxBatchSize
s.metrics.BroadcastPeers.Inc()
s.metrics.BroadcastPeersPeers.Add(float64(len(peers)))
for len(peers) > 0 { for len(peers) > 0 {
if max > len(peers) { if max > len(peers) {
max = len(peers) max = len(peers)
...@@ -83,6 +88,7 @@ func (s *Service) SetAddPeersHandler(h func(ctx context.Context, addr ...swarm.A ...@@ -83,6 +88,7 @@ func (s *Service) SetAddPeersHandler(h func(ctx context.Context, addr ...swarm.A
} }
func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swarm.Address) (err error) { func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swarm.Address) (err error) {
s.metrics.BroadcastPeersSends.Inc()
stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, peersStreamName) stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, peersStreamName)
if err != nil { if err != nil {
return fmt.Errorf("new stream: %w", err) return fmt.Errorf("new stream: %w", err)
...@@ -121,6 +127,7 @@ func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swa ...@@ -121,6 +127,7 @@ func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swa
} }
func (s *Service) peersHandler(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error { func (s *Service) peersHandler(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error {
s.metrics.PeersHandler.Inc()
_, r := protobuf.NewWriterAndReader(stream) _, r := protobuf.NewWriterAndReader(stream)
ctx, cancel := context.WithTimeout(ctx, messageTimeout) ctx, cancel := context.WithTimeout(ctx, messageTimeout)
defer cancel() defer cancel()
...@@ -130,6 +137,8 @@ func (s *Service) peersHandler(ctx context.Context, peer p2p.Peer, stream p2p.St ...@@ -130,6 +137,8 @@ func (s *Service) peersHandler(ctx context.Context, peer p2p.Peer, stream p2p.St
return fmt.Errorf("read requestPeers message: %w", err) return fmt.Errorf("read requestPeers message: %w", err)
} }
s.metrics.PeersHandlerPeers.Add(float64(len(peersReq.Peers)))
// close the stream before processing in order to unblock the sending side // close the stream before processing in order to unblock the sending side
// fullclose is called async because there is no need to wait for confirmation, // fullclose is called async because there is no need to wait for confirmation,
// but we still want to handle not closed stream from the other side to avoid zombie stream // but we still want to handle not closed stream from the other side to avoid zombie stream
......
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package hive
import (
m "github.com/ethersphere/bee/pkg/metrics"
"github.com/prometheus/client_golang/prometheus"
)
type metrics struct {
BroadcastPeers prometheus.Counter
BroadcastPeersPeers prometheus.Counter
BroadcastPeersSends prometheus.Counter
PeersHandler prometheus.Counter
PeersHandlerPeers prometheus.Counter
}
func newMetrics() metrics {
subsystem := "hive"
return metrics{
BroadcastPeers: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "broadcast_peers_count",
Help: "Number of calls to broadcast peers.",
}),
BroadcastPeersPeers: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "broadcast_peers_peer_count",
Help: "Number of peers to be sent.",
}),
BroadcastPeersSends: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "broadcast_peers_message_count",
Help: "Number of individual peer gossip messages sent.",
}),
PeersHandler: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "peers_handler_count",
Help: "Number of peer messages received.",
}),
PeersHandlerPeers: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "peers_handler_peers_count",
Help: "Number of peers received in peer messages.",
}),
}
}
func (s *Service) Metrics() []prometheus.Collector {
return m.PrometheusCollectorsFromFields(s.metrics)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment