Commit 5fb06739 authored by acud's avatar acud Committed by GitHub

feat: add listener metrics (#1786)

parent dcfaea41
...@@ -342,6 +342,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, ...@@ -342,6 +342,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
var ( var (
postageContractService postagecontract.Interface postageContractService postagecontract.Interface
batchSvc postage.EventUpdater batchSvc postage.EventUpdater
eventListener postage.Listener
) )
var postageSyncStart uint64 = 0 var postageSyncStart uint64 = 0
...@@ -359,7 +360,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, ...@@ -359,7 +360,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
postageSyncStart = startBlock postageSyncStart = startBlock
} }
eventListener := listener.New(logger, swapBackend, postageContractAddress, o.BlockTime) eventListener = listener.New(logger, swapBackend, postageContractAddress, o.BlockTime)
b.listenerCloser = eventListener b.listenerCloser = eventListener
batchSvc = batchservice.New(batchStore, logger, eventListener) batchSvc = batchservice.New(batchStore, logger, eventListener)
...@@ -642,6 +643,12 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, ...@@ -642,6 +643,12 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
debugAPIService.MustRegisterMetrics(bs.Metrics()...) debugAPIService.MustRegisterMetrics(bs.Metrics()...)
} }
if eventListener != nil {
if ls, ok := eventListener.(metrics.Collector); ok {
debugAPIService.MustRegisterMetrics(ls.Metrics()...)
}
}
if pssServiceMetrics, ok := pssService.(metrics.Collector); ok { if pssServiceMetrics, ok := pssService.(metrics.Collector); ok {
debugAPIService.MustRegisterMetrics(pssServiceMetrics.Metrics()...) debugAPIService.MustRegisterMetrics(pssServiceMetrics.Metrics()...)
} }
......
...@@ -22,6 +22,7 @@ import ( ...@@ -22,6 +22,7 @@ import (
"github.com/ethersphere/bee/pkg/postage" "github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/settlement/swap/transaction" "github.com/ethersphere/bee/pkg/settlement/swap/transaction"
"github.com/ethersphere/go-storage-incentives-abi/postageabi" "github.com/ethersphere/go-storage-incentives-abi/postageabi"
"github.com/prometheus/client_golang/prometheus"
) )
const ( const (
...@@ -54,6 +55,7 @@ type listener struct { ...@@ -54,6 +55,7 @@ type listener struct {
postageStampAddress common.Address postageStampAddress common.Address
quit chan struct{} quit chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
metrics metrics
} }
func New( func New(
...@@ -68,6 +70,7 @@ func New( ...@@ -68,6 +70,7 @@ func New(
blockTime: blockTime, blockTime: blockTime,
postageStampAddress: postageStampAddress, postageStampAddress: postageStampAddress,
quit: make(chan struct{}), quit: make(chan struct{}),
metrics: newMetrics(),
} }
} }
...@@ -90,6 +93,7 @@ func (l *listener) filterQuery(from, to *big.Int) ethereum.FilterQuery { ...@@ -90,6 +93,7 @@ func (l *listener) filterQuery(from, to *big.Int) ethereum.FilterQuery {
} }
func (l *listener) processEvent(e types.Log, updater postage.EventUpdater) error { func (l *listener) processEvent(e types.Log, updater postage.EventUpdater) error {
defer l.metrics.EventsProcessed.Inc()
switch e.Topics[0] { switch e.Topics[0] {
case batchCreatedTopic: case batchCreatedTopic:
c := &batchCreatedEvent{} c := &batchCreatedEvent{}
...@@ -97,6 +101,7 @@ func (l *listener) processEvent(e types.Log, updater postage.EventUpdater) error ...@@ -97,6 +101,7 @@ func (l *listener) processEvent(e types.Log, updater postage.EventUpdater) error
if err != nil { if err != nil {
return err return err
} }
l.metrics.CreatedCounter.Inc()
return updater.Create( return updater.Create(
c.BatchId[:], c.BatchId[:],
c.Owner.Bytes(), c.Owner.Bytes(),
...@@ -109,6 +114,7 @@ func (l *listener) processEvent(e types.Log, updater postage.EventUpdater) error ...@@ -109,6 +114,7 @@ func (l *listener) processEvent(e types.Log, updater postage.EventUpdater) error
if err != nil { if err != nil {
return err return err
} }
l.metrics.TopupCounter.Inc()
return updater.TopUp( return updater.TopUp(
c.BatchId[:], c.BatchId[:],
c.NormalisedBalance, c.NormalisedBalance,
...@@ -119,6 +125,7 @@ func (l *listener) processEvent(e types.Log, updater postage.EventUpdater) error ...@@ -119,6 +125,7 @@ func (l *listener) processEvent(e types.Log, updater postage.EventUpdater) error
if err != nil { if err != nil {
return err return err
} }
l.metrics.DepthCounter.Inc()
return updater.UpdateDepth( return updater.UpdateDepth(
c.BatchId[:], c.BatchId[:],
c.NewDepth, c.NewDepth,
...@@ -130,10 +137,12 @@ func (l *listener) processEvent(e types.Log, updater postage.EventUpdater) error ...@@ -130,10 +137,12 @@ func (l *listener) processEvent(e types.Log, updater postage.EventUpdater) error
if err != nil { if err != nil {
return err return err
} }
l.metrics.PriceCounter.Inc()
return updater.UpdatePrice( return updater.UpdatePrice(
c.Price, c.Price,
) )
default: default:
l.metrics.EventErrors.Inc()
return errors.New("unknown event") return errors.New("unknown event")
} }
} }
...@@ -163,8 +172,12 @@ func (l *listener) Listen(from uint64, updater postage.EventUpdater) <-chan stru ...@@ -163,8 +172,12 @@ func (l *listener) Listen(from uint64, updater postage.EventUpdater) <-chan stru
case <-l.quit: case <-l.quit:
return nil return nil
} }
start := time.Now()
l.metrics.BackendCalls.Inc()
to, err := l.ev.BlockNumber(ctx) to, err := l.ev.BlockNumber(ctx)
if err != nil { if err != nil {
l.metrics.BackendErrors.Inc()
return err return err
} }
...@@ -188,13 +201,16 @@ func (l *listener) Listen(from uint64, updater postage.EventUpdater) <-chan stru ...@@ -188,13 +201,16 @@ func (l *listener) Listen(from uint64, updater postage.EventUpdater) <-chan stru
} else { } else {
closeOnce.Do(func() { close(synced) }) closeOnce.Do(func() { close(synced) })
} }
l.metrics.BackendCalls.Inc()
events, err := l.ev.FilterLogs(ctx, l.filterQuery(big.NewInt(int64(from)), big.NewInt(int64(to)))) events, err := l.ev.FilterLogs(ctx, l.filterQuery(big.NewInt(int64(from)), big.NewInt(int64(to))))
if err != nil { if err != nil {
l.metrics.BackendErrors.Inc()
return err return err
} }
for _, e := range events { for _, e := range events {
startEv := time.Now()
err = updater.UpdateBlockNumber(e.BlockNumber) err = updater.UpdateBlockNumber(e.BlockNumber)
if err != nil { if err != nil {
return err return err
...@@ -202,6 +218,7 @@ func (l *listener) Listen(from uint64, updater postage.EventUpdater) <-chan stru ...@@ -202,6 +218,7 @@ func (l *listener) Listen(from uint64, updater postage.EventUpdater) <-chan stru
if err = l.processEvent(e, updater); err != nil { if err = l.processEvent(e, updater); err != nil {
return err return err
} }
totalTimeMetric(l.metrics.EventProcessDuration, startEv)
} }
err = updater.UpdateBlockNumber(to) err = updater.UpdateBlockNumber(to)
...@@ -210,6 +227,8 @@ func (l *listener) Listen(from uint64, updater postage.EventUpdater) <-chan stru ...@@ -210,6 +227,8 @@ func (l *listener) Listen(from uint64, updater postage.EventUpdater) <-chan stru
} }
from = to + 1 from = to + 1
totalTimeMetric(l.metrics.PageProcessDuration, start)
l.metrics.PagesProcessed.Inc()
} }
} }
...@@ -285,3 +304,8 @@ func DiscoverAddresses(chainID int64) (postageStamp common.Address, startBlock u ...@@ -285,3 +304,8 @@ func DiscoverAddresses(chainID int64) (postageStamp common.Address, startBlock u
} }
return common.Address{}, 0, false return common.Address{}, 0, false
} }
func totalTimeMetric(metric prometheus.Counter, start time.Time) {
totalTime := time.Since(start)
metric.Add(float64(totalTime))
}
package listener
import (
m "github.com/ethersphere/bee/pkg/metrics"
"github.com/prometheus/client_golang/prometheus"
)
type metrics struct {
// aggregate events handled
EventsProcessed prometheus.Counter
EventErrors prometheus.Counter
PagesProcessed prometheus.Counter
// individual event counters
CreatedCounter prometheus.Counter
TopupCounter prometheus.Counter
DepthCounter prometheus.Counter
PriceCounter prometheus.Counter
// total calls to chain backend
BackendCalls prometheus.Counter
BackendErrors prometheus.Counter
// processing durations
PageProcessDuration prometheus.Counter
EventProcessDuration prometheus.Counter
}
func newMetrics() metrics {
subsystem := "postage_listener"
return metrics{
// aggregate events handled
EventsProcessed: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "events_processed",
Help: "total events processed",
}),
EventErrors: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "event_errors",
Help: "total event errors while processing",
}),
PagesProcessed: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "pages_processed",
Help: "total pages processed",
}),
// individual event counters
CreatedCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "created_events",
Help: "total batch created events processed",
}),
TopupCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "topup_events",
Help: "total batch topup events handled",
}),
DepthCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "depth_events",
Help: "total batch depth change events handled",
}),
PriceCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "price_events",
Help: "total price change events handled",
}),
// total call
BackendCalls: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "backend_calls",
Help: "total chain backend calls",
}),
BackendErrors: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "backend_errors",
Help: "total chain backend errors",
}),
// processing durations
PageProcessDuration: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "page_duration",
Help: "how long it took to process a page",
}),
EventProcessDuration: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "event_duration",
Help: "how long it took to process a single event",
}),
}
}
func (s *listener) Metrics() []prometheus.Collector {
return m.PrometheusCollectorsFromFields(s.metrics)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment