Commit 02f37ece authored by acud's avatar acud Committed by GitHub

pushsync: remove some indirection and redundant error logging (#716)

parent dea34546
...@@ -328,7 +328,15 @@ func NewBee(addr string, swarmAddress swarm.Address, keystore keystore.Service, ...@@ -328,7 +328,15 @@ func NewBee(addr string, swarmAddress swarm.Address, keystore keystore.Service,
} }
retrieve.SetStorer(ns) retrieve.SetStorer(ns)
pushSyncProtocol := pushsync.New(p2ps, storer, kad, tagg, psss.TryUnwrap, logger, acc, accounting.NewFixedPricer(swarmAddress, 10), tracer) silenceNoHandlerFunc := func(ctx context.Context, ch swarm.Chunk) error {
err := psss.TryUnwrap(ctx, ch)
if errors.Is(err, pss.ErrNoHandler) {
return nil
}
return err
}
pushSyncProtocol := pushsync.New(p2ps, storer, kad, tagg, silenceNoHandlerFunc, logger, acc, accounting.NewFixedPricer(swarmAddress, 10), tracer)
// set the pushSyncer in the PSS // set the pushSyncer in the PSS
psss.SetPushSyncer(pushSyncProtocol) psss.SetPushSyncer(pushSyncProtocol)
......
...@@ -92,11 +92,15 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) ...@@ -92,11 +92,15 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
_ = stream.FullClose() _ = stream.FullClose()
} }
}() }()
// Get the delivery
chunk, err := ps.getChunkDelivery(r) var ch pb.Delivery
if err != nil { if err = r.ReadMsgWithContext(ctx, &ch); err != nil {
return fmt.Errorf("chunk delivery from peer %s: %w", p.Address.String(), err) ps.metrics.ReceivedChunkErrorCounter.Inc()
return fmt.Errorf("pushsync read delivery: %w", err)
} }
ps.metrics.ChunksReceivedCounter.Inc()
chunk := swarm.NewChunk(swarm.NewAddress(ch.Address), ch.Data)
span, _, ctx := ps.tracer.StartSpanFromContext(ctx, "pushsync-handler", ps.logger, opentracing.Tag{Key: "address", Value: chunk.Address().String()}) span, _, ctx := ps.tracer.StartSpanFromContext(ctx, "pushsync-handler", ps.logger, opentracing.Tag{Key: "address", Value: chunk.Address().String()})
defer span.Finish() defer span.Finish()
...@@ -170,20 +174,6 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) ...@@ -170,20 +174,6 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
return ps.accounting.Debit(p.Address, ps.pricer.Price(chunk.Address())) return ps.accounting.Debit(p.Address, ps.pricer.Price(chunk.Address()))
} }
func (ps *PushSync) getChunkDelivery(r protobuf.Reader) (chunk swarm.Chunk, err error) {
var ch pb.Delivery
if err = r.ReadMsg(&ch); err != nil {
ps.metrics.ReceivedChunkErrorCounter.Inc()
return nil, err
}
ps.metrics.ChunksSentCounter.Inc()
// create chunk
addr := swarm.NewAddress(ch.Address)
chunk = swarm.NewChunk(addr, ch.Data)
return chunk, nil
}
func (ps *PushSync) sendChunkDelivery(w protobuf.Writer, chunk swarm.Chunk) (err error) { func (ps *PushSync) sendChunkDelivery(w protobuf.Writer, chunk swarm.Chunk) (err error) {
startTimer := time.Now() startTimer := time.Now()
if err = w.WriteMsgWithTimeout(timeToWaitForReceipt, &pb.Delivery{ if err = w.WriteMsgWithTimeout(timeToWaitForReceipt, &pb.Delivery{
...@@ -299,14 +289,6 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re ...@@ -299,14 +289,6 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
return rec, nil return rec, nil
} }
func (ps *PushSync) deliverToPSS(ctx context.Context, ch swarm.Chunk) error {
// if callback is defined, call it for every new, valid chunk
if ps.deliveryCallback != nil {
return ps.deliveryCallback(ctx, ch)
}
return nil
}
func (ps *PushSync) handleDeliveryResponse(ctx context.Context, w protobuf.Writer, p p2p.Peer, chunk swarm.Chunk) error { func (ps *PushSync) handleDeliveryResponse(ctx context.Context, w protobuf.Writer, p p2p.Peer, chunk swarm.Chunk) error {
// Store the chunk in the local store // Store the chunk in the local store
_, err := ps.storer.Put(ctx, storage.ModePutSync, chunk) _, err := ps.storer.Put(ctx, storage.ModePutSync, chunk)
...@@ -327,10 +309,12 @@ func (ps *PushSync) handleDeliveryResponse(ctx context.Context, w protobuf.Write ...@@ -327,10 +309,12 @@ func (ps *PushSync) handleDeliveryResponse(ctx context.Context, w protobuf.Write
return err return err
} }
// since all PSS messages comes through push sync, deliver them here if this node is the destination if ps.deliveryCallback != nil {
err = ps.deliverToPSS(ctx, chunk) err = ps.deliveryCallback(ctx, chunk)
if err != nil { if err != nil {
ps.logger.Debugf("error pss delivery for chunk %v: %v", chunk.Address(), err) ps.logger.Debugf("pushsync delivery callback: %v", err)
}
} }
return nil return nil
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment