Commit dee4a635 authored by Zahoor Mohamed's avatar Zahoor Mohamed Committed by GitHub

P7 - All bee metrics to locatore (#83)

* added metrics to localstore
Co-authored-by: default avatarJanos Guljas <janos@resenje.org>
parent 26eea2bd
...@@ -20,7 +20,6 @@ import ( ...@@ -20,7 +20,6 @@ import (
"time" "time"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethersphere/swarm/shed" "github.com/ethersphere/swarm/shed"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
) )
...@@ -78,12 +77,11 @@ func (db *DB) collectGarbageWorker() { ...@@ -78,12 +77,11 @@ func (db *DB) collectGarbageWorker() {
// the rest of the garbage as the batch size limit is reached. // the rest of the garbage as the batch size limit is reached.
// This function is called in collectGarbageWorker. // This function is called in collectGarbageWorker.
func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
metricName := "localstore/gc" db.metrics.GCCounter.Inc()
metrics.GetOrRegisterCounter(metricName, nil).Inc(1) defer totalTimeMetric(db.metrics.TotalTimeCollectGarbage, time.Now())
defer totalTimeMetric(metricName, time.Now())
defer func() { defer func() {
if err != nil { if err != nil {
metrics.GetOrRegisterCounter(metricName+"/error", nil).Inc(1) db.metrics.GCErrorCounter.Inc()
} }
}() }()
...@@ -106,7 +104,7 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { ...@@ -106,7 +104,7 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
if err != nil { if err != nil {
return 0, true, err return 0, true, err
} }
metrics.GetOrRegisterGauge(metricName+"/gcsize", nil).Update(int64(gcSize)) db.metrics.GCSize.Inc()
done = true done = true
err = db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) { err = db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
...@@ -114,8 +112,8 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { ...@@ -114,8 +112,8 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
return true, nil return true, nil
} }
metrics.GetOrRegisterGauge(metricName+"/storets", nil).Update(item.StoreTimestamp) db.metrics.GCStoreTimeStamps.Set(float64(item.StoreTimestamp))
metrics.GetOrRegisterGauge(metricName+"/accessts", nil).Update(item.AccessTimestamp) db.metrics.GCStoreAccessTimeStamps.Set(float64(item.AccessTimestamp))
// delete from retrieve, pull, gc // delete from retrieve, pull, gc
err = db.retrievalDataIndex.DeleteInBatch(batch, item) err = db.retrievalDataIndex.DeleteInBatch(batch, item)
...@@ -146,13 +144,13 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { ...@@ -146,13 +144,13 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
if err != nil { if err != nil {
return 0, false, err return 0, false, err
} }
metrics.GetOrRegisterCounter(metricName+"/collected-count", nil).Inc(int64(collectedCount)) db.metrics.GCCollectedCounter.Inc()
db.gcSize.PutInBatch(batch, gcSize-collectedCount) db.gcSize.PutInBatch(batch, gcSize-collectedCount)
err = db.shed.WriteBatch(batch) err = db.shed.WriteBatch(batch)
if err != nil { if err != nil {
metrics.GetOrRegisterCounter(metricName+"/writebatch/err", nil).Inc(1) db.metrics.GCExcludeWriteBatchError.Inc()
return 0, false, err return 0, false, err
} }
return collectedCount, done, nil return collectedCount, done, nil
...@@ -160,12 +158,11 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { ...@@ -160,12 +158,11 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
// removeChunksInExcludeIndexFromGC removed any recently chunks in the exclude Index, from the gcIndex. // removeChunksInExcludeIndexFromGC removed any recently chunks in the exclude Index, from the gcIndex.
func (db *DB) removeChunksInExcludeIndexFromGC() (err error) { func (db *DB) removeChunksInExcludeIndexFromGC() (err error) {
metricName := "localstore/gc/exclude" db.metrics.GCExcludeCounter.Inc()
metrics.GetOrRegisterCounter(metricName, nil).Inc(1) defer totalTimeMetric(db.metrics.TotalTimeGCExclude, time.Now())
defer totalTimeMetric(metricName, time.Now())
defer func() { defer func() {
if err != nil { if err != nil {
metrics.GetOrRegisterCounter(metricName+"/error", nil).Inc(1) db.metrics.GCExcludeError.Inc()
} }
}() }()
...@@ -219,10 +216,10 @@ func (db *DB) removeChunksInExcludeIndexFromGC() (err error) { ...@@ -219,10 +216,10 @@ func (db *DB) removeChunksInExcludeIndexFromGC() (err error) {
return err return err
} }
metrics.GetOrRegisterCounter(metricName+"/excluded-count", nil).Inc(int64(excludedCount)) db.metrics.GCExcludeCounter.Inc()
err = db.shed.WriteBatch(batch) err = db.shed.WriteBatch(batch)
if err != nil { if err != nil {
metrics.GetOrRegisterCounter(metricName+"/writebatch/err", nil).Inc(1) db.metrics.GCExcludeWriteBatchError.Inc()
return err return err
} }
......
...@@ -19,13 +19,13 @@ package localstore ...@@ -19,13 +19,13 @@ package localstore
import ( import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"github.com/prometheus/client_golang/prometheus"
"os" "os"
"runtime/pprof" "runtime/pprof"
"sync" "sync"
"time" "time"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/shed" "github.com/ethersphere/swarm/shed"
"github.com/ethersphere/swarm/storage/mock" "github.com/ethersphere/swarm/storage/mock"
...@@ -126,6 +126,8 @@ type DB struct { ...@@ -126,6 +126,8 @@ type DB struct {
// underlaying LevelDB to prevent possible panics from // underlaying LevelDB to prevent possible panics from
// iterators // iterators
subscritionsWG sync.WaitGroup subscritionsWG sync.WaitGroup
metrics metrics
} }
// Options struct holds optional parameters for configuring DB. // Options struct holds optional parameters for configuring DB.
...@@ -175,6 +177,7 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { ...@@ -175,6 +177,7 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
close: make(chan struct{}), close: make(chan struct{}),
collectGarbageWorkerDone: make(chan struct{}), collectGarbageWorkerDone: make(chan struct{}),
putToGCCheck: o.PutToGCCheck, putToGCCheck: o.PutToGCCheck,
metrics: newMetrics(),
} }
if db.capacity == 0 { if db.capacity == 0 {
db.capacity = defaultCapacity db.capacity = defaultCapacity
...@@ -542,7 +545,8 @@ func init() { ...@@ -542,7 +545,8 @@ func init() {
// totalTimeMetric logs a message about time between provided start time // totalTimeMetric logs a message about time between provided start time
// and the time when the function is called and sends a resetting timer metric // and the time when the function is called and sends a resetting timer metric
// with provided name appended with ".total-time". // with provided name appended with ".total-time".
func totalTimeMetric(name string, start time.Time) { func totalTimeMetric(metric prometheus.Counter, start time.Time) {
totalTime := time.Since(start) totalTime := time.Since(start)
metrics.GetOrRegisterResettingTimer(name+"/total-time", nil).Update(totalTime) metric.Add(float64(totalTime))
} }
This diff is collapsed.
...@@ -18,11 +18,9 @@ package localstore ...@@ -18,11 +18,9 @@ package localstore
import ( import (
"context" "context"
"fmt"
"time" "time"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/shed" "github.com/ethersphere/swarm/shed"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
...@@ -34,14 +32,12 @@ import ( ...@@ -34,14 +32,12 @@ import (
// Getter Mode. Get is required to implement chunk.Store // Getter Mode. Get is required to implement chunk.Store
// interface. // interface.
func (db *DB) Get(ctx context.Context, mode chunk.ModeGet, addr chunk.Address) (ch chunk.Chunk, err error) { func (db *DB) Get(ctx context.Context, mode chunk.ModeGet, addr chunk.Address) (ch chunk.Chunk, err error) {
metricName := fmt.Sprintf("localstore/Get/%s", mode) db.metrics.ModeGet.Inc()
defer totalTimeMetric(db.metrics.TotalTimeGet, time.Now())
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())
defer func() { defer func() {
if err != nil { if err != nil {
metrics.GetOrRegisterCounter(metricName+"/error", nil).Inc(1) db.metrics.ModeGetFailure.Inc()
} }
}() }()
...@@ -103,14 +99,13 @@ func (db *DB) updateGCItems(items ...shed.Item) { ...@@ -103,14 +99,13 @@ func (db *DB) updateGCItems(items ...shed.Item) {
defer func() { <-db.updateGCSem }() defer func() { <-db.updateGCSem }()
} }
metricName := "localstore/updateGC" db.metrics.GCUpdate.Inc()
metrics.GetOrRegisterCounter(metricName, nil).Inc(1) defer totalTimeMetric(db.metrics.TotalTimeUpdateGC, time.Now())
defer totalTimeMetric(metricName, time.Now())
for _, item := range items { for _, item := range items {
err := db.updateGC(item) err := db.updateGC(item)
if err != nil { if err != nil {
metrics.GetOrRegisterCounter(metricName+"/error", nil).Inc(1) db.metrics.GCUpdateError.Inc()
log.Error("localstore update gc", "err", err) log.Error("localstore update gc", "err", err)
} }
} }
......
...@@ -18,10 +18,8 @@ package localstore ...@@ -18,10 +18,8 @@ package localstore
import ( import (
"context" "context"
"fmt"
"time" "time"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/shed" "github.com/ethersphere/swarm/shed"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
...@@ -32,14 +30,12 @@ import ( ...@@ -32,14 +30,12 @@ import (
// required by the Getter Mode. GetMulti is required to implement chunk.Store // required by the Getter Mode. GetMulti is required to implement chunk.Store
// interface. // interface.
func (db *DB) GetMulti(ctx context.Context, mode chunk.ModeGet, addrs ...chunk.Address) (chunks []chunk.Chunk, err error) { func (db *DB) GetMulti(ctx context.Context, mode chunk.ModeGet, addrs ...chunk.Address) (chunks []chunk.Chunk, err error) {
metricName := fmt.Sprintf("localstore/GetMulti/%s", mode) db.metrics.ModeGetMulti.Inc()
defer totalTimeMetric(db.metrics.TotalTimeGetMulti, time.Now())
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())
defer func() { defer func() {
if err != nil { if err != nil {
metrics.GetOrRegisterCounter(metricName+"/error", nil).Inc(1) db.metrics.ModeGetMultiFailure.Inc()
} }
}() }()
......
...@@ -20,20 +20,18 @@ import ( ...@@ -20,20 +20,18 @@ import (
"context" "context"
"time" "time"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/chunk"
) )
// Has returns true if the chunk is stored in database. // Has returns true if the chunk is stored in database.
func (db *DB) Has(ctx context.Context, addr chunk.Address) (bool, error) { func (db *DB) Has(ctx context.Context, addr chunk.Address) (bool, error) {
metricName := "localstore/Has"
metrics.GetOrRegisterCounter(metricName, nil).Inc(1) db.metrics.ModeHas.Inc()
defer totalTimeMetric(metricName, time.Now()) defer totalTimeMetric(db.metrics.TotalTimeHas, time.Now())
has, err := db.retrievalDataIndex.Has(addressToItem(addr)) has, err := db.retrievalDataIndex.Has(addressToItem(addr))
if err != nil { if err != nil {
metrics.GetOrRegisterCounter(metricName+"/error", nil).Inc(1) db.metrics.ModeHasFailure.Inc()
} }
return has, err return has, err
} }
...@@ -41,14 +39,13 @@ func (db *DB) Has(ctx context.Context, addr chunk.Address) (bool, error) { ...@@ -41,14 +39,13 @@ func (db *DB) Has(ctx context.Context, addr chunk.Address) (bool, error) {
// HasMulti returns a slice of booleans which represent if the provided chunks // HasMulti returns a slice of booleans which represent if the provided chunks
// are stored in database. // are stored in database.
func (db *DB) HasMulti(ctx context.Context, addrs ...chunk.Address) ([]bool, error) { func (db *DB) HasMulti(ctx context.Context, addrs ...chunk.Address) ([]bool, error) {
metricName := "localstore/HasMulti"
metrics.GetOrRegisterCounter(metricName, nil).Inc(1) db.metrics.ModeHasMulti.Inc()
defer totalTimeMetric(metricName, time.Now()) defer totalTimeMetric(db.metrics.TotalTimeHasMulti, time.Now())
have, err := db.retrievalDataIndex.HasMulti(addressesToItems(addrs...)...) have, err := db.retrievalDataIndex.HasMulti(addressesToItems(addrs...)...)
if err != nil { if err != nil {
metrics.GetOrRegisterCounter(metricName+"/error", nil).Inc(1) db.metrics.ModeHasMultiFailure.Inc()
} }
return have, err return have, err
} }
...@@ -19,10 +19,8 @@ package localstore ...@@ -19,10 +19,8 @@ package localstore
import ( import (
"bytes" "bytes"
"context" "context"
"fmt"
"time" "time"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/shed" "github.com/ethersphere/swarm/shed"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
...@@ -33,14 +31,13 @@ import ( ...@@ -33,14 +31,13 @@ import (
// Put is required to implement chunk.Store // Put is required to implement chunk.Store
// interface. // interface.
func (db *DB) Put(ctx context.Context, mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err error) { func (db *DB) Put(ctx context.Context, mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err error) {
metricName := fmt.Sprintf("localstore/Put/%s", mode)
metrics.GetOrRegisterCounter(metricName, nil).Inc(1) db.metrics.ModePut.Inc()
defer totalTimeMetric(metricName, time.Now()) defer totalTimeMetric(db.metrics.TotalTimePut, time.Now())
exist, err = db.put(mode, chs...) exist, err = db.put(mode, chs...)
if err != nil { if err != nil {
metrics.GetOrRegisterCounter(metricName+"/error", nil).Inc(1) db.metrics.ModePutFailure.Inc()
} }
return exist, err return exist, err
......
...@@ -19,10 +19,8 @@ package localstore ...@@ -19,10 +19,8 @@ package localstore
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"time" "time"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log" "github.com/ethersphere/swarm/log"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
...@@ -33,13 +31,11 @@ import ( ...@@ -33,13 +31,11 @@ import (
// Set is required to implement chunk.Store // Set is required to implement chunk.Store
// interface. // interface.
func (db *DB) Set(ctx context.Context, mode chunk.ModeSet, addrs ...chunk.Address) (err error) { func (db *DB) Set(ctx context.Context, mode chunk.ModeSet, addrs ...chunk.Address) (err error) {
metricName := fmt.Sprintf("localstore/Set/%s", mode) db.metrics.ModePut.Inc()
defer totalTimeMetric(db.metrics.TotalTimeSet, time.Now())
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())
err = db.set(mode, addrs...) err = db.set(mode, addrs...)
if err != nil { if err != nil {
metrics.GetOrRegisterCounter(metricName+"/error", nil).Inc(1) db.metrics.ModePutFailure.Inc()
} }
return err return err
} }
......
...@@ -23,7 +23,6 @@ import ( ...@@ -23,7 +23,6 @@ import (
"time" "time"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/shed" "github.com/ethersphere/swarm/shed"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
...@@ -38,8 +37,7 @@ import ( ...@@ -38,8 +37,7 @@ import (
// Make sure that you check the second returned parameter from the channel to stop iteration when its value // Make sure that you check the second returned parameter from the channel to stop iteration when its value
// is false. // is false.
func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan chunk.Descriptor, stop func()) { func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan chunk.Descriptor, stop func()) {
metricName := "localstore/SubscribePull" db.metrics.SubscribePull.Inc()
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
chunkDescriptors := make(chan chunk.Descriptor) chunkDescriptors := make(chan chunk.Descriptor)
trigger := make(chan struct{}, 1) trigger := make(chan struct{}, 1)
...@@ -64,7 +62,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) ...@@ -64,7 +62,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
db.subscritionsWG.Add(1) db.subscritionsWG.Add(1)
go func() { go func() {
defer db.subscritionsWG.Done() defer db.subscritionsWG.Done()
defer metrics.GetOrRegisterCounter(metricName+"/stop", nil).Inc(1) db.metrics.SubscribePullStop.Inc()
// close the returned chunk.Descriptor channel at the end to // close the returned chunk.Descriptor channel at the end to
// signal that the subscription is done // signal that the subscription is done
defer close(chunkDescriptors) defer close(chunkDescriptors)
...@@ -85,7 +83,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) ...@@ -85,7 +83,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
// - last index Item is reached // - last index Item is reached
// - subscription stop is called // - subscription stop is called
// - context is done // - context is done
metrics.GetOrRegisterCounter(metricName+"/iter", nil).Inc(1) db.metrics.SubscribePullIteration.Inc()
iterStart := time.Now() iterStart := time.Now()
var count int var count int
...@@ -129,7 +127,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) ...@@ -129,7 +127,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
Prefix: []byte{bin}, Prefix: []byte{bin},
}) })
totalTimeMetric(metricName+"/iter", iterStart) totalTimeMetric(db.metrics.TotalTimeSubscribePullIteration, iterStart)
if err != nil { if err != nil {
if err == errStopSubscription { if err == errStopSubscription {
...@@ -137,7 +135,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) ...@@ -137,7 +135,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
// if until is reached // if until is reached
return return
} }
metrics.GetOrRegisterCounter(metricName+"/iter/error", nil).Inc(1) db.metrics.SubscribePullIterationFailure.Inc()
log.Error("localstore pull subscription iteration", "bin", bin, "since", since, "until", until, "err", err) log.Error("localstore pull subscription iteration", "bin", bin, "since", since, "until", until, "err", err)
return return
} }
...@@ -185,7 +183,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) ...@@ -185,7 +183,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
// in pull syncing index for a provided bin. If there are no chunks in // in pull syncing index for a provided bin. If there are no chunks in
// that bin, 0 value is returned. // that bin, 0 value is returned.
func (db *DB) LastPullSubscriptionBinID(bin uint8) (id uint64, err error) { func (db *DB) LastPullSubscriptionBinID(bin uint8) (id uint64, err error) {
metrics.GetOrRegisterCounter("localstore/LastPullSubscriptionBinID", nil).Inc(1) db.metrics.LastPullSubscriptionBinID.Inc()
item, err := db.pullIndex.Last([]byte{bin}) item, err := db.pullIndex.Last([]byte{bin})
if err != nil { if err != nil {
......
...@@ -23,7 +23,6 @@ import ( ...@@ -23,7 +23,6 @@ import (
"time" "time"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/shed" "github.com/ethersphere/swarm/shed"
) )
...@@ -33,8 +32,7 @@ import ( ...@@ -33,8 +32,7 @@ import (
// the returned channel without any errors. Make sure that you check the second returned parameter // the returned channel without any errors. Make sure that you check the second returned parameter
// from the channel to stop iteration when its value is false. // from the channel to stop iteration when its value is false.
func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop func()) { func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop func()) {
metricName := "localstore/SubscribePush" db.metrics.SubscribePush.Inc()
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
chunks := make(chan chunk.Chunk) chunks := make(chan chunk.Chunk)
trigger := make(chan struct{}, 1) trigger := make(chan struct{}, 1)
...@@ -52,7 +50,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun ...@@ -52,7 +50,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
db.subscritionsWG.Add(1) db.subscritionsWG.Add(1)
go func() { go func() {
defer db.subscritionsWG.Done() defer db.subscritionsWG.Done()
defer metrics.GetOrRegisterCounter(metricName+"/done", nil).Inc(1) db.metrics.SubscribePushIterationDone.Inc()
// close the returned chunkInfo channel at the end to // close the returned chunkInfo channel at the end to
// signal that the subscription is done // signal that the subscription is done
defer close(chunks) defer close(chunks)
...@@ -65,8 +63,8 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun ...@@ -65,8 +63,8 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
// iterate until: // iterate until:
// - last index Item is reached // - last index Item is reached
// - subscription stop is called // - subscription stop is called
// - context is done // - context is done.met
metrics.GetOrRegisterCounter(metricName+"/iter", nil).Inc(1) db.metrics.SubscribePushIteration.Inc()
iterStart := time.Now() iterStart := time.Now()
var count int var count int
...@@ -103,10 +101,10 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun ...@@ -103,10 +101,10 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
SkipStartFromItem: true, SkipStartFromItem: true,
}) })
totalTimeMetric(metricName+"/iter", iterStart) totalTimeMetric(db.metrics.TotalTimeSubscribePushIteration, iterStart)
if err != nil { if err != nil {
metrics.GetOrRegisterCounter(metricName+"/iter/error", nil).Inc(1) db.metrics.SubscribePushIterationFailure.Inc()
log.Error("localstore push subscription iteration", "err", err) log.Error("localstore push subscription iteration", "err", err)
return return
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment