Commit 12ca1b88 authored by Janoš Guljaš's avatar Janoš Guljaš Committed by GitHub

add p2p stream headers and tracing (#27)

parent 3132c693
......@@ -27,18 +27,21 @@ import (
func (c *command) initStartCmd() (err error) {
const (
optionNameDataDir = "data-dir"
optionNamePassword = "password"
optionNamePasswordFile = "password-file"
optionNameAPIAddr = "api-addr"
optionNameP2PAddr = "p2p-addr"
optionNameP2PDisableWS = "p2p-disable-ws"
optionNameP2PDisableQUIC = "p2p-disable-quic"
optionNameEnableDebugAPI = "enable-debug-api"
optionNameDebugAPIAddr = "debug-api-addr"
optionNameBootnodes = "bootnode"
optionNameNetworkID = "network-id"
optionNameVerbosity = "verbosity"
optionNameDataDir = "data-dir"
optionNamePassword = "password"
optionNamePasswordFile = "password-file"
optionNameAPIAddr = "api-addr"
optionNameP2PAddr = "p2p-addr"
optionNameP2PDisableWS = "p2p-disable-ws"
optionNameP2PDisableQUIC = "p2p-disable-quic"
optionNameEnableDebugAPI = "enable-debug-api"
optionNameDebugAPIAddr = "debug-api-addr"
optionNameBootnodes = "bootnode"
optionNameNetworkID = "network-id"
optionNameTracingEnabled = "tracing"
optionNameTracingEndpoint = "tracing-endpoint"
optionNameTracingServiceName = "tracing-service-name"
optionNameVerbosity = "verbosity"
)
cmd := &cobra.Command{
......@@ -101,8 +104,11 @@ func (c *command) initStartCmd() (err error) {
NetworkID: c.config.GetInt32(optionNameNetworkID),
Logger: logger,
},
Bootnodes: c.config.GetStringSlice(optionNameBootnodes),
Logger: logger,
Bootnodes: c.config.GetStringSlice(optionNameBootnodes),
TracingEnabled: c.config.GetBool(optionNameTracingEnabled),
TracingEndpoint: c.config.GetString(optionNameTracingEndpoint),
TracingServiceName: c.config.GetString(optionNameTracingServiceName),
Logger: logger,
})
if err != nil {
return err
......@@ -158,6 +164,9 @@ func (c *command) initStartCmd() (err error) {
cmd.Flags().Bool(optionNameEnableDebugAPI, false, "enable debug HTTP API")
cmd.Flags().String(optionNameDebugAPIAddr, ":6060", "debug HTTP API listen address")
cmd.Flags().Int32(optionNameNetworkID, 1, "ID of the Swarm network")
cmd.Flags().Bool(optionNameTracingEnabled, false, "enable tracing")
cmd.Flags().String(optionNameTracingEndpoint, "127.0.0.1:6831", "endpoint to send tracing data")
cmd.Flags().String(optionNameTracingServiceName, "bee", "service name identifier for tracing")
cmd.Flags().String(optionNameVerbosity, "info", "log verbosity level 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=trace")
c.root.AddCommand(cmd)
......
......@@ -17,10 +17,13 @@ require (
github.com/libp2p/go-ws-transport v0.2.0
github.com/multiformats/go-multiaddr v0.2.0
github.com/multiformats/go-multistream v0.1.0
github.com/opentracing/opentracing-go v1.0.2
github.com/prometheus/client_golang v1.3.0
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v0.0.5
github.com/spf13/viper v1.6.2
github.com/uber/jaeger-client-go v2.22.1+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible // indirect
golang.org/x/crypto v0.0.0-20191219195013-becbf705a915
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
resenje.org/web v0.4.0
......
This diff is collapsed.
......@@ -10,6 +10,7 @@ import (
"github.com/ethersphere/bee/pkg/logging"
m "github.com/ethersphere/bee/pkg/metrics"
"github.com/ethersphere/bee/pkg/pingpong"
"github.com/ethersphere/bee/pkg/tracing"
)
type Service interface {
......@@ -26,6 +27,7 @@ type server struct {
type Options struct {
Pingpong pingpong.Interface
Logger logging.Logger
Tracer *tracing.Tracer
}
func New(o Options) Service {
......
......@@ -22,6 +22,9 @@ func (s *server) pingpongHandler(w http.ResponseWriter, r *http.Request) {
peerID := mux.Vars(r)["peer-id"]
ctx := r.Context()
span, ctx := s.Tracer.StartSpanFromContext(ctx, "pingpong-api")
defer span.Finish()
address, err := swarm.ParseHexAddress(peerID)
if err != nil {
s.Logger.Debugf("pingpong: parse peer address %s: %v", peerID, err)
......
......@@ -82,7 +82,7 @@ func (s *Service) SetPeerAddedHandler(h func(ctx context.Context, addr swarm.Add
}
func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swarm.Address) error {
stream, err := s.streamer.NewStream(ctx, peer, protocolName, protocolVersion, peersStreamName)
stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, peersStreamName)
if err != nil {
return fmt.Errorf("new stream: %w", err)
}
......@@ -110,7 +110,7 @@ func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swa
return stream.FullClose()
}
func (s *Service) peersHandler(peer p2p.Peer, stream p2p.Stream) error {
func (s *Service) peersHandler(_ context.Context, peer p2p.Peer, stream p2p.Stream) error {
defer stream.Close()
_, r := protobuf.NewWriterAndReader(stream)
......
......@@ -29,6 +29,7 @@ import (
"github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/ethersphere/bee/pkg/pingpong"
"github.com/ethersphere/bee/pkg/topology/full"
"github.com/ethersphere/bee/pkg/tracing"
ma "github.com/multiformats/go-multiaddr"
)
......@@ -38,27 +39,41 @@ type Bee struct {
apiServer *http.Server
debugAPIServer *http.Server
errorLogWriter *io.PipeWriter
tracerCloser io.Closer
}
type Options struct {
DataDir string
Password string
APIAddr string
DebugAPIAddr string
LibP2POptions libp2p.Options
Bootnodes []string
Logger logging.Logger
DataDir string
Password string
APIAddr string
DebugAPIAddr string
LibP2POptions libp2p.Options
Bootnodes []string
Logger logging.Logger
TracingEnabled bool
TracingEndpoint string
TracingServiceName string
}
func NewBee(o Options) (*Bee, error) {
logger := o.Logger
addressbook := inmem.New()
tracer, tracerCloser, err := tracing.NewTracer(&tracing.Options{
Enabled: o.TracingEnabled,
Endpoint: o.TracingEndpoint,
ServiceName: o.TracingServiceName,
})
if err != nil {
return nil, fmt.Errorf("tracer: %w", err)
}
p2pCtx, p2pCancel := context.WithCancel(context.Background())
b := &Bee{
p2pCancel: p2pCancel,
errorLogWriter: logger.WriterLevel(logrus.ErrorLevel),
tracerCloser: tracerCloser,
}
var keyStore keystore.Service
......@@ -93,6 +108,7 @@ func NewBee(o Options) (*Bee, error) {
libP2POptions.Overlay = address
libP2POptions.PrivateKey = libp2pPrivateKey
libP2POptions.Addressbook = addressbook
libP2POptions.Tracer = tracer
p2ps, err := libp2p.New(p2pCtx, libP2POptions)
if err != nil {
return nil, fmt.Errorf("p2p service: %w", err)
......@@ -116,6 +132,7 @@ func NewBee(o Options) (*Bee, error) {
pingPong := pingpong.New(pingpong.Options{
Streamer: p2ps,
Logger: logger,
Tracer: tracer,
})
if err = p2ps.AddProtocol(pingPong.Protocol()); err != nil {
......@@ -150,6 +167,7 @@ func NewBee(o Options) (*Bee, error) {
apiService = api.New(api.Options{
Pingpong: pingPong,
Logger: logger,
Tracer: tracer,
})
apiListener, err := net.Listen("tcp", o.APIAddr)
if err != nil {
......@@ -243,5 +261,9 @@ func (b *Bee) Shutdown(ctx context.Context) error {
return fmt.Errorf("p2p server: %w", err)
}
if err := b.tracerCloser.Close(); err != nil {
return fmt.Errorf("tracer: %w", err)
}
return b.errorLogWriter.Close()
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package libp2p
import (
"context"
"fmt"
"time"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/libp2p/internal/headers/pb"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
)
func sendHeaders(ctx context.Context, headers p2p.Headers, stream *stream) error {
w, r := protobuf.NewWriterAndReader(stream)
if err := w.WriteMsgWithContext(ctx, headersP2PToPB(headers)); err != nil {
return fmt.Errorf("write message: %w", err)
}
h := new(pb.Headers)
if err := r.ReadMsgWithContext(ctx, h); err != nil {
return fmt.Errorf("read message: %w", err)
}
stream.headers = headersPBToP2P(h)
return nil
}
func handleHeaders(headler p2p.HeadlerFunc, stream *stream) error {
w, r := protobuf.NewWriterAndReader(stream)
headers := new(pb.Headers)
if err := r.ReadMsgWithTimeout(time.Second, headers); err != nil {
return fmt.Errorf("read message: %w", err)
}
stream.headers = headersPBToP2P(headers)
var h p2p.Headers
if headler != nil {
h = headler(stream.headers)
}
if err := w.WriteMsgWithTimeout(time.Second, headersP2PToPB(h)); err != nil {
return fmt.Errorf("write message: %w", err)
}
return nil
}
func headersPBToP2P(h *pb.Headers) p2p.Headers {
p2ph := make(p2p.Headers)
for _, rh := range h.Headers {
p2ph[rh.Key] = rh.Value
}
return p2ph
}
func headersP2PToPB(h p2p.Headers) *pb.Headers {
pbh := new(pb.Headers)
pbh.Headers = make([]*pb.Header, 0)
for key, value := range h {
pbh.Headers = append(pbh.Headers, &pb.Header{
Key: key,
Value: value,
})
}
return pbh
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package libp2p_test
import (
"context"
"fmt"
"testing"
"time"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/libp2p"
)
func TestHeaders(t *testing.T) {
headers := p2p.Headers{
"test-header-key": []byte("header-value"),
"other-key": []byte("other-value"),
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1, cleanup1 := newService(t, libp2p.Options{})
defer cleanup1()
s2, overlay2, cleanup2 := newService(t, libp2p.Options{})
defer cleanup2()
var gotHeaders p2p.Headers
handled := make(chan struct{})
if err := s1.AddProtocol(newTestProtocol(func(ctx context.Context, p p2p.Peer, stream p2p.Stream) error {
if ctx == nil {
t.Fatal("missing context")
}
if !p.Address.Equal(overlay2) {
t.Fatalf("got peer %v, want %v", p.Address, overlay2)
}
gotHeaders = stream.Headers()
close(handled)
return nil
})); err != nil {
t.Fatal(err)
}
addr := serviceUnderlayAddress(t, s1)
if _, err := s2.Connect(ctx, addr); err != nil {
t.Fatal(err)
}
stream, err := s2.NewStream(ctx, overlay1, headers, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
t.Fatal(err)
}
defer stream.Close()
select {
case <-handled:
case <-time.After(30 * time.Second):
t.Fatal("timeout waiting for handler")
}
if fmt.Sprint(gotHeaders) != fmt.Sprint(headers) {
t.Errorf("got headers %+v, want %+v", gotHeaders, headers)
}
}
func TestHeaders_empty(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1, cleanup1 := newService(t, libp2p.Options{})
defer cleanup1()
s2, overlay2, cleanup2 := newService(t, libp2p.Options{})
defer cleanup2()
var gotHeaders p2p.Headers
handled := make(chan struct{})
if err := s1.AddProtocol(newTestProtocol(func(ctx context.Context, p p2p.Peer, stream p2p.Stream) error {
if ctx == nil {
t.Fatal("missing context")
}
if !p.Address.Equal(overlay2) {
t.Fatalf("got peer %v, want %v", p.Address, overlay2)
}
gotHeaders = stream.Headers()
close(handled)
return nil
})); err != nil {
t.Fatal(err)
}
addr := serviceUnderlayAddress(t, s1)
if _, err := s2.Connect(ctx, addr); err != nil {
t.Fatal(err)
}
stream, err := s2.NewStream(ctx, overlay1, nil, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
t.Fatal(err)
}
defer stream.Close()
select {
case <-handled:
case <-time.After(30 * time.Second):
t.Fatal("timeout waiting for handler")
}
if len(gotHeaders) != 0 {
t.Errorf("got headers %+v, want none", gotHeaders)
}
}
func TestHeadler(t *testing.T) {
receivedHeaders := p2p.Headers{
"test-header-key": []byte("header-value"),
"other-key": []byte("other-value"),
}
sentHeaders := p2p.Headers{
"sent-header-key": []byte("sent-value"),
"other-sent-key": []byte("other-sent-value"),
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s1, overlay1, cleanup1 := newService(t, libp2p.Options{})
defer cleanup1()
s2, _, cleanup2 := newService(t, libp2p.Options{})
defer cleanup2()
var gotReceivedHeaders p2p.Headers
handled := make(chan struct{})
if err := s1.AddProtocol(p2p.ProtocolSpec{
Name: testProtocolName,
Version: testProtocolVersion,
StreamSpecs: []p2p.StreamSpec{
{
Name: testStreamName,
Handler: func(_ context.Context, _ p2p.Peer, stream p2p.Stream) error {
return nil
},
Headler: func(headers p2p.Headers) p2p.Headers {
defer close(handled)
gotReceivedHeaders = headers
return sentHeaders
},
},
},
}); err != nil {
t.Fatal(err)
}
addr := serviceUnderlayAddress(t, s1)
if _, err := s2.Connect(ctx, addr); err != nil {
t.Fatal(err)
}
stream, err := s2.NewStream(ctx, overlay1, receivedHeaders, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
t.Fatal(err)
}
defer stream.Close()
select {
case <-handled:
case <-time.After(30 * time.Second):
t.Fatal("timeout waiting for handler")
}
if fmt.Sprint(gotReceivedHeaders) != fmt.Sprint(receivedHeaders) {
t.Errorf("got received headers %+v, want %+v", gotReceivedHeaders, receivedHeaders)
}
gotSentHeaders := stream.Headers()
if fmt.Sprint(gotSentHeaders) != fmt.Sprint(sentHeaders) {
t.Errorf("got sent headers %+v, want %+v", gotSentHeaders, sentHeaders)
}
}
......@@ -4,7 +4,11 @@
package mock
import "bytes"
import (
"bytes"
"github.com/ethersphere/bee/pkg/p2p"
)
type Stream struct {
readBuffer *bytes.Buffer
......@@ -48,6 +52,10 @@ func (s *Stream) Write(p []byte) (n int, err error) {
return s.writeBuffer.Write(p)
}
func (s *Stream) Headers() p2p.Headers {
return nil
}
func (s *Stream) Close() error {
return nil
}
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:generate sh -c "protoc -I . -I \"$(go list -f '{{ .Dir }}' -m github.com/gogo/protobuf)/protobuf\" --gogofaster_out=. headers.proto"
// Package pb holds only Protocol Buffer definitions and generated code.
package pb
This diff is collapsed.
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
syntax = "proto3";
package pb;
message Headers {
repeated Header headers = 1;
}
message Header {
string key = 1;
bytes value = 2;
}
\ No newline at end of file
......@@ -16,6 +16,7 @@ import (
"github.com/ethersphere/bee/pkg/p2p"
handshake "github.com/ethersphere/bee/pkg/p2p/libp2p/internal/handshake"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/libp2p/go-libp2p"
autonat "github.com/libp2p/go-libp2p-autonat-svc"
crypto "github.com/libp2p/go-libp2p-core/crypto"
......@@ -36,6 +37,7 @@ import (
var _ p2p.Service = (*Service)(nil)
type Service struct {
ctx context.Context
host host.Host
libp2pPeerstore peerstore.Peerstore
metrics metrics
......@@ -45,6 +47,7 @@ type Service struct {
peers *peerRegistry
peerHandler func(context.Context, swarm.Address) error
logger logging.Logger
tracer *tracing.Tracer
}
type Options struct {
......@@ -56,6 +59,7 @@ type Options struct {
NetworkID int32
Addressbook addressbook.Putter
Logger logging.Logger
Tracer *tracing.Tracer
}
func New(ctx context.Context, o Options) (*Service, error) {
......@@ -149,6 +153,7 @@ func New(ctx context.Context, o Options) (*Service, error) {
peerRegistry := newPeerRegistry()
s := &Service{
ctx: ctx,
host: h,
libp2pPeerstore: libp2pPeerstore,
metrics: newMetrics(),
......@@ -157,6 +162,7 @@ func New(ctx context.Context, o Options) (*Service, error) {
peers: peerRegistry,
addrssbook: o.Addressbook,
logger: o.Logger,
tracer: o.Tracer,
}
// Construct protocols.
......@@ -215,8 +221,8 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
return fmt.Errorf("protocol version match %s: %w", id, err)
}
s.host.SetStreamHandlerMatch(id, matcher, func(stream network.Stream) {
peerID := stream.Conn().RemotePeer()
s.host.SetStreamHandlerMatch(id, matcher, func(streamlibp2p network.Stream) {
peerID := streamlibp2p.Conn().RemotePeer()
overlay, found := s.peers.overlay(peerID)
if !found {
// todo: this should never happen, should we disconnect in this case?
......@@ -226,15 +232,31 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
return
}
stream := newStream(streamlibp2p)
// exchange headers
if err := handleHeaders(ss.Headler, stream); err != nil {
s.logger.Debugf("handle protocol %s/%s: stream %s: peer %s: handle headers: %v", p.Name, p.Version, ss.Name, overlay, err)
return
}
// tracing: get span tracing context and add it to the context
// silently ignore if the peer is not providing tracing
ctx, err := s.tracer.WithContextFromHeaders(s.ctx, stream.Headers())
if err != nil && !errors.Is(err, tracing.ErrContextNotFound) {
s.logger.Debugf("handle protocol %s/%s: stream %s: peer %s: get tracing context: %v", p.Name, p.Version, ss.Name, overlay, err)
return
}
s.metrics.HandledStreamCount.Inc()
if err := ss.Handler(p2p.Peer{Address: overlay}, newStream(stream)); err != nil {
if err := ss.Handler(ctx, p2p.Peer{Address: overlay}, stream); err != nil {
var e *p2p.DisconnectError
if errors.Is(err, e) {
// todo: test connection close and refactor
_ = s.Disconnect(overlay)
}
s.logger.Debugf("handle protocol %s/%s: stream %s: peer %s: %w", p.Name, p.Version, ss.Name, overlay, err)
s.logger.Debugf("handle protocol %s/%s: stream %s: peer %s: %v", p.Name, p.Version, ss.Name, overlay, err)
}
})
}
......@@ -313,18 +335,33 @@ func (s *Service) SetPeerAddedHandler(h func(context.Context, swarm.Address) err
s.peerHandler = h
}
func (s *Service) NewStream(ctx context.Context, overlay swarm.Address, protocolName, protocolVersion, streamName string) (p2p.Stream, error) {
func (s *Service) NewStream(ctx context.Context, overlay swarm.Address, headers p2p.Headers, protocolName, protocolVersion, streamName string) (p2p.Stream, error) {
peerID, found := s.peers.peerID(overlay)
if !found {
return nil, p2p.ErrPeerNotFound
}
stream, err := s.newStreamForPeerID(ctx, peerID, protocolName, protocolVersion, streamName)
streamlibp2p, err := s.newStreamForPeerID(ctx, peerID, protocolName, protocolVersion, streamName)
if err != nil {
return nil, err
}
return newStream(stream), nil
stream := newStream(streamlibp2p)
// tracing: add span context header
if headers == nil {
headers = make(p2p.Headers)
}
if err := s.tracer.AddContextHeader(ctx, headers); err != nil && !errors.Is(err, tracing.ErrContextNotFound) {
return nil, err
}
// exchange headers
if err := sendHeaders(ctx, headers, stream); err != nil {
return nil, fmt.Errorf("send headers: %w", err)
}
return stream, nil
}
func (s *Service) newStreamForPeerID(ctx context.Context, peerID libp2ppeer.ID, protocolName, protocolVersion, streamName string) (network.Stream, error) {
......
......@@ -24,7 +24,7 @@ func TestNewStream(t *testing.T) {
s2, _, cleanup2 := newService(t, libp2p.Options{NetworkID: 1})
defer cleanup2()
if err := s1.AddProtocol(newTestProtocol(func(p p2p.Peer, stream p2p.Stream) error {
if err := s1.AddProtocol(newTestProtocol(func(_ context.Context, _ p2p.Peer, _ p2p.Stream) error {
return nil
})); err != nil {
t.Fatal(err)
......@@ -36,7 +36,7 @@ func TestNewStream(t *testing.T) {
t.Fatal(err)
}
stream, err := s2.NewStream(ctx, overlay1, testProtocolName, testProtocolVersion, testStreamName)
stream, err := s2.NewStream(ctx, overlay1, nil, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
t.Fatal(err)
}
......@@ -63,22 +63,22 @@ func TestNewStream_errNotSupported(t *testing.T) {
}
// test for missing protocol
_, err := s2.NewStream(ctx, overlay1, testProtocolName, testProtocolVersion, testStreamName)
_, err := s2.NewStream(ctx, overlay1, nil, testProtocolName, testProtocolVersion, testStreamName)
expectErrNotSupported(t, err)
// add protocol
if err := s1.AddProtocol(newTestProtocol(func(_ p2p.Peer, _ p2p.Stream) error {
if err := s1.AddProtocol(newTestProtocol(func(_ context.Context, _ p2p.Peer, _ p2p.Stream) error {
return nil
})); err != nil {
t.Fatal(err)
}
// test for incorrect protocol name
_, err = s2.NewStream(ctx, overlay1, testProtocolName+"invalid", testProtocolVersion, testStreamName)
_, err = s2.NewStream(ctx, overlay1, nil, testProtocolName+"invalid", testProtocolVersion, testStreamName)
expectErrNotSupported(t, err)
// test for incorrect stream name
_, err = s2.NewStream(ctx, overlay1, testProtocolName, testProtocolVersion, testStreamName+"invalid")
_, err = s2.NewStream(ctx, overlay1, nil, testProtocolName, testProtocolVersion, testStreamName+"invalid")
expectErrNotSupported(t, err)
}
......@@ -98,7 +98,7 @@ func TestNewStream_semanticVersioning(t *testing.T) {
t.Fatal(err)
}
if err := s1.AddProtocol(newTestProtocol(func(_ p2p.Peer, _ p2p.Stream) error {
if err := s1.AddProtocol(newTestProtocol(func(_ context.Context, _ p2p.Peer, _ p2p.Stream) error {
return nil
})); err != nil {
t.Fatal(err)
......@@ -132,7 +132,7 @@ func TestNewStream_semanticVersioning(t *testing.T) {
{version: "2.4.0", supported: false},
{version: "3.0.0", supported: false},
} {
_, err := s2.NewStream(ctx, overlay1, testProtocolName, tc.version, testStreamName)
_, err := s2.NewStream(ctx, overlay1, nil, testProtocolName, tc.version, testStreamName)
if tc.supported {
if err != nil {
t.Fatal(err)
......
......@@ -5,18 +5,24 @@
package libp2p
import (
"github.com/ethersphere/bee/pkg/p2p"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/network"
)
type stream struct {
network.Stream
}
func (s *stream) FullClose() error {
return helpers.FullClose(s)
headers map[string][]byte
}
func newStream(s network.Stream) *stream {
return &stream{Stream: s}
}
func (s *stream) Headers() p2p.Headers {
return s.headers
}
func (s *stream) FullClose() error {
return helpers.FullClose(s)
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package libp2p_test
import (
"context"
"fmt"
"testing"
"time"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/ethersphere/bee/pkg/tracing"
)
func TestTracing(t *testing.T) {
tracer1, closer1, err := tracing.NewTracer(&tracing.Options{
Enabled: true,
ServiceName: "bee-test",
})
if err != nil {
t.Fatal(err)
}
defer closer1.Close()
tracer2, closer2, err := tracing.NewTracer(&tracing.Options{
Enabled: true,
ServiceName: "bee-test",
})
if err != nil {
t.Fatal(err)
}
defer closer2.Close()
s1, overlay1, cleanup1 := newService(t, libp2p.Options{})
defer cleanup1()
s2, _, cleanup2 := newService(t, libp2p.Options{})
defer cleanup2()
var handledTracingSpan string
handled := make(chan struct{})
if err := s1.AddProtocol(newTestProtocol(func(ctx context.Context, _ p2p.Peer, _ p2p.Stream) error {
span, _ := tracer1.StartSpanFromContext(ctx, "test-p2p-handler")
defer span.Finish()
handledTracingSpan = fmt.Sprint(span.Context())
close(handled)
return nil
})); err != nil {
t.Fatal(err)
}
addr := serviceUnderlayAddress(t, s1)
connectContext, connectCancel := context.WithCancel(context.Background())
defer connectCancel()
if _, err := s2.Connect(connectContext, addr); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
span, ctx := tracer2.StartSpanFromContext(ctx, "test-p2p-client")
defer span.Finish()
if fmt.Sprint(span.Context()) == "" {
t.Error("not tracing span context to send")
}
stream, err := s2.NewStream(ctx, overlay1, nil, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
t.Fatal(err)
}
defer stream.Close()
select {
case <-handled:
case <-time.After(30 * time.Second):
t.Fatal("timeout waiting for handler")
}
if handledTracingSpan == "" {
t.Error("got not tracing span context in handler")
}
}
......@@ -12,6 +12,7 @@ import (
ma "github.com/multiformats/go-multiaddr"
)
// Service provides methods to handle p2p Peers and Protocols.
type Service interface {
AddProtocol(ProtocolSpec) error
Connect(ctx context.Context, addr ma.Multiaddr) (overlay swarm.Address, err error)
......@@ -20,35 +21,58 @@ type Service interface {
SetPeerAddedHandler(func(context.Context, swarm.Address) error)
}
// Streamer is able to create a new Stream.
type Streamer interface {
NewStream(ctx context.Context, address swarm.Address, protocol, version, stream string) (Stream, error)
NewStream(ctx context.Context, address swarm.Address, h Headers, protocol, version, stream string) (Stream, error)
}
// Stream represent a bidirectional data Stream.
type Stream interface {
io.ReadWriter
io.Closer
Headers() Headers
FullClose() error
}
// ProtocolSpec defines a collection of Stream specifications with handlers.
type ProtocolSpec struct {
Name string
Version string
StreamSpecs []StreamSpec
}
// StreamSpec defines a Stream handling within the protocol.
type StreamSpec struct {
Name string
Handler HandlerFunc
Headler HeadlerFunc
}
// Peer holds information about a Peer.
type Peer struct {
Address swarm.Address
}
type HandlerFunc func(Peer, Stream) error
// HandlerFunc handles a received Stream from a Peer.
type HandlerFunc func(context.Context, Peer, Stream) error
// HandlerMiddleware decorates a HandlerFunc by returning a new one.
type HandlerMiddleware func(HandlerFunc) HandlerFunc
// HeadlerFunc is returning response headers based on the received request
// headers.
type HeadlerFunc func(Headers) Headers
// Headers represents a collection of p2p header key value pairs.
type Headers map[string][]byte
// Common header names.
const (
HeaderNameTracingSpanContext = "tracing-span-context"
)
// NewSwarmStreamName constructs a libp2p compatible stream name out of
// protocol name and version and stream name.
func NewSwarmStreamName(protocol, version, stream string) string {
return "/swarm/" + protocol + "/" + version + "/" + stream
}
......@@ -11,6 +11,7 @@ import (
"testing"
"time"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/p2p/protobuf/internal/pb"
)
......@@ -359,6 +360,10 @@ func (noopWriteCloser) Write(p []byte) (n int, err error) {
return 0, nil
}
func (noopWriteCloser) Headers() p2p.Headers {
return nil
}
func (noopWriteCloser) Close() error {
return nil
}
......@@ -379,6 +384,10 @@ func (noopReadCloser) Read(p []byte) (n int, err error) {
return 0, nil
}
func (noopReadCloser) Headers() p2p.Headers {
return nil
}
func (noopReadCloser) Close() error {
return nil
}
......
......@@ -52,7 +52,7 @@ func New(opts ...Option) *Recorder {
return r
}
func (r *Recorder) NewStream(_ context.Context, addr swarm.Address, protocolName, protocolVersion, streamName string) (p2p.Stream, error) {
func (r *Recorder) NewStream(ctx context.Context, addr swarm.Address, h p2p.Headers, protocolName, protocolVersion, streamName string) (p2p.Stream, error) {
recordIn := newRecord()
recordOut := newRecord()
closedIn := make(chan struct{})
......@@ -61,11 +61,13 @@ func (r *Recorder) NewStream(_ context.Context, addr swarm.Address, protocolName
streamIn := newStream(recordOut, recordIn, closedOut, closedIn)
var handler p2p.HandlerFunc
var headler p2p.HeadlerFunc
for _, p := range r.protocols {
if p.Name == protocolName && p.Version == protocolVersion {
for _, s := range p.StreamSpecs {
if s.Name == streamName {
handler = s.Handler
headler = s.Headler
}
}
}
......@@ -76,9 +78,12 @@ func (r *Recorder) NewStream(_ context.Context, addr swarm.Address, protocolName
for i := len(r.middlewares) - 1; i >= 0; i-- {
handler = r.middlewares[i](handler)
}
if headler != nil {
streamOut.headers = headler(h)
}
record := &Record{in: recordIn, out: recordOut}
go func() {
err := handler(p2p.Peer{Address: addr}, streamIn)
err := handler(ctx, p2p.Peer{Address: addr}, streamIn)
if err != nil && err != io.EOF {
record.setErr(err)
}
......@@ -138,6 +143,7 @@ func (r *Record) setErr(err error) {
type stream struct {
in io.WriteCloser
out io.ReadCloser
headers p2p.Headers
cin chan struct{}
cout chan struct{}
closeOnce sync.Once
......@@ -155,6 +161,10 @@ func (s *stream) Write(p []byte) (int, error) {
return s.in.Write(p)
}
func (s *stream) Headers() p2p.Headers {
return s.headers
}
func (s *stream) Close() error {
var e error
s.closeOnce.Do(func() {
......
......@@ -31,7 +31,7 @@ func TestRecorder(t *testing.T) {
recorder := streamtest.New(
streamtest.WithProtocols(
newTestProtocol(func(peer p2p.Peer, stream p2p.Stream) error {
newTestProtocol(func(_ context.Context, peer p2p.Peer, stream p2p.Stream) error {
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
for {
q, err := rw.ReadString('\n')
......@@ -55,7 +55,7 @@ func TestRecorder(t *testing.T) {
)
ask := func(ctx context.Context, s p2p.Streamer, address swarm.Address, questions ...string) (answers []string, err error) {
stream, err := s.NewStream(ctx, address, testProtocolName, testProtocolVersion, testStreamName)
stream, err := s.NewStream(ctx, address, nil, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
return nil, fmt.Errorf("new stream: %w", err)
}
......@@ -115,7 +115,7 @@ func TestRecorder(t *testing.T) {
func TestRecorder_errStreamNotSupported(t *testing.T) {
r := streamtest.New()
_, err := r.NewStream(context.Background(), swarm.ZeroAddress, "testing", "messages", "1.0.1")
_, err := r.NewStream(context.Background(), swarm.ZeroAddress, nil, "testing", "messages", "1.0.1")
if !errors.Is(err, streamtest.ErrStreamNotSupported) {
t.Fatalf("got error %v, want %v", err, streamtest.ErrStreamNotSupported)
}
......@@ -124,7 +124,7 @@ func TestRecorder_errStreamNotSupported(t *testing.T) {
func TestRecorder_fullcloseWithRemoteClose(t *testing.T) {
recorder := streamtest.New(
streamtest.WithProtocols(
newTestProtocol(func(peer p2p.Peer, stream p2p.Stream) error {
newTestProtocol(func(_ context.Context, peer p2p.Peer, stream p2p.Stream) error {
defer stream.Close()
_, err := bufio.NewReader(stream).ReadString('\n')
return err
......@@ -133,7 +133,7 @@ func TestRecorder_fullcloseWithRemoteClose(t *testing.T) {
)
request := func(ctx context.Context, s p2p.Streamer, address swarm.Address) (err error) {
stream, err := s.NewStream(ctx, address, testProtocolName, testProtocolVersion, testStreamName)
stream, err := s.NewStream(ctx, address, nil, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
return fmt.Errorf("new stream: %w", err)
}
......@@ -171,7 +171,7 @@ func TestRecorder_fullcloseWithoutRemoteClose(t *testing.T) {
defer streamtest.ResetFullCloseTimeout()
recorder := streamtest.New(
streamtest.WithProtocols(
newTestProtocol(func(peer p2p.Peer, stream p2p.Stream) error {
newTestProtocol(func(_ context.Context, peer p2p.Peer, stream p2p.Stream) error {
// don't close the stream here to initiate timeout
// just try to read the message that it terminated with
// a new line character
......@@ -182,7 +182,7 @@ func TestRecorder_fullcloseWithoutRemoteClose(t *testing.T) {
)
request := func(ctx context.Context, s p2p.Streamer, address swarm.Address) (err error) {
stream, err := s.NewStream(ctx, address, testProtocolName, testProtocolVersion, testStreamName)
stream, err := s.NewStream(ctx, address, nil, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
return fmt.Errorf("new stream: %w", err)
}
......@@ -218,7 +218,7 @@ func TestRecorder_fullcloseWithoutRemoteClose(t *testing.T) {
func TestRecorder_multipleParallelFullCloseAndClose(t *testing.T) {
recorder := streamtest.New(
streamtest.WithProtocols(
newTestProtocol(func(peer p2p.Peer, stream p2p.Stream) error {
newTestProtocol(func(_ context.Context, peer p2p.Peer, stream p2p.Stream) error {
if _, err := bufio.NewReader(stream).ReadString('\n'); err != nil {
return err
}
......@@ -237,7 +237,7 @@ func TestRecorder_multipleParallelFullCloseAndClose(t *testing.T) {
)
request := func(ctx context.Context, s p2p.Streamer, address swarm.Address) (err error) {
stream, err := s.NewStream(ctx, address, testProtocolName, testProtocolVersion, testStreamName)
stream, err := s.NewStream(ctx, address, nil, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
return fmt.Errorf("new stream: %w", err)
}
......@@ -281,7 +281,7 @@ func TestRecorder_multipleParallelFullCloseAndClose(t *testing.T) {
func TestRecorder_closeAfterPartialWrite(t *testing.T) {
recorder := streamtest.New(
streamtest.WithProtocols(
newTestProtocol(func(peer p2p.Peer, stream p2p.Stream) error {
newTestProtocol(func(_ context.Context, peer p2p.Peer, stream p2p.Stream) error {
// just try to read the message that it terminated with
// a new line character
_, err := bufio.NewReader(stream).ReadString('\n')
......@@ -291,7 +291,7 @@ func TestRecorder_closeAfterPartialWrite(t *testing.T) {
)
request := func(ctx context.Context, s p2p.Streamer, address swarm.Address) (err error) {
stream, err := s.NewStream(ctx, address, testProtocolName, testProtocolVersion, testStreamName)
stream, err := s.NewStream(ctx, address, nil, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
return fmt.Errorf("new stream: %w", err)
}
......@@ -343,7 +343,7 @@ func TestRecorder_closeAfterPartialWrite(t *testing.T) {
func TestRecorder_withMiddlewares(t *testing.T) {
recorder := streamtest.New(
streamtest.WithProtocols(
newTestProtocol(func(peer p2p.Peer, stream p2p.Stream) error {
newTestProtocol(func(_ context.Context, peer p2p.Peer, stream p2p.Stream) error {
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
if _, err := rw.ReadString('\n'); err != nil {
......@@ -362,8 +362,8 @@ func TestRecorder_withMiddlewares(t *testing.T) {
),
streamtest.WithMiddlewares(
func(h p2p.HandlerFunc) p2p.HandlerFunc {
return func(peer p2p.Peer, stream p2p.Stream) error {
if err := h(peer, stream); err != nil {
return func(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error {
if err := h(ctx, peer, stream); err != nil {
return err
}
// close stream after all previous middlewares wrote to it
......@@ -372,11 +372,11 @@ func TestRecorder_withMiddlewares(t *testing.T) {
}
},
func(h p2p.HandlerFunc) p2p.HandlerFunc {
return func(peer p2p.Peer, stream p2p.Stream) error {
return func(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error {
if _, err := stream.Write([]byte("pre 1, ")); err != nil {
return err
}
if err := h(peer, stream); err != nil {
if err := h(ctx, peer, stream); err != nil {
return err
}
if _, err := stream.Write([]byte("post 1, ")); err != nil {
......@@ -386,11 +386,11 @@ func TestRecorder_withMiddlewares(t *testing.T) {
}
},
func(h p2p.HandlerFunc) p2p.HandlerFunc {
return func(peer p2p.Peer, stream p2p.Stream) error {
return func(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error {
if _, err := stream.Write([]byte("pre 2, ")); err != nil {
return err
}
if err := h(peer, stream); err != nil {
if err := h(ctx, peer, stream); err != nil {
return err
}
if _, err := stream.Write([]byte("post 2, ")); err != nil {
......@@ -402,11 +402,11 @@ func TestRecorder_withMiddlewares(t *testing.T) {
),
streamtest.WithMiddlewares(
func(h p2p.HandlerFunc) p2p.HandlerFunc {
return func(peer p2p.Peer, stream p2p.Stream) error {
return func(ctx context.Context, peer p2p.Peer, stream p2p.Stream) error {
if _, err := stream.Write([]byte("pre 3, ")); err != nil {
return err
}
if err := h(peer, stream); err != nil {
if err := h(ctx, peer, stream); err != nil {
return err
}
if _, err := stream.Write([]byte("post 3, ")); err != nil {
......@@ -419,7 +419,7 @@ func TestRecorder_withMiddlewares(t *testing.T) {
)
request := func(ctx context.Context, s p2p.Streamer, address swarm.Address) error {
stream, err := s.NewStream(ctx, address, testProtocolName, testProtocolVersion, testStreamName)
stream, err := s.NewStream(ctx, address, nil, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
return fmt.Errorf("new stream: %w", err)
}
......@@ -460,7 +460,7 @@ func TestRecorder_recordErr(t *testing.T) {
recorder := streamtest.New(
streamtest.WithProtocols(
newTestProtocol(func(peer p2p.Peer, stream p2p.Stream) error {
newTestProtocol(func(_ context.Context, peer p2p.Peer, stream p2p.Stream) error {
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
defer stream.Close()
......@@ -481,7 +481,7 @@ func TestRecorder_recordErr(t *testing.T) {
)
request := func(ctx context.Context, s p2p.Streamer, address swarm.Address) (err error) {
stream, err := s.NewStream(ctx, address, testProtocolName, testProtocolVersion, testStreamName)
stream, err := s.NewStream(ctx, address, nil, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
return fmt.Errorf("new stream: %w", err)
}
......
......@@ -15,6 +15,7 @@ import (
"github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/pingpong/pb"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tracing"
)
const (
......@@ -30,18 +31,21 @@ type Interface interface {
type Service struct {
streamer p2p.Streamer
logger logging.Logger
tracer *tracing.Tracer
metrics metrics
}
type Options struct {
Streamer p2p.Streamer
Logger logging.Logger
Tracer *tracing.Tracer
}
func New(o Options) *Service {
return &Service{
streamer: o.Streamer,
logger: o.Logger,
tracer: o.Tracer,
metrics: newMetrics(),
}
}
......@@ -53,15 +57,18 @@ func (s *Service) Protocol() p2p.ProtocolSpec {
StreamSpecs: []p2p.StreamSpec{
{
Name: streamName,
Handler: s.Handler,
Handler: s.handler,
},
},
}
}
func (s *Service) Ping(ctx context.Context, address swarm.Address, msgs ...string) (rtt time.Duration, err error) {
span, ctx := s.tracer.StartSpanFromContext(ctx, "pingpong-p2p-ping")
defer span.Finish()
start := time.Now()
stream, err := s.streamer.NewStream(ctx, address, protocolName, protocolVersion, streamName)
stream, err := s.streamer.NewStream(ctx, address, nil, protocolName, protocolVersion, streamName)
if err != nil {
return 0, fmt.Errorf("new stream: %w", err)
}
......@@ -91,10 +98,13 @@ func (s *Service) Ping(ctx context.Context, address swarm.Address, msgs ...strin
return time.Since(start), nil
}
func (s *Service) Handler(peer p2p.Peer, stream p2p.Stream) error {
func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) error {
w, r := protobuf.NewWriterAndReader(stream)
defer stream.Close()
span, ctx := s.tracer.StartSpanFromContext(ctx, "pingpong-p2p-handler")
defer span.Finish()
var ping pb.Ping
for {
if err := r.ReadMsg(&ping); err != nil {
......@@ -106,7 +116,7 @@ func (s *Service) Handler(peer p2p.Peer, stream p2p.Stream) error {
s.logger.Tracef("got ping: %q", ping.Greeting)
s.metrics.PingReceivedCount.Inc()
if err := w.WriteMsg(&pb.Pong{
if err := w.WriteMsgWithContext(ctx, &pb.Pong{
Response: "{" + ping.Greeting + "}",
}); err != nil {
return fmt.Errorf("write message: %w", err)
......
......@@ -56,7 +56,7 @@ func (s *Service) Protocol() p2p.ProtocolSpec {
StreamSpecs: []p2p.StreamSpec{
{
Name: streamName,
Handler: s.Handler,
Handler: s.handler,
},
},
}
......@@ -67,7 +67,7 @@ func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (data [
if err != nil {
return nil, err
}
stream, err := s.streamer.NewStream(ctx, peerID, protocolName, protocolVersion, streamName)
stream, err := s.streamer.NewStream(ctx, peerID, nil, protocolName, protocolVersion, streamName)
if err != nil {
return nil, fmt.Errorf("new stream: %w", err)
}
......@@ -89,7 +89,7 @@ func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (data [
return d.Data, nil
}
func (s *Service) Handler(p p2p.Peer, stream p2p.Stream) error {
func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) error {
w, r := protobuf.NewWriterAndReader(stream)
defer stream.Close()
var req pb.Request
......@@ -102,7 +102,7 @@ func (s *Service) Handler(p p2p.Peer, stream p2p.Stream) error {
return err
}
if err := w.WriteMsg(&pb.Delivery{
if err := w.WriteMsgWithContext(ctx, &pb.Delivery{
Data: data,
}); err != nil {
return err
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package tracing
import (
"bufio"
"bytes"
"context"
"errors"
"io"
"time"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/config"
)
var (
// ErrContextNotFound is returned when tracing context is not present
// in p2p Headers or context.
ErrContextNotFound = errors.New("tracing context not found")
// contextKey is used to reference a tracing context span as context value.
contextKey = struct{}{}
// noopTracer is the tracer that does nothing to handle a nil Tracer usage.
noopTracer = &Tracer{tracer: new(opentracing.NoopTracer)}
)
// Tracer connect to a tracing server and handles tracing spans and contexts
// by using opentracing Tracer.
type Tracer struct {
tracer opentracing.Tracer
}
// Options are optional parameters for Tracer constructor.
type Options struct {
Enabled bool
Endpoint string
ServiceName string
}
// NewTracer creates a new Tracer and returns a closer which needs to be closed
// when the Tracer is no longer used to flush remaining traces.
func NewTracer(o *Options) (*Tracer, io.Closer, error) {
if o == nil {
o = new(Options)
}
cfg := config.Configuration{
Disabled: !o.Enabled,
ServiceName: o.ServiceName,
Sampler: &config.SamplerConfig{
Type: jaeger.SamplerTypeConst,
Param: 1,
},
Reporter: &config.ReporterConfig{
LogSpans: true,
BufferFlushInterval: 1 * time.Second,
LocalAgentHostPort: o.Endpoint,
},
}
t, closer, err := cfg.NewTracer()
if err != nil {
return nil, nil, err
}
return &Tracer{tracer: t}, closer, nil
}
// StartSpanFromContext starts a new tracing span that is either a root one or a
// child of existing one from the provided Context.
func (t *Tracer) StartSpanFromContext(ctx context.Context, operationName string, opts ...opentracing.StartSpanOption) (opentracing.Span, context.Context) {
if t == nil {
t = noopTracer
}
var span opentracing.Span
if parentContext := FromContext(ctx); parentContext != nil {
opts = append(opts, opentracing.ChildOf(parentContext))
span = t.tracer.StartSpan(operationName, opts...)
} else {
span = t.tracer.StartSpan(operationName, opts...)
}
return span, WithContext(ctx, span.Context())
}
// AddContextHeader adds a tracing span context to provided p2p Headers from
// the go context. If the tracing span context is not present in go context,
// ErrContextNotFound is returned.
func (t *Tracer) AddContextHeader(ctx context.Context, headers p2p.Headers) error {
if t == nil {
t = noopTracer
}
c := FromContext(ctx)
if c == nil {
return ErrContextNotFound
}
var b bytes.Buffer
w := bufio.NewWriter(&b)
if err := t.tracer.Inject(c, opentracing.Binary, w); err != nil {
return err
}
if err := w.Flush(); err != nil {
return err
}
headers[p2p.HeaderNameTracingSpanContext] = b.Bytes()
return nil
}
// FromHeaders returns tracing span context from p2p Headers. If the tracing
// span context is not present in go context, ErrContextNotFound is returned.
func (t *Tracer) FromHeaders(headers p2p.Headers) (opentracing.SpanContext, error) {
if t == nil {
t = noopTracer
}
v := headers[p2p.HeaderNameTracingSpanContext]
if v == nil {
return nil, ErrContextNotFound
}
c, err := t.tracer.Extract(opentracing.Binary, bytes.NewReader(v))
if err != nil {
if errors.Is(err, opentracing.ErrSpanContextNotFound) {
return nil, ErrContextNotFound
}
return nil, err
}
return c, nil
}
// WithContextFromHeaders returns a new context with injected tracing span
// context if they are found in p2p Headers. If the tracing span context is not
// present in go context, ErrContextNotFound is returned.
func (t *Tracer) WithContextFromHeaders(ctx context.Context, headers p2p.Headers) (context.Context, error) {
if t == nil {
t = noopTracer
}
c, err := t.FromHeaders(headers)
if err != nil {
return ctx, err
}
return WithContext(ctx, c), nil
}
// WithContext adds tracing span context to go context.
func WithContext(ctx context.Context, c opentracing.SpanContext) context.Context {
return context.WithValue(ctx, contextKey, c)
}
// FromContext return tracing span context from go context. If the tracing span
// context is not present in go context, nil is returned.
func FromContext(ctx context.Context) opentracing.SpanContext {
c, ok := ctx.Value(contextKey).(opentracing.SpanContext)
if !ok {
return nil
}
return c
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package tracing_test
import (
"context"
"fmt"
"io"
"testing"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/tracing"
)
func TestSpanFromHeaders(t *testing.T) {
tracer, closer := newTracer(t)
defer closer.Close()
span, ctx := tracer.StartSpanFromContext(context.Background(), "some-operation")
defer span.Finish()
headers := make(p2p.Headers)
if err := tracer.AddContextHeader(ctx, headers); err != nil {
t.Fatal(err)
}
gotSpanContext, err := tracer.FromHeaders(headers)
if err != nil {
t.Fatal(err)
}
if fmt.Sprint(gotSpanContext) == "" {
t.Fatal("got empty span context")
}
wantSpanContext := span.Context()
if fmt.Sprint(wantSpanContext) == "" {
t.Fatal("got empty start span context")
}
if fmt.Sprint(gotSpanContext) != fmt.Sprint(wantSpanContext) {
t.Errorf("got span context %+v, want %+v", gotSpanContext, wantSpanContext)
}
}
func TestSpanWithContextFromHeaders(t *testing.T) {
tracer, closer := newTracer(t)
defer closer.Close()
span, ctx := tracer.StartSpanFromContext(context.Background(), "some-operation")
defer span.Finish()
headers := make(p2p.Headers)
if err := tracer.AddContextHeader(ctx, headers); err != nil {
t.Fatal(err)
}
ctx, err := tracer.WithContextFromHeaders(context.Background(), headers)
if err != nil {
t.Fatal(err)
}
gotSpanContext := tracing.FromContext(ctx)
if fmt.Sprint(gotSpanContext) == "" {
t.Fatal("got empty span context")
}
wantSpanContext := span.Context()
if fmt.Sprint(wantSpanContext) == "" {
t.Fatal("got empty start span context")
}
if fmt.Sprint(gotSpanContext) != fmt.Sprint(wantSpanContext) {
t.Errorf("got span context %+v, want %+v", gotSpanContext, wantSpanContext)
}
}
func TestFromContext(t *testing.T) {
tracer, closer := newTracer(t)
defer closer.Close()
span, ctx := tracer.StartSpanFromContext(context.Background(), "some-operation")
defer span.Finish()
wantSpanContext := span.Context()
if fmt.Sprint(wantSpanContext) == "" {
t.Fatal("got empty start span context")
}
gotSpanContext := tracing.FromContext(ctx)
if fmt.Sprint(gotSpanContext) == "" {
t.Fatal("got empty span context")
}
if fmt.Sprint(gotSpanContext) != fmt.Sprint(wantSpanContext) {
t.Errorf("got span context %+v, want %+v", gotSpanContext, wantSpanContext)
}
}
func TestWithContext(t *testing.T) {
tracer, closer := newTracer(t)
defer closer.Close()
span, _ := tracer.StartSpanFromContext(context.Background(), "some-operation")
defer span.Finish()
wantSpanContext := span.Context()
if fmt.Sprint(wantSpanContext) == "" {
t.Fatal("got empty start span context")
}
ctx := tracing.WithContext(context.Background(), span.Context())
gotSpanContext := tracing.FromContext(ctx)
if fmt.Sprint(gotSpanContext) == "" {
t.Fatal("got empty span context")
}
if fmt.Sprint(gotSpanContext) != fmt.Sprint(wantSpanContext) {
t.Errorf("got span context %+v, want %+v", gotSpanContext, wantSpanContext)
}
}
func newTracer(t *testing.T) (*tracing.Tracer, io.Closer) {
t.Helper()
tracer, closer, err := tracing.NewTracer(&tracing.Options{
Enabled: true,
ServiceName: "test",
})
if err != nil {
t.Fatal(err)
}
return tracer, closer
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment