Commit a78522cb authored by Janos Guljas's avatar Janos Guljas

add p2p mock Streamer and pingpong test

parent f12d4f94
package mock
import (
"context"
"io"
"sync"
"github.com/janos/bee/pkg/p2p"
ma "github.com/multiformats/go-multiaddr"
)
type Streamer struct {
In, Out *Buffer
handler func(p2p.Peer)
}
func NewStreamer(handler func(p2p.Peer)) *Streamer {
return &Streamer{
In: NewBuffer(),
Out: NewBuffer(),
handler: handler,
}
}
func (s *Streamer) NewStream(ctx context.Context, peerID, protocolName, streamName, version string) (p2p.Stream, error) {
out := NewStream(s.In, s.Out)
in := NewStream(s.Out, s.In)
go s.handler(p2p.Peer{
Addr: ma.StringCast(peerID),
Stream: in,
})
return out, nil
}
type Stream struct {
in io.WriteCloser
out io.ReadCloser
}
func NewStream(in io.WriteCloser, out io.ReadCloser) *Stream {
return &Stream{in: in, out: out}
}
func (s *Stream) Read(p []byte) (int, error) {
return s.out.Read(p)
}
func (s *Stream) Write(p []byte) (int, error) {
return s.in.Write(p)
}
func (s *Stream) Close() error {
if err := s.in.Close(); err != nil {
return err
}
if err := s.out.Close(); err != nil {
return err
}
return nil
}
type Buffer struct {
b []byte
c int
closed bool
cond *sync.Cond
}
func NewBuffer() *Buffer {
return &Buffer{
cond: sync.NewCond(new(sync.Mutex)),
}
}
func (b *Buffer) Read(p []byte) (n int, err error) {
b.cond.L.Lock()
defer b.cond.L.Unlock()
for b.c == len(b.b) || b.closed {
b.cond.Wait()
}
end := b.c + len(p)
if end > len(b.b) {
end = len(b.b)
}
n = copy(p, b.b[b.c:end])
b.c += n
if b.closed {
err = io.EOF
}
return n, err
}
func (b *Buffer) Write(p []byte) (int, error) {
b.cond.L.Lock()
defer b.cond.L.Unlock()
defer b.cond.Signal()
b.b = append(b.b, p...)
return len(p), nil
}
func (b *Buffer) Close() error {
b.cond.L.Lock()
defer b.cond.L.Unlock()
defer b.cond.Broadcast()
b.closed = true
return nil
}
func (b *Buffer) Bytes() []byte {
return b.b
}
......@@ -3,12 +3,21 @@ package protobuf
import (
ggio "github.com/gogo/protobuf/io"
"github.com/janos/bee/pkg/p2p"
"io"
)
const delimitedReaderMaxSize = 128 * 1024 // max message size
func NewRW(s p2p.Stream) (w ggio.Writer, r ggio.Reader) {
func NewWriterAndReader(s p2p.Stream) (w ggio.Writer, r ggio.Reader) {
r = ggio.NewDelimitedReader(s, delimitedReaderMaxSize)
w = ggio.NewDelimitedWriter(s)
return w, r
}
func NewReader(r io.Reader) ggio.Reader {
return ggio.NewDelimitedReader(r, delimitedReaderMaxSize)
}
func NewWriter(w io.Writer) ggio.Writer {
return ggio.NewDelimitedWriter(w)
}
......@@ -41,7 +41,7 @@ func (s *Service) Protocol() p2p.ProtocolSpec {
}
func (s *Service) Handler(p p2p.Peer) {
w, r := protobuf.NewRW(p.Stream)
w, r := protobuf.NewWriterAndReader(p.Stream)
defer p.Stream.Close()
var ping Ping
......@@ -56,7 +56,7 @@ func (s *Service) Handler(p p2p.Peer) {
log.Printf("got ping: %q\n", ping.Greeting)
if err := w.WriteMsg(&Pong{
Response: ping.Greeting,
Response: "{" + ping.Greeting + "}",
}); err != nil {
log.Printf("pingpong handler: write message: %v\n", err)
return
......@@ -71,7 +71,7 @@ func (s *Service) Ping(ctx context.Context, peerID string, msgs ...string) (rtt
}
defer stream.Close()
w, r := protobuf.NewRW(stream)
w, r := protobuf.NewWriterAndReader(stream)
var pong Pong
start := time.Now()
......
package pingpong_test
import (
"bytes"
"context"
"fmt"
"io"
"testing"
"github.com/janos/bee/pkg/p2p/mock"
"github.com/janos/bee/pkg/p2p/protobuf"
"github.com/janos/bee/pkg/pingpong"
)
func TestPing(t *testing.T) {
// create a pingpong server that handles the incoming stream
server := pingpong.New(nil)
// setup the mock streamer to record stream data
streamer := mock.NewStreamer(server.Handler)
// create a pingpong client that will do pinging
client := pingpong.New(streamer)
// ping
greetings := []string{"hey", "there", "fella"}
rtt, err := client.Ping(context.Background(), "/p2p/QmZt98UimwpW9ptJumKTq7B7t3FzNfyoWVNGcd8PFCd7XS", greetings...)
if err != nil {
t.Fatal(err)
}
// check that RTT is a sane value
if rtt <= 0 {
t.Errorf("invalid RTT value %v", rtt)
}
// validate received ping greetings
r := protobuf.NewReader(bytes.NewReader(streamer.In.Bytes()))
var gotGreetings []string
for {
var ping pingpong.Ping
if err := r.ReadMsg(&ping); err != nil {
if err == io.EOF {
break
}
t.Fatal(err)
}
gotGreetings = append(gotGreetings, ping.Greeting)
}
if fmt.Sprint(gotGreetings) != fmt.Sprint(greetings) {
t.Errorf("got greetings %v, want %v", gotGreetings, greetings)
}
// validate send pong responses by handler
r = protobuf.NewReader(bytes.NewReader(streamer.Out.Bytes()))
var wantResponses []string
for _, g := range greetings {
wantResponses = append(wantResponses, "{"+g+"}")
}
var gotResponses []string
for {
var pong pingpong.Pong
if err := r.ReadMsg(&pong); err != nil {
if err == io.EOF {
break
}
t.Fatal(err)
}
gotResponses = append(gotResponses, pong.Response)
}
if fmt.Sprint(gotResponses) != fmt.Sprint(wantResponses) {
t.Errorf("got responses %v, want %v", gotResponses, wantResponses)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment