Commit 33198376 authored by Petar Radovic's avatar Petar Radovic Committed by GitHub

full close timeout handle (#321)

parent 5587d466
...@@ -118,7 +118,7 @@ func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swa ...@@ -118,7 +118,7 @@ func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swa
return stream.FullClose() return stream.FullClose()
} }
func (s *Service) peersHandler(_ context.Context, peer p2p.Peer, stream p2p.Stream) error { func (s *Service) peersHandler(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error {
_, r := protobuf.NewWriterAndReader(stream) _, r := protobuf.NewWriterAndReader(stream)
var peersReq pb.Peers var peersReq pb.Peers
if err := r.ReadMsgWithTimeout(messageTimeout, &peersReq); err != nil { if err := r.ReadMsgWithTimeout(messageTimeout, &peersReq); err != nil {
...@@ -143,7 +143,7 @@ func (s *Service) peersHandler(_ context.Context, peer p2p.Peer, stream p2p.Stre ...@@ -143,7 +143,7 @@ func (s *Service) peersHandler(_ context.Context, peer p2p.Peer, stream p2p.Stre
} }
if s.peerHandler != nil { if s.peerHandler != nil {
if err := s.peerHandler(context.Background(), bzzAddress.Overlay); err != nil { if err := s.peerHandler(ctx, bzzAddress.Overlay); err != nil {
return err return err
} }
} }
......
...@@ -297,7 +297,7 @@ func (k *Kad) announce(ctx context.Context, peer swarm.Address) error { ...@@ -297,7 +297,7 @@ func (k *Kad) announce(ctx context.Context, peer swarm.Address) error {
return false, false, nil return false, false, nil
} }
addrs = append(addrs, connectedPeer) addrs = append(addrs, connectedPeer)
if err := k.discovery.BroadcastPeers(context.Background(), connectedPeer, peer); err != nil { if err := k.discovery.BroadcastPeers(ctx, connectedPeer, peer); err != nil {
// we don't want to fail the whole process because of this, keep on gossiping // we don't want to fail the whole process because of this, keep on gossiping
k.logger.Debugf("error gossiping peer %s to peer %s: %v", peer, connectedPeer, err) k.logger.Debugf("error gossiping peer %s to peer %s: %v", peer, connectedPeer, err)
return false, false, nil return false, false, nil
...@@ -309,7 +309,7 @@ func (k *Kad) announce(ctx context.Context, peer swarm.Address) error { ...@@ -309,7 +309,7 @@ func (k *Kad) announce(ctx context.Context, peer swarm.Address) error {
return nil return nil
} }
err := k.discovery.BroadcastPeers(context.Background(), peer, addrs...) err := k.discovery.BroadcastPeers(ctx, peer, addrs...)
if err != nil { if err != nil {
_ = k.p2p.Disconnect(peer) _ = k.p2p.Disconnect(peer)
} }
......
...@@ -220,11 +220,19 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay ...@@ -220,11 +220,19 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
} }
if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists { if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists {
_ = stream.Close() if err = stream.Close(); err != nil {
s.logger.Debugf("handshake: could not close stream %s: %v", peerID, err)
s.logger.Errorf("unable to handshake with peer %v", peerID)
_ = s.disconnect(peerID)
}
return return
} }
_ = stream.Close() if err = stream.Close(); err != nil {
s.logger.Debugf("handshake: could not close stream %s: %v", peerID, err)
s.logger.Errorf("unable to handshake with peer %v", peerID)
_ = s.disconnect(peerID)
}
err = s.addressbook.Put(i.BzzAddress.Overlay, *i.BzzAddress) err = s.addressbook.Put(i.BzzAddress.Overlay, *i.BzzAddress)
if err != nil { if err != nil {
...@@ -366,6 +374,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. ...@@ -366,6 +374,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists { if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists {
if err := helpers.FullClose(stream); err != nil { if err := helpers.FullClose(stream); err != nil {
_ = s.disconnect(info.ID)
return nil, err return nil, err
} }
...@@ -373,6 +382,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. ...@@ -373,6 +382,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
} }
if err := helpers.FullClose(stream); err != nil { if err := helpers.FullClose(stream); err != nil {
_ = s.disconnect(info.ID)
return nil, err return nil, err
} }
......
...@@ -100,7 +100,7 @@ func (d *driver) AddPeer(ctx context.Context, addr swarm.Address) error { ...@@ -100,7 +100,7 @@ func (d *driver) AddPeer(ctx context.Context, addr swarm.Address) error {
} }
connectedAddrs = append(connectedAddrs, addressee.Address) connectedAddrs = append(connectedAddrs, addressee.Address)
if err := d.discovery.BroadcastPeers(context.Background(), addressee.Address, addr); err != nil { if err := d.discovery.BroadcastPeers(ctx, addressee.Address, addr); err != nil {
return err return err
} }
} }
...@@ -109,7 +109,7 @@ func (d *driver) AddPeer(ctx context.Context, addr swarm.Address) error { ...@@ -109,7 +109,7 @@ func (d *driver) AddPeer(ctx context.Context, addr swarm.Address) error {
return nil return nil
} }
return d.discovery.BroadcastPeers(context.Background(), addr, connectedAddrs...) return d.discovery.BroadcastPeers(ctx, addr, connectedAddrs...)
} }
// ClosestPeer returns the closest connected peer we have in relation to a // ClosestPeer returns the closest connected peer we have in relation to a
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment