Commit 6e46128f authored by acud's avatar acud Committed by GitHub

localstore: remove gc check hack (#302)

parent a884705f
......@@ -118,8 +118,6 @@ type DB struct {
// are done
collectGarbageWorkerDone chan struct{}
putToGCCheck func([]byte) bool
// wait for all subscriptions to finish before closing
// underlaying BadgerDB to prevent possible panics from
// iterators
......@@ -138,10 +136,6 @@ type Options struct {
// MetricsPrefix defines a prefix for metrics names.
MetricsPrefix string
Tags *tags.Tags
// PutSetCheckFunc is a function called after a Put of a chunk
// to verify whether that chunk needs to be Set and added to
// garbage collection index too
PutToGCCheck func([]byte) bool
}
// New returns a new DB. All fields and indexes are initialized
......@@ -155,10 +149,6 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
}
}
if o.PutToGCCheck == nil {
o.PutToGCCheck = func(_ []byte) bool { return false }
}
db = &DB{
capacity: o.Capacity,
baseKey: baseKey,
......@@ -170,7 +160,6 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
collectGarbageTrigger: make(chan struct{}, 1),
close: make(chan struct{}),
collectGarbageWorkerDone: make(chan struct{}),
putToGCCheck: o.PutToGCCheck,
metrics: newMetrics(),
logger: logger,
}
......
......@@ -204,13 +204,7 @@ func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed
return false, 0, err
}
if exists {
if db.putToGCCheck(item.Address) {
gcSizeChange, err = db.setGC(batch, item)
if err != nil {
return false, 0, err
}
}
return true, gcSizeChange, nil
return true, 0, nil
}
anonymous := false
if db.tags != nil && item.Tag != 0 {
......@@ -241,19 +235,7 @@ func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed
}
}
if db.putToGCCheck(item.Address) {
// TODO: this might result in an edge case where a node
// that has very little storage and uploads using an anonymous
// upload will have some of the content GCd before being able
// to sync it
gcSizeChange, err = db.setGC(batch, item)
if err != nil {
return false, 0, err
}
}
return false, gcSizeChange, nil
return false, 0, nil
}
// putSync adds an Item to the batch by updating required indexes:
......@@ -266,14 +248,7 @@ func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.I
return false, 0, err
}
if exists {
if db.putToGCCheck(item.Address) {
gcSizeChange, err = db.setGC(batch, item)
if err != nil {
return false, 0, err
}
}
return true, gcSizeChange, nil
return true, 0, nil
}
item.StoreTimestamp = now()
......@@ -290,17 +265,6 @@ func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.I
return false, 0, err
}
if db.putToGCCheck(item.Address) {
// TODO: this might result in an edge case where a node
// that has very little storage and uploads using an anonymous
// upload will have some of the content GCd before being able
// to sync it
gcSizeChange, err = db.setGC(batch, item)
if err != nil {
return false, 0, err
}
}
return false, gcSizeChange, nil
}
......
......@@ -26,7 +26,6 @@ import (
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
)
// TestModePutRequest validates ModePutRequest index values on the provided DB.
......@@ -313,124 +312,6 @@ func TestModePut_sameChunk(t *testing.T) {
}
}
// TestModePutSync_addToGc validates ModePut* with PutSetCheckFunc stub results
// in the added chunk to show up in GC index
func TestModePut_addToGc(t *testing.T) {
retVal := true
// PutSetCheckFunc's output is toggled from the test case
opts := &Options{PutToGCCheck: func(_ []byte) bool { return retVal }}
for _, m := range []struct {
mode storage.ModePut
putToGc bool
}{
{mode: storage.ModePutSync, putToGc: true},
{mode: storage.ModePutSync, putToGc: false},
{mode: storage.ModePutUpload, putToGc: true},
{mode: storage.ModePutUpload, putToGc: false},
{mode: storage.ModePutRequest, putToGc: true}, // in ModePutRequest we always insert to GC, so putToGc=false not needed
} {
for _, tc := range multiChunkTestCases {
t.Run(tc.name, func(t *testing.T) {
retVal = m.putToGc
db := newTestDB(t, opts)
wantTimestamp := time.Now().UTC().UnixNano()
defer setNow(func() (t int64) {
return wantTimestamp
})()
chunks := generateTestRandomChunks(tc.count)
_, err := db.Put(context.Background(), m.mode, chunks...)
if err != nil {
t.Fatal(err)
}
binIDs := make(map[uint8]uint64)
for _, ch := range chunks {
po := db.po(ch.Address())
binIDs[po]++
var wantErr error
if !m.putToGc {
wantErr = leveldb.ErrNotFound
}
newRetrieveIndexesTestWithAccess(db, ch, wantTimestamp, wantTimestamp)
newGCIndexTest(db, ch, wantTimestamp, wantTimestamp, binIDs[po], wantErr)(t)
}
})
}
}
}
// TestModePutSync_addToGcExisting validates ModePut* with PutSetCheckFunc stub results
// in the added chunk to show up in GC index
func TestModePut_addToGcExisting(t *testing.T) {
retVal := true
// PutSetCheckFunc's output is toggled from the test case
opts := &Options{PutToGCCheck: func(_ []byte) bool { return retVal }}
for _, m := range []struct {
mode storage.ModePut
putToGc bool
}{
{mode: storage.ModePutSync, putToGc: true},
{mode: storage.ModePutSync, putToGc: false},
{mode: storage.ModePutUpload, putToGc: true},
{mode: storage.ModePutUpload, putToGc: false},
{mode: storage.ModePutRequest, putToGc: true}, // in ModePutRequest we always insert to GC, so putToGc=false not needed
} {
for _, tc := range multiChunkTestCases {
t.Run(tc.name, func(t *testing.T) {
retVal = m.putToGc
db := newTestDB(t, opts)
wantStoreTimestamp := time.Now().UTC().UnixNano()
defer setNow(func() (t int64) {
return wantStoreTimestamp
})()
chunks := generateTestRandomChunks(tc.count)
_, err := db.Put(context.Background(), m.mode, chunks...)
if err != nil {
t.Fatal(err)
}
time.Sleep(1 * time.Millisecond)
// change the timestamp, put the chunks again and
// expect the access timestamp to change
wantAccessTimestamp := time.Now().UTC().UnixNano()
defer setNow(func() (t int64) {
return wantAccessTimestamp
})()
_, err = db.Put(context.Background(), m.mode, chunks...)
if err != nil {
t.Fatal(err)
}
binIDs := make(map[uint8]uint64)
for _, ch := range chunks {
po := db.po(ch.Address())
binIDs[po]++
var wantErr error
if !m.putToGc {
wantErr = leveldb.ErrNotFound
}
newRetrieveIndexesTestWithAccess(db, ch, wantStoreTimestamp, wantAccessTimestamp)
newGCIndexTest(db, ch, wantStoreTimestamp, wantAccessTimestamp, binIDs[po], wantErr)(t)
}
})
}
}
}
// TestPutDuplicateChunks validates the expected behaviour for
// passing duplicate chunks to the Put method.
func TestPutDuplicateChunks(t *testing.T) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment