Commit 3c484dd7 authored by Petar Radovic's avatar Petar Radovic

Merge branch 'master' of github.com:ethersphere/bee

parents f66a687a cf1b011f
This diff is collapsed.
......@@ -14,16 +14,32 @@ import (
)
type Recorder struct {
records map[string][]Record
recordsMu sync.Mutex
protocols []p2p.ProtocolSpec
records map[string][]Record
recordsMu sync.Mutex
protocols []p2p.ProtocolSpec
middlewares []p2p.HandlerMiddleware
}
func NewRecorder(protocols ...p2p.ProtocolSpec) *Recorder {
return &Recorder{
records: make(map[string][]Record),
protocols: protocols,
func WithProtocols(protocols ...p2p.ProtocolSpec) Option {
return optionFunc(func(r *Recorder) {
r.protocols = append(r.protocols, protocols...)
})
}
func WithMiddlewares(middlewares ...p2p.HandlerMiddleware) Option {
return optionFunc(func(r *Recorder) {
r.middlewares = append(r.middlewares, middlewares...)
})
}
func NewRecorder(opts ...Option) *Recorder {
r := &Recorder{
records: make(map[string][]Record),
}
for _, o := range opts {
o.apply(r)
}
return r
}
func (r *Recorder) NewStream(_ context.Context, overlay, protocolName, streamName, version string) (p2p.Stream, error) {
......@@ -45,6 +61,9 @@ func (r *Recorder) NewStream(_ context.Context, overlay, protocolName, streamNam
if handler == nil {
return nil, fmt.Errorf("unsupported protocol stream %q %q %q", protocolName, streamName, version)
}
for _, m := range r.middlewares {
handler = m(handler)
}
go handler(p2p.Peer{Address: overlay}, streamIn)
id := overlay + p2p.NewSwarmStreamName(protocolName, streamName, version)
......@@ -167,3 +186,10 @@ func (r *record) bytes() []byte {
return r.b
}
type Option interface {
apply(*Recorder)
}
type optionFunc func(*Recorder)
func (f optionFunc) apply(r *Recorder) { f(r) }
......@@ -34,9 +34,13 @@ type ProtocolSpec struct {
type StreamSpec struct {
Name string
Version string
Handler func(Peer, Stream)
Handler HandlerFunc
}
type HandlerFunc func(Peer, Stream)
type HandlerMiddleware func(HandlerFunc) HandlerFunc
type IncompatibleStreamError struct {
err error
}
......
......@@ -59,6 +59,7 @@ func (s *Service) Protocol() p2p.ProtocolSpec {
}
func (s *Service) Ping(ctx context.Context, address string, msgs ...string) (rtt time.Duration, err error) {
start := time.Now()
stream, err := s.streamer.NewStream(ctx, address, protocolName, streamName, streamVersion)
if err != nil {
return 0, fmt.Errorf("new stream: %w", err)
......@@ -68,7 +69,6 @@ func (s *Service) Ping(ctx context.Context, address string, msgs ...string) (rtt
w, r := protobuf.NewWriterAndReader(stream)
var pong Pong
start := time.Now()
for _, msg := range msgs {
if err := w.WriteMsg(&Ping{
Greeting: msg,
......@@ -87,14 +87,13 @@ func (s *Service) Ping(ctx context.Context, address string, msgs ...string) (rtt
s.logger.Debugf("got pong: %q", pong.Response)
s.metrics.PongReceivedCount.Inc()
}
return time.Since(start) / time.Duration(len(msgs)), nil
return time.Since(start), nil
}
func (s *Service) Handler(peer p2p.Peer, stream p2p.Stream) {
w, r := protobuf.NewWriterAndReader(stream)
defer stream.Close()
fmt.Printf("Initiate pinpong for peer %s", peer)
var ping Ping
for {
if err := r.ReadMsg(&ping); err != nil {
......
......@@ -9,15 +9,17 @@ import (
"context"
"fmt"
"io/ioutil"
"runtime"
"testing"
"time"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/mock"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/pingpong"
)
func TestPing(t *testing.T) {
logger := logging.New(ioutil.Discard)
......@@ -27,7 +29,18 @@ func TestPing(t *testing.T) {
})
// setup the stream recorder to record stream data
recorder := mock.NewRecorder(server.Protocol())
recorder := mock.NewRecorder(
mock.WithProtocols(server.Protocol()),
mock.WithMiddlewares(func(f p2p.HandlerFunc) p2p.HandlerFunc {
if runtime.GOOS == "windows" {
// windows has a bit lower time resolution
// so, slow down the handler with a middleware
// not to get 0s for rtt value
time.Sleep(100 * time.Millisecond)
}
return f
}),
)
// create a pingpong client that will do pinging
client := pingpong.New(pingpong.Options{
......@@ -36,13 +49,18 @@ func TestPing(t *testing.T) {
})
// ping
peerID := "/p2p/QmZt98UimwpW9ptJumKTq7B7t3FzNfyoWVNGcd8PFCd7XS"
peerID := "124"
greetings := []string{"hey", "there", "fella"}
_, err := client.Ping(context.Background(), peerID, greetings...)
rtt, err := client.Ping(context.Background(), peerID, greetings...)
if err != nil {
t.Fatal(err)
}
// check that RTT is a sane value
if rtt <= 0 {
t.Errorf("invalid RTT value %v", rtt)
}
// get a record for this stream
records, err := recorder.Records(peerID, "pingpong", "pingpong", "1.0.0")
if err != nil {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment