Commit 7db090a5 authored by acud, committed by GitHub

pusher: simplify metrics (#1106)

parent 71182976
@@ -10,45 +10,57 @@ import (
 )
 
 type metrics struct {
-	// all metrics fields must be exported
-	// to be able to return them by Metrics()
-	// using reflection
-
-	TotalChunksToBeSentCounter prometheus.Counter
-	TotalChunksSynced          prometheus.Counter
-	ErrorSettingChunkToSynced  prometheus.Counter
-	MarkAndSweepTimer          prometheus.Histogram
+	TotalToPush      prometheus.Counter
+	TotalSynced      prometheus.Counter
+	TotalErrors      prometheus.Counter
+	MarkAndSweepTime prometheus.Histogram
+	SyncTime         prometheus.Histogram
+	ErrorTime        prometheus.Histogram
 }
 
 func newMetrics() metrics {
-	subsystem := "pushsync"
+	subsystem := "pusher"
 
 	return metrics{
-		TotalChunksToBeSentCounter: prometheus.NewCounter(prometheus.CounterOpts{
+		TotalToPush: prometheus.NewCounter(prometheus.CounterOpts{
 			Namespace: m.Namespace,
 			Subsystem: subsystem,
-			Name:      "total_chunk_to_be_sent",
-			Help:      "Total chunks to be sent.",
+			Name:      "total_to_push",
+			Help:      "Total chunks to push (chunks may be repeated).",
 		}),
-		TotalChunksSynced: prometheus.NewCounter(prometheus.CounterOpts{
+		TotalSynced: prometheus.NewCounter(prometheus.CounterOpts{
 			Namespace: m.Namespace,
 			Subsystem: subsystem,
-			Name:      "total_chunk_synced",
+			Name:      "total_synced",
 			Help:      "Total chunks synced successfully with valid receipts.",
 		}),
-		ErrorSettingChunkToSynced: prometheus.NewCounter(prometheus.CounterOpts{
+		TotalErrors: prometheus.NewCounter(prometheus.CounterOpts{
 			Namespace: m.Namespace,
 			Subsystem: subsystem,
-			Name:      "cannot_set_chunk_sync_in_db",
-			Help:      "Total no of times the chunk cannot be synced in DB.",
+			Name:      "total_errors",
+			Help:      "Total errors encountered.",
 		}),
-		MarkAndSweepTimer: prometheus.NewHistogram(prometheus.HistogramOpts{
+		MarkAndSweepTime: prometheus.NewHistogram(prometheus.HistogramOpts{
 			Namespace: m.Namespace,
 			Subsystem: subsystem,
-			Name:      "mark_and_sweep_time_histogram",
+			Name:      "mark_and_sweep_time",
 			Help:      "Histogram of time spent in mark and sweep.",
 			Buckets:   []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10, 60},
 		}),
+		SyncTime: prometheus.NewHistogram(prometheus.HistogramOpts{
+			Namespace: m.Namespace,
+			Subsystem: subsystem,
+			Name:      "sync_time",
+			Help:      "Histogram of time spent to sync a chunk.",
+			Buckets:   []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10, 60},
+		}),
+		ErrorTime: prometheus.NewHistogram(prometheus.HistogramOpts{
+			Namespace: m.Namespace,
+			Subsystem: subsystem,
+			Name:      "error_time",
+			Help:      "Histogram of time spent before giving up on syncing a chunk.",
+			Buckets:   []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10, 60},
+		}),
 	}
 }
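The struct comment removed above explained that the metric fields stay exported so they can be returned by a Metrics() accessor via reflection. As a rough illustration of that pattern (the helper name, the field walk, and the package name below are assumptions for this sketch, not the repository's actual code), the exported fields can be gathered into Prometheus collectors like this:

package pusher

import (
	"reflect"

	"github.com/prometheus/client_golang/prometheus"
)

// collectorsOf walks the fields of a metrics value and returns every field
// that implements prometheus.Collector, ready for registration. Unexported
// fields cannot be read via reflection and are skipped, which is why the
// metric fields must stay exported.
func collectorsOf(m interface{}) []prometheus.Collector {
	v := reflect.Indirect(reflect.ValueOf(m))
	cs := make([]prometheus.Collector, 0, v.NumField())
	for i := 0; i < v.NumField(); i++ {
		f := v.Field(i)
		if !f.CanInterface() { // unexported field
			continue
		}
		if c, ok := f.Interface().(prometheus.Collector); ok {
			cs = append(cs, c)
		}
	}
	return cs
}

A Metrics() method on the service could then simply return collectorsOf(s.metrics).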
@@ -96,7 +96,8 @@ LOOP:
 			// postpone a retry only after we've finished processing everything in index
 			timer.Reset(retryInterval)
 			chunksInBatch++
-			s.metrics.TotalChunksToBeSentCounter.Inc()
+			s.metrics.TotalToPush.Inc()
+
 			select {
 			case sem <- struct{}{}:
 			case <-s.quit:
@@ -120,11 +121,19 @@ LOOP:
 			mtx.Unlock()
 
 			go func(ctx context.Context, ch swarm.Chunk) {
-				var err error
+				var (
+					err       error
+					startTime = time.Now()
+				)
				defer func() {
 					if err == nil {
+						s.metrics.TotalSynced.Inc()
+						s.metrics.SyncTime.Observe(time.Since(startTime).Seconds())
 						// only print this if there was no error while sending the chunk
 						s.logger.Tracef("pusher pushed chunk %s", ch.Address().String())
+					} else {
+						s.metrics.TotalErrors.Inc()
+						s.metrics.ErrorTime.Observe(time.Since(startTime).Seconds())
 					}
 					mtx.Lock()
 					delete(inflight, ch.Address().String())
@@ -145,7 +154,6 @@ LOOP:
 					s.logger.Debugf("pusher: error setting chunk as synced: %v", err)
 					return
 				}
-
 			}(ctx, ch)
 		case <-timer.C:
 			// initially timer is set to go off as well as every time we hit the end of push index
@@ -163,7 +171,7 @@ LOOP:
 			// reset timer to go off after retryInterval
 			timer.Reset(retryInterval)
-			s.metrics.MarkAndSweepTimer.Observe(time.Since(startTime).Seconds())
+			s.metrics.MarkAndSweepTime.Observe(time.Since(startTime).Seconds())
 
 			if span != nil {
 				span.Finish()
@@ -201,8 +209,8 @@ LOOP:
 func (s *Service) setChunkAsSynced(ctx context.Context, ch swarm.Chunk) error {
 	if err := s.storer.Set(ctx, storage.ModeSetSync, ch.Address()); err != nil {
 		s.logger.Errorf("pusher: error setting chunk as synced: %v", err)
-		s.metrics.ErrorSettingChunkToSynced.Inc()
 	}
 	t, err := s.tagg.Get(ch.TagID())
 	if err == nil && t != nil {
 		err = t.Inc(tags.StateSynced)
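These counters and histograms only become visible on a node once the collectors are registered with a Prometheus registry and served over HTTP. A minimal, self-contained sketch of that wiring (the registry, port, endpoint, and the "bee" namespace below are assumptions for illustration, not part of this commit):

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	reg := prometheus.NewRegistry()

	// Stand-in for the pusher metrics; in the node these would be the
	// collectors built by newMetrics() above.
	totalToPush := prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "bee", // assumed namespace for the example
		Subsystem: "pusher",
		Name:      "total_to_push",
		Help:      "Total chunks to push (chunks may be repeated).",
	})
	reg.MustRegister(totalToPush)

	// Expose the registry; scraping http://localhost:2112/metrics would show
	// bee_pusher_total_to_push in the output.
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":2112", nil))
}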