Commit 13f6f727 authored by Petar Radovic's avatar Petar Radovic Committed by GitHub

use swarm.Address for overlay node addressing (#6)

parent 6c4d1833
......@@ -11,6 +11,7 @@ import (
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/gorilla/mux"
)
......@@ -22,7 +23,13 @@ func (s *server) pingpongHandler(w http.ResponseWriter, r *http.Request) {
peerID := mux.Vars(r)["peer-id"]
ctx := r.Context()
rtt, err := s.Pingpong.Ping(ctx, peerID, "hey", "there", ",", "how are", "you", "?")
address, err := swarm.ParseHexAddress(peerID)
if err != nil {
s.Logger.Debugf("pingpong: ping %s: %v", peerID, err)
jsonhttp.BadRequest(w, "invalid peer address")
}
rtt, err := s.Pingpong.Ping(ctx, address, "hey", "there", ",", "how are", "you", "?")
if err != nil {
s.Logger.Debugf("pingpong: ping %s: %v", peerID, err)
if errors.Is(err, p2p.ErrPeerNotFound) {
......
......@@ -16,20 +16,21 @@ import (
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/p2p"
pingpongmock "github.com/ethersphere/bee/pkg/pingpong/mock"
"github.com/ethersphere/bee/pkg/swarm"
)
func TestPingpong(t *testing.T) {
rtt := time.Minute
peerID := "124762324"
unknownPeerID := "55555555"
errorPeerID := "77777777"
peerID, _ := swarm.ParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c")
unknownPeerID, _ := swarm.ParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59e")
errorPeerID, _ := swarm.ParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59a")
testErr := errors.New("test error")
pingpongService := pingpongmock.New(func(ctx context.Context, address string, msgs ...string) (time.Duration, error) {
if address == errorPeerID {
pingpongService := pingpongmock.New(func(ctx context.Context, address swarm.Address, msgs ...string) (time.Duration, error) {
if address.Equal(errorPeerID) {
return 0, testErr
}
if address != peerID {
if !address.Equal(peerID) {
return 0, p2p.ErrPeerNotFound
}
return rtt, nil
......@@ -41,27 +42,27 @@ func TestPingpong(t *testing.T) {
defer cleanup()
t.Run("ok", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, client, http.MethodPost, "/pingpong/"+peerID, nil, http.StatusOK, api.PingpongResponse{
jsonhttptest.ResponseDirect(t, client, http.MethodPost, "/pingpong/"+peerID.String(), nil, http.StatusOK, api.PingpongResponse{
RTT: rtt,
})
})
t.Run("peer not found", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, client, http.MethodPost, "/pingpong/"+unknownPeerID, nil, http.StatusNotFound, jsonhttp.StatusResponse{
jsonhttptest.ResponseDirect(t, client, http.MethodPost, "/pingpong/"+unknownPeerID.String(), nil, http.StatusNotFound, jsonhttp.StatusResponse{
Code: http.StatusNotFound,
Message: "peer not found",
})
})
t.Run("error", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, client, http.MethodPost, "/pingpong/"+errorPeerID, nil, http.StatusInternalServerError, jsonhttp.StatusResponse{
jsonhttptest.ResponseDirect(t, client, http.MethodPost, "/pingpong/"+errorPeerID.String(), nil, http.StatusInternalServerError, jsonhttp.StatusResponse{
Code: http.StatusInternalServerError,
Message: http.StatusText(http.StatusInternalServerError), // do not leek internal error
})
})
t.Run("get method not allowed", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, client, http.MethodGet, "/pingpong/"+peerID, nil, http.StatusMethodNotAllowed, jsonhttp.StatusResponse{
jsonhttptest.ResponseDirect(t, client, http.MethodGet, "/pingpong/"+peerID.String(), nil, http.StatusMethodNotAllowed, jsonhttp.StatusResponse{
Code: http.StatusMethodNotAllowed,
Message: http.StatusText(http.StatusMethodNotAllowed),
})
......
......@@ -10,6 +10,7 @@ import (
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/gorilla/mux"
"github.com/multiformats/go-multiaddr"
)
......@@ -35,14 +36,20 @@ func (s *server) peerConnectHandler(w http.ResponseWriter, r *http.Request) {
}
jsonhttp.OK(w, peerConnectResponse{
Address: address,
Address: address.String(),
})
}
func (s *server) peerDisconnectHandler(w http.ResponseWriter, r *http.Request) {
addr := mux.Vars(r)["address"]
swarmAddr, err := swarm.ParseHexAddress(addr)
if err != nil {
s.Logger.Debugf("debug api: peer disconnect %s: %v", addr, err)
jsonhttp.BadRequest(w, "invalid peer address")
return
}
if err := s.P2P.Disconnect(addr); err != nil {
if err := s.P2P.Disconnect(swarmAddr); err != nil {
s.Logger.Debugf("debug api: peer disconnect %s: %v", addr, err)
if errors.Is(err, p2p.ErrPeerNotFound) {
jsonhttp.BadRequest(w, "peer not found")
......
......@@ -15,19 +15,20 @@ import (
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/mock"
"github.com/ethersphere/bee/pkg/swarm"
ma "github.com/multiformats/go-multiaddr"
)
func TestConnect(t *testing.T) {
underlay := "/ip4/127.0.0.1/tcp/7070/p2p/16Uiu2HAkx8ULY8cTXhdVAcMmLcH9AsTKz6uBQ7DPLKRjMLgBVYkS"
errorUnderlay := "/ip4/127.0.0.1/tcp/7070/p2p/16Uiu2HAkw88cjH2orYrB6fDui4eUNdmgkwnDM8W681UbfsPgM9QY"
overlay := "985732527402"
overlay, _ := swarm.ParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c")
testErr := errors.New("test error")
client, cleanup := newTestServer(t, testServerOptions{
P2P: mock.New(mock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (string, error) {
P2P: mock.New(mock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (swarm.Address, error) {
if addr.String() == errorUnderlay {
return "", testErr
return swarm.Address{}, testErr
}
return overlay, nil
})),
......@@ -36,7 +37,7 @@ func TestConnect(t *testing.T) {
t.Run("ok", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, client, http.MethodPost, "/connect"+underlay, nil, http.StatusOK, debugapi.PeerConnectResponse{
Address: overlay,
Address: overlay.String(),
})
})
......@@ -56,41 +57,42 @@ func TestConnect(t *testing.T) {
}
func TestDisconnect(t *testing.T) {
address := "985732527402"
unknwonAdddress := "123456"
errorAddress := "33333333"
address, _ := swarm.ParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c")
unknownAdddress, _ := swarm.ParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59e")
errorAddress, _ := swarm.ParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59a")
testErr := errors.New("test error")
client, cleanup := newTestServer(t, testServerOptions{
P2P: mock.New(mock.WithDisconnectFunc(func(addr string) error {
switch addr {
case address:
P2P: mock.New(mock.WithDisconnectFunc(func(addr swarm.Address) error {
if addr.Equal(address) {
return nil
case errorAddress:
}
if addr.Equal(errorAddress) {
return testErr
default:
return p2p.ErrPeerNotFound
}
return p2p.ErrPeerNotFound
})),
})
defer cleanup()
t.Run("ok", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, client, http.MethodDelete, "/peers/"+address, nil, http.StatusOK, jsonhttp.StatusResponse{
jsonhttptest.ResponseDirect(t, client, http.MethodDelete, "/peers/"+address.String(), nil, http.StatusOK, jsonhttp.StatusResponse{
Code: http.StatusOK,
Message: http.StatusText(http.StatusOK),
})
})
t.Run("unknown", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, client, http.MethodDelete, "/peers/"+unknwonAdddress, nil, http.StatusBadRequest, jsonhttp.StatusResponse{
jsonhttptest.ResponseDirect(t, client, http.MethodDelete, "/peers/"+unknownAdddress.String(), nil, http.StatusBadRequest, jsonhttp.StatusResponse{
Code: http.StatusBadRequest,
Message: "peer not found",
})
})
t.Run("error", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, client, http.MethodDelete, "/peers/"+errorAddress, nil, http.StatusInternalServerError, jsonhttp.StatusResponse{
jsonhttptest.ResponseDirect(t, client, http.MethodDelete, "/peers/"+errorAddress.String(), nil, http.StatusInternalServerError, jsonhttp.StatusResponse{
Code: http.StatusInternalServerError,
Message: testErr.Error(),
})
......
......@@ -10,6 +10,7 @@ import (
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake/pb"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/swarm"
)
const (
......@@ -19,12 +20,12 @@ const (
)
type Service struct {
overlay string
overlay swarm.Address
networkID int32
logger logging.Logger
}
func New(overlay string, networkID int32, logger logging.Logger) *Service {
func New(overlay swarm.Address, networkID int32, logger logging.Logger) *Service {
return &Service{
overlay: overlay,
networkID: networkID,
......@@ -36,7 +37,7 @@ func (s *Service) Handshake(stream p2p.Stream) (i *Info, err error) {
w, r := protobuf.NewWriterAndReader(stream)
var resp pb.ShakeHandAck
if err := w.WriteMsg(&pb.ShakeHand{
Address: s.overlay,
Address: s.overlay.Bytes(),
NetworkID: s.networkID,
}); err != nil {
return nil, fmt.Errorf("write message: %w", err)
......@@ -53,7 +54,7 @@ func (s *Service) Handshake(stream p2p.Stream) (i *Info, err error) {
s.logger.Tracef("handshake finished for peer %s", resp.ShakeHand.Address)
return &Info{
Address: resp.ShakeHand.Address,
Address: swarm.NewAddress(resp.ShakeHand.Address),
NetworkID: resp.ShakeHand.NetworkID,
Light: resp.ShakeHand.Light,
}, nil
......@@ -70,7 +71,7 @@ func (s *Service) Handle(stream p2p.Stream) (i *Info, err error) {
if err := w.WriteMsg(&pb.ShakeHandAck{
ShakeHand: &pb.ShakeHand{
Address: s.overlay,
Address: s.overlay.Bytes(),
NetworkID: s.networkID,
},
Ack: &pb.Ack{Address: req.Address},
......@@ -85,14 +86,19 @@ func (s *Service) Handle(stream p2p.Stream) (i *Info, err error) {
s.logger.Tracef("handshake finished for peer %s", req.Address)
return &Info{
Address: req.Address,
Address: swarm.NewAddress(req.Address),
NetworkID: req.NetworkID,
Light: req.Light,
}, nil
}
type Info struct {
Address string
Address swarm.Address
NetworkID int32
Light bool
}
// Equal returns true if two info objects are identical.
func (a Info) Equal(b Info) bool {
return a.Address.Equal(b.Address) && a.NetworkID == b.NetworkID && a.Light == b.Light
}
......@@ -7,6 +7,7 @@ import (
"bytes"
"errors"
"fmt"
"github.com/ethersphere/bee/pkg/swarm"
"io/ioutil"
"testing"
......@@ -61,9 +62,11 @@ func (s *StreamMock) Close() error {
}
func TestHandshake(t *testing.T) {
node1Addr, _ := swarm.ParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c")
node2Addr, _ := swarm.ParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59b")
logger := logging.New(ioutil.Discard, 0)
info := Info{
Address: "node1",
Address: node1Addr,
NetworkID: 0,
Light: false,
}
......@@ -71,7 +74,7 @@ func TestHandshake(t *testing.T) {
t.Run("OK", func(t *testing.T) {
expectedInfo := Info{
Address: "node2",
Address: node2Addr,
NetworkID: 1,
Light: false,
}
......@@ -84,11 +87,11 @@ func TestHandshake(t *testing.T) {
w, r := protobuf.NewWriterAndReader(stream2)
if err := w.WriteMsg(&pb.ShakeHandAck{
ShakeHand: &pb.ShakeHand{
Address: expectedInfo.Address,
Address: expectedInfo.Address.Bytes(),
NetworkID: expectedInfo.NetworkID,
Light: expectedInfo.Light,
},
Ack: &pb.Ack{Address: info.Address},
Ack: &pb.Ack{Address: info.Address.Bytes()},
}); err != nil {
t.Fatal(err)
}
......@@ -98,7 +101,7 @@ func TestHandshake(t *testing.T) {
t.Fatal(err)
}
if *res != expectedInfo {
if !res.Equal(expectedInfo) {
t.Fatalf("got %+v, expected %+v", res, info)
}
......@@ -140,9 +143,8 @@ func TestHandshake(t *testing.T) {
t.Run("ERROR - ack write error ", func(t *testing.T) {
testErr := errors.New("test error")
expectedErr := fmt.Errorf("ack: write message: %w", testErr)
expectedInfo := Info{
Address: "node2",
Address: node2Addr,
NetworkID: 1,
Light: false,
}
......@@ -156,11 +158,11 @@ func TestHandshake(t *testing.T) {
w, _ := protobuf.NewWriterAndReader(stream2)
if err := w.WriteMsg(&pb.ShakeHandAck{
ShakeHand: &pb.ShakeHand{
Address: expectedInfo.Address,
Address: expectedInfo.Address.Bytes(),
NetworkID: expectedInfo.NetworkID,
Light: expectedInfo.Light,
},
Ack: &pb.Ack{Address: info.Address},
Ack: &pb.Ack{Address: info.Address.Bytes()},
}); err != nil {
t.Fatal(err)
}
......@@ -177,8 +179,10 @@ func TestHandshake(t *testing.T) {
}
func TestHandle(t *testing.T) {
node1Addr, _ := swarm.ParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c")
node2Addr, _ := swarm.ParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59b")
nodeInfo := Info{
Address: "node1",
Address: node1Addr,
NetworkID: 0,
Light: false,
}
......@@ -188,7 +192,7 @@ func TestHandle(t *testing.T) {
t.Run("OK", func(t *testing.T) {
node2Info := Info{
Address: "node2",
Address: node2Addr,
NetworkID: 1,
Light: false,
}
......@@ -200,14 +204,14 @@ func TestHandle(t *testing.T) {
w, _ := protobuf.NewWriterAndReader(stream2)
if err := w.WriteMsg(&pb.ShakeHand{
Address: node2Info.Address,
Address: node2Info.Address.Bytes(),
NetworkID: node2Info.NetworkID,
Light: node2Info.Light,
}); err != nil {
t.Fatal(err)
}
if err := w.WriteMsg(&pb.Ack{Address: node2Info.Address}); err != nil {
if err := w.WriteMsg(&pb.Ack{Address: node2Info.Address.Bytes()}); err != nil {
t.Fatal(err)
}
......@@ -216,7 +220,7 @@ func TestHandle(t *testing.T) {
t.Fatal(err)
}
if *res != node2Info {
if !res.Equal(node2Info) {
t.Fatalf("got %+v, expected %+v", res, node2Info)
}
......@@ -226,7 +230,11 @@ func TestHandle(t *testing.T) {
t.Fatal(err)
}
if nodeInfo != Info(*got.ShakeHand) {
if !nodeInfo.Equal(Info{
Address: swarm.NewAddress(got.ShakeHand.Address),
NetworkID: got.ShakeHand.NetworkID,
Light: got.ShakeHand.Light,
}) {
t.Fatalf("got %+v, expected %+v", got, node2Info)
}
})
......@@ -254,7 +262,7 @@ func TestHandle(t *testing.T) {
stream.setWriteErr(testErr, 1)
w, _ := protobuf.NewWriterAndReader(stream)
if err := w.WriteMsg(&pb.ShakeHand{
Address: "node1",
Address: node1Addr.Bytes(),
NetworkID: 0,
Light: false,
}); err != nil {
......@@ -274,9 +282,8 @@ func TestHandle(t *testing.T) {
t.Run("ERROR - ack read error ", func(t *testing.T) {
testErr := errors.New("test error")
expectedErr := fmt.Errorf("ack: read message: %w", testErr)
node2Info := Info{
Address: "node2",
Address: node2Addr,
NetworkID: 1,
Light: false,
}
......@@ -288,7 +295,7 @@ func TestHandle(t *testing.T) {
stream1.setReadErr(testErr, 1)
w, _ := protobuf.NewWriterAndReader(stream2)
if err := w.WriteMsg(&pb.ShakeHand{
Address: node2Info.Address,
Address: node2Info.Address.Bytes(),
NetworkID: node2Info.NetworkID,
Light: node2Info.Light,
}); err != nil {
......
......@@ -23,7 +23,7 @@ var _ = math.Inf
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type ShakeHand struct {
Address string `protobuf:"bytes,1,opt,name=Address,proto3" json:"Address,omitempty"`
Address []byte `protobuf:"bytes,1,opt,name=Address,proto3" json:"Address,omitempty"`
NetworkID int32 `protobuf:"varint,2,opt,name=NetworkID,proto3" json:"NetworkID,omitempty"`
Light bool `protobuf:"varint,3,opt,name=Light,proto3" json:"Light,omitempty"`
}
......@@ -61,11 +61,11 @@ func (m *ShakeHand) XXX_DiscardUnknown() {
var xxx_messageInfo_ShakeHand proto.InternalMessageInfo
func (m *ShakeHand) GetAddress() string {
func (m *ShakeHand) GetAddress() []byte {
if m != nil {
return m.Address
}
return ""
return nil
}
func (m *ShakeHand) GetNetworkID() int32 {
......@@ -135,7 +135,7 @@ func (m *ShakeHandAck) GetAck() *Ack {
}
type Ack struct {
Address string `protobuf:"bytes,1,opt,name=Address,proto3" json:"Address,omitempty"`
Address []byte `protobuf:"bytes,1,opt,name=Address,proto3" json:"Address,omitempty"`
}
func (m *Ack) Reset() { *m = Ack{} }
......@@ -171,11 +171,11 @@ func (m *Ack) XXX_DiscardUnknown() {
var xxx_messageInfo_Ack proto.InternalMessageInfo
func (m *Ack) GetAddress() string {
func (m *Ack) GetAddress() []byte {
if m != nil {
return m.Address
}
return ""
return nil
}
func init() {
......@@ -191,7 +191,7 @@ var fileDescriptor_a77305914d5d202f = []byte{
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcf, 0x48, 0xcc, 0x4b,
0x29, 0xce, 0x48, 0xcc, 0x4e, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2a, 0x48, 0x52,
0x8a, 0xe4, 0xe2, 0x0c, 0x06, 0x09, 0x79, 0x24, 0xe6, 0xa5, 0x08, 0x49, 0x70, 0xb1, 0x3b, 0xa6,
0xa4, 0x14, 0xa5, 0x16, 0x17, 0x4b, 0x30, 0x2a, 0x30, 0x6a, 0x70, 0x06, 0xc1, 0xb8, 0x42, 0x32,
0xa4, 0x14, 0xa5, 0x16, 0x17, 0x4b, 0x30, 0x2a, 0x30, 0x6a, 0xf0, 0x04, 0xc1, 0xb8, 0x42, 0x32,
0x5c, 0x9c, 0x7e, 0xa9, 0x25, 0xe5, 0xf9, 0x45, 0xd9, 0x9e, 0x2e, 0x12, 0x4c, 0x0a, 0x8c, 0x1a,
0xac, 0x41, 0x08, 0x01, 0x21, 0x11, 0x2e, 0x56, 0x9f, 0xcc, 0xf4, 0x8c, 0x12, 0x09, 0x66, 0x05,
0x46, 0x0d, 0x8e, 0x20, 0x08, 0x47, 0x29, 0x8c, 0x8b, 0x07, 0x6e, 0xb4, 0x63, 0x72, 0xb6, 0x90,
......@@ -200,7 +200,7 @@ var fileDescriptor_a77305914d5d202f = []byte{
0x9c, 0x1d, 0x04, 0x12, 0x53, 0x92, 0x07, 0x4b, 0xe1, 0x76, 0xac, 0x93, 0xc4, 0x89, 0x47, 0x72,
0x8c, 0x17, 0x1e, 0xc9, 0x31, 0x3e, 0x78, 0x24, 0xc7, 0x38, 0xe1, 0xb1, 0x1c, 0xc3, 0x85, 0xc7,
0x72, 0x0c, 0x37, 0x1e, 0xcb, 0x31, 0x24, 0xb1, 0x81, 0x3d, 0x6e, 0x0c, 0x08, 0x00, 0x00, 0xff,
0xff, 0x75, 0x16, 0xc0, 0x1d, 0x0b, 0x01, 0x00, 0x00,
0xff, 0x3e, 0xb7, 0x4c, 0x93, 0x0b, 0x01, 0x00, 0x00,
}
func (m *ShakeHand) Marshal() (dAtA []byte, err error) {
......@@ -424,7 +424,7 @@ func (m *ShakeHand) Unmarshal(dAtA []byte) error {
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Address", wireType)
}
var stringLen uint64
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowHandshake
......@@ -434,23 +434,25 @@ func (m *ShakeHand) Unmarshal(dAtA []byte) error {
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
if byteLen < 0 {
return ErrInvalidLengthHandshake
}
postIndex := iNdEx + intStringLen
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthHandshake
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Address = string(dAtA[iNdEx:postIndex])
m.Address = append(m.Address[:0], dAtA[iNdEx:postIndex]...)
if m.Address == nil {
m.Address = []byte{}
}
iNdEx = postIndex
case 2:
if wireType != 0 {
......@@ -673,7 +675,7 @@ func (m *Ack) Unmarshal(dAtA []byte) error {
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Address", wireType)
}
var stringLen uint64
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowHandshake
......@@ -683,23 +685,25 @@ func (m *Ack) Unmarshal(dAtA []byte) error {
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
if byteLen < 0 {
return ErrInvalidLengthHandshake
}
postIndex := iNdEx + intStringLen
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthHandshake
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Address = string(dAtA[iNdEx:postIndex])
m.Address = append(m.Address[:0], dAtA[iNdEx:postIndex]...)
if m.Address == nil {
m.Address = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
......
......@@ -6,7 +6,7 @@ syntax = "proto3";
package pb;
message ShakeHand {
string Address = 1;
bytes Address = 1;
int32 NetworkID = 2;
bool Light = 3;
}
......@@ -17,5 +17,5 @@ message ShakeHandAck {
}
message Ack {
string Address = 1;
bytes Address = 1;
}
......@@ -14,12 +14,12 @@ import (
"math/rand"
"net"
"os"
"strconv"
"time"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
handshake "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/libp2p/go-libp2p"
autonat "github.com/libp2p/go-libp2p-autonat-svc"
connmgr "github.com/libp2p/go-libp2p-connmgr"
......@@ -189,12 +189,13 @@ func New(ctx context.Context, o Options) (*Service, error) {
// This is just a temporary way to generate an overlay address.
// TODO: proper key management and overlay address generation
overlay := strconv.Itoa(rand.Int())
overlay := make([]byte, 32)
rand.Read(overlay)
s := &Service{
host: h,
metrics: newMetrics(),
networkID: o.NetworkID,
handshakeService: handshake.New(overlay, o.NetworkID, o.Logger),
handshakeService: handshake.New(swarm.NewAddress(overlay), o.NetworkID, o.Logger),
peers: newPeerRegistry(),
logger: o.Logger,
}
......@@ -298,29 +299,29 @@ func (s *Service) Addresses() (addrs []string, err error) {
return addrs, nil
}
func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (overlay string, err error) {
func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm.Address, err error) {
// Extract the peer ID from the multiaddr.
info, err := libp2ppeer.AddrInfoFromP2pAddr(addr)
if err != nil {
return "", err
return swarm.Address{}, err
}
if err := s.host.Connect(ctx, *info); err != nil {
return "", err
return swarm.Address{}, err
}
stream, err := s.newStreamForPeerID(ctx, info.ID, handshake.ProtocolName, handshake.StreamName, handshake.StreamVersion)
if err != nil {
return "", fmt.Errorf("new stream: %w", err)
return swarm.Address{}, fmt.Errorf("new stream: %w", err)
}
defer stream.Close()
i, err := s.handshakeService.Handshake(stream)
if err != nil {
return "", err
return swarm.Address{}, err
}
if i.NetworkID != s.networkID {
return "", fmt.Errorf("invalid network id %v", i.NetworkID)
return swarm.Address{}, fmt.Errorf("invalid network id %v", i.NetworkID)
}
s.peers.add(info.ID, i.Address)
......@@ -329,7 +330,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (overlay strin
return i.Address, nil
}
func (s *Service) Disconnect(overlay string) error {
func (s *Service) Disconnect(overlay swarm.Address) error {
peerID, found := s.peers.peerID(overlay)
if !found {
return p2p.ErrPeerNotFound
......@@ -341,7 +342,7 @@ func (s *Service) Disconnect(overlay string) error {
return nil
}
func (s *Service) NewStream(ctx context.Context, overlay, protocolName, streamName, version string) (p2p.Stream, error) {
func (s *Service) NewStream(ctx context.Context, overlay swarm.Address, protocolName, streamName, version string) (p2p.Stream, error) {
peerID, found := s.peers.peerID(overlay)
if !found {
return nil, p2p.ErrPeerNotFound
......
......@@ -7,6 +7,7 @@ package libp2p
import (
"sync"
"github.com/ethersphere/bee/pkg/swarm"
libp2ppeer "github.com/libp2p/go-libp2p-core/peer"
)
......@@ -23,25 +24,25 @@ func newPeerRegistry() *peerRegistry {
}
}
func (r *peerRegistry) add(peerID libp2ppeer.ID, overlay string) {
func (r *peerRegistry) add(peerID libp2ppeer.ID, overlay swarm.Address) {
r.mu.Lock()
r.peers[overlay] = peerID
r.overlays[peerID] = overlay
r.peers[string(overlay.Bytes())] = peerID
r.overlays[peerID] = string(overlay.Bytes())
r.mu.Unlock()
}
func (r *peerRegistry) peerID(overlay string) (peerID libp2ppeer.ID, found bool) {
func (r *peerRegistry) peerID(overlay swarm.Address) (peerID libp2ppeer.ID, found bool) {
r.mu.RLock()
peerID, found = r.peers[overlay]
peerID, found = r.peers[string(overlay.Bytes())]
r.mu.RUnlock()
return peerID, found
}
func (r *peerRegistry) overlay(peerID libp2ppeer.ID) (overlay string, found bool) {
func (r *peerRegistry) overlay(peerID libp2ppeer.ID) (swarm.Address, bool) {
r.mu.RLock()
overlay, found = r.overlays[peerID]
overlay, found := r.overlays[peerID]
r.mu.RUnlock()
return overlay, found
return swarm.NewAddress([]byte(overlay)), found
}
func (r *peerRegistry) remove(peerID libp2ppeer.ID) {
......
......@@ -9,13 +9,14 @@ import (
"errors"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/swarm"
ma "github.com/multiformats/go-multiaddr"
)
type Service struct {
addProtocolFunc func(p2p.ProtocolSpec) error
connectFunc func(ctx context.Context, addr ma.Multiaddr) (overlay string, err error)
disconnectFunc func(overlay string) error
connectFunc func(ctx context.Context, addr ma.Multiaddr) (overlay swarm.Address, err error)
disconnectFunc func(overlay swarm.Address) error
}
func WithAddProtocolFunc(f func(p2p.ProtocolSpec) error) Option {
......@@ -24,13 +25,13 @@ func WithAddProtocolFunc(f func(p2p.ProtocolSpec) error) Option {
})
}
func WithConnectFunc(f func(ctx context.Context, addr ma.Multiaddr) (overlay string, err error)) Option {
func WithConnectFunc(f func(ctx context.Context, addr ma.Multiaddr) (overlay swarm.Address, err error)) Option {
return optionFunc(func(s *Service) {
s.connectFunc = f
})
}
func WithDisconnectFunc(f func(overlay string) error) Option {
func WithDisconnectFunc(f func(overlay swarm.Address) error) Option {
return optionFunc(func(s *Service) {
s.disconnectFunc = f
})
......@@ -51,14 +52,14 @@ func (s *Service) AddProtocol(spec p2p.ProtocolSpec) error {
return s.addProtocolFunc(spec)
}
func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (overlay string, err error) {
func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm.Address, err error) {
if s.connectFunc == nil {
return "", errors.New("Connect function not configured")
return swarm.Address{}, errors.New("Connect function not configured")
}
return s.connectFunc(ctx, addr)
}
func (s *Service) Disconnect(overlay string) error {
func (s *Service) Disconnect(overlay swarm.Address) error {
if s.disconnectFunc == nil {
return errors.New("Disconnect function not configured")
}
......
......@@ -9,17 +9,18 @@ import (
"fmt"
"io"
"github.com/ethersphere/bee/pkg/swarm"
ma "github.com/multiformats/go-multiaddr"
)
type Service interface {
AddProtocol(ProtocolSpec) error
Connect(ctx context.Context, addr ma.Multiaddr) (overlay string, err error)
Disconnect(overlay string) error
Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm.Address, err error)
Disconnect(overlay swarm.Address) error
}
type Streamer interface {
NewStream(ctx context.Context, address, protocol, stream, version string) (Stream, error)
NewStream(ctx context.Context, address swarm.Address, protocol, stream, version string) (Stream, error)
}
type Stream interface {
......
......@@ -4,10 +4,14 @@
package p2p
import "errors"
import (
"errors"
"github.com/ethersphere/bee/pkg/swarm"
)
type Peer struct {
Address string
Address swarm.Address
}
var ErrPeerNotFound = errors.New("peer not found")
......@@ -7,6 +7,7 @@ package streamtest
import (
"context"
"fmt"
"github.com/ethersphere/bee/pkg/swarm"
"io"
"sync"
......@@ -42,7 +43,7 @@ func New(opts ...Option) *Recorder {
return r
}
func (r *Recorder) NewStream(_ context.Context, overlay, protocolName, streamName, version string) (p2p.Stream, error) {
func (r *Recorder) NewStream(_ context.Context, overlay swarm.Address, protocolName, streamName, version string) (p2p.Stream, error) {
recordIn := newRecord()
recordOut := newRecord()
streamOut := newStream(recordIn, recordOut)
......@@ -70,7 +71,7 @@ func (r *Recorder) NewStream(_ context.Context, overlay, protocolName, streamNam
}
}()
id := overlay + p2p.NewSwarmStreamName(protocolName, streamName, version)
id := overlay.String() + p2p.NewSwarmStreamName(protocolName, streamName, version)
r.recordsMu.Lock()
defer r.recordsMu.Unlock()
......
......@@ -6,17 +6,18 @@ package mock
import (
"context"
"github.com/ethersphere/bee/pkg/swarm"
"time"
)
type Service struct {
pingFunc func(ctx context.Context, address string, msgs ...string) (rtt time.Duration, err error)
pingFunc func(ctx context.Context, address swarm.Address, msgs ...string) (rtt time.Duration, err error)
}
func New(pingFunc func(ctx context.Context, address string, msgs ...string) (rtt time.Duration, err error)) *Service {
func New(pingFunc func(ctx context.Context, address swarm.Address, msgs ...string) (rtt time.Duration, err error)) *Service {
return &Service{pingFunc: pingFunc}
}
func (s *Service) Ping(ctx context.Context, address string, msgs ...string) (rtt time.Duration, err error) {
func (s *Service) Ping(ctx context.Context, address swarm.Address, msgs ...string) (rtt time.Duration, err error) {
return s.pingFunc(ctx, address, msgs...)
}
......@@ -14,6 +14,7 @@ import (
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/pingpong/pb"
"github.com/ethersphere/bee/pkg/swarm"
)
const (
......@@ -23,7 +24,7 @@ const (
)
type Interface interface {
Ping(ctx context.Context, address string, msgs ...string) (rtt time.Duration, err error)
Ping(ctx context.Context, address swarm.Address, msgs ...string) (rtt time.Duration, err error)
}
type Service struct {
......@@ -58,7 +59,7 @@ func (s *Service) Protocol() p2p.ProtocolSpec {
}
}
func (s *Service) Ping(ctx context.Context, address string, msgs ...string) (rtt time.Duration, err error) {
func (s *Service) Ping(ctx context.Context, address swarm.Address, msgs ...string) (rtt time.Duration, err error) {
start := time.Now()
stream, err := s.streamer.NewStream(ctx, address, protocolName, streamName, streamVersion)
if err != nil {
......
......@@ -8,6 +8,7 @@ import (
"bytes"
"context"
"fmt"
"github.com/ethersphere/bee/pkg/swarm"
"io/ioutil"
"runtime"
"testing"
......@@ -50,9 +51,10 @@ func TestPing(t *testing.T) {
})
// ping
peerID := "124"
peerID := "ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"
peerIDAddress, _ := swarm.ParseHexAddress(peerID)
greetings := []string{"hey", "there", "fella"}
rtt, err := client.Ping(context.Background(), peerID, greetings...)
rtt, err := client.Ping(context.Background(), peerIDAddress, greetings...)
if err != nil {
t.Fatal(err)
}
......
......@@ -44,5 +44,10 @@ func (a Address) IsZero() bool {
return a.Equal(ZeroAddress)
}
// Bytes returns
func (a Address) Bytes() []byte {
return a.b
}
// ZeroAddress is the address that has no value.
var ZeroAddress = NewAddress(nil)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment