Commit 23f47d4d authored by Janoš Guljaš's avatar Janoš Guljaš Committed by GitHub

cancel stream contexts (#372)

parent 2c50eec2
......@@ -291,9 +291,14 @@ func (s *Service) AddProtocol(p p2p.ProtocolSpec) (err error) {
return
}
ctx, cancel := context.WithCancel(s.ctx)
s.peers.addStream(peerID, streamlibp2p, cancel)
defer s.peers.removeStream(peerID, streamlibp2p)
// tracing: get span tracing context and add it to the context
// silently ignore if the peer is not providing tracing
ctx, err := s.tracer.WithContextFromHeaders(s.ctx, stream.Headers())
ctx, err := s.tracer.WithContextFromHeaders(ctx, stream.Headers())
if err != nil && !errors.Is(err, tracing.ErrContextNotFound) {
s.logger.Debugf("handle protocol %s/%s: stream %s: peer %s: get tracing context: %v", p.Name, p.Version, ss.Name, overlay, err)
return
......
......@@ -6,6 +6,7 @@ package libp2p
import (
"bytes"
"context"
"sort"
"sync"
......@@ -20,6 +21,7 @@ type peerRegistry struct {
underlays map[string]libp2ppeer.ID // map overlay address to underlay peer id
overlays map[libp2ppeer.ID]swarm.Address // map underlay peer id to overlay address
connections map[libp2ppeer.ID]map[network.Conn]struct{} // list of connections for safe removal on Disconnect notification
streams map[libp2ppeer.ID]map[network.Stream]context.CancelFunc
mu sync.RWMutex
disconnecter topology.Disconnecter // peerRegistry notifies topology on peer disconnection
......@@ -31,6 +33,7 @@ func newPeerRegistry() *peerRegistry {
underlays: make(map[string]libp2ppeer.ID),
overlays: make(map[libp2ppeer.ID]swarm.Address),
connections: make(map[libp2ppeer.ID]map[network.Conn]struct{}),
streams: make(map[libp2ppeer.ID]map[network.Stream]context.CancelFunc),
Notifiee: new(network.NoopNotifiee),
}
......@@ -64,12 +67,43 @@ func (r *peerRegistry) Disconnected(_ network.Network, c network.Conn) {
delete(r.connections, peerID)
}
for _, cancel := range r.streams[peerID] {
cancel()
}
delete(r.streams, peerID)
r.mu.Unlock()
if r.disconnecter != nil {
r.disconnecter.Disconnected(overlay)
}
}
func (r *peerRegistry) addStream(peerID libp2ppeer.ID, stream network.Stream, cancel context.CancelFunc) {
r.mu.Lock()
defer r.mu.Unlock()
r.streams[peerID][stream] = cancel
}
func (r *peerRegistry) removeStream(peerID libp2ppeer.ID, stream network.Stream) {
r.mu.Lock()
defer r.mu.Unlock()
peer, ok := r.streams[peerID]
if !ok {
return
}
cancel, ok := peer[stream]
if !ok {
return
}
cancel()
delete(r.streams[peerID], stream)
}
func (r *peerRegistry) peers() []p2p.Peer {
r.mu.RLock()
peers := make([]p2p.Peer, 0, len(r.overlays))
......@@ -94,6 +128,7 @@ func (r *peerRegistry) addIfNotExists(c network.Conn, overlay swarm.Address) (ex
r.connections[peerID] = make(map[network.Conn]struct{})
}
r.connections[peerID][c] = struct{}{}
r.streams[peerID] = make(map[network.Stream]context.CancelFunc)
if _, exists := r.underlays[overlay.ByteString()]; !exists {
r.underlays[overlay.ByteString()] = peerID
......@@ -124,6 +159,10 @@ func (r *peerRegistry) remove(peerID libp2ppeer.ID) {
delete(r.overlays, peerID)
delete(r.underlays, overlay.ByteString())
delete(r.connections, peerID)
for _, cancel := range r.streams[peerID] {
cancel()
}
delete(r.streams, peerID)
r.mu.Unlock()
// if overlay was not found disconnect handler should not be signaled.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment