Commit d7294150 authored by Janos Guljas's avatar Janos Guljas

improve logging and error reporting in libp2p and handshake

parent 40c77d92
...@@ -93,6 +93,7 @@ func (c *command) initStartCmd() (err error) { ...@@ -93,6 +93,7 @@ func (c *command) initStartCmd() (err error) {
ConnectionsLow: c.config.GetInt(optionNameConnectionsLow), ConnectionsLow: c.config.GetInt(optionNameConnectionsLow),
ConnectionsHigh: c.config.GetInt(optionNameConnectionsHigh), ConnectionsHigh: c.config.GetInt(optionNameConnectionsHigh),
ConnectionsGrace: c.config.GetDuration(optionNameConnectionsGrace), ConnectionsGrace: c.config.GetDuration(optionNameConnectionsGrace),
Logger: logger,
}, },
Logger: logger, Logger: logger,
}) })
......
...@@ -8,8 +8,6 @@ package handshake ...@@ -8,8 +8,6 @@ package handshake
import ( import (
"fmt" "fmt"
"io"
"log"
"github.com/janos/bee/pkg/p2p" "github.com/janos/bee/pkg/p2p"
"github.com/janos/bee/pkg/p2p/protobuf" "github.com/janos/bee/pkg/p2p/protobuf"
...@@ -23,52 +21,52 @@ const ( ...@@ -23,52 +21,52 @@ const (
type Service struct { type Service struct {
overlay string overlay string
logger Logger
} }
func New(overlay string) *Service { func New(overlay string, logger Logger) *Service {
return &Service{overlay: overlay} return &Service{
overlay: overlay,
logger: logger,
}
}
type Logger interface {
Tracef(format string, args ...interface{})
} }
func (s *Service) Handshake(stream p2p.Stream) (overlay string, err error) { func (s *Service) Handshake(stream p2p.Stream) (overlay string, err error) {
w, r := protobuf.NewWriterAndReader(stream) w, r := protobuf.NewWriterAndReader(stream)
var resp ShakeHand var resp ShakeHand
if err := w.WriteMsg(&ShakeHand{Address: s.overlay}); err != nil { if err := w.WriteMsg(&ShakeHand{Address: s.overlay}); err != nil {
return "", fmt.Errorf("handshake handler: write message: %v\n", err) return "", fmt.Errorf("handshake handler: write message: %w", err)
} }
log.Printf("sent handshake req %s\n", s.overlay) s.logger.Tracef("handshake: sent request %s", s.overlay)
if err := r.ReadMsg(&resp); err != nil { if err := r.ReadMsg(&resp); err != nil {
if err == io.EOF { return "", fmt.Errorf("handshake handler: read message: %w", err)
return "", nil
} }
return "", fmt.Errorf("handshake handler: read message: %v\n", err) s.logger.Tracef("handshake: read response: %s", resp.Address)
}
log.Printf("read handshake resp: %s\n", resp.Address)
return resp.Address, nil return resp.Address, nil
} }
func (s *Service) Handler(stream p2p.Stream) string { func (s *Service) Handle(stream p2p.Stream) (overlay string, err error) {
w, r := protobuf.NewWriterAndReader(stream) w, r := protobuf.NewWriterAndReader(stream)
defer stream.Close() defer stream.Close()
var req ShakeHand var req ShakeHand
if err := r.ReadMsg(&req); err != nil { if err := r.ReadMsg(&req); err != nil {
if err == io.EOF { return "", fmt.Errorf("read message: %w", err)
return ""
}
log.Printf("handshake handler: read message: %v\n", err)
return ""
} }
log.Printf("received handshake req %s\n", req.Address) s.logger.Tracef("handshake: received request %s", req.Address)
if err := w.WriteMsg(&ShakeHand{ if err := w.WriteMsg(&ShakeHand{
Address: s.overlay, Address: s.overlay,
}); err != nil { }); err != nil {
log.Printf("handshake handler: write message: %v\n", err) return "", fmt.Errorf("write message: %w", err)
} }
log.Printf("sent handshake resp: %s\n", s.overlay) s.logger.Tracef("handshake: handled response: %s", s.overlay)
return req.Address return req.Address, nil
} }
...@@ -42,6 +42,7 @@ type Service struct { ...@@ -42,6 +42,7 @@ type Service struct {
metrics metrics metrics metrics
handshakeService *handshake.Service handshakeService *handshake.Service
peers *peerRegistry peers *peerRegistry
logger Logger
} }
type Options struct { type Options struct {
...@@ -54,6 +55,13 @@ type Options struct { ...@@ -54,6 +55,13 @@ type Options struct {
ConnectionsLow int ConnectionsLow int
ConnectionsHigh int ConnectionsHigh int
ConnectionsGrace time.Duration ConnectionsGrace time.Duration
Logger Logger
}
type Logger interface {
Tracef(format string, args ...interface{})
Infof(format string, args ...interface{})
Errorf(format string, args ...interface{})
} }
func New(ctx context.Context, o Options) (*Service, error) { func New(ctx context.Context, o Options) (*Service, error) {
...@@ -182,8 +190,9 @@ func New(ctx context.Context, o Options) (*Service, error) { ...@@ -182,8 +190,9 @@ func New(ctx context.Context, o Options) (*Service, error) {
s := &Service{ s := &Service{
host: h, host: h,
metrics: newMetrics(), metrics: newMetrics(),
handshakeService: handshake.New(overlay), handshakeService: handshake.New(overlay, o.Logger),
peers: newPeerRegistry(), peers: newPeerRegistry(),
logger: o.Logger,
} }
// Construct protocols. // Construct protocols.
...@@ -195,9 +204,14 @@ func New(ctx context.Context, o Options) (*Service, error) { ...@@ -195,9 +204,14 @@ func New(ctx context.Context, o Options) (*Service, error) {
} }
s.host.SetStreamHandlerMatch(id, matcher, func(stream network.Stream) { s.host.SetStreamHandlerMatch(id, matcher, func(stream network.Stream) {
peerID := stream.Conn().RemotePeer()
overlay, err := s.handshakeService.Handle(stream)
if err != nil {
s.logger.Errorf("handshake with peer %s: %w", peerID, err)
}
s.peers.add(peerID, overlay)
s.metrics.HandledStreamCount.Inc() s.metrics.HandledStreamCount.Inc()
overlay := s.handshakeService.Handler(stream) s.logger.Infof("peer %q connected", overlay)
s.peers.add(stream.Conn().RemotePeer(), overlay)
}) })
// TODO: be more resilient on connection errors and connect in parallel // TODO: be more resilient on connection errors and connect in parallel
...@@ -233,7 +247,7 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) { ...@@ -233,7 +247,7 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
overlay, found := s.peers.overlay(peerID) overlay, found := s.peers.overlay(peerID)
if !found { if !found {
// todo: handle better // todo: handle better
fmt.Printf("Could not fetch handshake for peerID %s\n", stream) s.logger.Errorf("overlay address for peer %q not found", peerID)
return return
} }
...@@ -283,7 +297,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (err error) { ...@@ -283,7 +297,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (err error) {
s.peers.add(info.ID, overlay) s.peers.add(info.ID, overlay)
s.metrics.CreatedConnectionCount.Inc() s.metrics.CreatedConnectionCount.Inc()
fmt.Println("handshake handshake finished") s.logger.Infof("peer %q connected", overlay)
return nil return nil
} }
func (s *Service) NewStream(ctx context.Context, overlay, protocolName, streamName, version string) (p2p.Stream, error) { func (s *Service) NewStream(ctx context.Context, overlay, protocolName, streamName, version string) (p2p.Stream, error) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment