Commit e094e221 authored by Janos Guljas's avatar Janos Guljas

add handler middleware for mock recorder and improve TestPing in windows

parent 1971bae2
This diff is collapsed.
...@@ -14,16 +14,32 @@ import ( ...@@ -14,16 +14,32 @@ import (
) )
type Recorder struct { type Recorder struct {
records map[string][]Record records map[string][]Record
recordsMu sync.Mutex recordsMu sync.Mutex
protocols []p2p.ProtocolSpec protocols []p2p.ProtocolSpec
middlewares []p2p.HandlerMiddleware
} }
func NewRecorder(protocols ...p2p.ProtocolSpec) *Recorder { func WithProtocols(protocols ...p2p.ProtocolSpec) Option {
return &Recorder{ return optionFunc(func(r *Recorder) {
records: make(map[string][]Record), r.protocols = append(r.protocols, protocols...)
protocols: protocols, })
}
func WithMiddlewares(middlewares ...p2p.HandlerMiddleware) Option {
return optionFunc(func(r *Recorder) {
r.middlewares = append(r.middlewares, middlewares...)
})
}
func NewRecorder(opts ...Option) *Recorder {
r := &Recorder{
records: make(map[string][]Record),
}
for _, o := range opts {
o.apply(r)
} }
return r
} }
func (r *Recorder) NewStream(_ context.Context, overlay, protocolName, streamName, version string) (p2p.Stream, error) { func (r *Recorder) NewStream(_ context.Context, overlay, protocolName, streamName, version string) (p2p.Stream, error) {
...@@ -45,6 +61,9 @@ func (r *Recorder) NewStream(_ context.Context, overlay, protocolName, streamNam ...@@ -45,6 +61,9 @@ func (r *Recorder) NewStream(_ context.Context, overlay, protocolName, streamNam
if handler == nil { if handler == nil {
return nil, fmt.Errorf("unsupported protocol stream %q %q %q", protocolName, streamName, version) return nil, fmt.Errorf("unsupported protocol stream %q %q %q", protocolName, streamName, version)
} }
for _, m := range r.middlewares {
handler = m(handler)
}
go handler(p2p.Peer{Address: overlay}, streamIn) go handler(p2p.Peer{Address: overlay}, streamIn)
id := overlay + p2p.NewSwarmStreamName(protocolName, streamName, version) id := overlay + p2p.NewSwarmStreamName(protocolName, streamName, version)
...@@ -167,3 +186,10 @@ func (r *record) bytes() []byte { ...@@ -167,3 +186,10 @@ func (r *record) bytes() []byte {
return r.b return r.b
} }
type Option interface {
apply(*Recorder)
}
type optionFunc func(*Recorder)
func (f optionFunc) apply(r *Recorder) { f(r) }
...@@ -34,9 +34,13 @@ type ProtocolSpec struct { ...@@ -34,9 +34,13 @@ type ProtocolSpec struct {
type StreamSpec struct { type StreamSpec struct {
Name string Name string
Version string Version string
Handler func(Peer, Stream) Handler HandlerFunc
} }
type HandlerFunc func(Peer, Stream)
type HandlerMiddleware func(HandlerFunc) HandlerFunc
type IncompatibleStreamError struct { type IncompatibleStreamError struct {
err error err error
} }
......
...@@ -94,7 +94,6 @@ func (s *Service) Handler(peer p2p.Peer, stream p2p.Stream) { ...@@ -94,7 +94,6 @@ func (s *Service) Handler(peer p2p.Peer, stream p2p.Stream) {
w, r := protobuf.NewWriterAndReader(stream) w, r := protobuf.NewWriterAndReader(stream)
defer stream.Close() defer stream.Close()
fmt.Printf("Initiate pinpong for peer %s", peer)
var ping Ping var ping Ping
for { for {
if err := r.ReadMsg(&ping); err != nil { if err := r.ReadMsg(&ping); err != nil {
......
...@@ -9,15 +9,17 @@ import ( ...@@ -9,15 +9,17 @@ import (
"context" "context"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"runtime"
"testing" "testing"
"time"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/mock" "github.com/ethersphere/bee/pkg/p2p/mock"
"github.com/ethersphere/bee/pkg/p2p/protobuf" "github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/pingpong" "github.com/ethersphere/bee/pkg/pingpong"
) )
func TestPing(t *testing.T) { func TestPing(t *testing.T) {
logger := logging.New(ioutil.Discard) logger := logging.New(ioutil.Discard)
...@@ -27,7 +29,18 @@ func TestPing(t *testing.T) { ...@@ -27,7 +29,18 @@ func TestPing(t *testing.T) {
}) })
// setup the stream recorder to record stream data // setup the stream recorder to record stream data
recorder := mock.NewRecorder(server.Protocol()) recorder := mock.NewRecorder(
mock.WithProtocols(server.Protocol()),
mock.WithMiddlewares(func(f p2p.HandlerFunc) p2p.HandlerFunc {
if runtime.GOOS == "windows" {
// windows has a bit lower time resolution
// so, slow down the handler with a middleware
// not to get 0s for rtt value
time.Sleep(100 * time.Millisecond)
}
return f
}),
)
// create a pingpong client that will do pinging // create a pingpong client that will do pinging
client := pingpong.New(pingpong.Options{ client := pingpong.New(pingpong.Options{
...@@ -36,13 +49,18 @@ func TestPing(t *testing.T) { ...@@ -36,13 +49,18 @@ func TestPing(t *testing.T) {
}) })
// ping // ping
peerID := "/p2p/QmZt98UimwpW9ptJumKTq7B7t3FzNfyoWVNGcd8PFCd7XS" peerID := "124"
greetings := []string{"hey", "there", "fella"} greetings := []string{"hey", "there", "fella"}
_, err := client.Ping(context.Background(), peerID, greetings...) rtt, err := client.Ping(context.Background(), peerID, greetings...)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// check that RTT is a sane value
if rtt <= 0 {
t.Errorf("invalid RTT value %v", rtt)
}
// get a record for this stream // get a record for this stream
records, err := recorder.Records(peerID, "pingpong", "pingpong", "1.0.0") records, err := recorder.Records(peerID, "pingpong", "pingpong", "1.0.0")
if err != nil { if err != nil {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment