Commit 5b58b937 authored by Zahoor Mohamed's avatar Zahoor Mohamed Committed by GitHub

Local pinning of chunk (#187)

* Local pinning support for chunks

* Fixed review comments

* Fixed some more review comments
parent aa05e79d
......@@ -10,6 +10,7 @@ import (
"io"
"io/ioutil"
"net/http"
"strings"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/storage"
......@@ -17,6 +18,9 @@ import (
"github.com/gorilla/mux"
)
// Presence of this header in the HTTP request indicates the chunk needs to be pinned.
const PinHeaderName = "x-swarm-pin"
func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
addr := mux.Vars(r)["addr"]
ctx := r.Context()
......@@ -46,6 +50,18 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
return
}
// Check if this chunk needs to pinned and pin it
pinHeaderValues := r.Header.Get(PinHeaderName)
if pinHeaderValues != "" && strings.ToLower(pinHeaderValues) == "true" {
err = s.Storer.Set(ctx, storage.ModeSetPin, address)
if err != nil {
s.Logger.Debugf("bzz-chunk: chunk pinning error: %v, addr %s", err, address)
s.Logger.Error("bzz-chunk: chunk pinning error")
jsonhttp.InternalServerError(w, "cannot pin chunk")
return
}
}
jsonhttp.OK(w, nil)
}
......
......@@ -11,8 +11,10 @@ import (
"net/http"
"testing"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/storage/mock/validator"
"github.com/ethersphere/bee/pkg/swarm"
......@@ -21,6 +23,7 @@ import (
// TestChunkUploadDownload uploads a chunk to an API that verifies the chunk according
// to a given validator, then tries to download the uploaded data.
func TestChunkUploadDownload(t *testing.T) {
var (
resource = func(addr swarm.Address) string { return "/bzz-chunk/" + addr.String() }
validHash = swarm.MustParseHexAddress("aabbcc")
......@@ -72,6 +75,46 @@ func TestChunkUploadDownload(t *testing.T) {
t.Fatal("data retrieved doesnt match uploaded content")
}
})
t.Run("pin-invalid-value", func(t *testing.T) {
headers := make(map[string][]string)
headers[api.PinHeaderName] = []string{"hdgdh"}
jsonhttptest.ResponseDirectWithHeaders(t, client, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, headers)
// Also check if the chunk is NOT pinned
if mockValidatingStorer.GetModeSet(validHash) == storage.ModeSetPin {
t.Fatal("chunk should not be pinned")
}
})
t.Run("pin-header-missing", func(t *testing.T) {
headers := make(map[string][]string)
jsonhttptest.ResponseDirectWithHeaders(t, client, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, headers)
// Also check if the chunk is NOT pinned
if mockValidatingStorer.GetModeSet(validHash) == storage.ModeSetPin {
t.Fatal("chunk should not be pinned")
}
})
t.Run("pin-ok", func(t *testing.T) {
headers := make(map[string][]string)
headers[api.PinHeaderName] = []string{"True"}
jsonhttptest.ResponseDirectWithHeaders(t, client, http.MethodPost, resource(validHash), bytes.NewReader(validContent), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}, headers)
// Also check if the chunk is pinned
if mockValidatingStorer.GetModeSet(validHash) != storage.ModeSetPin {
t.Fatal("chunk is not pinned")
}
})
}
func request(t *testing.T, client *http.Client, method string, resource string, body io.Reader, responseCode int) *http.Response {
......
......@@ -12,6 +12,7 @@ import (
"testing"
"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/debugapi"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
......@@ -40,14 +41,14 @@ type testServer struct {
func newTestServer(t *testing.T, o testServerOptions) *testServer {
statestore := mockstore.NewStateStore()
addressbook := addressbook.New(statestore)
addrbook := addressbook.New(statestore)
topologyDriver := mock.NewTopologyDriver(o.TopologyOpts...)
s := debugapi.New(debugapi.Options{
Overlay: o.Overlay,
P2P: o.P2P,
Logger: logging.New(ioutil.Discard, 0),
Addressbook: addressbook,
Addressbook: addrbook,
TopologyDriver: topologyDriver,
Storer: o.Storer,
})
......@@ -66,12 +67,33 @@ func newTestServer(t *testing.T, o testServerOptions) *testServer {
}
return &testServer{
Client: client,
Addressbook: addressbook,
Addressbook: addrbook,
TopologyDriver: topologyDriver,
Cleanup: cleanup,
}
}
func newBZZTestServer(t *testing.T, o testServerOptions) (client *http.Client, cleanup func()) {
s := api.New(api.Options{
Storer: o.Storer,
Logger: logging.New(ioutil.Discard, 0),
})
ts := httptest.NewServer(s)
cleanup = ts.Close
client = &http.Client{
Transport: web.RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
u, err := url.Parse(ts.URL + r.URL.String())
if err != nil {
return nil, err
}
r.URL = u
return ts.Client().Transport.RoundTrip(r)
}),
}
return client, cleanup
}
func mustMultiaddr(t *testing.T, s string) multiaddr.Multiaddr {
t.Helper()
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package debugapi
import (
"context"
"net/http"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/gorilla/mux"
)
// pinChunk pin's the already created chunk given its address.
// it fails if the chunk is not present in the local store.
// It also increments a pin counter to keep track of how many pin requests
// are originating for this chunk.
func (s *server) pinChunk(w http.ResponseWriter, r *http.Request) {
addr, err := swarm.ParseHexAddress(mux.Vars(r)["address"])
if err != nil {
s.Logger.Debugf("debug api: pin chunk: parse chunk address: %v", err)
jsonhttp.BadRequest(w, "bad address")
return
}
has, err := s.Storer.Has(r.Context(), addr)
if err != nil {
s.Logger.Debugf("debug api: pin chunk: localstore has: %v", err)
jsonhttp.BadRequest(w, err)
return
}
if !has {
jsonhttp.NotFound(w, nil)
return
}
err = s.Storer.Set(r.Context(), storage.ModeSetPin, addr)
if err != nil {
s.Logger.Debugf("debug-api: pin chunk: pinning error: %v, addr %s", err, addr)
jsonhttp.InternalServerError(w, "cannot pin chunk")
return
}
jsonhttp.OK(w, nil)
}
// unpinChunk unpin's an already pinned chunk. If the chunk is not present or the
// if the pin counter is zero, it raises error.
func (s *server) unpinChunk(w http.ResponseWriter, r *http.Request) {
addr, err := swarm.ParseHexAddress(mux.Vars(r)["address"])
if err != nil {
s.Logger.Debugf("debug api: pin chunk: parse chunk ddress: %v", err)
jsonhttp.BadRequest(w, "bad address")
return
}
has, err := s.Storer.Has(r.Context(), addr)
if err != nil {
s.Logger.Debugf("debug api: pin chunk: localstore has: %v", err)
jsonhttp.BadRequest(w, err)
return
}
if !has {
jsonhttp.NotFound(w, nil)
return
}
_, err = s.Storer.PinInfo(addr)
if err != nil {
s.Logger.Debugf("debug api: pin chunk: not pinned: %v", err)
jsonhttp.BadRequest(w, "chunk is not yet pinned")
return
}
err = s.Storer.Set(r.Context(), storage.ModeSetUnpin, addr)
if err != nil {
s.Logger.Debugf("debug-api: pin chunk: unpinning error: %v, addr %s", err, addr)
jsonhttp.InternalServerError(w, "cannot unpin chunk")
return
}
jsonhttp.OK(w, nil)
}
// listPinnedChunks lists all the chunk address and pin counters that are currently pinned.
func (s *server) listPinnedChunks(w http.ResponseWriter, r *http.Request) {
pinnedChunks, err := s.Storer.PinnedChunks(context.Background(), swarm.NewAddress(nil))
if err != nil {
s.Logger.Debugf("debug-api: pin chunk: listing pinned chunks error: %v", err)
jsonhttp.InternalServerError(w, err)
return
}
jsonhttp.OK(w, pinnedChunks)
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package debugapi_test
import (
"bytes"
"net/http"
"testing"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/storage/mock/validator"
"github.com/ethersphere/bee/pkg/swarm"
)
// TestPinChunkHandler checks for pinning, unpinning and listing of chunks.
// It also check other edgw cases like chunk not present and checking for pinning,
// invalid chunk address case etc. This test case has to be run in sequence and
// it assumes some state of the DB before another case is run.
func TestPinChunkHandler(t *testing.T) {
resource := func(addr swarm.Address) string { return "/bzz-chunk/" + addr.String() }
hash := swarm.MustParseHexAddress("aabbcc")
data := []byte("bbaatt")
mockValidator := validator.NewMockValidator(hash, data)
mockValidatingStorer := mock.NewValidatingStorer(mockValidator)
debugTestServer := newTestServer(t, testServerOptions{
Storer: mockValidatingStorer,
})
// This server is used to store chunks
bzzTestServer, cleanup := newBZZTestServer(t, testServerOptions{
Storer: mockValidatingStorer,
})
defer debugTestServer.Cleanup()
defer cleanup()
// bad chunk address
t.Run("pin-bad-address", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, debugTestServer.Client, http.MethodPost, "/chunks-pin/abcd1100zz", nil, http.StatusBadRequest, jsonhttp.StatusResponse{
Message: "bad address",
Code: http.StatusBadRequest,
})
})
// pin a chunk which is not existing
t.Run("pin-absent-chunk", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, debugTestServer.Client, http.MethodPost, "/chunks-pin/123456", nil, http.StatusNotFound, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusNotFound),
Code: http.StatusNotFound,
})
})
// unpin on a chunk which is not pinned
t.Run("unpin-while-not-pinned", func(t *testing.T) {
// Post a chunk
jsonhttptest.ResponseDirect(t, bzzTestServer, http.MethodPost, resource(hash), bytes.NewReader(data), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
})
jsonhttptest.ResponseDirect(t, debugTestServer.Client, http.MethodDelete, "/chunks-pin/"+hash.String(), nil, http.StatusBadRequest, jsonhttp.StatusResponse{
Message: "chunk is not yet pinned",
Code: http.StatusBadRequest,
})
})
// pin a existing chunk first time
t.Run("pin-chunk-1", func(t *testing.T) {
// Post a chunk
jsonhttptest.ResponseDirect(t, bzzTestServer, http.MethodPost, resource(hash), bytes.NewReader(data), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
})
jsonhttptest.ResponseDirect(t, debugTestServer.Client, http.MethodPost, "/chunks-pin/"+hash.String(), nil, http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
})
// Check is the chunk is pinned once
jsonhttptest.ResponseDirectWithJson(t, debugTestServer.Client, http.MethodGet, "/chunks-pin/"+hash.String(), nil, http.StatusOK, jsonhttp.StatusResponse{
Message: `{"Address":"aabbcc","PinCounter":1}`,
Code: http.StatusOK,
})
})
// pin a existing chunk second time
t.Run("pin-chunk-2", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, debugTestServer.Client, http.MethodPost, "/chunks-pin/"+hash.String(), nil, http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
})
// Check is the chunk is pinned twice
jsonhttptest.ResponseDirectWithJson(t, debugTestServer.Client, http.MethodGet, "/chunks-pin/"+hash.String(), nil, http.StatusOK, jsonhttp.StatusResponse{
Message: `{"Address":"aabbcc","PinCounter":2}`,
Code: http.StatusOK,
})
})
// unpin a chunk first time
t.Run("unpin-chunk-1", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, debugTestServer.Client, http.MethodDelete, "/chunks-pin/"+hash.String(), nil, http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
})
// Check is the chunk is pinned once
jsonhttptest.ResponseDirectWithJson(t, debugTestServer.Client, http.MethodGet, "/chunks-pin/"+hash.String(), nil, http.StatusOK, jsonhttp.StatusResponse{
Message: `{"Address":"aabbcc","PinCounter":1}`,
Code: http.StatusOK,
})
})
// unpin a chunk second time
t.Run("unpin-chunk-2", func(t *testing.T) {
jsonhttptest.ResponseDirect(t, debugTestServer.Client, http.MethodDelete, "/chunks-pin/"+hash.String(), nil, http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
})
// Check if the chunk is removed from the pinIndex
jsonhttptest.ResponseDirect(t, debugTestServer.Client, http.MethodGet, "/chunks-pin/"+hash.String(), nil, http.StatusInternalServerError, jsonhttp.StatusResponse{
Message: "pin chunks: leveldb: not found",
Code: http.StatusInternalServerError,
})
})
// Add 2 chunks, pin it and check if they show up in the list
t.Run("list-chunks", func(t *testing.T) {
// Post a chunk
jsonhttptest.ResponseDirect(t, bzzTestServer, http.MethodPost, resource(hash), bytes.NewReader(data), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
})
jsonhttptest.ResponseDirect(t, debugTestServer.Client, http.MethodPost, "/chunks-pin/"+hash.String(), nil, http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
})
// post another chunk
hash2 := swarm.MustParseHexAddress("ddeeff")
data2 := []byte("eagle")
mockValidator.AddPair(hash2, data2)
jsonhttptest.ResponseDirect(t, bzzTestServer, http.MethodPost, resource(hash2), bytes.NewReader(data2), http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
})
jsonhttptest.ResponseDirect(t, debugTestServer.Client, http.MethodPost, "/chunks-pin/"+hash2.String(), nil, http.StatusOK, jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
})
jsonhttptest.ResponseDirectWithJson(t, debugTestServer.Client, http.MethodGet, "/chunks-pin", nil, http.StatusOK, jsonhttp.StatusResponse{
Message: `{"Address":"aabbcc","PinCounter":1},{"Address":"ddeeff","PinCounter":1}`,
Code: http.StatusOK,
})
})
}
......@@ -56,6 +56,14 @@ func (s *server) setupRouting() {
router.Handle("/chunks/{address}", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.hasChunkHandler),
})
router.Handle("/chunks-pin/{address}", jsonhttp.MethodHandler{
"POST": http.HandlerFunc(s.pinChunk),
"DELETE": http.HandlerFunc(s.unpinChunk),
"GET": http.HandlerFunc(s.listPinnedChunks),
})
router.Handle("/chunks-pin", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.listPinnedChunks),
})
baseRouter.Handle("/", web.ChainHandlers(
logging.NewHTTPAccessLogHandler(s.Logger, logrus.InfoLevel, "debug api access"),
......
......@@ -11,12 +11,66 @@ import (
"io/ioutil"
"net/http"
"testing"
"github.com/ethersphere/bee/pkg/jsonhttp"
)
func ResponseDirect(t *testing.T, client *http.Client, method, url string, body io.Reader, responseCode int, response interface{}) {
t.Helper()
resp := request(t, client, method, url, body, responseCode)
resp := request(t, client, method, url, body, responseCode, nil)
defer resp.Body.Close()
got, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatal(err)
}
got = bytes.TrimSpace(got)
want, err := json.Marshal(response)
if err != nil {
t.Error(err)
}
if !bytes.Equal(got, want) {
t.Errorf("got response %s, want %s", string(got), string(want))
}
}
// ResponseDirectWithJson checks for responses in json format. It is useful in cases where the response is json.
func ResponseDirectWithJson(t *testing.T, client *http.Client, method, url string, body io.Reader, responseCode int, response interface{}) {
t.Helper()
resp := request(t, client, method, url, body, responseCode, nil)
defer resp.Body.Close()
got, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatal(err)
}
got = bytes.TrimSpace(got)
want, err := json.Marshal(response)
if err != nil {
t.Error(err)
}
var wantJson jsonhttp.StatusResponse
err = json.Unmarshal(want, &wantJson)
if err != nil {
t.Error(err)
}
wantString := "[" + wantJson.Message + "]"
if wantString != string(got) {
t.Errorf("got response %s, want %s", string(got), wantString)
}
}
func ResponseDirectWithHeaders(t *testing.T, client *http.Client, method, url string, body io.Reader, responseCode int,
response interface{}, headers http.Header) {
t.Helper()
resp := request(t, client, method, url, body, responseCode, headers)
defer resp.Body.Close()
got, err := ioutil.ReadAll(resp.Body)
......@@ -38,7 +92,7 @@ func ResponseDirect(t *testing.T, client *http.Client, method, url string, body
func ResponseUnmarshal(t *testing.T, client *http.Client, method, url string, body io.Reader, responseCode int, response interface{}) {
t.Helper()
resp := request(t, client, method, url, body, responseCode)
resp := request(t, client, method, url, body, responseCode, nil)
defer resp.Body.Close()
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
......@@ -46,13 +100,14 @@ func ResponseUnmarshal(t *testing.T, client *http.Client, method, url string, bo
}
}
func request(t *testing.T, client *http.Client, method, url string, body io.Reader, responseCode int) *http.Response {
func request(t *testing.T, client *http.Client, method, url string, body io.Reader, responseCode int, headers http.Header) *http.Response {
t.Helper()
req, err := http.NewRequest(method, url, body)
if err != nil {
t.Fatal(err)
}
req.Header = headers
resp, err := client.Do(req)
if err != nil {
t.Fatal(err)
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package localstore
import (
"bytes"
"context"
"fmt"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
const (
maxChunksToDisplay = 20 // no of items to display per request
)
// PinnedChunks for now returns the first few pinned chunks to display along with their pin counter.
// TODO: have pagination and prefix filter
func (db *DB) PinnedChunks(ctx context.Context, cursor swarm.Address) (pinnedChunks []*storage.Pinner, err error) {
count := 0
var prefix []byte
if bytes.Equal(cursor.Bytes(), []byte{0}) {
prefix = nil
}
it, err := db.pinIndex.First(prefix)
if err != nil {
return nil, fmt.Errorf("pin chunks: %w", err)
}
err = db.pinIndex.Iterate(func(item shed.Item) (stop bool, err error) {
pinnedChunks = append(pinnedChunks,
&storage.Pinner{
Address: swarm.NewAddress(item.Address),
PinCounter: item.PinCounter,
})
count++
if count >= maxChunksToDisplay {
return true, nil
} else {
return false, nil
}
}, &shed.IterateOptions{
StartFrom: &it,
SkipStartFromItem: false,
})
return pinnedChunks, err
}
// Pinner returns the pin counter given a swarm address, provided that the
// address has to be pinned already.
func (db *DB) PinInfo(address swarm.Address) (uint64, error) {
it := shed.Item{
Address: address.Bytes(),
}
out, err := db.pinIndex.Get(it)
if err != nil {
return 0, err
}
return out.PinCounter, nil
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package localstore
import (
"context"
"sort"
"testing"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
)
func TestPinning(t *testing.T) {
chunks := generateTestRandomChunks(21)
// SOrt the addresses
var addresses []string
for _, c := range chunks {
addresses = append(addresses, c.Address().String())
}
sort.Strings(addresses)
t.Run("empty-db", func(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
// Nothing should be there in the pinned DB
_, err := db.PinnedChunks(context.Background(), swarm.NewAddress([]byte{0}))
if err != nil {
if err.Error() != "pin chunks: leveldb: not found" {
t.Fatal(err)
}
}
})
t.Run("get-pinned-chunks", func(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
err := db.Set(context.Background(), storage.ModeSetPin, chunkAddresses(chunks)...)
if err != nil {
t.Fatal(err)
}
pinnedChunks, err := db.PinnedChunks(context.Background(), swarm.NewAddress([]byte{0}))
if err != nil {
t.Fatal(err)
}
if pinnedChunks == nil || len(pinnedChunks) != maxChunksToDisplay {
t.Fatal(err)
}
// Check if they are sorted
for i, addr := range pinnedChunks {
if addresses[i] != addr.Address.String() {
t.Fatal("error in getting sorted address")
}
}
})
}
func TestPinInfo(t *testing.T) {
chunk := generateTestRandomChunk()
t.Run("get-pinned-chunks", func(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
// pin once
err := db.Set(context.Background(), storage.ModeSetPin, swarm.NewAddress(chunk.Address().Bytes()))
if err != nil {
t.Fatal(err)
}
pinCounter, err := db.PinInfo(swarm.NewAddress(chunk.Address().Bytes()))
if err != nil {
t.Fatal(err)
}
if pinCounter != 1 {
t.Fatal(err)
}
// pin twice
err = db.Set(context.Background(), storage.ModeSetPin, swarm.NewAddress(chunk.Address().Bytes()))
if err != nil {
t.Fatal(err)
}
pinCounter, err = db.PinInfo(swarm.NewAddress(chunk.Address().Bytes()))
if err != nil {
t.Fatal(err)
}
if pinCounter != 2 {
t.Fatal(err)
}
})
t.Run("get-unpinned-chunks", func(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
// pin once
err := db.Set(context.Background(), storage.ModeSetPin, swarm.NewAddress(chunk.Address().Bytes()))
if err != nil {
t.Fatal(err)
}
pinCounter, err := db.PinInfo(swarm.NewAddress(chunk.Address().Bytes()))
if err != nil {
t.Fatal(err)
}
if pinCounter != 1 {
t.Fatal(err)
}
// unpin and see if it doesn't exists
err = db.Set(context.Background(), storage.ModeSetUnpin, swarm.NewAddress(chunk.Address().Bytes()))
if err != nil {
t.Fatal(err)
}
_, err = db.PinInfo(swarm.NewAddress(chunk.Address().Bytes()))
if err != nil {
if err != leveldb.ErrNotFound {
t.Fatal(err)
}
}
})
}
......@@ -6,29 +6,44 @@ package mock
import (
"context"
"errors"
"sync"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
type mockStorer struct {
var _ storage.Storer = (*MockStorer)(nil)
type MockStorer struct {
store map[string][]byte
modeSet map[string]storage.ModeSet
modeSetMu sync.Mutex
pinnedAddress []swarm.Address // Stores the pinned address
pinnedCounter []uint64 // and its respective counter. These are stored as slices to preserve the order.
pinSetMu sync.Mutex
validator swarm.ChunkValidator
}
func NewStorer() storage.Storer {
return &mockStorer{
return &MockStorer{
store: make(map[string][]byte),
modeSet: make(map[string]storage.ModeSet),
modeSetMu: sync.Mutex{},
}
}
func NewValidatingStorer(v swarm.ChunkValidator) storage.Storer {
return &mockStorer{
func NewValidatingStorer(v swarm.ChunkValidator) *MockStorer {
return &MockStorer{
store: make(map[string][]byte),
modeSet: make(map[string]storage.ModeSet),
modeSetMu: sync.Mutex{},
pinSetMu: sync.Mutex{},
validator: v,
}
}
func (m *mockStorer) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address) (ch swarm.Chunk, err error) {
func (m *MockStorer) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address) (ch swarm.Chunk, err error) {
v, has := m.store[addr.String()]
if !has {
return nil, storage.ErrNotFound
......@@ -36,7 +51,7 @@ func (m *mockStorer) Get(ctx context.Context, mode storage.ModeGet, addr swarm.A
return swarm.NewChunk(addr, v), nil
}
func (m *mockStorer) Put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err error) {
func (m *MockStorer) Put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err error) {
for _, ch := range chs {
if m.validator != nil {
if !m.validator.Validate(ch) {
......@@ -48,35 +63,112 @@ func (m *mockStorer) Put(ctx context.Context, mode storage.ModePut, chs ...swarm
return nil, nil
}
func (m *mockStorer) GetMulti(ctx context.Context, mode storage.ModeGet, addrs ...swarm.Address) (ch []swarm.Chunk, err error) {
func (m *MockStorer) GetMulti(ctx context.Context, mode storage.ModeGet, addrs ...swarm.Address) (ch []swarm.Chunk, err error) {
panic("not implemented") // TODO: Implement
}
func (m *mockStorer) Has(ctx context.Context, addr swarm.Address) (yes bool, err error) {
func (m *MockStorer) Has(ctx context.Context, addr swarm.Address) (yes bool, err error) {
_, has := m.store[addr.String()]
return has, nil
}
func (m *mockStorer) HasMulti(ctx context.Context, addrs ...swarm.Address) (yes []bool, err error) {
func (m *MockStorer) HasMulti(ctx context.Context, addrs ...swarm.Address) (yes []bool, err error) {
panic("not implemented") // TODO: Implement
}
func (m *mockStorer) Set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Address) (err error) {
panic("not implemented") // TODO: Implement
func (m *MockStorer) Set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Address) (err error) {
m.modeSetMu.Lock()
m.pinSetMu.Lock()
defer m.modeSetMu.Unlock()
defer m.pinSetMu.Unlock()
for _, addr := range addrs {
m.modeSet[addr.String()] = mode
// if mode is set pin, increment the pin counter
if mode == storage.ModeSetPin {
var found bool
for i, ad := range m.pinnedAddress {
if addr.String() == ad.String() {
m.pinnedCounter[i] = m.pinnedCounter[i] + 1
found = true
}
}
if !found {
m.pinnedAddress = append(m.pinnedAddress, addr)
m.pinnedCounter = append(m.pinnedCounter, uint64(1))
}
}
// if mode is set unpin, decrement the pin counter and remove the address
// once it reaches zero
if mode == storage.ModeSetUnpin {
for i, ad := range m.pinnedAddress {
if addr.String() == ad.String() {
m.pinnedCounter[i] = m.pinnedCounter[i] - 1
if m.pinnedCounter[i] == 0 {
copy(m.pinnedAddress[i:], m.pinnedAddress[i+1:])
m.pinnedAddress[len(m.pinnedAddress)-1] = swarm.NewAddress([]byte{0})
m.pinnedAddress = m.pinnedAddress[:len(m.pinnedAddress)-1]
copy(m.pinnedCounter[i:], m.pinnedCounter[i+1:])
m.pinnedCounter[len(m.pinnedCounter)-1] = uint64(0)
m.pinnedCounter = m.pinnedCounter[:len(m.pinnedCounter)-1]
}
}
}
}
}
return nil
}
func (m *MockStorer) GetModeSet(addr swarm.Address) (mode storage.ModeSet) {
m.modeSetMu.Lock()
defer m.modeSetMu.Unlock()
if mode, ok := m.modeSet[addr.String()]; ok {
return mode
}
return mode
}
func (m *mockStorer) LastPullSubscriptionBinID(bin uint8) (id uint64, err error) {
func (m *MockStorer) LastPullSubscriptionBinID(bin uint8) (id uint64, err error) {
panic("not implemented") // TODO: Implement
}
func (m *mockStorer) SubscribePull(ctx context.Context, bin uint8, since uint64, until uint64) (c <-chan storage.Descriptor, stop func()) {
func (m *MockStorer) SubscribePull(ctx context.Context, bin uint8, since uint64, until uint64) (c <-chan storage.Descriptor, stop func()) {
panic("not implemented") // TODO: Implement
}
func (m *mockStorer) SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, stop func()) {
func (m *MockStorer) SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, stop func()) {
panic("not implemented") // TODO: Implement
}
func (m *mockStorer) Close() error {
func (m *MockStorer) PinnedChunks(ctx context.Context, cursor swarm.Address) (pinnedChunks []*storage.Pinner, err error) {
m.pinSetMu.Lock()
defer m.pinSetMu.Unlock()
for i, addr := range m.pinnedAddress {
pi := &storage.Pinner{
Address: swarm.NewAddress(addr.Bytes()),
PinCounter: m.pinnedCounter[i],
}
pinnedChunks = append(pinnedChunks, pi)
}
if pinnedChunks == nil {
return pinnedChunks, errors.New("pin chunks: leveldb: not found")
}
return pinnedChunks, nil
}
func (m *MockStorer) PinInfo(address swarm.Address) (uint64, error) {
m.pinSetMu.Lock()
defer m.pinSetMu.Unlock()
for i, addr := range m.pinnedAddress {
if addr.String() == address.String() {
return m.pinnedCounter[i], nil
}
}
return 0, errors.New("could not find address")
}
func (m *MockStorer) Close() error {
panic("not implemented") // TODO: Implement
}
......@@ -13,26 +13,29 @@ import (
// MockValidator returns true if the data and address passed in the Validate method
// are a byte-wise match to the data and address passed to the constructor
type MockValidator struct {
validAddress swarm.Address
validContent []byte
swarm.ChunkValidator
addressDataPair map[string][]byte // Make validator accept more than one address/data pair
}
// NewMockValidator constructs a new MockValidator
func NewMockValidator(address swarm.Address, data []byte) *MockValidator {
return &MockValidator{
validAddress: address,
validContent: data,
mp := &MockValidator{
addressDataPair: make(map[string][]byte),
}
mp.addressDataPair[address.String()] = data
return mp
}
// Validate checkes the passed chunk for validity
// Add a new address/data pair which can be validated
func (v *MockValidator) AddPair(address swarm.Address, data []byte) {
v.addressDataPair[address.String()] = data
}
// Validate checks the passed chunk for validity
func (v *MockValidator) Validate(ch swarm.Chunk) (valid bool) {
if !v.validAddress.Equal(ch.Address()) {
return false
if data, ok := v.addressDataPair[ch.Address().String()]; ok {
if bytes.Equal(data, ch.Data()) {
return true
}
if !bytes.Equal(v.validContent, ch.Data()) {
return false
}
return true
return false
}
......@@ -119,6 +119,12 @@ type Descriptor struct {
BinID uint64
}
// Pinner holds the required information for pinning
type Pinner struct {
Address swarm.Address
PinCounter uint64
}
func (d *Descriptor) String() string {
if d == nil {
return ""
......@@ -136,6 +142,8 @@ type Storer interface {
LastPullSubscriptionBinID(bin uint8) (id uint64, err error)
SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan Descriptor, stop func())
SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, stop func())
PinnedChunks(ctx context.Context, cursor swarm.Address) (pinnedChunks []*Pinner, err error)
PinInfo(address swarm.Address) (uint64, error)
io.Closer
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment