Commit 06680c4f authored by Janos Guljas's avatar Janos Guljas

add p2p Service Peers method, fix libp2p Disconnect and improve tests

parent 0f3803f0
......@@ -176,7 +176,7 @@ func New(ctx context.Context, o Options) (*Service, error) {
return
}
s.peers.add(peerID, i.Address)
s.peers.add(stream.Conn(), i.Address)
s.metrics.HandledStreamCount.Inc()
s.logger.Infof("peer %s connected", i.Address)
})
......@@ -280,7 +280,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm
return swarm.Address{}, err
}
s.peers.add(info.ID, i.Address)
s.peers.add(stream.Conn(), i.Address)
s.metrics.CreatedConnectionCount.Inc()
s.logger.Infof("peer %s connected", i.Address)
return i.Address, nil
......@@ -302,6 +302,10 @@ func (s *Service) disconnect(peerID libp2ppeer.ID) error {
return nil
}
func (s *Service) Peers() []p2p.Peer {
return s.peers.peers()
}
func (s *Service) NewStream(ctx context.Context, overlay swarm.Address, protocolName, protocolVersion, streamName string) (p2p.Stream, error) {
peerID, found := s.peers.peerID(overlay)
if !found {
......
......@@ -5,21 +5,24 @@
package libp2p_test
import (
"bytes"
"context"
"errors"
"io/ioutil"
"sort"
"testing"
"time"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/ethersphere/bee/pkg/swarm"
)
func TestAddresses(t *testing.T) {
s := newService(t, &libp2p.Options{
NetworkID: 1,
})
s, _, cleanup := newService(t, libp2p.Options{NetworkID: 1})
defer cleanup()
addrs, err := s.Addresses()
if err != nil {
......@@ -31,14 +34,14 @@ func TestAddresses(t *testing.T) {
}
func TestConnectDisconnect(t *testing.T) {
o1 := libp2p.Options{
NetworkID: 1,
}
s1 := newService(t, &o1)
o2 := libp2p.Options{
NetworkID: 1,
}
s2 := newService(t, &o2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1, cleanup1 := newService(t, libp2p.Options{NetworkID: 1})
defer cleanup1()
s2, overlay2, cleanup2 := newService(t, libp2p.Options{NetworkID: 1})
defer cleanup2()
addrs, err := s1.Addresses()
if err != nil {
......@@ -46,25 +49,31 @@ func TestConnectDisconnect(t *testing.T) {
}
addr := addrs[0]
overlay, err := s2.Connect(context.Background(), addr)
overlay, err := s2.Connect(ctx, addr)
if err != nil {
t.Fatal(err)
}
expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2)
if err := s2.Disconnect(overlay); err != nil {
t.Fatal(err)
}
expectPeers(t, s2)
expectPeersEventually(t, s1)
}
func TestDoubleConnect(t *testing.T) {
o1 := libp2p.Options{
NetworkID: 1,
}
s1 := newService(t, &o1)
o2 := libp2p.Options{
NetworkID: 1,
}
s2 := newService(t, &o2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1, cleanup1 := newService(t, libp2p.Options{NetworkID: 1})
defer cleanup1()
s2, overlay2, cleanup2 := newService(t, libp2p.Options{NetworkID: 1})
defer cleanup2()
addrs, err := s1.Addresses()
if err != nil {
......@@ -72,24 +81,30 @@ func TestDoubleConnect(t *testing.T) {
}
addr := addrs[0]
if _, err := s2.Connect(context.Background(), addr); err != nil {
if _, err := s2.Connect(ctx, addr); err != nil {
t.Fatal(err)
}
if _, err := s2.Connect(context.Background(), addr); err == nil {
expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2)
if _, err := s2.Connect(ctx, addr); err == nil {
t.Fatal("second connect attempt should result with an error")
}
expectPeers(t, s2)
expectPeersEventually(t, s1)
}
func TestDoubleDisconnect(t *testing.T) {
o1 := libp2p.Options{
NetworkID: 1,
}
s1 := newService(t, &o1)
o2 := libp2p.Options{
NetworkID: 1,
}
s2 := newService(t, &o2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1, cleanup1 := newService(t, libp2p.Options{NetworkID: 1})
defer cleanup1()
s2, overlay2, cleanup2 := newService(t, libp2p.Options{NetworkID: 1})
defer cleanup2()
addrs, err := s1.Addresses()
if err != nil {
......@@ -97,28 +112,38 @@ func TestDoubleDisconnect(t *testing.T) {
}
addr := addrs[0]
overlay, err := s2.Connect(context.Background(), addr)
overlay, err := s2.Connect(ctx, addr)
if err != nil {
t.Fatal(err)
}
expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2)
if err := s2.Disconnect(overlay); err != nil {
t.Fatal(err)
}
expectPeers(t, s2)
expectPeersEventually(t, s1)
if err := s2.Disconnect(overlay); !errors.Is(err, p2p.ErrPeerNotFound) {
t.Errorf("got error %v, want %v", err, p2p.ErrPeerNotFound)
}
expectPeers(t, s2)
expectPeersEventually(t, s1)
}
func TestReconnectAfterDoubleConnect(t *testing.T) {
o1 := libp2p.Options{
NetworkID: 1,
}
s1 := newService(t, &o1)
o2 := libp2p.Options{
NetworkID: 1,
}
s2 := newService(t, &o2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1, cleanup1 := newService(t, libp2p.Options{NetworkID: 1})
defer cleanup1()
s2, overlay2, cleanup2 := newService(t, libp2p.Options{NetworkID: 1})
defer cleanup2()
addrs, err := s1.Addresses()
if err != nil {
......@@ -126,33 +151,42 @@ func TestReconnectAfterDoubleConnect(t *testing.T) {
}
addr := addrs[0]
overlay, err := s2.Connect(context.Background(), addr)
overlay, err := s2.Connect(ctx, addr)
if err != nil {
t.Fatal(err)
}
if _, err := s2.Connect(context.Background(), addr); err == nil {
expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2)
if _, err := s2.Connect(ctx, addr); err == nil {
t.Fatal("second connect attempt should result with an error")
}
overlay, err = s2.Connect(context.Background(), addr)
expectPeers(t, s2)
expectPeersEventually(t, s1)
overlay, err = s2.Connect(ctx, addr)
if err != nil {
t.Fatal(err)
}
if !overlay.Equal(o1.Overlay) {
t.Errorf("got overlay %s, want %s", overlay, o1.Overlay)
if !overlay.Equal(overlay1) {
t.Errorf("got overlay %s, want %s", overlay, overlay1)
}
expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2)
}
func TestMultipleConnectDisconnect(t *testing.T) {
o1 := libp2p.Options{
NetworkID: 1,
}
s1 := newService(t, &o1)
o2 := libp2p.Options{
NetworkID: 1,
}
s2 := newService(t, &o2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1, cleanup1 := newService(t, libp2p.Options{NetworkID: 1})
defer cleanup1()
s2, overlay2, cleanup2 := newService(t, libp2p.Options{NetworkID: 1})
defer cleanup2()
addrs, err := s1.Addresses()
if err != nil {
......@@ -160,81 +194,164 @@ func TestMultipleConnectDisconnect(t *testing.T) {
}
addr := addrs[0]
overlay, err := s2.Connect(context.Background(), addr)
overlay, err := s2.Connect(ctx, addr)
if err != nil {
t.Fatal(err)
}
expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2)
if err := s2.Disconnect(overlay); err != nil {
t.Fatal(err)
}
overlay, err = s2.Connect(context.Background(), addr)
expectPeers(t, s2)
expectPeersEventually(t, s1)
overlay, err = s2.Connect(ctx, addr)
if err != nil {
t.Fatal(err)
}
expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2)
if err := s2.Disconnect(overlay); err != nil {
t.Fatal(err)
}
expectPeers(t, s2)
expectPeersEventually(t, s1)
}
func TestConnectDisconnectOnAllAddresses(t *testing.T) {
o1 := libp2p.Options{
NetworkID: 1,
}
s1 := newService(t, &o1)
o2 := libp2p.Options{
NetworkID: 1,
}
s2 := newService(t, &o2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1, cleanup1 := newService(t, libp2p.Options{NetworkID: 1})
defer cleanup1()
s2, overlay2, cleanup2 := newService(t, libp2p.Options{NetworkID: 1})
defer cleanup2()
addrs, err := s1.Addresses()
if err != nil {
t.Fatal(err)
}
for _, addr := range addrs {
overlay, err := s2.Connect(context.Background(), addr)
overlay, err := s2.Connect(ctx, addr)
if err != nil {
t.Fatal(err)
}
expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2)
if err := s2.Disconnect(overlay); err != nil {
t.Fatal(err)
}
expectPeers(t, s2)
expectPeersEventually(t, s1)
}
}
func TestDoubleConnectOnAllAddresses(t *testing.T) {
o1 := libp2p.Options{
NetworkID: 1,
}
s1 := newService(t, &o1)
o2 := libp2p.Options{
NetworkID: 1,
}
s2 := newService(t, &o2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1, cleanup1 := newService(t, libp2p.Options{NetworkID: 1})
defer cleanup1()
s2, overlay2, cleanup2 := newService(t, libp2p.Options{NetworkID: 1})
defer cleanup2()
addrs, err := s1.Addresses()
if err != nil {
t.Fatal(err)
}
for _, addr := range addrs {
if _, err := s2.Connect(context.Background(), addr); err != nil {
if _, err := s2.Connect(ctx, addr); err != nil {
t.Fatal(err)
}
if _, err := s2.Connect(context.Background(), addr); err == nil {
expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2)
if _, err := s2.Connect(ctx, addr); err == nil {
t.Fatal("second connect attempt should result with an error")
}
expectPeers(t, s2)
expectPeersEventually(t, s1)
}
}
func newService(t *testing.T, o *libp2p.Options) *libp2p.Service {
t.Helper()
func TestDifferentNetworkIDs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, _, cleanup1 := newService(t, libp2p.Options{NetworkID: 1})
defer cleanup1()
s2, _, cleanup2 := newService(t, libp2p.Options{NetworkID: 2})
defer cleanup2()
addrs, err := s1.Addresses()
if err != nil {
t.Fatal(err)
}
addr := addrs[0]
if _, err := s2.Connect(ctx, addr); err == nil {
t.Fatal("connect attempt should result with an error")
}
if o == nil {
o = new(libp2p.Options)
expectPeers(t, s1)
expectPeers(t, s2)
}
func TestBootnodes(t *testing.T) {
s1, overlay1, cleanup1 := newService(t, libp2p.Options{NetworkID: 1})
defer cleanup1()
s2, overlay2, cleanup2 := newService(t, libp2p.Options{NetworkID: 1})
defer cleanup2()
addrs1, err := s1.Addresses()
if err != nil {
t.Fatal(err)
}
addrs2, err := s2.Addresses()
if err != nil {
t.Fatal(err)
}
s3, overlay3, cleanup3 := newService(t, libp2p.Options{
NetworkID: 1,
Bootnodes: []string{
addrs1[0].String(),
addrs2[0].String(),
},
})
defer cleanup3()
expectPeers(t, s3, overlay1, overlay2)
expectPeers(t, s1, overlay3)
expectPeers(t, s2, overlay3)
}
// newService constructs a new libp2p service.
func newService(t *testing.T, o libp2p.Options) (s *libp2p.Service, overlay swarm.Address, cleanup func()) {
t.Helper()
// disable ws until the race condition in:
// github.com/gorilla/websocket@v1.4.1/conn.go:614
// github.com/gorilla/websocket@v1.4.1/conn.go:781
// using github.com/libp2p/go-libp2p-transport-upgrader@v0.1.1
// is solved
o.DisableWS = true
if o.PrivateKey == nil {
var err error
......@@ -261,9 +378,72 @@ func newService(t *testing.T, o *libp2p.Options) *libp2p.Service {
o.Addr = ":0"
}
s, err := libp2p.New(context.Background(), *o)
ctx, cancel := context.WithCancel(context.Background())
s, err := libp2p.New(ctx, o)
if err != nil {
t.Fatal(err)
}
return s
return s, o.Overlay, func() {
cancel()
s.Close()
}
}
// expectPeers validates that peers with addresses are connected.
func expectPeers(t *testing.T, s *libp2p.Service, addrs ...swarm.Address) {
t.Helper()
peers := s.Peers()
if len(peers) != len(addrs) {
t.Fatalf("got peers %v, want %v", len(peers), len(addrs))
}
sort.Slice(addrs, func(i, j int) bool {
return bytes.Compare(addrs[i].Bytes(), addrs[j].Bytes()) == -1
})
sort.Slice(peers, func(i, j int) bool {
return bytes.Compare(peers[i].Address.Bytes(), peers[j].Address.Bytes()) == -1
})
for i, got := range peers {
want := addrs[i]
if !got.Address.Equal(want) {
t.Errorf("got %v peer %s, want %s", i, got.Address, want)
}
}
}
// expectPeersEventually validates that peers with addresses are connected with
// retires. It is supposed to be used to validate asynchronous connecting on the
// peer that is connected to.
func expectPeersEventually(t *testing.T, s *libp2p.Service, addrs ...swarm.Address) {
t.Helper()
var peers []p2p.Peer
for i := 0; i < 100; i++ {
peers = s.Peers()
if len(peers) == len(addrs) {
break
}
time.Sleep(50 * time.Millisecond)
}
if len(peers) != len(addrs) {
t.Fatalf("got peers %v, want %v", len(peers), len(addrs))
}
sort.Slice(addrs, func(i, j int) bool {
return bytes.Compare(addrs[i].Bytes(), addrs[j].Bytes()) == -1
})
sort.Slice(peers, func(i, j int) bool {
return bytes.Compare(peers[i].Address.Bytes(), peers[j].Address.Bytes()) == -1
})
for i, got := range peers {
want := addrs[i]
if !got.Address.Equal(want) {
t.Errorf("got %v peer %s, want %s", i, got.Address, want)
}
}
}
......@@ -5,26 +5,31 @@
package libp2p
import (
"bytes"
"sort"
"sync"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/libp2p/go-libp2p-core/network"
libp2ppeer "github.com/libp2p/go-libp2p-core/peer"
)
type peerRegistry struct {
peers map[string]libp2ppeer.ID // map overlay address to underlay peer id
overlays map[libp2ppeer.ID]swarm.Address // map underlay peer id to overlay address
mu sync.RWMutex
underlays map[string]libp2ppeer.ID // map overlay address to underlay peer id
overlays map[libp2ppeer.ID]swarm.Address // map underlay peer id to overlay address
connections map[libp2ppeer.ID]map[network.Conn]struct{} // list of connections for safe removal on Disconnect notification
mu sync.RWMutex
network.Notifiee // peerRegistry can be the receiver for network.Notify
}
func newPeerRegistry() *peerRegistry {
return &peerRegistry{
peers: make(map[string]libp2ppeer.ID),
overlays: make(map[libp2ppeer.ID]swarm.Address),
Notifiee: new(network.NoopNotifiee),
underlays: make(map[string]libp2ppeer.ID),
overlays: make(map[libp2ppeer.ID]swarm.Address),
connections: make(map[libp2ppeer.ID]map[network.Conn]struct{}),
Notifiee: new(network.NoopNotifiee),
}
}
......@@ -33,16 +38,61 @@ func (r *peerRegistry) Exists(overlay swarm.Address) (found bool) {
return found
}
func (r *peerRegistry) add(peerID libp2ppeer.ID, overlay swarm.Address) {
// Disconnect removes the peer from registry in disconnect.
// peerRegistry has to be set by network.Network.Notify().
func (r *peerRegistry) Disconnected(_ network.Network, c network.Conn) {
peerID := c.RemotePeer()
r.mu.Lock()
defer r.mu.Unlock()
// remove only the related connection,
// not eventually newly created one for the same peer
if _, ok := r.connections[peerID][c]; !ok {
return
}
overlay := r.overlays[peerID]
delete(r.overlays, peerID)
delete(r.underlays, encodeunderlaysKey(overlay))
delete(r.connections[peerID], c)
if len(r.connections[peerID]) == 0 {
delete(r.connections, peerID)
}
}
func (r *peerRegistry) peers() []p2p.Peer {
r.mu.Lock()
peers := make([]p2p.Peer, 0, len(r.overlays))
for _, a := range r.overlays {
peers = append(peers, p2p.Peer{
Address: a,
})
}
r.mu.Unlock()
sort.Slice(peers, func(i, j int) bool {
return bytes.Compare(peers[i].Address.Bytes(), peers[j].Address.Bytes()) == -1
})
return peers
}
func (r *peerRegistry) add(c network.Conn, overlay swarm.Address) {
peerID := c.RemotePeer()
r.mu.Lock()
r.peers[encodePeersKey(overlay)] = peerID
r.underlays[encodeunderlaysKey(overlay)] = peerID
r.overlays[peerID] = overlay
if _, ok := r.connections[peerID]; !ok {
r.connections[peerID] = make(map[network.Conn]struct{})
}
r.connections[peerID][c] = struct{}{}
r.mu.Unlock()
}
func (r *peerRegistry) peerID(overlay swarm.Address) (peerID libp2ppeer.ID, found bool) {
r.mu.RLock()
peerID, found = r.peers[encodePeersKey(overlay)]
peerID, found = r.underlays[encodeunderlaysKey(overlay)]
r.mu.RUnlock()
return peerID, found
}
......@@ -58,16 +108,11 @@ func (r *peerRegistry) remove(peerID libp2ppeer.ID) {
r.mu.Lock()
overlay := r.overlays[peerID]
delete(r.overlays, peerID)
delete(r.peers, encodePeersKey(overlay))
delete(r.underlays, encodeunderlaysKey(overlay))
delete(r.connections, peerID)
r.mu.Unlock()
}
func encodePeersKey(overlay swarm.Address) string {
func encodeunderlaysKey(overlay swarm.Address) string {
return string(overlay.Bytes())
}
// Disconnect removes the peer from registry in disconnect.
// peerRegistry has to be set by network.Network.Notify().
func (r *peerRegistry) Disconnected(_ network.Network, c network.Conn) {
r.remove(c.RemotePeer())
}
......@@ -17,6 +17,7 @@ type Service struct {
addProtocolFunc func(p2p.ProtocolSpec) error
connectFunc func(ctx context.Context, addr ma.Multiaddr) (overlay swarm.Address, err error)
disconnectFunc func(overlay swarm.Address) error
peersFunc func() []p2p.Peer
}
func WithAddProtocolFunc(f func(p2p.ProtocolSpec) error) Option {
......@@ -37,6 +38,12 @@ func WithDisconnectFunc(f func(overlay swarm.Address) error) Option {
})
}
func WithPeersFunc(f func() []p2p.Peer) Option {
return optionFunc(func(s *Service) {
s.peersFunc = f
})
}
func New(opts ...Option) *Service {
s := new(Service)
for _, o := range opts {
......@@ -47,25 +54,32 @@ func New(opts ...Option) *Service {
func (s *Service) AddProtocol(spec p2p.ProtocolSpec) error {
if s.addProtocolFunc == nil {
return errors.New("AddProtocol function not configured")
return errors.New("function AddProtocol not configured")
}
return s.addProtocolFunc(spec)
}
func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm.Address, err error) {
if s.connectFunc == nil {
return swarm.Address{}, errors.New("Connect function not configured")
return swarm.Address{}, errors.New("function Connect not configured")
}
return s.connectFunc(ctx, addr)
}
func (s *Service) Disconnect(overlay swarm.Address) error {
if s.disconnectFunc == nil {
return errors.New("Disconnect function not configured")
return errors.New("function Disconnect not configured")
}
return s.disconnectFunc(overlay)
}
func (s *Service) Peers() []p2p.Peer {
if s.peersFunc == nil {
return nil
}
return s.peersFunc()
}
type Option interface {
apply(*Service)
}
......
......@@ -16,6 +16,7 @@ type Service interface {
AddProtocol(ProtocolSpec) error
Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm.Address, err error)
Disconnect(overlay swarm.Address) error
Peers() []Peer
}
type Streamer interface {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment