Commit 4062b5c2 authored by Zahoor Mohamed's avatar Zahoor Mohamed Committed by GitHub

Local pinning fix (#255)

* Local pinning fix
parent 34039bde
...@@ -534,3 +534,184 @@ func TestSetTestHookCollectGarbage(t *testing.T) { ...@@ -534,3 +534,184 @@ func TestSetTestHookCollectGarbage(t *testing.T) {
t.Errorf("got hook value %v, want %v", got, original) t.Errorf("got hook value %v, want %v", got, original)
} }
} }
func TestPinAfterMultiGC(t *testing.T) {
db := newTestDB(t, &Options{
Capacity: 10,
})
pinnedChunks := make([]swarm.Address, 0)
// upload random chunks above db capacity to see if chunks are still pinned
for i := 0; i < 20; i++ {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
if len(pinnedChunks) < 10 {
rch := generateAndPinAChunk(t, db)
pinnedChunks = append(pinnedChunks, rch.Address())
}
}
for i := 0; i < 20; i++ {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
}
for i := 0; i < 20; i++ {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
}
t.Run("pin Index count", newItemsCountTest(db.pinIndex, len(pinnedChunks)))
// Check if all the pinned chunks are present in the data DB
for _, addr := range pinnedChunks {
outItem := shed.Item{
Address: addr.Bytes(),
}
gotChunk, err := db.Get(context.Background(), storage.ModeGetRequest, swarm.NewAddress(outItem.Address))
if err != nil {
t.Fatal(err)
}
if !gotChunk.Address().Equal(swarm.NewAddress(addr.Bytes())) {
t.Fatal("Pinned chunk is not equal to got chunk")
}
}
}
func generateAndPinAChunk(t *testing.T, db *DB) swarm.Chunk {
// Create a chunk and pin it
pinnedChunk := generateTestRandomChunk()
_, err := db.Put(context.Background(), storage.ModePutUpload, pinnedChunk)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), storage.ModeSetPin, pinnedChunk.Address())
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), storage.ModeSetSyncPull, pinnedChunk.Address())
if err != nil {
t.Fatal(err)
}
return pinnedChunk
}
func TestPinSyncAndAccessPutSetChunkMultipleTimes(t *testing.T) {
var closed chan struct{}
testHookCollectGarbageChan := make(chan uint64)
t.Cleanup(setTestHookCollectGarbage(func(collectedCount uint64) {
select {
case testHookCollectGarbageChan <- collectedCount:
case <-closed:
}
}))
db := newTestDB(t, &Options{
Capacity: 10,
})
closed = db.close
pinnedChunks := addRandomChunks(t, 5, db, true)
rand1Chunks := addRandomChunks(t, 15, db, false)
for _, ch := range pinnedChunks {
_, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
}
for _, ch := range rand1Chunks {
_, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address())
if err != nil {
// ignore the chunks that are GCd
continue
}
}
rand2Chunks := addRandomChunks(t, 20, db, false)
for _, ch := range rand2Chunks {
_, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address())
if err != nil {
// ignore the chunks that are GCd
continue
}
}
rand3Chunks := addRandomChunks(t, 20, db, false)
for _, ch := range rand3Chunks {
_, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address())
if err != nil {
// ignore the chunks that are GCd
continue
}
}
// check if the pinned chunk is present after GC
for _, ch := range pinnedChunks {
gotChunk, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal("Pinned chunk missing ", err)
}
if !gotChunk.Address().Equal(ch.Address()) {
t.Fatal("Pinned chunk address is not equal to got chunk")
}
if !bytes.Equal(gotChunk.Data(), ch.Data()) {
t.Fatal("Pinned chunk data is not equal to got chunk")
}
}
}
func addRandomChunks(t *testing.T, count int, db *DB, pin bool) []swarm.Chunk {
var chunks []swarm.Chunk
for i := 0; i < count; i++ {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
if pin {
err = db.Set(context.Background(), storage.ModeSetPin, ch.Address())
if err != nil {
t.Fatal(err)
}
}
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), storage.ModeSetAccess, ch.Address())
if err != nil {
t.Fatal(err)
}
_, err = db.Get(context.Background(), storage.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
chunks = append(chunks, ch)
}
return chunks
}
...@@ -577,6 +577,6 @@ func TestDBDebugIndexes(t *testing.T) { ...@@ -577,6 +577,6 @@ func TestDBDebugIndexes(t *testing.T) {
} }
// assert that there's a pin and gc exclude entry now // assert that there's a pin and gc exclude entry now
testIndexCounts(t, 1, 1, 1, 1, 1, 1, 1, indexCounts) testIndexCounts(t, 1, 1, 0, 1, 1, 1, 1, indexCounts)
} }
...@@ -21,10 +21,11 @@ import ( ...@@ -21,10 +21,11 @@ import (
"errors" "errors"
"time" "time"
"github.com/syndtr/goleveldb/leveldb"
"github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
) )
// Get returns a chunk from the database. If the chunk is // Get returns a chunk from the database. If the chunk is
...@@ -155,11 +156,19 @@ func (db *DB) updateGC(item shed.Item) (err error) { ...@@ -155,11 +156,19 @@ func (db *DB) updateGC(item shed.Item) (err error) {
if err != nil { if err != nil {
return err return err
} }
// add new entry to gc index
err = db.gcIndex.PutInBatch(batch, item) // add new entry to gc index ONLY if it is not present in pinIndex
ok, err := db.pinIndex.Has(item)
if err != nil { if err != nil {
return err return err
} }
if !ok {
err = db.gcIndex.PutInBatch(batch, item)
if err != nil {
return err
}
}
return db.shed.WriteBatch(batch) return db.shed.WriteBatch(batch)
} }
......
...@@ -338,12 +338,18 @@ func (db *DB) setGC(batch *leveldb.Batch, item shed.Item) (gcSizeChange int64, e ...@@ -338,12 +338,18 @@ func (db *DB) setGC(batch *leveldb.Batch, item shed.Item) (gcSizeChange int64, e
return 0, err return 0, err
} }
err = db.gcIndex.PutInBatch(batch, item) // add new entry to gc index ONLY if it is not present in pinIndex
ok, err := db.pinIndex.Has(item)
if err != nil { if err != nil {
return 0, err return 0, err
} }
if !ok {
gcSizeChange++ err = db.gcIndex.PutInBatch(batch, item)
if err != nil {
return 0, err
}
gcSizeChange++
}
return gcSizeChange, nil return gcSizeChange, nil
} }
......
...@@ -21,10 +21,11 @@ import ( ...@@ -21,10 +21,11 @@ import (
"errors" "errors"
"time" "time"
"github.com/syndtr/goleveldb/leveldb"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags" "github.com/ethersphere/bee/pkg/tags"
"github.com/syndtr/goleveldb/leveldb"
) )
// Set updates database indexes for // Set updates database indexes for
...@@ -180,11 +181,18 @@ func (db *DB) setAccess(batch *leveldb.Batch, binIDs map[uint8]uint64, addr swar ...@@ -180,11 +181,18 @@ func (db *DB) setAccess(batch *leveldb.Batch, binIDs map[uint8]uint64, addr swar
if err != nil { if err != nil {
return 0, err return 0, err
} }
err = db.gcIndex.PutInBatch(batch, item)
ok, err := db.pinIndex.Has(item)
if err != nil { if err != nil {
return 0, err return 0, err
} }
gcSizeChange++ if !ok {
err = db.gcIndex.PutInBatch(batch, item)
if err != nil {
return 0, err
}
gcSizeChange++
}
return gcSizeChange, nil return gcSizeChange, nil
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment