Commit 0107aa44 authored by Janoš Guljaš's avatar Janoš Guljaš Committed by GitHub

propagate context in pushsync receipt message exchange (#811)

parent 4d72cce7
...@@ -142,12 +142,12 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) ...@@ -142,12 +142,12 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
}() }()
wc, rc := protobuf.NewWriterAndReader(streamer) wc, rc := protobuf.NewWriterAndReader(streamer)
if err := ps.sendChunkDelivery(wc, chunk); err != nil { if err := ps.sendChunkDelivery(ctx, wc, chunk); err != nil {
return fmt.Errorf("forward chunk to peer %s: %w", peer.String(), err) return fmt.Errorf("forward chunk to peer %s: %w", peer.String(), err)
} }
receiptRTTTimer := time.Now() receiptRTTTimer := time.Now()
receipt, err := ps.receiveReceipt(rc) receipt, err := ps.receiveReceipt(ctx, rc)
if err != nil { if err != nil {
return fmt.Errorf("receive receipt from peer %s: %w", peer.String(), err) return fmt.Errorf("receive receipt from peer %s: %w", peer.String(), err)
} }
...@@ -165,7 +165,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) ...@@ -165,7 +165,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
} }
// pass back the received receipt in the previously received stream // pass back the received receipt in the previously received stream
err = ps.sendReceipt(w, &receipt) err = ps.sendReceipt(ctx, w, &receipt)
if err != nil { if err != nil {
return fmt.Errorf("send receipt to peer %s: %w", peer.String(), err) return fmt.Errorf("send receipt to peer %s: %w", peer.String(), err)
} }
...@@ -174,9 +174,11 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) ...@@ -174,9 +174,11 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
return ps.accounting.Debit(p.Address, ps.pricer.Price(chunk.Address())) return ps.accounting.Debit(p.Address, ps.pricer.Price(chunk.Address()))
} }
func (ps *PushSync) sendChunkDelivery(w protobuf.Writer, chunk swarm.Chunk) (err error) { func (ps *PushSync) sendChunkDelivery(ctx context.Context, w protobuf.Writer, chunk swarm.Chunk) (err error) {
ctx, cancel := context.WithTimeout(ctx, timeToWaitForReceipt)
defer cancel()
startTimer := time.Now() startTimer := time.Now()
if err = w.WriteMsgWithTimeout(timeToWaitForReceipt, &pb.Delivery{ if err = w.WriteMsgWithContext(ctx, &pb.Delivery{
Address: chunk.Address().Bytes(), Address: chunk.Address().Bytes(),
Data: chunk.Data(), Data: chunk.Data(),
}); err != nil { }); err != nil {
...@@ -188,8 +190,10 @@ func (ps *PushSync) sendChunkDelivery(w protobuf.Writer, chunk swarm.Chunk) (err ...@@ -188,8 +190,10 @@ func (ps *PushSync) sendChunkDelivery(w protobuf.Writer, chunk swarm.Chunk) (err
return nil return nil
} }
func (ps *PushSync) sendReceipt(w protobuf.Writer, receipt *pb.Receipt) (err error) { func (ps *PushSync) sendReceipt(ctx context.Context, w protobuf.Writer, receipt *pb.Receipt) (err error) {
if err := w.WriteMsg(receipt); err != nil { ctx, cancel := context.WithTimeout(ctx, timeToWaitForReceipt)
defer cancel()
if err := w.WriteMsgWithContext(ctx, receipt); err != nil {
ps.metrics.SendReceiptErrorCounter.Inc() ps.metrics.SendReceiptErrorCounter.Inc()
return err return err
} }
...@@ -197,8 +201,10 @@ func (ps *PushSync) sendReceipt(w protobuf.Writer, receipt *pb.Receipt) (err err ...@@ -197,8 +201,10 @@ func (ps *PushSync) sendReceipt(w protobuf.Writer, receipt *pb.Receipt) (err err
return nil return nil
} }
func (ps *PushSync) receiveReceipt(r protobuf.Reader) (receipt pb.Receipt, err error) { func (ps *PushSync) receiveReceipt(ctx context.Context, r protobuf.Reader) (receipt pb.Receipt, err error) {
if err := r.ReadMsg(&receipt); err != nil { ctx, cancel := context.WithTimeout(ctx, timeToWaitForReceipt)
defer cancel()
if err := r.ReadMsgWithContext(ctx, &receipt); err != nil {
ps.metrics.ReceiveReceiptErrorCounter.Inc() ps.metrics.ReceiveReceiptErrorCounter.Inc()
return receipt, err return receipt, err
} }
...@@ -248,7 +254,7 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re ...@@ -248,7 +254,7 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
defer func() { go streamer.FullClose() }() defer func() { go streamer.FullClose() }()
w, r := protobuf.NewWriterAndReader(streamer) w, r := protobuf.NewWriterAndReader(streamer)
if err := ps.sendChunkDelivery(w, ch); err != nil { if err := ps.sendChunkDelivery(ctx, w, ch); err != nil {
_ = streamer.Reset() _ = streamer.Reset()
return nil, fmt.Errorf("chunk deliver to peer %s: %w", peer.String(), err) return nil, fmt.Errorf("chunk deliver to peer %s: %w", peer.String(), err)
} }
...@@ -263,7 +269,7 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re ...@@ -263,7 +269,7 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
} }
receiptRTTTimer := time.Now() receiptRTTTimer := time.Now()
receipt, err := ps.receiveReceipt(r) receipt, err := ps.receiveReceipt(ctx, r)
if err != nil { if err != nil {
_ = streamer.Reset() _ = streamer.Reset()
return nil, fmt.Errorf("receive receipt from peer %s: %w", peer.String(), err) return nil, fmt.Errorf("receive receipt from peer %s: %w", peer.String(), err)
...@@ -299,7 +305,7 @@ func (ps *PushSync) handleDeliveryResponse(ctx context.Context, w protobuf.Write ...@@ -299,7 +305,7 @@ func (ps *PushSync) handleDeliveryResponse(ctx context.Context, w protobuf.Write
// Send a receipt immediately once the storage of the chunk is successfully // Send a receipt immediately once the storage of the chunk is successfully
receipt := &pb.Receipt{Address: chunk.Address().Bytes()} receipt := &pb.Receipt{Address: chunk.Address().Bytes()}
err = ps.sendReceipt(w, receipt) err = ps.sendReceipt(ctx, w, receipt)
if err != nil { if err != nil {
return fmt.Errorf("send receipt to peer %s: %w", p.Address.String(), err) return fmt.Errorf("send receipt to peer %s: %w", p.Address.String(), err)
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment