Commit 3132c693 authored by Petar Radovic's avatar Petar Radovic Committed by GitHub

Hive - Topology driver (#26)

* hive

* full connectivity topology implementation
parent cefe893b
...@@ -22,12 +22,27 @@ Execute the command terminals to start `node 1`: ...@@ -22,12 +22,27 @@ Execute the command terminals to start `node 1`:
bee start --api-addr :8081 --p2p-addr :7071 --data-dir data1 bee start --api-addr :8081 --p2p-addr :7071 --data-dir data1
``` ```
### Bootnodes
Use one of the multiaddresses as bootnode for `node 2` in order to connect them: Use one of the multiaddresses as bootnode for `node 2` in order to connect them:
```sh ```sh
bee start --api-addr :8082 --p2p-addr :7072 --data-dir data2 --bootnode /ip4/127.0.0.1/tcp/30401/p2p/QmT4TNB4cKYanUjdYodw1Cns8cuVaRVo24hHNYcT7JjkTB bee start --api-addr :8082 --p2p-addr :7072 --data-dir data2 --bootnode /ip4/127.0.0.1/tcp/30401/p2p/QmT4TNB4cKYanUjdYodw1Cns8cuVaRVo24hHNYcT7JjkTB
``` ```
### Debug API
Start `node 2` with debugapi enabled:
```sh
bee start --api-addr :8082 --p2p-addr :7072 --debug-api-addr :6062 --enable-debug-api --data-dir dist/storage2
```
Use one of the multiaddresses of `node 1` in order to connect them:
```sh
curl -XPOST localhost:6062/connect/ip4/127.0.0.1/tcp/30401/p2p/QmT4TNB4cKYanUjdYodw1Cns8cuVaRVo24hHNYcT7JjkTB
```
### Ping-pong
Take the address of the connected peer to `node 1` from log line `peer "4932309428148935717" connected` and make an HTTP POST request to `localhost:{PORT1}/pingpong/{ADDRESS}` like: Take the address of the connected peer to `node 1` from log line `peer "4932309428148935717" connected` and make an HTTP POST request to `localhost:{PORT1}/pingpong/{ADDRESS}` like:
```sh ```sh
......
...@@ -98,10 +98,10 @@ func (c *command) initStartCmd() (err error) { ...@@ -98,10 +98,10 @@ func (c *command) initStartCmd() (err error) {
Addr: c.config.GetString(optionNameP2PAddr), Addr: c.config.GetString(optionNameP2PAddr),
DisableWS: c.config.GetBool(optionNameP2PDisableWS), DisableWS: c.config.GetBool(optionNameP2PDisableWS),
DisableQUIC: c.config.GetBool(optionNameP2PDisableQUIC), DisableQUIC: c.config.GetBool(optionNameP2PDisableQUIC),
Bootnodes: c.config.GetStringSlice(optionNameBootnodes),
NetworkID: c.config.GetInt32(optionNameNetworkID), NetworkID: c.config.GetInt32(optionNameNetworkID),
Logger: logger, Logger: logger,
}, },
Bootnodes: c.config.GetStringSlice(optionNameBootnodes),
Logger: logger, Logger: logger,
}) })
if err != nil { if err != nil {
......
...@@ -10,7 +10,7 @@ import ( ...@@ -10,7 +10,7 @@ import (
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
) )
type GetterPutter interface { type GetPutter interface {
Getter Getter
Putter Putter
} }
......
...@@ -23,7 +23,7 @@ type peerEntry struct { ...@@ -23,7 +23,7 @@ type peerEntry struct {
multiaddr ma.Multiaddr multiaddr ma.Multiaddr
} }
func New() addressbook.GetterPutter { func New() addressbook.GetPutter {
return &inmem{ return &inmem{
entries: make(map[string]peerEntry), entries: make(map[string]peerEntry),
} }
...@@ -40,7 +40,6 @@ func (i *inmem) Get(overlay swarm.Address) (addr ma.Multiaddr, exists bool) { ...@@ -40,7 +40,6 @@ func (i *inmem) Get(overlay swarm.Address) (addr ma.Multiaddr, exists bool) {
func (i *inmem) Put(overlay swarm.Address, addr ma.Multiaddr) (exists bool) { func (i *inmem) Put(overlay swarm.Address, addr ma.Multiaddr) (exists bool) {
i.mtx.Lock() i.mtx.Lock()
defer i.mtx.Unlock() defer i.mtx.Unlock()
_, e := i.entries[overlay.String()] _, e := i.entries[overlay.String()]
i.entries[overlay.String()] = peerEntry{overlay: overlay, multiaddr: addr} i.entries[overlay.String()] = peerEntry{overlay: overlay, multiaddr: addr}
return e return e
......
...@@ -7,8 +7,10 @@ package debugapi ...@@ -7,8 +7,10 @@ package debugapi
import ( import (
"net/http" "net/http"
"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/topology"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
...@@ -26,6 +28,8 @@ type server struct { ...@@ -26,6 +28,8 @@ type server struct {
type Options struct { type Options struct {
P2P p2p.Service P2P p2p.Service
Addressbook addressbook.GetPutter
TopologyDriver topology.PeerAdder
Logger logging.Logger Logger logging.Logger
} }
......
...@@ -11,9 +11,12 @@ import ( ...@@ -11,9 +11,12 @@ import (
"net/url" "net/url"
"testing" "testing"
"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/addressbook/inmem"
"github.com/ethersphere/bee/pkg/debugapi" "github.com/ethersphere/bee/pkg/debugapi"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/topology/mock"
"resenje.org/web" "resenje.org/web"
) )
...@@ -21,15 +24,27 @@ type testServerOptions struct { ...@@ -21,15 +24,27 @@ type testServerOptions struct {
P2P p2p.Service P2P p2p.Service
} }
func newTestServer(t *testing.T, o testServerOptions) (client *http.Client, cleanup func()) { type testServer struct {
Client *http.Client
Addressbook addressbook.GetPutter
TopologyDriver *mock.TopologyDriver
Cleanup func()
}
func newTestServer(t *testing.T, o testServerOptions) *testServer {
addressbook := inmem.New()
topologyDriver := mock.NewTopologyDriver()
s := debugapi.New(debugapi.Options{ s := debugapi.New(debugapi.Options{
P2P: o.P2P, P2P: o.P2P,
Logger: logging.New(ioutil.Discard, 0), Logger: logging.New(ioutil.Discard, 0),
Addressbook: addressbook,
TopologyDriver: topologyDriver,
}) })
ts := httptest.NewServer(s) ts := httptest.NewServer(s)
cleanup = ts.Close cleanup := ts.Close
client = &http.Client{ client := &http.Client{
Transport: web.RoundTripperFunc(func(r *http.Request) (*http.Response, error) { Transport: web.RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
u, err := url.Parse(ts.URL + r.URL.String()) u, err := url.Parse(ts.URL + r.URL.String())
if err != nil { if err != nil {
...@@ -39,5 +54,10 @@ func newTestServer(t *testing.T, o testServerOptions) (client *http.Client, clea ...@@ -39,5 +54,10 @@ func newTestServer(t *testing.T, o testServerOptions) (client *http.Client, clea
return ts.Client().Transport.RoundTrip(r) return ts.Client().Transport.RoundTrip(r)
}), }),
} }
return client, cleanup return &testServer{
Client: client,
Addressbook: addressbook,
TopologyDriver: topologyDriver,
Cleanup: cleanup,
}
} }
...@@ -35,6 +35,15 @@ func (s *server) peerConnectHandler(w http.ResponseWriter, r *http.Request) { ...@@ -35,6 +35,15 @@ func (s *server) peerConnectHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
s.Addressbook.Put(address, addr)
if err := s.TopologyDriver.AddPeer(r.Context(), address); err != nil {
_ = s.P2P.Disconnect(address)
s.Logger.Debugf("debug api: topologyDriver.AddPeer %s: %v", addr, err)
s.Logger.Errorf("unable to connect to peer %s", addr)
jsonhttp.InternalServerError(w, err)
return
}
jsonhttp.OK(w, peerConnectResponse{ jsonhttp.OK(w, peerConnectResponse{
Address: address.String(), Address: address.String(),
}) })
......
...@@ -25,7 +25,7 @@ func TestConnect(t *testing.T) { ...@@ -25,7 +25,7 @@ func TestConnect(t *testing.T) {
overlay := swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c") overlay := swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c")
testErr := errors.New("test error") testErr := errors.New("test error")
client, cleanup := newTestServer(t, testServerOptions{ testServer := newTestServer(t, testServerOptions{
P2P: mock.New(mock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (swarm.Address, error) { P2P: mock.New(mock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (swarm.Address, error) {
if addr.String() == errorUnderlay { if addr.String() == errorUnderlay {
return swarm.Address{}, testErr return swarm.Address{}, testErr
...@@ -33,27 +33,63 @@ func TestConnect(t *testing.T) { ...@@ -33,27 +33,63 @@ func TestConnect(t *testing.T) {
return overlay, nil return overlay, nil
})), })),
}) })
defer cleanup() defer testServer.Cleanup()
t.Run("ok", func(t *testing.T) { t.Run("ok", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, client, http.MethodPost, "/connect"+underlay, nil, http.StatusOK, debugapi.PeerConnectResponse{ jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodPost, "/connect"+underlay, nil, http.StatusOK, debugapi.PeerConnectResponse{
Address: overlay.String(), Address: overlay.String(),
}) })
multia, exists := testServer.Addressbook.Get(overlay)
if exists != true && underlay != multia.String() {
t.Fatalf("found wrong underlay. expected: %s, found: %s", underlay, multia.String())
}
}) })
t.Run("error", func(t *testing.T) { t.Run("error", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, client, http.MethodPost, "/connect"+errorUnderlay, nil, http.StatusInternalServerError, jsonhttp.StatusResponse{ jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodPost, "/connect"+errorUnderlay, nil, http.StatusInternalServerError, jsonhttp.StatusResponse{
Code: http.StatusInternalServerError, Code: http.StatusInternalServerError,
Message: testErr.Error(), Message: testErr.Error(),
}) })
}) })
t.Run("get method not allowed", func(t *testing.T) { t.Run("get method not allowed", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, client, http.MethodGet, "/connect"+underlay, nil, http.StatusMethodNotAllowed, jsonhttp.StatusResponse{ jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodGet, "/connect"+underlay, nil, http.StatusMethodNotAllowed, jsonhttp.StatusResponse{
Code: http.StatusMethodNotAllowed, Code: http.StatusMethodNotAllowed,
Message: http.StatusText(http.StatusMethodNotAllowed), Message: http.StatusText(http.StatusMethodNotAllowed),
}) })
}) })
t.Run("error - add peer", func(t *testing.T) {
disconnectCalled := false
testServer := newTestServer(t, testServerOptions{
P2P: mock.New(mock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (swarm.Address, error) {
if addr.String() == errorUnderlay {
return swarm.Address{}, testErr
}
return overlay, nil
}), mock.WithDisconnectFunc(func(addr swarm.Address) error {
disconnectCalled = true
return nil
})),
})
defer testServer.Cleanup()
testServer.TopologyDriver.SetAddPeerErr(testErr)
jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodPost, "/connect"+underlay, nil, http.StatusInternalServerError, jsonhttp.StatusResponse{
Code: http.StatusInternalServerError,
Message: testErr.Error(),
})
multia, exists := testServer.Addressbook.Get(overlay)
if exists != true && underlay != multia.String() {
t.Fatalf("found wrong underlay. expected: %s, found: %s", underlay, multia.String())
}
if !disconnectCalled {
t.Fatalf("disconnect not called.")
}
})
} }
func TestDisconnect(t *testing.T) { func TestDisconnect(t *testing.T) {
...@@ -62,7 +98,7 @@ func TestDisconnect(t *testing.T) { ...@@ -62,7 +98,7 @@ func TestDisconnect(t *testing.T) {
errorAddress := swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59a") errorAddress := swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59a")
testErr := errors.New("test error") testErr := errors.New("test error")
client, cleanup := newTestServer(t, testServerOptions{ testServer := newTestServer(t, testServerOptions{
P2P: mock.New(mock.WithDisconnectFunc(func(addr swarm.Address) error { P2P: mock.New(mock.WithDisconnectFunc(func(addr swarm.Address) error {
if addr.Equal(address) { if addr.Equal(address) {
return nil return nil
...@@ -75,31 +111,31 @@ func TestDisconnect(t *testing.T) { ...@@ -75,31 +111,31 @@ func TestDisconnect(t *testing.T) {
return p2p.ErrPeerNotFound return p2p.ErrPeerNotFound
})), })),
}) })
defer cleanup() defer testServer.Cleanup()
t.Run("ok", func(t *testing.T) { t.Run("ok", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, client, http.MethodDelete, "/peers/"+address.String(), nil, http.StatusOK, jsonhttp.StatusResponse{ jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodDelete, "/peers/"+address.String(), nil, http.StatusOK, jsonhttp.StatusResponse{
Code: http.StatusOK, Code: http.StatusOK,
Message: http.StatusText(http.StatusOK), Message: http.StatusText(http.StatusOK),
}) })
}) })
t.Run("unknown", func(t *testing.T) { t.Run("unknown", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, client, http.MethodDelete, "/peers/"+unknownAdddress.String(), nil, http.StatusBadRequest, jsonhttp.StatusResponse{ jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodDelete, "/peers/"+unknownAdddress.String(), nil, http.StatusBadRequest, jsonhttp.StatusResponse{
Code: http.StatusBadRequest, Code: http.StatusBadRequest,
Message: "peer not found", Message: "peer not found",
}) })
}) })
t.Run("invalid peer address", func(t *testing.T) { t.Run("invalid peer address", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, client, http.MethodDelete, "/peers/invalid-address", nil, http.StatusBadRequest, jsonhttp.StatusResponse{ jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodDelete, "/peers/invalid-address", nil, http.StatusBadRequest, jsonhttp.StatusResponse{
Code: http.StatusBadRequest, Code: http.StatusBadRequest,
Message: "invalid peer address", Message: "invalid peer address",
}) })
}) })
t.Run("error", func(t *testing.T) { t.Run("error", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, client, http.MethodDelete, "/peers/"+errorAddress.String(), nil, http.StatusInternalServerError, jsonhttp.StatusResponse{ jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodDelete, "/peers/"+errorAddress.String(), nil, http.StatusInternalServerError, jsonhttp.StatusResponse{
Code: http.StatusInternalServerError, Code: http.StatusInternalServerError,
Message: testErr.Error(), Message: testErr.Error(),
}) })
...@@ -109,21 +145,21 @@ func TestDisconnect(t *testing.T) { ...@@ -109,21 +145,21 @@ func TestDisconnect(t *testing.T) {
func TestPeer(t *testing.T) { func TestPeer(t *testing.T) {
overlay := swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c") overlay := swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c")
client, cleanup := newTestServer(t, testServerOptions{ testServer := newTestServer(t, testServerOptions{
P2P: mock.New(mock.WithPeersFunc(func() []p2p.Peer { P2P: mock.New(mock.WithPeersFunc(func() []p2p.Peer {
return []p2p.Peer{{Address: overlay}} return []p2p.Peer{{Address: overlay}}
})), })),
}) })
defer cleanup() defer testServer.Cleanup()
t.Run("ok", func(t *testing.T) { t.Run("ok", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, client, http.MethodGet, "/peers", nil, http.StatusOK, debugapi.PeersResponse{ jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodGet, "/peers", nil, http.StatusOK, debugapi.PeersResponse{
Peers: []p2p.Peer{{Address: overlay}}, Peers: []p2p.Peer{{Address: overlay}},
}) })
}) })
t.Run("get method not allowed", func(t *testing.T) { t.Run("get method not allowed", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, client, http.MethodPost, "/peers", nil, http.StatusMethodNotAllowed, jsonhttp.StatusResponse{ jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodPost, "/peers", nil, http.StatusMethodNotAllowed, jsonhttp.StatusResponse{
Code: http.StatusMethodNotAllowed, Code: http.StatusMethodNotAllowed,
Message: http.StatusText(http.StatusMethodNotAllowed), Message: http.StatusText(http.StatusMethodNotAllowed),
}) })
......
...@@ -13,19 +13,19 @@ import ( ...@@ -13,19 +13,19 @@ import (
) )
func TestHealth(t *testing.T) { func TestHealth(t *testing.T) {
client, cleanup := newTestServer(t, testServerOptions{}) testServer := newTestServer(t, testServerOptions{})
defer cleanup() defer testServer.Cleanup()
jsonhttptest.ResponseDirect(t, client, http.MethodGet, "/health", nil, http.StatusOK, debugapi.StatusResponse{ jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodGet, "/health", nil, http.StatusOK, debugapi.StatusResponse{
Status: "ok", Status: "ok",
}) })
} }
func TestReadiness(t *testing.T) { func TestReadiness(t *testing.T) {
client, cleanup := newTestServer(t, testServerOptions{}) testServer := newTestServer(t, testServerOptions{})
defer cleanup() defer testServer.Cleanup()
jsonhttptest.ResponseDirect(t, client, http.MethodGet, "/readiness", nil, http.StatusOK, debugapi.StatusResponse{ jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodGet, "/readiness", nil, http.StatusOK, debugapi.StatusResponse{
Status: "ok", Status: "ok",
}) })
} }
...@@ -8,14 +8,8 @@ import ( ...@@ -8,14 +8,8 @@ import (
"context" "context"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
ma "github.com/multiformats/go-multiaddr"
) )
type BroadcastRecord struct {
Overlay swarm.Address
Addr ma.Multiaddr
}
type Driver interface { type Driver interface {
BroadcastPeers(ctx context.Context, addressee swarm.Address, peers ...BroadcastRecord) error BroadcastPeers(ctx context.Context, addressee swarm.Address, peers ...swarm.Address) error
} }
...@@ -8,32 +8,31 @@ import ( ...@@ -8,32 +8,31 @@ import (
"context" "context"
"sync" "sync"
"github.com/ethersphere/bee/pkg/discovery"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
ma "github.com/multiformats/go-multiaddr"
) )
type Discovery struct { type Discovery struct {
mtx sync.Mutex mtx sync.Mutex
ctr int //how many ops ctr int //how many ops
records map[string]discovery.BroadcastRecord records map[string][]swarm.Address
} }
func NewDiscovery() *Discovery { func NewDiscovery() *Discovery {
return &Discovery{ return &Discovery{
records: make(map[string]discovery.BroadcastRecord), records: make(map[string][]swarm.Address),
} }
} }
func (d *Discovery) BroadcastPeers(ctx context.Context, addressee swarm.Address, peers ...discovery.BroadcastRecord) error { func (d *Discovery) BroadcastPeers(ctx context.Context, addressee swarm.Address, peers ...swarm.Address) error {
for _, peer := range peers { for _, peer := range peers {
d.mtx.Lock() d.mtx.Lock()
d.ctr++ d.records[addressee.String()] = append(d.records[addressee.String()], peer)
d.records[addressee.String()] = discovery.BroadcastRecord{Overlay: peer.Overlay, Addr: peer.Addr}
d.mtx.Unlock() d.mtx.Unlock()
} }
d.mtx.Lock()
d.ctr++
d.mtx.Unlock()
return nil return nil
} }
...@@ -43,12 +42,9 @@ func (d *Discovery) Broadcasts() int { ...@@ -43,12 +42,9 @@ func (d *Discovery) Broadcasts() int {
return d.ctr return d.ctr
} }
func (d *Discovery) AddresseeRecord(addressee swarm.Address) (overlay swarm.Address, addr ma.Multiaddr) { func (d *Discovery) AddresseeRecords(addressee swarm.Address) (peers []swarm.Address, exists bool) {
d.mtx.Lock() d.mtx.Lock()
defer d.mtx.Unlock() defer d.mtx.Unlock()
rec, exists := d.records[addressee.String()] peers, exists = d.records[addressee.String()]
if !exists { return
return swarm.Address{}, nil
}
return rec.Overlay, rec.Addr
} }
...@@ -10,7 +10,6 @@ import ( ...@@ -10,7 +10,6 @@ import (
"time" "time"
"github.com/ethersphere/bee/pkg/addressbook" "github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/discovery"
"github.com/ethersphere/bee/pkg/hive/pb" "github.com/ethersphere/bee/pkg/hive/pb"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
...@@ -30,13 +29,14 @@ const ( ...@@ -30,13 +29,14 @@ const (
type Service struct { type Service struct {
streamer p2p.Streamer streamer p2p.Streamer
addressBook addressbook.GetterPutter addressBook addressbook.GetPutter
peerHandler func(context.Context, swarm.Address) error
logger logging.Logger logger logging.Logger
} }
type Options struct { type Options struct {
Streamer p2p.Streamer Streamer p2p.Streamer
AddressBook addressbook.GetterPutter AddressBook addressbook.GetPutter
Logger logging.Logger Logger logging.Logger
} }
...@@ -61,7 +61,7 @@ func (s *Service) Protocol() p2p.ProtocolSpec { ...@@ -61,7 +61,7 @@ func (s *Service) Protocol() p2p.ProtocolSpec {
} }
} }
func (s *Service) BroadcastPeers(ctx context.Context, addressee swarm.Address, peers ...discovery.BroadcastRecord) error { func (s *Service) BroadcastPeers(ctx context.Context, addressee swarm.Address, peers ...swarm.Address) error {
max := maxBatchSize max := maxBatchSize
for len(peers) > 0 { for len(peers) > 0 {
if max > len(peers) { if max > len(peers) {
...@@ -77,20 +77,29 @@ func (s *Service) BroadcastPeers(ctx context.Context, addressee swarm.Address, p ...@@ -77,20 +77,29 @@ func (s *Service) BroadcastPeers(ctx context.Context, addressee swarm.Address, p
return nil return nil
} }
func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []discovery.BroadcastRecord) error { func (s *Service) SetPeerAddedHandler(h func(ctx context.Context, addr swarm.Address) error) {
s.peerHandler = h
}
func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swarm.Address) error {
stream, err := s.streamer.NewStream(ctx, peer, protocolName, protocolVersion, peersStreamName) stream, err := s.streamer.NewStream(ctx, peer, protocolName, protocolVersion, peersStreamName)
if err != nil { if err != nil {
return fmt.Errorf("new stream: %w", err) return fmt.Errorf("new stream: %w", err)
} }
defer stream.Close() defer stream.Close()
w, _ := protobuf.NewWriterAndReader(stream) w, _ := protobuf.NewWriterAndReader(stream)
var peersRequest pb.Peers var peersRequest pb.Peers
for _, p := range peers { for _, p := range peers {
addr, found := s.addressBook.Get(p)
if !found {
s.logger.Debugf("Peer not found %s", peer, err)
continue
}
peersRequest.Peers = append(peersRequest.Peers, &pb.BzzAddress{ peersRequest.Peers = append(peersRequest.Peers, &pb.BzzAddress{
Overlay: p.Overlay.Bytes(), Overlay: p.Bytes(),
Underlay: p.Addr.String(), Underlay: addr.String(),
}) })
} }
...@@ -103,6 +112,7 @@ func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []dis ...@@ -103,6 +112,7 @@ func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []dis
func (s *Service) peersHandler(peer p2p.Peer, stream p2p.Stream) error { func (s *Service) peersHandler(peer p2p.Peer, stream p2p.Stream) error {
defer stream.Close() defer stream.Close()
_, r := protobuf.NewWriterAndReader(stream) _, r := protobuf.NewWriterAndReader(stream)
var peersReq pb.Peers var peersReq pb.Peers
if err := r.ReadMsgWithTimeout(messageTimeout, &peersReq); err != nil { if err := r.ReadMsgWithTimeout(messageTimeout, &peersReq); err != nil {
...@@ -116,8 +126,12 @@ func (s *Service) peersHandler(peer p2p.Peer, stream p2p.Stream) error { ...@@ -116,8 +126,12 @@ func (s *Service) peersHandler(peer p2p.Peer, stream p2p.Stream) error {
continue continue
} }
// todo: this might be changed depending on where do we decide to connect peers
s.addressBook.Put(swarm.NewAddress(newPeer.Overlay), addr) s.addressBook.Put(swarm.NewAddress(newPeer.Overlay), addr)
if s.peerHandler != nil {
if err := s.peerHandler(context.Background(), swarm.NewAddress(newPeer.Overlay)); err != nil {
return err
}
}
} }
return nil return nil
......
...@@ -19,7 +19,6 @@ import ( ...@@ -19,7 +19,6 @@ import (
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
"github.com/ethersphere/bee/pkg/addressbook/inmem" "github.com/ethersphere/bee/pkg/addressbook/inmem"
"github.com/ethersphere/bee/pkg/discovery"
"github.com/ethersphere/bee/pkg/hive" "github.com/ethersphere/bee/pkg/hive"
"github.com/ethersphere/bee/pkg/hive/pb" "github.com/ethersphere/bee/pkg/hive/pb"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
...@@ -36,12 +35,12 @@ type AddressExporter interface { ...@@ -36,12 +35,12 @@ type AddressExporter interface {
func TestBroadcastPeers(t *testing.T) { func TestBroadcastPeers(t *testing.T) {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
logger := logging.New(ioutil.Discard, 0) logger := logging.New(ioutil.Discard, 0)
addressbook := inmem.New()
// populate all expected and needed random resources for 2 full batches // populate all expected and needed random resources for 2 full batches
// tests cases that uses fewer resources can use sub-slices of this data // tests cases that uses fewer resources can use sub-slices of this data
var multiaddrs []ma.Multiaddr var multiaddrs []ma.Multiaddr
var addrs []swarm.Address var addrs []swarm.Address
var records []discovery.BroadcastRecord
var wantMsgs []pb.Peers var wantMsgs []pb.Peers
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
...@@ -56,57 +55,57 @@ func TestBroadcastPeers(t *testing.T) { ...@@ -56,57 +55,57 @@ func TestBroadcastPeers(t *testing.T) {
multiaddrs = append(multiaddrs, ma) multiaddrs = append(multiaddrs, ma)
addrs = append(addrs, swarm.NewAddress(createRandomBytes())) addrs = append(addrs, swarm.NewAddress(createRandomBytes()))
addressbook.Put(addrs[i], multiaddrs[i])
wantMsgs[i/hive.MaxBatchSize].Peers = append(wantMsgs[i/hive.MaxBatchSize].Peers, &pb.BzzAddress{Overlay: addrs[i].Bytes(), Underlay: multiaddrs[i].String()}) wantMsgs[i/hive.MaxBatchSize].Peers = append(wantMsgs[i/hive.MaxBatchSize].Peers, &pb.BzzAddress{Overlay: addrs[i].Bytes(), Underlay: multiaddrs[i].String()})
records = append(records, discovery.BroadcastRecord{Overlay: addrs[i], Addr: multiaddrs[i]})
} }
testCases := map[string]struct { testCases := map[string]struct {
addresee swarm.Address addresee swarm.Address
peers []discovery.BroadcastRecord peers []swarm.Address
wantMsgs []pb.Peers wantMsgs []pb.Peers
wantKeys []swarm.Address wantOverlays []swarm.Address
wantValues []ma.Multiaddr wantMultiAddresses []ma.Multiaddr
}{ }{
"OK - single record": { "OK - single record": {
addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"), addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"),
peers: []discovery.BroadcastRecord{{Overlay: addrs[0], Addr: multiaddrs[0]}}, peers: []swarm.Address{addrs[0]},
wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers[:1]}}, wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers[:1]}},
wantKeys: []swarm.Address{addrs[0]}, wantOverlays: []swarm.Address{addrs[0]},
wantValues: []ma.Multiaddr{multiaddrs[0]}, wantMultiAddresses: []ma.Multiaddr{multiaddrs[0]},
}, },
"OK - single batch - multiple records": { "OK - single batch - multiple records": {
addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"), addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"),
peers: records[:15], peers: addrs[:15],
wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers[:15]}}, wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers[:15]}},
wantKeys: addrs[:15], wantOverlays: addrs[:15],
wantValues: multiaddrs[:15], wantMultiAddresses: multiaddrs[:15],
}, },
"OK - single batch - max number of records": { "OK - single batch - max number of records": {
addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"), addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"),
peers: records[:hive.MaxBatchSize], peers: addrs[:hive.MaxBatchSize],
wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers[:hive.MaxBatchSize]}}, wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers[:hive.MaxBatchSize]}},
wantKeys: addrs[:hive.MaxBatchSize], wantOverlays: addrs[:hive.MaxBatchSize],
wantValues: multiaddrs[:hive.MaxBatchSize], wantMultiAddresses: multiaddrs[:hive.MaxBatchSize],
}, },
"OK - multiple batches": { "OK - multiple batches": {
addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"), addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"),
peers: records[:hive.MaxBatchSize+10], peers: addrs[:hive.MaxBatchSize+10],
wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers}, {Peers: wantMsgs[1].Peers[:10]}}, wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers}, {Peers: wantMsgs[1].Peers[:10]}},
wantKeys: addrs[:hive.MaxBatchSize+10], wantOverlays: addrs[:hive.MaxBatchSize+10],
wantValues: multiaddrs[:hive.MaxBatchSize+10], wantMultiAddresses: multiaddrs[:hive.MaxBatchSize+10],
}, },
"OK - multiple batches - max number of records": { "OK - multiple batches - max number of records": {
addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"), addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"),
peers: records[:2*hive.MaxBatchSize], peers: addrs[:2*hive.MaxBatchSize],
wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers}, {Peers: wantMsgs[1].Peers}}, wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers}, {Peers: wantMsgs[1].Peers}},
wantKeys: addrs[:2*hive.MaxBatchSize], wantOverlays: addrs[:2*hive.MaxBatchSize],
wantValues: multiaddrs[:2*hive.MaxBatchSize], wantMultiAddresses: multiaddrs[:2*hive.MaxBatchSize],
}, },
} }
for _, tc := range testCases { for _, tc := range testCases {
addressbook := inmem.New() addressbookclean := inmem.New()
exporter, ok := addressbook.(AddressExporter) exporter, ok := addressbookclean.(AddressExporter)
if !ok { if !ok {
t.Fatal("could not type assert AddressExporter") t.Fatal("could not type assert AddressExporter")
} }
...@@ -114,7 +113,7 @@ func TestBroadcastPeers(t *testing.T) { ...@@ -114,7 +113,7 @@ func TestBroadcastPeers(t *testing.T) {
// create a hive server that handles the incoming stream // create a hive server that handles the incoming stream
server := hive.New(hive.Options{ server := hive.New(hive.Options{
Logger: logger, Logger: logger,
AddressBook: addressbook, AddressBook: addressbookclean,
}) })
// setup the stream recorder to record stream data // setup the stream recorder to record stream data
...@@ -126,6 +125,7 @@ func TestBroadcastPeers(t *testing.T) { ...@@ -126,6 +125,7 @@ func TestBroadcastPeers(t *testing.T) {
client := hive.New(hive.Options{ client := hive.New(hive.Options{
Streamer: recorder, Streamer: recorder,
Logger: logger, Logger: logger,
AddressBook: addressbook,
}) })
if err := client.BroadcastPeers(context.Background(), tc.addresee, tc.peers...); err != nil { if err := client.BroadcastPeers(context.Background(), tc.addresee, tc.peers...); err != nil {
...@@ -153,12 +153,12 @@ func TestBroadcastPeers(t *testing.T) { ...@@ -153,12 +153,12 @@ func TestBroadcastPeers(t *testing.T) {
} }
} }
if !compareOverlays(exporter.Overlays(), tc.wantKeys) { if !compareOverlays(exporter.Overlays(), tc.wantOverlays) {
t.Errorf("Overlays got %v, want %v", exporter.Overlays(), tc.wantKeys) t.Errorf("Overlays got %v, want %v", exporter.Overlays(), tc.wantOverlays)
} }
if !compareMultiaddrses(exporter.Multiaddresses(), tc.wantValues) { if !compareMultiaddrses(exporter.Multiaddresses(), tc.wantMultiAddresses) {
t.Errorf("Multiaddresses got %v, want %v", exporter.Multiaddresses(), tc.wantValues) t.Errorf("Multiaddresses got %v, want %v", exporter.Multiaddresses(), tc.wantMultiAddresses)
} }
} }
......
...@@ -16,9 +16,11 @@ import ( ...@@ -16,9 +16,11 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"github.com/ethersphere/bee/pkg/addressbook/inmem"
"github.com/ethersphere/bee/pkg/api" "github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/crypto" "github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/debugapi" "github.com/ethersphere/bee/pkg/debugapi"
"github.com/ethersphere/bee/pkg/hive"
"github.com/ethersphere/bee/pkg/keystore" "github.com/ethersphere/bee/pkg/keystore"
filekeystore "github.com/ethersphere/bee/pkg/keystore/file" filekeystore "github.com/ethersphere/bee/pkg/keystore/file"
memkeystore "github.com/ethersphere/bee/pkg/keystore/mem" memkeystore "github.com/ethersphere/bee/pkg/keystore/mem"
...@@ -26,6 +28,8 @@ import ( ...@@ -26,6 +28,8 @@ import (
"github.com/ethersphere/bee/pkg/metrics" "github.com/ethersphere/bee/pkg/metrics"
"github.com/ethersphere/bee/pkg/p2p/libp2p" "github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/ethersphere/bee/pkg/pingpong" "github.com/ethersphere/bee/pkg/pingpong"
"github.com/ethersphere/bee/pkg/topology/full"
ma "github.com/multiformats/go-multiaddr"
) )
type Bee struct { type Bee struct {
...@@ -42,11 +46,13 @@ type Options struct { ...@@ -42,11 +46,13 @@ type Options struct {
APIAddr string APIAddr string
DebugAPIAddr string DebugAPIAddr string
LibP2POptions libp2p.Options LibP2POptions libp2p.Options
Bootnodes []string
Logger logging.Logger Logger logging.Logger
} }
func NewBee(o Options) (*Bee, error) { func NewBee(o Options) (*Bee, error) {
logger := o.Logger logger := o.Logger
addressbook := inmem.New()
p2pCtx, p2pCancel := context.WithCancel(context.Background()) p2pCtx, p2pCancel := context.WithCancel(context.Background())
...@@ -86,23 +92,49 @@ func NewBee(o Options) (*Bee, error) { ...@@ -86,23 +92,49 @@ func NewBee(o Options) (*Bee, error) {
libP2POptions := o.LibP2POptions libP2POptions := o.LibP2POptions
libP2POptions.Overlay = address libP2POptions.Overlay = address
libP2POptions.PrivateKey = libp2pPrivateKey libP2POptions.PrivateKey = libp2pPrivateKey
libP2POptions.Addressbook = addressbook
p2ps, err := libp2p.New(p2pCtx, libP2POptions) p2ps, err := libp2p.New(p2pCtx, libP2POptions)
if err != nil { if err != nil {
return nil, fmt.Errorf("p2p service: %w", err) return nil, fmt.Errorf("p2p service: %w", err)
} }
b.p2pService = p2ps b.p2pService = p2ps
// TODO: be more resilient on connection errors and connect in parallel
for _, a := range o.Bootnodes {
addr, err := ma.NewMultiaddr(a)
if err != nil {
return nil, fmt.Errorf("bootnode %s: %w", a, err)
}
overlay, err := p2ps.Connect(p2pCtx, addr)
if err != nil {
return nil, fmt.Errorf("connect to bootnode %s %s: %w", a, overlay, err)
}
}
// Construct protocols. // Construct protocols.
pingPong := pingpong.New(pingpong.Options{ pingPong := pingpong.New(pingpong.Options{
Streamer: p2ps, Streamer: p2ps,
Logger: logger, Logger: logger,
}) })
// Add protocols to the P2P service.
if err = p2ps.AddProtocol(pingPong.Protocol()); err != nil { if err = p2ps.AddProtocol(pingPong.Protocol()); err != nil {
return nil, fmt.Errorf("pingpong service: %w", err) return nil, fmt.Errorf("pingpong service: %w", err)
} }
hive := hive.New(hive.Options{
Streamer: p2ps,
AddressBook: addressbook,
Logger: logger,
})
if err = p2ps.AddProtocol(hive.Protocol()); err != nil {
return nil, fmt.Errorf("hive service: %w", err)
}
topologyDriver := full.New(hive, addressbook, p2ps, logger)
hive.SetPeerAddedHandler(topologyDriver.AddPeer)
p2ps.SetPeerAddedHandler(topologyDriver.AddPeer)
addrs, err := p2ps.Addresses() addrs, err := p2ps.Addresses()
if err != nil { if err != nil {
return nil, fmt.Errorf("get server addresses: %w", err) return nil, fmt.Errorf("get server addresses: %w", err)
...@@ -146,6 +178,8 @@ func NewBee(o Options) (*Bee, error) { ...@@ -146,6 +178,8 @@ func NewBee(o Options) (*Bee, error) {
debugAPIService := debugapi.New(debugapi.Options{ debugAPIService := debugapi.New(debugapi.Options{
P2P: p2ps, P2P: p2ps,
Logger: logger, Logger: logger,
Addressbook: addressbook,
TopologyDriver: topologyDriver,
}) })
// register metrics from components // register metrics from components
debugAPIService.MustRegisterMetrics(p2ps.Metrics()...) debugAPIService.MustRegisterMetrics(p2ps.Metrics()...)
......
...@@ -9,11 +9,8 @@ import ( ...@@ -9,11 +9,8 @@ import (
"errors" "errors"
"testing" "testing"
"github.com/ethersphere/bee/pkg/addressbook/inmem"
"github.com/ethersphere/bee/pkg/discovery/mock"
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/libp2p" "github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/ethersphere/bee/pkg/topology/full"
) )
func TestAddresses(t *testing.T) { func TestAddresses(t *testing.T) {
...@@ -284,36 +281,6 @@ func TestDifferentNetworkIDs(t *testing.T) { ...@@ -284,36 +281,6 @@ func TestDifferentNetworkIDs(t *testing.T) {
expectPeers(t, s2) expectPeers(t, s2)
} }
func TestBootnodes(t *testing.T) {
s1, overlay1, cleanup1 := newService(t, libp2p.Options{NetworkID: 1})
defer cleanup1()
s2, overlay2, cleanup2 := newService(t, libp2p.Options{NetworkID: 1})
defer cleanup2()
addrs1, err := s1.Addresses()
if err != nil {
t.Fatal(err)
}
addrs2, err := s2.Addresses()
if err != nil {
t.Fatal(err)
}
s3, overlay3, cleanup3 := newService(t, libp2p.Options{
NetworkID: 1,
Bootnodes: []string{
addrs1[0].String(),
addrs2[0].String(),
},
})
defer cleanup3()
expectPeers(t, s3, overlay1, overlay2)
expectPeers(t, s1, overlay3)
expectPeers(t, s2, overlay3)
}
func TestConnectWithDisabledQUICAndWSTransports(t *testing.T) { func TestConnectWithDisabledQUICAndWSTransports(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
...@@ -341,54 +308,3 @@ func TestConnectWithDisabledQUICAndWSTransports(t *testing.T) { ...@@ -341,54 +308,3 @@ func TestConnectWithDisabledQUICAndWSTransports(t *testing.T) {
expectPeers(t, s2, overlay1) expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2) expectPeersEventually(t, s1, overlay2)
} }
func TestConnectWithMockDiscovery(t *testing.T) {
o1 := libp2p.Options{
NetworkID: 1,
}
s1, _, cleanup1 := newService(t, o1)
defer cleanup1()
disc2 := mock.NewDiscovery()
ab2 := inmem.New()
o2 := libp2p.Options{
NetworkID: 1,
TopologyDriver: full.New(disc2, ab2),
AddressBook: ab2,
}
s2, _, cleanup2 := newService(t, o2)
defer cleanup2()
s3, _, cleanup3 := newService(t, o2)
defer cleanup3()
addrs, err := s1.Addresses()
if err != nil {
t.Fatal(err)
}
addr := addrs[0]
_, err = s2.Connect(context.Background(), addr)
if err != nil {
t.Fatal(err)
}
if v := disc2.Broadcasts(); v != 0 {
t.Fatalf("expected 0 peer broadcasts, got %d", v)
}
addrs, err = s3.Addresses()
if err != nil {
t.Fatal(err)
}
addr = addrs[0]
_, err = s2.Connect(context.Background(), addr)
if err != nil {
t.Fatal(err)
}
if v := disc2.Broadcasts(); v != 2 {
t.Fatalf("expected 2 peer broadcasts, got %d", v)
}
}
...@@ -16,7 +16,6 @@ import ( ...@@ -16,7 +16,6 @@ import (
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
handshake "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake" handshake "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
"github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p"
autonat "github.com/libp2p/go-libp2p-autonat-svc" autonat "github.com/libp2p/go-libp2p-autonat-svc"
crypto "github.com/libp2p/go-libp2p-core/crypto" crypto "github.com/libp2p/go-libp2p-core/crypto"
...@@ -42,9 +41,9 @@ type Service struct { ...@@ -42,9 +41,9 @@ type Service struct {
metrics metrics metrics metrics
networkID int32 networkID int32
handshakeService *handshake.Service handshakeService *handshake.Service
addrssbook addressbook.Putter
peers *peerRegistry peers *peerRegistry
topologyDriver topology.Driver peerHandler func(context.Context, swarm.Address) error
addressBook addressbook.Putter
logger logging.Logger logger logging.Logger
} }
...@@ -54,10 +53,8 @@ type Options struct { ...@@ -54,10 +53,8 @@ type Options struct {
Addr string Addr string
DisableWS bool DisableWS bool
DisableQUIC bool DisableQUIC bool
Bootnodes []string
NetworkID int32 NetworkID int32
AddressBook addressbook.GetterPutter Addressbook addressbook.Putter
TopologyDriver topology.Driver
Logger logging.Logger Logger logging.Logger
} }
...@@ -158,8 +155,7 @@ func New(ctx context.Context, o Options) (*Service, error) { ...@@ -158,8 +155,7 @@ func New(ctx context.Context, o Options) (*Service, error) {
networkID: o.NetworkID, networkID: o.NetworkID,
handshakeService: handshake.New(peerRegistry, o.Overlay, o.NetworkID, o.Logger), handshakeService: handshake.New(peerRegistry, o.Overlay, o.NetworkID, o.Logger),
peers: peerRegistry, peers: peerRegistry,
addressBook: o.AddressBook, addrssbook: o.Addressbook,
topologyDriver: o.TopologyDriver,
logger: o.Logger, logger: o.Logger,
} }
...@@ -191,22 +187,16 @@ func New(ctx context.Context, o Options) (*Service, error) { ...@@ -191,22 +187,16 @@ func New(ctx context.Context, o Options) (*Service, error) {
} }
s.peers.add(stream.Conn(), i.Address) s.peers.add(stream.Conn(), i.Address)
s.metrics.HandledStreamCount.Inc() s.addrssbook.Put(i.Address, stream.Conn().RemoteMultiaddr())
s.logger.Infof("peer %s connected", i.Address) if s.peerHandler != nil {
}) if err := s.peerHandler(ctx, i.Address); err != nil {
s.logger.Debugf("peerhandler error: %s: %v", peerID, err)
// TODO: be more resilient on connection errors and connect in parallel
for _, a := range o.Bootnodes {
addr, err := ma.NewMultiaddr(a)
if err != nil {
return nil, fmt.Errorf("bootnode %s: %w", a, err)
} }
overlay, err := s.Connect(ctx, addr)
if err != nil {
return nil, fmt.Errorf("connect to bootnode %s %s: %w", a, overlay, err)
}
} }
s.metrics.HandledStreamCount.Inc()
s.logger.Infof("peer %s connected", i.Address)
})
h.Network().SetConnHandler(func(_ network.Conn) { h.Network().SetConnHandler(func(_ network.Conn) {
s.metrics.HandledConnectionCount.Inc() s.metrics.HandledConnectionCount.Inc()
...@@ -294,13 +284,6 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm ...@@ -294,13 +284,6 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm
} }
s.peers.add(stream.Conn(), i.Address) s.peers.add(stream.Conn(), i.Address)
s.addressBook.Put(i.Address, addr)
err = s.topologyDriver.AddPeer(i.Address)
if err != nil {
return swarm.Address{}, fmt.Errorf("topology addpeer: %w", err)
}
s.metrics.CreatedConnectionCount.Inc() s.metrics.CreatedConnectionCount.Inc()
s.logger.Infof("peer %s connected", i.Address) s.logger.Infof("peer %s connected", i.Address)
return i.Address, nil return i.Address, nil
...@@ -326,6 +309,10 @@ func (s *Service) Peers() []p2p.Peer { ...@@ -326,6 +309,10 @@ func (s *Service) Peers() []p2p.Peer {
return s.peers.peers() return s.peers.peers()
} }
func (s *Service) SetPeerAddedHandler(h func(context.Context, swarm.Address) error) {
s.peerHandler = h
}
func (s *Service) NewStream(ctx context.Context, overlay swarm.Address, protocolName, protocolVersion, streamName string) (p2p.Stream, error) { func (s *Service) NewStream(ctx context.Context, overlay swarm.Address, protocolName, protocolVersion, streamName string) (p2p.Stream, error) {
peerID, found := s.peers.peerID(overlay) peerID, found := s.peers.peerID(overlay)
if !found { if !found {
......
...@@ -14,12 +14,10 @@ import ( ...@@ -14,12 +14,10 @@ import (
"github.com/ethersphere/bee/pkg/addressbook/inmem" "github.com/ethersphere/bee/pkg/addressbook/inmem"
"github.com/ethersphere/bee/pkg/crypto" "github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/discovery/mock"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/libp2p" "github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology/full"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
) )
...@@ -51,13 +49,9 @@ func newService(t *testing.T, o libp2p.Options) (s *libp2p.Service, overlay swar ...@@ -51,13 +49,9 @@ func newService(t *testing.T, o libp2p.Options) (s *libp2p.Service, overlay swar
if o.Addr == "" { if o.Addr == "" {
o.Addr = ":0" o.Addr = ":0"
} }
if o.AddressBook == nil {
o.AddressBook = inmem.New()
}
if o.TopologyDriver == nil { if o.Addressbook == nil {
disc := mock.NewDiscovery() o.Addressbook = inmem.New()
o.TopologyDriver = full.New(disc, o.AddressBook)
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
......
...@@ -18,6 +18,7 @@ type Service struct { ...@@ -18,6 +18,7 @@ type Service struct {
connectFunc func(ctx context.Context, addr ma.Multiaddr) (overlay swarm.Address, err error) connectFunc func(ctx context.Context, addr ma.Multiaddr) (overlay swarm.Address, err error)
disconnectFunc func(overlay swarm.Address) error disconnectFunc func(overlay swarm.Address) error
peersFunc func() []p2p.Peer peersFunc func() []p2p.Peer
setPeerAddedHandlerFunc func(func(context.Context, swarm.Address) error)
} }
func WithAddProtocolFunc(f func(p2p.ProtocolSpec) error) Option { func WithAddProtocolFunc(f func(p2p.ProtocolSpec) error) Option {
...@@ -44,6 +45,12 @@ func WithPeersFunc(f func() []p2p.Peer) Option { ...@@ -44,6 +45,12 @@ func WithPeersFunc(f func() []p2p.Peer) Option {
}) })
} }
func WithSetPeerAddedHandlerFunc(f func(func(context.Context, swarm.Address) error)) Option {
return optionFunc(func(s *Service) {
s.setPeerAddedHandlerFunc = f
})
}
func New(opts ...Option) *Service { func New(opts ...Option) *Service {
s := new(Service) s := new(Service)
for _, o := range opts { for _, o := range opts {
...@@ -73,6 +80,14 @@ func (s *Service) Disconnect(overlay swarm.Address) error { ...@@ -73,6 +80,14 @@ func (s *Service) Disconnect(overlay swarm.Address) error {
return s.disconnectFunc(overlay) return s.disconnectFunc(overlay)
} }
func (s *Service) SetPeerAddedHandler(f func(context.Context, swarm.Address) error) {
if s.setPeerAddedHandlerFunc == nil {
return
}
s.setPeerAddedHandlerFunc(f)
}
func (s *Service) Peers() []p2p.Peer { func (s *Service) Peers() []p2p.Peer {
if s.peersFunc == nil { if s.peersFunc == nil {
return nil return nil
......
...@@ -17,6 +17,7 @@ type Service interface { ...@@ -17,6 +17,7 @@ type Service interface {
Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm.Address, err error) Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm.Address, err error)
Disconnect(overlay swarm.Address) error Disconnect(overlay swarm.Address) error
Peers() []Peer Peers() []Peer
SetPeerAddedHandler(func(context.Context, swarm.Address) error)
} }
type Streamer interface { type Streamer interface {
......
...@@ -12,6 +12,8 @@ import ( ...@@ -12,6 +12,8 @@ import (
"github.com/ethersphere/bee/pkg/addressbook" "github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/discovery" "github.com/ethersphere/bee/pkg/discovery"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology" "github.com/ethersphere/bee/pkg/topology"
) )
...@@ -20,81 +22,113 @@ func init() { ...@@ -20,81 +22,113 @@ func init() {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
} }
var _ topology.Driver = (*driver)(nil) var _ topology.Driver = (*Driver)(nil)
// driver drives the connectivity between nodes. It is a basic implementation of a connectivity driver. // Driver drives the connectivity between nodes. It is a basic implementation of a connectivity Driver.
// that enabled full connectivity in the sense that: // that enabled full connectivity in the sense that:
// - Every peer which is added to the driver gets broadcasted to every other peer regardless of its address. // - Every peer which is added to the Driver gets broadcasted to every other peer regardless of its address.
// - A random peer is picked when asking for a peer to retrieve an arbitrary chunk (Peerer interface). // - A random peer is picked when asking for a peer to retrieve an arbitrary chunk (Peerer interface).
type driver struct { type Driver struct {
mtx sync.Mutex
connected map[string]swarm.Address
discovery discovery.Driver discovery discovery.Driver
addressBook addressbook.Getter addressBook addressbook.GetPutter
p2pService p2p.Service
receivedPeers map[string]struct{} // track already received peers. Note: implement cleanup or expiration if needed to stop infinite grow
mtx sync.Mutex // guards received peers
logger logging.Logger
} }
func New(disc discovery.Driver, addressBook addressbook.Getter) topology.Driver { func New(disc discovery.Driver, addressBook addressbook.GetPutter, p2pService p2p.Service, logger logging.Logger) *Driver {
return &driver{ return &Driver{
connected: make(map[string]swarm.Address),
discovery: disc, discovery: disc,
addressBook: addressBook, addressBook: addressBook,
p2pService: p2pService,
receivedPeers: make(map[string]struct{}),
logger: logger,
} }
} }
// AddPeer adds a new peer to the topology driver. // AddPeer adds a new peer to the topology driver.
// The peer would be subsequently broadcasted to all connected peers. // The peer would be subsequently broadcasted to all connected peers.
// All conneceted peers are also broadcasted to the new peer. // All conneceted peers are also broadcasted to the new peer.
func (d *driver) AddPeer(overlay swarm.Address) error { func (d *Driver) AddPeer(ctx context.Context, addr swarm.Address) error {
d.mtx.Lock() d.mtx.Lock()
defer d.mtx.Unlock() if _, ok := d.receivedPeers[addr.ByteString()]; ok {
d.mtx.Unlock()
ma, exists := d.addressBook.Get(overlay) return nil
if !exists {
return topology.ErrNotFound
} }
d.mtx.Unlock()
var connectedNodes []discovery.BroadcastRecord connectedPeers := d.p2pService.Peers()
for _, addressee := range d.connected { ma, exists := d.addressBook.Get(addr)
cma, exists := d.addressBook.Get(addressee)
if !exists { if !exists {
return topology.ErrNotFound return topology.ErrNotFound
} }
err := d.discovery.BroadcastPeers(context.Background(), addressee, discovery.BroadcastRecord{Overlay: overlay, Addr: ma}) if !isConnected(addr, connectedPeers) {
peerAddr, err := d.p2pService.Connect(ctx, ma)
if err != nil { if err != nil {
return err return err
} }
connectedNodes = append(connectedNodes, discovery.BroadcastRecord{Overlay: addressee, Addr: cma}) // update addr if it is wrong or it has been changed
if !addr.Equal(peerAddr) {
addr = peerAddr
d.addressBook.Put(peerAddr, ma)
}
} }
err := d.discovery.BroadcastPeers(context.Background(), overlay, connectedNodes...) connectedAddrs := []swarm.Address{}
if err != nil { for _, addressee := range connectedPeers {
// skip newly added peer
if addressee.Address.Equal(addr) {
continue
}
connectedAddrs = append(connectedAddrs, addressee.Address)
if err := d.discovery.BroadcastPeers(context.Background(), addressee.Address, addr); err != nil {
return err return err
} }
}
// add new node to connected nodes to avoid double broadcasts if len(connectedAddrs) == 0 {
d.connected[overlay.String()] = overlay
return nil return nil
} }
if err := d.discovery.BroadcastPeers(context.Background(), addr, connectedAddrs...); err != nil {
return err
}
// ChunkPeer is used to suggest a peer to ask a certain chunk from.
func (d *driver) ChunkPeer(addr swarm.Address) (peerAddr swarm.Address, err error) {
d.mtx.Lock() d.mtx.Lock()
defer d.mtx.Unlock() d.receivedPeers[addr.ByteString()] = struct{}{}
d.mtx.Unlock()
return nil
}
if len(d.connected) == 0 { // ChunkPeer is used to suggest a peer to ask a certain chunk from.
func (d *Driver) ChunkPeer(addr swarm.Address) (peerAddr swarm.Address, err error) {
connectedPeers := d.p2pService.Peers()
if len(connectedPeers) == 0 {
return swarm.Address{}, topology.ErrNotFound return swarm.Address{}, topology.ErrNotFound
} }
itemIdx := rand.Intn(len(d.connected)) itemIdx := rand.Intn(len(connectedPeers))
i := 0 i := 0
for _, v := range d.connected { for _, v := range connectedPeers {
if i == itemIdx { if i == itemIdx {
return v, nil return v.Address, nil
} }
i++ i++
} }
return swarm.Address{}, topology.ErrNotFound return swarm.Address{}, topology.ErrNotFound
} }
func isConnected(addr swarm.Address, connectedPeers []p2p.Peer) bool {
for _, p := range connectedPeers {
if p.Address.Equal(addr) {
return true
}
}
return false
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package full_test
import (
"context"
"errors"
"fmt"
"io/ioutil"
"testing"
"github.com/ethersphere/bee/pkg/addressbook/inmem"
"github.com/ethersphere/bee/pkg/discovery/mock"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
p2pmock "github.com/ethersphere/bee/pkg/p2p/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology/full"
ma "github.com/multiformats/go-multiaddr"
)
func TestAddPeer(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
underlay := "/ip4/127.0.0.1/tcp/7070/p2p/16Uiu2HAkx8ULY8cTXhdVAcMmLcH9AsTKz6uBQ7DPLKRjMLgBVYkS"
overlay := swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59a")
connectedPeers := []p2p.Peer{
{
Address: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59b"),
},
{
Address: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"),
},
{
Address: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59d"),
},
}
t.Run("OK - no connected peers", func(t *testing.T) {
discovery := mock.NewDiscovery()
addressbook := inmem.New()
p2p := p2pmock.New(p2pmock.WithConnectFunc(func(_ context.Context, addr ma.Multiaddr) (swarm.Address, error) {
if addr.String() != underlay {
t.Fatalf("expected multiaddr %s, got %s", addr.String(), underlay)
}
return overlay, nil
}))
fullDriver := full.New(discovery, addressbook, p2p, logger)
multiaddr, err := ma.NewMultiaddr(underlay)
if err != nil {
t.Fatal(err)
}
addressbook.Put(overlay, multiaddr)
err = fullDriver.AddPeer(context.Background(), overlay)
if err != nil {
t.Fatalf("full conn driver returned err %s", err.Error())
}
if discovery.Broadcasts() != 0 {
t.Fatalf("broadcasts expected %v, got %v ", 0, discovery.Broadcasts())
}
})
t.Run("ERROR - peer not added", func(t *testing.T) {
discovery := mock.NewDiscovery()
addressbook := inmem.New()
p2p := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (swarm.Address, error) {
t.Fatal("should not be called")
return swarm.Address{}, nil
}))
fullDriver := full.New(discovery, addressbook, p2p, logger)
err := fullDriver.AddPeer(context.Background(), overlay)
if !errors.Is(err, topology.ErrNotFound) {
t.Fatalf("full conn driver returned err %v", err)
}
if discovery.Broadcasts() != 0 {
t.Fatalf("broadcasts expected %v, got %v ", 0, discovery.Broadcasts())
}
})
t.Run("OK - connected peers - peer already connected", func(t *testing.T) {
discovery := mock.NewDiscovery()
addressbook := inmem.New()
alreadyConnected := connectedPeers[0].Address
p2p := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (swarm.Address, error) {
t.Fatal("should not be called")
return swarm.Address{}, nil
}), p2pmock.WithPeersFunc(func() []p2p.Peer {
return connectedPeers
}))
fullDriver := full.New(discovery, addressbook, p2p, logger)
multiaddr, err := ma.NewMultiaddr(underlay)
if err != nil {
t.Fatal("error creating multiaddr")
}
addressbook.Put(alreadyConnected, multiaddr)
err = fullDriver.AddPeer(context.Background(), alreadyConnected)
if err != nil {
t.Fatalf("full conn driver returned err %s", err.Error())
}
if discovery.Broadcasts() != 3 {
t.Fatalf("broadcasts expected %v, got %v ", 3, discovery.Broadcasts())
}
// check newly added node
if err := checkAddreseeRecords(discovery, alreadyConnected, connectedPeers[1:]); err != nil {
t.Fatal(err)
}
// check other nodes
for _, p := range connectedPeers[1:] {
if err := checkAddreseeRecords(discovery, p.Address, connectedPeers[0:1]); err != nil {
t.Fatal(err)
}
}
})
t.Run("OK - connected peers - peer not already connected", func(t *testing.T) {
discovery := mock.NewDiscovery()
addressbook := inmem.New()
p2ps := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (swarm.Address, error) {
if addr.String() != underlay {
t.Fatalf("expected multiaddr %s, got %s", addr.String(), underlay)
}
return overlay, nil
}), p2pmock.WithPeersFunc(func() []p2p.Peer {
return connectedPeers
}))
fullDriver := full.New(discovery, addressbook, p2ps, logger)
multiaddr, err := ma.NewMultiaddr(underlay)
if err != nil {
t.Fatal(err)
}
addressbook.Put(overlay, multiaddr)
err = fullDriver.AddPeer(context.Background(), overlay)
if err != nil {
t.Fatalf("full conn driver returned err %s", err.Error())
}
if discovery.Broadcasts() != 4 {
t.Fatalf("broadcasts expected %v, got %v ", 4, discovery.Broadcasts())
}
// check newly added node
if err := checkAddreseeRecords(discovery, overlay, connectedPeers); err != nil {
t.Fatal(err)
}
// check other nodes
for _, p := range connectedPeers {
if err := checkAddreseeRecords(discovery, p.Address, []p2p.Peer{{Address: overlay}}); err != nil {
t.Fatal(err)
}
}
})
}
func checkAddreseeRecords(discovery *mock.Discovery, addr swarm.Address, expected []p2p.Peer) error {
got, exists := discovery.AddresseeRecords(addr)
if exists != true {
return errors.New("addressee record does not exist")
}
for i, e := range expected {
if !e.Address.Equal(got[i]) {
return fmt.Errorf("addressee record expected %s, got %s ", e.Address.String(), got[i].String())
}
}
return nil
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mock
import (
"context"
"sync"
"github.com/ethersphere/bee/pkg/swarm"
)
type TopologyDriver struct {
peers []swarm.Address
addPeerErr error
mtx sync.Mutex
}
func NewTopologyDriver() *TopologyDriver {
return &TopologyDriver{}
}
func (d *TopologyDriver) SetAddPeerErr(err error) {
d.addPeerErr = err
}
func (d *TopologyDriver) AddPeer(_ context.Context, addr swarm.Address) error {
if d.addPeerErr != nil {
return d.addPeerErr
}
d.mtx.Lock()
d.peers = append(d.peers, addr)
d.mtx.Unlock()
return nil
}
func (d *TopologyDriver) Peers() []swarm.Address {
return d.peers
}
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
package topology package topology
import ( import (
"context"
"errors" "errors"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
...@@ -13,10 +14,14 @@ import ( ...@@ -13,10 +14,14 @@ import (
var ErrNotFound = errors.New("no peer found") var ErrNotFound = errors.New("no peer found")
type Driver interface { type Driver interface {
AddPeer(overlay swarm.Address) error PeerAdder
ChunkPeerer ChunkPeerer
} }
type PeerAdder interface {
AddPeer(ctx context.Context, addr swarm.Address) error
}
type ChunkPeerer interface { type ChunkPeerer interface {
ChunkPeer(addr swarm.Address) (peerAddr swarm.Address, err error) ChunkPeer(addr swarm.Address) (peerAddr swarm.Address, err error)
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment