Commit c6abb208 authored by Zahoor Mohamed's avatar Zahoor Mohamed Committed by GitHub

Add create tags in /files and /bytes API (#393)

Update tags for /file , /bytes and /chunks API
parent 3fa63f4c
......@@ -5,13 +5,18 @@
package api
import (
"context"
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/splitter"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/gorilla/mux"
)
......@@ -21,6 +26,10 @@ type bytesPostResponse struct {
// bytesUploadHandler handles upload of raw binary data of arbitrary length.
func (s *server) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
ta := s.createTag(w, r)
// Add the tag to the context
r = r.WithContext(context.WithValue(r.Context(), tags.TagsContextKey{}, ta))
ctx := r.Context()
toEncrypt := strings.ToLower(r.Header.Get(EncryptHeader)) == "true"
......@@ -31,6 +40,11 @@ func (s *server) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
jsonhttp.InternalServerError(w, nil)
return
}
ta.DoneSplit(address)
w.Header().Set(TagHeaderUid, fmt.Sprint(ta.Uid))
w.Header().Set("Access-Control-Expose-Headers", TagHeaderUid)
jsonhttp.OK(w, bytesPostResponse{
Reference: address,
})
......@@ -54,3 +68,39 @@ func (s *server) bytesGetHandler(w http.ResponseWriter, r *http.Request) {
s.downloadHandler(w, r, address, additionalHeaders)
}
func (s *server) createTag(w http.ResponseWriter, r *http.Request) *tags.Tag {
// if tag header is not there create a new one
var tag *tags.Tag
tagUidStr := r.Header.Get(TagHeaderUid)
if tagUidStr == "" {
tagName := fmt.Sprintf("unnamed_tag_%d", time.Now().Unix())
var err error
tag, err = s.Tags.Create(tagName, 0, false)
if err != nil {
s.Logger.Debugf("bytes upload: tag creation error: %v", err)
s.Logger.Error("bytes upload: tag creation")
jsonhttp.InternalServerError(w, "cannot create tag")
return nil
}
} else {
// if the tag uid header is present, then use the tag sent
tagUid, err := strconv.ParseUint(tagUidStr, 10, 32)
if err != nil {
s.Logger.Debugf("bytes upload: parse taguid %s: %v", tagUidStr, err)
s.Logger.Error("bytes upload: parse taguid")
jsonhttp.BadRequest(w, "invalid taguid")
return nil
}
tag, err = s.Tags.Get(uint32(tagUid))
if err != nil {
s.Logger.Debugf("bytes upload: get tag error: %v", err)
s.Logger.Error("bytes upload: get tag")
jsonhttp.InternalServerError(w, "cannot create tag")
return nil
}
}
return tag
}
......@@ -12,9 +12,7 @@ import (
"io"
"io/ioutil"
"net/http"
"strconv"
"strings"
"time"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/storage"
......@@ -31,8 +29,6 @@ const PinHeaderName = "swarm-pin"
func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
addr := mux.Vars(r)["addr"]
ctx := r.Context()
address, err := swarm.ParseHexAddress(addr)
if err != nil {
s.Logger.Debugf("chunk upload: parse chunk address %s: %v", addr, err)
......@@ -41,40 +37,17 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
return
}
// if tag header is not there create a new one
var tag *tags.Tag
tagUidStr := r.Header.Get(TagHeaderUid)
if tagUidStr == "" {
tagName := fmt.Sprintf("unnamed_tag_%d", time.Now().Unix())
tag, err = s.Tags.Create(tagName, 0, false)
if err != nil {
s.Logger.Debugf("chunk upload: tag creation error: %v, addr %s", err, address)
s.Logger.Error("chunk upload: tag creation error")
jsonhttp.InternalServerError(w, "cannot create tag")
return
}
} else {
// if the tag uid header is present, then use the tag sent
tagUid, err := strconv.ParseUint(tagUidStr, 10, 32)
if err != nil {
s.Logger.Debugf("chunk upload: parse taguid %s: %v", tagUidStr, err)
s.Logger.Error("chunk upload: parse taguid")
jsonhttp.BadRequest(w, "invalid taguid")
return
}
tag, err = s.Tags.Get(uint32(tagUid))
if err != nil {
s.Logger.Debugf("chunk upload: tag get error: %v, addr %s", err, address)
s.Logger.Error("chunk upload: tag get error")
jsonhttp.InternalServerError(w, "cannot create tag")
return
}
tag := s.createTag(w, r)
if tag == nil {
return
}
// Increment the total tags here since we dont have a splitter
// for the file upload, it will done in the early stage itself in bulk
tag.Inc(tags.TotalChunks)
// Add the tag to the context
r = r.WithContext(context.WithValue(r.Context(), tags.TagsContextKey{}, tag))
ctx := r.Context()
// Increment the StateSplit here since we dont have a splitter for the file upload
tag.Inc(tags.StateSplit)
data, err := ioutil.ReadAll(r.Body)
if err != nil {
......@@ -82,7 +55,6 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
s.Logger.Error("chunk upload: read chunk data error")
jsonhttp.InternalServerError(w, "cannot read chunk data")
return
}
seen, err := s.Storer.Put(ctx, storage.ModePutUpload, swarm.NewChunk(address, data))
......@@ -110,6 +82,8 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
}
}
tag.DoneSplit(address)
w.Header().Set(TagHeaderUid, fmt.Sprint(tag.Uid))
w.Header().Set("Access-Control-Expose-Headers", TagHeaderUid)
jsonhttp.OK(w, nil)
......
......@@ -28,6 +28,7 @@ import (
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/gorilla/mux"
)
......@@ -61,11 +62,16 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
return
}
ctx := r.Context()
var reader io.Reader
var fileName, contentLength string
var fileSize uint64
ta := s.createTag(w, r)
// Add the tag to the context
r = r.WithContext(context.WithValue(r.Context(), tags.TagsContextKey{}, ta))
ctx := r.Context()
if mediaType == multiPartFormData {
mr := multipart.NewReader(r.Body, params["boundary"])
......@@ -196,7 +202,12 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
jsonhttp.InternalServerError(w, "could not store entry")
return
}
ta.DoneSplit(reference)
w.Header().Set("ETag", fmt.Sprintf("%q", reference.String()))
w.Header().Set(TagHeaderUid, fmt.Sprint(ta.Uid))
w.Header().Set("Access-Control-Expose-Headers", TagHeaderUid)
jsonhttp.OK(w, fileUploadResponse{
Reference: reference,
})
......
......@@ -19,10 +19,16 @@ import (
"github.com/ethersphere/bee/pkg/storage/mock/validator"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"gitlab.com/nolash/go-mockbytes"
)
type fileUploadResponse struct {
Reference swarm.Address `json:"reference"`
}
func TestTags(t *testing.T) {
var (
bytesResource = "/bytes"
resource = func(addr swarm.Address) string { return "/chunks/" + addr.String() }
tagResourceUidCreate = func(name string) string { return "/tags?name=" + name }
tagResourceUUid = func(uuid uint64) string { return "/tags/" + strconv.FormatUint(uuid, 10) }
......@@ -202,6 +208,70 @@ func TestTags(t *testing.T) {
t.Errorf("tag synced count mismatch. got %d want %d", tagToVerify.Synced, finalTag.Synced)
}
})
t.Run("bytes-tag-counters", func(t *testing.T) {
// Get a tag using API
ta := debugapi.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, ts.Client, http.MethodPost, tagResourceUidCreate("file.jpg"), nil, http.StatusOK, &ta)
if ta.Name != "file.jpg" {
t.Fatalf("tagname is not the same that we sent")
}
sentHheaders := make(http.Header)
sentHheaders.Set(api.TagHeaderUid, strconv.FormatUint(uint64(ta.Uid), 10))
g := mockbytes.New(0, mockbytes.MockTypeStandard).WithModulus(255)
dataChunk, err := g.SequentialBytes(swarm.ChunkSize)
if err != nil {
t.Fatal(err)
}
chunkAddress := swarm.MustParseHexAddress("c10090961e7682a10890c334d759a28426647141213abda93b096b892824d2ef")
rootBytes := swarm.MustParseHexAddress("c10090961e7682a10890c334d759a28426647141213abda93b096b892824d2ef").Bytes()
rootChunk := make([]byte, 64)
copy(rootChunk[:32], rootBytes)
copy(rootChunk[32:], rootBytes)
rootAddress := swarm.MustParseHexAddress("5e2a21902f51438be1adbd0e29e1bd34c53a21d3120aefa3c7275129f2f88de9")
mockValidator.AddPair(chunkAddress, dataChunk)
mockValidator.AddPair(rootAddress, rootChunk)
content := make([]byte, swarm.ChunkSize*2)
copy(content[swarm.ChunkSize:], dataChunk)
copy(content[:swarm.ChunkSize], dataChunk)
rcvdHeaders := jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, apiClient, http.MethodPost, bytesResource, bytes.NewReader(content), http.StatusOK, fileUploadResponse{
Reference: rootAddress,
}, sentHheaders)
uuid1 := isTagFoundInResponse(t, rcvdHeaders, nil)
tagToVerify, err := tag.Get(uint32(uuid1))
if err != nil {
t.Fatal(err)
}
if tagToVerify.Uid != ta.Uid {
t.Fatalf("Invalid tagid received")
}
finalTag := debugapi.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, ts.Client, http.MethodGet, tagResourceUUid(uuid1), nil, http.StatusOK, &finalTag)
if finalTag.Total != 3 {
t.Errorf("tag total count mismatch. got %d want %d", finalTag.Total, 3)
}
if finalTag.Seen != 3 {
t.Errorf("tag seen count mismatch. got %d want %d", finalTag.Seen, 3)
}
if finalTag.Stored != 3 {
t.Errorf("tag stored count mismatch. got %d want %d", finalTag.Stored, 3)
}
if !finalTag.Address.Equal(rootAddress) {
t.Errorf("Address mismatch: expected %s got %s", rootAddress.String(), finalTag.Address.String())
}
})
}
func isTagFoundInResponse(t *testing.T, headers http.Header, tag *debugapi.TagResponse) uint64 {
......
......@@ -15,6 +15,7 @@ import (
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bmt"
bmtlegacy "github.com/ethersphere/bmt/legacy"
"golang.org/x/crypto/sha3"
......@@ -47,7 +48,8 @@ type SimpleSplitterJob struct {
cursors []int // section write position, indexed per level
hasher bmt.Hash // underlying hasher used for hashing the tree
buffer []byte // keeps data and hashes, indexed by cursors
toEncrypt bool // to encryrpt the chunks or not
tagg *tags.Tag
toEncrypt bool // to encryrpt the chunks or not
refSize int64
}
......@@ -61,6 +63,11 @@ func NewSimpleSplitterJob(ctx context.Context, putter storage.Putter, spanLength
refSize += encryption.KeyLength
}
p := bmtlegacy.NewTreePool(hashFunc, swarm.Branches, bmtlegacy.PoolSize)
ta, ok := ctx.Value(tags.TagsContextKey{}).(*tags.Tag)
if !ok {
ta = nil
}
return &SimpleSplitterJob{
ctx: ctx,
putter: putter,
......@@ -69,6 +76,7 @@ func NewSimpleSplitterJob(ctx context.Context, putter storage.Putter, spanLength
cursors: make([]int, levelBufferLimit),
hasher: bmtlegacy.New(p),
buffer: make([]byte, swarm.ChunkWithSpanSize*levelBufferLimit*2), // double size as temp workaround for weak calculation of needed buffer space
tagg: ta,
toEncrypt: toEncrypt,
refSize: refSize,
}
......@@ -144,6 +152,7 @@ func (s *SimpleSplitterJob) sumLevel(lvl int) ([]byte, error) {
binary.LittleEndian.PutUint64(head, uint64(span))
tail := s.buffer[s.cursors[lvl+1]:s.cursors[lvl]]
chunkData = append(head, tail...)
s.incrTag(tags.StateSplit)
c := chunkData
var encryptionKey encryption.Key
......@@ -168,11 +177,15 @@ func (s *SimpleSplitterJob) sumLevel(lvl int) ([]byte, error) {
addr = swarm.NewAddress(ref)
ch := swarm.NewChunk(addr, c)
_, err = s.putter.Put(s.ctx, storage.ModePutUpload, ch)
seen, err := s.putter.Put(s.ctx, storage.ModePutUpload, ch)
if err != nil {
return nil, err
} else if len(seen) > 0 && seen[0] {
s.incrTag(tags.StateSeen)
}
s.incrTag(tags.StateStored)
return append(ch.Address().Bytes(), encryptionKey...), nil
}
......@@ -289,3 +302,9 @@ func (s *SimpleSplitterJob) newSpanEncryption(key encryption.Key) *encryption.En
func (s *SimpleSplitterJob) newDataEncryption(key encryption.Key) *encryption.Encryption {
return encryption.New(key, int(swarm.ChunkSize), 0, sha3.NewLegacyKeccak256)
}
func (s *SimpleSplitterJob) incrTag(state tags.State) {
if s.tagg != nil {
s.tagg.Inc(state)
}
}
......@@ -63,5 +63,6 @@ func (s *simpleSplitter) Split(ctx context.Context, r io.ReadCloser, dataLength
}
sum := j.Sum(nil)
return swarm.NewAddress(sum), nil
newAddress := swarm.NewAddress(sum)
return newAddress, nil
}
......@@ -96,7 +96,7 @@ func TestNetstoreNoRetrieval(t *testing.T) {
}
// returns a mock retrieval protocol, a mock local storage and a netstore
func newRetrievingNetstore() (ret *retrievalMock, mockStore storage.Storer, ns storage.Storer) {
func newRetrievingNetstore() (ret *retrievalMock, mockStore, ns storage.Storer) {
retrieve := &retrievalMock{}
store := mock.NewStorer()
logger := logging.New(ioutil.Discard, 0)
......
......@@ -47,7 +47,6 @@ func (s Store) Set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Add
// once the receipt is got this check to see if the localstore is updated to see if the chunk is set
// as ModeSetSyncPush status.
func TestSendChunkToPushSyncWithTag(t *testing.T) {
// create a trigger and a closestpeer
triggerPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
closestPeer := swarm.MustParseHexAddress("f000000000000000000000000000000000000000000000000000000000000000")
......
......@@ -35,6 +35,8 @@ func (v *MockValidator) Validate(ch swarm.Chunk) (valid bool) {
if data, ok := v.addressDataPair[ch.Address().String()]; ok {
if bytes.Equal(data, ch.Data()) {
return true
} else if len(ch.Data()) > 8 && bytes.Equal(data, ch.Data()[8:]) {
return true
}
}
return false
......
......@@ -35,6 +35,8 @@ var (
ErrNotFound = errors.New("tag not found")
)
type TagsContextKey struct{}
// Tags hold tag information indexed by a unique random uint32
type Tags struct {
tags *sync.Map
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment