Commit 2edc5328 authored by aloknerurkar's avatar aloknerurkar Committed by GitHub

fix: flaky ping test and log correction (#2349)

parent f58b9d55
...@@ -32,6 +32,7 @@ require ( ...@@ -32,6 +32,7 @@ require (
github.com/libp2p/go-libp2p-discovery v0.5.1 // indirect github.com/libp2p/go-libp2p-discovery v0.5.1 // indirect
github.com/libp2p/go-libp2p-peerstore v0.2.7 github.com/libp2p/go-libp2p-peerstore v0.2.7
github.com/libp2p/go-libp2p-quic-transport v0.10.0 github.com/libp2p/go-libp2p-quic-transport v0.10.0
github.com/libp2p/go-libp2p-swarm v0.5.0 // indirect
github.com/libp2p/go-libp2p-transport-upgrader v0.4.2 github.com/libp2p/go-libp2p-transport-upgrader v0.4.2
github.com/libp2p/go-tcp-transport v0.2.3 github.com/libp2p/go-tcp-transport v0.2.3
github.com/libp2p/go-ws-transport v0.4.0 github.com/libp2p/go-ws-transport v0.4.0
......
...@@ -279,7 +279,7 @@ func (s *Service) checkAndAddPeers(ctx context.Context, peers pb.Peers) { ...@@ -279,7 +279,7 @@ func (s *Service) checkAndAddPeers(ctx context.Context, peers pb.Peers) {
_, err = s.streamer.Ping(ctx, multiUnderlay) _, err = s.streamer.Ping(ctx, multiUnderlay)
if err != nil { if err != nil {
s.metrics.UnreachablePeers.Inc() s.metrics.UnreachablePeers.Inc()
s.logger.Warningf("hive: multi address underlay %s not reachable err: %w", multiUnderlay, err) s.logger.Debugf("hive: multi address underlay %s not reachable err: %s", multiUnderlay, err.Error())
return return
} }
......
...@@ -8,6 +8,8 @@ import ( ...@@ -8,6 +8,8 @@ import (
"context" "context"
handshake "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake" handshake "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake"
libp2pm "github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/network"
libp2ppeer "github.com/libp2p/go-libp2p-core/peer" libp2ppeer "github.com/libp2p/go-libp2p-core/peer"
) )
...@@ -23,3 +25,9 @@ func (s *Service) NewStreamForPeerID(peerID libp2ppeer.ID, protocolName, protoco ...@@ -23,3 +25,9 @@ func (s *Service) NewStreamForPeerID(peerID libp2ppeer.ID, protocolName, protoco
type StaticAddressResolver = staticAddressResolver type StaticAddressResolver = staticAddressResolver
var NewStaticAddressResolver = newStaticAddressResolver var NewStaticAddressResolver = newStaticAddressResolver
func WithHostFactory(factory func(context.Context, ...libp2pm.Option) (host.Host, error)) Options {
return Options{
hostFactory: factory,
}
}
...@@ -93,6 +93,7 @@ type Options struct { ...@@ -93,6 +93,7 @@ type Options struct {
LightNodeLimit int LightNodeLimit int
WelcomeMessage string WelcomeMessage string
Transaction []byte Transaction []byte
hostFactory func(context.Context, ...libp2p.Option) (host.Host, error)
} }
func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay swarm.Address, addr string, ab addressbook.Putter, storer storage.StateStorer, lightNodes *lightnode.Container, swapBackend handshake.SenderMatcher, logger logging.Logger, tracer *tracing.Tracer, o Options) (*Service, error) { func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay swarm.Address, addr string, ab addressbook.Putter, storer storage.StateStorer, lightNodes *lightnode.Container, swapBackend handshake.SenderMatcher, logger logging.Logger, tracer *tracing.Tracer, o Options) (*Service, error) {
...@@ -181,14 +182,19 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay ...@@ -181,14 +182,19 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
opts = append(opts, transports...) opts = append(opts, transports...)
h, err := libp2p.New(ctx, opts...) if o.hostFactory == nil {
// Use the default libp2p host creation
o.hostFactory = libp2p.New
}
h, err := o.hostFactory(ctx, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Support same non default security and transport options as // Support same non default security and transport options as
// original host. // original host.
dialer, err := libp2p.New(ctx, append(transports, security)...) dialer, err := o.hostFactory(ctx, append(transports, security)...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -224,7 +230,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay ...@@ -224,7 +230,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
// the addresses used are not dialable and hence should be cleaned up. We should create // the addresses used are not dialable and hence should be cleaned up. We should create
// this host with the same transports and security options to be able to dial to other // this host with the same transports and security options to be able to dial to other
// peers. // peers.
pingDialer, err := libp2p.New(ctx, append(transports, security, libp2p.NoListenAddrs)...) pingDialer, err := o.hostFactory(ctx, append(transports, security, libp2p.NoListenAddrs)...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -14,6 +14,10 @@ import ( ...@@ -14,6 +14,10 @@ import (
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/libp2p" "github.com/ethersphere/bee/pkg/p2p/libp2p"
libp2pm "github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/host"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/multiformats/go-multistream" "github.com/multiformats/go-multistream"
) )
...@@ -379,21 +383,26 @@ func TestConnectDisconnectEvents(t *testing.T) { ...@@ -379,21 +383,26 @@ func TestConnectDisconnectEvents(t *testing.T) {
} }
func TestPing(t *testing.T) { func TestPing(t *testing.T) {
t.Skip("test flaking")
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel() defer cancel()
s1, _ := newService(t, 1, libp2pServiceOpts{}) s1, _ := newService(t, 1, libp2pServiceOpts{
libp2pOpts: libp2p.WithHostFactory(
s2, _ := newService(t, 1, libp2pServiceOpts{}) func(ctx context.Context, _ ...libp2pm.Option) (host.Host, error) {
return bhost.NewHost(ctx, swarmt.GenSwarm(t, ctx), &bhost.HostOpts{EnablePing: true})
},
),
})
defer s1.Close()
// Wait for listeners to start. There are times when the test fails unexpectedly s2, _ := newService(t, 1, libp2pServiceOpts{
// during CI and we suspect it is due to the listeners not starting in time. The libp2pOpts: libp2p.WithHostFactory(
// sleep here ensures CPU is given up for any goroutines which are not getting func(ctx context.Context, _ ...libp2pm.Option) (host.Host, error) {
// scheduled. Ideally we should explicitly check the TCP status on the port return bhost.NewHost(ctx, swarmt.GenSwarm(t, ctx), &bhost.HostOpts{EnablePing: true})
// where the libp2p.Host is started before assuming the host is up. This seems like },
// a bit of an overkill here unless the test starts flaking. ),
time.Sleep(time.Second) })
defer s2.Close()
addr := serviceUnderlayAddress(t, s1) addr := serviceUnderlayAddress(t, s1)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment