Commit 0f3803f0 authored by Janos Guljas's avatar Janos Guljas

add libp2p connection tests and handle peer registry disconnected peer

parent 9bf7e6dd
...@@ -169,10 +169,10 @@ func New(ctx context.Context, o Options) (*Service, error) { ...@@ -169,10 +169,10 @@ func New(ctx context.Context, o Options) (*Service, error) {
s.logger.Warningf("handshake happened for already connected peer %s", peerID) s.logger.Warningf("handshake happened for already connected peer %s", peerID)
} }
s.logger.Debugf("handshake: handle %s: %w", peerID, err) s.logger.Debugf("handshake: handle %s: %v", peerID, err)
s.logger.Errorf("unable to handshake with peer %v", peerID) s.logger.Errorf("unable to handshake with peer %v", peerID)
// todo: test connection close and refactor // todo: test connection close and refactor
_ = s.host.Network().ClosePeer(peerID) _ = s.disconnect(peerID)
return return
} }
...@@ -198,6 +198,8 @@ func New(ctx context.Context, o Options) (*Service, error) { ...@@ -198,6 +198,8 @@ func New(ctx context.Context, o Options) (*Service, error) {
s.metrics.HandledConnectionCount.Inc() s.metrics.HandledConnectionCount.Inc()
}) })
h.Network().Notify(peerRegistry) // update peer registry on network events
return s, nil return s, nil
} }
...@@ -215,7 +217,7 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) { ...@@ -215,7 +217,7 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
if !found { if !found {
// todo: this should never happen, should we disconnect in this case? // todo: this should never happen, should we disconnect in this case?
// todo: test connection close and refactor // todo: test connection close and refactor
_ = s.host.Network().ClosePeer(peerID) _ = s.disconnect(peerID)
s.logger.Errorf("overlay address for peer %q not found", peerID) s.logger.Errorf("overlay address for peer %q not found", peerID)
return return
} }
...@@ -235,7 +237,7 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) { ...@@ -235,7 +237,7 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
return nil return nil
} }
func (s *Service) Addresses() (addrs []string, err error) { func (s *Service) Addresses() (addrs []ma.Multiaddr, err error) {
// Build host multiaddress // Build host multiaddress
hostAddr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", s.host.ID().Pretty())) hostAddr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", s.host.ID().Pretty()))
if err != nil { if err != nil {
...@@ -245,7 +247,7 @@ func (s *Service) Addresses() (addrs []string, err error) { ...@@ -245,7 +247,7 @@ func (s *Service) Addresses() (addrs []string, err error) {
// Now we can build a full multiaddress to reach this host // Now we can build a full multiaddress to reach this host
// by encapsulating both addresses: // by encapsulating both addresses:
for _, addr := range s.host.Addrs() { for _, addr := range s.host.Addrs() {
addrs = append(addrs, addr.Encapsulate(hostAddr).String()) addrs = append(addrs, addr.Encapsulate(hostAddr))
} }
return addrs, nil return addrs, nil
} }
...@@ -263,14 +265,14 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm ...@@ -263,14 +265,14 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm
stream, err := s.newStreamForPeerID(ctx, info.ID, handshake.ProtocolName, handshake.ProtocolVersion, handshake.StreamName) stream, err := s.newStreamForPeerID(ctx, info.ID, handshake.ProtocolName, handshake.ProtocolVersion, handshake.StreamName)
if err != nil { if err != nil {
_ = s.host.Network().ClosePeer(info.ID) _ = s.disconnect(info.ID)
return swarm.Address{}, err return swarm.Address{}, err
} }
i, err := s.handshakeService.Handshake(stream) i, err := s.handshakeService.Handshake(stream)
if err != nil { if err != nil {
_ = s.host.Network().ClosePeer(info.ID) _ = s.disconnect(info.ID)
return swarm.Address{}, err return swarm.Address{}, fmt.Errorf("handshake: %w", err)
} }
if err := helpers.FullClose(stream); err != nil { if err := helpers.FullClose(stream); err != nil {
...@@ -289,6 +291,10 @@ func (s *Service) Disconnect(overlay swarm.Address) error { ...@@ -289,6 +291,10 @@ func (s *Service) Disconnect(overlay swarm.Address) error {
if !found { if !found {
return p2p.ErrPeerNotFound return p2p.ErrPeerNotFound
} }
return s.disconnect(peerID)
}
func (s *Service) disconnect(peerID libp2ppeer.ID) error {
if err := s.host.Network().ClosePeer(peerID); err != nil { if err := s.host.Network().ClosePeer(peerID); err != nil {
return err return err
} }
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package libp2p_test
import (
"context"
"errors"
"io/ioutil"
"testing"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/libp2p"
)
func TestAddresses(t *testing.T) {
s := newService(t, &libp2p.Options{
NetworkID: 1,
})
addrs, err := s.Addresses()
if err != nil {
t.Fatal(err)
}
if l := len(addrs); l == 0 {
t.Fatal("no addresses")
}
}
func TestConnectDisconnect(t *testing.T) {
o1 := libp2p.Options{
NetworkID: 1,
}
s1 := newService(t, &o1)
o2 := libp2p.Options{
NetworkID: 1,
}
s2 := newService(t, &o2)
addrs, err := s1.Addresses()
if err != nil {
t.Fatal(err)
}
addr := addrs[0]
overlay, err := s2.Connect(context.Background(), addr)
if err != nil {
t.Fatal(err)
}
if err := s2.Disconnect(overlay); err != nil {
t.Fatal(err)
}
}
func TestDoubleConnect(t *testing.T) {
o1 := libp2p.Options{
NetworkID: 1,
}
s1 := newService(t, &o1)
o2 := libp2p.Options{
NetworkID: 1,
}
s2 := newService(t, &o2)
addrs, err := s1.Addresses()
if err != nil {
t.Fatal(err)
}
addr := addrs[0]
if _, err := s2.Connect(context.Background(), addr); err != nil {
t.Fatal(err)
}
if _, err := s2.Connect(context.Background(), addr); err == nil {
t.Fatal("second connect attempt should result with an error")
}
}
func TestDoubleDisconnect(t *testing.T) {
o1 := libp2p.Options{
NetworkID: 1,
}
s1 := newService(t, &o1)
o2 := libp2p.Options{
NetworkID: 1,
}
s2 := newService(t, &o2)
addrs, err := s1.Addresses()
if err != nil {
t.Fatal(err)
}
addr := addrs[0]
overlay, err := s2.Connect(context.Background(), addr)
if err != nil {
t.Fatal(err)
}
if err := s2.Disconnect(overlay); err != nil {
t.Fatal(err)
}
if err := s2.Disconnect(overlay); !errors.Is(err, p2p.ErrPeerNotFound) {
t.Errorf("got error %v, want %v", err, p2p.ErrPeerNotFound)
}
}
func TestReconnectAfterDoubleConnect(t *testing.T) {
o1 := libp2p.Options{
NetworkID: 1,
}
s1 := newService(t, &o1)
o2 := libp2p.Options{
NetworkID: 1,
}
s2 := newService(t, &o2)
addrs, err := s1.Addresses()
if err != nil {
t.Fatal(err)
}
addr := addrs[0]
overlay, err := s2.Connect(context.Background(), addr)
if err != nil {
t.Fatal(err)
}
if _, err := s2.Connect(context.Background(), addr); err == nil {
t.Fatal("second connect attempt should result with an error")
}
overlay, err = s2.Connect(context.Background(), addr)
if err != nil {
t.Fatal(err)
}
if !overlay.Equal(o1.Overlay) {
t.Errorf("got overlay %s, want %s", overlay, o1.Overlay)
}
}
func TestMultipleConnectDisconnect(t *testing.T) {
o1 := libp2p.Options{
NetworkID: 1,
}
s1 := newService(t, &o1)
o2 := libp2p.Options{
NetworkID: 1,
}
s2 := newService(t, &o2)
addrs, err := s1.Addresses()
if err != nil {
t.Fatal(err)
}
addr := addrs[0]
overlay, err := s2.Connect(context.Background(), addr)
if err != nil {
t.Fatal(err)
}
if err := s2.Disconnect(overlay); err != nil {
t.Fatal(err)
}
overlay, err = s2.Connect(context.Background(), addr)
if err != nil {
t.Fatal(err)
}
if err := s2.Disconnect(overlay); err != nil {
t.Fatal(err)
}
}
func TestConnectDisconnectOnAllAddresses(t *testing.T) {
o1 := libp2p.Options{
NetworkID: 1,
}
s1 := newService(t, &o1)
o2 := libp2p.Options{
NetworkID: 1,
}
s2 := newService(t, &o2)
addrs, err := s1.Addresses()
if err != nil {
t.Fatal(err)
}
for _, addr := range addrs {
overlay, err := s2.Connect(context.Background(), addr)
if err != nil {
t.Fatal(err)
}
if err := s2.Disconnect(overlay); err != nil {
t.Fatal(err)
}
}
}
func TestDoubleConnectOnAllAddresses(t *testing.T) {
o1 := libp2p.Options{
NetworkID: 1,
}
s1 := newService(t, &o1)
o2 := libp2p.Options{
NetworkID: 1,
}
s2 := newService(t, &o2)
addrs, err := s1.Addresses()
if err != nil {
t.Fatal(err)
}
for _, addr := range addrs {
if _, err := s2.Connect(context.Background(), addr); err != nil {
t.Fatal(err)
}
if _, err := s2.Connect(context.Background(), addr); err == nil {
t.Fatal("second connect attempt should result with an error")
}
}
}
func newService(t *testing.T, o *libp2p.Options) *libp2p.Service {
t.Helper()
if o == nil {
o = new(libp2p.Options)
}
if o.PrivateKey == nil {
var err error
o.PrivateKey, err = crypto.GenerateSecp256k1Key()
if err != nil {
t.Fatal(err)
}
}
if o.Overlay.IsZero() {
var err error
swarmPK, err := crypto.GenerateSecp256k1Key()
if err != nil {
t.Fatal(err)
}
o.Overlay = crypto.NewAddress(swarmPK.PublicKey)
}
if o.Logger == nil {
o.Logger = logging.New(ioutil.Discard, 0)
}
if o.Addr == "" {
o.Addr = ":0"
}
s, err := libp2p.New(context.Background(), *o)
if err != nil {
t.Fatal(err)
}
return s
}
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"sync" "sync"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/libp2p/go-libp2p-core/network"
libp2ppeer "github.com/libp2p/go-libp2p-core/peer" libp2ppeer "github.com/libp2p/go-libp2p-core/peer"
) )
...@@ -15,12 +16,15 @@ type peerRegistry struct { ...@@ -15,12 +16,15 @@ type peerRegistry struct {
peers map[string]libp2ppeer.ID // map overlay address to underlay peer id peers map[string]libp2ppeer.ID // map overlay address to underlay peer id
overlays map[libp2ppeer.ID]swarm.Address // map underlay peer id to overlay address overlays map[libp2ppeer.ID]swarm.Address // map underlay peer id to overlay address
mu sync.RWMutex mu sync.RWMutex
network.Notifiee // peerRegistry can be the receiver for network.Notify
} }
func newPeerRegistry() *peerRegistry { func newPeerRegistry() *peerRegistry {
return &peerRegistry{ return &peerRegistry{
peers: make(map[string]libp2ppeer.ID), peers: make(map[string]libp2ppeer.ID),
overlays: make(map[libp2ppeer.ID]swarm.Address), overlays: make(map[libp2ppeer.ID]swarm.Address),
Notifiee: new(network.NoopNotifiee),
} }
} }
...@@ -61,3 +65,9 @@ func (r *peerRegistry) remove(peerID libp2ppeer.ID) { ...@@ -61,3 +65,9 @@ func (r *peerRegistry) remove(peerID libp2ppeer.ID) {
func encodePeersKey(overlay swarm.Address) string { func encodePeersKey(overlay swarm.Address) string {
return string(overlay.Bytes()) return string(overlay.Bytes())
} }
// Disconnect removes the peer from registry in disconnect.
// peerRegistry has to be set by network.Network.Notify().
func (r *peerRegistry) Disconnected(_ network.Network, c network.Conn) {
r.remove(c.RemotePeer())
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment