Commit 0b337f6b authored by acud's avatar acud Committed by GitHub

api, manifest: abstract saver/loader functionality (#939)

parent d768c0fe
......@@ -10,7 +10,7 @@ require (
github.com/ethereum/go-ethereum v1.9.20
github.com/ethersphere/bmt v0.1.4
github.com/ethersphere/langos v1.0.0
github.com/ethersphere/manifest v0.3.3
github.com/ethersphere/manifest v0.3.5
github.com/ethersphere/sw3-bindings/v2 v2.1.0
github.com/gogo/protobuf v1.3.1
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
......@@ -61,7 +61,7 @@ require (
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/mod v0.3.0 // indirect
golang.org/x/net v0.0.0-20200707034311-ab3426394381
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae // indirect
golang.org/x/text v0.3.3 // indirect
golang.org/x/tools v0.0.0-20200626171337-aa94e735be7f // indirect
......
......@@ -167,8 +167,8 @@ github.com/ethersphere/bmt v0.1.4 h1:+rkWYNtMgDx6bkNqGdWu+U9DgGI1rRZplpSW3YhBr1Q
github.com/ethersphere/bmt v0.1.4/go.mod h1:Yd8ft1U69WDuHevZc/rwPxUv1rzPSMpMnS6xbU53aY8=
github.com/ethersphere/langos v1.0.0 h1:NBtNKzXTTRSue95uOlzPN4py7Aofs0xWPzyj4AI1Vcc=
github.com/ethersphere/langos v1.0.0/go.mod h1:dlcN2j4O8sQ+BlCaxeBu43bgr4RQ+inJ+pHwLeZg5Tw=
github.com/ethersphere/manifest v0.3.3 h1:Fc4nE1c28v9j2IOGHdpaU7DQLjDWSJxXjCHL0Vl/9pQ=
github.com/ethersphere/manifest v0.3.3/go.mod h1:ygAx0KLhXYmKqsjUab95RCbXf8UcO7yMDjyfP0lY76Y=
github.com/ethersphere/manifest v0.3.5 h1:/UMN4X4eKyTCARS9dv2HqqdFCJI2Emu09tivYsp5FZM=
github.com/ethersphere/manifest v0.3.5/go.mod h1:frSxQFT67hQvmTN5CBtgVuqHzGQpg0V0oIIm/B3Am+U=
github.com/ethersphere/sw3-bindings/v2 v2.1.0 h1:QefDtzU94UelICMPXWr7m52E2oj6r018Yc0XLoCWOxw=
github.com/ethersphere/sw3-bindings/v2 v2.1.0/go.mod h1:ozMVBZZlAirS/FcUpFwzV60v8gC0nVbA/5ZXtCX3xCc=
github.com/fatih/color v1.3.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
......@@ -1152,8 +1152,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 h1:qwRHBd0NqMbJxfbotnDhm2ByMI1Shq4Y6oRJo21SGJA=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
......
......@@ -5,6 +5,7 @@
package api
import (
"context"
"errors"
"fmt"
"io"
......@@ -15,6 +16,7 @@ import (
"time"
"unicode/utf8"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/logging"
m "github.com/ethersphere/bee/pkg/metrics"
"github.com/ethersphere/bee/pkg/pss"
......@@ -195,6 +197,13 @@ func (s *server) newTracingHandler(spanName string) func(h http.Handler) http.Ha
}
}
func lookaheadBufferSize(size int64) int {
if size <= largeBufferFilesizeThreshold {
return smallFileBufferSize
}
return largeFileBufferSize
}
// checkOrigin returns true if the origin is not set or is equal to the request host.
func (s *server) checkOrigin(r *http.Request) bool {
origin := r.Header["Origin"]
......@@ -215,13 +224,6 @@ func (s *server) checkOrigin(r *http.Request) bool {
return false
}
func lookaheadBufferSize(size int64) int {
if size <= largeBufferFilesizeThreshold {
return smallFileBufferSize
}
return largeFileBufferSize
}
// equalASCIIFold returns true if s is equal to t with ASCII case folding as
// defined in RFC 4790.
func equalASCIIFold(s, t string) bool {
......@@ -245,3 +247,13 @@ func equalASCIIFold(s, t string) bool {
}
return s == t
}
type pipelineFunc func(context.Context, io.Reader, int64) (swarm.Address, error)
func requestPipelineFn(s storage.Storer, r *http.Request) pipelineFunc {
mode, encrypt := requestModePut(r), requestEncrypt(r)
return func(ctx context.Context, r io.Reader, l int64) (swarm.Address, error) {
pipe := builder.NewPipelineBuilder(ctx, s, mode, encrypt)
return builder.FeedPipeline(ctx, pipe, r, l)
}
}
......@@ -6,6 +6,7 @@ package api
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
......@@ -18,9 +19,11 @@ import (
"github.com/ethersphere/bee/pkg/collection/entry"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/joiner"
"github.com/ethersphere/bee/pkg/file/loadsave"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/manifest"
"github.com/ethersphere/bee/pkg/sctx"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tracing"
)
......@@ -49,9 +52,6 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
return
}
// this is a hack and is needed because encryption is coupled into manifests
toDecrypt := len(address.Bytes()) == 64
// read manifest entry
j, _, err := joiner.New(ctx, s.Storer, address)
if err != nil {
......@@ -107,11 +107,9 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
// we are expecting manifest Mime type here
m, err := manifest.NewManifestReference(
ctx,
manifestMetadata.MimeType,
e.Reference(),
toDecrypt,
s.Storer,
loadsave.New(s.Storer, storage.ModePutRequest, false), // mode and encryption values are fallback
)
if err != nil {
logger.Debugf("bzz download: not manifest %s: %v", address, err)
......@@ -123,9 +121,9 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
if pathVar == "" {
logger.Tracef("bzz download: handle empty path %s", address)
if indexDocumentSuffixKey, ok := manifestMetadataLoad(m, manifestRootPath, manifestWebsiteIndexDocumentSuffixKey); ok {
if indexDocumentSuffixKey, ok := manifestMetadataLoad(ctx, m, manifestRootPath, manifestWebsiteIndexDocumentSuffixKey); ok {
pathWithIndex := path.Join(pathVar, indexDocumentSuffixKey)
indexDocumentManifestEntry, err := m.Lookup(pathWithIndex)
indexDocumentManifestEntry, err := m.Lookup(ctx, pathWithIndex)
if err == nil {
// index document exists
logger.Debugf("bzz download: serving path: %s", pathWithIndex)
......@@ -136,7 +134,7 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
}
}
me, err := m.Lookup(pathVar)
me, err := m.Lookup(ctx, pathVar)
if err != nil {
logger.Debugf("bzz download: invalid path %s/%s: %v", address, pathVar, err)
logger.Error("bzz download: invalid path")
......@@ -146,7 +144,7 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
if !strings.HasPrefix(pathVar, "/") {
// check for directory
dirPath := pathVar + "/"
exists, err := m.HasPrefix(dirPath)
exists, err := m.HasPrefix(ctx, dirPath)
if err == nil && exists {
// redirect to directory
u := r.URL
......@@ -161,11 +159,11 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
}
// check index suffix path
if indexDocumentSuffixKey, ok := manifestMetadataLoad(m, manifestRootPath, manifestWebsiteIndexDocumentSuffixKey); ok {
if indexDocumentSuffixKey, ok := manifestMetadataLoad(ctx, m, manifestRootPath, manifestWebsiteIndexDocumentSuffixKey); ok {
if !strings.HasSuffix(pathVar, indexDocumentSuffixKey) {
// check if path is directory with index
pathWithIndex := path.Join(pathVar, indexDocumentSuffixKey)
indexDocumentManifestEntry, err := m.Lookup(pathWithIndex)
indexDocumentManifestEntry, err := m.Lookup(ctx, pathWithIndex)
if err == nil {
// index document exists
logger.Debugf("bzz download: serving path: %s", pathWithIndex)
......@@ -177,9 +175,9 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
}
// check if error document is to be shown
if errorDocumentPath, ok := manifestMetadataLoad(m, manifestRootPath, manifestWebsiteErrorDocumentPathKey); ok {
if errorDocumentPath, ok := manifestMetadataLoad(ctx, m, manifestRootPath, manifestWebsiteErrorDocumentPathKey); ok {
if pathVar != errorDocumentPath {
errorDocumentManifestEntry, err := m.Lookup(errorDocumentPath)
errorDocumentManifestEntry, err := m.Lookup(ctx, errorDocumentPath)
if err == nil {
// error document exists
logger.Debugf("bzz download: serving path: %s", errorDocumentPath)
......@@ -272,8 +270,8 @@ func (s *server) serveManifestEntry(w http.ResponseWriter, r *http.Request, addr
// manifestMetadataLoad returns the value for a key stored in the metadata of
// manifest path, or empty string if no value is present.
// The ok result indicates whether value was found in the metadata.
func manifestMetadataLoad(manifest manifest.Interface, path, metadataKey string) (string, bool) {
me, err := manifest.Lookup(path)
func manifestMetadataLoad(ctx context.Context, manifest manifest.Interface, path, metadataKey string) (string, bool) {
me, err := manifest.Lookup(ctx, path)
if err != nil {
return "", false
}
......
......@@ -17,6 +17,7 @@ import (
"testing"
"github.com/ethersphere/bee/pkg/collection/entry"
"github.com/ethersphere/bee/pkg/file/loadsave"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
......@@ -98,19 +99,19 @@ func TestBzz(t *testing.T) {
}
// save manifest
m, err := manifest.NewDefaultManifest(false, storer)
m, err := manifest.NewDefaultManifest(loadsave.New(storer, storage.ModePutRequest, false))
if err != nil {
t.Fatal(err)
}
e := manifest.NewEntry(fileReference, nil)
err = m.Add(filePath, e)
err = m.Add(ctx, filePath, e)
if err != nil {
t.Fatal(err)
}
manifestBytesReference, err := m.Store(context.Background(), storage.ModePutUpload)
manifestBytesReference, err := m.Store(ctx)
if err != nil {
t.Fatal(err)
}
......
......@@ -18,12 +18,12 @@ import (
"strings"
"github.com/ethersphere/bee/pkg/collection/entry"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/loadsave"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/manifest"
"github.com/ethersphere/bee/pkg/sctx"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tracing"
)
......@@ -60,8 +60,9 @@ func (s *server) dirUploadHandler(w http.ResponseWriter, r *http.Request) {
// Add the tag to the context
ctx := sctx.SetTag(r.Context(), tag)
reference, err := storeDir(ctx, r.Body, s.Storer, requestModePut(r), s.Logger, requestEncrypt(r), r.Header.Get(SwarmIndexDocumentHeader), r.Header.Get(SwarmErrorDocumentHeader))
p := requestPipelineFn(s.Storer, r)
l := loadsave.New(s.Storer, requestModePut(r), requestEncrypt(r))
reference, err := storeDir(ctx, r.Body, s.Logger, p, l, r.Header.Get(SwarmIndexDocumentHeader), r.Header.Get(SwarmErrorDocumentHeader))
if err != nil {
logger.Debugf("dir upload: store dir err: %v", err)
logger.Errorf("dir upload: store dir")
......@@ -101,10 +102,10 @@ func validateRequest(r *http.Request) error {
// storeDir stores all files recursively contained in the directory given as a tar
// it returns the hash for the uploaded manifest corresponding to the uploaded dir
func storeDir(ctx context.Context, reader io.ReadCloser, s storage.Storer, mode storage.ModePut, log logging.Logger, encrypt bool, indexFilename string, errorFilename string) (swarm.Address, error) {
func storeDir(ctx context.Context, reader io.ReadCloser, log logging.Logger, p pipelineFunc, ls file.LoadSaver, indexFilename string, errorFilename string) (swarm.Address, error) {
logger := tracing.NewLoggerWithTraceID(ctx, log)
dirManifest, err := manifest.NewDefaultManifest(encrypt, s)
dirManifest, err := manifest.NewDefaultManifest(ls)
if err != nil {
return swarm.ZeroAddress, err
}
......@@ -146,14 +147,14 @@ func storeDir(ctx context.Context, reader io.ReadCloser, s storage.Storer, mode
contentType: contentType,
reader: tarReader,
}
fileReference, err := storeFile(ctx, fileInfo, s, mode, encrypt)
fileReference, err := storeFile(ctx, fileInfo, p)
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("store dir file: %w", err)
}
logger.Tracef("uploaded dir file %v with reference %v", filePath, fileReference)
// add file entry to dir manifest
err = dirManifest.Add(filePath, manifest.NewEntry(fileReference, nil))
err = dirManifest.Add(ctx, filePath, manifest.NewEntry(fileReference, nil))
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("add to manifest: %w", err)
}
......@@ -176,14 +177,14 @@ func storeDir(ctx context.Context, reader io.ReadCloser, s storage.Storer, mode
metadata[manifestWebsiteErrorDocumentPathKey] = errorFilename
}
rootManifestEntry := manifest.NewEntry(swarm.ZeroAddress, metadata)
err = dirManifest.Add(manifestRootPath, rootManifestEntry)
err = dirManifest.Add(ctx, manifestRootPath, rootManifestEntry)
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("add to manifest: %w", err)
}
}
// save manifest
manifestBytesReference, err := dirManifest.Store(ctx, mode)
manifestBytesReference, err := dirManifest.Store(ctx)
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("store manifest: %w", err)
}
......@@ -196,8 +197,7 @@ func storeDir(ctx context.Context, reader io.ReadCloser, s storage.Storer, mode
return swarm.ZeroAddress, fmt.Errorf("metadata marshal: %w", err)
}
pipe := builder.NewPipelineBuilder(ctx, s, mode, encrypt)
mr, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(metadataBytes), int64(len(metadataBytes)))
mr, err := p(ctx, bytes.NewReader(metadataBytes), int64(len(metadataBytes)))
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("split metadata: %w", err)
}
......@@ -209,8 +209,7 @@ func storeDir(ctx context.Context, reader io.ReadCloser, s storage.Storer, mode
return swarm.ZeroAddress, fmt.Errorf("entry marshal: %w", err)
}
pipe = builder.NewPipelineBuilder(ctx, s, mode, encrypt)
manifestFileReference, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)))
manifestFileReference, err := p(ctx, bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)))
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("split entry: %w", err)
}
......@@ -220,10 +219,9 @@ func storeDir(ctx context.Context, reader io.ReadCloser, s storage.Storer, mode
// storeFile uploads the given file and returns its reference
// this function was extracted from `fileUploadHandler` and should eventually replace its current code
func storeFile(ctx context.Context, fileInfo *fileUploadInfo, s storage.Storer, mode storage.ModePut, encrypt bool) (swarm.Address, error) {
func storeFile(ctx context.Context, fileInfo *fileUploadInfo, p pipelineFunc) (swarm.Address, error) {
// first store the file and get its reference
pipe := builder.NewPipelineBuilder(ctx, s, mode, encrypt)
fr, err := builder.FeedPipeline(ctx, pipe, fileInfo.reader, fileInfo.size)
fr, err := p(ctx, fileInfo.reader, fileInfo.size)
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("split file: %w", err)
}
......@@ -241,8 +239,7 @@ func storeFile(ctx context.Context, fileInfo *fileUploadInfo, s storage.Storer,
return swarm.ZeroAddress, fmt.Errorf("metadata marshal: %w", err)
}
pipe = builder.NewPipelineBuilder(ctx, s, mode, encrypt)
mr, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(metadataBytes), int64(len(metadataBytes)))
mr, err := p(ctx, bytes.NewReader(metadataBytes), int64(len(metadataBytes)))
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("split metadata: %w", err)
}
......@@ -253,11 +250,10 @@ func storeFile(ctx context.Context, fileInfo *fileUploadInfo, s storage.Storer,
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("entry marshal: %w", err)
}
pipe = builder.NewPipelineBuilder(ctx, s, mode, encrypt)
reference, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)))
ref, err := p(ctx, bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)))
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("split entry: %w", err)
}
return reference, nil
return ref, nil
}
......@@ -19,11 +19,13 @@ import (
"github.com/ethersphere/bee/pkg/collection/entry"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/joiner"
"github.com/ethersphere/bee/pkg/file/loadsave"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/manifest"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
......@@ -34,6 +36,7 @@ func TestDirs(t *testing.T) {
dirUploadResource = "/dirs"
fileDownloadResource = func(addr string) string { return "/files/" + addr }
bzzDownloadResource = func(addr, path string) string { return "/bzz/" + addr + "/" + path }
ctx = context.Background()
storer = mock.NewStorer()
mockStatestore = statestore.NewStateStore()
logger = logging.New(ioutil.Discard, 0)
......@@ -279,11 +282,9 @@ func TestDirs(t *testing.T) {
// verify manifest content
verifyManifest, err := manifest.NewManifestReference(
context.Background(),
manifest.DefaultManifestType,
e.Reference(),
false,
storer,
loadsave.New(storer, storage.ModePutRequest, false),
)
if err != nil {
t.Fatal(err)
......@@ -292,7 +293,7 @@ func TestDirs(t *testing.T) {
validateFile := func(t *testing.T, file f, filePath string) {
t.Helper()
entry, err := verifyManifest.Lookup(filePath)
entry, err := verifyManifest.Lookup(ctx, filePath)
if err != nil {
t.Fatal(err)
}
......@@ -322,7 +323,7 @@ func TestDirs(t *testing.T) {
validateBzzPath := func(t *testing.T, fromPath, toPath string) {
t.Helper()
toEntry, err := verifyManifest.Lookup(toPath)
toEntry, err := verifyManifest.Lookup(ctx, toPath)
if err != nil {
t.Fatal(err)
}
......@@ -345,7 +346,7 @@ func TestDirs(t *testing.T) {
// check index filename
if tc.wantIndexFilename != "" {
entry, err := verifyManifest.Lookup(api.ManifestRootPath)
entry, err := verifyManifest.Lookup(ctx, api.ManifestRootPath)
if err != nil {
t.Fatal(err)
}
......@@ -367,7 +368,7 @@ func TestDirs(t *testing.T) {
// check error filename
if tc.wantErrorFilename != "" {
entry, err := verifyManifest.Lookup(api.ManifestRootPath)
entry, err := verifyManifest.Lookup(ctx, api.ManifestRootPath)
if err != nil {
t.Fatal(err)
}
......
......@@ -22,7 +22,6 @@ import (
"github.com/ethersphere/bee/pkg/collection/entry"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/joiner"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/sctx"
"github.com/ethersphere/bee/pkg/storage"
......@@ -50,7 +49,6 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
logger = tracing.NewLoggerWithTraceID(r.Context(), s.Logger)
fileName, contentLength string
fileSize uint64
mode = requestModePut(r)
contentType = r.Header.Get("Content-Type")
)
......@@ -152,9 +150,10 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
reader = tmp
}
p := requestPipelineFn(s.Storer, r)
// first store the file and get its reference
pipe := builder.NewPipelineBuilder(ctx, s.Storer, mode, requestEncrypt(r))
fr, err := builder.FeedPipeline(ctx, pipe, reader, int64(fileSize))
fr, err := p(ctx, reader, int64(fileSize))
if err != nil {
logger.Debugf("file upload: file store, file %q: %v", fileName, err)
logger.Errorf("file upload: file store, file %q", fileName)
......@@ -177,8 +176,7 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
jsonhttp.InternalServerError(w, "metadata marshal error")
return
}
pipe = builder.NewPipelineBuilder(ctx, s.Storer, mode, requestEncrypt(r))
mr, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(metadataBytes), int64(len(metadataBytes)))
mr, err := p(ctx, bytes.NewReader(metadataBytes), int64(len(metadataBytes)))
if err != nil {
logger.Debugf("file upload: metadata store, file %q: %v", fileName, err)
logger.Errorf("file upload: metadata store, file %q", fileName)
......@@ -195,8 +193,7 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
jsonhttp.InternalServerError(w, "entry marshal error")
return
}
pipe = builder.NewPipelineBuilder(ctx, s.Storer, mode, requestEncrypt(r))
reference, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)))
reference, err := p(ctx, bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)))
if err != nil {
logger.Debugf("file upload: entry store, file %q: %v", fileName, err)
logger.Errorf("file upload: entry store, file %q", fileName)
......
......@@ -107,3 +107,18 @@ func SplitWriteAll(ctx context.Context, s Splitter, r io.Reader, l int64, toEncr
}
return addr, nil
}
type Loader interface {
// Load a reference in byte slice representation and return all content associated with the reference.
Load(context.Context, []byte) ([]byte, error)
}
type Saver interface {
// Save an arbitrary byte slice and return the reference byte slice representation.
Save(context.Context, []byte) ([]byte, error)
}
type LoadSaver interface {
Loader
Saver
}
package loadsave
import (
"bytes"
"context"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/joiner"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
// loadSave is needed for manifest operations and provides
// simple wrapping over load and save operations using file
// package abstractions. use with caution since Loader will
// load all of the subtrie of a given hash in memory.
type loadSave struct {
storer storage.Storer
mode storage.ModePut
encrypted bool
}
func New(storer storage.Storer, mode storage.ModePut, enc bool) file.LoadSaver {
return &loadSave{
storer: storer,
mode: mode,
encrypted: enc,
}
}
func (ls *loadSave) Load(ctx context.Context, ref []byte) ([]byte, error) {
j, _, err := joiner.New(ctx, ls.storer, swarm.NewAddress(ref))
if err != nil {
return nil, err
}
buf := bytes.NewBuffer(nil)
_, err = file.JoinReadAll(ctx, j, buf)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func (ls *loadSave) Save(ctx context.Context, data []byte) ([]byte, error) {
pipe := builder.NewPipelineBuilder(ctx, ls.storer, ls.mode, ls.encrypted)
address, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(data), int64(len(data)))
if err != nil {
return swarm.ZeroAddress.Bytes(), err
}
return address.Bytes(), nil
}
......@@ -8,7 +8,7 @@ import (
"context"
"errors"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/swarm"
)
......@@ -36,16 +36,15 @@ type Interface interface {
// Type returns manifest implementation type information
Type() string
// Add a manifest entry to the specified path.
Add(string, Entry) error
Add(context.Context, string, Entry) error
// Remove a manifest entry on the specified path.
Remove(string) error
Remove(context.Context, string) error
// Lookup returns a manifest entry if one is found in the specified path.
Lookup(string) (Entry, error)
Lookup(context.Context, string) (Entry, error)
// HasPrefix tests whether the specified prefix path exists.
HasPrefix(string) (bool, error)
HasPrefix(context.Context, string) (bool, error)
// Store stores the manifest, returning the resulting address.
Store(context.Context, storage.ModePut) (swarm.Address, error)
Store(context.Context) (swarm.Address, error)
// IterateAddresses is used to iterate over chunks addresses for
// the manifest.
IterateAddresses(context.Context, swarm.AddressIterFunc) error
......@@ -60,24 +59,20 @@ type Entry interface {
}
// NewDefaultManifest creates a new manifest with default type.
func NewDefaultManifest(
encrypted bool,
storer storage.Storer,
) (Interface, error) {
return NewManifest(DefaultManifestType, encrypted, storer)
func NewDefaultManifest(ls file.LoadSaver) (Interface, error) {
return NewManifest(DefaultManifestType, ls)
}
// NewManifest creates a new manifest.
func NewManifest(
manifestType string,
encrypted bool,
storer storage.Storer,
ls file.LoadSaver,
) (Interface, error) {
switch manifestType {
case ManifestSimpleContentType:
return NewSimpleManifest(encrypted, storer)
return NewSimpleManifest(ls)
case ManifestMantarayContentType:
return NewMantarayManifest(encrypted, storer)
return NewMantarayManifest(ls)
default:
return nil, ErrInvalidManifestType
}
......@@ -85,17 +80,15 @@ func NewManifest(
// NewManifestReference loads existing manifest.
func NewManifestReference(
ctx context.Context,
manifestType string,
reference swarm.Address,
encrypted bool,
storer storage.Storer,
ls file.LoadSaver,
) (Interface, error) {
switch manifestType {
case ManifestSimpleContentType:
return NewSimpleManifestReference(ctx, reference, encrypted, storer)
return NewSimpleManifestReference(reference, ls)
case ManifestMantarayContentType:
return NewMantarayManifestReference(ctx, reference, encrypted, storer)
return NewMantarayManifestReference(reference, ls)
default:
return nil, ErrInvalidManifestType
}
......
......@@ -5,15 +5,11 @@
package manifest
import (
"bytes"
"context"
"errors"
"fmt"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/joiner"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/manifest/mantaray"
)
......@@ -27,21 +23,14 @@ const (
type mantarayManifest struct {
trie *mantaray.Node
encrypted bool
storer storage.Storer
loader mantaray.LoadSaver
ls file.LoadSaver
}
// NewMantarayManifest creates a new mantaray-based manifest.
func NewMantarayManifest(
encrypted bool,
storer storage.Storer,
) (Interface, error) {
func NewMantarayManifest(ls file.LoadSaver) (Interface, error) {
return &mantarayManifest{
trie: mantaray.New(),
encrypted: encrypted,
storer: storer,
trie: mantaray.New(),
ls: ls,
}, nil
}
......@@ -50,14 +39,12 @@ func NewMantarayManifest(
//
// NOTE: This should only be used in tests.
func NewMantarayManifestWithObfuscationKeyFn(
encrypted bool,
storer storage.Storer,
ls file.LoadSaver,
obfuscationKeyFn func([]byte) (int, error),
) (Interface, error) {
mm := &mantarayManifest{
trie: mantaray.New(),
encrypted: encrypted,
storer: storer,
trie: mantaray.New(),
ls: ls,
}
mantaray.SetObfuscationKeyFn(obfuscationKeyFn)
return mm, nil
......@@ -65,16 +52,12 @@ func NewMantarayManifestWithObfuscationKeyFn(
// NewMantarayManifestReference loads existing mantaray-based manifest.
func NewMantarayManifestReference(
ctx context.Context,
reference swarm.Address,
encrypted bool,
storer storage.Storer,
ls file.LoadSaver,
) (Interface, error) {
return &mantarayManifest{
trie: mantaray.NewNodeRef(reference.Bytes()),
encrypted: encrypted,
storer: storer,
loader: newMantarayLoader(ctx, encrypted, storer),
trie: mantaray.NewNodeRef(reference.Bytes()),
ls: ls,
}, nil
}
......@@ -82,17 +65,17 @@ func (m *mantarayManifest) Type() string {
return ManifestMantarayContentType
}
func (m *mantarayManifest) Add(path string, entry Entry) error {
func (m *mantarayManifest) Add(ctx context.Context, path string, entry Entry) error {
p := []byte(path)
e := entry.Reference().Bytes()
return m.trie.Add(p, e, entry.Metadata(), m.loader)
return m.trie.Add(ctx, p, e, entry.Metadata(), m.ls)
}
func (m *mantarayManifest) Remove(path string) error {
func (m *mantarayManifest) Remove(ctx context.Context, path string) error {
p := []byte(path)
err := m.trie.Remove(p, m.loader)
err := m.trie.Remove(ctx, p, m.ls)
if err != nil {
if errors.Is(err, mantaray.ErrNotFound) {
return ErrNotFound
......@@ -103,10 +86,10 @@ func (m *mantarayManifest) Remove(path string) error {
return nil
}
func (m *mantarayManifest) Lookup(path string) (Entry, error) {
func (m *mantarayManifest) Lookup(ctx context.Context, path string) (Entry, error) {
p := []byte(path)
node, err := m.trie.LookupNode(p, m.loader)
node, err := m.trie.LookupNode(ctx, p, m.ls)
if err != nil {
if errors.Is(err, mantaray.ErrNotFound) {
return nil, ErrNotFound
......@@ -125,18 +108,14 @@ func (m *mantarayManifest) Lookup(path string) (Entry, error) {
return entry, nil
}
func (m *mantarayManifest) HasPrefix(prefix string) (bool, error) {
func (m *mantarayManifest) HasPrefix(ctx context.Context, prefix string) (bool, error) {
p := []byte(prefix)
return m.trie.HasPrefix(p, m.loader)
return m.trie.HasPrefix(ctx, p, m.ls)
}
func (m *mantarayManifest) Store(ctx context.Context, mode storage.ModePut) (swarm.Address, error) {
saver := newMantaraySaver(ctx, m.encrypted, m.storer, mode)
m.loader = saver
err := m.trie.Save(saver)
func (m *mantarayManifest) Store(ctx context.Context) (swarm.Address, error) {
err := m.trie.Save(ctx, m.ls)
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("manifest save error: %w", err)
}
......@@ -182,7 +161,7 @@ func (m *mantarayManifest) IterateAddresses(ctx context.Context, fn swarm.Addres
return nil
}
err := m.trie.WalkNode([]byte{}, m.loader, walker)
err := m.trie.WalkNode(ctx, []byte{}, m.ls, walker)
if err != nil {
if !errors.Is(err, errStopIterator) {
return fmt.Errorf("manifest iterate addresses: %w", err)
......@@ -192,67 +171,3 @@ func (m *mantarayManifest) IterateAddresses(ctx context.Context, fn swarm.Addres
return nil
}
// mantarayLoadSaver implements required interface 'mantaray.LoadSaver'
type mantarayLoadSaver struct {
ctx context.Context
encrypted bool
storer storage.Storer
modePut storage.ModePut
}
func newMantarayLoader(
ctx context.Context,
encrypted bool,
storer storage.Storer,
) *mantarayLoadSaver {
return &mantarayLoadSaver{
ctx: ctx,
encrypted: encrypted,
storer: storer,
}
}
func newMantaraySaver(
ctx context.Context,
encrypted bool,
storer storage.Storer,
modePut storage.ModePut,
) *mantarayLoadSaver {
return &mantarayLoadSaver{
ctx: ctx,
encrypted: encrypted,
storer: storer,
modePut: modePut,
}
}
func (ls *mantarayLoadSaver) Load(ref []byte) ([]byte, error) {
ctx := ls.ctx
j, _, err := joiner.New(ctx, ls.storer, swarm.NewAddress(ref))
if err != nil {
return nil, err
}
buf := bytes.NewBuffer(nil)
_, err = file.JoinReadAll(ctx, j, buf)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func (ls *mantarayLoadSaver) Save(data []byte) ([]byte, error) {
ctx := ls.ctx
pipe := builder.NewPipelineBuilder(ctx, ls.storer, ls.modePut, ls.encrypted)
address, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(data), int64(len(data)))
if err != nil {
return swarm.ZeroAddress.Bytes(), err
}
return address.Bytes(), nil
}
......@@ -5,15 +5,11 @@
package manifest
import (
"bytes"
"context"
"errors"
"fmt"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/joiner"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/manifest/simple"
)
......@@ -28,36 +24,25 @@ type simpleManifest struct {
manifest simple.Manifest
reference swarm.Address
encrypted bool
storer storage.Storer
ls file.LoadSaver
}
// NewSimpleManifest creates a new simple manifest.
func NewSimpleManifest(
encrypted bool,
storer storage.Storer,
) (Interface, error) {
func NewSimpleManifest(ls file.LoadSaver) (Interface, error) {
return &simpleManifest{
manifest: simple.NewManifest(),
encrypted: encrypted,
storer: storer,
manifest: simple.NewManifest(),
ls: ls,
}, nil
}
// NewSimpleManifestReference loads existing simple manifest.
func NewSimpleManifestReference(
ctx context.Context,
reference swarm.Address,
encrypted bool,
storer storage.Storer,
) (Interface, error) {
func NewSimpleManifestReference(ref swarm.Address, l file.LoadSaver) (Interface, error) {
m := &simpleManifest{
manifest: simple.NewManifest(),
reference: reference,
encrypted: encrypted,
storer: storer,
reference: ref,
ls: l,
}
err := m.load(ctx, reference)
err := m.load(context.Background(), ref)
return m, err
}
......@@ -65,14 +50,13 @@ func (m *simpleManifest) Type() string {
return ManifestSimpleContentType
}
func (m *simpleManifest) Add(path string, entry Entry) error {
func (m *simpleManifest) Add(_ context.Context, path string, entry Entry) error {
e := entry.Reference().String()
return m.manifest.Add(path, e, entry.Metadata())
}
func (m *simpleManifest) Remove(path string) error {
func (m *simpleManifest) Remove(_ context.Context, path string) error {
err := m.manifest.Remove(path)
if err != nil {
if errors.Is(err, simple.ErrNotFound) {
......@@ -84,8 +68,7 @@ func (m *simpleManifest) Remove(path string) error {
return nil
}
func (m *simpleManifest) Lookup(path string) (Entry, error) {
func (m *simpleManifest) Lookup(_ context.Context, path string) (Entry, error) {
n, err := m.manifest.Lookup(path)
if err != nil {
return nil, ErrNotFound
......@@ -101,26 +84,22 @@ func (m *simpleManifest) Lookup(path string) (Entry, error) {
return entry, nil
}
func (m *simpleManifest) HasPrefix(prefix string) (bool, error) {
func (m *simpleManifest) HasPrefix(_ context.Context, prefix string) (bool, error) {
return m.manifest.HasPrefix(prefix), nil
}
func (m *simpleManifest) Store(ctx context.Context, mode storage.ModePut) (swarm.Address, error) {
func (m *simpleManifest) Store(ctx context.Context) (swarm.Address, error) {
data, err := m.manifest.MarshalBinary()
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("manifest marshal error: %w", err)
}
pipe := builder.NewPipelineBuilder(ctx, m.storer, mode, m.encrypted)
address, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(data), int64(len(data)))
ref, err := m.ls.Save(ctx, data)
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("manifest save error: %w", err)
}
m.reference = address
return address, nil
m.reference = swarm.NewAddress(ref)
return m.reference, nil
}
func (m *simpleManifest) IterateAddresses(ctx context.Context, fn swarm.AddressIterFunc) error {
......@@ -164,18 +143,12 @@ func (m *simpleManifest) IterateAddresses(ctx context.Context, fn swarm.AddressI
}
func (m *simpleManifest) load(ctx context.Context, reference swarm.Address) error {
j, _, err := joiner.New(ctx, m.storer, reference)
if err != nil {
return fmt.Errorf("new joiner: %w", err)
}
buf := bytes.NewBuffer(nil)
_, err = file.JoinReadAll(ctx, j, buf)
buf, err := m.ls.Load(ctx, reference.Bytes())
if err != nil {
return fmt.Errorf("manifest load error: %w", err)
}
err = m.manifest.UnmarshalBinary(buf.Bytes())
err = m.manifest.UnmarshalBinary(buf)
if err != nil {
return fmt.Errorf("manifest unmarshal error: %w", err)
}
......
......@@ -14,6 +14,7 @@ import (
"github.com/ethersphere/bee/pkg/collection/entry"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/joiner"
"github.com/ethersphere/bee/pkg/file/loadsave"
"github.com/ethersphere/bee/pkg/manifest"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
......@@ -324,11 +325,9 @@ func (s *traversalService) checkIsManifest(
// NOTE: 'encrypted' parameter only used for saving manifest
m, err = manifest.NewManifestReference(
ctx,
metadata.MimeType,
e.Reference(),
false,
s.storer,
loadsave.New(s.storer, storage.ModePutRequest, false),
)
if err != nil {
if err == manifest.ErrInvalidManifestType {
......
......@@ -18,6 +18,7 @@ import (
"time"
"github.com/ethersphere/bee/pkg/collection/entry"
"github.com/ethersphere/bee/pkg/file/loadsave"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/manifest"
"github.com/ethersphere/bee/pkg/storage"
......@@ -461,15 +462,15 @@ func TestTraversalManifest(t *testing.T) {
ctx := context.Background()
var dirManifest manifest.Interface
ls := loadsave.New(mockStorer, storage.ModePutRequest, false)
switch tc.manifestType {
case manifest.ManifestSimpleContentType:
dirManifest, err = manifest.NewSimpleManifest(false, mockStorer)
dirManifest, err = manifest.NewSimpleManifest(ls)
if err != nil {
t.Fatal(err)
}
case manifest.ManifestMantarayContentType:
dirManifest, err = manifest.NewMantarayManifestWithObfuscationKeyFn(false, mockStorer, obfuscationKeyFn)
dirManifest, err = manifest.NewMantarayManifestWithObfuscationKeyFn(ls, obfuscationKeyFn)
if err != nil {
t.Fatal(err)
}
......@@ -518,14 +519,14 @@ func TestTraversalManifest(t *testing.T) {
filePath := path.Join(f.dir, fileName)
err = dirManifest.Add(filePath, manifest.NewEntry(reference, nil))
err = dirManifest.Add(ctx, filePath, manifest.NewEntry(reference, nil))
if err != nil {
t.Fatal(err)
}
}
// save manifest
manifestBytesReference, err := dirManifest.Store(ctx, storage.ModePutUpload)
manifestBytesReference, err := dirManifest.Store(ctx)
if err != nil {
t.Fatal(err)
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment