Commit 4abf0717 authored by Nemanja Zbiljić's avatar Nemanja Zbiljić Committed by GitHub

Simplify public tags API response (#1148)

- add API endpoint for listing tags from store
- rename 'swarm-tag-uid' header to 'swarm-tag'
- rename field 'stored' to 'processed'
- estimate total number of chunks during upload
parent 11631c51
......@@ -40,7 +40,7 @@ paths:
- Bytes
parameters:
- in: header
name: swarm-tag-uid
name: swarm-tag
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/Uid'
required: false
......@@ -140,7 +140,7 @@ paths:
- Chunk
parameters:
- in: header
name: swarm-tag-uid
name: swarm-tag
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/Uid'
required: false
......@@ -192,7 +192,7 @@ paths:
required: false
description: Filename
- in: header
name: swarm-tag-uid
name: swarm-tag
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/Uid'
required: false
......@@ -280,7 +280,7 @@ paths:
- Collection
parameters:
- in: header
name: swarm-tag-uid
name: swarm-tag
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/Uid'
required: false
......@@ -412,6 +412,23 @@ paths:
description: Default response
'/tags':
get:
summary: Get list of tags
tags:
- Tag
responses:
'200':
description: List of tags
content:
application/json:
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/TagsList'
'403':
$ref: 'SwarmCommon.yaml#/components/responses/403'
'500':
$ref: 'SwarmCommon.yaml#/components/responses/500'
default:
description: Default response
post:
summary: 'Create Tag'
tags:
......
......@@ -166,12 +166,24 @@ components:
NewTagRequest:
type: object
properties:
name:
type: string
address:
$ref: '#/components/schemas/SwarmAddress'
NewTagResponse:
type: object
properties:
uid:
$ref: '#/components/schemas/Uid'
startedAt:
$ref: '#/components/schemas/DateTime'
total:
type: integer
processed:
type: integer
synced:
type: integer
NewTagDebugResponse:
type: object
properties:
total:
......@@ -188,15 +200,19 @@ components:
type: integer
uid:
$ref: '#/components/schemas/Uid'
anonymous:
type: boolean
name:
type: string
address:
$ref: '#/components/schemas/SwarmAddress'
startedAt:
$ref: '#/components/schemas/DateTime'
TagsList:
type: object
properties:
tags:
type: array
items:
$ref: '#/components/schemas/NewTagResponse'
P2PUnderlay:
type: string
example: "/ip4/127.0.0.1/tcp/1634/p2p/16Uiu2HAmTm17toLDaPYzRyjKn27iCB76yjKnJ5DjQXneFmifFvaX"
......
......@@ -572,4 +572,32 @@ paths:
'500':
$ref: 'SwarmCommon.yaml#/components/responses/500'
default:
description: Default response
\ No newline at end of file
description: Default response
'/tags/{uid}':
get:
summary: 'Get Tag information using Uid'
tags:
- Tag
parameters:
- in: path
name: uid
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/Uid'
required: true
description: Uid
responses:
'200':
description: Tag info
content:
application/json:
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/NewTagDebugResponse'
'400':
$ref: 'SwarmCommon.yaml#/components/responses/400'
'403':
$ref: 'SwarmCommon.yaml#/components/responses/403'
'500':
$ref: 'SwarmCommon.yaml#/components/responses/500'
default:
description: Default response
......@@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"io"
"math"
"net/http"
"strconv"
"strings"
......@@ -30,7 +31,7 @@ import (
const (
SwarmPinHeader = "Swarm-Pin"
SwarmTagUidHeader = "Swarm-Tag-Uid"
SwarmTagHeader = "Swarm-Tag"
SwarmEncryptHeader = "Swarm-Encrypt"
SwarmIndexDocumentHeader = "Swarm-Index-Document"
SwarmErrorDocumentHeader = "Swarm-Error-Document"
......@@ -132,9 +133,7 @@ func (s *server) Close() error {
func (s *server) getOrCreateTag(tagUid string) (*tags.Tag, bool, error) {
// if tag ID is not supplied, create a new tag
if tagUid == "" {
tagName := fmt.Sprintf("unnamed_tag_%d", time.Now().Unix())
var err error
tag, err := s.Tags.Create(tagName, 0)
tag, err := s.Tags.Create(0)
if err != nil {
return nil, false, fmt.Errorf("cannot create tag: %w", err)
}
......@@ -273,3 +272,33 @@ func requestPipelineFn(s storage.Storer, r *http.Request) pipelineFunc {
return builder.FeedPipeline(ctx, pipe, r, l)
}
}
// calculateNumberOfChunks calculates the number of chunks in an arbitrary
// content length.
func calculateNumberOfChunks(contentLength int64, isEncrypted bool) int64 {
if contentLength <= swarm.ChunkSize {
return 1
}
branchingFactor := swarm.Branches
if isEncrypted {
branchingFactor = swarm.EncryptedBranches
}
dataChunks := math.Ceil(float64(contentLength) / float64(swarm.ChunkSize))
totalChunks := dataChunks
intermediate := dataChunks / float64(branchingFactor)
for intermediate > 1 {
totalChunks += math.Ceil(intermediate)
intermediate = intermediate / float64(branchingFactor)
}
return int64(totalChunks) + 1
}
func requestCalculateNumberOfChunks(r *http.Request) int64 {
if !strings.Contains(r.Header.Get(contentTypeHeader), "multipart") && r.ContentLength > 0 {
return calculateNumberOfChunks(r.ContentLength, requestEncrypt(r))
}
return 0
}
......@@ -181,3 +181,40 @@ func TestParseName(t *testing.T) {
})
}
}
// TestCalculateNumberOfChunks is a unit test for
// the chunk-number-according-to-content-length calculation.
func TestCalculateNumberOfChunks(t *testing.T) {
for _, tc := range []struct{ len, chunks int64 }{
{len: 1000, chunks: 1},
{len: 5000, chunks: 3},
{len: 10000, chunks: 4},
{len: 100000, chunks: 26},
{len: 1000000, chunks: 248},
{len: 325839339210, chunks: 79550620 + 621490 + 4856 + 38 + 1},
} {
res := api.CalculateNumberOfChunks(tc.len, false)
if res != tc.chunks {
t.Fatalf("expected result for %d bytes to be %d got %d", tc.len, tc.chunks, res)
}
}
}
// TestCalculateNumberOfChunksEncrypted is a unit test for
// the chunk-number-according-to-content-length calculation with encryption
// (branching factor=64)
func TestCalculateNumberOfChunksEncrypted(t *testing.T) {
for _, tc := range []struct{ len, chunks int64 }{
{len: 1000, chunks: 1},
{len: 5000, chunks: 3},
{len: 10000, chunks: 4},
{len: 100000, chunks: 26},
{len: 1000000, chunks: 245 + 4 + 1},
{len: 325839339210, chunks: 79550620 + 1242979 + 19422 + 304 + 5 + 1},
} {
res := api.CalculateNumberOfChunks(tc.len, true)
if res != tc.chunks {
t.Fatalf("expected result for %d bytes to be %d got %d", tc.len, tc.chunks, res)
}
}
}
......@@ -12,6 +12,7 @@ import (
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/sctx"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/gorilla/mux"
)
......@@ -24,7 +25,7 @@ type bytesPostResponse struct {
func (s *server) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
logger := tracing.NewLoggerWithTraceID(r.Context(), s.Logger)
tag, created, err := s.getOrCreateTag(r.Header.Get(SwarmTagUidHeader))
tag, created, err := s.getOrCreateTag(r.Header.Get(SwarmTagHeader))
if err != nil {
logger.Debugf("bytes upload: get or create tag: %v", err)
logger.Error("bytes upload: get or create tag")
......@@ -32,6 +33,19 @@ func (s *server) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
return
}
if !created {
// only in the case when tag is sent via header (i.e. not created by this request)
if estimatedTotalChunks := requestCalculateNumberOfChunks(r); estimatedTotalChunks > 0 {
err = tag.IncN(tags.TotalChunks, estimatedTotalChunks)
if err != nil {
s.Logger.Debugf("bytes upload: increment tag: %v", err)
s.Logger.Error("bytes upload: increment tag")
jsonhttp.InternalServerError(w, "increment tag")
return
}
}
}
// Add the tag to the context
ctx := sctx.SetTag(r.Context(), tag)
......@@ -52,8 +66,8 @@ func (s *server) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
return
}
}
w.Header().Set(SwarmTagUidHeader, fmt.Sprint(tag.Uid))
w.Header().Set("Access-Control-Expose-Headers", SwarmTagUidHeader)
w.Header().Set(SwarmTagHeader, fmt.Sprint(tag.Uid))
w.Header().Set("Access-Control-Expose-Headers", SwarmTagHeader)
jsonhttp.OK(w, bytesPostResponse{
Reference: address,
})
......
......@@ -34,7 +34,7 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
err error
)
if h := r.Header.Get(SwarmTagUidHeader); h != "" {
if h := r.Header.Get(SwarmTagHeader); h != "" {
tag, err = s.getTag(h)
if err != nil {
s.Logger.Debugf("chunk upload: get tag: %v", err)
......@@ -119,10 +119,10 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
jsonhttp.InternalServerError(w, "increment tag")
return
}
w.Header().Set(SwarmTagUidHeader, fmt.Sprint(tag.Uid))
w.Header().Set(SwarmTagHeader, fmt.Sprint(tag.Uid))
}
w.Header().Set("Access-Control-Expose-Headers", SwarmTagUidHeader)
w.Header().Set("Access-Control-Expose-Headers", SwarmTagHeader)
jsonhttp.OK(w, chunkAddressResponse{Reference: address})
}
......
......@@ -26,6 +26,7 @@ import (
"github.com/ethersphere/bee/pkg/manifest"
"github.com/ethersphere/bee/pkg/sctx"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/tracing"
)
......@@ -51,7 +52,7 @@ func (s *server) dirUploadHandler(w http.ResponseWriter, r *http.Request) {
return
}
tag, created, err := s.getOrCreateTag(r.Header.Get(SwarmTagUidHeader))
tag, created, err := s.getOrCreateTag(r.Header.Get(SwarmTagHeader))
if err != nil {
logger.Debugf("dir upload: get or create tag: %v", err)
logger.Error("dir upload: get or create tag")
......@@ -64,7 +65,7 @@ func (s *server) dirUploadHandler(w http.ResponseWriter, r *http.Request) {
p := requestPipelineFn(s.Storer, r)
encrypt := requestEncrypt(r)
l := loadsave.New(s.Storer, requestModePut(r), encrypt)
reference, err := storeDir(ctx, encrypt, r.Body, s.Logger, p, l, r.Header.Get(SwarmIndexDocumentHeader), r.Header.Get(SwarmErrorDocumentHeader))
reference, err := storeDir(ctx, encrypt, r.Body, s.Logger, p, l, r.Header.Get(SwarmIndexDocumentHeader), r.Header.Get(SwarmErrorDocumentHeader), tag, created)
if err != nil {
logger.Debugf("dir upload: store dir err: %v", err)
logger.Errorf("dir upload: store dir")
......@@ -80,7 +81,7 @@ func (s *server) dirUploadHandler(w http.ResponseWriter, r *http.Request) {
return
}
}
w.Header().Set(SwarmTagUidHeader, fmt.Sprint(tag.Uid))
w.Header().Set(SwarmTagHeader, fmt.Sprint(tag.Uid))
jsonhttp.OK(w, fileUploadResponse{
Reference: reference,
})
......@@ -104,7 +105,7 @@ func validateRequest(r *http.Request) error {
// storeDir stores all files recursively contained in the directory given as a tar
// it returns the hash for the uploaded manifest corresponding to the uploaded dir
func storeDir(ctx context.Context, encrypt bool, reader io.ReadCloser, log logging.Logger, p pipelineFunc, ls file.LoadSaver, indexFilename string, errorFilename string) (swarm.Address, error) {
func storeDir(ctx context.Context, encrypt bool, reader io.ReadCloser, log logging.Logger, p pipelineFunc, ls file.LoadSaver, indexFilename string, errorFilename string, tag *tags.Tag, tagCreated bool) (swarm.Address, error) {
logger := tracing.NewLoggerWithTraceID(ctx, log)
dirManifest, err := manifest.NewDefaultManifest(ls, encrypt)
......@@ -159,7 +160,19 @@ func storeDir(ctx context.Context, encrypt bool, reader io.ReadCloser, log loggi
contentType: contentType,
reader: tarReader,
}
fileReference, err := storeFile(ctx, fileInfo, p)
if !tagCreated {
// only in the case when tag is sent via header (i.e. not created by this request)
// for each file
if estimatedTotalChunks := calculateNumberOfChunks(fileInfo.size, encrypt); estimatedTotalChunks > 0 {
err = tag.IncN(tags.TotalChunks, estimatedTotalChunks)
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("increment tag: %w", err)
}
}
}
fileReference, err := storeFile(ctx, fileInfo, p, encrypt, tag, tagCreated)
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("store dir file: %w", err)
}
......@@ -195,8 +208,23 @@ func storeDir(ctx context.Context, encrypt bool, reader io.ReadCloser, log loggi
}
}
storeSizeFn := []manifest.StoreSizeFunc{}
if !tagCreated {
// only in the case when tag is sent via header (i.e. not created by this request)
// each content that is saved for manifest
storeSizeFn = append(storeSizeFn, func(dataSize int64) error {
if estimatedTotalChunks := calculateNumberOfChunks(dataSize, encrypt); estimatedTotalChunks > 0 {
err = tag.IncN(tags.TotalChunks, estimatedTotalChunks)
if err != nil {
return fmt.Errorf("increment tag: %w", err)
}
}
return nil
})
}
// save manifest
manifestBytesReference, err := dirManifest.Store(ctx)
manifestBytesReference, err := dirManifest.Store(ctx, storeSizeFn...)
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("store manifest: %w", err)
}
......@@ -209,6 +237,17 @@ func storeDir(ctx context.Context, encrypt bool, reader io.ReadCloser, log loggi
return swarm.ZeroAddress, fmt.Errorf("metadata marshal: %w", err)
}
if !tagCreated {
// we have additional chunks:
// - for manifest file metadata (1 or more) -> we use estimation function
// - for manifest file collection entry (1)
estimatedTotalChunks := calculateNumberOfChunks(int64(len(metadataBytes)), encrypt)
err = tag.IncN(tags.TotalChunks, estimatedTotalChunks+1)
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("increment tag: %w", err)
}
}
mr, err := p(ctx, bytes.NewReader(metadataBytes), int64(len(metadataBytes)))
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("split metadata: %w", err)
......@@ -231,7 +270,7 @@ func storeDir(ctx context.Context, encrypt bool, reader io.ReadCloser, log loggi
// storeFile uploads the given file and returns its reference
// this function was extracted from `fileUploadHandler` and should eventually replace its current code
func storeFile(ctx context.Context, fileInfo *fileUploadInfo, p pipelineFunc) (swarm.Address, error) {
func storeFile(ctx context.Context, fileInfo *fileUploadInfo, p pipelineFunc, encrypt bool, tag *tags.Tag, tagCreated bool) (swarm.Address, error) {
// first store the file and get its reference
fr, err := p(ctx, fileInfo.reader, fileInfo.size)
if err != nil {
......@@ -251,6 +290,17 @@ func storeFile(ctx context.Context, fileInfo *fileUploadInfo, p pipelineFunc) (s
return swarm.ZeroAddress, fmt.Errorf("metadata marshal: %w", err)
}
if !tagCreated {
// here we have additional chunks:
// - for metadata (1 or more) -> we use estimation function
// - for collection entry (1)
estimatedTotalChunks := calculateNumberOfChunks(int64(len(metadataBytes)), encrypt)
err = tag.IncN(tags.TotalChunks, estimatedTotalChunks+1)
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("increment tag: %w", err)
}
}
mr, err := p(ctx, bytes.NewReader(metadataBytes), int64(len(metadataBytes)))
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("split metadata: %w", err)
......
......@@ -15,6 +15,7 @@ type (
FileUploadResponse = fileUploadResponse
TagResponse = tagResponse
TagRequest = tagRequest
ListTagsResponse = listTagsResponse
PinnedChunk = pinnedChunk
ListPinnedChunksResponse = listPinnedChunksResponse
UpdatePinCounter = updatePinCounter
......@@ -38,3 +39,7 @@ var (
func (s *Server) ResolveNameOrAddress(str string) (swarm.Address, error) {
return s.resolveNameOrAddress(str)
}
func CalculateNumberOfChunks(contentLength int64, isEncrypted bool) int64 {
return calculateNumberOfChunks(contentLength, isEncrypted)
}
......@@ -26,6 +26,7 @@ import (
"github.com/ethersphere/bee/pkg/sctx"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/ethersphere/langos"
"github.com/gorilla/mux"
......@@ -60,7 +61,7 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
return
}
tag, created, err := s.getOrCreateTag(r.Header.Get(SwarmTagUidHeader))
tag, created, err := s.getOrCreateTag(r.Header.Get(SwarmTagHeader))
if err != nil {
logger.Debugf("file upload: get or create tag: %v", err)
logger.Error("file upload: get or create tag")
......@@ -68,6 +69,19 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
return
}
if !created {
// only in the case when tag is sent via header (i.e. not created by this request)
if estimatedTotalChunks := requestCalculateNumberOfChunks(r); estimatedTotalChunks > 0 {
err = tag.IncN(tags.TotalChunks, estimatedTotalChunks)
if err != nil {
s.Logger.Debugf("file upload: increment tag: %v", err)
s.Logger.Error("file upload: increment tag")
jsonhttp.InternalServerError(w, "increment tag")
return
}
}
}
// Add the tag to the context
ctx := sctx.SetTag(r.Context(), tag)
......@@ -176,6 +190,23 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
jsonhttp.InternalServerError(w, "metadata marshal error")
return
}
if !created {
// only in the case when tag is sent via header (i.e. not created by this request)
// here we have additional chunks:
// - for metadata (1 or more) -> we use estimation function
// - for collection entry (1)
estimatedTotalChunks := calculateNumberOfChunks(int64(len(metadataBytes)), requestEncrypt(r))
err = tag.IncN(tags.TotalChunks, estimatedTotalChunks+1)
if err != nil {
s.Logger.Debugf("file upload: increment tag: %v", err)
s.Logger.Error("file upload: increment tag")
jsonhttp.InternalServerError(w, "increment tag")
return
}
}
mr, err := p(ctx, bytes.NewReader(metadataBytes), int64(len(metadataBytes)))
if err != nil {
logger.Debugf("file upload: metadata store, file %q: %v", fileName, err)
......@@ -210,8 +241,8 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
}
}
w.Header().Set("ETag", fmt.Sprintf("%q", reference.String()))
w.Header().Set(SwarmTagUidHeader, fmt.Sprint(tag.Uid))
w.Header().Set("Access-Control-Expose-Headers", SwarmTagUidHeader)
w.Header().Set(SwarmTagHeader, fmt.Sprint(tag.Uid))
w.Header().Set("Access-Control-Expose-Headers", SwarmTagHeader)
jsonhttp.OK(w, fileUploadResponse{
Reference: reference,
})
......
......@@ -119,6 +119,7 @@ func (s *server) setupRouting() {
handle(router, "/tags", web.ChainHandlers(
s.gatewayModeForbidEndpointHandler,
web.FinalHandler(jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.listTagsHandler),
"POST": web.ChainHandlers(
jsonhttp.NewMaxBodyBytesHandler(1024),
web.FinalHandlerFunc(s.createTagHandler),
......
......@@ -7,7 +7,6 @@ package api
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"strconv"
......@@ -20,35 +19,28 @@ import (
)
type tagRequest struct {
Name string `json:"name,omitempty"`
Address swarm.Address `json:"address,omitempty"`
}
type tagResponse struct {
Total int64 `json:"total"`
Split int64 `json:"split"`
Seen int64 `json:"seen"`
Stored int64 `json:"stored"`
Sent int64 `json:"sent"`
Synced int64 `json:"synced"`
Uid uint32 `json:"uid"`
Name string `json:"name"`
Address swarm.Address `json:"address"`
StartedAt time.Time `json:"startedAt"`
Uid uint32 `json:"uid"`
StartedAt time.Time `json:"startedAt"`
Total int64 `json:"total"`
Processed int64 `json:"processed"`
Synced int64 `json:"synced"`
}
type listTagsResponse struct {
Tags []tagResponse `json:"tags"`
}
func newTagResponse(tag *tags.Tag) tagResponse {
return tagResponse{
Total: tag.Total,
Split: tag.Split,
Seen: tag.Seen,
Stored: tag.Stored,
Sent: tag.Sent,
Synced: tag.Synced,
Uid: tag.Uid,
Name: tag.Name,
Address: tag.Address,
StartedAt: tag.StartedAt,
Total: tag.Total,
Processed: tag.Stored,
Synced: tag.Seen + tag.Synced,
}
}
......@@ -75,11 +67,7 @@ func (s *server) createTagHandler(w http.ResponseWriter, r *http.Request) {
}
}
if tagr.Name == "" {
tagr.Name = fmt.Sprintf("unnamed_tag_%d", time.Now().Unix())
}
tag, err := s.Tags.Create(tagr.Name, 0)
tag, err := s.Tags.Create(0)
if err != nil {
s.Logger.Debugf("create tag: tag create error: %v", err)
s.Logger.Error("create tag: tag create error")
......@@ -204,3 +192,44 @@ func (s *server) doneSplitHandler(w http.ResponseWriter, r *http.Request) {
}
jsonhttp.OK(w, "ok")
}
func (s *server) listTagsHandler(w http.ResponseWriter, r *http.Request) {
var (
err error
offset, limit = 0, 100 // default offset is 0, default limit 100
)
if v := r.URL.Query().Get("offset"); v != "" {
offset, err = strconv.Atoi(v)
if err != nil {
s.Logger.Debugf("list tags: parse offset: %v", err)
s.Logger.Errorf("list tags: bad offset")
jsonhttp.BadRequest(w, "bad offset")
}
}
if v := r.URL.Query().Get("limit"); v != "" {
limit, err = strconv.Atoi(v)
if err != nil {
s.Logger.Debugf("list tags: parse limit: %v", err)
s.Logger.Errorf("list tags: bad limit")
jsonhttp.BadRequest(w, "bad limit")
}
}
tagList, err := s.Tags.ListAll(r.Context(), offset, limit)
if err != nil {
s.Logger.Debugf("list tags: listing: %v", err)
s.Logger.Errorf("list tags: listing")
jsonhttp.InternalServerError(w, err)
return
}
tags := make([]tagResponse, len(tagList))
for i, t := range tagList {
tags[i] = newTagResponse(t)
}
jsonhttp.OK(w, listTagsResponse{
Tags: tags,
})
}
......@@ -9,8 +9,8 @@ import (
"fmt"
"io/ioutil"
"net/http"
"sort"
"strconv"
"strings"
"testing"
"github.com/ethersphere/bee/pkg/logging"
......@@ -41,7 +41,6 @@ func TestTags(t *testing.T) {
chunksResource = "/chunks"
tagsResource = "/tags"
chunk = testingc.GenerateTestRandomChunk()
someTagName = "file.jpg"
mockStatestore = statestore.NewStateStore()
logger = logging.New(ioutil.Discard, 0)
tag = tags.NewTags(mockStatestore, logger)
......@@ -51,30 +50,21 @@ func TestTags(t *testing.T) {
})
)
t.Run("create unnamed tag", func(t *testing.T) {
tr := api.TagResponse{}
jsonhttptest.Request(t, client, http.MethodPost, tagsResource, http.StatusCreated,
jsonhttptest.WithJSONRequestBody(api.TagRequest{}),
jsonhttptest.WithUnmarshalJSONResponse(&tr),
// list tags without anything pinned
t.Run("list tags zero", func(t *testing.T) {
jsonhttptest.Request(t, client, http.MethodGet, tagsResource, http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(api.ListTagsResponse{
Tags: []api.TagResponse{},
}),
)
if !strings.Contains(tr.Name, "unnamed_tag_") {
t.Fatalf("expected tag name to contain %s but is %s instead", "unnamed_tag_", tr.Name)
}
})
t.Run("create tag with name", func(t *testing.T) {
t.Run("create tag", func(t *testing.T) {
tr := api.TagResponse{}
jsonhttptest.Request(t, client, http.MethodPost, tagsResource, http.StatusCreated,
jsonhttptest.WithJSONRequestBody(api.TagRequest{
Name: someTagName,
}),
jsonhttptest.WithJSONRequestBody(api.TagRequest{}),
jsonhttptest.WithUnmarshalJSONResponse(&tr),
)
if tr.Name != someTagName {
t.Fatalf("expected tag name to be %s but is %s instead", someTagName, tr.Name)
}
})
t.Run("create tag with invalid id", func(t *testing.T) {
......@@ -84,7 +74,7 @@ func TestTags(t *testing.T) {
Message: "cannot get tag",
Code: http.StatusBadRequest,
}),
jsonhttptest.WithRequestHeader(api.SwarmTagUidHeader, "invalid_id.jpg"), // the value should be uint32
jsonhttptest.WithRequestHeader(api.SwarmTagHeader, "invalid_id.jpg"), // the value should be uint32
)
})
......@@ -110,16 +100,10 @@ func TestTags(t *testing.T) {
// create a tag using the API
tr := api.TagResponse{}
jsonhttptest.Request(t, client, http.MethodPost, tagsResource, http.StatusCreated,
jsonhttptest.WithJSONRequestBody(api.TagResponse{
Name: someTagName,
}),
jsonhttptest.WithJSONRequestBody(api.TagRequest{}),
jsonhttptest.WithUnmarshalJSONResponse(&tr),
)
if tr.Name != someTagName {
t.Fatalf("sent tag name %s does not match received tag name %s", someTagName, tr.Name)
}
_ = jsonhttptest.Request(t, client, http.MethodPost, chunksResource, http.StatusOK,
jsonhttptest.WithRequestBody(bytes.NewReader(chunk.Data())),
jsonhttptest.WithExpectedJSONResponse(api.ChunkAddressResponse{Reference: chunk.Address()}),
......@@ -128,13 +112,49 @@ func TestTags(t *testing.T) {
rcvdHeaders := jsonhttptest.Request(t, client, http.MethodPost, chunksResource, http.StatusOK,
jsonhttptest.WithRequestBody(bytes.NewReader(chunk.Data())),
jsonhttptest.WithExpectedJSONResponse(api.ChunkAddressResponse{Reference: chunk.Address()}),
jsonhttptest.WithRequestHeader(api.SwarmTagUidHeader, strconv.FormatUint(uint64(tr.Uid), 10)),
jsonhttptest.WithRequestHeader(api.SwarmTagHeader, strconv.FormatUint(uint64(tr.Uid), 10)),
)
isTagFoundInResponse(t, rcvdHeaders, &tr)
tagValueTest(t, tr.Uid, 1, 1, 1, 0, 0, 0, swarm.ZeroAddress, client)
})
t.Run("list tags", func(t *testing.T) {
// list all current tags
var resp api.ListTagsResponse
jsonhttptest.Request(t, client, http.MethodGet, tagsResource, http.StatusOK,
jsonhttptest.WithUnmarshalJSONResponse(&resp),
)
// create 2 new tags
tRes1 := api.TagResponse{}
jsonhttptest.Request(t, client, http.MethodPost, tagsResource, http.StatusCreated,
jsonhttptest.WithJSONRequestBody(api.TagRequest{}),
jsonhttptest.WithUnmarshalJSONResponse(&tRes1),
)
tRes2 := api.TagResponse{}
jsonhttptest.Request(t, client, http.MethodPost, tagsResource, http.StatusCreated,
jsonhttptest.WithJSONRequestBody(api.TagRequest{}),
jsonhttptest.WithUnmarshalJSONResponse(&tRes2),
)
expectedTags := []api.TagResponse{
tRes1,
tRes2,
}
expectedTags = append(expectedTags, resp.Tags...)
sort.Slice(expectedTags, func(i, j int) bool { return expectedTags[i].Uid < expectedTags[j].Uid })
// check if listing returns expected tags
jsonhttptest.Request(t, client, http.MethodGet, tagsResource, http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(api.ListTagsResponse{
Tags: expectedTags,
}),
)
})
t.Run("delete tag error", func(t *testing.T) {
// try to delete invalid tag
jsonhttptest.Request(t, client, http.MethodDelete, tagsResource+"/foobar", http.StatusBadRequest,
......@@ -157,9 +177,7 @@ func TestTags(t *testing.T) {
// create a tag through API
tRes := api.TagResponse{}
jsonhttptest.Request(t, client, http.MethodPost, tagsResource, http.StatusCreated,
jsonhttptest.WithJSONRequestBody(api.TagResponse{
Name: someTagName,
}),
jsonhttptest.WithJSONRequestBody(api.TagRequest{}),
jsonhttptest.WithUnmarshalJSONResponse(&tRes),
)
......@@ -201,9 +219,7 @@ func TestTags(t *testing.T) {
// create a tag through API
tRes := api.TagResponse{}
jsonhttptest.Request(t, client, http.MethodPost, tagsResource, http.StatusCreated,
jsonhttptest.WithJSONRequestBody(api.TagResponse{
Name: someTagName,
}),
jsonhttptest.WithJSONRequestBody(api.TagRequest{}),
jsonhttptest.WithUnmarshalJSONResponse(&tRes),
)
tagId := tRes.Uid
......@@ -214,7 +230,7 @@ func TestTags(t *testing.T) {
// upload content with tag
jsonhttptest.Request(t, client, http.MethodPost, chunksResource, http.StatusOK,
jsonhttptest.WithRequestBody(bytes.NewReader(chunk.Data())),
jsonhttptest.WithRequestHeader(api.SwarmTagUidHeader, fmt.Sprint(tagId)),
jsonhttptest.WithRequestHeader(api.SwarmTagHeader, fmt.Sprint(tagId)),
)
// call done split
......@@ -256,7 +272,7 @@ func TestTags(t *testing.T) {
jsonhttptest.WithRequestHeader("Content-Type", "application/octet-stream"),
)
tagId, err := strconv.Atoi(respHeaders.Get(api.SwarmTagUidHeader))
tagId, err := strconv.Atoi(respHeaders.Get(api.SwarmTagHeader))
if err != nil {
t.Fatal(err)
}
......@@ -278,7 +294,7 @@ func TestTags(t *testing.T) {
jsonhttptest.WithRequestHeader("Content-Type", api.ContentTypeTar),
)
tagId, err := strconv.Atoi(respHeaders.Get(api.SwarmTagUidHeader))
tagId, err := strconv.Atoi(respHeaders.Get(api.SwarmTagHeader))
if err != nil {
t.Fatal(err)
}
......@@ -289,17 +305,12 @@ func TestTags(t *testing.T) {
// create a tag using the API
tr := api.TagResponse{}
jsonhttptest.Request(t, client, http.MethodPost, tagsResource, http.StatusCreated,
jsonhttptest.WithJSONRequestBody(api.TagResponse{
Name: someTagName,
}),
jsonhttptest.WithJSONRequestBody(api.TagRequest{}),
jsonhttptest.WithUnmarshalJSONResponse(&tr),
)
if tr.Name != someTagName {
t.Fatalf("sent tag name %s does not match received tag name %s", someTagName, tr.Name)
}
sentHeaders := make(http.Header)
sentHeaders.Set(api.SwarmTagUidHeader, strconv.FormatUint(uint64(tr.Uid), 10))
sentHeaders.Set(api.SwarmTagHeader, strconv.FormatUint(uint64(tr.Uid), 10))
g := mockbytes.New(0, mockbytes.MockTypeStandard).WithModulus(255)
dataChunk, err := g.SequentialBytes(swarm.ChunkSize)
......@@ -318,7 +329,7 @@ func TestTags(t *testing.T) {
jsonhttptest.WithExpectedJSONResponse(fileUploadResponse{
Reference: rootAddress,
}),
jsonhttptest.WithRequestHeader(api.SwarmTagUidHeader, strconv.FormatUint(uint64(tr.Uid), 10)),
jsonhttptest.WithRequestHeader(api.SwarmTagHeader, strconv.FormatUint(uint64(tr.Uid), 10)),
)
id := isTagFoundInResponse(t, rcvdHeaders, nil)
......@@ -330,7 +341,7 @@ func TestTags(t *testing.T) {
if tagToVerify.Uid != tr.Uid {
t.Fatalf("expected tag id to be %d but is %d", tagToVerify.Uid, tr.Uid)
}
tagValueTest(t, id, 3, 3, 1, 0, 0, 0, swarm.ZeroAddress, client)
tagValueTest(t, id, 3, 3, 1, 0, 0, 3, swarm.ZeroAddress, client)
})
}
......@@ -339,7 +350,7 @@ func TestTags(t *testing.T) {
func isTagFoundInResponse(t *testing.T, headers http.Header, tr *api.TagResponse) uint32 {
t.Helper()
idStr := headers.Get(api.SwarmTagUidHeader)
idStr := headers.Get(api.SwarmTagHeader)
if idStr == "" {
t.Fatalf("could not find tag id header in chunk upload response")
}
......@@ -363,26 +374,13 @@ func tagValueTest(t *testing.T, id uint32, split, stored, seen, sent, synced, to
jsonhttptest.WithUnmarshalJSONResponse(&tag),
)
if tag.Split != split {
t.Errorf("tag split count mismatch. got %d want %d", tag.Split, split)
if tag.Processed != stored {
t.Errorf("tag processed count mismatch. got %d want %d", tag.Processed, stored)
}
if tag.Stored != stored {
t.Errorf("tag stored count mismatch. got %d want %d", tag.Stored, stored)
}
if tag.Seen != seen {
t.Errorf("tag seen count mismatch. got %d want %d", tag.Seen, seen)
}
if tag.Sent != sent {
t.Errorf("tag sent count mismatch. got %d want %d", tag.Sent, sent)
}
if tag.Synced != synced {
t.Errorf("tag synced count mismatch. got %d want %d", tag.Synced, synced)
if tag.Synced != seen+synced {
t.Errorf("tag synced count mismatch. got %d want %d (seen: %d, synced: %d)", tag.Synced, seen+synced, seen, synced)
}
if tag.Total != total {
t.Errorf("tag total count mismatch. got %d want %d", tag.Total, total)
}
if !tag.Address.Equal(address) {
t.Errorf("address mismatch: expected %s got %s", address.String(), tag.Address.String())
}
}
......@@ -25,6 +25,7 @@ type (
SwapCashoutResponse = swapCashoutResponse
SwapCashoutStatusResponse = swapCashoutStatusResponse
SwapCashoutStatusResult = swapCashoutStatusResult
TagResponse = tagResponse
)
var (
......
......@@ -145,6 +145,10 @@ func (s *server) setupRouting() {
})
}
router.Handle("/tags/{id}", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.getTagHandler),
})
baseRouter.Handle("/", web.ChainHandlers(
httpaccess.NewHTTPAccessLogHandler(s.Logger, logrus.InfoLevel, s.Tracer, "debug api access"),
handlers.CompressHandler,
......
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package debugapi
import (
"errors"
"net/http"
"strconv"
"time"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/gorilla/mux"
)
type tagResponse struct {
Total int64 `json:"total"`
Split int64 `json:"split"`
Seen int64 `json:"seen"`
Stored int64 `json:"stored"`
Sent int64 `json:"sent"`
Synced int64 `json:"synced"`
Uid uint32 `json:"uid"`
Address swarm.Address `json:"address"`
StartedAt time.Time `json:"startedAt"`
}
func newTagResponse(tag *tags.Tag) tagResponse {
return tagResponse{
Total: tag.Total,
Split: tag.Split,
Seen: tag.Seen,
Stored: tag.Stored,
Sent: tag.Sent,
Synced: tag.Synced,
Uid: tag.Uid,
Address: tag.Address,
StartedAt: tag.StartedAt,
}
}
func (s *server) getTagHandler(w http.ResponseWriter, r *http.Request) {
idStr := mux.Vars(r)["id"]
id, err := strconv.Atoi(idStr)
if err != nil {
s.Logger.Debugf("get tag: parse id %s: %v", idStr, err)
s.Logger.Error("get tag: parse id")
jsonhttp.BadRequest(w, "invalid id")
return
}
tag, err := s.Tags.Get(uint32(id))
if err != nil {
if errors.Is(err, tags.ErrNotFound) {
s.Logger.Debugf("get tag: tag not present: %v, id %s", err, idStr)
s.Logger.Error("get tag: tag not present")
jsonhttp.NotFound(w, "tag not present")
return
}
s.Logger.Debugf("get tag: tag %v: %v", idStr, err)
s.Logger.Errorf("get tag: %v", idStr)
jsonhttp.InternalServerError(w, "cannot get tag")
return
}
w.Header().Set("Cache-Control", "no-cache, private, max-age=0")
jsonhttp.OK(w, newTagResponse(tag))
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package debugapi_test
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"testing"
"github.com/ethersphere/bee/pkg/debugapi"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/logging"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
testingc "github.com/ethersphere/bee/pkg/storage/testing"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
)
func tagsWithIdResource(id uint32) string { return fmt.Sprintf("/tags/%d", id) }
func TestTags(t *testing.T) {
var (
logger = logging.New(ioutil.Discard, 0)
chunk = testingc.GenerateTestRandomChunk()
mockStorer = mock.NewStorer()
mockStatestore = statestore.NewStateStore()
tagsStore = tags.NewTags(mockStatestore, logger)
testServer = newTestServer(t, testServerOptions{
Storer: mockStorer,
Tags: tagsStore,
})
)
_, err := mockStorer.Put(context.Background(), storage.ModePutUpload, chunk)
if err != nil {
t.Fatal(err)
}
t.Run("all", func(t *testing.T) {
tag, err := tagsStore.Create(0)
if err != nil {
t.Fatal(err)
}
_ = tag.Inc(tags.StateSplit)
_ = tag.Inc(tags.StateStored)
_ = tag.Inc(tags.StateSeen)
_ = tag.Inc(tags.StateSent)
_ = tag.Inc(tags.StateSynced)
_, err = tag.DoneSplit(chunk.Address())
if err != nil {
t.Fatal(err)
}
tagValueTest(t, tag.Uid, 1, 1, 1, 1, 1, 1, chunk.Address(), testServer.Client)
})
}
func tagValueTest(t *testing.T, id uint32, split, stored, seen, sent, synced, total int64, address swarm.Address, client *http.Client) {
t.Helper()
tag := debugapi.TagResponse{}
jsonhttptest.Request(t, client, http.MethodGet, tagsWithIdResource(id), http.StatusOK,
jsonhttptest.WithUnmarshalJSONResponse(&tag),
)
if tag.Split != split {
t.Errorf("tag split count mismatch. got %d want %d", tag.Split, split)
}
if tag.Stored != stored {
t.Errorf("tag stored count mismatch. got %d want %d", tag.Stored, stored)
}
if tag.Seen != seen {
t.Errorf("tag seen count mismatch. got %d want %d", tag.Seen, seen)
}
if tag.Sent != sent {
t.Errorf("tag sent count mismatch. got %d want %d", tag.Sent, sent)
}
if tag.Synced != synced {
t.Errorf("tag synced count mismatch. got %d want %d", tag.Synced, synced)
}
if tag.Total != total {
t.Errorf("tag total count mismatch. got %d want %d", tag.Total, total)
}
if !tag.Address.Equal(address) {
t.Errorf("address mismatch: expected %s got %s", address.String(), tag.Address.String())
}
}
......@@ -40,7 +40,7 @@ func TestModeSetSyncNormalTag(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
db := newTestDB(t, &Options{Tags: tags.NewTags(mockStatestore, logger)})
tag, err := db.tags.Create("test", 1)
tag, err := db.tags.Create(1)
if err != nil {
t.Fatal(err)
}
......
......@@ -27,6 +27,10 @@ var (
ErrMissingReference = errors.New("manifest: missing reference")
)
// StoreSizeFunc is a callback on every content size that will be stored by
// the Store function.
type StoreSizeFunc func(int64) error
// Interface for operations with manifest.
type Interface interface {
// Type returns manifest implementation type information
......@@ -40,7 +44,7 @@ type Interface interface {
// HasPrefix tests whether the specified prefix path exists.
HasPrefix(context.Context, string) (bool, error)
// Store stores the manifest, returning the resulting address.
Store(context.Context) (swarm.Address, error)
Store(context.Context, ...StoreSizeFunc) (swarm.Address, error)
// IterateAddresses is used to iterate over chunks addresses for
// the manifest.
IterateAddresses(context.Context, swarm.AddressIterFunc) error
......
......@@ -107,8 +107,18 @@ func (m *mantarayManifest) HasPrefix(ctx context.Context, prefix string) (bool,
return m.trie.HasPrefix(ctx, p, m.ls)
}
func (m *mantarayManifest) Store(ctx context.Context) (swarm.Address, error) {
err := m.trie.Save(ctx, m.ls)
func (m *mantarayManifest) Store(ctx context.Context, storeSizeFn ...StoreSizeFunc) (swarm.Address, error) {
var ls mantaray.LoadSaver
if len(storeSizeFn) > 0 {
ls = &mantarayLoadSaver{
ls: m.ls,
storeSizeFn: storeSizeFn,
}
} else {
ls = m.ls
}
err := m.trie.Save(ctx, ls)
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("manifest save error: %w", err)
}
......@@ -159,3 +169,24 @@ func (m *mantarayManifest) IterateAddresses(ctx context.Context, fn swarm.Addres
return nil
}
type mantarayLoadSaver struct {
ls file.LoadSaver
storeSizeFn []StoreSizeFunc
}
func (ls *mantarayLoadSaver) Load(ctx context.Context, ref []byte) ([]byte, error) {
return ls.ls.Load(ctx, ref)
}
func (ls *mantarayLoadSaver) Save(ctx context.Context, data []byte) ([]byte, error) {
dataLen := int64(len(data))
for i := range ls.storeSizeFn {
err := ls.storeSizeFn[i](dataLen)
if err != nil {
return nil, fmt.Errorf("manifest store size func: %w", err)
}
}
return ls.ls.Save(ctx, data)
}
......@@ -88,12 +88,22 @@ func (m *simpleManifest) HasPrefix(_ context.Context, prefix string) (bool, erro
return m.manifest.HasPrefix(prefix), nil
}
func (m *simpleManifest) Store(ctx context.Context) (swarm.Address, error) {
func (m *simpleManifest) Store(ctx context.Context, storeSizeFn ...StoreSizeFunc) (swarm.Address, error) {
data, err := m.manifest.MarshalBinary()
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("manifest marshal error: %w", err)
}
if len(storeSizeFn) > 0 {
dataLen := int64(len(data))
for i := range storeSizeFn {
err = storeSizeFn[i](dataLen)
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("manifest store size func: %w", err)
}
}
}
ref, err := m.ls.Save(ctx, data)
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("manifest save error: %w", err)
......
......@@ -82,7 +82,7 @@ func TestSendChunkToSyncWithTag(t *testing.T) {
mtags, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
defer storer.Close()
ta, err := mtags.Create("test", 1)
ta, err := mtags.Create(1)
if err != nil {
t.Fatal(err)
}
......
......@@ -111,7 +111,7 @@ func TestPushChunkToClosest(t *testing.T) {
psPivot, storerPivot, pivotTags, pivotAccounting := createPushSyncNode(t, pivotNode, recorder, nil, mock.WithClosestPeer(closestPeer))
defer storerPivot.Close()
ta, err := pivotTags.Create("test", 1)
ta, err := pivotTags.Create(1)
if err != nil {
t.Fatal(err)
}
......@@ -230,7 +230,7 @@ func TestPushChunkToNextClosest(t *testing.T) {
)
defer storerPivot.Close()
ta, err := pivotTags.Create("test", 1)
ta, err := pivotTags.Create(1)
if err != nil {
t.Fatal(err)
}
......
......@@ -19,6 +19,7 @@ const (
SpanSize = 8
SectionSize = 32
Branches = 128
EncryptedBranches = Branches / 2
BmtBranches = 128
ChunkSize = SectionSize * Branches
HashSize = 32
......
......@@ -60,7 +60,6 @@ type Tag struct {
Synced int64 // number of chunks synced with proof
Uid uint32 // a unique identifier for this tag
Name string // a name tag for this tag
Address swarm.Address // the associated swarm hash for this tag
StartedAt time.Time // tag started to calculate ETA
......@@ -73,10 +72,9 @@ type Tag struct {
}
// NewTag creates a new tag, and returns it
func NewTag(ctx context.Context, uid uint32, s string, total int64, tracer *tracing.Tracer, stateStore storage.StateStorer, logger logging.Logger) *Tag {
func NewTag(ctx context.Context, uid uint32, total int64, tracer *tracing.Tracer, stateStore storage.StateStorer, logger logging.Logger) *Tag {
t := &Tag{
Uid: uid,
Name: s,
StartedAt: time.Now(),
Total: total,
stateStore: stateStore,
......@@ -102,7 +100,7 @@ func (t *Tag) FinishRootSpan() {
}
// IncN increments the count for a state
func (t *Tag) IncN(state State, n int) error {
func (t *Tag) IncN(state State, n int64) error {
var v *int64
switch state {
case TotalChunks:
......@@ -118,7 +116,7 @@ func (t *Tag) IncN(state State, n int) error {
case StateSynced:
v = &t.Synced
}
atomic.AddInt64(v, int64(n))
atomic.AddInt64(v, n)
// check if syncing is over and persist the tag
if state == StateSynced {
......@@ -259,7 +257,6 @@ func (tag *Tag) MarshalBinary() (data []byte, err error) {
n = binary.PutVarint(intBuffer, int64(len(tag.Address.Bytes())))
buffer = append(buffer, intBuffer[:n]...)
buffer = append(buffer, tag.Address.Bytes()...)
buffer = append(buffer, []byte(tag.Name)...)
return buffer, nil
}
......@@ -288,7 +285,6 @@ func (tag *Tag) UnmarshalBinary(buffer []byte) error {
if t > 0 {
tag.Address = swarm.NewAddress(buffer[:t])
}
tag.Name = string(buffer[t:])
return nil
}
......
......@@ -176,8 +176,7 @@ func TestTagsMultipleConcurrentIncrementsSyncMap(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(10 * 5 * n)
for i := 0; i < 10; i++ {
s := string([]byte{uint8(i)})
tag, err := ts.Create(s, int64(n))
tag, err := ts.Create(int64(n))
if err != nil {
t.Fatal(err)
}
......@@ -223,7 +222,7 @@ func TestTagsMultipleConcurrentIncrementsSyncMap(t *testing.T) {
func TestMarshallingWithAddr(t *testing.T) {
mockStatestore := statestore.NewStateStore()
logger := logging.New(ioutil.Discard, 0)
tg := NewTag(context.Background(), 111, "test/tag", 10, nil, mockStatestore, logger)
tg := NewTag(context.Background(), 111, 10, nil, mockStatestore, logger)
tg.Address = swarm.NewAddress([]byte{0, 1, 2, 3, 4, 5, 6})
for _, f := range allStates {
......@@ -248,10 +247,6 @@ func TestMarshallingWithAddr(t *testing.T) {
t.Fatalf("tag uids not equal. want %d got %d", tg.Uid, unmarshalledTag.Uid)
}
if unmarshalledTag.Name != tg.Name {
t.Fatalf("tag names not equal. want %s got %s", tg.Name, unmarshalledTag.Name)
}
for _, state := range allStates {
uv, tv := unmarshalledTag.Get(state), tg.Get(state)
if uv != tv {
......@@ -260,7 +255,7 @@ func TestMarshallingWithAddr(t *testing.T) {
}
if unmarshalledTag.TotalCounter() != tg.TotalCounter() {
t.Fatalf("tag names not equal. want %d got %d", tg.TotalCounter(), unmarshalledTag.TotalCounter())
t.Fatalf("tag total counters not equal. want %d got %d", tg.TotalCounter(), unmarshalledTag.TotalCounter())
}
if len(unmarshalledTag.Address.Bytes()) != len(tg.Address.Bytes()) {
......@@ -276,7 +271,7 @@ func TestMarshallingWithAddr(t *testing.T) {
func TestMarshallingNoAddr(t *testing.T) {
mockStatestore := statestore.NewStateStore()
logger := logging.New(ioutil.Discard, 0)
tg := NewTag(context.Background(), 111, "test/tag", 10, nil, mockStatestore, logger)
tg := NewTag(context.Background(), 111, 10, nil, mockStatestore, logger)
for _, f := range allStates {
err := tg.Inc(f)
if err != nil {
......@@ -299,10 +294,6 @@ func TestMarshallingNoAddr(t *testing.T) {
t.Fatalf("tag uids not equal. want %d got %d", tg.Uid, unmarshalledTag.Uid)
}
if unmarshalledTag.Name != tg.Name {
t.Fatalf("tag names not equal. want %s got %s", tg.Name, unmarshalledTag.Name)
}
for _, state := range allStates {
uv, tv := unmarshalledTag.Get(state), tg.Get(state)
if uv != tv {
......@@ -311,7 +302,7 @@ func TestMarshallingNoAddr(t *testing.T) {
}
if unmarshalledTag.TotalCounter() != tg.TotalCounter() {
t.Fatalf("tag names not equal. want %d got %d", tg.TotalCounter(), unmarshalledTag.TotalCounter())
t.Fatalf("tag total counters not equal. want %d got %d", tg.TotalCounter(), unmarshalledTag.TotalCounter())
}
if len(unmarshalledTag.Address.Bytes()) != len(tg.Address.Bytes()) {
......
......@@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"math/rand"
"sort"
"strconv"
"sync"
"time"
......@@ -31,6 +32,10 @@ import (
"github.com/ethersphere/bee/pkg/swarm"
)
const (
maxPage = 1000 // hard limit of page size
)
var (
TagUidFunc = rand.Uint32
ErrNotFound = errors.New("tag not found")
......@@ -52,10 +57,10 @@ func NewTags(stateStore storage.StateStorer, logger logging.Logger) *Tags {
}
}
// Create creates a new tag, stores it by the name and returns it
// it returns an error if the tag with this name already exists
func (ts *Tags) Create(s string, total int64) (*Tag, error) {
t := NewTag(context.Background(), TagUidFunc(), s, total, nil, ts.stateStore, ts.logger)
// Create creates a new tag, stores it by the UID and returns it
// it returns an error if the tag with this UID already exists
func (ts *Tags) Create(total int64) (*Tag, error) {
t := NewTag(context.Background(), TagUidFunc(), total, nil, ts.stateStore, ts.logger)
if _, loaded := ts.tags.LoadOrStore(t.Uid, t); loaded {
return nil, errExists
......@@ -157,6 +162,79 @@ func (ts *Tags) UnmarshalJSON(value []byte) error {
return err
}
func (ts *Tags) ListAll(ctx context.Context, offset, limit int) (t []*Tag, err error) {
if limit > maxPage {
limit = maxPage
}
// range sync.Map first
allTags := ts.All()
sort.Slice(allTags, func(i, j int) bool { return allTags[i].Uid < allTags[j].Uid })
for _, tag := range allTags {
if offset > 0 {
offset--
continue
}
t = append(t, tag)
limit--
if limit == 0 {
break
}
}
if limit == 0 {
return
}
// and then from statestore
err = ts.stateStore.Iterate("tags_", func(key, value []byte) (stop bool, err error) {
if offset > 0 {
offset--
return false, nil
}
var ta *Tag
ta, err = decodeTagValueFromStore(value)
if err != nil {
return true, err
}
if _, ok := ts.tags.Load(ta.Uid); ok {
// tag was already returned from sync.Map
return false, nil
}
t = append(t, ta)
limit--
if limit == 0 {
return true, nil
}
return false, nil
})
return t, err
}
func decodeTagValueFromStore(value []byte) (*Tag, error) {
var data []byte
err := json.Unmarshal(value, &data)
if err != nil {
return nil, err
}
var ta Tag
err = ta.UnmarshalBinary(data)
if err != nil {
return nil, err
}
return &ta, nil
}
// getTagFromStore get a given tag from the state store.
func (ts *Tags) getTagFromStore(uid uint32) (*Tag, error) {
key := "tags_" + strconv.Itoa(int(uid))
......
......@@ -17,7 +17,9 @@
package tags
import (
"context"
"io/ioutil"
"sort"
"testing"
"github.com/ethersphere/bee/pkg/logging"
......@@ -29,10 +31,10 @@ func TestAll(t *testing.T) {
mockStatestore := statestore.NewStateStore()
logger := logging.New(ioutil.Discard, 0)
ts := NewTags(mockStatestore, logger)
if _, err := ts.Create("1", 1); err != nil {
if _, err := ts.Create(1); err != nil {
t.Fatal(err)
}
if _, err := ts.Create("2", 1); err != nil {
if _, err := ts.Create(1); err != nil {
t.Fatal(err)
}
......@@ -50,7 +52,7 @@ func TestAll(t *testing.T) {
t.Fatalf("expected tag 1 Total to be 1 got %d", n)
}
if _, err := ts.Create("3", 1); err != nil {
if _, err := ts.Create(1); err != nil {
t.Fatal(err)
}
all = ts.All()
......@@ -60,11 +62,83 @@ func TestAll(t *testing.T) {
}
}
func TestListAll(t *testing.T) {
mockStatestore := statestore.NewStateStore()
logger := logging.New(ioutil.Discard, 0)
ts1 := NewTags(mockStatestore, logger)
// create few tags
for i := 0; i < 5; i++ {
if _, err := ts1.Create(1); err != nil {
t.Fatal(err)
}
}
// tags are from sync.Map
tagList1, err := ts1.ListAll(context.Background(), 0, 5)
if err != nil {
t.Fatal(err)
}
if len(tagList1) != 5 {
t.Fatalf("want %d tags but got %d", 5, len(tagList1))
}
// save all returned tags to statestore
for _, tag := range tagList1 {
err = tag.saveTag()
if err != nil {
t.Fatal(err)
}
}
// use new tags object
ts2 := NewTags(mockStatestore, logger)
// create few more tags in new tags object
for i := 0; i < 5; i++ {
if _, err := ts2.Create(1); err != nil {
t.Fatal(err)
}
}
// first tags are from sync.Map
tagList2, err := ts2.ListAll(context.Background(), 0, 5)
if err != nil {
t.Fatal(err)
}
if len(tagList2) != 5 {
t.Fatalf("want %d tags but got %d", 5, len(tagList2))
}
// now tags are returned from statestore
tagList3, err := ts2.ListAll(context.Background(), 5, 5)
if err != nil {
t.Fatal(err)
}
if len(tagList3) != 5 {
t.Fatalf("want %d tags but got %d", 5, len(tagList2))
}
// where they are not sorted
sort.Slice(tagList3, func(i, j int) bool { return tagList3[i].Uid < tagList3[j].Uid })
// and are the same as ones returned from first tags object
for i := range tagList3 {
if tagList1[i].Uid != tagList3[i].Uid {
t.Fatalf("expected tag %d, but got %d", tagList1[i].Uid, tagList3[i].Uid)
}
}
}
func TestPersistence(t *testing.T) {
mockStatestore := statestore.NewStateStore()
logger := logging.New(ioutil.Discard, 0)
ts := NewTags(mockStatestore, logger)
ta, err := ts.Create("one", 1)
ta, err := ts.Create(1)
if err != nil {
t.Fatal(err)
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment