Commit 2af9c6a9 authored by Petar Radovic's avatar Petar Radovic Committed by GitHub

Pushsync - stream reset (#422)

* pushsync zombie threads
parent d9c674b0
......@@ -80,9 +80,15 @@ func (s *PushSync) Protocol() p2p.ProtocolSpec {
// handler handles chunk delivery from other node and forwards to its destination node.
// If the current node is the destination, it stores in the local store and sends a receipt.
func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) error {
func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (err error) {
w, r := protobuf.NewWriterAndReader(stream)
defer stream.Close()
defer func() {
if err != nil {
_ = stream.Reset()
} else {
_ = stream.FullClose()
}
}()
// Get the delivery
chunk, err := ps.getChunkDelivery(r)
......@@ -135,7 +141,13 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
if err != nil {
return fmt.Errorf("new stream peer %s: %w", peer.String(), err)
}
defer streamer.Close()
defer func() {
if err != nil {
_ = streamer.Reset()
} else {
go streamer.FullClose()
}
}()
wc, rc := protobuf.NewWriterAndReader(streamer)
if err := ps.sendChunkDelivery(wc, chunk); err != nil {
......@@ -230,10 +242,11 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
if err != nil {
return nil, fmt.Errorf("new stream for peer %s: %w", peer.String(), err)
}
defer streamer.Close()
defer func() { go streamer.FullClose() }()
w, r := protobuf.NewWriterAndReader(streamer)
if err := ps.sendChunkDelivery(w, ch); err != nil {
_ = streamer.Reset()
return nil, fmt.Errorf("chunk deliver to peer %s: %w", peer.String(), err)
}
// if you manage to get a tag, just increment the respective counter
......@@ -245,6 +258,7 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
receiptRTTTimer := time.Now()
receipt, err := ps.receiveReceipt(r)
if err != nil {
_ = streamer.Reset()
return nil, fmt.Errorf("receive receipt from peer %s: %w", peer.String(), err)
}
ps.metrics.ReceiptRTT.Observe(time.Since(receiptRTTTimer).Seconds())
......@@ -252,6 +266,7 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
// Check if the receipt is valid
if !ch.Address().Equal(swarm.NewAddress(receipt.Address)) {
ps.metrics.InvalidReceiptReceived.Inc()
_ = streamer.Reset()
return nil, fmt.Errorf("invalid receipt. peer %s", peer.String())
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment