Commit 886deff1 authored by lash, committed by GitHub

Introduce interface for file join and simple implementation (#110)

parent cf2d2cc4
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package file
// AbortError is returned whenever a file operation is terminated
// before it has completed.
type AbortError struct {
err error
}
// NewAbortError creates a new AbortError instance wrapping the given error.
func NewAbortError(err error) error {
return &AbortError{
err: err,
}
}
// Unwrap returns the underlying error.
func (e *AbortError) Unwrap() error {
return e.err
}
// Error implements the standard Go error interface.
func (e *AbortError) Error() string {
return e.err.Error()
}
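// Illustrative sketch, not part of this commit: because AbortError implements
// Unwrap, a caller can inspect the cause of an abort with the standard errors
// helpers. The cause variable is hypothetical.
package main

import (
	"errors"
	"fmt"

	"github.com/ethersphere/bee/pkg/file"
)

func main() {
	cause := errors.New("chunk read aborted")
	err := file.NewAbortError(cause)
	fmt.Println(errors.Is(err, cause)) // true: Unwrap exposes the wrapped cause
	var abortErr *file.AbortError
	fmt.Println(errors.As(err, &abortErr)) // true: the concrete type is recoverable
}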
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package file provides interfaces for file-oriented operations.
package file
import (
"context"
"io"
"github.com/ethersphere/bee/pkg/swarm"
)
// Joiner returns the file data referenced by the given Swarm Address on the
// returned io.ReadCloser.
//
// The call returns when the chunk for the given Swarm Address is found,
// along with the length of the data that will be returned.
// The caller can then read the data from the io.ReadCloser that was provided.
type Joiner interface {
Join(ctx context.Context, address swarm.Address) (dataOut io.ReadCloser, dataLength int64, err error)
}
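// A consumer-side sketch, not part of this commit: joinToWriter and its
// parameters are hypothetical. Note that the simple implementation below
// hands out data one chunk at a time and requires a read buffer of exactly
// swarm.ChunkSize, so this reads in chunk-sized steps rather than with
// ioutil.ReadAll.
package example

import (
	"context"
	"io"

	"github.com/ethersphere/bee/pkg/file"
	"github.com/ethersphere/bee/pkg/swarm"
)

func joinToWriter(ctx context.Context, j file.Joiner, addr swarm.Address, w io.Writer) (int64, error) {
	r, length, err := j.Join(ctx, addr)
	if err != nil {
		return 0, err
	}
	defer r.Close()
	buf := make([]byte, swarm.ChunkSize)
	var written int64
	for written < length {
		n, err := r.Read(buf)
		if err != nil {
			return written, err
		}
		// only the first n bytes hold data from this read
		if _, err := w.Write(buf[:n]); err != nil {
			return written, err
		}
		written += int64(n)
	}
	return written, nil
}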
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package internal
import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"math"
"os"
"sync"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
// SimpleJoinerJob encapsulates a single joiner operation, providing the consumer
// with blockwise reads of data represented by a content addressed chunk tree.
//
// Every chunk has a span length, which is a 64-bit integer in little-endian
// encoding stored as a prefix in the chunk itself. It represents the length
// of the data that the chunk's subtree encompasses.
//
// If a chunk's span length is greater than swarm.ChunkSize, the chunk is
// treated as an intermediate chunk, meaning its contents are handled as
// references to other chunks, which in turn are retrieved.
//
// Otherwise the chunk is a data chunk; it is passed to the io.Reader, and the
// job blocks until the consumer reads it.
//
// The process is repeated until readCount reaches the spanLength announced by
// the root chunk.
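//
// To make the layout concrete, a sketch of how each retrieved chunk is
// interpreted (illustrative, not a quote of the code below):
//
//	span := binary.LittleEndian.Uint64(ch.Data()[:8]) // length of the data this subtree represents
//	payload := ch.Data()[8:]
//	if span > uint64(swarm.ChunkSize) {
//		// intermediate chunk: payload is a list of swarm.SectionSize-byte references
//	} else {
//		// data chunk: payload is file data handed to the reader
//	}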
type SimpleJoinerJob struct {
ctx context.Context
store storage.Storer
spanLength int64 // the total length of data represented by the root chunk the job was initialized with.
readCount int64 // running count of chunks read by the io.Reader consumer.
cursors [9]int // per-level read cursor of data.
data [9][]byte // data of currently loaded chunk.
dataC chan []byte // channel to pass data chunks to the io.Reader method.
doneC chan struct{} // channel to signal termination of join loop
closeDoneOnce sync.Once // make sure done channel is closed only once
err error // read by the main thread to capture error state of the job
logger logging.Logger
}
// NewSimpleJoinerJob creates a new SimpleJoinerJob.
func NewSimpleJoinerJob(ctx context.Context, store storage.Storer, rootChunk swarm.Chunk) *SimpleJoinerJob {
spanLength := binary.LittleEndian.Uint64(rootChunk.Data()[:8])
levelCount := getLevelsFromLength(int64(spanLength), swarm.SectionSize, swarm.Branches)
j := &SimpleJoinerJob{
ctx: ctx,
store: store,
spanLength: int64(spanLength),
dataC: make(chan []byte),
doneC: make(chan struct{}),
logger: logging.New(os.Stderr, 6),
}
// startLevelIndex is the root chunk level
// data level has index 0
startLevelIndex := levelCount - 1
j.data[startLevelIndex] = rootChunk.Data()[8:]
j.logger.Tracef("simple joiner start index %d for address %x", startLevelIndex, rootChunk.Address())
// retrieval must be asynchronous to the io.Reader consumer
go func() {
err := j.start(startLevelIndex)
if err != nil {
// io.EOF is used internally to signal that all chunk data has been
// fully read; any other error is a genuine failure
if err != io.EOF {
j.logger.Errorf("simple joiner chunk join job fail: %v", err)
} else {
j.logger.Tracef("simple joiner chunk join job eof")
}
}
j.err = err
close(j.dataC)
j.closeDone()
}()
return j
}
// start processes all chunk references of the root chunk that has already been retrieved.
func (j *SimpleJoinerJob) start(level int) error {
// consume the reference at the current cursor position of the chunk level data
// and start recursive retrieval down to the underlying data chunks
for j.cursors[level] < len(j.data[level]) {
err := j.nextReference(level)
if err != nil {
return err
}
}
return nil
}
// nextReference gets the next chunk reference at the cursor of the chunk currently loaded
// for the specified level.
func (j *SimpleJoinerJob) nextReference(level int) error {
data := j.data[level]
cursor := j.cursors[level]
chunkAddress := swarm.NewAddress(data[cursor : cursor+swarm.SectionSize])
err := j.nextChunk(level-1, chunkAddress)
if err != nil {
return err
}
// move the cursor to the next reference
j.cursors[level] += swarm.SectionSize
return nil
}
// nextChunk retrieves data chunks by resolving references in intermediate chunks.
// The method will be called recursively via the nextReference method when
// the current chunk is an intermediate chunk.
// When a data chunk is found it is passed on the dataC channel to be consumed by the
// io.Reader consumer.
func (j *SimpleJoinerJob) nextChunk(level int, address swarm.Address) error {
// attempt to retrieve the chunk
ch, err := j.store.Get(j.ctx, storage.ModeGetRequest, address)
if err != nil {
return err
}
j.cursors[level] = 0
j.data[level] = ch.Data()[8:]
// any level higher than 0 means the chunk contains references
// which must be recursively processed
if level > 0 {
// consume and descend into each reference held by this chunk
for j.cursors[level] < len(j.data[level]) {
err = j.nextReference(level)
if err != nil {
return err
}
}
} else {
// read data and pass to reader only if the session is still active:
// * the context is cancelled when the client disappears, times out, etc.
// * doneC is closed when the job is gracefully terminated through Close
data := ch.Data()[8:]
select {
case <-j.ctx.Done():
j.readCount = j.spanLength
return j.ctx.Err()
case <-j.doneC:
return file.NewAbortError(errors.New("chunk read aborted"))
case j.dataC <- data:
j.readCount += int64(len(data))
}
// when we reach the end of the data to be read,
// bubble io.EOF up to the goroutine in the
// constructor that called start()
if j.readCount == j.spanLength {
j.logger.Trace("read all")
return io.EOF
}
}
return err
}
// Read is called by the consumer to retrieve the joined data.
// It must be called with a buffer equal to the maximum chunk size.
func (j *SimpleJoinerJob) Read(b []byte) (n int, err error) {
if cap(b) != swarm.ChunkSize {
return 0, fmt.Errorf("Read must be called with a buffer of %d bytes", swarm.ChunkSize)
}
data, ok := <-j.dataC
if !ok {
<-j.doneC
return 0, j.err
}
copy(b, data)
return len(data), nil
}
// Close is called by the consumer to gracefully abort the data retrieval.
func (j *SimpleJoinerJob) Close() error {
j.closeDone()
return nil
}
// closeDone, for readability, wraps the sync.Once execution of closing the doneC channel.
func (j *SimpleJoinerJob) closeDone() {
j.closeDoneOnce.Do(func() {
close(j.doneC)
})
}
// getLevelsFromLength calculates the number of chunk tree levels needed to
// represent data of the given length. The root chunk sits at the highest
// level, index levelCount - 1; the data chunks sit at level 0.
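// For example, with sectionSize 32 and branches 128 (one chunk spans 4096
// bytes of data), a length of 4096 yields 1: a single chunk that is its own
// root. A length of 4096*128+42, as used in the tests below, yields 3: data
// chunks at level 0, two intermediate chunks at level 1 and the root chunk
// at index 2.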
func getLevelsFromLength(l int64, sectionSize int, branches int) int {
s := int64(sectionSize)
b := int64(branches)
if l == 0 {
return 0
} else if l <= s*b {
return 1
}
c := (l - 1) / s
return int(math.Log(float64(c))/math.Log(float64(b)) + 1)
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package internal_test
import (
"bytes"
"context"
"io"
"testing"
"time"
"github.com/ethersphere/bee/pkg/file/joiner/internal"
filetest "github.com/ethersphere/bee/pkg/file/testing"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
)
// TestSimpleJoinerJobBlocksize checks that only Read() calls with exact
// chunk size buffer capacity are allowed.
func TestSimpleJoinerJobBlocksize(t *testing.T) {
store := mock.NewStorer()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// create root chunk with 2 references and the referenced data chunks
rootChunk := filetest.GenerateTestRandomFileChunk(swarm.ZeroAddress, swarm.ChunkSize*2, swarm.SectionSize*2)
_, err := store.Put(ctx, storage.ModePutUpload, rootChunk)
if err != nil {
t.Fatal(err)
}
firstAddress := swarm.NewAddress(rootChunk.Data()[8 : swarm.SectionSize+8])
firstChunk := filetest.GenerateTestRandomFileChunk(firstAddress, swarm.ChunkSize, swarm.ChunkSize)
_, err = store.Put(ctx, storage.ModePutUpload, firstChunk)
if err != nil {
t.Fatal(err)
}
secondAddress := swarm.NewAddress(rootChunk.Data()[swarm.SectionSize+8:])
secondChunk := filetest.GenerateTestRandomFileChunk(secondAddress, swarm.ChunkSize, swarm.ChunkSize)
_, err = store.Put(ctx, storage.ModePutUpload, secondChunk)
if err != nil {
t.Fatal(err)
}
// this buffer is too small
j := internal.NewSimpleJoinerJob(ctx, store, rootChunk)
b := make([]byte, swarm.SectionSize)
_, err = j.Read(b)
if err == nil {
t.Fatal("expected error on Read with too small buffer")
}
// this buffer is too big
b = make([]byte, swarm.ChunkSize+swarm.SectionSize)
_, err = j.Read(b)
if err == nil {
t.Fatal("expected error on Read with too big buffer")
}
// this buffer is juuuuuust right
b = make([]byte, swarm.ChunkSize)
_, err = j.Read(b)
if err != nil {
t.Fatal(err)
}
}
// TestSimpleJoinerJobOneLevel tests the retrieval of two data chunks immediately
// below the root chunk level.
func TestSimpleJoinerJobOneLevel(t *testing.T) {
store := mock.NewStorer()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// create root chunk with 2 references and the referenced data chunks
rootChunk := filetest.GenerateTestRandomFileChunk(swarm.ZeroAddress, swarm.ChunkSize*2, swarm.SectionSize*2)
_, err := store.Put(ctx, storage.ModePutUpload, rootChunk)
if err != nil {
t.Fatal(err)
}
firstAddress := swarm.NewAddress(rootChunk.Data()[8 : swarm.SectionSize+8])
firstChunk := filetest.GenerateTestRandomFileChunk(firstAddress, swarm.ChunkSize, swarm.ChunkSize)
_, err = store.Put(ctx, storage.ModePutUpload, firstChunk)
if err != nil {
t.Fatal(err)
}
secondAddress := swarm.NewAddress(rootChunk.Data()[swarm.SectionSize+8:])
secondChunk := filetest.GenerateTestRandomFileChunk(secondAddress, swarm.ChunkSize, swarm.ChunkSize)
_, err = store.Put(ctx, storage.ModePutUpload, secondChunk)
if err != nil {
t.Fatal(err)
}
j := internal.NewSimpleJoinerJob(ctx, store, rootChunk)
// verify first chunk content
outBuffer := make([]byte, 4096)
c, err := j.Read(outBuffer)
if err != nil {
t.Fatal(err)
}
if c != 4096 {
t.Fatalf("expected firstchunk read count %d, got %d", 4096, c)
}
if !bytes.Equal(outBuffer, firstChunk.Data()[8:]) {
t.Fatalf("firstchunk data mismatch, expected %x, got %x", outBuffer, firstChunk.Data()[8:])
}
// verify second chunk content
c, err = j.Read(outBuffer)
if err != nil {
t.Fatal(err)
}
if c != 4096 {
t.Fatalf("expected secondchunk read count %d, got %d", 4096, c)
}
if !bytes.Equal(outBuffer, secondChunk.Data()[8:]) {
t.Fatalf("secondchunk data mismatch, expected %x, got %x", outBuffer, secondChunk.Data()[8:])
}
// verify io.EOF is returned also after the first time it is returned
_, err = j.Read(outBuffer)
if err != io.EOF {
t.Fatal("expected io.EOF")
}
_, err = j.Read(outBuffer)
if err != io.EOF {
t.Fatal("expected io.EOF")
}
}
// TestSimpleJoinerJobTwoLevelsAcrossChunk tests the retrieval of data chunks below
// the first intermediate level across two intermediate chunks.
// The last chunk is shorter than a full chunk.
func TestSimpleJoinerJobTwoLevelsAcrossChunk(t *testing.T) {
store := mock.NewStorer()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// create root chunk with 2 references and two intermediate chunks with references
rootChunk := filetest.GenerateTestRandomFileChunk(swarm.ZeroAddress, swarm.ChunkSize*swarm.Branches+42, swarm.SectionSize*2)
_, err := store.Put(ctx, storage.ModePutUpload, rootChunk)
if err != nil {
t.Fatal(err)
}
firstAddress := swarm.NewAddress(rootChunk.Data()[8 : swarm.SectionSize+8])
firstChunk := filetest.GenerateTestRandomFileChunk(firstAddress, swarm.ChunkSize*swarm.Branches, swarm.ChunkSize)
_, err = store.Put(ctx, storage.ModePutUpload, firstChunk)
if err != nil {
t.Fatal(err)
}
secondAddress := swarm.NewAddress(rootChunk.Data()[swarm.SectionSize+8:])
secondChunk := filetest.GenerateTestRandomFileChunk(secondAddress, 42, swarm.SectionSize)
_, err = store.Put(ctx, storage.ModePutUpload, secondChunk)
if err != nil {
t.Fatal(err)
}
// create 128+1 chunks for all references in the intermediate chunks
cursor := 8
for i := 0; i < swarm.Branches; i++ {
chunkAddressBytes := firstChunk.Data()[cursor : cursor+swarm.SectionSize]
chunkAddress := swarm.NewAddress(chunkAddressBytes)
ch := filetest.GenerateTestRandomFileChunk(chunkAddress, swarm.ChunkSize, swarm.ChunkSize)
_, err := store.Put(ctx, storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
cursor += swarm.SectionSize
}
chunkAddressBytes := secondChunk.Data()[8:]
chunkAddress := swarm.NewAddress(chunkAddressBytes)
ch := filetest.GenerateTestRandomFileChunk(chunkAddress, 42, 42)
_, err = store.Put(ctx, storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
j := internal.NewSimpleJoinerJob(ctx, store, rootChunk)
// read back all the chunks and verify
b := make([]byte, swarm.ChunkSize)
for i := 0; i < swarm.Branches; i++ {
c, err := j.Read(b)
if err != nil {
t.Fatal(err)
}
if c != swarm.ChunkSize {
t.Fatalf("chunk %d expected read %d bytes; got %d", i, swarm.ChunkSize, c)
}
}
c, err := j.Read(b)
if err != nil {
t.Fatal(err)
}
if c != 42 {
t.Fatalf("last chunk expected read %d bytes; got %d", 42, c)
}
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package joiner provides implementations of the file.Joiner interface.
package joiner
import (
"context"
"encoding/binary"
"io"
"os"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/joiner/internal"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
// simpleJoiner wraps a non-optimized implementation of file.Joiner.
type simpleJoiner struct {
store storage.Storer
logger logging.Logger
}
// simpleReadCloser wraps a byte slice in an io.ReadCloser implementation.
type simpleReadCloser struct {
buffer []byte
cursor int
}
// Read implements io.Reader.
func (s *simpleReadCloser) Read(b []byte) (int, error) {
if s.cursor == len(s.buffer) {
return 0, io.EOF
}
// copy from the current cursor so partial reads do not drop data,
// and report only the bytes actually copied
n := copy(b, s.buffer[s.cursor:])
s.cursor += n
return n, nil
}
// Close implements io.Closer.
func (s *simpleReadCloser) Close() error {
return nil
}
// NewSimpleJoiner creates a new simpleJoiner.
func NewSimpleJoiner(store storage.Storer) file.Joiner {
return &simpleJoiner{
store: store,
logger: logging.New(os.Stderr, 6),
}
}
// Join implements the file.Joiner interface.
//
// It uses a non-optimized internal component that only retrieves a data chunk
// after the previous has been read.
func (s *simpleJoiner) Join(ctx context.Context, address swarm.Address) (dataOut io.ReadCloser, dataSize int64, err error) {
// retrieve the root chunk to read the total data length to be retrieved
rootChunk, err := s.store.Get(ctx, storage.ModeGetRequest, address)
if err != nil {
return nil, 0, err
}
// if this is a single chunk, short circuit to returning just that chunk
spanLength := binary.LittleEndian.Uint64(rootChunk.Data())
if spanLength <= swarm.ChunkSize {
s.logger.Tracef("simplejoiner root chunk %v is single chunk, skipping join and returning directly", rootChunk)
return &simpleReadCloser{
buffer: (rootChunk.Data()[8:]),
}, int64(spanLength), nil
}
s.logger.Tracef("simplejoiner joining root chunk %v", rootChunk)
r := internal.NewSimpleJoinerJob(ctx, s.store, rootChunk)
return r, int64(spanLength), nil
}
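// A condensed usage sketch, not part of this commit; store, ctx and rootAddr
// are assumed to exist, with the chunks for rootAddr already present in store
// (the tests below exercise the full flow):
//
//	j := joiner.NewSimpleJoiner(store)
//	r, length, err := j.Join(ctx, rootAddr)
//	if err != nil {
//		return err
//	}
//	defer r.Close()
//	buf := make([]byte, swarm.ChunkSize) // the job requires exactly this capacity
//	for read := int64(0); read < length; {
//		n, err := r.Read(buf)
//		if err != nil {
//			return err
//		}
//		read += int64(n)
//	}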
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package joiner_test
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"testing"
"github.com/ethersphere/bee/pkg/file/joiner"
filetest "github.com/ethersphere/bee/pkg/file/testing"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
)
// TestJoinerSingleChunk verifies that a newly created joiner returns the data
// stored in the store when the reference is one single chunk.
func TestJoinerSingleChunk(t *testing.T) {
store := mock.NewStorer()
joiner := joiner.NewSimpleJoiner(store)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var err error
_, _, err = joiner.Join(ctx, swarm.ZeroAddress)
if err != storage.ErrNotFound {
t.Fatalf("expected ErrNotFound for %x", swarm.ZeroAddress)
}
// create and store the chunk to be joined
mockAddrHex := fmt.Sprintf("%064s", "2a")
mockAddr := swarm.MustParseHexAddress(mockAddrHex)
mockData := []byte("foo")
mockDataLengthBytes := make([]byte, 8)
mockDataLengthBytes[0] = 0x03
mockChunk := swarm.NewChunk(mockAddr, append(mockDataLengthBytes, mockData...))
_, err = store.Put(ctx, storage.ModePutUpload, mockChunk)
if err != nil {
t.Fatal(err)
}
// read back data and compare
joinReader, l, err := joiner.Join(ctx, mockAddr)
if err != nil {
t.Fatal(err)
}
if l != int64(len(mockData)) {
t.Fatalf("expected join data length %d, got %d", len(mockData), l)
}
joinData, err := ioutil.ReadAll(joinReader)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(joinData, mockData) {
t.Fatalf("retrieved data '%x' not like original data '%x'", joinData, mockData)
}
}
// TestJoinerWithReference verifies that a chunk reference is correctly resolved
// and the underlying data is returned.
func TestJoinerWithReference(t *testing.T) {
store := mock.NewStorer()
joiner := joiner.NewSimpleJoiner(store)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// create root chunk and two data chunks referenced in the root chunk
rootChunk := filetest.GenerateTestRandomFileChunk(swarm.ZeroAddress, swarm.ChunkSize*2, swarm.SectionSize*2)
_, err := store.Put(ctx, storage.ModePutUpload, rootChunk)
if err != nil {
t.Fatal(err)
}
firstAddress := swarm.NewAddress(rootChunk.Data()[8 : swarm.SectionSize+8])
firstChunk := filetest.GenerateTestRandomFileChunk(firstAddress, swarm.ChunkSize, swarm.ChunkSize)
_, err = store.Put(ctx, storage.ModePutUpload, firstChunk)
if err != nil {
t.Fatal(err)
}
secondAddress := swarm.NewAddress(rootChunk.Data()[swarm.SectionSize+8:])
secondChunk := filetest.GenerateTestRandomFileChunk(secondAddress, swarm.ChunkSize, swarm.ChunkSize)
_, err = store.Put(ctx, storage.ModePutUpload, secondChunk)
if err != nil {
t.Fatal(err)
}
// read back data and compare
joinReader, l, err := joiner.Join(ctx, rootChunk.Address())
if err != nil {
t.Fatal(err)
}
if l != int64(swarm.ChunkSize*2) {
t.Fatalf("expected join data length %d, got %d", swarm.ChunkSize*2, l)
}
resultBuffer := make([]byte, swarm.ChunkSize)
n, err := joinReader.Read(resultBuffer)
if err != nil {
t.Fatal(err)
}
if n != len(resultBuffer) {
t.Fatalf("expected read count %d, got %d", len(resultBuffer), n)
}
if !bytes.Equal(resultBuffer, firstChunk.Data()[8:]) {
t.Fatalf("expected resultbuffer %v, got %v", resultBuffer, firstChunk.Data()[:len(resultBuffer)])
}
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package testing
import (
"encoding/binary"
"math/rand"
"github.com/ethersphere/bee/pkg/swarm"
)
// GenerateTestRandomFileChunk creates a chunk consisting of an 8-byte span
// prefix encoding the given spanLength, followed by dataSize bytes of random
// data. The chunk is keyed by the given address, or by a random address if
// the given one is zero.
func GenerateTestRandomFileChunk(address swarm.Address, spanLength int, dataSize int) swarm.Chunk {
data := make([]byte, dataSize+8)
binary.LittleEndian.PutUint64(data, uint64(spanLength))
_, _ = rand.Read(data[8:]) // # skipcq: GSC-G404
key := make([]byte, swarm.SectionSize)
if address.IsZero() {
_, _ = rand.Read(key) // # skipcq: GSC-G404
} else {
copy(key, address.Bytes())
}
return swarm.NewChunk(swarm.NewAddress(key), data)
}
@@ -38,7 +38,7 @@ func init() {
func GenerateTestRandomChunk() swarm.Chunk {
data := make([]byte, swarm.ChunkSize)
_, _ = rand.Read(data)
-key := make([]byte, 32)
+key := make([]byte, swarm.SectionSize)
_, _ = rand.Read(key)
return swarm.NewChunk(swarm.NewAddress(key), data)
}
@@ -13,8 +13,9 @@ import (
)
const (
-ChunkSize = 4096
SectionSize = 32
Branches = 128
+ChunkSize = SectionSize * Branches
MaxPO = 16
)