Commit 577135be authored by Petar Radovic's avatar Petar Radovic Committed by GitHub

Remove connect notify (#641)

Remove connect notify from p2p interfac
parent 8a8487bd
...@@ -28,7 +28,7 @@ type server struct { ...@@ -28,7 +28,7 @@ type server struct {
Overlay swarm.Address Overlay swarm.Address
P2P p2p.DebugService P2P p2p.DebugService
Pingpong pingpong.Interface Pingpong pingpong.Interface
TopologyDriver topology.PeerAdder TopologyDriver topology.Driver
Storer storage.Storer Storer storage.Storer
Logger logging.Logger Logger logging.Logger
Tracer *tracing.Tracer Tracer *tracing.Tracer
...@@ -39,7 +39,7 @@ type server struct { ...@@ -39,7 +39,7 @@ type server struct {
metricsRegistry *prometheus.Registry metricsRegistry *prometheus.Registry
} }
func New(overlay swarm.Address, p2p p2p.DebugService, pingpong pingpong.Interface, topologyDriver topology.PeerAdder, storer storage.Storer, logger logging.Logger, tracer *tracing.Tracer, tags *tags.Tags, accounting accounting.Interface) Service { func New(overlay swarm.Address, p2p p2p.DebugService, pingpong pingpong.Interface, topologyDriver topology.Driver, storer storage.Storer, logger logging.Logger, tracer *tracing.Tracer, tags *tags.Tags, accounting accounting.Interface) Service {
s := &server{ s := &server{
Overlay: overlay, Overlay: overlay,
P2P: p2p, P2P: p2p,
......
...@@ -27,7 +27,7 @@ func (s *server) peerConnectHandler(w http.ResponseWriter, r *http.Request) { ...@@ -27,7 +27,7 @@ func (s *server) peerConnectHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
bzzAddr, err := s.P2P.ConnectNotify(r.Context(), addr) bzzAddr, err := s.P2P.Connect(r.Context(), addr)
if err != nil { if err != nil {
s.Logger.Debugf("debug api: peer connect %s: %v", addr, err) s.Logger.Debugf("debug api: peer connect %s: %v", addr, err)
s.Logger.Errorf("unable to connect to peer %s", addr) s.Logger.Errorf("unable to connect to peer %s", addr)
...@@ -35,6 +35,14 @@ func (s *server) peerConnectHandler(w http.ResponseWriter, r *http.Request) { ...@@ -35,6 +35,14 @@ func (s *server) peerConnectHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
if err := s.TopologyDriver.Connected(r.Context(), bzzAddr.Overlay); err != nil {
_ = s.P2P.Disconnect(bzzAddr.Overlay)
s.Logger.Debugf("debug api: peer connect handler %s: %v", addr, err)
s.Logger.Errorf("unable to connect to peer %s", addr)
jsonhttp.InternalServerError(w, err)
return
}
jsonhttp.OK(w, peerConnectResponse{ jsonhttp.OK(w, peerConnectResponse{
Address: bzzAddr.Overlay.String(), Address: bzzAddr.Overlay.String(),
}) })
......
...@@ -60,9 +60,6 @@ func TestConnect(t *testing.T) { ...@@ -60,9 +60,6 @@ func TestConnect(t *testing.T) {
Address: overlay.String(), Address: overlay.String(),
}), }),
) )
if testServer.P2PMock.ConnectNotifyCalls() != 1 {
t.Fatal("connect notify not called")
}
}) })
t.Run("error", func(t *testing.T) { t.Run("error", func(t *testing.T) {
......
...@@ -411,42 +411,6 @@ func TestTopologyNotifier(t *testing.T) { ...@@ -411,42 +411,6 @@ func TestTopologyNotifier(t *testing.T) {
waitAddrSet(t, &n2disconnectedAddr, &mtx, overlay1) waitAddrSet(t, &n2disconnectedAddr, &mtx, overlay1)
} }
func TestTopologyLocalNotifier(t *testing.T) {
var (
mtx sync.Mutex
n2connectedAddr swarm.Address
n2c = func(_ context.Context, a swarm.Address) error {
mtx.Lock()
defer mtx.Unlock()
n2connectedAddr = a
return nil
}
n2d = func(a swarm.Address) {
}
)
s1, overlay1 := newService(t, 1, libp2pServiceOpts{})
notifier2 := mockNotifier(n2c, n2d)
s2, overlay2 := newService(t, 1, libp2pServiceOpts{})
s2.AddNotifier(notifier2)
addr := serviceUnderlayAddress(t, s1)
// s2 connects to s1, thus the notifier on s1 should be called on Connect
_, err := s2.ConnectNotify(context.Background(), addr)
if err != nil {
t.Fatal(err)
}
expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2)
// expect that n1 notifee called with s2 overlay
waitAddrSet(t, &n2connectedAddr, &mtx, overlay1)
}
func TestTopologySupportMultipleNotifiers(t *testing.T) { func TestTopologySupportMultipleNotifiers(t *testing.T) {
var ( var (
mtx sync.Mutex mtx sync.Mutex
...@@ -473,17 +437,15 @@ func TestTopologySupportMultipleNotifiers(t *testing.T) { ...@@ -473,17 +437,15 @@ func TestTopologySupportMultipleNotifiers(t *testing.T) {
) )
s1, overlay1 := newService(t, 1, libp2pServiceOpts{}) s1, overlay1 := newService(t, 1, libp2pServiceOpts{})
notifier21 := mockNotifier(n21c, n21d) s1.AddNotifier(mockNotifier(n21c, n21d))
notifier22 := mockNotifier(n22c, n22d) s1.AddNotifier(mockNotifier(n22c, n22d))
s2, overlay2 := newService(t, 1, libp2pServiceOpts{}) s2, overlay2 := newService(t, 1, libp2pServiceOpts{})
s2.AddNotifier(notifier21)
s2.AddNotifier(notifier22)
addr := serviceUnderlayAddress(t, s1) addr := serviceUnderlayAddress(t, s1)
// s2 connects to s1, thus the notifier on s1 should be called on Connect // s2 connects to s1, thus the notifier on s1 should be called on Connect
_, err := s2.ConnectNotify(context.Background(), addr) _, err := s2.Connect(context.Background(), addr)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -492,8 +454,8 @@ func TestTopologySupportMultipleNotifiers(t *testing.T) { ...@@ -492,8 +454,8 @@ func TestTopologySupportMultipleNotifiers(t *testing.T) {
expectPeersEventually(t, s1, overlay2) expectPeersEventually(t, s1, overlay2)
// expect that n1 notifee called with s2 overlay // expect that n1 notifee called with s2 overlay
waitAddrSet(t, &n21connectedAddr, &mtx, overlay1) waitAddrSet(t, &n21connectedAddr, &mtx, overlay2)
waitAddrSet(t, &n22connectedAddr, &mtx, overlay1) waitAddrSet(t, &n22connectedAddr, &mtx, overlay2)
} }
func waitAddrSet(t *testing.T, addr *swarm.Address, mtx *sync.Mutex, exp swarm.Address) { func waitAddrSet(t *testing.T, addr *swarm.Address, mtx *sync.Mutex, exp swarm.Address) {
......
...@@ -351,29 +351,6 @@ func buildUnderlayAddress(addr ma.Multiaddr, peerID libp2ppeer.ID) (ma.Multiaddr ...@@ -351,29 +351,6 @@ func buildUnderlayAddress(addr ma.Multiaddr, peerID libp2ppeer.ID) (ma.Multiaddr
return addr.Encapsulate(hostAddr), nil return addr.Encapsulate(hostAddr), nil
} }
func (s *Service) ConnectNotify(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error) {
info, err := libp2ppeer.AddrInfoFromP2pAddr(addr)
if err != nil {
return nil, fmt.Errorf("addr from p2p: %w", err)
}
address, err = s.Connect(ctx, addr)
if err != nil {
return nil, fmt.Errorf("connect notify: %w", err)
}
if len(s.topologyNotifiers) > 0 {
for _, tn := range s.topologyNotifiers {
if err := tn.Connected(ctx, address.Overlay); err != nil {
_ = s.disconnect(info.ID)
return nil, fmt.Errorf("notify topology: %w", err)
}
}
}
return address, nil
}
func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error) { func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error) {
// Extract the peer ID from the multiaddr. // Extract the peer ID from the multiaddr.
info, err := libp2ppeer.AddrInfoFromP2pAddr(addr) info, err := libp2ppeer.AddrInfoFromP2pAddr(addr)
......
...@@ -7,7 +7,6 @@ package mock ...@@ -7,7 +7,6 @@ package mock
import ( import (
"context" "context"
"errors" "errors"
"sync/atomic"
"github.com/ethersphere/bee/pkg/bzz" "github.com/ethersphere/bee/pkg/bzz"
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
...@@ -27,7 +26,6 @@ type Service struct { ...@@ -27,7 +26,6 @@ type Service struct {
setWelcomeMessageFunc func(string) error setWelcomeMessageFunc func(string) error
getWelcomeMessageFunc func() string getWelcomeMessageFunc func() string
welcomeMessage string welcomeMessage string
notifyCalled int32
} }
// WithAddProtocolFunc sets the mock implementation of the AddProtocol function // WithAddProtocolFunc sets the mock implementation of the AddProtocol function
...@@ -102,14 +100,6 @@ func (s *Service) AddProtocol(spec p2p.ProtocolSpec) error { ...@@ -102,14 +100,6 @@ func (s *Service) AddProtocol(spec p2p.ProtocolSpec) error {
return s.addProtocolFunc(spec) return s.addProtocolFunc(spec)
} }
func (s *Service) ConnectNotify(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error) {
if s.connectFunc == nil {
return nil, errors.New("function Connect not configured")
}
atomic.AddInt32(&s.notifyCalled, 1)
return s.connectFunc(ctx, addr)
}
func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error) { func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error) {
if s.connectFunc == nil { if s.connectFunc == nil {
return nil, errors.New("function Connect not configured") return nil, errors.New("function Connect not configured")
...@@ -146,11 +136,6 @@ func (s *Service) Peers() []p2p.Peer { ...@@ -146,11 +136,6 @@ func (s *Service) Peers() []p2p.Peer {
return s.peersFunc() return s.peersFunc()
} }
func (s *Service) ConnectNotifyCalls() int32 {
c := atomic.LoadInt32(&s.notifyCalled)
return c
}
func (s *Service) SetWelcomeMessage(val string) error { func (s *Service) SetWelcomeMessage(val string) error {
if s.setWelcomeMessageFunc != nil { if s.setWelcomeMessageFunc != nil {
return s.setWelcomeMessageFunc(val) return s.setWelcomeMessageFunc(val)
......
...@@ -17,9 +17,6 @@ import ( ...@@ -17,9 +17,6 @@ import (
// Service provides methods to handle p2p Peers and Protocols. // Service provides methods to handle p2p Peers and Protocols.
type Service interface { type Service interface {
AddProtocol(ProtocolSpec) error AddProtocol(ProtocolSpec) error
// ConnectNotify connects to the given multiaddress and notifies the topology once the
// peer has been successfully connected.
ConnectNotify(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error)
// Connect to a peer but do not notify topology about the established connection. // Connect to a peer but do not notify topology about the established connection.
Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error)
Disconnect(overlay swarm.Address) error Disconnect(overlay swarm.Address) error
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment