Commit 0d0c0267 authored by Svetomir Smiljkovic's avatar Svetomir Smiljkovic

Merge branch 'master' of https://github.com/janos/bee

parents 8a2644dd 1bf3600f
...@@ -18,13 +18,13 @@ Docker image `janos/bee`. ...@@ -18,13 +18,13 @@ Docker image `janos/bee`.
Execute the command terminals to start `node 1`: Execute the command terminals to start `node 1`:
```sh ```sh
bee start --api-addr :8501 --p2p-addr :30401 --debug-api-addr :6061 bee start --api-addr :8501 --p2p-addr :30401 --data-dir data1
``` ```
Use one of the multiaddresses as bootnode for `node 2` in order to connect them: Use one of the multiaddresses as bootnode for `node 2` in order to connect them:
```sh ```sh
bee start --api-addr :8502 --p2p-addr :30402 --debug-api-addr :6062 --bootnode /ip4/127.0.0.1/tcp/30401/p2p/QmT4TNB4cKYanUjdYodw1Cns8cuVaRVo24hHNYcT7JjkTB bee start --api-addr :8502 --p2p-addr :30402 --data-dir data2 --bootnode /ip4/127.0.0.1/tcp/30401/p2p/QmT4TNB4cKYanUjdYodw1Cns8cuVaRVo24hHNYcT7JjkTB
``` ```
Use the last part of `node 1` multiaddress to ping it using `node 2` by making an HTTP request to `localhost:{PORT2}/pingpong/{ID1}` like: Use the last part of `node 1` multiaddress to ping it using `node 2` by making an HTTP request to `localhost:{PORT2}/pingpong/{ID1}` like:
...@@ -53,7 +53,5 @@ curl localhost:8502/pingpong/QmT4TNB4cKYanUjdYodw1Cns8cuVaRVo24hHNYcT7JjkTB ...@@ -53,7 +53,5 @@ curl localhost:8502/pingpong/QmT4TNB4cKYanUjdYodw1Cns8cuVaRVo24hHNYcT7JjkTB
- P2P mock (protocol tester) implementation improvements - P2P mock (protocol tester) implementation improvements
- Overlay addressing in libp2p (provide overlay address in p2p.Peer) - Overlay addressing in libp2p (provide overlay address in p2p.Peer)
- Identity with private keys
- Figure out routing (whether to use libp2p Routing or to abstract hive on top of p2p package) - Figure out routing (whether to use libp2p Routing or to abstract hive on top of p2p package)
- Listener configurations (ipv4, ipv6, dns, tcp, ws, quic)
- Instrumentation: logging, metrics, tracing, pprof... - Instrumentation: logging, metrics, tracing, pprof...
...@@ -6,6 +6,7 @@ package cmd ...@@ -6,6 +6,7 @@ package cmd
import ( import (
"errors" "errors"
"os"
"path/filepath" "path/filepath"
"strings" "strings"
...@@ -119,3 +120,16 @@ func (c *command) setHomeDir() (err error) { ...@@ -119,3 +120,16 @@ func (c *command) setHomeDir() (err error) {
c.homeDir = dir c.homeDir = dir
return nil return nil
} }
// baseDir is the directory where the executable is located.
var baseDir = func() string {
path, err := os.Executable()
if err != nil {
panic(err)
}
path, err = filepath.EvalSymlinks(path)
if err != nil {
panic(err)
}
return filepath.Dir(path)
}()
...@@ -6,32 +6,32 @@ package cmd ...@@ -6,32 +6,32 @@ package cmd
import ( import (
"context" "context"
"fmt" "io"
"log"
"net"
"net/http"
"os" "os"
"os/signal" "os/signal"
"path/filepath"
"syscall" "syscall"
"time" "time"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/janos/bee/pkg/api" "github.com/janos/bee/pkg/logging"
"github.com/janos/bee/pkg/debugapi" "github.com/janos/bee/pkg/node"
"github.com/janos/bee/pkg/p2p/libp2p" "github.com/janos/bee/pkg/p2p/libp2p"
"github.com/janos/bee/pkg/pingpong"
) )
func (c *command) initStartCmd() (err error) { func (c *command) initStartCmd() (err error) {
const ( const (
optionNameDataDir = "data-dir"
optionNameAPIAddr = "api-addr" optionNameAPIAddr = "api-addr"
optionNameP2PAddr = "p2p-addr" optionNameP2PAddr = "p2p-addr"
optionNameP2PDisableWS = "p2p-disable-ws" optionNameP2PDisableWS = "p2p-disable-ws"
optionNameP2PDisableQUIC = "p2p-disable-quic" optionNameP2PDisableQUIC = "p2p-disable-quic"
optionNameDebugAPIAddr = "debug-api-addr" optionNameDebugAPIAddr = "debug-api-addr"
optionNameBootnodes = "bootnode" optionNameBootnodes = "bootnode"
optionNameNetworkID = "network-id"
optionNameConnectionsLow = "connections-low" optionNameConnectionsLow = "connections-low"
optionNameConnectionsHigh = "connections-high" optionNameConnectionsHigh = "connections-high"
optionNameConnectionsGrace = "connections-grace" optionNameConnectionsGrace = "connections-grace"
...@@ -45,82 +45,40 @@ func (c *command) initStartCmd() (err error) { ...@@ -45,82 +45,40 @@ func (c *command) initStartCmd() (err error) {
return cmd.Help() return cmd.Help()
} }
ctx, cancel := context.WithCancel(context.Background()) logger := logging.New(cmd.OutOrStdout())
defer cancel() logger.SetLevel(logrus.TraceLevel)
// Construct P2P service.
p2ps, err := libp2p.New(ctx, libp2p.Options{
Addr: c.config.GetString(optionNameP2PAddr),
DisableWS: c.config.GetBool(optionNameP2PDisableWS),
DisableQUIC: c.config.GetBool(optionNameP2PDisableQUIC),
Bootnodes: c.config.GetStringSlice(optionNameBootnodes),
ConnectionsLow: c.config.GetInt(optionNameConnectionsLow),
ConnectionsHigh: c.config.GetInt(optionNameConnectionsHigh),
ConnectionsGrace: c.config.GetDuration(optionNameConnectionsGrace),
})
if err != nil {
return fmt.Errorf("p2p service: %w", err)
}
// Construct protocols.
pingPong := pingpong.New(p2ps)
// Add protocols to the P2P service.
if err = p2ps.AddProtocol(pingPong.Protocol()); err != nil {
return fmt.Errorf("pingpong service: %w", err)
}
addrs, err := p2ps.Addresses()
if err != nil {
return fmt.Errorf("get server addresses: %w", err)
}
for _, addr := range addrs {
cmd.Println(addr)
}
// API server
apiService := api.New(api.Options{
P2P: p2ps,
Pingpong: pingPong,
})
apiListener, err := net.Listen("tcp", c.config.GetString(optionNameAPIAddr))
if err != nil {
return fmt.Errorf("api listener: %w", err)
}
apiServer := &http.Server{Handler: apiService}
go func() { var libp2pPrivateKey io.ReadWriteCloser
cmd.Println("api address:", apiListener.Addr()) if dataDir := c.config.GetString(optionNameDataDir); dataDir != "" {
dir := filepath.Join(dataDir, "libp2p")
if err := apiServer.Serve(apiListener); err != nil && err != http.ErrServerClosed { if err := os.MkdirAll(dir, os.ModePerm); err != nil {
log.Println("api server:", err) return err
} }
}() f, err := os.OpenFile(filepath.Join(dir, "private.key"), os.O_CREATE|os.O_RDWR, 0600)
var debugAPIServer *http.Server
if addr := c.config.GetString(optionNameDebugAPIAddr); addr != "" {
// Debug API server
debugAPIService := debugapi.New(debugapi.Options{})
// register metrics from components
debugAPIService.MustRegisterMetrics(p2ps.Metrics()...)
debugAPIService.MustRegisterMetrics(pingPong.Metrics()...)
debugAPIService.MustRegisterMetrics(apiService.Metrics()...)
debugAPIListener, err := net.Listen("tcp", addr)
if err != nil { if err != nil {
return fmt.Errorf("debug api listener: %w", err) return err
} }
libp2pPrivateKey = f
}
debugAPIServer := &http.Server{Handler: debugAPIService} b, err := node.NewBee(node.Options{
APIAddr: c.config.GetString(optionNameAPIAddr),
go func() { DebugAPIAddr: c.config.GetString(optionNameDebugAPIAddr),
cmd.Println("debug api address:", debugAPIListener.Addr()) LibP2POptions: libp2p.Options{
PrivateKey: libp2pPrivateKey,
if err := debugAPIServer.Serve(debugAPIListener); err != nil && err != http.ErrServerClosed { Addr: c.config.GetString(optionNameP2PAddr),
log.Println("debug api server:", err) DisableWS: c.config.GetBool(optionNameP2PDisableWS),
} DisableQUIC: c.config.GetBool(optionNameP2PDisableQUIC),
}() Bootnodes: c.config.GetStringSlice(optionNameBootnodes),
NetworkID: c.config.GetInt(optionNameNetworkID),
ConnectionsLow: c.config.GetInt(optionNameConnectionsLow),
ConnectionsHigh: c.config.GetInt(optionNameConnectionsHigh),
ConnectionsGrace: c.config.GetDuration(optionNameConnectionsGrace),
},
Logger: logger,
})
if err != nil {
return err
} }
// Wait for termination or interrupt signals. // Wait for termination or interrupt signals.
...@@ -131,33 +89,18 @@ func (c *command) initStartCmd() (err error) { ...@@ -131,33 +89,18 @@ func (c *command) initStartCmd() (err error) {
// Block main goroutine until it is interrupted // Block main goroutine until it is interrupted
sig := <-interruptChannel sig := <-interruptChannel
log.Println("received signal:", sig) logger.Debugf("received signal: %v", sig)
// Shutdown // Shutdown
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
defer func() {
if err := recover(); err != nil {
log.Println("shutdown panic:", err)
}
}()
defer close(done) defer close(done)
ctx, cancel := context.WithTimeout(ctx, 15*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel() defer cancel()
if err := apiServer.Shutdown(ctx); err != nil { if err := b.Shutdown(ctx); err != nil {
log.Println("api server shutdown:", err) logger.Errorf("shutdown: %v", err)
}
if debugAPIServer != nil {
if err := debugAPIServer.Shutdown(ctx); err != nil {
log.Println("debug api server shutdown:", err)
}
}
if err := p2ps.Close(); err != nil {
log.Println("p2p server shutdown:", err)
} }
}() }()
...@@ -165,7 +108,7 @@ func (c *command) initStartCmd() (err error) { ...@@ -165,7 +108,7 @@ func (c *command) initStartCmd() (err error) {
// allow process termination by receiving another signal. // allow process termination by receiving another signal.
select { select {
case sig := <-interruptChannel: case sig := <-interruptChannel:
log.Printf("received signal: %v\n", sig) logger.Debugf("received signal: %v", sig)
case <-done: case <-done:
} }
...@@ -173,12 +116,14 @@ func (c *command) initStartCmd() (err error) { ...@@ -173,12 +116,14 @@ func (c *command) initStartCmd() (err error) {
}, },
} }
cmd.Flags().String(optionNameDataDir, filepath.Join(baseDir, "data"), "data directory")
cmd.Flags().String(optionNameAPIAddr, ":8500", "HTTP API listen address") cmd.Flags().String(optionNameAPIAddr, ":8500", "HTTP API listen address")
cmd.Flags().String(optionNameP2PAddr, ":30399", "P2P listen address") cmd.Flags().String(optionNameP2PAddr, ":30399", "P2P listen address")
cmd.Flags().Bool(optionNameP2PDisableWS, false, "disable P2P WebSocket protocol") cmd.Flags().Bool(optionNameP2PDisableWS, false, "disable P2P WebSocket protocol")
cmd.Flags().Bool(optionNameP2PDisableQUIC, false, "disable P2P QUIC protocol") cmd.Flags().Bool(optionNameP2PDisableQUIC, false, "disable P2P QUIC protocol")
cmd.Flags().StringSlice(optionNameBootnodes, nil, "initial nodes to connect to") cmd.Flags().StringSlice(optionNameBootnodes, nil, "initial nodes to connect to")
cmd.Flags().String(optionNameDebugAPIAddr, ":6060", "Debug HTTP API listen address") cmd.Flags().String(optionNameDebugAPIAddr, "", "debug HTTP API listen address, e.g. 127.0.0.1:6060")
cmd.Flags().Int(optionNameNetworkID, 1, "ID of the Swarm network")
cmd.Flags().Int(optionNameConnectionsLow, 200, "low watermark governing the number of connections that'll be maintained") cmd.Flags().Int(optionNameConnectionsLow, 200, "low watermark governing the number of connections that'll be maintained")
cmd.Flags().Int(optionNameConnectionsHigh, 400, "high watermark governing the number of connections that'll be maintained") cmd.Flags().Int(optionNameConnectionsHigh, 400, "high watermark governing the number of connections that'll be maintained")
cmd.Flags().Duration(optionNameConnectionsGrace, time.Minute, "the amount of time a newly opened connection is given before it becomes subject to pruning") cmd.Flags().Duration(optionNameConnectionsGrace, time.Minute, "the amount of time a newly opened connection is given before it becomes subject to pruning")
......
...@@ -19,7 +19,9 @@ require ( ...@@ -19,7 +19,9 @@ require (
github.com/multiformats/go-multiaddr v0.2.0 github.com/multiformats/go-multiaddr v0.2.0
github.com/multiformats/go-multistream v0.1.0 github.com/multiformats/go-multistream v0.1.0
github.com/prometheus/client_golang v1.3.0 github.com/prometheus/client_golang v1.3.0
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v0.0.5 github.com/spf13/cobra v0.0.5
github.com/spf13/viper v1.6.1 github.com/spf13/viper v1.6.1
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
resenje.org/web v0.4.0 resenje.org/web v0.4.0
) )
This diff is collapsed.
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package logging
import (
"io"
"github.com/sirupsen/logrus"
)
func New(w io.Writer) *logrus.Logger {
logger := logrus.New()
logger.SetOutput(w)
logger.Formatter = &logrus.TextFormatter{
FullTimestamp: true,
}
return logger
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package node
import (
"context"
"fmt"
"io"
"log"
"net"
"net/http"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"github.com/janos/bee/pkg/api"
"github.com/janos/bee/pkg/debugapi"
"github.com/janos/bee/pkg/p2p/libp2p"
"github.com/janos/bee/pkg/pingpong"
)
type Bee struct {
p2pService io.Closer
p2pCancel context.CancelFunc
apiServer *http.Server
debugAPIServer *http.Server
errorLogWriter *io.PipeWriter
}
type Options struct {
APIAddr string
DebugAPIAddr string
LibP2POptions libp2p.Options
Logger *logrus.Logger
}
func NewBee(o Options) (*Bee, error) {
logger := o.Logger
p2pCtx, p2pCancel := context.WithCancel(context.Background())
b := &Bee{
p2pCancel: p2pCancel,
errorLogWriter: logger.WriterLevel(logrus.ErrorLevel),
}
// Construct P2P service.
p2ps, err := libp2p.New(p2pCtx, o.LibP2POptions)
if err != nil {
return nil, fmt.Errorf("p2p service: %w", err)
}
b.p2pService = p2ps
// Construct protocols.
pingPong := pingpong.New(pingpong.Options{
Streamer: p2ps,
Logger: logger,
})
// Add protocols to the P2P service.
if err = p2ps.AddProtocol(pingPong.Protocol()); err != nil {
return nil, fmt.Errorf("pingpong service: %w", err)
}
addrs, err := p2ps.Addresses()
if err != nil {
return nil, fmt.Errorf("get server addresses: %w", err)
}
for _, addr := range addrs {
logger.Infof("p2p address: %s", addr)
}
var apiService api.Service
if o.APIAddr != "" {
// API server
apiService = api.New(api.Options{
P2P: p2ps,
Pingpong: pingPong,
})
apiListener, err := net.Listen("tcp", o.APIAddr)
if err != nil {
return nil, fmt.Errorf("api listener: %w", err)
}
apiServer := &http.Server{
Handler: apiService,
ErrorLog: log.New(b.errorLogWriter, "", 0),
}
go func() {
logger.Infof("api address: %s", apiListener.Addr())
if err := apiServer.Serve(apiListener); err != nil && err != http.ErrServerClosed {
logger.Errorf("api server: %v", err)
}
}()
b.apiServer = apiServer
}
if o.DebugAPIAddr != "" {
// Debug API server
debugAPIService := debugapi.New(debugapi.Options{})
// register metrics from components
debugAPIService.MustRegisterMetrics(p2ps.Metrics()...)
debugAPIService.MustRegisterMetrics(pingPong.Metrics()...)
if apiService != nil {
debugAPIService.MustRegisterMetrics(apiService.Metrics()...)
}
debugAPIListener, err := net.Listen("tcp", o.DebugAPIAddr)
if err != nil {
return nil, fmt.Errorf("debug api listener: %w", err)
}
debugAPIServer := &http.Server{
Handler: debugAPIService,
ErrorLog: log.New(b.errorLogWriter, "", 0),
}
go func() {
logger.Infof("debug api address: %s", debugAPIListener.Addr())
if err := debugAPIServer.Serve(debugAPIListener); err != nil && err != http.ErrServerClosed {
logger.Errorf("debug api server: %v", err)
}
}()
b.debugAPIServer = debugAPIServer
}
return b, nil
}
func (b *Bee) Shutdown(ctx context.Context) error {
var eg errgroup.Group
if b.apiServer != nil {
eg.Go(func() error {
if err := b.apiServer.Shutdown(ctx); err != nil {
return fmt.Errorf("api server: %w", err)
}
return nil
})
}
if b.debugAPIServer != nil {
eg.Go(func() error {
if err := b.debugAPIServer.Shutdown(ctx); err != nil {
return fmt.Errorf("debug api server: %w", err)
}
return nil
})
}
if err := eg.Wait(); err != nil {
return err
}
b.p2pCancel()
if err := b.p2pService.Close(); err != nil {
return fmt.Errorf("p2p server: %w", err)
}
return b.errorLogWriter.Close()
}
...@@ -5,9 +5,13 @@ ...@@ -5,9 +5,13 @@
package libp2p package libp2p
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"io"
"io/ioutil"
"net" "net"
"os"
"time" "time"
"github.com/janos/bee/pkg/p2p" "github.com/janos/bee/pkg/p2p"
...@@ -15,6 +19,7 @@ import ( ...@@ -15,6 +19,7 @@ import (
"github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p"
autonat "github.com/libp2p/go-libp2p-autonat-svc" autonat "github.com/libp2p/go-libp2p-autonat-svc"
connmgr "github.com/libp2p/go-libp2p-connmgr" connmgr "github.com/libp2p/go-libp2p-connmgr"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/helpers" "github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/network"
...@@ -35,10 +40,12 @@ type Service struct { ...@@ -35,10 +40,12 @@ type Service struct {
} }
type Options struct { type Options struct {
PrivateKey io.ReadWriteCloser
Addr string Addr string
DisableWS bool DisableWS bool
DisableQUIC bool DisableQUIC bool
Bootnodes []string Bootnodes []string
NetworkID int // TODO: to be used in the handshake protocol
ConnectionsLow int ConnectionsLow int
ConnectionsHigh int ConnectionsHigh int
ConnectionsGrace time.Duration ConnectionsGrace time.Duration
...@@ -87,8 +94,6 @@ func New(ctx context.Context, o Options) (*Service, error) { ...@@ -87,8 +94,6 @@ func New(ctx context.Context, o Options) (*Service, error) {
} }
opts := []libp2p.Option{ opts := []libp2p.Option{
// Use the keypair we generated
//libp2p.Identity(priv),
// Multiple listen addresses // Multiple listen addresses
libp2p.ListenAddrStrings(listenAddrs...), libp2p.ListenAddrStrings(listenAddrs...),
// support TLS connections // support TLS connections
...@@ -108,6 +113,41 @@ func New(ctx context.Context, o Options) (*Service, error) { ...@@ -108,6 +113,41 @@ func New(ctx context.Context, o Options) (*Service, error) {
libp2p.NATPortMap(), libp2p.NATPortMap(),
} }
if o.PrivateKey != nil {
var privateKey crypto.PrivKey
privateKeyData, err := ioutil.ReadAll(o.PrivateKey)
if err != nil && !os.IsNotExist(err) {
return nil, fmt.Errorf("read private key: %w", err)
}
if len(privateKeyData) == 0 {
var err error
privateKey, _, err = crypto.GenerateSecp256k1Key(nil)
if err != nil {
return nil, fmt.Errorf("generate secp256k1 key: %w", err)
}
d, err := crypto.MarshalPrivateKey(privateKey)
if err != nil {
return nil, fmt.Errorf("encode private key: %w", err)
}
if _, err := io.Copy(o.PrivateKey, bytes.NewReader(d)); err != nil {
return nil, fmt.Errorf("write private key: %w", err)
}
} else {
var err error
privateKey, err = crypto.UnmarshalPrivateKey(privateKeyData)
if err != nil {
return nil, fmt.Errorf("decode private key: %w", err)
}
}
if err := o.PrivateKey.Close(); err != nil {
return nil, fmt.Errorf("close private key: %w", err)
}
opts = append(opts,
// Use the keypair we generated
libp2p.Identity(privateKey),
)
}
if !o.DisableQUIC { if !o.DisableQUIC {
opts = append(opts, opts = append(opts,
// support QUIC - experimental // support QUIC - experimental
......
...@@ -10,7 +10,6 @@ import ( ...@@ -10,7 +10,6 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"log"
"time" "time"
"github.com/janos/bee/pkg/p2p" "github.com/janos/bee/pkg/p2p"
...@@ -25,12 +24,24 @@ const ( ...@@ -25,12 +24,24 @@ const (
type Service struct { type Service struct {
streamer p2p.Streamer streamer p2p.Streamer
logger Logger
metrics metrics metrics metrics
} }
func New(streamer p2p.Streamer) *Service { type Options struct {
Streamer p2p.Streamer
Logger Logger
}
type Logger interface {
Debugf(format string, args ...interface{})
Errorf(format string, args ...interface{})
}
func New(o Options) *Service {
return &Service{ return &Service{
streamer: streamer, streamer: o.Streamer,
logger: o.Logger,
metrics: newMetrics(), metrics: newMetrics(),
} }
} }
...@@ -48,32 +59,6 @@ func (s *Service) Protocol() p2p.ProtocolSpec { ...@@ -48,32 +59,6 @@ func (s *Service) Protocol() p2p.ProtocolSpec {
} }
} }
func (s *Service) Handler(p p2p.Peer) {
w, r := protobuf.NewWriterAndReader(p.Stream)
defer p.Stream.Close()
var ping Ping
for {
if err := r.ReadMsg(&ping); err != nil {
if err == io.EOF {
break
}
log.Printf("pingpong handler: read message: %v\n", err)
return
}
log.Printf("got ping: %q\n", ping.Greeting)
s.metrics.PingReceivedCount.Inc()
if err := w.WriteMsg(&Pong{
Response: "{" + ping.Greeting + "}",
}); err != nil {
log.Printf("pingpong handler: write message: %v\n", err)
return
}
s.metrics.PongSentCount.Inc()
}
}
func (s *Service) Ping(ctx context.Context, peerID string, msgs ...string) (rtt time.Duration, err error) { func (s *Service) Ping(ctx context.Context, peerID string, msgs ...string) (rtt time.Duration, err error) {
stream, err := s.streamer.NewStream(ctx, peerID, protocolName, streamName, streamVersion) stream, err := s.streamer.NewStream(ctx, peerID, protocolName, streamName, streamVersion)
if err != nil { if err != nil {
...@@ -100,8 +85,34 @@ func (s *Service) Ping(ctx context.Context, peerID string, msgs ...string) (rtt ...@@ -100,8 +85,34 @@ func (s *Service) Ping(ctx context.Context, peerID string, msgs ...string) (rtt
return 0, err return 0, err
} }
log.Printf("got pong: %q\n", pong.Response) s.logger.Debugf("got pong: %q", pong.Response)
s.metrics.PongReceivedCount.Inc() s.metrics.PongReceivedCount.Inc()
} }
return time.Since(start) / time.Duration(len(msgs)), nil return time.Since(start) / time.Duration(len(msgs)), nil
} }
func (s *Service) Handler(p p2p.Peer) {
w, r := protobuf.NewWriterAndReader(p.Stream)
defer p.Stream.Close()
var ping Ping
for {
if err := r.ReadMsg(&ping); err != nil {
if err == io.EOF {
break
}
s.logger.Errorf("pingpong handler: read message: %v\n", err)
return
}
s.logger.Debugf("got ping: %q", ping.Greeting)
s.metrics.PingReceivedCount.Inc()
if err := w.WriteMsg(&Pong{
Response: "{" + ping.Greeting + "}",
}); err != nil {
s.logger.Errorf("pingpong handler: write message: %v\n", err)
return
}
s.metrics.PongSentCount.Inc()
}
}
...@@ -8,36 +8,40 @@ import ( ...@@ -8,36 +8,40 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"io/ioutil"
"testing" "testing"
"github.com/janos/bee/pkg/logging"
"github.com/janos/bee/pkg/p2p/mock" "github.com/janos/bee/pkg/p2p/mock"
"github.com/janos/bee/pkg/p2p/protobuf" "github.com/janos/bee/pkg/p2p/protobuf"
"github.com/janos/bee/pkg/pingpong" "github.com/janos/bee/pkg/pingpong"
) )
func TestPing(t *testing.T) { func TestPing(t *testing.T) {
logger := logging.New(ioutil.Discard)
// create a pingpong server that handles the incoming stream // create a pingpong server that handles the incoming stream
server := pingpong.New(nil) server := pingpong.New(pingpong.Options{
Logger: logger,
})
// setup the stream recorder to record stream data // setup the stream recorder to record stream data
recorder := mock.NewRecorder(server.Protocol()) recorder := mock.NewRecorder(server.Protocol())
// create a pingpong client that will do pinging // create a pingpong client that will do pinging
client := pingpong.New(recorder) client := pingpong.New(pingpong.Options{
Streamer: recorder,
Logger: logger,
})
// ping // ping
peerID := "/p2p/QmZt98UimwpW9ptJumKTq7B7t3FzNfyoWVNGcd8PFCd7XS" peerID := "/p2p/QmZt98UimwpW9ptJumKTq7B7t3FzNfyoWVNGcd8PFCd7XS"
greetings := []string{"hey", "there", "fella"} greetings := []string{"hey", "there", "fella"}
rtt, err := client.Ping(context.Background(), peerID, greetings...) _, err := client.Ping(context.Background(), peerID, greetings...)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// check that RTT is a sane value
if rtt <= 0 {
t.Errorf("invalid RTT value %v", rtt)
}
// get a record for this stream // get a record for this stream
records, err := recorder.Records(peerID, "pingpong", "pingpong", "1.0.0") records, err := recorder.Records(peerID, "pingpong", "pingpong", "1.0.0")
if err != nil { if err != nil {
......
...@@ -5,13 +5,13 @@ ...@@ -5,13 +5,13 @@
package bee package bee
var ( var (
version = "v0.1.0" // manually set semantic version number version = "0.1.0" // manually set semantic version number
commit string // automatically set git commit hash commit string // automatically set git commit hash
Version = func() string { Version = func() string {
if commit != "" { if commit != "" {
return version + "-" + commit return version + "-" + commit
} }
return version return version + "-dev"
}() }()
) )
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment