Commit 5bf7137f authored by Esad Akar's avatar Esad Akar Committed by GitHub

fix(libp2p,kademlia): fixes discrepency-kademlia and libp2p peers lists (#1696)

parent e98c03b6
...@@ -256,71 +256,77 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay ...@@ -256,71 +256,77 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
handshakeStream := NewStream(stream) handshakeStream := NewStream(stream)
i, err := s.handshakeService.Handle(ctx, handshakeStream, stream.Conn().RemoteMultiaddr(), peerID) i, err := s.handshakeService.Handle(ctx, handshakeStream, stream.Conn().RemoteMultiaddr(), peerID)
if err != nil { if err != nil {
s.logger.Debugf("handshake: handle %s: %v", peerID, err) s.logger.Debugf("stream handler: handshake: handle %s: %v", peerID, err)
s.logger.Errorf("unable to handshake with peer %v", peerID) s.logger.Errorf("stream handler: handshake: unable to handshake with peer id %v", peerID)
_ = handshakeStream.Reset() _ = handshakeStream.Reset()
_ = s.host.Network().ClosePeer(peerID) _ = s.host.Network().ClosePeer(peerID)
return return
} }
blocked, err := s.blocklist.Exists(i.BzzAddress.Overlay) overlay := i.BzzAddress.Overlay
blocked, err := s.blocklist.Exists(overlay)
if err != nil { if err != nil {
s.logger.Debugf("blocklisting: exists %s: %v", peerID, err) s.logger.Debugf("stream handler: blocklisting: exists %s: %v", overlay, err)
s.logger.Errorf("internal error while connecting with peer %s", peerID) s.logger.Errorf("stream handler: internal error while connecting with peer %s", overlay)
_ = handshakeStream.Reset() _ = handshakeStream.Reset()
_ = s.host.Network().ClosePeer(peerID) _ = s.host.Network().ClosePeer(peerID)
return return
} }
if blocked { if blocked {
s.logger.Errorf("blocked connection from blocklisted peer %s", peerID) s.logger.Errorf("stream handler: blocked connection from blocklisted peer %s", overlay)
_ = handshakeStream.Reset() _ = handshakeStream.Reset()
_ = s.host.Network().ClosePeer(peerID) _ = s.host.Network().ClosePeer(peerID)
return return
} }
if s.notifier != nil { if s.notifier != nil {
if !s.notifier.Pick(p2p.Peer{Address: i.BzzAddress.Overlay}) { if !s.notifier.Pick(p2p.Peer{Address: overlay}) {
s.logger.Errorf("don't want incoming peer %s. disconnecting", peerID) s.logger.Warningf("stream handler: don't want incoming peer %s. disconnecting", overlay)
_ = handshakeStream.Reset() _ = handshakeStream.Reset()
_ = s.host.Network().ClosePeer(peerID) _ = s.host.Network().ClosePeer(peerID)
return return
} }
} }
if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists { if exists := s.peers.addIfNotExists(stream.Conn(), overlay); exists {
s.logger.Debugf("stream handler: peer %s already exists", overlay)
if err = handshakeStream.FullClose(); err != nil { if err = handshakeStream.FullClose(); err != nil {
s.logger.Debugf("handshake: could not close stream %s: %v", peerID, err) s.logger.Debugf("stream handler: could not close stream %s: %v", overlay, err)
s.logger.Errorf("unable to handshake with peer %v", peerID) s.logger.Errorf("stream handler: unable to handshake with peer %v", overlay)
_ = s.Disconnect(i.BzzAddress.Overlay) _ = s.Disconnect(overlay)
} }
return return
} }
if err = handshakeStream.FullClose(); err != nil { if err = handshakeStream.FullClose(); err != nil {
s.logger.Debugf("handshake: could not close stream %s: %v", peerID, err) s.logger.Debugf("stream handler: could not close stream %s: %v", overlay, err)
s.logger.Errorf("unable to handshake with peer %v", peerID) s.logger.Errorf("stream handler: unable to handshake with peer %v", overlay)
_ = s.Disconnect(i.BzzAddress.Overlay) _ = s.Disconnect(overlay)
return return
} }
if i.FullNode { if i.FullNode {
err = s.addressbook.Put(i.BzzAddress.Overlay, *i.BzzAddress) err = s.addressbook.Put(i.BzzAddress.Overlay, *i.BzzAddress)
if err != nil { if err != nil {
s.logger.Debugf("handshake: addressbook put error %s: %v", peerID, err) s.logger.Debugf("stream handler: addressbook put error %s: %v", peerID, err)
s.logger.Errorf("unable to persist peer %v", peerID) s.logger.Errorf("stream handler: unable to persist peer %v", peerID)
_ = s.Disconnect(i.BzzAddress.Overlay) _ = s.Disconnect(i.BzzAddress.Overlay)
return return
} }
} }
peer := p2p.Peer{Address: i.BzzAddress.Overlay} peer := p2p.Peer{Address: overlay}
s.protocolsmu.RLock() s.protocolsmu.RLock()
for _, tn := range s.protocols { for _, tn := range s.protocols {
if tn.ConnectIn != nil { if tn.ConnectIn != nil {
if err := tn.ConnectIn(ctx, peer); err != nil { if err := tn.ConnectIn(ctx, peer); err != nil {
s.logger.Debugf("connectIn: protocol: %s, version:%s, peer: %s: %v", tn.Name, tn.Version, i.BzzAddress.Overlay, err) s.logger.Debugf("stream handler: connectIn: protocol: %s, version:%s, peer: %s: %v", tn.Name, tn.Version, overlay, err)
_ = s.Disconnect(overlay)
s.protocolsmu.RUnlock()
return
} }
} }
} }
...@@ -331,10 +337,10 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay ...@@ -331,10 +337,10 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
s.lightNodes.Connected(ctx, peer) s.lightNodes.Connected(ctx, peer)
//light node announces explicitly //light node announces explicitly
if err := s.notifier.Announce(ctx, peer.Address); err != nil { if err := s.notifier.Announce(ctx, peer.Address); err != nil {
s.logger.Debugf("notifier.Announce: %s: %v", peer.Address.String(), err) s.logger.Debugf("stream handler: notifier.Announce: %s: %v", peer.Address.String(), err)
} }
} else if err := s.notifier.Connected(ctx, peer); err != nil { // full node announces implicitly } else if err := s.notifier.Connected(ctx, peer); err != nil { // full node announces implicitly
s.logger.Debugf("notifier.Connected: peer disconnected: %s: %v", i.BzzAddress.Overlay, err) s.logger.Debugf("stream handler: notifier.Connected: peer disconnected: %s: %v", i.BzzAddress.Overlay, err)
// note: this cannot be unit tested since the node // note: this cannot be unit tested since the node
// waiting on handshakeStream.FullClose() on the other side // waiting on handshakeStream.FullClose() on the other side
// might actually get a stream reset when we disconnect here // might actually get a stream reset when we disconnect here
...@@ -344,14 +350,20 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay ...@@ -344,14 +350,20 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
// interface, in addition to the possibility of deciding whether // interface, in addition to the possibility of deciding whether
// a peer connection is wanted prior to adding the peer to the // a peer connection is wanted prior to adding the peer to the
// peer registry and starting the protocols. // peer registry and starting the protocols.
_ = s.Disconnect(i.BzzAddress.Overlay) _ = s.Disconnect(overlay)
return return
} }
} }
s.metrics.HandledStreamCount.Inc() s.metrics.HandledStreamCount.Inc()
s.logger.Debugf("successfully connected to peer %s%s (inbound)", i.BzzAddress.ShortString(), i.LightString()) if !s.peers.Exists(overlay) {
s.logger.Infof("successfully connected to peer %s%s (inbound)", i.BzzAddress.Overlay, i.LightString()) s.logger.Warningf("stream handler: inbound peer %s does not exist, disconnecting", overlay)
_ = s.Disconnect(overlay)
return
}
s.logger.Debugf("stream handler: successfully connected to peer %s%s (inbound)", i.BzzAddress.ShortString(), i.LightString())
s.logger.Infof("stream handler: successfully connected to peer %s%s (inbound)", i.BzzAddress.Overlay, i.LightString())
}) })
h.Network().SetConnHandler(func(_ network.Conn) { h.Network().SetConnHandler(func(_ network.Conn) {
...@@ -380,7 +392,6 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) { ...@@ -380,7 +392,6 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
peerID := streamlibp2p.Conn().RemotePeer() peerID := streamlibp2p.Conn().RemotePeer()
overlay, found := s.peers.overlay(peerID) overlay, found := s.peers.overlay(peerID)
if !found { if !found {
_ = s.Disconnect(overlay)
_ = streamlibp2p.Reset() _ = streamlibp2p.Reset()
s.logger.Debugf("overlay address for peer %q not found", peerID) s.logger.Debugf("overlay address for peer %q not found", peerID)
return return
...@@ -544,7 +555,9 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. ...@@ -544,7 +555,9 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
return nil, p2p.ErrDialLightNode return nil, p2p.ErrDialLightNode
} }
blocked, err := s.blocklist.Exists(i.BzzAddress.Overlay) overlay := i.BzzAddress.Overlay
blocked, err := s.blocklist.Exists(overlay)
if err != nil { if err != nil {
s.logger.Debugf("blocklisting: exists %s: %v", info.ID, err) s.logger.Debugf("blocklisting: exists %s: %v", info.ID, err)
s.logger.Errorf("internal error while connecting with peer %s", info.ID) s.logger.Errorf("internal error while connecting with peer %s", info.ID)
...@@ -560,9 +573,9 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. ...@@ -560,9 +573,9 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
return nil, fmt.Errorf("peer blocklisted") return nil, fmt.Errorf("peer blocklisted")
} }
if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists { if exists := s.peers.addIfNotExists(stream.Conn(), overlay); exists {
if err := handshakeStream.FullClose(); err != nil { if err := handshakeStream.FullClose(); err != nil {
_ = s.Disconnect(i.BzzAddress.Overlay) _ = s.Disconnect(overlay)
return nil, fmt.Errorf("peer exists, full close: %w", err) return nil, fmt.Errorf("peer exists, full close: %w", err)
} }
...@@ -570,40 +583,49 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. ...@@ -570,40 +583,49 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
} }
if err := handshakeStream.FullClose(); err != nil { if err := handshakeStream.FullClose(); err != nil {
_ = s.Disconnect(i.BzzAddress.Overlay) _ = s.Disconnect(overlay)
return nil, fmt.Errorf("connect full close %w", err) return nil, fmt.Errorf("connect full close %w", err)
} }
err = s.addressbook.Put(i.BzzAddress.Overlay, *i.BzzAddress) if i.FullNode {
if err != nil { err = s.addressbook.Put(overlay, *i.BzzAddress)
_ = s.Disconnect(i.BzzAddress.Overlay) if err != nil {
return nil, fmt.Errorf("storing bzz address: %w", err) _ = s.Disconnect(overlay)
return nil, fmt.Errorf("storing bzz address: %w", err)
}
} }
s.protocolsmu.RLock() s.protocolsmu.RLock()
for _, tn := range s.protocols { for _, tn := range s.protocols {
if tn.ConnectOut != nil { if tn.ConnectOut != nil {
if err := tn.ConnectOut(ctx, p2p.Peer{Address: i.BzzAddress.Overlay}); err != nil { if err := tn.ConnectOut(ctx, p2p.Peer{Address: overlay}); err != nil {
s.logger.Debugf("connectOut: protocol: %s, version:%s, peer: %s: %v", tn.Name, tn.Version, i.BzzAddress.Overlay, err) s.logger.Debugf("connectOut: protocol: %s, version:%s, peer: %s: %v", tn.Name, tn.Version, overlay, err)
_ = s.Disconnect(overlay)
s.protocolsmu.RUnlock()
return nil, fmt.Errorf("connectOut: protocol: %s, version:%s: %w", tn.Name, tn.Version, err)
} }
} }
} }
s.protocolsmu.RUnlock() s.protocolsmu.RUnlock()
if !s.peers.Exists(overlay) {
_ = s.Disconnect(overlay)
return nil, fmt.Errorf("libp2p connect: peer %s does not exist %w", overlay, p2p.ErrPeerNotFound)
}
s.metrics.CreatedConnectionCount.Inc() s.metrics.CreatedConnectionCount.Inc()
s.logger.Debugf("successfully connected to peer %s%s (outbound)", i.BzzAddress.ShortString(), i.LightString()) s.logger.Debugf("successfully connected to peer %s%s (outbound)", i.BzzAddress.ShortString(), i.LightString())
s.logger.Infof("successfully connected to peer %s%s (outbound)", i.BzzAddress.Overlay, i.LightString()) s.logger.Infof("successfully connected to peer %s%s (outbound)", overlay, i.LightString())
return i.BzzAddress, nil return i.BzzAddress, nil
} }
func (s *Service) Disconnect(overlay swarm.Address) error { func (s *Service) Disconnect(overlay swarm.Address) error {
s.metrics.DisconnectCount.Inc() s.metrics.DisconnectCount.Inc()
s.logger.Debugf("libp2p disconnect: disconnecting peer %s", overlay)
found, peerID := s.peers.remove(overlay) found, peerID := s.peers.remove(overlay)
if !found {
return p2p.ErrPeerNotFound
}
_ = s.host.Network().ClosePeer(peerID) _ = s.host.Network().ClosePeer(peerID)
...@@ -617,8 +639,8 @@ func (s *Service) Disconnect(overlay swarm.Address) error { ...@@ -617,8 +639,8 @@ func (s *Service) Disconnect(overlay swarm.Address) error {
} }
} }
} }
s.protocolsmu.RUnlock() s.protocolsmu.RUnlock()
if s.notifier != nil { if s.notifier != nil {
s.notifier.Disconnected(peer) s.notifier.Disconnected(peer)
} }
...@@ -626,11 +648,17 @@ func (s *Service) Disconnect(overlay swarm.Address) error { ...@@ -626,11 +648,17 @@ func (s *Service) Disconnect(overlay swarm.Address) error {
s.lightNodes.Disconnected(peer) s.lightNodes.Disconnected(peer)
} }
if !found {
s.logger.Debugf("libp2p disconnect: peer %s not found", overlay)
return p2p.ErrPeerNotFound
}
return nil return nil
} }
// disconnected is a registered peer registry event // disconnected is a registered peer registry event
func (s *Service) disconnected(address swarm.Address) { func (s *Service) disconnected(address swarm.Address) {
peer := p2p.Peer{Address: address} peer := p2p.Peer{Address: address}
s.protocolsmu.RLock() s.protocolsmu.RLock()
for _, tn := range s.protocols { for _, tn := range s.protocols {
......
...@@ -721,6 +721,7 @@ func (k *Kad) Announce(ctx context.Context, peer swarm.Address) error { ...@@ -721,6 +721,7 @@ func (k *Kad) Announce(ctx context.Context, peer swarm.Address) error {
err := k.discovery.BroadcastPeers(ctx, peer, addrs...) err := k.discovery.BroadcastPeers(ctx, peer, addrs...)
if err != nil { if err != nil {
k.logger.Errorf("kademlia: could not broadcast to peer %s", peer)
_ = k.p2p.Disconnect(peer) _ = k.p2p.Disconnect(peer)
} }
...@@ -807,6 +808,9 @@ func (k *Kad) connected(ctx context.Context, addr swarm.Address) error { ...@@ -807,6 +808,9 @@ func (k *Kad) connected(ctx context.Context, addr swarm.Address) error {
// Disconnected is called when peer disconnects. // Disconnected is called when peer disconnects.
func (k *Kad) Disconnected(peer p2p.Peer) { func (k *Kad) Disconnected(peer p2p.Peer) {
k.logger.Debugf("kademlia: disconnected peer %s", peer.Address)
po := swarm.Proximity(k.base.Bytes(), peer.Address.Bytes()) po := swarm.Proximity(k.base.Bytes(), peer.Address.Bytes())
k.connectedPeers.Remove(peer.Address, po) k.connectedPeers.Remove(peer.Address, po)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment