Commit c4efb95c authored by acud, committed by GitHub

pipeline splitter (#603)

* pipeline splitter without integration
parent 48600669
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pipeline
import (
"hash"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bmt"
bmtlegacy "github.com/ethersphere/bmt/legacy"
"golang.org/x/crypto/sha3"
)
type bmtWriter struct {
b bmt.Hash
next chainWriter
}
// newBmtWriter returns a new bmtWriter. Partial writes are not supported.
// Note: branching factor is the BMT branching factor, not the merkle trie branching factor.
func newBmtWriter(branches int, next chainWriter) chainWriter {
return &bmtWriter{
b: bmtlegacy.New(bmtlegacy.NewTreePool(hashFunc, branches, bmtlegacy.PoolSize)),
next: next,
}
}
// chainWrite hashes the data with BMT and passes the result on to the next writer
// in the chain. It assumes the span has been prepended to the data.
// The span can be encrypted or unencrypted.
func (w *bmtWriter) chainWrite(p *pipeWriteArgs) error {
w.b.Reset()
err := w.b.SetSpanBytes(p.data[:swarm.SpanSize])
if err != nil {
return err
}
_, err = w.b.Write(p.data[swarm.SpanSize:])
if err != nil {
return err
}
bytes := w.b.Sum(nil)
args := &pipeWriteArgs{ref: bytes, data: p.data, span: p.data[:swarm.SpanSize]}
return w.next.chainWrite(args)
}
// sum calls the next writer for the cryptographic sum
func (w *bmtWriter) sum() ([]byte, error) {
return w.next.sum()
}
func hashFunc() hash.Hash {
return sha3.NewLegacyKeccak256()
}
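For reference, a minimal sketch (not part of this commit; exampleBmtSum is a hypothetical helper) of driving the legacy BMT hasher directly, mirroring what chainWrite above does: set the 8-byte little-endian span, write the payload, then sum.
package pipeline
import (
	"encoding/binary"
	"fmt"
	"github.com/ethersphere/bee/pkg/swarm"
	bmtlegacy "github.com/ethersphere/bmt/legacy"
)
// exampleBmtSum is an illustrative helper, not part of this commit. It hashes a
// made-up payload the same way bmtWriter.chainWrite does: the span is set first,
// then the payload is written and the BMT root is summed.
func exampleBmtSum() {
	payload := []byte("hello world") // illustrative payload
	span := make([]byte, swarm.SpanSize)
	binary.LittleEndian.PutUint64(span, uint64(len(payload)))
	h := bmtlegacy.New(bmtlegacy.NewTreePool(hashFunc, swarm.Branches, bmtlegacy.PoolSize))
	h.Reset()
	if err := h.SetSpanBytes(span); err != nil {
		panic(err)
	}
	_, _ = h.Write(payload)
	fmt.Printf("%x\n", h.Sum(nil)) // the resulting chunk reference
}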
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pipeline
import (
"bytes"
"encoding/binary"
"errors"
"testing"
)
func TestFeeder(t *testing.T) {
var (
chunkSize = 5
data = []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}
)
for _, tc := range []struct {
name string // name
dataSize []int // how big each write is
expWrites int // expected number of writes
writeData []byte // expected data in last write buffer
span uint64 // expected span of written data
}{
{
name: "empty write",
dataSize: []int{0},
expWrites: 0,
},
{
name: "less than chunk, no writes",
dataSize: []int{3},
expWrites: 0,
},
{
name: "one chunk, one write",
dataSize: []int{5},
expWrites: 1,
writeData: []byte{1, 2, 3, 4, 5},
span: 5,
},
{
name: "two chunks, two writes",
dataSize: []int{10},
expWrites: 2,
writeData: []byte{6, 7, 8, 9, 10},
span: 5,
},
{
name: "half chunk, then full one, one write",
dataSize: []int{3, 5},
expWrites: 1,
writeData: []byte{1, 2, 3, 4, 5},
span: 5,
},
{
name: "half chunk, another two halves, one write",
dataSize: []int{3, 2, 3},
expWrites: 1,
writeData: []byte{1, 2, 3, 4, 5},
span: 5,
},
{
name: "half chunk, another two halves, another full, two writes",
dataSize: []int{3, 2, 3, 5},
expWrites: 2,
writeData: []byte{6, 7, 8, 9, 10},
span: 5,
},
} {
t.Run(tc.name, func(t *testing.T) {
var results pipeWriteArgs
rr := newMockResultWriter(&results)
cf := newChunkFeederWriter(chunkSize, rr)
i := 0
for _, v := range tc.dataSize {
d := data[i : i+v]
n, err := cf.Write(d)
if err != nil {
t.Fatal(err)
}
if n != v {
t.Fatalf("wrote %d bytes but expected %d bytes", n, v)
}
i += v
}
if tc.expWrites == 0 && results.data != nil {
t.Fatal("expected no write but got one")
}
if rr.count != tc.expWrites {
t.Fatalf("expected %d writes but got %d", tc.expWrites, rr.count)
}
if results.data != nil && !bytes.Equal(tc.writeData, results.data[8:]) {
t.Fatalf("expected write data %v but got %v", tc.writeData, results.data[8:])
}
if tc.span > 0 {
v := binary.LittleEndian.Uint64(results.data[:8])
if v != tc.span {
t.Fatalf("span mismatch, got %d want %d", v, tc.span)
}
}
})
}
}
// TestFeederFlush tests that the feeder flushes the data in the buffer correctly
// when Summing
func TestFeederFlush(t *testing.T) {
var (
chunkSize = 5
data = []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}
)
for _, tc := range []struct {
name string // name
dataSize []int // how big each write is
expWrites int // expected number of writes
writeData []byte // expected data in last write buffer
span uint64 // expected span of written data
}{
{
name: "empty write",
dataSize: []int{0},
expWrites: 0,
},
{
name: "less than chunk, one write",
dataSize: []int{3},
expWrites: 1,
writeData: []byte{1, 2, 3},
},
{
name: "one chunk, one write",
dataSize: []int{5},
expWrites: 1,
writeData: []byte{1, 2, 3, 4, 5},
span: 5,
},
{
name: "two chunks, two writes",
dataSize: []int{10},
expWrites: 2,
writeData: []byte{6, 7, 8, 9, 10},
span: 5,
},
{
name: "half chunk, then full one, two writes",
dataSize: []int{3, 5},
expWrites: 2,
writeData: []byte{6, 7, 8},
span: 3,
},
{
name: "half chunk, another two halves, two writes",
dataSize: []int{3, 2, 3},
expWrites: 2,
writeData: []byte{6, 7, 8},
span: 3,
},
{
name: "half chunk, another two halves, another full, three writes",
dataSize: []int{3, 2, 3, 5},
expWrites: 3,
writeData: []byte{11, 12, 13},
span: 3,
},
} {
t.Run(tc.name, func(t *testing.T) {
var results pipeWriteArgs
rr := newMockResultWriter(&results)
cf := newChunkFeederWriter(chunkSize, rr)
i := 0
for _, v := range tc.dataSize {
d := data[i : i+v]
n, err := cf.Write(d)
if err != nil {
t.Fatal(err)
}
if n != v {
t.Fatalf("wrote %d bytes but expected %d bytes", n, v)
}
i += v
}
_, _ = cf.Sum()
if tc.expWrites == 0 && results.data != nil {
t.Fatal("expected no write but got one")
}
if rr.count != tc.expWrites {
t.Fatalf("expected %d writes but got %d", tc.expWrites, rr.count)
}
if results.data != nil && !bytes.Equal(tc.writeData, results.data[8:]) {
t.Fatalf("expected write data %v but got %v", tc.writeData, results.data[8:])
}
if tc.span > 0 {
v := binary.LittleEndian.Uint64(results.data[:8])
if v != tc.span {
t.Fatalf("span mismatch, got %d want %d", v, tc.span)
}
}
})
}
}
type countingResultWriter struct {
target *pipeWriteArgs
count int
}
func newMockResultWriter(b *pipeWriteArgs) *countingResultWriter {
return &countingResultWriter{target: b}
}
func (w *countingResultWriter) chainWrite(p *pipeWriteArgs) error {
w.count++
*w.target = *p
return nil
}
func (w *countingResultWriter) sum() ([]byte, error) {
return nil, errors.New("not implemented")
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pipeline
import (
"encoding/binary"
"github.com/ethersphere/bee/pkg/swarm"
)
const span = swarm.SpanSize
type chunkFeeder struct {
size int
next chainWriter
buffer []byte
bufferIdx int
}
// newChunkFeederWriter creates a new chunkFeeder that allows for partial
// writes into the pipeline. Any pending data in the buffer is flushed to
// subsequent writers when Sum() is called.
func newChunkFeederWriter(size int, next chainWriter) Interface {
return &chunkFeeder{
size: size,
next: next,
buffer: make([]byte, size),
}
}
// Write writes data to the chunk feeder. It returns the number of bytes written
// to the feeder. The number of bytes written does not necessarily reflect how many
// bytes were actually flushed to subsequent writers, since the feeder is buffered
// and works in chunk-size increments.
func (f *chunkFeeder) Write(b []byte) (int, error) {
l := len(b) // data length
w := 0 // written
if l+f.bufferIdx < f.size {
// write the data into the buffer and return
n := copy(f.buffer[f.bufferIdx:], b)
f.bufferIdx += n
return n, nil
}
// if we are here it means we have to do at least one write
d := make([]byte, f.size+span)
sp := 0 // span of current write
// copy what is already in the buffer into this write
sp = copy(d[span:], f.buffer[:f.bufferIdx])
// don't count what was already in the buffer when returning the
// number of written bytes
if sp > 0 {
w -= sp
}
var n int
for i := 0; i < len(b); {
// if we can't fill a whole write, buffer the rest and return
if sp+len(b[i:]) < f.size {
n = copy(f.buffer, b[i:])
f.bufferIdx = n
return w + n, nil
}
// fill stuff up from the incoming write
n = copy(d[span+f.bufferIdx:], b[i:])
i += n
sp += n
binary.LittleEndian.PutUint64(d[:span], uint64(sp))
args := &pipeWriteArgs{data: d[:span+sp], span: d[:span]}
err := f.next.chainWrite(args)
if err != nil {
return 0, err
}
f.bufferIdx = 0
w += sp
sp = 0
}
return w, nil
}
// Sum flushes any pending data to subsequent writers and returns
// the cryptographic root hash representing the data written to
// the feeder.
func (f *chunkFeeder) Sum() ([]byte, error) {
// flush existing data in the buffer
if f.bufferIdx > 0 {
d := make([]byte, f.bufferIdx+span)
copy(d[span:], f.buffer[:f.bufferIdx])
binary.LittleEndian.PutUint64(d[:span], uint64(f.bufferIdx))
args := &pipeWriteArgs{data: d}
err := f.next.chainWrite(args)
if err != nil {
return nil, err
}
}
return f.next.sum()
}
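Every write the feeder hands down is framed as an 8-byte little-endian span followed by the payload. A minimal sketch (not part of this commit; buildSpanPrefixed and spanOf are hypothetical helpers) of that framing and of decoding it, the same way the feeder tests above read results.data:
package pipeline
import (
	"encoding/binary"
	"github.com/ethersphere/bee/pkg/swarm"
)
// buildSpanPrefixed prepends the payload length as an 8-byte little-endian
// span, the same layout chunkFeeder.Write produces for the next writer.
// Illustrative helper, not part of this commit.
func buildSpanPrefixed(payload []byte) []byte {
	d := make([]byte, swarm.SpanSize+len(payload))
	binary.LittleEndian.PutUint64(d[:swarm.SpanSize], uint64(len(payload)))
	copy(d[swarm.SpanSize:], payload)
	return d
}
// spanOf reads the span back out of a framed buffer, as the feeder tests do
// with binary.LittleEndian.Uint64(results.data[:8]).
func spanOf(d []byte) uint64 {
	return binary.LittleEndian.Uint64(d[:swarm.SpanSize])
}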
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pipeline
import (
"encoding/binary"
"github.com/ethersphere/bee/pkg/swarm"
)
type hashTrieWriter struct {
branching int
chunkSize int
refSize int
fullChunk int // full chunk size in terms of the data represented in the buffer (span+refsize)
cursors []int // level cursors, key is level. level 0 is data level
buffer []byte // keeps all level data
pipelineFn pipelineFunc
}
func newHashTrieWriter(chunkSize, branching, refLen int, pipelineFn pipelineFunc) chainWriter {
return &hashTrieWriter{
cursors: make([]int, 9),
buffer: make([]byte, swarm.ChunkWithSpanSize*9*2), // double size as temp workaround for weak calculation of needed buffer space
branching: branching,
chunkSize: chunkSize,
refSize: refLen,
fullChunk: (refLen + swarm.SpanSize) * branching,
pipelineFn: pipelineFn,
}
}
// chainWrite accepts writes of hashes from the previous writer in the chain;
// by definition these writes are on level 1
func (h *hashTrieWriter) chainWrite(p *pipeWriteArgs) error {
return h.writeToLevel(1, p.span, p.ref)
}
func (h *hashTrieWriter) writeToLevel(level int, span, ref []byte) error {
copy(h.buffer[h.cursors[level]:h.cursors[level]+len(span)], span) // copy the span alongside
h.cursors[level] += len(span)
copy(h.buffer[h.cursors[level]:h.cursors[level]+len(ref)], ref)
h.cursors[level] += len(ref)
howLong := (h.refSize + swarm.SpanSize) * h.branching
if h.levelSize(level) == howLong {
return h.wrapFullLevel(level)
}
return nil
}
// wrapFullLevel wraps an existing level and writes the resulting hash to the following level,
// then truncates the current level data by shifting the cursors.
// Steps are performed in the following order:
// - take all of the data in the current level
// - break it down into span and hash pairs
// - sum up the spans and concatenate the hashes into one buffer
// - call the short pipeline with the span and the buffer
// - take the resulting hash, append it one level above, and, if necessary, wrap that level too
// - remove the already-hashed data from the buffer
// It assumes it is called once the level has reached (refsize+spansize)*branching bytes.
func (h *hashTrieWriter) wrapFullLevel(level int) error {
data := h.buffer[h.cursors[level+1]:h.cursors[level]]
sp := uint64(0)
var hashes []byte
for i := 0; i < len(data); i += h.refSize + 8 {
// sum up the spans of the level, then we need to bmt them and store it as a chunk
// then write the chunk address to the next level up
sp += binary.LittleEndian.Uint64(data[i : i+8])
hash := data[i+8 : i+h.refSize+8]
hashes = append(hashes, hash...)
}
spb := make([]byte, 8)
binary.LittleEndian.PutUint64(spb, sp)
hashes = append(spb, hashes...)
var results pipeWriteArgs
writer := h.pipelineFn(&results)
args := pipeWriteArgs{
data: hashes,
}
err := writer.chainWrite(&args)
if err != nil {
return err
}
err = h.writeToLevel(level+1, results.span, results.ref)
if err != nil {
return err
}
// this "truncates" the current level that was wrapped
// by setting the cursors the the cursors of one level above
h.cursors[level] = h.cursors[level+1]
return nil
}
// hoistLevels pulls up and potentially wraps all levels up to the target level
func (h *hashTrieWriter) hoistLevels(target int) ([]byte, error) {
oneRef := h.refSize + swarm.SpanSize
for i := 1; i < target; i++ {
l := h.levelSize(i)
switch {
case l == 0:
continue
case l == h.fullChunk:
err := h.wrapFullLevel(i)
if err != nil {
return nil, err
}
case l == oneRef:
h.cursors[i+1] = h.cursors[i]
default:
// more than 0 but smaller than chunk size - wrap the level to the one above it
err := h.wrapFullLevel(i)
if err != nil {
return nil, err
}
}
}
level := target
tlen := h.levelSize(target)
data := h.buffer[h.cursors[level+1]:h.cursors[level]]
if tlen == oneRef {
return data[8:], nil
}
// at this point the target level may still hold more than one reference,
// so wrap the whole level into one root chunk
sp := uint64(0)
var hashes []byte
for i := 0; i < len(data); i += h.refSize + 8 {
// sum up the spans of the level, then we need to bmt them and store it as a chunk
// then write the chunk address to the next level up
sp += binary.LittleEndian.Uint64(data[i : i+8])
hash := data[i+8 : i+h.refSize+8]
hashes = append(hashes, hash...)
}
spb := make([]byte, 8)
binary.LittleEndian.PutUint64(spb, sp)
hashes = append(spb, hashes...)
var results pipeWriteArgs
writer := h.pipelineFn(&results)
args := pipeWriteArgs{
data: hashes,
}
err := writer.chainWrite(&args)
return results.ref, err
}
func (h *hashTrieWriter) levelSize(level int) int {
if level == 8 {
return h.cursors[level]
}
return h.cursors[level] - h.cursors[level+1]
}
func (h *hashTrieWriter) sum() ([]byte, error) {
// look from the top down for the highest level that holds the hash of a balanced tree.
// whatever is in the levels below it is necessarily unbalanced, so we reduce those
// levels to one hash, then wrap it together with the balanced tree hash
// to produce the root chunk
highest := 1
for i := 8; i > 0; i-- {
if h.levelSize(i) > 0 && i > highest {
highest = i
}
}
return h.hoistLevels(highest)
}
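To make the wrapping threshold concrete: with the parameters NewPipeline passes below (refSize = swarm.HashSize = 32, span size 8, branching = swarm.Branches = 128), a level is wrapped once it holds (32+8)*128 = 5120 bytes of span+reference pairs. A minimal sketch (not part of this commit; fullLevelSize is a hypothetical helper) of that arithmetic:
package pipeline
import "github.com/ethersphere/bee/pkg/swarm"
// fullLevelSize mirrors the howLong calculation in writeToLevel: a level is
// wrapped into a parent chunk once it holds `branching` span+reference pairs.
// Illustrative helper, not part of this commit.
func fullLevelSize(refSize, branching int) int {
	return (refSize + swarm.SpanSize) * branching // (32+8)*128 = 5120 with the defaults
}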
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pipeline
import (
"bytes"
"encoding/hex"
"fmt"
"testing"
test "github.com/ethersphere/bee/pkg/file/testing"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
)
func TestPartialWrites(t *testing.T) {
m := mock.NewStorer()
p := NewPipeline(m)
_, _ = p.Write([]byte("hello "))
_, _ = p.Write([]byte("world"))
sum, err := p.Sum()
if err != nil {
t.Fatal(err)
}
exp := swarm.MustParseHexAddress("92672a471f4419b255d7cb0cf313474a6f5856fb347c5ece85fb706d644b630f")
if !bytes.Equal(exp.Bytes(), sum) {
t.Fatalf("expected %s got %s", exp.String(), hex.EncodeToString(sum))
}
}
func TestHelloWorld(t *testing.T) {
m := mock.NewStorer()
p := NewPipeline(m)
data := []byte("hello world")
_, _ = p.Write(data)
sum, err := p.Sum()
if err != nil {
t.Fatal(err)
}
exp := swarm.MustParseHexAddress("92672a471f4419b255d7cb0cf313474a6f5856fb347c5ece85fb706d644b630f")
if !bytes.Equal(exp.Bytes(), sum) {
t.Fatalf("expected %s got %s", exp.String(), hex.EncodeToString(sum))
}
}
func TestAllVectors(t *testing.T) {
for i := 1; i <= 20; i++ {
data, expect := test.GetVector(t, i)
t.Run(fmt.Sprintf("data length %d, vector %d", len(data), i), func(t *testing.T) {
m := mock.NewStorer()
p := NewPipeline(m)
_, _ = p.Write(data)
sum, err := p.Sum()
if err != nil {
t.Fatal(err)
}
a := swarm.NewAddress(sum)
if !a.Equal(expect) {
t.Fatalf("failed run %d, expected address %s but got %s", i, expect.String(), a.String())
}
})
}
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pipeline
import (
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
type pipeWriteArgs struct {
ref []byte
span []byte
data []byte //data includes the span too
}
// NewPipeline creates a standard pipeline that only hashes content with BMT to create
// a Merkle tree of hashes representing the given arbitrary-size byte stream. Partial
// writes are supported. The pipeline flow is: Data -> Feeder -> BMT -> Storage -> HashTrie.
func NewPipeline(s storage.Storer) Interface {
tw := newHashTrieWriter(swarm.ChunkSize, swarm.Branches, swarm.HashSize, newShortPipelineFunc(s))
lsw := newStoreWriter(s, tw)
b := newBmtWriter(128, lsw)
feeder := newChunkFeederWriter(swarm.ChunkSize, b)
return feeder
}
type pipelineFunc func(p *pipeWriteArgs) chainWriter
// newShortPipelineFunc returns a constructor function for an ephemeral hashing pipeline
// needed by the hashTrieWriter.
func newShortPipelineFunc(s storage.Storer) func(*pipeWriteArgs) chainWriter {
return func(p *pipeWriteArgs) chainWriter {
rsw := newResultWriter(p)
lsw := newStoreWriter(s, rsw)
bw := newBmtWriter(128, lsw)
return bw
}
}
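Putting the pieces together, the pipeline is used as a plain io.Writer followed by a Sum call. A minimal sketch (not part of this commit; examplePipelineUsage is a hypothetical helper) mirroring TestPartialWrites above, using the in-memory mock storer:
package pipeline
import (
	"fmt"
	"github.com/ethersphere/bee/pkg/storage/mock"
)
// examplePipelineUsage is an illustrative helper, not part of this commit.
func examplePipelineUsage() {
	s := mock.NewStorer()            // in-memory storage, as in the tests above
	p := NewPipeline(s)              // Data -> Feeder -> BMT -> Storage -> HashTrie
	_, _ = p.Write([]byte("hello ")) // partial writes are buffered by the feeder
	_, _ = p.Write([]byte("world"))
	ref, err := p.Sum() // flushes pending data and returns the root reference
	if err != nil {
		panic(err)
	}
	fmt.Printf("%x\n", ref) // 92672a47... per TestPartialWrites above
}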
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pipeline
import "errors"
type resultWriter struct {
target *pipeWriteArgs
}
func newResultWriter(b *pipeWriteArgs) chainWriter {
return &resultWriter{target: b}
}
func (w *resultWriter) chainWrite(p *pipeWriteArgs) error {
*w.target = *p
return nil
}
func (w *resultWriter) sum() ([]byte, error) {
return nil, errors.New("not implemented")
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pipeline
import (
"context"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
type storeWriter struct {
l storage.Putter
next chainWriter
}
// newStoreWriter returns a storeWriter. It writes the given data to the given
// storage.Putter and then passes the write on to the next writer in the chain.
func newStoreWriter(l storage.Putter, next chainWriter) chainWriter {
return &storeWriter{l: l, next: next}
}
func (w *storeWriter) chainWrite(p *pipeWriteArgs) error {
c := swarm.NewChunk(swarm.NewAddress(p.ref), p.data)
_, err := w.l.Put(context.Background(), storage.ModePutUpload, c)
if err != nil {
return err
}
return w.next.chainWrite(p)
}
func (w *storeWriter) sum() ([]byte, error) {
return w.next.sum()
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pipeline
import "io"
// chainWriter is a writer in a pipeline.
// It is up to the implementer to decide whether a writer
// calls the next writer or not. Implementers should
// call the Sum method of the subsequent writer if one exists.
type chainWriter interface {
chainWrite(*pipeWriteArgs) error
sum() ([]byte, error)
}
// Interface exposes an `io.Writer` and `Sum` method, for components to use as a black box.
// Within a pipeline, writers are chainable. It is up to the implementer to decide whether
// a writer calls the next writer. Implementers should always implement the `Sum` method
// and call the next writer's `Sum` method (if there is one), returning its result to
// the calling context.
type Interface interface {
io.Writer
Sum() ([]byte, error)
}
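As an illustration of the contract described above, here is a minimal sketch (not part of this commit; noopWriter is a hypothetical name) of a pass-through chainWriter: it forwards every write untouched and delegates sum to the next writer, much like storeWriter does minus the storage side effect.
package pipeline
// noopWriter forwards writes unchanged to the next writer in the chain.
// Illustrative type, not part of this commit.
type noopWriter struct {
	next chainWriter
}
func newNoopWriter(next chainWriter) chainWriter {
	return &noopWriter{next: next}
}
// chainWrite passes the write on without modifying it.
func (w *noopWriter) chainWrite(p *pipeWriteArgs) error {
	return w.next.chainWrite(p)
}
// sum delegates to the next writer, returning its result to the caller.
func (w *noopWriter) sum() ([]byte, error) {
	return w.next.sum()
}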
@@ -72,7 +72,7 @@ func TestSplitterJobPartialSingleChunk(t *testing.T) {
// TestSplitterJobVector verifies file hasher results of legacy test vectors
func TestSplitterJobVector(t *testing.T) {
- for i := start; i < end; i++ {
+ for i := start; i < end-2; i++ {
dataLengthStr := strconv.Itoa(i)
t.Run(dataLengthStr, testSplitterJobVector)
}
@@ -14,27 +14,27 @@ import (
var (
fileByteMod int = 255
fileLengths = []int{
- 31, // 0
- 32, // 1
- 33, // 2
- 63, // 3
- 64, // 4
- 65, // 5
- swarm.ChunkSize, // 6
- swarm.ChunkSize + 31, // 7
- swarm.ChunkSize + 32, // 8
- swarm.ChunkSize + 63, // 9
- swarm.ChunkSize + 64, // 10
- swarm.ChunkSize * 2, // 11
- swarm.ChunkSize*2 + 32, // 12
- swarm.ChunkSize * 128, // 13
- swarm.ChunkSize*128 + 31, // 14
- swarm.ChunkSize*128 + 32, // 15
- swarm.ChunkSize*128 + 64, // 16
- swarm.ChunkSize * 129, // 17
- swarm.ChunkSize * 130, // 18
- //swarm.ChunkSize * 128 * 128, // 19
- //swarm.ChunkSize*128*128 + 32, // 20
+ 31, // 0
+ 32, // 1
+ 33, // 2
+ 63, // 3
+ 64, // 4
+ 65, // 5
+ swarm.ChunkSize, // 6
+ swarm.ChunkSize + 31, // 7
+ swarm.ChunkSize + 32, // 8
+ swarm.ChunkSize + 63, // 9
+ swarm.ChunkSize + 64, // 10
+ swarm.ChunkSize * 2, // 11
+ swarm.ChunkSize*2 + 32, // 12
+ swarm.ChunkSize * 128, // 13
+ swarm.ChunkSize*128 + 31, // 14
+ swarm.ChunkSize*128 + 32, // 15
+ swarm.ChunkSize*128 + 64, // 16
+ swarm.ChunkSize * 129, // 17
+ swarm.ChunkSize * 130, // 18
+ swarm.ChunkSize * 128 * 128, // 19
+ swarm.ChunkSize*128*128 + 32, // 20
}
fileExpectHashHex = []string{
"ece86edb20669cc60d142789d464d57bdf5e33cb789d443f608cbd81cfa5697d", // 0