Commit b90bd7cd authored by acud, committed by GitHub

localstore: reduce critical section size on gc (#1435)

parent 1de167ea
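
The change below shrinks the time collectGarbage holds batchMu: the gcIndex iteration now runs without the lock and only records eviction candidates; the lock is re-acquired afterwards, and any candidate touched in the meantime (tracked in dirtyAddresses while gcRunning is set) is skipped and the gc accounting adjusted. As a rough, self-contained illustration of that two-phase pattern, here is a minimal Go sketch against a hypothetical in-memory store; the names (store, touch, the toy bookkeeping in collectGarbage) are illustrative only and are not the bee localstore API:

package main

import (
	"fmt"
	"sync"
)

// store is a toy stand-in for the localstore: a slice of "chunks"
// guarded by a mutex, plus the dirty bookkeeping this commit adds.
type store struct {
	mu        sync.Mutex
	items     []string
	gcRunning bool
	dirty     map[string]bool
}

// touch is what a writer (put/set/request) would do while holding the
// lock: if gc is running, remember the address as dirty.
func (s *store) touch(addr string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.gcRunning {
		s.dirty[addr] = true
	}
}

// collectGarbage evicts up to max items in two phases: candidates are
// gathered with only a short lock, the expensive selection happens
// unlocked, and a second short critical section re-checks the dirty
// set and skips anything touched in the meantime.
func (s *store) collectGarbage(max int) (evicted []string) {
	// phase 1: flag gc as running and snapshot the items under a short lock
	s.mu.Lock()
	s.gcRunning = true
	s.dirty = make(map[string]bool)
	snapshot := append([]string(nil), s.items...)
	s.mu.Unlock()

	// candidate selection runs outside the lock (in the real code this
	// is the gcIndex iteration, which can be slow)
	if len(snapshot) > max {
		snapshot = snapshot[:max]
	}

	// phase 2: short critical section; drop dirty candidates, evict the rest
	s.mu.Lock()
	defer func() {
		s.gcRunning = false
		s.dirty = nil
		s.mu.Unlock()
	}()
	evict := make(map[string]bool)
	for _, c := range snapshot {
		if s.dirty[c] {
			continue // touched while gc was running, keep it
		}
		evict[c] = true
		evicted = append(evicted, c)
	}
	var keep []string
	for _, it := range s.items {
		if !evict[it] {
			keep = append(keep, it)
		}
	}
	s.items = keep
	return evicted
}

func main() {
	s := &store{items: []string{"a", "b", "c", "d"}}
	go s.touch("b") // may or may not land before eviction; dirty items survive
	fmt.Println("evicted:", s.collectGarbage(3))
	fmt.Println("remaining:", s.items)
}

The trade-off mirrors the commit: candidates can go stale while the lock is released, so the second, short critical section must re-check them before committing the eviction, rather than holding the lock across the whole iteration.
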
...@@ -22,6 +22,7 @@ import (
"time"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
)
...@@ -85,13 +86,20 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
}
totalTimeMetric(db.metrics.TotalTimeCollectGarbage, start)
}(time.Now())
batch := new(leveldb.Batch)
target := db.gcTarget()
// tell the localstore to start logging dirty addresses
db.batchMu.Lock()
db.gcRunning = true
db.batchMu.Unlock()
defer func() {
db.batchMu.Lock()
db.gcRunning = false
db.dirtyAddresses = nil
db.batchMu.Unlock()
}()
// run through the recently pinned chunks and
// remove them from the gcIndex before iterating through gcIndex
...@@ -109,6 +117,7 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
done = true
first := true
start := time.Now()
candidates := make([]shed.Item, 0)
err = db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
if first {
totalTimeMetric(db.metrics.TotalTimeGCFirstItem, start)
...@@ -118,39 +127,69 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
return true, nil
}
candidates = append(candidates, item)
collectedCount++
if collectedCount >= gcBatchSize {
// batch size limit reached,
// another gc run is needed
done = false
return true, nil
}
return false, nil
}, nil)
if err != nil {
return 0, false, err
}
db.metrics.GCCollectedCounter.Add(float64(collectedCount))
if testHookGCIteratorDone != nil {
testHookGCIteratorDone()
}
// protect database from changing indexes and gcSize
db.batchMu.Lock()
defer totalTimeMetric(db.metrics.TotalTimeGCLock, time.Now())
defer db.batchMu.Unlock()
// refresh gcSize value, since it might have
// changed in the meanwhile
gcSize, err = db.gcSize.Get()
if err != nil {
return 0, false, err
}
// get rid of dirty entries
for _, item := range candidates {
if swarm.NewAddress(item.Address).MemberOf(db.dirtyAddresses) {
collectedCount--
if gcSize-collectedCount > target {
done = false
}
continue
}
db.metrics.GCStoreTimeStamps.Set(float64(item.StoreTimestamp))
db.metrics.GCStoreAccessTimeStamps.Set(float64(item.AccessTimestamp))
// delete from retrieve, pull, gc
err = db.retrievalDataIndex.DeleteInBatch(batch, item)
if err != nil {
return 0, false, err
}
err = db.retrievalAccessIndex.DeleteInBatch(batch, item)
if err != nil {
return 0, false, err
}
err = db.pullIndex.DeleteInBatch(batch, item)
if err != nil {
return 0, false, err
}
err = db.gcIndex.DeleteInBatch(batch, item)
if err != nil {
return 0, false, err
}
}
db.metrics.GCCommittedCounter.Add(float64(collectedCount))
db.gcSize.PutInBatch(batch, gcSize-collectedCount)
err = db.shed.WriteBatch(batch)
...@@ -286,3 +325,8 @@ func (db *DB) incGCSizeInBatch(batch *leveldb.Batch, change int64) (err error) {
// information when a garbage collection run is done
// and how many items it removed.
var testHookCollectGarbage func(collectedCount uint64)
// testHookGCIteratorDone is a hook which is called
// when the GC is done collecting candidate items for
// eviction.
var testHookGCIteratorDone func()
...@@ -23,6 +23,7 @@ import (
"io/ioutil"
"math/rand"
"os"
"sync"
"testing"
"time"
...@@ -716,3 +717,146 @@ func addRandomChunks(t *testing.T, count int, db *DB, pin bool) []swarm.Chunk {
}
return chunks
}
// TestGC_NoEvictDirty checks that the garbage collection
// does not evict chunks that are marked as dirty while the gc
// is running.
func TestGC_NoEvictDirty(t *testing.T) {
// lower the maximal number of chunks in a single
// gc batch to ensure multiple batches.
defer func(s uint64) { gcBatchSize = s }(gcBatchSize)
gcBatchSize = 2
chunkCount := 15
db := newTestDB(t, &Options{
Capacity: 10,
})
closed := db.close
testHookCollectGarbageChan := make(chan uint64)
t.Cleanup(setTestHookCollectGarbage(func(collectedCount uint64) {
select {
case testHookCollectGarbageChan <- collectedCount:
case <-closed:
}
}))
dirtyChan := make(chan struct{})
incomingChan := make(chan struct{})
t.Cleanup(setTestHookGCIteratorDone(func() {
incomingChan <- struct{}{}
<-dirtyChan
}))
addrs := make([]swarm.Address, 0)
mtx := new(sync.Mutex)
online := make(chan struct{})
go func() {
close(online) // make sure this is scheduled, otherwise test might flake
i := 0
for range incomingChan {
// request a chunk so that it is marked dirty and
// excluded from the current gc round. but don't do this
// for all chunks!
if i < 2 {
mtx.Lock()
_, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[i])
mtx.Unlock()
if err != nil {
t.Error(err)
}
i++
// we sleep so that the async update to gc index
// happens and that the dirtyAddresses get updated
time.Sleep(100 * time.Millisecond)
}
dirtyChan <- struct{}{}
}
}()
<-online
// upload random chunks
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
mtx.Lock()
addrs = append(addrs, ch.Address())
mtx.Unlock()
}
gcTarget := db.gcTarget()
for {
select {
case <-testHookCollectGarbageChan:
case <-time.After(10 * time.Second):
t.Error("collect garbage timeout")
}
gcSize, err := db.gcSize.Get()
if err != nil {
t.Fatal(err)
}
if gcSize == gcTarget {
break
}
}
t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget)))
t.Run("gc index count", newItemsCountTest(db.gcIndex, int(gcTarget)))
t.Run("gc size", newIndexGCSizeTest(db))
// the first two chunks were requested while gc was running, so they are
// marked dirty and retained; the third chunk should be removed
t.Run("get the first two chunks, third is gone", func(t *testing.T) {
_, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[0])
if err != nil {
t.Error("got error but expected none")
}
_, err = db.Get(context.Background(), storage.ModeGetRequest, addrs[1])
if err != nil {
t.Error("got error but expected none")
}
_, err = db.Get(context.Background(), storage.ModeGetRequest, addrs[2])
if !errors.Is(err, storage.ErrNotFound) {
t.Errorf("expected err not found but got %v", err)
}
})
t.Run("only later inserted chunks should be removed", func(t *testing.T) {
for i := 2; i < (chunkCount - int(gcTarget)); i++ {
_, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[i])
if !errors.Is(err, storage.ErrNotFound) {
t.Errorf("got error %v, want %v", err, storage.ErrNotFound)
}
}
})
// last synced chunk should not be removed
t.Run("get most recent synced chunk", func(t *testing.T) {
_, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[len(addrs)-1])
if err != nil {
t.Fatal(err)
}
})
}
// setTestHookGCIteratorDone sets testHookGCIteratorDone and
// returns a function that will reset it to the
// value before the change.
func setTestHookGCIteratorDone(h func()) (reset func()) {
current := testHookGCIteratorDone
reset = func() { testHookGCIteratorDone = current }
testHookGCIteratorDone = h
return reset
}
...@@ -109,6 +109,15 @@ type DB struct {
batchMu sync.Mutex
// gcRunning is true while GC is running. it is
// used to avoid touching dirty gc index entries
// while garbage collecting.
gcRunning bool
// dirtyAddresses are marked while gc is running
// in order to avoid the removal of dirty entries.
dirtyAddresses []swarm.Address
// this channel is closed when close function is called
// to terminate other goroutines
close chan struct{}
...
...@@ -10,6 +10,7 @@ import (
)
type metrics struct {
TotalTimeGCLock prometheus.Counter
TotalTimeGCFirstItem prometheus.Counter
TotalTimeCollectGarbage prometheus.Counter
TotalTimeGCExclude prometheus.Counter
...@@ -26,6 +27,7 @@ type metrics struct {
GCCounter prometheus.Counter
GCErrorCounter prometheus.Counter
GCCollectedCounter prometheus.Counter
GCCommittedCounter prometheus.Counter
GCExcludeCounter prometheus.Counter
GCExcludeError prometheus.Counter
GCExcludeWriteBatchError prometheus.Counter
...@@ -63,6 +65,12 @@ func newMetrics() metrics {
subsystem := "localstore"
return metrics{
TotalTimeGCLock: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "gc_lock_time",
Help: "Total time under lock in gc.",
}),
TotalTimeGCFirstItem: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
...@@ -153,6 +161,12 @@ func newMetrics() metrics {
Name: "gc_collected_count",
Help: "Number of times the GC_COLLECTED operation is done.",
}),
GCCommittedCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "gc_committed_count",
Help: "Number of gc items to commit.",
}),
GCExcludeCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
...
...@@ -125,6 +125,9 @@ func (db *DB) updateGCItems(items ...shed.Item) {
func (db *DB) updateGC(item shed.Item) (err error) {
db.batchMu.Lock()
defer db.batchMu.Unlock()
if db.gcRunning {
db.dirtyAddresses = append(db.dirtyAddresses, swarm.NewAddress(item.Address))
}
batch := new(leveldb.Batch)
...
...@@ -55,6 +55,11 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e
// protect parallel updates
db.batchMu.Lock()
defer db.batchMu.Unlock()
if db.gcRunning {
for _, ch := range chs {
db.dirtyAddresses = append(db.dirtyAddresses, ch.Address())
}
}
batch := new(leveldb.Batch)
...
...@@ -50,6 +50,9 @@ func (db *DB) set(mode storage.ModeSet, addrs ...swarm.Address) (err error) {
// protect parallel updates
db.batchMu.Lock()
defer db.batchMu.Unlock()
if db.gcRunning {
db.dirtyAddresses = append(db.dirtyAddresses, addrs...)
}
batch := new(leveldb.Batch)
...
...@@ -77,6 +77,17 @@ func (a Address) Equal(b Address) bool {
return bytes.Equal(a.b, b.b)
}
// MemberOf returns true if the address is a member of the
// provided set.
func (a Address) MemberOf(addrs []Address) bool {
for _, v := range addrs {
if v.Equal(a) {
return true
}
}
return false
}
// IsZero returns true if the Address is not set to any value.
func (a Address) IsZero() bool {
return a.Equal(ZeroAddress)
...
...@@ -81,3 +81,21 @@ func TestAddress_jsonMarshalling(t *testing.T) {
t.Error("unmarshalled address is not equal to the original")
}
}
func TestAddress_MemberOf(t *testing.T) {
a1 := swarm.MustParseHexAddress("24798dd5a470e927fa")
a2 := swarm.MustParseHexAddress("24798dd5a470e927fa")
a3 := swarm.MustParseHexAddress("24798dd5a470e927fb")
a4 := swarm.MustParseHexAddress("24798dd5a470e927fc")
set1 := []swarm.Address{a2, a3}
if !a1.MemberOf(set1) {
t.Fatal("expected addr as member")
}
set2 := []swarm.Address{a3, a4}
if a1.MemberOf(set2) {
t.Fatal("expected addr not member")
}
}