Commit 3748f52d authored by Petar Radovic's avatar Petar Radovic Committed by GitHub

Stream reset - hive (#420)

* hive - zombie threads
parent 435a12ff
...@@ -88,7 +88,7 @@ func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swa ...@@ -88,7 +88,7 @@ func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swa
if err != nil { if err != nil {
return fmt.Errorf("new stream: %w", err) return fmt.Errorf("new stream: %w", err)
} }
defer stream.Close() defer stream.FullClose()
w, _ := protobuf.NewWriterAndReader(stream) w, _ := protobuf.NewWriterAndReader(stream)
var peersRequest pb.Peers var peersRequest pb.Peers
...@@ -100,6 +100,7 @@ func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swa ...@@ -100,6 +100,7 @@ func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swa
continue continue
} }
_ = stream.Reset()
return err return err
} }
...@@ -111,23 +112,26 @@ func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swa ...@@ -111,23 +112,26 @@ func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swa
} }
if err := w.WriteMsg(&peersRequest); err != nil { if err := w.WriteMsg(&peersRequest); err != nil {
_ = stream.Reset()
return fmt.Errorf("write Peers message: %w", err) return fmt.Errorf("write Peers message: %w", err)
} }
return stream.FullClose() return nil
} }
func (s *Service) peersHandler(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error { func (s *Service) peersHandler(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error {
_, r := protobuf.NewWriterAndReader(stream) _, r := protobuf.NewWriterAndReader(stream)
var peersReq pb.Peers var peersReq pb.Peers
if err := r.ReadMsgWithTimeout(messageTimeout, &peersReq); err != nil { if err := r.ReadMsgWithTimeout(messageTimeout, &peersReq); err != nil {
_ = stream.Close() _ = stream.Reset()
return fmt.Errorf("read requestPeers message: %w", err) return fmt.Errorf("read requestPeers message: %w", err)
} }
if err := stream.Close(); err != nil { // close the stream before processing in order to unblock the sending side
return fmt.Errorf("close stream: %w", err) // fullclose is called async because there is no need to wait for conformation,
} // but we still want to handle not closed stream from the other side to avoid zombie stream
go stream.FullClose()
for _, newPeer := range peersReq.Peers { for _, newPeer := range peersReq.Peers {
bzzAddress, err := bzz.ParseAddress(newPeer.Underlay, newPeer.Overlay, newPeer.Signature, s.networkID) bzzAddress, err := bzz.ParseAddress(newPeer.Underlay, newPeer.Overlay, newPeer.Signature, s.networkID)
if err != nil { if err != nil {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment