Commit 69f460c8 authored by mrekucci's avatar mrekucci Committed by GitHub

feat: add an API endpoint that checks if content is available (#2357)

parent 76aff41e
......@@ -86,6 +86,9 @@ jobs:
- name: Test manifest
id: manifest-1
run: beekeeper check --cluster-name local-dns --checks=ci-manifest
- name: Test content availability
id: content-availability
run: beekeeper check --cluster-name local-dns --checks=ci-content-availability
- name: Destroy the cluster
run: |
beekeeper delete bee-cluster --cluster-name local-dns
......@@ -177,6 +180,7 @@ jobs:
if ${{ steps.pushsync-chunks-1.outcome=='failure' }}; then FAILED=pushsync-chunks-1; fi
if ${{ steps.retrieval-1.outcome=='failure' }}; then FAILED=retrieval-1; fi
if ${{ steps.manifest-1.outcome=='failure' }}; then FAILED=manifest-1; fi
if ${{ steps.content-availability.outcome=='failure' }}; then FAILED=content-availability; fi
if ${{ steps.pingpong-2.outcome=='failure' }}; then FAILED=pingpong-2; fi
if ${{ steps.fullconnectivity-2.outcome=='failure' }}; then FAILED=fullconnectivity-2; fi
if ${{ steps.settlements-2.outcome=='failure' }}; then FAILED=settlements-2; fi
......
......@@ -43,7 +43,7 @@ deploylocal:
.PHONY: testlocal
testlocal:
export PATH=${PATH}:$$($(GO) env GOPATH)/bin
beekeeper check --cluster-name local --checks=ci-full-connectivity,ci-gc,ci-manifest,ci-pingpong,ci-pss,ci-pushsync-chunks,ci-retrieval,ci-settlements,ci-soc
beekeeper check --cluster-name local --checks=ci-full-connectivity,ci-gc,ci-manifest,ci-pingpong,ci-pss,ci-pushsync-chunks,ci-retrieval,ci-content-availability,ci-settlements,ci-soc
.PHONY: testlocal-all
all: beekeeper beelocal deploylocal testlocal
......
......@@ -867,6 +867,30 @@ paths:
description: Default response
"/stewardship/{reference}":
get:
summary: "Check if content is available"
tags:
- Stewardship
parameters:
- in: path
name: reference
schema:
$ref: "SwarmCommon.yaml#/components/schemas/SwarmReference"
required: true
description: "Root hash of content (can be of any type: collection, file, chunk)"
responses:
"200":
description: Returns if the content is retrievable
content:
application/json:
schema:
$ref: "SwarmCommon.yaml#/components/schemas/IsRetrievableResponse"
"404":
$ref: "SwarmCommon.yaml#/components/responses/404"
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"
default:
description: Default response
put:
summary: "Reupload a root hash to the network"
tags:
......@@ -881,6 +905,8 @@ paths:
responses:
"200":
description: Ok
"404":
$ref: "SwarmCommon.yaml#/components/responses/404"
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"
default:
......
......@@ -528,6 +528,12 @@ components:
type: string
pattern: "^(sequence|epoch)$"
IsRetrievableResponse:
type: object
properties:
isRetrievable:
type: boolean
headers:
SwarmTag:
description: "Tag UID"
......
......@@ -92,7 +92,7 @@ type server struct {
pss pss.Interface
traversal traversal.Traverser
pinning pinning.Interface
steward steward.Reuploader
steward steward.Interface
logger logging.Logger
tracer *tracing.Tracer
feedFactory feeds.Factory
......@@ -119,7 +119,7 @@ const (
)
// New will create a and initialize a new API service.
func New(tags *tags.Tags, storer storage.Storer, resolver resolver.Interface, pss pss.Interface, traversalService traversal.Traverser, pinning pinning.Interface, feedFactory feeds.Factory, post postage.Service, postageContract postagecontract.Interface, steward steward.Reuploader, signer crypto.Signer, logger logging.Logger, tracer *tracing.Tracer, o Options) Service {
func New(tags *tags.Tags, storer storage.Storer, resolver resolver.Interface, pss pss.Interface, traversalService traversal.Traverser, pinning pinning.Interface, feedFactory feeds.Factory, post postage.Service, postageContract postagecontract.Interface, steward steward.Interface, signer crypto.Signer, logger logging.Logger, tracer *tracing.Tracer, o Options) Service {
s := &server{
tags: tags,
storer: storer,
......
......@@ -70,7 +70,7 @@ type testServerOptions struct {
CORSAllowedOrigins []string
PostageContract postagecontract.Interface
Post postage.Service
Steward steward.Reuploader
Steward steward.Interface
WsHeaders http.Header
}
......
......@@ -20,6 +20,7 @@ type (
PostageCreateResponse = postageCreateResponse
PostageStampResponse = postageStampResponse
PostageStampsResponse = postageStampsResponse
IsRetrievableResponse = isRetrievableResponse
)
var (
......
......@@ -183,6 +183,10 @@ func (s *server) setupRouting() {
)
handle("/stewardship/{address}", jsonhttp.MethodHandler{
"GET": web.ChainHandlers(
s.gatewayModeForbidEndpointHandler,
web.FinalHandlerFunc(s.stewardshipGetHandler),
),
"PUT": web.ChainHandlers(
s.gatewayModeForbidEndpointHandler,
web.FinalHandlerFunc(s.stewardshipPutHandler),
......
......@@ -31,3 +31,29 @@ func (s *server) stewardshipPutHandler(w http.ResponseWriter, r *http.Request) {
}
jsonhttp.OK(w, nil)
}
type isRetrievableResponse struct {
IsRetrievable bool `json:"isRetrievable"`
}
// stewardshipGetHandler checks whether the content on the given address is retrievable.
func (s *server) stewardshipGetHandler(w http.ResponseWriter, r *http.Request) {
nameOrHex := mux.Vars(r)["address"]
address, err := s.resolveNameOrAddress(nameOrHex)
if err != nil {
s.logger.Debugf("stewardship get: parse address %s: %v", nameOrHex, err)
s.logger.Error("stewardship get: parse address")
jsonhttp.NotFound(w, nil)
return
}
res, err := s.steward.IsRetrievable(r.Context(), address)
if err != nil {
s.logger.Debugf("stewardship get: is retrievable %s: %v", address, err)
s.logger.Error("stewardship get: is retrievable")
jsonhttp.InternalServerError(w, nil)
return
}
jsonhttp.OK(w, isRetrievableResponse{
IsRetrievable: res,
})
}
......@@ -6,10 +6,12 @@ package api_test
import (
"context"
"encoding/hex"
"io/ioutil"
"net/http"
"testing"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/logging"
......@@ -19,7 +21,7 @@ import (
"github.com/ethersphere/bee/pkg/tags"
)
func TestStewardshipReUpload(t *testing.T) {
func TestStewardship(t *testing.T) {
var (
logger = logging.New(ioutil.Discard, 0)
mockStatestore = statestore.NewStateStore()
......@@ -33,15 +35,30 @@ func TestStewardshipReUpload(t *testing.T) {
Logger: logger,
Steward: m,
})
jsonhttptest.Request(t, client, http.MethodPut, "/v1/stewardship/"+addr.String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
)
if !m.addr.Equal(addr) {
t.Fatalf("\nhave address: %q\nwant address: %q", m.addr.String(), addr.String())
}
t.Run("re-upload", func(t *testing.T) {
jsonhttptest.Request(t, client, http.MethodPut, "/v1/stewardship/"+addr.String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
)
if !m.addr.Equal(addr) {
t.Fatalf("\nhave address: %q\nwant address: %q", m.addr.String(), addr.String())
}
})
t.Run("is-retrievable", func(t *testing.T) {
jsonhttptest.Request(t, client, http.MethodGet, "/v1/stewardship/"+addr.String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(api.IsRetrievableResponse{IsRetrievable: true}),
)
jsonhttptest.Request(t, client, http.MethodGet, "/v1/stewardship/"+hex.EncodeToString([]byte{}), http.StatusNotFound,
jsonhttptest.WithExpectedJSONResponse(&jsonhttp.StatusResponse{
Code: http.StatusNotFound,
Message: http.StatusText(http.StatusNotFound),
}),
)
})
}
type mockSteward struct {
......@@ -52,3 +69,7 @@ func (m *mockSteward) Reupload(_ context.Context, addr swarm.Address) error {
m.addr = addr
return nil
}
func (m *mockSteward) IsRetrievable(_ context.Context, _ swarm.Address) (bool, error) {
return true, nil
}
......@@ -31,6 +31,7 @@ import (
"github.com/ethersphere/bee/pkg/topology/lightnode"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/ethersphere/bee/pkg/transaction"
"github.com/ethersphere/bee/pkg/traversal"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/semaphore"
)
......@@ -61,6 +62,7 @@ type Service struct {
metricsRegistry *prometheus.Registry
lightNodes *lightnode.Container
blockTime *big.Int
traverser traversal.Traverser
// handler is changed in the Configure method
handler http.Handler
handlerMu sync.RWMutex
......@@ -97,7 +99,7 @@ func New(publicKey, pssPublicKey ecdsa.PublicKey, ethereumAddress common.Address
// Configure injects required dependencies and configuration parameters and
// constructs HTTP routes that depend on them. It is intended and safe to call
// this method only once.
func (s *Service) Configure(overlay swarm.Address, p2p p2p.DebugService, pingpong pingpong.Interface, topologyDriver topology.Driver, lightNodes *lightnode.Container, storer storage.Storer, tags *tags.Tags, accounting accounting.Interface, pseudosettle settlement.Interface, chequebookEnabled bool, swap swap.Interface, chequebook chequebook.Service, batchStore postage.Storer, post postage.Service, postageContract postagecontract.Interface) {
func (s *Service) Configure(overlay swarm.Address, p2p p2p.DebugService, pingpong pingpong.Interface, topologyDriver topology.Driver, lightNodes *lightnode.Container, storer storage.Storer, tags *tags.Tags, accounting accounting.Interface, pseudosettle settlement.Interface, chequebookEnabled bool, swap swap.Interface, chequebook chequebook.Service, batchStore postage.Storer, post postage.Service, postageContract postagecontract.Interface, traverser traversal.Traverser) {
s.p2p = p2p
s.pingpong = pingpong
s.topologyDriver = topologyDriver
......@@ -113,6 +115,7 @@ func (s *Service) Configure(overlay swarm.Address, p2p p2p.DebugService, pingpon
s.overlay = &overlay
s.post = post
s.postageContract = postageContract
s.traverser = traverser
s.setRouter(s.newRouter())
}
......
......@@ -37,6 +37,7 @@ import (
"github.com/ethersphere/bee/pkg/topology/lightnode"
topologymock "github.com/ethersphere/bee/pkg/topology/mock"
transactionmock "github.com/ethersphere/bee/pkg/transaction/mock"
"github.com/ethersphere/bee/pkg/traversal"
"github.com/multiformats/go-multiaddr"
"resenje.org/web"
)
......@@ -72,6 +73,7 @@ type testServerOptions struct {
TransactionOpts []transactionmock.Option
PostageContract postagecontract.Interface
Post postage.Service
Traverser traversal.Traverser
}
type testServer struct {
......@@ -88,7 +90,7 @@ func newTestServer(t *testing.T, o testServerOptions) *testServer {
transaction := transactionmock.New(o.TransactionOpts...)
ln := lightnode.NewContainer(o.Overlay)
s := debugapi.New(o.PublicKey, o.PSSPublicKey, o.EthereumAddress, logging.New(ioutil.Discard, 0), nil, o.CORSAllowedOrigins, big.NewInt(2), transaction)
s.Configure(o.Overlay, o.P2P, o.Pingpong, topologyDriver, ln, o.Storer, o.Tags, acc, settlement, true, swapserv, chequebook, o.BatchStore, o.Post, o.PostageContract)
s.Configure(o.Overlay, o.P2P, o.Pingpong, topologyDriver, ln, o.Storer, o.Tags, acc, settlement, true, swapserv, chequebook, o.BatchStore, o.Post, o.PostageContract, o.Traverser)
ts := httptest.NewServer(s)
t.Cleanup(ts.Close)
......@@ -186,7 +188,7 @@ func TestServer_Configure(t *testing.T) {
}),
)
s.Configure(o.Overlay, o.P2P, o.Pingpong, topologyDriver, ln, o.Storer, o.Tags, acc, settlement, true, swapserv, chequebook, nil, mockpost.New(), nil)
s.Configure(o.Overlay, o.P2P, o.Pingpong, topologyDriver, ln, o.Storer, o.Tags, acc, settlement, true, swapserv, chequebook, nil, mockpost.New(), nil, nil)
testBasicRouter(t, client)
jsonhttptest.Request(t, client, http.MethodGet, "/readiness", http.StatusOK,
......
......@@ -322,7 +322,7 @@ func NewDevBee(logger logging.Logger, o *DevOptions) (b *DevBee, err error) {
)
// inject dependencies and configure full debug api http path routes
debugAPIService.Configure(swarmAddress, p2ps, pingPong, kad, lightNodes, storer, tagService, acc, pseudoset, true, mockSwap, mockChequebook, batchStore, post, postageContract)
debugAPIService.Configure(swarmAddress, p2ps, pingPong, kad, lightNodes, storer, tagService, acc, pseudoset, true, mockSwap, mockChequebook, batchStore, post, postageContract, traversalService)
}
return b, nil
......
......@@ -703,7 +703,7 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo
if o.APIAddr != "" {
// API server
feedFactory := factory.New(ns)
steward := steward.New(storer, traversalService, pushSyncProtocol)
steward := steward.New(storer, traversalService, retrieve, pushSyncProtocol)
apiService = api.New(tagService, ns, multiResolver, pssService, traversalService, pinningService, feedFactory, post, postageContractService, steward, signer, logger, tracer, api.Options{
CORSAllowedOrigins: o.CORSAllowedOrigins,
GatewayMode: o.GatewayMode,
......@@ -782,7 +782,7 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo
}
// inject dependencies and configure full debug api http path routes
debugAPIService.Configure(swarmAddress, p2ps, pingPong, kad, lightNodes, storer, tagService, acc, pseudosettleService, o.SwapEnable, swapService, chequebookService, batchStore, post, postageContractService)
debugAPIService.Configure(swarmAddress, p2ps, pingPong, kad, lightNodes, storer, tagService, acc, pseudosettleService, o.SwapEnable, swapService, chequebookService, batchStore, post, postageContractService, traversalService)
}
if err := kad.Start(p2pCtx); err != nil {
......
......@@ -12,6 +12,7 @@ import (
"fmt"
"github.com/ethersphere/bee/pkg/pushsync"
"github.com/ethersphere/bee/pkg/retrieval"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
......@@ -22,20 +23,30 @@ import (
// how many parallel push operations
const parallelPush = 5
type Reuploader interface {
type Interface interface {
// Reupload root hash and all of its underlying
// associated chunks to the network.
Reupload(context.Context, swarm.Address) error
// IsRetrievable checks whether the content
// on the given address is retrievable.
IsRetrievable(context.Context, swarm.Address) (bool, error)
}
type steward struct {
getter storage.Getter
push pushsync.PushSyncer
traverser traversal.Traverser
getter storage.Getter
push pushsync.PushSyncer
traverser traversal.Traverser
netTraverser traversal.Traverser
}
func New(getter storage.Getter, t traversal.Traverser, p pushsync.PushSyncer) Reuploader {
return &steward{getter: getter, push: p, traverser: t}
func New(getter storage.Getter, t traversal.Traverser, r retrieval.Interface, p pushsync.PushSyncer) Interface {
return &steward{
getter: getter,
push: p,
traverser: t,
netTraverser: traversal.New(&netGetter{r}),
}
}
// Reupload content with the given root hash to the network.
......@@ -76,3 +87,32 @@ func (s *steward) Reupload(ctx context.Context, root swarm.Address) error {
}
return nil
}
// IsRetrievable implements Interface.IsRetrievable method.
func (s *steward) IsRetrievable(ctx context.Context, root swarm.Address) (bool, error) {
noop := func(leaf swarm.Address) error { return nil }
switch err := s.netTraverser.Traverse(ctx, root, noop); {
case errors.Is(err, storage.ErrNotFound):
return false, nil
case err != nil:
return false, fmt.Errorf("traversal of %q failed: %w", root, err)
default:
return true, nil
}
}
// netGetter implements the storage Getter.Get method in a way
// that it will try to retrieve the chunk only from the network.
type netGetter struct {
retrieval retrieval.Interface
}
// Get implements the storage Getter.Get interface.
func (ng *netGetter) Get(ctx context.Context, _ storage.ModeGet, addr swarm.Address) (swarm.Chunk, error) {
return ng.retrieval.RetrieveChunk(ctx, addr, true)
}
// Put implements the storage Putter.Put interface.
func (ng *netGetter) Put(_ context.Context, _ storage.ModePut, _ ...swarm.Chunk) ([]bool, error) {
return nil, errors.New("operation is not supported")
}
......@@ -29,6 +29,7 @@ func TestSteward(t *testing.T) {
data = make([]byte, chunks*4096) //1k chunks
store = mock.NewStorer()
traverser = traversal.New(store)
loggingStorer = &loggingStore{Storer: store}
traversedAddrs = make(map[string]struct{})
mu sync.Mutex
fn = func(_ context.Context, ch swarm.Chunk) (*pushsync.Receipt, error) {
......@@ -38,7 +39,7 @@ func TestSteward(t *testing.T) {
return nil, nil
}
ps = psmock.New(fn)
s = steward.New(store, traverser, ps)
s = steward.New(store, traverser, loggingStorer, ps)
)
n, err := rand.Read(data)
if n != cap(data) {
......@@ -48,8 +49,7 @@ func TestSteward(t *testing.T) {
t.Fatal(err)
}
l := &loggingStore{Storer: store}
pipe := builder.NewPipelineBuilder(ctx, l, storage.ModePutUpload, false)
pipe := builder.NewPipelineBuilder(ctx, loggingStorer, storage.ModePutUpload, false)
addr, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(data))
if err != nil {
t.Fatal(err)
......@@ -62,8 +62,16 @@ func TestSteward(t *testing.T) {
mu.Lock()
defer mu.Unlock()
isRetrievable, err := s.IsRetrievable(ctx, addr)
if err != nil {
t.Fatal(err)
}
if !isRetrievable {
t.Fatalf("re-uploaded content on %q should be retrievable", addr)
}
// check that everything that was stored is also traversed
for _, a := range l.addrs {
for _, a := range loggingStorer.addrs {
if _, ok := traversedAddrs[a.String()]; !ok {
t.Fatalf("expected address %s to be traversed", a.String())
}
......@@ -72,16 +80,17 @@ func TestSteward(t *testing.T) {
func TestSteward_ErrWantSelf(t *testing.T) {
var (
ctx = context.Background()
chunks = 10
data = make([]byte, chunks*4096)
store = mock.NewStorer()
traverser = traversal.New(store)
fn = func(_ context.Context, ch swarm.Chunk) (*pushsync.Receipt, error) {
ctx = context.Background()
chunks = 10
data = make([]byte, chunks*4096)
store = mock.NewStorer()
traverser = traversal.New(store)
loggingStorer = &loggingStore{Storer: store}
fn = func(_ context.Context, ch swarm.Chunk) (*pushsync.Receipt, error) {
return nil, topology.ErrWantSelf
}
ps = psmock.New(fn)
s = steward.New(store, traverser, ps)
s = steward.New(store, traverser, loggingStorer, ps)
)
n, err := rand.Read(data)
if n != cap(data) {
......@@ -91,8 +100,7 @@ func TestSteward_ErrWantSelf(t *testing.T) {
t.Fatal(err)
}
l := &loggingStore{Storer: store}
pipe := builder.NewPipelineBuilder(ctx, l, storage.ModePutUpload, false)
pipe := builder.NewPipelineBuilder(ctx, loggingStorer, storage.ModePutUpload, false)
addr, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(data))
if err != nil {
t.Fatal(err)
......@@ -109,9 +117,13 @@ type loggingStore struct {
addrs []swarm.Address
}
func (l *loggingStore) Put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err error) {
func (ls *loggingStore) Put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err error) {
for _, c := range chs {
l.addrs = append(l.addrs, c.Address())
ls.addrs = append(ls.addrs, c.Address())
}
return l.Storer.Put(ctx, mode, chs...)
return ls.Storer.Put(ctx, mode, chs...)
}
func (ls *loggingStore) RetrieveChunk(ctx context.Context, addr swarm.Address, _ bool) (chunk swarm.Chunk, err error) {
return ls.Get(ctx, storage.ModeGetRequest, addr)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment