Commit 034dfb4f authored by acud's avatar acud Committed by GitHub

localstore/pin: refactor and add paging (#821)

parent 09006d70
...@@ -7,6 +7,7 @@ package api ...@@ -7,6 +7,7 @@ package api
import ( import (
"errors" "errors"
"net/http" "net/http"
"strconv"
"github.com/ethersphere/bee/pkg/jsonhttp" "github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
...@@ -21,14 +22,16 @@ import ( ...@@ -21,14 +22,16 @@ import (
func (s *server) pinChunk(w http.ResponseWriter, r *http.Request) { func (s *server) pinChunk(w http.ResponseWriter, r *http.Request) {
addr, err := swarm.ParseHexAddress(mux.Vars(r)["address"]) addr, err := swarm.ParseHexAddress(mux.Vars(r)["address"])
if err != nil { if err != nil {
s.Logger.Debugf("debug api: pin chunk: parse chunk address: %v", err) s.Logger.Debugf("pin chunk: parse chunk address: %v", err)
s.Logger.Error("pin chunk: parse address")
jsonhttp.BadRequest(w, "bad address") jsonhttp.BadRequest(w, "bad address")
return return
} }
has, err := s.Storer.Has(r.Context(), addr) has, err := s.Storer.Has(r.Context(), addr)
if err != nil { if err != nil {
s.Logger.Debugf("debug api: pin chunk: localstore has: %v", err) s.Logger.Debugf("pin chunk: localstore has: %v", err)
s.Logger.Error("pin chunk: store")
jsonhttp.InternalServerError(w, err) jsonhttp.InternalServerError(w, err)
return return
} }
...@@ -40,7 +43,8 @@ func (s *server) pinChunk(w http.ResponseWriter, r *http.Request) { ...@@ -40,7 +43,8 @@ func (s *server) pinChunk(w http.ResponseWriter, r *http.Request) {
err = s.Storer.Set(r.Context(), storage.ModeSetPin, addr) err = s.Storer.Set(r.Context(), storage.ModeSetPin, addr)
if err != nil { if err != nil {
s.Logger.Debugf("debug-api: pin chunk: pinning error: %v, addr %s", err, addr) s.Logger.Debugf("pin chunk: pinning error: %v, addr %s", err, addr)
s.Logger.Error("pin chunk: cannot pin chunk")
jsonhttp.InternalServerError(w, "cannot pin chunk") jsonhttp.InternalServerError(w, "cannot pin chunk")
return return
} }
...@@ -52,14 +56,16 @@ func (s *server) pinChunk(w http.ResponseWriter, r *http.Request) { ...@@ -52,14 +56,16 @@ func (s *server) pinChunk(w http.ResponseWriter, r *http.Request) {
func (s *server) unpinChunk(w http.ResponseWriter, r *http.Request) { func (s *server) unpinChunk(w http.ResponseWriter, r *http.Request) {
addr, err := swarm.ParseHexAddress(mux.Vars(r)["address"]) addr, err := swarm.ParseHexAddress(mux.Vars(r)["address"])
if err != nil { if err != nil {
s.Logger.Debugf("debug api: pin chunk: parse chunk address: %v", err) s.Logger.Debugf("pin chunk: parse chunk address: %v", err)
s.Logger.Error("pin chunk: parse address")
jsonhttp.BadRequest(w, "bad address") jsonhttp.BadRequest(w, "bad address")
return return
} }
has, err := s.Storer.Has(r.Context(), addr) has, err := s.Storer.Has(r.Context(), addr)
if err != nil { if err != nil {
s.Logger.Debugf("debug api: pin chunk: localstore has: %v", err) s.Logger.Debugf("pin chunk: localstore has: %v", err)
s.Logger.Error("pin chunk: store")
jsonhttp.InternalServerError(w, err) jsonhttp.InternalServerError(w, err)
return return
} }
...@@ -69,16 +75,18 @@ func (s *server) unpinChunk(w http.ResponseWriter, r *http.Request) { ...@@ -69,16 +75,18 @@ func (s *server) unpinChunk(w http.ResponseWriter, r *http.Request) {
return return
} }
_, err = s.Storer.PinInfo(addr) _, err = s.Storer.PinCounter(addr)
if err != nil { if err != nil {
s.Logger.Debugf("debug api: pin chunk: not pinned: %v", err) s.Logger.Debugf("pin chunk: not pinned: %v", err)
s.Logger.Error("pin chunk: pin counter")
jsonhttp.BadRequest(w, "chunk is not yet pinned") jsonhttp.BadRequest(w, "chunk is not yet pinned")
return return
} }
err = s.Storer.Set(r.Context(), storage.ModeSetUnpin, addr) err = s.Storer.Set(r.Context(), storage.ModeSetUnpin, addr)
if err != nil { if err != nil {
s.Logger.Debugf("debug-api: pin chunk: unpinning error: %v, addr %s", err, addr) s.Logger.Debugf("pin chunk: unpinning error: %v, addr %s", err, addr)
s.Logger.Error("pin chunk: unpin")
jsonhttp.InternalServerError(w, "cannot unpin chunk") jsonhttp.InternalServerError(w, "cannot unpin chunk")
return return
} }
...@@ -96,16 +104,41 @@ type listPinnedChunksResponse struct { ...@@ -96,16 +104,41 @@ type listPinnedChunksResponse struct {
// listPinnedChunks lists all the chunk address and pin counters that are currently pinned. // listPinnedChunks lists all the chunk address and pin counters that are currently pinned.
func (s *server) listPinnedChunks(w http.ResponseWriter, r *http.Request) { func (s *server) listPinnedChunks(w http.ResponseWriter, r *http.Request) {
pinnedChunks, err := s.Storer.PinnedChunks(r.Context(), swarm.NewAddress(nil)) var (
err error
offset, limit = 0, 100 // default offset is 0, default limit 100
)
if v := r.URL.Query().Get("offset"); v != "" {
offset, err = strconv.Atoi(v)
if err != nil {
s.Logger.Debugf("list pins: parse offset: %v", err)
s.Logger.Errorf("list pins: bad offset")
jsonhttp.BadRequest(w, "bad offset")
}
}
if v := r.URL.Query().Get("limit"); v != "" {
limit, err = strconv.Atoi(v)
if err != nil {
s.Logger.Debugf("list pins: parse limit: %v", err)
s.Logger.Errorf("list pins: bad limit")
jsonhttp.BadRequest(w, "bad limit")
}
}
pinnedChunks, err := s.Storer.PinnedChunks(r.Context(), offset, limit)
if err != nil { if err != nil {
s.Logger.Debugf("debug-api: pin chunk: listing pinned chunks: %v", err) s.Logger.Debugf("list pins: list pinned: %v", err)
s.Logger.Errorf("list pins: list pinned")
jsonhttp.InternalServerError(w, err) jsonhttp.InternalServerError(w, err)
return return
} }
chunks := make([]pinnedChunk, len(pinnedChunks)) chunks := make([]pinnedChunk, len(pinnedChunks))
for i, c := range pinnedChunks { for i, c := range pinnedChunks {
chunks[i] = pinnedChunk(*c) chunks[i] = pinnedChunk(*c)
} }
jsonhttp.OK(w, listPinnedChunksResponse{ jsonhttp.OK(w, listPinnedChunksResponse{
Chunks: chunks, Chunks: chunks,
}) })
...@@ -114,15 +147,17 @@ func (s *server) listPinnedChunks(w http.ResponseWriter, r *http.Request) { ...@@ -114,15 +147,17 @@ func (s *server) listPinnedChunks(w http.ResponseWriter, r *http.Request) {
func (s *server) getPinnedChunk(w http.ResponseWriter, r *http.Request) { func (s *server) getPinnedChunk(w http.ResponseWriter, r *http.Request) {
addr, err := swarm.ParseHexAddress(mux.Vars(r)["address"]) addr, err := swarm.ParseHexAddress(mux.Vars(r)["address"])
if err != nil { if err != nil {
s.Logger.Debugf("debug api: pin chunk: parse chunk ddress: %v", err) s.Logger.Debugf("pin counter: parse chunk ddress: %v", err)
jsonhttp.BadRequest(w, "bad address") s.Logger.Errorf("pin counter: parse address")
jsonhttp.NotFound(w, nil)
return return
} }
has, err := s.Storer.Has(r.Context(), addr) has, err := s.Storer.Has(r.Context(), addr)
if err != nil { if err != nil {
s.Logger.Debugf("debug api: pin chunk: localstore has: %v", err) s.Logger.Debugf("pin counter: localstore has: %v", err)
jsonhttp.BadRequest(w, err) s.Logger.Errorf("pin counter: store")
jsonhttp.NotFound(w, nil)
return return
} }
...@@ -131,13 +166,14 @@ func (s *server) getPinnedChunk(w http.ResponseWriter, r *http.Request) { ...@@ -131,13 +166,14 @@ func (s *server) getPinnedChunk(w http.ResponseWriter, r *http.Request) {
return return
} }
pinCounter, err := s.Storer.PinInfo(addr) pinCounter, err := s.Storer.PinCounter(addr)
if err != nil { if err != nil {
if errors.Is(err, storage.ErrNotFound) { if errors.Is(err, storage.ErrNotFound) {
jsonhttp.NotFound(w, nil) jsonhttp.NotFound(w, nil)
return return
} }
s.Logger.Debugf("debug-api: pin chunk: listing pinned chunks: %v", err) s.Logger.Debugf("pin counter: get pin counter: %v", err)
s.Logger.Errorf("pin counter: get pin counter")
jsonhttp.InternalServerError(w, err) jsonhttp.InternalServerError(w, err)
return return
} }
......
...@@ -5,7 +5,6 @@ ...@@ -5,7 +5,6 @@
package localstore package localstore
import ( import (
"bytes"
"context" "context"
"errors" "errors"
"fmt" "fmt"
...@@ -17,17 +16,13 @@ import ( ...@@ -17,17 +16,13 @@ import (
) )
const ( const (
maxChunksToDisplay = 20 // no of items to display per request maxPage = 1000 // hard limit of page size
) )
// PinnedChunks for now returns the first few pinned chunks to display along with their pin counter. // PinnedChunks
// TODO: have pagination and prefix filter func (db *DB) PinnedChunks(ctx context.Context, offset, limit int) (chunks []*storage.Pinner, err error) {
func (db *DB) PinnedChunks(ctx context.Context, cursor swarm.Address) (pinnedChunks []*storage.Pinner, err error) { if limit > maxPage {
count := 0 limit = maxPage
var prefix []byte
if bytes.Equal(cursor.Bytes(), []byte{0}) {
prefix = nil
} }
c, err := db.pinIndex.Count() c, err := db.pinIndex.Count()
...@@ -37,36 +32,32 @@ func (db *DB) PinnedChunks(ctx context.Context, cursor swarm.Address) (pinnedChu ...@@ -37,36 +32,32 @@ func (db *DB) PinnedChunks(ctx context.Context, cursor swarm.Address) (pinnedChu
// send empty response if there is nothing pinned // send empty response if there is nothing pinned
if c == 0 { if c == 0 {
return pinnedChunks, nil return nil, nil
} }
it, err := db.pinIndex.First(prefix)
if err != nil {
return nil, fmt.Errorf("get first pin: %w", err)
}
err = db.pinIndex.Iterate(func(item shed.Item) (stop bool, err error) { err = db.pinIndex.Iterate(func(item shed.Item) (stop bool, err error) {
pinnedChunks = append(pinnedChunks, if offset > 0 {
offset--
return false, nil
}
chunks = append(chunks,
&storage.Pinner{ &storage.Pinner{
Address: swarm.NewAddress(item.Address), Address: swarm.NewAddress(item.Address),
PinCounter: item.PinCounter, PinCounter: item.PinCounter,
}) })
count++ limit--
if count >= maxChunksToDisplay {
if limit == 0 {
return true, nil return true, nil
} else {
return false, nil
} }
return false, nil
}, &shed.IterateOptions{ }, nil)
StartFrom: &it, return chunks, err
SkipStartFromItem: false,
})
return pinnedChunks, err
} }
// PinInfo returns the pin counter for a given swarm address, provided that the // PinCounter returns the pin counter for a given swarm address, provided that the
// address has been pinned. // address has been pinned.
func (db *DB) PinInfo(address swarm.Address) (uint64, error) { func (db *DB) PinCounter(address swarm.Address) (uint64, error) {
out, err := db.pinIndex.Get(shed.Item{ out, err := db.pinIndex.Get(shed.Item{
Address: address.Bytes(), Address: address.Bytes(),
}) })
......
...@@ -12,114 +12,132 @@ import ( ...@@ -12,114 +12,132 @@ import (
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
) )
func TestPinning(t *testing.T) { func TestPinning(t *testing.T) {
chunks := generateTestRandomChunks(21) chunks := generateTestRandomChunks(21)
// SOrt the addresses addresses := chunksToSortedStrings(chunks)
var addresses []string
for _, c := range chunks {
addresses = append(addresses, c.Address().String())
}
sort.Strings(addresses)
t.Run("empty-db", func(t *testing.T) { db := newTestDB(t, nil)
db := newTestDB(t, nil) _, err := db.PinnedChunks(context.Background(), 0, 10)
// Nothing should be there in the pinned DB
_, err := db.PinnedChunks(context.Background(), swarm.NewAddress([]byte{0}))
if err != nil {
if !errors.Is(err, leveldb.ErrNotFound) {
t.Fatal(err)
}
}
})
t.Run("get-pinned-chunks", func(t *testing.T) { // error should be nil
db := newTestDB(t, nil) if err != nil {
t.Fatal(err)
}
err := db.Set(context.Background(), storage.ModeSetPin, chunkAddresses(chunks)...) err = db.Set(context.Background(), storage.ModeSetPin, chunkAddresses(chunks)...)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
pinnedChunks, err := db.PinnedChunks(context.Background(), swarm.NewAddress([]byte{0})) pinnedChunks, err := db.PinnedChunks(context.Background(), 0, 30)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if pinnedChunks == nil || len(pinnedChunks) != maxChunksToDisplay { if len(pinnedChunks) != len(chunks) {
t.Fatal(err) t.Fatalf("want %d pins but got %d", len(chunks), len(pinnedChunks))
} }
// Check if they are sorted // Check if they are sorted
for i, addr := range pinnedChunks { for i, addr := range pinnedChunks {
if addresses[i] != addr.Address.String() { if addresses[i] != addr.Address.String() {
t.Fatal("error in getting sorted address") t.Fatal("error in getting sorted address")
}
} }
}) }
} }
func TestPinInfo(t *testing.T) { func TestPinCounter(t *testing.T) {
chunk := generateTestRandomChunk() chunk := generateTestRandomChunk()
t.Run("get-pinned-chunks", func(t *testing.T) { db := newTestDB(t, nil)
db := newTestDB(t, nil)
// pin once // pin once
err := db.Set(context.Background(), storage.ModeSetPin, swarm.NewAddress(chunk.Address().Bytes())) err := db.Set(context.Background(), storage.ModeSetPin, swarm.NewAddress(chunk.Address().Bytes()))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
pinCounter, err := db.PinInfo(swarm.NewAddress(chunk.Address().Bytes())) pinCounter, err := db.PinCounter(swarm.NewAddress(chunk.Address().Bytes()))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if pinCounter != 1 { if pinCounter != 1 {
t.Fatal(err) t.Fatalf("want pin counter %d but got %d", 1, pinCounter)
} }
// pin twice // pin twice
err = db.Set(context.Background(), storage.ModeSetPin, swarm.NewAddress(chunk.Address().Bytes())) err = db.Set(context.Background(), storage.ModeSetPin, swarm.NewAddress(chunk.Address().Bytes()))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
pinCounter, err = db.PinInfo(swarm.NewAddress(chunk.Address().Bytes())) pinCounter, err = db.PinCounter(swarm.NewAddress(chunk.Address().Bytes()))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if pinCounter != 2 { if pinCounter != 2 {
t.Fatalf("want pin counter %d but got %d", 2, pinCounter)
}
err = db.Set(context.Background(), storage.ModeSetUnpin, swarm.NewAddress(chunk.Address().Bytes()))
if err != nil {
t.Fatal(err)
}
_, err = db.PinCounter(swarm.NewAddress(chunk.Address().Bytes()))
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
t.Fatal(err) t.Fatal(err)
} }
}) }
}
t.Run("get-unpinned-chunks", func(t *testing.T) { func TestPaging(t *testing.T) {
db := newTestDB(t, nil) chunks := generateTestRandomChunks(10)
addresses := chunksToSortedStrings(chunks)
db := newTestDB(t, nil)
// pin once // pin once
err := db.Set(context.Background(), storage.ModeSetPin, swarm.NewAddress(chunk.Address().Bytes())) err := db.Set(context.Background(), storage.ModeSetPin, chunkAddresses(chunks)...)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
pinCounter, err := db.PinInfo(swarm.NewAddress(chunk.Address().Bytes()))
if err != nil {
t.Fatal(err)
}
if pinCounter != 1 {
t.Fatal(err)
}
// unpin and see if it doesn't exists pinnedChunks, err := db.PinnedChunks(context.Background(), 0, 5)
err = db.Set(context.Background(), storage.ModeSetUnpin, swarm.NewAddress(chunk.Address().Bytes())) if err != nil {
if err != nil { t.Fatal(err)
t.Fatal(err) }
if len(pinnedChunks) != 5 {
t.Fatalf("want %d pins but got %d", 5, len(pinnedChunks))
}
// Check if they are sorted
for i, addr := range pinnedChunks {
if addresses[i] != addr.Address.String() {
t.Fatal("error in getting sorted address")
} }
_, err = db.PinInfo(swarm.NewAddress(chunk.Address().Bytes())) }
if err != nil { pinnedChunks, err = db.PinnedChunks(context.Background(), 5, 5)
if !errors.Is(err, storage.ErrNotFound) { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if len(pinnedChunks) != 5 {
t.Fatalf("want %d pins but got %d", 5, len(pinnedChunks))
}
// Check if they are sorted
for i, addr := range pinnedChunks {
if addresses[5+i] != addr.Address.String() {
t.Fatal("error in getting sorted address")
} }
}) }
}
func chunksToSortedStrings(chunks []swarm.Chunk) []string {
var addresses []string
for _, c := range chunks {
addresses = append(addresses, c.Address().String())
}
sort.Strings(addresses)
return addresses
} }
...@@ -267,7 +267,7 @@ func (m *MockStorer) SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, s ...@@ -267,7 +267,7 @@ func (m *MockStorer) SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, s
panic("not implemented") // TODO: Implement panic("not implemented") // TODO: Implement
} }
func (m *MockStorer) PinnedChunks(ctx context.Context, cursor swarm.Address) (pinnedChunks []*storage.Pinner, err error) { func (m *MockStorer) PinnedChunks(ctx context.Context, offset, cursor int) (pinnedChunks []*storage.Pinner, err error) {
m.mtx.Lock() m.mtx.Lock()
defer m.mtx.Unlock() defer m.mtx.Unlock()
if len(m.pinnedAddress) == 0 { if len(m.pinnedAddress) == 0 {
...@@ -286,7 +286,7 @@ func (m *MockStorer) PinnedChunks(ctx context.Context, cursor swarm.Address) (pi ...@@ -286,7 +286,7 @@ func (m *MockStorer) PinnedChunks(ctx context.Context, cursor swarm.Address) (pi
return pinnedChunks, nil return pinnedChunks, nil
} }
func (m *MockStorer) PinInfo(address swarm.Address) (uint64, error) { func (m *MockStorer) PinCounter(address swarm.Address) (uint64, error) {
m.mtx.Lock() m.mtx.Lock()
defer m.mtx.Unlock() defer m.mtx.Unlock()
for i, addr := range m.pinnedAddress { for i, addr := range m.pinnedAddress {
......
...@@ -146,8 +146,8 @@ type Storer interface { ...@@ -146,8 +146,8 @@ type Storer interface {
LastPullSubscriptionBinID(bin uint8) (id uint64, err error) LastPullSubscriptionBinID(bin uint8) (id uint64, err error)
PullSubscriber PullSubscriber
SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, stop func()) SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, stop func())
PinnedChunks(ctx context.Context, cursor swarm.Address) (pinnedChunks []*Pinner, err error) PinnedChunks(ctx context.Context, offset, limit int) (pinnedChunks []*Pinner, err error)
PinInfo(address swarm.Address) (uint64, error) PinCounter(address swarm.Address) (uint64, error)
io.Closer io.Closer
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment