Commit 7b3d9632 authored by Nemanja Zbiljić's avatar Nemanja Zbiljić Committed by GitHub

Support multiple notifiers in 'libp2p.Service' (#551)

* Rename function 'SetNotifier' to 'AddNotifier'

* Support multiple notifiers in 'libp2p.Service'
parent a0d37e91
......@@ -205,7 +205,7 @@ func NewBee(addr string, logger logging.Logger, o Options) (*Bee, error) {
topologyDriver := kademlia.New(kademlia.Options{Base: address, Discovery: hive, AddressBook: addressbook, P2P: p2ps, Logger: logger})
b.topologyCloser = topologyDriver
hive.SetPeerAddedHandler(topologyDriver.AddPeer)
p2ps.SetNotifier(topologyDriver)
p2ps.AddNotifier(topologyDriver)
addrs, err := p2ps.Addresses()
if err != nil {
return nil, fmt.Errorf("get server addresses: %w", err)
......
......@@ -346,11 +346,11 @@ func TestTopologyNotifier(t *testing.T) {
)
notifier1 := mockNotifier(n1c, n1d)
s1, overlay1 := newService(t, 1, libp2pServiceOpts{Addressbook: ab1})
s1.SetNotifier(notifier1)
s1.AddNotifier(notifier1)
notifier2 := mockNotifier(n2c, n2d)
s2, overlay2 := newService(t, 1, libp2pServiceOpts{Addressbook: ab2})
s2.SetNotifier(notifier2)
s2.AddNotifier(notifier2)
addr := serviceUnderlayAddress(t, s1)
......@@ -430,7 +430,7 @@ func TestTopologyLocalNotifier(t *testing.T) {
notifier2 := mockNotifier(n2c, n2d)
s2, overlay2 := newService(t, 1, libp2pServiceOpts{})
s2.SetNotifier(notifier2)
s2.AddNotifier(notifier2)
addr := serviceUnderlayAddress(t, s1)
......@@ -447,6 +447,55 @@ func TestTopologyLocalNotifier(t *testing.T) {
waitAddrSet(t, &n2connectedAddr, &mtx, overlay1)
}
func TestTopologySupportMultipleNotifiers(t *testing.T) {
var (
mtx sync.Mutex
n21connectedAddr swarm.Address
n22connectedAddr swarm.Address
n21c = func(_ context.Context, a swarm.Address) error {
mtx.Lock()
defer mtx.Unlock()
n21connectedAddr = a
return nil
}
n21d = func(a swarm.Address) {
}
n22c = func(_ context.Context, a swarm.Address) error {
mtx.Lock()
defer mtx.Unlock()
n22connectedAddr = a
return nil
}
n22d = func(a swarm.Address) {
}
)
s1, overlay1 := newService(t, 1, libp2pServiceOpts{})
notifier21 := mockNotifier(n21c, n21d)
notifier22 := mockNotifier(n22c, n22d)
s2, overlay2 := newService(t, 1, libp2pServiceOpts{})
s2.AddNotifier(notifier21)
s2.AddNotifier(notifier22)
addr := serviceUnderlayAddress(t, s1)
// s2 connects to s1, thus the notifier on s1 should be called on Connect
_, err := s2.ConnectNotify(context.Background(), addr)
if err != nil {
t.Fatal(err)
}
expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2)
// expect that n1 notifee called with s2 overlay
waitAddrSet(t, &n21connectedAddr, &mtx, overlay1)
waitAddrSet(t, &n22connectedAddr, &mtx, overlay1)
}
func waitAddrSet(t *testing.T, addr *swarm.Address, mtx *sync.Mutex, exp swarm.Address) {
t.Helper()
for i := 0; i < 20; i++ {
......
......@@ -53,7 +53,7 @@ type Service struct {
handshakeService *handshake.Service
addressbook addressbook.Putter
peers *peerRegistry
topologyNotifier topology.Notifier
topologyNotifiers []topology.Notifier
connectionBreaker breaker.Interface
logger logging.Logger
tracer *tracing.Tracer
......@@ -241,9 +241,11 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
return
}
if s.topologyNotifier != nil {
if err := s.topologyNotifier.Connected(ctx, i.BzzAddress.Overlay); err != nil {
s.logger.Debugf("topology notifier: %s: %v", peerID, err)
if len(s.topologyNotifiers) > 0 {
for _, tn := range s.topologyNotifiers {
if err := tn.Connected(ctx, i.BzzAddress.Overlay); err != nil {
s.logger.Debugf("topology notifier: %s: %v", peerID, err)
}
}
}
......@@ -354,12 +356,16 @@ func (s *Service) ConnectNotify(ctx context.Context, addr ma.Multiaddr) (address
if err != nil {
return nil, fmt.Errorf("connect notify: %w", err)
}
if s.topologyNotifier != nil {
if err := s.topologyNotifier.Connected(ctx, address.Overlay); err != nil {
_ = s.disconnect(info.ID)
return nil, fmt.Errorf("notify topology: %w", err)
if len(s.topologyNotifiers) > 0 {
for _, tn := range s.topologyNotifiers {
if err := tn.Connected(ctx, address.Overlay); err != nil {
_ = s.disconnect(info.ID)
return nil, fmt.Errorf("notify topology: %w", err)
}
}
}
return address, nil
}
......@@ -423,7 +429,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
func (s *Service) Disconnect(overlay swarm.Address) error {
peerID, found := s.peers.peerID(overlay)
if !found {
s.peers.disconnecter.Disconnected(overlay)
s.peers.disconnect(overlay)
return p2p.ErrPeerNotFound
}
......@@ -442,15 +448,15 @@ func (s *Service) Peers() []p2p.Peer {
return s.peers.peers()
}
func (s *Service) SetNotifier(n topology.Notifier) {
s.topologyNotifier = n
s.peers.setDisconnecter(n)
func (s *Service) AddNotifier(n topology.Notifier) {
s.topologyNotifiers = append(s.topologyNotifiers, n)
s.peers.addDisconnecter(n)
}
func (s *Service) NewStream(ctx context.Context, overlay swarm.Address, headers p2p.Headers, protocolName, protocolVersion, streamName string) (p2p.Stream, error) {
peerID, found := s.peers.peerID(overlay)
if !found {
s.peers.disconnecter.Disconnected(overlay)
s.peers.disconnect(overlay)
return nil, p2p.ErrPeerNotFound
}
......
......@@ -70,7 +70,7 @@ func newService(t *testing.T, networkID uint64, o libp2pServiceOpts) (s *libp2p.
t.Fatal(err)
}
s.SetNotifier(noopNotifier)
s.AddNotifier(noopNotifier)
t.Cleanup(func() {
cancel()
......
......@@ -24,8 +24,9 @@ type peerRegistry struct {
streams map[libp2ppeer.ID]map[network.Stream]context.CancelFunc
mu sync.RWMutex
disconnecter topology.Disconnecter // peerRegistry notifies topology on peer disconnection
network.Notifiee // peerRegistry can be the receiver for network.Notify
//nolint:misspell
disconnecters []topology.Disconnecter // peerRegistry notifies topology on peer disconnection
network.Notifiee // peerRegistry can be the receiver for network.Notify
}
func newPeerRegistry() *peerRegistry {
......@@ -73,8 +74,11 @@ func (r *peerRegistry) Disconnected(_ network.Network, c network.Conn) {
delete(r.streams, peerID)
r.mu.Unlock()
if r.disconnecter != nil {
r.disconnecter.Disconnected(overlay)
if len(r.disconnecters) > 0 {
for _, d := range r.disconnecters {
d.Disconnected(overlay)
}
}
}
......@@ -169,11 +173,19 @@ func (r *peerRegistry) remove(peerID libp2ppeer.ID) {
r.mu.Unlock()
// if overlay was not found disconnect handler should not be signaled.
if r.disconnecter != nil && found {
r.disconnecter.Disconnected(overlay)
if len(r.disconnecters) > 0 && found {
for _, d := range r.disconnecters {
d.Disconnected(overlay)
}
}
}
func (r *peerRegistry) setDisconnecter(d topology.Disconnecter) {
r.disconnecter = d
func (r *peerRegistry) addDisconnecter(d topology.Disconnecter) {
r.disconnecters = append(r.disconnecters, d)
}
func (r *peerRegistry) disconnect(address swarm.Address) {
for _, d := range r.disconnecters {
d.Disconnected(address)
}
}
......@@ -22,7 +22,7 @@ type Service struct {
connectFunc func(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error)
disconnectFunc func(overlay swarm.Address) error
peersFunc func() []p2p.Peer
setNotifierFunc func(topology.Notifier)
addNotifierFunc func(topology.Notifier)
addressesFunc func() ([]ma.Multiaddr, error)
setWelcomeMessageFunc func(string) error
getWelcomeMessageFunc func() string
......@@ -58,10 +58,10 @@ func WithPeersFunc(f func() []p2p.Peer) Option {
})
}
// WithSetNotifierFunc sets the mock implementation of the SetNotifier function
func WithSetNotifierFunc(f func(topology.Notifier)) Option {
// WithAddNotifierFunc sets the mock implementation of the AddNotifier function
func WithAddNotifierFunc(f func(topology.Notifier)) Option {
return optionFunc(func(s *Service) {
s.setNotifierFunc = f
s.addNotifierFunc = f
})
}
......@@ -124,12 +124,12 @@ func (s *Service) Disconnect(overlay swarm.Address) error {
return s.disconnectFunc(overlay)
}
func (s *Service) SetNotifier(f topology.Notifier) {
if s.setNotifierFunc == nil {
func (s *Service) AddNotifier(f topology.Notifier) {
if s.addNotifierFunc == nil {
return
}
s.setNotifierFunc(f)
s.addNotifierFunc(f)
}
func (s *Service) Addresses() ([]ma.Multiaddr, error) {
......
......@@ -24,7 +24,7 @@ type Service interface {
Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.Address, err error)
Disconnect(overlay swarm.Address) error
Peers() []Peer
SetNotifier(topology.Notifier)
AddNotifier(topology.Notifier)
Addresses() ([]ma.Multiaddr, error)
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment