Commit 1c050f52 authored by acud, committed by GitHub

integrate split pipeline (#610)

* integrate splitter pipeline into the api
parent 4e143e4f
@@ -7,10 +7,8 @@ package api
 import (
     "fmt"
     "net/http"
-    "strings"

-    "github.com/ethersphere/bee/pkg/file"
-    "github.com/ethersphere/bee/pkg/file/splitter"
+    "github.com/ethersphere/bee/pkg/file/pipeline"
     "github.com/ethersphere/bee/pkg/jsonhttp"
     "github.com/ethersphere/bee/pkg/sctx"
     "github.com/ethersphere/bee/pkg/swarm"
@@ -34,9 +32,8 @@ func (s *server) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
     // Add the tag to the context
     ctx := sctx.SetTag(r.Context(), tag)
-    toEncrypt := strings.ToLower(r.Header.Get(EncryptHeader)) == "true"
-    sp := splitter.NewSimpleSplitter(s.Storer, requestModePut(r))
-    address, err := file.SplitWriteAll(ctx, sp, r.Body, r.ContentLength, toEncrypt)
+    pipe := pipeline.NewPipeline(ctx, s.Storer, requestModePut(r))
+    address, err := pipeline.FeedPipeline(ctx, pipe, r.Body, r.ContentLength)
     if err != nil {
         s.Logger.Debugf("bytes upload: split write all: %v", err)
         s.Logger.Error("bytes upload: split write all")
...
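For orientation: the upload path now amounts to two calls: construct a pipeline bound to the request context, storer and put-mode, then feed it the body. A minimal self-contained sketch of the same pattern, using the mock storer that also appears in the tests further down (the sketch itself is not part of this commit):

package main

import (
    "context"
    "fmt"
    "strings"

    "github.com/ethersphere/bee/pkg/file/pipeline"
    "github.com/ethersphere/bee/pkg/storage"
    "github.com/ethersphere/bee/pkg/storage/mock"
)

func main() {
    ctx := context.Background()
    storer := mock.NewStorer()

    data := "hello world"
    pipe := pipeline.NewPipeline(ctx, storer, storage.ModePutUpload)
    // FeedPipeline consumes the reader and returns the BMT root address.
    addr, err := pipeline.FeedPipeline(ctx, pipe, strings.NewReader(data), int64(len(data)))
    if err != nil {
        panic(err)
    }
    fmt.Println("root:", addr)
}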
@@ -9,6 +9,7 @@ import (
     "context"
     "encoding/json"
     "fmt"
+    "io"
     "io/ioutil"
     "mime"
     "net/http"
@@ -16,8 +17,7 @@ import (
     "testing"

     "github.com/ethersphere/bee/pkg/collection/entry"
-    "github.com/ethersphere/bee/pkg/file"
-    "github.com/ethersphere/bee/pkg/file/splitter"
+    "github.com/ethersphere/bee/pkg/file/pipeline"
     "github.com/ethersphere/bee/pkg/jsonhttp"
     "github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
     "github.com/ethersphere/bee/pkg/logging"
@@ -32,14 +32,17 @@ func TestBzz(t *testing.T) {
     var (
         bzzDownloadResource = func(addr, path string) string { return "/bzz/" + addr + "/" + path }
         storer              = smock.NewStorer()
-        sp                  = splitter.NewSimpleSplitter(storer, storage.ModePutUpload)
+        ctx                 = context.Background()
         client              = newTestServer(t, testServerOptions{
             Storer: storer,
             Tags:   tags.NewTags(),
             Logger: logging.New(ioutil.Discard, 5),
         })
+        pipeWriteAll = func(r io.Reader, l int64) (swarm.Address, error) {
+            pipe := pipeline.NewPipeline(ctx, storer, storage.ModePutUpload)
+            return pipeline.FeedPipeline(ctx, pipe, r, l)
+        }
     )

     t.Run("download-file-by-path", func(t *testing.T) {
         fileName := "sample.html"
         filePath := "test/" + fileName
@@ -61,8 +64,8 @@ func TestBzz(t *testing.T) {
         var manifestFileReference swarm.Address

         // save file
-        fileContentReference, err = file.SplitWriteAll(context.Background(), sp, strings.NewReader(sampleHtml), int64(len(sampleHtml)), false)
+        fileContentReference, err = pipeWriteAll(strings.NewReader(sampleHtml), int64(len(sampleHtml)))
         if err != nil {
             t.Fatal(err)
         }
@@ -74,7 +77,8 @@ func TestBzz(t *testing.T) {
             t.Fatal(err)
         }
-        fileMetadataReference, err := file.SplitWriteAll(context.Background(), sp, bytes.NewReader(fileMetadataBytes), int64(len(fileMetadataBytes)), false)
+        fileMetadataReference, err := pipeWriteAll(bytes.NewReader(fileMetadataBytes), int64(len(fileMetadataBytes)))
         if err != nil {
             t.Fatal(err)
         }
@@ -84,13 +88,13 @@ func TestBzz(t *testing.T) {
         if err != nil {
             t.Fatal(err)
         }
-        fileReference, err = file.SplitWriteAll(context.Background(), sp, bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)), false)
+        fileReference, err = pipeWriteAll(bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)))
         if err != nil {
             t.Fatal(err)
         }

         // save manifest
         m, err := manifest.NewDefaultManifest(false, storer)
         if err != nil {
             t.Fatal(err)
@@ -115,7 +119,7 @@ func TestBzz(t *testing.T) {
             t.Fatal(err)
         }
-        mr, err := file.SplitWriteAll(context.Background(), sp, bytes.NewReader(metadataBytes), int64(len(metadataBytes)), false)
+        mr, err := pipeWriteAll(bytes.NewReader(metadataBytes), int64(len(metadataBytes)))
         if err != nil {
             t.Fatal(err)
         }
@@ -127,7 +131,7 @@ func TestBzz(t *testing.T) {
             t.Fatal(err)
         }
-        manifestFileReference, err = file.SplitWriteAll(context.Background(), sp, bytes.NewReader(manifestFileEntryBytes), int64(len(manifestFileEntryBytes)), false)
+        manifestFileReference, err = pipeWriteAll(bytes.NewReader(manifestFileEntryBytes), int64(len(manifestFileEntryBytes)))
         if err != nil {
             t.Fatal(err)
         }
...
@@ -18,8 +18,7 @@ import (
     "strings"

     "github.com/ethersphere/bee/pkg/collection/entry"
-    "github.com/ethersphere/bee/pkg/file"
-    "github.com/ethersphere/bee/pkg/file/splitter"
+    "github.com/ethersphere/bee/pkg/file/pipeline"
     "github.com/ethersphere/bee/pkg/jsonhttp"
     "github.com/ethersphere/bee/pkg/logging"
     "github.com/ethersphere/bee/pkg/manifest"
@@ -169,8 +168,8 @@ func storeDir(ctx context.Context, reader io.ReadCloser, s storage.Storer, mode
         return swarm.ZeroAddress, fmt.Errorf("metadata marshal: %w", err)
     }
-    sp := splitter.NewSimpleSplitter(s, mode)
-    mr, err := file.SplitWriteAll(ctx, sp, bytes.NewReader(metadataBytes), int64(len(metadataBytes)), toEncrypt)
+    pipe := pipeline.NewPipeline(ctx, s, mode)
+    mr, err := pipeline.FeedPipeline(ctx, pipe, bytes.NewReader(metadataBytes), int64(len(metadataBytes)))
     if err != nil {
         return swarm.ZeroAddress, fmt.Errorf("split metadata: %w", err)
     }
@@ -182,8 +181,8 @@ func storeDir(ctx context.Context, reader io.ReadCloser, s storage.Storer, mode
         return swarm.ZeroAddress, fmt.Errorf("entry marshal: %w", err)
     }
-    sp = splitter.NewSimpleSplitter(s, mode)
-    manifestFileReference, err := file.SplitWriteAll(ctx, sp, bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)), toEncrypt)
+    pipe = pipeline.NewPipeline(ctx, s, mode)
+    manifestFileReference, err := pipeline.FeedPipeline(ctx, pipe, bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)))
     if err != nil {
         return swarm.ZeroAddress, fmt.Errorf("split entry: %w", err)
     }
@@ -194,12 +193,9 @@ func storeDir(ctx context.Context, reader io.ReadCloser, s storage.Storer, mode
 // storeFile uploads the given file and returns its reference
 // this function was extracted from `fileUploadHandler` and should eventually replace its current code
 func storeFile(ctx context.Context, fileInfo *fileUploadInfo, s storage.Storer, mode storage.ModePut) (swarm.Address, error) {
-    v := ctx.Value(toEncryptContextKey{})
-    toEncrypt, _ := v.(bool) // default is false
     // first store the file and get its reference
-    sp := splitter.NewSimpleSplitter(s, mode)
-    fr, err := file.SplitWriteAll(ctx, sp, fileInfo.reader, fileInfo.size, toEncrypt)
+    pipe := pipeline.NewPipeline(ctx, s, mode)
+    fr, err := pipeline.FeedPipeline(ctx, pipe, fileInfo.reader, fileInfo.size)
     if err != nil {
         return swarm.ZeroAddress, fmt.Errorf("split file: %w", err)
     }
@@ -217,8 +213,8 @@ func storeFile(ctx context.Context, fileInfo *fileUploadInfo, s storage.Storer,
         return swarm.ZeroAddress, fmt.Errorf("metadata marshal: %w", err)
     }
-    sp = splitter.NewSimpleSplitter(s, mode)
-    mr, err := file.SplitWriteAll(ctx, sp, bytes.NewReader(metadataBytes), int64(len(metadataBytes)), toEncrypt)
+    pipe = pipeline.NewPipeline(ctx, s, mode)
+    mr, err := pipeline.FeedPipeline(ctx, pipe, bytes.NewReader(metadataBytes), int64(len(metadataBytes)))
     if err != nil {
         return swarm.ZeroAddress, fmt.Errorf("split metadata: %w", err)
     }
@@ -229,8 +225,8 @@ func storeFile(ctx context.Context, fileInfo *fileUploadInfo, s storage.Storer,
     if err != nil {
         return swarm.ZeroAddress, fmt.Errorf("entry marshal: %w", err)
     }
-    sp = splitter.NewSimpleSplitter(s, mode)
-    reference, err := file.SplitWriteAll(ctx, sp, bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)), toEncrypt)
+    pipe = pipeline.NewPipeline(ctx, s, mode)
+    reference, err := pipeline.FeedPipeline(ctx, pipe, bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)))
     if err != nil {
         return swarm.ZeroAddress, fmt.Errorf("split entry: %w", err)
     }
...
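Note the pattern in storeDir and storeFile: a fresh pipeline is constructed before every FeedPipeline call, because a pipeline is single-use once its sum is taken. A hedged sketch of a small wrapper that captures the pattern, analogous to the pipeWriteAll closure in the tests above (the helper name is illustrative, not part of the commit):

// writeAll builds a single-use pipeline, feeds it one reader and
// returns the resulting content address.
func writeAll(ctx context.Context, s storage.Storer, mode storage.ModePut, r io.Reader, l int64) (swarm.Address, error) {
    pipe := pipeline.NewPipeline(ctx, s, mode)
    return pipeline.FeedPipeline(ctx, pipe, r, l)
}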
@@ -17,15 +17,14 @@ import (
     "net/http"
     "os"
     "strconv"
-    "strings"
     "time"

     "github.com/ethersphere/bee/pkg/collection/entry"
     "github.com/ethersphere/bee/pkg/encryption"
     "github.com/ethersphere/bee/pkg/file"
     "github.com/ethersphere/bee/pkg/file/joiner"
+    "github.com/ethersphere/bee/pkg/file/pipeline"
     "github.com/ethersphere/bee/pkg/file/seekjoiner"
-    "github.com/ethersphere/bee/pkg/file/splitter"
     "github.com/ethersphere/bee/pkg/jsonhttp"
     "github.com/ethersphere/bee/pkg/sctx"
     "github.com/ethersphere/bee/pkg/storage"
@@ -52,7 +51,6 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
         fileName, contentLength string
         fileSize                uint64
         mode                    = requestModePut(r)
-        toEncrypt               = strings.ToLower(r.Header.Get(EncryptHeader)) == "true"
         contentType             = r.Header.Get("Content-Type")
     )
@@ -155,8 +153,8 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
     }

     // first store the file and get its reference
-    sp := splitter.NewSimpleSplitter(s.Storer, mode)
-    fr, err := file.SplitWriteAll(ctx, sp, reader, int64(fileSize), toEncrypt)
+    pipe := pipeline.NewPipeline(ctx, s.Storer, mode)
+    fr, err := pipeline.FeedPipeline(ctx, pipe, reader, int64(fileSize))
     if err != nil {
         s.Logger.Debugf("file upload: file store, file %q: %v", fileName, err)
         s.Logger.Errorf("file upload: file store, file %q", fileName)
@@ -179,8 +177,8 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
         jsonhttp.InternalServerError(w, "metadata marshal error")
         return
     }
-    sp = splitter.NewSimpleSplitter(s.Storer, mode)
-    mr, err := file.SplitWriteAll(ctx, sp, bytes.NewReader(metadataBytes), int64(len(metadataBytes)), toEncrypt)
+    pipe = pipeline.NewPipeline(ctx, s.Storer, mode)
+    mr, err := pipeline.FeedPipeline(ctx, pipe, bytes.NewReader(metadataBytes), int64(len(metadataBytes)))
     if err != nil {
         s.Logger.Debugf("file upload: metadata store, file %q: %v", fileName, err)
         s.Logger.Errorf("file upload: metadata store, file %q", fileName)
@@ -197,8 +195,8 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
         jsonhttp.InternalServerError(w, "entry marshal error")
         return
     }
-    sp = splitter.NewSimpleSplitter(s.Storer, mode)
-    reference, err := file.SplitWriteAll(ctx, sp, bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)), toEncrypt)
+    pipe = pipeline.NewPipeline(ctx, s.Storer, mode)
+    reference, err := pipeline.FeedPipeline(ctx, pipe, bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)))
     if err != nil {
         s.Logger.Debugf("file upload: entry store, file %q: %v", fileName, err)
         s.Logger.Errorf("file upload: entry store, file %q", fileName)
...
@@ -39,9 +39,8 @@ func (w *bmtWriter) chainWrite(p *pipeWriteArgs) error {
     if err != nil {
         return err
     }
-    bytes := w.b.Sum(nil)
-    args := &pipeWriteArgs{ref: bytes, data: p.data, span: p.data[:swarm.SpanSize]}
-    return w.next.chainWrite(args)
+    p.ref = w.b.Sum(nil)
+    return w.next.chainWrite(p)
 }

 // sum calls the next writer for the cryptographic sum
...
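The bmtWriter now sets p.ref on the struct it received instead of allocating a fresh pipeWriteArgs, so upstream callers (notably the hashTrieWriter via the short pipeline) can read the computed reference back off the same value. The fields involved, inferred from their usage across this diff (the full struct definition is elided by a hunk header below):

type pipeWriteArgs struct {
    ref  []byte // chunk address, set by the BMT writer via w.b.Sum(nil)
    data []byte // span-prefixed chunk payload passed down the chain
    span []byte // 8-byte little-endian span header, data[:swarm.SpanSize]
}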
@@ -212,6 +212,9 @@ func TestFeederFlush(t *testing.T) {
     }
 }

+// countingResultWriter counts how many writes were done to it
+// and passes the results to the caller using the pointer provided
+// in the constructor.
 type countingResultWriter struct {
     target *pipeWriteArgs
     count  int
...
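For readers without the surrounding test file: a sketch of how such a counting writer plausibly satisfies the chain contract (the method bodies below are assumptions; only the comment and fields are shown in this diff):

func (w *countingResultWriter) chainWrite(p *pipeWriteArgs) error {
    w.count++      // one more write reached the end of the chain
    *w.target = *p // expose the final write to the test via the pointer
    return nil
}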
@@ -94,7 +94,7 @@ func (f *chunkFeeder) Sum() ([]byte, error) {
     d := make([]byte, f.bufferIdx+span)
     copy(d[span:], f.buffer[:f.bufferIdx])
     binary.LittleEndian.PutUint64(d[:span], uint64(f.bufferIdx))
-    args := &pipeWriteArgs{data: d}
+    args := &pipeWriteArgs{data: d, span: d[:span]}
     err := f.next.chainWrite(args)
     if err != nil {
         return nil, err
...
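This one-line fix makes the feeder's Sum path populate span alongside data, consistent with the writers downstream that now read args.span. The span header is the payload length as an 8-byte little-endian prefix; a worked example for an 11-byte trailing buffer:

// payload "hello world", length 11; the span header occupies swarm.SpanSize (8) bytes
d := make([]byte, 8+11)
binary.LittleEndian.PutUint64(d[:8], 11)
copy(d[8:], []byte("hello world"))
args := &pipeWriteArgs{data: d, span: d[:8]} // span now set, as in the fix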
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
/*
Package pipeline provides functionality for hashing pipelines needed to create different flavors of merkelised representations of arbitrary data.
The interface exposes an io.Writer and a Sum method, for components to use as a black box.
Within a pipeline, writers are chainable. It is up to the implementer to decide whether a writer calls the next writer.
Implementers should always implement the Sum method and call the next writer's Sum method (if there is one), returning its result to the calling context.
*/
package pipeline
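The chaining contract described here is visible in the writers this commit touches (bmt, store, hashtrie): each stage implements a chainWrite and a sum method. The interface itself is not part of this diff; a sketch consistent with the call sites:

type chainWriter interface {
    chainWrite(p *pipeWriteArgs) error // push one chunk down the chain
    sum() ([]byte, error)              // finalize and return the root hash
}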
@@ -6,10 +6,13 @@ package pipeline
 import (
     "encoding/binary"
+    "errors"

     "github.com/ethersphere/bee/pkg/swarm"
 )

+var errInconsistentRefs = errors.New("inconsistent reference lengths in level")
+
 type hashTrieWriter struct {
     branching int
     chunkSize int
@@ -76,22 +79,22 @@ func (h *hashTrieWriter) wrapFullLevel(level int) error {
     spb := make([]byte, 8)
     binary.LittleEndian.PutUint64(spb, sp)
     hashes = append(spb, hashes...)
-    var results pipeWriteArgs
-    writer := h.pipelineFn(&results)
+    writer := h.pipelineFn()
     args := pipeWriteArgs{
         data: hashes,
+        span: spb,
     }
     err := writer.chainWrite(&args)
     if err != nil {
         return err
     }
-    err = h.writeToLevel(level+1, results.span, results.ref)
+    err = h.writeToLevel(level+1, args.span, args.ref)
     if err != nil {
         return err
     }

     // this "truncates" the current level that was wrapped
-    // by setting the cursors the the cursors of one level above
+    // by setting the cursors to the cursors of one level above
     h.cursors[level] = h.cursors[level+1]
     return nil
 }
@@ -101,6 +104,9 @@ func (h *hashTrieWriter) hoistLevels(target int) ([]byte, error) {
     oneRef := h.refSize + swarm.SpanSize
     for i := 1; i < target; i++ {
         l := h.levelSize(i)
+        if l%oneRef != 0 {
+            return nil, errInconsistentRefs
+        }
         switch {
         case l == 0:
             continue
@@ -122,6 +128,9 @@ func (h *hashTrieWriter) hoistLevels(target int) ([]byte, error) {
     level := target
     tlen := h.levelSize(target)
     data := h.buffer[h.cursors[level+1]:h.cursors[level]]
+    if tlen%oneRef != 0 {
+        return nil, errInconsistentRefs
+    }
     if tlen == oneRef {
         return data[8:], nil
     }
@@ -139,14 +148,14 @@ func (h *hashTrieWriter) hoistLevels(target int) ([]byte, error) {
     spb := make([]byte, 8)
     binary.LittleEndian.PutUint64(spb, sp)
     hashes = append(spb, hashes...)
-    var results pipeWriteArgs
-    writer := h.pipelineFn(&results)
+    writer := h.pipelineFn()
     args := pipeWriteArgs{
         data: hashes,
+        span: spb,
     }
     err := writer.chainWrite(&args)
-    return results.ref, err
+    return args.ref, err
 }

 func (h *hashTrieWriter) levelSize(level int) int {
...
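The new errInconsistentRefs guards assert that a level's byte size is an exact multiple of one reference entry. Each entry is a span header plus a hash, so with the sizes visible in this commit (swarm.SpanSize of 8, and a 32-byte hash in the unencrypted case) one entry is 40 bytes; a level wrapping three children measures 120 bytes, and any size not divisible by 40 means the level buffer is torn:

oneRef := h.refSize + swarm.SpanSize // e.g. 32 + 8 = 40 bytes per child entry
if h.levelSize(i)%oneRef != 0 {      // e.g. 120 % 40 == 0 is fine, 117 is not
    return nil, errInconsistentRefs
}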
@@ -6,18 +6,20 @@ package pipeline
 import (
     "bytes"
+    "context"
     "encoding/hex"
     "fmt"
     "testing"

     test "github.com/ethersphere/bee/pkg/file/testing"
+    "github.com/ethersphere/bee/pkg/storage"
     "github.com/ethersphere/bee/pkg/storage/mock"
     "github.com/ethersphere/bee/pkg/swarm"
 )

 func TestPartialWrites(t *testing.T) {
     m := mock.NewStorer()
-    p := NewPipeline(m)
+    p := NewPipeline(context.Background(), m, storage.ModePutUpload)
     _, _ = p.Write([]byte("hello "))
     _, _ = p.Write([]byte("world"))
@@ -33,9 +35,14 @@ func TestPartialWrites(t *testing.T) {
 func TestHelloWorld(t *testing.T) {
     m := mock.NewStorer()
-    p := NewPipeline(m)
+    p := NewPipeline(context.Background(), m, storage.ModePutUpload)
     data := []byte("hello world")
-    _, _ = p.Write(data)
+    _, err := p.Write(data)
+    if err != nil {
+        t.Fatal(err)
+    }
     sum, err := p.Sum()
     if err != nil {
         t.Fatal(err)
@@ -51,9 +58,12 @@ func TestAllVectors(t *testing.T) {
     data, expect := test.GetVector(t, i)
     t.Run(fmt.Sprintf("data length %d, vector %d", len(data), i), func(t *testing.T) {
         m := mock.NewStorer()
-        p := NewPipeline(m)
-        _, _ = p.Write(data)
+        p := NewPipeline(context.Background(), m, storage.ModePutUpload)
+        _, err := p.Write(data)
+        if err != nil {
+            t.Fatal(err)
+        }
         sum, err := p.Sum()
         if err != nil {
             t.Fatal(err)
...
@@ -5,6 +5,10 @@
 package pipeline

 import (
+    "context"
+    "fmt"
+    "io"
+
     "github.com/ethersphere/bee/pkg/storage"
     "github.com/ethersphere/bee/pkg/swarm"
 )
@@ -18,25 +22,77 @@ type pipeWriteArgs struct {
 // NewPipeline creates a standard pipeline that only hashes content with BMT to create
 // a merkle-tree of hashes that represent the given arbitrary size byte stream. Partial
 // writes are supported. The pipeline flow is: Data -> Feeder -> BMT -> Storage -> HashTrie.
-func NewPipeline(s storage.Storer) Interface {
-    tw := newHashTrieWriter(swarm.ChunkSize, swarm.Branches, swarm.HashSize, newShortPipelineFunc(s))
-    lsw := newStoreWriter(s, tw)
-    b := newBmtWriter(128, lsw)
-    feeder := newChunkFeederWriter(swarm.ChunkSize, b)
-    return feeder
+func NewPipeline(ctx context.Context, s storage.Storer, mode storage.ModePut) Interface {
+    tw := newHashTrieWriter(swarm.ChunkSize, swarm.Branches, swarm.HashSize, newShortPipelineFunc(ctx, s, mode))
+    lsw := newStoreWriter(ctx, s, mode, tw)
+    b := newBmtWriter(128, lsw)
+    return newChunkFeederWriter(swarm.ChunkSize, b)
 }

-type pipelineFunc func(p *pipeWriteArgs) chainWriter
+type pipelineFunc func() chainWriter

 // newShortPipelineFunc returns a constructor function for an ephemeral hashing pipeline
 // needed by the hashTrieWriter.
-func newShortPipelineFunc(s storage.Storer) func(*pipeWriteArgs) chainWriter {
-    return func(p *pipeWriteArgs) chainWriter {
-        rsw := newResultWriter(p)
-        lsw := newStoreWriter(s, rsw)
-        bw := newBmtWriter(128, lsw)
-        return bw
+func newShortPipelineFunc(ctx context.Context, s storage.Storer, mode storage.ModePut) func() chainWriter {
+    return func() chainWriter {
+        lsw := newStoreWriter(ctx, s, mode, nil)
+        return newBmtWriter(128, lsw)
     }
 }
+
+// FeedPipeline feeds the pipeline with the given reader until EOF is reached.
+// It returns the cryptographic root hash of the content.
+func FeedPipeline(ctx context.Context, pipeline Interface, r io.Reader, dataLength int64) (addr swarm.Address, err error) {
+    var total int64
+    data := make([]byte, swarm.ChunkSize)
+    var eof bool
+    for !eof {
+        c, err := r.Read(data)
+        total += int64(c)
+        if err != nil {
+            if err == io.EOF {
+                if total < dataLength {
+                    return swarm.ZeroAddress, fmt.Errorf("pipeline short write: read %d out of %d bytes", total, dataLength)
+                }
+                eof = true
+                if c > 0 {
+                    cc, err := pipeline.Write(data[:c])
+                    if err != nil {
+                        return swarm.ZeroAddress, err
+                    }
+                    if cc < c {
+                        return swarm.ZeroAddress, fmt.Errorf("pipeline short write: %d mismatches %d", cc, c)
+                    }
+                }
+                continue
+            } else {
+                return swarm.ZeroAddress, err
+            }
+        }
+        cc, err := pipeline.Write(data[:c])
+        if err != nil {
+            return swarm.ZeroAddress, err
+        }
+        if cc < c {
+            return swarm.ZeroAddress, fmt.Errorf("pipeline short write: %d mismatches %d", cc, c)
+        }
+        select {
+        case <-ctx.Done():
+            return swarm.ZeroAddress, ctx.Err()
+        default:
+        }
+    }
+    select {
+    case <-ctx.Done():
+        return swarm.ZeroAddress, ctx.Err()
+    default:
+    }
+    sum, err := pipeline.Sum()
+    if err != nil {
+        return swarm.ZeroAddress, err
+    }
+    newAddress := swarm.NewAddress(sum)
+    return newAddress, nil
+}
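The Interface returned by NewPipeline is, per the package documentation above, an io.Writer plus a Sum finalizer; FeedPipeline is a convenience loop over that surface which reads in swarm.ChunkSize slabs and rejects readers that hit EOF before dataLength bytes. A sketch of driving the pipeline manually instead, mirroring TestPartialWrites above (assumes ctx and storer in scope, inside a function returning error):

p := NewPipeline(ctx, storer, storage.ModePutUpload)
if _, err := p.Write([]byte("hello ")); err != nil {
    return err
}
if _, err := p.Write([]byte("world")); err != nil {
    return err
}
sum, err := p.Sum() // finalizes the trie, returns the root hash bytes
if err != nil {
    return err
}
addr := swarm.NewAddress(sum) // same address FeedPipeline would return
_ = addr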
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package pipeline

import "errors"

// resultWriter was the terminal writer of the old short pipeline; it captured
// the final write into the target args. This file is removed by the commit,
// since the writers now mutate the shared pipeWriteArgs in place.
type resultWriter struct {
    target *pipeWriteArgs
}

func newResultWriter(b *pipeWriteArgs) chainWriter {
    return &resultWriter{target: b}
}

func (w *resultWriter) chainWrite(p *pipeWriteArgs) error {
    *w.target = *p
    return nil
}

func (w *resultWriter) sum() ([]byte, error) {
    return nil, errors.New("not implemented")
}
@@ -7,28 +7,47 @@ package pipeline
 import (
     "context"

+    "github.com/ethersphere/bee/pkg/sctx"
     "github.com/ethersphere/bee/pkg/storage"
     "github.com/ethersphere/bee/pkg/swarm"
+    "github.com/ethersphere/bee/pkg/tags"
 )

 type storeWriter struct {
     l    storage.Putter
+    mode storage.ModePut
+    ctx  context.Context
     next chainWriter
 }

 // newStoreWriter returns a storeWriter. It just writes the given data
 // to a given storage.Storer.
-func newStoreWriter(l storage.Putter, next chainWriter) chainWriter {
-    return &storeWriter{l: l, next: next}
+func newStoreWriter(ctx context.Context, l storage.Putter, mode storage.ModePut, next chainWriter) chainWriter {
+    return &storeWriter{ctx: ctx, l: l, mode: mode, next: next}
 }

 func (w *storeWriter) chainWrite(p *pipeWriteArgs) error {
+    tag := sctx.GetTag(w.ctx)
+    if tag != nil {
+        tag.Inc(tags.StateSplit)
+    }
     c := swarm.NewChunk(swarm.NewAddress(p.ref), p.data)
-    _, err := w.l.Put(context.Background(), storage.ModePutUpload, c)
+    seen, err := w.l.Put(w.ctx, w.mode, c)
     if err != nil {
         return err
     }
+    if tag != nil {
+        tag.Inc(tags.StateStored)
+        if seen[0] {
+            tag.Inc(tags.StateSeen)
+        }
+    }
+    if w.next == nil {
+        return nil
+    }
     return w.next.chainWrite(p)
 }

 func (w *storeWriter) sum() ([]byte, error) {
...
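Two details of the new storeWriter are worth noting: it tolerates a missing tag on the context (handlers that attach one via sctx.SetTag get StateSplit, StateStored and StateSeen accounting for free), and it tolerates a nil next writer, which is exactly how the ephemeral short pipeline terminates. The terminal wiring, restated from newShortPipelineFunc above:

// The hashTrieWriter's ephemeral pipeline ends at the store writer:
lsw := newStoreWriter(ctx, s, mode, nil) // nil next: the chain stops after Put
bw := newBmtWriter(128, lsw)             // hash the chunk, then store it
_ = bw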
@@ -12,7 +12,7 @@ import (
     "github.com/ethersphere/bee/pkg/file"
     "github.com/ethersphere/bee/pkg/file/joiner"
-    "github.com/ethersphere/bee/pkg/file/splitter"
+    "github.com/ethersphere/bee/pkg/file/pipeline"
     "github.com/ethersphere/bee/pkg/storage"
     "github.com/ethersphere/bee/pkg/swarm"
     "github.com/ethersphere/manifest/simple"
@@ -106,9 +106,9 @@ func (m *simpleManifest) Store(ctx context.Context, mode storage.ModePut) (swarm
         return swarm.ZeroAddress, fmt.Errorf("manifest marshal error: %w", err)
     }

-    sp := splitter.NewSimpleSplitter(m.storer, mode)
-    address, err := file.SplitWriteAll(ctx, sp, bytes.NewReader(data), int64(len(data)), m.encrypted)
+    pipe := pipeline.NewPipeline(ctx, m.storer, mode)
+    address, err := pipeline.FeedPipeline(ctx, pipe, bytes.NewReader(data), int64(len(data)))
+    _ = m.encrypted // need this field for encryption but this is to avoid linter complaints
     if err != nil {
         return swarm.ZeroAddress, fmt.Errorf("manifest save error: %w", err)
     }
...