Commit e7c8c691 authored by acud, committed by GitHub

seekjoiner: add pluggable decryption (#582)

* seekjoiner: add pluggable decryption
parent 70b7a7e4
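At a glance: every call site swaps the flag-driven joiner for the seekable seekjoiner and drops the trailing toDecrypt bool; decryption is instead selected by reference length (32 bytes plain, 64 bytes hash plus key) inside a pluggable decrypting store. A minimal before/after sketch built only from identifiers in this diff; ctx, store, addr and w are placeholders:

// before: the caller had to say whether the reference was encrypted
j := joiner.NewSimpleJoiner(store)
_, err = file.JoinReadAll(ctx, j, addr, w, false)

// after: the seekjoiner wraps its getter in a decrypting store, so the same
// call transparently handles plain and encrypted references
j := seekjoiner.NewSimpleJoiner(store)
_, err = file.JoinReadAll(ctx, j, addr, w)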
......@@ -17,7 +17,7 @@ import (
cmdfile "github.com/ethersphere/bee/cmd/internal/file"
"github.com/ethersphere/bee/pkg/collection/entry"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/joiner"
"github.com/ethersphere/bee/pkg/file/seekjoiner"
"github.com/ethersphere/bee/pkg/file/splitter"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/storage"
......@@ -59,8 +59,8 @@ func getEntry(cmd *cobra.Command, args []string) (err error) {
buf := bytes.NewBuffer(nil)
writeCloser := cmdfile.NopWriteCloser(buf)
limitBuf := cmdfile.NewLimitWriteCloser(writeCloser, limitMetadataLength)
j := joiner.NewSimpleJoiner(store)
_, err = file.JoinReadAll(cmd.Context(), j, addr, limitBuf, false)
j := seekjoiner.NewSimpleJoiner(store)
_, err = file.JoinReadAll(cmd.Context(), j, addr, limitBuf)
if err != nil {
return err
}
......@@ -71,7 +71,7 @@ func getEntry(cmd *cobra.Command, args []string) (err error) {
}
buf = bytes.NewBuffer(nil)
_, err = file.JoinReadAll(cmd.Context(), j, e.Metadata(), buf, false)
_, err = file.JoinReadAll(cmd.Context(), j, e.Metadata(), buf)
if err != nil {
return err
}
......@@ -117,7 +117,7 @@ func getEntry(cmd *cobra.Command, args []string) (err error) {
return err
}
defer outFile.Close()
_, err = file.JoinReadAll(cmd.Context(), j, e.Reference(), outFile, false)
_, err = file.JoinReadAll(cmd.Context(), j, e.Reference(), outFile)
return err
}
......
......@@ -11,7 +11,7 @@ import (
cmdfile "github.com/ethersphere/bee/cmd/internal/file"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/joiner"
"github.com/ethersphere/bee/pkg/file/seekjoiner"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
......@@ -82,8 +82,8 @@ func Join(cmd *cobra.Command, args []string) (err error) {
}
// create the join and get its data reader
j := joiner.NewSimpleJoiner(store)
_, err = file.JoinReadAll(cmd.Context(), j, addr, outFile, false)
j := seekjoiner.NewSimpleJoiner(store)
_, err = file.JoinReadAll(cmd.Context(), j, addr, outFile)
return err
}
......
......@@ -72,7 +72,7 @@ func TestBytes(t *testing.T) {
t.Run("not found", func(t *testing.T) {
jsonhttptest.Request(t, client, http.MethodGet, resource+"/abcd", http.StatusNotFound,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: "not found",
Message: "Not Found",
Code: http.StatusNotFound,
}),
)
......
......@@ -14,9 +14,8 @@ import (
"github.com/gorilla/mux"
"github.com/ethersphere/bee/pkg/collection/entry"
"github.com/ethersphere/bee/pkg/encryption"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/joiner"
"github.com/ethersphere/bee/pkg/file/seekjoiner"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/manifest"
"github.com/ethersphere/bee/pkg/sctx"
......@@ -39,12 +38,13 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
return
}
toDecrypt := len(address.Bytes()) == (swarm.HashSize + encryption.KeyLength)
// this is a hack and is needed because encryption is coupled into manifests
toDecrypt := len(address.Bytes()) == 64
// read manifest entry
j := joiner.NewSimpleJoiner(s.Storer)
j := seekjoiner.NewSimpleJoiner(s.Storer)
buf := bytes.NewBuffer(nil)
_, err = file.JoinReadAll(ctx, j, address, buf, toDecrypt)
_, err = file.JoinReadAll(ctx, j, address, buf)
if err != nil {
s.Logger.Debugf("bzz download: read entry %s: %v", address, err)
s.Logger.Errorf("bzz download: read entry %s", address)
......@@ -62,7 +62,7 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
// read metadata
buf = bytes.NewBuffer(nil)
_, err = file.JoinReadAll(ctx, j, e.Metadata(), buf, toDecrypt)
_, err = file.JoinReadAll(ctx, j, e.Metadata(), buf)
if err != nil {
s.Logger.Debugf("bzz download: read metadata %s: %v", address, err)
s.Logger.Errorf("bzz download: read metadata %s", address)
......@@ -110,7 +110,7 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
// read file entry
buf = bytes.NewBuffer(nil)
_, err = file.JoinReadAll(ctx, j, manifestEntryAddress, buf, toDecrypt)
_, err = file.JoinReadAll(ctx, j, manifestEntryAddress, buf)
if err != nil {
s.Logger.Debugf("bzz download: read file entry %s: %v", address, err)
s.Logger.Errorf("bzz download: read file entry %s", address)
......@@ -128,7 +128,7 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
// read file metadata
buf = bytes.NewBuffer(nil)
_, err = file.JoinReadAll(ctx, j, fe.Metadata(), buf, toDecrypt)
_, err = file.JoinReadAll(ctx, j, fe.Metadata(), buf)
if err != nil {
s.Logger.Debugf("bzz download: read file metadata %s: %v", address, err)
s.Logger.Errorf("bzz download: read file metadata %s", address)
......
......@@ -17,7 +17,7 @@ import (
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/collection/entry"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/joiner"
"github.com/ethersphere/bee/pkg/file/seekjoiner"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/logging"
......@@ -170,10 +170,10 @@ func TestDirs(t *testing.T) {
}
// read manifest metadata
j := joiner.NewSimpleJoiner(storer)
j := seekjoiner.NewSimpleJoiner(storer)
buf := bytes.NewBuffer(nil)
_, err = file.JoinReadAll(context.Background(), j, resp.Reference, buf, false)
_, err = file.JoinReadAll(context.Background(), j, resp.Reference, buf)
if err != nil {
t.Fatal(err)
}
......
......@@ -20,9 +20,7 @@ import (
"time"
"github.com/ethersphere/bee/pkg/collection/entry"
"github.com/ethersphere/bee/pkg/encryption"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/joiner"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/file/seekjoiner"
"github.com/ethersphere/bee/pkg/jsonhttp"
......@@ -232,15 +230,13 @@ func (s *server) fileDownloadHandler(w http.ResponseWriter, r *http.Request) {
return
}
toDecrypt := len(address.Bytes()) == (swarm.HashSize + encryption.KeyLength)
targets := r.URL.Query().Get("targets")
sctx.SetTargets(r.Context(), targets)
// read entry.
j := joiner.NewSimpleJoiner(s.Storer)
j := seekjoiner.NewSimpleJoiner(s.Storer)
buf := bytes.NewBuffer(nil)
_, err = file.JoinReadAll(r.Context(), j, address, buf, toDecrypt)
_, err = file.JoinReadAll(r.Context(), j, address, buf)
if err != nil {
s.Logger.Debugf("file download: read entry %s: %v", addr, err)
s.Logger.Errorf("file download: read entry %s", addr)
......@@ -268,7 +264,7 @@ func (s *server) fileDownloadHandler(w http.ResponseWriter, r *http.Request) {
// Read metadata.
buf = bytes.NewBuffer(nil)
_, err = file.JoinReadAll(r.Context(), j, e.Metadata(), buf, toDecrypt)
_, err = file.JoinReadAll(r.Context(), j, e.Metadata(), buf)
if err != nil {
s.Logger.Debugf("file download: read metadata %s: %v", addr, err)
s.Logger.Errorf("file download: read metadata %s", addr)
......@@ -308,7 +304,7 @@ func (s *server) downloadHandler(w http.ResponseWriter, r *http.Request, referen
}
s.Logger.Debugf("api download: invalid root chunk %s: %v", reference, err)
s.Logger.Error("api download: invalid root chunk")
jsonhttp.BadRequest(w, "invalid root chunk")
jsonhttp.NotFound(w, nil)
return
}
......
......@@ -60,7 +60,6 @@ func TestFiles(t *testing.T) {
})
t.Run("encrypt-decrypt", func(t *testing.T) {
t.Skip("reenable after crypto refactor")
fileName := "my-pictures.jpeg"
var resp api.FileUploadResponse
......
......@@ -8,13 +8,14 @@ import (
"errors"
"github.com/ethersphere/bee/pkg/collection"
"github.com/ethersphere/bee/pkg/encryption"
"github.com/ethersphere/bee/pkg/swarm"
)
var (
_ = collection.Entry(&Entry{})
serializedDataSize = swarm.SectionSize * 2
encryptedSerializedDataSize = swarm.EncryptedReferenceSize * 2
encryptedSerializedDataSize = encryption.ReferenceSize * 2
)
// Entry provides addition of metadata to a data reference.
......
......@@ -23,11 +23,14 @@ import (
"hash"
)
const KeyLength = 32
const (
KeyLength = 32
ReferenceSize = 64
)
type Key []byte
type Encryptor interface {
type Interface interface {
Encrypt(data []byte) ([]byte, error)
Decrypt(data []byte) ([]byte, error)
Reset()
......@@ -43,7 +46,7 @@ type Encryption struct {
}
// New constructs a new encryptor/decryptor
func New(key Key, padding int, initCtr uint32, hashFunc func() hash.Hash) *Encryption {
func New(key Key, padding int, initCtr uint32, hashFunc func() hash.Hash) Interface {
return &Encryption{
key: key,
keyLen: len(key),
......
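Since New now returns the Interface rather than the concrete *Encryption, both directions of the transform are constructed the same way. A minimal round-trip sketch, assuming (as the symmetric use of Encrypt and Decrypt elsewhere in this commit suggests) that the two calls are inverses for the same key, padding and initial counter:

package main

import (
	"bytes"
	"crypto/rand"
	"fmt"

	"github.com/ethersphere/bee/pkg/encryption"
	"github.com/ethersphere/bee/pkg/swarm"
	"golang.org/x/crypto/sha3"
)

func main() {
	key := make(encryption.Key, encryption.KeyLength)
	if _, err := rand.Read(key); err != nil {
		panic(err)
	}
	data := []byte("hello, bee")

	// pad-and-encrypt to a full chunk, mirroring newDataEncryption in this diff
	enc := encryption.New(key, int(swarm.ChunkSize), 0, sha3.NewLegacyKeccak256)
	ciphertext, err := enc.Encrypt(data)
	if err != nil {
		panic(err)
	}

	dec := encryption.New(key, int(swarm.ChunkSize), 0, sha3.NewLegacyKeccak256)
	plain, err := dec.Decrypt(ciphertext)
	if err != nil {
		panic(err)
	}
	// everything beyond len(data) is padding added by the encryptor
	fmt.Println(bytes.Equal(plain[:len(data)], data)) // expected: true
}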
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package store
import (
"context"
"encoding/binary"
"github.com/ethersphere/bee/pkg/encryption"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"golang.org/x/crypto/sha3"
)
type decryptingStore struct {
storage.Getter
}
func New(s storage.Getter) storage.Getter {
return &decryptingStore{s}
}
func (s *decryptingStore) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address) (ch swarm.Chunk, err error) {
switch l := len(addr.Bytes()); l {
case swarm.HashSize:
// normal, unencrypted content
return s.Getter.Get(ctx, mode, addr)
case encryption.ReferenceSize:
// encrypted reference
ref := addr.Bytes()
address := swarm.NewAddress(ref[:swarm.HashSize])
ch, err := s.Getter.Get(ctx, mode, address)
if err != nil {
return nil, err
}
d, err := decryptChunkData(ch.Data(), ref[swarm.HashSize:])
if err != nil {
return nil, err
}
return swarm.NewChunk(address, d), nil
default:
return nil, storage.ErrReferenceLength
}
}
func decryptChunkData(chunkData []byte, encryptionKey encryption.Key) ([]byte, error) {
decryptedSpan, decryptedData, err := decrypt(chunkData, encryptionKey)
if err != nil {
return nil, err
}
// removing extra bytes which were just added for padding
length := binary.LittleEndian.Uint64(decryptedSpan)
refSize := int64(swarm.HashSize + encryption.KeyLength)
for length > swarm.ChunkSize {
length = length + (swarm.ChunkSize - 1)
length = length / swarm.ChunkSize
length *= uint64(refSize)
}
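// Worked example (hypothetical numbers): a span of 1,000,000 bytes needs
// ceil(1000000/4096) = 245 data chunks; their 245*64 = 15,680 reference
// bytes fill ceil(15680/4096) = 4 intermediate chunks, so this chunk holds
// 4*64 = 256 reference bytes, the value the loop converges to.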
c := make([]byte, length+8)
copy(c[:8], decryptedSpan)
copy(c[8:], decryptedData[:length])
return c, nil
}
func decrypt(chunkData []byte, key encryption.Key) ([]byte, []byte, error) {
decryptedSpan, err := newSpanEncryption(key).Decrypt(chunkData[:swarm.SpanSize])
if err != nil {
return nil, nil, err
}
decryptedData, err := newDataEncryption(key).Decrypt(chunkData[swarm.SpanSize:])
if err != nil {
return nil, nil, err
}
return decryptedSpan, decryptedData, nil
}
func newSpanEncryption(key encryption.Key) encryption.Interface {
refSize := int64(swarm.HashSize + encryption.KeyLength)
return encryption.New(key, 0, uint32(swarm.ChunkSize/refSize), sha3.NewLegacyKeccak256)
}
func newDataEncryption(key encryption.Key) encryption.Interface {
return encryption.New(key, int(swarm.ChunkSize), 0, sha3.NewLegacyKeccak256)
}
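Taken in isolation, the wrapper dispatches purely on reference length. A small, self-contained usage sketch; the mock storer import path comes from the test files in this diff, and the chunk literal is illustrative:

package main

import (
	"context"
	"fmt"

	"github.com/ethersphere/bee/pkg/encryption/store"
	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/storage/mock"
	"github.com/ethersphere/bee/pkg/swarm"
)

func main() {
	ctx := context.Background()
	st := mock.NewStorer()
	getter := store.New(st)

	// a chunk under a plain 32-byte address passes straight through
	addr := swarm.MustParseHexAddress(fmt.Sprintf("%064s", "2a"))
	span := []byte{3, 0, 0, 0, 0, 0, 0, 0} // little-endian span of 3 bytes
	ch := swarm.NewChunk(addr, append(span, []byte("foo")...))
	if _, err := st.Put(ctx, storage.ModePutUpload, ch); err != nil {
		panic(err)
	}
	got, err := getter.Get(ctx, storage.ModeGetRequest, addr)
	if err != nil {
		panic(err)
	}
	fmt.Println(got.Address()) // the 32-byte address, chunk data unmodified

	// anything that is neither 32 nor 64 bytes long fails fast
	if _, err := getter.Get(ctx, storage.ModeGetRequest, swarm.ZeroAddress); err != nil {
		fmt.Println(err) // storage.ErrReferenceLength
	}
}

A 64-byte reference takes the middle branch instead: the first 32 bytes address the ciphertext chunk and the last 32 bytes decrypt it, as TestEncryptDecrypt below exercises end to end.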
......@@ -14,16 +14,6 @@ import (
"github.com/ethersphere/bee/pkg/swarm"
)
// Joiner returns file data referenced by the given Swarm Address to the given io.Reader.
//
// The call returns when the chunk for the given Swarm Address is found,
// returning the length of the data that will be produced.
// The caller can then read the data on the io.Reader that was provided.
type Joiner interface {
Join(ctx context.Context, address swarm.Address, toDecrypt bool) (dataOut io.ReadCloser, dataLength int64, err error)
Size(ctx context.Context, address swarm.Address) (dataLength int64, err error)
}
// JoinSeeker provides a Joiner that can seek.
type JoinSeeker interface {
Join(ctx context.Context, address swarm.Address) (dataOut io.ReadSeeker, dataLength int64, err error)
......@@ -39,9 +29,9 @@ type Splitter interface {
Split(ctx context.Context, dataIn io.ReadCloser, dataLength int64, toEncrypt bool) (addr swarm.Address, err error)
}
// JoinReadAll reads all output from the provided joiner.
func JoinReadAll(ctx context.Context, j Joiner, addr swarm.Address, outFile io.Writer, toDecrypt bool) (int64, error) {
r, l, err := j.Join(ctx, addr, toDecrypt)
// JoinReadAll reads all output from the provided JoinSeeker.
func JoinReadAll(ctx context.Context, j JoinSeeker, addr swarm.Address, outFile io.Writer) (int64, error) {
r, l, err := j.Join(ctx, addr)
if err != nil {
return 0, err
}
......
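The mock-based TestJoinReadAll disappears together with the old Joiner interface; a seek-capable stand-in is all that is needed to exercise the new JoinReadAll. A hypothetical test helper, not part of this commit, assuming the usual bytes, context, io and swarm imports and that JoinSeeker also declares the Size method that simpleJoiner implements:

// mockSeekJoiner serves a fixed byte slice through the new JoinSeeker shape.
type mockSeekJoiner struct{ data []byte }

// Join ignores the address and returns the canned data; bytes.Reader already
// satisfies io.ReadSeeker.
func (m *mockSeekJoiner) Join(_ context.Context, _ swarm.Address) (io.ReadSeeker, int64, error) {
	return bytes.NewReader(m.data), int64(len(m.data)), nil
}

// Size reports the canned data length.
func (m *mockSeekJoiner) Size(_ context.Context, _ swarm.Address) (int64, error) {
	return int64(len(m.data)), nil
}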
......@@ -8,13 +8,12 @@ import (
"bytes"
"context"
"io"
"io/ioutil"
"strconv"
"strings"
"testing"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/joiner"
"github.com/ethersphere/bee/pkg/file/seekjoiner"
"github.com/ethersphere/bee/pkg/file/splitter"
test "github.com/ethersphere/bee/pkg/file/testing"
"github.com/ethersphere/bee/pkg/storage"
......@@ -46,7 +45,7 @@ func testSplitThenJoin(t *testing.T) {
dataIdx, _ = strconv.ParseInt(paramstring[1], 10, 0)
store = mock.NewStorer()
s = splitter.NewSimpleSplitter(store, storage.ModePutUpload)
j = joiner.NewSimpleJoiner(store)
j = seekjoiner.NewSimpleJoiner(store)
data, _ = test.GetVector(t, int(dataIdx))
)
......@@ -60,7 +59,7 @@ func testSplitThenJoin(t *testing.T) {
}
// then join
r, l, err := j.Join(ctx, resultAddress, false)
r, l, err := j.Join(ctx, resultAddress)
if err != nil {
t.Fatal(err)
}
......@@ -87,47 +86,3 @@ func testSplitThenJoin(t *testing.T) {
t.Fatalf("data mismatch %d", len(data))
}
}
// TestJoinReadAll verifies that data in excess of a single chunk is returned
// in its entirety.
func TestJoinReadAll(t *testing.T) {
var dataLength int64 = swarm.ChunkSize + 2
j := newMockJoiner(dataLength)
buf := bytes.NewBuffer(nil)
c, err := file.JoinReadAll(context.Background(), j, swarm.ZeroAddress, buf, false)
if err != nil {
t.Fatal(err)
}
if dataLength != c {
t.Fatalf("expected readall return length %d, got %d", dataLength, c)
}
if dataLength != int64(len(buf.Bytes())) {
t.Fatalf("expected length %d, got %d", dataLength, len(buf.Bytes()))
}
}
// mockJoiner is an implementation of file.Joiner that short-circuits, returning
// a mock byte vector of the length given at initialization.
type mockJoiner struct {
l int64
}
// Join implements file.Joiner.
func (j *mockJoiner) Join(ctx context.Context, address swarm.Address, toDecrypt bool) (dataOut io.ReadCloser, dataLength int64, err error) {
data := make([]byte, j.l)
buf := bytes.NewBuffer(data)
readCloser := ioutil.NopCloser(buf)
return readCloser, j.l, nil
}
func (j *mockJoiner) Size(ctx context.Context, address swarm.Address) (dataSize int64, err error) {
return j.l, nil
}
// newMockJoiner creates a new mockJoiner.
func newMockJoiner(l int64) file.Joiner {
return &mockJoiner{
l: l,
}
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package internal
import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"io/ioutil"
"sync"
"github.com/ethersphere/bee/pkg/encryption"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"golang.org/x/crypto/sha3"
)
// SimpleJoinerJob encapsulates a single joiner operation, providing the consumer
// with blockwise reads of data represented by a content addressed chunk tree.
//
// Every chunk has a span length, which is a 64-bit integer in little-endian encoding
// stored as a prefix in the chunk itself. This represents the length of the data
// that the reference represents.
//
// If a chunk's span length is greater than swarm.ChunkSize, the chunk will be treated
// as an intermediate chunk, meaning the contents of the chunk are handled as references
// to other chunks which in turn are retrieved.
//
// Otherwise it passes the data chunk to the io.Reader and blocks until the consumer reads
// the chunk.
//
// The process is repeated until the readCount reaches the announced spanLength of the chunk.
type SimpleJoinerJob struct {
ctx context.Context
getter storage.Getter
spanLength int64 // the total length of data represented by the root chunk the job was initialized with.
readCount int64 // running count of chunks read by the io.Reader consumer.
cursors [9]int // per-level read cursor of data.
data [9][]byte // data of currently loaded chunk.
dataC chan []byte // channel to pass data chunks to the io.Reader method.
doneC chan struct{} // channel to signal termination of join loop
closeDoneOnce sync.Once // make sure done channel is closed only once
err error // read by the main thread to capture error state of the job
logger logging.Logger
toDecrypt bool // to decrypt the chunks or not
}
// NewSimpleJoinerJob creates a new SimpleJoinerJob.
func NewSimpleJoinerJob(ctx context.Context, getter storage.Getter, rootChunk swarm.Chunk, toDecrypt bool) *SimpleJoinerJob {
spanLength := binary.LittleEndian.Uint64(rootChunk.Data()[:8])
levelCount := file.Levels(int64(spanLength), swarm.SectionSize, swarm.Branches)
j := &SimpleJoinerJob{
ctx: ctx,
getter: getter,
spanLength: int64(spanLength),
dataC: make(chan []byte),
doneC: make(chan struct{}),
logger: logging.New(ioutil.Discard, 0),
toDecrypt: toDecrypt,
}
// startLevelIndex is the root chunk level
// data level has index 0
startLevelIndex := levelCount - 1
j.data[startLevelIndex] = rootChunk.Data()[8:]
// retrieval must be asynchronous to the io.Reader()
go func() {
err := j.start(startLevelIndex)
if err != nil {
// this will only already be closed if all the chunk data has been fully read
// in this case the error will always be nil and this will not be executed
if err != io.EOF {
j.logger.Errorf("simple joiner chunk join job fail: %v", err)
}
}
j.err = err
close(j.dataC)
j.closeDone()
}()
return j
}
// start processes all chunk references of the root chunk that has already been retrieved.
func (j *SimpleJoinerJob) start(level int) error {
// consume the reference at the current cursor position of the chunk level data
// and start recursive retrieval down to the underlying data chunks
for j.cursors[level] < len(j.data[level]) {
err := j.nextReference(level)
if err != nil {
return err
}
}
return nil
}
// nextReference gets the next chunk reference at the cursor of the chunk currently loaded
// for the specified level.
func (j *SimpleJoinerJob) nextReference(level int) error {
data := j.data[level]
cursor := j.cursors[level]
var encryptionKey encryption.Key
chunkAddress := swarm.NewAddress(data[cursor : cursor+swarm.SectionSize])
if j.toDecrypt {
encryptionKey = make([]byte, encryption.KeyLength)
copy(encryptionKey, data[cursor+swarm.SectionSize:cursor+swarm.SectionSize+encryption.KeyLength])
}
err := j.nextChunk(level-1, chunkAddress, encryptionKey)
if err != nil {
if err == io.EOF {
return err
}
// if the last write is a "dangling chunk" the data chunk will have been moved
// to an intermediate level. In this edge case, the error must be suppressed,
// and the cursor manually set to the data length boundary to terminate the loop in
// the calling frame.
if j.readCount+int64(len(data)) == j.spanLength {
j.cursors[level] = len(j.data[level])
err = j.sendChunkToReader(data)
return err
}
return fmt.Errorf("error in join for chunk %v: %v", chunkAddress, err)
}
// move the cursor to the next reference
j.cursors[level] += swarm.SectionSize
if j.toDecrypt {
j.cursors[level] += encryption.KeyLength
}
return nil
}
// nextChunk retrieves data chunks by resolving references in intermediate chunks.
// The method will be called recursively via the nextReference method when
// the current chunk is an intermediate chunk.
// When a data chunk is found it is passed on the dataC channel to be consumed by the
// io.Reader consumer.
func (j *SimpleJoinerJob) nextChunk(level int, address swarm.Address, key encryption.Key) error {
// attempt to retrieve the chunk
ch, err := j.getter.Get(j.ctx, storage.ModeGetRequest, address)
if err != nil {
return err
}
var chunkData []byte
if j.toDecrypt {
decryptedData, err := DecryptChunkData(ch.Data(), key)
if err != nil {
return fmt.Errorf("error decrypting chunk %v: %v", address, err)
}
chunkData = decryptedData[8:]
} else {
chunkData = ch.Data()[8:]
}
j.cursors[level] = 0
j.data[level] = chunkData
// any level higher than 0 means the chunk contains references
// which must be recursively processed
if level > 0 {
for j.cursors[level] < len(j.data[level]) {
if len(j.data[level]) == j.cursors[level] {
j.data[level] = chunkData
j.cursors[level] = 0
}
err = j.nextReference(level)
if err != nil {
return err
}
}
} else {
// read data and pass to reader only if session is still active
// * context cancelled when client has disappeared, timeout etc
// * doneC receive when gracefully terminated through Close
data := chunkData
err = j.sendChunkToReader(data)
}
return err
}
// sendChunkToReader handles exceptions on the part of the consumer in
// the reading of data
func (j *SimpleJoinerJob) sendChunkToReader(data []byte) error {
select {
case <-j.ctx.Done():
j.readCount = j.spanLength
return j.ctx.Err()
case <-j.doneC:
return file.NewAbortError(errors.New("chunk read aborted"))
case j.dataC <- data:
j.readCount += int64(len(data))
// when we reach the end of data to be read
// bubble io.EOF error to the gofunc in the
// constructor that called start()
if j.readCount == j.spanLength {
return io.EOF
}
}
return nil
}
// Read is called by the consumer to retrieve the joined data.
// It must be called with a buffer equal to the maximum chunk size.
func (j *SimpleJoinerJob) Read(b []byte) (n int, err error) {
if cap(b) != swarm.ChunkSize {
return 0, fmt.Errorf("Read must be called with a buffer of %d bytes", swarm.ChunkSize)
}
data, ok := <-j.dataC
if !ok {
<-j.doneC
return 0, j.err
}
copy(b, data)
return len(data), nil
}
// Close is called by the consumer to gracefully abort the data retrieval.
func (j *SimpleJoinerJob) Close() error {
j.closeDone()
return nil
}
// closeDone, for readability purposes, wraps the sync.Once execution of closing the doneC channel
func (j *SimpleJoinerJob) closeDone() {
j.closeDoneOnce.Do(func() {
close(j.doneC)
})
}
func DecryptChunkData(chunkData []byte, encryptionKey encryption.Key) ([]byte, error) {
if len(chunkData) < 8 {
return nil, fmt.Errorf("invalid ChunkData, min length 8 got %v", len(chunkData))
}
decryptedSpan, decryptedData, err := decrypt(chunkData, encryptionKey)
if err != nil {
return nil, err
}
// removing extra bytes which were just added for padding
length := binary.LittleEndian.Uint64(decryptedSpan)
refSize := int64(swarm.HashSize + encryption.KeyLength)
for length > swarm.ChunkSize {
length = length + (swarm.ChunkSize - 1)
length = length / swarm.ChunkSize
length *= uint64(refSize)
}
c := make([]byte, length+8)
copy(c[:8], decryptedSpan)
copy(c[8:], decryptedData[:length])
return c, nil
}
func decrypt(chunkData []byte, key encryption.Key) ([]byte, []byte, error) {
encryptedSpan, err := newSpanEncryption(key).Encrypt(chunkData[:8])
if err != nil {
return nil, nil, err
}
encryptedData, err := newDataEncryption(key).Encrypt(chunkData[8:])
if err != nil {
return nil, nil, err
}
return encryptedSpan, encryptedData, nil
}
func newSpanEncryption(key encryption.Key) *encryption.Encryption {
refSize := int64(swarm.HashSize + encryption.KeyLength)
return encryption.New(key, 0, uint32(swarm.ChunkSize/refSize), sha3.NewLegacyKeccak256)
}
func newDataEncryption(key encryption.Key) *encryption.Encryption {
return encryption.New(key, int(swarm.ChunkSize), 0, sha3.NewLegacyKeccak256)
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package internal_test
import (
"bytes"
"context"
"io"
"testing"
"time"
"github.com/ethersphere/bee/pkg/file/joiner/internal"
filetest "github.com/ethersphere/bee/pkg/file/testing"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
)
// TestSimpleJoinerJobBlocksize checks that only Read() calls with exact
// chunk size buffer capacity is allowed.
func TestSimpleJoinerJobBlocksize(t *testing.T) {
store := mock.NewStorer()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// create root chunk with 2 references and the referenced data chunks
rootChunk := filetest.GenerateTestRandomFileChunk(swarm.ZeroAddress, swarm.ChunkSize*2, swarm.SectionSize*2)
_, err := store.Put(ctx, storage.ModePutUpload, rootChunk)
if err != nil {
t.Fatal(err)
}
firstAddress := swarm.NewAddress(rootChunk.Data()[8 : swarm.SectionSize+8])
firstChunk := filetest.GenerateTestRandomFileChunk(firstAddress, swarm.ChunkSize, swarm.ChunkSize)
_, err = store.Put(ctx, storage.ModePutUpload, firstChunk)
if err != nil {
t.Fatal(err)
}
secondAddress := swarm.NewAddress(rootChunk.Data()[swarm.SectionSize+8:])
secondChunk := filetest.GenerateTestRandomFileChunk(secondAddress, swarm.ChunkSize, swarm.ChunkSize)
_, err = store.Put(ctx, storage.ModePutUpload, secondChunk)
if err != nil {
t.Fatal(err)
}
// this buffer is too small
j := internal.NewSimpleJoinerJob(ctx, store, rootChunk, false)
b := make([]byte, swarm.SectionSize)
_, err = j.Read(b)
if err == nil {
t.Fatal("expected error on Read with too small buffer")
}
// this buffer is too big
b = make([]byte, swarm.ChunkSize+swarm.SectionSize)
_, err = j.Read(b)
if err == nil {
t.Fatal("expected error on Read with too big buffer")
}
// this buffer is juuuuuust right
b = make([]byte, swarm.ChunkSize)
_, err = j.Read(b)
if err != nil {
t.Fatal(err)
}
}
// TestSimpleJoinerJobOneLevel tests the retrieval of two data chunks immediately
// below the root chunk level.
func TestSimpleJoinerJobOneLevel(t *testing.T) {
store := mock.NewStorer()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// create root chunk with 2 references and the referenced data chunks
rootChunk := filetest.GenerateTestRandomFileChunk(swarm.ZeroAddress, swarm.ChunkSize*2, swarm.SectionSize*2)
_, err := store.Put(ctx, storage.ModePutUpload, rootChunk)
if err != nil {
t.Fatal(err)
}
firstAddress := swarm.NewAddress(rootChunk.Data()[8 : swarm.SectionSize+8])
firstChunk := filetest.GenerateTestRandomFileChunk(firstAddress, swarm.ChunkSize, swarm.ChunkSize)
_, err = store.Put(ctx, storage.ModePutUpload, firstChunk)
if err != nil {
t.Fatal(err)
}
secondAddress := swarm.NewAddress(rootChunk.Data()[swarm.SectionSize+8:])
secondChunk := filetest.GenerateTestRandomFileChunk(secondAddress, swarm.ChunkSize, swarm.ChunkSize)
_, err = store.Put(ctx, storage.ModePutUpload, secondChunk)
if err != nil {
t.Fatal(err)
}
j := internal.NewSimpleJoinerJob(ctx, store, rootChunk, false)
// verify first chunk content
outBuffer := make([]byte, 4096)
c, err := j.Read(outBuffer)
if err != nil {
t.Fatal(err)
}
if c != 4096 {
t.Fatalf("expected firstchunk read count %d, got %d", 4096, c)
}
if !bytes.Equal(outBuffer, firstChunk.Data()[8:]) {
t.Fatalf("firstchunk data mismatch, expected %x, got %x", outBuffer, firstChunk.Data()[8:])
}
// verify second chunk content
c, err = j.Read(outBuffer)
if err != nil {
t.Fatal(err)
}
if c != 4096 {
t.Fatalf("expected secondchunk read count %d, got %d", 4096, c)
}
if !bytes.Equal(outBuffer, secondChunk.Data()[8:]) {
t.Fatalf("secondchunk data mismatch, expected %x, got %x", outBuffer, secondChunk.Data()[8:])
}
// verify EOF is returned also after first time it is returned
_, err = j.Read(outBuffer)
if err != io.EOF {
t.Fatal("expected io.EOF")
}
_, err = j.Read(outBuffer)
if err != io.EOF {
t.Fatal("expected io.EOF")
}
}
// TestSimpleJoinerJobTwoLevelsAcrossChunk tests the retrieval of data chunks below
// first intermediate level across two intermediate chunks.
// Last chunk has sub-chunk length.
func TestSimpleJoinerJobTwoLevelsAcrossChunk(t *testing.T) {
store := mock.NewStorer()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// create root chunk with 2 references and two intermediate chunks with references
rootChunk := filetest.GenerateTestRandomFileChunk(swarm.ZeroAddress, swarm.ChunkSize*swarm.Branches+42, swarm.SectionSize*2)
_, err := store.Put(ctx, storage.ModePutUpload, rootChunk)
if err != nil {
t.Fatal(err)
}
firstAddress := swarm.NewAddress(rootChunk.Data()[8 : swarm.SectionSize+8])
firstChunk := filetest.GenerateTestRandomFileChunk(firstAddress, swarm.ChunkSize*swarm.Branches, swarm.ChunkSize)
_, err = store.Put(ctx, storage.ModePutUpload, firstChunk)
if err != nil {
t.Fatal(err)
}
secondAddress := swarm.NewAddress(rootChunk.Data()[swarm.SectionSize+8:])
secondChunk := filetest.GenerateTestRandomFileChunk(secondAddress, 42, swarm.SectionSize)
_, err = store.Put(ctx, storage.ModePutUpload, secondChunk)
if err != nil {
t.Fatal(err)
}
// create 128+1 chunks for all references in the intermediate chunks
cursor := 8
for i := 0; i < swarm.Branches; i++ {
chunkAddressBytes := firstChunk.Data()[cursor : cursor+swarm.SectionSize]
chunkAddress := swarm.NewAddress(chunkAddressBytes)
ch := filetest.GenerateTestRandomFileChunk(chunkAddress, swarm.ChunkSize, swarm.ChunkSize)
_, err := store.Put(ctx, storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
cursor += swarm.SectionSize
}
chunkAddressBytes := secondChunk.Data()[8:]
chunkAddress := swarm.NewAddress(chunkAddressBytes)
ch := filetest.GenerateTestRandomFileChunk(chunkAddress, 42, 42)
_, err = store.Put(ctx, storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
j := internal.NewSimpleJoinerJob(ctx, store, rootChunk, false)
// read back all the chunks and verify
b := make([]byte, swarm.ChunkSize)
for i := 0; i < swarm.Branches; i++ {
c, err := j.Read(b)
if err != nil {
t.Fatal(err)
}
if c != swarm.ChunkSize {
t.Fatalf("chunk %d expected read %d bytes; got %d", i, swarm.ChunkSize, c)
}
}
c, err := j.Read(b)
if err != nil {
t.Fatal(err)
}
if c != 42 {
t.Fatalf("last chunk expected read %d bytes; got %d", 42, c)
}
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package joiner provides implementations of the file.Joiner interface
package joiner
import (
"context"
"encoding/binary"
"fmt"
"io"
"github.com/ethersphere/bee/pkg/encryption"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/joiner/internal"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
// simpleJoiner wraps a non-optimized implementation of file.Joiner.
type simpleJoiner struct {
getter storage.Getter
}
// NewSimpleJoiner creates a new simpleJoiner.
func NewSimpleJoiner(getter storage.Getter) file.Joiner {
return &simpleJoiner{
getter: getter,
}
}
func (s *simpleJoiner) Size(ctx context.Context, address swarm.Address) (dataSize int64, err error) {
// Handle size based on whether the root chunk is encrypted or not
toDecrypt := len(address.Bytes()) == swarm.EncryptedReferenceSize
var key encryption.Key
addrBytes := address.Bytes()
if toDecrypt {
addrBytes = address.Bytes()[:swarm.HashSize]
key = address.Bytes()[swarm.HashSize : swarm.HashSize+encryption.KeyLength]
}
address = swarm.NewAddress(addrBytes)
// retrieve the root chunk to read the total data length to be retrieved
rootChunk, err := s.getter.Get(ctx, storage.ModeGetRequest, address)
if err != nil {
return 0, err
}
chunkLength := rootChunk.Data()
if len(chunkLength) < 8 {
return 0, fmt.Errorf("invalid chunk content of %d bytes", len(chunkLength))
}
chunkData := rootChunk.Data()
if toDecrypt {
originalData, err := internal.DecryptChunkData(rootChunk.Data(), key)
if err != nil {
return 0, err
}
chunkData = originalData
}
dataLength := binary.LittleEndian.Uint64(chunkData[:8])
return int64(dataLength), nil
}
// Join implements the file.Joiner interface.
//
// It uses a non-optimized internal component that only retrieves a data chunk
// after the previous has been read.
func (s *simpleJoiner) Join(ctx context.Context, address swarm.Address, toDecrypt bool) (dataOut io.ReadCloser, dataSize int64, err error) {
var addr []byte
var key encryption.Key
if toDecrypt {
addr = address.Bytes()[:swarm.HashSize]
key = address.Bytes()[swarm.HashSize : swarm.HashSize+encryption.KeyLength]
} else {
addr = address.Bytes()
}
// retrieve the root chunk to read the total data length to be retrieved
rootChunk, err := s.getter.Get(ctx, storage.ModeGetRequest, swarm.NewAddress(addr))
if err != nil {
return nil, 0, err
}
var chunkData []byte
if toDecrypt {
originalData, err := internal.DecryptChunkData(rootChunk.Data(), key)
if err != nil {
return nil, 0, err
}
chunkData = originalData
} else {
chunkData = rootChunk.Data()
}
// if this is a single chunk, short circuit to returning just that chunk
spanLength := binary.LittleEndian.Uint64(chunkData[:8])
chunkToSend := rootChunk
if spanLength <= swarm.ChunkSize {
data := chunkData[8:]
return file.NewSimpleReadCloser(data), int64(spanLength), nil
}
if toDecrypt {
chunkToSend = swarm.NewChunk(swarm.NewAddress(addr), chunkData)
}
r := internal.NewSimpleJoinerJob(ctx, s.getter, chunkToSend, toDecrypt)
return r, int64(spanLength), nil
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package joiner_test
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"testing"
"github.com/ethersphere/bee/pkg/file/joiner"
"github.com/ethersphere/bee/pkg/file/pipeline"
filetest "github.com/ethersphere/bee/pkg/file/testing"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
"gitlab.com/nolash/go-mockbytes"
)
// TestJoiner verifies that a newly created joiner returns the data stored
// in the store when the reference is one single chunk.
func TestJoinerSingleChunk(t *testing.T) {
store := mock.NewStorer()
joiner := joiner.NewSimpleJoiner(store)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var err error
_, _, err = joiner.Join(ctx, swarm.ZeroAddress, false)
if err != storage.ErrNotFound {
t.Fatalf("expected ErrNotFound for %x", swarm.ZeroAddress)
}
// create the chunk to be stored and read back
mockAddrHex := fmt.Sprintf("%064s", "2a")
mockAddr := swarm.MustParseHexAddress(mockAddrHex)
mockData := []byte("foo")
mockDataLengthBytes := make([]byte, 8)
mockDataLengthBytes[0] = 0x03
mockChunk := swarm.NewChunk(mockAddr, append(mockDataLengthBytes, mockData...))
_, err = store.Put(ctx, storage.ModePutUpload, mockChunk)
if err != nil {
t.Fatal(err)
}
// read back data and compare
joinReader, l, err := joiner.Join(ctx, mockAddr, false)
if err != nil {
t.Fatal(err)
}
if l != int64(len(mockData)) {
t.Fatalf("expected join data length %d, got %d", len(mockData), l)
}
joinData, err := ioutil.ReadAll(joinReader)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(joinData, mockData) {
t.Fatalf("retrieved data '%x' not like original data '%x'", joinData, mockData)
}
}
// TestJoinerWithReference verifies that a chunk reference is correctly resolved
// and the underlying data is returned.
func TestJoinerWithReference(t *testing.T) {
store := mock.NewStorer()
joiner := joiner.NewSimpleJoiner(store)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// create root chunk and two data chunks referenced in the root chunk
rootChunk := filetest.GenerateTestRandomFileChunk(swarm.ZeroAddress, swarm.ChunkSize*2, swarm.SectionSize*2)
_, err := store.Put(ctx, storage.ModePutUpload, rootChunk)
if err != nil {
t.Fatal(err)
}
firstAddress := swarm.NewAddress(rootChunk.Data()[8 : swarm.SectionSize+8])
firstChunk := filetest.GenerateTestRandomFileChunk(firstAddress, swarm.ChunkSize, swarm.ChunkSize)
_, err = store.Put(ctx, storage.ModePutUpload, firstChunk)
if err != nil {
t.Fatal(err)
}
secondAddress := swarm.NewAddress(rootChunk.Data()[swarm.SectionSize+8:])
secondChunk := filetest.GenerateTestRandomFileChunk(secondAddress, swarm.ChunkSize, swarm.ChunkSize)
_, err = store.Put(ctx, storage.ModePutUpload, secondChunk)
if err != nil {
t.Fatal(err)
}
// read back data and compare
joinReader, l, err := joiner.Join(ctx, rootChunk.Address(), false)
if err != nil {
t.Fatal(err)
}
if l != int64(swarm.ChunkSize*2) {
t.Fatalf("expected join data length %d, got %d", swarm.ChunkSize*2, l)
}
resultBuffer := make([]byte, swarm.ChunkSize)
n, err := joinReader.Read(resultBuffer)
if err != nil {
t.Fatal(err)
}
if n != len(resultBuffer) {
t.Fatalf("expected read count %d, got %d", len(resultBuffer), n)
}
if !bytes.Equal(resultBuffer, firstChunk.Data()[8:]) {
t.Fatalf("expected resultbuffer %v, got %v", resultBuffer, firstChunk.Data()[:len(resultBuffer)])
}
}
func TestEncryptionAndDecryption(t *testing.T) {
var tests = []struct {
chunkLength int
}{
{10},
{100},
{1000},
{4095},
{4096},
{4097},
{15000},
{256 * 1024},
// {256 * 1024 * 2}, // TODO: fix - incorrect join
// {256 * 1024 * 3}, // TODO: fix - deadlock
}
for _, tt := range tests {
t.Run(fmt.Sprintf("Encrypt %d bytes", tt.chunkLength), func(t *testing.T) {
store := mock.NewStorer()
joinner := joiner.NewSimpleJoiner(store)
g := mockbytes.New(0, mockbytes.MockTypeStandard).WithModulus(255)
testData, err := g.SequentialBytes(tt.chunkLength)
if err != nil {
t.Fatal(err)
}
ctx := context.Background()
pipe := pipeline.NewPipelineBuilder(ctx, store, storage.ModePutUpload, true)
testDataReader := bytes.NewReader(testData)
resultAddress, err := pipeline.FeedPipeline(ctx, pipe, testDataReader, int64(len(testData)))
if err != nil {
t.Fatal(err)
}
reader, l, err := joinner.Join(context.Background(), resultAddress, true)
if err != nil {
t.Fatal(err)
}
if l != int64(len(testData)) {
t.Fatalf("expected join data length %d, got %d", len(testData), l)
}
totalGot := make([]byte, tt.chunkLength)
index := 0
resultBuffer := make([]byte, swarm.ChunkSize)
for index < tt.chunkLength {
n, err := reader.Read(resultBuffer)
if err != nil && err != io.EOF {
t.Fatal(err)
}
copy(totalGot[index:], resultBuffer[:n])
index += n
}
if !bytes.Equal(testData, totalGot) {
t.Fatal("input data and output data does not match")
}
})
}
}
......@@ -47,11 +47,11 @@ func encrypt(chunkData []byte) (encryption.Key, []byte, []byte, error) {
return key, encryptedSpan, encryptedData, nil
}
func newSpanEncryption(key encryption.Key) *encryption.Encryption {
func newSpanEncryption(key encryption.Key) encryption.Interface {
refSize := int64(swarm.HashSize + encryption.KeyLength)
return encryption.New(key, 0, uint32(swarm.ChunkSize/refSize), sha3.NewLegacyKeccak256)
}
func newDataEncryption(key encryption.Key) *encryption.Encryption {
func newDataEncryption(key encryption.Key) encryption.Interface {
return encryption.New(key, int(swarm.ChunkSize), 0, sha3.NewLegacyKeccak256)
}
......@@ -10,42 +10,48 @@ import (
"errors"
"io"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
type SimpleJoinerJob struct {
addr swarm.Address
rootData []byte
spanLength int64
off int64
levels int
type SimpleJoiner struct {
addr swarm.Address
rootData []byte
span int64
off int64
refLength int
ctx context.Context
getter storage.Getter
}
// NewSimpleJoinerJob creates a new SimpleJoinerJob.
func NewSimpleJoinerJob(ctx context.Context, getter storage.Getter, rootChunk swarm.Chunk) *SimpleJoinerJob {
// spanLength is the overall size of the entire data layer for this content addressed hash
spanLength := binary.LittleEndian.Uint64(rootChunk.Data()[:8])
levelCount := file.Levels(int64(spanLength), swarm.SectionSize, swarm.Branches)
j := &SimpleJoinerJob{
addr: rootChunk.Address(),
ctx: ctx,
getter: getter,
spanLength: int64(spanLength),
rootData: rootChunk.Data()[8:],
levels: levelCount,
// NewSimpleJoiner creates a new SimpleJoiner.
func NewSimpleJoiner(ctx context.Context, getter storage.Getter, address swarm.Address) (*SimpleJoiner, int64, error) {
// retrieve the root chunk to read the total data length to be retrieved
rootChunk, err := getter.Get(ctx, storage.ModeGetRequest, address)
if err != nil {
return nil, 0, err
}
return j
var chunkData = rootChunk.Data()
span := int64(binary.LittleEndian.Uint64(chunkData[:swarm.SpanSize]))
j := &SimpleJoiner{
addr: rootChunk.Address(),
refLength: len(address.Bytes()),
ctx: ctx,
getter: getter,
span: span,
rootData: chunkData[swarm.SpanSize:],
}
return j, span, nil
}
// Read is called by the consumer to retrieve the joined data.
// It must be called with a buffer equal to the maximum chunk size.
func (j *SimpleJoinerJob) Read(b []byte) (n int, err error) {
func (j *SimpleJoiner) Read(b []byte) (n int, err error) {
read, err := j.ReadAt(b, j.off)
if err != nil && err != io.EOF {
return read, err
......@@ -55,13 +61,13 @@ func (j *SimpleJoinerJob) Read(b []byte) (n int, err error) {
return read, err
}
func (j *SimpleJoinerJob) ReadAt(b []byte, off int64) (read int, err error) {
func (j *SimpleJoiner) ReadAt(b []byte, off int64) (read int, err error) {
// since offset is int64 and swarm spans are uint64 it means we cannot seek beyond int64 max value
return j.readAtOffset(b, j.rootData, 0, j.spanLength, off)
return j.readAtOffset(b, j.rootData, 0, j.span, off)
}
func (j *SimpleJoinerJob) readAtOffset(b, data []byte, cur, subTrieSize, off int64) (read int, err error) {
if off >= j.spanLength {
func (j *SimpleJoiner) readAtOffset(b, data []byte, cur, subTrieSize, off int64) (read int, err error) {
if off >= j.span {
return 0, io.EOF
}
......@@ -79,8 +85,8 @@ func (j *SimpleJoinerJob) readAtOffset(b, data []byte, cur, subTrieSize, off int
return n, nil
}
for cursor := 0; cursor < len(data); cursor += swarm.SectionSize {
address := swarm.NewAddress(data[cursor : cursor+swarm.SectionSize])
for cursor := 0; cursor < len(data); cursor += j.refLength {
address := swarm.NewAddress(data[cursor : cursor+j.refLength])
ch, err := j.getter.Get(j.ctx, storage.ModeGetRequest, address)
if err != nil {
return 0, err
......@@ -104,7 +110,7 @@ func (j *SimpleJoinerJob) readAtOffset(b, data []byte, cur, subTrieSize, off int
var errWhence = errors.New("seek: invalid whence")
var errOffset = errors.New("seek: invalid offset")
func (j *SimpleJoinerJob) Seek(offset int64, whence int) (int64, error) {
func (j *SimpleJoiner) Seek(offset int64, whence int) (int64, error) {
switch whence {
case 0:
offset += 0
......@@ -112,7 +118,7 @@ func (j *SimpleJoinerJob) Seek(offset int64, whence int) (int64, error) {
offset += j.off
case 2:
offset = j.spanLength - offset
offset = j.span - offset
if offset < 0 {
return 0, io.EOF
}
......@@ -123,7 +129,7 @@ func (j *SimpleJoinerJob) Seek(offset int64, whence int) (int64, error) {
if offset < 0 {
return 0, errOffset
}
if offset > j.spanLength {
if offset > j.span {
return 0, io.EOF
}
j.off = offset
......@@ -131,7 +137,7 @@ func (j *SimpleJoinerJob) Seek(offset int64, whence int) (int64, error) {
}
func (j *SimpleJoinerJob) Size() (int64, error) {
func (j *SimpleJoiner) Size() (int64, error) {
if j.rootData == nil {
chunk, err := j.getter.Get(j.ctx, storage.ModeGetRequest, j.addr)
if err != nil {
......
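Because Join now hands back an io.ReadSeeker, ranged access no longer means re-joining from the start, which is what makes things like HTTP range responses cheap. A sketch of reading the final 100 bytes; st, ctx and addr are placeholders, and note that this Seek implementation treats whence 2 (io.SeekEnd) as a positive distance back from the end (offset = span - offset), unlike the standard library convention:

rs, size, err := seekjoiner.NewSimpleJoiner(st).Join(ctx, addr)
if err != nil {
	panic(err) // error handling elided in this sketch
}
// positions the cursor at size-100 under this implementation's whence-2 rule
if _, err := rs.Seek(100, 2); err != nil {
	panic(err)
}
tail := make([]byte, 100)
// io.ReadFull absorbs the short reads that occur across chunk boundaries
if _, err := io.ReadFull(rs, tail); err != nil {
	panic(err)
}
fmt.Printf("read %d of %d bytes\n", len(tail), size)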
......@@ -79,13 +79,12 @@ func TestSeek(t *testing.T) {
if err != nil {
t.Fatal(err)
}
rootChunk, err := store.Get(ctx, storage.ModeGetLookup, addr)
j, _, err := internal.NewSimpleJoiner(ctx, store, addr)
if err != nil {
t.Fatal(err)
}
j := internal.NewSimpleJoinerJob(ctx, store, rootChunk)
validateRead := func(t *testing.T, name string, i int) {
t.Helper()
......@@ -205,7 +204,10 @@ func TestSimpleJoinerReadAt(t *testing.T) {
t.Fatal(err)
}
j := internal.NewSimpleJoinerJob(ctx, store, rootChunk)
j, _, err := internal.NewSimpleJoiner(ctx, store, rootChunk.Address())
if err != nil {
t.Fatal(err)
}
b := make([]byte, swarm.ChunkSize)
_, err = j.ReadAt(b, swarm.ChunkSize)
......@@ -218,9 +220,9 @@ func TestSimpleJoinerReadAt(t *testing.T) {
}
}
// TestSimpleJoinerJobOneLevel tests the retrieval of two data chunks immediately
// TestSimpleJoinerOneLevel tests the retrieval of two data chunks immediately
// below the root chunk level.
func TestSimpleJoinerJobOneLevel(t *testing.T) {
func TestSimpleJoinerOneLevel(t *testing.T) {
store := mock.NewStorer()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
......@@ -247,7 +249,10 @@ func TestSimpleJoinerJobOneLevel(t *testing.T) {
t.Fatal(err)
}
j := internal.NewSimpleJoinerJob(ctx, store, rootChunk)
j, _, err := internal.NewSimpleJoiner(ctx, store, rootChunk.Address())
if err != nil {
t.Fatal(err)
}
// verify first chunk content
outBuffer := make([]byte, swarm.ChunkSize)
......@@ -286,10 +291,10 @@ func TestSimpleJoinerJobOneLevel(t *testing.T) {
}
}
// TestSimpleJoinerJobTwoLevelsAcrossChunk tests the retrieval of data chunks below
// TestSimpleJoinerTwoLevelsAcrossChunk tests the retrieval of data chunks below
// first intermediate level across two intermediate chunks.
// Last chunk has sub-chunk length.
func TestSimpleJoinerJobTwoLevelsAcrossChunk(t *testing.T) {
func TestSimpleJoinerTwoLevelsAcrossChunk(t *testing.T) {
store := mock.NewStorer()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
......@@ -336,7 +341,10 @@ func TestSimpleJoinerJobTwoLevelsAcrossChunk(t *testing.T) {
t.Fatal(err)
}
j := internal.NewSimpleJoinerJob(ctx, store, rootChunk)
j, _, err := internal.NewSimpleJoiner(ctx, store, rootChunk.Address())
if err != nil {
t.Fatal(err)
}
// read back all the chunks and verify
b := make([]byte, swarm.ChunkSize)
......
......@@ -11,6 +11,7 @@ import (
"fmt"
"io"
"github.com/ethersphere/bee/pkg/encryption/store"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/seekjoiner/internal"
"github.com/ethersphere/bee/pkg/storage"
......@@ -25,7 +26,7 @@ type simpleJoiner struct {
// NewSimpleJoiner creates a new simpleJoiner.
func NewSimpleJoiner(getter storage.Getter) file.JoinSeeker {
return &simpleJoiner{
getter: getter,
getter: store.New(getter),
}
}
......@@ -50,16 +51,5 @@ func (s *simpleJoiner) Size(ctx context.Context, address swarm.Address) (int64,
// It uses a non-optimized internal component that only retrieves a data chunk
// after the previous has been read.
func (s *simpleJoiner) Join(ctx context.Context, address swarm.Address) (dataOut io.ReadSeeker, dataSize int64, err error) {
// retrieve the root chunk to read the total data length to be retrieved
rootChunk, err := s.getter.Get(ctx, storage.ModeGetRequest, address)
if err != nil {
return nil, 0, err
}
var chunkData = rootChunk.Data()
// if this is a single chunk, short circuit to returning just that chunk
spanLength := binary.LittleEndian.Uint64(chunkData[:8])
r := internal.NewSimpleJoinerJob(ctx, s.getter, rootChunk)
return r, int64(spanLength), nil
return internal.NewSimpleJoiner(ctx, s.getter, address)
}
......@@ -7,32 +7,84 @@ package seekjoiner_test
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"testing"
"github.com/ethersphere/bee/pkg/encryption/store"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/file/seekjoiner"
joiner "github.com/ethersphere/bee/pkg/file/seekjoiner"
filetest "github.com/ethersphere/bee/pkg/file/testing"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
"gitlab.com/nolash/go-mockbytes"
)
func TestJoiner_ErrReferenceLength(t *testing.T) {
store := mock.NewStorer()
joiner := joiner.NewSimpleJoiner(store)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var err error
_, _, err = joiner.Join(ctx, swarm.ZeroAddress)
if !errors.Is(err, storage.ErrReferenceLength) {
t.Fatalf("expected ErrReferenceLength %x but got %v", swarm.ZeroAddress, err)
}
}
// TestJoiner verifies that a newly created joiner returns the data stored
// in the store when the reference is one single chunk.
func TestJoinerSingleChunk(t *testing.T) {
store := mock.NewStorer()
joiner := joiner.NewSimpleJoiner(store)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var err error
_, _, err = joiner.Join(ctx, swarm.ZeroAddress)
if err != storage.ErrNotFound {
t.Fatalf("expected ErrNotFound for %x but got %v", swarm.ZeroAddress, err)
// create the chunk to be stored and read back
mockAddrHex := fmt.Sprintf("%064s", "2a")
mockAddr := swarm.MustParseHexAddress(mockAddrHex)
mockData := []byte("foo")
mockDataLengthBytes := make([]byte, 8)
mockDataLengthBytes[0] = 0x03
mockChunk := swarm.NewChunk(mockAddr, append(mockDataLengthBytes, mockData...))
_, err := store.Put(ctx, storage.ModePutUpload, mockChunk)
if err != nil {
t.Fatal(err)
}
// read back data and compare
joinReader, l, err := joiner.Join(ctx, mockAddr)
if err != nil {
t.Fatal(err)
}
if l != int64(len(mockData)) {
t.Fatalf("expected join data length %d, got %d", len(mockData), l)
}
joinData, err := ioutil.ReadAll(joinReader)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(joinData, mockData) {
t.Fatalf("retrieved data '%x' not like original data '%x'", joinData, mockData)
}
}
// TestJoinerDecryptingStore_NormalChunk verifies that the mock store that uses
// the decrypting store manages to retrieve a normal chunk which is not encrypted
func TestJoinerDecryptingStore_NormalChunk(t *testing.T) {
st := mock.NewStorer()
store := store.New(st)
joiner := joiner.NewSimpleJoiner(store)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// create the chunk to be stored and read back
mockAddrHex := fmt.Sprintf("%064s", "2a")
......@@ -41,7 +93,7 @@ func TestJoinerSingleChunk(t *testing.T) {
mockDataLengthBytes := make([]byte, 8)
mockDataLengthBytes[0] = 0x03
mockChunk := swarm.NewChunk(mockAddr, append(mockDataLengthBytes, mockData...))
_, err = store.Put(ctx, storage.ModePutUpload, mockChunk)
_, err := st.Put(ctx, storage.ModePutUpload, mockChunk)
if err != nil {
t.Fatal(err)
}
......@@ -114,3 +166,62 @@ func TestJoinerWithReference(t *testing.T) {
t.Fatalf("expected resultbuffer %v, got %v", resultBuffer, firstChunk.Data()[:len(resultBuffer)])
}
}
func TestEncryptDecrypt(t *testing.T) {
var tests = []struct {
chunkLength int
}{
{10},
{100},
{1000},
{4095},
{4096},
{4097},
{1000000},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("Encrypt %d bytes", tt.chunkLength), func(t *testing.T) {
store := mock.NewStorer()
joiner := seekjoiner.NewSimpleJoiner(store)
g := mockbytes.New(0, mockbytes.MockTypeStandard).WithModulus(255)
testData, err := g.SequentialBytes(tt.chunkLength)
if err != nil {
t.Fatal(err)
}
ctx := context.Background()
pipe := pipeline.NewPipelineBuilder(ctx, store, storage.ModePutUpload, true)
testDataReader := bytes.NewReader(testData)
resultAddress, err := pipeline.FeedPipeline(ctx, pipe, testDataReader, int64(len(testData)))
if err != nil {
t.Fatal(err)
}
reader, l, err := joiner.Join(context.Background(), resultAddress)
if err != nil {
t.Fatal(err)
}
if l != int64(len(testData)) {
t.Fatalf("expected join data length %d, got %d", len(testData), l)
}
totalGot := make([]byte, tt.chunkLength)
index := 0
resultBuffer := make([]byte, swarm.ChunkSize)
for index < tt.chunkLength {
n, err := reader.Read(resultBuffer)
if err != nil && err != io.EOF {
t.Fatal(err)
}
copy(totalGot[index:], resultBuffer[:n])
index += n
}
if !bytes.Equal(testData, totalGot) {
t.Fatal("input data and output data does not match")
}
})
}
}
......@@ -302,11 +302,11 @@ func (s *SimpleSplitterJob) encrypt(chunkData []byte) (encryption.Key, []byte, [
return key, encryptedSpan, encryptedData, nil
}
func (s *SimpleSplitterJob) newSpanEncryption(key encryption.Key) *encryption.Encryption {
func (s *SimpleSplitterJob) newSpanEncryption(key encryption.Key) encryption.Interface {
return encryption.New(key, 0, uint32(swarm.ChunkSize/s.refSize), sha3.NewLegacyKeccak256)
}
func (s *SimpleSplitterJob) newDataEncryption(key encryption.Key) *encryption.Encryption {
func (s *SimpleSplitterJob) newDataEncryption(key encryption.Key) encryption.Interface {
return encryption.New(key, int(swarm.ChunkSize), 0, sha3.NewLegacyKeccak256)
}
......
......@@ -11,8 +11,8 @@ import (
"fmt"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/joiner"
"github.com/ethersphere/bee/pkg/file/splitter"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/file/seekjoiner"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/manifest/mantaray"
......@@ -151,10 +151,10 @@ func newMantaraySaver(
func (ls *mantarayLoadSaver) Load(ref []byte) ([]byte, error) {
ctx := ls.ctx
j := joiner.NewSimpleJoiner(ls.storer)
j := seekjoiner.NewSimpleJoiner(ls.storer)
buf := bytes.NewBuffer(nil)
_, err := file.JoinReadAll(ctx, j, swarm.NewAddress(ref), buf, ls.encrypted)
_, err := file.JoinReadAll(ctx, j, swarm.NewAddress(ref), buf)
if err != nil {
return nil, err
}
......@@ -165,9 +165,9 @@ func (ls *mantarayLoadSaver) Load(ref []byte) ([]byte, error) {
func (ls *mantarayLoadSaver) Save(data []byte) ([]byte, error) {
ctx := ls.ctx
sp := splitter.NewSimpleSplitter(ls.storer, ls.modePut)
pipe := pipeline.NewPipelineBuilder(ctx, ls.storer, ls.modePut, ls.encrypted)
address, err := pipeline.FeedPipeline(ctx, pipe, bytes.NewReader(data), int64(len(data)))
address, err := file.SplitWriteAll(ctx, sp, bytes.NewReader(data), int64(len(data)), ls.encrypted)
if err != nil {
return swarm.ZeroAddress.Bytes(), err
}
......
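The save path now mirrors the read path: encrypted writes go through the pipeline builder instead of the splitter, and the resulting reference length is what later tells the decrypting store what to do. A sketch of the encrypted write using only calls from this diff; ctx, storer and data are placeholders:

pipe := pipeline.NewPipelineBuilder(ctx, storer, storage.ModePutUpload, true) // true = encrypt
ref, err := pipeline.FeedPipeline(ctx, pipe, bytes.NewReader(data), int64(len(data)))
if err != nil {
	panic(err) // error handling elided in this sketch
}
// with encryption on, the reference is 64 bytes: 32-byte hash plus 32-byte key
fmt.Println(len(ref.Bytes())) // 64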
......@@ -11,8 +11,8 @@ import (
"fmt"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/joiner"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/file/seekjoiner"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/manifest/simple"
......@@ -116,10 +116,10 @@ func (m *simpleManifest) Store(ctx context.Context, mode storage.ModePut) (swarm
}
func (m *simpleManifest) load(ctx context.Context, reference swarm.Address) error {
j := joiner.NewSimpleJoiner(m.storer)
j := seekjoiner.NewSimpleJoiner(m.storer)
buf := bytes.NewBuffer(nil)
_, err := file.JoinReadAll(ctx, j, reference, buf, m.encrypted)
_, err := file.JoinReadAll(ctx, j, reference, buf)
if err != nil {
return fmt.Errorf("manifest load error: %w", err)
}
......
......@@ -14,8 +14,9 @@ import (
)
var (
ErrNotFound = errors.New("storage: not found")
ErrInvalidChunk = errors.New("storage: invalid chunk")
ErrNotFound = errors.New("storage: not found")
ErrInvalidChunk = errors.New("storage: invalid chunk")
ErrReferenceLength = errors.New("invalid reference length")
)
// ModeGet enumerates different Getter modes.
......
......@@ -12,20 +12,17 @@ import (
"fmt"
"golang.org/x/crypto/sha3"
"github.com/ethersphere/bee/pkg/encryption"
)
const (
SpanSize = 8
SectionSize = 32
Branches = 128
ChunkSize = SectionSize * Branches
HashSize = 32
EncryptedReferenceSize = HashSize + encryption.KeyLength
MaxPO uint8 = 15
MaxBins = MaxPO + 1
ChunkWithSpanSize = ChunkSize + SpanSize
SpanSize = 8
SectionSize = 32
Branches = 128
ChunkSize = SectionSize * Branches
HashSize = 32
MaxPO uint8 = 15
MaxBins = MaxPO + 1
ChunkWithSpanSize = ChunkSize + SpanSize
)
var (
......
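EncryptedReferenceSize leaves the swarm package; the equivalent value now lives beside the key material as encryption.ReferenceSize. A one-line sanity check, purely illustrative:

// an encrypted reference is hash || key: 64 == 32 + 32
if encryption.ReferenceSize != swarm.HashSize+encryption.KeyLength {
	panic("reference size mismatch")
}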