Commit 5f5c6a13 authored by acud's avatar acud Committed by GitHub

p2p, kademlia: cleanup connect logic (#252)

* debugapi, p2p: simplify connection logic
parent badf9abf
......@@ -7,7 +7,6 @@ package debugapi
import (
"net/http"
"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/pingpong"
......@@ -35,8 +34,7 @@ type Options struct {
Overlay swarm.Address
P2P p2p.Service
Pingpong pingpong.Interface
Addressbook addressbook.GetPutter
TopologyDriver topology.Notifier
TopologyDriver topology.PeerAdder
Storer storage.Storer
Logger logging.Logger
Tracer *tracing.Tracer
......
......@@ -11,17 +11,14 @@ import (
"net/url"
"testing"
"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/debugapi"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
mockp2p "github.com/ethersphere/bee/pkg/p2p/mock"
"github.com/ethersphere/bee/pkg/pingpong"
mockstore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology/mock"
"github.com/multiformats/go-multiaddr"
"resenje.org/web"
......@@ -29,7 +26,7 @@ import (
type testServerOptions struct {
Overlay swarm.Address
P2P p2p.Service
P2P *mockp2p.Service
Pingpong pingpong.Interface
Storer storage.Storer
TopologyOpts []mock.Option
......@@ -37,14 +34,11 @@ type testServerOptions struct {
}
type testServer struct {
Client *http.Client
Addressbook addressbook.GetPutter
TopologyDriver topology.Driver
Client *http.Client
P2PMock *mockp2p.Service
}
func newTestServer(t *testing.T, o testServerOptions) *testServer {
statestore := mockstore.NewStateStore()
addrbook := addressbook.New(statestore)
topologyDriver := mock.NewTopologyDriver(o.TopologyOpts...)
s := debugapi.New(debugapi.Options{
......@@ -53,7 +47,6 @@ func newTestServer(t *testing.T, o testServerOptions) *testServer {
Pingpong: o.Pingpong,
Tags: o.Tags,
Logger: logging.New(ioutil.Discard, 0),
Addressbook: addrbook,
Storer: o.Storer,
TopologyDriver: topologyDriver,
})
......@@ -71,8 +64,8 @@ func newTestServer(t *testing.T, o testServerOptions) *testServer {
}),
}
return &testServer{
Client: client,
Addressbook: addrbook,
Client: client,
P2PMock: o.P2P,
}
}
......
......@@ -27,7 +27,7 @@ func (s *server) peerConnectHandler(w http.ResponseWriter, r *http.Request) {
return
}
bzzAddr, err := s.P2P.Connect(r.Context(), addr)
bzzAddr, err := s.P2P.ConnectNotify(r.Context(), addr)
if err != nil {
s.Logger.Debugf("debug api: peer connect %s: %v", addr, err)
s.Logger.Errorf("unable to connect to peer %s", addr)
......@@ -35,21 +35,6 @@ func (s *server) peerConnectHandler(w http.ResponseWriter, r *http.Request) {
return
}
err = s.Addressbook.Put(bzzAddr.Overlay, *bzzAddr)
if err != nil {
s.Logger.Debugf("debug api: addressbook.put %s: %v", addr, err)
s.Logger.Errorf("unable to persist peer %s", addr)
jsonhttp.InternalServerError(w, err)
return
}
if err := s.TopologyDriver.Connected(r.Context(), bzzAddr.Overlay); err != nil {
_ = s.P2P.Disconnect(bzzAddr.Overlay)
s.Logger.Debugf("debug api: topologyDriver.Connected %s: %v", addr, err)
s.Logger.Errorf("unable to connect to peer %s", addr)
jsonhttp.InternalServerError(w, err)
return
}
jsonhttp.OK(w, peerConnectResponse{
Address: bzzAddr.Overlay.String(),
})
......
......@@ -18,7 +18,6 @@ import (
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/mock"
"github.com/ethersphere/bee/pkg/swarm"
topmock "github.com/ethersphere/bee/pkg/topology/mock"
ma "github.com/multiformats/go-multiaddr"
)
......@@ -59,13 +58,8 @@ func TestConnect(t *testing.T) {
jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodPost, "/connect"+underlay, nil, http.StatusOK, debugapi.PeerConnectResponse{
Address: overlay.String(),
})
bzzAddr, err := testServer.Addressbook.Get(overlay)
if err != nil {
t.Fatal(err)
}
if !bzzAddress.Equal(bzzAddr) {
t.Fatalf("found wrong underlay. expected: %+v, found: %+v", bzzAddress, bzzAddr)
if testServer.P2PMock.ConnectNotifyCalls() != 1 {
t.Fatal("connect notify not called")
}
})
......@@ -84,38 +78,20 @@ func TestConnect(t *testing.T) {
})
t.Run("error - add peer", func(t *testing.T) {
disconnectCalled := false
testServer := newTestServer(t, testServerOptions{
P2P: mock.New(mock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (*bzz.Address, error) {
if addr.String() == errorUnderlay {
return nil, testErr
}
return bzzAddress, nil
}), mock.WithDisconnectFunc(func(addr swarm.Address) error {
disconnectCalled = true
return nil
})),
TopologyOpts: []topmock.Option{topmock.WithAddPeerErr(testErr)},
})
jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodPost, "/connect"+underlay, nil, http.StatusInternalServerError, jsonhttp.StatusResponse{
jsonhttptest.ResponseDirect(t, testServer.Client, http.MethodPost, "/connect"+errorUnderlay, nil, http.StatusInternalServerError, jsonhttp.StatusResponse{
Code: http.StatusInternalServerError,
Message: testErr.Error(),
})
bzzAddr, err := testServer.Addressbook.Get(overlay)
if err != nil {
t.Fatal(err)
}
if !bzzAddress.Equal(bzzAddr) {
t.Fatalf("found wrong underlay. expected: %+v, found: %+v", bzzAddress, bzzAddr)
}
if !disconnectCalled {
t.Fatalf("disconnect not called.")
}
})
}
func TestDisconnect(t *testing.T) {
......
......@@ -14,7 +14,6 @@ import (
"net/http"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/ethersphere/bee/pkg/addressbook"
......@@ -42,7 +41,6 @@ import (
"github.com/ethersphere/bee/pkg/statestore/leveldb"
mockinmem "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/ethersphere/bee/pkg/validator"
......@@ -330,7 +328,6 @@ func NewBee(o Options) (*Bee, error) {
Pingpong: pingPong,
Logger: logger,
Tracer: tracer,
Addressbook: addressbook,
TopologyDriver: topologyDriver,
Storer: storer,
})
......@@ -366,37 +363,26 @@ func NewBee(o Options) (*Bee, error) {
b.debugAPIServer = debugAPIServer
}
overlays, err := addressbook.Overlays()
addresses, err := addressbook.Overlays()
if err != nil {
return nil, fmt.Errorf("addressbook overlays: %w", err)
}
var count int32
var wg sync.WaitGroup
jobsC := make(chan struct{}, 16)
for _, o := range overlays {
jobsC <- struct{}{}
wg.Add(1)
go func(overlay swarm.Address) {
defer func() {
<-jobsC
}()
defer wg.Done()
if err := topologyDriver.AddPeer(p2pCtx, overlay); err != nil {
logger.Debugf("topology add peer fail %s: %v", overlay, err)
logger.Warningf("topology add peer %s", overlay)
return
}
atomic.AddInt32(&count, 1)
}(o)
// add the peers to topology and allow it to connect independently
for _, o := range addresses {
err = topologyDriver.AddPeer(p2pCtx, o)
if err != nil {
logger.Debugf("topology add peer from addressbook: %v", err)
} else {
count++
}
}
wg.Wait()
// Connect bootnodes if no nodes from the addressbook was sucesufully added to topology
// Connect bootnodes if the address book is clean
if count == 0 {
var wg sync.WaitGroup
for _, a := range o.Bootnodes {
wg.Add(1)
go func(a string) {
......@@ -409,8 +395,8 @@ func NewBee(o Options) (*Bee, error) {
}
var count int
if _, err := p2p.Discover(p2pCtx, addr, func(addr ma.Multiaddr) (stop bool, err error) {
logger.Tracef("connecting to peer %s", addr)
bzzAddr, err := p2ps.Connect(p2pCtx, addr)
logger.Tracef("connecting to bootnode %s", addr)
_, err = p2ps.ConnectNotify(p2pCtx, addr)
if err != nil {
if !errors.Is(err, p2p.ErrAlreadyConnected) {
logger.Debugf("connect fail %s: %v", addr, err)
......@@ -418,22 +404,7 @@ func NewBee(o Options) (*Bee, error) {
}
return false, nil
}
logger.Tracef("connected to peer %s", addr)
err = addressbook.Put(bzzAddr.Overlay, *bzzAddr)
if err != nil {
_ = p2ps.Disconnect(bzzAddr.Overlay)
logger.Debugf("addressbook error persisting %s %s: %v", addr, bzzAddr.Overlay, err)
logger.Warningf("connect to bootnode %s", addr)
return false, nil
}
if err := topologyDriver.Connected(p2pCtx, bzzAddr.Overlay); err != nil {
_ = p2ps.Disconnect(bzzAddr.Overlay)
logger.Debugf("topology connected fail %s %s: %v", addr, bzzAddr.Overlay, err)
logger.Warningf("connect to bootnode %s", addr)
return false, nil
}
logger.Tracef("connected to bootnode %s", addr)
count++
// connect to max 3 bootnodes
return count > 3, nil
......@@ -444,7 +415,6 @@ func NewBee(o Options) (*Bee, error) {
}
}(a)
}
wg.Wait()
}
......
......@@ -11,12 +11,15 @@ import (
"testing"
"time"
"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake"
"github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
libp2ppeer "github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
)
func TestAddresses(t *testing.T) {
......@@ -299,12 +302,13 @@ func TestConnectRepeatHandshake(t *testing.T) {
expectPeersEventually(t, s1)
}
func TestTopologyNotifiee(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
func TestTopologyNotifier(t *testing.T) {
var (
mtx sync.Mutex
mtx sync.Mutex
ctx = context.Background()
ab1, ab2 = addressbook.New(mock.NewStateStore()), addressbook.New(mock.NewStateStore())
n1connectedAddr swarm.Address
n1disconnectedAddr swarm.Address
n2connectedAddr swarm.Address
......@@ -337,16 +341,16 @@ func TestTopologyNotifiee(t *testing.T) {
}
)
notifier1 := mockNotifier(n1c, n1d)
s1, overlay1 := newService(t, 1, libp2p.Options{})
s1, overlay1 := newService(t, 1, libp2p.Options{Addressbook: ab1})
s1.SetNotifier(notifier1)
notifier2 := mockNotifier(n2c, n2d)
s2, overlay2 := newService(t, 1, libp2p.Options{})
s2, overlay2 := newService(t, 1, libp2p.Options{Addressbook: ab2})
s2.SetNotifier(notifier2)
addr := serviceUnderlayAddress(t, s1)
// s2 connects to s1, thus the notifiee on s1 should be called on Connect
// s2 connects to s1, thus the notifier on s1 should be called on Connect
bzzAddr, err := s2.Connect(ctx, addr)
if err != nil {
t.Fatal(err)
......@@ -362,6 +366,9 @@ func TestTopologyNotifiee(t *testing.T) {
expectZeroAddress(t, n1disconnectedAddr, n2connectedAddr, n2disconnectedAddr)
mtx.Unlock()
// check address book entries are there
checkAddressbook(t, ab2, overlay1, addr)
// s2 disconnects from s1 so s1 disconnect notifiee should be called
if err := s2.Disconnect(bzzAddr.Overlay); err != nil {
t.Fatal(err)
......@@ -400,6 +407,42 @@ func TestTopologyNotifiee(t *testing.T) {
waitAddrSet(t, &n2disconnectedAddr, &mtx, overlay1)
}
func TestTopologyLocalNotifier(t *testing.T) {
var (
mtx sync.Mutex
n2connectedAddr swarm.Address
n2c = func(_ context.Context, a swarm.Address) error {
mtx.Lock()
defer mtx.Unlock()
n2connectedAddr = a
return nil
}
n2d = func(a swarm.Address) {
}
)
s1, overlay1 := newService(t, 1, libp2p.Options{})
notifier2 := mockNotifier(n2c, n2d)
s2, overlay2 := newService(t, 1, libp2p.Options{})
s2.SetNotifier(notifier2)
addr := serviceUnderlayAddress(t, s1)
// s2 connects to s1, thus the notifier on s1 should be called on Connect
_, err := s2.ConnectNotify(context.Background(), addr)
if err != nil {
t.Fatal(err)
}
expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2)
// expect that n1 notifee called with s2 overlay
waitAddrSet(t, &n2connectedAddr, &mtx, overlay1)
}
func waitAddrSet(t *testing.T, addr *swarm.Address, mtx *sync.Mutex, exp swarm.Address) {
t.Helper()
for i := 0; i < 20; i++ {
......@@ -414,6 +457,21 @@ func waitAddrSet(t *testing.T, addr *swarm.Address, mtx *sync.Mutex, exp swarm.A
t.Fatal("timed out waiting for address to be set")
}
func checkAddressbook(t *testing.T, ab addressbook.Getter, overlay swarm.Address, underlay ma.Multiaddr) {
t.Helper()
addr, err := ab.Get(overlay)
if err != nil {
t.Fatal(err)
}
if !addr.Overlay.Equal(overlay) {
t.Fatalf("overlay mismatch. got %s want %s", addr.Overlay, overlay)
}
if !addr.Underlay.Equal(underlay) {
t.Fatalf("underlay mismatch. got %s, want %s", addr.Underlay, underlay)
}
}
type notifiee struct {
connected func(context.Context, swarm.Address) error
disconnected func(swarm.Address)
......
......@@ -70,8 +70,7 @@ type Options struct {
Tracer *tracing.Tracer
}
func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay swarm.Address, addr string,
o Options) (*Service, error) {
func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay swarm.Address, addr string, o Options) (*Service, error) {
host, port, err := net.SplitHostPort(addr)
if err != nil {
return nil, fmt.Errorf("address: %w", err)
......@@ -349,6 +348,25 @@ func buildUnderlayAddress(addr ma.Multiaddr, peerID libp2ppeer.ID) (ma.Multiaddr
return addr.Encapsulate(hostAddr), nil
}
func (s *Service) ConnectNotify(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error) {
info, err := libp2ppeer.AddrInfoFromP2pAddr(addr)
if err != nil {
return nil, fmt.Errorf("addr from p2p: %w", err)
}
address, err = s.Connect(ctx, addr)
if err != nil {
return nil, fmt.Errorf("connect notify: %w", err)
}
if s.topologyNotifier != nil {
if err := s.topologyNotifier.Connected(ctx, address.Overlay); err != nil {
_ = s.disconnect(info.ID)
return nil, fmt.Errorf("notify topology: %w", err)
}
}
return address, nil
}
func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error) {
// Extract the peer ID from the multiaddr.
info, err := libp2ppeer.AddrInfoFromP2pAddr(addr)
......@@ -395,6 +413,12 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
return nil, fmt.Errorf("connect full close %w", err)
}
err = s.addressbook.Put(i.BzzAddress.Overlay, *i.BzzAddress)
if err != nil {
_ = s.disconnect(info.ID)
return nil, fmt.Errorf("storing bzz address: %w", err)
}
s.metrics.CreatedConnectionCount.Inc()
s.logger.Infof("successfully connected to peer (outbound) %s", i.BzzAddress.ShortString())
return i.BzzAddress, nil
......
......@@ -7,6 +7,7 @@ package mock
import (
"context"
"errors"
"sync/atomic"
"github.com/ethersphere/bee/pkg/bzz"
"github.com/ethersphere/bee/pkg/p2p"
......@@ -22,6 +23,7 @@ type Service struct {
peersFunc func() []p2p.Peer
setNotifierFunc func(topology.Notifier)
addressesFunc func() ([]ma.Multiaddr, error)
notifyCalled int32
}
func WithAddProtocolFunc(f func(p2p.ProtocolSpec) error) Option {
......@@ -75,6 +77,14 @@ func (s *Service) AddProtocol(spec p2p.ProtocolSpec) error {
return s.addProtocolFunc(spec)
}
func (s *Service) ConnectNotify(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error) {
if s.connectFunc == nil {
return nil, errors.New("function Connect not configured")
}
atomic.AddInt32(&s.notifyCalled, 1)
return s.connectFunc(ctx, addr)
}
func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error) {
if s.connectFunc == nil {
return nil, errors.New("function Connect not configured")
......@@ -111,6 +121,11 @@ func (s *Service) Peers() []p2p.Peer {
return s.peersFunc()
}
func (s *Service) ConnectNotifyCalls() int32 {
c := atomic.LoadInt32(&s.notifyCalled)
return c
}
type Option interface {
apply(*Service)
}
......
......@@ -17,6 +17,10 @@ import (
// Service provides methods to handle p2p Peers and Protocols.
type Service interface {
AddProtocol(ProtocolSpec) error
// ConnectNotify connects to the given multiaddress and notifies the topology once the
// peer has been successfully connected.
ConnectNotify(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error)
// Connect to a peer but do not notify topology about the established connection.
Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error)
Disconnect(overlay swarm.Address) error
Peers() []Peer
......
......@@ -34,17 +34,20 @@ type Notifier interface {
type PeerAdder interface {
// AddPeer is called when a peer is added to the topology backlog
// for further processing by connectivity strategy.
AddPeer(ctx context.Context, addr swarm.Address) error
}
type Connecter interface {
// Connected is called when a peer dials in.
// Connected is called when a peer dials in, or in case explicit
// notification to kademlia on dial out is requested.
Connected(context.Context, swarm.Address) error
}
type Disconnecter interface {
// Disconnected is called when a peer disconnects.
// The disconnect event can be initiated on the local
// node or on the remote node, this handle does not make
// any distinctions between either of them.
Disconnected(swarm.Address)
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment