Commit 6898125c authored by Svetomir Smiljkovic's avatar Svetomir Smiljkovic

Merge branch 'master' of https://github.com/ethersphere/bee

parents 6af55846 18b2f6bc
COMMIT ?= ""
GO ?= go
GOLANGCI_LINT ?= golangci-lint
LDFLAGS ?= -s -w -X github.com/ethersphere/bee.commit="$(COMMIT)"
LDFLAGS ?= -s -w
ifdef COMMIT
LDFLAGS += -X github.com/ethersphere/bee.commit="$(COMMIT)"
endif
.PHONY: all
all: build lint vet test binary
......
This diff is collapsed.
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package libp2p
// This error is handled specially by libp2p
// If returned by specific protocol handler it causes peer disconnect
type disconnectError struct {
err error
}
// Disconnect wraps error and creates a special error that is treated specially by libp2p
// It causes peer disconnect
func Disconnect(err error) error {
return &disconnectError{
err: err,
}
}
// Unwrap returns an underlying error
func (e *disconnectError) Unwrap() error { return e.err }
// Error implements function of the standard go error interface
func (w *disconnectError) Error() string {
return w.err.Error()
}
......@@ -2,14 +2,13 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:generate sh -c "protoc -I . -I \"$(go list -f '{{ .Dir }}' -m github.com/gogo/protobuf)/protobuf\" --gogofaster_out=. handshake.proto"
package handshake
import (
"fmt"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake/pb"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
)
......@@ -39,20 +38,20 @@ type Logger interface {
func (s *Service) Handshake(stream p2p.Stream) (i *Info, err error) {
w, r := protobuf.NewWriterAndReader(stream)
var resp ShakeHand
if err := w.WriteMsg(&ShakeHand{
var resp pb.ShakeHand
if err := w.WriteMsg(&pb.ShakeHand{
Address: s.overlay,
NetworkID: s.networkID,
}); err != nil {
return nil, fmt.Errorf("handshake handler: write message: %w", err)
return nil, fmt.Errorf("handshake write message: %w", err)
}
s.logger.Tracef("handshake: sent request %s", s.overlay)
s.logger.Tracef("handshake sent request %s", s.overlay)
if err := r.ReadMsg(&resp); err != nil {
return nil, fmt.Errorf("handshake handler: read message: %w", err)
return nil, fmt.Errorf("handshake read message: %w", err)
}
s.logger.Tracef("handshake: read response: %s", resp.Address)
s.logger.Tracef("handshake read response: %s", resp.Address)
return &Info{
Address: resp.Address,
NetworkID: resp.NetworkID,
......@@ -64,20 +63,20 @@ func (s *Service) Handle(stream p2p.Stream) (i *Info, err error) {
w, r := protobuf.NewWriterAndReader(stream)
defer stream.Close()
var req ShakeHand
var req pb.ShakeHand
if err := r.ReadMsg(&req); err != nil {
return nil, fmt.Errorf("read message: %w", err)
return nil, fmt.Errorf("handshake handler read message: %w", err)
}
s.logger.Tracef("handshake: received request %s", req.Address)
if err := w.WriteMsg(&ShakeHand{
s.logger.Tracef("handshake handler received request %s", req.Address)
if err := w.WriteMsg(&pb.ShakeHand{
Address: s.overlay,
NetworkID: s.networkID,
}); err != nil {
return nil, fmt.Errorf("write message: %w", err)
return nil, fmt.Errorf("handshake handler write message: %w", err)
}
s.logger.Tracef("handshake: handled response: %s", s.overlay)
s.logger.Tracef("handshake handled response: %s", s.overlay)
return &Info{
Address: req.Address,
NetworkID: req.NetworkID,
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package handshake
import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"testing"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake/pb"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
)
type StreamMock struct {
readBuffer *bytes.Buffer
writeBuffer *bytes.Buffer
readError error
writeError error
}
func (s *StreamMock) Read(p []byte) (n int, err error) {
if s.readError != nil {
return 0, s.readError
}
return s.readBuffer.Read(p)
}
func (s *StreamMock) Write(p []byte) (n int, err error) {
if s.writeError != nil {
return 0, s.writeError
}
return s.writeBuffer.Write(p)
}
func (s *StreamMock) Close() error {
return nil
}
func TestHandshake(t *testing.T) {
logger := logging.New(ioutil.Discard)
info := Info{
Address: "node1",
NetworkID: 0,
Light: false,
}
handshakeService := New(info.Address, info.NetworkID, logger)
t.Run("OK", func(t *testing.T) {
expectedInfo := Info{
Address: "node2",
NetworkID: 1,
Light: false,
}
var buffer1 bytes.Buffer
var buffer2 bytes.Buffer
stream1 := &StreamMock{readBuffer: &buffer1, writeBuffer: &buffer2}
stream2 := &StreamMock{readBuffer: &buffer2, writeBuffer: &buffer1}
w, _ := protobuf.NewWriterAndReader(stream2)
if err := w.WriteMsg(&pb.ShakeHand{
Address: expectedInfo.Address,
NetworkID: expectedInfo.NetworkID,
Light: expectedInfo.Light,
}); err != nil {
t.Fatal(err)
}
res, err := handshakeService.Handshake(stream1)
if err != nil {
t.Fatal(err)
}
if *res != expectedInfo {
t.Fatalf("got %+v, expected %+v", res, info)
}
})
t.Run("ERROR - write error ", func(t *testing.T) {
testErr := errors.New("test error")
expectedErr := fmt.Errorf("handshake write message: %w", testErr)
stream := &StreamMock{writeError: testErr}
res, err := handshakeService.Handshake(stream)
if err == nil || err.Error() != expectedErr.Error() {
t.Fatal("expected:", expectedErr, "got:", err)
}
if res != nil {
t.Fatal("handshake returned non-nil res")
}
})
t.Run("ERROR - read error ", func(t *testing.T) {
testErr := errors.New("test error")
expectedErr := fmt.Errorf("handshake read message: %w", testErr)
stream := &StreamMock{writeBuffer: &bytes.Buffer{}, readError: testErr}
res, err := handshakeService.Handshake(stream)
if err == nil || err.Error() != expectedErr.Error() {
t.Fatal("expected:", expectedErr, "got:", err)
}
if res != nil {
t.Fatal("handshake returned non-nil res")
}
})
}
func TestHandle(t *testing.T) {
nodeInfo := Info{
Address: "node1",
NetworkID: 0,
Light: false,
}
logger := logging.New(ioutil.Discard)
handshakeService := New(nodeInfo.Address, nodeInfo.NetworkID, logger)
t.Run("OK", func(t *testing.T) {
node2Info := Info{
Address: "node2",
NetworkID: 1,
Light: false,
}
var buffer1 bytes.Buffer
var buffer2 bytes.Buffer
stream1 := &StreamMock{readBuffer: &buffer1, writeBuffer: &buffer2}
stream2 := &StreamMock{readBuffer: &buffer2, writeBuffer: &buffer1}
w, _ := protobuf.NewWriterAndReader(stream2)
if err := w.WriteMsg(&pb.ShakeHand{
Address: node2Info.Address,
NetworkID: node2Info.NetworkID,
Light: node2Info.Light,
}); err != nil {
t.Fatal(err)
}
res, err := handshakeService.Handle(stream1)
if err != nil {
t.Fatal(err)
}
if *res != node2Info {
t.Fatalf("got %+v, expected %+v", res, node2Info)
}
_, r := protobuf.NewWriterAndReader(stream2)
var got pb.ShakeHand
if err := r.ReadMsg(&got); err != nil {
t.Fatal(err)
}
if nodeInfo != Info(got) {
t.Fatalf("got %+v, expected %+v", got, node2Info)
}
})
t.Run("ERROR - read error ", func(t *testing.T) {
testErr := errors.New("test error")
expectedErr := fmt.Errorf("handshake handler read message: %w", testErr)
stream := &StreamMock{readError: testErr}
res, err := handshakeService.Handle(stream)
if err == nil || err.Error() != expectedErr.Error() {
t.Fatal("expected:", expectedErr, "got:", err)
}
if res != nil {
t.Fatal("handle returned non-nil res")
}
})
t.Run("ERROR - write error ", func(t *testing.T) {
testErr := errors.New("test error")
expectedErr := fmt.Errorf("handshake handler write message: %w", testErr)
var buffer bytes.Buffer
stream := &StreamMock{readBuffer: &buffer, writeBuffer: &buffer}
w, _ := protobuf.NewWriterAndReader(stream)
if err := w.WriteMsg(&pb.ShakeHand{
Address: "node1",
NetworkID: 0,
Light: false,
}); err != nil {
t.Fatal(err)
}
stream.writeError = testErr
res, err := handshakeService.Handle(stream)
if err == nil || err.Error() != expectedErr.Error() {
t.Fatal("expected:", expectedErr, "got:", err)
}
if res != nil {
t.Fatal("handshake returned non-nil res")
}
})
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:generate sh -c "protoc -I . -I \"$(go list -f '{{ .Dir }}' -m github.com/gogo/protobuf)/protobuf\" --gogofaster_out=. handshake.proto"
// Package pb holds only Protocol Buffer definitions and generated code.
package pb
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: handshake.proto
package handshake
package pb
import (
fmt "fmt"
......@@ -83,7 +83,7 @@ func (m *ShakeHand) GetLight() bool {
}
func init() {
proto.RegisterType((*ShakeHand)(nil), "handshake.ShakeHand")
proto.RegisterType((*ShakeHand)(nil), "pb.ShakeHand")
}
func init() { proto.RegisterFile("handshake.proto", fileDescriptor_a77305914d5d202f) }
......@@ -91,15 +91,15 @@ func init() { proto.RegisterFile("handshake.proto", fileDescriptor_a77305914d5d2
var fileDescriptor_a77305914d5d202f = []byte{
// 148 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcf, 0x48, 0xcc, 0x4b,
0x29, 0xce, 0x48, 0xcc, 0x4e, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x84, 0x0b, 0x28,
0x45, 0x72, 0x71, 0x06, 0x83, 0x18, 0x1e, 0x89, 0x79, 0x29, 0x42, 0x12, 0x5c, 0xec, 0x8e, 0x29,
0x29, 0x45, 0xa9, 0xc5, 0xc5, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0x30, 0xae, 0x90, 0x0c,
0x17, 0xa7, 0x5f, 0x6a, 0x49, 0x79, 0x7e, 0x51, 0xb6, 0xa7, 0x8b, 0x04, 0x93, 0x02, 0xa3, 0x06,
0x6b, 0x10, 0x42, 0x40, 0x48, 0x84, 0x8b, 0xd5, 0x27, 0x33, 0x3d, 0xa3, 0x44, 0x82, 0x59, 0x81,
0x51, 0x83, 0x23, 0x08, 0xc2, 0x71, 0x92, 0x38, 0xf1, 0x48, 0x8e, 0xf1, 0xc2, 0x23, 0x39, 0xc6,
0x07, 0x8f, 0xe4, 0x18, 0x27, 0x3c, 0x96, 0x63, 0xb8, 0xf0, 0x58, 0x8e, 0xe1, 0xc6, 0x63, 0x39,
0x86, 0x24, 0x36, 0xb0, 0x33, 0x8c, 0x01, 0x01, 0x00, 0x00, 0xff, 0xff, 0x62, 0x1c, 0xa2, 0x06,
0x99, 0x00, 0x00, 0x00,
0x29, 0xce, 0x48, 0xcc, 0x4e, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2a, 0x48, 0x52,
0x8a, 0xe4, 0xe2, 0x0c, 0x06, 0x09, 0x79, 0x24, 0xe6, 0xa5, 0x08, 0x49, 0x70, 0xb1, 0x3b, 0xa6,
0xa4, 0x14, 0xa5, 0x16, 0x17, 0x4b, 0x30, 0x2a, 0x30, 0x6a, 0x70, 0x06, 0xc1, 0xb8, 0x42, 0x32,
0x5c, 0x9c, 0x7e, 0xa9, 0x25, 0xe5, 0xf9, 0x45, 0xd9, 0x9e, 0x2e, 0x12, 0x4c, 0x0a, 0x8c, 0x1a,
0xac, 0x41, 0x08, 0x01, 0x21, 0x11, 0x2e, 0x56, 0x9f, 0xcc, 0xf4, 0x8c, 0x12, 0x09, 0x66, 0x05,
0x46, 0x0d, 0x8e, 0x20, 0x08, 0xc7, 0x49, 0xe2, 0xc4, 0x23, 0x39, 0xc6, 0x0b, 0x8f, 0xe4, 0x18,
0x1f, 0x3c, 0x92, 0x63, 0x9c, 0xf0, 0x58, 0x8e, 0xe1, 0xc2, 0x63, 0x39, 0x86, 0x1b, 0x8f, 0xe5,
0x18, 0x92, 0xd8, 0xc0, 0xf6, 0x1b, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0x80, 0x1a, 0x2a, 0xd7,
0x92, 0x00, 0x00, 0x00,
}
func (m *ShakeHand) Marshal() (dAtA []byte, err error) {
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
syntax = "proto3";
package handshake;
package pb;
message ShakeHand {
string Address = 1;
......
......@@ -7,6 +7,7 @@ package libp2p
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
......@@ -216,11 +217,15 @@ func New(ctx context.Context, o Options) (*Service, error) {
peerID := stream.Conn().RemotePeer()
i, err := s.handshakeService.Handle(stream)
if err != nil {
s.logger.Errorf("handshake with peer %s: %w", peerID, err)
s.logger.Errorf("handshake with x %s: %w", peerID, err)
// todo: test connection close and refactor
_ = stream.Conn().Close()
return
}
if i.NetworkID != s.networkID {
s.logger.Errorf("handshake with peer %s: invalid network id %v", peerID, i.NetworkID)
// todo: test connection close and refactor
_ = stream.Conn().Close()
return
}
s.peers.add(peerID, i.Address)
......@@ -260,13 +265,24 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
peerID := stream.Conn().RemotePeer()
overlay, found := s.peers.overlay(peerID)
if !found {
// todo: handle better
// todo: this should never happen, should we disconnect in this case?
// todo: test connection close and refactor
_ = stream.Conn().Close()
s.logger.Errorf("overlay address for peer %q not found", peerID)
return
}
s.metrics.HandledStreamCount.Inc()
ss.Handler(p2p.Peer{Address: overlay}, stream)
if err := ss.Handler(p2p.Peer{Address: overlay}, stream); err != nil {
var e *disconnectError
if errors.Is(err, e) {
// todo: test connection close and refactor
s.peers.remove(peerID)
_ = stream.Conn().Close()
}
s.logger.Errorf("%s: %s/%s: %w", p.Name, ss.Name, ss.Version, err)
}
})
}
return nil
......
......@@ -9,7 +9,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
)
type metrics struct {
// all metrics fields must be exported
// to be able to return them by Metrics()
......
......@@ -43,3 +43,11 @@ func (r *peerRegistry) overlay(peerID libp2ppeer.ID) (overlay string, found bool
r.mu.RUnlock()
return overlay, found
}
func (r *peerRegistry) remove(peerID libp2ppeer.ID) {
r.mu.Lock()
overlay := r.overlays[peerID]
delete(r.overlays, peerID)
delete(r.peers, overlay)
r.mu.Unlock()
}
......@@ -14,16 +14,32 @@ import (
)
type Recorder struct {
records map[string][]Record
recordsMu sync.Mutex
protocols []p2p.ProtocolSpec
records map[string][]Record
recordsMu sync.Mutex
protocols []p2p.ProtocolSpec
middlewares []p2p.HandlerMiddleware
}
func NewRecorder(protocols ...p2p.ProtocolSpec) *Recorder {
return &Recorder{
records: make(map[string][]Record),
protocols: protocols,
func WithProtocols(protocols ...p2p.ProtocolSpec) Option {
return optionFunc(func(r *Recorder) {
r.protocols = append(r.protocols, protocols...)
})
}
func WithMiddlewares(middlewares ...p2p.HandlerMiddleware) Option {
return optionFunc(func(r *Recorder) {
r.middlewares = append(r.middlewares, middlewares...)
})
}
func NewRecorder(opts ...Option) *Recorder {
r := &Recorder{
records: make(map[string][]Record),
}
for _, o := range opts {
o.apply(r)
}
return r
}
func (r *Recorder) NewStream(_ context.Context, overlay, protocolName, streamName, version string) (p2p.Stream, error) {
......@@ -32,7 +48,7 @@ func (r *Recorder) NewStream(_ context.Context, overlay, protocolName, streamNam
streamOut := newStream(recordIn, recordOut)
streamIn := newStream(recordOut, recordIn)
var handler func(p2p.Peer, p2p.Stream)
var handler p2p.HandlerFunc
for _, p := range r.protocols {
if p.Name == protocolName {
for _, s := range p.StreamSpecs {
......@@ -45,7 +61,14 @@ func (r *Recorder) NewStream(_ context.Context, overlay, protocolName, streamNam
if handler == nil {
return nil, fmt.Errorf("unsupported protocol stream %q %q %q", protocolName, streamName, version)
}
go handler(p2p.Peer{Address: overlay}, streamIn)
for _, m := range r.middlewares {
handler = m(handler)
}
go func() {
if err := handler(p2p.Peer{Address: overlay}, streamIn); err != nil {
panic(err) // todo: store error and export error records for inspection
}
}()
id := overlay + p2p.NewSwarmStreamName(protocolName, streamName, version)
......@@ -167,3 +190,10 @@ func (r *record) bytes() []byte {
return r.b
}
type Option interface {
apply(*Recorder)
}
type optionFunc func(*Recorder)
func (f optionFunc) apply(r *Recorder) { f(r) }
......@@ -34,9 +34,13 @@ type ProtocolSpec struct {
type StreamSpec struct {
Name string
Version string
Handler func(Peer, Stream)
Handler HandlerFunc
}
type HandlerFunc func(Peer, Stream) error
type HandlerMiddleware func(HandlerFunc) HandlerFunc
type IncompatibleStreamError struct {
err error
}
......
......@@ -5,9 +5,9 @@
package protobuf
import (
"github.com/ethersphere/bee/pkg/p2p"
ggio "github.com/gogo/protobuf/io"
"github.com/gogo/protobuf/proto"
"github.com/ethersphere/bee/pkg/p2p"
"io"
)
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:generate sh -c "protoc -I . -I \"$(go list -f '{{ .Dir }}' -m github.com/gogo/protobuf)/protobuf\" --gogofaster_out=. pingpong.proto"
// Package pb holds only Protocol Buffer definitions and generated code.
package pb
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: pingpong.proto
package pingpong
package pb
import (
fmt "fmt"
......@@ -111,8 +111,8 @@ func (m *Pong) GetResponse() string {
}
func init() {
proto.RegisterType((*Ping)(nil), "pingpong.Ping")
proto.RegisterType((*Pong)(nil), "pingpong.Pong")
proto.RegisterType((*Ping)(nil), "pb.Ping")
proto.RegisterType((*Pong)(nil), "pb.Pong")
}
func init() { proto.RegisterFile("pingpong.proto", fileDescriptor_1cfbf639ab46154b) }
......@@ -120,13 +120,13 @@ func init() { proto.RegisterFile("pingpong.proto", fileDescriptor_1cfbf639ab4615
var fileDescriptor_1cfbf639ab46154b = []byte{
// 122 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2b, 0xc8, 0xcc, 0x4b,
0x2f, 0xc8, 0xcf, 0x4b, 0xd7, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x80, 0xf1, 0x95, 0x94,
0xb8, 0x58, 0x02, 0x32, 0xf3, 0xd2, 0x85, 0xa4, 0xb8, 0x38, 0xdc, 0x8b, 0x52, 0x53, 0x4b, 0x32,
0xf3, 0xd2, 0x25, 0x18, 0x15, 0x18, 0x35, 0x38, 0x83, 0xe0, 0x7c, 0xb0, 0x9a, 0x7c, 0x88, 0x9a,
0xa0, 0xd4, 0xe2, 0x82, 0xfc, 0xbc, 0xe2, 0x54, 0x98, 0x1a, 0x18, 0xdf, 0x49, 0xe2, 0xc4, 0x23,
0x39, 0xc6, 0x0b, 0x8f, 0xe4, 0x18, 0x1f, 0x3c, 0x92, 0x63, 0x9c, 0xf0, 0x58, 0x8e, 0xe1, 0xc2,
0x63, 0x39, 0x86, 0x1b, 0x8f, 0xe5, 0x18, 0x92, 0xd8, 0xc0, 0x56, 0x1a, 0x03, 0x02, 0x00, 0x00,
0xff, 0xff, 0xa8, 0xfc, 0xee, 0x94, 0x84, 0x00, 0x00, 0x00,
0x2f, 0xc8, 0xcf, 0x4b, 0xd7, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2a, 0x48, 0x52, 0x52,
0xe2, 0x62, 0x09, 0xc8, 0xcc, 0x4b, 0x17, 0x92, 0xe2, 0xe2, 0x70, 0x2f, 0x4a, 0x4d, 0x2d, 0xc9,
0xcc, 0x4b, 0x97, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0c, 0x82, 0xf3, 0xc1, 0x6a, 0xf2, 0x21, 0x6a,
0x82, 0x52, 0x8b, 0x0b, 0xf2, 0xf3, 0x8a, 0x53, 0x61, 0x6a, 0x60, 0x7c, 0x27, 0x89, 0x13, 0x8f,
0xe4, 0x18, 0x2f, 0x3c, 0x92, 0x63, 0x7c, 0xf0, 0x48, 0x8e, 0x71, 0xc2, 0x63, 0x39, 0x86, 0x0b,
0x8f, 0xe5, 0x18, 0x6e, 0x3c, 0x96, 0x63, 0x48, 0x62, 0x03, 0x5b, 0x66, 0x0c, 0x08, 0x00, 0x00,
0xff, 0xff, 0x8c, 0xe4, 0x0f, 0x96, 0x7e, 0x00, 0x00, 0x00,
}
func (m *Ping) Marshal() (dAtA []byte, err error) {
......
......@@ -4,7 +4,7 @@
syntax = "proto3";
package pingpong;
package pb;
message Ping {
string Greeting = 1;
......
......@@ -2,7 +2,6 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:generate sh -c "protoc -I . -I \"$(go list -f '{{ .Dir }}' -m github.com/gogo/protobuf)/protobuf\" --gogofaster_out=. pingpong.proto"
package pingpong
import (
......@@ -13,6 +12,7 @@ import (
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/pingpong/pb"
)
const (
......@@ -34,7 +34,6 @@ type Options struct {
type Logger interface {
Debugf(format string, args ...interface{})
Errorf(format string, args ...interface{})
}
func New(o Options) *Service {
......@@ -59,6 +58,7 @@ func (s *Service) Protocol() p2p.ProtocolSpec {
}
func (s *Service) Ping(ctx context.Context, address string, msgs ...string) (rtt time.Duration, err error) {
start := time.Now()
stream, err := s.streamer.NewStream(ctx, address, protocolName, streamName, streamVersion)
if err != nil {
return 0, fmt.Errorf("new stream: %w", err)
......@@ -67,13 +67,12 @@ func (s *Service) Ping(ctx context.Context, address string, msgs ...string) (rtt
w, r := protobuf.NewWriterAndReader(stream)
var pong Pong
start := time.Now()
var pong pb.Pong
for _, msg := range msgs {
if err := w.WriteMsg(&Ping{
if err := w.WriteMsg(&pb.Ping{
Greeting: msg,
}); err != nil {
return 0, fmt.Errorf("stream write: %w", err)
return 0, fmt.Errorf("write message: %w", err)
}
s.metrics.PingSentCount.Inc()
......@@ -81,38 +80,36 @@ func (s *Service) Ping(ctx context.Context, address string, msgs ...string) (rtt
if err == io.EOF {
break
}
return 0, err
return 0, fmt.Errorf("read message: %w", err)
}
s.logger.Debugf("got pong: %q", pong.Response)
s.metrics.PongReceivedCount.Inc()
}
return time.Since(start) / time.Duration(len(msgs)), nil
return time.Since(start), nil
}
func (s *Service) Handler(peer p2p.Peer, stream p2p.Stream) {
func (s *Service) Handler(peer p2p.Peer, stream p2p.Stream) error {
w, r := protobuf.NewWriterAndReader(stream)
defer stream.Close()
fmt.Printf("Initiate pinpong for peer %s", peer)
var ping Ping
var ping pb.Ping
for {
if err := r.ReadMsg(&ping); err != nil {
if err == io.EOF {
break
}
s.logger.Errorf("pingpong handler: read message: %v\n", err)
return
return fmt.Errorf("read message: %w", err)
}
s.logger.Debugf("got ping: %q", ping.Greeting)
s.metrics.PingReceivedCount.Inc()
if err := w.WriteMsg(&Pong{
if err := w.WriteMsg(&pb.Pong{
Response: "{" + ping.Greeting + "}",
}); err != nil {
s.logger.Errorf("pingpong handler: write message: %v\n", err)
return
return fmt.Errorf("write message: %w", err)
}
s.metrics.PongSentCount.Inc()
}
return nil
}
......@@ -9,15 +9,18 @@ import (
"context"
"fmt"
"io/ioutil"
"runtime"
"testing"
"time"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/mock"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/pingpong"
"github.com/ethersphere/bee/pkg/pingpong/pb"
)
func TestPing(t *testing.T) {
logger := logging.New(ioutil.Discard)
......@@ -27,7 +30,18 @@ func TestPing(t *testing.T) {
})
// setup the stream recorder to record stream data
recorder := mock.NewRecorder(server.Protocol())
recorder := mock.NewRecorder(
mock.WithProtocols(server.Protocol()),
mock.WithMiddlewares(func(f p2p.HandlerFunc) p2p.HandlerFunc {
if runtime.GOOS == "windows" {
// windows has a bit lower time resolution
// so, slow down the handler with a middleware
// not to get 0s for rtt value
time.Sleep(100 * time.Millisecond)
}
return f
}),
)
// create a pingpong client that will do pinging
client := pingpong.New(pingpong.Options{
......@@ -36,13 +50,18 @@ func TestPing(t *testing.T) {
})
// ping
peerID := "/p2p/QmZt98UimwpW9ptJumKTq7B7t3FzNfyoWVNGcd8PFCd7XS"
peerID := "124"
greetings := []string{"hey", "there", "fella"}
_, err := client.Ping(context.Background(), peerID, greetings...)
rtt, err := client.Ping(context.Background(), peerID, greetings...)
if err != nil {
t.Fatal(err)
}
// check that RTT is a sane value
if rtt <= 0 {
t.Errorf("invalid RTT value %v", rtt)
}
// get a record for this stream
records, err := recorder.Records(peerID, "pingpong", "pingpong", "1.0.0")
if err != nil {
......@@ -57,14 +76,14 @@ func TestPing(t *testing.T) {
wantGreetings := greetings
messages, err := protobuf.ReadMessages(
bytes.NewReader(record.In()),
func() protobuf.Message { return new(pingpong.Ping) },
func() protobuf.Message { return new(pb.Ping) },
)
if err != nil {
t.Fatal(err)
}
var gotGreetings []string
for _, m := range messages {
gotGreetings = append(gotGreetings, m.(*pingpong.Ping).Greeting)
gotGreetings = append(gotGreetings, m.(*pb.Ping).Greeting)
}
if fmt.Sprint(gotGreetings) != fmt.Sprint(wantGreetings) {
t.Errorf("got greetings %v, want %v", gotGreetings, wantGreetings)
......@@ -77,14 +96,14 @@ func TestPing(t *testing.T) {
}
messages, err = protobuf.ReadMessages(
bytes.NewReader(record.Out()),
func() protobuf.Message { return new(pingpong.Pong) },
func() protobuf.Message { return new(pb.Pong) },
)
if err != nil {
t.Fatal(err)
}
var gotResponses []string
for _, m := range messages {
gotResponses = append(gotResponses, m.(*pingpong.Pong).Response)
gotResponses = append(gotResponses, m.(*pb.Pong).Response)
}
if fmt.Sprint(gotResponses) != fmt.Sprint(wantResponses) {
t.Errorf("got responses %v, want %v", gotResponses, wantResponses)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment