Commit 63939b18 authored by acud's avatar acud Committed by GitHub

pusher, pushsync: more info on error (#342)

parent ab196a3a
...@@ -325,14 +325,14 @@ func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8) ...@@ -325,14 +325,14 @@ func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8)
} }
func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uint8, cur uint64) { func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uint8, cur uint64) {
p.logger.Debugf("histSyncWorker starting, peer %s bin %d cursor %d", peer, bin, cur) p.logger.Tracef("histSyncWorker starting, peer %s bin %d cursor %d", peer, bin, cur)
for { for {
select { select {
case <-p.quit: case <-p.quit:
p.logger.Debugf("histSyncWorker quitting on shutdown. peer %s bin %d cur %d", peer, bin, cur) p.logger.Tracef("histSyncWorker quitting on shutdown. peer %s bin %d cur %d", peer, bin, cur)
return return
case <-ctx.Done(): case <-ctx.Done():
p.logger.Debugf("histSyncWorker context cancelled. peer %s bin %d cur %d", peer, bin, cur) p.logger.Tracef("histSyncWorker context cancelled. peer %s bin %d cur %d", peer, bin, cur)
return return
default: default:
} }
...@@ -347,38 +347,38 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin ...@@ -347,38 +347,38 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin
continue continue
} }
if s > cur { if s > cur {
p.logger.Debugf("histSyncWorker finished syncing bin %d, cursor %d", bin, cur) p.logger.Tracef("histSyncWorker finished syncing bin %d, cursor %d", bin, cur)
return return
} }
top, err := p.syncer.SyncInterval(ctx, peer, bin, s, cur) top, err := p.syncer.SyncInterval(ctx, peer, bin, s, cur)
if err != nil { if err != nil {
p.logger.Debugf("histSyncWorker error syncing interval. peer %s, bin %d, cursor %d, err %v", peer.String(), bin, cur, err) p.logger.Errorf("histSyncWorker error syncing interval. peer %s, bin %d, cursor %d, err %v", peer.String(), bin, cur, err)
return return
} }
err = p.addPeerInterval(peer, bin, s, top) err = p.addPeerInterval(peer, bin, s, top)
if err != nil { if err != nil {
p.logger.Debugf("error persisting interval for peer, quitting") p.logger.Errorf("error persisting interval for peer, quitting")
return return
} }
} }
} }
func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uint8, cur uint64) { func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uint8, cur uint64) {
p.logger.Debugf("liveSyncWorker starting, peer %s bin %d cursor %d", peer, bin, cur) p.logger.Tracef("liveSyncWorker starting, peer %s bin %d cursor %d", peer, bin, cur)
from := cur + 1 from := cur + 1
for { for {
select { select {
case <-p.quit: case <-p.quit:
p.logger.Debugf("liveSyncWorker quit on shutdown. peer %s bin %d cur %d", peer, bin, cur) p.logger.Tracef("liveSyncWorker quit on shutdown. peer %s bin %d cur %d", peer, bin, cur)
return return
case <-ctx.Done(): case <-ctx.Done():
p.logger.Debugf("liveSyncWorker context cancelled. peer %s bin %d cur %d", peer, bin, cur) p.logger.Tracef("liveSyncWorker context cancelled. peer %s bin %d cur %d", peer, bin, cur)
return return
default: default:
} }
top, err := p.syncer.SyncInterval(ctx, peer, bin, from, math.MaxUint64) top, err := p.syncer.SyncInterval(ctx, peer, bin, from, math.MaxUint64)
if err != nil { if err != nil {
p.logger.Debugf("liveSyncWorker exit on sync error. peer %s bin %d from %d err %v", peer, bin, from, err) p.logger.Errorf("liveSyncWorker exit on sync error. peer %s bin %d from %d err %v", peer, bin, from, err)
return return
} }
if top == 0 { if top == 0 {
...@@ -386,7 +386,7 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin ...@@ -386,7 +386,7 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin
} }
err = p.addPeerInterval(peer, bin, from, top) err = p.addPeerInterval(peer, bin, from, top)
if err != nil { if err != nil {
p.logger.Debugf("liveSyncWorker exit on add peer interval. peer %s bin %d from %d err %v", peer, bin, from, err) p.logger.Errorf("liveSyncWorker exit on add peer interval. peer %s bin %d from %d err %v", peer, bin, from, err)
return return
} }
from = top + 1 from = top + 1
......
...@@ -83,7 +83,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) ...@@ -83,7 +83,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
// Get the delivery // Get the delivery
chunk, err := ps.getChunkDelivery(r) chunk, err := ps.getChunkDelivery(r)
if err != nil { if err != nil {
return fmt.Errorf("chunk delivery: %w", err) return fmt.Errorf("chunk delivery from peer %s: %w", p.Address.String(), err)
} }
// Select the closest peer to forward the chunk // Select the closest peer to forward the chunk
...@@ -103,7 +103,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) ...@@ -103,7 +103,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
receipt := &pb.Receipt{Address: chunk.Address().Bytes()} receipt := &pb.Receipt{Address: chunk.Address().Bytes()}
err = ps.sendReceipt(w, receipt) err = ps.sendReceipt(w, receipt)
if err != nil { if err != nil {
return fmt.Errorf("send receipt: %w", err) return fmt.Errorf("send receipt to peer %s: %w", p.Address.String(), err)
} }
return nil return nil
} }
...@@ -129,33 +129,33 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) ...@@ -129,33 +129,33 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
// Forward chunk to closest peer // Forward chunk to closest peer
streamer, err := ps.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName) streamer, err := ps.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
if err != nil { if err != nil {
return fmt.Errorf("new stream: %w", err) return fmt.Errorf("new stream peer %s: %w", peer.String(), err)
} }
defer streamer.Close() defer streamer.Close()
wc, rc := protobuf.NewWriterAndReader(streamer) wc, rc := protobuf.NewWriterAndReader(streamer)
if err := ps.sendChunkDelivery(wc, chunk); err != nil { if err := ps.sendChunkDelivery(wc, chunk); err != nil {
return fmt.Errorf("forward chunk: %w", err) return fmt.Errorf("forward chunk to peer %s: %w", peer.String(), err)
} }
receiptRTTTimer := time.Now() receiptRTTTimer := time.Now()
receipt, err := ps.receiveReceipt(rc) receipt, err := ps.receiveReceipt(rc)
if err != nil { if err != nil {
return fmt.Errorf("receive receipt: %w", err) return fmt.Errorf("receive receipt from peer %s: %w", peer.String(), err)
} }
ps.metrics.ReceiptRTT.Observe(time.Since(receiptRTTTimer).Seconds()) ps.metrics.ReceiptRTT.Observe(time.Since(receiptRTTTimer).Seconds())
// Check if the receipt is valid // Check if the receipt is valid
if !chunk.Address().Equal(swarm.NewAddress(receipt.Address)) { if !chunk.Address().Equal(swarm.NewAddress(receipt.Address)) {
ps.metrics.InvalidReceiptReceived.Inc() ps.metrics.InvalidReceiptReceived.Inc()
return errors.New("invalid receipt") return fmt.Errorf("invalid receipt from peer %s", peer.String())
} }
// pass back the received receipt in the previously received stream // pass back the received receipt in the previously received stream
err = ps.sendReceipt(w, &receipt) err = ps.sendReceipt(w, &receipt)
if err != nil { if err != nil {
return fmt.Errorf("send receipt: %w", err) return fmt.Errorf("send receipt to peer %s: %w", peer.String(), err)
} }
ps.metrics.ReceiptsSentCounter.Inc() ps.metrics.ReceiptsSentCounter.Inc()
...@@ -225,26 +225,26 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re ...@@ -225,26 +225,26 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
streamer, err := ps.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName) streamer, err := ps.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
if err != nil { if err != nil {
return nil, fmt.Errorf("new stream: %w", err) return nil, fmt.Errorf("new stream for peer %s: %w", peer.String(), err)
} }
defer streamer.Close() defer streamer.Close()
w, r := protobuf.NewWriterAndReader(streamer) w, r := protobuf.NewWriterAndReader(streamer)
if err := ps.sendChunkDelivery(w, ch); err != nil { if err := ps.sendChunkDelivery(w, ch); err != nil {
return nil, fmt.Errorf("chunk deliver: %w", err) return nil, fmt.Errorf("chunk deliver to peer %s: %w", peer.String(), err)
} }
receiptRTTTimer := time.Now() receiptRTTTimer := time.Now()
receipt, err := ps.receiveReceipt(r) receipt, err := ps.receiveReceipt(r)
if err != nil { if err != nil {
return nil, fmt.Errorf("receive receipt: %w", err) return nil, fmt.Errorf("receive receipt from peer %s: %w", peer.String(), err)
} }
ps.metrics.ReceiptRTT.Observe(time.Since(receiptRTTTimer).Seconds()) ps.metrics.ReceiptRTT.Observe(time.Since(receiptRTTTimer).Seconds())
// Check if the receipt is valid // Check if the receipt is valid
if !ch.Address().Equal(swarm.NewAddress(receipt.Address)) { if !ch.Address().Equal(swarm.NewAddress(receipt.Address)) {
ps.metrics.InvalidReceiptReceived.Inc() ps.metrics.InvalidReceiptReceived.Inc()
return nil, errors.New("invalid receipt") return nil, fmt.Errorf("invalid receipt. peer %s", peer.String())
} }
rec := &Receipt{ rec := &Receipt{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment