Commit 2c50eec2 authored by acud's avatar acud Committed by GitHub

instrument libp2p errors, print pusher trace only when successful (#368)

* instrument libp2p errors, print pusher trace only when successful
parent ef68bf26
...@@ -134,7 +134,7 @@ func (s *Service) Handshake(stream p2p.Stream, peerMultiaddr ma.Multiaddr, peerI ...@@ -134,7 +134,7 @@ func (s *Service) Handshake(stream p2p.Stream, peerMultiaddr ma.Multiaddr, peerI
return nil, fmt.Errorf("write ack message: %w", err) return nil, fmt.Errorf("write ack message: %w", err)
} }
s.logger.Tracef("handshake finished for peer %s", remoteBzzAddress.Overlay.String()) s.logger.Tracef("handshake finished for peer (outbound) %s", remoteBzzAddress.Overlay.String())
return &Info{ return &Info{
BzzAddress: remoteBzzAddress, BzzAddress: remoteBzzAddress,
Light: resp.Ack.Light, Light: resp.Ack.Light,
...@@ -213,7 +213,7 @@ func (s *Service) Handle(stream p2p.Stream, remoteMultiaddr ma.Multiaddr, remote ...@@ -213,7 +213,7 @@ func (s *Service) Handle(stream p2p.Stream, remoteMultiaddr ma.Multiaddr, remote
return nil, err return nil, err
} }
s.logger.Tracef("handshake finished for peer %s", remoteBzzAddress.Overlay.String()) s.logger.Tracef("handshake finished for peer (inbound) %s", remoteBzzAddress.Overlay.String())
return &Info{ return &Info{
BzzAddress: remoteBzzAddress, BzzAddress: remoteBzzAddress,
Light: ack.Light, Light: ack.Light,
......
...@@ -249,7 +249,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay ...@@ -249,7 +249,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
} }
s.metrics.HandledStreamCount.Inc() s.metrics.HandledStreamCount.Inc()
s.logger.Infof("successfully connected to peer %s", i.BzzAddress.ShortString()) s.logger.Infof("successfully connected to peer (inbound) %s", i.BzzAddress.ShortString())
}) })
h.Network().SetConnHandler(func(_ network.Conn) { h.Network().SetConnHandler(func(_ network.Conn) {
...@@ -347,7 +347,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. ...@@ -347,7 +347,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
// Extract the peer ID from the multiaddr. // Extract the peer ID from the multiaddr.
info, err := libp2ppeer.AddrInfoFromP2pAddr(addr) info, err := libp2ppeer.AddrInfoFromP2pAddr(addr)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("addr from p2p: %w", err)
} }
if _, found := s.peers.overlay(info.ID); found { if _, found := s.peers.overlay(info.ID); found {
...@@ -364,7 +364,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. ...@@ -364,7 +364,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
stream, err := s.newStreamForPeerID(ctx, info.ID, handshake.ProtocolName, handshake.ProtocolVersion, handshake.StreamName) stream, err := s.newStreamForPeerID(ctx, info.ID, handshake.ProtocolName, handshake.ProtocolVersion, handshake.StreamName)
if err != nil { if err != nil {
_ = s.disconnect(info.ID) _ = s.disconnect(info.ID)
return nil, err return nil, fmt.Errorf("connect new stream: %w", err)
} }
i, err := s.handshakeService.Handshake(NewStream(stream), stream.Conn().RemoteMultiaddr(), stream.Conn().RemotePeer()) i, err := s.handshakeService.Handshake(NewStream(stream), stream.Conn().RemoteMultiaddr(), stream.Conn().RemotePeer())
...@@ -376,7 +376,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. ...@@ -376,7 +376,7 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists { if exists := s.peers.addIfNotExists(stream.Conn(), i.BzzAddress.Overlay); exists {
if err := helpers.FullClose(stream); err != nil { if err := helpers.FullClose(stream); err != nil {
_ = s.disconnect(info.ID) _ = s.disconnect(info.ID)
return nil, err return nil, fmt.Errorf("peer exists, full close: %w", err)
} }
return i.BzzAddress, nil return i.BzzAddress, nil
...@@ -384,11 +384,11 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz. ...@@ -384,11 +384,11 @@ func (s *Service) Connect(ctx context.Context, addr ma.Multiaddr) (address *bzz.
if err := helpers.FullClose(stream); err != nil { if err := helpers.FullClose(stream); err != nil {
_ = s.disconnect(info.ID) _ = s.disconnect(info.ID)
return nil, err return nil, fmt.Errorf("connect full close %w", err)
} }
s.metrics.CreatedConnectionCount.Inc() s.metrics.CreatedConnectionCount.Inc()
s.logger.Infof("successfully connected to peer %s", i.BzzAddress.ShortString()) s.logger.Infof("successfully connected to peer (outbound) %s", i.BzzAddress.ShortString())
return i.BzzAddress, nil return i.BzzAddress, nil
} }
...@@ -428,7 +428,7 @@ func (s *Service) NewStream(ctx context.Context, overlay swarm.Address, headers ...@@ -428,7 +428,7 @@ func (s *Service) NewStream(ctx context.Context, overlay swarm.Address, headers
streamlibp2p, err := s.newStreamForPeerID(ctx, peerID, protocolName, protocolVersion, streamName) streamlibp2p, err := s.newStreamForPeerID(ctx, peerID, protocolName, protocolVersion, streamName)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("new stream for peerid: %w", err)
} }
stream := newStream(streamlibp2p) stream := newStream(streamlibp2p)
......
...@@ -290,7 +290,7 @@ func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8) ...@@ -290,7 +290,7 @@ func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8)
cursors, err := p.syncer.GetCursors(ctx, peer) cursors, err := p.syncer.GetCursors(ctx, peer)
if err != nil { if err != nil {
if logMore { if logMore {
p.logger.Debugf("error getting cursors: %v", err) p.logger.Debugf("error getting cursors from peer %s: %v", peer.String(), err)
} }
delete(p.syncPeers[po], peer.String()) delete(p.syncPeers[po], peer.String())
return return
......
...@@ -89,7 +89,7 @@ LOOP: ...@@ -89,7 +89,7 @@ LOOP:
} }
// postpone a retry only after we've finished processing everything in index // postpone a retry only after we've finished processing everything in index
timer.Reset(1 * time.Second) timer.Reset(retryInterval)
chunksInBatch++ chunksInBatch++
s.metrics.TotalChunksToBeSentCounter.Inc() s.metrics.TotalChunksToBeSentCounter.Inc()
select { select {
...@@ -111,8 +111,12 @@ LOOP: ...@@ -111,8 +111,12 @@ LOOP:
mtx.Unlock() mtx.Unlock()
go func(ctx context.Context, ch swarm.Chunk) { go func(ctx context.Context, ch swarm.Chunk) {
var err error
defer func() { defer func() {
s.logger.Tracef("pusher pushed chunk %s", ch.Address().String()) if err == nil {
// only print this if there was no error while sending the chunk
s.logger.Tracef("pusher pushed chunk %s", ch.Address().String())
}
mtx.Lock() mtx.Lock()
delete(inflight, ch.Address().String()) delete(inflight, ch.Address().String())
mtx.Unlock() mtx.Unlock()
...@@ -120,7 +124,7 @@ LOOP: ...@@ -120,7 +124,7 @@ LOOP:
}() }()
// Later when we process receipt, get the receipt and process it // Later when we process receipt, get the receipt and process it
// for now ignoring the receipt and checking only for error // for now ignoring the receipt and checking only for error
_, err := s.pushSyncer.PushChunkToClosest(ctx, ch) _, err = s.pushSyncer.PushChunkToClosest(ctx, ch)
if err != nil { if err != nil {
if !errors.Is(err, topology.ErrNotFound) { if !errors.Is(err, topology.ErrNotFound) {
s.logger.Errorf("pusher: error while sending chunk or receiving receipt: %v", err) s.logger.Errorf("pusher: error while sending chunk or receiving receipt: %v", err)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment