Commit cb397492 authored by aloknerurkar, committed by GitHub

feat(hive, libp2p): check reachability while processing peer from hive (#2319)

parent 1fc07169
......@@ -14,6 +14,7 @@ import (
"context"
"errors"
"fmt"
"golang.org/x/sync/semaphore"
"sync"
"time"
......@@ -44,7 +45,7 @@ var (
)
type Service struct {
streamer p2p.Streamer
streamer p2p.StreamerPinger
addressBook addressbook.GetPutter
addPeersHandler func(...swarm.Address)
networkID uint64
......@@ -53,10 +54,14 @@ type Service struct {
inLimiter *ratelimit.Limiter
outLimiter *ratelimit.Limiter
clearMtx sync.Mutex
quit chan struct{}
wg sync.WaitGroup
peersChan chan pb.Peers
sem *semaphore.Weighted
}
func New(streamer p2p.Streamer, addressbook addressbook.GetPutter, networkID uint64, logger logging.Logger) *Service {
return &Service{
func New(streamer p2p.StreamerPinger, addressbook addressbook.GetPutter, networkID uint64, logger logging.Logger) *Service {
svc := &Service{
streamer: streamer,
logger: logger,
addressBook: addressbook,
......@@ -64,7 +69,12 @@ func New(streamer p2p.Streamer, addressbook addressbook.GetPutter, networkID uin
metrics: newMetrics(),
inLimiter: ratelimit.New(limitRate, limitBurst),
outLimiter: ratelimit.New(limitRate, limitBurst),
quit: make(chan struct{}),
peersChan: make(chan pb.Peers),
sem: semaphore.NewWeighted(int64(31)),
}
svc.startCheckPeersHandler()
return svc
}
func (s *Service) Protocol() p2p.ProtocolSpec {
......@@ -111,6 +121,23 @@ func (s *Service) SetAddPeersHandler(h func(addr ...swarm.Address)) {
s.addPeersHandler = h
}
func (s *Service) Close() error {
close(s.quit)
stopped := make(chan struct{})
go func() {
defer close(stopped)
s.wg.Wait()
}()
select {
case <-stopped:
return nil
case <-time.After(time.Second * 5):
return errors.New("hive: waited 5 seconds to close active goroutines")
}
}
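Close above waits for any in-flight goroutines but bounds the wait at five seconds. The same wait-with-timeout idea, pulled out as a standalone sketch (the helper name is hypothetical and not part of the hive package; assumes imports "errors", "sync", "time"):

// waitWithTimeout is an illustrative helper sketching the pattern used by
// Service.Close above: wait for a WaitGroup, but give up after d.
func waitWithTimeout(wg *sync.WaitGroup, d time.Duration) error {
	done := make(chan struct{})
	go func() {
		defer close(done)
		wg.Wait()
	}()
	select {
	case <-done:
		return nil
	case <-time.After(d):
		return errors.New("timed out waiting for goroutines to finish")
	}
}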
func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swarm.Address) (err error) {
s.metrics.BroadcastPeersSends.Inc()
stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, peersStreamName)
......@@ -176,13 +203,84 @@ func (s *Service) peersHandler(ctx context.Context, peer p2p.Peer, stream p2p.St
// but we still want to handle a stream left unclosed by the other side, to avoid zombie streams
go stream.FullClose()
var peers []swarm.Address
for _, newPeer := range peersReq.Peers {
select {
case s.peersChan <- peersReq:
case <-s.quit:
return errors.New("failed to process peers, shutting down hive")
}
return nil
}
func (s *Service) disconnect(peer p2p.Peer) error {
s.clearMtx.Lock()
defer s.clearMtx.Unlock()
s.inLimiter.Clear(peer.Address.ByteString())
s.outLimiter.Clear(peer.Address.ByteString())
return nil
}
func (s *Service) startCheckPeersHandler() {
ctx, cancel := context.WithCancel(context.Background())
s.wg.Add(1)
go func() {
defer s.wg.Done()
<-s.quit
cancel()
}()
s.wg.Add(1)
go func() {
defer s.wg.Done()
for {
select {
case <-ctx.Done():
return
case newPeers := <-s.peersChan:
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.checkAndAddPeers(ctx, newPeers)
}()
}
}
}()
}
func (s *Service) checkAndAddPeers(ctx context.Context, peers pb.Peers) {
var peersToAdd []swarm.Address
mtx := sync.Mutex{}
wg := sync.WaitGroup{}
for _, p := range peers.Peers {
err := s.sem.Acquire(ctx, 1)
if err != nil {
return
}
wg.Add(1)
go func(newPeer *pb.BzzAddress) {
defer func() {
s.sem.Release(1)
wg.Done()
}()
multiUnderlay, err := ma.NewMultiaddrBytes(newPeer.Underlay)
if err != nil {
s.logger.Errorf("hive: multi address underlay err: %v", err)
continue
return
}
// check if the underlay is usable by doing a raw ping using libp2p
_, err = s.streamer.Ping(ctx, multiUnderlay)
if err != nil {
s.metrics.UnreachablePeers.Inc()
s.logger.Warningf("hive: multi address underlay %s not reachable err: %w", multiUnderlay, err)
return
}
bzzAddress := bzz.Address{
......@@ -195,26 +293,18 @@ func (s *Service) peersHandler(ctx context.Context, peer p2p.Peer, stream p2p.St
err = s.addressBook.Put(bzzAddress.Overlay, bzzAddress)
if err != nil {
s.logger.Warningf("skipping peer in response %s: %v", newPeer.String(), err)
continue
return
}
peers = append(peers, bzzAddress.Overlay)
mtx.Lock()
peersToAdd = append(peersToAdd, bzzAddress.Overlay)
mtx.Unlock()
}(p)
}
if s.addPeersHandler != nil {
s.addPeersHandler(peers...)
}
return nil
}
func (s *Service) disconnect(peer p2p.Peer) error {
s.clearMtx.Lock()
defer s.clearMtx.Unlock()
wg.Wait()
s.inLimiter.Clear(peer.Address.ByteString())
s.outLimiter.Clear(peer.Address.ByteString())
return nil
if s.addPeersHandler != nil && len(peersToAdd) > 0 {
s.addPeersHandler(peersToAdd...)
}
}
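checkAndAddPeers above bounds its fan-out with a weighted semaphore (at most 31 concurrent pings) and a WaitGroup, collecting results under a mutex. A self-contained sketch of that bounded fan-out pattern, using hypothetical work items instead of peers (assumes imports "context", "sync", "golang.org/x/sync/semaphore"):

// boundedFanOut is an illustrative sketch of the concurrency pattern used by
// checkAndAddPeers: acquire a semaphore slot before spawning each worker,
// release it when the worker finishes, and wait for the whole group before
// reading the collected results.
func boundedFanOut(ctx context.Context, items []string, limit int64, keep func(string) bool) []string {
	var (
		out []string
		mtx sync.Mutex
		wg  sync.WaitGroup
		sem = semaphore.NewWeighted(limit)
	)
	for _, item := range items {
		if err := sem.Acquire(ctx, 1); err != nil {
			break // context cancelled or deadline exceeded; stop scheduling new work
		}
		wg.Add(1)
		go func(item string) {
			defer func() {
				sem.Release(1)
				wg.Done()
			}()
			if keep(item) {
				mtx.Lock()
				out = append(out, item)
				mtx.Unlock()
			}
		}(item)
	}
	wg.Wait()
	return out
}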
......@@ -7,6 +7,7 @@ package hive_test
import (
"bytes"
"context"
"errors"
"fmt"
"io/ioutil"
"math/rand"
......@@ -45,8 +46,10 @@ func TestHandlerRateLimit(t *testing.T) {
addressbookclean := ab.New(mock.NewStateStore())
// new recorder for handling Ping
streamer := streamtest.New()
// create a hive server that handles the incoming stream
server := hive.New(nil, addressbookclean, networkID, logger)
server := hive.New(streamer, addressbookclean, networkID, logger)
serverAddress := test.RandomAddress()
......@@ -160,6 +163,7 @@ func TestBroadcastPeers(t *testing.T) {
wantMsgs []pb.Peers
wantOverlays []swarm.Address
wantBzzAddresses []bzz.Address
pingErr func(addr ma.Multiaddr) (time.Duration, error)
}{
"OK - single record": {
addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"),
......@@ -196,14 +200,36 @@ func TestBroadcastPeers(t *testing.T) {
wantOverlays: overlays[:2*hive.MaxBatchSize],
wantBzzAddresses: bzzAddresses[:2*hive.MaxBatchSize],
},
"OK - single batch - skip ping failures": {
addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"),
peers: overlays[:15],
wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers[:15]}},
wantOverlays: overlays[:10],
wantBzzAddresses: bzzAddresses[:10],
pingErr: func(addr ma.Multiaddr) (rtt time.Duration, err error) {
for _, v := range bzzAddresses[10:15] {
if v.Underlay.Equal(addr) {
return rtt, errors.New("ping failure")
}
}
return rtt, nil
},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
addressbookclean := ab.New(mock.NewStateStore())
// new recorder for handling Ping
var streamer *streamtest.Recorder
if tc.pingErr != nil {
streamer = streamtest.New(streamtest.WithPingErr(tc.pingErr))
} else {
streamer = streamtest.New()
}
// create a hive server that handles the incoming stream
server := hive.New(nil, addressbookclean, networkID, logger)
server := hive.New(streamer, addressbookclean, networkID, logger)
// setup the stream recorder to record stream data
recorder := streamtest.New(
......
......@@ -16,6 +16,7 @@ type metrics struct {
PeersHandler prometheus.Counter
PeersHandlerPeers prometheus.Counter
UnreachablePeers prometheus.Counter
}
func newMetrics() metrics {
......@@ -52,6 +53,12 @@ func newMetrics() metrics {
Name: "peers_handler_peers_count",
Help: "Number of peers received in peer messages.",
}),
UnreachablePeers: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "unreachable_peers_count",
Help: "Number of peers that are unreachable.",
}),
}
}
......
......@@ -107,6 +107,7 @@ type Bee struct {
listenerCloser io.Closer
postageServiceCloser io.Closer
priceOracleCloser io.Closer
hiveCloser io.Closer
shutdownInProgress bool
shutdownMutex sync.Mutex
}
......@@ -477,6 +478,7 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo
if err = p2ps.AddProtocol(hive.Protocol()); err != nil {
return nil, fmt.Errorf("hive service: %w", err)
}
b.hiveCloser = hive
var bootnodes []ma.Multiaddr
......@@ -737,6 +739,7 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo
debugAPIService.MustRegisterMetrics(pullStorage.Metrics()...)
debugAPIService.MustRegisterMetrics(retrieve.Metrics()...)
debugAPIService.MustRegisterMetrics(lightNodes.Metrics()...)
debugAPIService.MustRegisterMetrics(hive.Metrics()...)
if bs, ok := batchStore.(metrics.Collector); ok {
debugAPIService.MustRegisterMetrics(bs.Metrics()...)
......@@ -835,7 +838,7 @@ func (b *Bee) Shutdown(ctx context.Context) error {
b.recoveryHandleCleanup()
}
var wg sync.WaitGroup
wg.Add(5)
wg.Add(6)
go func() {
defer wg.Done()
tryClose(b.pssCloser, "pss")
......@@ -858,6 +861,10 @@ func (b *Bee) Shutdown(ctx context.Context) error {
defer wg.Done()
tryClose(b.pullSyncCloser, "pull sync")
}()
go func() {
defer wg.Done()
tryClose(b.hiveCloser, "hive")
}()
wg.Wait()
......
......@@ -37,6 +37,7 @@ import (
libp2pquic "github.com/libp2p/go-libp2p-quic-transport"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
libp2pping "github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/libp2p/go-tcp-transport"
ws "github.com/libp2p/go-ws-transport"
ma "github.com/multiformats/go-multiaddr"
......@@ -56,6 +57,7 @@ type Service struct {
natManager basichost.NATManager
natAddrResolver *staticAddressResolver
autonatDialer host.Host
pingDialer host.Host
libp2pPeerstore peerstore.Peerstore
metrics metrics
networkID uint64
......@@ -217,6 +219,16 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
return nil, fmt.Errorf("handshake service: %w", err)
}
// Create a new dialer for the libp2p ping protocol. Using a separate host (with
// its own set of keys) for pinging keeps the main peerstore consistent: the
// addresses being probed are not necessarily dialable and should be cleaned up
// afterwards. The host is created with the same transports and security options
// so that it can dial the same peers.
pingDialer, err := libp2p.New(ctx, append(transports, security, libp2p.NoListenAddrs)...)
if err != nil {
return nil, err
}
peerRegistry := newPeerRegistry()
s := &Service{
ctx: ctx,
......@@ -224,6 +236,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
natManager: natManager,
natAddrResolver: natAddrResolver,
autonatDialer: dialer,
pingDialer: pingDialer,
handshakeService: handshakeService,
libp2pPeerstore: libp2pPeerstore,
metrics: newMetrics(),
......@@ -790,6 +803,9 @@ func (s *Service) Close() error {
if err := s.autonatDialer.Close(); err != nil {
return err
}
if err := s.pingDialer.Close(); err != nil {
return err
}
return s.host.Close()
}
......@@ -811,3 +827,20 @@ func (s *Service) Ready() {
func (s *Service) Halt() {
close(s.halt)
}
func (s *Service) Ping(ctx context.Context, addr ma.Multiaddr) (rtt time.Duration, err error) {
info, err := libp2ppeer.AddrInfoFromP2pAddr(addr)
if err != nil {
return rtt, fmt.Errorf("unable to parse underlay address: %w", err)
}
// Add the address to libp2p peerstore for it to be dialable
s.pingDialer.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.TempAddrTTL)
select {
case <-ctx.Done():
return rtt, ctx.Err()
case res := <-libp2pping.Ping(ctx, s.pingDialer, info.ID):
return res.RTT, res.Error
}
}
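A hypothetical caller of Ping, for illustration only: the underlay must carry a /p2p/<peer-id> component so that AddrInfoFromP2pAddr can extract the peer ID, and a timeout on the context keeps an unreachable peer from blocking the caller.

// checkReachable is an illustrative wrapper, not part of the libp2p package:
// it reports whether the given underlay responds to a libp2p ping within 5s.
// s is the *Service defined above.
func checkReachable(s *Service, underlay ma.Multiaddr) bool {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if _, err := s.Ping(ctx, underlay); err != nil {
		return false // unreachable: callers such as hive skip this peer
	}
	return true
}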
......@@ -378,6 +378,29 @@ func TestConnectDisconnectEvents(t *testing.T) {
}
func TestPing(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
s1, _ := newService(t, 1, libp2pServiceOpts{})
s2, _ := newService(t, 1, libp2pServiceOpts{})
// Wait for listeners to start. There are times when the test fails unexpectedly
// during CI, and we suspect it is due to the listeners not starting in time. The
// sleep here yields the CPU to any goroutines that have not yet been scheduled.
// Ideally we would explicitly check the TCP status of the port on which the
// libp2p.Host listens before assuming the host is up, but that seems like
// overkill unless the test starts flaking.
time.Sleep(time.Second)
addr := serviceUnderlayAddress(t, s1)
if _, err := s2.Ping(ctx, addr); err != nil {
t.Fatal(err)
}
}
const (
testProtocolName = "testing"
testProtocolVersion = "2.3.4"
......
......@@ -70,6 +70,18 @@ type StreamerDisconnecter interface {
Disconnecter
}
// Pinger interface is used to ping an underlay address which is not yet known to the bee node.
// It uses libp2p's default ping protocol. This is different from the PingPong protocol, as it
// is meant to be used before a particular underlay is known and considered useful.
type Pinger interface {
Ping(ctx context.Context, addr ma.Multiaddr) (rtt time.Duration, err error)
}
type StreamerPinger interface {
Streamer
Pinger
}
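Consumers that only need reachability checks can depend on the narrower Pinger interface; the streamtest.Recorder further down in this diff satisfies the full StreamerPinger. A hypothetical always-reachable stub for unit tests, shown purely as an illustration (assumes imports "context", "time" and the ma multiaddr alias used above):

// pingerStub is a hypothetical test double satisfying Pinger; it reports every
// underlay as reachable with a fixed RTT.
type pingerStub struct{ rtt time.Duration }

func (p pingerStub) Ping(_ context.Context, _ ma.Multiaddr) (time.Duration, error) {
	return p.rtt, nil
}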
// Stream represents a bidirectional data stream.
type Stream interface {
io.ReadWriter
......
......@@ -14,6 +14,7 @@ import (
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/swarm"
ma "github.com/multiformats/go-multiaddr"
)
var (
......@@ -37,6 +38,7 @@ type Recorder struct {
protocols []p2p.ProtocolSpec
middlewares []p2p.HandlerMiddleware
streamErr func(swarm.Address, string, string, string) error
pingErr func(ma.Multiaddr) (time.Duration, error)
protocolsWithPeers map[string]p2p.ProtocolSpec
}
......@@ -76,6 +78,12 @@ func WithStreamError(streamErr func(swarm.Address, string, string, string) error
})
}
func WithPingErr(pingErr func(ma.Multiaddr) (time.Duration, error)) Option {
return optionFunc(func(r *Recorder) {
r.pingErr = pingErr
})
}
func New(opts ...Option) *Recorder {
r := &Recorder{
records: make(map[string][]*Record),
......@@ -153,6 +161,13 @@ func (r *Recorder) NewStream(ctx context.Context, addr swarm.Address, h p2p.Head
return streamOut, nil
}
func (r *Recorder) Ping(ctx context.Context, addr ma.Multiaddr) (rtt time.Duration, err error) {
if r.pingErr != nil {
return r.pingErr(addr)
}
return rtt, err
}
func (r *Recorder) Records(addr swarm.Address, protocolName, protocolVersion, streamName string) ([]*Record, error) {
id := addr.String() + p2p.NewSwarmStreamName(protocolName, protocolVersion, streamName)
......
......@@ -18,6 +18,7 @@ import (
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/streamtest"
"github.com/ethersphere/bee/pkg/swarm"
ma "github.com/multiformats/go-multiaddr"
"golang.org/x/sync/errgroup"
)
......@@ -758,6 +759,28 @@ func TestRecorder_withStreamError(t *testing.T) {
}, nil)
}
func TestRecorder_ping(t *testing.T) {
testAddr, _ := ma.NewMultiaddr("/ip4/0.0.0.0/tcp/0")
rec := streamtest.New()
_, err := rec.Ping(context.Background(), testAddr)
if err != nil {
t.Fatalf("unable to ping err: %s", err.Error())
}
rec2 := streamtest.New(
streamtest.WithPingErr(func(_ ma.Multiaddr) (rtt time.Duration, err error) {
return rtt, errors.New("fail")
}),
)
_, err = rec2.Ping(context.Background(), testAddr)
if err == nil {
t.Fatal("expected ping err")
}
}
const (
testProtocolName = "testing"
testProtocolVersion = "1.0.1"
......
......@@ -166,13 +166,14 @@ func (p *PullSyncMock) SyncInterval(ctx context.Context, peer swarm.Address, bin
return 0, 1, context.Canceled
}
if isLive && len(p.liveSyncReplies) > 0 {
p.mtx.Lock()
if p.liveSyncCalls >= len(p.liveSyncReplies) {
p.mtx.Unlock()
<-p.quit
// when shutting down, on the puller side we cancel the context going into the pullsync protocol request
// this results in SyncInterval returning with a context cancelled error
return 0, 0, context.Canceled
}
p.mtx.Lock()
v := p.liveSyncReplies[p.liveSyncCalls]
p.liveSyncCalls++
p.mtx.Unlock()
......
......@@ -465,7 +465,7 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup,
}
}
}
for i := 0; i < 64; i++ {
for i := 0; i < 32; i++ {
go connAttempt(peerConnChan)
}
for i := 0; i < 8; i++ {
......