Commit d1ebafc8 authored by Petar Radovic's avatar Petar Radovic Committed by GitHub

Advertisable address (#283)

Use advertisable address to try and help UPnP situation
parent 0231080f
...@@ -14,6 +14,7 @@ import ( ...@@ -14,6 +14,7 @@ import (
"path/filepath" "path/filepath"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"github.com/ethersphere/bee/pkg/addressbook" "github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/api" "github.com/ethersphere/bee/pkg/api"
...@@ -151,6 +152,18 @@ func NewBee(o Options) (*Bee, error) { ...@@ -151,6 +152,18 @@ func NewBee(o Options) (*Bee, error) {
} }
b.p2pService = p2ps b.p2pService = p2ps
// wait for nat manager to init
logger.Debug("initializing NAT manager")
select {
case <-p2ps.NATManager().Ready():
// this is magic sleep to give NAT time to sync the mappings
// this is a hack, kind of alchemy and should be improved
time.Sleep(3 * time.Second)
logger.Debug("NAT manager initialized")
case <-time.After(10 * time.Second):
logger.Warning("NAT manager init timeout")
}
// Construct protocols. // Construct protocols.
pingPong := pingpong.New(pingpong.Options{ pingPong := pingpong.New(pingpong.Options{
Streamer: p2ps, Streamer: p2ps,
......
...@@ -44,27 +44,33 @@ var ( ...@@ -44,27 +44,33 @@ var (
ErrInvalidSyn = errors.New("invalid syn") ErrInvalidSyn = errors.New("invalid syn")
) )
type AdvertisableAddressResolver interface {
Resolve(observedAdddress ma.Multiaddr) (ma.Multiaddr, error)
}
type Service struct { type Service struct {
signer crypto.Signer signer crypto.Signer
overlay swarm.Address advertisableAddresser AdvertisableAddressResolver
lightNode bool overlay swarm.Address
networkID uint64 lightNode bool
receivedHandshakes map[libp2ppeer.ID]struct{} networkID uint64
receivedHandshakesMu sync.Mutex receivedHandshakes map[libp2ppeer.ID]struct{}
logger logging.Logger receivedHandshakesMu sync.Mutex
logger logging.Logger
network.Notifiee // handshake service can be the receiver for network.Notify network.Notifiee // handshake service can be the receiver for network.Notify
} }
func New(overlay swarm.Address, signer crypto.Signer, networkID uint64, lighNode bool, logger logging.Logger) (*Service, error) { func New(signer crypto.Signer, advertisableAddresser AdvertisableAddressResolver, overlay swarm.Address, networkID uint64, lighNode bool, logger logging.Logger) (*Service, error) {
return &Service{ return &Service{
signer: signer, signer: signer,
overlay: overlay, advertisableAddresser: advertisableAddresser,
networkID: networkID, overlay: overlay,
lightNode: lighNode, networkID: networkID,
receivedHandshakes: make(map[libp2ppeer.ID]struct{}), lightNode: lighNode,
logger: logger, receivedHandshakes: make(map[libp2ppeer.ID]struct{}),
Notifiee: new(network.NoopNotifiee), logger: logger,
Notifiee: new(network.NoopNotifiee),
}, nil }, nil
} }
...@@ -91,24 +97,37 @@ func (s *Service) Handshake(stream p2p.Stream, peerMultiaddr ma.Multiaddr, peerI ...@@ -91,24 +97,37 @@ func (s *Service) Handshake(stream p2p.Stream, peerMultiaddr ma.Multiaddr, peerI
return nil, fmt.Errorf("read synack message: %w", err) return nil, fmt.Errorf("read synack message: %w", err)
} }
remoteBzzAddress, err := s.parseCheckAck(resp.Ack, fullRemoteMABytes) remoteBzzAddress, err := s.parseCheckAck(resp.Ack)
if err != nil { if err != nil {
return nil, err return nil, err
} }
addr, err := ma.NewMultiaddrBytes(resp.Syn.ObservedUnderlay) observedUnderlay, err := ma.NewMultiaddrBytes(resp.Syn.ObservedUnderlay)
if err != nil { if err != nil {
return nil, ErrInvalidSyn return nil, ErrInvalidSyn
} }
bzzAddress, err := bzz.NewAddress(s.signer, addr, s.overlay, s.networkID) advertisableUnderlay, err := s.advertisableAddresser.Resolve(observedUnderlay)
if err != nil {
return nil, err
}
bzzAddress, err := bzz.NewAddress(s.signer, advertisableUnderlay, s.overlay, s.networkID)
if err != nil {
return nil, err
}
advertisableUnderlayBytes, err := bzzAddress.Underlay.MarshalBinary()
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := w.WriteMsgWithTimeout(messageTimeout, &pb.Ack{ if err := w.WriteMsgWithTimeout(messageTimeout, &pb.Ack{
Overlay: bzzAddress.Overlay.Bytes(), Address: &pb.BzzAddress{
Signature: bzzAddress.Signature, Underlay: advertisableUnderlayBytes,
Overlay: bzzAddress.Overlay.Bytes(),
Signature: bzzAddress.Signature,
},
NetworkID: s.networkID, NetworkID: s.networkID,
Light: s.lightNode, Light: s.lightNode,
}); err != nil { }); err != nil {
...@@ -147,12 +166,22 @@ func (s *Service) Handle(stream p2p.Stream, remoteMultiaddr ma.Multiaddr, remote ...@@ -147,12 +166,22 @@ func (s *Service) Handle(stream p2p.Stream, remoteMultiaddr ma.Multiaddr, remote
return nil, fmt.Errorf("read syn message: %w", err) return nil, fmt.Errorf("read syn message: %w", err)
} }
addr, err := ma.NewMultiaddrBytes(syn.ObservedUnderlay) observedUnderlay, err := ma.NewMultiaddrBytes(syn.ObservedUnderlay)
if err != nil { if err != nil {
return nil, ErrInvalidSyn return nil, ErrInvalidSyn
} }
bzzAddress, err := bzz.NewAddress(s.signer, addr, s.overlay, s.networkID) advertisableUnderlay, err := s.advertisableAddresser.Resolve(observedUnderlay)
if err != nil {
return nil, err
}
bzzAddress, err := bzz.NewAddress(s.signer, advertisableUnderlay, s.overlay, s.networkID)
if err != nil {
return nil, err
}
advertisableUnderlayBytes, err := bzzAddress.Underlay.MarshalBinary()
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -162,8 +191,11 @@ func (s *Service) Handle(stream p2p.Stream, remoteMultiaddr ma.Multiaddr, remote ...@@ -162,8 +191,11 @@ func (s *Service) Handle(stream p2p.Stream, remoteMultiaddr ma.Multiaddr, remote
ObservedUnderlay: fullRemoteMABytes, ObservedUnderlay: fullRemoteMABytes,
}, },
Ack: &pb.Ack{ Ack: &pb.Ack{
Overlay: bzzAddress.Overlay.Bytes(), Address: &pb.BzzAddress{
Signature: bzzAddress.Signature, Underlay: advertisableUnderlayBytes,
Overlay: bzzAddress.Overlay.Bytes(),
Signature: bzzAddress.Signature,
},
NetworkID: s.networkID, NetworkID: s.networkID,
Light: s.lightNode, Light: s.lightNode,
}, },
...@@ -176,7 +208,7 @@ func (s *Service) Handle(stream p2p.Stream, remoteMultiaddr ma.Multiaddr, remote ...@@ -176,7 +208,7 @@ func (s *Service) Handle(stream p2p.Stream, remoteMultiaddr ma.Multiaddr, remote
return nil, fmt.Errorf("read ack message: %w", err) return nil, fmt.Errorf("read ack message: %w", err)
} }
remoteBzzAddress, err := s.parseCheckAck(&ack, fullRemoteMABytes) remoteBzzAddress, err := s.parseCheckAck(&ack)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -198,12 +230,12 @@ func buildFullMA(addr ma.Multiaddr, peerID libp2ppeer.ID) (ma.Multiaddr, error) ...@@ -198,12 +230,12 @@ func buildFullMA(addr ma.Multiaddr, peerID libp2ppeer.ID) (ma.Multiaddr, error)
return ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s", addr.String(), peerID.Pretty())) return ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s", addr.String(), peerID.Pretty()))
} }
func (s *Service) parseCheckAck(ack *pb.Ack, remoteMA []byte) (*bzz.Address, error) { func (s *Service) parseCheckAck(ack *pb.Ack) (*bzz.Address, error) {
if ack.NetworkID != s.networkID { if ack.NetworkID != s.networkID {
return nil, ErrNetworkIDIncompatible return nil, ErrNetworkIDIncompatible
} }
bzzAddress, err := bzz.ParseAddress(remoteMA, ack.Overlay, ack.Signature, s.networkID) bzzAddress, err := bzz.ParseAddress(ack.Address.Underlay, ack.Address.Overlay, ack.Address.Signature, s.networkID)
if err != nil { if err != nil {
return nil, ErrInvalidAck return nil, ErrInvalidAck
} }
......
...@@ -13,13 +13,18 @@ message Syn { ...@@ -13,13 +13,18 @@ message Syn {
} }
message Ack { message Ack {
bytes Overlay = 1; BzzAddress Address = 1;
bytes Signature = 2; uint64 NetworkID = 2;
uint64 NetworkID = 3; bool Light = 3;
bool Light = 4;
} }
message SynAck { message SynAck {
Syn Syn = 1; Syn Syn = 1;
Ack Ack = 2; Ack Ack = 2;
}
message BzzAddress {
bytes Underlay = 1;
bytes Signature = 2;
bytes Overlay = 3;
} }
\ No newline at end of file
...@@ -32,6 +32,7 @@ import ( ...@@ -32,6 +32,7 @@ import (
protocol "github.com/libp2p/go-libp2p-core/protocol" protocol "github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-peerstore/pstoremem" "github.com/libp2p/go-libp2p-peerstore/pstoremem"
libp2pquic "github.com/libp2p/go-libp2p-quic-transport" libp2pquic "github.com/libp2p/go-libp2p-quic-transport"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/libp2p/go-tcp-transport" "github.com/libp2p/go-tcp-transport"
ws "github.com/libp2p/go-ws-transport" ws "github.com/libp2p/go-ws-transport"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
...@@ -43,18 +44,19 @@ var ( ...@@ -43,18 +44,19 @@ var (
) )
type Service struct { type Service struct {
ctx context.Context ctx context.Context
host host.Host host host.Host
libp2pPeerstore peerstore.Peerstore natManager basichost.NATManager
metrics metrics libp2pPeerstore peerstore.Peerstore
networkID uint64 metrics metrics
handshakeService *handshake.Service networkID uint64
addressbook addressbook.Putter handshakeService *handshake.Service
peers *peerRegistry addressbook addressbook.Putter
topologyNotifier topology.Notifier peers *peerRegistry
conectionBreaker breaker.Interface topologyNotifier topology.Notifier
logger logging.Logger connectionBreaker breaker.Interface
tracer *tracing.Tracer logger logging.Logger
tracer *tracing.Tracer
} }
type Options struct { type Options struct {
...@@ -112,11 +114,15 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay ...@@ -112,11 +114,15 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
security := libp2p.DefaultSecurity security := libp2p.DefaultSecurity
libp2pPeerstore := pstoremem.NewPeerstore() libp2pPeerstore := pstoremem.NewPeerstore()
var natManager basichost.NATManager
opts := []libp2p.Option{ opts := []libp2p.Option{
libp2p.ListenAddrStrings(listenAddrs...), libp2p.ListenAddrStrings(listenAddrs...),
security, security,
// Attempt to open ports using uPNP for NATed hosts. libp2p.NATManager(func(n network.Network) basichost.NATManager {
libp2p.NATPortMap(), natManager = basichost.NewNATManager(n)
return natManager
}),
// Use dedicated peerstore instead the global DefaultPeerstore // Use dedicated peerstore instead the global DefaultPeerstore
libp2p.Peerstore(libp2pPeerstore), libp2p.Peerstore(libp2pPeerstore),
} }
...@@ -157,26 +163,28 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay ...@@ -157,26 +163,28 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
return nil, fmt.Errorf("autonat: %w", err) return nil, fmt.Errorf("autonat: %w", err)
} }
handshakeService, err := handshake.New(overlay, signer, networkID, o.LightNode, o.Logger) handshakeService, err := handshake.New(signer, &UpnpAddressResolver{
host: h,
}, overlay, networkID, o.LightNode, o.Logger)
if err != nil { if err != nil {
return nil, fmt.Errorf("handshake service: %w", err) return nil, fmt.Errorf("handshake service: %w", err)
} }
peerRegistry := newPeerRegistry() peerRegistry := newPeerRegistry()
s := &Service{ s := &Service{
ctx: ctx, ctx: ctx,
host: h, host: h,
libp2pPeerstore: libp2pPeerstore, natManager: natManager,
metrics: newMetrics(), handshakeService: handshakeService,
networkID: networkID, libp2pPeerstore: libp2pPeerstore,
handshakeService: handshakeService, metrics: newMetrics(),
peers: peerRegistry, networkID: networkID,
addressbook: o.Addressbook, peers: peerRegistry,
logger: o.Logger, addressbook: o.Addressbook,
tracer: o.Tracer, logger: o.Logger,
conectionBreaker: breaker.NewBreaker(breaker.Options{}), // todo: fill non-default options tracer: o.Tracer,
connectionBreaker: breaker.NewBreaker(breaker.Options{}), // use default options
} }
// Construct protocols. // Construct protocols.
id := protocol.ID(p2p.NewSwarmStreamName(handshake.ProtocolName, handshake.ProtocolVersion, handshake.StreamName)) id := protocol.ID(p2p.NewSwarmStreamName(handshake.ProtocolName, handshake.ProtocolVersion, handshake.StreamName))
matcher, err := s.protocolSemverMatcher(id) matcher, err := s.protocolSemverMatcher(id)
...@@ -296,6 +304,10 @@ func (s *Service) Addresses() (addreses []ma.Multiaddr, err error) { ...@@ -296,6 +304,10 @@ func (s *Service) Addresses() (addreses []ma.Multiaddr, err error) {
return addreses, nil return addreses, nil
} }
func (s *Service) NATManager() basichost.NATManager {
return s.natManager
}
func buildUnderlayAddress(addr ma.Multiaddr, peerID libp2ppeer.ID) (ma.Multiaddr, error) { func buildUnderlayAddress(addr ma.Multiaddr, peerID libp2ppeer.ID) (ma.Multiaddr, error) {
// Build host multiaddress // Build host multiaddress
hostAddr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", peerID.Pretty())) hostAddr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", peerID.Pretty()))
...@@ -317,9 +329,9 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. ...@@ -317,9 +329,9 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
return nil, p2p.ErrAlreadyConnected return nil, p2p.ErrAlreadyConnected
} }
if err := s.conectionBreaker.Execute(func() error { return s.host.Connect(ctx, *info) }); err != nil { if err := s.connectionBreaker.Execute(func() error { return s.host.Connect(ctx, *info) }); err != nil {
if errors.Is(err, breaker.ErrClosed) { if errors.Is(err, breaker.ErrClosed) {
return nil, p2p.NewConnectionBackoffError(err, s.conectionBreaker.ClosedUntil()) return nil, p2p.NewConnectionBackoffError(err, s.connectionBreaker.ClosedUntil())
} }
return nil, err return nil, err
} }
......
...@@ -58,6 +58,7 @@ func newService(t *testing.T, networkID uint64, o libp2p.Options) (s *libp2p.Ser ...@@ -58,6 +58,7 @@ func newService(t *testing.T, networkID uint64, o libp2p.Options) (s *libp2p.Ser
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
t.Cleanup(func() { t.Cleanup(func() {
cancel() cancel()
s.Close() s.Close()
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package libp2p
import (
"errors"
"strings"
"github.com/libp2p/go-libp2p-core/host"
libp2ppeer "github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
)
type UpnpAddressResolver struct {
host host.Host
}
// Resolve checks if there is a possible better advertisable underlay then the provided observed address.
// In some NAT situations, for example in the case when nodes are behind upnp, observer might send the observed address with a wrong port.
// In this case, observed address is compared to addresses provided by host, and if there is a same address but with different port, that one is used as advertisable address instead of provided observed one.
// TODO: this is a quickfix and it will be improved in the future
func (r *UpnpAddressResolver) Resolve(observedAddress ma.Multiaddr) (ma.Multiaddr, error) {
observableAddrInfo, err := libp2ppeer.AddrInfoFromP2pAddr(observedAddress)
if err != nil {
return nil, err
}
if len(observableAddrInfo.Addrs) < 1 {
return nil, errors.New("invalid observed address")
}
observedAddrSplit := strings.Split(observableAddrInfo.Addrs[0].String(), "/")
// if address is not in a form of '/ipversion/ip/protocol/port/...` don't compare to addresses and return it
if len(observedAddrSplit) < 5 {
return observedAddress, nil
}
observedAddressPort := observedAddrSplit[4]
// observervedAddressShort is an obaserved address without port
observervedAddressShort := strings.Join(append(observedAddrSplit[:4], observedAddrSplit[5:]...), "/")
for _, a := range r.host.Addrs() {
asplit := strings.Split(a.String(), "/")
if len(asplit) != len(observedAddrSplit) {
continue
}
aport := asplit[4]
if strings.Join(append(asplit[:4], asplit[5:]...), "/") != observervedAddressShort {
continue
}
if aport != observedAddressPort {
aaddress, err := buildUnderlayAddress(a, observableAddrInfo.ID)
if err != nil {
continue
}
return aaddress, nil
}
}
return observedAddress, nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment