Commit fdd955ee authored by Petar Radovic's avatar Petar Radovic Committed by GitHub

bootnode init improvement + libp2p incoming connection addr fix (#46)

parent 354ac8f8
......@@ -46,6 +46,8 @@ func (i *inmem) Put(overlay swarm.Address, addr ma.Multiaddr) (exists bool) {
}
func (i *inmem) Overlays() []swarm.Address {
i.mtx.Lock()
defer i.mtx.Unlock()
keys := make([]swarm.Address, 0, len(i.entries))
for k := range i.entries {
keys = append(keys, swarm.MustParseHexAddress(k))
......@@ -55,6 +57,8 @@ func (i *inmem) Overlays() []swarm.Address {
}
func (i *inmem) Multiaddresses() []ma.Multiaddr {
i.mtx.Lock()
defer i.mtx.Unlock()
values := make([]ma.Multiaddr, 0, len(i.entries))
for _, v := range i.entries {
values = append(values, v.multiaddr)
......
......@@ -86,8 +86,8 @@ func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swa
if err != nil {
return fmt.Errorf("new stream: %w", err)
}
defer stream.Close()
w, _ := protobuf.NewWriterAndReader(stream)
var peersRequest pb.Peers
for _, p := range peers {
......@@ -111,14 +111,14 @@ func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swa
}
func (s *Service) peersHandler(_ context.Context, peer p2p.Peer, stream p2p.Stream) error {
defer stream.Close()
_, r := protobuf.NewWriterAndReader(stream)
var peersReq pb.Peers
if err := r.ReadMsgWithTimeout(messageTimeout, &peersReq); err != nil {
stream.Close()
return fmt.Errorf("read requestPeers message: %w", err)
}
stream.Close()
for _, newPeer := range peersReq.Peers {
addr, err := ma.NewMultiaddr(newPeer.Underlay)
if err != nil {
......
......@@ -12,6 +12,7 @@ import (
"net"
"net/http"
"path/filepath"
"sync"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
......@@ -123,19 +124,6 @@ func NewBee(o Options) (*Bee, error) {
}
b.p2pService = p2ps
// TODO: be more resilient on connection errors and connect in parallel
for _, a := range o.Bootnodes {
addr, err := ma.NewMultiaddr(a)
if err != nil {
return nil, fmt.Errorf("bootnode %s: %w", a, err)
}
overlay, err := p2ps.Connect(p2pCtx, addr)
if err != nil {
return nil, fmt.Errorf("connect to bootnode %s %s: %w", a, overlay, err)
}
}
// Construct protocols.
pingPong := pingpong.New(pingpong.Options{
Streamer: p2ps,
......@@ -239,6 +227,37 @@ func NewBee(o Options) (*Bee, error) {
b.debugAPIServer = debugAPIServer
}
// Connect bootnodes
var wg sync.WaitGroup
for _, a := range o.Bootnodes {
wg.Add(1)
go func(aa string) {
defer wg.Done()
addr, err := ma.NewMultiaddr(aa)
if err != nil {
logger.Debugf("multiaddress fail %s: %v", aa, err)
logger.Errorf("connect to bootnode %s", aa)
return
}
overlay, err := p2ps.Connect(p2pCtx, addr)
if err != nil {
logger.Debugf("connect fail %s: %v", aa, err)
logger.Errorf("connect to bootnode %s", aa)
return
}
addressbook.Put(overlay, addr)
if err := topologyDriver.AddPeer(p2pCtx, overlay); err != nil {
_ = p2ps.Disconnect(overlay)
logger.Debugf("topology add peer fail %s %s: %v", aa, overlay, err)
logger.Errorf("connect to bootnode %s", aa)
return
}
}(a)
}
wg.Wait()
return b, nil
}
......
......@@ -193,13 +193,21 @@ func New(ctx context.Context, o Options) (*Service, error) {
}
s.peers.add(stream.Conn(), i.Address)
s.addrssbook.Put(i.Address, stream.Conn().RemoteMultiaddr())
remoteMultiaddr, err := ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s", stream.Conn().RemoteMultiaddr().String(), peerID.Pretty()))
if err != nil {
s.logger.Debugf("multiaddr error: handle %s: %v", peerID, err)
s.logger.Errorf("unable to connect with peer %v", peerID)
_ = s.disconnect(peerID)
return
}
s.addrssbook.Put(i.Address, remoteMultiaddr)
if s.peerHandler != nil {
if err := s.peerHandler(ctx, i.Address); err != nil {
s.logger.Debugf("peerhandler error: %s: %v", peerID, err)
}
}
s.metrics.HandledStreamCount.Inc()
s.logger.Infof("peer %s connected", i.Address)
})
......@@ -209,7 +217,6 @@ func New(ctx context.Context, o Options) (*Service, error) {
})
h.Network().Notify(peerRegistry) // update peer registry on network events
return s, nil
}
......
......@@ -56,6 +56,8 @@ func (d *Driver) AddPeer(ctx context.Context, addr swarm.Address) error {
d.mtx.Unlock()
return nil
}
d.receivedPeers[addr.ByteString()] = struct{}{}
d.mtx.Unlock()
connectedPeers := d.p2pService.Peers()
......@@ -98,9 +100,6 @@ func (d *Driver) AddPeer(ctx context.Context, addr swarm.Address) error {
return err
}
d.mtx.Lock()
d.receivedPeers[addr.ByteString()] = struct{}{}
d.mtx.Unlock()
return nil
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment