Commit ecbb71e5 authored by acud's avatar acud Committed by GitHub

api, feeds: wire-in feeds resolution (#1193)

parent 67ff6fb0
......@@ -155,7 +155,7 @@ func newTestServer(t *testing.T, storer storage.Storer) *url.URL {
t.Helper()
logger := logging.New(ioutil.Discard, 0)
store := statestore.NewStateStore()
s := api.New(tags.NewTags(store, logger), storer, nil, nil, nil, logger, nil, api.Options{})
s := api.New(tags.NewTags(store, logger), storer, nil, nil, nil, nil, logger, nil, api.Options{})
ts := httptest.NewServer(s)
srvUrl, err := url.Parse(ts.URL)
if err != nil {
......
......@@ -861,11 +861,107 @@ paths:
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/ReferenceResponse'
'400':
$ref: 'SwarmCommon.yaml#/components/responses/400'
$ref: 'SwarmCommon.yaml#/components/responses/400'
'401':
$ref: 'SwarmCommon.yaml#/components/responses/401'
$ref: 'SwarmCommon.yaml#/components/responses/401'
'500':
$ref: 'SwarmCommon.yaml#/components/responses/500'
default:
description: Default response
'/feeds/{owner}/{topic}':
post:
summary: Create an initial feed root manifest
tags:
- Feed
parameters:
- in: path
name: owner
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/EthereumAddress'
required: true
description: Owner
- in: path
name: topic
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/HexString'
required: true
description: Topic
- in: query
name: type
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/FeedType'
required: false
description: "Feed indexing scheme (default: sequence)"
responses:
'201':
description: Created
content:
application/json:
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/ReferenceResponse'
'400':
$ref: 'SwarmCommon.yaml#/components/responses/400'
'401':
$ref: 'SwarmCommon.yaml#/components/responses/401'
'500':
$ref: 'SwarmCommon.yaml#/components/responses/500'
default:
description: Default response
get:
summary: Find feed update
tags:
- Feed
parameters:
- in: path
name: owner
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/EthereumAddress'
required: true
description: Owner
- in: path
name: topic
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/HexString'
required: true
description: Topic
- in: query
name: index
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/HexString'
required: false
description: Feed update index
- in: query
name: at
schema:
type: integer
required: false
description: "Timestamp of the update (default: now)"
- in: query
name: type
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/FeedType'
required: false
description: "Feed indexing scheme (default: sequence)"
responses:
'200':
description: Latest feed update
headers:
'swarm-feed-index':
$ref: 'SwarmCommon.yaml#/components/headers/SwarmFeedIndex'
'swarm-feed-index-next':
$ref: 'SwarmCommon.yaml#/components/headers/SwarmFeedIndexNext'
content:
application/json:
schema:
$ref: 'SwarmCommon.yaml#/components/schemas/ReferenceResponse'
'400':
$ref: 'SwarmCommon.yaml#/components/responses/400'
'401':
$ref: 'SwarmCommon.yaml#/components/responses/401'
'500':
$ref: 'SwarmCommon.yaml#/components/responses/500'
default:
description: Default response
......@@ -318,6 +318,11 @@ components:
pattern: '^[A-Za-z0-9]+\.[A-Za-z0-9]+$'
example: "swarm.eth"
SwarmOnlyReference:
oneOf:
- $ref: '#/components/schemas/SwarmAddress'
- $ref: '#/components/schemas/SwarmEncryptedReference'
SwarmReference:
oneOf:
- $ref: '#/components/schemas/SwarmAddress'
......@@ -373,6 +378,21 @@ components:
welcome_message:
type: string
FeedType:
type: string
pattern: '^(sequence|epoch)$'
headers:
SwarmFeedIndex:
description: 'The index of the found update'
schema:
$ref: '#/components/schemas/HexString'
SwarmFeedIndexNext:
description: 'The index of the next possible update'
schema:
$ref: '#/components/schemas/HexString'
responses:
'204':
......
......@@ -17,6 +17,7 @@ import (
"time"
"unicode/utf8"
"github.com/ethersphere/bee/pkg/feeds"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/logging"
m "github.com/ethersphere/bee/pkg/metrics"
......@@ -35,6 +36,8 @@ const (
SwarmEncryptHeader = "Swarm-Encrypt"
SwarmIndexDocumentHeader = "Swarm-Index-Document"
SwarmErrorDocumentHeader = "Swarm-Error-Document"
SwarmFeedIndexHeader = "Swarm-Feed-Index"
SwarmFeedIndexNextHeader = "Swarm-Feed-Index-Next"
)
// The size of buffer used for prefetching content with Langos.
......@@ -62,13 +65,14 @@ type Service interface {
}
type server struct {
Tags *tags.Tags
Storer storage.Storer
Resolver resolver.Interface
Pss pss.Interface
Traversal traversal.Service
Logger logging.Logger
Tracer *tracing.Tracer
Tags *tags.Tags
Storer storage.Storer
Resolver resolver.Interface
Pss pss.Interface
Traversal traversal.Service
Logger logging.Logger
Tracer *tracing.Tracer
feedFactory feeds.Factory
Options
http.Handler
metrics metrics
......@@ -89,18 +93,19 @@ const (
)
// New will create a and initialize a new API service.
func New(tags *tags.Tags, storer storage.Storer, resolver resolver.Interface, pss pss.Interface, traversalService traversal.Service, logger logging.Logger, tracer *tracing.Tracer, o Options) Service {
func New(tags *tags.Tags, storer storage.Storer, resolver resolver.Interface, pss pss.Interface, traversalService traversal.Service, feedFactory feeds.Factory, logger logging.Logger, tracer *tracing.Tracer, o Options) Service {
s := &server{
Tags: tags,
Storer: storer,
Resolver: resolver,
Pss: pss,
Traversal: traversalService,
Options: o,
Logger: logger,
Tracer: tracer,
metrics: newMetrics(),
quit: make(chan struct{}),
Tags: tags,
Storer: storer,
Resolver: resolver,
Pss: pss,
Traversal: traversalService,
feedFactory: feedFactory,
Options: o,
Logger: logger,
Tracer: tracer,
metrics: newMetrics(),
quit: make(chan struct{}),
}
s.setupRouting()
......
......@@ -15,6 +15,7 @@ import (
"time"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/feeds"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/pss"
"github.com/ethersphere/bee/pkg/resolver"
......@@ -38,6 +39,7 @@ type testServerOptions struct {
WsPingPeriod time.Duration
Logger logging.Logger
PreventRedirect bool
Feeds feeds.Factory
}
func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.Conn, string) {
......@@ -50,7 +52,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.
if o.WsPingPeriod == 0 {
o.WsPingPeriod = 60 * time.Second
}
s := api.New(o.Tags, o.Storer, o.Resolver, o.Pss, o.Traversal, o.Logger, nil, api.Options{
s := api.New(o.Tags, o.Storer, o.Resolver, o.Pss, o.Traversal, o.Feeds, o.Logger, nil, api.Options{
GatewayMode: o.GatewayMode,
WsPingPeriod: o.WsPingPeriod,
})
......@@ -167,7 +169,7 @@ func TestParseName(t *testing.T) {
}))
}
s := api.New(nil, nil, tC.res, nil, nil, tC.log, nil, api.Options{}).(*api.Server)
s := api.New(nil, nil, tC.res, nil, nil, nil, tC.log, nil, api.Options{}).(*api.Server)
t.Run(tC.desc, func(t *testing.T) {
got, err := s.ResolveNameOrAddress(tC.name)
......
......@@ -90,5 +90,5 @@ func (s *server) bytesGetHandler(w http.ResponseWriter, r *http.Request) {
"Content-Type": {"application/octet-stream"},
}
s.downloadHandler(w, r, address, additionalHeaders)
s.downloadHandler(w, r, address, additionalHeaders, true)
}
......@@ -7,16 +7,20 @@ package api
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"net/http"
"path"
"strings"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/gorilla/mux"
"github.com/ethersphere/bee/pkg/collection/entry"
"github.com/ethersphere/bee/pkg/feeds"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/joiner"
"github.com/ethersphere/bee/pkg/file/loadsave"
......@@ -26,10 +30,14 @@ import (
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/ethersphere/manifest/mantaray"
)
func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
logger := tracing.NewLoggerWithTraceID(r.Context(), s.Logger)
ls := loadsave.New(s.Storer, storage.ModePutRequest, false)
feedDereferenced := false
targets := r.URL.Query().Get("targets")
if targets != "" {
r = r.WithContext(sctx.SetTargets(r.Context(), targets))
......@@ -52,6 +60,7 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
return
}
FETCH:
// read manifest entry
j, _, err := joiner.New(ctx, s.Storer, address)
if err != nil {
......@@ -69,6 +78,47 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
jsonhttp.NotFound(w, nil)
return
}
// there's a possible ambiguity here, right now the data which was
// read can be an entry.Entry or a mantaray feed manifest. Try to
// unmarshal as mantaray first and possibly resolve the feed, otherwise
// go on normally.
if !feedDereferenced {
if l, err := s.manifestFeed(ctx, ls, buf.Bytes()); err == nil {
//we have a feed manifest here
ch, cur, _, err := l.At(ctx, time.Now().Unix(), 0)
if err != nil {
logger.Debugf("bzz download: feed lookup: %v", err)
logger.Error("bzz download: feed lookup")
jsonhttp.NotFound(w, "feed not found")
return
}
ref, _, err := parseFeedUpdate(ch)
if err != nil {
logger.Debugf("bzz download: parse feed update: %v", err)
logger.Error("bzz download: parse feed update")
jsonhttp.InternalServerError(w, "parse feed update")
return
}
address = ref
feedDereferenced = true
curBytes, err := cur.MarshalBinary()
if err != nil {
s.Logger.Debugf("bzz download: marshal feed index: %v", err)
s.Logger.Error("bzz download: marshal index")
jsonhttp.InternalServerError(w, "marshal index")
return
}
w.Header().Set(SwarmFeedIndexHeader, hex.EncodeToString(curBytes))
// this header might be overriding others. handle with care. in the future
// we should implement an append functionality for this specific header,
// since different parts of handlers might be overriding others' values
// resulting in inconsistent headers in the response.
w.Header().Set("Access-Control-Expose-Headers", SwarmFeedIndexHeader)
goto FETCH
}
}
e := &entry.Entry{}
err = e.UnmarshalBinary(buf.Bytes())
if err != nil {
......@@ -109,7 +159,7 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
m, err := manifest.NewManifestReference(
manifestMetadata.MimeType,
e.Reference(),
loadsave.New(s.Storer, storage.ModePutRequest, false), // mode and encryption values are fallback
ls,
)
if err != nil {
logger.Debugf("bzz download: not manifest %s: %v", address, err)
......@@ -128,7 +178,7 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
// index document exists
logger.Debugf("bzz download: serving path: %s", pathWithIndex)
s.serveManifestEntry(w, r, address, indexDocumentManifestEntry.Reference())
s.serveManifestEntry(w, r, address, indexDocumentManifestEntry.Reference(), !feedDereferenced)
return
}
}
......@@ -168,7 +218,7 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
// index document exists
logger.Debugf("bzz download: serving path: %s", pathWithIndex)
s.serveManifestEntry(w, r, address, indexDocumentManifestEntry.Reference())
s.serveManifestEntry(w, r, address, indexDocumentManifestEntry.Reference(), !feedDereferenced)
return
}
}
......@@ -182,7 +232,7 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
// error document exists
logger.Debugf("bzz download: serving path: %s", errorDocumentPath)
s.serveManifestEntry(w, r, address, errorDocumentManifestEntry.Reference())
s.serveManifestEntry(w, r, address, errorDocumentManifestEntry.Reference(), !feedDereferenced)
return
}
}
......@@ -196,10 +246,10 @@ func (s *server) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
}
// serve requested path
s.serveManifestEntry(w, r, address, me.Reference())
s.serveManifestEntry(w, r, address, me.Reference(), !feedDereferenced)
}
func (s *server) serveManifestEntry(w http.ResponseWriter, r *http.Request, address, manifestEntryAddress swarm.Address) {
func (s *server) serveManifestEntry(w http.ResponseWriter, r *http.Request, address, manifestEntryAddress swarm.Address, etag bool) {
var (
logger = tracing.NewLoggerWithTraceID(r.Context(), s.Logger)
ctx = r.Context()
......@@ -264,7 +314,7 @@ func (s *server) serveManifestEntry(w http.ResponseWriter, r *http.Request, addr
fileEntryAddress := fe.Reference()
s.downloadHandler(w, r, fileEntryAddress, additionalHeaders)
s.downloadHandler(w, r, fileEntryAddress, additionalHeaders, etag)
}
// manifestMetadataLoad returns the value for a key stored in the metadata of
......@@ -283,3 +333,41 @@ func manifestMetadataLoad(ctx context.Context, manifest manifest.Interface, path
return "", false
}
func (s *server) manifestFeed(ctx context.Context, ls file.LoadSaver, candidate []byte) (feeds.Lookup, error) {
node := new(mantaray.Node)
err := node.UnmarshalBinary(candidate)
if err != nil {
return nil, fmt.Errorf("node unmarshal: %w", err)
}
e, err := node.LookupNode(context.Background(), []byte("/"), ls)
if err != nil {
return nil, fmt.Errorf("node lookup: %w", err)
}
var (
owner, topic []byte
t = new(feeds.Type)
)
meta := e.Metadata()
if e := meta[feedMetadataEntryOwner]; e != "" {
owner, err = hex.DecodeString(e)
if err != nil {
return nil, err
}
}
if e := meta[feedMetadataEntryTopic]; e != "" {
topic, err = hex.DecodeString(e)
if err != nil {
return nil, err
}
}
if e := meta[feedMetadataEntryType]; e != "" {
err := t.FromString(e)
if err != nil {
return nil, err
}
}
f := feeds.New(topic, common.BytesToAddress(owner))
return s.feedFactory.NewLookup(*t, f)
}
......@@ -7,15 +7,18 @@ package api_test
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"mime"
"net/http"
"os"
"strings"
"testing"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/collection/entry"
"github.com/ethersphere/bee/pkg/file/loadsave"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
......@@ -171,3 +174,84 @@ func TestBzz(t *testing.T) {
})
}
func TestFeedIndirection(t *testing.T) {
// first, "upload" some content for the update
var (
updateData = []byte("<h1>Swarm Feeds Hello World!</h1>")
mockStatestore = statestore.NewStateStore()
logger = logging.New(ioutil.Discard, 0)
storer = smock.NewStorer()
client, _, _ = newTestServer(t, testServerOptions{
Storer: storer,
Tags: tags.NewTags(mockStatestore, logger),
Logger: logger,
})
)
// tar all the test case files
tarReader := tarFiles(t, []f{
{
data: updateData,
name: "index.html",
dir: "",
filePath: "./index.html",
},
})
var resp api.FileUploadResponse
options := []jsonhttptest.Option{
jsonhttptest.WithRequestBody(tarReader),
jsonhttptest.WithRequestHeader("Content-Type", api.ContentTypeTar),
jsonhttptest.WithUnmarshalJSONResponse(&resp),
jsonhttptest.WithRequestHeader(api.SwarmIndexDocumentHeader, "index.html"),
}
// verify directory tar upload response
jsonhttptest.Request(t, client, http.MethodPost, "/dirs", http.StatusOK, options...)
if resp.Reference.String() == "" {
t.Fatalf("expected file reference, did not got any")
}
// now use the "content" to mock the feed lookup
// also, use the mocked mantaray chunks that unmarshal
// into a real manifest with the mocked feed values when
// called from the bzz endpoint. then call the bzz endpoint with
// the pregenerated feed root manifest hash
feedUpdate, _ := toChunk(121212, resp.Reference.Bytes())
var (
feedChunkAddr = swarm.MustParseHexAddress("891a1d1c8436c792d02fc2e8883fef7ab387eaeaacd25aa9f518be7be7856d54")
feedChunkData, _ = hex.DecodeString("400100000000000000000000000000000000000000000000000000000000000000000000000000005768b3b6a7db56d21d1abff40d41cebfc83448fed8d7e9b06ec0d3b073f28f200000000000000000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000012012f00000000000000000000000000000000000000000000000000000000008504f2a107ca940beafc4ce2f6c9a9f0968c62a5b5893ff0e4e1e2983048d276007e7b22737761726d2d666565642d6f776e6572223a2238643337363634343066306437623934396135653332393935643039363139613766383665363332222c22737761726d2d666565642d746f706963223a22616162626363222c22737761726d2d666565642d74797065223a2253657175656e6365227d0a0a0a0a0a0a")
chData, _ = hex.DecodeString("800000000000000000000000000000000000000000000000000000000000000000000000000000005768b3b6a7db56d21d1abff40d41cebfc83448fed8d7e9b06ec0d3b073f28f2000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
manifestCh = swarm.NewChunk(swarm.MustParseHexAddress("8504f2a107ca940beafc4ce2f6c9a9f0968c62a5b5893ff0e4e1e2983048d276"), chData)
look = newMockLookup(-1, 0, feedUpdate, nil, &id{}, nil)
factory = newMockFactory(look)
bzzDownloadResource = func(addr, path string) string { return "/bzz/" + addr + "/" + path }
ctx = context.Background()
)
client, _, _ = newTestServer(t, testServerOptions{
Storer: storer,
Tags: tags.NewTags(mockStatestore, logger),
Logger: logging.New(os.Stdout, 5),
Feeds: factory,
})
_, err := storer.Put(ctx, storage.ModePutUpload, swarm.NewChunk(feedChunkAddr, feedChunkData))
if err != nil {
t.Fatal(err)
}
_, err = storer.Put(ctx, storage.ModePutUpload, feedUpdate)
if err != nil {
t.Fatal(err)
}
_, err = storer.Put(ctx, storage.ModePutUpload, manifestCh)
if err != nil {
t.Fatal(err)
}
jsonhttptest.Request(t, client, http.MethodGet, bzzDownloadResource(feedChunkAddr.String(), ""), http.StatusOK,
jsonhttptest.WithExpectedResponse(updateData),
)
}
......@@ -12,6 +12,7 @@ type (
BytesPostResponse = bytesPostResponse
ChunkAddressResponse = chunkAddressResponse
SocPostResponse = socPostResponse
FeedReferenceResponse = feedReferenceResponse
FileUploadResponse = fileUploadResponse
TagResponse = tagResponse
TagRequest = tagRequest
......@@ -36,6 +37,12 @@ var (
ErrInvalidNameOrAddress = errInvalidNameOrAddress
)
var (
FeedMetadataEntryOwner = feedMetadataEntryOwner
FeedMetadataEntryTopic = feedMetadataEntryTopic
FeedMetadataEntryType = feedMetadataEntryType
)
func (s *Server) ResolveNameOrAddress(str string) (swarm.Address, error) {
return s.resolveNameOrAddress(str)
}
......
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api
import (
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"net/http"
"strconv"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/pkg/feeds"
"github.com/ethersphere/bee/pkg/file/loadsave"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/manifest"
"github.com/ethersphere/bee/pkg/soc"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/gorilla/mux"
)
const (
feedMetadataEntryOwner = "swarm-feed-owner"
feedMetadataEntryTopic = "swarm-feed-topic"
feedMetadataEntryType = "swarm-feed-type"
)
var (
errInvalidFeedUpdate = errors.New("invalid feed update")
)
type feedReferenceResponse struct {
Reference swarm.Address `json:"reference"`
}
func (s *server) feedGetHandler(w http.ResponseWriter, r *http.Request) {
owner, err := hex.DecodeString(mux.Vars(r)["owner"])
if err != nil {
s.Logger.Debugf("feed get: decode owner: %v", err)
s.Logger.Error("feed get: bad owner")
jsonhttp.BadRequest(w, "bad owner")
return
}
topic, err := hex.DecodeString(mux.Vars(r)["topic"])
if err != nil {
s.Logger.Debugf("feed get: decode topic: %v", err)
s.Logger.Error("feed get: bad topic")
jsonhttp.BadRequest(w, "bad topic")
return
}
var at int64
atStr := r.URL.Query().Get("at")
if atStr != "" {
at, err = strconv.ParseInt(atStr, 10, 64)
if err != nil {
s.Logger.Debugf("feed get: decode at: %v", err)
s.Logger.Error("feed get: bad at")
jsonhttp.BadRequest(w, "bad at")
return
}
} else {
at = time.Now().Unix()
}
f := feeds.New(topic, common.BytesToAddress(owner))
lookup, err := s.feedFactory.NewLookup(feeds.Sequence, f)
if err != nil {
s.Logger.Debugf("feed get: new lookup: %v", err)
s.Logger.Error("feed get: new lookup")
jsonhttp.InternalServerError(w, "new lookup")
return
}
ch, cur, next, err := lookup.At(r.Context(), at, 0)
if err != nil {
s.Logger.Debugf("feed get: lookup: %v", err)
s.Logger.Error("feed get: lookup error")
jsonhttp.NotFound(w, "lookup failed")
return
}
// KLUDGE: if a feed was never updated, the chunk will be nil
if ch == nil {
s.Logger.Debugf("feed get: no update found: %v", err)
s.Logger.Error("feed get: no update found")
jsonhttp.NotFound(w, "lookup failed")
return
}
ref, _, err := parseFeedUpdate(ch)
if err != nil {
s.Logger.Debugf("feed get: parse update: %v", err)
s.Logger.Error("feed get: parse update")
jsonhttp.InternalServerError(w, "parse update")
return
}
curBytes, err := cur.MarshalBinary()
if err != nil {
s.Logger.Debugf("feed get: marshal current index: %v", err)
s.Logger.Error("feed get: marshal index")
jsonhttp.InternalServerError(w, "marshal index")
return
}
nextBytes, err := next.MarshalBinary()
if err != nil {
s.Logger.Debugf("feed get: marshal next index: %v", err)
s.Logger.Error("feed get: marshal index")
jsonhttp.InternalServerError(w, "marshal index")
return
}
w.Header().Set(SwarmFeedIndexHeader, hex.EncodeToString(curBytes))
w.Header().Set(SwarmFeedIndexNextHeader, hex.EncodeToString(nextBytes))
w.Header().Set("Access-Control-Expose-Headers", fmt.Sprintf("%s, %s", SwarmFeedIndexHeader, SwarmFeedIndexNextHeader))
jsonhttp.OK(w, feedReferenceResponse{Reference: ref})
}
func (s *server) feedPostHandler(w http.ResponseWriter, r *http.Request) {
owner, err := hex.DecodeString(mux.Vars(r)["owner"])
if err != nil {
s.Logger.Debugf("feed put: decode owner: %v", err)
s.Logger.Error("feed put: bad owner")
jsonhttp.BadRequest(w, "bad owner")
return
}
topic, err := hex.DecodeString(mux.Vars(r)["topic"])
if err != nil {
s.Logger.Debugf("feed put: decode topic: %v", err)
s.Logger.Error("feed put: bad topic")
jsonhttp.BadRequest(w, "bad topic")
return
}
l := loadsave.New(s.Storer, requestModePut(r), false)
feedManifest, err := manifest.NewDefaultManifest(l, false)
if err != nil {
s.Logger.Debugf("feed put: new manifest: %v", err)
s.Logger.Error("feed put: new manifest")
jsonhttp.InternalServerError(w, "create manifest")
return
}
meta := map[string]string{
feedMetadataEntryOwner: hex.EncodeToString(owner),
feedMetadataEntryTopic: hex.EncodeToString(topic),
feedMetadataEntryType: feeds.Sequence.String(), // only sequence allowed for now
}
emptyAddr := make([]byte, 32)
// a feed manifest stores the metadata at the root "/" path
err = feedManifest.Add(r.Context(), "/", manifest.NewEntry(swarm.NewAddress(emptyAddr), meta))
if err != nil {
s.Logger.Debugf("feed post: add manifest entry: %v", err)
s.Logger.Error("feed post: add manifest entry")
jsonhttp.InternalServerError(w, nil)
return
}
ref, err := feedManifest.Store(r.Context())
if err != nil {
s.Logger.Debugf("feed post: store manifest: %v", err)
s.Logger.Error("feed post: store manifest")
jsonhttp.InternalServerError(w, nil)
return
}
jsonhttp.Created(w, feedReferenceResponse{Reference: ref})
}
func parseFeedUpdate(ch swarm.Chunk) (swarm.Address, int64, error) {
sch, err := soc.FromChunk(ch)
if err != nil {
return swarm.ZeroAddress, 0, fmt.Errorf("soc unmarshal: %w", err)
}
update := sch.Chunk.Data()
// split the timestamp and reference
// possible values right now:
// unencrypted ref: span+timestamp+ref => 8+8+32=48
// encrypted ref: span+timestamp+ref+decryptKey => 8+8+64=80
if len(update) != 48 && len(update) != 80 {
return swarm.ZeroAddress, 0, errInvalidFeedUpdate
}
ts := binary.BigEndian.Uint64(update[8:16])
ref := swarm.NewAddress(update[16:])
return ref, int64(ts), nil
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api_test
import (
"bytes"
"context"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"io/ioutil"
"net/http"
"testing"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/cac"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/feeds"
"github.com/ethersphere/bee/pkg/file/loadsave"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/manifest"
"github.com/ethersphere/bee/pkg/soc"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
)
const ownerString = "8d3766440f0d7b949a5e32995d09619a7f86e632"
var expReference = swarm.MustParseHexAddress("891a1d1c8436c792d02fc2e8883fef7ab387eaeaacd25aa9f518be7be7856d54")
func TestFeed_Get(t *testing.T) {
var (
feedResource = func(owner, topic, at string) string {
if at != "" {
return fmt.Sprintf("/feeds/%s/%s?at=%s", owner, topic, at)
}
return fmt.Sprintf("/feeds/%s/%s", owner, topic)
}
mockStatestore = statestore.NewStateStore()
logger = logging.New(ioutil.Discard, 0)
tag = tags.NewTags(mockStatestore, logger)
mockStorer = mock.NewStorer()
client, _, _ = newTestServer(t, testServerOptions{
Storer: mockStorer,
Tags: tag,
})
)
t.Run("malformed owner", func(t *testing.T) {
jsonhttptest.Request(t, client, http.MethodGet, feedResource("xyz", "cc", ""), http.StatusBadRequest,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: "bad owner",
Code: http.StatusBadRequest,
}),
)
})
t.Run("malformed topic", func(t *testing.T) {
jsonhttptest.Request(t, client, http.MethodGet, feedResource("8d3766440f0d7b949a5e32995d09619a7f86e632", "xxzzyy", ""), http.StatusBadRequest,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: "bad topic",
Code: http.StatusBadRequest,
}),
)
})
t.Run("at malformed", func(t *testing.T) {
jsonhttptest.Request(t, client, http.MethodGet, feedResource("8d3766440f0d7b949a5e32995d09619a7f86e632", "aabbcc", "unbekannt"), http.StatusBadRequest,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: "bad at",
Code: http.StatusBadRequest,
}),
)
})
t.Run("with at", func(t *testing.T) {
var (
timestamp = int64(12121212)
ch, _ = toChunk(uint64(timestamp), expReference.Bytes())
look = newMockLookup(12, 0, ch, nil, &id{}, &id{})
factory = newMockFactory(look)
idBytes, _ = (&id{}).MarshalBinary()
client, _, _ = newTestServer(t, testServerOptions{
Storer: mockStorer,
Tags: tag,
Feeds: factory,
})
)
respHeaders := jsonhttptest.Request(t, client, http.MethodGet, feedResource(ownerString, "aabbcc", "12"), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(api.FeedReferenceResponse{Reference: expReference}),
)
h := respHeaders[api.SwarmFeedIndexHeader]
if len(h) == 0 {
t.Fatal("expected swarm feed index header to be set")
}
b, err := hex.DecodeString(h[0])
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(b, idBytes) {
t.Fatalf("feed index header mismatch. got %v want %v", b, idBytes)
}
})
t.Run("latest", func(t *testing.T) {
var (
timestamp = int64(12121212)
ch, _ = toChunk(uint64(timestamp), expReference.Bytes())
look = newMockLookup(-1, 2, ch, nil, &id{}, &id{})
factory = newMockFactory(look)
idBytes, _ = (&id{}).MarshalBinary()
client, _, _ = newTestServer(t, testServerOptions{
Storer: mockStorer,
Tags: tag,
Feeds: factory,
})
)
respHeaders := jsonhttptest.Request(t, client, http.MethodGet, feedResource(ownerString, "aabbcc", ""), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(api.FeedReferenceResponse{Reference: expReference}),
)
if h := respHeaders[api.SwarmFeedIndexHeader]; len(h) > 0 {
b, err := hex.DecodeString(h[0])
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(b, idBytes) {
t.Fatalf("feed index header mismatch. got %v want %v", b, idBytes)
}
} else {
t.Fatal("expected swarm feed index header to be set")
}
})
}
func TestFeed_Post(t *testing.T) {
// post to owner, tpoic, then expect a reference
// get the reference from the store, unmarshal to a
// manifest entry and make sure all metadata correct
var (
mockStatestore = statestore.NewStateStore()
logger = logging.New(ioutil.Discard, 0)
tag = tags.NewTags(mockStatestore, logger)
topic = "aabbcc"
mockStorer = mock.NewStorer()
client, _, _ = newTestServer(t, testServerOptions{
Storer: mockStorer,
Tags: tag,
Logger: logger,
})
)
t.Run("ok", func(t *testing.T) {
url := fmt.Sprintf("/feeds/%s/%s?type=%s", ownerString, topic, "sequence")
jsonhttptest.Request(t, client, http.MethodPost, url, http.StatusCreated,
jsonhttptest.WithExpectedJSONResponse(api.FeedReferenceResponse{
Reference: expReference,
}),
)
ls := loadsave.New(mockStorer, storage.ModePutUpload, false)
i, err := manifest.NewMantarayManifestReference(expReference, ls)
if err != nil {
t.Fatal(err)
}
e, err := i.Lookup(context.Background(), "/")
if err != nil {
t.Fatal(err)
}
meta := e.Metadata()
if e := meta[api.FeedMetadataEntryOwner]; e != ownerString {
t.Fatalf("owner mismatch. got %s want %s", e, ownerString)
}
if e := meta[api.FeedMetadataEntryTopic]; e != topic {
t.Fatalf("topic mismatch. got %s want %s", e, topic)
}
if e := meta[api.FeedMetadataEntryType]; e != "Sequence" {
t.Fatalf("type mismatch. got %s want %s", e, "Sequence")
}
})
}
type factoryMock struct {
sequenceCalled bool
epochCalled bool
feed *feeds.Feed
lookup feeds.Lookup
}
func newMockFactory(mockLookup feeds.Lookup) *factoryMock {
return &factoryMock{lookup: mockLookup}
}
func (f *factoryMock) NewLookup(t feeds.Type, feed *feeds.Feed) (feeds.Lookup, error) {
switch t {
case feeds.Sequence:
f.sequenceCalled = true
case feeds.Epoch:
f.epochCalled = true
}
f.feed = feed
return f.lookup, nil
}
type mockLookup struct {
at, after int64
chunk swarm.Chunk
err error
cur, next feeds.Index
}
func newMockLookup(at, after int64, ch swarm.Chunk, err error, cur, next feeds.Index) *mockLookup {
return &mockLookup{at: at, after: after, chunk: ch, err: err, cur: cur, next: next}
}
func (l *mockLookup) At(_ context.Context, at, after int64) (swarm.Chunk, feeds.Index, feeds.Index, error) {
if l.at == -1 {
// shortcut to ignore the value in the call since time.Now() is a moving target
return l.chunk, l.cur, l.next, nil
}
if at == l.at && after == l.after {
return l.chunk, l.cur, l.next, nil
}
return nil, nil, nil, errors.New("no feed update found")
}
func toChunk(at uint64, payload []byte) (swarm.Chunk, error) {
ts := make([]byte, 8)
binary.BigEndian.PutUint64(ts, at)
content := append(ts, payload...)
ch, err := cac.New(content)
if err != nil {
return nil, err
}
id := make([]byte, soc.IdSize)
privKey, err := crypto.GenerateSecp256k1Key()
if err != nil {
return nil, err
}
signer := crypto.NewDefaultSigner(privKey)
sch := soc.New(id, ch)
if err != nil {
return nil, err
}
err = sch.AddSigner(signer)
if err != nil {
return nil, err
}
return sch.ToChunk()
}
type id struct {
}
func (i *id) MarshalBinary() ([]byte, error) {
return []byte("accd"), nil
}
......@@ -341,11 +341,11 @@ func (s *server) fileDownloadHandler(w http.ResponseWriter, r *http.Request) {
"Content-Type": {metaData.MimeType},
}
s.downloadHandler(w, r, e.Reference(), additionalHeaders)
s.downloadHandler(w, r, e.Reference(), additionalHeaders, true)
}
// downloadHandler contains common logic for dowloading Swarm file from API
func (s *server) downloadHandler(w http.ResponseWriter, r *http.Request, reference swarm.Address, additionalHeaders http.Header) {
func (s *server) downloadHandler(w http.ResponseWriter, r *http.Request, reference swarm.Address, additionalHeaders http.Header, etag bool) {
logger := tracing.NewLoggerWithTraceID(r.Context(), s.Logger)
targets := r.URL.Query().Get("targets")
if targets != "" {
......@@ -377,8 +377,9 @@ func (s *server) downloadHandler(w http.ResponseWriter, r *http.Request, referen
}
w.Header().Set(name, v)
}
w.Header().Set("ETag", fmt.Sprintf("%q", reference))
if etag {
w.Header().Set("ETag", fmt.Sprintf("%q", reference))
}
w.Header().Set("Content-Length", fmt.Sprintf("%d", l))
w.Header().Set("Decompressed-Content-Length", fmt.Sprintf("%d", l))
if targets != "" {
......
......@@ -89,6 +89,14 @@ func (s *server) setupRouting() {
),
})
handle(router, "/feeds/{owner}/{topic}", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.feedGetHandler),
"POST": web.ChainHandlers(
jsonhttp.NewMaxBodyBytesHandler(swarm.ChunkWithSpanSize),
web.FinalHandlerFunc(s.feedPostHandler),
),
})
handle(router, "/bzz/{address}", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
u := r.URL
u.Path += "/"
......
......@@ -13,7 +13,6 @@ import (
"net/http"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/jsonhttp"
......@@ -22,20 +21,16 @@ import (
"github.com/ethersphere/bee/pkg/soc"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage/mock"
testingc "github.com/ethersphere/bee/pkg/storage/testing"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
)
func TestSoc(t *testing.T) {
var (
socResource = func(owner, id, sig string) string { return fmt.Sprintf("/soc/%s/%s?sig=%s", owner, id, sig) }
_ = testingc.GenerateTestRandomChunk()
mockStatestore = statestore.NewStateStore()
logger = logging.New(ioutil.Discard, 0)
tag = tags.NewTags(mockStatestore, logger)
_ = common.HexToAddress("8d3766440f0d7b949a5e32995d09619a7f86e632")
mockStorer = mock.NewStorer()
client, _, _ = newTestServer(t, testServerOptions{
Storer: mockStorer,
......
package cac
import (
"encoding/binary"
"github.com/ethersphere/bee/pkg/bmtpool"
"github.com/ethersphere/bee/pkg/swarm"
)
func New(data []byte) (swarm.Chunk, error) {
hasher := bmtpool.Get()
defer bmtpool.Put(hasher)
_, err := hasher.Write(data)
if err != nil {
return nil, err
}
span := make([]byte, 8)
binary.LittleEndian.PutUint64(span, uint64(len(data)))
err = hasher.SetSpanBytes(span)
if err != nil {
return nil, err
}
return swarm.NewChunk(swarm.NewAddress(hasher.Sum(nil)), append(span, data...)), nil
}
package cac_test
import (
"testing"
"github.com/ethersphere/bee/pkg/cac"
"github.com/ethersphere/bee/pkg/swarm"
)
func TestCac(t *testing.T) {
bmtHashOfFoo := "2387e8e7d8a48c2a9339c97c1dc3461a9a7aa07e994c5cb8b38fd7c1b3e6ea48"
address := swarm.MustParseHexAddress(bmtHashOfFoo)
foo := "foo"
c, err := cac.New([]byte(foo))
if err != nil {
t.Fatal(err)
}
if !c.Address().Equal(address) {
t.Fatalf("address mismatch. got %s want %s", c.Address().String(), address.String())
}
}
......@@ -29,12 +29,13 @@ func NewFinder(getter storage.Getter, feed *feeds.Feed) feeds.Lookup {
// At looks up the version valid at time `at`
// after is a unix time hint of the latest known update
func (f *finder) At(ctx context.Context, at, after int64) (swarm.Chunk, error) {
func (f *finder) At(ctx context.Context, at, after int64) (swarm.Chunk, feeds.Index, feeds.Index, error) {
e, ch, err := f.common(ctx, at, after)
if err != nil {
return nil, err
return nil, nil, nil, err
}
return f.at(ctx, uint64(at), e, ch)
ch, err = f.at(ctx, uint64(at), e, ch)
return ch, nil, nil, err
}
// common returns the lowest common ancestor for which a feed update chunk is found in the chunk store
......@@ -164,10 +165,15 @@ func (f *asyncFinder) at(ctx context.Context, at int64, p *path, e *epoch, c cha
}
}
}
func (f *asyncFinder) At(ctx context.Context, at, after int64) (swarm.Chunk, feeds.Index, feeds.Index, error) {
// TODO: current and next index return values need to be implemented
ch, err := f.asyncAt(ctx, at, after)
return ch, nil, nil, err
}
// At looks up the version valid at time `at`
// after is a unix time hint of the latest known update
func (f *asyncFinder) At(ctx context.Context, at, after int64) (swarm.Chunk, error) {
func (f *asyncFinder) asyncAt(ctx context.Context, at, after int64) (swarm.Chunk, error) {
c := make(chan *result)
go f.at(ctx, at, newPath(at), &epoch{0, maxLevel}, c)
LOOP:
......
......@@ -44,7 +44,11 @@ func BenchmarkFinder(b *testing.B) {
for _, prefill := range []int64{1, 50} {
after := int64(50)
storer := &timeout{mock.NewStorer()}
topic := "testtopic"
topicStr := "testtopic"
topic, err := crypto.LegacyKeccak256([]byte(topicStr))
if err != nil {
b.Fatal(err)
}
pk, _ := crypto.GenerateSecp256k1Key()
signer := crypto.NewDefaultSigner(pk)
......@@ -78,7 +82,7 @@ func BenchmarkFinder(b *testing.B) {
names := []string{"sync", "async"}
b.Run(fmt.Sprintf("%s:prefill=%d, latest=%d, now=%d", names[k], prefill, latest, now), func(b *testing.B) {
for n := 0; n < b.N; n++ {
_, err := finder.At(ctx, now, after)
_, _, _, err := finder.At(ctx, now, after)
if err != nil {
b.Fatal(err)
}
......
......@@ -15,7 +15,7 @@ import (
)
func TestFinder(t *testing.T) {
testf := func(t *testing.T, finderf func(storage.Getter, *feeds.Feed) feeds.Lookup, updaterf func(putter storage.Putter, signer crypto.Signer, topic string) (feeds.Updater, error)) {
testf := func(t *testing.T, finderf func(storage.Getter, *feeds.Feed) feeds.Lookup, updaterf func(putter storage.Putter, signer crypto.Signer, topic []byte) (feeds.Updater, error)) {
t.Run("basic", func(t *testing.T) {
feedstesting.TestFinderBasic(t, finderf, updaterf)
})
......
......@@ -23,7 +23,7 @@ type updater struct {
}
// NewUpdater constructs a feed updater
func NewUpdater(putter storage.Putter, signer crypto.Signer, topic string) (feeds.Updater, error) {
func NewUpdater(putter storage.Putter, signer crypto.Signer, topic []byte) (feeds.Updater, error) {
p, err := feeds.NewPutter(putter, signer, topic)
if err != nil {
return nil, err
......
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package factory
import (
"github.com/ethersphere/bee/pkg/feeds"
"github.com/ethersphere/bee/pkg/feeds/epochs"
"github.com/ethersphere/bee/pkg/feeds/sequence"
"github.com/ethersphere/bee/pkg/storage"
)
type factory struct {
storage.Getter
}
func New(getter storage.Getter) feeds.Factory {
return &factory{getter}
}
func (f *factory) NewLookup(t feeds.Type, feed *feeds.Feed) (feeds.Lookup, error) {
switch t {
case feeds.Sequence:
return sequence.NewAsyncFinder(f.Getter, feed), nil
case feeds.Epoch:
return epochs.NewAsyncFinder(f.Getter, feed), nil
}
return nil, feeds.ErrFeedTypeNotFound
}
......@@ -10,13 +10,54 @@ package feeds
import (
"encoding"
"errors"
"fmt"
"strings"
"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/soc"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
var ErrFeedTypeNotFound = errors.New("no such feed type")
// Factory creates feed lookups for different types of feeds.
type Factory interface {
NewLookup(Type, *Feed) (Lookup, error)
}
type Type int
const (
Sequence Type = iota
Epoch
)
func (t Type) String() string {
switch t {
case Sequence:
return "Sequence"
case Epoch:
return "Epoch"
default:
return ""
}
}
func (t *Type) FromString(s string) error {
switch s = strings.ToLower(s); s {
case "sequence":
*t = Sequence
case "epoch":
*t = Epoch
default:
return ErrFeedTypeNotFound
}
return nil
}
type id struct {
topic []byte
index []byte
......@@ -34,16 +75,13 @@ type Feed struct {
Owner common.Address
}
// New constructs an epoch based feed from a human readable topic and an ether address
func New(topic string, owner common.Address) (*Feed, error) {
th, err := crypto.LegacyKeccak256([]byte(topic))
if err != nil {
return nil, err
}
return &Feed{th, owner}, nil
// New constructs an epoch based feed from a keccak256 digest of a plaintext
// topic and an ether address.
func New(topic []byte, owner common.Address) *Feed {
return &Feed{topic, owner}
}
// Index is the interface for feed implementations
// Index is the interface for feed implementations.
type Index interface {
encoding.BinaryMarshaler
}
......@@ -59,6 +97,26 @@ func (f *Feed) Update(index Index) *Update {
return &Update{f, index}
}
func NewUpdate(f *Feed, idx Index, timestamp int64, payload []byte, sig []byte) (swarm.Chunk, error) {
id, err := f.Update(idx).Id()
if err != nil {
return nil, fmt.Errorf("update: %w", err)
}
cac, err := toChunk(uint64(timestamp), payload)
if err != nil {
return nil, fmt.Errorf("toChunk: %w", err)
}
ch, err := soc.NewSignedChunk(id, cac, f.Owner.Bytes(), sig)
if err != nil {
return nil, fmt.Errorf("new chunk: %w", err)
}
if !soc.Valid(ch) {
return nil, storage.ErrInvalidChunk
}
return ch, nil
}
// Id calculates the identifier if a feed update to be used in single owner chunks
func (u *Update) Id() ([]byte, error) {
index, err := u.index.MarshalBinary()
......
......@@ -17,7 +17,7 @@ import (
// Lookup is the interface for time based feed lookup
type Lookup interface {
At(ctx context.Context, at, after int64) (swarm.Chunk, error)
At(ctx context.Context, at, after int64) (chunk swarm.Chunk, currentIndex, nextIndex Index, err error)
}
// Getter encapsulates a chunk Getter getter and a feed and provides
......@@ -35,7 +35,8 @@ func NewGetter(getter storage.Getter, feed *Feed) *Getter {
// Latest looks up the latest update of the feed
// after is a unix time hint of the latest known update
func Latest(ctx context.Context, l Lookup, after int64) (swarm.Chunk, error) {
return l.At(ctx, time.Now().Unix(), after)
c, _, _, err := l.At(ctx, time.Now().Unix(), after)
return c, err
}
// Get creates an update of the underlying feed at the given epoch
......
......@@ -29,15 +29,12 @@ type Putter struct {
}
// NewPutter constructs a feed Putter
func NewPutter(putter storage.Putter, signer crypto.Signer, topic string) (*Putter, error) {
func NewPutter(putter storage.Putter, signer crypto.Signer, topic []byte) (*Putter, error) {
owner, err := signer.EthereumAddress()
if err != nil {
return nil, err
}
feed, err := New(topic, owner)
if err != nil {
return nil, err
}
feed := New(topic, owner)
return &Putter{putter, signer, feed}, nil
}
......
......@@ -2,10 +2,6 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package sequence_test
import (
......@@ -46,7 +42,11 @@ func (t *timeout) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Addr
func BenchmarkFinder(b *testing.B) {
for _, prefill := range []int64{1, 100, 1000, 5000} {
storer := &timeout{mock.NewStorer()}
topic := "testtopic"
topicStr := "testtopic"
topic, err := crypto.LegacyKeccak256([]byte(topicStr))
if err != nil {
b.Fatal(err)
}
pk, _ := crypto.GenerateSecp256k1Key()
signer := crypto.NewDefaultSigner(pk)
......@@ -78,7 +78,7 @@ func BenchmarkFinder(b *testing.B) {
names := []string{"sync", "async"}
b.Run(fmt.Sprintf("%s:prefill=%d, latest/now=%d", names[k], prefill, now), func(b *testing.B) {
for n := 0; n < b.N; n++ {
_, err := finder.At(ctx, now, 0)
_, _, _, err := finder.At(ctx, now, 0)
if err != nil {
b.Fatal(err)
}
......
......@@ -15,7 +15,7 @@ import (
)
func TestFinder(t *testing.T) {
testf := func(t *testing.T, finderf func(storage.Getter, *feeds.Feed) feeds.Lookup, updaterf func(putter storage.Putter, signer crypto.Signer, topic string) (feeds.Updater, error)) {
testf := func(t *testing.T, finderf func(storage.Getter, *feeds.Feed) feeds.Lookup, updaterf func(putter storage.Putter, signer crypto.Signer, topic []byte) (feeds.Updater, error)) {
t.Run("basic", func(t *testing.T) {
feedstesting.TestFinderBasic(t, finderf, updaterf)
})
......
......@@ -49,21 +49,21 @@ func NewFinder(getter storage.Getter, feed *feeds.Feed) feeds.Lookup {
// At looks up the version valid at time `at`
// after is a unix time hint of the latest known update
func (f *finder) At(ctx context.Context, at, after int64) (ch swarm.Chunk, err error) {
func (f *finder) At(ctx context.Context, at, after int64) (ch swarm.Chunk, current, next feeds.Index, err error) {
for i := uint64(0); ; i++ {
u, err := f.getter.Get(ctx, &index{i})
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
return nil, err
return nil, nil, nil, err
}
return ch, nil
return ch, &index{i - 1}, &index{i}, nil
}
ts, err := feeds.UpdatedAt(u)
if err != nil {
return nil, err
return nil, nil, nil, err
}
if ts > uint64(at) {
return ch, nil
return ch, &index{i}, nil, nil
}
ch = u
}
......@@ -109,16 +109,16 @@ type result struct {
// At looks up the version valid at time `at`
// after is a unix time hint of the latest known update
func (f *asyncFinder) At(ctx context.Context, at, after int64) (ch swarm.Chunk, err error) {
func (f *asyncFinder) At(ctx context.Context, at, after int64) (ch swarm.Chunk, cur, next feeds.Index, err error) {
ch, diff, err := f.get(ctx, at, 0)
if err != nil {
return nil, err
return nil, nil, nil, err
}
if ch == nil {
return nil, nil
return nil, nil, nil, nil
}
if diff == 0 {
return ch, nil
return ch, &index{0}, &index{1}, nil
}
c := make(chan result)
p := newPath(0)
......@@ -132,7 +132,7 @@ func (f *asyncFinder) At(ctx context.Context, at, after int64) (ch swarm.Chunk,
p = r.path
if r.chunk == nil {
if r.level == 0 {
return p.latest.chunk, nil
return p.latest.chunk, &index{p.latest.seq}, &index{p.latest.seq + 1}, nil
}
if p.level < r.level {
continue
......@@ -140,7 +140,7 @@ func (f *asyncFinder) At(ctx context.Context, at, after int64) (ch swarm.Chunk,
p.level = r.level - 1
} else {
if r.diff == 0 {
return r.chunk, nil
return r.chunk, &index{r.seq}, &index{r.seq + 1}, nil
}
if p.latest.level > r.level {
continue
......@@ -151,7 +151,7 @@ func (f *asyncFinder) At(ctx context.Context, at, after int64) (ch swarm.Chunk,
// below applies even if p.latest==maxLevel
if p.latest.level == p.level {
if p.level == 0 {
return p.latest.chunk, nil
return p.latest.chunk, &index{p.latest.seq}, &index{p.latest.seq + 1}, nil
}
p.close()
np := newPath(p.latest.seq)
......@@ -160,7 +160,7 @@ func (f *asyncFinder) At(ctx context.Context, at, after int64) (ch swarm.Chunk,
go f.at(ctx, at, np, c, quit)
}
}
return nil, nil
return nil, nil, nil, nil
}
func (f *asyncFinder) at(ctx context.Context, at int64, p *path, c chan<- result, quit <-chan struct{}) {
......@@ -216,7 +216,7 @@ type updater struct {
}
// NewUpdater constructs a feed updater
func NewUpdater(putter storage.Putter, signer crypto.Signer, topic string) (feeds.Updater, error) {
func NewUpdater(putter storage.Putter, signer crypto.Signer, topic []byte) (feeds.Updater, error) {
p, err := feeds.NewPutter(putter, signer, topic)
if err != nil {
return nil, err
......
......@@ -19,9 +19,14 @@ import (
"github.com/ethersphere/bee/pkg/storage/mock"
)
func TestFinderBasic(t *testing.T, finderf func(storage.Getter, *feeds.Feed) feeds.Lookup, updaterf func(putter storage.Putter, signer crypto.Signer, topic string) (feeds.Updater, error)) {
func TestFinderBasic(t *testing.T, finderf func(storage.Getter, *feeds.Feed) feeds.Lookup, updaterf func(putter storage.Putter, signer crypto.Signer, topic []byte) (feeds.Updater, error)) {
storer := mock.NewStorer()
topic := "testtopic"
topicStr := "testtopic"
topic, err := crypto.LegacyKeccak256([]byte(topicStr))
if err != nil {
t.Fatal(err)
}
pk, _ := crypto.GenerateSecp256k1Key()
signer := crypto.NewDefaultSigner(pk)
......@@ -68,7 +73,7 @@ func TestFinderBasic(t *testing.T, finderf func(storage.Getter, *feeds.Feed) fee
})
}
func TestFinderFixIntervals(t *testing.T, finderf func(storage.Getter, *feeds.Feed) feeds.Lookup, updaterf func(putter storage.Putter, signer crypto.Signer, topic string) (feeds.Updater, error)) {
func TestFinderFixIntervals(t *testing.T, finderf func(storage.Getter, *feeds.Feed) feeds.Lookup, updaterf func(putter storage.Putter, signer crypto.Signer, topic []byte) (feeds.Updater, error)) {
for _, tc := range []struct {
count int64
step int64
......@@ -81,7 +86,11 @@ func TestFinderFixIntervals(t *testing.T, finderf func(storage.Getter, *feeds.Fe
} {
t.Run(fmt.Sprintf("count=%d,step=%d,offset=%d", tc.count, tc.step, tc.offset), func(t *testing.T) {
storer := mock.NewStorer()
topic := "testtopic"
topicStr := "testtopic"
topic, err := crypto.LegacyKeccak256([]byte(topicStr))
if err != nil {
t.Fatal(err)
}
pk, _ := crypto.GenerateSecp256k1Key()
signer := crypto.NewDefaultSigner(pk)
......@@ -106,7 +115,7 @@ func TestFinderFixIntervals(t *testing.T, finderf func(storage.Getter, *feeds.Fe
step = tc.step / 4
}
for now := at; now < at+tc.step; now += step {
ch, err := finder.At(ctx, now, after)
ch, _, _, err := finder.At(ctx, now, after)
if err != nil {
t.Fatal(err)
}
......@@ -131,11 +140,15 @@ func TestFinderFixIntervals(t *testing.T, finderf func(storage.Getter, *feeds.Fe
}
}
func TestFinderRandomIntervals(t *testing.T, finderf func(storage.Getter, *feeds.Feed) feeds.Lookup, updaterf func(putter storage.Putter, signer crypto.Signer, topic string) (feeds.Updater, error)) {
func TestFinderRandomIntervals(t *testing.T, finderf func(storage.Getter, *feeds.Feed) feeds.Lookup, updaterf func(putter storage.Putter, signer crypto.Signer, topic []byte) (feeds.Updater, error)) {
for i := 0; i < 5; i++ {
t.Run(fmt.Sprintf("random intervals %d", i), func(t *testing.T) {
storer := mock.NewStorer()
topic := "testtopic"
topicStr := "testtopic"
topic, err := crypto.LegacyKeccak256([]byte(topicStr))
if err != nil {
t.Fatal(err)
}
pk, _ := crypto.GenerateSecp256k1Key()
signer := crypto.NewDefaultSigner(pk)
......@@ -161,7 +174,7 @@ func TestFinderRandomIntervals(t *testing.T, finderf func(storage.Getter, *feeds
diff := ats[j+1] - ats[j]
for at := ats[j]; at < ats[j+1]; at += int64(rand.Intn(int(diff)) + 1) {
for after := int64(0); after < at; after += int64(rand.Intn(int(at))) {
ch, err := finder.At(ctx, at, after)
ch, _, _, err := finder.At(ctx, at, after)
if err != nil {
t.Fatal(err)
}
......
......@@ -95,7 +95,6 @@ func (m *mantarayManifest) Lookup(ctx context.Context, path string) (Entry, erro
}
address := swarm.NewAddress(node.Entry())
entry := NewEntry(address, node.Metadata())
return entry, nil
......
......@@ -24,6 +24,7 @@ import (
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/debugapi"
"github.com/ethersphere/bee/pkg/feeds/factory"
"github.com/ethersphere/bee/pkg/hive"
"github.com/ethersphere/bee/pkg/kademlia"
"github.com/ethersphere/bee/pkg/localstore"
......@@ -427,7 +428,8 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
var apiService api.Service
if o.APIAddr != "" {
// API server
apiService = api.New(tagService, ns, multiResolver, pssService, traversalService, logger, tracer, api.Options{
feedFactory := factory.New(ns)
apiService = api.New(tagService, ns, multiResolver, pssService, traversalService, feedFactory, logger, tracer, api.Options{
CORSAllowedOrigins: o.CORSAllowedOrigins,
GatewayMode: o.GatewayMode,
WsPingPeriod: 60 * time.Second,
......
......@@ -68,7 +68,7 @@ func NewStorer(opts ...Option) *MockStorer {
return s
}
func (m *MockStorer) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address) (ch swarm.Chunk, err error) {
func (m *MockStorer) Get(_ context.Context, _ storage.ModeGet, addr swarm.Address) (ch swarm.Chunk, err error) {
m.mtx.Lock()
defer m.mtx.Unlock()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment