Commit a593c574 authored by Svetomir Smiljkovic's avatar Svetomir Smiljkovic

Merge from upstream

parents 0add8ce6 1971bae2
...@@ -10,7 +10,7 @@ builds: ...@@ -10,7 +10,7 @@ builds:
- -trimpath - -trimpath
ldflags: ldflags:
- -s -w -X github.com/janos/bee.version={{.Version}} -X github.com/janos/bee.commit={{.ShortCommit}} - -s -w -X github.com/ethersphere/bee.version={{.Version}} -X github.com/ethersphere/bee.commit={{.ShortCommit}}
env: env:
- CGO_ENABLED=0 - CGO_ENABLED=0
......
...@@ -3,10 +3,14 @@ COMMIT ?= "" ...@@ -3,10 +3,14 @@ COMMIT ?= ""
GO ?= go GO ?= go
GOLANGCI_LINT ?= golangci-lint GOLANGCI_LINT ?= golangci-lint
<<<<<<< HEAD
LDFLAGS ?= -s -w -X github.com/janos/bee.commit="$(COMMIT)" LDFLAGS ?= -s -w -X github.com/janos/bee.commit="$(COMMIT)"
=======
LDFLAGS ?= -s -w -X github.com/ethersphere/bee.commit="$(COMMIT)"
>>>>>>> 1971bae2015d482f60af4e87b73d1999abc90157
.PHONY: all .PHONY: all
all: lint vet test binary all: build lint vet test binary
.PHONY: binary .PHONY: binary
binary: export CGO_ENABLED=0 binary: export CGO_ENABLED=0
......
...@@ -11,8 +11,6 @@ make binary ...@@ -11,8 +11,6 @@ make binary
cp dist/bee /usr/local/bin/bee cp dist/bee /usr/local/bin/bee
``` ```
Docker image `janos/bee`.
## Usage (experimental api) ## Usage (experimental api)
Execute the command terminals to start `node 1`: Execute the command terminals to start `node 1`:
...@@ -27,10 +25,10 @@ Use one of the multiaddresses as bootnode for `node 2` in order to connect them: ...@@ -27,10 +25,10 @@ Use one of the multiaddresses as bootnode for `node 2` in order to connect them:
bee start --api-addr :8502 --p2p-addr :30402 --data-dir data2 --bootnode /ip4/127.0.0.1/tcp/30401/p2p/QmT4TNB4cKYanUjdYodw1Cns8cuVaRVo24hHNYcT7JjkTB bee start --api-addr :8502 --p2p-addr :30402 --data-dir data2 --bootnode /ip4/127.0.0.1/tcp/30401/p2p/QmT4TNB4cKYanUjdYodw1Cns8cuVaRVo24hHNYcT7JjkTB
``` ```
Use the last part of `node 1` multiaddress to ping it using `node 2` by making an HTTP request to `localhost:{PORT2}/pingpong/{ID1}` like: Take the address of the connected peer to `node 1` from log line `peer "4932309428148935717" connected` and make an HTTP request to `localhost:{PORT1}/pingpong/{ADDRESS}` like:
```sh ```sh
curl localhost:8502/pingpong/QmT4TNB4cKYanUjdYodw1Cns8cuVaRVo24hHNYcT7JjkTB curl localhost:8502/pingpong/4932309428148935717
``` ```
## Structure ## Structure
...@@ -52,6 +50,5 @@ curl localhost:8502/pingpong/QmT4TNB4cKYanUjdYodw1Cns8cuVaRVo24hHNYcT7JjkTB ...@@ -52,6 +50,5 @@ curl localhost:8502/pingpong/QmT4TNB4cKYanUjdYodw1Cns8cuVaRVo24hHNYcT7JjkTB
## TODO ## TODO
- P2P mock (protocol tester) implementation improvements - P2P mock (protocol tester) implementation improvements
- Overlay addressing in libp2p (provide overlay address in p2p.Peer)
- Figure out routing (whether to use libp2p Routing or to abstract hive on top of p2p package) - Figure out routing (whether to use libp2p Routing or to abstract hive on top of p2p package)
- Instrumentation: logging, metrics, tracing, pprof... - Instrumentation: logging, metrics, tracing, pprof...
...@@ -10,7 +10,7 @@ import ( ...@@ -10,7 +10,7 @@ import (
"os" "os"
"testing" "testing"
"github.com/janos/bee/cmd/bee/cmd" "github.com/ethersphere/bee/cmd/bee/cmd"
) )
var homeDir string var homeDir string
......
...@@ -19,9 +19,9 @@ import ( ...@@ -19,9 +19,9 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/janos/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/janos/bee/pkg/node" "github.com/ethersphere/bee/pkg/node"
"github.com/janos/bee/pkg/p2p/libp2p" "github.com/ethersphere/bee/pkg/p2p/libp2p"
) )
func (c *command) initStartCmd() (err error) { func (c *command) initStartCmd() (err error) {
...@@ -89,10 +89,11 @@ func (c *command) initStartCmd() (err error) { ...@@ -89,10 +89,11 @@ func (c *command) initStartCmd() (err error) {
DisableWS: c.config.GetBool(optionNameP2PDisableWS), DisableWS: c.config.GetBool(optionNameP2PDisableWS),
DisableQUIC: c.config.GetBool(optionNameP2PDisableQUIC), DisableQUIC: c.config.GetBool(optionNameP2PDisableQUIC),
Bootnodes: c.config.GetStringSlice(optionNameBootnodes), Bootnodes: c.config.GetStringSlice(optionNameBootnodes),
NetworkID: c.config.GetInt(optionNameNetworkID), NetworkID: c.config.GetInt32(optionNameNetworkID),
ConnectionsLow: c.config.GetInt(optionNameConnectionsLow), ConnectionsLow: c.config.GetInt(optionNameConnectionsLow),
ConnectionsHigh: c.config.GetInt(optionNameConnectionsHigh), ConnectionsHigh: c.config.GetInt(optionNameConnectionsHigh),
ConnectionsGrace: c.config.GetDuration(optionNameConnectionsGrace), ConnectionsGrace: c.config.GetDuration(optionNameConnectionsGrace),
Logger: logger,
}, },
Logger: logger, Logger: logger,
}) })
...@@ -142,7 +143,7 @@ func (c *command) initStartCmd() (err error) { ...@@ -142,7 +143,7 @@ func (c *command) initStartCmd() (err error) {
cmd.Flags().Bool(optionNameP2PDisableQUIC, false, "disable P2P QUIC protocol") cmd.Flags().Bool(optionNameP2PDisableQUIC, false, "disable P2P QUIC protocol")
cmd.Flags().StringSlice(optionNameBootnodes, nil, "initial nodes to connect to") cmd.Flags().StringSlice(optionNameBootnodes, nil, "initial nodes to connect to")
cmd.Flags().String(optionNameDebugAPIAddr, "", "debug HTTP API listen address, e.g. 127.0.0.1:6060") cmd.Flags().String(optionNameDebugAPIAddr, "", "debug HTTP API listen address, e.g. 127.0.0.1:6060")
cmd.Flags().Int(optionNameNetworkID, 1, "ID of the Swarm network") cmd.Flags().Int32(optionNameNetworkID, 1, "ID of the Swarm network")
cmd.Flags().Int(optionNameConnectionsLow, 200, "low watermark governing the number of connections that'll be maintained") cmd.Flags().Int(optionNameConnectionsLow, 200, "low watermark governing the number of connections that'll be maintained")
cmd.Flags().Int(optionNameConnectionsHigh, 400, "high watermark governing the number of connections that'll be maintained") cmd.Flags().Int(optionNameConnectionsHigh, 400, "high watermark governing the number of connections that'll be maintained")
cmd.Flags().Duration(optionNameConnectionsGrace, time.Minute, "the amount of time a newly opened connection is given before it becomes subject to pruning") cmd.Flags().Duration(optionNameConnectionsGrace, time.Minute, "the amount of time a newly opened connection is given before it becomes subject to pruning")
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
package cmd package cmd
import ( import (
"github.com/janos/bee" "github.com/ethersphere/bee"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
......
...@@ -8,8 +8,8 @@ import ( ...@@ -8,8 +8,8 @@ import (
"bytes" "bytes"
"testing" "testing"
"github.com/janos/bee" "github.com/ethersphere/bee"
"github.com/janos/bee/cmd/bee/cmd" "github.com/ethersphere/bee/cmd/bee/cmd"
) )
func TestVersionCmd(t *testing.T) { func TestVersionCmd(t *testing.T) {
......
...@@ -8,7 +8,7 @@ import ( ...@@ -8,7 +8,7 @@ import (
"fmt" "fmt"
"os" "os"
"github.com/janos/bee/cmd/bee/cmd" "github.com/ethersphere/bee/cmd/bee/cmd"
) )
func main() { func main() {
......
module github.com/janos/bee module github.com/ethersphere/bee
go 1.13 go 1.13
......
...@@ -7,8 +7,8 @@ package api ...@@ -7,8 +7,8 @@ package api
import ( import (
"net/http" "net/http"
"github.com/janos/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
"github.com/janos/bee/pkg/pingpong" "github.com/ethersphere/bee/pkg/pingpong"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
......
...@@ -8,7 +8,7 @@ import ( ...@@ -8,7 +8,7 @@ import (
"net/http" "net/http"
"time" "time"
m "github.com/janos/bee/pkg/metrics" m "github.com/ethersphere/bee/pkg/metrics"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
package debugapi package debugapi
import ( import (
"github.com/janos/bee" "github.com/ethersphere/bee"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
......
...@@ -15,10 +15,10 @@ import ( ...@@ -15,10 +15,10 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"github.com/janos/bee/pkg/api" "github.com/ethersphere/bee/pkg/api"
"github.com/janos/bee/pkg/debugapi" "github.com/ethersphere/bee/pkg/debugapi"
"github.com/janos/bee/pkg/p2p/libp2p" "github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/janos/bee/pkg/pingpong" "github.com/ethersphere/bee/pkg/pingpong"
) )
type Bee struct { type Bee struct {
......
...@@ -8,11 +8,9 @@ package handshake ...@@ -8,11 +8,9 @@ package handshake
import ( import (
"fmt" "fmt"
"io"
"log"
"github.com/janos/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
"github.com/janos/bee/pkg/p2p/protobuf" "github.com/ethersphere/bee/pkg/p2p/protobuf"
) )
const ( const (
...@@ -23,52 +21,72 @@ const ( ...@@ -23,52 +21,72 @@ const (
type Service struct { type Service struct {
overlay string overlay string
networkID int32
logger Logger
} }
func New(overlay string) *Service { func New(overlay string, networkID int32, logger Logger) *Service {
return &Service{overlay: overlay} return &Service{
overlay: overlay,
networkID: networkID,
logger: logger,
}
}
type Logger interface {
Tracef(format string, args ...interface{})
} }
func (s *Service) Handshake(stream p2p.Stream) (overlay string, err error) { func (s *Service) Handshake(stream p2p.Stream) (i *Info, err error) {
w, r := protobuf.NewWriterAndReader(stream) w, r := protobuf.NewWriterAndReader(stream)
var resp ShakeHand var resp ShakeHand
if err := w.WriteMsg(&ShakeHand{Address: s.overlay}); err != nil { if err := w.WriteMsg(&ShakeHand{
return "", fmt.Errorf("handshake handler: write message: %v\n", err) Address: s.overlay,
NetworkID: s.networkID,
}); err != nil {
return nil, fmt.Errorf("handshake handler: write message: %w", err)
} }
log.Printf("sent handshake req %s\n", s.overlay) s.logger.Tracef("handshake: sent request %s", s.overlay)
if err := r.ReadMsg(&resp); err != nil { if err := r.ReadMsg(&resp); err != nil {
if err == io.EOF { return nil, fmt.Errorf("handshake handler: read message: %w", err)
return "", nil
} }
return "", fmt.Errorf("handshake handler: read message: %v\n", err) s.logger.Tracef("handshake: read response: %s", resp.Address)
} return &Info{
Address: resp.Address,
log.Printf("read handshake resp: %s\n", resp.Address) NetworkID: resp.NetworkID,
return resp.Address, nil Light: resp.Light,
}, nil
} }
func (s *Service) Handler(stream p2p.Stream) string { func (s *Service) Handle(stream p2p.Stream) (i *Info, err error) {
w, r := protobuf.NewWriterAndReader(stream) w, r := protobuf.NewWriterAndReader(stream)
defer stream.Close() defer stream.Close()
var req ShakeHand var req ShakeHand
if err := r.ReadMsg(&req); err != nil { if err := r.ReadMsg(&req); err != nil {
if err == io.EOF { return nil, fmt.Errorf("read message: %w", err)
return ""
}
log.Printf("handshake handler: read message: %v\n", err)
return ""
} }
log.Printf("received handshake req %s\n", req.Address) s.logger.Tracef("handshake: received request %s", req.Address)
if err := w.WriteMsg(&ShakeHand{ if err := w.WriteMsg(&ShakeHand{
Address: s.overlay, Address: s.overlay,
NetworkID: s.networkID,
}); err != nil { }); err != nil {
log.Printf("handshake handler: write message: %v\n", err) return nil, fmt.Errorf("write message: %w", err)
} }
log.Printf("sent handshake resp: %s\n", s.overlay) s.logger.Tracef("handshake: handled response: %s", s.overlay)
return req.Address return &Info{
Address: req.Address,
NetworkID: req.NetworkID,
Light: req.Light,
}, nil
}
type Info struct {
Address string
NetworkID int32
Light bool
} }
...@@ -23,7 +23,9 @@ var _ = math.Inf ...@@ -23,7 +23,9 @@ var _ = math.Inf
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type ShakeHand struct { type ShakeHand struct {
Address string `protobuf:"bytes,1,opt,name=PeerID,proto3" json:"PeerID,omitempty"` Address string `protobuf:"bytes,1,opt,name=Address,proto3" json:"Address,omitempty"`
NetworkID int32 `protobuf:"varint,2,opt,name=NetworkID,proto3" json:"NetworkID,omitempty"`
Light bool `protobuf:"varint,3,opt,name=Light,proto3" json:"Light,omitempty"`
} }
func (m *ShakeHand) Reset() { *m = ShakeHand{} } func (m *ShakeHand) Reset() { *m = ShakeHand{} }
...@@ -66,6 +68,20 @@ func (m *ShakeHand) GetAddress() string { ...@@ -66,6 +68,20 @@ func (m *ShakeHand) GetAddress() string {
return "" return ""
} }
func (m *ShakeHand) GetNetworkID() int32 {
if m != nil {
return m.NetworkID
}
return 0
}
func (m *ShakeHand) GetLight() bool {
if m != nil {
return m.Light
}
return false
}
func init() { func init() {
proto.RegisterType((*ShakeHand)(nil), "handshake.ShakeHand") proto.RegisterType((*ShakeHand)(nil), "handshake.ShakeHand")
} }
...@@ -73,14 +89,17 @@ func init() { ...@@ -73,14 +89,17 @@ func init() {
func init() { proto.RegisterFile("handshake.proto", fileDescriptor_a77305914d5d202f) } func init() { proto.RegisterFile("handshake.proto", fileDescriptor_a77305914d5d202f) }
var fileDescriptor_a77305914d5d202f = []byte{ var fileDescriptor_a77305914d5d202f = []byte{
// 108 bytes of a gzipped FileDescriptorProto // 148 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcf, 0x48, 0xcc, 0x4b, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcf, 0x48, 0xcc, 0x4b,
0x29, 0xce, 0x48, 0xcc, 0x4e, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x84, 0x0b, 0x28, 0x29, 0xce, 0x48, 0xcc, 0x4e, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x84, 0x0b, 0x28,
0xa9, 0x72, 0x71, 0x06, 0x83, 0x18, 0x1e, 0x89, 0x79, 0x29, 0x42, 0x12, 0x5c, 0xec, 0x8e, 0x29, 0x45, 0x72, 0x71, 0x06, 0x83, 0x18, 0x1e, 0x89, 0x79, 0x29, 0x42, 0x12, 0x5c, 0xec, 0x8e, 0x29,
0x29, 0x45, 0xa9, 0xc5, 0xc5, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0x30, 0xae, 0x93, 0xc4, 0x29, 0x45, 0xa9, 0xc5, 0xc5, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0x30, 0xae, 0x90, 0x0c,
0x89, 0x47, 0x72, 0x8c, 0x17, 0x1e, 0xc9, 0x31, 0x3e, 0x78, 0x24, 0xc7, 0x38, 0xe1, 0xb1, 0x1c, 0x17, 0xa7, 0x5f, 0x6a, 0x49, 0x79, 0x7e, 0x51, 0xb6, 0xa7, 0x8b, 0x04, 0x93, 0x02, 0xa3, 0x06,
0xc3, 0x85, 0xc7, 0x72, 0x0c, 0x37, 0x1e, 0xcb, 0x31, 0x24, 0xb1, 0x81, 0x8d, 0x34, 0x06, 0x04, 0x6b, 0x10, 0x42, 0x40, 0x48, 0x84, 0x8b, 0xd5, 0x27, 0x33, 0x3d, 0xa3, 0x44, 0x82, 0x59, 0x81,
0x00, 0x00, 0xff, 0xff, 0x5d, 0x34, 0x69, 0xba, 0x65, 0x00, 0x00, 0x00, 0x51, 0x83, 0x23, 0x08, 0xc2, 0x71, 0x92, 0x38, 0xf1, 0x48, 0x8e, 0xf1, 0xc2, 0x23, 0x39, 0xc6,
0x07, 0x8f, 0xe4, 0x18, 0x27, 0x3c, 0x96, 0x63, 0xb8, 0xf0, 0x58, 0x8e, 0xe1, 0xc6, 0x63, 0x39,
0x86, 0x24, 0x36, 0xb0, 0x33, 0x8c, 0x01, 0x01, 0x00, 0x00, 0xff, 0xff, 0x62, 0x1c, 0xa2, 0x06,
0x99, 0x00, 0x00, 0x00,
} }
func (m *ShakeHand) Marshal() (dAtA []byte, err error) { func (m *ShakeHand) Marshal() (dAtA []byte, err error) {
...@@ -103,6 +122,21 @@ func (m *ShakeHand) MarshalToSizedBuffer(dAtA []byte) (int, error) { ...@@ -103,6 +122,21 @@ func (m *ShakeHand) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i _ = i
var l int var l int
_ = l _ = l
if m.Light {
i--
if m.Light {
dAtA[i] = 1
} else {
dAtA[i] = 0
}
i--
dAtA[i] = 0x18
}
if m.NetworkID != 0 {
i = encodeVarintHandshake(dAtA, i, uint64(m.NetworkID))
i--
dAtA[i] = 0x10
}
if len(m.Address) > 0 { if len(m.Address) > 0 {
i -= len(m.Address) i -= len(m.Address)
copy(dAtA[i:], m.Address) copy(dAtA[i:], m.Address)
...@@ -134,6 +168,12 @@ func (m *ShakeHand) Size() (n int) { ...@@ -134,6 +168,12 @@ func (m *ShakeHand) Size() (n int) {
if l > 0 { if l > 0 {
n += 1 + l + sovHandshake(uint64(l)) n += 1 + l + sovHandshake(uint64(l))
} }
if m.NetworkID != 0 {
n += 1 + sovHandshake(uint64(m.NetworkID))
}
if m.Light {
n += 2
}
return n return n
} }
...@@ -174,7 +214,7 @@ func (m *ShakeHand) Unmarshal(dAtA []byte) error { ...@@ -174,7 +214,7 @@ func (m *ShakeHand) Unmarshal(dAtA []byte) error {
switch fieldNum { switch fieldNum {
case 1: case 1:
if wireType != 2 { if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field PeerID", wireType) return fmt.Errorf("proto: wrong wireType = %d for field Address", wireType)
} }
var stringLen uint64 var stringLen uint64
for shift := uint(0); ; shift += 7 { for shift := uint(0); ; shift += 7 {
...@@ -204,6 +244,45 @@ func (m *ShakeHand) Unmarshal(dAtA []byte) error { ...@@ -204,6 +244,45 @@ func (m *ShakeHand) Unmarshal(dAtA []byte) error {
} }
m.Address = string(dAtA[iNdEx:postIndex]) m.Address = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex iNdEx = postIndex
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field NetworkID", wireType)
}
m.NetworkID = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowHandshake
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.NetworkID |= int32(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Light", wireType)
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowHandshake
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
m.Light = bool(v != 0)
default: default:
iNdEx = preIndex iNdEx = preIndex
skippy, err := skipHandshake(dAtA[iNdEx:]) skippy, err := skipHandshake(dAtA[iNdEx:])
......
...@@ -4,5 +4,7 @@ package handshake; ...@@ -4,5 +4,7 @@ package handshake;
message ShakeHand { message ShakeHand {
string Address = 1; string Address = 1;
int32 NetworkID = 2;
bool Light = 3;
} }
...@@ -14,12 +14,11 @@ import ( ...@@ -14,12 +14,11 @@ import (
"net" "net"
"os" "os"
"strconv" "strconv"
"sync"
"time" "time"
"github.com/janos/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
handshake "github.com/janos/bee/pkg/p2p/libp2p/internal/handshake" handshake "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake"
"github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p"
autonat "github.com/libp2p/go-libp2p-autonat-svc" autonat "github.com/libp2p/go-libp2p-autonat-svc"
connmgr "github.com/libp2p/go-libp2p-connmgr" connmgr "github.com/libp2p/go-libp2p-connmgr"
...@@ -38,13 +37,18 @@ import ( ...@@ -38,13 +37,18 @@ import (
var _ p2p.Service = (*Service)(nil) var _ p2p.Service = (*Service)(nil)
func init() {
// Only temporary for fake overlay address generation.
rand.Seed(time.Now().UnixNano())
}
type Service struct { type Service struct {
host host.Host host host.Host
metrics metrics metrics metrics
networkID int32
handshakeService *handshake.Service handshakeService *handshake.Service
overlayToPeerID map[string]libp2ppeer.ID peers *peerRegistry
peerIDToOverlay map[libp2ppeer.ID]string logger Logger
overlayPeerIDMu sync.RWMutex
} }
type Options struct { type Options struct {
...@@ -53,10 +57,17 @@ type Options struct { ...@@ -53,10 +57,17 @@ type Options struct {
DisableWS bool DisableWS bool
DisableQUIC bool DisableQUIC bool
Bootnodes []string Bootnodes []string
NetworkID int // TODO: to be used in the handshake protocol NetworkID int32
ConnectionsLow int ConnectionsLow int
ConnectionsHigh int ConnectionsHigh int
ConnectionsGrace time.Duration ConnectionsGrace time.Duration
Logger Logger
}
type Logger interface {
Tracef(format string, args ...interface{})
Infof(format string, args ...interface{})
Errorf(format string, args ...interface{})
} }
func New(ctx context.Context, o Options) (*Service, error) { func New(ctx context.Context, o Options) (*Service, error) {
...@@ -181,13 +192,16 @@ func New(ctx context.Context, o Options) (*Service, error) { ...@@ -181,13 +192,16 @@ func New(ctx context.Context, o Options) (*Service, error) {
return nil, fmt.Errorf("autonat: %w", err) return nil, fmt.Errorf("autonat: %w", err)
} }
// This is just a temporary way to generate an overlay address.
// TODO: proper key management and overlay address generation
overlay := strconv.Itoa(rand.Int()) overlay := strconv.Itoa(rand.Int())
s := &Service{ s := &Service{
host: h, host: h,
metrics: newMetrics(), metrics: newMetrics(),
overlayToPeerID: make(map[string]libp2ppeer.ID), networkID: o.NetworkID,
peerIDToOverlay: make(map[libp2ppeer.ID]string), handshakeService: handshake.New(overlay, o.NetworkID, o.Logger),
handshakeService: handshake.New(overlay), peers: newPeerRegistry(),
logger: o.Logger,
} }
// Construct protocols. // Construct protocols.
...@@ -199,9 +213,19 @@ func New(ctx context.Context, o Options) (*Service, error) { ...@@ -199,9 +213,19 @@ func New(ctx context.Context, o Options) (*Service, error) {
} }
s.host.SetStreamHandlerMatch(id, matcher, func(stream network.Stream) { s.host.SetStreamHandlerMatch(id, matcher, func(stream network.Stream) {
peerID := stream.Conn().RemotePeer()
i, err := s.handshakeService.Handle(stream)
if err != nil {
s.logger.Errorf("handshake with peer %s: %w", peerID, err)
return
}
if i.NetworkID != s.networkID {
s.logger.Errorf("handshake with peer %s: invalid network id %v", peerID, i.NetworkID)
return
}
s.peers.add(peerID, i.Address)
s.metrics.HandledStreamCount.Inc() s.metrics.HandledStreamCount.Inc()
overlay := s.handshakeService.Handler(stream) s.logger.Infof("peer %q connected", overlay)
s.addAddresses(overlay, stream.Conn().RemotePeer())
}) })
// TODO: be more resilient on connection errors and connect in parallel // TODO: be more resilient on connection errors and connect in parallel
...@@ -234,10 +258,10 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) { ...@@ -234,10 +258,10 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
s.host.SetStreamHandlerMatch(id, matcher, func(stream network.Stream) { s.host.SetStreamHandlerMatch(id, matcher, func(stream network.Stream) {
peerID := stream.Conn().RemotePeer() peerID := stream.Conn().RemotePeer()
overlay, ok := s.overlayForPeerID(peerID) overlay, found := s.peers.overlay(peerID)
if !ok { if !found {
// todo: handle better // todo: handle better
fmt.Printf("Could not fetch handshake for peerID %s\n", stream) s.logger.Errorf("overlay address for peer %q not found", peerID)
return return
} }
...@@ -280,19 +304,22 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (err error) { ...@@ -280,19 +304,22 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (err error) {
} }
defer stream.Close() defer stream.Close()
overlay, err := s.handshakeService.Handshake(stream) i, err := s.handshakeService.Handshake(stream)
if err != nil { if err != nil {
return err return err
} }
if i.NetworkID != s.networkID {
return fmt.Errorf("invalid network id %v", i.NetworkID)
}
s.addAddresses(overlay, info.ID) s.peers.add(info.ID, i.Address)
s.metrics.CreatedConnectionCount.Inc() s.metrics.CreatedConnectionCount.Inc()
fmt.Println("handshake handshake finished") s.logger.Infof("peer %q connected", i.Address)
return nil return nil
} }
func (s *Service) NewStream(ctx context.Context, overlay, protocolName, streamName, version string) (p2p.Stream, error) { func (s *Service) NewStream(ctx context.Context, overlay, protocolName, streamName, version string) (p2p.Stream, error) {
peerID, ok := s.peerIDForOverlay(overlay) peerID, found := s.peers.peerID(overlay)
if !ok { if !found {
return nil, p2p.ErrPeerNotFound return nil, p2p.ErrPeerNotFound
} }
...@@ -312,27 +339,6 @@ func (s *Service) newStreamForPeerID(ctx context.Context, peerID libp2ppeer.ID, ...@@ -312,27 +339,6 @@ func (s *Service) newStreamForPeerID(ctx context.Context, peerID libp2ppeer.ID,
return st, nil return st, nil
} }
func (s *Service) peerIDForOverlay(overlay string) (peerID libp2ppeer.ID, ok bool) {
s.overlayPeerIDMu.RLock()
peerID, ok = s.overlayToPeerID[overlay]
s.overlayPeerIDMu.RUnlock()
return peerID, ok
}
func (s *Service) overlayForPeerID(peerID libp2ppeer.ID) (overlay string, ok bool) {
s.overlayPeerIDMu.RLock()
overlay, ok = s.peerIDToOverlay[peerID]
s.overlayPeerIDMu.RUnlock()
return overlay, ok
}
func (s *Service) addAddresses(overlay string, peerID libp2ppeer.ID) {
s.overlayPeerIDMu.Lock()
s.overlayToPeerID[overlay] = peerID
s.peerIDToOverlay[peerID] = overlay
s.overlayPeerIDMu.Unlock()
}
func (s *Service) Close() error { func (s *Service) Close() error {
return s.host.Close() return s.host.Close()
} }
...@@ -5,10 +5,11 @@ ...@@ -5,10 +5,11 @@
package libp2p package libp2p
import ( import (
m "github.com/janos/bee/pkg/metrics" m "github.com/ethersphere/bee/pkg/metrics"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
type metrics struct { type metrics struct {
// all metrics fields must be exported // all metrics fields must be exported
// to be able to return them by Metrics() // to be able to return them by Metrics()
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package libp2p
import (
"sync"
libp2ppeer "github.com/libp2p/go-libp2p-core/peer"
)
type peerRegistry struct {
peers map[string]libp2ppeer.ID
overlays map[libp2ppeer.ID]string
mu sync.RWMutex
}
func newPeerRegistry() *peerRegistry {
return &peerRegistry{
peers: make(map[string]libp2ppeer.ID),
overlays: make(map[libp2ppeer.ID]string),
}
}
func (r *peerRegistry) add(peerID libp2ppeer.ID, overlay string) {
r.mu.Lock()
r.peers[overlay] = peerID
r.overlays[peerID] = overlay
r.mu.Unlock()
}
func (r *peerRegistry) peerID(overlay string) (peerID libp2ppeer.ID, found bool) {
r.mu.RLock()
peerID, found = r.peers[overlay]
r.mu.RUnlock()
return peerID, found
}
func (r *peerRegistry) overlay(peerID libp2ppeer.ID) (overlay string, found bool) {
r.mu.RLock()
overlay, found = r.overlays[peerID]
r.mu.RUnlock()
return overlay, found
}
...@@ -10,7 +10,7 @@ import ( ...@@ -10,7 +10,7 @@ import (
"io" "io"
"sync" "sync"
"github.com/janos/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
) )
type Recorder struct { type Recorder struct {
......
...@@ -7,7 +7,7 @@ package protobuf ...@@ -7,7 +7,7 @@ package protobuf
import ( import (
ggio "github.com/gogo/protobuf/io" ggio "github.com/gogo/protobuf/io"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"github.com/janos/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
"io" "io"
) )
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
package pingpong package pingpong
import ( import (
m "github.com/janos/bee/pkg/metrics" m "github.com/ethersphere/bee/pkg/metrics"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
......
...@@ -11,8 +11,8 @@ import ( ...@@ -11,8 +11,8 @@ import (
"io" "io"
"time" "time"
"github.com/janos/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
"github.com/janos/bee/pkg/p2p/protobuf" "github.com/ethersphere/bee/pkg/p2p/protobuf"
) )
const ( const (
......
...@@ -11,12 +11,13 @@ import ( ...@@ -11,12 +11,13 @@ import (
"io/ioutil" "io/ioutil"
"testing" "testing"
"github.com/janos/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/janos/bee/pkg/p2p/mock" "github.com/ethersphere/bee/pkg/p2p/mock"
"github.com/janos/bee/pkg/p2p/protobuf" "github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/janos/bee/pkg/pingpong" "github.com/ethersphere/bee/pkg/pingpong"
) )
func TestPing(t *testing.T) { func TestPing(t *testing.T) {
logger := logging.New(ioutil.Discard) logger := logging.New(ioutil.Discard)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment