Commit 137dae3b authored by santicomp2014's avatar santicomp2014 Committed by GitHub

bee-file/main, bee-join/main, api, api_test, file, file_test, netstore,...

bee-file/main, bee-join/main, api, api_test, file, file_test, netstore, netstore_test, node: add targets to query param on download path (#430)

* pkg/api,pkg/file,pkg/netstore: adds targets as query param to downloadfile and propagates the ctx all the way down to netstore

* cmd/bee-file,pkg/file: added missing context

* pkg/api added type for targets ctx

* pkg/netstore: commented out recovery section in netstore.get

* cmd/bee-join,pkg/api,pkg/netstore: simplified comment in netstore.get, changed targetsContextKey to struct and moved up to the package level

* api/test: added test for targets

* api,api_test: added test for targets

* api: removed antipattern

* api: changed targets location due to master conflicts

* api_test,api: added targets to chunk api and test for this use case

* api_test,netstore_test,netstore,node: refactored test for query param check and also integrated logs in netstore

* api_test: condensed targets test to minimal expression

* api,api_test: refactored Targets into swarm-recovery-targets and extracted it into api as a const

* api: fixed lint error space in comment

* netstore: removed todo from get
parent 3cdcdc62
......@@ -59,7 +59,7 @@ func getEntry(cmd *cobra.Command, args []string) (err error) {
writeCloser := cmdfile.NopWriteCloser(buf)
limitBuf := cmdfile.NewLimitWriteCloser(writeCloser, limitMetadataLength)
j := joiner.NewSimpleJoiner(store)
_, err = file.JoinReadAll(j, addr, limitBuf, false)
_, err = file.JoinReadAll(cmd.Context(), j, addr, limitBuf, false)
if err != nil {
return err
}
......@@ -70,7 +70,7 @@ func getEntry(cmd *cobra.Command, args []string) (err error) {
}
buf = bytes.NewBuffer(nil)
_, err = file.JoinReadAll(j, e.Metadata(), buf, false)
_, err = file.JoinReadAll(cmd.Context(), j, e.Metadata(), buf, false)
if err != nil {
return err
}
......@@ -116,7 +116,7 @@ func getEntry(cmd *cobra.Command, args []string) (err error) {
return err
}
defer outFile.Close()
_, err = file.JoinReadAll(j, e.Reference(), outFile, false)
_, err = file.JoinReadAll(cmd.Context(), j, e.Reference(), outFile, false)
return err
}
......
......@@ -83,7 +83,7 @@ func Join(cmd *cobra.Command, args []string) (err error) {
// create the join and get its data reader
j := joiner.NewSimpleJoiner(store)
_, err = file.JoinReadAll(j, addr, outFile, false)
_, err = file.JoinReadAll(cmd.Context(), j, addr, outFile, false)
return err
}
......
......@@ -35,6 +35,11 @@ type Options struct {
Tracer *tracing.Tracer
}
const (
	// TargetsRecoveryHeader is the HTTP response header on which the
	// download handlers echo back the recovery targets that were supplied
	// by the client via the "targets" query parameter (Global Pinning).
	TargetsRecoveryHeader = "swarm-recovery-targets"
)
func New(o Options) Service {
s := &server{
Options: o,
......
......@@ -25,6 +25,7 @@ import (
func TestBytes(t *testing.T) {
var (
resource = "/bytes"
targets = "0x222"
expHash = "29a5fb121ce96194ba8b7b823a1f9c6af87e1791f824940a53b5a7efe3f790d9"
mockStorer = mock.NewStorer()
client = newTestServer(t, testServerOptions{
......@@ -57,6 +58,14 @@ func TestBytes(t *testing.T) {
}
})
t.Run("download-with-targets", func(t *testing.T) {
resp := request(t, client, http.MethodGet, resource+"/"+expHash+"?targets="+targets, nil, http.StatusOK)
if resp.Header.Get(api.TargetsRecoveryHeader) != targets {
t.Fatalf("targets mismatch. got %s, want %s", resp.Header.Get(api.TargetsRecoveryHeader), targets)
}
})
t.Run("not found", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, client, http.MethodGet, resource+"/abcd", nil, http.StatusNotFound, jsonhttp.StatusResponse{
Message: "not found",
......
......@@ -6,6 +6,7 @@ package api
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
......@@ -26,6 +27,10 @@ const (
)
func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
targets := r.URL.Query().Get("targets")
r = r.WithContext(context.WithValue(r.Context(), targetsContextKey{}, targets))
ctx := r.Context()
addressHex := mux.Vars(r)["address"]
path := mux.Vars(r)["path"]
......@@ -42,7 +47,7 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
// read manifest entry
j := joiner.NewSimpleJoiner(s.Storer)
buf := bytes.NewBuffer(nil)
_, err = file.JoinReadAll(j, address, buf, toDecrypt)
_, err = file.JoinReadAll(ctx, j, address, buf, toDecrypt)
if err != nil {
s.Logger.Debugf("bzz download: read entry %s: %v", address, err)
s.Logger.Errorf("bzz download: read entry %s", address)
......@@ -60,7 +65,7 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
// read metadata
buf = bytes.NewBuffer(nil)
_, err = file.JoinReadAll(j, e.Metadata(), buf, toDecrypt)
_, err = file.JoinReadAll(ctx, j, e.Metadata(), buf, toDecrypt)
if err != nil {
s.Logger.Debugf("bzz download: read metadata %s: %v", address, err)
s.Logger.Errorf("bzz download: read metadata %s", address)
......@@ -86,7 +91,7 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
// read manifest content
buf = bytes.NewBuffer(nil)
_, err = file.JoinReadAll(j, e.Reference(), buf, toDecrypt)
_, err = file.JoinReadAll(ctx, j, e.Reference(), buf, toDecrypt)
if err != nil {
s.Logger.Debugf("bzz download: data join %s: %v", address, err)
s.Logger.Errorf("bzz download: data join %s", address)
......@@ -127,7 +132,7 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
// read file entry
buf = bytes.NewBuffer(nil)
_, err = file.JoinReadAll(j, manifestEntryAddress, buf, toDecrypt)
_, err = file.JoinReadAll(ctx, j, manifestEntryAddress, buf, toDecrypt)
if err != nil {
s.Logger.Debugf("bzz download: read file entry %s: %v", address, err)
s.Logger.Errorf("bzz download: read file entry %s", address)
......
......@@ -6,6 +6,7 @@ package api
import (
"bytes"
"context"
"errors"
"fmt"
"io"
......@@ -115,6 +116,9 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
}
func (s *server) chunkGetHandler(w http.ResponseWriter, r *http.Request) {
targets := r.URL.Query().Get("targets")
r = r.WithContext(context.WithValue(r.Context(), targetsContextKey{}, targets))
addr := mux.Vars(r)["addr"]
ctx := r.Context()
......@@ -140,5 +144,6 @@ func (s *server) chunkGetHandler(w http.ResponseWriter, r *http.Request) {
return
}
w.Header().Set("Content-Type", "binary/octet-stream")
w.Header().Set(TargetsRecoveryHeader, targets)
_, _ = io.Copy(w, bytes.NewReader(chunk.Data()))
}
......@@ -27,7 +27,9 @@ import (
func TestChunkUploadDownload(t *testing.T) {
var (
targets = "0x222"
resource = func(addr swarm.Address) string { return "/chunks/" + addr.String() }
resourceTargets = func(addr swarm.Address) string { return "/chunks/" + addr.String() + "?targets=" + targets }
validHash = swarm.MustParseHexAddress("aabbcc")
invalidHash = swarm.MustParseHexAddress("bbccdd")
validContent = []byte("bbaatt")
......@@ -118,6 +120,14 @@ func TestChunkUploadDownload(t *testing.T) {
}
})
t.Run("retrieve-targets", func(t *testing.T) {
resp := request(t, client, http.MethodGet, resourceTargets(validHash), nil, http.StatusOK)
// Check if the target is obtained correctly
if resp.Header.Get(api.TargetsRecoveryHeader) != targets {
t.Fatalf("targets mismatch. got %s, want %s", resp.Header.Get(api.TargetsRecoveryHeader), targets)
}
})
}
func request(t *testing.T, client *http.Client, method, resource string, body io.Reader, responseCode int) *http.Response {
......
......@@ -7,6 +7,7 @@ package api
import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
......@@ -39,6 +40,8 @@ const (
EncryptHeader = "swarm-encrypt"
)
type targetsContextKey struct{}
type fileUploadResponse struct {
Reference swarm.Address `json:"reference"`
}
......@@ -210,11 +213,14 @@ func (s *server) fileDownloadHandler(w http.ResponseWriter, r *http.Request) {
}
toDecrypt := len(address.Bytes()) == (swarm.HashSize + encryption.KeyLength)
targets := r.URL.Query().Get("targets")
r = r.WithContext(context.WithValue(r.Context(), targetsContextKey{}, targets))
// read entry.
j := joiner.NewSimpleJoiner(s.Storer)
buf := bytes.NewBuffer(nil)
_, err = file.JoinReadAll(j, address, buf, toDecrypt)
_, err = file.JoinReadAll(r.Context(), j, address, buf, toDecrypt)
if err != nil {
s.Logger.Debugf("file download: read entry %s: %v", addr, err)
s.Logger.Errorf("file download: read entry %s", addr)
......@@ -242,7 +248,7 @@ func (s *server) fileDownloadHandler(w http.ResponseWriter, r *http.Request) {
// Read metadata.
buf = bytes.NewBuffer(nil)
_, err = file.JoinReadAll(j, e.Metadata(), buf, toDecrypt)
_, err = file.JoinReadAll(r.Context(), j, e.Metadata(), buf, toDecrypt)
if err != nil {
s.Logger.Debugf("file download: read metadata %s: %v", addr, err)
s.Logger.Errorf("file download: read metadata %s", addr)
......@@ -273,6 +279,9 @@ func (s *server) downloadHandler(
reference swarm.Address,
additionalHeaders http.Header,
) {
targets := r.URL.Query().Get("targets")
r = r.WithContext(context.WithValue(r.Context(), targetsContextKey{}, targets))
ctx := r.Context()
toDecrypt := len(reference.Bytes()) == (swarm.HashSize + encryption.KeyLength)
......@@ -308,7 +317,7 @@ func (s *server) downloadHandler(
}()
go func() {
_, err := file.JoinReadAll(j, reference, pw, toDecrypt)
_, err := file.JoinReadAll(r.Context(), j, reference, pw, toDecrypt)
if err := pw.CloseWithError(err); err != nil {
s.Logger.Debugf("api download: data join close %s: %v", reference, err)
s.Logger.Errorf("api download: data join close %s", reference)
......@@ -339,6 +348,7 @@ func (s *server) downloadHandler(
w.Header().Set("ETag", fmt.Sprintf("%q", reference))
w.Header().Set("Content-Length", fmt.Sprintf("%d", dataSize))
w.Header().Set("Decompressed-Content-Length", fmt.Sprintf("%d", dataSize))
w.Header().Set(TargetsRecoveryHeader, targets)
if _, err = io.Copy(w, bpr); err != nil {
s.Logger.Debugf("api download: data read %s: %v", reference, err)
s.Logger.Errorf("api download: data read %s", reference)
......
......@@ -26,6 +26,7 @@ import (
func TestFiles(t *testing.T) {
var (
fileUploadResource = "/files"
targets = "0x222"
fileDownloadResource = func(addr string) string { return "/files/" + addr }
simpleData = []byte("this is a simple text")
client = newTestServer(t, testServerOptions{
......@@ -197,4 +198,21 @@ func TestFiles(t *testing.T) {
})
})
t.Run("upload-then-download-with-targets", func(t *testing.T) {
fileName := "simple_file.txt"
rootHash := "19d2e82c076031ec4e456978f839472d2f1b1b969a765420404d8d315a0c6123"
headers := make(http.Header)
headers.Add("Content-Type", "text/html; charset=utf-8")
jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, client, http.MethodPost, fileUploadResource+"?name="+fileName, bytes.NewReader(simpleData), http.StatusOK, api.FileUploadResponse{
Reference: swarm.MustParseHexAddress(rootHash),
}, headers)
rcvdHeader := jsonhttptest.ResponseDirectCheckBinaryResponse(t, client, http.MethodGet, fileDownloadResource(rootHash)+"?targets="+targets, nil, http.StatusOK, simpleData, nil)
if rcvdHeader.Get(api.TargetsRecoveryHeader) != targets {
t.Fatalf("targets mismatch. got %s, want %s", rcvdHeader.Get(api.TargetsRecoveryHeader), targets)
}
})
}
......@@ -34,8 +34,8 @@ type Splitter interface {
}
// JoinReadAll reads all output from the provided joiner.
func JoinReadAll(j Joiner, addr swarm.Address, outFile io.Writer, toDecrypt bool) (int64, error) {
r, l, err := j.Join(context.Background(), addr, toDecrypt)
func JoinReadAll(ctx context.Context, j Joiner, addr swarm.Address, outFile io.Writer, toDecrypt bool) (int64, error) {
r, l, err := j.Join(ctx, addr, toDecrypt)
if err != nil {
return 0, err
}
......
......@@ -93,7 +93,7 @@ func TestJoinReadAll(t *testing.T) {
var dataLength int64 = swarm.ChunkSize + 2
j := newMockJoiner(dataLength)
buf := bytes.NewBuffer(nil)
c, err := file.JoinReadAll(j, swarm.ZeroAddress, buf, false)
c, err := file.JoinReadAll(context.Background(), j, swarm.ZeroAddress, buf, false)
if err != nil {
t.Fatal(err)
}
......
......@@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/retrieval"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
......@@ -19,11 +20,12 @@ type store struct {
retrieval retrieval.Interface
validators []swarm.ChunkValidator
logger logging.Logger
}
// New returns a new NetStore that wraps a given Storer.
func New(s storage.Storer, r retrieval.Interface, validators ...swarm.ChunkValidator) storage.Storer {
return &store{Storer: s, retrieval: r, validators: validators}
func New(s storage.Storer, r retrieval.Interface, logger logging.Logger, validators ...swarm.ChunkValidator) storage.Storer {
return &store{Storer: s, retrieval: r, logger: logger, validators: validators}
}
// Get retrieves a given chunk address.
......@@ -35,6 +37,7 @@ func (s *store) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Addres
// request from network
data, err := s.retrieval.RetrieveChunk(ctx, addr)
if err != nil {
s.logger.Debug("INVOKE RECOVERY PROCESS")
return nil, fmt.Errorf("netstore retrieve chunk: %w", err)
}
......
......@@ -7,9 +7,11 @@ package netstore_test
import (
"bytes"
"context"
"io/ioutil"
"sync/atomic"
"testing"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/netstore"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
......@@ -97,7 +99,8 @@ func TestNetstoreNoRetrieval(t *testing.T) {
// newRetrievingNetstore builds a netstore backed by a mock storer and a
// mock retrieval implementation, returning all three so tests can inspect
// both the underlying store and the retrieval mock.
func newRetrievingNetstore() (ret *retrievalMock, mockStore storage.Storer, ns storage.Storer) {
	retrieve := &retrievalMock{}
	store := mock.NewStorer()
	// Discard log output: the netstore requires a logger but tests do not
	// assert on what it writes.
	logger := logging.New(ioutil.Discard, 0)
	nstore := netstore.New(store, retrieve, logger, mockValidator{})
	return retrieve, store, nstore
}
......
......@@ -255,7 +255,7 @@ func NewBee(o Options) (*Bee, error) {
return nil, fmt.Errorf("retrieval service: %w", err)
}
ns := netstore.New(storer, retrieve, content.NewValidator(), soc.NewValidator())
ns := netstore.New(storer, retrieve, logger, content.NewValidator(), soc.NewValidator())
retrieve.SetStorer(ns)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment