Commit faaa64f9 authored by Alok Nerurkar's avatar Alok Nerurkar Committed by GitHub

feat: add pullstorage metrics (#1913)

parent 1b29e2c9
...@@ -654,6 +654,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, ...@@ -654,6 +654,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
debugAPIService.MustRegisterMetrics(pushSyncProtocol.Metrics()...) debugAPIService.MustRegisterMetrics(pushSyncProtocol.Metrics()...)
debugAPIService.MustRegisterMetrics(pusherService.Metrics()...) debugAPIService.MustRegisterMetrics(pusherService.Metrics()...)
debugAPIService.MustRegisterMetrics(pullSyncProtocol.Metrics()...) debugAPIService.MustRegisterMetrics(pullSyncProtocol.Metrics()...)
debugAPIService.MustRegisterMetrics(pullStorage.Metrics()...)
debugAPIService.MustRegisterMetrics(retrieve.Metrics()...) debugAPIService.MustRegisterMetrics(retrieve.Metrics()...)
if bs, ok := batchStore.(metrics.Collector); ok { if bs, ok := batchStore.(metrics.Collector); ok {
......
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pullstorage
import (
m "github.com/ethersphere/bee/pkg/metrics"
"github.com/prometheus/client_golang/prometheus"
)
type metrics struct {
TotalSubscribePullRequests prometheus.Counter
TotalSubscribePullRequestsComplete prometheus.Counter
SubscribePullsStarted prometheus.Counter
SubscribePullsComplete prometheus.Counter
SubscribePullsFailures prometheus.Counter
}
func newMetrics() metrics {
subsystem := "pullstorage"
return metrics{
TotalSubscribePullRequests: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "subscribe_pull_requests",
Help: "Total subscribe pull requests.",
}),
TotalSubscribePullRequestsComplete: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "subscribe_pull_requests_complete",
Help: "Total subscribe pull requests completed.",
}),
SubscribePullsStarted: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "subscribe_pulls_started",
Help: "Total subscribe pulls started.",
}),
SubscribePullsComplete: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "subscribe_pulls_complete",
Help: "Total subscribe pulls completed.",
}),
SubscribePullsFailures: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "subscribe_pulls_failures",
Help: "Total subscribe pulls failures.",
})}
}
func (s *PullStorer) Metrics() []prometheus.Collector {
return m.PrometheusCollectorsFromFields(s.metrics)
}
...@@ -16,7 +16,7 @@ import ( ...@@ -16,7 +16,7 @@ import (
) )
var ( var (
_ Storer = (*ps)(nil) _ Storer = (*PullStorer)(nil)
// ErrDbClosed is used to signal the underlying database was closed // ErrDbClosed is used to signal the underlying database was closed
ErrDbClosed = errors.New("db closed") ErrDbClosed = errors.New("db closed")
...@@ -42,26 +42,30 @@ type Storer interface { ...@@ -42,26 +42,30 @@ type Storer interface {
Has(ctx context.Context, addr swarm.Address) (bool, error) Has(ctx context.Context, addr swarm.Address) (bool, error)
} }
// ps wraps storage.Storer. // PullStorer wraps storage.Storer.
type ps struct { type PullStorer struct {
storage.Storer storage.Storer
intervalsSF singleflight.Group intervalsSF singleflight.Group
metrics metrics
} }
// New returns a new pullstorage Storer instance. // New returns a new pullstorage Storer instance.
func New(storer storage.Storer) Storer { func New(storer storage.Storer) *PullStorer {
return &ps{ return &PullStorer{
Storer: storer, Storer: storer,
metrics: newMetrics(),
} }
} }
// IntervalChunks collects chunk for a requested interval. // IntervalChunks collects chunk for a requested interval.
func (s *ps) IntervalChunks(ctx context.Context, bin uint8, from, to uint64, limit int) (chs []swarm.Address, topmost uint64, err error) { func (s *PullStorer) IntervalChunks(ctx context.Context, bin uint8, from, to uint64, limit int) (chs []swarm.Address, topmost uint64, err error) {
type result struct { type result struct {
chs []swarm.Address chs []swarm.Address
topmost uint64 topmost uint64
} }
s.metrics.TotalSubscribePullRequests.Inc()
defer s.metrics.TotalSubscribePullRequestsComplete.Inc()
v, _, err := s.intervalsSF.Do(ctx, fmt.Sprintf("%v-%v-%v-%v", bin, from, to, limit), func(ctx context.Context) (interface{}, error) { v, _, err := s.intervalsSF.Do(ctx, fmt.Sprintf("%v-%v-%v-%v", bin, from, to, limit), func(ctx context.Context) (interface{}, error) {
// call iterator, iterate either until upper bound or limit reached // call iterator, iterate either until upper bound or limit reached
...@@ -70,12 +74,14 @@ func (s *ps) IntervalChunks(ctx context.Context, bin uint8, from, to uint64, lim ...@@ -70,12 +74,14 @@ func (s *ps) IntervalChunks(ctx context.Context, bin uint8, from, to uint64, lim
timer *time.Timer timer *time.Timer
timerC <-chan time.Time timerC <-chan time.Time
) )
s.metrics.SubscribePullsStarted.Inc()
ch, dbClosed, stop := s.SubscribePull(ctx, bin, from, to) ch, dbClosed, stop := s.SubscribePull(ctx, bin, from, to)
defer func(start time.Time) { defer func(start time.Time) {
stop() stop()
if timer != nil { if timer != nil {
timer.Stop() timer.Stop()
} }
s.metrics.SubscribePullsComplete.Inc()
}(time.Now()) }(time.Now())
var nomore bool var nomore bool
...@@ -129,6 +135,7 @@ func (s *ps) IntervalChunks(ctx context.Context, bin uint8, from, to uint64, lim ...@@ -129,6 +135,7 @@ func (s *ps) IntervalChunks(ctx context.Context, bin uint8, from, to uint64, lim
}) })
if err != nil { if err != nil {
s.metrics.SubscribePullsFailures.Inc()
return nil, 0, err return nil, 0, err
} }
r := v.(*result) r := v.(*result)
...@@ -136,7 +143,7 @@ func (s *ps) IntervalChunks(ctx context.Context, bin uint8, from, to uint64, lim ...@@ -136,7 +143,7 @@ func (s *ps) IntervalChunks(ctx context.Context, bin uint8, from, to uint64, lim
} }
// Cursors gets the last BinID for every bin in the local storage // Cursors gets the last BinID for every bin in the local storage
func (s *ps) Cursors(ctx context.Context) (curs []uint64, err error) { func (s *PullStorer) Cursors(ctx context.Context) (curs []uint64, err error) {
curs = make([]uint64, swarm.MaxBins) curs = make([]uint64, swarm.MaxBins)
for i := uint8(0); i < swarm.MaxBins; i++ { for i := uint8(0); i < swarm.MaxBins; i++ {
binID, err := s.Storer.LastPullSubscriptionBinID(i) binID, err := s.Storer.LastPullSubscriptionBinID(i)
...@@ -149,12 +156,12 @@ func (s *ps) Cursors(ctx context.Context) (curs []uint64, err error) { ...@@ -149,12 +156,12 @@ func (s *ps) Cursors(ctx context.Context) (curs []uint64, err error) {
} }
// Get chunks. // Get chunks.
func (s *ps) Get(ctx context.Context, mode storage.ModeGet, addrs ...swarm.Address) ([]swarm.Chunk, error) { func (s *PullStorer) Get(ctx context.Context, mode storage.ModeGet, addrs ...swarm.Address) ([]swarm.Chunk, error) {
return s.Storer.GetMulti(ctx, mode, addrs...) return s.Storer.GetMulti(ctx, mode, addrs...)
} }
// Put chunks. // Put chunks.
func (s *ps) Put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) error { func (s *PullStorer) Put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) error {
_, err := s.Storer.Put(ctx, mode, chs...) _, err := s.Storer.Put(ctx, mode, chs...)
return err return err
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment