Commit 995af63f authored by Anatolie Lupacescu's avatar Anatolie Lupacescu Committed by GitHub

fix(p2p): detect dial to light nodes and disconnect (#1689)

parent 89251900
......@@ -16,6 +16,8 @@ var (
ErrPeerNotFound = errors.New("peer not found")
// ErrAlreadyConnected is returned if connect was called for already connected node.
ErrAlreadyConnected = errors.New("already connected")
// ErrDialLightNode is returned if connect was attempted to a light node.
ErrDialLightNode = errors.New("target peer is a light node")
)
const (
......
......@@ -39,7 +39,9 @@ func TestConnectDisconnect(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1 := newService(t, 1, libp2pServiceOpts{})
s1, overlay1 := newService(t, 1, libp2pServiceOpts{libp2pOpts: libp2p.Options{
FullNode: true,
}})
s2, overlay2 := newService(t, 1, libp2pServiceOpts{})
addr := serviceUnderlayAddress(t, s1)
......@@ -60,11 +62,33 @@ func TestConnectDisconnect(t *testing.T) {
expectPeersEventually(t, s1)
}
func TestConnectToLightPeer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, _ := newService(t, 1, libp2pServiceOpts{libp2pOpts: libp2p.Options{
FullNode: false,
}})
s2, _ := newService(t, 1, libp2pServiceOpts{})
addr := serviceUnderlayAddress(t, s1)
_, err := s2.Connect(ctx, addr)
if err != p2p.ErrDialLightNode {
t.Fatalf("expected err %v, got %v", p2p.ErrDialLightNode, err)
}
expectPeers(t, s2)
expectPeersEventually(t, s1)
}
func TestDoubleConnect(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1 := newService(t, 1, libp2pServiceOpts{})
s1, overlay1 := newService(t, 1, libp2pServiceOpts{libp2pOpts: libp2p.Options{
FullNode: true,
}})
s2, overlay2 := newService(t, 1, libp2pServiceOpts{})
addr := serviceUnderlayAddress(t, s1)
......@@ -88,7 +112,9 @@ func TestDoubleDisconnect(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1 := newService(t, 1, libp2pServiceOpts{})
s1, overlay1 := newService(t, 1, libp2pServiceOpts{libp2pOpts: libp2p.Options{
FullNode: true,
}})
s2, overlay2 := newService(t, 1, libp2pServiceOpts{})
addr := serviceUnderlayAddress(t, s1)
......@@ -120,7 +146,9 @@ func TestMultipleConnectDisconnect(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1 := newService(t, 1, libp2pServiceOpts{})
s1, overlay1 := newService(t, 1, libp2pServiceOpts{libp2pOpts: libp2p.Options{
FullNode: true,
}})
s2, overlay2 := newService(t, 1, libp2pServiceOpts{})
......@@ -161,7 +189,9 @@ func TestConnectDisconnectOnAllAddresses(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1 := newService(t, 1, libp2pServiceOpts{})
s1, overlay1 := newService(t, 1, libp2pServiceOpts{libp2pOpts: libp2p.Options{
FullNode: true,
}})
s2, overlay2 := newService(t, 1, libp2pServiceOpts{})
......@@ -191,16 +221,16 @@ func TestDoubleConnectOnAllAddresses(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1 := newService(t, 1, libp2pServiceOpts{})
s1, overlay1 := newService(t, 1, libp2pServiceOpts{libp2pOpts: libp2p.Options{
FullNode: true,
}})
addrs, err := s1.Addresses()
if err != nil {
t.Fatal(err)
}
for _, addr := range addrs {
// creating new remote host for each address
s2, overlay2 := newService(t, 1, libp2pServiceOpts{libp2pOpts: libp2p.Options{
FullNode: true,
}})
s2, overlay2 := newService(t, 1, libp2pServiceOpts{})
if _, err := s2.Connect(ctx, addr); err != nil {
t.Fatal(err)
......@@ -279,7 +309,9 @@ func TestConnectRepeatHandshake(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1 := newService(t, 1, libp2pServiceOpts{})
s1, overlay1 := newService(t, 1, libp2pServiceOpts{libp2pOpts: libp2p.Options{
FullNode: true,
}})
s2, overlay2 := newService(t, 1, libp2pServiceOpts{})
addr := serviceUnderlayAddress(t, s1)
......@@ -311,7 +343,9 @@ func TestConnectRepeatHandshake(t *testing.T) {
}
func TestBlocklisting(t *testing.T) {
s1, overlay1 := newService(t, 1, libp2pServiceOpts{})
s1, overlay1 := newService(t, 1, libp2pServiceOpts{libp2pOpts: libp2p.Options{
FullNode: true,
}})
s2, overlay2 := newService(t, 1, libp2pServiceOpts{})
addr1 := serviceUnderlayAddress(t, s1)
......@@ -499,7 +533,9 @@ func TestTopologyOverSaturated(t *testing.T) {
)
//this notifier will not pick the peer
notifier1 := mockNotifier(n1c, n1d, false)
s1, overlay1 := newService(t, 1, libp2pServiceOpts{Addressbook: ab1})
s1, overlay1 := newService(t, 1, libp2pServiceOpts{Addressbook: ab1, libp2pOpts: libp2p.Options{
FullNode: true,
}})
s1.SetPickyNotifier(notifier1)
notifier2 := mockNotifier(n2c, n2d, false)
......@@ -524,7 +560,9 @@ func TestWithDisconnectStreams(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1 := newService(t, 1, libp2pServiceOpts{})
s1, overlay1 := newService(t, 1, libp2pServiceOpts{libp2pOpts: libp2p.Options{
FullNode: true,
}})
s2, overlay2 := newService(t, 1, libp2pServiceOpts{})
testSpec := p2p.ProtocolSpec{
......@@ -568,7 +606,9 @@ func TestWithBlocklistStreams(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1 := newService(t, 1, libp2pServiceOpts{})
s1, overlay1 := newService(t, 1, libp2pServiceOpts{libp2pOpts: libp2p.Options{
FullNode: true,
}})
s2, overlay2 := newService(t, 1, libp2pServiceOpts{})
testSpec := p2p.ProtocolSpec{
......
......@@ -11,6 +11,7 @@ import (
"time"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/ethersphere/bee/pkg/swarm"
)
......@@ -23,7 +24,9 @@ func TestHeaders(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1 := newService(t, 1, libp2pServiceOpts{})
s1, overlay1 := newService(t, 1, libp2pServiceOpts{libp2pOpts: libp2p.Options{
FullNode: true,
}})
s2, overlay2 := newService(t, 1, libp2pServiceOpts{})
......@@ -70,7 +73,9 @@ func TestHeaders_empty(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1 := newService(t, 1, libp2pServiceOpts{})
s1, overlay1 := newService(t, 1, libp2pServiceOpts{libp2pOpts: libp2p.Options{
FullNode: true,
}})
s2, overlay2 := newService(t, 1, libp2pServiceOpts{})
......@@ -126,7 +131,9 @@ func TestHeadler(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1 := newService(t, 1, libp2pServiceOpts{})
s1, overlay1 := newService(t, 1, libp2pServiceOpts{libp2pOpts: libp2p.Options{
FullNode: true,
}})
s2, _ := newService(t, 1, libp2pServiceOpts{})
......
......@@ -538,6 +538,12 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
return nil, fmt.Errorf("handshake: %w", err)
}
if !i.FullNode {
_ = handshakeStream.Reset()
_ = s.host.Network().ClosePeer(info.ID)
return nil, p2p.ErrDialLightNode
}
blocked, err := s.blocklist.Exists(i.BzzAddress.Overlay)
if err != nil {
s.logger.Debugf("blocklisting: exists %s: %v", info.ID, err)
......@@ -568,12 +574,10 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
return nil, fmt.Errorf("connect full close %w", err)
}
if i.FullNode {
err = s.addressbook.Put(i.BzzAddress.Overlay, *i.BzzAddress)
if err != nil {
_ = s.Disconnect(i.BzzAddress.Overlay)
return nil, fmt.Errorf("storing bzz address: %w", err)
}
err = s.addressbook.Put(i.BzzAddress.Overlay, *i.BzzAddress)
if err != nil {
_ = s.Disconnect(i.BzzAddress.Overlay)
return nil, fmt.Errorf("storing bzz address: %w", err)
}
s.protocolsmu.RLock()
......
......@@ -13,6 +13,7 @@ import (
"time"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/multiformats/go-multistream"
)
......@@ -20,7 +21,9 @@ func TestNewStream(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1 := newService(t, 1, libp2pServiceOpts{})
s1, overlay1 := newService(t, 1, libp2pServiceOpts{libp2pOpts: libp2p.Options{
FullNode: true,
}})
s2, _ := newService(t, 1, libp2pServiceOpts{})
......@@ -52,7 +55,10 @@ func TestNewStreamMulti(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1 := newService(t, 1, libp2pServiceOpts{})
s1, overlay1 := newService(t, 1, libp2pServiceOpts{libp2pOpts: libp2p.Options{
FullNode: true,
}})
var (
h1calls, h2calls int32
h1 = func(_ context.Context, _ p2p.Peer, s p2p.Stream) error {
......@@ -97,7 +103,9 @@ func TestNewStream_errNotSupported(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1 := newService(t, 1, libp2pServiceOpts{})
s1, overlay1 := newService(t, 1, libp2pServiceOpts{libp2pOpts: libp2p.Options{
FullNode: true,
}})
s2, _ := newService(t, 1, libp2pServiceOpts{})
......@@ -132,7 +140,9 @@ func TestNewStream_semanticVersioning(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1 := newService(t, 1, libp2pServiceOpts{})
s1, overlay1 := newService(t, 1, libp2pServiceOpts{libp2pOpts: libp2p.Options{
FullNode: true,
}})
s2, _ := newService(t, 1, libp2pServiceOpts{})
......@@ -191,7 +201,9 @@ func TestDisconnectError(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1 := newService(t, 1, libp2pServiceOpts{})
s1, overlay1 := newService(t, 1, libp2pServiceOpts{libp2pOpts: libp2p.Options{
FullNode: true,
}})
s2, overlay2 := newService(t, 1, libp2pServiceOpts{})
......@@ -219,7 +231,9 @@ func TestConnectDisconnectEvents(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1 := newService(t, 1, libp2pServiceOpts{})
s1, overlay1 := newService(t, 1, libp2pServiceOpts{libp2pOpts: libp2p.Options{
FullNode: true,
}})
s2, _ := newService(t, 1, libp2pServiceOpts{})
testProtocol := newTestProtocol(func(_ context.Context, _ p2p.Peer, _ p2p.Stream) error {
......
......@@ -11,6 +11,7 @@ import (
"time"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/ethersphere/bee/pkg/tracing"
)
......@@ -33,7 +34,9 @@ func TestTracing(t *testing.T) {
}
defer closer2.Close()
s1, overlay1 := newService(t, 1, libp2pServiceOpts{})
s1, overlay1 := newService(t, 1, libp2pServiceOpts{libp2pOpts: libp2p.Options{
FullNode: true,
}})
s2, _ := newService(t, 1, libp2pServiceOpts{})
......
......@@ -41,7 +41,10 @@ var (
broadcastBinSize = 4
)
var errOverlayMismatch = errors.New("overlay mismatch")
var (
errOverlayMismatch = errors.New("overlay mismatch")
errPruneEntry = errors.New("prune entry")
)
type binSaturationFunc func(bin uint8, peers, connected *pslice.PSlice) (saturated bool, oversaturated bool)
type sanctionedPeerFunc func(peer swarm.Address) bool
......@@ -331,9 +334,7 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup,
return
}
switch err = k.connect(ctx, peer.addr, bzzAddr.Underlay); {
case errors.Is(err, errOverlayMismatch):
k.logger.Debugf("overlay mismatch has occurred to an overlay %q with underlay %q", peer.addr, bzzAddr.Underlay)
remove := func(peer *peerConnInfo) {
k.waitNextMu.Lock()
delete(k.waitNext, peer.addr.String())
k.waitNextMu.Unlock()
......@@ -341,7 +342,17 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup,
if err := k.addressBook.Remove(peer.addr); err != nil {
k.logger.Debugf("could not remove peer %q from addressbook", peer.addr)
}
fallthrough
}
switch err = k.connect(ctx, peer.addr, bzzAddr.Underlay); {
case errors.Is(err, errPruneEntry):
k.logger.Debugf("dial to light node with overlay %q and underlay %q", peer.addr, bzzAddr.Underlay)
remove(peer)
return
case errors.Is(err, errOverlayMismatch):
k.logger.Debugf("overlay mismatch has occurred to an overlay %q with underlay %q", peer.addr, bzzAddr.Underlay)
remove(peer)
return
case err != nil:
k.logger.Debugf("peer not reachable from kademlia %q: %v", bzzAddr, err)
k.logger.Warningf("peer not reachable when attempting to connect")
......@@ -497,8 +508,14 @@ func (k *Kad) connectBootnodes(ctx context.Context) {
return true, nil
}
bzzAddress, err := k.p2p.Connect(ctx, addr)
attempts++
if err != nil {
if errors.Is(err, p2p.ErrDialLightNode) {
k.logger.Debugf("connect fail %s: %v", addr, err)
k.logger.Warningf("connect to bootnode %s", addr)
return false, err
}
if !errors.Is(err, p2p.ErrAlreadyConnected) {
k.logger.Debugf("connect fail %s: %v", addr, err)
k.logger.Warningf("connect to bootnode %s", addr)
......@@ -619,6 +636,9 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
defer cancel()
i, err := k.p2p.Connect(ctx, ma)
if err != nil {
if errors.Is(err, p2p.ErrDialLightNode) {
return errPruneEntry
}
if errors.Is(err, p2p.ErrAlreadyConnected) {
if !i.Overlay.Equal(peer) {
return errOverlayMismatch
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment