Commit 8224bf4c authored by Janos Guljas's avatar Janos Guljas

add libp2p peer registry

parent dc195184
...@@ -14,7 +14,6 @@ import ( ...@@ -14,7 +14,6 @@ import (
"net" "net"
"os" "os"
"strconv" "strconv"
"sync"
"time" "time"
"github.com/janos/bee/pkg/p2p" "github.com/janos/bee/pkg/p2p"
...@@ -42,9 +41,7 @@ type Service struct { ...@@ -42,9 +41,7 @@ type Service struct {
host host.Host host host.Host
metrics metrics metrics metrics
handshakeService *handshake.Service handshakeService *handshake.Service
overlayToPeerID map[string]libp2ppeer.ID peers *peerRegistry
peerIDToOverlay map[libp2ppeer.ID]string
overlayPeerIDMu sync.RWMutex
} }
type Options struct { type Options struct {
...@@ -185,9 +182,8 @@ func New(ctx context.Context, o Options) (*Service, error) { ...@@ -185,9 +182,8 @@ func New(ctx context.Context, o Options) (*Service, error) {
s := &Service{ s := &Service{
host: h, host: h,
metrics: newMetrics(), metrics: newMetrics(),
overlayToPeerID: make(map[string]libp2ppeer.ID),
peerIDToOverlay: make(map[libp2ppeer.ID]string),
handshakeService: handshake.New(overlay), handshakeService: handshake.New(overlay),
peers: newPeerRegistry(),
} }
// Construct protocols. // Construct protocols.
...@@ -201,7 +197,7 @@ func New(ctx context.Context, o Options) (*Service, error) { ...@@ -201,7 +197,7 @@ func New(ctx context.Context, o Options) (*Service, error) {
s.host.SetStreamHandlerMatch(id, matcher, func(stream network.Stream) { s.host.SetStreamHandlerMatch(id, matcher, func(stream network.Stream) {
s.metrics.HandledStreamCount.Inc() s.metrics.HandledStreamCount.Inc()
overlay := s.handshakeService.Handler(stream) overlay := s.handshakeService.Handler(stream)
s.addAddresses(overlay, stream.Conn().RemotePeer()) s.peers.add(stream.Conn().RemotePeer(), overlay)
}) })
// TODO: be more resilient on connection errors and connect in parallel // TODO: be more resilient on connection errors and connect in parallel
...@@ -234,8 +230,8 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) { ...@@ -234,8 +230,8 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
s.host.SetStreamHandlerMatch(id, matcher, func(stream network.Stream) { s.host.SetStreamHandlerMatch(id, matcher, func(stream network.Stream) {
peerID := stream.Conn().RemotePeer() peerID := stream.Conn().RemotePeer()
overlay, ok := s.overlayForPeerID(peerID) overlay, found := s.peers.overlay(peerID)
if !ok { if !found {
// todo: handle better // todo: handle better
fmt.Printf("Could not fetch handshake for peerID %s\n", stream) fmt.Printf("Could not fetch handshake for peerID %s\n", stream)
return return
...@@ -285,14 +281,14 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (err error) { ...@@ -285,14 +281,14 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (err error) {
return err return err
} }
s.addAddresses(overlay, info.ID) s.peers.add(info.ID, overlay)
s.metrics.CreatedConnectionCount.Inc() s.metrics.CreatedConnectionCount.Inc()
fmt.Println("handshake handshake finished") fmt.Println("handshake handshake finished")
return nil return nil
} }
func (s *Service) NewStream(ctx context.Context, overlay, protocolName, streamName, version string) (p2p.Stream, error) { func (s *Service) NewStream(ctx context.Context, overlay, protocolName, streamName, version string) (p2p.Stream, error) {
peerID, ok := s.peerIDForOverlay(overlay) peerID, found := s.peers.peerID(overlay)
if !ok { if !found {
return nil, p2p.ErrPeerNotFound return nil, p2p.ErrPeerNotFound
} }
...@@ -312,27 +308,6 @@ func (s *Service) newStreamForPeerID(ctx context.Context, peerID libp2ppeer.ID, ...@@ -312,27 +308,6 @@ func (s *Service) newStreamForPeerID(ctx context.Context, peerID libp2ppeer.ID,
return st, nil return st, nil
} }
func (s *Service) peerIDForOverlay(overlay string) (peerID libp2ppeer.ID, ok bool) {
s.overlayPeerIDMu.RLock()
peerID, ok = s.overlayToPeerID[overlay]
s.overlayPeerIDMu.RUnlock()
return peerID, ok
}
func (s *Service) overlayForPeerID(peerID libp2ppeer.ID) (overlay string, ok bool) {
s.overlayPeerIDMu.RLock()
overlay, ok = s.peerIDToOverlay[peerID]
s.overlayPeerIDMu.RUnlock()
return overlay, ok
}
func (s *Service) addAddresses(overlay string, peerID libp2ppeer.ID) {
s.overlayPeerIDMu.Lock()
s.overlayToPeerID[overlay] = peerID
s.peerIDToOverlay[peerID] = overlay
s.overlayPeerIDMu.Unlock()
}
func (s *Service) Close() error { func (s *Service) Close() error {
return s.host.Close() return s.host.Close()
} }
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package libp2p
import (
"sync"
libp2ppeer "github.com/libp2p/go-libp2p-core/peer"
)
type peerRegistry struct {
peers map[string]libp2ppeer.ID
overlays map[libp2ppeer.ID]string
mu sync.RWMutex
}
func newPeerRegistry() *peerRegistry {
return &peerRegistry{
peers: make(map[string]libp2ppeer.ID),
overlays: make(map[libp2ppeer.ID]string),
}
}
func (r *peerRegistry) add(peerID libp2ppeer.ID, overlay string) {
r.mu.Lock()
r.peers[overlay] = peerID
r.overlays[peerID] = overlay
r.mu.Unlock()
}
func (r *peerRegistry) peerID(overlay string) (peerID libp2ppeer.ID, found bool) {
r.mu.RLock()
peerID, found = r.peers[overlay]
r.mu.RUnlock()
return peerID, found
}
func (r *peerRegistry) overlay(peerID libp2ppeer.ID) (overlay string, found bool) {
r.mu.RLock()
overlay, found = r.overlays[peerID]
r.mu.RUnlock()
return overlay, found
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment