Commit 42c4eb40 authored by Zahoor Mohamed's avatar Zahoor Mohamed Committed by GitHub

Tagging chunks (#199)


* Tag for chunks
parent be6d5024
......@@ -11,6 +11,7 @@ import (
m "github.com/ethersphere/bee/pkg/metrics"
"github.com/ethersphere/bee/pkg/pingpong"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/tracing"
)
......@@ -27,6 +28,7 @@ type server struct {
type Options struct {
Pingpong pingpong.Interface
Tags *tags.Tags
Storer storage.Storer
Logger logging.Logger
Tracer *tracing.Tracer
......
......@@ -15,17 +15,20 @@ import (
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/pingpong"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/tags"
"resenje.org/web"
)
type testServerOptions struct {
Pingpong pingpong.Interface
Storer storage.Storer
Tags *tags.Tags
}
func newTestServer(t *testing.T, o testServerOptions) *http.Client {
s := api.New(api.Options{
Pingpong: o.Pingpong,
Tags: o.Tags,
Storer: o.Storer,
Logger: logging.New(ioutil.Discard, 0),
})
......
......@@ -7,17 +7,24 @@ package api
import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"strconv"
"strings"
"time"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/gorilla/mux"
)
// Presence of this header means that it needs to be tagged using the uid
const TagHeaderUid = "x-swarm-tag-uid"
// Presence of this header in the HTTP request indicates the chunk needs to be pinned.
const PinHeaderName = "x-swarm-pin"
......@@ -28,11 +35,46 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
address, err := swarm.ParseHexAddress(addr)
if err != nil {
s.Logger.Debugf("bzz-chunk: parse chunk address %s: %v", addr, err)
s.Logger.Error("bzz-chunk: error uploading chunk")
s.Logger.Error("bzz-chunk: parse chunk address")
jsonhttp.BadRequest(w, "invalid chunk address")
return
}
// if tag header is not there create a new one
var tag *tags.Tag
tagUidStr := r.Header.Get(TagHeaderUid)
if tagUidStr == "" {
tagName := fmt.Sprintf("unnamed_tag_%d", time.Now().Unix())
tag, err = s.Tags.Create(tagName, 0, false)
if err != nil {
s.Logger.Debugf("bzz-chunk: tag creation error: %v, addr %s", err, address)
s.Logger.Error("bzz-chunk: tag creation error")
jsonhttp.InternalServerError(w, "cannot create tag")
return
}
} else {
// if the tag uid header is present, then use the tag sent
tagUid, err := strconv.ParseUint(tagUidStr, 10, 32)
if err != nil {
s.Logger.Debugf("bzz-chunk: parse taguid %s: %v", tagUidStr, err)
s.Logger.Error("bzz-chunk: parse taguid")
jsonhttp.BadRequest(w, "invalid taguid")
return
}
tag, err = s.Tags.Get(uint32(tagUid))
if err != nil {
s.Logger.Debugf("bzz-chunk: tag get error: %v, addr %s", err, address)
s.Logger.Error("bzz-chunk: tag get error")
jsonhttp.InternalServerError(w, "cannot create tag")
return
}
}
// Increment the total tags here since we dont have a splitter
// for the file upload, it will done in the early stage itself in bulk
tag.Inc(tags.TotalChunks)
data, err := ioutil.ReadAll(r.Body)
if err != nil {
s.Logger.Debugf("bzz-chunk: read chunk data error: %v, addr %s", err, address)
......@@ -42,14 +84,19 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
}
_, err = s.Storer.Put(ctx, storage.ModePutUpload, swarm.NewChunk(address, data))
seen, err := s.Storer.Put(ctx, storage.ModePutUpload, swarm.NewChunk(address, data))
if err != nil {
s.Logger.Debugf("bzz-chunk: chunk write error: %v, addr %s", err, address)
s.Logger.Error("bzz-chunk: chunk write error")
jsonhttp.BadRequest(w, "chunk write error")
return
} else if len(seen) > 0 && seen[0] {
tag.Inc(tags.StateSeen)
}
// Indicate that the chunk is stored
tag.Inc(tags.StateStored)
// Check if this chunk needs to pinned and pin it
pinHeaderValues := r.Header.Get(PinHeaderName)
if pinHeaderValues != "" && strings.ToLower(pinHeaderValues) == "true" {
......@@ -62,6 +109,9 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
}
}
tag.Address = address
w.Header().Set(TagHeaderUid, fmt.Sprint(tag.Uid))
w.Header().Set("Access-Control-Expose-Headers", TagHeaderUid)
jsonhttp.OK(w, nil)
}
......
......@@ -11,6 +11,8 @@ import (
"net/http"
"testing"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
......@@ -31,9 +33,11 @@ func TestChunkUploadDownload(t *testing.T) {
validContent = []byte("bbaatt")
invalidContent = []byte("bbaattss")
mockValidator = validator.NewMockValidator(validHash, validContent)
mockValidatingStorer = mock.NewValidatingStorer(mockValidator)
tag = tags.NewTags()
mockValidatingStorer = mock.NewValidatingStorer(mockValidator, tag)
client = newTestServer(t, testServerOptions{
Storer: mockValidatingStorer,
Tags: tag,
})
)
......@@ -74,10 +78,11 @@ func TestChunkUploadDownload(t *testing.T) {
t.Fatal("data retrieved doesnt match uploaded content")
}
})
t.Run("pin-invalid-value", func(t *testing.T) {
headers := make(map[string][]string)
headers[api.PinHeaderName] = []string{"hdgdh"}
jsonhttptest.ResponseDirectWithHeaders(t, client, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusOK, jsonhttp.StatusResponse{
jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, client, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, headers)
......@@ -89,7 +94,7 @@ func TestChunkUploadDownload(t *testing.T) {
})
t.Run("pin-header-missing", func(t *testing.T) {
headers := make(map[string][]string)
jsonhttptest.ResponseDirectWithHeaders(t, client, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusOK, jsonhttp.StatusResponse{
jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, client, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, headers)
......@@ -102,7 +107,7 @@ func TestChunkUploadDownload(t *testing.T) {
t.Run("pin-ok", func(t *testing.T) {
headers := make(map[string][]string)
headers[api.PinHeaderName] = []string{"True"}
jsonhttptest.ResponseDirectWithHeaders(t, client, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusOK, jsonhttp.StatusResponse{
jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, client, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, headers)
......@@ -113,7 +118,6 @@ func TestChunkUploadDownload(t *testing.T) {
}
})
}
func request(t *testing.T, client *http.Client, method string, resource string, body io.Reader, responseCode int) *http.Response {
......
......@@ -7,4 +7,5 @@ package api
type (
PingpongResponse = pingpongResponse
BzzPostResponse = bzzPostResponse
TagResponse = tagResponse
)
......@@ -45,6 +45,18 @@ func (s *server) setupRouting() {
"POST": http.HandlerFunc(s.chunkUploadHandler),
})
router.Handle("/bzz-tag/name/{name}", jsonhttp.MethodHandler{
"POST": http.HandlerFunc(s.CreateTag),
})
router.Handle("/bzz-tag/addr/{addr}", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.getTagInfoUsingAddress),
})
router.Handle("/bzz-tag/uuid/{uuid}", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.getTagInfoUsingUUid),
})
s.Handler = web.ChainHandlers(
logging.NewHTTPAccessLogHandler(s.Logger, logrus.InfoLevel, "api access"),
handlers.CompressHandler,
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api
import (
"net/http"
"strconv"
"time"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/gorilla/mux"
)
type tagResponse struct {
Total int64 `json:"total"`
Split int64 `json:"split"`
Seen int64 `json:"seen"`
Stored int64 `json:"stored"`
Sent int64 `json:"sent"`
Synced int64 `json:"synced"`
Uid uint32 `json:"uid"`
Anonymous bool `json:"anonymous"`
Name string `json:"name"`
Address swarm.Address `json:"address"`
StartedAt time.Time `json:"startedAt"`
}
func newTagResponse(tag *tags.Tag) tagResponse {
return tagResponse{
Total: tag.Total,
Split: tag.Split,
Seen: tag.Seen,
Stored: tag.Stored,
Sent: tag.Sent,
Synced: tag.Synced,
Uid: tag.Uid,
Anonymous: tag.Anonymous,
Name: tag.Name,
Address: tag.Address,
StartedAt: tag.StartedAt,
}
}
func (s *server) CreateTag(w http.ResponseWriter, r *http.Request) {
tagName := mux.Vars(r)["name"]
tag, err := s.Tags.Create(tagName, 0, false)
if err != nil {
s.Logger.Debugf("bzz-chunk: tag create error: %v", err)
s.Logger.Error("bzz-chunk: tag create error")
jsonhttp.InternalServerError(w, "cannot create tag")
return
}
w.Header().Set("Cache-Control", "no-cache, private, max-age=0")
jsonhttp.OK(w, newTagResponse(tag))
}
func (s *server) getTagInfoUsingAddress(w http.ResponseWriter, r *http.Request) {
addr := mux.Vars(r)["addr"]
address, err := swarm.ParseHexAddress(addr)
if err != nil {
s.Logger.Debugf("bzz-tag: parse chunk address %s: %v", addr, err)
s.Logger.Error("bzz-tag: parse chunk address")
jsonhttp.BadRequest(w, "invalid chunk address")
return
}
tag, err := s.Tags.GetByAddress(address)
if err != nil {
s.Logger.Debugf("bzz-tag: tag not present %s : %v, ", address.String(), err)
s.Logger.Error("bzz-tag: tag not present")
jsonhttp.InternalServerError(w, "tag not present")
return
}
w.Header().Set("Cache-Control", "no-cache, private, max-age=0")
jsonhttp.OK(w, newTagResponse(tag))
}
func (s *server) getTagInfoUsingUUid(w http.ResponseWriter, r *http.Request) {
uidStr := mux.Vars(r)["uuid"]
uuid, err := strconv.ParseUint(uidStr, 10, 32)
if err != nil {
s.Logger.Debugf("bzz-tag: parse uid %s: %v", uidStr, err)
s.Logger.Error("bzz-tag: parse uid")
jsonhttp.BadRequest(w, "invalid uid")
return
}
tag, err := s.Tags.Get(uint32(uuid))
if err != nil {
s.Logger.Debugf("bzz-tag: tag not present : %v, uuid %s", err, uidStr)
s.Logger.Error("bzz-tag: tag not present")
jsonhttp.InternalServerError(w, "tag not present")
return
}
w.Header().Set("Cache-Control", "no-cache, private, max-age=0")
jsonhttp.OK(w, newTagResponse(tag))
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api_test
import (
"bytes"
"net/http"
"strconv"
"testing"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
mp "github.com/ethersphere/bee/pkg/pusher/mock"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/storage/mock/validator"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
)
func TestTags(t *testing.T) {
var (
resource = func(addr swarm.Address) string { return "/bzz-chunk/" + addr.String() }
tagResourceAddress = func(addr swarm.Address) string { return "/bzz-tag/addr/" + addr.String() }
tagResourceUidCreate = func(name string) string { return "/bzz-tag/name/" + name }
tagResourceUUid = func(uuid uint64) string { return "/bzz-tag/uuid/" + strconv.FormatUint(uuid, 10) }
validHash = swarm.MustParseHexAddress("aabbcc")
validContent = []byte("bbaatt")
mockValidator = validator.NewMockValidator(validHash, validContent)
tag = tags.NewTags()
mockValidatingStorer = mock.NewValidatingStorer(mockValidator, tag)
mockPusher = mp.NewMockPusher(tag)
client = newTestServer(t, testServerOptions{
Storer: mockValidatingStorer,
Tags: tag,
})
)
t.Run("send-invalid-tag-id", func(t *testing.T) {
sentHheaders := make(http.Header)
sentHheaders.Set(api.TagHeaderUid, "file.jpg") // the value should be uint32
_ = jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, client, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusBadRequest, jsonhttp.StatusResponse{
Message: "invalid taguid",
Code: http.StatusBadRequest,
}, sentHheaders)
})
t.Run("uid-header-in-return-for-empty-tag", func(t *testing.T) {
rcvdHeaders := jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, client, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, nil)
isTagFoundInResponse(t, rcvdHeaders, nil)
})
t.Run("get-tag-and-use-it-to-upload-chunk", func(t *testing.T) {
// Get a tag using API
ta := api.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, client, http.MethodPost, tagResourceUidCreate("file.jpg"), nil, http.StatusOK, &ta)
if ta.Name != "file.jpg" {
t.Fatalf("tagname is not the same that we sent")
}
// Now upload a chunk and see if we receive a tag with the same uid
sentHheaders := make(http.Header)
sentHheaders.Set(api.TagHeaderUid, strconv.FormatUint(uint64(ta.Uid), 10))
rcvdHeaders := jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, client, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, sentHheaders)
isTagFoundInResponse(t, rcvdHeaders, &ta)
})
t.Run("get-tag-and-use-it-to-upload-multiple-chunk", func(t *testing.T) {
// Get a tag using API
ta := api.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, client, http.MethodPost, tagResourceUidCreate("file.jpg"), nil, http.StatusOK, &ta)
if ta.Name != "file.jpg" {
t.Fatalf("tagname is not the same that we sent")
}
// Now upload a chunk and see if we receive a tag with the same uid
sentHheaders := make(http.Header)
sentHheaders.Set(api.TagHeaderUid, strconv.FormatUint(uint64(ta.Uid), 10))
rcvdHeaders := jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, client, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, sentHheaders)
isTagFoundInResponse(t, rcvdHeaders, &ta)
// Add asecond valid contentto validator
secondValidHash := swarm.MustParseHexAddress("deadbeaf")
secondValidContent := []byte("123456")
mockValidator.AddPair(secondValidHash, secondValidContent)
sentHheaders = make(http.Header)
sentHheaders.Set(api.TagHeaderUid, strconv.FormatUint(uint64(ta.Uid), 10))
rcvdHeaders = jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, client, http.MethodPost, resource(secondValidHash), bytes.NewReader(secondValidContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, sentHheaders)
isTagFoundInResponse(t, rcvdHeaders, &ta)
})
t.Run("get-tag-indirectly-and-use-it-to-upload-chunk", func(t *testing.T) {
//Upload anew chunk and we give aUID in response and apps can use that too
rcvdHeaders := jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, client, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, nil)
uuid := isTagFoundInResponse(t, rcvdHeaders, nil)
// see if the tagid is present and has valid values
ta := api.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, client, http.MethodGet, tagResourceUUid(uuid), nil, http.StatusOK, &ta)
// Now upload another chunk using the same tag id
sentHheaders := make(http.Header)
sentHheaders.Set(api.TagHeaderUid, strconv.FormatUint(uuid, 10))
_ = jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, client, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, sentHheaders)
// see if the tagid is present and has valid values
ta = api.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, client, http.MethodGet, tagResourceUUid(uuid), nil, http.StatusOK, &ta)
if uuid != uint64(ta.Uid) {
t.Fatalf("Invalid uuid response")
}
if ta.Stored != 2 {
t.Fatalf("same tag not used")
}
})
t.Run("get-tag-using-address", func(t *testing.T) {
// Get a tag
ta := api.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, client, http.MethodPost, tagResourceUidCreate("file.jpg"), nil, http.StatusOK, &ta)
if ta.Name != "file.jpg" {
t.Fatalf("tagname is not the same that we sent")
}
// Now upload a chunk and see if we receive a tag with the same uid
sentHheaders := make(http.Header)
sentHheaders.Set(api.TagHeaderUid, strconv.FormatUint(uint64(ta.Uid), 10))
rcvdHeaders := jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, client, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, sentHheaders)
uuid := isTagFoundInResponse(t, rcvdHeaders, &ta)
// Request the tag and see if the UUID is the same
rtag := api.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, client, http.MethodGet, tagResourceAddress(validHash), nil, http.StatusOK, &rtag)
if uuid != uint64(rtag.Uid) {
t.Fatalf("Invalid uuid response")
}
})
t.Run("get-tag-using-uuid", func(t *testing.T) {
rcvdHeaders := jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, client, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, nil)
uuid := isTagFoundInResponse(t, rcvdHeaders, nil)
// Request the tag and see if the UUID is the same
ta := api.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, client, http.MethodGet, tagResourceUUid(uuid), nil, http.StatusOK, &ta)
if uuid != uint64(ta.Uid) {
t.Fatalf("Invalid uuid response")
}
})
t.Run("tag-counters", func(t *testing.T) {
rcvdHeaders := jsonhttptest.ResponseDirectSendHeadersAndReceiveHeaders(t, client, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, nil)
uuid1 := isTagFoundInResponse(t, rcvdHeaders, nil)
tagToVerify, err := tag.Get(uint32(uuid1))
if err != nil {
t.Fatal(err)
}
err = mockPusher.SendChunk(validHash)
if err != nil {
t.Fatal(err)
}
err = mockPusher.RcvdReceipt(validHash)
if err != nil {
t.Fatal(err)
}
finalTag := api.TagResponse{}
jsonhttptest.ResponseUnmarshal(t, client, http.MethodGet, tagResourceUUid(uuid1), nil, http.StatusOK, &finalTag)
if tagToVerify.Total != finalTag.Total ||
tagToVerify.Seen != finalTag.Seen ||
tagToVerify.Stored != finalTag.Stored ||
tagToVerify.Sent != finalTag.Seen ||
tagToVerify.Synced != finalTag.Synced {
t.Fatalf("Invalid counters")
}
})
}
func isTagFoundInResponse(t *testing.T, headers http.Header, tag *api.TagResponse) uint64 {
uidStr := headers.Get(api.TagHeaderUid)
if uidStr == "" {
t.Fatalf("could not find tagid header in chunk upload response")
}
uid, err := strconv.ParseUint(uidStr, 10, 32)
if err != nil {
t.Fatal(err)
}
if tag != nil {
if uid != uint64(tag.Uid) {
t.Fatalf("uid created is not received while uploading chunk, expected : %d, got %d", tag.Uid, uid)
}
}
return uid
}
......@@ -12,6 +12,7 @@ import (
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/topology"
"github.com/prometheus/client_golang/prometheus"
)
......@@ -35,6 +36,7 @@ type Options struct {
TopologyDriver topology.Notifier
Storer storage.Storer
Logger logging.Logger
Tags *tags.Tags
}
func New(o Options) Service {
......
......@@ -19,6 +19,7 @@ import (
mockstore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology/mock"
"github.com/multiformats/go-multiaddr"
......@@ -30,6 +31,7 @@ type testServerOptions struct {
P2P p2p.Service
Storer storage.Storer
TopologyOpts []mock.Option
Tags *tags.Tags
}
type testServer struct {
......@@ -46,6 +48,7 @@ func newTestServer(t *testing.T, o testServerOptions) *testServer {
s := debugapi.New(debugapi.Options{
Overlay: o.Overlay,
P2P: o.P2P,
Tags: o.Tags,
Logger: logging.New(ioutil.Discard, 0),
Addressbook: addrbook,
Storer: o.Storer,
......@@ -73,6 +76,7 @@ func newTestServer(t *testing.T, o testServerOptions) *testServer {
func newBZZTestServer(t *testing.T, o testServerOptions) *http.Client {
s := api.New(api.Options{
Storer: o.Storer,
Tags: o.Tags,
Logger: logging.New(ioutil.Discard, 0),
})
ts := httptest.NewServer(s)
......
......@@ -15,6 +15,7 @@ import (
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/storage/mock/validator"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
)
// TestPinChunkHandler checks for pinning, unpinning and listing of chunks.
......@@ -26,13 +27,16 @@ func TestPinChunkHandler(t *testing.T) {
hash := swarm.MustParseHexAddress("aabbcc")
data := []byte("bbaatt")
mockValidator := validator.NewMockValidator(hash, data)
mockValidatingStorer := mock.NewValidatingStorer(mockValidator)
tag := tags.NewTags()
mockValidatingStorer := mock.NewValidatingStorer(mockValidator, tag)
debugTestServer := newTestServer(t, testServerOptions{
Storer: mockValidatingStorer,
Tags: tag,
})
// This server is used to store chunks
bzzTestServer := newBZZTestServer(t, testServerOptions{
Storer: mockValidatingStorer,
Tags: tag,
})
// bad chunk address
......
......@@ -35,8 +35,8 @@ func ResponseDirect(t *testing.T, client *http.Client, method, url string, body
}
}
func ResponseDirectWithHeaders(t *testing.T, client *http.Client, method, url string, body io.Reader, responseCode int,
response interface{}, headers http.Header) {
func ResponseDirectSendHeadersAndReceiveHeaders(t *testing.T, client *http.Client, method, url string, body io.Reader, responseCode int,
response interface{}, headers http.Header) http.Header {
t.Helper()
resp := request(t, client, method, url, body, responseCode, headers)
......@@ -56,6 +56,8 @@ func ResponseDirectWithHeaders(t *testing.T, client *http.Client, method, url st
if !bytes.Equal(got, want) {
t.Errorf("got response %s, want %s", string(got), string(want))
}
return resp.Header
}
func ResponseUnmarshal(t *testing.T, client *http.Client, method, url string, body io.Reader, responseCode int, response interface{}) {
......
......@@ -37,6 +37,7 @@ import (
mockinmem "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/ethersphere/bee/pkg/validator"
ma "github.com/multiformats/go-multiaddr"
......@@ -204,6 +205,7 @@ func NewBee(o Options) (*Bee, error) {
Storer: storer,
Logger: logger,
})
tag := tags.NewTags()
if err = p2ps.AddProtocol(retrieve.Protocol()); err != nil {
return nil, fmt.Errorf("retrieval service: %w", err)
......@@ -226,6 +228,7 @@ func NewBee(o Options) (*Bee, error) {
Storer: storer,
PeerSuggester: topologyDriver,
PushSyncer: pushSyncProtocol,
Tags: tag,
Logger: logger,
})
b.pusherCloser = pushSyncPusher
......@@ -235,6 +238,7 @@ func NewBee(o Options) (*Bee, error) {
// API server
apiService = api.New(api.Options{
Pingpong: pingPong,
Tags: tag,
Storer: ns,
Logger: logger,
Tracer: tracer,
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mock
import (
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
)
type MockPusher struct {
tag *tags.Tags
}
func NewMockPusher(tag *tags.Tags) *MockPusher {
return &MockPusher{
tag: tag,
}
}
func (m *MockPusher) SendChunk(address swarm.Address) error {
ta, err := m.tag.GetByAddress(address)
if err != nil {
return err
}
ta.Inc(tags.StateSent)
return nil
}
func (m *MockPusher) RcvdReceipt(address swarm.Address) error {
ta, err := m.tag.GetByAddress(address)
if err != nil {
return err
}
ta.Inc(tags.StateSynced)
return nil
}
......@@ -12,12 +12,14 @@ import (
"github.com/ethersphere/bee/pkg/pushsync"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/topology"
)
type Service struct {
storer storage.Storer
pushSyncer pushsync.PushSyncer
tag *tags.Tags
logger logging.Logger
metrics metrics
quit chan struct{}
......@@ -27,6 +29,7 @@ type Service struct {
type Options struct {
Storer storage.Storer
PeerSuggester topology.ClosestPeerer
Tags *tags.Tags
PushSyncer pushsync.PushSyncer
Logger logging.Logger
}
......@@ -37,6 +40,7 @@ func New(o Options) *Service {
service := &Service{
storer: o.Storer,
pushSyncer: o.PushSyncer,
tag: o.Tags,
logger: o.Logger,
metrics: newMetrics(),
quit: make(chan struct{}),
......@@ -79,9 +83,16 @@ func (s *Service) chunksWorker() {
chunksInBatch++
s.metrics.TotalChunksToBeSentCounter.Inc()
t, err := s.tag.GetByAddress(ch.Address())
if err != nil {
s.logger.Debugf("pusher: get tag by address %s: %v", ch.Address(), err)
continue
}
t.Inc(tags.StateSent)
// Later when we process receipt, get the receipt and process it
// for now ignoring the receipt and checking only for error
_, err := s.pushSyncer.PushChunkToClosest(ctx, ch)
_, err = s.pushSyncer.PushChunkToClosest(ctx, ch)
if err != nil {
s.logger.Errorf("pusher: error while sending chunk or receiving receipt: %v", err)
continue
......@@ -123,6 +134,12 @@ func (s *Service) setChunkAsSynced(ctx context.Context, addr swarm.Address) {
s.metrics.ErrorSettingChunkToSynced.Inc()
} else {
s.metrics.TotalChunksSynced.Inc()
ta, err := s.tag.GetByAddress(addr)
if err != nil {
s.logger.Debugf("pusher: get tag by address %s: %v", addr, err)
return
}
ta.Inc(tags.StateSynced)
}
}
......
......@@ -7,6 +7,7 @@ package pusher_test
import (
"context"
"errors"
"github.com/ethersphere/bee/pkg/tags"
"io/ioutil"
"sync"
"testing"
......@@ -23,7 +24,7 @@ import (
)
// no of times to retry to see if we have received response from pushsync
var noOfRetries = 10
var noOfRetries = 20
// Wrap the actual storer to intercept the modeSet that the pusher will call when a valid receipt is received
type Store struct {
......@@ -58,11 +59,16 @@ func TestSendChunkToPushSync(t *testing.T) {
}
return receipt, nil
})
p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
mtag := tags.NewTags()
tag, err := mtag.Create("name", 1, false)
if err != nil {
t.Fatal(err)
}
tag.Address = chunk.Address()
p, storer := createPusher(t, triggerPeer, pushSyncService, mtag, mock.WithClosestPeer(closestPeer))
defer storer.Close()
_, err := storer.Put(context.Background(), storage.ModePutUpload, chunk)
_, err = storer.Put(context.Background(), storage.ModePutUpload, chunk)
if err != nil {
t.Fatal(err)
}
......@@ -97,10 +103,16 @@ func TestSendChunkAndReceiveInvalidReceipt(t *testing.T) {
return nil, errors.New("invalid receipt")
})
p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
mtag := tags.NewTags()
tag, err := mtag.Create("name", 1, false)
if err != nil {
t.Fatal(err)
}
tag.Address = chunk.Address()
p, storer := createPusher(t, triggerPeer, pushSyncService, mtag, mock.WithClosestPeer(closestPeer))
defer storer.Close()
_, err := storer.Put(context.Background(), storage.ModePutUpload, chunk)
_, err = storer.Put(context.Background(), storage.ModePutUpload, chunk)
if err != nil {
t.Fatal(err)
}
......@@ -138,10 +150,16 @@ func TestSendChunkAndTimeoutinReceivingReceipt(t *testing.T) {
return nil, nil
})
p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
mtag := tags.NewTags()
tag, err := mtag.Create("name", 1, false)
if err != nil {
t.Fatal(err)
}
tag.Address = chunk.Address()
p, storer := createPusher(t, triggerPeer, pushSyncService, mtag, mock.WithClosestPeer(closestPeer))
defer storer.Close()
_, err := storer.Put(context.Background(), storage.ModePutUpload, chunk)
_, err = storer.Put(context.Background(), storage.ModePutUpload, chunk)
if err != nil {
t.Fatal(err)
}
......@@ -169,7 +187,7 @@ func createChunk() swarm.Chunk {
return swarm.NewChunk(chunkAddress, chunkData)
}
func createPusher(t *testing.T, addr swarm.Address, pushSyncService pushsync.PushSyncer, mockOpts ...mock.Option) (*pusher.Service, *Store) {
func createPusher(t *testing.T, addr swarm.Address, pushSyncService pushsync.PushSyncer, tag *tags.Tags, mockOpts ...mock.Option) (*pusher.Service, *Store) {
t.Helper()
logger := logging.New(ioutil.Discard, 0)
storer, err := localstore.New("", addr.Bytes(), nil, logger)
......@@ -183,7 +201,8 @@ func createPusher(t *testing.T, addr swarm.Address, pushSyncService pushsync.Pus
modeSetMu: &sync.Mutex{},
}
peerSuggester := mock.NewTopologyDriver(mockOpts...)
pusherService := pusher.New(pusher.Options{Storer: pusherStorer, PushSyncer: pushSyncService, PeerSuggester: peerSuggester, Logger: logger})
pusherService := pusher.New(pusher.Options{Storer: pusherStorer, Tags: tag, PushSyncer: pushSyncService, PeerSuggester: peerSuggester, Logger: logger})
return pusherService, pusherStorer
}
......
......@@ -11,6 +11,7 @@ import (
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
)
var _ storage.Storer = (*MockStorer)(nil)
......@@ -23,6 +24,7 @@ type MockStorer struct {
pinnedCounter []uint64 // and its respective counter. These are stored as slices to preserve the order.
pinSetMu sync.Mutex
validator swarm.ChunkValidator
tags *tags.Tags
}
func NewStorer() storage.Storer {
......@@ -33,13 +35,14 @@ func NewStorer() storage.Storer {
}
}
func NewValidatingStorer(v swarm.ChunkValidator) *MockStorer {
func NewValidatingStorer(v swarm.ChunkValidator, tags *tags.Tags) *MockStorer {
return &MockStorer{
store: make(map[string][]byte),
modeSet: make(map[string]storage.ModeSet),
modeSetMu: sync.Mutex{},
pinSetMu: sync.Mutex{},
validator: v,
tags: tags,
}
}
......@@ -59,8 +62,19 @@ func (m *MockStorer) Put(ctx context.Context, mode storage.ModePut, chs ...swarm
}
}
m.store[ch.Address().String()] = ch.Data()
yes, err := m.Has(ctx, ch.Address())
if err != nil {
exist = append(exist, false)
continue
}
if yes {
exist = append(exist, true)
} else {
exist = append(exist, false)
}
}
return nil, nil
return exist, nil
}
func (m *MockStorer) GetMulti(ctx context.Context, mode storage.ModeGet, addrs ...swarm.Address) (ch []swarm.Chunk, err error) {
......
......@@ -3,6 +3,7 @@ package mock_test
import (
"bytes"
"context"
"github.com/ethersphere/bee/pkg/tags"
"testing"
"github.com/ethersphere/bee/pkg/storage"
......@@ -64,7 +65,7 @@ func TestMockValidatingStorer(t *testing.T) {
validContent := []byte("bbaatt")
invalidContent := []byte("bbaattss")
s := mock.NewValidatingStorer(validator.NewMockValidator(validAddress, validContent))
s := mock.NewValidatingStorer(validator.NewMockValidator(validAddress, validContent), tags.NewTags())
ctx := context.Background()
......
......@@ -40,7 +40,8 @@ var (
type State = uint32
const (
StateSplit State = iota // chunk has been processed by filehasher/swarm safe call
TotalChunks State = iota // The total no of chunks for the tag
StateSplit // chunk has been processed by filehasher/swarm safe call
StateStored // chunk stored locally
StateSeen // chunk previously seen
StateSent // chunk sent to neighbourhood
......@@ -100,6 +101,8 @@ func (t *Tag) FinishRootSpan() {
func (t *Tag) IncN(state State, n int) {
var v *int64
switch state {
case TotalChunks:
v = &t.Total
case StateSplit:
v = &t.Split
case StateStored:
......@@ -123,6 +126,8 @@ func (t *Tag) Inc(state State) {
func (t *Tag) Get(state State) int64 {
var v *int64
switch state {
case TotalChunks:
v = &t.Total
case StateSplit:
v = &t.Split
case StateStored:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment