Commit e28f618b authored by Rodrigo Q. Saramago's avatar Rodrigo Q. Saramago Committed by GitHub

fix: add peer in topology through the debug api (#2093)

parent 0da5f1ac
......@@ -35,6 +35,14 @@ func (s *Service) peerConnectHandler(w http.ResponseWriter, r *http.Request) {
return
}
if err := s.topologyDriver.Connected(r.Context(), p2p.Peer{Address: bzzAddr.Overlay}, true); err != nil {
_ = s.p2p.Disconnect(bzzAddr.Overlay)
s.logger.Debugf("debug api: peer connect handler %s: %v", addr, err)
s.logger.Errorf("unable to connect to peer %s", addr)
jsonhttp.InternalServerError(w, err)
return
}
jsonhttp.OK(w, peerConnectResponse{
Address: bzzAddr.Overlay.String(),
})
......
......@@ -91,11 +91,13 @@ func TestLightPeerLimit(t *testing.T) {
var (
limit = 3
container = lightnode.NewContainer(test.RandomAddress())
sf, _ = newService(t, 1, libp2pServiceOpts{lightNodes: container,
sf, _ = newService(t, 1, libp2pServiceOpts{
lightNodes: container,
libp2pOpts: libp2p.Options{
LightNodeLimit: limit,
FullNode: true,
}})
},
})
notifier = mockNotifier(noopCf, noopDf, true)
)
......@@ -107,7 +109,8 @@ func TestLightPeerLimit(t *testing.T) {
sl, _ := newService(t, 1, libp2pServiceOpts{
libp2pOpts: libp2p.Options{
FullNode: false,
}})
},
})
_, err := sl.Connect(ctx, addr)
if err != nil {
t.Fatal(err)
......@@ -437,7 +440,7 @@ func TestTopologyNotifier(t *testing.T) {
n2connectedPeer p2p.Peer
n2disconnectedPeer p2p.Peer
n1c = func(_ context.Context, p p2p.Peer) error {
n1c = func(_ context.Context, p p2p.Peer, _ bool) error {
mtx.Lock()
defer mtx.Unlock()
expectZeroAddress(t, n1connectedPeer.Address) // fail if set more than once
......@@ -451,7 +454,7 @@ func TestTopologyNotifier(t *testing.T) {
n1disconnectedPeer = p
}
n2c = func(_ context.Context, p p2p.Peer) error {
n2c = func(_ context.Context, p p2p.Peer, _ bool) error {
mtx.Lock()
defer mtx.Unlock()
expectZeroAddress(t, n2connectedPeer.Address) // fail if set more than once
......@@ -553,7 +556,7 @@ func TestTopologyOverSaturated(t *testing.T) {
n2connectedPeer p2p.Peer
n2disconnectedPeer p2p.Peer
n1c = func(_ context.Context, p p2p.Peer) error {
n1c = func(_ context.Context, p p2p.Peer, _ bool) error {
mtx.Lock()
defer mtx.Unlock()
expectZeroAddress(t, n1connectedPeer.Address) // fail if set more than once
......@@ -562,7 +565,7 @@ func TestTopologyOverSaturated(t *testing.T) {
}
n1d = func(p p2p.Peer) {}
n2c = func(_ context.Context, p p2p.Peer) error {
n2c = func(_ context.Context, p p2p.Peer, _ bool) error {
mtx.Lock()
defer mtx.Unlock()
expectZeroAddress(t, n2connectedPeer.Address) // fail if set more than once
......@@ -575,7 +578,7 @@ func TestTopologyOverSaturated(t *testing.T) {
n2disconnectedPeer = p
}
)
//this notifier will not pick the peer
// this notifier will not pick the peer
notifier1 := mockNotifier(n1c, n1d, false)
s1, overlay1 := newService(t, 1, libp2pServiceOpts{Addressbook: ab1, libp2pOpts: libp2p.Options{
FullNode: true,
......@@ -723,6 +726,7 @@ func expectStreamReset(t *testing.T, s io.ReadCloser, err error) {
}
}
}
func expectFullNode(t *testing.T, p p2p.Peer) {
t.Helper()
if !p.FullNode {
......@@ -769,13 +773,13 @@ func checkAddressbook(t *testing.T, ab addressbook.Getter, overlay swarm.Address
}
type notifiee struct {
connected func(context.Context, p2p.Peer) error
connected func(context.Context, p2p.Peer, bool) error
disconnected func(p2p.Peer)
pick bool
}
func (n *notifiee) Connected(c context.Context, p p2p.Peer) error {
return n.connected(c, p)
func (n *notifiee) Connected(c context.Context, p p2p.Peer, f bool) error {
return n.connected(c, p, f)
}
func (n *notifiee) Disconnected(p p2p.Peer) {
......@@ -794,10 +798,12 @@ func mockNotifier(c cFunc, d dFunc, pick bool) p2p.PickyNotifier {
return &notifiee{connected: c, disconnected: d, pick: pick}
}
type cFunc func(context.Context, p2p.Peer) error
type dFunc func(p2p.Peer)
type (
cFunc func(context.Context, p2p.Peer, bool) error
dFunc func(p2p.Peer)
)
var noopCf = func(_ context.Context, _ p2p.Peer) error {
var noopCf = func(_ context.Context, _ p2p.Peer, _ bool) error {
return nil
}
......
......@@ -363,7 +363,7 @@ func (s *Service) handleIncoming(stream network.Stream) {
if s.notifier != nil {
if !i.FullNode {
s.lightNodes.Connected(s.ctx, peer)
//light node announces explicitly
// light node announces explicitly
if err := s.notifier.Announce(s.ctx, peer.Address, i.FullNode); err != nil {
s.logger.Debugf("stream handler: notifier.Announce: %s: %v", peer.Address.String(), err)
}
......@@ -382,7 +382,7 @@ func (s *Service) handleIncoming(stream network.Stream) {
return
}
}
} else if err := s.notifier.Connected(s.ctx, peer); err != nil {
} else if err := s.notifier.Connected(s.ctx, peer, false); err != nil {
// full node announces implicitly
s.logger.Debugf("stream handler: notifier.Connected: peer disconnected: %s: %v", i.BzzAddress.Overlay, err)
// note: this cannot be unit tested since the node
......@@ -700,7 +700,6 @@ func (s *Service) Disconnect(overlay swarm.Address) error {
// disconnected is a registered peer registry event
func (s *Service) disconnected(address swarm.Address) {
peer := p2p.Peer{Address: address}
peerID, found := s.peers.peerID(address)
if found {
......
......@@ -41,14 +41,14 @@ type Halter interface {
Halt()
}
// PickyNotifer can decide whether a peer should be picked
// PickyNotifier can decide whether a peer should be picked
type PickyNotifier interface {
Pick(Peer) bool
Notifier
}
type Notifier interface {
Connected(context.Context, Peer) error
Connected(context.Context, Peer, bool) error
Disconnected(Peer)
Announce(context.Context, swarm.Address, bool) error
}
......
......@@ -55,8 +55,10 @@ var (
errEmptyBin = errors.New("empty bin")
)
type binSaturationFunc func(bin uint8, peers, connected *pslice.PSlice) (saturated bool, oversaturated bool)
type sanctionedPeerFunc func(peer swarm.Address) bool
type (
binSaturationFunc func(bin uint8, peers, connected *pslice.PSlice) (saturated bool, oversaturated bool)
sanctionedPeerFunc func(peer swarm.Address) bool
)
var noopSanctionedPeerFn = func(_ swarm.Address) bool { return false }
......@@ -211,7 +213,6 @@ func (k *Kad) generateCommonBinPrefixes() {
}
}
}
}
// Clears the bit at pos in n.
......@@ -499,8 +500,8 @@ func (k *Kad) manage() {
// The wg makes sure that we wait for all the connection attempts,
// spun up by goroutines, to finish before we try the boot-nodes.
var wg sync.WaitGroup
var peerConnChan = make(chan *peerConnInfo)
var peerConnChan2 = make(chan *peerConnInfo)
peerConnChan := make(chan *peerConnInfo)
peerConnChan2 := make(chan *peerConnInfo)
go k.connectionAttemptsHandler(ctx, &wg, peerConnChan, peerConnChan2)
for {
......@@ -609,7 +610,7 @@ func (k *Kad) Start(_ context.Context) error {
func (k *Kad) connectBootNodes(ctx context.Context) {
var attempts, connected int
var totalAttempts = maxBootNodeAttempts * len(k.bootnodes)
totalAttempts := maxBootNodeAttempts * len(k.bootnodes)
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
......@@ -874,7 +875,8 @@ func (k *Kad) Pick(peer p2p.Peer) bool {
}
// Connected is called when a peer has dialed in.
func (k *Kad) Connected(ctx context.Context, peer p2p.Peer) error {
// If forceConnection is true `overSaturated` is ignored for non-bootnodes.
func (k *Kad) Connected(ctx context.Context, peer p2p.Peer, forceConnection bool) error {
address := peer.Address
po := swarm.Proximity(k.base.Bytes(), address.Bytes())
......@@ -887,7 +889,9 @@ func (k *Kad) Connected(ctx context.Context, peer p2p.Peer) error {
_ = k.p2p.Disconnect(randPeer)
return k.connected(ctx, address)
}
return topology.ErrOversaturated
if !forceConnection {
return topology.ErrOversaturated
}
}
return k.connected(ctx, address)
......@@ -913,12 +917,10 @@ func (k *Kad) connected(ctx context.Context, addr swarm.Address) error {
k.notifyManageLoop()
k.notifyPeerSig()
return nil
}
// Disconnected is called when peer disconnects.
func (k *Kad) Disconnected(peer p2p.Peer) {
k.logger.Debugf("kademlia: disconnected peer %s", peer.Address)
k.connectedPeers.Remove(peer.Address)
......@@ -1007,7 +1009,7 @@ func (k *Kad) ClosestPeer(addr swarm.Address, includeSelf bool, skipPeers ...swa
peers := k.p2p.Peers()
var peersToDisconnect []swarm.Address
var closest = swarm.ZeroAddress
closest := swarm.ZeroAddress
if includeSelf {
closest = k.base
......@@ -1051,7 +1053,7 @@ func (k *Kad) ClosestPeer(addr swarm.Address, includeSelf bool, skipPeers ...swa
return swarm.Address{}, err
}
if closest.IsZero() { //no peers
if closest.IsZero() { // no peers
return swarm.Address{}, topology.ErrNotFound // only for light nodes
}
......@@ -1317,7 +1319,6 @@ func (k *Kad) Close() error {
}
func randomSubset(addrs []swarm.Address, count int) ([]swarm.Address, error) {
if count >= len(addrs) {
return addrs, nil
}
......@@ -1335,7 +1336,6 @@ func randomSubset(addrs []swarm.Address, count int) ([]swarm.Address, error) {
}
func (k *Kad) randomPeer(bin uint8) (swarm.Address, error) {
peers := k.connectedPeers.BinPeers(bin)
if len(peers) == 0 {
......
......@@ -366,7 +366,6 @@ func TestManageWithBalancing(t *testing.T) {
for i := 1; i <= int(swarm.MaxPO); i++ {
waitBalanced(t, kad, uint8(i))
}
}
// TestBinSaturation tests the builtin binSaturated function.
......@@ -1240,12 +1239,11 @@ func connectOne(t *testing.T, signer beeCrypto.Signer, k *kademlia.Kad, ab addre
if err := ab.Put(peer, *bzzAddr); err != nil {
t.Fatal(err)
}
err = k.Connected(context.Background(), p2p.Peer{Address: peer})
err = k.Connected(context.Background(), p2p.Peer{Address: peer}, false)
if !errors.Is(err, expErr) {
t.Fatalf("expected error %v , got %v", expErr, err)
}
}
func addOne(t *testing.T, signer beeCrypto.Signer, k *kademlia.Kad, ab addressbook.Putter, peer swarm.Address) {
......
......@@ -8,6 +8,7 @@ import (
"context"
"sync"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
)
......@@ -125,19 +126,32 @@ func (m *Mock) NeighborhoodDepth() uint8 {
}
// Connected is called when a peer dials in.
func (m *Mock) Connected(_ context.Context, addr swarm.Address) error {
func (m *Mock) Connected(_ context.Context, peer p2p.Peer, _ bool) error {
m.mtx.Lock()
m.peers = append(m.peers, addr)
m.peers = append(m.peers, peer.Address)
m.mtx.Unlock()
m.Trigger()
return nil
}
// Disconnected is called when a peer disconnects.
func (m *Mock) Disconnected(_ swarm.Address) {
func (m *Mock) Disconnected(peer p2p.Peer) {
m.mtx.Lock()
defer m.mtx.Unlock()
for i, addr := range m.peers {
if addr.Equal(peer.Address) {
m.peers = append(m.peers[:i], m.peers[i+1:]...)
break
}
}
m.Trigger()
}
func (m *Mock) Announce(_ context.Context, _ swarm.Address, _ bool) error {
return nil
}
func (m *Mock) SubscribePeersChange() (c <-chan struct{}, unsubscribe func()) {
channel := make(chan struct{}, 1)
var closeOnce sync.Once
......
......@@ -8,6 +8,7 @@ import (
"context"
"sync"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
)
......@@ -81,13 +82,25 @@ func (d *mock) AddPeers(addrs ...swarm.Address) {
d.peers = append(d.peers, addrs...)
}
func (d *mock) Connected(ctx context.Context, addr swarm.Address) error {
d.AddPeers(addr)
func (d *mock) Connected(ctx context.Context, peer p2p.Peer, _ bool) error {
d.AddPeers(peer.Address)
return nil
}
func (d *mock) Disconnected(swarm.Address) {
panic("todo")
func (d *mock) Disconnected(peer p2p.Peer) {
d.mtx.Lock()
defer d.mtx.Unlock()
for i, addr := range d.peers {
if addr.Equal(peer.Address) {
d.peers = append(d.peers[:i], d.peers[i+1:]...)
break
}
}
}
func (d *mock) Announce(_ context.Context, _ swarm.Address, _ bool) error {
return nil
}
func (d *mock) Peers() []swarm.Address {
......
......@@ -11,6 +11,7 @@ import (
"io"
"time"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/swarm"
)
......@@ -21,6 +22,7 @@ var (
)
type Driver interface {
p2p.Notifier
PeerAdder
ClosestPeerer
EachPeerer
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment