Commit 533e83f3 authored by acud, committed by GitHub

pushsync: simplify metrics (#1107)

parent 7db090a5
...@@ -10,109 +10,33 @@ import ( ...@@ -10,109 +10,33 @@ import (
) )
type metrics struct { type metrics struct {
// all metrics fields must be exported TotalSent prometheus.Counter
// to be able to return them by Metrics() TotalReceived prometheus.Counter
// using reflection TotalErrors prometheus.Counter
TotalChunksStoredInDB prometheus.Counter
ChunksSentCounter prometheus.Counter
ChunksReceivedCounter prometheus.Counter
SendChunkErrorCounter prometheus.Counter
ReceivedChunkErrorCounter prometheus.Counter
ReceiptsReceivedCounter prometheus.Counter
ReceiptsSentCounter prometheus.Counter
SendReceiptErrorCounter prometheus.Counter
ReceiveReceiptErrorCounter prometheus.Counter
RetriesExhaustedCounter prometheus.Counter
InvalidReceiptReceived prometheus.Counter
SendChunkTimer prometheus.Histogram
ReceiptRTT prometheus.Histogram
} }
func newMetrics() metrics { func newMetrics() metrics {
subsystem := "pushsync" subsystem := "pushsync"
return metrics{ return metrics{
TotalChunksStoredInDB: prometheus.NewCounter(prometheus.CounterOpts{ TotalSent: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "total_chunk_stored_in_DB",
Help: "Total chunks stored successfully in local store.",
}),
ChunksSentCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace, Namespace: m.Namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "sent_chunk", Name: "total_sent",
Help: "Total chunks sent.", Help: "Total chunks sent.",
}), }),
ChunksReceivedCounter: prometheus.NewCounter(prometheus.CounterOpts{ TotalReceived: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace, Namespace: m.Namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "received_chunk", Name: "total_received",
Help: "Total chunks received.", Help: "Total chunks received.",
}), }),
SendChunkErrorCounter: prometheus.NewCounter(prometheus.CounterOpts{ TotalErrors: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace, Namespace: m.Namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "send_chunk_error", Name: "total_errors",
Help: "Total no of time error received while sending chunk.", Help: "Total no of time error received while sending chunk.",
}), }),
ReceivedChunkErrorCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "received_chunk_error",
Help: "Total no of time error received while receiving chunk.",
}),
ReceiptsReceivedCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "received_receipts",
Help: "Total no of times receipts received.",
}),
ReceiptsSentCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "sent_receipts",
Help: "Total no of times receipts are sent.",
}),
SendReceiptErrorCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "sent_receipts_error",
Help: "Total no of times receipts were sent and error was encountered.",
}),
ReceiveReceiptErrorCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "receive_receipt_error",
Help: "Total no of time error received while receiving receipt.",
}),
RetriesExhaustedCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "chunk_retries_exhausted",
Help: "CHunk retries exhausted.",
}),
InvalidReceiptReceived: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "invalid_receipt_receipt",
Help: "Invalid receipt received from peer.",
}),
SendChunkTimer: prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "send_chunk_time_histogram",
Help: "Histogram for Time taken to send a chunk.",
Buckets: []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10, 60},
}),
ReceiptRTT: prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "receipt_rtt_histogram",
Help: "Histogram of RTT for receiving receipt for a pushed chunk.",
Buckets: []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10, 60},
}),
} }
} }
......
...@@ -93,6 +93,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) ...@@ -93,6 +93,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
w, r := protobuf.NewWriterAndReader(stream) w, r := protobuf.NewWriterAndReader(stream)
defer func() { defer func() {
if err != nil { if err != nil {
ps.metrics.TotalErrors.Inc()
_ = stream.Reset() _ = stream.Reset()
} else { } else {
_ = stream.FullClose() _ = stream.FullClose()
...@@ -101,10 +102,9 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) ...@@ -101,10 +102,9 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
var ch pb.Delivery var ch pb.Delivery
if err = r.ReadMsgWithContext(ctx, &ch); err != nil { if err = r.ReadMsgWithContext(ctx, &ch); err != nil {
ps.metrics.ReceivedChunkErrorCounter.Inc()
return fmt.Errorf("pushsync read delivery: %w", err) return fmt.Errorf("pushsync read delivery: %w", err)
} }
ps.metrics.ChunksReceivedCounter.Inc() ps.metrics.TotalReceived.Inc()
chunk := swarm.NewChunk(swarm.NewAddress(ch.Address), ch.Data) chunk := swarm.NewChunk(swarm.NewAddress(ch.Address), ch.Data)
...@@ -162,17 +162,14 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) ...@@ -162,17 +162,14 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
if err := ps.sendChunkDelivery(ctx, wc, chunk); err != nil { if err := ps.sendChunkDelivery(ctx, wc, chunk); err != nil {
return fmt.Errorf("forward chunk to peer %s: %w", peer.String(), err) return fmt.Errorf("forward chunk to peer %s: %w", peer.String(), err)
} }
receiptRTTTimer := time.Now()
receipt, err := ps.receiveReceipt(ctx, rc) receipt, err := ps.receiveReceipt(ctx, rc)
if err != nil { if err != nil {
return fmt.Errorf("receive receipt from peer %s: %w", peer.String(), err) return fmt.Errorf("receive receipt from peer %s: %w", peer.String(), err)
} }
ps.metrics.ReceiptRTT.Observe(time.Since(receiptRTTTimer).Seconds())
// Check if the receipt is valid // Check if the receipt is valid
if !chunk.Address().Equal(swarm.NewAddress(receipt.Address)) { if !chunk.Address().Equal(swarm.NewAddress(receipt.Address)) {
ps.metrics.InvalidReceiptReceived.Inc()
return fmt.Errorf("invalid receipt from peer %s", peer.String()) return fmt.Errorf("invalid receipt from peer %s", peer.String())
} }
...@@ -186,7 +183,6 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) ...@@ -186,7 +183,6 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
if err != nil { if err != nil {
return fmt.Errorf("send receipt to peer %s: %w", peer.String(), err) return fmt.Errorf("send receipt to peer %s: %w", peer.String(), err)
} }
ps.metrics.ReceiptsSentCounter.Inc()
return ps.accounting.Debit(p.Address, ps.pricer.Price(chunk.Address())) return ps.accounting.Debit(p.Address, ps.pricer.Price(chunk.Address()))
} }
...@@ -194,27 +190,18 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) ...@@ -194,27 +190,18 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
func (ps *PushSync) sendChunkDelivery(ctx context.Context, w protobuf.Writer, chunk swarm.Chunk) (err error) { func (ps *PushSync) sendChunkDelivery(ctx context.Context, w protobuf.Writer, chunk swarm.Chunk) (err error) {
ctx, cancel := context.WithTimeout(ctx, timeToWaitForReceipt) ctx, cancel := context.WithTimeout(ctx, timeToWaitForReceipt)
defer cancel() defer cancel()
startTimer := time.Now() return w.WriteMsgWithContext(ctx, &pb.Delivery{
if err = w.WriteMsgWithContext(ctx, &pb.Delivery{
Address: chunk.Address().Bytes(), Address: chunk.Address().Bytes(),
Data: chunk.Data(), Data: chunk.Data(),
}); err != nil { })
ps.metrics.SendChunkErrorCounter.Inc()
return err
}
ps.metrics.SendChunkTimer.Observe(time.Since(startTimer).Seconds())
ps.metrics.ChunksSentCounter.Inc()
return nil
} }
func (ps *PushSync) sendReceipt(ctx context.Context, w protobuf.Writer, receipt *pb.Receipt) (err error) { func (ps *PushSync) sendReceipt(ctx context.Context, w protobuf.Writer, receipt *pb.Receipt) (err error) {
ctx, cancel := context.WithTimeout(ctx, timeToWaitForReceipt) ctx, cancel := context.WithTimeout(ctx, timeToWaitForReceipt)
defer cancel() defer cancel()
if err := w.WriteMsgWithContext(ctx, receipt); err != nil { if err := w.WriteMsgWithContext(ctx, receipt); err != nil {
ps.metrics.SendReceiptErrorCounter.Inc()
return err return err
} }
ps.metrics.ReceiptsSentCounter.Inc()
return nil return nil
} }
...@@ -222,10 +209,8 @@ func (ps *PushSync) receiveReceipt(ctx context.Context, r protobuf.Reader) (rece ...@@ -222,10 +209,8 @@ func (ps *PushSync) receiveReceipt(ctx context.Context, r protobuf.Reader) (rece
ctx, cancel := context.WithTimeout(ctx, timeToWaitForReceipt) ctx, cancel := context.WithTimeout(ctx, timeToWaitForReceipt)
defer cancel() defer cancel()
if err := r.ReadMsgWithContext(ctx, &receipt); err != nil { if err := r.ReadMsgWithContext(ctx, &receipt); err != nil {
ps.metrics.ReceiveReceiptErrorCounter.Inc()
return receipt, err return receipt, err
} }
ps.metrics.ReceiptsReceivedCounter.Inc()
return receipt, nil return receipt, nil
} }
...@@ -302,6 +287,7 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re ...@@ -302,6 +287,7 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
streamer, err := ps.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName) streamer, err := ps.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
if err != nil { if err != nil {
ps.metrics.TotalErrors.Inc()
lastErr = fmt.Errorf("new stream for peer %s: %w", peer.String(), err) lastErr = fmt.Errorf("new stream for peer %s: %w", peer.String(), err)
ps.logger.Debugf("pushsync-push: %v", lastErr) ps.logger.Debugf("pushsync-push: %v", lastErr)
continue continue
...@@ -310,12 +296,15 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re ...@@ -310,12 +296,15 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
w, r := protobuf.NewWriterAndReader(streamer) w, r := protobuf.NewWriterAndReader(streamer)
if err := ps.sendChunkDelivery(ctx, w, ch); err != nil { if err := ps.sendChunkDelivery(ctx, w, ch); err != nil {
ps.metrics.TotalErrors.Inc()
_ = streamer.Reset() _ = streamer.Reset()
lastErr = fmt.Errorf("chunk %s deliver to peer %s: %w", ch.Address().String(), peer.String(), err) lastErr = fmt.Errorf("chunk %s deliver to peer %s: %w", ch.Address().String(), peer.String(), err)
ps.logger.Debugf("pushsync-push: %v", lastErr) ps.logger.Debugf("pushsync-push: %v", lastErr)
continue continue
} }
ps.metrics.TotalSent.Inc()
// if you manage to get a tag, just increment the respective counter // if you manage to get a tag, just increment the respective counter
t, err := ps.tagger.Get(ch.TagID()) t, err := ps.tagger.Get(ch.TagID())
if err == nil && t != nil { if err == nil && t != nil {
...@@ -325,19 +314,17 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re ...@@ -325,19 +314,17 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
} }
} }
receiptRTTTimer := time.Now()
receipt, err := ps.receiveReceipt(ctx, r) receipt, err := ps.receiveReceipt(ctx, r)
if err != nil { if err != nil {
ps.metrics.TotalErrors.Inc()
_ = streamer.Reset() _ = streamer.Reset()
lastErr = fmt.Errorf("chunk %s receive receipt from peer %s: %w", ch.Address().String(), peer.String(), err) lastErr = fmt.Errorf("chunk %s receive receipt from peer %s: %w", ch.Address().String(), peer.String(), err)
ps.logger.Debugf("pushsync-push: %v", lastErr) ps.logger.Debugf("pushsync-push: %v", lastErr)
continue continue
} }
ps.metrics.ReceiptRTT.Observe(time.Since(receiptRTTTimer).Seconds())
// Check if the receipt is valid // Check if the receipt is valid
if !ch.Address().Equal(swarm.NewAddress(receipt.Address)) { if !ch.Address().Equal(swarm.NewAddress(receipt.Address)) {
ps.metrics.InvalidReceiptReceived.Inc()
_ = streamer.Reset() _ = streamer.Reset()
return nil, fmt.Errorf("invalid receipt. peer %s", peer.String()) return nil, fmt.Errorf("invalid receipt. peer %s", peer.String())
} }
...@@ -369,7 +356,6 @@ func (ps *PushSync) handleDeliveryResponse(ctx context.Context, w protobuf.Write ...@@ -369,7 +356,6 @@ func (ps *PushSync) handleDeliveryResponse(ctx context.Context, w protobuf.Write
if err != nil { if err != nil {
return fmt.Errorf("chunk store: %w", err) return fmt.Errorf("chunk store: %w", err)
} }
ps.metrics.TotalChunksStoredInDB.Inc()
// Send a receipt immediately once the storage of the chunk is successfully // Send a receipt immediately once the storage of the chunk is successfully
receipt := &pb.Receipt{Address: chunk.Address().Bytes()} receipt := &pb.Receipt{Address: chunk.Address().Bytes()}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment