Commit e7b0f738 authored by lash, committed by GitHub

Add splitter interface and simple splitter implementation stub (#124)

parent 68dcb05d
......@@ -31,6 +31,7 @@ require (
github.com/syndtr/goleveldb v1.0.1-0.20190923125748-758128399b1d
github.com/uber/jaeger-client-go v2.22.1+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible // indirect
gitlab.com/nolash/go-mockbytes v0.0.7
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
resenje.org/web v0.4.0
......
......@@ -4,13 +4,13 @@
package file
// ErrAborted should be returned whenever a file operation is terminated
// AbortError should be returned whenever a file operation is terminated
// before it has completed.
type AbortError struct {
err error
}
// NewErrAbort creates a new ErrAborted instance.
// NewAbortError creates a new AbortError instance.
func NewAbortError(err error) error {
return &AbortError{
err: err,
......@@ -26,3 +26,26 @@ func (e *AbortError) Unwrap() error {
func (e *AbortError) Error() string {
return e.err.Error()
}
// HashError should be returned whenever a file operation fails during hashing
// of the data.
type HashError struct {
err error
}
// NewHashError creates a new HashError instance.
func NewHashError(err error) error {
return &HashError{
err: err,
}
}
// Unwrap returns an underlying error.
func (e *HashError) Unwrap() error {
return e.err
}
// Error implements the standard Go error interface.
func (e *HashError) Error() string {
return e.err.Error()
}
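
Since both AbortError and HashError implement Unwrap, callers can still match the wrapped cause with the standard errors package. A minimal sketch (illustrative only, not part of this change; the small program below is hypothetical):

package main

import (
	"errors"
	"fmt"
	"io"

	"github.com/ethersphere/bee/pkg/file"
)

func main() {
	// wrap a well-known sentinel in the hashing error type defined above
	err := file.NewHashError(io.ErrUnexpectedEOF)

	// errors.Is walks the Unwrap chain, so the underlying cause remains matchable
	fmt.Println(errors.Is(err, io.ErrUnexpectedEOF)) // true

	// the wrapper itself can be recovered with errors.As
	var hashErr *file.HashError
	fmt.Println(errors.As(err, &hashErr)) // true
}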
......@@ -20,3 +20,12 @@ import (
type Joiner interface {
Join(ctx context.Context, address swarm.Address) (dataOut io.ReadCloser, dataLength int64, err error)
}
// Splitter starts a new file splitting job.
//
// Data is read from the provided reader.
// If the dataLength parameter is 0, data is read until io.EOF is encountered.
// When EOF is received and splitting is done, the resulting Swarm Address is returned.
type Splitter interface {
Split(ctx context.Context, dataIn io.ReadCloser, dataLength int64) (addr swarm.Address, err error)
}
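
A caller-side sketch of how code might program against this interface (illustrative only, not part of this change; splitBytes and the package name are hypothetical, while NewSimpleReadCloser is the helper added below):

package exampleuse

import (
	"context"

	"github.com/ethersphere/bee/pkg/file"
	"github.com/ethersphere/bee/pkg/swarm"
)

// splitBytes stores an in-memory payload through whatever Splitter
// implementation it is handed and returns the resulting Swarm address.
func splitBytes(ctx context.Context, s file.Splitter, data []byte) (swarm.Address, error) {
	// the length is known here; per the interface contract, passing 0 would
	// mean "read until io.EOF"
	return s.Split(ctx, file.NewSimpleReadCloser(data), int64(len(data)))
}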
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package file
import (
"bytes"
"errors"
"io"
)
// simpleReadCloser wraps a byte slice in an io.ReadCloser implementation.
type simpleReadCloser struct {
buffer io.Reader
closed bool
}
// NewSimpleReadCloser creates a new simpleReadCloser that reads from the given byte slice.
func NewSimpleReadCloser(buffer []byte) io.ReadCloser {
return &simpleReadCloser{
buffer: bytes.NewBuffer(buffer),
}
}
// Read implements io.Reader.
func (s *simpleReadCloser) Read(b []byte) (int, error) {
if s.closed {
return 0, errors.New("read on closed reader")
}
return s.buffer.Read(b)
}
// Close implements io.Closer.
func (s *simpleReadCloser) Close() error {
if s.closed {
return errors.New("close on already closed reader")
}
s.closed = true
return nil
}
......@@ -10,7 +10,6 @@ import (
"errors"
"fmt"
"io"
"math"
"os"
"sync"
......@@ -52,7 +51,7 @@ type SimpleJoinerJob struct {
// NewSimpleJoinerJob creates a new SimpleJoinerJob.
func NewSimpleJoinerJob(ctx context.Context, store storage.Storer, rootChunk swarm.Chunk) *SimpleJoinerJob {
spanLength := binary.LittleEndian.Uint64(rootChunk.Data()[:8])
levelCount := getLevelsFromLength(int64(spanLength), swarm.SectionSize, swarm.Branches)
levelCount := file.Levels(int64(spanLength), swarm.SectionSize, swarm.Branches)
j := &SimpleJoinerJob{
ctx: ctx,
......@@ -67,7 +66,7 @@ func NewSimpleJoinerJob(ctx context.Context, store storage.Storer, rootChunk swa
// data level has index 0
startLevelIndex := levelCount - 1
j.data[startLevelIndex] = rootChunk.Data()[8:]
j.logger.Tracef("simple joiner start index %d for address %x", startLevelIndex, rootChunk.Address())
j.logger.Tracef("simple joiner start index %d for address %s", startLevelIndex, rootChunk.Address())
// retrieval must be asynchronous to the io.Reader()
go func() {
......@@ -199,18 +198,3 @@ func (j *SimpleJoinerJob) closeDone() {
close(j.doneC)
})
}
// getLevelsFromLength calculates the last level index which a particular data section count will result in.
// The returned level will be the level of the root hash.
func getLevelsFromLength(l int64, sectionSize int, branches int) int {
s := int64(sectionSize)
b := int64(branches)
if l == 0 {
return 0
} else if l <= s*b {
return 1
}
c := (l - 1) / s
return int(math.Log(float64(c))/math.Log(float64(b)) + 1)
}
......@@ -24,27 +24,6 @@ type simpleJoiner struct {
logger logging.Logger
}
// simpleJoinerReadCloser wraps a byte slice in a io.ReadCloser implementation.
type simpleReadCloser struct {
buffer []byte
cursor int
}
// Read implements io.Reader.
func (s *simpleReadCloser) Read(b []byte) (int, error) {
if s.cursor == len(s.buffer) {
return 0, io.EOF
}
copy(b, s.buffer)
s.cursor += len(s.buffer)
return len(s.buffer), nil
}
// Close implements io.Closer.
func (s *simpleReadCloser) Close() error {
return nil
}
// NewSimpleJoiner creates a new simpleJoiner.
func NewSimpleJoiner(store storage.Storer) file.Joiner {
return &simpleJoiner{
......@@ -68,10 +47,8 @@ func (s *simpleJoiner) Join(ctx context.Context, address swarm.Address) (dataOut
// if this is a single chunk, short circuit to returning just that chunk
spanLength := binary.LittleEndian.Uint64(rootChunk.Data())
if spanLength <= swarm.ChunkSize {
s.logger.Tracef("simplejoiner root chunk %v is single chunk, skipping join and returning directly", rootChunk)
return &simpleReadCloser{
buffer: (rootChunk.Data()[8:]),
}, int64(spanLength), nil
data := rootChunk.Data()[8:]
return file.NewSimpleReadCloser(data), int64(spanLength), nil
}
s.logger.Tracef("simplejoiner joining root chunk %v", rootChunk)
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package file
import (
"math"
"github.com/ethersphere/bee/pkg/swarm"
)
// Spans holds the precomputed maximum span (in data chunks) represented by a single reference at each level.
var Spans []int64
func init() {
Spans = GenerateSpanSizes(9, swarm.Branches)
}
// GenerateSpanSizes generates a table of the maximum span (in data chunks) represented by one SectionSize() reference at each level.
func GenerateSpanSizes(levels, branches int) []int64 {
spans := make([]int64, levels)
branchesSixtyfour := int64(branches)
var span int64 = 1
for i := 0; i < levels; i++ {
spans[i] = span
span *= branchesSixtyfour
}
return spans
}
// Levels calculates the last level index which a particular data section count will result in.
// The returned level will be the level of the root hash.
func Levels(length int64, sectionSize, branches int) int {
s := int64(sectionSize)
b := int64(branches)
if length == 0 {
return 0
} else if length <= s*b {
return 1
}
c := (length - 1) / s
return int(math.Log(float64(c))/math.Log(float64(b)) + 1)
}
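
To make the numbers concrete, a small sketch of what these helpers yield for the default parameters (SectionSize 32, Branches 128, ChunkSize 4096). Illustrative only, not part of this change, and it assumes those swarm constants keep their current values:

package main

import (
	"fmt"

	"github.com/ethersphere/bee/pkg/file"
	"github.com/ethersphere/bee/pkg/swarm"
)

func main() {
	// file.Spans[i] is the number of data chunks covered by a single reference
	// at level i: 1, 128, 16384, 2097152, ...
	fmt.Println(file.Spans[:4])

	chunk := int64(swarm.ChunkSize)
	for _, length := range []int64{
		chunk,       // one data chunk: the root is the data chunk itself
		2 * chunk,   // two data chunks already need one intermediate level
		128 * chunk, // all 128 references still fit into one intermediate chunk
		129 * chunk, // one more chunk pushes the root up another level
	} {
		fmt.Println(length, file.Levels(length, swarm.SectionSize, swarm.Branches))
	}
	// expected level counts: 1, 2, 2, 3
}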
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package internal
import (
"context"
"errors"
"fmt"
"hash"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bmt"
bmtlegacy "github.com/ethersphere/bmt/legacy"
"golang.org/x/crypto/sha3"
)
// maximum number of file tree levels this file hasher component can handle
// (128 ^ (9 - 1)) * 4096 = 295147905179352825856 bytes
const levelBufferLimit = 9
// hashFunc is a hasher factory used by the bmt hasher
func hashFunc() hash.Hash {
return sha3.NewLegacyKeccak256()
}
// SimpleSplitterJob encapsulates a single splitter operation, accepting blockwise
// writes of data whose length is defined in advance.
//
// After the job is constructed, Write must be called with up to ChunkSize byte slices
// until the full data length has been written. Sum should then be called, which will
// return the SwarmHash of the data.
//
// Calling Sum before the last Write, or calling Write after Sum has been called, may
// result in an error and/or an undefined result.
type SimpleSplitterJob struct {
ctx context.Context
store storage.Storer
spanLength int64 // target length of data
length int64 // number of bytes written to the data level of the hasher
sumCounts []int // number of sums performed, indexed per level
cursors []int // section write position, indexed per level
hasher bmt.Hash // underlying hasher used for hashing the tree
buffer []byte // keeps data and hashes, indexed by cursors
}
// NewSimpleSplitterJob creates a new SimpleSplitterJob.
//
// The spanLength is the length of the data that will be written.
func NewSimpleSplitterJob(ctx context.Context, store storage.Storer, spanLength int64) *SimpleSplitterJob {
p := bmtlegacy.NewTreePool(hashFunc, swarm.Branches, bmtlegacy.PoolSize)
return &SimpleSplitterJob{
ctx: ctx,
store: store,
spanLength: spanLength,
sumCounts: make([]int, levelBufferLimit),
cursors: make([]int, levelBufferLimit),
hasher: bmtlegacy.New(p),
buffer: make([]byte, swarm.ChunkSize*levelBufferLimit),
}
}
// Write adds data to the file splitter.
func (j *SimpleSplitterJob) Write(b []byte) (int, error) {
if len(b) > swarm.ChunkSize {
return 0, fmt.Errorf("Write must be called with a maximum of %d bytes", swarm.ChunkSize)
}
j.length += int64(len(b))
if j.length > j.spanLength {
return 0, errors.New("write past span length")
}
err := j.writeToLevel(0, b)
if err != nil {
return 0, err
}
if j.length == j.spanLength {
err := j.hashUnfinished()
if err != nil {
return 0, file.NewHashError(err)
}
err = j.moveDanglingChunk()
if err != nil {
return 0, file.NewHashError(err)
}
}
return len(b), nil
}
// Sum returns the Swarm hash of the data.
func (j *SimpleSplitterJob) Sum(b []byte) []byte {
return j.digest()
}
// writeToLevel writes to the data buffer on the specified level.
// If a chunk boundary is reached, it sums the level and recursively calls itself
// for the next level with the resulting bmt hash.
//
// It adjusts the relevant levels' cursors accordingly.
func (s *SimpleSplitterJob) writeToLevel(lvl int, data []byte) error {
copy(s.buffer[s.cursors[lvl]:s.cursors[lvl]+len(data)], data)
s.cursors[lvl] += len(data)
if s.cursors[lvl]-s.cursors[lvl+1] == swarm.ChunkSize {
ref, err := s.sumLevel(lvl)
if err != nil {
return err
}
err = s.writeToLevel(lvl+1, ref)
if err != nil {
return err
}
s.cursors[lvl] = s.cursors[lvl+1]
}
return nil
}
// sumLevel calculates and returns the bmt sum of the last written data on the level.
//
// TODO: error handling on store write fail
func (s *SimpleSplitterJob) sumLevel(lvl int) ([]byte, error) {
s.sumCounts[lvl]++
spanSize := file.Spans[lvl] * swarm.ChunkSize
span := (s.length-1)%spanSize + 1
sizeToSum := s.cursors[lvl] - s.cursors[lvl+1]
s.hasher.Reset()
err := s.hasher.SetSpan(span)
if err != nil {
return nil, err
}
_, err = s.hasher.Write(s.buffer[s.cursors[lvl+1] : s.cursors[lvl+1]+sizeToSum])
if err != nil {
return nil, err
}
ref := s.hasher.Sum(nil)
addr := swarm.NewAddress(ref)
ch := swarm.NewChunk(addr, s.buffer[s.cursors[lvl+1]:s.cursors[lvl]])
_, err = s.store.Put(s.ctx, storage.ModePutUpload, ch)
if err != nil {
return nil, err
}
return ref, nil
}
// digest returns the calculated digest after a Sum call.
//
// The hash returned is the hash in the first section index of the work buffer;
// this will be the root hash when all recursive sums have completed.
//
// The method does not check that the final hash actually has been written, so
// timing is the responsibility of the caller.
func (s *SimpleSplitterJob) digest() []byte {
return s.buffer[:swarm.SectionSize]
}
// hashUnfinished hashes the remaining unhashed data at the end of the data level
// if the write doesn't end on a chunk boundary.
func (s *SimpleSplitterJob) hashUnfinished() error {
if s.length%swarm.ChunkSize != 0 {
ref, err := s.sumLevel(0)
if err != nil {
return err
}
copy(s.buffer[s.cursors[1]:], ref)
s.cursors[1] += len(ref)
s.cursors[0] = s.cursors[1]
}
return nil
}
// moveDanglingChunk concatenates a dangling chunk reference with the single reference
// at the highest level of the otherwise balanced tree.
//
// Let F be full chunks (disregarding branching factor) and S be single references
// in the following scenario:
//
// S
// F F
// F F F
// F F F F S
//
// The result will be:
//
// SS
// F F
// F F F
// F F F F
//
// After which the SS will be hashed to obtain the final root hash
func (s *SimpleSplitterJob) moveDanglingChunk() error {
// calculate the total number of levels needed to represent the data (including the data level)
targetLevel := file.Levels(s.length, swarm.SectionSize, swarm.Branches)
// sum every intermediate level and write to the level above it
for i := 1; i < targetLevel; i++ {
// and if there is a single reference outside a balanced tree on this level
// don't hash it again but pass it on to the next level
if s.sumCounts[i] > 0 {
// TODO: simplify if possible
if int64(s.sumCounts[i-1])-file.Spans[targetLevel-1-i] <= 1 {
s.cursors[i+1] = s.cursors[i]
s.cursors[i] = s.cursors[i-1]
continue
}
}
ref, err := s.sumLevel(i)
if err != nil {
return err
}
copy(s.buffer[s.cursors[i+1]:], ref)
s.cursors[i+1] += len(ref)
s.cursors[i] = s.cursors[i+1]
}
return nil
}
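
The Write/Sum contract described on SimpleSplitterJob above boils down to the following minimal driver, essentially a distilled version of the tests that follow. It is illustrative only and not part of this change; it is written as it would sit next to the tests, since the internal package cannot be imported from outside pkg/file/splitter, and the example function name is hypothetical:

package internal_test

import (
	"context"
	"fmt"

	"github.com/ethersphere/bee/pkg/file/splitter/internal"
	"github.com/ethersphere/bee/pkg/storage/mock"
	"github.com/ethersphere/bee/pkg/swarm"
)

func ExampleNewSimpleSplitterJob() {
	// a sub-chunk payload, as in the partial single chunk test below
	data := []byte("foo")

	// the data length must be declared up front
	j := internal.NewSimpleSplitterJob(context.Background(), mock.NewStorer(), int64(len(data)))

	// write at most ChunkSize bytes per call until the full length is written
	if _, err := j.Write(data); err != nil {
		panic(err)
	}

	// Sum returns the root hash; per the test vector below this is the bmt hash
	// of "foo": 2387e8e7d8a48c2a9339c97c1dc3461a9a7aa07e994c5cb8b38fd7c1b3e6ea48
	fmt.Println(swarm.NewAddress(j.Sum(nil)))
}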
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package internal_test
import (
"context"
"strconv"
"strings"
"testing"
"github.com/ethersphere/bee/pkg/file/splitter/internal"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
mockbytes "gitlab.com/nolash/go-mockbytes"
)
var (
dataLengths = []int{
31, // 0
32, // 1
33, // 2
63, // 3
64, // 4
65, // 5
swarm.ChunkSize, // 6
swarm.ChunkSize + 31, // 7
swarm.ChunkSize + 32, // 8
swarm.ChunkSize + 63, // 9
swarm.ChunkSize + 64, // 10
swarm.ChunkSize * 2, // 11
swarm.ChunkSize*2 + 32, // 12
swarm.ChunkSize * 128, // 13
swarm.ChunkSize*128 + 31, // 14
swarm.ChunkSize*128 + 32, // 15
swarm.ChunkSize*128 + 64, // 16
swarm.ChunkSize * 129, // 17
swarm.ChunkSize * 130, // 18
swarm.ChunkSize * 128 * 128, // 19
swarm.ChunkSize*128*128 + 32, // 20
}
expected = []string{
"ece86edb20669cc60d142789d464d57bdf5e33cb789d443f608cbd81cfa5697d", // 0
"0be77f0bb7abc9cd0abed640ee29849a3072ccfd1020019fe03658c38f087e02", // 1
"3463b46d4f9d5bfcbf9a23224d635e51896c1daef7d225b86679db17c5fd868e", // 2
"95510c2ff18276ed94be2160aed4e69c9116573b6f69faaeed1b426fea6a3db8", // 3
"490072cc55b8ad381335ff882ac51303cc069cbcb8d8d3f7aa152d9c617829fe", // 4
"541552bae05e9a63a6cb561f69edf36ffe073e441667dbf7a0e9a3864bb744ea", // 5
"c10090961e7682a10890c334d759a28426647141213abda93b096b892824d2ef", // 6
"91699c83ed93a1f87e326a29ccd8cc775323f9e7260035a5f014c975c5f3cd28", // 7
"73759673a52c1f1707cbb61337645f4fcbd209cdc53d7e2cedaaa9f44df61285", // 8
"db1313a727ffc184ae52a70012fbbf7235f551b9f2d2da04bf476abe42a3cb42", // 9
"ade7af36ac0c7297dc1c11fd7b46981b629c6077bce75300f85b02a6153f161b", // 10
"29a5fb121ce96194ba8b7b823a1f9c6af87e1791f824940a53b5a7efe3f790d9", // 11
"61416726988f77b874435bdd89a419edc3861111884fd60e8adf54e2f299efd6", // 12
"3047d841077898c26bbe6be652a2ec590a5d9bd7cd45d290ea42511b48753c09", // 13
"e5c76afa931e33ac94bce2e754b1bb6407d07f738f67856783d93934ca8fc576", // 14
"485a526fc74c8a344c43a4545a5987d17af9ab401c0ef1ef63aefcc5c2c086df", // 15
"624b2abb7aefc0978f891b2a56b665513480e5dc195b4a66cd8def074a6d2e94", // 16
"b8e1804e37a064d28d161ab5f256cc482b1423d5cd0a6b30fde7b0f51ece9199", // 17
"59de730bf6c67a941f3b2ffa2f920acfaa1713695ad5deea12b4a121e5f23fa1", // 18
"522194562123473dcfd7a457b18ee7dee8b7db70ed3cfa2b73f348a992fdfd3b", // 19
"ed0cc44c93b14fef2d91ab3a3674eeb6352a42ac2f0bbe524711824aae1e7bcc", // 20
}
start = 0
end = len(dataLengths)
)
// TestSplitterJobPartialSingleChunk passes sub-chunk length data to the splitter,
// verifies that the correct hash is returned, and that a Write after Sum (i.e. after
// the complete data has been written) returns an error.
func TestSplitterJobPartialSingleChunk(t *testing.T) {
store := mock.NewStorer()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
data := []byte("foo")
j := internal.NewSimpleSplitterJob(ctx, store, int64(len(data)))
c, err := j.Write(data)
if err != nil {
t.Fatal(err)
}
if c < len(data) {
t.Fatalf("short write %d", c)
}
hashResult := j.Sum(nil)
addressResult := swarm.NewAddress(hashResult)
bmtHashOfFoo := "2387e8e7d8a48c2a9339c97c1dc3461a9a7aa07e994c5cb8b38fd7c1b3e6ea48"
address := swarm.MustParseHexAddress(bmtHashOfFoo)
if !addressResult.Equal(address) {
t.Fatalf("expected %v, got %v", address, addressResult)
}
_, err = j.Write([]byte("bar"))
if err == nil {
t.Fatal("expected error writing after write/sum complete")
}
}
// TestSplitterJobVector verifies file hasher results against the legacy test vectors
func TestSplitterJobVector(t *testing.T) {
for i := start; i < end; i++ {
dataLengthStr := strconv.Itoa(dataLengths[i])
runString := strings.Join([]string{dataLengthStr, expected[i]}, "/")
t.Run(runString, testSplitterJobVector)
}
}
func testSplitterJobVector(t *testing.T) {
var (
paramstring = strings.Split(t.Name(), "/")
dataLength, _ = strconv.ParseInt(paramstring[1], 10, 0)
expectHex = paramstring[2]
store = mock.NewStorer()
)
g := mockbytes.New(0, mockbytes.MockTypeStandard).WithModulus(255)
data, err := g.SequentialBytes(int(dataLength))
if err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
j := internal.NewSimpleSplitterJob(ctx, store, int64(len(data)))
for i := 0; i < len(data); i += swarm.ChunkSize {
l := swarm.ChunkSize
if len(data)-i < swarm.ChunkSize {
l = len(data) - i
}
c, err := j.Write(data[i : i+l])
if err != nil {
t.Fatal(err)
}
if c < l {
t.Fatalf("short write %d", c)
}
}
actualBytes := j.Sum(nil)
actual := swarm.NewAddress(actualBytes)
expect := swarm.MustParseHexAddress(expectHex)
if !expect.Equal(actual) {
t.Fatalf("expected %v, got %v", expect, actual)
}
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package splitter provides implementations of the file.Splitter interface
package splitter
import (
"context"
"fmt"
"io"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/splitter/internal"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
// simpleSplitter wraps a non-optimized implementation of file.Splitter
type simpleSplitter struct {
store storage.Storer
}
// NewSimpleSplitter creates a new SimpleSplitter
func NewSimpleSplitter(store storage.Storer) file.Splitter {
return &simpleSplitter{
store: store,
}
}
// Split implements the file.Splitter interface.
//
// It uses a non-optimized internal component that blocks while performing
// multiple levels of hashing when building the file hash tree.
//
// It returns the SwarmHash of the data.
func (s *simpleSplitter) Split(ctx context.Context, r io.ReadCloser, dataLength int64) (addr swarm.Address, err error) {
j := internal.NewSimpleSplitterJob(ctx, s.store, dataLength)
var total int
data := make([]byte, swarm.ChunkSize)
for {
c, err := r.Read(data)
if err != nil {
if err == io.EOF {
break
}
return swarm.ZeroAddress, err
}
cc, err := j.Write(data[:c])
if err != nil {
return swarm.ZeroAddress, err
}
if cc < c {
return swarm.ZeroAddress, fmt.Errorf("write count to file hasher component %d does not match read count %d", cc, c)
}
total += c
}
sum := j.Sum(nil)
return swarm.NewAddress(sum), nil
}
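
Putting the pieces together, a hypothetical end-to-end sketch (not part of this change) that stores a small payload through the public constructor; the in-memory mock storer used by the tests stands in for a real chunk store:

package main

import (
	"context"
	"fmt"

	"github.com/ethersphere/bee/pkg/file"
	"github.com/ethersphere/bee/pkg/file/splitter"
	"github.com/ethersphere/bee/pkg/storage/mock"
)

func main() {
	data := []byte("hello swarm")

	// the splitter needs a storer to put the produced chunks into
	s := splitter.NewSimpleSplitter(mock.NewStorer())

	// the data length must be known in advance for this implementation
	addr, err := s.Split(context.Background(), file.NewSimpleReadCloser(data), int64(len(data)))
	if err != nil {
		panic(err)
	}
	fmt.Println("root address:", addr)
}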
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package testing
import (
......@@ -7,7 +11,8 @@ import (
"github.com/ethersphere/bee/pkg/swarm"
)
func GenerateTestRandomFileChunk(address swarm.Address, spanLength int, dataSize int) swarm.Chunk {
// GenerateTestRandomFileChunk generates a single chunk with arbitrary content for the given address and span length
func GenerateTestRandomFileChunk(address swarm.Address, spanLength, dataSize int) swarm.Chunk {
data := make([]byte, dataSize+8)
binary.LittleEndian.PutUint64(data, uint64(spanLength))
_, _ = rand.Read(data[8:]) // # skipcq: GSC-G404
......