Commit 9cf23652 authored by acud's avatar acud Committed by GitHub

libp2p: wait for readyness before accepting incoming connections (#1390)

Co-authored-by: default avatarRalph Pichler <pichler.ralph@gmail.com>
parent bca20b91
......@@ -110,7 +110,7 @@ type Options struct {
SwapEnable bool
}
func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, signer crypto.Signer, networkID uint64, logger logging.Logger, libp2pPrivateKey, pssPrivateKey *ecdsa.PrivateKey, o Options) (*Bee, error) {
func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, signer crypto.Signer, networkID uint64, logger logging.Logger, libp2pPrivateKey, pssPrivateKey *ecdsa.PrivateKey, o Options) (b *Bee, err error) {
tracer, tracerCloser, err := tracing.NewTracer(&tracing.Options{
Enabled: o.TracingEnabled,
Endpoint: o.TracingEndpoint,
......@@ -121,8 +121,16 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
}
p2pCtx, p2pCancel := context.WithCancel(context.Background())
defer func() {
// if there's been an error on this function
// we'd like to cancel the p2p context so that
// incoming connections will not be possible
if err != nil {
p2pCancel()
}
}()
b := &Bee{
b = &Bee{
p2pCancel: p2pCancel,
errorLogWriter: logger.WriterLevel(logrus.ErrorLevel),
tracerCloser: tracerCloser,
......@@ -507,6 +515,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
if err := kad.Start(p2pCtx); err != nil {
return nil, err
}
p2ps.Ready()
return b, nil
}
......
......@@ -64,6 +64,7 @@ type Service struct {
notifier p2p.Notifier
logger logging.Logger
tracer *tracing.Tracer
ready chan struct{}
protocolsmu sync.RWMutex
}
......@@ -222,6 +223,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
logger: logger,
tracer: tracer,
connectionBreaker: breaker.NewBreaker(breaker.Options{}), // use default options
ready: make(chan struct{}),
}
peerRegistry.setDisconnecter(s)
......@@ -235,6 +237,11 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
// handshake
s.host.SetStreamHandlerMatch(id, matcher, func(stream network.Stream) {
select {
case <-s.ready:
case <-s.ctx.Done():
return
}
peerID := stream.Conn().RemotePeer()
handshakeStream := NewStream(stream)
i, err := s.handshakeService.Handle(ctx, handshakeStream, stream.Conn().RemoteMultiaddr(), peerID)
......@@ -658,3 +665,7 @@ func (s *Service) SetWelcomeMessage(val string) error {
func (s *Service) GetWelcomeMessage() string {
return s.handshakeService.GetWelcomeMessage()
}
func (s *Service) Ready() {
close(s.ready)
}
......@@ -69,6 +69,7 @@ func newService(t *testing.T, networkID uint64, o libp2pServiceOpts) (s *libp2p.
if err != nil {
t.Fatal(err)
}
s.Ready()
t.Cleanup(func() {
cancel()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment