Commit 2b491c9f authored by Nemanja Zbiljić's avatar Nemanja Zbiljić Committed by GitHub

Updates to API endpoints for pin and unpin (#940)

parent 9287808c
......@@ -156,7 +156,7 @@ func newTestServer(t *testing.T, storer storage.Storer) *url.URL {
t.Helper()
logger := logging.New(ioutil.Discard, 0)
store := statestore.NewStateStore()
s := api.New(tags.NewTags(store, logger), storer, nil, nil, logger, nil, api.Options{})
s := api.New(tags.NewTags(store, logger), storer, nil, nil, nil, logger, nil, api.Options{})
ts := httptest.NewServer(s)
srvUrl, err := url.Parse(ts.URL)
if err != nil {
......
......@@ -474,7 +474,7 @@ paths:
default:
description: Default response
'/pinning/chunks/{address}':
'/pin/chunks/{address}':
parameters:
- in: path
name: address
......@@ -538,7 +538,7 @@ paths:
default:
description: Default response
'/pinning/chunks':
'/pin/chunks':
get:
summary: Get list of pinned chunks
tags:
......@@ -557,6 +557,147 @@ paths:
default:
description: Default response
'/pin/bytes/{address}':
parameters:
- in: path
name: address
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/SwarmAddress'
required: true
description: Swarm address of the bytes
post:
summary: Pin bytes with given address
tags:
- Bytes pinning
responses:
'200':
description: Pinning bytes chunks with address
content:
application/json:
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/Response'
'400':
$ref: 'SwarmCommon.yaml#/components/responses/400'
'403':
$ref: 'SwarmCommon.yaml#/components/responses/403'
'404':
$ref: 'SwarmCommon.yaml#/components/responses/404'
default:
description: Default response
delete:
summary: Unpin bytes chunks with given address
tags:
- Bytes pinning
responses:
'200':
description: Unpinning chunk with address
content:
application/json:
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/Response'
'400':
$ref: 'SwarmCommon.yaml#/components/responses/400'
'403':
$ref: 'SwarmCommon.yaml#/components/responses/403'
'404':
$ref: 'SwarmCommon.yaml#/components/responses/404'
default:
description: Default response
'/pin/files/{address}':
parameters:
- in: path
name: address
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/SwarmAddress'
required: true
description: Swarm address of the file
post:
summary: Pin file with given address
tags:
- File pinning
responses:
'200':
description: Pinning file chunks with address
content:
application/json:
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/Response'
'400':
$ref: 'SwarmCommon.yaml#/components/responses/400'
'403':
$ref: 'SwarmCommon.yaml#/components/responses/403'
'404':
$ref: 'SwarmCommon.yaml#/components/responses/404'
default:
description: Default response
delete:
summary: Unpin file chunks with given address
tags:
- File pinning
responses:
'200':
description: Unpinning file chunks with address
content:
application/json:
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/Response'
'400':
$ref: 'SwarmCommon.yaml#/components/responses/400'
'403':
$ref: 'SwarmCommon.yaml#/components/responses/403'
'404':
$ref: 'SwarmCommon.yaml#/components/responses/404'
default:
description: Default response
'/pin/bzz/{address}':
parameters:
- in: path
name: address
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/SwarmAddress'
required: true
description: Swarm address of the collection
post:
summary: Pin collection with given address
tags:
- Collection pinning
responses:
'200':
description: Pinning collection chunks (and all referenced files) with address
content:
application/json:
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/Response'
'400':
$ref: 'SwarmCommon.yaml#/components/responses/400'
'403':
$ref: 'SwarmCommon.yaml#/components/responses/403'
'404':
$ref: 'SwarmCommon.yaml#/components/responses/404'
default:
description: Default response
delete:
summary: Unpin file chunks with given address
tags:
- Collection pinning
responses:
'200':
description: Unpinning collection chunks (and all referenced files) with address
content:
application/json:
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/Response'
'400':
$ref: 'SwarmCommon.yaml#/components/responses/400'
'403':
$ref: 'SwarmCommon.yaml#/components/responses/403'
'404':
$ref: 'SwarmCommon.yaml#/components/responses/404'
default:
description: Default response
'/pss/send/{topic}/{targets}':
post:
summary: Send to recipient or target with Postal Service for Swarm
......
......@@ -22,6 +22,7 @@ import (
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/ethersphere/bee/pkg/traversal"
)
const (
......@@ -57,12 +58,13 @@ type Service interface {
}
type server struct {
Tags *tags.Tags
Storer storage.Storer
Resolver resolver.Interface
Pss pss.Interface
Logger logging.Logger
Tracer *tracing.Tracer
Tags *tags.Tags
Storer storage.Storer
Resolver resolver.Interface
Pss pss.Interface
Traversal traversal.Service
Logger logging.Logger
Tracer *tracing.Tracer
Options
http.Handler
metrics metrics
......@@ -83,17 +85,18 @@ const (
)
// New will create a and initialize a new API service.
func New(tags *tags.Tags, storer storage.Storer, resolver resolver.Interface, pss pss.Interface, logger logging.Logger, tracer *tracing.Tracer, o Options) Service {
func New(tags *tags.Tags, storer storage.Storer, resolver resolver.Interface, pss pss.Interface, traversalService traversal.Service, logger logging.Logger, tracer *tracing.Tracer, o Options) Service {
s := &server{
Tags: tags,
Storer: storer,
Resolver: resolver,
Pss: pss,
Options: o,
Logger: logger,
Tracer: tracer,
metrics: newMetrics(),
quit: make(chan struct{}),
Tags: tags,
Storer: storer,
Resolver: resolver,
Pss: pss,
Traversal: traversalService,
Options: o,
Logger: logger,
Tracer: tracer,
metrics: newMetrics(),
quit: make(chan struct{}),
}
s.setupRouting()
......
......@@ -21,6 +21,7 @@ import (
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/traversal"
"github.com/gorilla/websocket"
"resenje.org/web"
)
......@@ -29,6 +30,7 @@ type testServerOptions struct {
Storer storage.Storer
Resolver resolver.Interface
Pss pss.Interface
Traversal traversal.Service
WsPath string
Tags *tags.Tags
GatewayMode bool
......@@ -47,7 +49,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.
if o.WsPingPeriod == 0 {
o.WsPingPeriod = 60 * time.Second
}
s := api.New(o.Tags, o.Storer, o.Resolver, o.Pss, o.Logger, nil, api.Options{
s := api.New(o.Tags, o.Storer, o.Resolver, o.Pss, o.Traversal, o.Logger, nil, api.Options{
GatewayMode: o.GatewayMode,
WsPingPeriod: o.WsPingPeriod,
})
......@@ -147,7 +149,7 @@ func TestParseName(t *testing.T) {
}))
}
s := api.New(nil, nil, tC.res, nil, tC.log, nil, api.Options{}).(*api.Server)
s := api.New(nil, nil, tC.res, nil, nil, tC.log, nil, api.Options{}).(*api.Server)
t.Run(tC.desc, func(t *testing.T) {
got, err := s.ResolveNameOrAddress(tC.name)
......
......@@ -33,11 +33,11 @@ func TestGatewayMode(t *testing.T) {
})
t.Run("pinning endpoints", func(t *testing.T) {
path := "/pinning/chunks/0773a91efd6547c754fc1d95fb1c62c7d1b47f959c2caa685dfec8736da95c1c"
path := "/pin/chunks/0773a91efd6547c754fc1d95fb1c62c7d1b47f959c2caa685dfec8736da95c1c"
jsonhttptest.Request(t, client, http.MethodGet, path, http.StatusForbidden, forbiddenResponseOption)
jsonhttptest.Request(t, client, http.MethodPost, path, http.StatusForbidden, forbiddenResponseOption)
jsonhttptest.Request(t, client, http.MethodDelete, path, http.StatusForbidden, forbiddenResponseOption)
jsonhttptest.Request(t, client, http.MethodGet, "/pinning/chunks", http.StatusForbidden, forbiddenResponseOption)
jsonhttptest.Request(t, client, http.MethodGet, "/pin/chunks", http.StatusForbidden, forbiddenResponseOption)
})
t.Run("tags endpoints", func(t *testing.T) {
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api
import (
"errors"
"net/http"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/traversal"
"github.com/gorilla/mux"
)
// pinBytes is used to pin an already uploaded content.
func (s *server) pinBytes(w http.ResponseWriter, r *http.Request) {
addr, err := swarm.ParseHexAddress(mux.Vars(r)["address"])
if err != nil {
s.Logger.Debugf("pin bytes: parse address: %v", err)
s.Logger.Error("pin bytes: parse address")
jsonhttp.BadRequest(w, "bad address")
return
}
has, err := s.Storer.Has(r.Context(), addr)
if err != nil {
s.Logger.Debugf("pin bytes: localstore has: %v", err)
s.Logger.Error("pin bytes: store")
jsonhttp.InternalServerError(w, err)
return
}
if !has {
jsonhttp.NotFound(w, nil)
return
}
ctx := r.Context()
chunkAddressFn := s.pinChunkAddressFn(ctx, addr)
err = s.Traversal.TraverseBytesAddresses(ctx, addr, chunkAddressFn)
if err != nil {
s.Logger.Debugf("pin bytes: traverse chunks: %v, addr %s", err, addr)
if errors.Is(err, traversal.ErrInvalidType) {
s.Logger.Error("pin bytes: invalid type")
jsonhttp.BadRequest(w, "invalid type")
return
}
s.Logger.Error("pin bytes: cannot pin")
jsonhttp.InternalServerError(w, "cannot pin")
return
}
jsonhttp.OK(w, nil)
}
// unpinBytes removes pinning from content.
func (s *server) unpinBytes(w http.ResponseWriter, r *http.Request) {
addr, err := swarm.ParseHexAddress(mux.Vars(r)["address"])
if err != nil {
s.Logger.Debugf("pin bytes: parse address: %v", err)
s.Logger.Error("pin bytes: parse address")
jsonhttp.BadRequest(w, "bad address")
return
}
has, err := s.Storer.Has(r.Context(), addr)
if err != nil {
s.Logger.Debugf("pin bytes: localstore has: %v", err)
s.Logger.Error("pin bytes: store")
jsonhttp.InternalServerError(w, err)
return
}
if !has {
jsonhttp.NotFound(w, nil)
return
}
ctx := r.Context()
chunkAddressFn := s.unpinChunkAddressFn(ctx, addr)
err = s.Traversal.TraverseBytesAddresses(ctx, addr, chunkAddressFn)
if err != nil {
s.Logger.Debugf("pin bytes: traverse chunks: %v, addr %s", err, addr)
if errors.Is(err, traversal.ErrInvalidType) {
s.Logger.Error("pin bytes: invalid type")
jsonhttp.BadRequest(w, "invalid type")
return
}
s.Logger.Error("pin bytes: cannot unpin")
jsonhttp.InternalServerError(w, "cannot unpin")
return
}
jsonhttp.OK(w, nil)
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api_test
import (
"bytes"
"io/ioutil"
"net/http"
"sort"
"testing"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/logging"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/traversal"
)
func TestPinBytesHandler(t *testing.T) {
var (
bytesUploadResource = "/bytes"
pinBytesResource = "/pin/bytes"
pinBytesAddressResource = func(addr string) string { return pinBytesResource + "/" + addr }
pinChunksResource = "/pin/chunks"
simpleData = []byte("this is a simple text")
mockStorer = mock.NewStorer()
mockStatestore = statestore.NewStateStore()
traversalService = traversal.NewService(mockStorer)
logger = logging.New(ioutil.Discard, 0)
client, _, _ = newTestServer(t, testServerOptions{
Storer: mockStorer,
Traversal: traversalService,
Tags: tags.NewTags(mockStatestore, logger),
})
)
t.Run("pin-bytes-1", func(t *testing.T) {
rootHash := "838d0a193ecd1152d1bb1432d5ecc02398533b2494889e23b8bd5ace30ac2aeb"
jsonhttptest.Request(t, client, http.MethodPost, bytesUploadResource, http.StatusOK,
jsonhttptest.WithRequestBody(bytes.NewReader(simpleData)),
jsonhttptest.WithExpectedJSONResponse(api.FileUploadResponse{
Reference: swarm.MustParseHexAddress(rootHash),
}),
)
jsonhttptest.Request(t, client, http.MethodPost, pinBytesAddressResource(rootHash), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
)
hashes := []string{rootHash}
sort.Strings(hashes)
expectedResponse := api.ListPinnedChunksResponse{
Chunks: []api.PinnedChunk{},
}
for _, h := range hashes {
expectedResponse.Chunks = append(expectedResponse.Chunks, api.PinnedChunk{
Address: swarm.MustParseHexAddress(h),
PinCounter: 1,
})
}
jsonhttptest.Request(t, client, http.MethodGet, pinChunksResource, http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(expectedResponse),
)
})
t.Run("unpin-bytes-1", func(t *testing.T) {
rootHash := "838d0a193ecd1152d1bb1432d5ecc02398533b2494889e23b8bd5ace30ac2aeb"
jsonhttptest.Request(t, client, http.MethodDelete, pinBytesAddressResource(rootHash), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
)
jsonhttptest.Request(t, client, http.MethodGet, pinChunksResource, http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(api.ListPinnedChunksResponse{
Chunks: []api.PinnedChunk{},
}),
)
})
t.Run("pin-bytes-2", func(t *testing.T) {
var b []byte
for {
b = append(b, simpleData...)
if len(b) > swarm.ChunkSize {
break
}
}
rootHash := "42ee01ae3a50663ca0903f2d5c3b55fc5ef4faf98368b74cf9a24c75955fa388"
data1Hash := "933db58bbd119e5d3a8eb4fc7d4a923d5e7cfc7ca35b07832ced495d05721b6d"
data2Hash := "430274f4e6d2af72b5491ad0dc7707e45892fb3a166c54b6ac9cee3c14149757"
jsonhttptest.Request(t, client, http.MethodPost, bytesUploadResource, http.StatusOK,
jsonhttptest.WithRequestBody(bytes.NewReader(b)),
jsonhttptest.WithExpectedJSONResponse(api.FileUploadResponse{
Reference: swarm.MustParseHexAddress(rootHash),
}),
)
jsonhttptest.Request(t, client, http.MethodPost, pinBytesAddressResource(rootHash), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
)
hashes := []string{rootHash, data1Hash, data2Hash}
expectedResponse := api.ListPinnedChunksResponse{
Chunks: []api.PinnedChunk{},
}
for _, h := range hashes {
expectedResponse.Chunks = append(expectedResponse.Chunks, api.PinnedChunk{
Address: swarm.MustParseHexAddress(h),
PinCounter: 1,
})
}
jsonhttptest.Request(t, client, http.MethodGet, pinChunksResource, http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(expectedResponse),
)
})
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api
import (
"errors"
"net/http"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/traversal"
"github.com/gorilla/mux"
)
// pinBzz is used to pin an already uploaded content.
func (s *server) pinBzz(w http.ResponseWriter, r *http.Request) {
addr, err := swarm.ParseHexAddress(mux.Vars(r)["address"])
if err != nil {
s.Logger.Debugf("pin bzz: parse address: %v", err)
s.Logger.Error("pin bzz: parse address")
jsonhttp.BadRequest(w, "bad address")
return
}
has, err := s.Storer.Has(r.Context(), addr)
if err != nil {
s.Logger.Debugf("pin bzz: localstore has: %v", err)
s.Logger.Error("pin bzz: store")
jsonhttp.InternalServerError(w, err)
return
}
if !has {
jsonhttp.NotFound(w, nil)
return
}
ctx := r.Context()
chunkAddressFn := s.pinChunkAddressFn(ctx, addr)
err = s.Traversal.TraverseManifestAddresses(ctx, addr, chunkAddressFn)
if err != nil {
s.Logger.Debugf("pin bzz: traverse chunks: %v, addr %s", err, addr)
if errors.Is(err, traversal.ErrInvalidType) {
s.Logger.Error("pin bzz: invalid type")
jsonhttp.BadRequest(w, "invalid type")
return
}
s.Logger.Error("pin bzz: cannot pin")
jsonhttp.InternalServerError(w, "cannot pin")
return
}
jsonhttp.OK(w, nil)
}
// unpinBzz removes pinning from content.
func (s *server) unpinBzz(w http.ResponseWriter, r *http.Request) {
addr, err := swarm.ParseHexAddress(mux.Vars(r)["address"])
if err != nil {
s.Logger.Debugf("pin bzz: parse address: %v", err)
s.Logger.Error("pin bzz: parse address")
jsonhttp.BadRequest(w, "bad address")
return
}
has, err := s.Storer.Has(r.Context(), addr)
if err != nil {
s.Logger.Debugf("pin bzz: localstore has: %v", err)
s.Logger.Error("pin bzz: store")
jsonhttp.InternalServerError(w, err)
return
}
if !has {
jsonhttp.NotFound(w, nil)
return
}
ctx := r.Context()
chunkAddressFn := s.unpinChunkAddressFn(ctx, addr)
err = s.Traversal.TraverseManifestAddresses(ctx, addr, chunkAddressFn)
if err != nil {
s.Logger.Debugf("pin bzz: traverse chunks: %v, addr %s", err, addr)
if errors.Is(err, traversal.ErrInvalidType) {
s.Logger.Error("pin bzz: invalid type")
jsonhttp.BadRequest(w, "invalid type")
return
}
s.Logger.Error("pin bzz: cannot unpin")
jsonhttp.InternalServerError(w, "cannot unpin")
return
}
jsonhttp.OK(w, nil)
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api_test
import (
"bytes"
"encoding/json"
"io/ioutil"
"net/http"
"testing"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/logging"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/traversal"
"github.com/ethersphere/manifest/mantaray"
)
func TestPinBzzHandler(t *testing.T) {
var (
dirUploadResource = "/dirs"
pinBzzResource = "/pin/bzz"
pinBzzAddressResource = func(addr string) string { return pinBzzResource + "/" + addr }
pinChunksResource = "/pin/chunks"
mockStorer = mock.NewStorer()
mockStatestore = statestore.NewStateStore()
traversalService = traversal.NewService(mockStorer)
logger = logging.New(ioutil.Discard, 0)
client, _, _ = newTestServer(t, testServerOptions{
Storer: mockStorer,
Traversal: traversalService,
Tags: tags.NewTags(mockStatestore, logger),
})
)
var (
obfuscationKey = make([]byte, 32)
obfuscationKeyFn = func(p []byte) (n int, err error) {
n = copy(p, obfuscationKey)
return
}
)
mantaray.SetObfuscationKeyFn(obfuscationKeyFn)
t.Run("pin-bzz-1", func(t *testing.T) {
files := []f{
{
data: []byte("<h1>Swarm"),
name: "index.html",
dir: "",
},
}
tarReader := tarFiles(t, files)
rootHash := "a85aaea6a34a5c7127a3546196f2111f866fe369c6d6562ed5d3313a99388c03"
// verify directory tar upload response
jsonhttptest.Request(t, client, http.MethodPost, dirUploadResource, http.StatusOK,
jsonhttptest.WithRequestBody(tarReader),
jsonhttptest.WithRequestHeader("Content-Type", api.ContentTypeTar),
jsonhttptest.WithExpectedJSONResponse(api.FileUploadResponse{
Reference: swarm.MustParseHexAddress(rootHash),
}),
)
jsonhttptest.Request(t, client, http.MethodPost, pinBzzAddressResource(rootHash), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
)
expectedChunkCount := 7
var respBytes []byte
jsonhttptest.Request(t, client, http.MethodGet, pinChunksResource, http.StatusOK,
jsonhttptest.WithPutResponseBody(&respBytes),
)
read := bytes.NewReader(respBytes)
// get the reference as everytime it will change because of random encryption key
var resp api.ListPinnedChunksResponse
err := json.NewDecoder(read).Decode(&resp)
if err != nil {
t.Fatal(err)
}
if expectedChunkCount != len(resp.Chunks) {
t.Fatalf("expected to find %d pinned chunks, got %d", expectedChunkCount, len(resp.Chunks))
}
})
t.Run("unpin-bzz-1", func(t *testing.T) {
rootHash := "a85aaea6a34a5c7127a3546196f2111f866fe369c6d6562ed5d3313a99388c03"
jsonhttptest.Request(t, client, http.MethodDelete, pinBzzAddressResource(rootHash), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
)
jsonhttptest.Request(t, client, http.MethodGet, pinChunksResource, http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(api.ListPinnedChunksResponse{
Chunks: []api.PinnedChunk{},
}),
)
})
}
......@@ -5,6 +5,7 @@
package api
import (
"context"
"errors"
"net/http"
"strconv"
......@@ -182,3 +183,33 @@ func (s *server) getPinnedChunk(w http.ResponseWriter, r *http.Request) {
PinCounter: pinCounter,
})
}
func (s *server) pinChunkAddressFn(ctx context.Context, reference swarm.Address) func(address swarm.Address) (stop bool) {
return func(address swarm.Address) (stop bool) {
err := s.Storer.Set(ctx, storage.ModeSetPin, address)
if err != nil {
s.Logger.Debugf("pin error: for reference %s, address %s: %w", reference, address, err)
// stop pinning on first error
return true
}
return false
}
}
func (s *server) unpinChunkAddressFn(ctx context.Context, reference swarm.Address) func(address swarm.Address) (stop bool) {
return func(address swarm.Address) (stop bool) {
_, err := s.Storer.PinCounter(address)
if err != nil {
return false
}
err = s.Storer.Set(ctx, storage.ModeSetUnpin, address)
if err != nil {
s.Logger.Debugf("unpin error: for reference %s, address %s: %w", reference, address, err)
// continue un-pinning all chunks
}
return false
}
}
......@@ -46,7 +46,7 @@ func TestPinChunkHandler(t *testing.T) {
// bad chunk address
t.Run("pin-bad-address", func(t *testing.T) {
jsonhttptest.Request(t, client, http.MethodPost, "/pinning/chunks/abcd1100zz", http.StatusBadRequest,
jsonhttptest.Request(t, client, http.MethodPost, "/pin/chunks/abcd1100zz", http.StatusBadRequest,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: "bad address",
Code: http.StatusBadRequest,
......@@ -56,7 +56,7 @@ func TestPinChunkHandler(t *testing.T) {
// list pins without anything pinned
t.Run("list-pins-zero-pins", func(t *testing.T) {
jsonhttptest.Request(t, client, http.MethodGet, "/pinning/chunks", http.StatusOK,
jsonhttptest.Request(t, client, http.MethodGet, "/pin/chunks", http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(api.ListPinnedChunksResponse{
Chunks: []api.PinnedChunk{},
}),
......@@ -65,7 +65,7 @@ func TestPinChunkHandler(t *testing.T) {
// pin a chunk which is not existing
t.Run("pin-absent-chunk", func(t *testing.T) {
jsonhttptest.Request(t, client, http.MethodPost, "/pinning/chunks/123456", http.StatusNotFound,
jsonhttptest.Request(t, client, http.MethodPost, "/pin/chunks/123456", http.StatusNotFound,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusNotFound),
Code: http.StatusNotFound,
......@@ -84,7 +84,7 @@ func TestPinChunkHandler(t *testing.T) {
}),
)
jsonhttptest.Request(t, client, http.MethodDelete, "/pinning/chunks/"+hash.String(), http.StatusBadRequest,
jsonhttptest.Request(t, client, http.MethodDelete, "/pin/chunks/"+hash.String(), http.StatusBadRequest,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: "chunk is not yet pinned",
Code: http.StatusBadRequest,
......@@ -103,7 +103,7 @@ func TestPinChunkHandler(t *testing.T) {
}),
)
jsonhttptest.Request(t, client, http.MethodPost, "/pinning/chunks/"+hash.String(), http.StatusOK,
jsonhttptest.Request(t, client, http.MethodPost, "/pin/chunks/"+hash.String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
......@@ -111,7 +111,7 @@ func TestPinChunkHandler(t *testing.T) {
)
// Check is the chunk is pinned once
jsonhttptest.Request(t, client, http.MethodGet, "/pinning/chunks/"+hash.String(), http.StatusOK,
jsonhttptest.Request(t, client, http.MethodGet, "/pin/chunks/"+hash.String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(api.PinnedChunk{
Address: swarm.MustParseHexAddress("aabbcc"),
PinCounter: 1,
......@@ -122,7 +122,7 @@ func TestPinChunkHandler(t *testing.T) {
// pin a existing chunk second time
t.Run("pin-chunk-2", func(t *testing.T) {
jsonhttptest.Request(t, client, http.MethodPost, "/pinning/chunks/"+hash.String(), http.StatusOK,
jsonhttptest.Request(t, client, http.MethodPost, "/pin/chunks/"+hash.String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
......@@ -130,7 +130,7 @@ func TestPinChunkHandler(t *testing.T) {
)
// Check is the chunk is pinned twice
jsonhttptest.Request(t, client, http.MethodGet, "/pinning/chunks/"+hash.String(), http.StatusOK,
jsonhttptest.Request(t, client, http.MethodGet, "/pin/chunks/"+hash.String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(api.PinnedChunk{
Address: swarm.MustParseHexAddress("aabbcc"),
PinCounter: 2,
......@@ -140,7 +140,7 @@ func TestPinChunkHandler(t *testing.T) {
// unpin a chunk first time
t.Run("unpin-chunk-1", func(t *testing.T) {
jsonhttptest.Request(t, client, http.MethodDelete, "/pinning/chunks/"+hash.String(), http.StatusOK,
jsonhttptest.Request(t, client, http.MethodDelete, "/pin/chunks/"+hash.String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
......@@ -148,7 +148,7 @@ func TestPinChunkHandler(t *testing.T) {
)
// Check is the chunk is pinned once
jsonhttptest.Request(t, client, http.MethodGet, "/pinning/chunks/"+hash.String(), http.StatusOK,
jsonhttptest.Request(t, client, http.MethodGet, "/pin/chunks/"+hash.String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(api.PinnedChunk{
Address: swarm.MustParseHexAddress("aabbcc"),
PinCounter: 1,
......@@ -158,7 +158,7 @@ func TestPinChunkHandler(t *testing.T) {
// unpin a chunk second time
t.Run("unpin-chunk-2", func(t *testing.T) {
jsonhttptest.Request(t, client, http.MethodDelete, "/pinning/chunks/"+hash.String(), http.StatusOK,
jsonhttptest.Request(t, client, http.MethodDelete, "/pin/chunks/"+hash.String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
......@@ -166,7 +166,7 @@ func TestPinChunkHandler(t *testing.T) {
)
// Check if the chunk is removed from the pinIndex
jsonhttptest.Request(t, client, http.MethodGet, "/pinning/chunks/"+hash.String(), http.StatusNotFound,
jsonhttptest.Request(t, client, http.MethodGet, "/pin/chunks/"+hash.String(), http.StatusNotFound,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusNotFound),
Code: http.StatusNotFound,
......@@ -185,7 +185,7 @@ func TestPinChunkHandler(t *testing.T) {
}),
)
jsonhttptest.Request(t, client, http.MethodPost, "/pinning/chunks/"+hash.String(), http.StatusOK,
jsonhttptest.Request(t, client, http.MethodPost, "/pin/chunks/"+hash.String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
......@@ -203,14 +203,14 @@ func TestPinChunkHandler(t *testing.T) {
Code: http.StatusOK,
}),
)
jsonhttptest.Request(t, client, http.MethodPost, "/pinning/chunks/"+hash2.String(), http.StatusOK,
jsonhttptest.Request(t, client, http.MethodPost, "/pin/chunks/"+hash2.String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
)
jsonhttptest.Request(t, client, http.MethodGet, "/pinning/chunks", http.StatusOK,
jsonhttptest.Request(t, client, http.MethodGet, "/pin/chunks", http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(api.ListPinnedChunksResponse{
Chunks: []api.PinnedChunk{
{
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api
import (
"errors"
"net/http"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/traversal"
"github.com/gorilla/mux"
)
// pinFile is used to pin an already uploaded content.
func (s *server) pinFile(w http.ResponseWriter, r *http.Request) {
addr, err := swarm.ParseHexAddress(mux.Vars(r)["address"])
if err != nil {
s.Logger.Debugf("pin files: parse address: %v", err)
s.Logger.Error("pin files: parse address")
jsonhttp.BadRequest(w, "bad address")
return
}
has, err := s.Storer.Has(r.Context(), addr)
if err != nil {
s.Logger.Debugf("pin files: localstore has: %v", err)
s.Logger.Error("pin files: store")
jsonhttp.InternalServerError(w, err)
return
}
if !has {
jsonhttp.NotFound(w, nil)
return
}
ctx := r.Context()
chunkAddressFn := s.pinChunkAddressFn(ctx, addr)
err = s.Traversal.TraverseFileAddresses(ctx, addr, chunkAddressFn)
if err != nil {
s.Logger.Debugf("pin files: traverse chunks: %v, addr %s", err, addr)
if errors.Is(err, traversal.ErrInvalidType) {
s.Logger.Error("pin files: invalid type")
jsonhttp.BadRequest(w, "invalid type")
return
}
s.Logger.Error("pin files: cannot pin")
jsonhttp.InternalServerError(w, "cannot pin")
return
}
jsonhttp.OK(w, nil)
}
// unpinFile removes pinning from content.
func (s *server) unpinFile(w http.ResponseWriter, r *http.Request) {
addr, err := swarm.ParseHexAddress(mux.Vars(r)["address"])
if err != nil {
s.Logger.Debugf("pin files: parse address: %v", err)
s.Logger.Error("pin files: parse address")
jsonhttp.BadRequest(w, "bad address")
return
}
has, err := s.Storer.Has(r.Context(), addr)
if err != nil {
s.Logger.Debugf("pin files: localstore has: %v", err)
s.Logger.Error("pin files: store")
jsonhttp.InternalServerError(w, err)
return
}
if !has {
jsonhttp.NotFound(w, nil)
return
}
ctx := r.Context()
chunkAddressFn := s.unpinChunkAddressFn(ctx, addr)
err = s.Traversal.TraverseFileAddresses(ctx, addr, chunkAddressFn)
if err != nil {
s.Logger.Debugf("pin files: traverse chunks: %v, addr %s", err, addr)
if errors.Is(err, traversal.ErrInvalidType) {
s.Logger.Error("pin files: invalid type")
jsonhttp.BadRequest(w, "invalid type")
return
}
s.Logger.Error("pin files: cannot unpin")
jsonhttp.InternalServerError(w, "cannot unpin")
return
}
jsonhttp.OK(w, nil)
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api_test
import (
"bytes"
"io/ioutil"
"net/http"
"sort"
"testing"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/logging"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/traversal"
)
func TestPinFilesHandler(t *testing.T) {
var (
fileUploadResource = "/files"
pinFilesResource = "/pin/files"
pinFilesAddressResource = func(addr string) string { return pinFilesResource + "/" + addr }
pinChunksResource = "/pin/chunks"
simpleData = []byte("this is a simple text")
mockStorer = mock.NewStorer()
mockStatestore = statestore.NewStateStore()
traversalService = traversal.NewService(mockStorer)
logger = logging.New(ioutil.Discard, 0)
client, _, _ = newTestServer(t, testServerOptions{
Storer: mockStorer,
Traversal: traversalService,
Tags: tags.NewTags(mockStatestore, logger),
})
)
t.Run("pin-file-1", func(t *testing.T) {
rootHash := "dc82503e0ed041a57327ad558d7aa69a867024c8221306c461ae359dc34d1c6a"
metadataHash := "d936d7180f230b3424842ea10848aa205f2f0e830cb9cc7588a39c9381544bf9"
contentHash := "838d0a193ecd1152d1bb1432d5ecc02398533b2494889e23b8bd5ace30ac2aeb"
jsonhttptest.Request(t, client, http.MethodPost, fileUploadResource, http.StatusOK,
jsonhttptest.WithRequestBody(bytes.NewReader(simpleData)),
jsonhttptest.WithExpectedJSONResponse(api.FileUploadResponse{
Reference: swarm.MustParseHexAddress(rootHash),
}),
jsonhttptest.WithRequestHeader("Content-Type", "text/plain"),
)
jsonhttptest.Request(t, client, http.MethodPost, pinFilesAddressResource(rootHash), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
)
hashes := []string{rootHash, metadataHash, contentHash}
sort.Strings(hashes)
expectedResponse := api.ListPinnedChunksResponse{
Chunks: []api.PinnedChunk{},
}
for _, h := range hashes {
expectedResponse.Chunks = append(expectedResponse.Chunks, api.PinnedChunk{
Address: swarm.MustParseHexAddress(h),
PinCounter: 1,
})
}
jsonhttptest.Request(t, client, http.MethodGet, pinChunksResource, http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(expectedResponse),
)
})
t.Run("unpin-file-1", func(t *testing.T) {
rootHash := "dc82503e0ed041a57327ad558d7aa69a867024c8221306c461ae359dc34d1c6a"
jsonhttptest.Request(t, client, http.MethodDelete, pinFilesAddressResource(rootHash), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
)
jsonhttptest.Request(t, client, http.MethodGet, pinChunksResource, http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(api.ListPinnedChunksResponse{
Chunks: []api.PinnedChunk{},
}),
)
})
}
......@@ -120,7 +120,7 @@ func (s *server) setupRouting() {
})),
)
handle(router, "/pinning/chunks/{address}", web.ChainHandlers(
handle(router, "/pin/chunks/{address}", web.ChainHandlers(
s.gatewayModeForbidEndpointHandler,
web.FinalHandler(jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.getPinnedChunk),
......@@ -128,13 +128,37 @@ func (s *server) setupRouting() {
"DELETE": http.HandlerFunc(s.unpinChunk),
})),
)
handle(router, "/pinning/chunks", web.ChainHandlers(
handle(router, "/pin/chunks", web.ChainHandlers(
s.gatewayModeForbidEndpointHandler,
web.FinalHandler(jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.listPinnedChunks),
})),
)
handle(router, "/pin/bytes/{address}", web.ChainHandlers(
s.gatewayModeForbidEndpointHandler,
web.FinalHandler(jsonhttp.MethodHandler{
"POST": http.HandlerFunc(s.pinBytes),
"DELETE": http.HandlerFunc(s.unpinBytes),
})),
)
handle(router, "/pin/files/{address}", web.ChainHandlers(
s.gatewayModeForbidEndpointHandler,
web.FinalHandler(jsonhttp.MethodHandler{
"POST": http.HandlerFunc(s.pinFile),
"DELETE": http.HandlerFunc(s.unpinFile),
})),
)
handle(router, "/pin/bzz/{address}", web.ChainHandlers(
s.gatewayModeForbidEndpointHandler,
web.FinalHandler(jsonhttp.MethodHandler{
"POST": http.HandlerFunc(s.pinBzz),
"DELETE": http.HandlerFunc(s.unpinBzz),
})),
)
s.Handler = web.ChainHandlers(
logging.NewHTTPAccessLogHandler(s.Logger, logrus.InfoLevel, "api access"),
handlers.CompressHandler,
......
......@@ -56,6 +56,7 @@ import (
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/ethersphere/bee/pkg/traversal"
ma "github.com/multiformats/go-multiaddr"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
......@@ -352,6 +353,8 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
pssService := pss.New(swarmPrivateKey, logger)
b.pssCloser = pssService
traversalService := traversal.NewService(storer)
var ns storage.Storer
if o.GlobalPinningEnabled {
// create recovery callback for content repair
......@@ -401,7 +404,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
var apiService api.Service
if o.APIAddr != "" {
// API server
apiService = api.New(tagService, ns, multiResolver, pssService, logger, tracer, api.Options{
apiService = api.New(tagService, ns, multiResolver, pssService, traversalService, logger, tracer, api.Options{
CORSAllowedOrigins: o.CORSAllowedOrigins,
GatewayMode: o.GatewayMode,
WsPingPeriod: 60 * time.Second,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment