Commit 029cf9b7 authored by mortelli's avatar mortelli Committed by GitHub

api, debugapi, internal, sctx, tags: tags cleanup (#557)

api, api_test, debugapi, debugapi_test, internal, sctx, tags: restore tags API from the debugapi package and iterate on it.
parent ca8acf3f
......@@ -5,8 +5,11 @@
package api
import (
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/ethersphere/bee/pkg/logging"
m "github.com/ethersphere/bee/pkg/metrics"
......@@ -15,6 +18,11 @@ import (
"github.com/ethersphere/bee/pkg/tracing"
)
const (
SwarmPinHeader = "Swarm-Pin"
SwarmTagUidHeader = "Swarm-Tag-Uid"
)
type Service interface {
http.Handler
m.Collector
......@@ -50,10 +58,26 @@ func New(tags *tags.Tags, storer storage.Storer, corsAllowedOrigins []string, lo
return s
}
const (
SwarmPinHeader = "Swarm-Pin"
TagHeaderUid = "swarm-tag-uid"
)
// getOrCreateTag attempts to get the tag if an id is supplied, and returns an error if it does not exist.
// If no id is supplied, it will attempt to create a new tag with a generated name and return it.
func (s *server) getOrCreateTag(tagUid string) (*tags.Tag, bool, error) {
// if tag ID is not supplied, create a new tag
if tagUid == "" {
tagName := fmt.Sprintf("unnamed_tag_%d", time.Now().Unix())
var err error
tag, err := s.Tags.Create(tagName, 0, false)
if err != nil {
return nil, false, fmt.Errorf("cannot create tag: %w", err)
}
return tag, true, nil
}
uid, err := strconv.Atoi(tagUid)
if err != nil {
return nil, false, fmt.Errorf("cannot parse taguid: %w", err)
}
t, err := s.Tags.Get(uint32(uid))
return t, false, err
}
// requestModePut returns the desired storage.ModePut for this request based on the request headers.
func requestModePut(r *http.Request) storage.ModePut {
......
......@@ -5,18 +5,15 @@
package api
import (
"context"
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/splitter"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/sctx"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/gorilla/mux"
)
......@@ -26,28 +23,31 @@ type bytesPostResponse struct {
// bytesUploadHandler handles upload of raw binary data of arbitrary length.
func (s *server) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
ta := s.createTag(w, r)
if ta == nil {
tag, created, err := s.getOrCreateTag(r.Header.Get(SwarmTagUidHeader))
if err != nil {
s.Logger.Debugf("bytes upload: get or create tag: %v", err)
s.Logger.Error("bytes upload: get or create tag")
jsonhttp.InternalServerError(w, "cannot get or create tag")
return
}
// Add the tag to the context
r = r.WithContext(context.WithValue(r.Context(), tags.TagsContextKey{}, ta))
ctx := r.Context()
ctx := sctx.SetTag(r.Context(), tag)
toEncrypt := strings.ToLower(r.Header.Get(EncryptHeader)) == "true"
sp := splitter.NewSimpleSplitter(s.Storer, requestModePut(r))
address, err := file.SplitWriteAll(ctx, sp, r.Body, r.ContentLength, toEncrypt)
if err != nil {
s.Logger.Debugf("bytes upload: %v", err)
s.Logger.Debugf("bytes upload: split write all: %v", err)
s.Logger.Error("bytes upload: split write all")
jsonhttp.InternalServerError(w, nil)
return
}
ta.DoneSplit(address)
w.Header().Set(TagHeaderUid, fmt.Sprint(ta.Uid))
w.Header().Set("Access-Control-Expose-Headers", TagHeaderUid)
if created {
tag.DoneSplit(address)
}
w.Header().Set(SwarmTagUidHeader, fmt.Sprint(tag.Uid))
w.Header().Set("Access-Control-Expose-Headers", SwarmTagUidHeader)
jsonhttp.OK(w, bytesPostResponse{
Reference: address,
})
......@@ -71,39 +71,3 @@ func (s *server) bytesGetHandler(w http.ResponseWriter, r *http.Request) {
s.downloadHandler(w, r, address, additionalHeaders)
}
func (s *server) createTag(w http.ResponseWriter, r *http.Request) *tags.Tag {
// if tag header is not there create a new one
var tag *tags.Tag
tagUidStr := r.Header.Get(TagHeaderUid)
if tagUidStr == "" {
tagName := fmt.Sprintf("unnamed_tag_%d", time.Now().Unix())
var err error
tag, err = s.Tags.Create(tagName, 0, false)
if err != nil {
s.Logger.Debugf("bytes upload: tag creation error: %v", err)
s.Logger.Error("bytes upload: tag creation")
jsonhttp.InternalServerError(w, "cannot create tag")
return nil
}
} else {
// if the tag uid header is present, then use the tag sent
tagUid, err := strconv.ParseUint(tagUidStr, 10, 32)
if err != nil {
s.Logger.Debugf("bytes upload: parse taguid %s: %v", tagUidStr, err)
s.Logger.Error("bytes upload: parse taguid")
jsonhttp.BadRequest(w, "invalid taguid")
return nil
}
tag, err = s.Tags.Get(uint32(tagUid))
if err != nil {
s.Logger.Debugf("bytes upload: get tag error: %v", err)
s.Logger.Error("bytes upload: get tag")
jsonhttp.InternalServerError(w, "cannot create tag")
return nil
}
}
return tag
}
......@@ -6,14 +6,14 @@ package api
import (
"bytes"
"context"
"errors"
"fmt"
"github.com/ethersphere/bee/pkg/netstore"
"io"
"io/ioutil"
"net/http"
"github.com/ethersphere/bee/pkg/netstore"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/sctx"
"github.com/ethersphere/bee/pkg/storage"
......@@ -32,14 +32,16 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
return
}
tag := s.createTag(w, r)
if tag == nil {
tag, _, err := s.getOrCreateTag(r.Header.Get(SwarmTagUidHeader))
if err != nil {
s.Logger.Debugf("chunk upload: get or create tag: %v", err)
s.Logger.Error("chunk upload: get or create tag")
jsonhttp.InternalServerError(w, "cannot get or create tag")
return
}
// Add the tag to the context
r = r.WithContext(context.WithValue(r.Context(), tags.TagsContextKey{}, tag))
ctx := r.Context()
ctx := sctx.SetTag(r.Context(), tag)
// Increment the StateSplit here since we dont have a splitter for the file upload
tag.Inc(tags.StateSplit)
......@@ -68,10 +70,8 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
// Indicate that the chunk is stored
tag.Inc(tags.StateStored)
tag.DoneSplit(address)
w.Header().Set(TagHeaderUid, fmt.Sprint(tag.Uid))
w.Header().Set("Access-Control-Expose-Headers", TagHeaderUid)
w.Header().Set(SwarmTagUidHeader, fmt.Sprint(tag.Uid))
w.Header().Set("Access-Control-Expose-Headers", SwarmTagUidHeader)
jsonhttp.OK(w, nil)
}
......
......@@ -23,6 +23,7 @@ import (
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/manifest/jsonmanifest"
"github.com/ethersphere/bee/pkg/sctx"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
......@@ -44,14 +45,28 @@ func (s *server) dirUploadHandler(w http.ResponseWriter, r *http.Request) {
return
}
tag, created, err := s.getOrCreateTag(r.Header.Get(SwarmTagUidHeader))
if err != nil {
s.Logger.Debugf("dir upload: get or create tag: %v", err)
s.Logger.Error("dir upload: get or create tag")
jsonhttp.InternalServerError(w, "cannot get or create tag")
return
}
// Add the tag to the context
ctx = sctx.SetTag(ctx, tag)
reference, err := storeDir(ctx, r.Body, s.Storer, requestModePut(r), s.Logger)
if err != nil {
s.Logger.Errorf("dir upload, store dir")
s.Logger.Debugf("dir upload, store dir err: %v", err)
s.Logger.Errorf("dir upload, store dir")
jsonhttp.InternalServerError(w, "could not store dir")
return
}
if created {
tag.DoneSplit(reference)
}
w.Header().Set(SwarmTagUidHeader, fmt.Sprint(tag.Uid))
jsonhttp.OK(w, fileUploadResponse{
Reference: reference,
})
......
......@@ -7,6 +7,8 @@ package api
type (
BytesPostResponse = bytesPostResponse
FileUploadResponse = fileUploadResponse
TagResponse = tagResponse
TagRequest = tagRequest
)
var (
......
......@@ -7,7 +7,6 @@ package api
import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
......@@ -31,7 +30,6 @@ import (
"github.com/ethersphere/bee/pkg/sctx"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/gorilla/mux"
)
......@@ -66,14 +64,16 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
return
}
ta := s.createTag(w, r)
if ta == nil {
tag, created, err := s.getOrCreateTag(r.Header.Get(SwarmTagUidHeader))
if err != nil {
s.Logger.Debugf("file upload: get or create tag: %v", err)
s.Logger.Error("file upload: get or create tag")
jsonhttp.InternalServerError(w, "cannot get or create tag")
return
}
// Add the tag to the context
r = r.WithContext(context.WithValue(r.Context(), tags.TagsContextKey{}, ta))
ctx := r.Context()
ctx := sctx.SetTag(r.Context(), tag)
if mediaType == multiPartFormData {
mr := multipart.NewReader(r.Body, params["boundary"])
......@@ -205,12 +205,12 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
jsonhttp.InternalServerError(w, "could not store entry")
return
}
ta.DoneSplit(reference)
if created {
tag.DoneSplit(reference)
}
w.Header().Set("ETag", fmt.Sprintf("%q", reference.String()))
w.Header().Set(TagHeaderUid, fmt.Sprint(ta.Uid))
w.Header().Set("Access-Control-Expose-Headers", TagHeaderUid)
w.Header().Set(SwarmTagUidHeader, fmt.Sprint(tag.Uid))
w.Header().Set("Access-Control-Expose-Headers", SwarmTagUidHeader)
jsonhttp.OK(w, fileUploadResponse{
Reference: reference,
})
......
......@@ -66,6 +66,22 @@ func (s *server) setupRouting() {
"GET": http.HandlerFunc(s.bzzDownloadHandler),
})
router.Handle("/tags", jsonhttp.MethodHandler{
"POST": web.ChainHandlers(
jsonhttp.NewMaxBodyBytesHandler(1024),
web.FinalHandlerFunc(s.createTag),
),
})
router.Handle("/tags/{id}", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.getTag),
"DELETE": http.HandlerFunc(s.deleteTag),
"PATCH": web.ChainHandlers(
jsonhttp.NewMaxBodyBytesHandler(1024),
web.FinalHandlerFunc(s.doneSplit),
),
})
s.Handler = web.ChainHandlers(
logging.NewHTTPAccessLogHandler(s.Logger, logrus.InfoLevel, "api access"),
handlers.CompressHandler,
......
......@@ -2,12 +2,13 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package debugapi
package api
import (
crand "crypto/rand"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"time"
......@@ -19,6 +20,11 @@ import (
"github.com/gorilla/mux"
)
type tagRequest struct {
Name string `json:"name,omitempty"`
Address swarm.Address `json:"address,omitempty"`
}
type tagResponse struct {
Total int64 `json:"total"`
Split int64 `json:"split"`
......@@ -50,56 +56,148 @@ func newTagResponse(tag *tags.Tag) tagResponse {
}
func (s *server) createTag(w http.ResponseWriter, r *http.Request) {
name := r.URL.Query().Get("name")
if name == "" {
b := make([]byte, 4)
_, err := crand.Read(b)
body, err := ioutil.ReadAll(r.Body)
if err != nil {
if jsonhttp.HandleBodyReadError(err, w) {
return
}
s.Logger.Debugf("create tag: read request body error: %v", err)
s.Logger.Error("create tag: read request body error")
jsonhttp.InternalServerError(w, "cannot read request")
return
}
tagr := tagRequest{}
if len(body) > 0 {
err = json.Unmarshal(body, &tagr)
if err != nil {
s.Logger.Debugf("create tag: read random bytes %v", err)
s.Logger.Errorf("create tag: read random bytes error")
jsonhttp.InternalServerError(w, nil)
s.Logger.Debugf("create tag: unmarshal tag name error: %v", err)
s.Logger.Errorf("create tag: unmarshal tag name error")
jsonhttp.InternalServerError(w, "error unmarshaling metadata")
return
}
name = fmt.Sprintf("tag-%v-%x", time.Now().UnixNano(), b)
}
tag, err := s.Tags.Create(name, 0, false)
if tagr.Name == "" {
tagr.Name = fmt.Sprintf("unnamed_tag_%d", time.Now().Unix())
}
tag, err := s.Tags.Create(tagr.Name, 0, false)
if err != nil {
s.Logger.Debugf("create tag: %s %v", name, err)
s.Logger.Errorf("create tag: %s error", name)
s.Logger.Debugf("create tag: tag create error: %v", err)
s.Logger.Error("create tag: tag create error")
jsonhttp.InternalServerError(w, "cannot create tag")
return
}
w.Header().Set("Cache-Control", "no-cache, private, max-age=0")
jsonhttp.OK(w, newTagResponse(tag))
jsonhttp.Created(w, newTagResponse(tag))
}
func (s *server) getTag(w http.ResponseWriter, r *http.Request) {
uidStr := mux.Vars(r)["uid"]
idStr := mux.Vars(r)["id"]
uid, err := strconv.ParseUint(uidStr, 10, 32)
id, err := strconv.Atoi(idStr)
if err != nil {
s.Logger.Debugf("get tag: parse uid %s: %v", uidStr, err)
s.Logger.Error("get tag: parse uid")
jsonhttp.BadRequest(w, "invalid uid")
s.Logger.Debugf("get tag: parse id %s: %v", idStr, err)
s.Logger.Error("get tag: parse id")
jsonhttp.BadRequest(w, "invalid id")
return
}
tag, err := s.Tags.Get(uint32(uid))
tag, err := s.Tags.Get(uint32(id))
if err != nil {
if errors.Is(err, tags.ErrNotFound) {
s.Logger.Debugf("get tag: tag %v not present: %v", uid, err)
s.Logger.Warningf("get tag: tag %v not present", uid)
s.Logger.Debugf("get tag: tag not present: %v, id %s", err, idStr)
s.Logger.Error("get tag: tag not present")
jsonhttp.NotFound(w, "tag not present")
return
}
s.Logger.Debugf("get tag: tag %v: %v", uid, err)
s.Logger.Errorf("get tag: %v", uid)
jsonhttp.InternalServerError(w, nil)
s.Logger.Debugf("get tag: tag %v: %v", idStr, err)
s.Logger.Errorf("get tag: %v", idStr)
jsonhttp.InternalServerError(w, "cannot get tag")
return
}
w.Header().Set("Cache-Control", "no-cache, private, max-age=0")
jsonhttp.OK(w, newTagResponse(tag))
}
func (s *server) deleteTag(w http.ResponseWriter, r *http.Request) {
idStr := mux.Vars(r)["id"]
id, err := strconv.Atoi(idStr)
if err != nil {
s.Logger.Debugf("delete tag: parse id %s: %v", idStr, err)
s.Logger.Error("delete tag: parse id")
jsonhttp.BadRequest(w, "invalid id")
return
}
tag, err := s.Tags.Get(uint32(id))
if err != nil {
if errors.Is(err, tags.ErrNotFound) {
s.Logger.Debugf("delete tag: tag not present: %v, id %s", err, idStr)
s.Logger.Error("delete tag: tag not present")
jsonhttp.NotFound(w, "tag not present")
return
}
s.Logger.Debugf("delete tag: tag %v: %v", idStr, err)
s.Logger.Errorf("delete tag: %v", idStr)
jsonhttp.InternalServerError(w, "cannot get tag")
return
}
s.Tags.Delete(tag.Uid)
jsonhttp.NoContent(w)
}
func (s *server) doneSplit(w http.ResponseWriter, r *http.Request) {
idStr := mux.Vars(r)["id"]
id, err := strconv.Atoi(idStr)
if err != nil {
s.Logger.Debugf("done split tag: parse id %s: %v", idStr, err)
s.Logger.Error("done split tag: parse id")
jsonhttp.BadRequest(w, "invalid id")
return
}
body, err := ioutil.ReadAll(r.Body)
if err != nil {
if jsonhttp.HandleBodyReadError(err, w) {
return
}
s.Logger.Debugf("done split tag: read request body error: %v", err)
s.Logger.Error("done split tag: read request body error")
jsonhttp.InternalServerError(w, "cannot read request")
return
}
tagr := tagRequest{}
if len(body) > 0 {
err = json.Unmarshal(body, &tagr)
if err != nil {
s.Logger.Debugf("done split tag: unmarshal tag name error: %v", err)
s.Logger.Errorf("done split tag: unmarshal tag name error")
jsonhttp.InternalServerError(w, "error unmarshaling metadata")
return
}
}
tag, err := s.Tags.Get(uint32(id))
if err != nil {
if errors.Is(err, tags.ErrNotFound) {
s.Logger.Debugf("done split: tag not present: %v, id %s", err, idStr)
s.Logger.Error("done split: tag not present")
jsonhttp.NotFound(w, "tag not present")
return
}
s.Logger.Debugf("done split: tag %v: %v", idStr, err)
s.Logger.Errorf("done split: %v", idStr)
jsonhttp.InternalServerError(w, "cannot get tag")
return
}
tag.DoneSplit(tagr.Address)
jsonhttp.OK(w, "ok")
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api_test
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"
"testing"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
mp "github.com/ethersphere/bee/pkg/pusher/mock"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/swarm/test"
"github.com/ethersphere/bee/pkg/tags"
"gitlab.com/nolash/go-mockbytes"
)
type fileUploadResponse struct {
Reference swarm.Address `json:"reference"`
}
func TestTags(t *testing.T) {
var (
filesResource = "/files"
dirResource = "/dirs"
bytesResource = "/bytes"
chunksResource = func(addr swarm.Address) string { return "/chunks/" + addr.String() }
tagsResource = "/tags"
tagsWithIdResource = func(id uint32) string { return fmt.Sprintf("/tags/%d", id) }
someHash = swarm.MustParseHexAddress("aabbcc")
someContent = []byte("bbaatt")
someTagName = "file.jpg"
tag = tags.NewTags()
mockPusher = mp.NewMockPusher(tag)
client = newTestServer(t, testServerOptions{
Storer: mock.NewStorer(),
Tags: tag,
})
)
t.Run("create-unnamed-tag", func(t *testing.T) {
tReq := &api.TagRequest{}
b, err := json.Marshal(tReq)
if err != nil {
t.Fatal(err)
}
tr := api.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, client, http.MethodPost, tagsResource, bytes.NewReader(b), http.StatusCreated, &tr)
if !strings.Contains(tr.Name, "unnamed_tag_") {
t.Fatalf("expected tag name to contain %s but is %s instead", "unnamed_tag_", tr.Name)
}
})
t.Run("create-tag-with-name", func(t *testing.T) {
tReq := &api.TagRequest{
Name: someTagName,
}
b, err := json.Marshal(tReq)
if err != nil {
t.Fatal(err)
}
tr := api.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, client, http.MethodPost, tagsResource, bytes.NewReader(b), http.StatusCreated, &tr)
if tr.Name != someTagName {
t.Fatalf("expected tag name to be %s but is %s instead", someTagName, tr.Name)
}
})
t.Run("create-tag-from-chunk-upload-with-invalid-id", func(t *testing.T) {
sentHeaders := make(http.Header)
sentHeaders.Set(api.SwarmTagUidHeader, "invalid_id.jpg") // the value should be uint32
_ = jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, client, http.MethodPost, chunksResource(someHash), bytes.NewReader(someContent), http.StatusInternalServerError, jsonhttp.StatusResponse{
Message: "cannot get or create tag",
Code: http.StatusInternalServerError,
}, sentHeaders)
})
t.Run("get-invalid-tags", func(t *testing.T) {
// invalid tag
jsonhttptest.ResponseDirect(t, client, http.MethodGet, tagsResource+"/foobar", nil, http.StatusBadRequest, jsonhttp.StatusResponse{
Message: "invalid id",
Code: http.StatusBadRequest,
})
// non-existent tag
jsonhttptest.ResponseDirect(t, client, http.MethodDelete, tagsWithIdResource(uint32(333)), nil, http.StatusNotFound, jsonhttp.StatusResponse{
Message: "tag not present",
Code: http.StatusNotFound,
})
})
t.Run("get-tag-id-from-chunk-upload-without-tag", func(t *testing.T) {
rcvdHeaders := jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, client, http.MethodPost, chunksResource(someHash), bytes.NewReader(someContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, nil)
isTagFoundInResponse(t, rcvdHeaders, nil)
})
t.Run("create-tag-and-use-it-to-upload-chunk", func(t *testing.T) {
// create a tag using the API
b, err := json.Marshal(api.TagResponse{
Name: someTagName,
})
if err != nil {
t.Fatal(err)
}
tr := api.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, client, http.MethodPost, tagsResource, bytes.NewReader(b), http.StatusCreated, &tr)
if tr.Name != someTagName {
t.Fatalf("sent tag name %s does not match received tag name %s", someTagName, tr.Name)
}
// now upload a chunk and see if we receive a tag with the same id
sentHeaders := make(http.Header)
sentHeaders.Set(api.SwarmTagUidHeader, strconv.FormatUint(uint64(tr.Uid), 10))
rcvdHeaders := jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, client, http.MethodPost, chunksResource(someHash), bytes.NewReader(someContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, sentHeaders)
isTagFoundInResponse(t, rcvdHeaders, &tr)
})
t.Run("create-tag-and-use-it-to-upload-multiple-chunks", func(t *testing.T) {
// create a tag using the API
b, err := json.Marshal(api.TagResponse{
Name: someTagName,
})
if err != nil {
t.Fatal(err)
}
tr := api.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, client, http.MethodPost, tagsResource, bytes.NewReader(b), http.StatusCreated, &tr)
if tr.Name != someTagName {
t.Fatalf("sent tag name %s does not match received tag name %s", someTagName, tr.Name)
}
// now upload a chunk and see if we receive a tag with the same id
sentHeaders := make(http.Header)
sentHeaders.Set(api.SwarmTagUidHeader, fmt.Sprint(tr.Uid))
rcvdHeaders := jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, client, http.MethodPost, chunksResource(someHash), bytes.NewReader(someContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, sentHeaders)
isTagFoundInResponse(t, rcvdHeaders, &tr)
// add a second valid content validator
secondValidHash := swarm.MustParseHexAddress("deadbeaf")
secondValidContent := []byte("123456")
sentHeaders = make(http.Header)
sentHeaders.Set(api.SwarmTagUidHeader, fmt.Sprint(tr.Uid))
rcvdHeaders = jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, client, http.MethodPost, chunksResource(secondValidHash), bytes.NewReader(secondValidContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, sentHeaders)
isTagFoundInResponse(t, rcvdHeaders, &tr)
})
t.Run("get-tag-from-chunk-upload-and-use-it-again", func(t *testing.T) {
// upload a new chunk and get the generated tag id
rcvdHeaders := jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, client, http.MethodPost, chunksResource(someHash), bytes.NewReader(someContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, nil)
id := isTagFoundInResponse(t, rcvdHeaders, nil)
// see if the tag id is present and has valid values
tr := api.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, client, http.MethodGet, tagsWithIdResource(id), nil, http.StatusOK, &tr)
// now upload another chunk using the same tag id
sentHeaders := make(http.Header)
sentHeaders.Set(api.SwarmTagUidHeader, fmt.Sprint(tr.Uid))
_ = jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, client, http.MethodPost, chunksResource(someHash), bytes.NewReader(someContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, sentHeaders)
// see if the tag id is present and has valid values
tr = api.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, client, http.MethodGet, tagsWithIdResource(id), nil, http.StatusOK, &tr)
if id != tr.Uid {
t.Fatalf("expected tag id to be %d but is %d", id, tr.Uid)
}
if tr.Stored != 2 {
t.Fatalf("expected stored counter to be %d but is %d", 2, tr.Stored)
}
})
t.Run("get-tag-using-id", func(t *testing.T) {
rcvdHeaders := jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, client, http.MethodPost, chunksResource(someHash), bytes.NewReader(someContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, nil)
id := isTagFoundInResponse(t, rcvdHeaders, nil)
// request the tag and see if the ID is the same
tr := api.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, client, http.MethodGet, tagsWithIdResource(id), nil, http.StatusOK, &tr)
if id != tr.Uid {
t.Fatalf("expected tag id to be %d but is %d", id, tr.Uid)
}
})
t.Run("tag-counters", func(t *testing.T) {
rcvdHeaders := jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, client, http.MethodPost, chunksResource(someHash), bytes.NewReader(someContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, nil)
id := isTagFoundInResponse(t, rcvdHeaders, nil)
tagToVerify, err := tag.Get(id)
if err != nil {
t.Fatal(err)
}
err = mockPusher.SendChunk(id)
if err != nil {
t.Fatal(err)
}
err = mockPusher.RcvdReceipt(id)
if err != nil {
t.Fatal(err)
}
finalTag := api.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, client, http.MethodGet, tagsWithIdResource(id), nil, http.StatusOK, &finalTag)
if tagToVerify.Total != finalTag.Total {
t.Errorf("tag total count mismatch. got %d want %d", tagToVerify.Total, finalTag.Total)
}
if tagToVerify.Seen != finalTag.Seen {
t.Errorf("tag seen count mismatch. got %d want %d", tagToVerify.Seen, finalTag.Seen)
}
if tagToVerify.Stored != finalTag.Stored {
t.Errorf("tag stored count mismatch. got %d want %d", tagToVerify.Stored, finalTag.Stored)
}
if tagToVerify.Sent != finalTag.Sent {
t.Errorf("tag sent count mismatch. got %d want %d", tagToVerify.Sent, finalTag.Sent)
}
if tagToVerify.Synced != finalTag.Synced {
t.Errorf("tag synced count mismatch. got %d want %d", tagToVerify.Synced, finalTag.Synced)
}
})
t.Run("delete-tag-error", func(t *testing.T) {
// try to delete invalid tag
jsonhttptest.ResponseDirect(t, client, http.MethodDelete, tagsResource+"/foobar", nil, http.StatusBadRequest, jsonhttp.StatusResponse{
Message: "invalid id",
Code: http.StatusBadRequest,
})
// try to delete non-existent tag
jsonhttptest.ResponseDirect(t, client, http.MethodDelete, tagsWithIdResource(uint32(333)), nil, http.StatusNotFound, jsonhttp.StatusResponse{
Message: "tag not present",
Code: http.StatusNotFound,
})
})
t.Run("delete-tag", func(t *testing.T) {
// create a tag through API
b, err := json.Marshal(api.TagResponse{
Name: someTagName,
})
if err != nil {
t.Fatal(err)
}
tRes := api.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, client, http.MethodPost, tagsResource, bytes.NewReader(b), http.StatusCreated, &tRes)
// delete tag through API
jsonhttptest.ResponseDirect(t, client, http.MethodDelete, tagsWithIdResource(tRes.Uid), nil, http.StatusNoContent, nil)
// try to get tag
jsonhttptest.ResponseDirect(t, client, http.MethodGet, tagsWithIdResource(tRes.Uid), nil, http.StatusNotFound, jsonhttp.StatusResponse{
Message: "tag not present",
Code: http.StatusNotFound,
})
})
t.Run("done-split-error", func(t *testing.T) {
b, err := json.Marshal(api.TagResponse{})
if err != nil {
t.Fatal(err)
}
// invalid tag
jsonhttptest.ResponseDirect(t, client, http.MethodPatch, tagsResource+"/foobar", bytes.NewReader(b), http.StatusBadRequest, jsonhttp.StatusResponse{
Message: "invalid id",
Code: http.StatusBadRequest,
})
// non-existent tag
jsonhttptest.ResponseDirect(t, client, http.MethodPatch, tagsWithIdResource(uint32(333)), bytes.NewReader(b), http.StatusNotFound, jsonhttp.StatusResponse{
Message: "tag not present",
Code: http.StatusNotFound,
})
})
t.Run("done-split", func(t *testing.T) {
// create a tag through API
tResB, err := json.Marshal(api.TagResponse{
Name: someTagName,
})
if err != nil {
t.Fatal(err)
}
tRes := api.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, client, http.MethodPost, tagsResource, bytes.NewReader(tResB), http.StatusCreated, &tRes)
tagId := tRes.Uid
// generate address to be supplied to the done split
addr := test.RandomAddress()
tReqB, err := json.Marshal(api.TagRequest{
Address: addr,
})
if err != nil {
t.Fatal(err)
}
// upload content with tag
sentHeaders := make(http.Header)
sentHeaders.Set(api.SwarmTagUidHeader, fmt.Sprint(tagId))
jsonhttptest.ResponseDirectSendHeadersAndDontCheckResponse(t, client, http.MethodPost, chunksResource(someHash), bytes.NewReader(someContent), http.StatusOK, sentHeaders)
// call done split
jsonhttptest.ResponseDirect(t, client, http.MethodPatch, tagsWithIdResource(tagId), bytes.NewReader(tReqB), http.StatusOK, jsonhttp.StatusResponse{
Message: "ok",
Code: http.StatusOK,
})
// check tag data
jsonhttptest.ResponseUnmarshal(t, client, http.MethodGet, tagsWithIdResource(tagId), nil, http.StatusOK, &tRes)
if !tRes.Address.Equal(addr) {
t.Fatalf("expected tag address to be %s but is %s", addr.String(), tRes.Address.String())
}
total := tRes.Total
if !(total > 0) {
t.Errorf("tag total should be greater than 0 but it is not")
}
// try different address value
addr = test.RandomAddress()
tReqB, err = json.Marshal(api.TagRequest{
Address: addr,
})
if err != nil {
t.Fatal(err)
}
// call done split
jsonhttptest.ResponseDirect(t, client, http.MethodPatch, tagsWithIdResource(tagId), bytes.NewReader(tReqB), http.StatusOK, jsonhttp.StatusResponse{
Message: "ok",
Code: http.StatusOK,
})
// check tag data
jsonhttptest.ResponseUnmarshal(t, client, http.MethodGet, tagsWithIdResource(tagId), nil, http.StatusOK, &tRes)
if !tRes.Address.Equal(addr) {
t.Fatalf("expected tag address to be %s but is %s", addr.String(), tRes.Address.String())
}
if tRes.Total != total {
t.Errorf("tag total should not have changed")
}
})
t.Run("file-tags", func(t *testing.T) {
// upload a file without supplying tag
expectedHash := swarm.MustParseHexAddress("8e27bb803ff049e8c2f4650357026723220170c15ebf9b635a7026539879a1a8")
expectedResponse := api.FileUploadResponse{Reference: expectedHash}
sentHeaders := make(http.Header)
sentHeaders.Set("Content-Type", "application/octet-stream")
respHeaders := jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, client, http.MethodPost, filesResource, bytes.NewReader([]byte("some data")), http.StatusOK, expectedResponse, sentHeaders)
tagId, err := strconv.Atoi(respHeaders.Get(api.SwarmTagUidHeader))
if err != nil {
t.Fatal(err)
}
// check tag data
tRes := api.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, client, http.MethodGet, tagsWithIdResource(uint32(tagId)), nil, http.StatusOK, &tRes)
if !(tRes.Total > 0) {
t.Errorf("tag total should be greater than 0 but it is not")
}
if !(tRes.Stored > 0) {
t.Errorf("tag stored should be greater than 0 but it is not")
}
if !(tRes.Split > 0) {
t.Errorf("tag split should be greater than 0 but it is not")
}
})
t.Run("dir-tags", func(t *testing.T) {
// upload a dir without supplying tag
tarReader := tarFiles(t, []f{{
data: []byte("some data"),
name: "binary-file",
}})
expectedHash := swarm.MustParseHexAddress("9e5acfbfeb7e074d4c79f5f9922e8a25990dad267d0ea7becaaad07b47fb2a87")
expectedResponse := api.FileUploadResponse{Reference: expectedHash}
sentHeaders := make(http.Header)
sentHeaders.Set("Content-Type", api.ContentTypeTar)
respHeaders := jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, client, http.MethodPost, dirResource, tarReader, http.StatusOK, expectedResponse, sentHeaders)
tagId, err := strconv.Atoi(respHeaders.Get(api.SwarmTagUidHeader))
if err != nil {
t.Fatal(err)
}
// check tag data
tRes := api.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, client, http.MethodGet, tagsWithIdResource(uint32(tagId)), nil, http.StatusOK, &tRes)
if !(tRes.Total > 0) {
t.Errorf("tag total should be greater than 0 but it is not")
}
if !(tRes.Stored > 0) {
t.Errorf("tag stored should be greater than 0 but it is not")
}
if !(tRes.Split > 0) {
t.Errorf("tag split should be greater than 0 but it is not")
}
})
t.Run("bytes-tags", func(t *testing.T) {
// create a tag using the API
tr := api.TagResponse{}
b, err := json.Marshal(api.TagResponse{
Name: someTagName,
})
if err != nil {
t.Fatal(err)
}
jsonhttptest.ResponseUnmarshal(t, client, http.MethodPost, tagsResource, bytes.NewReader(b), http.StatusCreated, &tr)
if tr.Name != someTagName {
t.Fatalf("sent tag name %s does not match received tag name %s", someTagName, tr.Name)
}
sentHeaders := make(http.Header)
sentHeaders.Set(api.SwarmTagUidHeader, strconv.FormatUint(uint64(tr.Uid), 10))
g := mockbytes.New(0, mockbytes.MockTypeStandard).WithModulus(255)
dataChunk, err := g.SequentialBytes(swarm.ChunkSize)
if err != nil {
t.Fatal(err)
}
rootAddress := swarm.MustParseHexAddress("5e2a21902f51438be1adbd0e29e1bd34c53a21d3120aefa3c7275129f2f88de9")
content := make([]byte, swarm.ChunkSize*2)
copy(content[swarm.ChunkSize:], dataChunk)
copy(content[:swarm.ChunkSize], dataChunk)
rcvdHeaders := jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, client, http.MethodPost, bytesResource, bytes.NewReader(content), http.StatusOK, fileUploadResponse{
Reference: rootAddress,
}, sentHeaders)
id := isTagFoundInResponse(t, rcvdHeaders, nil)
tagToVerify, err := tag.Get(id)
if err != nil {
t.Fatal(err)
}
if tagToVerify.Uid != tr.Uid {
t.Fatalf("expected tag id to be %d but is %d", tagToVerify.Uid, tr.Uid)
}
finalTag := api.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, client, http.MethodGet, tagsWithIdResource(id), nil, http.StatusOK, &finalTag)
if finalTag.Total != 0 {
t.Errorf("tag total count mismatch. got %d want %d", finalTag.Total, 0)
}
if finalTag.Seen != 1 {
t.Errorf("tag seen count mismatch. got %d want %d", finalTag.Seen, 1)
}
if finalTag.Stored != 3 {
t.Errorf("tag stored count mismatch. got %d want %d", finalTag.Stored, 3)
}
if !finalTag.Address.Equal(swarm.ZeroAddress) {
t.Errorf("address mismatch: expected %s got %s", rootAddress.String(), finalTag.Address.String())
}
})
}
// isTagFoundInResponse verifies that the tag id is found in the supplied HTTP headers
// if an API tag response is supplied, it also verifies that it contains an id which matches the headers
func isTagFoundInResponse(t *testing.T, headers http.Header, tr *api.TagResponse) uint32 {
idStr := headers.Get(api.SwarmTagUidHeader)
if idStr == "" {
t.Fatalf("could not find tag id header in chunk upload response")
}
nId, err := strconv.Atoi(idStr)
id := uint32(nId)
if err != nil {
t.Fatal(err)
}
if tr != nil {
if id != tr.Uid {
t.Fatalf("expected created tag id to be %d, but got %d when uploading chunk", tr.Uid, id)
}
}
return id
}
......@@ -12,7 +12,6 @@ type (
AddressesResponse = addressesResponse
PinnedChunk = pinnedChunk
ListPinnedChunksResponse = listPinnedChunksResponse
TagResponse = tagResponse
WelcomeMessageRequest = welcomeMessageRequest
WelcomeMessageResponse = welcomeMessageResponse
BalancesResponse = balancesResponse
......
......@@ -78,12 +78,6 @@ func (s *server) setupRouting() {
router.Handle("/chunks-pin", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.listPinnedChunks),
})
router.Handle("/tags", jsonhttp.MethodHandler{
"POST": http.HandlerFunc(s.createTag),
})
router.Handle("/tags/{uid}", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.getTag),
})
router.Handle("/topology", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.topologyHandler),
})
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package debugapi_test
import (
"bytes"
"net/http"
"strconv"
"testing"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/debugapi"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
mp "github.com/ethersphere/bee/pkg/pusher/mock"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"gitlab.com/nolash/go-mockbytes"
)
type fileUploadResponse struct {
Reference swarm.Address `json:"reference"`
}
func TestTags(t *testing.T) {
var (
bytesResource = "/bytes"
resource = func(addr swarm.Address) string { return "/chunks/" + addr.String() }
tagResourceUidCreate = func(name string) string { return "/tags?name=" + name }
tagResourceUUid = func(uuid uint64) string { return "/tags/" + strconv.FormatUint(uuid, 10) }
validHash = swarm.MustParseHexAddress("aabbcc")
validContent = []byte("bbaatt")
tag = tags.NewTags()
mockStorer = mock.NewStorer()
mockPusher = mp.NewMockPusher(tag)
ts = newTestServer(t, testServerOptions{
Storer: mockStorer,
Tags: tag,
})
// This server is used to store chunks
apiClient = newBZZTestServer(t, testServerOptions{
Storer: mockStorer,
Tags: tag,
})
)
t.Run("send-invalid-tag-id", func(t *testing.T) {
sentHheaders := make(http.Header)
sentHheaders.Set(api.TagHeaderUid, "file.jpg") // the value should be uint32
_ = jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, apiClient, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusBadRequest, jsonhttp.StatusResponse{
Message: "invalid taguid",
Code: http.StatusBadRequest,
}, sentHheaders)
})
t.Run("uid-header-in-return-for-empty-tag", func(t *testing.T) {
rcvdHeaders := jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, apiClient, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, nil)
isTagFoundInResponse(t, rcvdHeaders, nil)
})
t.Run("get-tag-and-use-it-to-upload-chunk", func(t *testing.T) {
// Get a tag using API
ta := debugapi.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, ts.Client, http.MethodPost, tagResourceUidCreate("file.jpg"), nil, http.StatusOK, &ta)
if ta.Name != "file.jpg" {
t.Fatalf("tagname is not the same that we sent")
}
// Now upload a chunk and see if we receive a tag with the same uid
sentHheaders := make(http.Header)
sentHheaders.Set(api.TagHeaderUid, strconv.FormatUint(uint64(ta.Uid), 10))
rcvdHeaders := jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, apiClient, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, sentHheaders)
isTagFoundInResponse(t, rcvdHeaders, &ta)
})
t.Run("get-tag-and-use-it-to-upload-multiple-chunk", func(t *testing.T) {
// Get a tag using API
ta := debugapi.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, ts.Client, http.MethodPost, tagResourceUidCreate("file.jpg"), nil, http.StatusOK, &ta)
if ta.Name != "file.jpg" {
t.Fatalf("tagname is not the same that we sent")
}
// Now upload a chunk and see if we receive a tag with the same uid
sentHheaders := make(http.Header)
sentHheaders.Set(api.TagHeaderUid, strconv.FormatUint(uint64(ta.Uid), 10))
rcvdHeaders := jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, apiClient, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, sentHheaders)
isTagFoundInResponse(t, rcvdHeaders, &ta)
secondValidHash := swarm.MustParseHexAddress("deadbeaf")
secondValidContent := []byte("123456")
sentHheaders = make(http.Header)
sentHheaders.Set(api.TagHeaderUid, strconv.FormatUint(uint64(ta.Uid), 10))
rcvdHeaders = jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, apiClient, http.MethodPost, resource(secondValidHash), bytes.NewReader(secondValidContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, sentHheaders)
isTagFoundInResponse(t, rcvdHeaders, &ta)
})
t.Run("get-tag-indirectly-and-use-it-to-upload-chunk", func(t *testing.T) {
//Upload anew chunk and we give aUID in response and apps can use that too
rcvdHeaders := jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, apiClient, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, nil)
uuid := isTagFoundInResponse(t, rcvdHeaders, nil)
// see if the tagid is present and has valid values
ta := debugapi.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, ts.Client, http.MethodGet, tagResourceUUid(uuid), nil, http.StatusOK, &ta)
// Now upload another chunk using the same tag id
sentHheaders := make(http.Header)
sentHheaders.Set(api.TagHeaderUid, strconv.FormatUint(uuid, 10))
_ = jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, apiClient, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, sentHheaders)
// see if the tagid is present and has valid values
ta = debugapi.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, ts.Client, http.MethodGet, tagResourceUUid(uuid), nil, http.StatusOK, &ta)
if uuid != uint64(ta.Uid) {
t.Fatalf("Invalid uuid response")
}
if ta.Stored != 2 {
t.Fatalf("same tag not used")
}
})
t.Run("get-tag-using-uuid", func(t *testing.T) {
rcvdHeaders := jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, apiClient, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, nil)
uuid := isTagFoundInResponse(t, rcvdHeaders, nil)
// Request the tag and see if the UUID is the same
ta := debugapi.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, ts.Client, http.MethodGet, tagResourceUUid(uuid), nil, http.StatusOK, &ta)
if uuid != uint64(ta.Uid) {
t.Fatalf("Invalid uuid response")
}
})
t.Run("tag-counters", func(t *testing.T) {
rcvdHeaders := jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, apiClient, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, nil)
uuid1 := isTagFoundInResponse(t, rcvdHeaders, nil)
tagToVerify, err := tag.Get(uint32(uuid1))
if err != nil {
t.Fatal(err)
}
err = mockPusher.SendChunk(uint32(uuid1))
if err != nil {
t.Fatal(err)
}
err = mockPusher.RcvdReceipt(uint32(uuid1))
if err != nil {
t.Fatal(err)
}
finalTag := debugapi.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, ts.Client, http.MethodGet, tagResourceUUid(uuid1), nil, http.StatusOK, &finalTag)
if tagToVerify.Total != finalTag.Total {
t.Errorf("tag total count mismatch. got %d want %d", tagToVerify.Total, finalTag.Total)
}
if tagToVerify.Seen != finalTag.Seen {
t.Errorf("tag seen count mismatch. got %d want %d", tagToVerify.Seen, finalTag.Seen)
}
if tagToVerify.Stored != finalTag.Stored {
t.Errorf("tag stored count mismatch. got %d want %d", tagToVerify.Stored, finalTag.Stored)
}
if tagToVerify.Sent != finalTag.Sent {
t.Errorf("tag sent count mismatch. got %d want %d", tagToVerify.Sent, finalTag.Sent)
}
if tagToVerify.Synced != finalTag.Synced {
t.Errorf("tag synced count mismatch. got %d want %d", tagToVerify.Synced, finalTag.Synced)
}
})
t.Run("bytes-tag-counters", func(t *testing.T) {
// Get a tag using API
ta := debugapi.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, ts.Client, http.MethodPost, tagResourceUidCreate("file.jpg"), nil, http.StatusOK, &ta)
if ta.Name != "file.jpg" {
t.Fatalf("tagname is not the same that we sent")
}
sentHheaders := make(http.Header)
sentHheaders.Set(api.TagHeaderUid, strconv.FormatUint(uint64(ta.Uid), 10))
g := mockbytes.New(0, mockbytes.MockTypeStandard).WithModulus(255)
dataChunk, err := g.SequentialBytes(swarm.ChunkSize)
if err != nil {
t.Fatal(err)
}
rootAddress := swarm.MustParseHexAddress("5e2a21902f51438be1adbd0e29e1bd34c53a21d3120aefa3c7275129f2f88de9")
content := make([]byte, swarm.ChunkSize*2)
copy(content[swarm.ChunkSize:], dataChunk)
copy(content[:swarm.ChunkSize], dataChunk)
rcvdHeaders := jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, apiClient, http.MethodPost, bytesResource, bytes.NewReader(content), http.StatusOK, fileUploadResponse{
Reference: rootAddress,
}, sentHheaders)
uuid1 := isTagFoundInResponse(t, rcvdHeaders, nil)
tagToVerify, err := tag.Get(uint32(uuid1))
if err != nil {
t.Fatal(err)
}
if tagToVerify.Uid != ta.Uid {
t.Fatalf("Invalid tagid received")
}
finalTag := debugapi.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, ts.Client, http.MethodGet, tagResourceUUid(uuid1), nil, http.StatusOK, &finalTag)
if finalTag.Total != 3 {
t.Errorf("tag total count mismatch. got %d want %d", finalTag.Total, 3)
}
if finalTag.Seen != 1 {
t.Errorf("tag seen count mismatch. got %d want %d", finalTag.Seen, 1)
}
if finalTag.Stored != 3 {
t.Errorf("tag stored count mismatch. got %d want %d", finalTag.Stored, 3)
}
if !finalTag.Address.Equal(rootAddress) {
t.Errorf("Address mismatch: expected %s got %s", rootAddress.String(), finalTag.Address.String())
}
})
}
func isTagFoundInResponse(t *testing.T, headers http.Header, tag *debugapi.TagResponse) uint64 {
uidStr := headers.Get(api.TagHeaderUid)
if uidStr == "" {
t.Fatalf("could not find tagid header in chunk upload response")
}
uid, err := strconv.ParseUint(uidStr, 10, 32)
if err != nil {
t.Fatal(err)
}
if tag != nil {
if uid != uint64(tag.Uid) {
t.Fatalf("uid created is not received while uploading chunk, expected : %d, got %d", tag.Uid, uid)
}
}
return uid
}
......@@ -13,6 +13,7 @@ import (
"github.com/ethersphere/bee/pkg/encryption"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/sctx"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bmt"
......@@ -51,7 +52,7 @@ type SimpleSplitterJob struct {
cursors []int // section write position, indexed per level
hasher bmt.Hash // underlying hasher used for hashing the tree
buffer []byte // keeps data and hashes, indexed by cursors
tagg *tags.Tag
tag *tags.Tag
toEncrypt bool // to encryrpt the chunks or not
refSize int64
}
......@@ -67,10 +68,6 @@ func NewSimpleSplitterJob(ctx context.Context, putter Putter, spanLength int64,
}
p := bmtlegacy.NewTreePool(hashFunc, swarm.Branches, bmtlegacy.PoolSize)
ta, ok := ctx.Value(tags.TagsContextKey{}).(*tags.Tag)
if !ok {
ta = nil
}
return &SimpleSplitterJob{
ctx: ctx,
putter: putter,
......@@ -79,7 +76,7 @@ func NewSimpleSplitterJob(ctx context.Context, putter Putter, spanLength int64,
cursors: make([]int, levelBufferLimit),
hasher: bmtlegacy.New(p),
buffer: make([]byte, swarm.ChunkWithSpanSize*levelBufferLimit*2), // double size as temp workaround for weak calculation of needed buffer space
tagg: ta,
tag: sctx.GetTag(ctx),
toEncrypt: toEncrypt,
refSize: refSize,
}
......@@ -181,8 +178,8 @@ func (s *SimpleSplitterJob) sumLevel(lvl int) ([]byte, error) {
// Add tag to the chunk if tag is valid
var ch swarm.Chunk
if s.tagg != nil {
ch = swarm.NewChunk(addr, c).WithTagID(s.tagg.Uid)
if s.tag != nil {
ch = swarm.NewChunk(addr, c).WithTagID(s.tag.Uid)
} else {
ch = swarm.NewChunk(addr, c)
}
......@@ -314,7 +311,7 @@ func (s *SimpleSplitterJob) newDataEncryption(key encryption.Key) *encryption.En
}
func (s *SimpleSplitterJob) incrTag(state tags.State) {
if s.tagg != nil {
s.tagg.Inc(state)
if s.tag != nil {
s.tag.Inc(state)
}
}
......@@ -108,6 +108,13 @@ func NonAuthoritativeInfo(w http.ResponseWriter, response interface{}) {
Respond(w, http.StatusNonAuthoritativeInfo, response)
}
// NoContent writes a response with status code 204. It does not
// accept a response value since the HTTP server will not write it
// to the client when returning a NoContent response.
func NoContent(w http.ResponseWriter) {
Respond(w, http.StatusNoContent, nil)
}
// ResetContent writes a response with status code 205.
func ResetContent(w http.ResponseWriter, response interface{}) {
Respond(w, http.StatusResetContent, response)
......
......@@ -28,7 +28,9 @@ func ResponseDirect(t *testing.T, client *http.Client, method, url string, body
t.Fatal(err)
}
got = bytes.TrimSpace(got)
if response == nil && len(got) == 0 {
return
}
want, err := json.Marshal(response)
if err != nil {
t.Error(err)
......
......@@ -209,6 +209,12 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
peer, err := ps.peerSuggester.ClosestPeer(ch.Address())
if err != nil {
if errors.Is(err, topology.ErrWantSelf) {
// this is to make sure that the sent number does not diverge from the synced counter
t, err := ps.tagg.Get(ch.TagID())
if err == nil && t != nil {
t.Inc(tags.StateSent)
}
// if you are the closest node return a receipt immediately
return &Receipt{
Address: ch.Address(),
......@@ -228,6 +234,7 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
_ = streamer.Reset()
return nil, fmt.Errorf("chunk deliver to peer %s: %w", peer.String(), err)
}
// if you manage to get a tag, just increment the respective counter
t, err := ps.tagg.Get(ch.TagID())
if err == nil && t != nil {
......
......@@ -6,10 +6,13 @@ package sctx
import (
"context"
"encoding/hex"
"errors"
"strings"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/trojan"
)
......@@ -39,18 +42,18 @@ func GetHost(ctx context.Context) string {
return ""
}
// SetTag sets the tag unique identifier in the context
func SetTag(ctx context.Context, tagId uint32) context.Context {
// SetTag sets the tag instance in the context
func SetTag(ctx context.Context, tagId *tags.Tag) context.Context {
return context.WithValue(ctx, tagKey{}, tagId)
}
// GetTag gets the tag unique identifier from the context
func GetTag(ctx context.Context) uint32 {
v, ok := ctx.Value(tagKey{}).(uint32)
if ok {
return v
// GetTag gets the tag instance from the context
func GetTag(ctx context.Context) *tags.Tag {
v, ok := ctx.Value(tagKey{}).(*tags.Tag)
if !ok {
return nil
}
return 0
return v
}
// SetTargets set the target string in the context to be used downstream in netstore
......
......@@ -177,7 +177,11 @@ func (t *Tag) Done(s State) bool {
func (t *Tag) DoneSplit(address swarm.Address) int64 {
total := atomic.LoadInt64(&t.Split)
atomic.StoreInt64(&t.Total, total)
t.Address = address
if !address.Equal(swarm.ZeroAddress) {
t.Address = address
}
return total
}
......
......@@ -26,7 +26,6 @@ import (
"sync"
"time"
"github.com/ethersphere/bee/pkg/sctx"
"github.com/ethersphere/bee/pkg/swarm"
)
......@@ -35,8 +34,6 @@ var (
ErrNotFound = errors.New("tag not found")
)
type TagsContextKey struct{}
// Tags hold tag information indexed by a unique random uint32
type Tags struct {
tags *sync.Map
......@@ -101,16 +98,6 @@ func (ts *Tags) GetByAddress(address swarm.Address) (*Tag, error) {
return t, nil
}
// GetFromContext gets a tag from the tag uid stored in the context
func (ts *Tags) GetFromContext(ctx context.Context) (*Tag, error) {
uid := sctx.GetTag(ctx)
t, ok := ts.tags.Load(uid)
if !ok {
return nil, ErrNotFound
}
return t.(*Tag), nil
}
// Range exposes sync.Map's iterator
func (ts *Tags) Range(fn func(k, v interface{}) bool) {
ts.tags.Range(fn)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment