Commit 9ba745a8 authored by metacertain's avatar metacertain Committed by GitHub

Pullsync - reset stream on errors (#428)

* pullsync zombie threads
parent 9a179405
......@@ -83,13 +83,18 @@ func (s *Service) SetPeerAddedHandler(h func(ctx context.Context, addr swarm.Add
s.peerHandler = h
}
func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swarm.Address) error {
func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swarm.Address) (err error) {
stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, peersStreamName)
if err != nil {
return fmt.Errorf("new stream: %w", err)
}
defer stream.FullClose()
defer func() {
if err != nil {
_ = stream.Reset()
} else {
_ = stream.FullClose()
}
}()
w, _ := protobuf.NewWriterAndReader(stream)
var peersRequest pb.Peers
for _, p := range peers {
......@@ -99,8 +104,6 @@ func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swa
s.logger.Debugf("hive broadcast peers: peer not found in the addressbook. Skipping peer %s", p)
continue
}
_ = stream.Reset()
return err
}
......@@ -112,7 +115,6 @@ func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swa
}
if err := w.WriteMsg(&peersRequest); err != nil {
_ = stream.Reset()
return fmt.Errorf("write Peers message: %w", err)
}
......
......@@ -112,6 +112,13 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8
if err != nil {
return 0, 0, fmt.Errorf("new stream: %w", err)
}
defer func() {
if err != nil {
_ = stream.Reset()
} else {
go stream.FullClose()
}
}()
var ru pb.Ruid
b := make([]byte, 4)
......@@ -123,7 +130,6 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8
ru.Ruid = binary.BigEndian.Uint32(b)
w, r := protobuf.NewWriterAndReader(stream)
defer stream.Close()
if err = w.WriteMsgWithContext(ctx, &ru); err != nil {
return 0, 0, fmt.Errorf("write ruid: %w", err)
......@@ -213,10 +219,15 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8
}
// handler handles an incoming request to sync an interval
func (s *Syncer) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) error {
func (s *Syncer) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (err error) {
w, r := protobuf.NewWriterAndReader(stream)
defer stream.Close()
defer func() {
if err != nil {
_ = stream.Reset()
} else {
_ = stream.FullClose()
}
}()
var ru pb.Ruid
if err := r.ReadMsgWithContext(ctx, &ru); err != nil {
return fmt.Errorf("send ruid: %w", err)
......@@ -342,12 +353,18 @@ func (s *Syncer) processWant(ctx context.Context, o *pb.Offer, w *pb.Want) ([]sw
return s.storage.Get(ctx, storage.ModeGetSync, addrs...)
}
func (s *Syncer) GetCursors(ctx context.Context, peer swarm.Address) ([]uint64, error) {
func (s *Syncer) GetCursors(ctx context.Context, peer swarm.Address) (retr []uint64, err error) {
stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, cursorStreamName)
if err != nil {
return nil, fmt.Errorf("new stream: %w", err)
}
defer stream.Close()
defer func() {
if err != nil {
_ = stream.Reset()
} else {
go stream.FullClose()
}
}()
w, r := protobuf.NewWriterAndReader(stream)
syn := &pb.Syn{}
......@@ -360,12 +377,20 @@ func (s *Syncer) GetCursors(ctx context.Context, peer swarm.Address) ([]uint64,
return nil, fmt.Errorf("read ack: %w", err)
}
return ack.Cursors, nil
retr = ack.Cursors
return retr, nil
}
func (s *Syncer) cursorHandler(ctx context.Context, p p2p.Peer, stream p2p.Stream) error {
func (s *Syncer) cursorHandler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (err error) {
w, r := protobuf.NewWriterAndReader(stream)
defer stream.Close()
defer func() {
if err != nil {
_ = stream.Reset()
} else {
_ = stream.FullClose()
}
}()
var syn pb.Syn
if err := r.ReadMsgWithContext(ctx, &syn); err != nil {
......@@ -376,7 +401,6 @@ func (s *Syncer) cursorHandler(ctx context.Context, p p2p.Peer, stream p2p.Strea
s.metrics.DbOpsCounter.Inc()
ints, err := s.storage.Cursors(ctx)
if err != nil {
_ = stream.FullClose()
return err
}
ack.Cursors = ints
......@@ -387,14 +411,20 @@ func (s *Syncer) cursorHandler(ctx context.Context, p p2p.Peer, stream p2p.Strea
return nil
}
func (s *Syncer) CancelRuid(peer swarm.Address, ruid uint32) error {
func (s *Syncer) CancelRuid(peer swarm.Address, ruid uint32) (err error) {
stream, err := s.streamer.NewStream(context.Background(), peer, nil, protocolName, protocolVersion, cancelStreamName)
if err != nil {
return fmt.Errorf("new stream: %w", err)
}
w := protobuf.NewWriter(stream)
defer stream.Close()
defer func() {
if err != nil {
_ = stream.Reset()
} else {
go stream.FullClose()
}
}()
var c pb.Cancel
c.Ruid = ruid
......@@ -405,9 +435,15 @@ func (s *Syncer) CancelRuid(peer swarm.Address, ruid uint32) error {
}
// handler handles an incoming request to explicitly cancel a ruid
func (s *Syncer) cancelHandler(ctx context.Context, p p2p.Peer, stream p2p.Stream) error {
func (s *Syncer) cancelHandler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (err error) {
r := protobuf.NewReader(stream)
defer stream.Close()
defer func() {
if err != nil {
_ = stream.Reset()
} else {
_ = stream.FullClose()
}
}()
var c pb.Cancel
if err := r.ReadMsgWithContext(ctx, &c); err != nil {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment