Commit 9451b877 authored by Petar Radovic's avatar Petar Radovic Committed by GitHub

minor handshake refactor - streams (#405)

parent 50f306cf
...@@ -24,7 +24,6 @@ import ( ...@@ -24,7 +24,6 @@ import (
"github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p"
autonat "github.com/libp2p/go-libp2p-autonat-svc" autonat "github.com/libp2p/go-libp2p-autonat-svc"
crypto "github.com/libp2p/go-libp2p-core/crypto" crypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/network"
libp2ppeer "github.com/libp2p/go-libp2p-core/peer" libp2ppeer "github.com/libp2p/go-libp2p-core/peer"
...@@ -212,16 +211,18 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay ...@@ -212,16 +211,18 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
// handshake // handshake
s.host.SetStreamHandlerMatch(id, matcher, func(stream network.Stream) { s.host.SetStreamHandlerMatch(id, matcher, func(stream network.Stream) {
peerID := stream.Conn().RemotePeer() peerID := stream.Conn().RemotePeer()
i, err := s.handshakeService.Handle(NewStream(stream), stream.Conn().RemoteMultiaddr(), peerID) handshakeStream := NewStream(stream)
i, err := s.handshakeService.Handle(handshakeStream, stream.Conn().RemoteMultiaddr(), peerID)
if err != nil { if err != nil {
s.logger.Debugf("handshake: handle %s: %v", peerID, err) s.logger.Debugf("handshake: handle %s: %v", peerID, err)
s.logger.Errorf("unable to handshake with peer %v", peerID) s.logger.Errorf("unable to handshake with peer %v", peerID)
_ = stream.Reset()
_ = s.disconnect(peerID) _ = s.disconnect(peerID)
return return
} }
if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists { if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists {
if err = helpers.FullClose(stream); err != nil { if err = handshakeStream.FullClose(); err != nil {
s.logger.Debugf("handshake: could not close stream %s: %v", peerID, err) s.logger.Debugf("handshake: could not close stream %s: %v", peerID, err)
s.logger.Errorf("unable to handshake with peer %v", peerID) s.logger.Errorf("unable to handshake with peer %v", peerID)
_ = s.disconnect(peerID) _ = s.disconnect(peerID)
...@@ -229,7 +230,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay ...@@ -229,7 +230,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
return return
} }
if err = helpers.FullClose(stream); err != nil { if err = handshakeStream.FullClose(); err != nil {
s.logger.Debugf("handshake: could not close stream %s: %v", peerID, err) s.logger.Debugf("handshake: could not close stream %s: %v", peerID, err)
s.logger.Errorf("unable to handshake with peer %v", peerID) s.logger.Errorf("unable to handshake with peer %v", peerID)
_ = s.disconnect(peerID) _ = s.disconnect(peerID)
...@@ -372,14 +373,16 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. ...@@ -372,14 +373,16 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
return nil, fmt.Errorf("connect new stream: %w", err) return nil, fmt.Errorf("connect new stream: %w", err)
} }
i, err := s.handshakeService.Handshake(NewStream(stream), stream.Conn().RemoteMultiaddr(), stream.Conn().RemotePeer()) handshakeStream := NewStream(stream)
i, err := s.handshakeService.Handshake(handshakeStream, stream.Conn().RemoteMultiaddr(), stream.Conn().RemotePeer())
if err != nil { if err != nil {
_ = stream.Reset()
_ = s.disconnect(info.ID) _ = s.disconnect(info.ID)
return nil, fmt.Errorf("handshake: %w", err) return nil, fmt.Errorf("handshake: %w", err)
} }
if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists { if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists {
if err := helpers.FullClose(stream); err != nil { if err := handshakeStream.FullClose(); err != nil {
_ = s.disconnect(info.ID) _ = s.disconnect(info.ID)
return nil, fmt.Errorf("peer exists, full close: %w", err) return nil, fmt.Errorf("peer exists, full close: %w", err)
} }
...@@ -387,7 +390,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. ...@@ -387,7 +390,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
return i.BzzAddress, nil return i.BzzAddress, nil
} }
if err := helpers.FullClose(stream); err != nil { if err := handshakeStream.FullClose(); err != nil {
_ = s.disconnect(info.ID) _ = s.disconnect(info.ID)
return nil, fmt.Errorf("connect full close %w", err) return nil, fmt.Errorf("connect full close %w", err)
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment