Commit 9443cf68 authored by acud, committed by GitHub

pullsync, puller: add metrics (#424)

* puller, pullsync: add metrics
parent 3748f52d
pkg/puller/metrics.go

// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package puller

import (
	m "github.com/ethersphere/bee/pkg/metrics"
	"github.com/prometheus/client_golang/prometheus"
)

type metrics struct {
	HistWorkerIterCounter prometheus.Counter // counts the number of historical syncing iterations
	HistWorkerDoneCounter prometheus.Counter // counts the number of finished historical syncing jobs
	HistWorkerErrCounter  prometheus.Counter // counts the number of historical syncing errors
	LiveWorkerIterCounter prometheus.Counter // counts the number of live syncing iterations
	LiveWorkerErrCounter  prometheus.Counter // counts the number of live syncing errors
}

func newMetrics() metrics {
	subsystem := "puller"

	return metrics{
		HistWorkerIterCounter: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "hist_worker_iterations",
			Help:      "Total history worker iterations.",
		}),
		HistWorkerDoneCounter: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "hist_worker_done",
			Help:      "Total history worker jobs done.",
		}),
		HistWorkerErrCounter: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "hist_worker_errors",
			Help:      "Total history worker errors.",
		}),
		LiveWorkerIterCounter: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "live_worker_iterations",
			Help:      "Total live worker iterations.",
		}),
		LiveWorkerErrCounter: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "live_worker_errors",
			Help:      "Total live worker errors.",
		}),
	}
}

func (s *Puller) Metrics() []prometheus.Collector {
	return m.PrometheusCollectorsFromFields(s.metrics)
}
pkg/puller/puller.go

@@ -41,6 +41,7 @@ type Puller struct {
 	intervalMtx sync.Mutex
 	syncer      pullsync.Interface
+	metrics     metrics
 	logger      logging.Logger
 	syncPeers   []map[string]*syncPeer // index is bin, map key is peer address
@@ -72,6 +73,7 @@ func New(o Options) *Puller {
 		statestore: o.StateStore,
 		topology:   o.Topology,
 		syncer:     o.PullSync,
+		metrics:    newMetrics(),
 		logger:     o.Logger,
 		cursors:    make(map[string][]uint64),
@@ -363,11 +365,15 @@ func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8)
 }

 func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uint8, cur uint64) {
-	defer p.wg.Done()
+	defer func() {
+		p.wg.Done()
+		p.metrics.HistWorkerDoneCounter.Inc()
+	}()
 	if logMore {
 		p.logger.Tracef("histSyncWorker starting, peer %s bin %d cursor %d", peer, bin, cur)
 	}
 	for {
+		p.metrics.HistWorkerIterCounter.Inc()
 		select {
 		case <-p.quit:
 			if logMore {
@@ -384,6 +390,7 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin
 		s, _, _, err := p.nextPeerInterval(peer, bin)
 		if err != nil {
+			p.metrics.HistWorkerErrCounter.Inc()
 			p.logger.Debugf("histSyncWorker nextPeerInterval: %v", err)
 			return
 		}
@@ -399,6 +406,7 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin
 			p.logger.Debugf("histSyncWorker error syncing interval. peer %s, bin %d, cursor %d, err %v", peer.String(), bin, cur, err)
 		}
 		if ruid == 0 {
+			p.metrics.HistWorkerErrCounter.Inc()
 			return
 		}
 		if err := p.syncer.CancelRuid(peer, ruid); err != nil && logMore {
@@ -408,6 +416,7 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin
 		}
 		err = p.addPeerInterval(peer, bin, s, top)
 		if err != nil {
+			p.metrics.HistWorkerErrCounter.Inc()
 			p.logger.Errorf("error persisting interval for peer, quitting")
 			return
 		}
@@ -421,6 +430,7 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin
 	}
 	from := cur + 1
 	for {
+		p.metrics.LiveWorkerIterCounter.Inc()
 		select {
 		case <-p.quit:
 			if logMore {
@@ -440,6 +450,7 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin
 			p.logger.Debugf("liveSyncWorker exit on sync error. peer %s bin %d from %d err %v", peer, bin, from, err)
 		}
 		if ruid == 0 {
+			p.metrics.LiveWorkerErrCounter.Inc()
 			return
 		}
 		if err := p.syncer.CancelRuid(peer, ruid); err != nil && logMore {
@@ -452,6 +463,7 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin
 		}
 		err = p.addPeerInterval(peer, bin, from, top)
 		if err != nil {
+			p.metrics.LiveWorkerErrCounter.Inc()
 			p.logger.Errorf("liveSyncWorker exit on add peer interval. peer %s bin %d from %d err %v", peer, bin, from, err)
 			return
 		}
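Since these are ordinary client_golang counters, increments are easy to assert in tests via the testutil package. A standalone sketch of the pattern, not part of this commit; it builds its own counter rather than reaching into the puller's unexported metrics field.

package puller_test

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// TestCounterInc demonstrates the assertion pattern only.
func TestCounterInc(t *testing.T) {
	c := prometheus.NewCounter(prometheus.CounterOpts{Name: "hist_worker_iterations"})
	c.Inc()
	if got := testutil.ToFloat64(c); got != 1 {
		t.Fatalf("counter = %f, want 1", got)
	}
}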
pkg/pullsync/metrics.go

// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package pullsync

import (
	m "github.com/ethersphere/bee/pkg/metrics"
	"github.com/prometheus/client_golang/prometheus"
)

type metrics struct {
	OfferCounter    prometheus.Counter // counts the number of chunks offered
	WantCounter     prometheus.Counter // counts the number of chunks wanted
	DeliveryCounter prometheus.Counter // counts the number of chunk deliveries
	DbOpsCounter    prometheus.Counter // counts the number of database operations
}

func newMetrics() metrics {
	subsystem := "pullsync"

	return metrics{
		OfferCounter: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "chunks_offered",
			Help:      "Total chunks offered.",
		}),
		WantCounter: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "chunks_wanted",
			Help:      "Total chunks wanted.",
		}),
		DeliveryCounter: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "chunks_delivered",
			Help:      "Total chunks delivered.",
		}),
		DbOpsCounter: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: m.Namespace,
			Subsystem: subsystem,
			Name:      "db_ops",
			Help:      "Total database operations.",
		}),
	}
}

func (s *Syncer) Metrics() []prometheus.Collector {
	return m.PrometheusCollectorsFromFields(s.metrics)
}
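Both packages funnel their metrics structs through m.PrometheusCollectorsFromFields. That helper is not shown in this diff; a plausible reflection-based version is sketched below, and the real implementation in pkg/metrics may differ.

package metrics

import (
	"reflect"

	"github.com/prometheus/client_golang/prometheus"
)

// collectorsFromFields walks the fields of a struct and keeps every
// exported value that implements prometheus.Collector. Sketch only.
func collectorsFromFields(i interface{}) (cs []prometheus.Collector) {
	v := reflect.Indirect(reflect.ValueOf(i))
	for n := 0; n < v.NumField(); n++ {
		if !v.Field(n).CanInterface() { // skip unexported fields
			continue
		}
		if c, ok := v.Field(n).Interface().(prometheus.Collector); ok {
			cs = append(cs, c)
		}
	}
	return cs
}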
pkg/pullsync/pullsync.go

@@ -35,6 +35,8 @@ const (
 var (
 	ErrUnsolicitedChunk = errors.New("peer sent unsolicited chunk")
+
+	cancellationTimeout = 5 * time.Second // explicit ruid cancellation message timeout
 )

 // how many maximum chunks in a batch
@@ -48,6 +50,7 @@ type Interface interface {
 type Syncer struct {
 	streamer p2p.Streamer
+	metrics  metrics
 	logger   logging.Logger
 	storage  pullstorage.Storer
 	quit     chan struct{}
@@ -71,6 +74,7 @@ func New(o Options) *Syncer {
 	return &Syncer{
 		streamer: o.Streamer,
 		storage:  o.Storage,
+		metrics:  newMetrics(),
 		logger:   o.Logger,
 		ruidCtx:  make(map[uint32]func()),
 		wg:       sync.WaitGroup{},
@@ -163,6 +167,8 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8
 			s.logger.Errorf("syncer got a zero address hash on offer")
 			return 0, ru.Ruid, fmt.Errorf("zero address on offer")
 		}
+		s.metrics.OfferCounter.Inc()
+		s.metrics.DbOpsCounter.Inc()
 		have, err := s.storage.Has(ctx, a)
 		if err != nil {
 			return 0, ru.Ruid, fmt.Errorf("storage has: %w", err)
@@ -170,6 +176,7 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8
 		if !have {
 			wantChunks[a.String()] = struct{}{}
 			ctr++
+			s.metrics.WantCounter.Inc()
 			bv.Set(i / swarm.HashSize)
 		}
 	}
@@ -196,7 +203,8 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8
 		}
 		delete(wantChunks, addr.String())
+		s.metrics.DbOpsCounter.Inc()
+		s.metrics.DeliveryCounter.Inc()
 		if err = s.storage.Put(ctx, storage.ModePutSync, swarm.NewChunk(addr, delivery.Data)); err != nil {
 			return 0, ru.Ruid, fmt.Errorf("delivery put: %w", err)
 		}
@@ -295,6 +303,7 @@ func (s *Syncer) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) err
 }

 func (s *Syncer) setChunks(ctx context.Context, addrs ...swarm.Address) error {
+	s.metrics.DbOpsCounter.Inc()
 	return s.storage.Set(ctx, storage.ModeSetSyncPull, addrs...)
 }
@@ -329,6 +338,7 @@ func (s *Syncer) processWant(ctx context.Context, o *pb.Offer, w *pb.Want) ([]sw
 			addrs = append(addrs, a)
 		}
 	}
+	s.metrics.DbOpsCounter.Inc()
 	return s.storage.Get(ctx, storage.ModeGetSync, addrs...)
 }
@@ -363,6 +373,7 @@ func (s *Syncer) cursorHandler(ctx context.Context, p p2p.Peer, stream p2p.Strea
 	}
 	var ack pb.Ack
+	s.metrics.DbOpsCounter.Inc()
 	ints, err := s.storage.Cursors(ctx)
 	if err != nil {
 		_ = stream.FullClose()
@@ -387,7 +398,7 @@ func (s *Syncer) CancelRuid(peer swarm.Address, ruid uint32) error {
 	var c pb.Cancel
 	c.Ruid = ruid
-	if err := w.WriteMsgWithTimeout(5*time.Second, &c); err != nil {
+	if err := w.WriteMsgWithTimeout(cancellationTimeout, &c); err != nil {
 		return fmt.Errorf("send cancellation: %w", err)
 	}
 	return nil
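Namespace, subsystem, and metric name are joined with underscores into the exported metric name, which is what dashboards and queries reference. The snippet below demonstrates the joining rule; the "bee" namespace value is an assumption, since m.Namespace is not shown in this diff.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// BuildFQName applies the same joining rule CounterOpts uses internally.
	fmt.Println(prometheus.BuildFQName("bee", "pullsync", "chunks_delivered")) // assumed namespace
	// Output: bee_pullsync_chunks_delivered
}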