Commit 1ab01607 authored by lash's avatar lash Committed by GitHub

Improve code in api/bytes.go (#327)

parent 1f11fab5
......@@ -5,10 +5,8 @@
package api
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"github.com/ethersphere/bee/pkg/file"
......@@ -27,7 +25,8 @@ type bytesPostResponse struct {
// bytesUploadHandler handles upload of raw binary data of arbitrary length.
func (s *server) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
address, err := s.splitUpload(ctx, r.Body, r.ContentLength)
sp := splitter.NewSimpleSplitter(s.Storer)
address, err := file.SplitWriteAll(ctx, sp, r.Body, r.ContentLength)
if err != nil {
s.Logger.Debugf("bytes upload: %v", err)
jsonhttp.InternalServerError(w, nil)
......@@ -38,31 +37,6 @@ func (s *server) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
})
}
func (s *server) splitUpload(ctx context.Context, r io.Reader, l int64) (swarm.Address, error) {
chunkPipe := file.NewChunkPipe()
go func() {
buf := make([]byte, swarm.ChunkSize)
c, err := io.CopyBuffer(chunkPipe, r, buf)
if err != nil {
s.Logger.Debugf("split upload: io error %d: %v", c, err)
s.Logger.Error("split upload: io error")
return
}
if c != l {
s.Logger.Debugf("split upload: read count mismatch %d: %v", c, err)
s.Logger.Error("split upload: read count mismatch")
return
}
err = chunkPipe.Close()
if err != nil {
s.Logger.Debugf("split upload: incomplete file write close %v", err)
s.Logger.Error("split upload: incomplete file write close")
}
}()
sp := splitter.NewSimpleSplitter(s.Storer)
return sp.Split(ctx, chunkPipe, l)
}
// bytesGetHandler handles retrieval of raw binary data of arbitrary length.
func (s *server) bytesGetHandler(w http.ResponseWriter, r *http.Request) {
addressHex := mux.Vars(r)["address"]
......
......@@ -21,6 +21,7 @@ import (
"github.com/ethersphere/bee/pkg/collection/entry"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/joiner"
"github.com/ethersphere/bee/pkg/file/splitter"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
......@@ -131,7 +132,8 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
}
// first store the file and get its reference
fr, err := s.splitUpload(ctx, reader, int64(fileSize))
sp := splitter.NewSimpleSplitter(s.Storer)
fr, err := file.SplitWriteAll(ctx, sp, reader, int64(fileSize))
if err != nil {
s.Logger.Debugf("file upload: file store, file %q: %v", fileName, err)
s.Logger.Errorf("file upload: file store, file %q", fileName)
......@@ -154,7 +156,8 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
jsonhttp.InternalServerError(w, "metadata marshal error")
return
}
mr, err := s.splitUpload(ctx, bytes.NewReader(metadataBytes), int64(len(metadataBytes)))
sp = splitter.NewSimpleSplitter(s.Storer)
mr, err := file.SplitWriteAll(ctx, sp, bytes.NewReader(metadataBytes), int64(len(metadataBytes)))
if err != nil {
s.Logger.Debugf("file upload: metadata store, file %q: %v", fileName, err)
s.Logger.Errorf("file upload: metadata store, file %q", fileName)
......@@ -171,7 +174,9 @@ func (s *server) fileUploadHandler(w http.ResponseWriter, r *http.Request) {
jsonhttp.InternalServerError(w, "entry marshal error")
return
}
reference, err := s.splitUpload(ctx, bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)))
sp = splitter.NewSimpleSplitter(s.Storer)
reference, err := file.SplitWriteAll(ctx, sp, bytes.NewReader(fileEntryBytes), int64(len(fileEntryBytes)))
if err != nil {
s.Logger.Debugf("file upload: entry store, file %q: %v", fileName, err)
s.Logger.Errorf("file upload: entry store, file %q", fileName)
......
......@@ -22,7 +22,7 @@ func (e *AbortError) Unwrap() error {
return e.err
}
// Error implement standard go error interface.
// Error implements standard go error interface.
func (e *AbortError) Error() string {
return e.err.Error()
}
......@@ -45,7 +45,7 @@ func (e *HashError) Unwrap() error {
return e.err
}
// Error implement standard go error interface.
// Error implements standard go error interface.
func (e *HashError) Error() string {
return e.err.Error()
}
......@@ -7,6 +7,7 @@ package file
import (
"context"
"errors"
"fmt"
"io"
......@@ -64,3 +65,39 @@ func JoinReadAll(j Joiner, addr swarm.Address, outFile io.Writer) (int64, error)
}
return total, nil
}
// SplitWriteAll writes all input from provided reader to the provided splitter
func SplitWriteAll(ctx context.Context, s Splitter, r io.Reader, l int64) (swarm.Address, error) {
chunkPipe := NewChunkPipe()
errC := make(chan error)
go func() {
buf := make([]byte, swarm.ChunkSize)
c, err := io.CopyBuffer(chunkPipe, r, buf)
if err != nil {
errC <- err
}
if c != l {
errC <- errors.New("read count mismatch")
}
err = chunkPipe.Close()
if err != nil {
errC <- err
}
close(errC)
}()
addr, err := s.Split(ctx, chunkPipe, l)
if err != nil {
return swarm.ZeroAddress, err
}
select {
case err := <-errC:
if err != nil {
return swarm.ZeroAddress, err
}
case <-ctx.Done():
return swarm.ZeroAddress, ctx.Err()
}
return addr, nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment