Commit f7088db1 authored by acud, committed by GitHub

pipeline hashing: refactor to different packages (#700)

* move pipeline components to individual packages
parent fb3ba9df
......@@ -8,7 +8,7 @@ import (
"fmt"
"net/http"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/sctx"
"github.com/ethersphere/bee/pkg/swarm"
......@@ -35,8 +35,8 @@ func (s *server) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
// Add the tag to the context
ctx := sctx.SetTag(r.Context(), tag)
pipe := pipeline.NewPipelineBuilder(ctx, s.Storer, requestModePut(r), requestEncrypt(r))
address, err := pipeline.FeedPipeline(ctx, pipe, r.Body, r.ContentLength)
pipe := builder.NewPipelineBuilder(ctx, s.Storer, requestModePut(r), requestEncrypt(r))
address, err := builder.FeedPipeline(ctx, pipe, r.Body, r.ContentLength)
if err != nil {
logger.Debugf("bytes upload: split write all: %v", err)
logger.Error("bytes upload: split write all")
......
......@@ -16,10 +16,10 @@ import (
"strings"
"testing"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/collection/entry"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/logging"
......@@ -43,8 +43,8 @@ func TestBzz(t *testing.T) {
Logger: logging.New(ioutil.Discard, 5),
})
pipeWriteAll = func(r io.Reader, l int64) (swarm.Address, error) {
pipe := pipeline.NewPipelineBuilder(ctx, storer, storage.ModePutUpload, false)
return pipeline.FeedPipeline(ctx, pipe, r, l)
pipe := builder.NewPipelineBuilder(ctx, storer, storage.ModePutUpload, false)
return builder.FeedPipeline(ctx, pipe, r, l)
}
)
t.Run("download-file-by-path", func(t *testing.T) {
......
......@@ -18,7 +18,7 @@ import (
"strings"
"github.com/ethersphere/bee/pkg/collection/entry"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/manifest"
......@@ -182,8 +182,8 @@ func storeDir(ctx context.Context, reader io.ReadCloser, s storage.Storer, mode
return swarm.ZeroAddress, fmt.Errorf("metadata marshal: %w", err)
}
pipe := pipeline.NewPipelineBuilder(ctx, s, mode, encrypt)
mr, err := pipeline.FeedPipeline(ctx, pipe, bytes.NewReader(metadataBytes), int64(len(metadataBytes)))
pipe := builder.NewPipelineBuilder(ctx, s, mode, encrypt)
mr, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(metadataBytes), int64(len(metadataBytes)))
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("split metadata: %w", err)
}
......@@ -195,8 +195,8 @@ func storeDir(ctx context.Context, reader io.ReadCloser, s storage.Storer, mode
return swarm.ZeroAddress, fmt.Errorf("entry marshal: %w", err)
}
pipe = pipeline.NewPipelineBuilder(ctx, s, mode, encrypt)
manifestFileReference, err := pipeline.FeedPipeline(ctx, pipe, bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)))
pipe = builder.NewPipelineBuilder(ctx, s, mode, encrypt)
manifestFileReference, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)))
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("split entry: %w", err)
}
......@@ -208,8 +208,8 @@ func storeDir(ctx context.Context, reader io.ReadCloser, s storage.Storer, mode
// this function was extracted from `fileUploadHandler` and should eventually replace its current code
func storeFile(ctx context.Context, fileInfo *fileUploadInfo, s storage.Storer, mode storage.ModePut, encrypt bool) (swarm.Address, error) {
// first store the file and get its reference
pipe := pipeline.NewPipelineBuilder(ctx, s, mode, encrypt)
fr, err := pipeline.FeedPipeline(ctx, pipe, fileInfo.reader, fileInfo.size)
pipe := builder.NewPipelineBuilder(ctx, s, mode, encrypt)
fr, err := builder.FeedPipeline(ctx, pipe, fileInfo.reader, fileInfo.size)
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("split file: %w", err)
}
......@@ -227,8 +227,8 @@ func storeFile(ctx context.Context, fileInfo *fileUploadInfo, s storage.Storer,
return swarm.ZeroAddress, fmt.Errorf("metadata marshal: %w", err)
}
pipe = pipeline.NewPipelineBuilder(ctx, s, mode, encrypt)
mr, err := pipeline.FeedPipeline(ctx, pipe, bytes.NewReader(metadataBytes), int64(len(metadataBytes)))
pipe = builder.NewPipelineBuilder(ctx, s, mode, encrypt)
mr, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(metadataBytes), int64(len(metadataBytes)))
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("split metadata: %w", err)
}
......@@ -239,8 +239,8 @@ func storeFile(ctx context.Context, fileInfo *fileUploadInfo, s storage.Storer,
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("entry marshal: %w", err)
}
pipe = pipeline.NewPipelineBuilder(ctx, s, mode, encrypt)
reference, err := pipeline.FeedPipeline(ctx, pipe, bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)))
pipe = builder.NewPipelineBuilder(ctx, s, mode, encrypt)
reference, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)))
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("split entry: %w", err)
}
......
......@@ -21,7 +21,7 @@ import (
"github.com/ethersphere/bee/pkg/collection/entry"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/file/seekjoiner"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/sctx"
......@@ -152,8 +152,8 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
}
// first store the file and get its reference
pipe := pipeline.NewPipelineBuilder(ctx, s.Storer, mode, requestEncrypt(r))
fr, err := pipeline.FeedPipeline(ctx, pipe, reader, int64(fileSize))
pipe := builder.NewPipelineBuilder(ctx, s.Storer, mode, requestEncrypt(r))
fr, err := builder.FeedPipeline(ctx, pipe, reader, int64(fileSize))
if err != nil {
logger.Debugf("file upload: file store, file %q: %v", fileName, err)
logger.Errorf("file upload: file store, file %q", fileName)
......@@ -176,8 +176,8 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
jsonhttp.InternalServerError(w, "metadata marshal error")
return
}
pipe = pipeline.NewPipelineBuilder(ctx, s.Storer, mode, requestEncrypt(r))
mr, err := pipeline.FeedPipeline(ctx, pipe, bytes.NewReader(metadataBytes), int64(len(metadataBytes)))
pipe = builder.NewPipelineBuilder(ctx, s.Storer, mode, requestEncrypt(r))
mr, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(metadataBytes), int64(len(metadataBytes)))
if err != nil {
logger.Debugf("file upload: metadata store, file %q: %v", fileName, err)
logger.Errorf("file upload: metadata store, file %q", fileName)
......@@ -194,8 +194,8 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
jsonhttp.InternalServerError(w, "entry marshal error")
return
}
pipe = pipeline.NewPipelineBuilder(ctx, s.Storer, mode, requestEncrypt(r))
reference, err := pipeline.FeedPipeline(ctx, pipe, bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)))
pipe = builder.NewPipelineBuilder(ctx, s.Storer, mode, requestEncrypt(r))
reference, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)))
if err != nil {
logger.Debugf("file upload: entry store, file %q: %v", fileName, err)
logger.Errorf("file upload: entry store, file %q", fileName)
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package encryption
import (
"github.com/ethersphere/bee/pkg/swarm"
"golang.org/x/crypto/sha3"
)
// ChunkEncrypter encrypts chunk data.
type ChunkEncrypter interface {
EncryptChunk([]byte) (key Key, encryptedSpan, encryptedData []byte, err error)
}
type chunkEncrypter struct{}
func NewChunkEncrypter() ChunkEncrypter { return &chunkEncrypter{} }
func (c *chunkEncrypter) EncryptChunk(chunkData []byte) (Key, []byte, []byte, error) {
key := GenerateRandomKey(KeyLength)
encryptedSpan, err := newSpanEncryption(key).Encrypt(chunkData[:8])
if err != nil {
return nil, nil, nil, err
}
encryptedData, err := newDataEncryption(key).Encrypt(chunkData[8:])
if err != nil {
return nil, nil, nil, err
}
return key, encryptedSpan, encryptedData, nil
}
func newSpanEncryption(key Key) Interface {
refSize := int64(swarm.HashSize + KeyLength)
return New(key, 0, uint32(swarm.ChunkSize/refSize), sha3.NewLegacyKeccak256)
}
func newDataEncryption(key Key) Interface {
return New(key, int(swarm.ChunkSize), 0, sha3.NewLegacyKeccak256)
}
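For reference, a minimal usage sketch of the ChunkEncrypter interface above. It assumes the constructor lives in the pkg/encryption package shown here; the 8-byte little-endian span prefix mirrors how the pipeline frames chunk data before encryption.

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/ethersphere/bee/pkg/encryption"
)

func main() {
	payload := []byte("hello world")
	// Frame the chunk as the pipeline does: an 8-byte span followed by the data.
	chunk := make([]byte, 8+len(payload))
	binary.LittleEndian.PutUint64(chunk[:8], uint64(len(payload)))
	copy(chunk[8:], payload)

	key, encSpan, encData, err := encryption.NewChunkEncrypter().EncryptChunk(chunk)
	if err != nil {
		panic(err)
	}
	fmt.Printf("key: %x\nencrypted span: %x\nencrypted data: %x\n", key, encSpan, encData)
}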
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mock
import (
"github.com/ethersphere/bee/pkg/encryption"
)
type chunkEncrypter struct {
key []byte
}
func NewChunkEncrypter(key []byte) encryption.ChunkEncrypter { return &chunkEncrypter{key: key} }
func (c *chunkEncrypter) EncryptChunk(chunkData []byte) (encryption.Key, []byte, []byte, error) {
enc := New(WithXOREncryption(c.key))
encryptedSpan, err := enc.Encrypt(chunkData[:8])
if err != nil {
return nil, nil, nil, err
}
encryptedData, err := enc.Encrypt(chunkData[8:])
if err != nil {
return nil, nil, nil, err
}
return nil, encryptedSpan, encryptedData, nil
}
......@@ -13,7 +13,7 @@ import (
"testing"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/file/seekjoiner"
test "github.com/ethersphere/bee/pkg/file/testing"
"github.com/ethersphere/bee/pkg/storage"
......@@ -44,7 +44,7 @@ func testSplitThenJoin(t *testing.T) {
paramstring = strings.Split(t.Name(), "/")
dataIdx, _ = strconv.ParseInt(paramstring[1], 10, 0)
store = mock.NewStorer()
p = pipeline.NewPipelineBuilder(context.Background(), store, storage.ModePutUpload, false)
p = builder.NewPipelineBuilder(context.Background(), store, storage.ModePutUpload, false)
j = seekjoiner.NewSimpleJoiner(store)
data, _ = test.GetVector(t, int(dataIdx))
)
......@@ -53,7 +53,7 @@ func testSplitThenJoin(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dataReader := file.NewSimpleReadCloser(data)
resultAddress, err := pipeline.FeedPipeline(ctx, p, dataReader, int64(len(data)))
resultAddress, err := builder.FeedPipeline(ctx, p, dataReader, int64(len(data)))
if err != nil {
t.Fatal(err)
}
......
......@@ -2,50 +2,59 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pipeline
package bmt
import (
"errors"
"hash"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bmt"
bmtlegacy "github.com/ethersphere/bmt/legacy"
"golang.org/x/crypto/sha3"
)
var (
errInvalidData = errors.New("bmt: invalid data")
)
type bmtWriter struct {
b bmt.Hash
next chainWriter
next pipeline.ChainWriter
}
// newBmtWriter returns a new bmtWriter. Partial writes are not supported.
// NewBmtWriter returns a new bmtWriter. Partial writes are not supported.
// Note: branching factor is the BMT branching factor, not the merkle trie branching factor.
func newBmtWriter(branches int, next chainWriter) chainWriter {
func NewBmtWriter(branches int, next pipeline.ChainWriter) pipeline.ChainWriter {
return &bmtWriter{
b: bmtlegacy.New(bmtlegacy.NewTreePool(hashFunc, branches, bmtlegacy.PoolSize)),
next: next,
}
}
// chainWrite writes data in chain. It assumes span has been prepended to the data.
// ChainWrite writes data in chain. It assumes span has been prepended to the data.
// The span can be encrypted or unencrypted.
func (w *bmtWriter) chainWrite(p *pipeWriteArgs) error {
func (w *bmtWriter) ChainWrite(p *pipeline.PipeWriteArgs) error {
if len(p.Data) < swarm.SpanSize {
return errInvalidData
}
w.b.Reset()
err := w.b.SetSpanBytes(p.data[:swarm.SpanSize])
err := w.b.SetSpanBytes(p.Data[:swarm.SpanSize])
if err != nil {
return err
}
_, err = w.b.Write(p.data[swarm.SpanSize:])
_, err = w.b.Write(p.Data[swarm.SpanSize:])
if err != nil {
return err
}
p.ref = w.b.Sum(nil)
return w.next.chainWrite(p)
p.Ref = w.b.Sum(nil)
return w.next.ChainWrite(p)
}
// sum calls the next writer for the cryptographic sum
func (w *bmtWriter) sum() ([]byte, error) {
return w.next.sum()
func (w *bmtWriter) Sum() ([]byte, error) {
return w.next.Sum()
}
func hashFunc() hash.Hash {
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package bmt_test
import (
"bytes"
"encoding/binary"
"encoding/hex"
"errors"
"testing"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/file/pipeline/bmt"
mock "github.com/ethersphere/bee/pkg/file/pipeline/mock"
"github.com/ethersphere/bee/pkg/swarm"
)
// TestBmtWriter tests that the BMT writer computes the expected reference and calls the next chain writer.
func TestBmtWriter(t *testing.T) {
for _, tc := range []struct {
name string
data []byte
expHash []byte
expErr error
noSpan bool
}{
{
// this is a special case: semantically it can be considered the hash
// of an empty file, since the data is all zeros.
name: "all zeros",
data: make([]byte, swarm.ChunkSize),
expHash: mustDecodeString(t, "09ae927d0f3aaa37324df178928d3826820f3dd3388ce4aaebfc3af410bde23a"),
},
{
name: "hello world",
data: []byte("hello world"),
expHash: mustDecodeString(t, "92672a471f4419b255d7cb0cf313474a6f5856fb347c5ece85fb706d644b630f"),
},
{
name: "no data",
data: []byte{},
noSpan: true,
expErr: bmt.ErrInvalidData,
},
} {
t.Run(tc.name, func(t *testing.T) {
mockChainWriter := mock.NewChainWriter()
writer := bmt.NewBmtWriter(128, mockChainWriter)
var data []byte
if !tc.noSpan {
data = make([]byte, 8)
binary.LittleEndian.PutUint64(data, uint64(len(tc.data)))
}
data = append(data, tc.data...)
args := pipeline.PipeWriteArgs{Data: data}
err := writer.ChainWrite(&args)
if err != nil && tc.expErr != nil && errors.Is(err, tc.expErr) {
return
}
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(tc.expHash, args.Ref) {
t.Fatalf("ref mismatch. got %v want %v", args.Ref, tc.expHash)
}
if calls := mockChainWriter.ChainWriteCalls(); calls != 1 {
t.Errorf("wanted 1 ChainWrite call, got %d", calls)
}
})
}
}
// TestSum tests that calling Sum on the writer calls the next writer's Sum.
func TestSum(t *testing.T) {
mockChainWriter := mock.NewChainWriter()
writer := bmt.NewBmtWriter(128, mockChainWriter)
_, err := writer.Sum()
if err != nil {
t.Fatal(err)
}
if calls := mockChainWriter.SumCalls(); calls != 1 {
t.Fatalf("wanted 1 Sum call but got %d", calls)
}
}
func mustDecodeString(t *testing.T, s string) []byte {
t.Helper()
v, err := hex.DecodeString(s)
if err != nil {
t.Fatal(err)
}
return v
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package bmt
var ErrInvalidData = errInvalidData
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package builder
import (
"context"
"fmt"
"io"
"github.com/ethersphere/bee/pkg/encryption"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/file/pipeline/bmt"
enc "github.com/ethersphere/bee/pkg/file/pipeline/encryption"
"github.com/ethersphere/bee/pkg/file/pipeline/feeder"
"github.com/ethersphere/bee/pkg/file/pipeline/hashtrie"
"github.com/ethersphere/bee/pkg/file/pipeline/store"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
// NewPipelineBuilder returns the appropriate pipeline according to the specified parameters
func NewPipelineBuilder(ctx context.Context, s storage.Storer, mode storage.ModePut, encrypt bool) pipeline.Interface {
if encrypt {
return newEncryptionPipeline(ctx, s, mode)
}
return newPipeline(ctx, s, mode)
}
// newPipeline creates a standard pipeline that only hashes content with BMT to create
// a merkle-tree of hashes that represent the given arbitrary size byte stream. Partial
// writes are supported. The pipeline flow is: Data -> Feeder -> BMT -> Storage -> HashTrie.
func newPipeline(ctx context.Context, s storage.Storer, mode storage.ModePut) pipeline.Interface {
tw := hashtrie.NewHashTrieWriter(swarm.ChunkSize, swarm.Branches, swarm.HashSize, newShortPipelineFunc(ctx, s, mode))
lsw := store.NewStoreWriter(ctx, s, mode, tw)
b := bmt.NewBmtWriter(128, lsw)
return feeder.NewChunkFeederWriter(swarm.ChunkSize, b)
}
// newShortPipelineFunc returns a constructor function for an ephemeral hashing pipeline
// needed by the hashTrieWriter.
func newShortPipelineFunc(ctx context.Context, s storage.Storer, mode storage.ModePut) func() pipeline.ChainWriter {
return func() pipeline.ChainWriter {
lsw := store.NewStoreWriter(ctx, s, mode, nil)
return bmt.NewBmtWriter(128, lsw)
}
}
// newEncryptionPipeline creates an encryption pipeline that encrypts using CTR, hashes content with BMT to create
// a merkle-tree of hashes that represent the given arbitrary size byte stream. Partial
// writes are supported. The pipeline flow is: Data -> Feeder -> Encryption -> BMT -> Storage -> HashTrie.
// Note that the encryption writer will mutate the data to contain the encrypted span, but the span field
// with the unencrypted span is preserved.
func newEncryptionPipeline(ctx context.Context, s storage.Storer, mode storage.ModePut) pipeline.Interface {
tw := hashtrie.NewHashTrieWriter(swarm.ChunkSize, 64, swarm.HashSize+encryption.KeyLength, newShortEncryptionPipelineFunc(ctx, s, mode))
lsw := store.NewStoreWriter(ctx, s, mode, tw)
b := bmt.NewBmtWriter(128, lsw)
enc := enc.NewEncryptionWriter(encryption.NewChunkEncrypter(), b)
return feeder.NewChunkFeederWriter(swarm.ChunkSize, enc)
}
// newShortEncryptionPipelineFunc returns a constructor function for an ephemeral hashing pipeline
// needed by the hashTrieWriter.
func newShortEncryptionPipelineFunc(ctx context.Context, s storage.Storer, mode storage.ModePut) func() pipeline.ChainWriter {
return func() pipeline.ChainWriter {
lsw := store.NewStoreWriter(ctx, s, mode, nil)
b := bmt.NewBmtWriter(128, lsw)
return enc.NewEncryptionWriter(encryption.NewChunkEncrypter(), b)
}
}
// FeedPipeline feeds the pipeline with the given reader until EOF is reached.
// It returns the cryptographic root hash of the content.
func FeedPipeline(ctx context.Context, pipeline pipeline.Interface, r io.Reader, dataLength int64) (addr swarm.Address, err error) {
var total int64
data := make([]byte, swarm.ChunkSize)
for {
c, err := r.Read(data)
total += int64(c)
if err != nil {
if err == io.EOF {
if total < dataLength {
return swarm.ZeroAddress, fmt.Errorf("pipline short write: read %d out of %d bytes", total, dataLength)
}
if c > 0 {
cc, err := pipeline.Write(data[:c])
if err != nil {
return swarm.ZeroAddress, err
}
if cc < c {
return swarm.ZeroAddress, fmt.Errorf("pipeline short write: %d mismatches %d", cc, c)
}
}
break
} else {
return swarm.ZeroAddress, err
}
}
cc, err := pipeline.Write(data[:c])
if err != nil {
return swarm.ZeroAddress, err
}
if cc < c {
return swarm.ZeroAddress, fmt.Errorf("pipeline short write: %d mismatches %d", cc, c)
}
select {
case <-ctx.Done():
return swarm.ZeroAddress, ctx.Err()
default:
}
}
select {
case <-ctx.Done():
return swarm.ZeroAddress, ctx.Err()
default:
}
sum, err := pipeline.Sum()
if err != nil {
return swarm.ZeroAddress, err
}
newAddress := swarm.NewAddress(sum)
return newAddress, nil
}
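For reference, a minimal end-to-end sketch of the relocated builder API, assuming the in-memory mock storer used by the builder tests below. Passing encrypt=true to NewPipelineBuilder would build the encryption pipeline instead (Data -> Feeder -> Encryption -> BMT -> Storage -> HashTrie).

package main

import (
	"bytes"
	"context"
	"fmt"

	"github.com/ethersphere/bee/pkg/file/pipeline/builder"
	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/storage/mock"
)

func main() {
	ctx := context.Background()
	data := []byte("hello world")

	// Unencrypted pipeline: Data -> Feeder -> BMT -> Storage -> HashTrie.
	pipe := builder.NewPipelineBuilder(ctx, mock.NewStorer(), storage.ModePutUpload, false)

	// Feed the whole reader through the pipeline; the root hash comes back as an address.
	addr, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(data), int64(len(data)))
	if err != nil {
		panic(err)
	}
	fmt.Println("root address:", addr)
}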
......@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pipeline_test
package builder_test
import (
"bytes"
......@@ -11,7 +11,7 @@ import (
"fmt"
"testing"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
test "github.com/ethersphere/bee/pkg/file/testing"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
......@@ -20,7 +20,7 @@ import (
func TestPartialWrites(t *testing.T) {
m := mock.NewStorer()
p := pipeline.NewPipelineBuilder(context.Background(), m, storage.ModePutUpload, false)
p := builder.NewPipelineBuilder(context.Background(), m, storage.ModePutUpload, false)
_, _ = p.Write([]byte("hello "))
_, _ = p.Write([]byte("world"))
......@@ -36,7 +36,7 @@ func TestPartialWrites(t *testing.T) {
func TestHelloWorld(t *testing.T) {
m := mock.NewStorer()
p := pipeline.NewPipelineBuilder(context.Background(), m, storage.ModePutUpload, false)
p := builder.NewPipelineBuilder(context.Background(), m, storage.ModePutUpload, false)
data := []byte("hello world")
_, err := p.Write(data)
......@@ -59,7 +59,7 @@ func TestAllVectors(t *testing.T) {
data, expect := test.GetVector(t, i)
t.Run(fmt.Sprintf("data length %d, vector %d", len(data), i), func(t *testing.T) {
m := mock.NewStorer()
p := pipeline.NewPipelineBuilder(context.Background(), m, storage.ModePutUpload, false)
p := builder.NewPipelineBuilder(context.Background(), m, storage.ModePutUpload, false)
_, err := p.Write(data)
if err != nil {
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package encryption
import (
"github.com/ethersphere/bee/pkg/encryption"
"github.com/ethersphere/bee/pkg/file/pipeline"
)
type encryptionWriter struct {
next pipeline.ChainWriter
enc encryption.ChunkEncrypter
}
func NewEncryptionWriter(encrypter encryption.ChunkEncrypter, next pipeline.ChainWriter) pipeline.ChainWriter {
return &encryptionWriter{
next: next,
enc: encrypter,
}
}
// ChainWrite assumes that the span is prepended to the actual data before the write.
func (e *encryptionWriter) ChainWrite(p *pipeline.PipeWriteArgs) error {
key, encryptedSpan, encryptedData, err := e.enc.EncryptChunk(p.Data)
if err != nil {
return err
}
c := make([]byte, len(encryptedSpan)+len(encryptedData))
copy(c[:8], encryptedSpan)
copy(c[8:], encryptedData)
p.Data = c // replace the verbatim data with the encrypted data
p.Key = key
return e.next.ChainWrite(p)
}
func (e *encryptionWriter) Sum() ([]byte, error) {
return e.next.Sum()
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package encryption_test
import (
"bytes"
"encoding/binary"
"testing"
mockenc "github.com/ethersphere/bee/pkg/encryption/mock"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/file/pipeline/encryption"
mock "github.com/ethersphere/bee/pkg/file/pipeline/mock"
)
var (
addr = []byte{0xaa, 0xbb, 0xcc}
key = []byte("abcd")
data = []byte("hello world")
encryptedSpan, encryptedData []byte
)
func init() {
mockEncrypter := mockenc.New(mockenc.WithXOREncryption(key))
var err error
encryptedData, err = mockEncrypter.Encrypt(data)
if err != nil {
panic(err)
}
span := make([]byte, 8)
binary.BigEndian.PutUint64(span, uint64(len(data)))
encryptedSpan, err = mockEncrypter.Encrypt(span)
if err != nil {
panic(err)
}
}
// TestEncryption tests that the encryption writer works correctly.
func TestEncryption(t *testing.T) {
mockChainWriter := mock.NewChainWriter()
writer := encryption.NewEncryptionWriter(mockenc.NewChunkEncrypter(key), mockChainWriter)
args := pipeline.PipeWriteArgs{Ref: addr, Data: data}
err := writer.ChainWrite(&args)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(encryptedData, args.Data) {
t.Fatalf("data mismatch. got %v want %v", args.Data, encryptedData)
}
if calls := mockChainWriter.ChainWriteCalls(); calls != 1 {
t.Errorf("wanted 1 ChainWrite call, got %d", calls)
}
}
// TestSum tests that calling Sum on the encryption writer results in Sum on the next writer in the chain.
func TestSum(t *testing.T) {
mockChainWriter := mock.NewChainWriter()
writer := encryption.NewEncryptionWriter(nil, mockChainWriter)
_, err := writer.Sum()
if err != nil {
t.Fatal(err)
}
if calls := mockChainWriter.SumCalls(); calls != 1 {
t.Fatalf("wanted 1 Sum call but got %d", calls)
}
}
package pipeline
import (
"github.com/ethersphere/bee/pkg/encryption"
"github.com/ethersphere/bee/pkg/swarm"
"golang.org/x/crypto/sha3"
)
type encryptionWriter struct {
next chainWriter
}
func newEncryptionWriter(next chainWriter) chainWriter {
return &encryptionWriter{
next: next,
}
}
// Write assumes that the span is prepended to the actual data before the write !
func (e *encryptionWriter) chainWrite(p *pipeWriteArgs) error {
key, encryptedSpan, encryptedData, err := encrypt(p.data)
if err != nil {
return err
}
c := make([]byte, len(encryptedSpan)+len(encryptedData))
copy(c[:8], encryptedSpan)
copy(c[8:], encryptedData)
p.data = c // replace the verbatim data with the encrypted data
p.key = key
return e.next.chainWrite(p)
}
func (e *encryptionWriter) sum() ([]byte, error) {
return e.next.sum()
}
func encrypt(chunkData []byte) (encryption.Key, []byte, []byte, error) {
key := encryption.GenerateRandomKey(encryption.KeyLength)
encryptedSpan, err := newSpanEncryption(key).Encrypt(chunkData[:8])
if err != nil {
return nil, nil, nil, err
}
encryptedData, err := newDataEncryption(key).Encrypt(chunkData[8:])
if err != nil {
return nil, nil, nil, err
}
return key, encryptedSpan, encryptedData, nil
}
func newSpanEncryption(key encryption.Key) encryption.Interface {
refSize := int64(swarm.HashSize + encryption.KeyLength)
return encryption.New(key, 0, uint32(swarm.ChunkSize/refSize), sha3.NewLegacyKeccak256)
}
func newDataEncryption(key encryption.Key) encryption.Interface {
return encryption.New(key, int(swarm.ChunkSize), 0, sha3.NewLegacyKeccak256)
}
......@@ -2,11 +2,12 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pipeline
package feeder
import (
"encoding/binary"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/swarm"
)
......@@ -14,7 +15,7 @@ const span = swarm.SpanSize
type chunkFeeder struct {
size int
next chainWriter
next pipeline.ChainWriter
buffer []byte
bufferIdx int
}
......@@ -22,7 +23,7 @@ type chunkFeeder struct {
// newChunkFeederWriter creates a new chunkFeeder that allows for partial
// writes into the pipeline. Any pending data in the buffer is flushed to
// subsequent writers when Sum() is called.
func newChunkFeederWriter(size int, next chainWriter) Interface {
func NewChunkFeederWriter(size int, next pipeline.ChainWriter) pipeline.Interface {
return &chunkFeeder{
size: size,
next: next,
......@@ -73,8 +74,8 @@ func (f *chunkFeeder) Write(b []byte) (int, error) {
sp += n
binary.LittleEndian.PutUint64(d[:span], uint64(sp))
args := &pipeWriteArgs{data: d[:span+sp], span: d[:span]}
err := f.next.chainWrite(args)
args := &pipeline.PipeWriteArgs{Data: d[:span+sp], Span: d[:span]}
err := f.next.ChainWrite(args)
if err != nil {
return 0, err
}
......@@ -94,12 +95,12 @@ func (f *chunkFeeder) Sum() ([]byte, error) {
d := make([]byte, f.bufferIdx+span)
copy(d[span:], f.buffer[:f.bufferIdx])
binary.LittleEndian.PutUint64(d[:span], uint64(f.bufferIdx))
args := &pipeWriteArgs{data: d, span: d[:span]}
err := f.next.chainWrite(args)
args := &pipeline.PipeWriteArgs{Data: d, Span: d[:span]}
err := f.next.ChainWrite(args)
if err != nil {
return nil, err
}
}
return f.next.sum()
return f.next.Sum()
}
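For reference, a short sketch of the partial-write behaviour described above, using the builder package from this commit: the chunk feeder buffers sub-chunk writes and flushes any pending remainder when Sum() is called, as exercised by TestPartialWrites earlier in this diff.

package main

import (
	"context"
	"fmt"

	"github.com/ethersphere/bee/pkg/file/pipeline/builder"
	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/storage/mock"
)

func main() {
	p := builder.NewPipelineBuilder(context.Background(), mock.NewStorer(), storage.ModePutUpload, false)

	// Two partial writes; the chunk feeder buffers both into one chunk.
	_, _ = p.Write([]byte("hello "))
	_, _ = p.Write([]byte("world"))

	// Sum flushes the buffered remainder and returns the root hash.
	sum, err := p.Sum()
	if err != nil {
		panic(err)
	}
	fmt.Printf("root hash: %x\n", sum)
}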
......@@ -2,15 +2,19 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pipeline
package feeder_test
import (
"bytes"
"encoding/binary"
"errors"
"testing"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/file/pipeline/feeder"
)
// TestFeeder tests that partial writes work correctly.
func TestFeeder(t *testing.T) {
var (
chunkSize = 5
......@@ -71,9 +75,9 @@ func TestFeeder(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
var results pipeWriteArgs
var results pipeline.PipeWriteArgs
rr := newMockResultWriter(&results)
cf := newChunkFeederWriter(chunkSize, rr)
cf := feeder.NewChunkFeederWriter(chunkSize, rr)
i := 0
for _, v := range tc.dataSize {
d := data[i : i+v]
......@@ -87,7 +91,7 @@ func TestFeeder(t *testing.T) {
i += v
}
if tc.expWrites == 0 && results.data != nil {
if tc.expWrites == 0 && results.Data != nil {
t.Fatal("expected no write but got one")
}
......@@ -95,12 +99,12 @@ func TestFeeder(t *testing.T) {
t.Fatalf("expected %d writes but got %d", tc.expWrites, rr.count)
}
if results.data != nil && !bytes.Equal(tc.writeData, results.data[8:]) {
t.Fatalf("expected write data %v but got %v", tc.writeData, results.data[8:])
if results.Data != nil && !bytes.Equal(tc.writeData, results.Data[8:]) {
t.Fatalf("expected write data %v but got %v", tc.writeData, results.Data[8:])
}
if tc.span > 0 {
v := binary.LittleEndian.Uint64(results.data[:8])
v := binary.LittleEndian.Uint64(results.Data[:8])
if v != tc.span {
t.Fatalf("span mismatch, got %d want %d", v, tc.span)
}
......@@ -110,7 +114,7 @@ func TestFeeder(t *testing.T) {
}
// TestFeederFlush tests that the feeder flushes the data in the buffer correctly
// when Summing
// on Sum().
func TestFeederFlush(t *testing.T) {
var (
chunkSize = 5
......@@ -172,9 +176,9 @@ func TestFeederFlush(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
var results pipeWriteArgs
var results pipeline.PipeWriteArgs
rr := newMockResultWriter(&results)
cf := newChunkFeederWriter(chunkSize, rr)
cf := feeder.NewChunkFeederWriter(chunkSize, rr)
i := 0
for _, v := range tc.dataSize {
d := data[i : i+v]
......@@ -190,7 +194,7 @@ func TestFeederFlush(t *testing.T) {
_, _ = cf.Sum()
if tc.expWrites == 0 && results.data != nil {
if tc.expWrites == 0 && results.Data != nil {
t.Fatal("expected no write but got one")
}
......@@ -198,12 +202,12 @@ func TestFeederFlush(t *testing.T) {
t.Fatalf("expected %d writes but got %d", tc.expWrites, rr.count)
}
if results.data != nil && !bytes.Equal(tc.writeData, results.data[8:]) {
t.Fatalf("expected write data %v but got %v", tc.writeData, results.data[8:])
if results.Data != nil && !bytes.Equal(tc.writeData, results.Data[8:]) {
t.Fatalf("expected write data %v but got %v", tc.writeData, results.Data[8:])
}
if tc.span > 0 {
v := binary.LittleEndian.Uint64(results.data[:8])
v := binary.LittleEndian.Uint64(results.Data[:8])
if v != tc.span {
t.Fatalf("span mismatch, got %d want %d", v, tc.span)
}
......@@ -216,20 +220,20 @@ func TestFeederFlush(t *testing.T) {
// and passes the results to the caller using the pointer provided
// in the constructor.
type countingResultWriter struct {
target *pipeWriteArgs
target *pipeline.PipeWriteArgs
count int
}
func newMockResultWriter(b *pipeWriteArgs) *countingResultWriter {
func newMockResultWriter(b *pipeline.PipeWriteArgs) *countingResultWriter {
return &countingResultWriter{target: b}
}
func (w *countingResultWriter) chainWrite(p *pipeWriteArgs) error {
func (w *countingResultWriter) ChainWrite(p *pipeline.PipeWriteArgs) error {
w.count++
*w.target = *p
return nil
}
func (w *countingResultWriter) sum() ([]byte, error) {
func (w *countingResultWriter) Sum() ([]byte, error) {
return nil, errors.New("not implemented")
}
......@@ -2,12 +2,13 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pipeline
package hashtrie
import (
"encoding/binary"
"errors"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/swarm"
)
......@@ -20,10 +21,10 @@ type hashTrieWriter struct {
fullChunk int // full chunk size in terms of the data represented in the buffer (span+refsize)
cursors []int // level cursors, key is level. level 0 is data level
buffer []byte // keeps all level data
pipelineFn pipelineFunc
pipelineFn pipeline.PipelineFunc
}
func newHashTrieWriter(chunkSize, branching, refLen int, pipelineFn pipelineFunc) chainWriter {
func NewHashTrieWriter(chunkSize, branching, refLen int, pipelineFn pipeline.PipelineFunc) pipeline.ChainWriter {
return &hashTrieWriter{
cursors: make([]int, 9),
buffer: make([]byte, swarm.ChunkWithSpanSize*9*2), // double size as temp workaround for weak calculation of needed buffer space
......@@ -37,13 +38,13 @@ func newHashTrieWriter(chunkSize, branching, refLen int, pipelineFn pipelineFunc
// accepts writes of hashes from the previous writer in the chain, by definition these writes
// are on level 1
func (h *hashTrieWriter) chainWrite(p *pipeWriteArgs) error {
func (h *hashTrieWriter) ChainWrite(p *pipeline.PipeWriteArgs) error {
oneRef := h.refSize + swarm.SpanSize
l := len(p.span) + len(p.ref) + len(p.key)
l := len(p.Span) + len(p.Ref) + len(p.Key)
if l%oneRef != 0 {
return errInconsistentRefs
}
return h.writeToLevel(1, p.span, p.ref, p.key)
return h.writeToLevel(1, p.Span, p.Ref, p.Key)
}
func (h *hashTrieWriter) writeToLevel(level int, span, ref, key []byte) error {
......@@ -88,15 +89,15 @@ func (h *hashTrieWriter) wrapFullLevel(level int) error {
binary.LittleEndian.PutUint64(spb, sp)
hashes = append(spb, hashes...)
writer := h.pipelineFn()
args := pipeWriteArgs{
data: hashes,
span: spb,
args := pipeline.PipeWriteArgs{
Data: hashes,
Span: spb,
}
err := writer.chainWrite(&args)
err := writer.ChainWrite(&args)
if err != nil {
return err
}
err = h.writeToLevel(level+1, args.span, args.ref, args.key)
err = h.writeToLevel(level+1, args.Span, args.Ref, args.Key)
if err != nil {
return err
}
......@@ -157,12 +158,12 @@ func (h *hashTrieWriter) hoistLevels(target int) ([]byte, error) {
binary.LittleEndian.PutUint64(spb, sp)
hashes = append(spb, hashes...)
writer := h.pipelineFn()
args := pipeWriteArgs{
data: hashes,
span: spb,
args := pipeline.PipeWriteArgs{
Data: hashes,
Span: spb,
}
err := writer.chainWrite(&args)
ref := append(args.ref, args.key...)
err := writer.ChainWrite(&args)
ref := append(args.Ref, args.Key...)
return ref, err
}
......@@ -173,7 +174,7 @@ func (h *hashTrieWriter) levelSize(level int) int {
return h.cursors[level] - h.cursors[level+1]
}
func (h *hashTrieWriter) sum() ([]byte, error) {
func (h *hashTrieWriter) Sum() ([]byte, error) {
// look from the top down, to look for the highest hash of a balanced tree
// then, whatever is in the levels below that is necessarily unbalanced,
// so, we'd like to reduce those levels to one hash, then wrap it together
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mock
import (
"sync"
"github.com/ethersphere/bee/pkg/file/pipeline"
)
type MockChainWriter struct {
sync.Mutex
chainWriteCalls int
sumCalls int
}
func NewChainWriter() *MockChainWriter {
return &MockChainWriter{}
}
func (c *MockChainWriter) ChainWrite(_ *pipeline.PipeWriteArgs) error {
c.Lock()
defer c.Unlock()
c.chainWriteCalls++
return nil
}
func (c *MockChainWriter) Sum() ([]byte, error) {
c.Lock()
defer c.Unlock()
c.sumCalls++
return nil, nil
}
func (c *MockChainWriter) ChainWriteCalls() int { c.Lock(); defer c.Unlock(); return c.chainWriteCalls }
func (c *MockChainWriter) SumCalls() int { c.Lock(); defer c.Unlock(); return c.sumCalls }
......@@ -4,128 +4,34 @@
package pipeline
import (
"context"
"fmt"
"io"
"github.com/ethersphere/bee/pkg/encryption"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
type pipeWriteArgs struct {
ref []byte // reference, generated by bmt
key []byte // encryption key
span []byte // always unecrypted span uint64
data []byte // data includes the span too, but it may be encrypted when the pipeline is encrypted
}
// NewPipelineBuilder returns the appropriate pipeline according to the specified parameters
func NewPipelineBuilder(ctx context.Context, s storage.Storer, mode storage.ModePut, encrypt bool) Interface {
if encrypt {
return newEncryptionPipeline(ctx, s, mode)
}
return newPipeline(ctx, s, mode)
}
// newPipeline creates a standard pipeline that only hashes content with BMT to create
// a merkle-tree of hashes that represent the given arbitrary size byte stream. Partial
// writes are supported. The pipeline flow is: Data -> Feeder -> BMT -> Storage -> HashTrie.
func newPipeline(ctx context.Context, s storage.Storer, mode storage.ModePut) Interface {
tw := newHashTrieWriter(swarm.ChunkSize, swarm.Branches, swarm.HashSize, newShortPipelineFunc(ctx, s, mode))
lsw := newStoreWriter(ctx, s, mode, tw)
b := newBmtWriter(128, lsw)
return newChunkFeederWriter(swarm.ChunkSize, b)
import "io"
// ChainWriter is a writer in a pipeline.
// It is up to the implementer to decide whether a writer
// calls the next writer or not. Implementers should
// call the Sum method of the subsequent writer in case there
// exists one.
type ChainWriter interface {
ChainWrite(*PipeWriteArgs) error
Sum() ([]byte, error)
}
type pipelineFunc func() chainWriter
// newShortPipelineFunc returns a constructor function for an ephemeral hashing pipeline
// needed by the hashTrieWriter.
func newShortPipelineFunc(ctx context.Context, s storage.Storer, mode storage.ModePut) func() chainWriter {
return func() chainWriter {
lsw := newStoreWriter(ctx, s, mode, nil)
return newBmtWriter(128, lsw)
}
}
// newEncryptionPipeline creates an encryption pipeline that encrypts using CTR, hashes content with BMT to create
// a merkle-tree of hashes that represent the given arbitrary size byte stream. Partial
// writes are supported. The pipeline flow is: Data -> Feeder -> Encryption -> BMT -> Storage -> HashTrie.
// Note that the encryption writer will mutate the data to contain the encrypted span, but the span field
// with the unencrypted span is preserved.
func newEncryptionPipeline(ctx context.Context, s storage.Storer, mode storage.ModePut) Interface {
tw := newHashTrieWriter(swarm.ChunkSize, 64, swarm.HashSize+encryption.KeyLength, newShortEncryptionPipelineFunc(ctx, s, mode))
lsw := newStoreWriter(ctx, s, mode, tw)
b := newBmtWriter(128, lsw)
enc := newEncryptionWriter(b)
return newChunkFeederWriter(swarm.ChunkSize, enc)
// Interface exposes an `io.Writer` and `Sum` method, for components to use as a black box.
// Within a pipeline, writers are chainable. It is up to the implementer to decide whether
// a writer calls the next writer. Implementers should always implement the `Sum` method
// and call the next writer's `Sum` method (in case there is one), returning its result to
// the calling context.
type Interface interface {
io.Writer
Sum() ([]byte, error)
}
// newShortEncryptionPipelineFunc returns a constructor function for an ephemeral hashing pipeline
// needed by the hashTrieWriter.
func newShortEncryptionPipelineFunc(ctx context.Context, s storage.Storer, mode storage.ModePut) func() chainWriter {
return func() chainWriter {
lsw := newStoreWriter(ctx, s, mode, nil)
b := newBmtWriter(128, lsw)
return newEncryptionWriter(b)
}
// PipeWriteArgs are passed between different ChainWriters.
type PipeWriteArgs struct {
Ref []byte // reference, generated by bmt
Key []byte // encryption key
Span []byte // always unencrypted span uint64
Data []byte // data includes the span too, but it may be encrypted when the pipeline is encrypted
}
// FeedPipeline feeds the pipeline with the given reader until EOF is reached.
// It returns the cryptographic root hash of the content.
func FeedPipeline(ctx context.Context, pipeline Interface, r io.Reader, dataLength int64) (addr swarm.Address, err error) {
var total int64
data := make([]byte, swarm.ChunkSize)
var eof bool
for !eof {
c, err := r.Read(data)
total += int64(c)
if err != nil {
if err == io.EOF {
if total < dataLength {
return swarm.ZeroAddress, fmt.Errorf("pipline short write: read %d out of %d bytes", total+int64(c), dataLength)
}
eof = true
if c > 0 {
cc, err := pipeline.Write(data[:c])
if err != nil {
return swarm.ZeroAddress, err
}
if cc < c {
return swarm.ZeroAddress, fmt.Errorf("pipeline short write: %d mismatches %d", cc, c)
}
}
continue
} else {
return swarm.ZeroAddress, err
}
}
cc, err := pipeline.Write(data[:c])
if err != nil {
return swarm.ZeroAddress, err
}
if cc < c {
return swarm.ZeroAddress, fmt.Errorf("pipeline short write: %d mismatches %d", cc, c)
}
select {
case <-ctx.Done():
return swarm.ZeroAddress, ctx.Err()
default:
}
}
select {
case <-ctx.Done():
return swarm.ZeroAddress, ctx.Err()
default:
}
sum, err := pipeline.Sum()
if err != nil {
return swarm.ZeroAddress, err
}
newAddress := swarm.NewAddress(sum)
return newAddress, nil
}
type PipelineFunc func() ChainWriter
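For reference, a hypothetical pass-through ChainWriter illustrating the exported contract above: forward ChainWrite to the next writer and always delegate Sum, returning its result to the caller. The loggingWriter name and the wiring against the mock are illustrative only, not part of this commit.

package main

import (
	"fmt"

	"github.com/ethersphere/bee/pkg/file/pipeline"
	mock "github.com/ethersphere/bee/pkg/file/pipeline/mock"
)

// loggingWriter is a hypothetical chain writer that inspects the args
// and hands them to the next writer unchanged.
type loggingWriter struct {
	next pipeline.ChainWriter
}

func (w *loggingWriter) ChainWrite(p *pipeline.PipeWriteArgs) error {
	fmt.Printf("chain write: %d data bytes\n", len(p.Data))
	return w.next.ChainWrite(p)
}

// Sum delegates to the next writer, returning its result.
func (w *loggingWriter) Sum() ([]byte, error) {
	return w.next.Sum()
}

func main() {
	next := mock.NewChainWriter()
	w := &loggingWriter{next: next}
	_ = w.ChainWrite(&pipeline.PipeWriteArgs{Data: []byte("hello")})
	_, _ = w.Sum()
	fmt.Println("writes:", next.ChainWriteCalls(), "sums:", next.SumCalls())
}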
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package store
var ErrInvalidData = errInvalidData
......@@ -2,30 +2,38 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pipeline
package store
import (
"context"
"errors"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/sctx"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
)
var errInvalidData = errors.New("store: invalid data")
type storeWriter struct {
l storage.Putter
mode storage.ModePut
ctx context.Context
next chainWriter
next pipeline.ChainWriter
}
// newStoreWriter returns a storeWriter. It just writes the given data
// NewStoreWriter returns a storeWriter. It just writes the given data
// to a given storage.Storer.
func newStoreWriter(ctx context.Context, l storage.Putter, mode storage.ModePut, next chainWriter) chainWriter {
func NewStoreWriter(ctx context.Context, l storage.Putter, mode storage.ModePut, next pipeline.ChainWriter) pipeline.ChainWriter {
return &storeWriter{ctx: ctx, l: l, mode: mode, next: next}
}
func (w *storeWriter) chainWrite(p *pipeWriteArgs) error {
func (w *storeWriter) ChainWrite(p *pipeline.PipeWriteArgs) error {
if p.Ref == nil || p.Data == nil {
return errInvalidData
}
tag := sctx.GetTag(w.ctx)
var c swarm.Chunk
if tag != nil {
......@@ -33,9 +41,9 @@ func (w *storeWriter) chainWrite(p *pipeWriteArgs) error {
if err != nil {
return err
}
c = swarm.NewChunk(swarm.NewAddress(p.ref), p.data).WithTagID(tag.Uid)
c = swarm.NewChunk(swarm.NewAddress(p.Ref), p.Data).WithTagID(tag.Uid)
} else {
c = swarm.NewChunk(swarm.NewAddress(p.ref), p.data)
c = swarm.NewChunk(swarm.NewAddress(p.Ref), p.Data)
}
seen, err := w.l.Put(w.ctx, w.mode, c)
......@@ -58,10 +66,10 @@ func (w *storeWriter) chainWrite(p *pipeWriteArgs) error {
return nil
}
return w.next.chainWrite(p)
return w.next.ChainWrite(p)
}
func (w *storeWriter) sum() ([]byte, error) {
return w.next.sum()
func (w *storeWriter) Sum() ([]byte, error) {
return w.next.Sum()
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package store_test
import (
"bytes"
"context"
"errors"
"testing"
"github.com/ethersphere/bee/pkg/file/pipeline"
mock "github.com/ethersphere/bee/pkg/file/pipeline/mock"
"github.com/ethersphere/bee/pkg/file/pipeline/store"
"github.com/ethersphere/bee/pkg/storage"
storer "github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
)
// TestStoreWriter tests that store writer stores the provided data and calls the next chain writer.
func TestStoreWriter(t *testing.T) {
mockStore := storer.NewStorer()
mockChainWriter := mock.NewChainWriter()
ctx := context.Background()
writer := store.NewStoreWriter(ctx, mockStore, storage.ModePutUpload, mockChainWriter)
for _, tc := range []struct {
name string
ref []byte
data []byte
expErr error
}{
{
name: "no data",
expErr: store.ErrInvalidData,
},
{
name: "some data",
ref: []byte{0xaa, 0xbb, 0xcc},
data: []byte("hello world"),
},
{},
} {
args := pipeline.PipeWriteArgs{Ref: tc.ref, Data: tc.data}
err := writer.ChainWrite(&args)
if err != nil && tc.expErr != nil && errors.Is(err, tc.expErr) {
return
}
if err != nil {
t.Fatal(err)
}
d, err := mockStore.Get(ctx, storage.ModeGetRequest, swarm.NewAddress(tc.ref))
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(tc.data, d.Data()) {
t.Fatal("data mismatch")
}
if calls := mockChainWriter.ChainWriteCalls(); calls != 1 {
t.Errorf("wanted 1 ChainWrite call, got %d", calls)
}
}
}
// TestSum tests that calling Sum on the store writer results in Sum on the next writer in the chain.
func TestSum(t *testing.T) {
mockChainWriter := mock.NewChainWriter()
ctx := context.Background()
writer := store.NewStoreWriter(ctx, nil, storage.ModePutUpload, mockChainWriter)
_, err := writer.Sum()
if err != nil {
t.Fatal(err)
}
if calls := mockChainWriter.SumCalls(); calls != 1 {
t.Fatalf("wanted 1 Sum call but got %d", calls)
}
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pipeline
import "io"
// chainWriter is a writer in a pipeline.
// It is up to the implementer to decide whether a writer
// calls the next writer or not. Implementers should
// call the Sum method of the subsequent writer in case there
// exists one.
type chainWriter interface {
chainWrite(*pipeWriteArgs) error
sum() ([]byte, error)
}
// Interface exposes an `io.Writer` and `Sum` method, for components to use as a black box.
// Within a pipeline, writers are chainable. It is up for the implementer to decide whether
// a writer calls the next writer. Implementers should always implement the `Sum` method
// and call the next writer's `Sum` method (in case there is one), returning its result to
// the calling context.
type Interface interface {
io.Writer
Sum() ([]byte, error)
}
......@@ -14,7 +14,7 @@ import (
"testing"
"github.com/ethersphere/bee/pkg/encryption/store"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/file/seekjoiner"
joiner "github.com/ethersphere/bee/pkg/file/seekjoiner"
filetest "github.com/ethersphere/bee/pkg/file/testing"
......@@ -191,9 +191,9 @@ func TestEncryptDecrypt(t *testing.T) {
t.Fatal(err)
}
ctx := context.Background()
pipe := pipeline.NewPipelineBuilder(ctx, store, storage.ModePutUpload, true)
pipe := builder.NewPipelineBuilder(ctx, store, storage.ModePutUpload, true)
testDataReader := bytes.NewReader(testData)
resultAddress, err := pipeline.FeedPipeline(ctx, pipe, testDataReader, int64(len(testData)))
resultAddress, err := builder.FeedPipeline(ctx, pipe, testDataReader, int64(len(testData)))
if err != nil {
t.Fatal(err)
}
......
......@@ -11,7 +11,7 @@ import (
"fmt"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/file/seekjoiner"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
......@@ -165,8 +165,8 @@ func (ls *mantarayLoadSaver) Load(ref []byte) ([]byte, error) {
func (ls *mantarayLoadSaver) Save(data []byte) ([]byte, error) {
ctx := ls.ctx
pipe := pipeline.NewPipelineBuilder(ctx, ls.storer, ls.modePut, ls.encrypted)
address, err := pipeline.FeedPipeline(ctx, pipe, bytes.NewReader(data), int64(len(data)))
pipe := builder.NewPipelineBuilder(ctx, ls.storer, ls.modePut, ls.encrypted)
address, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(data), int64(len(data)))
if err != nil {
return swarm.ZeroAddress.Bytes(), err
......
......@@ -11,7 +11,7 @@ import (
"fmt"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/file/seekjoiner"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
......@@ -106,8 +106,8 @@ func (m *simpleManifest) Store(ctx context.Context, mode storage.ModePut) (swarm
return swarm.ZeroAddress, fmt.Errorf("manifest marshal error: %w", err)
}
pipe := pipeline.NewPipelineBuilder(ctx, m.storer, mode, m.encrypted)
address, err := pipeline.FeedPipeline(ctx, pipe, bytes.NewReader(data), int64(len(data)))
pipe := builder.NewPipelineBuilder(ctx, m.storer, mode, m.encrypted)
address, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(data), int64(len(data)))
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("manifest save error: %w", err)
}
......