Commit 3f97ad93 authored by lash's avatar lash Committed by GitHub

Add CLI application for splitting data to chunks (#186)

parent ab7b3104
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
"github.com/ethersphere/bee/pkg/file/splitter"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/spf13/cobra"
)
// Values of the command line flags. They are bound to their flags in main.
var (
	// outdir is the output directory for fsStore.
	outdir string
	// inputLength is the limit of data input.
	inputLength int64
	// host is the http api host.
	host string
	// port is the http api port.
	port int
	// noHttp skips the http api if set.
	noHttp bool
	// ssl uses https for the api if set.
	ssl bool
)
// teeStore is a storage.Putter that forwards every Put call to each of the
// storage.Putters that have been added to it.
type teeStore struct {
	putters []storage.Putter // targets, in the order they were added
}

// newTeeStore returns a teeStore that has no putters added yet.
func newTeeStore() *teeStore {
	return new(teeStore)
}

// Add appends putter to the list of targets used by Put.
func (t *teeStore) Add(putter storage.Putter) {
	t.putters = append(t.putters, putter)
}

// Put implements storage.Putter.
//
// The chunks are passed to the added putters one after another, in the order
// the putters were added. The first error encountered aborts the loop and is
// returned. The exist result is always nil.
func (t *teeStore) Put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err error) {
	for i := range t.putters {
		if _, putErr := t.putters[i].Put(ctx, mode, chs...); putErr != nil {
			return nil, putErr
		}
	}
	return nil, nil
}
// fsStore is a storage.Putter that writes chunks straight to the filesystem.
// Every chunk ends up in its own file, named after the hex address of the chunk.
type fsStore struct {
	path string // directory the chunk files are written to
}

// newFsStore returns an fsStore that writes chunk files below path.
func newFsStore(path string) storage.Putter {
	fs := new(fsStore)
	fs.path = path
	return fs
}

// Put implements storage.Putter.
//
// Each chunk's data is written to a file inside the store's directory, with the
// string form of the chunk address as file name. The first write error aborts
// the loop and is returned. The exist result is always nil.
func (f *fsStore) Put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err error) {
	for i := range chs {
		name := chs[i].Address().String()
		target := filepath.Join(f.path, name)
		if writeErr := ioutil.WriteFile(target, chs[i].Data(), 0o666); writeErr != nil {
			return nil, writeErr
		}
	}
	return nil, nil
}
// apiStore provides a storage.Putter that adds chunks to swarm through the HTTP chunk API.
type apiStore struct {
	baseUrl string // url of the chunk endpoint; chunk addresses are appended to it
}

// newApiStore creates a new apiStore.
//
// The endpoint url is assembled from host and port with the path "bzz-chunk".
// The scheme is http, or https when ssl is set. The returned error is always nil.
func newApiStore(host string, port int, ssl bool) (storage.Putter, error) {
	endpoint := url.URL{
		Scheme: "http",
		Host:   fmt.Sprintf("%s:%d", host, port),
		Path:   "bzz-chunk",
	}
	if ssl {
		endpoint.Scheme += "s"
	}
	return &apiStore{baseUrl: endpoint.String()}, nil
}
// Put implements storage.Putter.
//
// Every chunk is sent in its own POST request to "<baseUrl>/<chunk address>"
// with the chunk data as an application/octet-stream body. The requests are
// bound to ctx, so cancelling ctx aborts an upload in progress.
//
// The first failing request, or the first response with a status other than
// 200 OK, aborts the loop and is returned as an error. The exist result is
// always nil.
func (a *apiStore) Put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err error) {
	c := http.DefaultClient
	for _, ch := range chs {
		addr := ch.Address().String()
		// named target rather than url, to avoid shadowing the net/url package
		target := strings.Join([]string{a.baseUrl, addr}, "/")

		req, err := http.NewRequestWithContext(ctx, http.MethodPost, target, bytes.NewReader(ch.Data()))
		if err != nil {
			return nil, err
		}
		req.Header.Set("Content-Type", "application/octet-stream")

		res, err := c.Do(req)
		if err != nil {
			return nil, err
		}
		// The body content is not used, but it has to be drained and closed;
		// otherwise every chunk leaks a connection and the transport cannot
		// reuse it for the next chunk.
		_, _ = io.Copy(ioutil.Discard, res.Body)
		_ = res.Body.Close()

		if res.StatusCode != http.StatusOK {
			return nil, fmt.Errorf("upload failed: %v", res.Status)
		}
	}
	return nil, nil
}
// limitReadCloser is the io.ReadCloser used for the application input. It caps
// the amount of data that can be read, and delegates closing to a caller
// supplied function.
type limitReadCloser struct {
	io.Reader                // length-capped view of the wrapped reader
	closeFunc func() error // called by Close
}

// newLimitReadCloser wraps r so that at most c bytes can be read from it.
// Closing the returned io.ReadCloser calls closeFunc.
func newLimitReadCloser(r io.Reader, closeFunc func() error, c int64) io.ReadCloser {
	wrapped := new(limitReadCloser)
	wrapped.Reader = io.LimitReader(r, c)
	wrapped.closeFunc = closeFunc
	return wrapped
}

// Close implements io.Closer by returning the result of the close function
// that was passed to newLimitReadCloser.
func (l *limitReadCloser) Close() error {
	closeErr := l.closeFunc()
	return closeErr
}
// Split is the underlying procedure for the CLI command.
//
// The input is the file named by the first positional argument, or standard
// input when no argument is given. With a file, the --count value must not
// exceed the file length; if it is not positive, the file length is used
// instead. With standard input, a positive --count value is required.
//
// Chunks are put to an fsStore when an output dir is set, and to an apiStore
// unless http has been disabled. On success the resulting address is printed
// to standard output.
//
// Note that the inputLength flag variable is overwritten with the file length
// when it was not set on a file input.
func Split(cmd *cobra.Command, args []string) (err error) {
	var infile io.ReadCloser

	// if one arg is set, this is the input file
	// if not, we are reading from standard input
	if len(args) > 0 {
		// get the file length
		info, err := os.Stat(args[0])
		if err != nil {
			return err
		}
		fileLength := info.Size()

		// check if we are limiting the input, and if the limit is valid
		if inputLength > 0 {
			if inputLength > fileLength {
				return fmt.Errorf("input data length set to %d on file with length %d", inputLength, fileLength)
			}
		} else {
			inputLength = fileLength
		}

		// open file and wrap in limiter
		f, err := os.Open(args[0])
		if err != nil {
			return err
		}
		infile = newLimitReadCloser(f, f.Close, inputLength)
	} else {
		// this simple splitter is too stupid to handle open-ended input, sadly.
		// a negative length is just as unusable as a missing one.
		if inputLength <= 0 {
			return errors.New("must specify length of input on stdin")
		}
		infile = newLimitReadCloser(os.Stdin, func() error { return nil }, inputLength)
	}
	// release the input on every return path below. The input is only read
	// from, so an error from Close is not worth failing the command for.
	defer infile.Close()

	// add the fsStore and/or apiStore, depending on flags
	stores := newTeeStore()
	if outdir != "" {
		err := os.MkdirAll(outdir, 0o777) // skipcq: GSC-G301
		if err != nil {
			return err
		}
		store := newFsStore(outdir)
		stores.Add(store)
	}
	if !noHttp {
		store, err := newApiStore(host, port, ssl)
		if err != nil {
			return err
		}
		stores.Add(store)
	}

	// split and rule
	s := splitter.NewSimpleSplitter(stores)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	addr, err := s.Split(ctx, infile, inputLength)
	if err != nil {
		return err
	}

	// output the resulting hash
	fmt.Println(addr)
	return nil
}
// main wires up the "split" command with its flags and runs it.
// The process exits with status 1 if the command returns an error.
func main() {
	rootCmd := &cobra.Command{
		Use:   "split [datafile]",
		Args:  cobra.RangeArgs(0, 1),
		Short: "Split data into swarm chunks",
		Long: `Creates and stores Swarm chunks from input data.
If datafile is not given, data will be read from standard in. In this case the --count flag must be set
to the length of the input.
The application will expect to transmit the chunks to the bee HTTP API, unless the --no-http flag has been set.
If --output-dir is set, the chunks will be saved to the file system, using the flag argument as destination directory.
Chunks are saved in individual files, and the file names will be the hex addresses of the chunks.`,
		RunE: Split,
	}

	// bind the package level flag variables
	flags := rootCmd.Flags()
	flags.StringVarP(&outdir, "output-dir", "d", "", "saves chunks to given directory")
	flags.Int64VarP(&inputLength, "count", "c", 0, "read at most this many bytes")
	flags.StringVar(&host, "host", "127.0.0.1", "api host")
	flags.IntVar(&port, "port", 8080, "api port")
	flags.BoolVar(&ssl, "ssl", false, "use ssl")
	flags.BoolVar(&noHttp, "no-http", false, "skip http put")

	if err := rootCmd.Execute(); err != nil {
		os.Exit(1)
	}
}
......@@ -36,7 +36,7 @@ import (
// The process is repeated until the readCount reaches the announced spanLength of the chunk.
type SimpleJoinerJob struct {
ctx context.Context
store storage.Storer
getter storage.Getter
spanLength int64 // the total length of data represented by the root chunk the job was initialized with.
readCount int64 // running count of chunks read by the io.Reader consumer.
cursors [9]int // per-level read cursor of data.
......@@ -49,13 +49,13 @@ type SimpleJoinerJob struct {
}
// NewSimpleJoinerJob creates a new simpleJoinerJob.
func NewSimpleJoinerJob(ctx context.Context, store storage.Storer, rootChunk swarm.Chunk) *SimpleJoinerJob {
func NewSimpleJoinerJob(ctx context.Context, getter storage.Getter, rootChunk swarm.Chunk) *SimpleJoinerJob {
spanLength := binary.LittleEndian.Uint64(rootChunk.Data()[:8])
levelCount := file.Levels(int64(spanLength), swarm.SectionSize, swarm.Branches)
j := &SimpleJoinerJob{
ctx: ctx,
store: store,
getter: getter,
spanLength: int64(spanLength),
dataC: make(chan []byte),
doneC: make(chan struct{}),
......@@ -135,7 +135,7 @@ func (j *SimpleJoinerJob) nextReference(level int) error {
func (j *SimpleJoinerJob) nextChunk(level int, address swarm.Address) error {
// attempt to retrieve the chunk
ch, err := j.store.Get(j.ctx, storage.ModeGetRequest, address)
ch, err := j.getter.Get(j.ctx, storage.ModeGetRequest, address)
if err != nil {
return err
}
......
......@@ -18,13 +18,13 @@ import (
// simpleJoiner wraps a non-optimized implementation of file.Joiner.
type simpleJoiner struct {
store storage.Storer
getter storage.Getter
}
// NewSimpleJoiner creates a new simpleJoiner.
func NewSimpleJoiner(store storage.Storer) file.Joiner {
func NewSimpleJoiner(getter storage.Getter) file.Joiner {
return &simpleJoiner{
store: store,
getter: getter,
}
}
......@@ -35,7 +35,7 @@ func NewSimpleJoiner(store storage.Storer) file.Joiner {
func (s *simpleJoiner) Join(ctx context.Context, address swarm.Address) (dataOut io.ReadCloser, dataSize int64, err error) {
// retrieve the root chunk to read the total data length to be retrieved
rootChunk, err := s.store.Get(ctx, storage.ModeGetRequest, address)
rootChunk, err := s.getter.Get(ctx, storage.ModeGetRequest, address)
if err != nil {
return nil, 0, err
}
......@@ -47,6 +47,6 @@ func (s *simpleJoiner) Join(ctx context.Context, address swarm.Address) (dataOut
return file.NewSimpleReadCloser(data), int64(spanLength), nil
}
r := internal.NewSimpleJoinerJob(ctx, s.store, rootChunk)
r := internal.NewSimpleJoinerJob(ctx, s.getter, rootChunk)
return r, int64(spanLength), nil
}
......@@ -39,7 +39,7 @@ func hashFunc() hash.Hash {
// error and may result in an undefined result.
type SimpleSplitterJob struct {
ctx context.Context
store storage.Storer
putter storage.Putter
spanLength int64 // target length of data
length int64 // number of bytes written to the data level of the hasher
sumCounts []int // number of sums performed, indexed per level
......@@ -51,11 +51,11 @@ type SimpleSplitterJob struct {
// NewSimpleSplitterJob creates a new SimpleSplitterJob.
//
// The spanLength is the length of the data that will be written.
func NewSimpleSplitterJob(ctx context.Context, store storage.Storer, spanLength int64) *SimpleSplitterJob {
func NewSimpleSplitterJob(ctx context.Context, putter storage.Putter, spanLength int64) *SimpleSplitterJob {
p := bmtlegacy.NewTreePool(hashFunc, swarm.Branches, bmtlegacy.PoolSize)
return &SimpleSplitterJob{
ctx: ctx,
store: store,
putter: putter,
spanLength: spanLength,
sumCounts: make([]int, levelBufferLimit),
cursors: make([]int, levelBufferLimit),
......@@ -148,7 +148,7 @@ func (s *SimpleSplitterJob) sumLevel(lvl int) ([]byte, error) {
tail := s.buffer[s.cursors[lvl+1]:s.cursors[lvl]]
chunkData := append(head, tail...)
ch := swarm.NewChunk(addr, chunkData)
_, err = s.store.Put(s.ctx, storage.ModePutUpload, ch)
_, err = s.putter.Put(s.ctx, storage.ModePutUpload, ch)
if err != nil {
return nil, err
}
......
......@@ -18,13 +18,13 @@ import (
// simpleSplitter wraps a non-optimized implementation of file.Splitter
type simpleSplitter struct {
store storage.Storer
putter storage.Putter
}
// NewSimpleSplitter creates a new SimpleSplitter
func NewSimpleSplitter(store storage.Storer) file.Splitter {
func NewSimpleSplitter(putter storage.Putter) file.Splitter {
return &simpleSplitter{
store: store,
putter: putter,
}
}
......@@ -35,14 +35,17 @@ func NewSimpleSplitter(store storage.Storer) file.Splitter {
//
// It returns the Swarmhash of the data.
func (s *simpleSplitter) Split(ctx context.Context, r io.ReadCloser, dataLength int64) (addr swarm.Address, err error) {
j := internal.NewSimpleSplitterJob(ctx, s.store, dataLength)
j := internal.NewSimpleSplitterJob(ctx, s.putter, dataLength)
var total int
var total int64
data := make([]byte, swarm.ChunkSize)
for {
c, err := r.Read(data)
if err != nil {
if err == io.EOF {
if total < dataLength {
return swarm.ZeroAddress, fmt.Errorf("splitter only received %d bytes of data, expected %d bytes", total, dataLength)
}
break
}
return swarm.ZeroAddress, err
......@@ -54,7 +57,7 @@ func (s *simpleSplitter) Split(ctx context.Context, r io.ReadCloser, dataLength
if cc < c {
return swarm.ZeroAddress, fmt.Errorf("write count to file hasher component %d does not match read count %d", cc, c)
}
total += c
total += int64(c)
}
sum := j.Sum(nil)
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package splitter_test
import (
"context"
"testing"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/splitter"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
mockbytes "gitlab.com/nolash/go-mockbytes"
)
// TestSplitIncomplete tests that the Split method returns an error if
// the amounts of bytes written does not match the data length passed to the method.
func TestSplitIncomplete(t *testing.T) {
testData := make([]byte, 42)
store := mock.NewStorer()
s := splitter.NewSimpleSplitter(store)
testDataReader := file.NewSimpleReadCloser(testData)
_, err := s.Split(context.Background(), testDataReader, 41)
if err == nil {
t.Fatalf("expected error on EOF before full length write")
}
}
// TestSplitSingleChunk hashes one single chunk and verifies
// that that corresponding chunk exist in the store afterwards.
func TestSplitSingleChunk(t *testing.T) {
g := mockbytes.New(0, mockbytes.MockTypeStandard).WithModulus(255)
testData, err := g.SequentialBytes(swarm.ChunkSize)
if err != nil {
t.Fatal(err)
}
store := mock.NewStorer()
s := splitter.NewSimpleSplitter(store)
testDataReader := file.NewSimpleReadCloser(testData)
resultAddress, err := s.Split(context.Background(), testDataReader, int64(len(testData)))
if err != nil {
t.Fatal(err)
}
testHashHex := "c10090961e7682a10890c334d759a28426647141213abda93b096b892824d2ef"
testHashAddress := swarm.MustParseHexAddress(testHashHex)
if !testHashAddress.Equal(resultAddress) {
t.Fatalf("expected %v, got %v", testHashAddress, resultAddress)
}
_, err = store.Get(context.Background(), storage.ModeGetRequest, resultAddress)
if err != nil {
t.Fatal(err)
}
}
// TestSplitThreeLevels hashes enough data chunks in order to
// create a full chunk of intermediate hashes.
// It verifies that all created chunks exist in the store afterwards.
func TestSplitThreeLevels(t *testing.T) {
	// edge case selected from internal/job_test.go
	g := mockbytes.New(0, mockbytes.MockTypeStandard).WithModulus(255)
	testData, err := g.SequentialBytes(swarm.ChunkSize * swarm.Branches)
	if err != nil {
		t.Fatal(err)
	}

	store := mock.NewStorer()
	s := splitter.NewSimpleSplitter(store)

	testDataReader := file.NewSimpleReadCloser(testData)
	resultAddress, err := s.Split(context.Background(), testDataReader, int64(len(testData)))
	if err != nil {
		t.Fatal(err)
	}

	testHashHex := "3047d841077898c26bbe6be652a2ec590a5d9bd7cd45d290ea42511b48753c09"
	testHashAddress := swarm.MustParseHexAddress(testHashHex)
	if !testHashAddress.Equal(resultAddress) {
		t.Fatalf("expected %v, got %v", testHashAddress, resultAddress)
	}

	// a single retrieval both proves that the root chunk was stored
	// and provides the references to check below
	rootChunk, err := store.Get(context.Background(), storage.ModeGetRequest, resultAddress)
	if err != nil {
		t.Fatal(err)
	}

	// skip the first 8 bytes of the chunk data, which hold the span length;
	// the remainder is read as consecutive references of swarm.SectionSize bytes
	rootData := rootChunk.Data()[8:]
	for i := 0; i < swarm.ChunkSize; i += swarm.SectionSize {
		dataAddressBytes := rootData[i : i+swarm.SectionSize]
		dataAddress := swarm.NewAddress(dataAddressBytes)
		_, err := store.Get(context.Background(), storage.ModeGetRequest, dataAddress)
		if err != nil {
			t.Fatal(err)
		}
	}
}
......@@ -127,9 +127,9 @@ func (d *Descriptor) String() string {
}
type Storer interface {
Get(ctx context.Context, mode ModeGet, addr swarm.Address) (ch swarm.Chunk, err error)
GetMulti(ctx context.Context, mode ModeGet, addrs ...swarm.Address) (ch []swarm.Chunk, err error)
Getter
Putter
GetMulti(ctx context.Context, mode ModeGet, addrs ...swarm.Address) (ch []swarm.Chunk, err error)
Has(ctx context.Context, addr swarm.Address) (yes bool, err error)
HasMulti(ctx context.Context, addrs ...swarm.Address) (yes []bool, err error)
Set(ctx context.Context, mode ModeSet, addrs ...swarm.Address) (err error)
......@@ -143,6 +143,10 @@ type Putter interface {
Put(ctx context.Context, mode ModePut, chs ...swarm.Chunk) (exist []bool, err error)
}
// Getter is the interface that wraps the Get method.
//
// Get takes a context, a retrieval mode and a chunk address, and returns
// a chunk and an error.
type Getter interface {
Get(ctx context.Context, mode ModeGet, addr swarm.Address) (ch swarm.Chunk, err error)
}
// StateStorer defines methods required to get, set, delete values for different keys
// and close the underlying resources.
type StateStorer interface {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment