Commit 363dcd69 authored by acud's avatar acud Committed by GitHub

shadow stream descriptor in libp2p (#295)

* shadow stream descriptor in libp2p
parent 6c85c5c8
...@@ -231,6 +231,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay ...@@ -231,6 +231,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) { func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
for _, ss := range p.StreamSpecs { for _, ss := range p.StreamSpecs {
ss := ss
id := protocol.ID(p2p.NewSwarmStreamName(p.Name, p.Version, ss.Name)) id := protocol.ID(p2p.NewSwarmStreamName(p.Name, p.Version, ss.Name))
matcher, err := s.protocolSemverMatcher(id) matcher, err := s.protocolSemverMatcher(id)
if err != nil { if err != nil {
......
...@@ -7,6 +7,7 @@ package libp2p_test ...@@ -7,6 +7,7 @@ package libp2p_test
import ( import (
"context" "context"
"errors" "errors"
"sync/atomic"
"testing" "testing"
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
...@@ -43,6 +44,52 @@ func TestNewStream(t *testing.T) { ...@@ -43,6 +44,52 @@ func TestNewStream(t *testing.T) {
} }
} }
// TestNewStreamMulti is a regression test to see that we trigger
// the right handler when multiple streams are registered under
// a single protocol.
func TestNewStreamMulti(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1 := newService(t, 1, libp2p.Options{})
var (
h1calls, h2calls int32
h1 = func(_ context.Context, _ p2p.Peer, _ p2p.Stream) error {
_ = atomic.AddInt32(&h1calls, 1)
return nil
}
h2 = func(_ context.Context, _ p2p.Peer, _ p2p.Stream) error {
_ = atomic.AddInt32(&h2calls, 1)
return nil
}
)
s2, _ := newService(t, 1, libp2p.Options{})
if err := s1.AddProtocol(newTestMultiProtocol(h1, h2)); err != nil {
t.Fatal(err)
}
addr := serviceUnderlayAddress(t, s1)
if _, err := s2.Connect(ctx, addr); err != nil {
t.Fatal(err)
}
stream, err := s2.NewStream(ctx, overlay1, nil, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
t.Fatal(err)
}
if err := stream.Close(); err != nil {
t.Fatal(err)
}
if atomic.LoadInt32(&h1calls) != 1 {
t.Fatal("handler should have been called but wasnt")
}
if atomic.LoadInt32(&h2calls) > 0 {
t.Fatal("handler should not have been called")
}
}
func TestNewStream_errNotSupported(t *testing.T) { func TestNewStream_errNotSupported(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
...@@ -166,9 +213,10 @@ func TestDisconnectError(t *testing.T) { ...@@ -166,9 +213,10 @@ func TestDisconnectError(t *testing.T) {
} }
const ( const (
testProtocolName = "testing" testProtocolName = "testing"
testProtocolVersion = "2.3.4" testProtocolVersion = "2.3.4"
testStreamName = "messages" testStreamName = "messages"
testSecondStreamName = "cookies"
) )
func newTestProtocol(h p2p.HandlerFunc) p2p.ProtocolSpec { func newTestProtocol(h p2p.HandlerFunc) p2p.ProtocolSpec {
...@@ -184,6 +232,23 @@ func newTestProtocol(h p2p.HandlerFunc) p2p.ProtocolSpec { ...@@ -184,6 +232,23 @@ func newTestProtocol(h p2p.HandlerFunc) p2p.ProtocolSpec {
} }
} }
func newTestMultiProtocol(h1, h2 p2p.HandlerFunc) p2p.ProtocolSpec {
return p2p.ProtocolSpec{
Name: testProtocolName,
Version: testProtocolVersion,
StreamSpecs: []p2p.StreamSpec{
{
Name: testStreamName,
Handler: h1,
},
{
Name: testSecondStreamName,
Handler: h2,
},
},
}
}
func expectErrNotSupported(t *testing.T, err error) { func expectErrNotSupported(t *testing.T, err error) {
t.Helper() t.Helper()
if e := (*p2p.IncompatibleStreamError)(nil); !errors.As(err, &e) { if e := (*p2p.IncompatibleStreamError)(nil); !errors.As(err, &e) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment