Commit 23cee426 authored by acud's avatar acud Committed by GitHub

feat: announce to lightnodes (#2351)

parent 4237d08d
...@@ -545,6 +545,98 @@ func TestTopologyNotifier(t *testing.T) { ...@@ -545,6 +545,98 @@ func TestTopologyNotifier(t *testing.T) {
waitAddrSet(t, &n2disconnectedPeer.Address, &mtx, overlay1) waitAddrSet(t, &n2disconnectedPeer.Address, &mtx, overlay1)
} }
// TestTopologyAnnounce checks that announcement
// works correctly for full nodes and light nodes.
func TestTopologyAnnounce(t *testing.T) {
var (
mtx sync.Mutex
ctx = context.Background()
ab1, ab2, ab3 = addressbook.New(mock.NewStateStore()), addressbook.New(mock.NewStateStore()), addressbook.New(mock.NewStateStore())
announceCalled = false
announceToCalled = false
n1a = func(context.Context, swarm.Address, bool) error {
mtx.Lock()
announceCalled = true
mtx.Unlock()
return nil
}
n1at = func(context.Context, swarm.Address, swarm.Address, bool) error {
mtx.Lock()
announceToCalled = true
mtx.Unlock()
return nil
}
)
// test setup: 2 full nodes and one light
// light connect to full(1), then full(2)
// connects to full(1), check that full(1)
// tried to announce full(2) to light.
notifier1 := mockAnnouncingNotifier(n1a, n1at)
s1, overlay1 := newService(t, 1, libp2pServiceOpts{
Addressbook: ab1,
libp2pOpts: libp2p.Options{
FullNode: true,
},
})
s1.SetPickyNotifier(notifier1)
s2, overlay2 := newService(t, 1, libp2pServiceOpts{
Addressbook: ab2,
libp2pOpts: libp2p.Options{
FullNode: true,
},
})
s3, overlay3 := newService(t, 1, libp2pServiceOpts{
Addressbook: ab3,
libp2pOpts: libp2p.Options{
FullNode: false,
},
})
addr := serviceUnderlayAddress(t, s1)
// s3 (light) connects to s1 (full)
_, err := s3.Connect(ctx, addr)
if err != nil {
t.Fatal(err)
}
expectPeers(t, s3, overlay1)
expectPeersEventually(t, s1, overlay3)
mtx.Lock()
if !announceCalled {
t.Error("expected announce to be called")
}
if announceToCalled {
t.Error("announceTo called but should not")
}
mtx.Unlock()
// check address book entries are there
checkAddressbook(t, ab3, overlay1, addr)
// s2 (full) connects to s1 (full)
_, err = s2.Connect(ctx, addr)
if err != nil {
t.Fatal(err)
}
expectPeers(t, s2, overlay1)
expectPeersEventually(t, s1, overlay2, overlay3)
mtx.Lock()
if !announceToCalled {
t.Error("expected announceTo to be called")
}
mtx.Unlock()
}
func TestTopologyOverSaturated(t *testing.T) { func TestTopologyOverSaturated(t *testing.T) {
var ( var (
mtx sync.Mutex mtx sync.Mutex
...@@ -773,9 +865,11 @@ func checkAddressbook(t *testing.T, ab addressbook.Getter, overlay swarm.Address ...@@ -773,9 +865,11 @@ func checkAddressbook(t *testing.T, ab addressbook.Getter, overlay swarm.Address
} }
type notifiee struct { type notifiee struct {
connected func(context.Context, p2p.Peer, bool) error connected cFunc
disconnected func(p2p.Peer) disconnected dFunc
pick bool pick bool
announce announceFunc
announceTo announceToFunc
} }
func (n *notifiee) Connected(c context.Context, p p2p.Peer, f bool) error { func (n *notifiee) Connected(c context.Context, p p2p.Peer, f bool) error {
...@@ -790,21 +884,30 @@ func (n *notifiee) Pick(p p2p.Peer) bool { ...@@ -790,21 +884,30 @@ func (n *notifiee) Pick(p p2p.Peer) bool {
return n.pick return n.pick
} }
func (n *notifiee) Announce(context.Context, swarm.Address, bool) error { func (n *notifiee) Announce(ctx context.Context, a swarm.Address, full bool) error {
return nil return n.announce(ctx, a, full)
}
func (n *notifiee) AnnounceTo(ctx context.Context, a, b swarm.Address, full bool) error {
return n.announceTo(ctx, a, b, full)
} }
func mockNotifier(c cFunc, d dFunc, pick bool) p2p.PickyNotifier { func mockNotifier(c cFunc, d dFunc, pick bool) p2p.PickyNotifier {
return &notifiee{connected: c, disconnected: d, pick: pick} return &notifiee{connected: c, disconnected: d, pick: pick, announce: noopAnnounce, announceTo: noopAnnounceTo}
}
func mockAnnouncingNotifier(a announceFunc, at announceToFunc) p2p.PickyNotifier {
return &notifiee{connected: noopCf, disconnected: noopDf, pick: true, announce: a, announceTo: at}
} }
type ( type (
cFunc func(context.Context, p2p.Peer, bool) error cFunc func(context.Context, p2p.Peer, bool) error
dFunc func(p2p.Peer) dFunc func(p2p.Peer)
announceFunc func(context.Context, swarm.Address, bool) error
announceToFunc func(context.Context, swarm.Address, swarm.Address, bool) error
) )
var noopCf = func(_ context.Context, _ p2p.Peer, _ bool) error { var noopCf = func(context.Context, p2p.Peer, bool) error { return nil }
return nil var noopDf = func(p2p.Peer) {}
} var noopAnnounce = func(context.Context, swarm.Address, bool) error { return nil }
var noopAnnounceTo = func(context.Context, swarm.Address, swarm.Address, bool) error { return nil }
var noopDf = func(p p2p.Peer) {}
...@@ -23,6 +23,7 @@ import ( ...@@ -23,6 +23,7 @@ import (
handshake "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake" handshake "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology/lightnode" "github.com/ethersphere/bee/pkg/topology/lightnode"
"github.com/ethersphere/bee/pkg/tracing" "github.com/ethersphere/bee/pkg/tracing"
"github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p"
...@@ -82,6 +83,7 @@ type lightnodes interface { ...@@ -82,6 +83,7 @@ type lightnodes interface {
Disconnected(p2p.Peer) Disconnected(p2p.Peer)
Count() int Count() int
RandomPeer(swarm.Address) (swarm.Address, error) RandomPeer(swarm.Address) (swarm.Address, error)
EachPeer(pf topology.EachPeerFunc) error
} }
type Options struct { type Options struct {
...@@ -396,7 +398,8 @@ func (s *Service) handleIncoming(stream network.Stream) { ...@@ -396,7 +398,8 @@ func (s *Service) handleIncoming(stream network.Stream) {
return return
} }
} }
} else if err := s.notifier.Connected(s.ctx, peer, false); err != nil { } else {
if err := s.notifier.Connected(s.ctx, peer, false); err != nil {
// full node announces implicitly // full node announces implicitly
s.logger.Debugf("stream handler: notifier.Connected: peer disconnected: %s: %v", i.BzzAddress.Overlay, err) s.logger.Debugf("stream handler: notifier.Connected: peer disconnected: %s: %v", i.BzzAddress.Overlay, err)
// note: this cannot be unit tested since the node // note: this cannot be unit tested since the node
...@@ -411,6 +414,18 @@ func (s *Service) handleIncoming(stream network.Stream) { ...@@ -411,6 +414,18 @@ func (s *Service) handleIncoming(stream network.Stream) {
_ = s.Disconnect(overlay) _ = s.Disconnect(overlay)
return return
} }
// when a full node connects, we gossip about it to the
// light nodes so that they can also have a chance at building
// a solid topology.
_ = s.lightNodes.EachPeer(func(addr swarm.Address, _ uint8) (bool, bool, error) {
go func(addressee, peer swarm.Address, fullnode bool) {
if err := s.notifier.AnnounceTo(s.ctx, addressee, peer, fullnode); err != nil {
s.logger.Debugf("stream handler: notifier.Announce to light node %s %s: %v", addressee.String(), peer.String(), err)
}
}(addr, peer.Address, i.FullNode)
return false, false, nil
})
}
} }
s.metrics.HandledStreamCount.Inc() s.metrics.HandledStreamCount.Inc()
...@@ -483,6 +498,7 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) { ...@@ -483,6 +498,7 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
if errors.As(err, &de) { if errors.As(err, &de) {
_ = stream.Reset() _ = stream.Reset()
_ = s.Disconnect(overlay) _ = s.Disconnect(overlay)
logger.Tracef("handler(%s): disconnecting %s due to disconnect error", p.Name, overlay.String())
} }
var bpe *p2p.BlockPeerError var bpe *p2p.BlockPeerError
...@@ -492,7 +508,7 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) { ...@@ -492,7 +508,7 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
logger.Debugf("blocklist: could not blocklist peer %s: %v", peerID, err) logger.Debugf("blocklist: could not blocklist peer %s: %v", peerID, err)
logger.Errorf("unable to blocklist peer %v", peerID) logger.Errorf("unable to blocklist peer %v", peerID)
} }
logger.Tracef("blocklisted a peer %s", peerID) logger.Tracef("handler(%s): blocklisted %s", p.Name, overlay.String())
} }
// count unexpected requests // count unexpected requests
if errors.Is(err, p2p.ErrUnexpected) { if errors.Is(err, p2p.ErrUnexpected) {
......
...@@ -50,7 +50,8 @@ type PickyNotifier interface { ...@@ -50,7 +50,8 @@ type PickyNotifier interface {
type Notifier interface { type Notifier interface {
Connected(context.Context, Peer, bool) error Connected(context.Context, Peer, bool) error
Disconnected(Peer) Disconnected(Peer)
Announce(context.Context, swarm.Address, bool) error Announce(ctx context.Context, peer swarm.Address, fullnode bool) error
AnnounceTo(ctx context.Context, addressee, peer swarm.Address, fullnode bool) error
} }
// DebugService extends the Service with method used for debugging. // DebugService extends the Service with method used for debugging.
......
...@@ -53,6 +53,7 @@ var ( ...@@ -53,6 +53,7 @@ var (
errOverlayMismatch = errors.New("overlay mismatch") errOverlayMismatch = errors.New("overlay mismatch")
errPruneEntry = errors.New("prune entry") errPruneEntry = errors.New("prune entry")
errEmptyBin = errors.New("empty bin") errEmptyBin = errors.New("empty bin")
errAnnounceLightNode = errors.New("announcing light node")
) )
type ( type (
...@@ -842,6 +843,15 @@ func (k *Kad) Announce(ctx context.Context, peer swarm.Address, fullnode bool) e ...@@ -842,6 +843,15 @@ func (k *Kad) Announce(ctx context.Context, peer swarm.Address, fullnode bool) e
return err return err
} }
// AnnounceTo announces a selected peer to another.
func (k *Kad) AnnounceTo(ctx context.Context, addressee, peer swarm.Address, fullnode bool) error {
if !fullnode {
return errAnnounceLightNode
}
return k.discovery.BroadcastPeers(ctx, addressee, peer)
}
// AddPeers adds peers to the knownPeers list. // AddPeers adds peers to the knownPeers list.
// This does not guarantee that a connection will immediately // This does not guarantee that a connection will immediately
// be made to the peer. // be made to the peer.
......
...@@ -671,6 +671,32 @@ func TestDiscoveryHooks(t *testing.T) { ...@@ -671,6 +671,32 @@ func TestDiscoveryHooks(t *testing.T) {
waitBcast(t, disc, p3, p1, p2) waitBcast(t, disc, p3, p1, p2)
} }
func TestAnnounceTo(t *testing.T) {
var (
conns int32
_, kad, ab, disc, signer = newTestKademlia(t, &conns, nil, kademlia.Options{})
p1, p2 = test.RandomAddress(), test.RandomAddress()
)
if err := kad.Start(context.Background()); err != nil {
t.Fatal(err)
}
defer kad.Close()
// first add a peer from AddPeers, wait for the connection
addOne(t, signer, kad, ab, p1)
waitConn(t, &conns)
if err := kad.AnnounceTo(context.Background(), p1, p2, true); err != nil {
t.Fatal(err)
}
waitBcast(t, disc, p1, p2)
if err := kad.AnnounceTo(context.Background(), p1, p2, false); err == nil {
t.Fatal("expected error")
}
}
func TestBackoff(t *testing.T) { func TestBackoff(t *testing.T) {
// cheat and decrease the timer // cheat and decrease the timer
defer func(t time.Duration) { defer func(t time.Duration) {
......
...@@ -152,6 +152,10 @@ func (m *Mock) Announce(_ context.Context, _ swarm.Address, _ bool) error { ...@@ -152,6 +152,10 @@ func (m *Mock) Announce(_ context.Context, _ swarm.Address, _ bool) error {
return nil return nil
} }
func (m *Mock) AnnounceTo(_ context.Context, _, _ swarm.Address, _ bool) error {
return nil
}
func (m *Mock) SubscribePeersChange() (c <-chan struct{}, unsubscribe func()) { func (m *Mock) SubscribePeersChange() (c <-chan struct{}, unsubscribe func()) {
channel := make(chan struct{}, 1) channel := make(chan struct{}, 1)
var closeOnce sync.Once var closeOnce sync.Once
......
...@@ -96,6 +96,10 @@ PICKPEER: ...@@ -96,6 +96,10 @@ PICKPEER:
return addr, nil return addr, nil
} }
func (c *Container) EachPeer(pf topology.EachPeerFunc) error {
return c.connectedPeers.EachBin(pf)
}
func (c *Container) PeerInfo() topology.BinInfo { func (c *Container) PeerInfo() topology.BinInfo {
return topology.BinInfo{ return topology.BinInfo{
BinPopulation: uint(c.connectedPeers.Length()), BinPopulation: uint(c.connectedPeers.Length()),
......
...@@ -6,6 +6,7 @@ package lightnode_test ...@@ -6,6 +6,7 @@ package lightnode_test
import ( import (
"context" "context"
"errors"
"reflect" "reflect"
"testing" "testing"
...@@ -62,6 +63,18 @@ func TestContainer(t *testing.T) { ...@@ -62,6 +63,18 @@ func TestContainer(t *testing.T) {
if !p.Equal(p1) { if !p.Equal(p1) {
t.Fatalf("expected p2 but got %s", p.String()) t.Fatalf("expected p2 but got %s", p.String())
} }
i := 0
peers := []swarm.Address{p2, p1}
if err = c.EachPeer(func(p swarm.Address, _ uint8) (bool, bool, error) {
if !p.Equal(peers[i]) {
return false, false, errors.New("peer not in order")
}
i++
return false, false, nil
}); err != nil {
t.Fatal(err)
}
}) })
t.Run("empty container after peer disconnect", func(t *testing.T) { t.Run("empty container after peer disconnect", func(t *testing.T) {
c := lightnode.NewContainer(base) c := lightnode.NewContainer(base)
......
...@@ -103,6 +103,10 @@ func (d *mock) Announce(_ context.Context, _ swarm.Address, _ bool) error { ...@@ -103,6 +103,10 @@ func (d *mock) Announce(_ context.Context, _ swarm.Address, _ bool) error {
return nil return nil
} }
func (d *mock) AnnounceTo(_ context.Context, _, _ swarm.Address, _ bool) error {
return nil
}
func (d *mock) Peers() []swarm.Address { func (d *mock) Peers() []swarm.Address {
return d.peers return d.peers
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment