Commit b46a390c authored by Janoš Guljaš's avatar Janoš Guljaš Committed by GitHub

declare version per protocol, not per stream (#12)

parent 60620c7e
......@@ -4,12 +4,12 @@ go 1.13
require (
github.com/btcsuite/btcd v0.20.1-beta
github.com/coreos/go-semver v0.3.0
github.com/gogo/protobuf v1.3.1
github.com/gorilla/handlers v1.4.2
github.com/gorilla/mux v1.7.3
github.com/libp2p/go-libp2p v0.5.1
github.com/libp2p/go-libp2p-autonat-svc v0.1.0
github.com/libp2p/go-libp2p-connmgr v0.2.1
github.com/libp2p/go-libp2p-core v0.3.0
github.com/libp2p/go-libp2p-quic-transport v0.2.2
github.com/libp2p/go-libp2p-secio v0.2.1
......
......@@ -209,8 +209,6 @@ github.com/libp2p/go-libp2p-blankhost v0.1.4/go.mod h1:oJF0saYsAXQCSfDq254GMNmLN
github.com/libp2p/go-libp2p-circuit v0.1.0/go.mod h1:Ahq4cY3V9VJcHcn1SBXjr78AbFkZeIRmfunbA7pmFh8=
github.com/libp2p/go-libp2p-circuit v0.1.4 h1:Phzbmrg3BkVzbqd4ZZ149JxCuUWu2wZcXf/Kr6hZJj8=
github.com/libp2p/go-libp2p-circuit v0.1.4/go.mod h1:CY67BrEjKNDhdTk8UgBX1Y/H5c3xkAcs3gnksxY7osU=
github.com/libp2p/go-libp2p-connmgr v0.2.1 h1:1ed0HFhCb39sIMK7QYgRBW0vibBBqFQMs4xt9a9AalY=
github.com/libp2p/go-libp2p-connmgr v0.2.1/go.mod h1:JReKEFcgzSHKT9lL3rhYcUtXBs9uMIiMKJGM1tl3xJE=
github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco=
github.com/libp2p/go-libp2p-core v0.0.4/go.mod h1:jyuCQP356gzfCFtRKyvAbNkyeuxb7OlyhWZ3nls5d2I=
github.com/libp2p/go-libp2p-core v0.2.0/go.mod h1:X0eyB0Gy93v0DZtSYbEM7RnMChm9Uv3j7yRXjO77xSI=
......
......@@ -18,8 +18,8 @@ import (
const (
ProtocolName = "handshake"
ProtocolVersion = "1.0.0"
StreamName = "handshake"
StreamVersion = "1.0.0"
)
// ErrNetworkIDIncompatible should be returned by handshake handlers if
......
......@@ -18,7 +18,6 @@ import (
"github.com/libp2p/go-libp2p"
autonat "github.com/libp2p/go-libp2p-autonat-svc"
crypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
libp2ppeer "github.com/libp2p/go-libp2p-core/peer"
......@@ -150,10 +149,10 @@ func New(ctx context.Context, o Options) (*Service, error) {
// Construct protocols.
id := protocol.ID(p2p.NewSwarmStreamName(handshake.ProtocolName, handshake.StreamName, handshake.StreamVersion))
matcher, err := helpers.MultistreamSemverMatcher(id)
id := protocol.ID(p2p.NewSwarmStreamName(handshake.ProtocolName, handshake.ProtocolVersion, handshake.StreamName))
matcher, err := s.protocolSemverMatcher(id)
if err != nil {
return nil, fmt.Errorf("match semver %s: %w", id, err)
return nil, fmt.Errorf("protocol version match %s: %w", id, err)
}
s.host.SetStreamHandlerMatch(id, matcher, func(stream network.Stream) {
......@@ -202,10 +201,10 @@ func New(ctx context.Context, o Options) (*Service, error) {
func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
for _, ss := range p.StreamSpecs {
id := protocol.ID(p2p.NewSwarmStreamName(p.Name, ss.Name, ss.Version))
matcher, err := helpers.MultistreamSemverMatcher(id)
id := protocol.ID(p2p.NewSwarmStreamName(p.Name, p.Version, ss.Name))
matcher, err := s.protocolSemverMatcher(id)
if err != nil {
return fmt.Errorf("match semver %s: %w", id, err)
return fmt.Errorf("protocol version match %s: %w", id, err)
}
s.host.SetStreamHandlerMatch(id, matcher, func(stream network.Stream) {
......@@ -227,7 +226,7 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
_ = s.Disconnect(overlay)
}
s.logger.Debugf("handle protocol %s: stream %s/%s: peer %s: %w", p.Name, ss.Name, ss.Version, overlay, err)
s.logger.Debugf("handle protocol %s/%s: stream %s: peer %s: %w", p.Name, p.Version, ss.Name, overlay, err)
}
})
}
......@@ -260,7 +259,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm
return swarm.Address{}, err
}
stream, err := s.newStreamForPeerID(ctx, info.ID, handshake.ProtocolName, handshake.StreamName, handshake.StreamVersion)
stream, err := s.newStreamForPeerID(ctx, info.ID, handshake.ProtocolName, handshake.ProtocolVersion, handshake.StreamName)
if err != nil {
_ = s.host.Network().ClosePeer(info.ID)
return swarm.Address{}, fmt.Errorf("new stream: %w", err)
......@@ -291,17 +290,17 @@ func (s *Service) Disconnect(overlay swarm.Address) error {
return nil
}
func (s *Service) NewStream(ctx context.Context, overlay swarm.Address, protocolName, streamName, version string) (p2p.Stream, error) {
func (s *Service) NewStream(ctx context.Context, overlay swarm.Address, protocolName, protocolVersion, streamName string) (p2p.Stream, error) {
peerID, found := s.peers.peerID(overlay)
if !found {
return nil, p2p.ErrPeerNotFound
}
return s.newStreamForPeerID(ctx, peerID, protocolName, streamName, version)
return s.newStreamForPeerID(ctx, peerID, protocolName, protocolVersion, streamName)
}
func (s *Service) newStreamForPeerID(ctx context.Context, peerID libp2ppeer.ID, protocolName, streamName, version string) (p2p.Stream, error) {
swarmStreamName := p2p.NewSwarmStreamName(protocolName, streamName, version)
func (s *Service) newStreamForPeerID(ctx context.Context, peerID libp2ppeer.ID, protocolName, protocolVersion, streamName string) (p2p.Stream, error) {
swarmStreamName := p2p.NewSwarmStreamName(protocolName, protocolVersion, streamName)
st, err := s.host.NewStream(ctx, peerID, protocol.ID(swarmStreamName))
if err != nil {
if err == multistream.ErrNotSupported || err == multistream.ErrIncorrectVersion {
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package libp2p
import (
"errors"
"strings"
"github.com/coreos/go-semver/semver"
protocol "github.com/libp2p/go-libp2p-core/protocol"
)
// protocolSemverMatcher returns a matcher function for a given base protocol.
// Protocol ID must be constructed according to the Swarm protocol ID
// specification, where the second to last part is the version is semver format.
// The matcher function will return a boolean indicating whether a protocol ID
// matches the base protocol. A given protocol ID matches the base protocol if
// the IDs are the same and if the semantic version of the base protocol is the
// same or higher than that of the protocol ID provided.
func (s *Service) protocolSemverMatcher(base protocol.ID) (func(string) bool, error) {
parts := strings.Split(string(base), "/")
partsLen := len(parts)
if partsLen < 2 {
return nil, errors.New("invalid protocol id")
}
vers, err := semver.NewVersion(parts[partsLen-2])
if err != nil {
return nil, err
}
return func(check string) bool {
chparts := strings.Split(check, "/")
chpartsLen := len(chparts)
if chpartsLen != partsLen {
return false
}
for i, v := range chparts[:chpartsLen-2] {
if parts[i] != v {
return false
}
}
chvers, err := semver.NewVersion(chparts[chpartsLen-2])
if err != nil {
s.logger.Debugf("invalid protocol version %q: %v", check, err)
return false
}
return vers.Major == chvers.Major && vers.Minor >= chvers.Minor
}, nil
}
......@@ -19,7 +19,7 @@ type Service interface {
}
type Streamer interface {
NewStream(ctx context.Context, address swarm.Address, protocol, stream, version string) (Stream, error)
NewStream(ctx context.Context, address swarm.Address, protocol, version, stream string) (Stream, error)
}
type Stream interface {
......@@ -34,12 +34,12 @@ type PeerSuggester interface {
type ProtocolSpec struct {
Name string
Version string
StreamSpecs []StreamSpec
}
type StreamSpec struct {
Name string
Version string
Handler HandlerFunc
}
......@@ -51,6 +51,6 @@ type HandlerFunc func(Peer, Stream) error
type HandlerMiddleware func(HandlerFunc) HandlerFunc
func NewSwarmStreamName(protocol, stream, version string) string {
return "/swarm/" + protocol + "/" + stream + "/" + version
func NewSwarmStreamName(protocol, version, stream string) string {
return "/swarm/" + protocol + "/" + version + "/" + stream
}
......@@ -11,8 +11,8 @@ import (
)
func TestNewSwarmStreamName(t *testing.T) {
want := "/swarm/hive/peers/1.2.0"
got := p2p.NewSwarmStreamName("hive", "peers", "1.2.0")
want := "/swarm/hive/1.2.0/peers"
got := p2p.NewSwarmStreamName("hive", "1.2.0", "peers")
if got != want {
t.Errorf("got %s, want %s", got, want)
......
......@@ -48,7 +48,7 @@ func New(opts ...Option) *Recorder {
return r
}
func (r *Recorder) NewStream(_ context.Context, addr swarm.Address, protocolName, streamName, version string) (p2p.Stream, error) {
func (r *Recorder) NewStream(_ context.Context, addr swarm.Address, protocolName, protocolVersion, streamName string) (p2p.Stream, error) {
recordIn := newRecord()
recordOut := newRecord()
streamOut := newStream(recordIn, recordOut)
......@@ -56,9 +56,9 @@ func (r *Recorder) NewStream(_ context.Context, addr swarm.Address, protocolName
var handler p2p.HandlerFunc
for _, p := range r.protocols {
if p.Name == protocolName {
if p.Name == protocolName && p.Version == protocolVersion {
for _, s := range p.StreamSpecs {
if s.Name == streamName && s.Version == version {
if s.Name == streamName {
handler = s.Handler
}
}
......@@ -78,7 +78,7 @@ func (r *Recorder) NewStream(_ context.Context, addr swarm.Address, protocolName
}
}()
id := addr.String() + p2p.NewSwarmStreamName(protocolName, streamName, version)
id := addr.String() + p2p.NewSwarmStreamName(protocolName, protocolVersion, streamName)
r.recordsMu.Lock()
defer r.recordsMu.Unlock()
......@@ -87,8 +87,8 @@ func (r *Recorder) NewStream(_ context.Context, addr swarm.Address, protocolName
return streamOut, nil
}
func (r *Recorder) Records(addr swarm.Address, protocolName, streamName, version string) ([]*Record, error) {
id := addr.String() + p2p.NewSwarmStreamName(protocolName, streamName, version)
func (r *Recorder) Records(addr swarm.Address, protocolName, protocolVersio, streamName string) ([]*Record, error) {
id := addr.String() + p2p.NewSwarmStreamName(protocolName, protocolVersio, streamName)
r.recordsMu.Lock()
defer r.recordsMu.Unlock()
......
......@@ -54,7 +54,7 @@ func TestRecorder(t *testing.T) {
)
ask := func(ctx context.Context, s p2p.Streamer, address swarm.Address, questions ...string) (answers []string, err error) {
stream, err := s.NewStream(ctx, address, testProtocolName, testStreamName, testStreamVersion)
stream, err := s.NewStream(ctx, address, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
return nil, fmt.Errorf("new stream: %w", err)
}
......@@ -93,12 +93,12 @@ func TestRecorder(t *testing.T) {
}
}
_, err = recorder.Records(swarm.ZeroAddress, testProtocolName, "invalid stream name", testStreamVersion)
_, err = recorder.Records(swarm.ZeroAddress, testProtocolName, testProtocolVersion, "invalid stream name")
if err != streamtest.ErrRecordsNotFound {
t.Errorf("got error %v, want %v", err, streamtest.ErrRecordsNotFound)
}
records, err := recorder.Records(swarm.ZeroAddress, testProtocolName, testStreamName, testStreamVersion)
records, err := recorder.Records(swarm.ZeroAddress, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
t.Fatal(err)
}
......@@ -133,7 +133,7 @@ func TestRecorder_closeAfterPartialWrite(t *testing.T) {
)
request := func(ctx context.Context, s p2p.Streamer, address swarm.Address) (err error) {
stream, err := s.NewStream(ctx, address, testProtocolName, testStreamName, testStreamVersion)
stream, err := s.NewStream(ctx, address, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
return fmt.Errorf("new stream: %w", err)
}
......@@ -169,7 +169,7 @@ func TestRecorder_closeAfterPartialWrite(t *testing.T) {
t.Fatal(err)
}
records, err := recorder.Records(swarm.ZeroAddress, testProtocolName, testStreamName, testStreamVersion)
records, err := recorder.Records(swarm.ZeroAddress, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
t.Fatal(err)
}
......@@ -261,7 +261,7 @@ func TestRecorder_withMiddlewares(t *testing.T) {
)
request := func(ctx context.Context, s p2p.Streamer, address swarm.Address) error {
stream, err := s.NewStream(ctx, address, testProtocolName, testStreamName, testStreamVersion)
stream, err := s.NewStream(ctx, address, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
return fmt.Errorf("new stream: %w", err)
}
......@@ -284,7 +284,7 @@ func TestRecorder_withMiddlewares(t *testing.T) {
t.Fatal(err)
}
records, err := recorder.Records(swarm.ZeroAddress, testProtocolName, testStreamName, testStreamVersion)
records, err := recorder.Records(swarm.ZeroAddress, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
t.Fatal(err)
}
......@@ -323,7 +323,7 @@ func TestRecorder_recordErr(t *testing.T) {
)
request := func(ctx context.Context, s p2p.Streamer, address swarm.Address) (err error) {
stream, err := s.NewStream(ctx, address, testProtocolName, testStreamName, testStreamVersion)
stream, err := s.NewStream(ctx, address, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
return fmt.Errorf("new stream: %w", err)
}
......@@ -342,7 +342,7 @@ func TestRecorder_recordErr(t *testing.T) {
t.Fatal(err)
}
records, err := recorder.Records(swarm.ZeroAddress, testProtocolName, testStreamName, testStreamVersion)
records, err := recorder.Records(swarm.ZeroAddress, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
t.Fatal(err)
}
......@@ -357,17 +357,17 @@ func TestRecorder_recordErr(t *testing.T) {
const (
testProtocolName = "testing"
testProtocolVersion = "1.0.1"
testStreamName = "messages"
testStreamVersion = "1.0.1"
)
func newTestProtocol(h p2p.HandlerFunc) p2p.ProtocolSpec {
return p2p.ProtocolSpec{
Name: testProtocolName,
Version: testProtocolVersion,
StreamSpecs: []p2p.StreamSpec{
{
Name: testStreamName,
Version: testStreamVersion,
Handler: h,
},
},
......
......@@ -19,8 +19,8 @@ import (
const (
protocolName = "pingpong"
protocolVersion = "1.0.0"
streamName = "pingpong"
streamVersion = "1.0.0"
)
type Interface interface {
......@@ -49,10 +49,10 @@ func New(o Options) *Service {
func (s *Service) Protocol() p2p.ProtocolSpec {
return p2p.ProtocolSpec{
Name: protocolName,
Version: protocolVersion,
StreamSpecs: []p2p.StreamSpec{
{
Name: streamName,
Version: streamVersion,
Handler: s.Handler,
},
},
......@@ -61,7 +61,7 @@ func (s *Service) Protocol() p2p.ProtocolSpec {
func (s *Service) Ping(ctx context.Context, address swarm.Address, msgs ...string) (rtt time.Duration, err error) {
start := time.Now()
stream, err := s.streamer.NewStream(ctx, address, protocolName, streamName, streamVersion)
stream, err := s.streamer.NewStream(ctx, address, protocolName, protocolVersion, streamName)
if err != nil {
return 0, fmt.Errorf("new stream: %w", err)
}
......
......@@ -8,12 +8,13 @@ import (
"bytes"
"context"
"fmt"
"github.com/ethersphere/bee/pkg/swarm"
"io/ioutil"
"runtime"
"testing"
"time"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
......@@ -64,7 +65,7 @@ func TestPing(t *testing.T) {
}
// get a record for this stream
records, err := recorder.Records(addr, "pingpong", "pingpong", "1.0.0")
records, err := recorder.Records(addr, "pingpong", "1.0.0", "pingpong")
if err != nil {
t.Fatal(err)
}
......
......@@ -18,8 +18,8 @@ import (
const (
protocolName = "retrieval"
protocolVersion = "1.0.0"
streamName = "retrieval"
streamVersion = "1.0.0"
)
type Service struct {
......@@ -51,10 +51,10 @@ func New(o Options) *Service {
func (s *Service) Protocol() p2p.ProtocolSpec {
return p2p.ProtocolSpec{
Name: protocolName,
Version: protocolVersion,
StreamSpecs: []p2p.StreamSpec{
{
Name: streamName,
Version: streamVersion,
Handler: s.Handler,
},
},
......@@ -66,7 +66,7 @@ func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (data [
if err != nil {
return nil, err
}
stream, err := s.streamer.NewStream(ctx, peerID, protocolName, streamName, streamVersion)
stream, err := s.streamer.NewStream(ctx, peerID, protocolName, protocolVersion, streamName)
if err != nil {
return nil, fmt.Errorf("new stream: %w", err)
}
......
......@@ -72,7 +72,7 @@ func TestDelivery(t *testing.T) {
t.Fatalf("request and response data not equal. got %s want %s", v, reqData)
}
peerID, _ := ps.SuggestPeer(swarm.ZeroAddress)
records, err := recorder.Records(peerID, "retrieval", "retrieval", "1.0.0")
records, err := recorder.Records(peerID, "retrieval", "1.0.0", "retrieval")
if err != nil {
t.Fatal(err)
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment