Commit cc2fe7be authored by lash, committed by GitHub

API for upload and download of multi-chunk data (#266)

parent 07adec3b
......@@ -16,6 +16,7 @@ import (
cmdfile "github.com/ethersphere/bee/cmd/internal/file"
"github.com/ethersphere/bee/pkg/collection/entry"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/joiner"
"github.com/ethersphere/bee/pkg/file/splitter"
"github.com/ethersphere/bee/pkg/logging"
......@@ -58,7 +59,7 @@ func getEntry(cmd *cobra.Command, args []string) (err error) {
writeCloser := cmdfile.NopWriteCloser(buf)
limitBuf := cmdfile.NewLimitWriteCloser(writeCloser, limitMetadataLength)
j := joiner.NewSimpleJoiner(store)
err = cmdfile.JoinReadAll(j, addr, limitBuf)
_, err = file.JoinReadAll(j, addr, limitBuf)
if err != nil {
return err
}
......@@ -69,7 +70,7 @@ func getEntry(cmd *cobra.Command, args []string) (err error) {
}
buf = bytes.NewBuffer(nil)
err = cmdfile.JoinReadAll(j, e.Metadata(), buf)
_, err = file.JoinReadAll(j, e.Metadata(), buf)
if err != nil {
return err
}
......@@ -115,8 +116,8 @@ func getEntry(cmd *cobra.Command, args []string) (err error) {
return err
}
defer outFile.Close()
return cmdfile.JoinReadAll(j, e.Reference(), outFile)
_, err = file.JoinReadAll(j, e.Reference(), outFile)
return err
}
// putEntry creates a new file entry with the given reference.
......
......@@ -10,6 +10,7 @@ import (
"path/filepath"
cmdfile "github.com/ethersphere/bee/cmd/internal/file"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/joiner"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/swarm"
......@@ -74,7 +75,8 @@ func Join(cmd *cobra.Command, args []string) (err error) {
// create the join and get its data reader
j := joiner.NewSimpleJoiner(store)
return cmdfile.JoinReadAll(j, addr, outFile)
_, err = file.JoinReadAll(j, addr, outFile)
return err
}
func main() {
......
......@@ -19,7 +19,6 @@ import (
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
......@@ -187,35 +186,6 @@ func (l *LimitWriteCloser) Write(b []byte) (int, error) {
return c, err
}
// JoinReadAll reads all output from the provided joiner.
func JoinReadAll(j file.Joiner, addr swarm.Address, outFile io.Writer) error {
r, l, err := j.Join(context.Background(), addr)
if err != nil {
return err
}
// join, rinse, repeat until done
data := make([]byte, swarm.ChunkSize)
var total int64
for i := int64(0); i < l; i += swarm.ChunkSize {
cr, err := r.Read(data)
if err != nil {
return err
}
total += int64(cr)
cw, err := outFile.Write(data[:cr])
if err != nil {
return err
}
if cw != cr {
return fmt.Errorf("short wrote %d of %d for chunk %d", cw, cr, i)
}
}
if total != l {
return fmt.Errorf("received only %d of %d total bytes", total, l)
}
return nil
}
func SetLogger(cmd *cobra.Command, verbosityString string) (logger logging.Logger, err error) {
v := strings.ToLower(verbosityString)
switch v {
......
......@@ -7,7 +7,6 @@ package file_test
import (
"bytes"
"context"
"io"
"io/ioutil"
"net/http/httptest"
"net/url"
......@@ -18,7 +17,6 @@ import (
cmdfile "github.com/ethersphere/bee/cmd/internal/file"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
......@@ -152,42 +150,6 @@ func TestLimitWriter(t *testing.T) {
}
}
// TestJoinReadAll verifies that data in excess of a single chunk is returned
// in its entirety.
func TestJoinReadAll(t *testing.T) {
var dataLength int64 = swarm.ChunkSize + 2
j := newMockJoiner(dataLength)
buf := bytes.NewBuffer(nil)
err := cmdfile.JoinReadAll(j, swarm.ZeroAddress, buf)
if err != nil {
t.Fatal(err)
}
if dataLength != int64(len(buf.Bytes())) {
t.Fatalf("expected length %d, got %d", dataLength, len(buf.Bytes()))
}
}
// mockJoiner is an implementation of file.Joiner that short-circuits to return
// a mock byte vector of the length given at initialization.
type mockJoiner struct {
l int64
}
// Join implements file.Joiner.
func (j *mockJoiner) Join(ctx context.Context, address swarm.Address) (dataOut io.ReadCloser, dataLength int64, err error) {
data := make([]byte, j.l)
buf := bytes.NewBuffer(data)
readCloser := ioutil.NopCloser(buf)
return readCloser, j.l, nil
}
// newMockJoiner creates a new mockJoiner.
func newMockJoiner(l int64) file.Joiner {
return &mockJoiner{
l: l,
}
}
// newTestServer creates an http server to serve the bee http api endpoints.
func newTestServer(t *testing.T, storer storage.Storer) *url.URL {
t.Helper()
......
......@@ -23,14 +23,18 @@ type testServerOptions struct {
Pingpong pingpong.Interface
Storer storage.Storer
Tags *tags.Tags
Logger logging.Logger
}
func newTestServer(t *testing.T, o testServerOptions) *http.Client {
if o.Logger == nil {
o.Logger = logging.New(ioutil.Discard, 0)
}
s := api.New(api.Options{
Pingpong: o.Pingpong,
Tags: o.Tags,
Storer: o.Storer,
Logger: logging.New(ioutil.Discard, 0),
Logger: o.Logger,
})
ts := httptest.NewServer(s)
t.Cleanup(ts.Close)
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api
import (
"bytes"
"encoding/binary"
"errors"
"hash"
"io"
"io/ioutil"
"net/http"
bmtlegacy "github.com/ethersphere/bmt/legacy"
"github.com/gorilla/mux"
"golang.org/x/crypto/sha3"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
type bzzPostResponse struct {
Hash swarm.Address `json:"hash"`
}
func hashFunc() hash.Hash {
return sha3.NewLegacyKeccak256()
}
func (s *server) bzzUploadHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
data, err := ioutil.ReadAll(r.Body)
if err != nil {
s.Logger.Debugf("bzz: read error: %v", err)
s.Logger.Error("bzz: read error")
jsonhttp.InternalServerError(w, "cannot read request")
return
}
p := bmtlegacy.NewTreePool(hashFunc, swarm.Branches, bmtlegacy.PoolSize)
hasher := bmtlegacy.New(p)
span := len(data)
b := make([]byte, 8)
binary.LittleEndian.PutUint64(b, uint64(span))
data = append(b, data...)
err = hasher.SetSpan(int64(span))
if err != nil {
s.Logger.Debugf("bzz: hasher set span: %v", err)
s.Logger.Error("bzz: hash data error")
jsonhttp.InternalServerError(w, "cannot hash data")
return
}
_, err = hasher.Write(data[8:])
if err != nil {
s.Logger.Debugf("bzz: hasher write: %v", err)
s.Logger.Error("bzz: hash data error")
jsonhttp.InternalServerError(w, "cannot hash data")
return
}
addr := swarm.NewAddress(hasher.Sum(nil))
_, err = s.Storer.Put(ctx, storage.ModePutUpload, swarm.NewChunk(addr, data))
if err != nil {
s.Logger.Debugf("bzz: write error: %v, addr %s", err, addr)
s.Logger.Error("bzz: write error")
jsonhttp.InternalServerError(w, "write error")
return
}
jsonhttp.OK(w, bzzPostResponse{Hash: addr})
}
func (s *server) bzzGetHandler(w http.ResponseWriter, r *http.Request) {
addr := mux.Vars(r)["address"]
ctx := r.Context()
address, err := swarm.ParseHexAddress(addr)
if err != nil {
s.Logger.Debugf("bzz: parse address %s: %v", addr, err)
s.Logger.Error("bzz: parse address error")
jsonhttp.BadRequest(w, "invalid address")
return
}
chunk, err := s.Storer.Get(ctx, storage.ModeGetRequest, address)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
s.Logger.Tracef("bzz: not found. addr %s", address)
jsonhttp.NotFound(w, "not found")
return
}
s.Logger.Debugf("bzz: read error: %v ,addr %s", err, address)
s.Logger.Error("bzz: read error")
jsonhttp.InternalServerError(w, "read error")
return
}
w.Header().Set("Content-Type", "application/octet-stream")
_, _ = io.Copy(w, bytes.NewReader(chunk.Data()[8:]))
}
......@@ -6,6 +6,6 @@ package api
type (
PingpongResponse = pingpongResponse
BzzPostResponse = bzzPostResponse
RawPostResponse = rawPostResponse
TagResponse = tagResponse
)
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"github.com/gorilla/mux"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/joiner"
"github.com/ethersphere/bee/pkg/file/splitter"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
type rawPostResponse struct {
Hash swarm.Address `json:"hash"`
}
// rawUploadHandler handles upload of raw binary data of arbitrary length.
func (s *server) rawUploadHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
responseObject, err := s.splitUpload(ctx, r.Body, r.ContentLength)
if err != nil {
s.Logger.Debugf("raw: %v", err)
o := responseObject.(jsonhttp.StatusResponse)
jsonhttp.Respond(w, o.Code, o)
} else {
jsonhttp.OK(w, responseObject)
}
}
func (s *server) splitUpload(ctx context.Context, r io.ReadCloser, l int64) (interface{}, error) {
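// pipe the request body into the splitter via a ChunkPipe, written to from a
// separate goroutine, so the splitter always sees chunk-aligned reads no matter
// how the network delivers the body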
chunkPipe := file.NewChunkPipe()
go func() {
buf := make([]byte, swarm.ChunkSize)
c, err := io.CopyBuffer(chunkPipe, r, buf)
if err != nil {
s.Logger.Debugf("split upload: io error %d: %v", c, err)
s.Logger.Error("io error")
return
}
if c != l {
s.Logger.Debugf("split upload: read count mismatch %d: %v", c, err)
s.Logger.Error("read count mismatch")
return
}
err = chunkPipe.Close()
if err != nil {
s.Logger.Errorf("split upload: incomplete file write close %v", err)
s.Logger.Error("incomplete file write close")
}
}()
sp := splitter.NewSimpleSplitter(s.Storer)
address, err := sp.Split(ctx, chunkPipe, l)
var response jsonhttp.StatusResponse
if err != nil {
response.Message = "upload error"
response.Code = http.StatusInternalServerError
err = fmt.Errorf("%s: %v", response.Message, err)
return response, err
}
return rawPostResponse{Hash: address}, nil
}
// rawGetHandler handles retrieval of raw binary data of arbitrary length.
func (s *server) rawGetHandler(w http.ResponseWriter, r *http.Request) {
addressHex := mux.Vars(r)["address"]
ctx := r.Context()
address, err := swarm.ParseHexAddress(addressHex)
if err != nil {
s.Logger.Debugf("raw: parse address %s: %v", addressHex, err)
s.Logger.Error("raw: parse address error")
jsonhttp.BadRequest(w, "invalid address")
return
}
j := joiner.NewSimpleJoiner(s.Storer)
dataSize, err := j.Size(ctx, address)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
s.Logger.Debugf("raw: not found %s: %v", address, err)
s.Logger.Error("raw: not found")
jsonhttp.NotFound(w, "not found")
return
}
s.Logger.Debugf("raw: invalid root chunk %s: %v", address, err)
s.Logger.Error("raw: invalid root chunk")
jsonhttp.BadRequest(w, "invalid root chunk")
return
}
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Content-Length", fmt.Sprintf("%d", dataSize))
c, err := file.JoinReadAll(j, address, w)
if err != nil && c == 0 {
s.Logger.Errorf("raw: data write %s: %v", address, err)
s.Logger.Error("raw: data input error")
jsonhttp.InternalServerError(w, "retrieval fail")
}
}
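For illustration, a minimal client sketch against the two handlers above. The endpoint paths and the {"hash": ...} response shape come from this commit; the node address and payload size are assumptions.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	base := "http://localhost:8080"                 // assumed node API address
	payload := bytes.Repeat([]byte{0x42}, 8192+100) // spans multiple chunks

	// upload: POST the raw bytes; Content-Length is set automatically for bytes.Reader
	res, err := http.Post(base+"/bzz-raw", "application/octet-stream", bytes.NewReader(payload))
	if err != nil {
		panic(err)
	}
	var out struct {
		Hash string `json:"hash"`
	}
	if err := json.NewDecoder(res.Body).Decode(&out); err != nil {
		panic(err)
	}
	res.Body.Close()

	// download: GET the joined data back by its root hash
	res, err = http.Get(base + "/bzz-raw/" + out.Hash)
	if err != nil {
		panic(err)
	}
	defer res.Body.Close()
	data, err := ioutil.ReadAll(res.Body)
	if err != nil {
		panic(err)
	}
	fmt.Printf("uploaded %d bytes, got back %d\n", len(payload), len(data))
}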
......@@ -13,25 +13,34 @@ import (
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
mockbytes "gitlab.com/nolash/go-mockbytes"
)
// TestBzz tests that the data upload api responds as expected when uploading,
// TestRaw tests that the data upload api responds as expected when uploading,
// downloading and requesting a resource that cannot be found.
func TestBzz(t *testing.T) {
func TestRaw(t *testing.T) {
var (
resource = "/bzz"
content = []byte("foo")
expHash = "2387e8e7d8a48c2a9339c97c1dc3461a9a7aa07e994c5cb8b38fd7c1b3e6ea48"
resource = "/bzz-raw"
expHash = "29a5fb121ce96194ba8b7b823a1f9c6af87e1791f824940a53b5a7efe3f790d9"
mockStorer = mock.NewStorer()
client = newTestServer(t, testServerOptions{
Storer: mockStorer,
Tags: tags.NewTags(),
Logger: logging.New(ioutil.Discard, 5),
})
)
g := mockbytes.New(0, mockbytes.MockTypeStandard).WithModulus(255)
content, err := g.SequentialBytes(swarm.ChunkSize * 2)
if err != nil {
t.Fatal(err)
}
t.Run("upload", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, client, http.MethodPost, resource, bytes.NewReader(content), http.StatusOK, api.BzzPostResponse{
jsonhttptest.ResponseDirect(t, client, http.MethodPost, resource, bytes.NewReader(content), http.StatusOK, api.RawPostResponse{
Hash: swarm.MustParseHexAddress(expHash),
})
})
......
......@@ -32,12 +32,12 @@ func (s *server) setupRouting() {
"POST": http.HandlerFunc(s.pingpongHandler),
})
router.Handle("/bzz", jsonhttp.MethodHandler{
"POST": http.HandlerFunc(s.bzzUploadHandler),
router.Handle("/bzz-raw", jsonhttp.MethodHandler{
"POST": http.HandlerFunc(s.rawUploadHandler),
})
router.Handle("/bzz/{address}", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.bzzGetHandler),
router.Handle("/bzz-raw/{address}", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.rawGetHandler),
})
router.Handle("/bzz-chunk/{addr}", jsonhttp.MethodHandler{
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package file
import (
"io"
"github.com/ethersphere/bee/pkg/swarm"
)
const (
maxBufferSize = swarm.ChunkSize * 2
)
// ChunkPipe ensures that only the last read is smaller than the chunk size,
// regardless of the size of individual writes.
type ChunkPipe struct {
io.ReadCloser
writer io.WriteCloser
data []byte
cursor int
}
// NewChunkPipe creates a new ChunkPipe.
func NewChunkPipe() io.ReadWriteCloser {
r, w := io.Pipe()
return &ChunkPipe{
ReadCloser: r,
writer: w,
data: make([]byte, maxBufferSize),
}
}
// Read implements io.Reader
func (c *ChunkPipe) Read(b []byte) (int, error) {
return c.ReadCloser.Read(b)
}
// Write implements io.Writer
func (c *ChunkPipe) Write(b []byte) (int, error) {
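// buffer the incoming bytes; as soon as at least swarm.ChunkSize bytes have
// accumulated, flush exactly one chunk to the underlying pipe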
copy(c.data[c.cursor:], b)
c.cursor += len(b)
if c.cursor >= swarm.ChunkSize {
_, err := c.writer.Write(c.data[:swarm.ChunkSize])
if err != nil {
return len(b), err
}
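// keep the sub-chunk remainder at the front of the buffer; subsequent writes
// append to it, and Close flushes whatever is left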
c.cursor -= swarm.ChunkSize
copy(c.data, c.data[swarm.ChunkSize:])
}
return len(b), nil
}
// Close implements io.Closer
func (c *ChunkPipe) Close() error {
if c.cursor > 0 {
_, err := c.writer.Write(c.data[:c.cursor])
if err != nil {
return err
}
}
return c.writer.Close()
}
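A minimal usage sketch (not part of the commit): because ChunkPipe wraps io.Pipe, reads must run concurrently with writes.

package main

import (
	"fmt"

	"github.com/ethersphere/bee/pkg/file"
	"github.com/ethersphere/bee/pkg/swarm"
)

func main() {
	pipe := file.NewChunkPipe()
	go func() {
		pipe.Write(make([]byte, swarm.ChunkSize-10)) // buffered: short of a boundary
		pipe.Write(make([]byte, 20))                 // crosses it: one full chunk is flushed
		pipe.Close()                                 // flushes the 10-byte remainder
	}()
	buf := make([]byte, swarm.ChunkSize)
	for {
		n, err := pipe.Read(buf) // yields 4096 bytes, then 10, then io.EOF
		if n > 0 {
			fmt.Println(n)
		}
		if err != nil {
			break
		}
	}
}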
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package file_test
import (
"fmt"
"io"
"strconv"
"strings"
"testing"
"time"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/swarm"
)
var (
dataWrites = [][]int{
{swarm.ChunkSize - 2}, // short
{swarm.ChunkSize - 2, 4}, // short, over
{swarm.ChunkSize - 2, 4, swarm.ChunkSize - 6}, // short, over, short
{swarm.ChunkSize - 2, 4, swarm.ChunkSize - 4}, // short, over, short
{swarm.ChunkSize, 2, swarm.ChunkSize - 4}, // on, short, short
{swarm.ChunkSize, 2, swarm.ChunkSize - 2}, // on, short, on
{swarm.ChunkSize, 2, swarm.ChunkSize}, // on, short, over
{swarm.ChunkSize, 2, swarm.ChunkSize - 2, 4}, // on, short, on, short
{swarm.ChunkSize, swarm.ChunkSize}, // on, on
}
)
// TestChunkPipe verifies that the reads are correctly buffered for
// various write length combinations.
func TestChunkPipe(t *testing.T) {
for i := range dataWrites {
t.Run(fmt.Sprintf("%d", i), testChunkPipe)
}
}
func testChunkPipe(t *testing.T) {
paramString := strings.Split(t.Name(), "/")
dataWriteIdx, err := strconv.ParseInt(paramString[1], 10, 0)
if err != nil {
t.Fatal(err)
}
buf := file.NewChunkPipe()
sizeC := make(chan int, 255)
errC := make(chan error, 1)
go func() {
data := make([]byte, swarm.ChunkSize)
for {
// get buffered chunkpipe read
c, err := buf.Read(data)
sizeC <- c
if err != nil {
close(sizeC)
errC <- err
return
}
// only the last read should be smaller than chunk size
if c < swarm.ChunkSize {
close(sizeC)
errC <- nil
return
}
}
}()
// do the writes
dataWrite := dataWrites[dataWriteIdx]
writeTotal := 0
for _, l := range dataWrite {
data := make([]byte, l)
c, err := buf.Write(data)
if err != nil {
t.Fatal(err)
}
if c != l {
t.Fatalf("short write")
}
writeTotal += l
}
// finish up (last unfinished chunk write will be flushed)
err = buf.Close()
if err != nil {
t.Fatal(err)
}
// receive the writes
// err may or may not be EOF, depending on whether writes end on
// chunk boundary
timer := time.NewTimer(time.Second)
readTotal := 0
for {
select {
case c := <-sizeC:
readTotal += c
if readTotal == writeTotal {
return
}
case err = <-errC:
if err != nil {
if err != io.EOF {
t.Fatal(err)
}
}
case <-timer.C:
t.Fatal("timeout")
}
}
}
......@@ -7,6 +7,7 @@ package file
import (
"context"
"fmt"
"io"
"github.com/ethersphere/bee/pkg/swarm"
......@@ -23,6 +24,7 @@ var (
// The caller can then read the data on the io.Reader that was provided.
type Joiner interface {
Join(ctx context.Context, address swarm.Address) (dataOut io.ReadCloser, dataLength int64, err error)
Size(ctx context.Context, address swarm.Address) (dataLength int64, err error)
}
// Splitter starts a new file splitting job.
......@@ -33,3 +35,32 @@ type Joiner interface {
type Splitter interface {
Split(ctx context.Context, dataIn io.ReadCloser, dataLength int64) (addr swarm.Address, err error)
}
// JoinReadAll reads all output from the provided joiner.
func JoinReadAll(j Joiner, addr swarm.Address, outFile io.Writer) (int64, error) {
r, l, err := j.Join(context.Background(), addr)
if err != nil {
return 0, err
}
// join, rinse, repeat until done
data := make([]byte, swarm.ChunkSize)
var total int64
for i := int64(0); i < l; i += swarm.ChunkSize {
cr, err := r.Read(data)
if err != nil {
return total, err
}
total += int64(cr)
cw, err := outFile.Write(data[:cr])
if err != nil {
return total, err
}
if cw != cr {
return total, fmt.Errorf("short wrote %d of %d for chunk %d", cw, cr, i)
}
}
if total != l {
return total, fmt.Errorf("received only %d of %d total bytes", total, l)
}
return total, nil
}
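A round-trip sketch of Splitter, Joiner and JoinReadAll together, using the in-memory mock storer that the tests in this commit also use (illustrative only, not part of the commit):

package main

import (
	"bytes"
	"context"
	"fmt"
	"io/ioutil"

	"github.com/ethersphere/bee/pkg/file"
	"github.com/ethersphere/bee/pkg/file/joiner"
	"github.com/ethersphere/bee/pkg/file/splitter"
	"github.com/ethersphere/bee/pkg/storage/mock"
	"github.com/ethersphere/bee/pkg/swarm"
)

func main() {
	store := mock.NewStorer()
	data := make([]byte, swarm.ChunkSize+100) // more than one chunk

	// split: store the content as chunks, receive the root address
	s := splitter.NewSimpleSplitter(store)
	addr, err := s.Split(context.Background(), ioutil.NopCloser(bytes.NewReader(data)), int64(len(data)))
	if err != nil {
		panic(err)
	}

	// join: stream the content back out through any io.Writer
	j := joiner.NewSimpleJoiner(store)
	buf := bytes.NewBuffer(nil)
	n, err := file.JoinReadAll(j, addr, buf)
	if err != nil {
		panic(err)
	}
	fmt.Println(n == int64(len(data))) // true
}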
......@@ -8,6 +8,7 @@ import (
"bytes"
"context"
"io"
"io/ioutil"
"strconv"
"strings"
"testing"
......@@ -85,3 +86,47 @@ func testSplitThenJoin(t *testing.T) {
t.Fatalf("data mismatch %d", len(data))
}
}
// TestJoinReadAll verifies that data in excess of a single chunk is returned
// in its entirety.
func TestJoinReadAll(t *testing.T) {
var dataLength int64 = swarm.ChunkSize + 2
j := newMockJoiner(dataLength)
buf := bytes.NewBuffer(nil)
c, err := file.JoinReadAll(j, swarm.ZeroAddress, buf)
if err != nil {
t.Fatal(err)
}
if dataLength != c {
t.Fatalf("expected readall return length %d, got %d", dataLength, c)
}
if dataLength != int64(len(buf.Bytes())) {
t.Fatalf("expected length %d, got %d", dataLength, len(buf.Bytes()))
}
}
// mockJoiner is an implementation of file.Joiner that short-circuits to return
// a mock byte vector of the length given at initialization.
type mockJoiner struct {
l int64
}
// Join implements file.Joiner.
func (j *mockJoiner) Join(ctx context.Context, address swarm.Address) (dataOut io.ReadCloser, dataLength int64, err error) {
data := make([]byte, j.l)
buf := bytes.NewBuffer(data)
readCloser := ioutil.NopCloser(buf)
return readCloser, j.l, nil
}
func (j *mockJoiner) Size(ctx context.Context, address swarm.Address) (dataSize int64, err error) {
return j.l, nil
}
// newMockJoiner creates a new mockJoiner.
func newMockJoiner(l int64) file.Joiner {
return &mockJoiner{
l: l,
}
}
......@@ -8,6 +8,7 @@ package joiner
import (
"context"
"encoding/binary"
"fmt"
"io"
"github.com/ethersphere/bee/pkg/file"
......@@ -28,6 +29,22 @@ func NewSimpleJoiner(getter storage.Getter) file.Joiner {
}
}
func (s *simpleJoiner) Size(ctx context.Context, address swarm.Address) (dataSize int64, err error) {
// retrieve the root chunk to read the total data length to be retrieved
rootChunk, err := s.getter.Get(ctx, storage.ModeGetRequest, address)
if err != nil {
return 0, err
}
chunkData := rootChunk.Data()
if len(chunkData) < 8 {
return 0, fmt.Errorf("invalid chunk content of %d bytes", len(chunkData))
}
dataLength := binary.LittleEndian.Uint64(chunkData)
return int64(dataLength), nil
}
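Size relies on the convention, also visible in the upload handlers above, that a chunk's first 8 bytes hold the span — the total length of the data it subsumes — in little-endian order. A tiny self-contained sketch of that layout (the length value and payload are placeholders):

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// root chunk layout: 8-byte little-endian span (total data length) || payload
	span := make([]byte, 8)
	binary.LittleEndian.PutUint64(span, 8224) // e.g. two full chunks plus 32 bytes
	payload := make([]byte, 64)               // placeholder payload bytes
	chunkData := append(span, payload...)

	// Size recovers the total length from the first 8 bytes of the root chunk
	fmt.Println(binary.LittleEndian.Uint64(chunkData)) // 8224
}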
// Join implements the file.Joiner interface.
//
// It uses a non-optimized internal component that only retrieves a data chunk
......
......@@ -60,7 +60,7 @@ func NewSimpleSplitterJob(ctx context.Context, putter storage.Putter, spanLength
sumCounts: make([]int, levelBufferLimit),
cursors: make([]int, levelBufferLimit),
hasher: bmtlegacy.New(p),
buffer: make([]byte, file.ChunkWithLengthSize*levelBufferLimit),
buffer: make([]byte, file.ChunkWithLengthSize*levelBufferLimit*2), // double size as a temporary workaround for the weak calculation of needed buffer space
}
}
......
......@@ -39,16 +39,19 @@ func (s *simpleSplitter) Split(ctx context.Context, r io.ReadCloser, dataLength
var total int64
data := make([]byte, swarm.ChunkSize)
for {
var eof bool
for !eof {
c, err := r.Read(data)
total += int64(c)
if err != nil {
if err == io.EOF {
if total < dataLength {
return swarm.ZeroAddress, fmt.Errorf("splitter only received %d bytes of data, expected %d bytes", total, dataLength)
return swarm.ZeroAddress, fmt.Errorf("splitter only received %d bytes of data, expected %d bytes", total+int64(c), dataLength)
}
break
eof = true
} else {
return swarm.ZeroAddress, err
}
return swarm.ZeroAddress, err
}
cc, err := j.Write(data[:c])
if err != nil {
......@@ -57,7 +60,6 @@ func (s *simpleSplitter) Split(ctx context.Context, r io.ReadCloser, dataLength
if cc < c {
return swarm.ZeroAddress, fmt.Errorf("write count to file hasher component %d does not match read count %d", cc, c)
}
total += int64(c)
}
sum := j.Sum(nil)
......
......@@ -5,8 +5,10 @@
package splitter_test
import (
"bytes"
"context"
"testing"
"time"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/splitter"
......@@ -106,3 +108,77 @@ func TestSplitThreeLevels(t *testing.T) {
}
}
}
// TestUnalignedSplit tests that the correct hash is generated regardless of
// individual write sizes at the source of the data.
func TestUnalignedSplit(t *testing.T) {
var (
storer storage.Storer = mock.NewStorer()
chunkPipe = file.NewChunkPipe()
)
// test vector taken from pkg/file/testing/vector.go
var (
dataLen int64 = swarm.ChunkSize*2 + 32
expectAddrHex = "61416726988f77b874435bdd89a419edc3861111884fd60e8adf54e2f299efd6"
g = mockbytes.New(0, mockbytes.MockTypeStandard).WithModulus(255)
)
// generate test vector data content
content, err := g.SequentialBytes(int(dataLen))
if err != nil {
t.Fatal(err)
}
// perform the split in a separate thread
sp := splitter.NewSimpleSplitter(storer)
ctx := context.Background()
doneC := make(chan swarm.Address)
errC := make(chan error)
go func() {
addr, err := sp.Split(ctx, chunkPipe, dataLen)
if err != nil {
errC <- err
} else {
doneC <- addr
}
close(doneC)
close(errC)
}()
// perform the writes in unaligned bursts
writeSizes := []int{swarm.ChunkSize - 40, 40 + 32, swarm.ChunkSize}
contentBuf := bytes.NewReader(content)
cursor := 0
for _, writeSize := range writeSizes {
data := make([]byte, writeSize)
_, err = contentBuf.Read(data)
if err != nil {
t.Fatal(err)
}
c, err := chunkPipe.Write(data)
if err != nil {
t.Fatal(err)
}
cursor += c
}
err = chunkPipe.Close()
if err != nil {
t.Fatal(err)
}
// read and hopefully not weep
timer := time.NewTimer(time.Millisecond * 100)
select {
case addr := <-doneC:
expectAddr := swarm.MustParseHexAddress(expectAddrHex)
if !expectAddr.Equal(addr) {
t.Fatalf("addr mismatch, expected %s, got %s", expectAddr, addr)
}
case err := <-errC:
t.Fatal(err)
case <-timer.C:
t.Fatal("timeout")
}
}
......@@ -96,7 +96,7 @@ func (s *Service) chunksWorker() {
// for now ignoring the receipt and checking only for error
_, err = s.pushSyncer.PushChunkToClosest(ctx, ch)
if err != nil {
s.logger.Debugf("pusher: error while sending chunk or receiving receipt: %v", err)
s.logger.Errorf("pusher: error while sending chunk or receiving receipt: %v", err)
continue
}
......