Commit 0b244e9b authored by aloknerurkar's avatar aloknerurkar Committed by GitHub

feat: chunk stream upload endpoint (#2230)

parent 26c621b6
...@@ -164,6 +164,22 @@ paths: ...@@ -164,6 +164,22 @@ paths:
default: default:
description: Default response description: Default response
"/chunks/stream":
get:
summary: "Upload stream of chunks"
tags:
- Chunk
parameters:
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmTagParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPinParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId"
responses:
"200":
description: "Returns a Websocket connection on which stream of chunks can be uploaded. Each chunk sent is acknowledged using a binary response `0` which serves as confirmation of upload of single chunk. Chunks should be packaged as binary messages for uploading."
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
default:
description: Default response
"/bzz": "/bzz":
post: post:
summary: "Upload file or a collection of files" summary: "Upload file or a collection of files"
......
...@@ -71,6 +71,7 @@ type testServerOptions struct { ...@@ -71,6 +71,7 @@ type testServerOptions struct {
PostageContract postagecontract.Interface PostageContract postagecontract.Interface
Post postage.Service Post postage.Service
Steward steward.Reuploader Steward steward.Reuploader
WsHeaders http.Header
} }
func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.Conn, string) { func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.Conn, string) {
...@@ -115,7 +116,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket. ...@@ -115,7 +116,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.
if o.WsPath != "" { if o.WsPath != "" {
u := url.URL{Scheme: "ws", Host: ts.Listener.Addr().String(), Path: o.WsPath} u := url.URL{Scheme: "ws", Host: ts.Listener.Addr().String(), Path: o.WsPath}
conn, _, err = websocket.DefaultDialer.Dial(u.String(), nil) conn, _, err = websocket.DefaultDialer.Dial(u.String(), o.WsHeaders)
if err != nil { if err != nil {
t.Fatalf("dial: %v. url %v", err, u.String()) t.Fatalf("dial: %v. url %v", err, u.String())
} }
......
...@@ -6,6 +6,7 @@ package api ...@@ -6,6 +6,7 @@ package api
import ( import (
"bytes" "bytes"
"context"
"errors" "errors"
"fmt" "fmt"
"io" "io"
...@@ -29,27 +30,55 @@ type chunkAddressResponse struct { ...@@ -29,27 +30,55 @@ type chunkAddressResponse struct {
Reference swarm.Address `json:"reference"` Reference swarm.Address `json:"reference"`
} }
func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) { func (s *server) processUploadRequest(
var ( r *http.Request,
tag *tags.Tag ) (ctx context.Context, tag *tags.Tag, putter storage.Putter, err error) {
ctx = r.Context()
err error
)
if h := r.Header.Get(SwarmTagHeader); h != "" { if h := r.Header.Get(SwarmTagHeader); h != "" {
tag, err = s.getTag(h) tag, err = s.getTag(h)
if err != nil { if err != nil {
s.logger.Debugf("chunk upload: get tag: %v", err) s.logger.Debugf("chunk upload: get tag: %v", err)
s.logger.Error("chunk upload: get tag") s.logger.Error("chunk upload: get tag")
jsonhttp.BadRequest(w, "cannot get tag") return nil, nil, nil, errors.New("cannot get tag")
return
} }
// add the tag to the context if it exists // add the tag to the context if it exists
ctx = sctx.SetTag(r.Context(), tag) ctx = sctx.SetTag(r.Context(), tag)
} else {
ctx = r.Context()
}
batch, err := requestPostageBatchId(r)
if err != nil {
s.logger.Debugf("chunk upload: postage batch id: %v", err)
s.logger.Error("chunk upload: postage batch id")
return nil, nil, nil, errors.New("invalid postage batch id")
}
// increment the StateSplit here since we dont have a splitter for the file upload putter, err = newStamperPutter(s.storer, s.post, s.signer, batch)
if err != nil {
s.logger.Debugf("chunk upload: putter: %v", err)
s.logger.Error("chunk upload: putter")
switch {
case errors.Is(err, postage.ErrNotFound):
return nil, nil, nil, errors.New("batch not found")
case errors.Is(err, postage.ErrNotUsable):
return nil, nil, nil, errors.New("batch not usable")
}
return nil, nil, nil, err
}
return ctx, tag, putter, nil
}
func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
ctx, tag, putter, err := s.processUploadRequest(r)
if err != nil {
jsonhttp.BadRequest(w, err.Error())
return
}
if tag != nil {
err = tag.Inc(tags.StateSplit) err = tag.Inc(tags.StateSplit)
if err != nil { if err != nil {
s.logger.Debugf("chunk upload: increment tag: %v", err) s.logger.Debugf("chunk upload: increment tag: %v", err)
...@@ -85,29 +114,6 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) { ...@@ -85,29 +114,6 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
batch, err := requestPostageBatchId(r)
if err != nil {
s.logger.Debugf("chunk upload: postage batch id: %v", err)
s.logger.Error("chunk upload: postage batch id")
jsonhttp.BadRequest(w, "invalid postage batch id")
return
}
putter, err := newStamperPutter(s.storer, s.post, s.signer, batch)
if err != nil {
s.logger.Debugf("chunk upload: putter:%v", err)
s.logger.Error("chunk upload: putter")
switch {
case errors.Is(err, postage.ErrNotFound):
jsonhttp.BadRequest(w, "batch not found")
case errors.Is(err, postage.ErrNotUsable):
jsonhttp.BadRequest(w, "batch not usable yet")
default:
jsonhttp.BadRequest(w, nil)
}
return
}
seen, err := putter.Put(ctx, requestModePut(r), chunk) seen, err := putter.Put(ctx, requestModePut(r), chunk)
if err != nil { if err != nil {
s.logger.Debugf("chunk upload: chunk write error: %v, addr %s", err, chunk.Address()) s.logger.Debugf("chunk upload: chunk write error: %v, addr %s", err, chunk.Address())
...@@ -145,6 +151,11 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) { ...@@ -145,6 +151,11 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
if err := s.pinning.CreatePin(ctx, chunk.Address(), false); err != nil { if err := s.pinning.CreatePin(ctx, chunk.Address(), false); err != nil {
s.logger.Debugf("chunk upload: creation of pin for %q failed: %v", chunk.Address(), err) s.logger.Debugf("chunk upload: creation of pin for %q failed: %v", chunk.Address(), err)
s.logger.Error("chunk upload: creation of pin failed") s.logger.Error("chunk upload: creation of pin failed")
err = s.storer.Set(ctx, storage.ModeSetUnpin, chunk.Address())
if err != nil {
s.logger.Debugf("chunk upload: deletion of pin for %s failed: %v", chunk.Address(), err)
s.logger.Error("chunk upload: deletion of pin failed")
}
jsonhttp.InternalServerError(w, nil) jsonhttp.InternalServerError(w, nil)
return return
} }
......
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api
import (
"context"
"errors"
"net/http"
"strings"
"time"
"github.com/ethersphere/bee/pkg/cac"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/gorilla/websocket"
)
var successWsMsg = []byte{}
func (s *server) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Request) {
ctx, tag, putter, err := s.processUploadRequest(r)
if err != nil {
jsonhttp.BadRequest(w, err.Error())
return
}
upgrader := websocket.Upgrader{
ReadBufferSize: swarm.ChunkSize,
WriteBufferSize: swarm.ChunkSize,
CheckOrigin: s.checkOrigin,
}
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
s.logger.Debugf("chunk stream handler failed upgrading: %v", err)
s.logger.Error("chunk stream handler: upgrading")
jsonhttp.BadRequest(w, "not a websocket connection")
return
}
s.wsWg.Add(1)
go s.handleUploadStream(
ctx,
c,
tag,
putter,
requestModePut(r),
strings.ToLower(r.Header.Get(SwarmPinHeader)) == "true",
)
}
func (s *server) handleUploadStream(
ctx context.Context,
conn *websocket.Conn,
tag *tags.Tag,
putter storage.Putter,
mode storage.ModePut,
pin bool,
) {
defer s.wsWg.Done()
var (
gone = make(chan struct{})
err error
)
defer func() {
_ = conn.Close()
}()
conn.SetCloseHandler(func(code int, text string) error {
s.logger.Debugf("chunk stream handler: client gone. code %d message %s", code, text)
close(gone)
return nil
})
sendMsg := func(msgType int, buf []byte) error {
err := conn.SetWriteDeadline(time.Now().Add(writeDeadline))
if err != nil {
return err
}
err = conn.WriteMessage(msgType, buf)
if err != nil {
return err
}
return nil
}
sendErrorClose := func(code int, errmsg string) {
err := conn.WriteControl(
websocket.CloseMessage,
websocket.FormatCloseMessage(code, errmsg),
time.Now().Add(writeDeadline),
)
if err != nil {
s.logger.Errorf("chunk stream handler: failed sending close msg")
}
}
for {
select {
case <-s.quit:
// shutdown
sendErrorClose(websocket.CloseGoingAway, "node shutting down")
return
case <-gone:
// client gone
return
default:
// if there is no indication to stop, go ahead and read the next message
}
err = conn.SetReadDeadline(time.Now().Add(readDeadline))
if err != nil {
s.logger.Debugf("chunk stream handler: set read deadline: %v", err)
s.logger.Error("chunk stream handler: set read deadline")
return
}
mt, msg, err := conn.ReadMessage()
if err != nil {
s.logger.Debugf("chunk stream handler: read message error: %v", err)
s.logger.Error("chunk stream handler: read message error")
return
}
if mt != websocket.BinaryMessage {
s.logger.Debug("chunk stream handler: unexpected message received from client", mt)
s.logger.Error("chunk stream handler: unexpected message received from client")
sendErrorClose(websocket.CloseUnsupportedData, "invalid message")
return
}
if tag != nil {
err = tag.Inc(tags.StateSplit)
if err != nil {
s.logger.Debug("chunk stream handler: failed incrementing tag", err)
s.logger.Error("chunk stream handler: failed incrementing tag")
sendErrorClose(websocket.CloseInternalServerErr, "failed incrementing tag")
return
}
}
if len(msg) < swarm.SpanSize {
s.logger.Debug("chunk stream handler: not enough data")
s.logger.Error("chunk stream handler: not enough data")
return
}
chunk, err := cac.NewWithDataSpan(msg)
if err != nil {
s.logger.Debugf("chunk stream handler: create chunk error: %v", err)
s.logger.Error("chunk stream handler: failed creating chunk")
return
}
seen, err := putter.Put(ctx, mode, chunk)
if err != nil {
s.logger.Debugf("chunk stream handler: chunk write error: %v, addr %s", err, chunk.Address())
s.logger.Error("chunk stream handler: chunk write error")
switch {
case errors.Is(err, postage.ErrBucketFull):
sendErrorClose(websocket.CloseInternalServerErr, "batch is overissued")
default:
sendErrorClose(websocket.CloseInternalServerErr, "chunk write error")
}
return
} else if len(seen) > 0 && seen[0] && tag != nil {
err := tag.Inc(tags.StateSeen)
if err != nil {
s.logger.Debugf("chunk stream handler: increment tag", err)
s.logger.Error("chunk stream handler: increment tag")
sendErrorClose(websocket.CloseInternalServerErr, "failed incrementing tag")
return
}
}
if tag != nil {
// indicate that the chunk is stored
err = tag.Inc(tags.StateStored)
if err != nil {
s.logger.Debugf("chunk stream handler: increment tag", err)
s.logger.Error("chunk stream handler: increment tag")
sendErrorClose(websocket.CloseInternalServerErr, "failed incrementing tag")
return
}
}
if pin {
if err := s.pinning.CreatePin(ctx, chunk.Address(), false); err != nil {
s.logger.Debugf("chunk stream handler: creation of pin for %q failed: %v", chunk.Address(), err)
s.logger.Error("chunk stream handler: creation of pin failed")
// since we already increment the pin counter because of the ModePut, we need
// to delete the pin here to prevent the pin counter from never going to 0
err = s.storer.Set(ctx, storage.ModeSetUnpin, chunk.Address())
if err != nil {
s.logger.Debugf("chunk stream handler: deletion of pin for %s failed: %v", chunk.Address(), err)
s.logger.Error("chunk stream handler: deletion of pin failed")
}
sendErrorClose(websocket.CloseInternalServerErr, "failed creating pin")
return
}
}
err = sendMsg(websocket.BinaryMessage, successWsMsg)
if err != nil {
s.logger.Debugf("chunk stream handler: failed sending success msg: %v", err)
s.logger.Error("chunk stream handler: failed sending confirmation")
return
}
}
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api_test
import (
"bytes"
"context"
"io/ioutil"
"net/http"
"testing"
"time"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/logging"
pinning "github.com/ethersphere/bee/pkg/pinning/mock"
mockpost "github.com/ethersphere/bee/pkg/postage/mock"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
testingc "github.com/ethersphere/bee/pkg/storage/testing"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/gorilla/websocket"
)
func TestChunkUploadStream(t *testing.T) {
wsHeaders := http.Header{}
wsHeaders.Set("Content-Type", "application/octet-stream")
wsHeaders.Set("Swarm-Postage-Batch-Id", batchOkStr)
var (
statestoreMock = statestore.NewStateStore()
logger = logging.New(ioutil.Discard, 0)
tag = tags.NewTags(statestoreMock, logger)
storerMock = mock.NewStorer()
pinningMock = pinning.NewServiceMock()
_, wsConn, _ = newTestServer(t, testServerOptions{
Storer: storerMock,
Pinning: pinningMock,
Tags: tag,
Post: mockpost.New(mockpost.WithAcceptAll()),
WsPath: "/chunks/stream",
WsHeaders: wsHeaders,
})
)
t.Run("upload and verify", func(t *testing.T) {
chsToGet := []swarm.Chunk{}
for i := 0; i < 5; i++ {
ch := testingc.GenerateTestRandomChunk()
err := wsConn.SetWriteDeadline(time.Now().Add(time.Second))
if err != nil {
t.Fatal(err)
}
err = wsConn.WriteMessage(websocket.BinaryMessage, ch.Data())
if err != nil {
t.Fatal(err)
}
err = wsConn.SetReadDeadline(time.Now().Add(time.Second))
if err != nil {
t.Fatal(err)
}
mt, msg, err := wsConn.ReadMessage()
if err != nil {
t.Fatal(err)
}
if mt != websocket.BinaryMessage || !bytes.Equal(msg, api.SuccessWsMsg) {
t.Fatal("invalid response", mt, string(msg))
}
chsToGet = append(chsToGet, ch)
}
for _, c := range chsToGet {
ch, err := storerMock.Get(context.Background(), storage.ModeGetRequest, c.Address())
if err != nil {
t.Fatal("failed to get chunk after upload", err)
}
if !ch.Equal(c) {
t.Fatal("invalid chunk read")
}
}
})
t.Run("close on incorrect msg", func(t *testing.T) {
err := wsConn.SetWriteDeadline(time.Now().Add(time.Second))
if err != nil {
t.Fatal(err)
}
err = wsConn.WriteMessage(websocket.TextMessage, []byte("incorrect msg"))
if err != nil {
t.Fatal(err)
}
err = wsConn.SetReadDeadline(time.Now().Add(time.Second))
if err != nil {
t.Fatal(err)
}
_, _, err = wsConn.ReadMessage()
if err == nil {
t.Fatal("expected failure on read")
}
if cerr, ok := err.(*websocket.CloseError); !ok {
t.Fatal("invalid error on read")
} else if cerr.Text != "invalid message" {
t.Fatalf("incorrect response on error, exp: (invalid message) got (%s)", cerr.Text)
}
})
}
...@@ -42,6 +42,8 @@ var ( ...@@ -42,6 +42,8 @@ var (
FeedMetadataEntryOwner = feedMetadataEntryOwner FeedMetadataEntryOwner = feedMetadataEntryOwner
FeedMetadataEntryTopic = feedMetadataEntryTopic FeedMetadataEntryTopic = feedMetadataEntryTopic
FeedMetadataEntryType = feedMetadataEntryType FeedMetadataEntryType = feedMetadataEntryType
SuccessWsMsg = successWsMsg
) )
func (s *Server) ResolveNameOrAddress(str string) (swarm.Address, error) { func (s *Server) ResolveNameOrAddress(str string) (swarm.Address, error) {
......
...@@ -23,8 +23,9 @@ import ( ...@@ -23,8 +23,9 @@ import (
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
var ( const (
writeDeadline = 4 * time.Second // write deadline. should be smaller than the shutdown timeout on api close writeDeadline = 4 * time.Second // write deadline. should be smaller than the shutdown timeout on api close
readDeadline = 4 * time.Second // read deadline. should be smaller than the shutdown timeout on api close
targetMaxLength = 2 // max target length in bytes, in order to prevent grieving by excess computation targetMaxLength = 2 // max target length in bytes, in order to prevent grieving by excess computation
) )
......
...@@ -63,6 +63,11 @@ func (s *server) setupRouting() { ...@@ -63,6 +63,11 @@ func (s *server) setupRouting() {
), ),
}) })
handle("/chunks/stream", web.ChainHandlers(
s.newTracingHandler("chunks-stream-upload"),
web.FinalHandlerFunc(s.chunkUploadStreamHandler),
))
handle("/chunks/{addr}", jsonhttp.MethodHandler{ handle("/chunks/{addr}", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.chunkGetHandler), "GET": http.HandlerFunc(s.chunkGetHandler),
}) })
......
...@@ -9,9 +9,11 @@ import ( ...@@ -9,9 +9,11 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/url"
"sort" "sort"
"strconv" "strconv"
"testing" "testing"
"time"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
mockpost "github.com/ethersphere/bee/pkg/postage/mock" mockpost "github.com/ethersphere/bee/pkg/postage/mock"
...@@ -25,6 +27,7 @@ import ( ...@@ -25,6 +27,7 @@ import (
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/swarm/test" "github.com/ethersphere/bee/pkg/swarm/test"
"github.com/ethersphere/bee/pkg/tags" "github.com/ethersphere/bee/pkg/tags"
"github.com/gorilla/websocket"
"gitlab.com/nolash/go-mockbytes" "gitlab.com/nolash/go-mockbytes"
) )
...@@ -35,6 +38,7 @@ type fileUploadResponse struct { ...@@ -35,6 +38,7 @@ type fileUploadResponse struct {
func tagsWithIdResource(id uint32) string { return fmt.Sprintf("/tags/%d", id) } func tagsWithIdResource(id uint32) string { return fmt.Sprintf("/tags/%d", id) }
func TestTags(t *testing.T) { func TestTags(t *testing.T) {
var ( var (
bzzResource = "/bzz" bzzResource = "/bzz"
bytesResource = "/bytes" bytesResource = "/bytes"
...@@ -44,7 +48,7 @@ func TestTags(t *testing.T) { ...@@ -44,7 +48,7 @@ func TestTags(t *testing.T) {
mockStatestore = statestore.NewStateStore() mockStatestore = statestore.NewStateStore()
logger = logging.New(ioutil.Discard, 0) logger = logging.New(ioutil.Discard, 0)
tag = tags.NewTags(mockStatestore, logger) tag = tags.NewTags(mockStatestore, logger)
client, _, _ = newTestServer(t, testServerOptions{ client, _, listenAddr = newTestServer(t, testServerOptions{
Storer: mock.NewStorer(), Storer: mock.NewStorer(),
Tags: tag, Tags: tag,
Logger: logger, Logger: logger,
...@@ -123,6 +127,56 @@ func TestTags(t *testing.T) { ...@@ -123,6 +127,56 @@ func TestTags(t *testing.T) {
tagValueTest(t, tr.Uid, 1, 1, 1, 0, 0, 0, swarm.ZeroAddress, client) tagValueTest(t, tr.Uid, 1, 1, 1, 0, 0, 0, swarm.ZeroAddress, client)
}) })
t.Run("create tag upload chunk stream", func(t *testing.T) {
// create a tag using the API
tr := api.TagResponse{}
jsonhttptest.Request(t, client, http.MethodPost, tagsResource, http.StatusCreated,
jsonhttptest.WithJSONRequestBody(api.TagRequest{}),
jsonhttptest.WithUnmarshalJSONResponse(&tr),
)
wsHeaders := http.Header{}
wsHeaders.Set("Content-Type", "application/octet-stream")
wsHeaders.Set(api.SwarmPostageBatchIdHeader, batchOkStr)
wsHeaders.Set(api.SwarmTagHeader, strconv.FormatUint(uint64(tr.Uid), 10))
u := url.URL{Scheme: "ws", Host: listenAddr, Path: "/chunks/stream"}
wsConn, _, err := websocket.DefaultDialer.Dial(u.String(), wsHeaders)
if err != nil {
t.Fatalf("dial: %v. url %v", err, u.String())
}
for i := 0; i < 5; i++ {
ch := testingc.GenerateTestRandomChunk()
err := wsConn.SetWriteDeadline(time.Now().Add(time.Second))
if err != nil {
t.Fatal(err)
}
err = wsConn.WriteMessage(websocket.BinaryMessage, ch.Data())
if err != nil {
t.Fatal(err)
}
err = wsConn.SetReadDeadline(time.Now().Add(time.Second))
if err != nil {
t.Fatal(err)
}
mt, msg, err := wsConn.ReadMessage()
if err != nil {
t.Fatal(err)
}
if mt != websocket.BinaryMessage || !bytes.Equal(msg, api.SuccessWsMsg) {
t.Fatal("invalid response", mt, string(msg))
}
}
tagValueTest(t, tr.Uid, 5, 5, 0, 0, 0, 0, swarm.ZeroAddress, client)
})
t.Run("list tags", func(t *testing.T) { t.Run("list tags", func(t *testing.T) {
// list all current tags // list all current tags
var resp api.ListTagsResponse var resp api.ListTagsResponse
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment