Commit c4b7fa9a authored by Peter Mrekaj's avatar Peter Mrekaj Committed by GitHub

Change pins API (#1566)

The pins API is unified and exposes only addresses of pinned roots
without exposinge the associated counter to the users.
parent 99ef02e2
...@@ -19,6 +19,7 @@ require ( ...@@ -19,6 +19,7 @@ require (
github.com/gorilla/handlers v1.4.2 github.com/gorilla/handlers v1.4.2
github.com/gorilla/mux v1.7.4 github.com/gorilla/mux v1.7.4
github.com/gorilla/websocket v1.4.2 github.com/gorilla/websocket v1.4.2
github.com/hashicorp/go-multierror v1.1.1
github.com/kardianos/service v1.2.0 github.com/kardianos/service v1.2.0
github.com/koron/go-ssdp v0.0.2 // indirect github.com/koron/go-ssdp v0.0.2 // indirect
github.com/kr/text v0.2.0 // indirect github.com/kr/text v0.2.0 // indirect
......
This diff is collapsed.
...@@ -438,21 +438,21 @@ paths: ...@@ -438,21 +438,21 @@ paths:
default: default:
description: Default response description: Default response
"/pin/chunks/{address}": "/pins/{address}":
parameters: parameters:
- in: path - in: path
name: address name: address
schema: schema:
$ref: "SwarmCommon.yaml#/components/schemas/SwarmAddress" $ref: "SwarmCommon.yaml#/components/schemas/SwarmAddress"
required: true required: true
description: Swarm address of chunk description: Swarm address of the root hash
post: post:
summary: Pin chunk with given address summary: Pin root hash with given address
tags: tags:
- Chunk pinning - Root hash pinning
responses: responses:
"200": "200":
description: Pinning chunk with address description: Pinning root hash with address
content: content:
application/json: application/json:
schema: schema:
...@@ -466,12 +466,12 @@ paths: ...@@ -466,12 +466,12 @@ paths:
default: default:
description: Default response description: Default response
delete: delete:
summary: Unpin chunk with given address summary: Unpin root hash with given address
tags: tags:
- Chunk pinning - Root hash pinning
responses: responses:
"200": "200":
description: Unpinning chunk with address description: Unpinning root hash with address
content: content:
application/json: application/json:
schema: schema:
...@@ -485,12 +485,12 @@ paths: ...@@ -485,12 +485,12 @@ paths:
default: default:
description: Default response description: Default response
get: get:
summary: Get pinning status of chunk with given address summary: Get pinning status of root hash with given address
tags: tags:
- Chunk pinning - Root hash pinning
responses: responses:
"200": "200":
description: Pinning state of chunk with address description: Address of pinned root hash
content: content:
application/json: application/json:
schema: schema:
...@@ -501,52 +501,15 @@ paths: ...@@ -501,52 +501,15 @@ paths:
$ref: "SwarmCommon.yaml#/components/responses/500" $ref: "SwarmCommon.yaml#/components/responses/500"
default: default:
description: Default response description: Default response
put:
summary: Update chunk pin counter
tags:
- Chunk pinning
responses:
"200":
description: Pinning state of chunk with address
content:
application/json:
schema:
$ref: "SwarmCommon.yaml#/components/schemas/PinningState"
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
"403":
$ref: "SwarmCommon.yaml#/components/responses/403"
"404":
$ref: "SwarmCommon.yaml#/components/responses/404"
default:
description: Default response
"/pin/chunks": "/pins":
get: get:
summary: Get list of pinned chunks summary: Get list of pinned root hashes addresses
tags: tags:
- Chunk pinning - Root hash pinning
parameters:
- in: query
name: offset
schema:
type: integer
minimum: 0
default: 0
required: false
description: The number of items to skip before starting to collect the result set.
- in: query
name: limit
schema:
type: integer
minimum: 1
maximum: 1000
default: 100
required: false
description: The numbers of items to return.
responses: responses:
"200": "200":
description: List of pinned chunks description: List of pinned root hashes addresses
content: content:
application/json: application/json:
schema: schema:
...@@ -558,147 +521,6 @@ paths: ...@@ -558,147 +521,6 @@ paths:
default: default:
description: Default response description: Default response
"/pin/bytes/{address}":
parameters:
- in: path
name: address
schema:
$ref: "SwarmCommon.yaml#/components/schemas/SwarmAddress"
required: true
description: Swarm address of the bytes
post:
summary: Pin bytes with given address
tags:
- Bytes pinning
responses:
"200":
description: Pinning bytes chunks with address
content:
application/json:
schema:
$ref: "SwarmCommon.yaml#/components/schemas/Response"
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
"403":
$ref: "SwarmCommon.yaml#/components/responses/403"
"404":
$ref: "SwarmCommon.yaml#/components/responses/404"
default:
description: Default response
delete:
summary: Unpin bytes chunks with given address
tags:
- Bytes pinning
responses:
"200":
description: Unpinning chunk with address
content:
application/json:
schema:
$ref: "SwarmCommon.yaml#/components/schemas/Response"
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
"403":
$ref: "SwarmCommon.yaml#/components/responses/403"
"404":
$ref: "SwarmCommon.yaml#/components/responses/404"
default:
description: Default response
"/pin/files/{address}":
parameters:
- in: path
name: address
schema:
$ref: "SwarmCommon.yaml#/components/schemas/SwarmAddress"
required: true
description: Swarm address of the file
post:
summary: Pin file with given address
tags:
- File pinning
responses:
"200":
description: Pinning file chunks with address
content:
application/json:
schema:
$ref: "SwarmCommon.yaml#/components/schemas/Response"
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
"403":
$ref: "SwarmCommon.yaml#/components/responses/403"
"404":
$ref: "SwarmCommon.yaml#/components/responses/404"
default:
description: Default response
delete:
summary: Unpin file chunks with given address
tags:
- File pinning
responses:
"200":
description: Unpinning file chunks with address
content:
application/json:
schema:
$ref: "SwarmCommon.yaml#/components/schemas/Response"
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
"403":
$ref: "SwarmCommon.yaml#/components/responses/403"
"404":
$ref: "SwarmCommon.yaml#/components/responses/404"
default:
description: Default response
"/pin/bzz/{address}":
parameters:
- in: path
name: address
schema:
$ref: "SwarmCommon.yaml#/components/schemas/SwarmAddress"
required: true
description: Swarm address of the collection
post:
summary: Pin collection with given address
tags:
- Collection pinning
responses:
"200":
description: Pinning collection chunks (and all referenced files) with address
content:
application/json:
schema:
$ref: "SwarmCommon.yaml#/components/schemas/Response"
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
"403":
$ref: "SwarmCommon.yaml#/components/responses/403"
"404":
$ref: "SwarmCommon.yaml#/components/responses/404"
default:
description: Default response
delete:
summary: Unpin file chunks with given address
tags:
- Collection pinning
responses:
"200":
description: Unpinning collection chunks (and all referenced files) with address
content:
application/json:
schema:
$ref: "SwarmCommon.yaml#/components/schemas/Response"
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
"403":
$ref: "SwarmCommon.yaml#/components/responses/403"
"404":
$ref: "SwarmCommon.yaml#/components/responses/404"
default:
description: Default response
"/pss/send/{topic}/{targets}": "/pss/send/{topic}/{targets}":
post: post:
summary: Send to recipient or target with Postal Service for Swarm summary: Send to recipient or target with Postal Service for Swarm
......
...@@ -23,6 +23,7 @@ import ( ...@@ -23,6 +23,7 @@ import (
"github.com/ethersphere/bee/pkg/file/pipeline/builder" "github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
m "github.com/ethersphere/bee/pkg/metrics" m "github.com/ethersphere/bee/pkg/metrics"
"github.com/ethersphere/bee/pkg/pinning"
"github.com/ethersphere/bee/pkg/pss" "github.com/ethersphere/bee/pkg/pss"
"github.com/ethersphere/bee/pkg/resolver" "github.com/ethersphere/bee/pkg/resolver"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
...@@ -83,7 +84,8 @@ type server struct { ...@@ -83,7 +84,8 @@ type server struct {
storer storage.Storer storer storage.Storer
resolver resolver.Interface resolver resolver.Interface
pss pss.Interface pss pss.Interface
traversal traversal.Service traversal traversal.Traverser
pinning pinning.Interface
logger logging.Logger logger logging.Logger
tracer *tracing.Tracer tracer *tracing.Tracer
feedFactory feeds.Factory feedFactory feeds.Factory
...@@ -107,13 +109,14 @@ const ( ...@@ -107,13 +109,14 @@ const (
) )
// New will create a and initialize a new API service. // New will create a and initialize a new API service.
func New(tags *tags.Tags, storer storage.Storer, resolver resolver.Interface, pss pss.Interface, traversalService traversal.Service, feedFactory feeds.Factory, logger logging.Logger, tracer *tracing.Tracer, o Options) Service { func New(tags *tags.Tags, storer storage.Storer, resolver resolver.Interface, pss pss.Interface, traversalService traversal.Traverser, pinning pinning.Interface, feedFactory feeds.Factory, logger logging.Logger, tracer *tracing.Tracer, o Options) Service {
s := &server{ s := &server{
tags: tags, tags: tags,
storer: storer, storer: storer,
resolver: resolver, resolver: resolver,
pss: pss, pss: pss,
traversal: traversalService, traversal: traversalService,
pinning: pinning,
feedFactory: feedFactory, feedFactory: feedFactory,
Options: o, Options: o,
logger: logger, logger: logger,
......
...@@ -17,6 +17,7 @@ import ( ...@@ -17,6 +17,7 @@ import (
"github.com/ethersphere/bee/pkg/api" "github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/feeds" "github.com/ethersphere/bee/pkg/feeds"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/pinning"
"github.com/ethersphere/bee/pkg/pss" "github.com/ethersphere/bee/pkg/pss"
"github.com/ethersphere/bee/pkg/resolver" "github.com/ethersphere/bee/pkg/resolver"
resolverMock "github.com/ethersphere/bee/pkg/resolver/mock" resolverMock "github.com/ethersphere/bee/pkg/resolver/mock"
...@@ -32,7 +33,8 @@ type testServerOptions struct { ...@@ -32,7 +33,8 @@ type testServerOptions struct {
Storer storage.Storer Storer storage.Storer
Resolver resolver.Interface Resolver resolver.Interface
Pss pss.Interface Pss pss.Interface
Traversal traversal.Service Traversal traversal.Traverser
Pinning pinning.Interface
WsPath string WsPath string
Tags *tags.Tags Tags *tags.Tags
GatewayMode bool GatewayMode bool
...@@ -44,6 +46,8 @@ type testServerOptions struct { ...@@ -44,6 +46,8 @@ type testServerOptions struct {
} }
func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.Conn, string) { func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.Conn, string) {
t.Helper()
if o.Logger == nil { if o.Logger == nil {
o.Logger = logging.New(ioutil.Discard, 0) o.Logger = logging.New(ioutil.Discard, 0)
} }
...@@ -53,7 +57,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket. ...@@ -53,7 +57,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.
if o.WsPingPeriod == 0 { if o.WsPingPeriod == 0 {
o.WsPingPeriod = 60 * time.Second o.WsPingPeriod = 60 * time.Second
} }
s := api.New(o.Tags, o.Storer, o.Resolver, o.Pss, o.Traversal, o.Feeds, o.Logger, nil, api.Options{ s := api.New(o.Tags, o.Storer, o.Resolver, o.Pss, o.Traversal, o.Pinning, o.Feeds, o.Logger, nil, api.Options{
CORSAllowedOrigins: o.CORSAllowedOrigins, CORSAllowedOrigins: o.CORSAllowedOrigins,
GatewayMode: o.GatewayMode, GatewayMode: o.GatewayMode,
WsPingPeriod: o.WsPingPeriod, WsPingPeriod: o.WsPingPeriod,
...@@ -171,7 +175,7 @@ func TestParseName(t *testing.T) { ...@@ -171,7 +175,7 @@ func TestParseName(t *testing.T) {
})) }))
} }
s := api.New(nil, nil, tC.res, nil, nil, nil, tC.log, nil, api.Options{}).(*api.Server) s := api.New(nil, nil, tC.res, nil, nil, nil, nil, tC.log, nil, api.Options{}).(*api.Server)
t.Run(tC.desc, func(t *testing.T) { t.Run(tC.desc, func(t *testing.T) {
got, err := s.ResolveNameOrAddress(tC.name) got, err := s.ResolveNameOrAddress(tC.name)
......
...@@ -7,6 +7,7 @@ package api ...@@ -7,6 +7,7 @@ package api
import ( import (
"fmt" "fmt"
"net/http" "net/http"
"strings"
"github.com/ethersphere/bee/pkg/file/pipeline/builder" "github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/jsonhttp" "github.com/ethersphere/bee/pkg/jsonhttp"
...@@ -66,6 +67,16 @@ func (s *server) bytesUploadHandler(w http.ResponseWriter, r *http.Request) { ...@@ -66,6 +67,16 @@ func (s *server) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
} }
if strings.ToLower(r.Header.Get(SwarmPinHeader)) == "true" {
if err := s.pinning.CreatePin(ctx, address, false); err != nil {
logger.Debugf("bytes upload: creation of pin for %q failed: %v", address, err)
logger.Error("bytes upload: creation of pin failed")
jsonhttp.InternalServerError(w, nil)
return
}
}
w.Header().Set(SwarmTagHeader, fmt.Sprint(tag.Uid)) w.Header().Set(SwarmTagHeader, fmt.Sprint(tag.Uid))
w.Header().Set("Access-Control-Expose-Headers", SwarmTagHeader) w.Header().Set("Access-Control-Expose-Headers", SwarmTagHeader)
jsonhttp.OK(w, bytesPostResponse{ jsonhttp.OK(w, bytesPostResponse{
......
...@@ -6,38 +6,43 @@ package api_test ...@@ -6,38 +6,43 @@ package api_test
import ( import (
"bytes" "bytes"
"context"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"testing" "testing"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/api" "github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/jsonhttp" "github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest" "github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
pinning "github.com/ethersphere/bee/pkg/pinning/mock"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage/mock" "github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags" "github.com/ethersphere/bee/pkg/tags"
mockbytes "gitlab.com/nolash/go-mockbytes" "gitlab.com/nolash/go-mockbytes"
) )
// TestBytes tests that the data upload api responds as expected when uploading, // TestBytes tests that the data upload api responds as expected when uploading,
// downloading and requesting a resource that cannot be found. // downloading and requesting a resource that cannot be found.
func TestBytes(t *testing.T) { func TestBytes(t *testing.T) {
const (
resource = "/bytes"
targets = "0x222"
expHash = "29a5fb121ce96194ba8b7b823a1f9c6af87e1791f824940a53b5a7efe3f790d9"
)
var ( var (
resource = "/bytes" storerMock = mock.NewStorer()
targets = "0x222" pinningMock = pinning.NewServiceMock()
expHash = "29a5fb121ce96194ba8b7b823a1f9c6af87e1791f824940a53b5a7efe3f790d9" client, _, _ = newTestServer(t, testServerOptions{
mockStorer = mock.NewStorer() Storer: storerMock,
mockStatestore = statestore.NewStateStore() Tags: tags.NewTags(statestore.NewStateStore(), logging.New(ioutil.Discard, 0)),
logger = logging.New(ioutil.Discard, 0) Pinning: pinningMock,
client, _, _ = newTestServer(t, testServerOptions{ Logger: logging.New(ioutil.Discard, 5),
Storer: mockStorer,
Tags: tags.NewTags(mockStatestore, logger),
Logger: logging.New(ioutil.Discard, 5),
}) })
) )
g := mockbytes.New(0, mockbytes.MockTypeStandard).WithModulus(255) g := mockbytes.New(0, mockbytes.MockTypeStandard).WithModulus(255)
content, err := g.SequentialBytes(swarm.ChunkSize * 2) content, err := g.SequentialBytes(swarm.ChunkSize * 2)
if err != nil { if err != nil {
...@@ -45,12 +50,54 @@ func TestBytes(t *testing.T) { ...@@ -45,12 +50,54 @@ func TestBytes(t *testing.T) {
} }
t.Run("upload", func(t *testing.T) { t.Run("upload", func(t *testing.T) {
chunkAddr := swarm.MustParseHexAddress(expHash)
jsonhttptest.Request(t, client, http.MethodPost, resource, http.StatusOK, jsonhttptest.Request(t, client, http.MethodPost, resource, http.StatusOK,
jsonhttptest.WithRequestBody(bytes.NewReader(content)), jsonhttptest.WithRequestBody(bytes.NewReader(content)),
jsonhttptest.WithExpectedJSONResponse(api.BytesPostResponse{ jsonhttptest.WithExpectedJSONResponse(api.BytesPostResponse{
Reference: swarm.MustParseHexAddress(expHash), Reference: chunkAddr,
}), }),
) )
has, err := storerMock.Has(context.Background(), chunkAddr)
if err != nil {
t.Fatal(err)
}
if !has {
t.Fatal("storer check root chunk address: have none; want one")
}
if have, want := len(pinningMock.Entries()), 0; have != want {
t.Fatalf("root pin count mismatch: have %d; want %d", have, want)
}
})
t.Run("upload-with-pinning", func(t *testing.T) {
var res api.BytesPostResponse
jsonhttptest.Request(t, client, http.MethodPost, resource, http.StatusOK,
jsonhttptest.WithRequestBody(bytes.NewReader(content)),
jsonhttptest.WithRequestHeader(api.SwarmPinHeader, "true"),
jsonhttptest.WithUnmarshalJSONResponse(&res),
)
chunkAddr := res.Reference
has, err := storerMock.Has(context.Background(), chunkAddr)
if err != nil {
t.Fatal(err)
}
if !has {
t.Fatal("storer check root chunk address: have none; want one")
}
if have, want := len(pinningMock.Entries()), 1; have != want {
t.Fatalf("root pin count mismatch: have %d; want %d", have, want)
}
addrs, err := pinningMock.Pins()
if err != nil {
t.Fatal(err)
}
if have, want := addrs[0], chunkAddr; !have.Equal(want) {
t.Fatalf("root pin reference mismatch: have %q; want %q", have, want)
}
}) })
t.Run("download", func(t *testing.T) { t.Run("download", func(t *testing.T) {
......
...@@ -224,6 +224,16 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) { ...@@ -224,6 +224,16 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
} }
if strings.ToLower(r.Header.Get(SwarmPinHeader)) == "true" {
if err := s.pinning.CreatePin(ctx, manifestReference, false); err != nil {
logger.Debugf("bzz upload file: creation of pin for %q failed: %v", manifestReference, err)
logger.Error("bzz upload file: creation of pin failed")
jsonhttp.InternalServerError(w, nil)
return
}
}
w.Header().Set("ETag", fmt.Sprintf("%q", manifestReference.String())) w.Header().Set("ETag", fmt.Sprintf("%q", manifestReference.String()))
w.Header().Set(SwarmTagHeader, fmt.Sprint(tag.Uid)) w.Header().Set(SwarmTagHeader, fmt.Sprint(tag.Uid))
w.Header().Set("Access-Control-Expose-Headers", SwarmTagHeader) w.Header().Set("Access-Control-Expose-Headers", SwarmTagHeader)
......
...@@ -23,6 +23,7 @@ import ( ...@@ -23,6 +23,7 @@ import (
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest" "github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/manifest" "github.com/ethersphere/bee/pkg/manifest"
pinning "github.com/ethersphere/bee/pkg/pinning/mock"
statestore "github.com/ethersphere/bee/pkg/statestore/mock" statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
smock "github.com/ethersphere/bee/pkg/storage/mock" smock "github.com/ethersphere/bee/pkg/storage/mock"
...@@ -36,12 +37,15 @@ func TestBzzFiles(t *testing.T) { ...@@ -36,12 +37,15 @@ func TestBzzFiles(t *testing.T) {
targets = "0x222" targets = "0x222"
fileDownloadResource = func(addr string) string { return "/bzz/" + addr } fileDownloadResource = func(addr string) string { return "/bzz/" + addr }
simpleData = []byte("this is a simple text") simpleData = []byte("this is a simple text")
mockStatestore = statestore.NewStateStore() storerMock = smock.NewStorer()
statestoreMock = statestore.NewStateStore()
pinningMock = pinning.NewServiceMock()
logger = logging.New(ioutil.Discard, 0) logger = logging.New(ioutil.Discard, 0)
client, _, _ = newTestServer(t, testServerOptions{ client, _, _ = newTestServer(t, testServerOptions{
Storer: smock.NewStorer(), Storer: storerMock,
Tags: tags.NewTags(mockStatestore, logger), Pinning: pinningMock,
Logger: logger, Tags: tags.NewTags(statestoreMock, logger),
Logger: logger,
}) })
) )
...@@ -83,14 +87,83 @@ func TestBzzFiles(t *testing.T) { ...@@ -83,14 +87,83 @@ func TestBzzFiles(t *testing.T) {
}, },
}, },
}) })
rootHash := "f30c0aa7e9e2a0ef4c9b1b750ebfeaeb7c7c24da700bb089da19a46e3677824b" address := swarm.MustParseHexAddress("f30c0aa7e9e2a0ef4c9b1b750ebfeaeb7c7c24da700bb089da19a46e3677824b")
jsonhttptest.Request(t, client, http.MethodPost, fileUploadResource, http.StatusOK, jsonhttptest.Request(t, client, http.MethodPost, fileUploadResource, http.StatusOK,
jsonhttptest.WithRequestBody(tr), jsonhttptest.WithRequestBody(tr),
jsonhttptest.WithRequestHeader("Content-Type", api.ContentTypeTar), jsonhttptest.WithRequestHeader("Content-Type", api.ContentTypeTar),
jsonhttptest.WithExpectedJSONResponse(api.BzzUploadResponse{ jsonhttptest.WithExpectedJSONResponse(api.BzzUploadResponse{
Reference: swarm.MustParseHexAddress(rootHash), Reference: address,
}), }),
) )
has, err := storerMock.Has(context.Background(), address)
if err != nil {
t.Fatal(err)
}
if !has {
t.Fatal("storer check root chunk address: have none; want one")
}
if have, want := len(pinningMock.Entries()), 0; have != want {
t.Fatalf("root pin count mismatch: have %d; want %d", have, want)
}
})
t.Run("tar-file-upload-with-pinning", func(t *testing.T) {
tr := tarFiles(t, []f{
{
data: []byte("robots text"),
name: "robots.txt",
dir: "",
header: http.Header{
"Content-Type": {"text/plain; charset=utf-8"},
},
},
{
data: []byte("image 1"),
name: "1.png",
dir: "img",
header: http.Header{
"Content-Type": {"image/png"},
},
},
{
data: []byte("image 2"),
name: "2.png",
dir: "img",
header: http.Header{
"Content-Type": {"image/png"},
},
},
})
address := swarm.MustParseHexAddress("f30c0aa7e9e2a0ef4c9b1b750ebfeaeb7c7c24da700bb089da19a46e3677824b")
jsonhttptest.Request(t, client, http.MethodPost, fileUploadResource, http.StatusOK,
jsonhttptest.WithRequestHeader(api.SwarmPinHeader, "true"),
jsonhttptest.WithRequestBody(tr),
jsonhttptest.WithRequestHeader("Content-Type", api.ContentTypeTar),
jsonhttptest.WithExpectedJSONResponse(api.BzzUploadResponse{
Reference: address,
}),
)
has, err := storerMock.Has(context.Background(), address)
if err != nil {
t.Fatal(err)
}
if !has {
t.Fatal("storer check root chunk address: have none; want one")
}
if have, want := len(pinningMock.Entries()), 1; have != want {
t.Fatalf("root pin count mismatch: have %d; want %d", have, want)
}
addrs, err := pinningMock.Pins()
if err != nil {
t.Fatal(err)
}
if have, want := addrs[0], address; !have.Equal(want) {
t.Fatalf("root pin reference mismatch: have %q; want %q", have, want)
}
}) })
t.Run("encrypt-decrypt", func(t *testing.T) { t.Run("encrypt-decrypt", func(t *testing.T) {
......
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"strings"
"github.com/ethersphere/bee/pkg/cac" "github.com/ethersphere/bee/pkg/cac"
"github.com/ethersphere/bee/pkg/netstore" "github.com/ethersphere/bee/pkg/netstore"
...@@ -111,6 +112,15 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) { ...@@ -111,6 +112,15 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set(SwarmTagHeader, fmt.Sprint(tag.Uid)) w.Header().Set(SwarmTagHeader, fmt.Sprint(tag.Uid))
} }
if strings.ToLower(r.Header.Get(SwarmPinHeader)) == "true" {
if err := s.pinning.CreatePin(ctx, chunk.Address(), false); err != nil {
s.logger.Debugf("chunk upload: creation of pin for %q failed: %v", chunk.Address(), err)
s.logger.Error("chunk upload: creation of pin failed")
jsonhttp.InternalServerError(w, nil)
return
}
}
w.Header().Set("Access-Control-Expose-Headers", SwarmTagHeader) w.Header().Set("Access-Control-Expose-Headers", SwarmTagHeader)
jsonhttp.OK(w, chunkAddressResponse{Reference: chunk.Address()}) jsonhttp.OK(w, chunkAddressResponse{Reference: chunk.Address()})
} }
......
...@@ -6,11 +6,13 @@ package api_test ...@@ -6,11 +6,13 @@ package api_test
import ( import (
"bytes" "bytes"
"context"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"testing" "testing"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
pinning "github.com/ethersphere/bee/pkg/pinning/mock"
statestore "github.com/ethersphere/bee/pkg/statestore/mock" statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/tags" "github.com/ethersphere/bee/pkg/tags"
...@@ -34,13 +36,15 @@ func TestChunkUploadDownload(t *testing.T) { ...@@ -34,13 +36,15 @@ func TestChunkUploadDownload(t *testing.T) {
chunksResource = func(a swarm.Address) string { return "/chunks/" + a.String() } chunksResource = func(a swarm.Address) string { return "/chunks/" + a.String() }
resourceTargets = func(addr swarm.Address) string { return "/chunks/" + addr.String() + "?targets=" + targets } resourceTargets = func(addr swarm.Address) string { return "/chunks/" + addr.String() + "?targets=" + targets }
chunk = testingc.GenerateTestRandomChunk() chunk = testingc.GenerateTestRandomChunk()
mockStatestore = statestore.NewStateStore() statestoreMock = statestore.NewStateStore()
logger = logging.New(ioutil.Discard, 0) logger = logging.New(ioutil.Discard, 0)
tag = tags.NewTags(mockStatestore, logger) tag = tags.NewTags(statestoreMock, logger)
mockStorer = mock.NewStorer() storerMock = mock.NewStorer()
pinningMock = pinning.NewServiceMock()
client, _, _ = newTestServer(t, testServerOptions{ client, _, _ = newTestServer(t, testServerOptions{
Storer: mockStorer, Storer: storerMock,
Tags: tag, Pinning: pinningMock,
Tags: tag,
}) })
) )
...@@ -79,7 +83,7 @@ func TestChunkUploadDownload(t *testing.T) { ...@@ -79,7 +83,7 @@ func TestChunkUploadDownload(t *testing.T) {
) )
// Also check if the chunk is NOT pinned // Also check if the chunk is NOT pinned
if mockStorer.GetModeSet(chunk.Address()) == storage.ModeSetPin { if storerMock.GetModeSet(chunk.Address()) == storage.ModeSetPin {
t.Fatal("chunk should not be pinned") t.Fatal("chunk should not be pinned")
} }
}) })
...@@ -90,20 +94,35 @@ func TestChunkUploadDownload(t *testing.T) { ...@@ -90,20 +94,35 @@ func TestChunkUploadDownload(t *testing.T) {
) )
// Also check if the chunk is NOT pinned // Also check if the chunk is NOT pinned
if mockStorer.GetModeSet(chunk.Address()) == storage.ModeSetPin { if storerMock.GetModeSet(chunk.Address()) == storage.ModeSetPin {
t.Fatal("chunk should not be pinned") t.Fatal("chunk should not be pinned")
} }
}) })
t.Run("pin-ok", func(t *testing.T) { t.Run("pin-ok", func(t *testing.T) {
address := chunk.Address()
jsonhttptest.Request(t, client, http.MethodPost, chunksEndpoint, http.StatusOK, jsonhttptest.Request(t, client, http.MethodPost, chunksEndpoint, http.StatusOK,
jsonhttptest.WithRequestBody(bytes.NewReader(chunk.Data())), jsonhttptest.WithRequestBody(bytes.NewReader(chunk.Data())),
jsonhttptest.WithExpectedJSONResponse(api.ChunkAddressResponse{Reference: chunk.Address()}), jsonhttptest.WithExpectedJSONResponse(api.ChunkAddressResponse{Reference: address}),
jsonhttptest.WithRequestHeader(api.SwarmPinHeader, "True"), jsonhttptest.WithRequestHeader(api.SwarmPinHeader, "True"),
) )
// Also check if the chunk is pinned has, err := storerMock.Has(context.Background(), address)
if mockStorer.GetModePut(chunk.Address()) != storage.ModePutUploadPin { if err != nil {
t.Fatal("chunk is not pinned") t.Fatal(err)
}
if !has {
t.Fatal("storer check root chunk address: have none; want one")
}
if have, want := len(pinningMock.Entries()), 1; have != want {
t.Fatalf("root pin count mismatch: have %d; want %d", have, want)
}
addrs, err := pinningMock.Pins()
if err != nil {
t.Fatal(err)
}
if have, want := addrs[0], address; !have.Equal(want) {
t.Fatalf("root pin reference mismatch: have %q; want %q", have, want)
} }
}) })
......
...@@ -95,6 +95,16 @@ func (s *server) dirUploadHandler(w http.ResponseWriter, r *http.Request) { ...@@ -95,6 +95,16 @@ func (s *server) dirUploadHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
} }
if strings.ToLower(r.Header.Get(SwarmPinHeader)) == "true" {
if err := s.pinning.CreatePin(r.Context(), reference, false); err != nil {
logger.Debugf("bzz upload dir: creation of pin for %q failed: %v", reference, err)
logger.Error("bzz upload dir: creation of pin failed")
jsonhttp.InternalServerError(w, nil)
return
}
}
w.Header().Set(SwarmTagHeader, fmt.Sprint(tag.Uid)) w.Header().Set(SwarmTagHeader, fmt.Sprint(tag.Uid))
jsonhttp.OK(w, bzzUploadResponse{ jsonhttp.OK(w, bzzUploadResponse{
Reference: reference, Reference: reference,
......
...@@ -9,17 +9,14 @@ import "github.com/ethersphere/bee/pkg/swarm" ...@@ -9,17 +9,14 @@ import "github.com/ethersphere/bee/pkg/swarm"
type Server = server type Server = server
type ( type (
BytesPostResponse = bytesPostResponse BytesPostResponse = bytesPostResponse
ChunkAddressResponse = chunkAddressResponse ChunkAddressResponse = chunkAddressResponse
SocPostResponse = socPostResponse SocPostResponse = socPostResponse
FeedReferenceResponse = feedReferenceResponse FeedReferenceResponse = feedReferenceResponse
BzzUploadResponse = bzzUploadResponse BzzUploadResponse = bzzUploadResponse
TagResponse = tagResponse TagResponse = tagResponse
TagRequest = tagRequest TagRequest = tagRequest
ListTagsResponse = listTagsResponse ListTagsResponse = listTagsResponse
PinnedChunk = pinnedChunk
ListPinnedChunksResponse = listPinnedChunksResponse
UpdatePinCounter = updatePinCounter
) )
var ( var (
......
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"strconv" "strconv"
"strings"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -170,6 +171,16 @@ func (s *server) feedPostHandler(w http.ResponseWriter, r *http.Request) { ...@@ -170,6 +171,16 @@ func (s *server) feedPostHandler(w http.ResponseWriter, r *http.Request) {
jsonhttp.InternalServerError(w, nil) jsonhttp.InternalServerError(w, nil)
return return
} }
if strings.ToLower(r.Header.Get(SwarmPinHeader)) == "true" {
if err := s.pinning.CreatePin(r.Context(), ref, false); err != nil {
s.logger.Debugf("feed post: creation of pin for %q failed: %v", ref, err)
s.logger.Error("feed post: creation of pin failed")
jsonhttp.InternalServerError(w, nil)
return
}
}
jsonhttp.Created(w, feedReferenceResponse{Reference: ref}) jsonhttp.Created(w, feedReferenceResponse{Reference: ref})
} }
......
...@@ -36,11 +36,11 @@ func TestGatewayMode(t *testing.T) { ...@@ -36,11 +36,11 @@ func TestGatewayMode(t *testing.T) {
}) })
t.Run("pinning endpoints", func(t *testing.T) { t.Run("pinning endpoints", func(t *testing.T) {
path := "/pin/chunks/0773a91efd6547c754fc1d95fb1c62c7d1b47f959c2caa685dfec8736da95c1c" path := "/pins/0773a91efd6547c754fc1d95fb1c62c7d1b47f959c2caa685dfec8736da95c1c"
jsonhttptest.Request(t, client, http.MethodGet, path, http.StatusForbidden, forbiddenResponseOption) jsonhttptest.Request(t, client, http.MethodGet, path, http.StatusForbidden, forbiddenResponseOption)
jsonhttptest.Request(t, client, http.MethodPost, path, http.StatusForbidden, forbiddenResponseOption) jsonhttptest.Request(t, client, http.MethodPost, path, http.StatusForbidden, forbiddenResponseOption)
jsonhttptest.Request(t, client, http.MethodDelete, path, http.StatusForbidden, forbiddenResponseOption) jsonhttptest.Request(t, client, http.MethodDelete, path, http.StatusForbidden, forbiddenResponseOption)
jsonhttptest.Request(t, client, http.MethodGet, "/pin/chunks", http.StatusForbidden, forbiddenResponseOption) jsonhttptest.Request(t, client, http.MethodGet, "/pins", http.StatusForbidden, forbiddenResponseOption)
}) })
t.Run("tags endpoints", func(t *testing.T) { t.Run("tags endpoints", func(t *testing.T) {
......
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api
import (
"errors"
"net/http"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/pinning"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/gorilla/mux"
)
// pinRootHash pins root hash of given address. This method is idempotent.
func (s *server) pinRootHash(w http.ResponseWriter, r *http.Request) {
addr, err := swarm.ParseHexAddress(mux.Vars(r)["address"])
if err != nil {
s.logger.Debugf("pin root hash: unable to parse address %q: %v", addr, err)
s.logger.Error("pin root hash: unable to parse address")
jsonhttp.BadRequest(w, "bad address")
return
}
has, err := s.pinning.HasPin(addr)
if err != nil {
s.logger.Debugf("pin root hash: checking of tracking pin for %q failed: %v", addr, err)
s.logger.Error("pin root hash: checking of tracking pin failed")
jsonhttp.InternalServerError(w, nil)
return
}
if has {
jsonhttp.OK(w, nil)
return
}
err = s.pinning.CreatePin(r.Context(), addr, true)
if err != nil {
s.logger.Debugf("pin root hash: creation of tracking pin for %q failed: %v", addr, err)
s.logger.Error("pin root hash: creation of tracking pin failed")
jsonhttp.InternalServerError(w, nil)
return
}
jsonhttp.OK(w, nil)
}
// unpinRootHash unpin's an already pinned root hash. This method is idempotent.
func (s *server) unpinRootHash(w http.ResponseWriter, r *http.Request) {
addr, err := swarm.ParseHexAddress(mux.Vars(r)["address"])
if err != nil {
s.logger.Debugf("unpin root hash: unable to parse address: %v", err)
s.logger.Error("unpin root hash: unable to parse address")
jsonhttp.BadRequest(w, "bad address")
return
}
has, err := s.pinning.HasPin(addr)
if err != nil {
s.logger.Debugf("pin root hash: checking of tracking pin for %q failed: %v", addr, err)
s.logger.Error("pin root hash: checking of tracking pin failed")
jsonhttp.InternalServerError(w, nil)
return
}
if !has {
jsonhttp.NotFound(w, nil)
return
}
switch err := s.pinning.DeletePin(r.Context(), addr); {
case errors.Is(err, pinning.ErrTraversal):
s.logger.Debugf("unpin root hash: deletion of pin for %q failed: %v", addr, err)
jsonhttp.InternalServerError(w, nil)
return
case err != nil:
s.logger.Debugf("unpin root hash: deletion of pin for %q failed: %v", addr, err)
s.logger.Error("unpin root hash: deletion of pin for failed")
jsonhttp.InternalServerError(w, nil)
return
}
jsonhttp.OK(w, nil)
}
// getPinnedRootHash returns back the given address if its root hash is pinned.
func (s *server) getPinnedRootHash(w http.ResponseWriter, r *http.Request) {
addr, err := swarm.ParseHexAddress(mux.Vars(r)["address"])
if err != nil {
s.logger.Debugf("pinned root hash: unable to parse address %q: %v", addr, err)
s.logger.Error("pinned root hash: unable to parse address")
jsonhttp.BadRequest(w, "bad address")
return
}
has, err := s.pinning.HasPin(addr)
if err != nil {
s.logger.Debugf("pinned root hash: unable to check address %q in the localstore: %v", addr, err)
s.logger.Error("pinned root hash: unable to check address in the localstore")
jsonhttp.InternalServerError(w, nil)
return
}
if !has {
jsonhttp.NotFound(w, nil)
return
}
jsonhttp.OK(w, struct {
Address swarm.Address `json:"address"`
}{
Address: addr,
})
}
// listPinnedRootHashes lists all the address of the pinned root hashes..
func (s *server) listPinnedRootHashes(w http.ResponseWriter, r *http.Request) {
pinned, err := s.pinning.Pins()
if err != nil {
s.logger.Debugf("list pinned root addresses: unable to list addresses: %v", err)
s.logger.Error("list pinned root addresses: unable to list addresses")
jsonhttp.InternalServerError(w, nil)
return
}
jsonhttp.OK(w, struct {
Addresses []swarm.Address `json:"addresses"`
}{
Addresses: pinned,
})
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api
import (
"errors"
"net/http"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/traversal"
"github.com/gorilla/mux"
)
// pinBytes is used to pin an already uploaded content.
func (s *server) pinBytes(w http.ResponseWriter, r *http.Request) {
addr, err := swarm.ParseHexAddress(mux.Vars(r)["address"])
if err != nil {
s.logger.Debugf("pin bytes: parse address: %v", err)
s.logger.Error("pin bytes: parse address")
jsonhttp.BadRequest(w, "bad address")
return
}
has, err := s.storer.Has(r.Context(), addr)
if err != nil {
s.logger.Debugf("pin bytes: localstore has: %v", err)
s.logger.Error("pin bytes: store")
jsonhttp.InternalServerError(w, err)
return
}
if !has {
_, err := s.storer.Get(r.Context(), storage.ModeGetRequest, addr)
if err != nil {
s.logger.Debugf("pin chunk: netstore get: %v", err)
s.logger.Error("pin chunk: netstore")
jsonhttp.NotFound(w, nil)
return
}
}
ctx := r.Context()
chunkAddressFn := s.pinChunkAddressFn(ctx, addr)
err = s.traversal.TraverseBytesAddresses(ctx, addr, chunkAddressFn)
if err != nil {
s.logger.Debugf("pin bytes: traverse chunks: %v, addr %s", err, addr)
if errors.Is(err, traversal.ErrInvalidType) {
s.logger.Error("pin bytes: invalid type")
jsonhttp.BadRequest(w, "invalid type")
return
}
s.logger.Error("pin bytes: cannot pin")
jsonhttp.InternalServerError(w, "cannot pin")
return
}
jsonhttp.OK(w, nil)
}
// unpinBytes removes pinning from content.
func (s *server) unpinBytes(w http.ResponseWriter, r *http.Request) {
addr, err := swarm.ParseHexAddress(mux.Vars(r)["address"])
if err != nil {
s.logger.Debugf("pin bytes: parse address: %v", err)
s.logger.Error("pin bytes: parse address")
jsonhttp.BadRequest(w, "bad address")
return
}
has, err := s.storer.Has(r.Context(), addr)
if err != nil {
s.logger.Debugf("pin bytes: localstore has: %v", err)
s.logger.Error("pin bytes: store")
jsonhttp.InternalServerError(w, err)
return
}
if !has {
jsonhttp.NotFound(w, nil)
return
}
ctx := r.Context()
chunkAddressFn := s.unpinChunkAddressFn(ctx, addr)
err = s.traversal.TraverseBytesAddresses(ctx, addr, chunkAddressFn)
if err != nil {
s.logger.Debugf("pin bytes: traverse chunks: %v, addr %s", err, addr)
if errors.Is(err, traversal.ErrInvalidType) {
s.logger.Error("pin bytes: invalid type")
jsonhttp.BadRequest(w, "invalid type")
return
}
s.logger.Error("pin bytes: cannot unpin")
jsonhttp.InternalServerError(w, "cannot unpin")
return
}
jsonhttp.OK(w, nil)
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api_test
import (
"bytes"
"io/ioutil"
"net/http"
"sort"
"testing"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/logging"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/traversal"
)
func TestPinBytesHandler(t *testing.T) {
var (
bytesUploadResource = "/bytes"
pinBytesResource = "/pin/bytes"
pinBytesAddressResource = func(addr string) string { return pinBytesResource + "/" + addr }
pinChunksResource = "/pin/chunks"
simpleData = []byte("this is a simple text")
mockStorer = mock.NewStorer()
mockStatestore = statestore.NewStateStore()
traversalService = traversal.NewService(mockStorer)
logger = logging.New(ioutil.Discard, 0)
client, _, _ = newTestServer(t, testServerOptions{
Storer: mockStorer,
Traversal: traversalService,
Tags: tags.NewTags(mockStatestore, logger),
})
)
t.Run("pin-bytes-1", func(t *testing.T) {
rootHash := "838d0a193ecd1152d1bb1432d5ecc02398533b2494889e23b8bd5ace30ac2aeb"
jsonhttptest.Request(t, client, http.MethodPost, bytesUploadResource, http.StatusOK,
jsonhttptest.WithRequestBody(bytes.NewReader(simpleData)),
jsonhttptest.WithExpectedJSONResponse(api.BzzUploadResponse{
Reference: swarm.MustParseHexAddress(rootHash),
}),
)
jsonhttptest.Request(t, client, http.MethodPost, pinBytesAddressResource(rootHash), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
)
hashes := []string{rootHash}
sort.Strings(hashes)
expectedResponse := api.ListPinnedChunksResponse{
Chunks: []api.PinnedChunk{},
}
for _, h := range hashes {
expectedResponse.Chunks = append(expectedResponse.Chunks, api.PinnedChunk{
Address: swarm.MustParseHexAddress(h),
PinCounter: 1,
})
}
jsonhttptest.Request(t, client, http.MethodGet, pinChunksResource, http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(expectedResponse),
)
})
t.Run("unpin-bytes-1", func(t *testing.T) {
rootHash := "838d0a193ecd1152d1bb1432d5ecc02398533b2494889e23b8bd5ace30ac2aeb"
jsonhttptest.Request(t, client, http.MethodDelete, pinBytesAddressResource(rootHash), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
)
jsonhttptest.Request(t, client, http.MethodGet, pinChunksResource, http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(api.ListPinnedChunksResponse{
Chunks: []api.PinnedChunk{},
}),
)
})
t.Run("pin-bytes-2", func(t *testing.T) {
var b []byte
for {
b = append(b, simpleData...)
if len(b) > swarm.ChunkSize {
break
}
}
rootHash := "42ee01ae3a50663ca0903f2d5c3b55fc5ef4faf98368b74cf9a24c75955fa388"
data1Hash := "933db58bbd119e5d3a8eb4fc7d4a923d5e7cfc7ca35b07832ced495d05721b6d"
data2Hash := "430274f4e6d2af72b5491ad0dc7707e45892fb3a166c54b6ac9cee3c14149757"
jsonhttptest.Request(t, client, http.MethodPost, bytesUploadResource, http.StatusOK,
jsonhttptest.WithRequestBody(bytes.NewReader(b)),
jsonhttptest.WithExpectedJSONResponse(api.BzzUploadResponse{
Reference: swarm.MustParseHexAddress(rootHash),
}),
)
jsonhttptest.Request(t, client, http.MethodPost, pinBytesAddressResource(rootHash), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
)
hashes := []string{rootHash, data1Hash, data2Hash}
sort.Strings(hashes)
// NOTE: all this because we cannot rely on sort from response
var resp api.ListPinnedChunksResponse
jsonhttptest.Request(t, client, http.MethodGet, pinChunksResource, http.StatusOK,
jsonhttptest.WithUnmarshalJSONResponse(&resp),
)
if len(hashes) != len(resp.Chunks) {
t.Fatalf("expected to find %d pinned chunks, got %d", len(hashes), len(resp.Chunks))
}
respChunksHashes := make([]string, 0)
for _, rc := range resp.Chunks {
respChunksHashes = append(respChunksHashes, rc.Address.String())
}
sort.Strings(respChunksHashes)
for i, h := range hashes {
if h != respChunksHashes[i] {
t.Fatalf("expected to find %s address, found %s", h, respChunksHashes[i])
}
}
})
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api
import (
"errors"
"net/http"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/traversal"
"github.com/gorilla/mux"
)
// pinBzz is used to pin an already uploaded content.
func (s *server) pinBzz(w http.ResponseWriter, r *http.Request) {
addr, err := swarm.ParseHexAddress(mux.Vars(r)["address"])
if err != nil {
s.logger.Debugf("pin bzz: parse address: %v", err)
s.logger.Error("pin bzz: parse address")
jsonhttp.BadRequest(w, "bad address")
return
}
has, err := s.storer.Has(r.Context(), addr)
if err != nil {
s.logger.Debugf("pin bzz: localstore has: %v", err)
s.logger.Error("pin bzz: store")
jsonhttp.InternalServerError(w, err)
return
}
if !has {
_, err := s.storer.Get(r.Context(), storage.ModeGetRequest, addr)
if err != nil {
s.logger.Debugf("pin chunk: netstore get: %v", err)
s.logger.Error("pin chunk: netstore")
jsonhttp.NotFound(w, nil)
return
}
}
ctx := r.Context()
chunkAddressFn := s.pinChunkAddressFn(ctx, addr)
err = s.traversal.TraverseManifestAddresses(ctx, addr, chunkAddressFn)
if err != nil {
s.logger.Debugf("pin bzz: traverse chunks: %v, addr %s", err, addr)
if errors.Is(err, traversal.ErrInvalidType) {
s.logger.Error("pin bzz: invalid type")
jsonhttp.BadRequest(w, "invalid type")
return
}
s.logger.Error("pin bzz: cannot pin")
jsonhttp.InternalServerError(w, "cannot pin")
return
}
jsonhttp.OK(w, nil)
}
// unpinBzz removes pinning from content.
func (s *server) unpinBzz(w http.ResponseWriter, r *http.Request) {
addr, err := swarm.ParseHexAddress(mux.Vars(r)["address"])
if err != nil {
s.logger.Debugf("pin bzz: parse address: %v", err)
s.logger.Error("pin bzz: parse address")
jsonhttp.BadRequest(w, "bad address")
return
}
has, err := s.storer.Has(r.Context(), addr)
if err != nil {
s.logger.Debugf("pin bzz: localstore has: %v", err)
s.logger.Error("pin bzz: store")
jsonhttp.InternalServerError(w, err)
return
}
if !has {
jsonhttp.NotFound(w, nil)
return
}
ctx := r.Context()
chunkAddressFn := s.unpinChunkAddressFn(ctx, addr)
err = s.traversal.TraverseManifestAddresses(ctx, addr, chunkAddressFn)
if err != nil {
s.logger.Debugf("pin bzz: traverse chunks: %v, addr %s", err, addr)
if errors.Is(err, traversal.ErrInvalidType) {
s.logger.Error("pin bzz: invalid type")
jsonhttp.BadRequest(w, "invalid type")
return
}
s.logger.Error("pin bzz: cannot unpin")
jsonhttp.InternalServerError(w, "cannot unpin")
return
}
jsonhttp.OK(w, nil)
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api_test
import (
"io/ioutil"
"net/http"
"testing"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/logging"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/traversal"
)
func TestPinBzzHandler(t *testing.T) {
var (
dirUploadResource = "/bzz"
pinBzzResource = "/pin/bzz"
pinBzzAddressResource = func(addr string) string { return pinBzzResource + "/" + addr }
pinChunksResource = "/pin/chunks"
mockStorer = mock.NewStorer()
mockStatestore = statestore.NewStateStore()
traversalService = traversal.NewService(mockStorer)
logger = logging.New(ioutil.Discard, 0)
client, _, _ = newTestServer(t, testServerOptions{
Storer: mockStorer,
Traversal: traversalService,
Tags: tags.NewTags(mockStatestore, logger),
Logger: logger,
})
)
t.Run("pin-bzz-1", func(t *testing.T) {
files := []f{
{
data: []byte("<h1>Swarm"),
name: "index.html",
dir: "",
},
}
tarReader := tarFiles(t, files)
rootHash := "9e178dbd1ed4b748379e25144e28dfb29c07a4b5114896ef454480115a56b237"
// verify directory tar upload response
jsonhttptest.Request(t, client, http.MethodPost, dirUploadResource, http.StatusOK,
jsonhttptest.WithRequestBody(tarReader),
jsonhttptest.WithRequestHeader("Content-Type", api.ContentTypeTar),
jsonhttptest.WithRequestHeader(api.SwarmCollectionHeader, "True"),
jsonhttptest.WithExpectedJSONResponse(api.BzzUploadResponse{
Reference: swarm.MustParseHexAddress(rootHash),
}),
)
jsonhttptest.Request(t, client, http.MethodPost, pinBzzAddressResource(rootHash), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
)
expectedChunkCount := 3
// get the reference as everytime it will change because of random encryption key
var resp api.ListPinnedChunksResponse
jsonhttptest.Request(t, client, http.MethodGet, pinChunksResource, http.StatusOK,
jsonhttptest.WithUnmarshalJSONResponse(&resp),
)
if expectedChunkCount != len(resp.Chunks) {
t.Fatalf("expected to find %d pinned chunks, got %d", expectedChunkCount, len(resp.Chunks))
}
})
t.Run("unpin-bzz-1", func(t *testing.T) {
rootHash := "9e178dbd1ed4b748379e25144e28dfb29c07a4b5114896ef454480115a56b237"
jsonhttptest.Request(t, client, http.MethodDelete, pinBzzAddressResource(rootHash), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
)
jsonhttptest.Request(t, client, http.MethodGet, pinChunksResource, http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(api.ListPinnedChunksResponse{
Chunks: []api.PinnedChunk{},
}),
)
})
}
This diff is collapsed.
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api_test
import (
"bytes"
"io/ioutil"
"net/http"
"testing"
"github.com/ethersphere/bee/pkg/logging"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/storage/mock"
testingc "github.com/ethersphere/bee/pkg/storage/testing"
"github.com/ethersphere/bee/pkg/tags"
)
// TestPinChunkHandler checks for pinning, unpinning and listing of chunks.
// It also check other edgw cases like chunk not present and checking for pinning,
// invalid chunk address case etc. This test case has to be run in sequence and
// it assumes some state of the DB before another case is run.
func TestPinChunkHandler(t *testing.T) {
var (
chunksEndpoint = "/chunks"
chunk = testingc.GenerateTestRandomChunk()
mockStorer = mock.NewStorer()
mockStatestore = statestore.NewStateStore()
logger = logging.New(ioutil.Discard, 0)
tag = tags.NewTags(mockStatestore, logger)
client, _, _ = newTestServer(t, testServerOptions{
Storer: mockStorer,
Tags: tag,
Logger: logger,
})
)
// bad chunk address
t.Run("pin-bad-address", func(t *testing.T) {
jsonhttptest.Request(t, client, http.MethodPost, "/pin/chunks/abcd1100zz", http.StatusBadRequest,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: "bad address",
Code: http.StatusBadRequest,
}),
)
})
// list pins without anything pinned
t.Run("list-pins-zero-pins", func(t *testing.T) {
jsonhttptest.Request(t, client, http.MethodGet, "/pin/chunks", http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(api.ListPinnedChunksResponse{
Chunks: []api.PinnedChunk{},
}),
)
})
// pin a chunk which is not existing
t.Run("pin-absent-chunk", func(t *testing.T) {
jsonhttptest.Request(t, client, http.MethodPost, "/pin/chunks/123456", http.StatusNotFound,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusNotFound),
Code: http.StatusNotFound,
}),
)
})
// unpin on a chunk which is not pinned
t.Run("unpin-while-not-pinned", func(t *testing.T) {
// Post a chunk
jsonhttptest.Request(t, client, http.MethodPost, chunksEndpoint, http.StatusOK,
jsonhttptest.WithRequestBody(bytes.NewReader(chunk.Data())),
jsonhttptest.WithExpectedJSONResponse(api.ChunkAddressResponse{Reference: chunk.Address()}),
)
jsonhttptest.Request(t, client, http.MethodDelete, "/pin/chunks/"+chunk.Address().String(), http.StatusBadRequest,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: "chunk is not yet pinned",
Code: http.StatusBadRequest,
}),
)
})
// pin a existing chunk first time
t.Run("pin-chunk-1", func(t *testing.T) {
// Post a chunk
jsonhttptest.Request(t, client, http.MethodPost, chunksEndpoint, http.StatusOK,
jsonhttptest.WithRequestBody(bytes.NewReader(chunk.Data())),
jsonhttptest.WithExpectedJSONResponse(api.ChunkAddressResponse{Reference: chunk.Address()}),
)
jsonhttptest.Request(t, client, http.MethodPost, "/pin/chunks/"+chunk.Address().String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
)
// Check is the chunk is pinned once
jsonhttptest.Request(t, client, http.MethodGet, "/pin/chunks/"+chunk.Address().String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(api.PinnedChunk{
Address: chunk.Address(),
PinCounter: 1,
}),
)
})
// pin a existing chunk second time
t.Run("pin-chunk-2", func(t *testing.T) {
jsonhttptest.Request(t, client, http.MethodPost, "/pin/chunks/"+chunk.Address().String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
)
// Check is the chunk is pinned twice
jsonhttptest.Request(t, client, http.MethodGet, "/pin/chunks/"+chunk.Address().String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(api.PinnedChunk{
Address: chunk.Address(),
PinCounter: 2,
}),
)
})
// unpin a chunk first time
t.Run("unpin-chunk-1", func(t *testing.T) {
jsonhttptest.Request(t, client, http.MethodDelete, "/pin/chunks/"+chunk.Address().String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
)
// Check is the chunk is pinned once
jsonhttptest.Request(t, client, http.MethodGet, "/pin/chunks/"+chunk.Address().String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(api.PinnedChunk{
Address: chunk.Address(),
PinCounter: 1,
}),
)
})
// unpin a chunk second time
t.Run("unpin-chunk-2", func(t *testing.T) {
jsonhttptest.Request(t, client, http.MethodDelete, "/pin/chunks/"+chunk.Address().String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
)
// Check if the chunk is removed from the pinIndex
jsonhttptest.Request(t, client, http.MethodGet, "/pin/chunks/"+chunk.Address().String(), http.StatusNotFound,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusNotFound),
Code: http.StatusNotFound,
}),
)
})
// Add 2 chunks, pin it and check if they show up in the list
t.Run("list-chunks", func(t *testing.T) {
// Post a chunk
jsonhttptest.Request(t, client, http.MethodPost, chunksEndpoint, http.StatusOK,
jsonhttptest.WithRequestBody(bytes.NewReader(chunk.Data())),
jsonhttptest.WithExpectedJSONResponse(api.ChunkAddressResponse{Reference: chunk.Address()}),
)
jsonhttptest.Request(t, client, http.MethodPost, "/pin/chunks/"+chunk.Address().String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
)
// post another chunk
chunk2 := testingc.GenerateTestRandomChunk()
jsonhttptest.Request(t, client, http.MethodPost, chunksEndpoint, http.StatusOK,
jsonhttptest.WithRequestBody(bytes.NewReader(chunk2.Data())),
jsonhttptest.WithExpectedJSONResponse(api.ChunkAddressResponse{Reference: chunk2.Address()}),
)
jsonhttptest.Request(t, client, http.MethodPost, "/pin/chunks/"+chunk2.Address().String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
)
jsonhttptest.Request(t, client, http.MethodGet, "/pin/chunks", http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(api.ListPinnedChunksResponse{
Chunks: []api.PinnedChunk{
{
Address: chunk.Address(),
PinCounter: 1,
},
{
Address: chunk2.Address(),
PinCounter: 1,
},
},
}),
)
})
t.Run("update-pin-counter-up", func(t *testing.T) {
updatePinCounter := api.UpdatePinCounter{
PinCounter: 7,
}
jsonhttptest.Request(t, client, http.MethodPut, "/pin/chunks/"+chunk.Address().String(), http.StatusOK,
jsonhttptest.WithJSONRequestBody(updatePinCounter),
jsonhttptest.WithExpectedJSONResponse(api.PinnedChunk{
Address: chunk.Address(),
PinCounter: updatePinCounter.PinCounter,
}),
)
jsonhttptest.Request(t, client, http.MethodGet, "/pin/chunks/"+chunk.Address().String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(api.PinnedChunk{
Address: chunk.Address(),
PinCounter: updatePinCounter.PinCounter,
}),
)
})
t.Run("update-pin-counter-to-zero", func(t *testing.T) {
updatePinCounter := api.UpdatePinCounter{
PinCounter: 0,
}
jsonhttptest.Request(t, client, http.MethodPut, "/pin/chunks/"+chunk.Address().String(), http.StatusOK,
jsonhttptest.WithJSONRequestBody(updatePinCounter),
jsonhttptest.WithExpectedJSONResponse(api.PinnedChunk{
Address: chunk.Address(),
PinCounter: updatePinCounter.PinCounter,
}),
)
jsonhttptest.Request(t, client, http.MethodGet, "/pin/chunks/"+chunk.Address().String(), http.StatusNotFound,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusNotFound),
Code: http.StatusNotFound,
}),
)
})
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api
import (
"errors"
"net/http"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/traversal"
"github.com/gorilla/mux"
)
// pinFile is used to pin an already uploaded content.
func (s *server) pinFile(w http.ResponseWriter, r *http.Request) {
addr, err := swarm.ParseHexAddress(mux.Vars(r)["address"])
if err != nil {
s.logger.Debugf("pin files: parse address: %v", err)
s.logger.Error("pin files: parse address")
jsonhttp.BadRequest(w, "bad address")
return
}
has, err := s.storer.Has(r.Context(), addr)
if err != nil {
s.logger.Debugf("pin files: localstore has: %v", err)
s.logger.Error("pin files: store")
jsonhttp.InternalServerError(w, err)
return
}
if !has {
_, err := s.storer.Get(r.Context(), storage.ModeGetRequest, addr)
if err != nil {
s.logger.Debugf("pin chunk: netstore get: %v", err)
s.logger.Error("pin chunk: netstore")
jsonhttp.NotFound(w, nil)
return
}
}
ctx := r.Context()
chunkAddressFn := s.pinChunkAddressFn(ctx, addr)
err = s.traversal.TraverseAddresses(ctx, addr, chunkAddressFn)
if err != nil {
s.logger.Debugf("pin files: traverse chunks: %v, addr %s", err, addr)
if errors.Is(err, traversal.ErrInvalidType) {
s.logger.Error("pin files: invalid type")
jsonhttp.BadRequest(w, "invalid type")
return
}
s.logger.Error("pin files: cannot pin")
jsonhttp.InternalServerError(w, "cannot pin")
return
}
jsonhttp.OK(w, nil)
}
// unpinFile removes pinning from content.
func (s *server) unpinFile(w http.ResponseWriter, r *http.Request) {
addr, err := swarm.ParseHexAddress(mux.Vars(r)["address"])
if err != nil {
s.logger.Debugf("pin files: parse address: %v", err)
s.logger.Error("pin files: parse address")
jsonhttp.BadRequest(w, "bad address")
return
}
has, err := s.storer.Has(r.Context(), addr)
if err != nil {
s.logger.Debugf("pin files: localstore has: %v", err)
s.logger.Error("pin files: store")
jsonhttp.InternalServerError(w, err)
return
}
if !has {
jsonhttp.NotFound(w, nil)
return
}
ctx := r.Context()
chunkAddressFn := s.unpinChunkAddressFn(ctx, addr)
err = s.traversal.TraverseAddresses(ctx, addr, chunkAddressFn)
if err != nil {
s.logger.Debugf("pin files: traverse chunks: %v, addr %s", err, addr)
if errors.Is(err, traversal.ErrInvalidType) {
s.logger.Error("pin files: invalid type")
jsonhttp.BadRequest(w, "invalid type")
return
}
s.logger.Error("pin files: cannot unpin")
jsonhttp.InternalServerError(w, "cannot unpin")
return
}
jsonhttp.OK(w, nil)
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api_test
import (
"bytes"
"io/ioutil"
"net/http"
"testing"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/logging"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/traversal"
)
func TestPinFilesHandler(t *testing.T) {
var (
fileUploadResource = "/bzz"
pinFilesResource = "/pin/files"
pinFilesAddressResource = func(addr string) string { return pinFilesResource + "/" + addr }
pinChunksResource = "/pin/chunks"
simpleData = []byte("this is a simple text")
mockStorer = mock.NewStorer()
mockStatestore = statestore.NewStateStore()
traversalService = traversal.NewService(mockStorer)
logger = logging.New(ioutil.Discard, 0)
client, _, _ = newTestServer(t, testServerOptions{
Storer: mockStorer,
Traversal: traversalService,
Tags: tags.NewTags(mockStatestore, logger),
Logger: logger,
})
)
t.Run("pin-file-1", func(t *testing.T) {
rootHash := "dd13a5a6cc9db3ef514d645e6719178dbfb1a90b49b9262cafce35b0d27cf245"
metadataHash := "0cc878d32c96126d47f63fbe391114ee1438cd521146fc975dea1546d302b6c0"
metadataHash2 := "a14d1ef845307c634e9ec74539bd668d0d1b37f37de4128939d57098135850da"
contentHash := "838d0a193ecd1152d1bb1432d5ecc02398533b2494889e23b8bd5ace30ac2aeb"
jsonhttptest.Request(t, client, http.MethodPost,
fileUploadResource+"?name=somefile.txt", http.StatusOK,
jsonhttptest.WithRequestBody(bytes.NewReader(simpleData)),
jsonhttptest.WithExpectedJSONResponse(api.BzzUploadResponse{
Reference: swarm.MustParseHexAddress(rootHash),
}),
jsonhttptest.WithRequestHeader("Content-Type", "text/plain"),
)
jsonhttptest.Request(t, client, http.MethodPost, pinFilesAddressResource(rootHash), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
)
hashes := map[string]int{
rootHash: 1,
metadataHash: 1,
metadataHash2: 1,
contentHash: 1,
}
actualResponse := api.ListPinnedChunksResponse{
Chunks: []api.PinnedChunk{},
}
jsonhttptest.Request(t, client, http.MethodGet, pinChunksResource, http.StatusOK,
jsonhttptest.WithUnmarshalJSONResponse(&actualResponse),
)
if len(actualResponse.Chunks) != len(hashes) {
t.Fatalf("Response chunk count mismatch Expected: %d Found: %d",
len(hashes), len(actualResponse.Chunks))
}
for _, v := range actualResponse.Chunks {
if counter, ok := hashes[v.Address.String()]; !ok {
t.Fatalf("found unexpected hash %s", v.Address.String())
} else if uint64(counter) != v.PinCounter {
t.Fatalf("found unexpected pin counter: Expected: %d, Found: %d",
counter, v.PinCounter)
}
}
})
t.Run("unpin-file-1", func(t *testing.T) {
rootHash := "dd13a5a6cc9db3ef514d645e6719178dbfb1a90b49b9262cafce35b0d27cf245"
jsonhttptest.Request(t, client, http.MethodDelete, pinFilesAddressResource(rootHash), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
)
jsonhttptest.Request(t, client, http.MethodGet, pinChunksResource, http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(api.ListPinnedChunksResponse{
Chunks: []api.PinnedChunk{},
}),
)
})
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api_test
import (
"bytes"
"io/ioutil"
"net/http"
"strings"
"testing"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/logging"
pinning "github.com/ethersphere/bee/pkg/pinning/mock"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage/mock"
testingc "github.com/ethersphere/bee/pkg/storage/testing"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/traversal"
)
func checkPinHandlers(t *testing.T, client *http.Client, rootHash string) {
t.Helper()
const pinsBasePath = "/pins"
var (
pinsAddressPath = pinsBasePath + "/" + rootHash
pinsInvalidAddressPath = pinsBasePath + "/" + "838d0a193ecd1152d1bb1432d5ecc02398533b2494889e23b8bd5ace30ac2zzz"
pinsUnknownAddressPath = pinsBasePath + "/" + "838d0a193ecd1152d1bb1432d5ecc02398533b2494889e23b8bd5ace30ac2ccc"
)
jsonhttptest.Request(t, client, http.MethodGet, pinsInvalidAddressPath, http.StatusBadRequest)
jsonhttptest.Request(t, client, http.MethodGet, pinsUnknownAddressPath, http.StatusNotFound,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusNotFound),
Code: http.StatusNotFound,
}),
)
jsonhttptest.Request(t, client, http.MethodPost, pinsAddressPath, http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
)
jsonhttptest.Request(t, client, http.MethodGet, pinsAddressPath, http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(struct {
Address swarm.Address `json:"address"`
}{
Address: swarm.MustParseHexAddress(rootHash),
}),
)
jsonhttptest.Request(t, client, http.MethodGet, pinsBasePath, http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(struct {
Addresses []swarm.Address `json:"addresses"`
}{
Addresses: []swarm.Address{swarm.MustParseHexAddress(rootHash)},
}),
)
jsonhttptest.Request(t, client, http.MethodDelete, pinsAddressPath, http.StatusOK)
jsonhttptest.Request(t, client, http.MethodGet, pinsAddressPath, http.StatusNotFound,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusNotFound),
Code: http.StatusNotFound,
}),
)
}
func TestPinHandlers(t *testing.T) {
var (
storerMock = mock.NewStorer()
client, _, _ = newTestServer(t, testServerOptions{
Storer: storerMock,
Traversal: traversal.NewService(storerMock),
Tags: tags.NewTags(statestore.NewStateStore(), logging.New(ioutil.Discard, 0)),
Pinning: pinning.NewServiceMock(),
Logger: logging.New(ioutil.Discard, 5),
})
)
t.Run("bytes", func(t *testing.T) {
const rootHash = "838d0a193ecd1152d1bb1432d5ecc02398533b2494889e23b8bd5ace30ac2aeb"
jsonhttptest.Request(t, client, http.MethodPost, "/bytes", http.StatusOK,
jsonhttptest.WithRequestBody(strings.NewReader("this is a simple text")),
jsonhttptest.WithExpectedJSONResponse(api.BzzUploadResponse{
Reference: swarm.MustParseHexAddress(rootHash),
}),
)
checkPinHandlers(t, client, rootHash)
})
t.Run("bzz", func(t *testing.T) {
tarReader := tarFiles(t, []f{{
data: []byte("<h1>Swarm"),
name: "index.html",
dir: "",
}})
rootHash := "9e178dbd1ed4b748379e25144e28dfb29c07a4b5114896ef454480115a56b237"
jsonhttptest.Request(t, client, http.MethodPost, "/bzz", http.StatusOK,
jsonhttptest.WithRequestBody(tarReader),
jsonhttptest.WithRequestHeader("Content-Type", api.ContentTypeTar),
jsonhttptest.WithRequestHeader(api.SwarmCollectionHeader, "True"),
jsonhttptest.WithExpectedJSONResponse(api.BzzUploadResponse{
Reference: swarm.MustParseHexAddress(rootHash),
}),
)
checkPinHandlers(t, client, rootHash)
rootHash = "dd13a5a6cc9db3ef514d645e6719178dbfb1a90b49b9262cafce35b0d27cf245"
jsonhttptest.Request(t, client, http.MethodPost, "/bzz?name=somefile.txt", http.StatusOK,
jsonhttptest.WithRequestHeader("Content-Type", "text/plain"),
jsonhttptest.WithRequestBody(strings.NewReader("this is a simple text")),
jsonhttptest.WithExpectedJSONResponse(api.BzzUploadResponse{
Reference: swarm.MustParseHexAddress(rootHash),
}),
)
checkPinHandlers(t, client, rootHash)
})
t.Run("chunk", func(t *testing.T) {
var (
chunk = testingc.GenerateTestRandomChunk()
rootHash = chunk.Address().String()
)
jsonhttptest.Request(t, client, http.MethodPost, "/chunks", http.StatusOK,
jsonhttptest.WithRequestBody(bytes.NewReader(chunk.Data())),
jsonhttptest.WithExpectedJSONResponse(api.ChunkAddressResponse{
Reference: chunk.Address(),
}),
)
checkPinHandlers(t, client, rootHash)
})
}
...@@ -20,14 +20,19 @@ import ( ...@@ -20,14 +20,19 @@ import (
) )
func (s *server) setupRouting() { func (s *server) setupRouting() {
apiVersion := "v1" // only one api version exists, this should be configurable with more const (
apiVersion = "v1" // Only one api version exists, this should be configurable with more.
rootPath = "/" + apiVersion
)
router := mux.NewRouter()
handle := func(router *mux.Router, path string, handler http.Handler) { // handle is a helper closure which simplifies the router setup.
handle := func(path string, handler http.Handler) {
router.Handle(path, handler) router.Handle(path, handler)
router.Handle("/"+apiVersion+path, handler) router.Handle(rootPath+path, handler)
} }
router := mux.NewRouter()
router.NotFoundHandler = http.HandlerFunc(jsonhttp.NotFoundHandler) router.NotFoundHandler = http.HandlerFunc(jsonhttp.NotFoundHandler)
router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
...@@ -38,38 +43,38 @@ func (s *server) setupRouting() { ...@@ -38,38 +43,38 @@ func (s *server) setupRouting() {
fmt.Fprintln(w, "User-agent: *\nDisallow: /") fmt.Fprintln(w, "User-agent: *\nDisallow: /")
}) })
handle(router, "/bytes", jsonhttp.MethodHandler{ handle("/bytes", jsonhttp.MethodHandler{
"POST": web.ChainHandlers( "POST": web.ChainHandlers(
s.newTracingHandler("bytes-upload"), s.newTracingHandler("bytes-upload"),
web.FinalHandlerFunc(s.bytesUploadHandler), web.FinalHandlerFunc(s.bytesUploadHandler),
), ),
}) })
handle(router, "/bytes/{address}", jsonhttp.MethodHandler{ handle("/bytes/{address}", jsonhttp.MethodHandler{
"GET": web.ChainHandlers( "GET": web.ChainHandlers(
s.newTracingHandler("bytes-download"), s.newTracingHandler("bytes-download"),
web.FinalHandlerFunc(s.bytesGetHandler), web.FinalHandlerFunc(s.bytesGetHandler),
), ),
}) })
handle(router, "/chunks", jsonhttp.MethodHandler{ handle("/chunks", jsonhttp.MethodHandler{
"POST": web.ChainHandlers( "POST": web.ChainHandlers(
jsonhttp.NewMaxBodyBytesHandler(swarm.ChunkWithSpanSize), jsonhttp.NewMaxBodyBytesHandler(swarm.ChunkWithSpanSize),
web.FinalHandlerFunc(s.chunkUploadHandler), web.FinalHandlerFunc(s.chunkUploadHandler),
), ),
}) })
handle(router, "/chunks/{addr}", jsonhttp.MethodHandler{ handle("/chunks/{addr}", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.chunkGetHandler), "GET": http.HandlerFunc(s.chunkGetHandler),
}) })
handle(router, "/soc/{owner}/{id}", jsonhttp.MethodHandler{ handle("/soc/{owner}/{id}", jsonhttp.MethodHandler{
"POST": web.ChainHandlers( "POST": web.ChainHandlers(
jsonhttp.NewMaxBodyBytesHandler(swarm.ChunkWithSpanSize), jsonhttp.NewMaxBodyBytesHandler(swarm.ChunkWithSpanSize),
web.FinalHandlerFunc(s.socUploadHandler), web.FinalHandlerFunc(s.socUploadHandler),
), ),
}) })
handle(router, "/feeds/{owner}/{topic}", jsonhttp.MethodHandler{ handle("/feeds/{owner}/{topic}", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.feedGetHandler), "GET": http.HandlerFunc(s.feedGetHandler),
"POST": web.ChainHandlers( "POST": web.ChainHandlers(
jsonhttp.NewMaxBodyBytesHandler(swarm.ChunkWithSpanSize), jsonhttp.NewMaxBodyBytesHandler(swarm.ChunkWithSpanSize),
...@@ -77,25 +82,25 @@ func (s *server) setupRouting() { ...@@ -77,25 +82,25 @@ func (s *server) setupRouting() {
), ),
}) })
handle(router, "/bzz", jsonhttp.MethodHandler{ handle("/bzz", jsonhttp.MethodHandler{
"POST": web.ChainHandlers( "POST": web.ChainHandlers(
s.newTracingHandler("bzz-upload"), s.newTracingHandler("bzz-upload"),
web.FinalHandlerFunc(s.bzzUploadHandler), web.FinalHandlerFunc(s.bzzUploadHandler),
), ),
}) })
handle(router, "/bzz/{address}", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { handle("/bzz/{address}", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
u := r.URL u := r.URL
u.Path += "/" u.Path += "/"
http.Redirect(w, r, u.String(), http.StatusPermanentRedirect) http.Redirect(w, r, u.String(), http.StatusPermanentRedirect)
})) }))
handle(router, "/bzz/{address}/{path:.*}", jsonhttp.MethodHandler{ handle("/bzz/{address}/{path:.*}", jsonhttp.MethodHandler{
"GET": web.ChainHandlers( "GET": web.ChainHandlers(
s.newTracingHandler("bzz-download"), s.newTracingHandler("bzz-download"),
web.FinalHandlerFunc(s.bzzDownloadHandler), web.FinalHandlerFunc(s.bzzDownloadHandler),
), ),
}) })
handle(router, "/pss/send/{topic}/{targets}", web.ChainHandlers( handle("/pss/send/{topic}/{targets}", web.ChainHandlers(
s.gatewayModeForbidEndpointHandler, s.gatewayModeForbidEndpointHandler,
web.FinalHandler(jsonhttp.MethodHandler{ web.FinalHandler(jsonhttp.MethodHandler{
"POST": web.ChainHandlers( "POST": web.ChainHandlers(
...@@ -105,12 +110,12 @@ func (s *server) setupRouting() { ...@@ -105,12 +110,12 @@ func (s *server) setupRouting() {
})), })),
) )
handle(router, "/pss/subscribe/{topic}", web.ChainHandlers( handle("/pss/subscribe/{topic}", web.ChainHandlers(
s.gatewayModeForbidEndpointHandler, s.gatewayModeForbidEndpointHandler,
web.FinalHandlerFunc(s.pssWsHandler), web.FinalHandlerFunc(s.pssWsHandler),
)) ))
handle(router, "/tags", web.ChainHandlers( handle("/tags", web.ChainHandlers(
s.gatewayModeForbidEndpointHandler, s.gatewayModeForbidEndpointHandler,
web.FinalHandler(jsonhttp.MethodHandler{ web.FinalHandler(jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.listTagsHandler), "GET": http.HandlerFunc(s.listTagsHandler),
...@@ -120,7 +125,7 @@ func (s *server) setupRouting() { ...@@ -120,7 +125,7 @@ func (s *server) setupRouting() {
), ),
})), })),
) )
handle(router, "/tags/{id}", web.ChainHandlers( handle("/tags/{id}", web.ChainHandlers(
s.gatewayModeForbidEndpointHandler, s.gatewayModeForbidEndpointHandler,
web.FinalHandler(jsonhttp.MethodHandler{ web.FinalHandler(jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.getTagHandler), "GET": http.HandlerFunc(s.getTagHandler),
...@@ -132,46 +137,18 @@ func (s *server) setupRouting() { ...@@ -132,46 +137,18 @@ func (s *server) setupRouting() {
})), })),
) )
handle(router, "/pin/chunks/{address}", web.ChainHandlers( handle("/pins", web.ChainHandlers(
s.gatewayModeForbidEndpointHandler, s.gatewayModeForbidEndpointHandler,
web.FinalHandler(jsonhttp.MethodHandler{ web.FinalHandler(jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.getPinnedChunk), "GET": http.HandlerFunc(s.listPinnedRootHashes),
"POST": http.HandlerFunc(s.pinChunk),
"DELETE": http.HandlerFunc(s.unpinChunk),
"PUT": web.ChainHandlers(
jsonhttp.NewMaxBodyBytesHandler(1024),
web.FinalHandlerFunc(s.updatePinnedChunkPinCounter),
),
})), })),
) )
handle(router, "/pin/chunks", web.ChainHandlers( handle("/pins/{address}", web.ChainHandlers(
s.gatewayModeForbidEndpointHandler,
web.FinalHandler(jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.listPinnedChunks),
})),
)
handle(router, "/pin/bytes/{address}", web.ChainHandlers(
s.gatewayModeForbidEndpointHandler,
web.FinalHandler(jsonhttp.MethodHandler{
"POST": http.HandlerFunc(s.pinBytes),
"DELETE": http.HandlerFunc(s.unpinBytes),
})),
)
handle(router, "/pin/files/{address}", web.ChainHandlers(
s.gatewayModeForbidEndpointHandler,
web.FinalHandler(jsonhttp.MethodHandler{
"POST": http.HandlerFunc(s.pinFile),
"DELETE": http.HandlerFunc(s.unpinFile),
})),
)
handle(router, "/pin/bzz/{address}", web.ChainHandlers(
s.gatewayModeForbidEndpointHandler, s.gatewayModeForbidEndpointHandler,
web.FinalHandler(jsonhttp.MethodHandler{ web.FinalHandler(jsonhttp.MethodHandler{
"POST": http.HandlerFunc(s.pinBzz), "GET": http.HandlerFunc(s.getPinnedRootHash),
"DELETE": http.HandlerFunc(s.unpinBzz), "POST": http.HandlerFunc(s.pinRootHash),
"DELETE": http.HandlerFunc(s.unpinRootHash),
})), })),
) )
......
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"errors" "errors"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"strings"
"github.com/ethersphere/bee/pkg/cac" "github.com/ethersphere/bee/pkg/cac"
"github.com/ethersphere/bee/pkg/jsonhttp" "github.com/ethersphere/bee/pkg/jsonhttp"
...@@ -135,5 +136,14 @@ func (s *server) socUploadHandler(w http.ResponseWriter, r *http.Request) { ...@@ -135,5 +136,14 @@ func (s *server) socUploadHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
if strings.ToLower(r.Header.Get(SwarmPinHeader)) == "true" {
if err := s.pinning.CreatePin(ctx, sch.Address(), false); err != nil {
s.logger.Debugf("soc upload: creation of pin for %q failed: %v", sch.Address(), err)
s.logger.Error("soc upload: creation of pin failed")
jsonhttp.InternalServerError(w, nil)
return
}
}
jsonhttp.Created(w, chunkAddressResponse{Reference: sch.Address()}) jsonhttp.Created(w, chunkAddressResponse{Reference: sch.Address()})
} }
...@@ -346,6 +346,6 @@ func newMetrics() metrics { ...@@ -346,6 +346,6 @@ func newMetrics() metrics {
} }
} }
func (s *DB) Metrics() []prometheus.Collector { func (db *DB) Metrics() []prometheus.Collector {
return m.PrometheusCollectorsFromFields(s.metrics) return m.PrometheusCollectorsFromFields(db.metrics)
} }
...@@ -50,7 +50,7 @@ func (db *DB) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address) ...@@ -50,7 +50,7 @@ func (db *DB) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address)
} }
return nil, err return nil, err
} }
return swarm.NewChunk(swarm.NewAddress(out.Address), out.Data).WithPinCounter(out.PinCounter), nil return swarm.NewChunk(swarm.NewAddress(out.Address), out.Data), nil
} }
// get returns Item from the retrieval index // get returns Item from the retrieval index
......
...@@ -50,7 +50,7 @@ func (db *DB) GetMulti(ctx context.Context, mode storage.ModeGet, addrs ...swarm ...@@ -50,7 +50,7 @@ func (db *DB) GetMulti(ctx context.Context, mode storage.ModeGet, addrs ...swarm
} }
chunks = make([]swarm.Chunk, len(out)) chunks = make([]swarm.Chunk, len(out))
for i, ch := range out { for i, ch := range out {
chunks[i] = swarm.NewChunk(swarm.NewAddress(ch.Address), ch.Data).WithPinCounter(ch.PinCounter) chunks[i] = swarm.NewChunk(swarm.NewAddress(ch.Address), ch.Data)
} }
return chunks, nil return chunks, nil
} }
......
...@@ -53,7 +53,7 @@ func TestModeGetMulti(t *testing.T) { ...@@ -53,7 +53,7 @@ func TestModeGetMulti(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
chunks[i] = ch.WithPinCounter(1) chunks[i] = ch
} }
} }
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package localstore
import (
"context"
"errors"
"fmt"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
)
const (
maxPage = 1000 // hard limit of page size
)
// PinnedChunks
func (db *DB) PinnedChunks(ctx context.Context, offset, limit int) (chunks []*storage.Pinner, err error) {
if limit > maxPage {
limit = maxPage
}
c, err := db.pinIndex.Count()
if err != nil {
return nil, fmt.Errorf("list pinned chunks: %w", err)
}
// send empty response if there is nothing pinned
if c == 0 {
return nil, nil
}
err = db.pinIndex.Iterate(func(item shed.Item) (stop bool, err error) {
if offset > 0 {
offset--
return false, nil
}
chunks = append(chunks,
&storage.Pinner{
Address: swarm.NewAddress(item.Address),
PinCounter: item.PinCounter,
})
limit--
if limit == 0 {
return true, nil
}
return false, nil
}, nil)
return chunks, err
}
// PinCounter returns the pin counter for a given swarm address, provided that the
// address has been pinned.
func (db *DB) PinCounter(address swarm.Address) (uint64, error) {
out, err := db.pinIndex.Get(shed.Item{
Address: address.Bytes(),
})
if err != nil {
if errors.Is(err, leveldb.ErrNotFound) {
return 0, storage.ErrNotFound
}
return 0, err
}
return out.PinCounter, nil
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package localstore
import (
"context"
"errors"
"sort"
"testing"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
func TestPinning(t *testing.T) {
chunks := generateTestRandomChunks(21)
addresses := chunksToSortedStrings(chunks)
db := newTestDB(t, nil)
_, err := db.PinnedChunks(context.Background(), 0, 10)
// error should be nil
if err != nil {
t.Fatal(err)
}
// chunk must be present
_, err = db.Put(context.Background(), storage.ModePutUpload, chunks...)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), storage.ModeSetPin, chunkAddresses(chunks)...)
if err != nil {
t.Fatal(err)
}
pinnedChunks, err := db.PinnedChunks(context.Background(), 0, 30)
if err != nil {
t.Fatal(err)
}
if len(pinnedChunks) != len(chunks) {
t.Fatalf("want %d pins but got %d", len(chunks), len(pinnedChunks))
}
// Check if they are sorted
for i, addr := range pinnedChunks {
if addresses[i] != addr.Address.String() {
t.Fatal("error in getting sorted address")
}
}
}
func TestPinCounter(t *testing.T) {
chunk := generateTestRandomChunk()
db := newTestDB(t, nil)
// chunk must be present
_, err := db.Put(context.Background(), storage.ModePutUpload, chunk)
if err != nil {
t.Fatal(err)
}
// pin once
err = db.Set(context.Background(), storage.ModeSetPin, swarm.NewAddress(chunk.Address().Bytes()))
if err != nil {
t.Fatal(err)
}
pinCounter, err := db.PinCounter(swarm.NewAddress(chunk.Address().Bytes()))
if err != nil {
t.Fatal(err)
}
if pinCounter != 1 {
t.Fatalf("want pin counter %d but got %d", 1, pinCounter)
}
// pin twice
err = db.Set(context.Background(), storage.ModeSetPin, swarm.NewAddress(chunk.Address().Bytes()))
if err != nil {
t.Fatal(err)
}
pinCounter, err = db.PinCounter(swarm.NewAddress(chunk.Address().Bytes()))
if err != nil {
t.Fatal(err)
}
if pinCounter != 2 {
t.Fatalf("want pin counter %d but got %d", 2, pinCounter)
}
err = db.Set(context.Background(), storage.ModeSetUnpin, swarm.NewAddress(chunk.Address().Bytes()))
if err != nil {
t.Fatal(err)
}
_, err = db.PinCounter(swarm.NewAddress(chunk.Address().Bytes()))
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
t.Fatal(err)
}
}
}
func TestPaging(t *testing.T) {
chunks := generateTestRandomChunks(10)
addresses := chunksToSortedStrings(chunks)
db := newTestDB(t, nil)
// chunk must be present
_, err := db.Put(context.Background(), storage.ModePutUpload, chunks...)
if err != nil {
t.Fatal(err)
}
// pin once
err = db.Set(context.Background(), storage.ModeSetPin, chunkAddresses(chunks)...)
if err != nil {
t.Fatal(err)
}
pinnedChunks, err := db.PinnedChunks(context.Background(), 0, 5)
if err != nil {
t.Fatal(err)
}
if len(pinnedChunks) != 5 {
t.Fatalf("want %d pins but got %d", 5, len(pinnedChunks))
}
// Check if they are sorted
for i, addr := range pinnedChunks {
if addresses[i] != addr.Address.String() {
t.Fatal("error in getting sorted address")
}
}
pinnedChunks, err = db.PinnedChunks(context.Background(), 5, 5)
if err != nil {
t.Fatal(err)
}
if len(pinnedChunks) != 5 {
t.Fatalf("want %d pins but got %d", 5, len(pinnedChunks))
}
// Check if they are sorted
for i, addr := range pinnedChunks {
if addresses[5+i] != addr.Address.String() {
t.Fatal("error in getting sorted address")
}
}
}
func chunksToSortedStrings(chunks []swarm.Chunk) []string {
var addresses []string
for _, c := range chunks {
addresses = append(addresses, c.Address().String())
}
sort.Strings(addresses)
return addresses
}
...@@ -76,7 +76,7 @@ func NewDefaultManifest( ...@@ -76,7 +76,7 @@ func NewDefaultManifest(
return NewManifest(DefaultManifestType, ls, encrypted) return NewManifest(DefaultManifestType, ls, encrypted)
} }
// NewDefaultManifest creates a new manifest with default type. // NewDefaultManifestReference creates a new manifest with default type.
func NewDefaultManifestReference( func NewDefaultManifestReference(
reference swarm.Address, reference swarm.Address,
ls file.LoadSaver, ls file.LoadSaver,
......
...@@ -75,12 +75,12 @@ func initVersion(hash string, bytes *[]byte) { ...@@ -75,12 +75,12 @@ func initVersion(hash string, bytes *[]byte) {
} }
var ( var (
// ErrTooShort too short input // ErrTooShort signals too short input.
ErrTooShort = errors.New("serialised input too short") ErrTooShort = errors.New("serialised input too short")
// ErrInvalid input to seralise invalid // ErrInvalidInput signals invalid input to serialise.
ErrInvalid = errors.New("input invalid") ErrInvalidInput = errors.New("input invalid")
// ErrForkIvalid shows embedded node on a fork has no reference // ErrInvalidVersionHash signals unknown version of hash.
ErrForkIvalid = errors.New("fork node without reference") ErrInvalidVersionHash = errors.New("invalid version hash")
) )
var obfuscationKeyFn = rand.Read var obfuscationKeyFn = rand.Read
...@@ -96,7 +96,7 @@ func SetObfuscationKeyFn(fn func([]byte) (int, error)) { ...@@ -96,7 +96,7 @@ func SetObfuscationKeyFn(fn func([]byte) (int, error)) {
// MarshalBinary serialises the node // MarshalBinary serialises the node
func (n *Node) MarshalBinary() (bytes []byte, err error) { func (n *Node) MarshalBinary() (bytes []byte, err error) {
if n.forks == nil { if n.forks == nil {
return nil, ErrInvalid return nil, ErrInvalidInput
} }
// header // header
...@@ -323,7 +323,7 @@ func (n *Node) UnmarshalBinary(data []byte) error { ...@@ -323,7 +323,7 @@ func (n *Node) UnmarshalBinary(data []byte) error {
}) })
} }
return fmt.Errorf("invalid version hash %x", versionHash) return fmt.Errorf("%x: %w", versionHash, ErrInvalidVersionHash)
} }
func (f *fork) fromBytes(b []byte) error { func (f *fork) fromBytes(b []byte) error {
......
...@@ -39,6 +39,8 @@ func New(s storage.Storer, rcb recovery.Callback, r retrieval.Interface, logger ...@@ -39,6 +39,8 @@ func New(s storage.Storer, rcb recovery.Callback, r retrieval.Interface, logger
// Get retrieves a given chunk address. // Get retrieves a given chunk address.
// It will request a chunk from the network whenever it cannot be found locally. // It will request a chunk from the network whenever it cannot be found locally.
// If the network path is taken, the method also stores the found chunk into the
// local-store.
func (s *store) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address) (ch swarm.Chunk, err error) { func (s *store) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address) (ch swarm.Chunk, err error) {
ch, err = s.Storer.Get(ctx, mode, addr) ch, err = s.Storer.Get(ctx, mode, addr)
if err != nil { if err != nil {
...@@ -54,7 +56,12 @@ func (s *store) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Addres ...@@ -54,7 +56,12 @@ func (s *store) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Addres
return nil, ErrRecoveryAttempt return nil, ErrRecoveryAttempt
} }
_, err = s.Storer.Put(ctx, storage.ModePutRequest, ch) putMode := storage.ModePutRequest
if mode == storage.ModeGetRequestPin {
putMode = storage.ModePutRequestPin
}
_, err = s.Storer.Put(ctx, putMode, ch)
if err != nil { if err != nil {
return nil, fmt.Errorf("netstore retrieve put: %w", err) return nil, fmt.Errorf("netstore retrieve put: %w", err)
} }
......
...@@ -35,6 +35,7 @@ import ( ...@@ -35,6 +35,7 @@ import (
"github.com/ethersphere/bee/pkg/netstore" "github.com/ethersphere/bee/pkg/netstore"
"github.com/ethersphere/bee/pkg/p2p/libp2p" "github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/ethersphere/bee/pkg/pingpong" "github.com/ethersphere/bee/pkg/pingpong"
"github.com/ethersphere/bee/pkg/pinning"
"github.com/ethersphere/bee/pkg/pricer" "github.com/ethersphere/bee/pkg/pricer"
"github.com/ethersphere/bee/pkg/pricing" "github.com/ethersphere/bee/pkg/pricing"
"github.com/ethersphere/bee/pkg/pss" "github.com/ethersphere/bee/pkg/pss"
...@@ -429,6 +430,8 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, ...@@ -429,6 +430,8 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
traversalService := traversal.NewService(ns) traversalService := traversal.NewService(ns)
pinningService := pinning.NewService(storer, stateStore, traversalService)
pushSyncProtocol := pushsync.New(swarmAddress, p2ps, storer, kad, tagService, pssService.TryUnwrap, logger, acc, pricer, signer, tracer) pushSyncProtocol := pushsync.New(swarmAddress, p2ps, storer, kad, tagService, pssService.TryUnwrap, logger, acc, pricer, signer, tracer)
// set the pushSyncer in the PSS // set the pushSyncer in the PSS
...@@ -470,7 +473,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, ...@@ -470,7 +473,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
if o.APIAddr != "" { if o.APIAddr != "" {
// API server // API server
feedFactory := factory.New(ns) feedFactory := factory.New(ns)
apiService = api.New(tagService, ns, multiResolver, pssService, traversalService, feedFactory, logger, tracer, api.Options{ apiService = api.New(tagService, ns, multiResolver, pssService, traversalService, pinningService, feedFactory, logger, tracer, api.Options{
CORSAllowedOrigins: o.CORSAllowedOrigins, CORSAllowedOrigins: o.CORSAllowedOrigins,
GatewayMode: o.GatewayMode, GatewayMode: o.GatewayMode,
WsPingPeriod: 60 * time.Second, WsPingPeriod: 60 * time.Second,
......
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package pinning provides a simple set of
// operations for tracking pinned addresses.
package pinning
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mock
import (
"context"
"github.com/ethersphere/bee/pkg/pinning"
"github.com/ethersphere/bee/pkg/swarm"
)
var _ pinning.Interface = (*ServiceMock)(nil)
// NewServiceMock is a convenient constructor for creating ServiceMock.
func NewServiceMock() *ServiceMock {
return &ServiceMock{index: make(map[string]int)}
}
// ServiceMock represents a simple mock of pinning.Interface.
// The implementation is not goroutine-safe.
type ServiceMock struct {
index map[string]int
entries []swarm.Address
}
// CreatePin implements pinning.Interface CreatePin method.
func (sm *ServiceMock) CreatePin(_ context.Context, addr swarm.Address, _ bool) error {
if _, ok := sm.index[addr.String()]; ok {
return nil
}
sm.index[addr.String()] = len(sm.entries)
sm.entries = append(sm.entries, addr)
return nil
}
// DeletePin implements pinning.Interface DeletePin method.
func (sm *ServiceMock) DeletePin(_ context.Context, addr swarm.Address) error {
i, ok := sm.index[addr.String()]
if !ok {
return nil
}
delete(sm.index, addr.String())
sm.entries = append(sm.entries[:i], sm.entries[i+1:]...)
return nil
}
// HasPin implements pinning.Interface HasPin method.
func (sm *ServiceMock) HasPin(addr swarm.Address) (bool, error) {
_, ok := sm.index[addr.String()]
return ok, nil
}
// Pins implements pinning.Interface Pins method.
func (sm *ServiceMock) Pins() ([]swarm.Address, error) {
return append([]swarm.Address(nil), sm.entries...), nil
}
// Entries returns all pinned entries.
func (sm *ServiceMock) Entries() []swarm.Address {
return sm.entries
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pinning
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/traversal"
"github.com/hashicorp/go-multierror"
)
// ErrTraversal signals that errors occurred during nodes traversal.
var ErrTraversal = errors.New("traversal iteration failed")
// Interface defines pinning operations.
type Interface interface {
// CreatePin creates a new pin for the given address.
// The boolean arguments specifies whether all nodes
// in the tree should also be traversed and pinned.
// Repeating calls of this method are idempotent.
CreatePin(context.Context, swarm.Address, bool) error
// DeletePin deletes given address. All the existing
// nodes in the tree will also be traversed and un-pinned.
// Repeating calls of this method are idempotent.
DeletePin(context.Context, swarm.Address) error
// HasPin returns true if the given address has root pin.
HasPin(swarm.Address) (bool, error)
// Pins return all pinned addresses.
Pins() ([]swarm.Address, error)
}
const storePrefix = "root-pin"
func rootPinKey(addr swarm.Address) string {
return fmt.Sprintf("%s-%s", storePrefix, addr)
}
// NewService is a convenient constructor for Service.
func NewService(
pinStorage storage.Storer,
rhStorage storage.StateStorer,
traverser traversal.Traverser,
) *Service {
return &Service{
pinStorage: pinStorage,
rhStorage: rhStorage,
traverser: traverser,
}
}
// Service is implementation of the pinning.Interface.
type Service struct {
pinStorage storage.Storer
rhStorage storage.StateStorer
traverser traversal.Traverser
}
// CreatePin implements Interface.CreatePin method.
func (s *Service) CreatePin(ctx context.Context, addr swarm.Address, traverse bool) error {
// iterFn is a pinning iterator function over the leaves of the root.
iterFn := func(leaf swarm.Address) error {
switch err := s.pinStorage.Set(ctx, storage.ModeSetPin, leaf); {
case errors.Is(err, storage.ErrNotFound):
ch, err := s.pinStorage.Get(ctx, storage.ModeGetRequestPin, leaf)
if err != nil {
return fmt.Errorf("unable to get pin for leaf %q of root %q: %w", leaf, addr, err)
}
_, err = s.pinStorage.Put(ctx, storage.ModePutRequestPin, ch)
if err != nil {
return fmt.Errorf("unable to put pin for leaf %q of root %q: %w", leaf, addr, err)
}
case err != nil:
return fmt.Errorf("unable to set pin for leaf %q of root %q: %w", leaf, addr, err)
}
return nil
}
if traverse {
if err := s.traverser.Traverse(ctx, addr, iterFn); err != nil {
return fmt.Errorf("traversal of %q failed: %w", addr, err)
}
}
key := rootPinKey(addr)
switch err := s.rhStorage.Get(key, new(swarm.Address)); {
case errors.Is(err, storage.ErrNotFound):
return s.rhStorage.Put(key, addr)
case err != nil:
return fmt.Errorf("unable to pin %q: %w", addr, err)
}
return nil
}
// DeletePin implements Interface.DeletePin method.
func (s *Service) DeletePin(ctx context.Context, addr swarm.Address) error {
var iterErr error
// iterFn is a unpinning iterator function over the leaves of the root.
iterFn := func(leaf swarm.Address) error {
err := s.pinStorage.Set(ctx, storage.ModeSetUnpin, leaf)
if err != nil {
iterErr = multierror.Append(err, fmt.Errorf("unable to unpin the chunk for leaf %q of root %q: %w", leaf, addr, err))
// Continue un-pinning all chunks.
}
return nil
}
if err := s.traverser.Traverse(ctx, addr, iterFn); err != nil {
return fmt.Errorf("traversal of %q failed: %w", addr, multierror.Append(err, iterErr))
}
if iterErr != nil {
return multierror.Append(ErrTraversal, iterErr)
}
key := rootPinKey(addr)
if err := s.rhStorage.Delete(key); err != nil {
return fmt.Errorf("unable to delete pin for key %q: %w", key, err)
}
return nil
}
// HasPin implements Interface.HasPin method.
func (s *Service) HasPin(addr swarm.Address) (bool, error) {
key, val := rootPinKey(addr), swarm.NewAddress(nil)
switch err := s.rhStorage.Get(key, &val); {
case errors.Is(err, storage.ErrNotFound):
return false, nil
case err != nil:
return false, fmt.Errorf("unable to get pin for key %q: %w", key, err)
}
return val.Equal(addr), nil
}
// Pins implements Interface.Pins method.
func (s *Service) Pins() ([]swarm.Address, error) {
var addrs []swarm.Address
err := s.rhStorage.Iterate(storePrefix, func(key, val []byte) (stop bool, err error) {
var addr swarm.Address
if err := json.Unmarshal(val, &addr); err != nil {
return true, fmt.Errorf("invalid address value %q: %w", string(val), err)
}
addrs = append(addrs, addr)
return false, nil
})
if err != nil {
return nil, fmt.Errorf("iteration failed: %w", err)
}
return addrs, nil
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pinning
import (
"context"
"strings"
"testing"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
statestorem "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage"
storagem "github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/traversal"
)
var _ Interface = (*Service)(nil)
func TestPinningService(t *testing.T) {
const content = "Hello, Bee!"
var (
ctx = context.Background()
storerMock = storagem.NewStorer()
service = NewService(
storerMock,
statestorem.NewStateStore(),
traversal.NewService(storerMock),
)
)
pipe := builder.NewPipelineBuilder(ctx, storerMock, storage.ModePutUpload, false)
addr, err := builder.FeedPipeline(ctx, pipe, strings.NewReader(content), int64(len(content)))
if err != nil {
t.Fatal(err)
}
t.Run("create and list", func(t *testing.T) {
if err := service.CreatePin(ctx, addr, false); err != nil {
t.Fatalf("CreatePin(...): unexpected error: %v", err)
}
addrs, err := service.Pins()
if err != nil {
t.Fatalf("Pins(...): unexpected error: %v", err)
}
if have, want := len(addrs), 1; have != want {
t.Fatalf("Pins(...): have %d; want %d", have, want)
}
if have, want := addrs[0], addr; !have.Equal(want) {
t.Fatalf("address mismatch: have %q; want %q", have, want)
}
})
t.Run("create idempotent and list", func(t *testing.T) {
if err := service.CreatePin(ctx, addr, false); err != nil {
t.Fatalf("CreatePin(...): unexpected error: %v", err)
}
addrs, err := service.Pins()
if err != nil {
t.Fatalf("Pins(...): unexpected error: %v", err)
}
if have, want := len(addrs), 1; have != want {
t.Fatalf("Pins(...): have %d; want %d", have, want)
}
if have, want := addrs[0], addr; !have.Equal(want) {
t.Fatalf("address mismatch: have %q; want %q", have, want)
}
})
t.Run("delete and has", func(t *testing.T) {
err := service.DeletePin(ctx, addr)
if err != nil {
t.Fatalf("DeletePin(...): unexpected error: %v", err)
}
has, err := service.HasPin(addr)
if err != nil {
t.Fatalf("HasPin(...): unexpected error: %v", err)
}
if has {
t.Fatalf("HasPin(...): have %t; want %t", has, !has)
}
})
t.Run("delete idempotent and has", func(t *testing.T) {
err := service.DeletePin(ctx, addr)
if err != nil {
t.Fatalf("DeletePin(...): unexpected error: %v", err)
}
has, err := service.HasPin(addr)
if err != nil {
t.Fatalf("HasPin(...): unexpected error: %v", err)
}
if has {
t.Fatalf("HasPin(...): have %t; want %t", has, !has)
}
})
}
...@@ -6,7 +6,6 @@ package mock ...@@ -6,7 +6,6 @@ package mock
import ( import (
"context" "context"
"errors"
"sync" "sync"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
...@@ -290,36 +289,6 @@ func (m *MockStorer) SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, s ...@@ -290,36 +289,6 @@ func (m *MockStorer) SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, s
panic("not implemented") // TODO: Implement panic("not implemented") // TODO: Implement
} }
func (m *MockStorer) PinnedChunks(ctx context.Context, offset, cursor int) (pinnedChunks []*storage.Pinner, err error) {
m.mtx.Lock()
defer m.mtx.Unlock()
if len(m.pinnedAddress) == 0 {
return pinnedChunks, nil
}
for i, addr := range m.pinnedAddress {
pi := &storage.Pinner{
Address: swarm.NewAddress(addr.Bytes()),
PinCounter: m.pinnedCounter[i],
}
pinnedChunks = append(pinnedChunks, pi)
}
if pinnedChunks == nil {
return pinnedChunks, errors.New("pin chunks: leveldb: not found")
}
return pinnedChunks, nil
}
func (m *MockStorer) PinCounter(address swarm.Address) (uint64, error) {
m.mtx.Lock()
defer m.mtx.Unlock()
for i, addr := range m.pinnedAddress {
if addr.String() == address.String() {
return m.pinnedCounter[i], nil
}
}
return 0, storage.ErrNotFound
}
func (m *MockStorer) Close() error { func (m *MockStorer) Close() error {
close(m.quit) close(m.quit)
return nil return nil
......
...@@ -34,6 +34,8 @@ func (m ModeGet) String() string { ...@@ -34,6 +34,8 @@ func (m ModeGet) String() string {
return "Lookup" return "Lookup"
case ModeGetPin: case ModeGetPin:
return "PinLookup" return "PinLookup"
case ModeGetRequestPin:
return "RequestPin"
default: default:
return "Unknown" return "Unknown"
} }
...@@ -49,6 +51,8 @@ const ( ...@@ -49,6 +51,8 @@ const (
ModeGetLookup ModeGetLookup
// ModeGetPin: used when a pinned chunk is accessed // ModeGetPin: used when a pinned chunk is accessed
ModeGetPin ModeGetPin
// ModeGetRequestPin represents request for retrieval of pinned chunk.
ModeGetRequestPin
) )
// ModePut enumerates different Putter modes. // ModePut enumerates different Putter modes.
...@@ -120,12 +124,6 @@ type Descriptor struct { ...@@ -120,12 +124,6 @@ type Descriptor struct {
BinID uint64 BinID uint64
} }
// Pinner holds the required information for pinning
type Pinner struct {
Address swarm.Address
PinCounter uint64
}
func (d *Descriptor) String() string { func (d *Descriptor) String() string {
if d == nil { if d == nil {
return "" return ""
...@@ -142,8 +140,6 @@ type Storer interface { ...@@ -142,8 +140,6 @@ type Storer interface {
LastPullSubscriptionBinID(bin uint8) (id uint64, err error) LastPullSubscriptionBinID(bin uint8) (id uint64, err error)
PullSubscriber PullSubscriber
SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, stop func()) SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, stop func())
PinnedChunks(ctx context.Context, offset, limit int) (pinnedChunks []*Pinner, err error)
PinCounter(address swarm.Address) (uint64, error)
io.Closer io.Closer
} }
......
...@@ -127,18 +127,15 @@ type AddressIterFunc func(address Address) error ...@@ -127,18 +127,15 @@ type AddressIterFunc func(address Address) error
type Chunk interface { type Chunk interface {
Address() Address Address() Address
Data() []byte Data() []byte
PinCounter() uint64
WithPinCounter(p uint64) Chunk
TagID() uint32 TagID() uint32
WithTagID(t uint32) Chunk WithTagID(t uint32) Chunk
Equal(Chunk) bool Equal(Chunk) bool
} }
type chunk struct { type chunk struct {
addr Address addr Address
sdata []byte sdata []byte
pinCounter uint64 tagID uint32
tagID uint32
} }
func NewChunk(addr Address, data []byte) Chunk { func NewChunk(addr Address, data []byte) Chunk {
...@@ -148,11 +145,6 @@ func NewChunk(addr Address, data []byte) Chunk { ...@@ -148,11 +145,6 @@ func NewChunk(addr Address, data []byte) Chunk {
} }
} }
func (c *chunk) WithPinCounter(p uint64) Chunk {
c.pinCounter = p
return c
}
func (c *chunk) WithTagID(t uint32) Chunk { func (c *chunk) WithTagID(t uint32) Chunk {
c.tagID = t c.tagID = t
return c return c
...@@ -166,10 +158,6 @@ func (c *chunk) Data() []byte { ...@@ -166,10 +158,6 @@ func (c *chunk) Data() []byte {
return c.sdata return c.sdata
} }
func (c *chunk) PinCounter() uint64 {
return c.pinCounter
}
func (c *chunk) TagID() uint32 { func (c *chunk) TagID() uint32 {
return c.tagID return c.tagID
} }
......
...@@ -16,127 +16,63 @@ import ( ...@@ -16,127 +16,63 @@ import (
"github.com/ethersphere/bee/pkg/file/joiner" "github.com/ethersphere/bee/pkg/file/joiner"
"github.com/ethersphere/bee/pkg/file/loadsave" "github.com/ethersphere/bee/pkg/file/loadsave"
"github.com/ethersphere/bee/pkg/manifest" "github.com/ethersphere/bee/pkg/manifest"
"github.com/ethersphere/bee/pkg/manifest/mantaray"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
) )
var ( // Traverser represents service which traverse through address dependent chunks.
// ErrInvalidType is returned when the reference was not expected type. type Traverser interface {
ErrInvalidType = errors.New("traversal: invalid type") // Traverse iterates through each address related to the supplied one, if possible.
) Traverse(context.Context, swarm.Address, swarm.AddressIterFunc) error
// Service is the service to find dependent chunks for an address.
type Service interface {
// TraverseAddresses iterates through each address related to the supplied
// one, if possible.
TraverseAddresses(context.Context, swarm.Address, swarm.AddressIterFunc) error
// TraverseBytesAddresses iterates through each address of a bytes.
TraverseBytesAddresses(context.Context, swarm.Address, swarm.AddressIterFunc) error
// TraverseManifestAddresses iterates through each address of a manifest,
// as well as each entry found in it.
TraverseManifestAddresses(context.Context, swarm.Address, swarm.AddressIterFunc) error
}
type traversalService struct {
storer storage.Storer
}
func NewService(storer storage.Storer) Service {
return &traversalService{
storer: storer,
}
} }
func (s *traversalService) TraverseAddresses( // NewService is a convenient constructor for Service.
ctx context.Context, func NewService(store storage.Storer) *Service {
reference swarm.Address, return &Service{store: store}
chunkAddressFunc swarm.AddressIterFunc,
) error {
isManifest, m, err := s.checkIsManifest(ctx, reference)
if err != nil {
return err
}
if isManifest {
return m.IterateAddresses(ctx, func(manifestNodeAddr swarm.Address) error {
return s.processBytes(ctx, manifestNodeAddr, chunkAddressFunc)
})
}
return s.processBytes(ctx, reference, chunkAddressFunc)
} }
func (s *traversalService) TraverseBytesAddresses( // Service is implementation of Interface using storage.Storer as its storage.
ctx context.Context, type Service struct {
reference swarm.Address, store storage.Storer
chunkAddressFunc swarm.AddressIterFunc,
) error {
return s.processBytes(ctx, reference, chunkAddressFunc)
} }
func (s *traversalService) TraverseManifestAddresses( // Traverse implements Traverser.Traverse method.
ctx context.Context, func (s *Service) Traverse(ctx context.Context, addr swarm.Address, iterFn swarm.AddressIterFunc) error {
reference swarm.Address, processBytes := func(ref swarm.Address) error {
chunkAddressFunc swarm.AddressIterFunc, j, _, err := joiner.New(ctx, s.store, ref)
) error { if err != nil {
return fmt.Errorf("traversal: joiner error on %q: %w", ref, err)
isManifest, m, err := s.checkIsManifest(ctx, reference) }
if err != nil { err = j.IterateChunkAddresses(iterFn)
return err if err != nil {
} return fmt.Errorf("traversal: iterate chunk address error for %q: %w", ref, err)
if !isManifest {
return ErrInvalidType
}
err = m.IterateAddresses(ctx, func(manifestNodeAddr swarm.Address) error {
return s.processBytes(ctx, manifestNodeAddr, chunkAddressFunc)
})
if err != nil {
return fmt.Errorf("traversal: iterate chunks: %s: %w", reference, err)
}
return nil
}
// checkIsManifest checks if the content is manifest.
func (s *traversalService) checkIsManifest(
ctx context.Context,
reference swarm.Address,
) (isManifest bool, m manifest.Interface, err error) {
// NOTE: 'encrypted' parameter only used for saving manifest
m, err = manifest.NewDefaultManifestReference(
reference,
loadsave.New(s.storer, storage.ModePutRequest, false),
)
if err != nil {
if err == manifest.ErrInvalidManifestType {
// ignore
err = nil
return
} }
err = fmt.Errorf("traversal: read manifest: %s: %w", reference, err) return nil
return
} }
isManifest = true
return
}
func (s *traversalService) processBytes( ls := loadsave.New(s.store, storage.ModePutRequest, false)
ctx context.Context, switch mf, err := manifest.NewDefaultManifestReference(addr, ls); {
reference swarm.Address, case errors.Is(err, manifest.ErrInvalidManifestType):
chunkAddressFunc swarm.AddressIterFunc, break
) error { case err != nil:
j, _, err := joiner.New(ctx, s.storer, reference) return fmt.Errorf("traversal: unable to create manifest reference for %q: %w", addr, err)
if err != nil { default:
return fmt.Errorf("traversal: joiner: %s: %w", reference, err) err := mf.IterateAddresses(ctx, processBytes)
if errors.Is(err, mantaray.ErrTooShort) || errors.Is(err, mantaray.ErrInvalidVersionHash) {
// Based on the returned errors we conclude that it might
// not be a manifest, so we try non-manifest processing.
break
}
if err != nil {
return fmt.Errorf("traversal: unable to process bytes for %q: %w", addr, err)
}
return nil
} }
err = j.IterateChunkAddresses(chunkAddressFunc) // Non-manifest processing.
if err != nil { if err := processBytes(addr); err != nil {
return fmt.Errorf("traversal: iterate chunks: %s: %w", reference, err) return fmt.Errorf("traversal: unable to process bytes for %q: %w", addr, err)
} }
return nil return nil
} }
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment