Commit 06d1cb40 authored by Nemanja Zbiljić, committed by GitHub

Instrument retrieval package (#1112)

parent 9df7f53d
......@@ -432,6 +432,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
debugAPIService.MustRegisterMetrics(pushSyncProtocol.Metrics()...)
debugAPIService.MustRegisterMetrics(pushSyncPusher.Metrics()...)
debugAPIService.MustRegisterMetrics(pullSync.Metrics()...)
debugAPIService.MustRegisterMetrics(retrieve.Metrics()...)
if pssServiceMetrics, ok := pssService.(metrics.Collector); ok {
debugAPIService.MustRegisterMetrics(pssServiceMetrics.Metrics()...)
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package retrieval
import (
"github.com/prometheus/client_golang/prometheus"
m "github.com/ethersphere/bee/pkg/metrics"
)
// metrics groups the Prometheus collectors maintained by the retrieval
// service. A value is created by newMetrics and exposed through
// Service.Metrics.
type metrics struct {
// all metrics fields must be exported
// to be able to return them by Metrics()
// using reflection

// RequestCounter counts requests to retrieve chunks.
RequestCounter prometheus.Counter
// PeerRequestCounter counts requests made to a single peer.
PeerRequestCounter prometheus.Counter
// TotalRetrieved counts chunks retrieved.
TotalRetrieved prometheus.Counter
// InvalidChunkRetrieved counts invalid chunks received from a peer.
InvalidChunkRetrieved prometheus.Counter
// RetrieveChunkPeerPOTimer is a histogram of the time taken to retrieve
// a chunk, partitioned by the "po" label.
RetrieveChunkPeerPOTimer prometheus.HistogramVec
// RetrieveChunkPOGainCounter counts chunk retrieval requests per address
// PO hop distance, partitioned by the "gain" label.
RetrieveChunkPOGainCounter prometheus.CounterVec
// ChunkPrice summarizes the price that was paid for a chunk.
ChunkPrice prometheus.Summary
// TotalErrors counts errors encountered while retrieving a chunk.
TotalErrors prometheus.Counter
}
// newMetrics constructs every collector of the retrieval service. All of
// them share the m.Namespace namespace and the "retrieval" subsystem. The
// collectors are only created here; they are handed out via Service.Metrics.
func newMetrics() metrics {
	const subsystem = "retrieval"

	// counter builds a label-less counter inside the retrieval subsystem.
	counter := func(name, help string) prometheus.Counter {
		opts := prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      name,
			Help:      help,
		}
		return prometheus.NewCounter(opts)
	}

	// Retrieval duration histogram, partitioned by the "po" label.
	poTimer := prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "retrieve_po_time",
			Help:      "Histogram for time taken to retrieve a chunk per PO.",
			Buckets:   []float64{0.01, 0.1, 0.25, 0.5, 1, 2.5, 5, 10},
		},
		[]string{"po"},
	)

	// Request counter, partitioned by the "gain" label.
	poGain := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "chunk_po_gain_count",
			Help:      "Counter of chunk retrieval requests per address PO hop distance.",
		},
		[]string{"gain"},
	)

	// Summary of the prices paid for chunks.
	price := prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace: m.Namespace,
		Subsystem: subsystem,
		Name:      "chunk_price",
		Help:      "The price of the chunk that was paid.",
	})

	return metrics{
		RequestCounter:             counter("request_count", "Number of requests to retrieve chunks."),
		PeerRequestCounter:         counter("peer_request_count", "Number of request to single peer."),
		TotalRetrieved:             counter("total_retrieved", "Total chunks retrieved."),
		InvalidChunkRetrieved:      counter("invalid_chunk_retrieved", "Invalid chunk retrieved from peer."),
		RetrieveChunkPeerPOTimer:   *poTimer,
		RetrieveChunkPOGainCounter: *poGain,
		ChunkPrice:                 price,
		TotalErrors:                counter("total_errors", "Total number of errors while retrieving chunk."),
	}
}
// Metrics returns the Prometheus collectors held in the service's metrics
// struct, as gathered by m.PrometheusCollectorsFromFields.
func (s *Service) Metrics() []prometheus.Collector {
	collectors := m.PrometheusCollectorsFromFields(s.metrics)
	return collectors
}
......@@ -8,6 +8,7 @@ import (
"context"
"errors"
"fmt"
"strconv"
"time"
"github.com/ethersphere/bee/pkg/accounting"
......@@ -48,6 +49,7 @@ type Service struct {
logger logging.Logger
accounting accounting.Interface
pricer accounting.Pricer
metrics metrics
tracer *tracing.Tracer
}
......@@ -60,6 +62,7 @@ func New(addr swarm.Address, storer storage.Storer, streamer p2p.Streamer, chunk
logger: logger,
accounting: accounting,
pricer: pricer,
metrics: newMetrics(),
tracer: tracer,
}
}
......@@ -86,6 +89,8 @@ func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (swarm.
ctx, cancel := context.WithTimeout(ctx, maxPeers*retrieveChunkTimeout)
defer cancel()
s.metrics.RequestCounter.Inc()
v, err, _ := s.singleflight.Do(addr.String(), func() (interface{}, error) {
span, logger, ctx := s.tracer.StartSpanFromContext(ctx, "retrieve-chunk", s.logger, opentracing.Tag{Key: "address", Value: addr.String()})
defer span.Finish()
......@@ -100,6 +105,8 @@ func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (swarm.
default:
}
s.metrics.PeerRequestCounter.Inc()
var peer swarm.Address
chunk, peer, err := s.retrieveChunk(ctx, addr, skipPeers)
......@@ -125,27 +132,46 @@ func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (swarm.
}
func (s *Service) retrieveChunk(ctx context.Context, addr swarm.Address, skipPeers []swarm.Address) (chunk swarm.Chunk, peer swarm.Address, err error) {
startTimer := time.Now()
v := ctx.Value(requestSourceContextKey{})
sourcePeerAddr := swarm.Address{}
// allow upstream requests if this node is the source of the request
// i.e. the request was not forwarded, to improve retrieval
// if this node is the closest to the chunk but still does not contain it
allowUpstream := true
if src, ok := v.(string); ok {
skipAddr, err := swarm.ParseHexAddress(src)
sourcePeerAddr, err = swarm.ParseHexAddress(src)
if err == nil {
skipPeers = append(skipPeers, skipAddr)
skipPeers = append(skipPeers, sourcePeerAddr)
}
// do not allow upstream requests if the request was forwarded to this node
// to avoid the request loops
allowUpstream = false
}
ctx, cancel := context.WithTimeout(ctx, retrieveChunkTimeout)
defer cancel()
peer, err = s.closestPeer(addr, skipPeers, allowUpstream)
if err != nil {
return nil, peer, fmt.Errorf("get closest for address %s, allow upstream %v: %w", addr.String(), allowUpstream, err)
}
peerPO := swarm.Proximity(s.addr.Bytes(), peer.Bytes())
if !sourcePeerAddr.IsZero() {
// is forwarded request
sourceAddrPO := swarm.Proximity(sourcePeerAddr.Bytes(), addr.Bytes())
addrPO := swarm.Proximity(peer.Bytes(), addr.Bytes())
poGain := int(addrPO) - int(sourceAddrPO)
s.metrics.RetrieveChunkPOGainCounter.
WithLabelValues(strconv.Itoa(poGain)).
Inc()
}
// compute the price we pay for this chunk and reserve it for the rest of this function
chunkPrice := s.pricer.PeerPrice(peer, addr)
err = s.accounting.Reserve(ctx, peer, chunkPrice)
......@@ -157,6 +183,7 @@ func (s *Service) retrieveChunk(ctx context.Context, addr swarm.Address, skipPee
s.logger.Tracef("retrieval: requesting chunk %s from peer %s", addr, peer)
stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
if err != nil {
s.metrics.TotalErrors.Inc()
return nil, peer, fmt.Errorf("new stream: %w", err)
}
defer func() {
......@@ -171,17 +198,25 @@ func (s *Service) retrieveChunk(ctx context.Context, addr swarm.Address, skipPee
if err := w.WriteMsgWithContext(ctx, &pb.Request{
Addr: addr.Bytes(),
}); err != nil {
s.metrics.TotalErrors.Inc()
return nil, peer, fmt.Errorf("write request: %w peer %s", err, peer.String())
}
var d pb.Delivery
if err := r.ReadMsgWithContext(ctx, &d); err != nil {
s.metrics.TotalErrors.Inc()
return nil, peer, fmt.Errorf("read delivery: %w peer %s", err, peer.String())
}
s.metrics.RetrieveChunkPeerPOTimer.
WithLabelValues(strconv.Itoa(int(peerPO))).
Observe(time.Since(startTimer).Seconds())
s.metrics.TotalRetrieved.Inc()
chunk = swarm.NewChunk(addr, d.Data)
if !content.Valid(chunk) {
if !soc.Valid(chunk) {
s.metrics.InvalidChunkRetrieved.Inc()
s.metrics.TotalErrors.Inc()
return nil, peer, swarm.ErrInvalidChunk
}
}
......@@ -191,6 +226,7 @@ func (s *Service) retrieveChunk(ctx context.Context, addr swarm.Address, skipPee
if err != nil {
return nil, peer, err
}
s.metrics.ChunkPrice.Observe(float64(chunkPrice))
return chunk, peer, err
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment