Commit 435a12ff authored by Petar Radovic's avatar Petar Radovic Committed by GitHub

Retreival - stream reset (#421)

* retreival zombie threads
parent 2af9c6a9
......@@ -123,7 +123,13 @@ func (s *Service) retrieveChunk(ctx context.Context, addr swarm.Address, skipPee
if err != nil {
return nil, peer, fmt.Errorf("new stream: %w", err)
}
defer stream.Close()
defer func() {
if err != nil {
_ = stream.Reset()
} else {
go stream.FullClose()
}
}()
w, r := protobuf.NewWriterAndReader(stream)
......@@ -181,9 +187,15 @@ func (s *Service) closestPeer(addr swarm.Address, skipPeers []swarm.Address) (sw
return closest, nil
}
func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) error {
func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (err error) {
w, r := protobuf.NewWriterAndReader(stream)
defer stream.Close()
defer func() {
if err != nil {
_ = stream.Reset()
} else {
_ = stream.FullClose()
}
}()
var req pb.Request
if err := r.ReadMsg(&req); err != nil {
return fmt.Errorf("read request: %w peer %s", err, p.Address.String())
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment