Commit 36018f53 authored by Janoš Guljaš's avatar Janoš Guljaš Committed by GitHub

use singleflight in retrieval (#338)

parent 303ac3af
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology" "github.com/ethersphere/bee/pkg/topology"
"golang.org/x/sync/singleflight"
) )
const ( const (
...@@ -34,6 +35,7 @@ type Service struct { ...@@ -34,6 +35,7 @@ type Service struct {
streamer p2p.Streamer streamer p2p.Streamer
peerSuggester topology.EachPeerer peerSuggester topology.EachPeerer
storer storage.Storer storer storage.Storer
singleflight singleflight.Group
logger logging.Logger logger logging.Logger
} }
...@@ -72,22 +74,28 @@ const ( ...@@ -72,22 +74,28 @@ const (
) )
func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (data []byte, err error) { func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (data []byte, err error) {
var skipPeers []swarm.Address v, err, _ := s.singleflight.Do(addr.String(), func() (v interface{}, err error) {
for i := 0; i < maxPeers; i++ { var skipPeers []swarm.Address
var peer swarm.Address for i := 0; i < maxPeers; i++ {
data, peer, err = s.retrieveChunk(ctx, addr, skipPeers) var peer swarm.Address
if err != nil { data, peer, err = s.retrieveChunk(ctx, addr, skipPeers)
if peer.IsZero() { if err != nil {
return nil, err if peer.IsZero() {
return nil, err
}
s.logger.Debugf("retrieval: failed to get chunk %s from peer %s: %v", addr, peer, err)
skipPeers = append(skipPeers, peer)
continue
} }
s.logger.Debugf("retrieval: failed to get chunk %s from peer %s: %v", addr, peer, err) s.logger.Tracef("retrieval: got chunk %s from peer %s", addr, peer)
skipPeers = append(skipPeers, peer) return data, nil
continue
} }
s.logger.Tracef("retrieval: got chunk %s from peer %s", addr, peer) return nil, err
return data, nil })
if err != nil {
return nil, err
} }
return nil, err return v.([]byte), nil
} }
func (s *Service) retrieveChunk(ctx context.Context, addr swarm.Address, skipPeers []swarm.Address) (data []byte, peer swarm.Address, err error) { func (s *Service) retrieveChunk(ctx context.Context, addr swarm.Address, skipPeers []swarm.Address) (data []byte, peer swarm.Address, err error) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment