Commit 6e9510b1 authored by acud's avatar acud Committed by GitHub

api, storage: add http interface to accept chunks (#32)

* storage: add validating mock storage
* api: add bzzchunk api
parent fdd955ee
......@@ -10,6 +10,7 @@ import (
"github.com/ethersphere/bee/pkg/logging"
m "github.com/ethersphere/bee/pkg/metrics"
"github.com/ethersphere/bee/pkg/pingpong"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/tracing"
)
......@@ -26,6 +27,7 @@ type server struct {
type Options struct {
Pingpong pingpong.Interface
Storer storage.Storer
Logger logging.Logger
Tracer *tracing.Tracer
}
......
......@@ -14,16 +14,19 @@ import (
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/pingpong"
"github.com/ethersphere/bee/pkg/storage"
"resenje.org/web"
)
type testServerOptions struct {
Pingpong pingpong.Interface
Storer storage.Storer
}
func newTestServer(t *testing.T, o testServerOptions) (client *http.Client, cleanup func()) {
s := api.New(api.Options{
Pingpong: o.Pingpong,
Storer: o.Storer,
Logger: logging.New(ioutil.Discard, 0),
})
ts := httptest.NewServer(s)
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api
import (
"bytes"
"errors"
"io"
"io/ioutil"
"net/http"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/gorilla/mux"
)
func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
addr := mux.Vars(r)["addr"]
ctx := r.Context()
address, err := swarm.ParseHexAddress(addr)
if err != nil {
s.Logger.Debugf("bzz-chunk: parse chunk address %s: %v", addr, err)
s.Logger.Error("bzz-chunk: error uploading chunk")
jsonhttp.BadRequest(w, "invalid chunk address")
return
}
data, err := ioutil.ReadAll(r.Body)
if err != nil {
s.Logger.Debugf("bzz-chunk: read chunk data error: %v, addr %s", err, address)
s.Logger.Error("bzz-chunk: read chunk data error")
jsonhttp.InternalServerError(w, "cannot read chunk data")
return
}
err = s.Storer.Put(ctx, address, data)
if err != nil {
s.Logger.Debugf("bzz-chunk: chunk write error: %v, addr %s", err, address)
s.Logger.Error("bzz-chunk: chunk write error")
jsonhttp.BadRequest(w, "chunk write error")
return
}
jsonhttp.OK(w, nil)
}
func (s *server) chunkGetHandler(w http.ResponseWriter, r *http.Request) {
addr := mux.Vars(r)["addr"]
ctx := r.Context()
address, err := swarm.ParseHexAddress(addr)
if err != nil {
s.Logger.Debugf("bzz-chunk: parse chunk address %s: %v", addr, err)
s.Logger.Error("bzz-chunk: parse chunk address error")
jsonhttp.BadRequest(w, "invalid chunk address")
return
}
data, err := s.Storer.Get(ctx, address)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
s.Logger.Trace("bzz-chunk: chunk not found. addr %s", address)
jsonhttp.NotFound(w, "chunk not found")
return
}
s.Logger.Debugf("bzz-chunk: chunk read error: %v ,addr %s", err, address)
s.Logger.Error("bzz-chunk: chunk read error")
jsonhttp.InternalServerError(w, "chunk read error")
return
}
w.Header().Set("Content-Type", "binary/octet-stream")
_, _ = io.Copy(w, bytes.NewReader(data))
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api_test
import (
"bytes"
"io"
"io/ioutil"
"net/http"
"testing"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
)
// TestChunkUpload uploads a chunk to an API that verifies the chunk according
// to a given validator, then tries to download the uploaded data.
func TestChunkUploadDownload(t *testing.T) {
resource := func(addr swarm.Address) string {
return "/bzz-chunk/" + addr.String()
}
validHash, err := swarm.ParseHexAddress("aabbcc")
if err != nil {
t.Fatal(err)
}
invalidHash, err := swarm.ParseHexAddress("bbccdd")
if err != nil {
t.Fatal(err)
}
validContent := []byte("bbaatt")
invalidContent := []byte("bbaattss")
validatorF := func(addr swarm.Address, data []byte) bool {
if !addr.Equal(validHash) {
return false
}
if !bytes.Equal(data, validContent) {
return false
}
return true
}
mockValidatingStorer := mock.NewValidatingStorer(validatorF)
client, cleanup := newTestServer(t, testServerOptions{
Storer: mockValidatingStorer,
})
defer cleanup()
t.Run("invalid hash", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, client, http.MethodPost, resource(invalidHash), bytes.NewReader(validContent), http.StatusBadRequest, jsonhttp.StatusResponse{
Message: "chunk write error",
Code: http.StatusBadRequest,
})
// make sure chunk is not retrievable
_ = request(t, client, http.MethodGet, resource(invalidHash), nil, http.StatusNotFound)
})
t.Run("invalid content", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, client, http.MethodPost, resource(validHash), bytes.NewReader(invalidContent), http.StatusBadRequest, jsonhttp.StatusResponse{
Message: "chunk write error",
Code: http.StatusBadRequest,
})
// make sure not retrievable
_ = request(t, client, http.MethodGet, resource(validHash), nil, http.StatusNotFound)
})
t.Run("ok", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, client, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
})
// try to fetch the same chunk
resp := request(t, client, http.MethodGet, resource(validHash), nil, http.StatusOK)
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(validContent, data) {
t.Fatal("data retrieved doesnt match uploaded content")
}
})
}
func request(t *testing.T, client *http.Client, method string, resource string, body io.Reader, responseCode int) *http.Response {
t.Helper()
req, err := http.NewRequest(method, resource, body)
if err != nil {
t.Fatal(err)
}
resp, err := client.Do(req)
if err != nil {
t.Fatal(err)
}
if resp.StatusCode != responseCode {
t.Fatalf("got response status %s, want %v %s", resp.Status, responseCode, http.StatusText(responseCode))
}
return resp
}
......@@ -32,6 +32,11 @@ func (s *server) setupRouting() {
"POST": http.HandlerFunc(s.pingpongHandler),
})
router.Handle("/bzz-chunk/{addr}", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.chunkGetHandler),
"POST": http.HandlerFunc(s.chunkUploadHandler),
})
s.Handler = web.ChainHandlers(
logging.NewHTTPAccessLogHandler(s.Logger, logrus.InfoLevel, "api access"),
handlers.CompressHandler,
......
......@@ -29,6 +29,7 @@ import (
"github.com/ethersphere/bee/pkg/metrics"
"github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/ethersphere/bee/pkg/pingpong"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/topology/full"
"github.com/ethersphere/bee/pkg/tracing"
ma "github.com/multiformats/go-multiaddr"
......@@ -157,11 +158,15 @@ func NewBee(o Options) (*Bee, error) {
logger.Infof("p2p address: %s", addr)
}
// for now, storer is an in-memory store.
storer := mock.NewStorer()
var apiService api.Service
if o.APIAddr != "" {
// API server
apiService = api.New(api.Options{
Pingpong: pingPong,
Storer: storer,
Logger: logger,
Tracer: tracer,
})
......
......@@ -12,15 +12,21 @@ import (
)
type mockStorer struct {
store map[string][]byte
store map[string][]byte
validator storage.ChunkValidatorFunc
}
func NewStorer() storage.Storer {
s := &mockStorer{
return &mockStorer{
store: make(map[string][]byte),
}
}
return s
func NewValidatingStorer(f storage.ChunkValidatorFunc) storage.Storer {
return &mockStorer{
store: make(map[string][]byte),
validator: f,
}
}
func (m *mockStorer) Get(ctx context.Context, addr swarm.Address) (data []byte, err error) {
......@@ -32,6 +38,11 @@ func (m *mockStorer) Get(ctx context.Context, addr swarm.Address) (data []byte,
}
func (m *mockStorer) Put(ctx context.Context, addr swarm.Address, data []byte) error {
if m.validator != nil {
if !m.validator(addr, data) {
return storage.ErrInvalidChunk
}
}
m.store[addr.String()] = data
return nil
}
......@@ -46,3 +46,59 @@ func TestMockStorer(t *testing.T) {
}
}
}
func TestMockValidatingStorer(t *testing.T) {
validAddr := "aabbcc"
invalidAddr := "bbccdd"
keyValid, err := swarm.ParseHexAddress(validAddr)
if err != nil {
t.Fatal(err)
}
keyInvalid, err := swarm.ParseHexAddress(invalidAddr)
if err != nil {
t.Fatal(err)
}
validContent := []byte("bbaatt")
invalidContent := []byte("bbaattss")
validatorF := func(addr swarm.Address, data []byte) bool {
if !addr.Equal(keyValid) {
return false
}
if !bytes.Equal(data, validContent) {
return false
}
return true
}
s := mock.NewValidatingStorer(validatorF)
ctx := context.Background()
if err := s.Put(ctx, keyValid, validContent); err != nil {
t.Fatalf("expected not error but got: %v", err)
}
if err := s.Put(ctx, keyInvalid, validContent); err == nil {
t.Fatalf("expected error but got none")
}
if err := s.Put(ctx, keyInvalid, invalidContent); err == nil {
t.Fatalf("expected error but got none")
}
if data, err := s.Get(ctx, keyValid); err != nil {
t.Fatalf("got error on get but expected none: %v", err)
} else {
if !bytes.Equal(data, validContent) {
t.Fatal("stored content not identical to input data")
}
}
if _, err := s.Get(ctx, keyInvalid); err == nil {
t.Fatal("got no error on get but expected one")
}
}
......@@ -11,7 +11,13 @@ import (
"github.com/ethersphere/bee/pkg/swarm"
)
var ErrNotFound = errors.New("storage: not found")
var (
ErrNotFound = errors.New("storage: not found")
ErrInvalidChunk = errors.New("storage: invalid chunk")
)
// ChunkValidatorFunc validates Swarm chunk address and chunk data
type ChunkValidatorFunc func(swarm.Address, []byte) bool
type Storer interface {
Get(ctx context.Context, addr swarm.Address) (data []byte, err error)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment