Commit f365a29a authored by Nemanja Zbiljić's avatar Nemanja Zbiljić Committed by GitHub

Fetch chunks from network when pinning (#1045)

parent f92c66fc
......@@ -9,6 +9,7 @@ import (
"net/http"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/traversal"
"github.com/gorilla/mux"
......@@ -33,8 +34,14 @@ func (s *server) pinBytes(w http.ResponseWriter, r *http.Request) {
}
if !has {
jsonhttp.NotFound(w, nil)
return
_, err := s.Storer.Get(r.Context(), storage.ModeGetRequest, addr)
if err != nil {
s.Logger.Debugf("pin chunk: netstore get: %v", err)
s.Logger.Error("pin chunk: netstore")
jsonhttp.NotFound(w, nil)
return
}
}
ctx := r.Context()
......
......@@ -9,6 +9,7 @@ import (
"net/http"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/traversal"
"github.com/gorilla/mux"
......@@ -33,8 +34,14 @@ func (s *server) pinBzz(w http.ResponseWriter, r *http.Request) {
}
if !has {
jsonhttp.NotFound(w, nil)
return
_, err := s.Storer.Get(r.Context(), storage.ModeGetRequest, addr)
if err != nil {
s.Logger.Debugf("pin chunk: netstore get: %v", err)
s.Logger.Error("pin chunk: netstore")
jsonhttp.NotFound(w, nil)
return
}
}
ctx := r.Context()
......
......@@ -32,26 +32,35 @@ func (s *server) pinChunk(w http.ResponseWriter, r *http.Request) {
return
}
has, err := s.Storer.Has(r.Context(), addr)
err = s.Storer.Set(r.Context(), storage.ModeSetPin, addr)
if err != nil {
s.Logger.Debugf("pin chunk: localstore has: %v", err)
s.Logger.Error("pin chunk: store")
jsonhttp.InternalServerError(w, err)
return
}
if errors.Is(err, storage.ErrNotFound) {
ch, err := s.Storer.Get(r.Context(), storage.ModeGetRequest, addr)
if err != nil {
s.Logger.Debugf("pin chunk: netstore get: %v", err)
s.Logger.Error("pin chunk: netstore")
jsonhttp.NotFound(w, nil)
return
}
_, err = s.Storer.Put(r.Context(), storage.ModePutRequestPin, ch)
if err != nil {
s.Logger.Debugf("pin chunk: storer put pin: %v", err)
s.Logger.Error("pin chunk: storer put pin")
jsonhttp.InternalServerError(w, err)
return
}
} else {
s.Logger.Debugf("pin chunk: pinning error: %v, addr %s", err, addr)
s.Logger.Error("pin chunk: cannot pin chunk")
if !has {
jsonhttp.NotFound(w, nil)
return
jsonhttp.InternalServerError(w, "cannot pin chunk")
return
}
}
err = s.Storer.Set(r.Context(), storage.ModeSetPin, addr)
if err != nil {
s.Logger.Debugf("pin chunk: pinning error: %v, addr %s", err, addr)
s.Logger.Error("pin chunk: cannot pin chunk")
jsonhttp.InternalServerError(w, "cannot pin chunk")
return
}
jsonhttp.OK(w, nil)
}
......@@ -312,11 +321,29 @@ func (s *server) updatePinCount(ctx context.Context, reference swarm.Address, de
func (s *server) pinChunkAddressFn(ctx context.Context, reference swarm.Address) func(address swarm.Address) (stop bool) {
return func(address swarm.Address) (stop bool) {
// NOTE: stop pinning on first error
err := s.Storer.Set(ctx, storage.ModeSetPin, address)
if err != nil {
s.Logger.Debugf("pin error: for reference %s, address %s: %w", reference, address, err)
// stop pinning on first error
return true
if errors.Is(err, storage.ErrNotFound) {
// chunk not found locally, try to get from netstore
ch, err := s.Storer.Get(ctx, storage.ModeGetRequest, address)
if err != nil {
s.Logger.Debugf("pin traversal: storer get: for reference %s, address %s: %w", reference, address, err)
return true
}
_, err = s.Storer.Put(ctx, storage.ModePutRequestPin, ch)
if err != nil {
s.Logger.Debugf("pin traversal: storer put pin: for reference %s, address %s: %w", reference, address, err)
return true
}
return false
} else {
s.Logger.Debugf("pin traversal: storer set pin: for reference %s, address %s: %w", reference, address, err)
return true
}
}
return false
......@@ -332,7 +359,7 @@ func (s *server) unpinChunkAddressFn(ctx context.Context, reference swarm.Addres
err = s.Storer.Set(ctx, storage.ModeSetUnpin, address)
if err != nil {
s.Logger.Debugf("unpin error: for reference %s, address %s: %w", reference, address, err)
s.Logger.Debugf("unpin traversal: for reference %s, address %s: %w", reference, address, err)
// continue un-pinning all chunks
}
......
......@@ -9,6 +9,7 @@ import (
"net/http"
"github.com/ethersphere/bee/pkg/jsonhttp"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/traversal"
"github.com/gorilla/mux"
......@@ -33,8 +34,14 @@ func (s *server) pinFile(w http.ResponseWriter, r *http.Request) {
}
if !has {
jsonhttp.NotFound(w, nil)
return
_, err := s.Storer.Get(r.Context(), storage.ModeGetRequest, addr)
if err != nil {
s.Logger.Debugf("pin chunk: netstore get: %v", err)
s.Logger.Error("pin chunk: netstore")
jsonhttp.NotFound(w, nil)
return
}
}
ctx := r.Context()
......
......@@ -73,7 +73,7 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e
binIDs := make(map[uint8]uint64)
switch mode {
case storage.ModePutRequest:
case storage.ModePutRequest, storage.ModePutRequestPin:
for i, ch := range chs {
if containsChunk(ch.Address(), chs[:i]...) {
exist[i] = true
......@@ -85,6 +85,13 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e
}
exist[i] = exists
gcSizeChange += c
if mode == storage.ModePutRequestPin {
err = db.setPin(batch, ch.Address())
if err != nil {
return nil, err
}
}
}
case storage.ModePutUpload, storage.ModePutUploadPin:
......
......@@ -83,6 +83,34 @@ func TestModePutRequest(t *testing.T) {
}
}
// TestModePutRequestPin validates ModePutRequestPin index values on the provided DB.
func TestModePutRequestPin(t *testing.T) {
for _, tc := range multiChunkTestCases {
t.Run(tc.name, func(t *testing.T) {
db := newTestDB(t, nil)
chunks := generateTestRandomChunks(tc.count)
wantTimestamp := time.Now().UTC().UnixNano()
defer setNow(func() (t int64) {
return wantTimestamp
})()
_, err := db.Put(context.Background(), storage.ModePutRequestPin, chunks...)
if err != nil {
t.Fatal(err)
}
for _, ch := range chunks {
newRetrieveIndexesTestWithAccess(db, ch, wantTimestamp, wantTimestamp)(t)
newPinIndexTest(db, ch, nil)(t)
}
newItemsCountTest(db.gcIndex, tc.count)(t)
})
}
}
// TestModePutSync validates ModePutSync index values on the provided DB.
func TestModePutSync(t *testing.T) {
for _, tc := range multiChunkTestCases {
......
......@@ -97,7 +97,16 @@ func (db *DB) set(mode storage.ModeSet, addrs ...swarm.Address) (err error) {
case storage.ModeSetPin:
for _, addr := range addrs {
err := db.setPin(batch, addr)
has, err := db.retrievalDataIndex.Has(addressToItem(addr))
if err != nil {
return err
}
if !has {
return storage.ErrNotFound
}
err = db.setPin(batch, addr)
if err != nil {
return err
}
......
......@@ -26,6 +26,12 @@ func TestPinning(t *testing.T) {
t.Fatal(err)
}
// chunk must be present
_, err = db.Put(context.Background(), storage.ModePutUpload, chunks...)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), storage.ModeSetPin, chunkAddresses(chunks)...)
if err != nil {
t.Fatal(err)
......@@ -52,8 +58,14 @@ func TestPinCounter(t *testing.T) {
chunk := generateTestRandomChunk()
db := newTestDB(t, nil)
// chunk must be present
_, err := db.Put(context.Background(), storage.ModePutUpload, chunk)
if err != nil {
t.Fatal(err)
}
// pin once
err := db.Set(context.Background(), storage.ModeSetPin, swarm.NewAddress(chunk.Address().Bytes()))
err = db.Set(context.Background(), storage.ModeSetPin, swarm.NewAddress(chunk.Address().Bytes()))
if err != nil {
t.Fatal(err)
}
......@@ -95,8 +107,14 @@ func TestPaging(t *testing.T) {
addresses := chunksToSortedStrings(chunks)
db := newTestDB(t, nil)
// chunk must be present
_, err := db.Put(context.Background(), storage.ModePutUpload, chunks...)
if err != nil {
t.Fatal(err)
}
// pin once
err := db.Set(context.Background(), storage.ModeSetPin, chunkAddresses(chunks)...)
err = db.Set(context.Background(), storage.ModeSetPin, chunkAddresses(chunks)...)
if err != nil {
t.Fatal(err)
}
......
......@@ -338,8 +338,6 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
pssService := pss.New(pssPrivateKey, logger)
b.pssCloser = pssService
traversalService := traversal.NewService(storer)
var ns storage.Storer
if o.GlobalPinningEnabled {
// create recovery callback for content repair
......@@ -349,6 +347,8 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
ns = netstore.New(storer, nil, retrieve, logger)
}
traversalService := traversal.NewService(ns)
pushSyncProtocol := pushsync.New(p2ps, storer, kad, tagService, pssService.TryUnwrap, logger, acc, accounting.NewFixedPricer(swarmAddress, 10), tracer)
// set the pushSyncer in the PSS
......
......@@ -144,6 +144,16 @@ func (m *MockStorer) Set(ctx context.Context, mode storage.ModeSet, addrs ...swa
m.modeSet[addr.String()] = mode
switch mode {
case storage.ModeSetPin:
// check if chunk exists
has, err := m.has(ctx, addr)
if err != nil {
return err
}
if !has {
return storage.ErrNotFound
}
// if mode is set pin, increment the pin counter
var found bool
for i, ad := range m.pinnedAddress {
......
......@@ -77,6 +77,8 @@ const (
ModePutUpload
// ModePutUploadPin: the same as ModePutUpload but also pin the chunk atomically with the put
ModePutUploadPin
// ModePutRequestPin: the same as ModePutRequest but also pin the chunk with the put
ModePutRequestPin
)
// ModeSet enumerates different Setter modes.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment