Commit ed2d6085 authored by Janoš Guljaš's avatar Janoš Guljaš Committed by GitHub

allow first retrieval request to be upstream (#742)

parent 298a5386
......@@ -115,18 +115,25 @@ func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (swarm.
func (s *Service) retrieveChunk(ctx context.Context, addr swarm.Address, skipPeers []swarm.Address) (chunk swarm.Chunk, peer swarm.Address, err error) {
v := ctx.Value(requestSourceContextKey{})
// allow upstream requests if this node is the source of the request
// i.e. the request was not forwarded, to improve retrieval
// if this node is the closest to he chunk but still does not contain it
allowUpstream := true
if src, ok := v.(string); ok {
skipAddr, err := swarm.ParseHexAddress(src)
if err == nil {
skipPeers = append(skipPeers, skipAddr)
}
// do not allow upstream requests if the request was forwarded to this node
// to avoid the request loops
allowUpstream = false
}
ctx, cancel := context.WithTimeout(ctx, retrieveChunkTimeout)
defer cancel()
peer, err = s.closestPeer(addr, skipPeers)
peer, err = s.closestPeer(addr, skipPeers, allowUpstream)
if err != nil {
return nil, peer, fmt.Errorf("get closest: %w", err)
return nil, peer, fmt.Errorf("get closest for address %s, allow upstream %v: %w", addr.String(), allowUpstream, err)
}
// compute the price we pay for this chunk and reserve it for the rest of this function
......@@ -177,7 +184,12 @@ func (s *Service) retrieveChunk(ctx context.Context, addr swarm.Address, skipPee
return chunk, peer, err
}
func (s *Service) closestPeer(addr swarm.Address, skipPeers []swarm.Address) (swarm.Address, error) {
// closestPeer returns address of the peer that is closest to the chunk with
// provided address addr. This function will ignore peers with addresses
// provided in skipPeers and if allowUpstream is true, peers that are further of
// the chunk than this node is, could also be returned, allowing the upstream
// retrieve request.
func (s *Service) closestPeer(addr swarm.Address, skipPeers []swarm.Address, allowUpstream bool) (swarm.Address, error) {
closest := swarm.Address{}
err := s.peerSuggester.EachPeerRev(func(peer swarm.Address, po uint8) (bool, bool, error) {
for _, a := range skipPeers {
......@@ -214,6 +226,10 @@ func (s *Service) closestPeer(addr swarm.Address, skipPeers []swarm.Address) (sw
return swarm.Address{}, topology.ErrNotFound
}
if allowUpstream {
return closest, nil
}
dcmp, err := swarm.DistanceCmp(addr.Bytes(), closest.Bytes(), s.addr.Bytes())
if err != nil {
return swarm.Address{}, fmt.Errorf("distance compare addr %s closest %s base address %s: %w", addr.String(), closest.String(), s.addr.String(), err)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment