Commit d37698be authored by Janoš Guljaš's avatar Janoš Guljaš Committed by GitHub

retrieval, headers and kademlia announce improvements (#331)

Co-authored-by: default avataracud <12988138+acud@users.noreply.github.com>
Co-authored-by: default avatarPetar Radovic <petar.radovic@gmail.com>
parent 1ab01607
...@@ -207,7 +207,7 @@ func (s *server) fileDownloadHandler(w http.ResponseWriter, r *http.Request) { ...@@ -207,7 +207,7 @@ func (s *server) fileDownloadHandler(w http.ResponseWriter, r *http.Request) {
if err != nil { if err != nil {
s.Logger.Debugf("file download: read entry %s: %v", addr, err) s.Logger.Debugf("file download: read entry %s: %v", addr, err)
s.Logger.Errorf("file download: read entry %s", addr) s.Logger.Errorf("file download: read entry %s", addr)
jsonhttp.InternalServerError(w, "error reading entry") jsonhttp.NotFound(w, nil)
return return
} }
e := &entry.Entry{} e := &entry.Entry{}
...@@ -235,7 +235,7 @@ func (s *server) fileDownloadHandler(w http.ResponseWriter, r *http.Request) { ...@@ -235,7 +235,7 @@ func (s *server) fileDownloadHandler(w http.ResponseWriter, r *http.Request) {
if err != nil { if err != nil {
s.Logger.Debugf("file download: read metadata %s: %v", addr, err) s.Logger.Debugf("file download: read metadata %s: %v", addr, err)
s.Logger.Errorf("file download: read metadata %s", addr) s.Logger.Errorf("file download: read metadata %s", addr)
jsonhttp.InternalServerError(w, "error reading metadata") jsonhttp.NotFound(w, nil)
return return
} }
metaData := &entry.Metadata{} metaData := &entry.Metadata{}
...@@ -253,7 +253,7 @@ func (s *server) fileDownloadHandler(w http.ResponseWriter, r *http.Request) { ...@@ -253,7 +253,7 @@ func (s *server) fileDownloadHandler(w http.ResponseWriter, r *http.Request) {
if errors.Is(err, storage.ErrNotFound) { if errors.Is(err, storage.ErrNotFound) {
s.Logger.Debugf("file download: not found %s: %v", e.Reference(), err) s.Logger.Debugf("file download: not found %s: %v", e.Reference(), err)
s.Logger.Errorf("file download: not found %s", addr) s.Logger.Errorf("file download: not found %s", addr)
jsonhttp.NotFound(w, "not found") jsonhttp.NotFound(w, nil)
return return
} }
s.Logger.Debugf("file download: invalid root chunk %s: %v", e.Reference(), err) s.Logger.Debugf("file download: invalid root chunk %s: %v", e.Reference(), err)
...@@ -267,7 +267,7 @@ func (s *server) fileDownloadHandler(w http.ResponseWriter, r *http.Request) { ...@@ -267,7 +267,7 @@ func (s *server) fileDownloadHandler(w http.ResponseWriter, r *http.Request) {
if err != nil && c == 0 { if err != nil && c == 0 {
s.Logger.Debugf("file download: data join %s: %v", addr, err) s.Logger.Debugf("file download: data join %s: %v", addr, err)
s.Logger.Errorf("file download: data join %s", addr) s.Logger.Errorf("file download: data join %s", addr)
jsonhttp.InternalServerError(w, "error reading data") jsonhttp.NotFound(w, nil)
return return
} }
w.Header().Set("ETag", fmt.Sprintf("%q", e.Reference())) w.Header().Set("ETag", fmt.Sprintf("%q", e.Reference()))
......
...@@ -339,6 +339,10 @@ func (k *Kad) AddPeer(ctx context.Context, addr swarm.Address) error { ...@@ -339,6 +339,10 @@ func (k *Kad) AddPeer(ctx context.Context, addr swarm.Address) error {
// Connected is called when a peer has dialed in. // Connected is called when a peer has dialed in.
func (k *Kad) Connected(ctx context.Context, addr swarm.Address) error { func (k *Kad) Connected(ctx context.Context, addr swarm.Address) error {
if err := k.announce(ctx, addr); err != nil {
return err
}
po := swarm.Proximity(k.base.Bytes(), addr.Bytes()) po := swarm.Proximity(k.base.Bytes(), addr.Bytes())
k.knownPeers.Add(addr, po) k.knownPeers.Add(addr, po)
k.connectedPeers.Add(addr, po) k.connectedPeers.Add(addr, po)
...@@ -357,7 +361,8 @@ func (k *Kad) Connected(ctx context.Context, addr swarm.Address) error { ...@@ -357,7 +361,8 @@ func (k *Kad) Connected(ctx context.Context, addr swarm.Address) error {
case k.manageC <- struct{}{}: case k.manageC <- struct{}{}:
default: default:
} }
return k.announce(ctx, addr)
return nil
} }
// Disconnected is called when peer disconnects. // Disconnected is called when peer disconnects.
......
...@@ -14,9 +14,14 @@ import ( ...@@ -14,9 +14,14 @@ import (
"github.com/ethersphere/bee/pkg/p2p/protobuf" "github.com/ethersphere/bee/pkg/p2p/protobuf"
) )
var sendHeadersTimeout = 10 * time.Second
func sendHeaders(ctx context.Context, headers p2p.Headers, stream *stream) error { func sendHeaders(ctx context.Context, headers p2p.Headers, stream *stream) error {
w, r := protobuf.NewWriterAndReader(stream) w, r := protobuf.NewWriterAndReader(stream)
ctx, cancel := context.WithTimeout(ctx, sendHeadersTimeout)
defer cancel()
if err := w.WriteMsgWithContext(ctx, headersP2PToPB(headers)); err != nil { if err := w.WriteMsgWithContext(ctx, headersP2PToPB(headers)); err != nil {
return fmt.Errorf("write message: %w", err) return fmt.Errorf("write message: %w", err)
} }
......
...@@ -7,6 +7,7 @@ package retrieval ...@@ -7,6 +7,7 @@ package retrieval
import ( import (
"context" "context"
"fmt" "fmt"
"time"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
...@@ -31,14 +32,14 @@ type Interface interface { ...@@ -31,14 +32,14 @@ type Interface interface {
type Service struct { type Service struct {
streamer p2p.Streamer streamer p2p.Streamer
peerSuggester topology.ClosestPeerer peerSuggester topology.EachPeerer
storer storage.Storer storer storage.Storer
logger logging.Logger logger logging.Logger
} }
type Options struct { type Options struct {
Streamer p2p.Streamer Streamer p2p.Streamer
ChunkPeerer topology.ClosestPeerer ChunkPeerer topology.EachPeerer
Storer storage.Storer Storer storage.Storer
Logger logging.Logger Logger logging.Logger
} }
...@@ -65,31 +66,99 @@ func (s *Service) Protocol() p2p.ProtocolSpec { ...@@ -65,31 +66,99 @@ func (s *Service) Protocol() p2p.ProtocolSpec {
} }
} }
const (
maxPeers = 10
retrieveChunkTimeout = 3 * time.Second
)
func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (data []byte, err error) { func (s *Service) RetrieveChunk(ctx context.Context, addr swarm.Address) (data []byte, err error) {
peer, err := s.peerSuggester.ClosestPeer(addr) var skipPeers []swarm.Address
for i := 0; i < maxPeers; i++ {
var peer swarm.Address
data, peer, err = s.retrieveChunk(ctx, addr, skipPeers)
if err != nil {
if peer.IsZero() {
return nil, err
}
s.logger.Debugf("retrieval: failed to get chunk %s from peer %s: %v", addr, peer, err)
skipPeers = append(skipPeers, peer)
continue
}
s.logger.Tracef("retrieval: got chunk %s from peer %s", addr, peer)
return data, nil
}
return nil, err
}
func (s *Service) retrieveChunk(ctx context.Context, addr swarm.Address, skipPeers []swarm.Address) (data []byte, peer swarm.Address, err error) {
ctx, cancel := context.WithTimeout(ctx, retrieveChunkTimeout)
defer cancel()
peer, err = s.closestPeer(addr, skipPeers)
if err != nil { if err != nil {
return nil, fmt.Errorf("get closest: %w", err) return nil, peer, fmt.Errorf("get closest: %w", err)
} }
s.logger.Tracef("retrieval: get chunk %s from peer %s", addr, peer)
stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName) stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
if err != nil { if err != nil {
return nil, fmt.Errorf("new stream: %w", err) return nil, peer, fmt.Errorf("new stream: %w", err)
} }
defer stream.Close() defer stream.Close()
w, r := protobuf.NewWriterAndReader(stream) w, r := protobuf.NewWriterAndReader(stream)
if err := w.WriteMsg(&pb.Request{ if err := w.WriteMsgWithContext(ctx, &pb.Request{
Addr: addr.Bytes(), Addr: addr.Bytes(),
}); err != nil { }); err != nil {
return nil, fmt.Errorf("write request: %w peer %s", err, peer.String()) return nil, peer, fmt.Errorf("write request: %w peer %s", err, peer.String())
} }
var d pb.Delivery var d pb.Delivery
if err := r.ReadMsg(&d); err != nil { if err := r.ReadMsgWithContext(ctx, &d); err != nil {
return nil, fmt.Errorf("read delivery: %w peer %s", err, peer.String()) return nil, peer, fmt.Errorf("read delivery: %w peer %s", err, peer.String())
}
return d.Data, peer, nil
}
func (s *Service) closestPeer(addr swarm.Address, skipPeers []swarm.Address) (swarm.Address, error) {
closest := swarm.Address{}
err := s.peerSuggester.EachPeerRev(func(peer swarm.Address, po uint8) (bool, bool, error) {
for _, a := range skipPeers {
if a.Equal(peer) {
return false, false, nil
}
}
if closest.IsZero() {
closest = peer
return false, false, nil
}
dcmp, err := swarm.DistanceCmp(addr.Bytes(), closest.Bytes(), peer.Bytes())
if err != nil {
return false, false, err
}
switch dcmp {
case 0:
// do nothing
case -1:
// current peer is closer
closest = peer
case 1:
// closest is already closer to chunk
// do nothing
}
return false, false, nil
})
if err != nil {
return swarm.Address{}, err
}
// check if found
if closest.IsZero() {
return swarm.Address{}, topology.ErrNotFound
} }
return d.Data, nil return closest, nil
} }
func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) error { func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) error {
......
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/hex" "encoding/hex"
"errors"
"io/ioutil" "io/ioutil"
"testing" "testing"
"time" "time"
...@@ -20,6 +21,7 @@ import ( ...@@ -20,6 +21,7 @@ import (
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
storemock "github.com/ethersphere/bee/pkg/storage/mock" storemock "github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
) )
var testTimeout = 5 * time.Second var testTimeout = 5 * time.Second
...@@ -56,9 +58,10 @@ func TestDelivery(t *testing.T) { ...@@ -56,9 +58,10 @@ func TestDelivery(t *testing.T) {
// was successful // was successful
clientMockStorer := storemock.NewStorer() clientMockStorer := storemock.NewStorer()
ps := mockPeerSuggester{spFunc: func(_ swarm.Address) (swarm.Address, error) { peerID := swarm.MustParseHexAddress("9ee7add7")
v, err := swarm.ParseHexAddress("9ee7add7") ps := mockPeerSuggester{eachPeerRevFunc: func(f topology.EachPeerFunc) error {
return v, err _, _, _ = f(peerID, 0)
return nil
}} }}
client := retrieval.New(retrieval.Options{ client := retrieval.New(retrieval.Options{
Streamer: recorder, Streamer: recorder,
...@@ -75,7 +78,6 @@ func TestDelivery(t *testing.T) { ...@@ -75,7 +78,6 @@ func TestDelivery(t *testing.T) {
if !bytes.Equal(v, reqData) { if !bytes.Equal(v, reqData) {
t.Fatalf("request and response data not equal. got %s want %s", v, reqData) t.Fatalf("request and response data not equal. got %s want %s", v, reqData)
} }
peerID, _ := ps.ClosestPeer(swarm.ZeroAddress)
records, err := recorder.Records(peerID, "retrieval", "1.0.0", "retrieval") records, err := recorder.Records(peerID, "retrieval", "1.0.0", "retrieval")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -121,9 +123,12 @@ func TestDelivery(t *testing.T) { ...@@ -121,9 +123,12 @@ func TestDelivery(t *testing.T) {
} }
type mockPeerSuggester struct { type mockPeerSuggester struct {
spFunc func(swarm.Address) (swarm.Address, error) eachPeerRevFunc func(f topology.EachPeerFunc) error
} }
func (v mockPeerSuggester) ClosestPeer(addr swarm.Address) (swarm.Address, error) { func (s mockPeerSuggester) EachPeer(f topology.EachPeerFunc) error {
return v.spFunc(addr) return s.eachPeerRevFunc(f)
}
func (s mockPeerSuggester) EachPeerRev(topology.EachPeerFunc) error {
return errors.New("not implemented")
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment