Commit 18380b16 authored by lash, committed by GitHub

Add split join integration test (#168)

parent 93fa9d09
@@ -12,6 +12,10 @@ import (
 	"github.com/ethersphere/bee/pkg/swarm"
 )
 
+var (
+	ChunkWithLengthSize = swarm.ChunkSize + 8
+)
+
 // Joiner returns file data referenced by the given Swarm Address to the given io.Reader.
 //
 // The call returns when the chunk for the given Swarm Address is found,
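The new ChunkWithLengthSize constant reflects the chunk layout the rest of this commit relies on: an 8-byte little-endian span (the length of the data covered by the chunk's subtree) followed by up to swarm.ChunkSize bytes of payload. Below is a minimal standalone sketch of that layout, assuming swarm.ChunkSize is 4096; it is illustrative only and not code from this commit.

package main

import (
	"encoding/binary"
	"fmt"
)

const (
	chunkSize           = 4096          // assumed value of swarm.ChunkSize
	chunkWithLengthSize = chunkSize + 8 // payload plus the 8-byte span prefix
)

func main() {
	payload := []byte("hello chunk")

	// assemble span-prefixed chunk data: 8-byte little-endian length, then payload
	data := make([]byte, 8+len(payload))
	binary.LittleEndian.PutUint64(data[:8], uint64(len(payload)))
	copy(data[8:], payload)

	// decoding reverses the layout
	span := binary.LittleEndian.Uint64(data[:8])
	fmt.Printf("span=%d payload=%q max=%d\n", span, data[8:], chunkWithLengthSize)
}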
...
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package file_test

import (
	"bytes"
	"context"
	"io"
	"strconv"
	"strings"
	"testing"

	"github.com/ethersphere/bee/pkg/file"
	"github.com/ethersphere/bee/pkg/file/joiner"
	"github.com/ethersphere/bee/pkg/file/splitter"
	test "github.com/ethersphere/bee/pkg/file/testing"
	"github.com/ethersphere/bee/pkg/storage/mock"
	"github.com/ethersphere/bee/pkg/swarm"
)

var (
	start = 0
	end   = test.GetVectorCount()
)

// TestSplitThenJoin splits a file with the splitter implementation and
// joins it again with the joiner implementation, verifying that the
// rebuilt data matches the original data that was split.
//
// It uses the same test vectors as the splitter tests to generate the
// necessary data.
func TestSplitThenJoin(t *testing.T) {
	for i := start; i < end; i++ {
		dataLengthStr := strconv.Itoa(i)
		t.Run(dataLengthStr, testSplitThenJoin)
	}
}

func testSplitThenJoin(t *testing.T) {
	var (
		paramstring = strings.Split(t.Name(), "/")
		dataIdx, _  = strconv.ParseInt(paramstring[1], 10, 0)
		store       = mock.NewStorer()
		s           = splitter.NewSimpleSplitter(store)
		j           = joiner.NewSimpleJoiner(store)
		data, _     = test.GetVector(t, int(dataIdx))
	)

	// first split
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	dataReader := file.NewSimpleReadCloser(data)
	resultAddress, err := s.Split(ctx, dataReader, int64(len(data)))
	if err != nil {
		t.Fatal(err)
	}

	// then join
	r, l, err := j.Join(ctx, resultAddress)
	if err != nil {
		t.Fatal(err)
	}
	if l != int64(len(data)) {
		t.Fatalf("data length return expected %d, got %d", len(data), l)
	}

	// read from joiner
	var resultData []byte
	for i := 0; i < len(data); i += swarm.ChunkSize {
		readData := make([]byte, swarm.ChunkSize)
		_, err := r.Read(readData)
		if err != nil {
			if err == io.EOF {
				break
			}
			t.Fatal(err)
		}
		resultData = append(resultData, readData...)
	}

	// compare result
	if !bytes.Equal(resultData[:len(data)], data) {
		t.Fatalf("data mismatch %d", len(data))
	}
}
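The read loop above pulls data in swarm.ChunkSize steps because that is how the joiner hands data out. When only the total length returned by Join is known, the same drain can be written against a plain io.Reader with io.ReadFull; the following is a hedged sketch, and readJoined is an illustrative name rather than an API in this repository.

package main

import (
	"bytes"
	"fmt"
	"io"
)

// readJoined drains a reader into a buffer of the exact length that
// Join reported, using io.ReadFull to handle short reads transparently.
func readJoined(r io.Reader, length int64) ([]byte, error) {
	buf := make([]byte, length)
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, err
	}
	return buf, nil
}

func main() {
	data := []byte("example payload")
	out, err := readJoined(bytes.NewReader(data), int64(len(data)))
	if err != nil {
		panic(err)
	}
	fmt.Printf("read %d bytes\n", len(out))
}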
@@ -10,7 +10,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
-	"os"
+	"io/ioutil"
 	"sync"
 
 	"github.com/ethersphere/bee/pkg/file"
@@ -59,14 +59,13 @@ func NewSimpleJoinerJob(ctx context.Context, store storage.Storer, rootChunk swa
 		spanLength: int64(spanLength),
 		dataC:      make(chan []byte),
 		doneC:      make(chan struct{}),
-		logger:     logging.New(os.Stderr, 6),
+		logger:     logging.New(ioutil.Discard, 0),
 	}
 
 	// startLevelIndex is the root chunk level
 	// data level has index 0
 	startLevelIndex := levelCount - 1
 	j.data[startLevelIndex] = rootChunk.Data()[8:]
-	j.logger.Tracef("simple joiner start index %d for address %s", startLevelIndex, rootChunk.Address())
 
 	// retrieval must be asynchronous to the io.Reader()
 	go func() {
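For orientation on the level indexing used here (data chunks at level 0, the root chunk at the top), the sketch below estimates how many levels a 128-branching Swarm chunk tree needs for a given span. It is a rough, self-contained illustration, not the levelCount computation used in this package; chunkSize and branches are assumed values.

package main

import "fmt"

const (
	chunkSize = 4096 // assumed swarm.ChunkSize
	branches  = 128  // assumed references per intermediate chunk
)

// levelCount returns how many levels a chunk tree needs for a span of
// the given length: level 0 is the data level, the top level holds the
// single root chunk.
func levelCount(span int64) int {
	levels := 1
	chunks := (span + chunkSize - 1) / chunkSize
	for chunks > 1 {
		levels++
		chunks = (chunks + branches - 1) / branches
	}
	return levels
}

func main() {
	for _, span := range []int64{31, 4096, 4096 * 128, 4096*128 + 1, 4096 * 128 * 128} {
		fmt.Printf("span %12d -> levels %d (root level index %d)\n",
			span, levelCount(span), levelCount(span)-1)
	}
}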
@@ -76,8 +75,6 @@ func NewSimpleJoinerJob(ctx context.Context, store storage.Storer, rootChunk swa
 			// in this case the error will always be nil and this will not be executed
 			if err != io.EOF {
 				j.logger.Errorf("simple joiner chunk join job fail: %v", err)
-			} else {
-				j.logger.Tracef("simple joiner chunk join job eof")
 			}
 		}
 		j.err = err
@@ -110,8 +107,20 @@ func (j *SimpleJoinerJob) nextReference(level int) error {
 	chunkAddress := swarm.NewAddress(data[cursor : cursor+swarm.SectionSize])
 	err := j.nextChunk(level-1, chunkAddress)
 	if err != nil {
-		return err
-	}
+		if err == io.EOF {
+			return err
+		}
+		// if the last write is a "dangling chunk" the data chunk will have been moved
+		// to an intermediate level. In this edge case, the error must be suppressed,
+		// and the cursor manually set to the data length boundary to terminate the
+		// loop in the calling frame.
+		if j.readCount+int64(len(data)) == j.spanLength {
+			j.cursors[level] = len(j.data[level])
+			err = j.sendChunkToReader(data)
+			return err
+		}
+		return fmt.Errorf("error in join for chunk %v: %v", chunkAddress, err)
+	}
 
 	// move the cursor to the next reference
 	j.cursors[level] += swarm.SectionSize
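A worked example of the condition guarding the dangling-chunk branch above, using one of the test vector lengths. This is a standalone illustration with chunkSize hard-coded to 4096 (the assumed swarm.ChunkSize), not code from this package.

package main

import "fmt"

const chunkSize = 4096 // assumed swarm.ChunkSize

func main() {
	// One of the test vectors: 128 full chunks plus a 31-byte tail.
	spanLength := int64(chunkSize*128 + 31)

	// After the joiner has delivered the 128 full data chunks...
	readCount := int64(chunkSize * 128)
	// ...the next piece of data it holds is the 31-byte dangling chunk.
	data := make([]byte, 31)

	// This is the condition the new code checks before suppressing the
	// error and closing out the read.
	isDanglingTail := readCount+int64(len(data)) == spanLength
	fmt.Println(isDanglingTail) // true
}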
@@ -151,6 +160,14 @@ func (j *SimpleJoinerJob) nextChunk(level int, address swarm.Address) error {
 		// * context cancelled when client has disappeared, timeout etc
 		// * doneC receive when gracefully terminated through Close
 		data := ch.Data()[8:]
+		err = j.sendChunkToReader(data)
+	}
+	return err
+}
+
+// sendChunkToReader handles exceptions on the consumer side while the
+// data is being read
+func (j *SimpleJoinerJob) sendChunkToReader(data []byte) error {
 	select {
 	case <-j.ctx.Done():
 		j.readCount = j.spanLength
@@ -159,16 +176,14 @@ func (j *SimpleJoinerJob) nextChunk(level int, address swarm.Address) error {
 		return file.NewAbortError(errors.New("chunk read aborted"))
 	case j.dataC <- data:
 		j.readCount += int64(len(data))
 		// when we reach the end of data to be read
 		// bubble io.EOF error to the gofunc in the
 		// constructor that called start()
 		if j.readCount == j.spanLength {
-			j.logger.Trace("read all")
 			return io.EOF
 		}
 	}
-	}
-	return err
+	return nil
 }
 
 // Read is called by the consumer to retrieve the joined data.
...
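The sendChunkToReader helper extracted above follows a common Go producer pattern: block until the consumer takes the data, the context is cancelled, or the job is closed, and report io.EOF once the whole span has been delivered. Below is a generic, self-contained sketch of the same pattern; the names and error values are illustrative and not the bee API.

package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// send blocks until the consumer receives the data or the producer is
// told to stop; it reports io.EOF once the whole span has been sent.
func send(ctx context.Context, dataC chan<- []byte, doneC <-chan struct{},
	data []byte, sent *int64, span int64) error {
	select {
	case <-ctx.Done():
		return errors.New("chunk read aborted")
	case <-doneC:
		return errors.New("chunk read aborted")
	case dataC <- data:
		*sent += int64(len(data))
	}
	if *sent == span {
		return io.EOF
	}
	return nil
}

func main() {
	var (
		dataC   = make(chan []byte)
		doneC   = make(chan struct{}) // would be closed by Close() in the real job
		gotData = make(chan struct{})
		sent    int64
	)
	payload := []byte("hello")

	go func() {
		fmt.Println("consumer got:", string(<-dataC))
		close(gotData)
	}()

	err := send(context.Background(), dataC, doneC, payload, &sent, int64(len(payload)))
	<-gotData
	fmt.Println("producer result:", err) // io.EOF: the whole span was delivered
}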
@@ -9,11 +9,9 @@ import (
 	"context"
 	"encoding/binary"
 	"io"
-	"os"
 
 	"github.com/ethersphere/bee/pkg/file"
 	"github.com/ethersphere/bee/pkg/file/joiner/internal"
-	"github.com/ethersphere/bee/pkg/logging"
 	"github.com/ethersphere/bee/pkg/storage"
 	"github.com/ethersphere/bee/pkg/swarm"
 )
@@ -21,14 +19,12 @@ import (
 // simpleJoiner wraps a non-optimized implementation of file.Joiner.
 type simpleJoiner struct {
 	store  storage.Storer
-	logger logging.Logger
 }
 
 // NewSimpleJoiner creates a new simpleJoiner.
 func NewSimpleJoiner(store storage.Storer) file.Joiner {
 	return &simpleJoiner{
 		store:  store,
-		logger: logging.New(os.Stderr, 6),
 	}
 }
@@ -51,7 +47,6 @@ func (s *simpleJoiner) Join(ctx context.Context, address swarm.Address) (dataOut
 		return file.NewSimpleReadCloser(data), int64(spanLength), nil
 	}
 
-	s.logger.Tracef("simplejoiner joining root chunk %v", rootChunk)
 	r := internal.NewSimpleJoinerJob(ctx, s.store, rootChunk)
 	return r, int64(spanLength), nil
 }
@@ -6,6 +6,7 @@ package internal
 import (
 	"context"
+	"encoding/binary"
 	"errors"
 	"fmt"
 	"hash"
@@ -59,7 +60,7 @@ func NewSimpleSplitterJob(ctx context.Context, store storage.Storer, spanLength
 		sumCounts: make([]int, levelBufferLimit),
 		cursors:   make([]int, levelBufferLimit),
 		hasher:    bmtlegacy.New(p),
-		buffer:    make([]byte, swarm.ChunkSize*levelBufferLimit),
+		buffer:    make([]byte, file.ChunkWithLengthSize*levelBufferLimit),
 	}
 }
@@ -128,6 +129,7 @@ func (s *SimpleSplitterJob) sumLevel(lvl int) ([]byte, error) {
 	sizeToSum := s.cursors[lvl] - s.cursors[lvl+1]
 
+	// perform hashing
 	s.hasher.Reset()
 	err := s.hasher.SetSpan(span)
 	if err != nil {
@@ -138,12 +140,19 @@ func (s *SimpleSplitterJob) sumLevel(lvl int) ([]byte, error) {
 		return nil, err
 	}
 	ref := s.hasher.Sum(nil)
 
+	// assemble chunk and put in store
 	addr := swarm.NewAddress(ref)
-	ch := swarm.NewChunk(addr, s.buffer[s.cursors[lvl+1]:s.cursors[lvl]])
+	head := make([]byte, 8)
+	binary.LittleEndian.PutUint64(head, uint64(span))
+	tail := s.buffer[s.cursors[lvl+1]:s.cursors[lvl]]
+	chunkData := append(head, tail...)
+	ch := swarm.NewChunk(addr, chunkData)
 	_, err = s.store.Put(s.ctx, storage.ModePutUpload, ch)
 	if err != nil {
 		return nil, err
 	}
+
 	return ref, nil
 }
...
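One slice-semantics detail worth noting in the chunk assembly above: head has length and capacity 8, so append must allocate a fresh backing array and copy the tail into it, which means the stored chunk data never aliases the splitter's internal level buffer. A small standalone illustration of that behaviour (the values are made up for the example):

package main

import "fmt"

func main() {
	// head has length 8 and capacity 8, so append must allocate a new
	// backing array; the buffer the tail comes from is never written to.
	head := make([]byte, 8)
	buffer := []byte("....payload....")
	tail := buffer[4:11] // "payload"

	chunkData := append(head, tail...)
	chunkData[8] = 'X' // modifies the copy, not buffer

	fmt.Printf("buffer unchanged: %q\n", buffer)
	fmt.Printf("chunk payload:    %q\n", string(chunkData[8:]))
}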
@@ -11,61 +11,14 @@ import (
 	"testing"
 
 	"github.com/ethersphere/bee/pkg/file/splitter/internal"
+	test "github.com/ethersphere/bee/pkg/file/testing"
 	"github.com/ethersphere/bee/pkg/storage/mock"
 	"github.com/ethersphere/bee/pkg/swarm"
-	mockbytes "gitlab.com/nolash/go-mockbytes"
 )
 
 var (
-	dataLengths = []int{
-		31,                           // 0
-		32,                           // 1
-		33,                           // 2
-		63,                           // 3
-		64,                           // 4
-		65,                           // 5
-		swarm.ChunkSize,              // 6
-		swarm.ChunkSize + 31,         // 7
-		swarm.ChunkSize + 32,         // 8
-		swarm.ChunkSize + 63,         // 9
-		swarm.ChunkSize + 64,         // 10
-		swarm.ChunkSize * 2,          // 11
-		swarm.ChunkSize*2 + 32,       // 12
-		swarm.ChunkSize * 128,        // 13
-		swarm.ChunkSize*128 + 31,     // 14
-		swarm.ChunkSize*128 + 32,     // 15
-		swarm.ChunkSize*128 + 64,     // 16
-		swarm.ChunkSize * 129,        // 17
-		swarm.ChunkSize * 130,        // 18
-		swarm.ChunkSize * 128 * 128,  // 19
-		swarm.ChunkSize*128*128 + 32, // 20
-	}
-	expected = []string{
-		"ece86edb20669cc60d142789d464d57bdf5e33cb789d443f608cbd81cfa5697d", // 0
-		"0be77f0bb7abc9cd0abed640ee29849a3072ccfd1020019fe03658c38f087e02", // 1
-		"3463b46d4f9d5bfcbf9a23224d635e51896c1daef7d225b86679db17c5fd868e", // 2
-		"95510c2ff18276ed94be2160aed4e69c9116573b6f69faaeed1b426fea6a3db8", // 3
-		"490072cc55b8ad381335ff882ac51303cc069cbcb8d8d3f7aa152d9c617829fe", // 4
-		"541552bae05e9a63a6cb561f69edf36ffe073e441667dbf7a0e9a3864bb744ea", // 5
-		"c10090961e7682a10890c334d759a28426647141213abda93b096b892824d2ef", // 6
-		"91699c83ed93a1f87e326a29ccd8cc775323f9e7260035a5f014c975c5f3cd28", // 7
-		"73759673a52c1f1707cbb61337645f4fcbd209cdc53d7e2cedaaa9f44df61285", // 8
-		"db1313a727ffc184ae52a70012fbbf7235f551b9f2d2da04bf476abe42a3cb42", // 9
-		"ade7af36ac0c7297dc1c11fd7b46981b629c6077bce75300f85b02a6153f161b", // 10
-		"29a5fb121ce96194ba8b7b823a1f9c6af87e1791f824940a53b5a7efe3f790d9", // 11
-		"61416726988f77b874435bdd89a419edc3861111884fd60e8adf54e2f299efd6", // 12
-		"3047d841077898c26bbe6be652a2ec590a5d9bd7cd45d290ea42511b48753c09", // 13
-		"e5c76afa931e33ac94bce2e754b1bb6407d07f738f67856783d93934ca8fc576", // 14
-		"485a526fc74c8a344c43a4545a5987d17af9ab401c0ef1ef63aefcc5c2c086df", // 15
-		"624b2abb7aefc0978f891b2a56b665513480e5dc195b4a66cd8def074a6d2e94", // 16
-		"b8e1804e37a064d28d161ab5f256cc482b1423d5cd0a6b30fde7b0f51ece9199", // 17
-		"59de730bf6c67a941f3b2ffa2f920acfaa1713695ad5deea12b4a121e5f23fa1", // 18
-		"522194562123473dcfd7a457b18ee7dee8b7db70ed3cfa2b73f348a992fdfd3b", // 19
-		"ed0cc44c93b14fef2d91ab3a3674eeb6352a42ac2f0bbe524711824aae1e7bcc", // 20
-	}
 	start = 0
-	end   = len(dataLengths)
+	end   = test.GetVectorCount()
 )
 
 // TestSplitterJobPartialSingleChunk passes sub-chunk length data to the splitter,
@@ -106,26 +59,19 @@ func TestSplitterJobPartialSingleChunk(t *testing.T) {
 // TestSplitterJobVector verifies file hasher results of legacy test vectors
 func TestSplitterJobVector(t *testing.T) {
 	for i := start; i < end; i++ {
-		dataLengthStr := strconv.Itoa(dataLengths[i])
-		runString := strings.Join([]string{dataLengthStr, expected[i]}, "/")
-		t.Run(runString, testSplitterJobVector)
+		dataLengthStr := strconv.Itoa(i)
+		t.Run(dataLengthStr, testSplitterJobVector)
 	}
 }
 
 func testSplitterJobVector(t *testing.T) {
 	var (
 		paramstring = strings.Split(t.Name(), "/")
-		dataLength, _ = strconv.ParseInt(paramstring[1], 10, 0)
-		expectHex     = paramstring[2]
+		dataIdx, _  = strconv.ParseInt(paramstring[1], 10, 0)
 		store       = mock.NewStorer()
 	)
 
-	g := mockbytes.New(0, mockbytes.MockTypeStandard).WithModulus(255)
-	data, err := g.SequentialBytes(int(dataLength))
-	if err != nil {
-		t.Fatal(err)
-	}
+	data, expect := test.GetVector(t, int(dataIdx))
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	j := internal.NewSimpleSplitterJob(ctx, store, int64(len(data)))
@@ -147,7 +93,6 @@ func testSplitterJobVector(t *testing.T) {
 	actualBytes := j.Sum(nil)
 	actual := swarm.NewAddress(actualBytes)
 
-	expect := swarm.MustParseHexAddress(expectHex)
 	if !expect.Equal(actual) {
 		t.Fatalf("expected %v, got %v", expect, actual)
 	}
...
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package testing

import (
	"testing"

	"github.com/ethersphere/bee/pkg/swarm"
	mockbytes "gitlab.com/nolash/go-mockbytes"
)

var (
	fileByteMod int = 255
	fileLengths     = []int{
		31,                           // 0
		32,                           // 1
		33,                           // 2
		63,                           // 3
		64,                           // 4
		65,                           // 5
		swarm.ChunkSize,              // 6
		swarm.ChunkSize + 31,         // 7
		swarm.ChunkSize + 32,         // 8
		swarm.ChunkSize + 63,         // 9
		swarm.ChunkSize + 64,         // 10
		swarm.ChunkSize * 2,          // 11
		swarm.ChunkSize*2 + 32,       // 12
		swarm.ChunkSize * 128,        // 13
		swarm.ChunkSize*128 + 31,     // 14
		swarm.ChunkSize*128 + 32,     // 15
		swarm.ChunkSize*128 + 64,     // 16
		swarm.ChunkSize * 129,        // 17
		swarm.ChunkSize * 130,        // 18
		swarm.ChunkSize * 128 * 128,  // 19
		swarm.ChunkSize*128*128 + 32, // 20
	}
	fileExpectHashHex = []string{
		"ece86edb20669cc60d142789d464d57bdf5e33cb789d443f608cbd81cfa5697d", // 0
		"0be77f0bb7abc9cd0abed640ee29849a3072ccfd1020019fe03658c38f087e02", // 1
		"3463b46d4f9d5bfcbf9a23224d635e51896c1daef7d225b86679db17c5fd868e", // 2
		"95510c2ff18276ed94be2160aed4e69c9116573b6f69faaeed1b426fea6a3db8", // 3
		"490072cc55b8ad381335ff882ac51303cc069cbcb8d8d3f7aa152d9c617829fe", // 4
		"541552bae05e9a63a6cb561f69edf36ffe073e441667dbf7a0e9a3864bb744ea", // 5
		"c10090961e7682a10890c334d759a28426647141213abda93b096b892824d2ef", // 6
		"91699c83ed93a1f87e326a29ccd8cc775323f9e7260035a5f014c975c5f3cd28", // 7
		"73759673a52c1f1707cbb61337645f4fcbd209cdc53d7e2cedaaa9f44df61285", // 8
		"db1313a727ffc184ae52a70012fbbf7235f551b9f2d2da04bf476abe42a3cb42", // 9
		"ade7af36ac0c7297dc1c11fd7b46981b629c6077bce75300f85b02a6153f161b", // 10
		"29a5fb121ce96194ba8b7b823a1f9c6af87e1791f824940a53b5a7efe3f790d9", // 11
		"61416726988f77b874435bdd89a419edc3861111884fd60e8adf54e2f299efd6", // 12
		"3047d841077898c26bbe6be652a2ec590a5d9bd7cd45d290ea42511b48753c09", // 13
		"e5c76afa931e33ac94bce2e754b1bb6407d07f738f67856783d93934ca8fc576", // 14
		"485a526fc74c8a344c43a4545a5987d17af9ab401c0ef1ef63aefcc5c2c086df", // 15
		"624b2abb7aefc0978f891b2a56b665513480e5dc195b4a66cd8def074a6d2e94", // 16
		"b8e1804e37a064d28d161ab5f256cc482b1423d5cd0a6b30fde7b0f51ece9199", // 17
		"59de730bf6c67a941f3b2ffa2f920acfaa1713695ad5deea12b4a121e5f23fa1", // 18
		"522194562123473dcfd7a457b18ee7dee8b7db70ed3cfa2b73f348a992fdfd3b", // 19
		"ed0cc44c93b14fef2d91ab3a3674eeb6352a42ac2f0bbe524711824aae1e7bcc", // 20
	}
)
// GetVector returns test data corresponding to the test vector index,
// and the expected result address.
func GetVector(t *testing.T, idx int) ([]byte, swarm.Address) {
	t.Helper()
	if idx >= GetVectorCount() {
		t.Fatalf("idx %d out of bound for count %d", idx, GetVectorCount())
	}
	g := mockbytes.New(0, mockbytes.MockTypeStandard).WithModulus(fileByteMod)
	data, err := g.SequentialBytes(fileLengths[idx])
	if err != nil {
		t.Fatal(err)
	}
	return data, swarm.MustParseHexAddress(fileExpectHashHex[idx])
}

// GetVectorCount returns the number of available test vectors.
func GetVectorCount() int {
	return len(fileLengths)
}