Commit e5f141cf authored by acud, committed by GitHub

pushsync: improve context handling (#1416)

parent f63c396f
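This commit replaces the per-step `timeToWaitForReceipt` window (3s) with a single `timeToLive` budget (5s) that is derived once per request and inherited by every context-aware read and write on the stream. A minimal runnable sketch of that pattern — only `timeToLive` is taken from the diff; every other name is illustrative:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

var timeToLive = 5 * time.Second // request time to live, as in the diff

// step is a stand-in for one blocking operation (store, write, read).
func step(ctx context.Context, name string, d time.Duration) error {
	select {
	case <-time.After(d):
		return nil
	case <-ctx.Done():
		return fmt.Errorf("%s: %w", name, ctx.Err())
	}
}

func main() {
	// One deadline bounds the whole exchange, instead of restarting a
	// fresh 3s window before each step.
	ctx, cancel := context.WithTimeout(context.Background(), timeToLive)
	defer cancel()

	for _, name := range []string{"store chunk", "send delivery", "read receipt"} {
		if err := step(ctx, name, 10*time.Millisecond); err != nil {
			fmt.Println(err)
			return
		}
	}
	fmt.Println("done within TTL")
}
```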
@@ -58,7 +58,7 @@ type PushSync struct {
tracer *tracing.Tracer
}
-var timeToWaitForReceipt = 3 * time.Second // time to wait to get a receipt for a chunk
+var timeToLive = 5 * time.Second // request time to live
func New(streamer p2p.StreamerDisconnecter, storer storage.Putter, closestPeerer topology.ClosestPeerer, tagger *tags.Tags, unwrap func(swarm.Chunk), logger logging.Logger, accounting accounting.Interface, pricer accounting.Pricer, tracer *tracing.Tracer) *PushSync {
ps := &PushSync{
@@ -93,6 +93,8 @@ func (s *PushSync) Protocol() p2p.ProtocolSpec {
// If the current node is the destination, it stores in the local store and sends a receipt.
func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (err error) {
w, r := protobuf.NewWriterAndReader(stream)
+	ctx, cancel := context.WithTimeout(ctx, timeToLive)
+	defer cancel()
defer func() {
if err != nil {
ps.metrics.TotalErrors.Inc()
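The two added lines put the whole handler — storing the chunk, forwarding it, and writing the receipt — under one deadline. The `defer cancel()` is as important as the timeout itself: `context.WithTimeout` starts a timer that is only released early by its cancel function. A small sketch of the leak being avoided (helper names are hypothetical):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// leaky derives a timeout context but never cancels it; the child context
// and its timer stay live until the full deadline elapses. `go vet`
// (lostcancel) flags this pattern.
func leaky(parent context.Context) context.Context {
	ctx, _ := context.WithTimeout(parent, 5*time.Second)
	return ctx
}

// bounded mirrors the handler: derive once, cancel on every return path.
func bounded(parent context.Context) error {
	ctx, cancel := context.WithTimeout(parent, 5*time.Second)
	defer cancel() // releases the timer as soon as the handler is done

	<-time.After(time.Millisecond) // stand-in for the handler body
	return ctx.Err()
}

func main() {
	_ = leaky(context.Background())
	fmt.Println(bounded(context.Background()))
}
```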
@@ -123,15 +125,13 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
receipt, err := ps.pushToClosest(ctx, chunk)
if err != nil {
if errors.Is(err, topology.ErrWantSelf) {
// store the chunk in the local store
_, err = ps.storer.Put(ctx, storage.ModePutSync, chunk)
if err != nil {
return fmt.Errorf("chunk store: %w", err)
}
receipt := pb.Receipt{Address: chunk.Address().Bytes()}
-			if err := w.WriteMsg(&receipt); err != nil {
+			if err := w.WriteMsgWithContext(ctx, &receipt); err != nil {
return fmt.Errorf("send receipt to peer %s: %w", p.Address.String(), err)
}
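With the handler context now carrying the TTL, the receipt write in the `ErrWantSelf` branch switches from `WriteMsg` to `WriteMsgWithContext(ctx, ...)` so a stalled peer cannot block past the deadline. As an illustration only — not bee's actual implementation — a context-aware write is commonly layered over a blocking writer like this:

```go
package main

import (
	"context"
	"fmt"
	"io"
	"strings"
	"time"
)

// writeWithContext is an illustrative wrapper: it runs a blocking write in
// a goroutine and abandons the wait when the context expires, so a stalled
// peer cannot hold the caller forever.
func writeWithContext(ctx context.Context, w io.Writer, p []byte) error {
	done := make(chan error, 1) // buffered so the goroutine never blocks on send
	go func() {
		_, err := w.Write(p)
		done <- err
	}()
	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	var sb strings.Builder
	fmt.Println(writeWithContext(ctx, &sb, []byte("receipt")), sb.String())
}
```

Note that abandoning the wait does not stop the underlying write; on error the caller is expected to tear the stream down, as `pushToClosest` does below with `streamer.Reset()`.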
@@ -140,9 +140,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
return fmt.Errorf("handler: push to closest: %w", err)
}
-	// pass back the received receipt in the previously received stream
-	ctx, cancel := context.WithTimeout(ctx, timeToWaitForReceipt)
-	defer cancel()
+	// pass back the receipt
if err := w.WriteMsgWithContext(ctx, receipt); err != nil {
return fmt.Errorf("send receipt to peer %s: %w", p.Address.String(), err)
}
@@ -189,13 +187,6 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk) (rr *pb.R
defersFn()
-		deferFuncs = append(deferFuncs, func() {
-			if lastErr != nil {
-				ps.metrics.TotalErrors.Inc()
-				logger.Errorf("pushsync: %v", lastErr)
-			}
-		})
// find next closest peer
peer, err := ps.peerSuggester.ClosestPeer(ch.Address(), skipPeers...)
if err != nil {
@@ -208,6 +199,13 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk) (rr *pb.R
// save found peer (to be skipped if there is some error with him)
skipPeers = append(skipPeers, peer)
+		deferFuncs = append(deferFuncs, func() {
+			if lastErr != nil {
+				ps.metrics.TotalErrors.Inc()
+				logger.Errorf("pushsync: %v", lastErr)
+			}
+		})
// compute the price we pay for this receipt and reserve it for the rest of this function
receiptPrice := ps.pricer.PeerPrice(peer, ch.Address())
err = ps.accounting.Reserve(ctx, peer, receiptPrice)
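The error-logging cleanup moved from before peer selection to after it, presumably so that failures of `ClosestPeer` itself (e.g. `topology.ErrWantSelf`, which the caller handles specially) are no longer counted and logged as push errors. The surrounding function collects these closures in a `deferFuncs` slice because a bare `defer` inside a retry loop only fires at function exit; a simplified, self-contained sketch of that pattern:

```go
package main

import "fmt"

// attempt is a stand-in for one forwarding try against one peer.
func attempt(i int) error {
	if i < 2 {
		return fmt.Errorf("peer %d: timeout", i)
	}
	return nil
}

func retry() error {
	var lastErr error
	for i := 0; i < 3; i++ {
		// Cleanups for this attempt only; a bare `defer` here would pile
		// up and run everything at function exit instead of per iteration.
		var deferFuncs []func()
		runDefers := func() {
			for _, fn := range deferFuncs {
				fn()
			}
		}

		deferFuncs = append(deferFuncs, func() {
			if lastErr != nil {
				fmt.Println("pushsync:", lastErr) // metrics/logging hook
			}
		})

		lastErr = attempt(i)
		runDefers()
		if lastErr == nil {
			return nil
		}
	}
	return lastErr
}

func main() { fmt.Println(retry()) }
```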
@@ -224,9 +222,9 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk) (rr *pb.R
deferFuncs = append(deferFuncs, func() { go streamer.FullClose() })
w, r := protobuf.NewWriterAndReader(streamer)
-		ctx, cancel := context.WithTimeout(ctx, timeToWaitForReceipt)
-		defer cancel()
-		if err := w.WriteMsgWithContext(ctx, &pb.Delivery{
+		ctxd, canceld := context.WithTimeout(ctx, timeToLive)
+		deferFuncs = append(deferFuncs, func() { canceld() })
+		if err := w.WriteMsgWithContext(ctxd, &pb.Delivery{
Address: ch.Address().Bytes(),
Data: ch.Data(),
}); err != nil {
@@ -249,9 +247,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk) (rr *pb.R
}
var receipt pb.Receipt
-		cctx, cancel := context.WithTimeout(ctx, timeToWaitForReceipt)
-		defer cancel()
-		if err := r.ReadMsgWithContext(cctx, &receipt); err != nil {
+		if err := r.ReadMsgWithContext(ctxd, &receipt); err != nil {
_ = streamer.Reset()
lastErr = fmt.Errorf("chunk %s receive receipt from peer %s: %w", ch.Address().String(), peer.String(), err)
continue
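These last two hunks are the core of the change: each forwarding attempt derives one `ctxd` with the 5s TTL and uses it for both the delivery write and the receipt read, so the pair shares a single budget. The cancel func is appended to `deferFuncs` rather than deferred directly, since a plain `defer canceld()` in a loop would keep every attempt's timer alive until `pushToClosest` returns. A self-contained sketch under those assumptions:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

var timeToLive = 5 * time.Second

// roundTrip sketches one forwarding attempt: a single context bounds the
// request write and the response read together, so a slow write plus a
// slow read cannot exceed the TTL combined.
func roundTrip(parent context.Context, send, recv func(context.Context) error) error {
	ctxd, canceld := context.WithTimeout(parent, timeToLive)
	defer canceld() // in the diff this is pushed into deferFuncs, once per attempt

	if err := send(ctxd); err != nil {
		return fmt.Errorf("send delivery: %w", err)
	}
	if err := recv(ctxd); err != nil {
		return fmt.Errorf("read receipt: %w", err)
	}
	return nil
}

func main() {
	op := func(d time.Duration) func(context.Context) error {
		return func(ctx context.Context) error {
			select {
			case <-time.After(d):
				return nil
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
	// Both operations draw down the same 5s budget.
	fmt.Println(roundTrip(context.Background(), op(time.Millisecond), op(time.Millisecond)))
}
```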