Commit 0b2d9e34 authored by Petar Radovic's avatar Petar Radovic Committed by GitHub

fix stream flaky tests (#86)

parent 898a3fda
......@@ -95,7 +95,6 @@ func (s *Service) Handshake(stream p2p.Stream) (i *Info, err error) {
}
func (s *Service) Handle(stream p2p.Stream, peerID libp2ppeer.ID) (i *Info, err error) {
defer stream.Close()
s.receivedHandshakesMu.Lock()
if _, exists := s.receivedHandshakes[peerID]; exists {
s.receivedHandshakesMu.Unlock()
......
......@@ -193,9 +193,11 @@ func New(ctx context.Context, o Options) (*Service, error) {
}
if exists := s.peers.addIfNotExists(stream.Conn(), i.Address); exists {
_ = stream.Close()
return
}
_ = stream.Close()
remoteMultiaddr, err := ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s", stream.Conn().RemoteMultiaddr().String(), peerID.Pretty()))
if err != nil {
s.logger.Debugf("multiaddr error: handle %s: %v", peerID, err)
......@@ -328,14 +330,18 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm
return swarm.Address{}, fmt.Errorf("handshake: %w", err)
}
if exists := s.peers.addIfNotExists(stream.Conn(), i.Address); exists {
if err := helpers.FullClose(stream); err != nil {
return swarm.Address{}, err
}
if exists := s.peers.addIfNotExists(stream.Conn(), i.Address); exists {
return i.Address, nil
}
if err := helpers.FullClose(stream); err != nil {
return swarm.Address{}, err
}
s.metrics.CreatedConnectionCount.Inc()
s.logger.Infof("peer %s connected", i.Address)
return i.Address, nil
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment