Commit c6910097 authored by acud's avatar acud Committed by GitHub

feat: unreserve on-demand (#2071)

parent 726b61f2
48c48 48c48
< var Capacity = exp2(22) < var Capacity = exp2(22)
--- ---
> var Capacity = exp2(10) > var Capacity = exp2(6)
...@@ -110,25 +110,6 @@ jobs: ...@@ -110,25 +110,6 @@ jobs:
run: | run: |
beekeeper delete bee-cluster --cluster-name local-clef beekeeper delete bee-cluster --cluster-name local-clef
make beelocal ACTION=uninstall make beelocal ACTION=uninstall
- name: Apply patches
run: |
patch pkg/postage/batchstore/reserve.go .github/patches/postagereserve_gc.patch
- name: Prepare testing cluster (storage incentives setup)
run: |
timeout 10m make beelocal OPTS='ci skip-vet'
- name: Set kube config
run: |
mkdir -p ~/.kube
cp /etc/rancher/k3s/k3s.yaml ~/.kube/config
- name: Set testing cluster (storage incentives setup)
run: |
timeout 10m make deploylocal BEEKEEPER_CLUSTER=local-gc
- name: Test pingpong
id: pingpong-3
run: until beekeeper check --cluster-name local-gc --checks ci-pingpong; do echo "waiting for pingpong..."; sleep .3; done
- name: Test gc
id: gc-chunk-1
run: beekeeper check --cluster-name local-gc --checks=ci-gc
- name: Retag Docker image and push for cache - name: Retag Docker image and push for cache
if: success() if: success()
run: | run: |
...@@ -170,8 +151,6 @@ jobs: ...@@ -170,8 +151,6 @@ jobs:
if ${{ steps.settlements-2.outcome=='failure' }}; then FAILED=settlements-2; fi if ${{ steps.settlements-2.outcome=='failure' }}; then FAILED=settlements-2; fi
if ${{ steps.pss.outcome=='failure' }}; then FAILED=pss; fi if ${{ steps.pss.outcome=='failure' }}; then FAILED=pss; fi
if ${{ steps.soc.outcome=='failure' }}; then FAILED=soc; fi if ${{ steps.soc.outcome=='failure' }}; then FAILED=soc; fi
if ${{ steps.pingpong-3.outcome=='failure' }}; then FAILED=pingpong-3; fi
if ${{ steps.gc-chunk-1.outcome=='failure' }}; then FAILED=gc-chunk-1; fi
KEYS=$(curl -sSf -X POST https://eu.relay.tunshell.com/api/sessions) KEYS=$(curl -sSf -X POST https://eu.relay.tunshell.com/api/sessions)
curl -sSf -X POST -H "Content-Type: application/json" -d "{\"text\": \"**${RUN_TYPE}** ${{ github.head_ref }}\nFailed -> \`${FAILED}\`\nDebug -> \`sh <(curl -sSf https://lets.tunshell.com/init.sh) L $(echo $KEYS | jq -r .peer2_key) \${TUNSHELL_SECRET} eu.relay.tunshell.com\`\"}" https://beehive.ethswarm.org/hooks/${{ secrets.WEBHOOK_KEY }} curl -sSf -X POST -H "Content-Type: application/json" -d "{\"text\": \"**${RUN_TYPE}** ${{ github.head_ref }}\nFailed -> \`${FAILED}\`\nDebug -> \`sh <(curl -sSf https://lets.tunshell.com/init.sh) L $(echo $KEYS | jq -r .peer2_key) \${TUNSHELL_SECRET} eu.relay.tunshell.com\`\"}" https://beehive.ethswarm.org/hooks/${{ secrets.WEBHOOK_KEY }}
echo "Failed test: ${FAILED}" echo "Failed test: ${FAILED}"
......
...@@ -12,10 +12,11 @@ import ( ...@@ -12,10 +12,11 @@ import (
) )
type reserveStateResponse struct { type reserveStateResponse struct {
Radius uint8 `json:"radius"` Radius uint8 `json:"radius"`
Available int64 `json:"available"` StorageRadius uint8 `json:"storageRadius"`
Outer *bigint.BigInt `json:"outer"` // lower value limit for outer layer = the further half of chunks Available int64 `json:"available"`
Inner *bigint.BigInt `json:"inner"` Outer *bigint.BigInt `json:"outer"` // lower value limit for outer layer = the further half of chunks
Inner *bigint.BigInt `json:"inner"`
} }
type chainStateResponse struct { type chainStateResponse struct {
......
...@@ -38,6 +38,19 @@ var ( ...@@ -38,6 +38,19 @@ var (
// gcBatchSize limits the number of chunks in a single // gcBatchSize limits the number of chunks in a single
// transaction on garbage collection. // transaction on garbage collection.
gcBatchSize uint64 = 2000 gcBatchSize uint64 = 2000
// reserveCollectionRatio is the ratio of the cache to evict from
// the reserve every time it hits the limit. If the cache size is
// 1000 chunks then we will evict 500 chunks from the reserve, this is
// not to overwhelm the cache with too many chunks which it will flush
// anyway.
reserveCollectionRatio = 0.5
// reserveEvictionBatch limits the number of chunks collected in
// a single reserve eviction run.
reserveEvictionBatch uint64 = 200
// maxPurgeablePercentageOfReserve is a ceiling of size of the reserve
// to evict in case the cache size is bigger than the reserve
maxPurgeablePercentageOfReserve = 0.1
) )
// collectGarbageWorker is a long running function that waits for // collectGarbageWorker is a long running function that waits for
...@@ -104,8 +117,11 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { ...@@ -104,8 +117,11 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
if err != nil { if err != nil {
return 0, true, err return 0, true, err
} }
if gcSize == target {
return 0, true, nil
}
db.metrics.GCSize.Set(float64(gcSize)) db.metrics.GCSize.Set(float64(gcSize))
defer func() { db.logger.Debugf("gc collected %d, target %d, startSize %d", collectedCount, target, gcSize) }()
done = true done = true
first := true first := true
start := time.Now() start := time.Now()
...@@ -208,6 +224,15 @@ func (db *DB) gcTarget() (target uint64) { ...@@ -208,6 +224,15 @@ func (db *DB) gcTarget() (target uint64) {
return uint64(float64(db.cacheCapacity) * gcTargetRatio) return uint64(float64(db.cacheCapacity) * gcTargetRatio)
} }
func (db *DB) reserveEvictionTarget() (target uint64) {
targetCache := db.reserveCapacity - uint64(float64(db.cacheCapacity)*reserveCollectionRatio)
targetCeiling := db.reserveCapacity - uint64(float64(db.reserveCapacity)*maxPurgeablePercentageOfReserve)
if targetCeiling > targetCache {
return targetCeiling
}
return targetCache
}
// triggerGarbageCollection signals collectGarbageWorker // triggerGarbageCollection signals collectGarbageWorker
// to call collectGarbage. // to call collectGarbage.
func (db *DB) triggerGarbageCollection() { func (db *DB) triggerGarbageCollection() {
...@@ -218,6 +243,16 @@ func (db *DB) triggerGarbageCollection() { ...@@ -218,6 +243,16 @@ func (db *DB) triggerGarbageCollection() {
} }
} }
// triggerGarbageCollection signals collectGarbageWorker
// to call collectGarbage.
func (db *DB) triggerReserveEviction() {
select {
case db.reserveEvictionTrigger <- struct{}{}:
case <-db.close:
default:
}
}
// incGCSizeInBatch changes gcSize field value // incGCSizeInBatch changes gcSize field value
// by change which can be negative. This function // by change which can be negative. This function
// must be called under batchMu lock. // must be called under batchMu lock.
...@@ -243,6 +278,7 @@ func (db *DB) incGCSizeInBatch(batch *leveldb.Batch, change int64) (err error) { ...@@ -243,6 +278,7 @@ func (db *DB) incGCSizeInBatch(batch *leveldb.Batch, change int64) (err error) {
} }
newSize = gcSize - c newSize = gcSize - c
} }
db.logger.Debugf("inc gc size %d change %d", gcSize, change)
db.gcSize.PutInBatch(batch, newSize) db.gcSize.PutInBatch(batch, newSize)
db.metrics.GCSize.Set(float64(newSize)) db.metrics.GCSize.Set(float64(newSize))
...@@ -253,6 +289,122 @@ func (db *DB) incGCSizeInBatch(batch *leveldb.Batch, change int64) (err error) { ...@@ -253,6 +289,122 @@ func (db *DB) incGCSizeInBatch(batch *leveldb.Batch, change int64) (err error) {
return nil return nil
} }
// incReserveSizeInBatch changes reserveSize field value
// by change which can be negative. This function
// must be called under batchMu lock.
func (db *DB) incReserveSizeInBatch(batch *leveldb.Batch, change int64) (err error) {
if change == 0 {
return nil
}
reserveSize, err := db.reserveSize.Get()
if err != nil && !errors.Is(err, leveldb.ErrNotFound) {
return err
}
var newSize uint64
if change > 0 {
newSize = reserveSize + uint64(change)
} else {
// 'change' is an int64 and is negative
// a conversion is needed with correct sign
c := uint64(-change)
if c > reserveSize {
// protect uint64 undeflow
return nil
}
newSize = reserveSize - c
}
db.logger.Debugf("inc reserve size in batch %d old %d change %d", newSize, reserveSize, change)
db.reserveSize.PutInBatch(batch, newSize)
db.metrics.ReserveSize.Set(float64(newSize))
// trigger garbage collection if we reached the capacity
if newSize >= db.reserveCapacity {
db.triggerReserveEviction()
}
return nil
}
func (db *DB) reserveEvictionWorker() {
defer close(db.reserveEvictionWorkerDone)
for {
select {
case <-db.reserveEvictionTrigger:
evictedCount, done, err := db.evictReserve()
if err != nil {
db.logger.Errorf("localstore: evict reserve: %v", err)
}
if !done {
db.triggerReserveEviction()
}
if testHookEviction != nil {
testHookEviction(evictedCount)
}
case <-db.close:
return
}
}
}
func (db *DB) evictReserve() (totalEvicted uint64, done bool, err error) {
var target uint64
db.metrics.EvictReserveCounter.Inc()
defer func(start time.Time) {
if err != nil {
db.metrics.EvictReserveErrorCounter.Inc()
}
totalTimeMetric(db.metrics.TotalTimeEvictReserve, start)
}(time.Now())
target = db.reserveEvictionTarget()
db.batchMu.Lock()
defer db.batchMu.Unlock()
reserveSizeStart, err := db.reserveSize.Get()
if err != nil {
return 0, false, err
}
if reserveSizeStart == target {
return 0, true, nil
}
// if we dont get any entries at all then there's no use
// of triggering subsequent runs in case we're not done
totalCallbacks := 0
err = db.unreserveFunc(func(batchID []byte, radius uint8) (bool, error) {
totalCallbacks++
e, err := db.UnreserveBatch(batchID, radius)
if err != nil {
return true, err
}
totalEvicted += e
if reserveSizeStart-totalEvicted <= target {
done = true
return true, nil
}
if totalEvicted >= reserveEvictionBatch {
// stop collecting when we reach the eviction
// batch size so that we can avoid lock contention
// on localstore.
return true, nil
}
return false, nil
})
if err != nil {
return 0, false, err
}
if totalCallbacks == 0 {
// if we did not get any items from the batchstore
// it means there's no point of trigerring a subsequent
// round
done = true
}
db.logger.Debugf("reserve evicted %d done %t size %d callbacks %d", totalEvicted, done, reserveSizeStart, totalCallbacks)
return totalEvicted, done, nil
}
// testHookCollectGarbage is a hook that can provide // testHookCollectGarbage is a hook that can provide
// information when a garbage collection run is done // information when a garbage collection run is done
// and how many items it removed. // and how many items it removed.
...@@ -264,3 +416,5 @@ var testHookCollectGarbage func(collectedCount uint64) ...@@ -264,3 +416,5 @@ var testHookCollectGarbage func(collectedCount uint64)
var testHookGCIteratorDone func() var testHookGCIteratorDone func()
var withinRadiusFn func(*DB, shed.Item) bool var withinRadiusFn func(*DB, shed.Item) bool
var testHookEviction func(count uint64)
...@@ -28,6 +28,7 @@ import ( ...@@ -28,6 +28,7 @@ import (
"time" "time"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
...@@ -279,7 +280,8 @@ func TestGCAfterPin(t *testing.T) { ...@@ -279,7 +280,8 @@ func TestGCAfterPin(t *testing.T) {
chunkCount := 50 chunkCount := 50
db := newTestDB(t, &Options{ db := newTestDB(t, &Options{
Capacity: 100, Capacity: 100,
ReserveCapacity: 100,
}) })
pinAddrs := make([]swarm.Address, 0) pinAddrs := make([]swarm.Address, 0)
...@@ -596,7 +598,8 @@ func TestSetTestHookCollectGarbage(t *testing.T) { ...@@ -596,7 +598,8 @@ func TestSetTestHookCollectGarbage(t *testing.T) {
func TestPinAfterMultiGC(t *testing.T) { func TestPinAfterMultiGC(t *testing.T) {
t.Cleanup(setWithinRadiusFunc(func(_ *DB, _ shed.Item) bool { return false })) t.Cleanup(setWithinRadiusFunc(func(_ *DB, _ shed.Item) bool { return false }))
db := newTestDB(t, &Options{ db := newTestDB(t, &Options{
Capacity: 10, Capacity: 10,
ReserveCapacity: 10,
}) })
pinnedChunks := make([]swarm.Address, 0) pinnedChunks := make([]swarm.Address, 0)
...@@ -715,7 +718,8 @@ func TestPinSyncAndAccessPutSetChunkMultipleTimes(t *testing.T) { ...@@ -715,7 +718,8 @@ func TestPinSyncAndAccessPutSetChunkMultipleTimes(t *testing.T) {
} }
})) }))
db := newTestDB(t, &Options{ db := newTestDB(t, &Options{
Capacity: 10, Capacity: 10,
ReserveCapacity: 100,
}) })
closed = db.close closed = db.close
...@@ -959,9 +963,202 @@ func setTestHookGCIteratorDone(h func()) (reset func()) { ...@@ -959,9 +963,202 @@ func setTestHookGCIteratorDone(h func()) (reset func()) {
func unreserveChunkBatch(t *testing.T, db *DB, radius uint8, chs ...swarm.Chunk) { func unreserveChunkBatch(t *testing.T, db *DB, radius uint8, chs ...swarm.Chunk) {
t.Helper() t.Helper()
for _, ch := range chs { for _, ch := range chs {
err := db.UnreserveBatch(ch.Stamp().BatchID(), radius) _, err := db.UnreserveBatch(ch.Stamp().BatchID(), radius)
if err != nil {
t.Fatal(err)
}
}
}
func setTestHookEviction(h func(count uint64)) (reset func()) {
current := testHookEviction
reset = func() { testHookEviction = current }
testHookEviction = h
return reset
}
// TestReserveEvictionWorker tests that the reserve
// eviction works correctly once the reserve hits the
// capacity. The necessary items are then moved into the
// gc index.
func TestReserveEvictionWorker(t *testing.T) {
var (
chunkCount = 10
batchIDs [][]byte
db *DB
addrs []swarm.Address
closed chan struct{}
mtx sync.Mutex
)
testHookEvictionChan := make(chan uint64)
t.Cleanup(setTestHookEviction(func(count uint64) {
if count == 0 {
return
}
select {
case testHookEvictionChan <- count:
case <-closed:
}
}))
t.Cleanup(setWithinRadiusFunc(func(_ *DB, _ shed.Item) bool { return true }))
unres := func(f postage.UnreserveIteratorFn) error {
mtx.Lock()
defer mtx.Unlock()
for i := 0; i < len(batchIDs); i++ {
// pop an element from batchIDs, call the Unreserve
item := batchIDs[i]
// here we mock the behavior of the batchstore
// that would call the localstore back with the
// batch IDs and the radiuses from the FIFO queue
stop, err := f(item, 2)
if err != nil {
return err
}
if stop {
return nil
}
stop, err = f(item, 4)
if err != nil {
return err
}
if stop {
return nil
}
}
batchIDs = nil
return nil
}
testHookCollectGarbageChan := make(chan uint64)
t.Cleanup(setTestHookCollectGarbage(func(collectedCount uint64) {
// don't trigger if we haven't collected anything - this may
// result in a race condition when we inspect the gcsize below,
// causing the database to shut down while the cleanup to happen
// before the correct signal has been communicated here.
if collectedCount == 0 {
return
}
select {
case testHookCollectGarbageChan <- collectedCount:
case <-db.close:
}
}))
db = newTestDB(t, &Options{
Capacity: 10,
ReserveCapacity: 10,
UnreserveFunc: unres,
})
// insert 10 chunks that fall into the reserve, then
// expect first one to be evicted
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3, 2, false)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
mtx.Lock()
addrs = append(addrs, ch.Address())
batchIDs = append(batchIDs, ch.Stamp().BatchID())
mtx.Unlock()
}
evictTarget := db.reserveEvictionTarget()
for {
select {
case <-testHookEvictionChan:
case <-time.After(10 * time.Second):
t.Fatal("eviction timeout")
}
reserveSize, err := db.reserveSize.Get()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if reserveSize == evictTarget {
break
}
} }
t.Run("pull index count", newItemsCountTest(db.pullIndex, chunkCount))
t.Run("postage index count", newItemsCountTest(db.postageIndexIndex, chunkCount))
t.Run("postage radius count", newItemsCountTest(db.postageRadiusIndex, 1))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
t.Run("gc size", newIndexGCSizeTest(db))
t.Run("all chunks should be accessible", func(t *testing.T) {
for _, a := range addrs {
if _, err := db.Get(context.Background(), storage.ModeGetRequest, a); err != nil {
t.Errorf("got error %v, want none", err)
}
}
})
for i := 0; i < chunkCount-1; i++ {
ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 3).WithBatch(2, 3, 2, false)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
mtx.Lock()
addrs = append(addrs, ch.Address())
batchIDs = append(batchIDs, ch.Stamp().BatchID())
mtx.Unlock()
}
for {
select {
case <-testHookEvictionChan:
case <-time.After(10 * time.Second):
t.Fatal("eviction timeout")
}
reserveSize, err := db.reserveSize.Get()
if err != nil {
t.Fatal(err)
}
if reserveSize == evictTarget {
break
}
}
gcTarget := db.gcTarget()
for {
select {
case <-testHookCollectGarbageChan:
case <-time.After(10 * time.Second):
t.Error("collect garbage timeout")
}
gcSize, err := db.gcSize.Get()
if err != nil {
t.Fatal(err)
}
if gcSize == gcTarget {
break
}
}
t.Run("9/10 of the first chunks should be accessible", func(t *testing.T) {
has := 0
for _, a := range addrs[:10] {
if _, err := db.Get(context.Background(), storage.ModeGetRequest, a); err == nil {
has++
}
}
if has != 9 {
t.Errorf("got %d chunks, want 9", has)
}
})
} }
...@@ -106,13 +106,24 @@ type DB struct { ...@@ -106,13 +106,24 @@ type DB struct {
// field that stores number of intems in gc index // field that stores number of intems in gc index
gcSize shed.Uint64Field gcSize shed.Uint64Field
// field that stores the size of the reserve
reserveSize shed.Uint64Field
// garbage collection is triggered when gcSize exceeds // garbage collection is triggered when gcSize exceeds
// the cacheCapacity value // the cacheCapacity value
cacheCapacity uint64 cacheCapacity uint64
// the size of the reserve in chunks
reserveCapacity uint64
unreserveFunc func(postage.UnreserveIteratorFn) error
// triggers garbage collection event loop // triggers garbage collection event loop
collectGarbageTrigger chan struct{} collectGarbageTrigger chan struct{}
// triggers reserve eviction event loop
reserveEvictionTrigger chan struct{}
// a buffered channel acting as a semaphore // a buffered channel acting as a semaphore
// to limit the maximal number of goroutines // to limit the maximal number of goroutines
// created by Getters to call updateGC function // created by Getters to call updateGC function
...@@ -142,7 +153,8 @@ type DB struct { ...@@ -142,7 +153,8 @@ type DB struct {
// protect Close method from exiting before // protect Close method from exiting before
// garbage collection and gc size write workers // garbage collection and gc size write workers
// are done // are done
collectGarbageWorkerDone chan struct{} collectGarbageWorkerDone chan struct{}
reserveEvictionWorkerDone chan struct{}
// wait for all subscriptions to finish before closing // wait for all subscriptions to finish before closing
// underlaying leveldb to prevent possible panics from // underlaying leveldb to prevent possible panics from
...@@ -159,6 +171,11 @@ type Options struct { ...@@ -159,6 +171,11 @@ type Options struct {
// Capacity is a limit that triggers garbage collection when // Capacity is a limit that triggers garbage collection when
// number of items in gcIndex equals or exceeds it. // number of items in gcIndex equals or exceeds it.
Capacity uint64 Capacity uint64
// ReserveCapacity is the capacity of the reserve.
ReserveCapacity uint64
// UnreserveFunc is an iterator needed to facilitate reserve
// eviction once ReserveCapacity is reached.
UnreserveFunc func(postage.UnreserveIteratorFn) error
// OpenFilesLimit defines the upper bound of open files that the // OpenFilesLimit defines the upper bound of open files that the
// the localstore should maintain at any point of time. It is // the localstore should maintain at any point of time. It is
// passed on to the shed constructor. // passed on to the shed constructor.
...@@ -184,24 +201,29 @@ func New(path string, baseKey []byte, ss storage.StateStorer, o *Options, logger ...@@ -184,24 +201,29 @@ func New(path string, baseKey []byte, ss storage.StateStorer, o *Options, logger
if o == nil { if o == nil {
// default options // default options
o = &Options{ o = &Options{
Capacity: defaultCacheCapacity, Capacity: defaultCacheCapacity,
ReserveCapacity: uint64(batchstore.Capacity),
} }
} }
db = &DB{ db = &DB{
stateStore: ss, stateStore: ss,
cacheCapacity: o.Capacity, cacheCapacity: o.Capacity,
baseKey: baseKey, reserveCapacity: o.ReserveCapacity,
tags: o.Tags, unreserveFunc: o.UnreserveFunc,
baseKey: baseKey,
tags: o.Tags,
// channel collectGarbageTrigger // channel collectGarbageTrigger
// needs to be buffered with the size of 1 // needs to be buffered with the size of 1
// to signal another event if it // to signal another event if it
// is triggered during already running function // is triggered during already running function
collectGarbageTrigger: make(chan struct{}, 1), collectGarbageTrigger: make(chan struct{}, 1),
close: make(chan struct{}), reserveEvictionTrigger: make(chan struct{}, 1),
collectGarbageWorkerDone: make(chan struct{}), close: make(chan struct{}),
metrics: newMetrics(), collectGarbageWorkerDone: make(chan struct{}),
logger: logger, reserveEvictionWorkerDone: make(chan struct{}),
metrics: newMetrics(),
logger: logger,
} }
if db.cacheCapacity == 0 { if db.cacheCapacity == 0 {
db.cacheCapacity = defaultCacheCapacity db.cacheCapacity = defaultCacheCapacity
...@@ -264,6 +286,12 @@ func New(path string, baseKey []byte, ss storage.StateStorer, o *Options, logger ...@@ -264,6 +286,12 @@ func New(path string, baseKey []byte, ss storage.StateStorer, o *Options, logger
return nil, err return nil, err
} }
// reserve size
db.reserveSize, err = db.shed.NewUint64Field("reserve-size")
if err != nil {
return nil, err
}
// Index storing actual chunk address, data and bin id. // Index storing actual chunk address, data and bin id.
headerSize := 16 + postage.StampSize headerSize := 16 + postage.StampSize
db.retrievalDataIndex, err = db.shed.NewIndex("Address->StoreTimestamp|BinID|BatchID|BatchIndex|Sig|Data", shed.IndexFuncs{ db.retrievalDataIndex, err = db.shed.NewIndex("Address->StoreTimestamp|BinID|BatchID|BatchIndex|Sig|Data", shed.IndexFuncs{
...@@ -523,6 +551,7 @@ func New(path string, baseKey []byte, ss storage.StateStorer, o *Options, logger ...@@ -523,6 +551,7 @@ func New(path string, baseKey []byte, ss storage.StateStorer, o *Options, logger
// start garbage collection worker // start garbage collection worker
go db.collectGarbageWorker() go db.collectGarbageWorker()
go db.reserveEvictionWorker()
return db, nil return db, nil
} }
...@@ -538,6 +567,7 @@ func (db *DB) Close() (err error) { ...@@ -538,6 +567,7 @@ func (db *DB) Close() (err error) {
// wait for gc worker to // wait for gc worker to
// return before closing the shed // return before closing the shed
<-db.collectGarbageWorkerDone <-db.collectGarbageWorkerDone
<-db.reserveEvictionWorkerDone
close(done) close(done)
}() }()
select { select {
......
...@@ -59,6 +59,11 @@ type metrics struct { ...@@ -59,6 +59,11 @@ type metrics struct {
GCSize prometheus.Gauge GCSize prometheus.Gauge
GCStoreTimeStamps prometheus.Gauge GCStoreTimeStamps prometheus.Gauge
GCStoreAccessTimeStamps prometheus.Gauge GCStoreAccessTimeStamps prometheus.Gauge
ReserveSize prometheus.Gauge
EvictReserveCounter prometheus.Counter
EvictReserveErrorCounter prometheus.Counter
TotalTimeEvictReserve prometheus.Counter
} }
func newMetrics() metrics { func newMetrics() metrics {
...@@ -343,6 +348,30 @@ func newMetrics() metrics { ...@@ -343,6 +348,30 @@ func newMetrics() metrics {
Name: "gc_access_time_stamp", Name: "gc_access_time_stamp",
Help: "Access timestamp in Garbage collection iteration.", Help: "Access timestamp in Garbage collection iteration.",
}), }),
ReserveSize: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "reserve_size",
Help: "Number of elements in reserve.",
}),
EvictReserveCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "evict_reserve_count",
Help: "number of times the evict reserve worker was invoked",
}),
EvictReserveErrorCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "evict_reserve_err_count",
Help: "number of times evict reserve got an error",
}),
TotalTimeEvictReserve: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "evict_reserve_total_time",
Help: "total time spent evicting from reserve",
}),
} }
} }
......
...@@ -219,6 +219,19 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she ...@@ -219,6 +219,19 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she
if err != nil { if err != nil {
return false, 0, err return false, 0, err
} }
radius, err := db.postageRadiusIndex.Get(item)
if err != nil {
if !errors.Is(err, leveldb.ErrNotFound) {
return false, 0, err
}
} else {
if db.po(swarm.NewAddress(item.Address)) >= radius.Radius {
if err := db.incReserveSizeInBatch(batch, -1); err != nil {
return false, 0, err
}
}
}
} }
item.StoreTimestamp = now() item.StoreTimestamp = now()
...@@ -353,6 +366,18 @@ func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.I ...@@ -353,6 +366,18 @@ func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.I
if err != nil { if err != nil {
return false, 0, err return false, 0, err
} }
radius, err := db.postageRadiusIndex.Get(item)
if err != nil {
if !errors.Is(err, leveldb.ErrNotFound) {
return false, 0, err
}
} else {
if db.po(swarm.NewAddress(item.Address)) >= radius.Radius {
if err := db.incReserveSizeInBatch(batch, -1); err != nil {
return false, 0, err
}
}
}
} }
item.StoreTimestamp = now() item.StoreTimestamp = now()
...@@ -393,15 +418,12 @@ func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.I ...@@ -393,15 +418,12 @@ func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.I
// preserveOrCache is a helper function used to add chunks to either a pinned reserve or gc cache // preserveOrCache is a helper function used to add chunks to either a pinned reserve or gc cache
// (the retrieval access index and the gc index) // (the retrieval access index and the gc index)
func (db *DB) preserveOrCache(batch *leveldb.Batch, item shed.Item, forcePin, forceCache bool) (gcSizeChange int64, err error) { func (db *DB) preserveOrCache(batch *leveldb.Batch, item shed.Item, forcePin, forceCache bool) (gcSizeChange int64, err error) {
// item needs to be populated with Radius
item2, err := db.postageRadiusIndex.Get(item)
if err != nil {
// if there's an error, assume the chunk needs to be GCd
forceCache = true
} else {
item.Radius = item2.Radius
}
if !forceCache && (withinRadiusFn(db, item) || forcePin) { if !forceCache && (withinRadiusFn(db, item) || forcePin) {
if !forcePin {
if err := db.incReserveSizeInBatch(batch, 1); err != nil {
return 0, err
}
}
return db.setPin(batch, item) return db.setPin(batch, item)
} }
......
...@@ -19,7 +19,6 @@ package localstore ...@@ -19,7 +19,6 @@ package localstore
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"time" "time"
"github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/shed"
...@@ -193,12 +192,6 @@ func (db *DB) setSync(batch *leveldb.Batch, addr swarm.Address) (gcSizeChange in ...@@ -193,12 +192,6 @@ func (db *DB) setSync(batch *leveldb.Batch, addr swarm.Address) (gcSizeChange in
} else { } else {
item.AccessTimestamp = i1.AccessTimestamp item.AccessTimestamp = i1.AccessTimestamp
} }
// item needs to be populated with Radius
item2, err := db.postageRadiusIndex.Get(item)
if err != nil {
return 0, fmt.Errorf("postage chunks index: %w", err)
}
item.Radius = item2.Radius
return db.preserveOrCache(batch, item, false, false) return db.preserveOrCache(batch, item, false, false)
} }
......
...@@ -71,7 +71,7 @@ func TestModeSetRemove_WithSync(t *testing.T) { ...@@ -71,7 +71,7 @@ func TestModeSetRemove_WithSync(t *testing.T) {
var chs []swarm.Chunk var chs []swarm.Chunk
for i := 0; i < tc.count; i++ { for i := 0; i < tc.count; i++ {
ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3, 2, false) ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3, 2, false)
err := db.UnreserveBatch(ch.Stamp().BatchID(), 2) _, err := db.UnreserveBatch(ch.Stamp().BatchID(), 2)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
......
...@@ -16,29 +16,26 @@ import ( ...@@ -16,29 +16,26 @@ import (
// UnreserveBatch atomically unpins chunks of a batch in proximity order upto and including po. // UnreserveBatch atomically unpins chunks of a batch in proximity order upto and including po.
// Unpinning will result in all chunks with pincounter 0 to be put in the gc index // Unpinning will result in all chunks with pincounter 0 to be put in the gc index
// so if a chunk was only pinned by the reserve, unreserving it will make it gc-able. // so if a chunk was only pinned by the reserve, unreserving it will make it gc-able.
func (db *DB) UnreserveBatch(id []byte, radius uint8) error { func (db *DB) UnreserveBatch(id []byte, radius uint8) (evicted uint64, err error) {
db.batchMu.Lock()
defer db.batchMu.Unlock()
var ( var (
item = shed.Item{ item = shed.Item{
BatchID: id, BatchID: id,
} }
batch = new(leveldb.Batch) batch = new(leveldb.Batch)
oldRadius = radius
) )
i, err := db.postageRadiusIndex.Get(item) i, err := db.postageRadiusIndex.Get(item)
if err != nil { if err != nil {
if !errors.Is(err, leveldb.ErrNotFound) { if !errors.Is(err, leveldb.ErrNotFound) {
return err return 0, err
}
item.Radius = radius
if err := db.postageRadiusIndex.PutInBatch(batch, item); err != nil {
return err
} }
return db.shed.WriteBatch(batch) } else {
oldRadius = i.Radius
} }
oldRadius := i.Radius var (
var gcSizeChange int64 // number to add or subtract from gcSize gcSizeChange int64 // number to add or subtract from gcSize and reserveSize
reserveSizeChange uint64
)
unpin := func(item shed.Item) (stop bool, err error) { unpin := func(item shed.Item) (stop bool, err error) {
addr := swarm.NewAddress(item.Address) addr := swarm.NewAddress(item.Address)
c, err := db.setUnpin(batch, addr) c, err := db.setUnpin(batch, addr)
...@@ -50,6 +47,13 @@ func (db *DB) UnreserveBatch(id []byte, radius uint8) error { ...@@ -50,6 +47,13 @@ func (db *DB) UnreserveBatch(id []byte, radius uint8) error {
// a dirty shutdown // a dirty shutdown
db.logger.Tracef("unreserve set unpin chunk %s: %v", addr.String(), err) db.logger.Tracef("unreserve set unpin chunk %s: %v", addr.String(), err)
} }
} else {
// we need to do this because a user might pin a chunk on top of
// the reserve pinning. when we unpin due to an unreserve call, then
// we should logically deduct the chunk anyway from the reserve size
// otherwise the reserve size leaks, since c returned from setUnpin
// will be zero.
reserveSizeChange++
} }
gcSizeChange += c gcSizeChange += c
...@@ -60,38 +64,60 @@ func (db *DB) UnreserveBatch(id []byte, radius uint8) error { ...@@ -60,38 +64,60 @@ func (db *DB) UnreserveBatch(id []byte, radius uint8) error {
for bin := oldRadius; bin < radius; bin++ { for bin := oldRadius; bin < radius; bin++ {
err := db.postageChunksIndex.Iterate(unpin, &shed.IterateOptions{Prefix: append(id, bin)}) err := db.postageChunksIndex.Iterate(unpin, &shed.IterateOptions{Prefix: append(id, bin)})
if err != nil { if err != nil {
return err return 0, err
} }
// adjust gcSize // adjust gcSize
if err := db.incGCSizeInBatch(batch, gcSizeChange); err != nil { if err := db.incGCSizeInBatch(batch, gcSizeChange); err != nil {
return err return 0, err
} }
item.Radius = bin item.Radius = bin
if err := db.postageRadiusIndex.PutInBatch(batch, item); err != nil { if err := db.postageRadiusIndex.PutInBatch(batch, item); err != nil {
return err return 0, err
} }
if bin == swarm.MaxPO { if bin == swarm.MaxPO {
if err := db.postageRadiusIndex.DeleteInBatch(batch, item); err != nil { if err := db.postageRadiusIndex.DeleteInBatch(batch, item); err != nil {
return err return 0, err
} }
} }
if err := db.shed.WriteBatch(batch); err != nil { if err := db.shed.WriteBatch(batch); err != nil {
return err return 0, err
} }
db.logger.Debugf("unreserveBatch gc change %d reserve size change %d", gcSizeChange, reserveSizeChange)
batch = new(leveldb.Batch) batch = new(leveldb.Batch)
gcSizeChange = 0 gcSizeChange = 0
} }
if radius != swarm.MaxPO+1 {
item.Radius = radius
if err := db.postageRadiusIndex.PutInBatch(batch, item); err != nil {
return 0, err
}
if err := db.shed.WriteBatch(batch); err != nil {
return 0, err
}
}
gcSize, err := db.gcSize.Get() gcSize, err := db.gcSize.Get()
if err != nil && !errors.Is(err, leveldb.ErrNotFound) { if err != nil && !errors.Is(err, leveldb.ErrNotFound) {
return err return 0, err
}
if reserveSizeChange > 0 {
batch = new(leveldb.Batch)
if err := db.incReserveSizeInBatch(batch, -int64(reserveSizeChange)); err != nil {
return 0, err
}
if err := db.shed.WriteBatch(batch); err != nil {
return 0, err
}
} }
// trigger garbage collection if we reached the capacity // trigger garbage collection if we reached the capacity
if gcSize >= db.cacheCapacity { if gcSize >= db.cacheCapacity {
db.triggerGarbageCollection() db.triggerGarbageCollection()
} }
return nil return reserveSizeChange, nil
} }
func withinRadius(db *DB, item shed.Item) bool { func withinRadius(db *DB, item shed.Item) bool {
......
This diff is collapsed.
...@@ -64,6 +64,7 @@ import ( ...@@ -64,6 +64,7 @@ import (
"github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/steward" "github.com/ethersphere/bee/pkg/steward"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags" "github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/bee/pkg/topology" "github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology/kademlia" "github.com/ethersphere/bee/pkg/topology/kademlia"
...@@ -349,6 +350,17 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo ...@@ -349,6 +350,17 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo
b.p2pService = p2ps b.p2pService = p2ps
b.p2pHalter = p2ps b.p2pHalter = p2ps
var unreserveFn func([]byte, uint8) (uint64, error)
var evictFn = func(b []byte) error {
_, err := unreserveFn(b, swarm.MaxPO+1)
return err
}
batchStore, err := batchstore.New(stateStore, evictFn, logger)
if err != nil {
return nil, fmt.Errorf("batchstore: %w", err)
}
// localstore depends on batchstore // localstore depends on batchstore
var path string var path string
...@@ -358,6 +370,8 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo ...@@ -358,6 +370,8 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo
} }
lo := &localstore.Options{ lo := &localstore.Options{
Capacity: o.CacheCapacity, Capacity: o.CacheCapacity,
ReserveCapacity: uint64(batchstore.Capacity),
UnreserveFunc: batchStore.Unreserve,
OpenFilesLimit: o.DBOpenFilesLimit, OpenFilesLimit: o.DBOpenFilesLimit,
BlockCacheCapacity: o.DBBlockCacheCapacity, BlockCacheCapacity: o.DBBlockCacheCapacity,
WriteBufferSize: o.DBWriteBufferSize, WriteBufferSize: o.DBWriteBufferSize,
...@@ -369,11 +383,8 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo ...@@ -369,11 +383,8 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo
return nil, fmt.Errorf("localstore: %w", err) return nil, fmt.Errorf("localstore: %w", err)
} }
b.localstoreCloser = storer b.localstoreCloser = storer
unreserveFn = storer.UnreserveBatch
batchStore, err := batchstore.New(stateStore, storer.UnreserveBatch)
if err != nil {
return nil, fmt.Errorf("batchstore: %w", err)
}
validStamp := postage.ValidStamp(batchStore) validStamp := postage.ValidStamp(batchStore)
post, err := postage.NewService(stateStore, batchStore, chainID) post, err := postage.NewService(stateStore, batchStore, chainID)
if err != nil { if err != nil {
......
...@@ -6,7 +6,6 @@ package batchstore ...@@ -6,7 +6,6 @@ package batchstore
import ( import (
"fmt" "fmt"
"math/big"
"github.com/ethersphere/bee/pkg/postage" "github.com/ethersphere/bee/pkg/postage"
) )
...@@ -20,24 +19,11 @@ var BatchKey = batchKey ...@@ -20,24 +19,11 @@ var BatchKey = batchKey
// power of 2 function // power of 2 function
var Exp2 = exp2 var Exp2 = exp2
// iterates through all batches
func IterateAll(bs postage.Storer, f func(b *postage.Batch) (bool, error)) error {
s := bs.(*store)
return s.store.Iterate(batchKeyPrefix, func(key []byte, _ []byte) (bool, error) {
b, err := s.Get(key[len(key)-32:])
if err != nil {
return true, err
}
return f(b)
})
}
// GetReserve extracts the inner limit and depth of reserve
func GetReserve(si postage.Storer) (*big.Int, uint8) {
s, _ := si.(*store)
return s.rs.Inner, s.rs.Radius
}
func (s *store) String() string { func (s *store) String() string {
return fmt.Sprintf("inner=%d,outer=%d", s.rs.Inner.Uint64(), s.rs.Outer.Uint64()) return fmt.Sprintf("inner=%d,outer=%d", s.rs.Inner.Uint64(), s.rs.Outer.Uint64())
} }
func SetUnreserveFunc(s postage.Storer, fn func([]byte, uint8) error) {
st := s.(*store)
st.unreserveFn = fn
}
...@@ -140,7 +140,9 @@ func (bs *BatchStore) GetReserveState() *postage.ReserveState { ...@@ -140,7 +140,9 @@ func (bs *BatchStore) GetReserveState() *postage.ReserveState {
} }
return rs return rs
} }
func (bs *BatchStore) Unreserve(_ postage.UnreserveIteratorFn) error {
panic("not implemented")
}
func (bs *BatchStore) SetRadiusSetter(r postage.RadiusSetter) { func (bs *BatchStore) SetRadiusSetter(r postage.RadiusSetter) {
panic("not implemented") panic("not implemented")
} }
......
...@@ -28,11 +28,14 @@ package batchstore ...@@ -28,11 +28,14 @@ package batchstore
import ( import (
"bytes" "bytes"
"encoding/binary"
"errors" "errors"
"fmt" "fmt"
"math/big" "math/big"
"strings"
"github.com/ethersphere/bee/pkg/postage" "github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
) )
...@@ -54,6 +57,10 @@ type reserveState struct { ...@@ -54,6 +57,10 @@ type reserveState struct {
// it defines the proximity order of chunks which we // it defines the proximity order of chunks which we
// would like to guarantee that all chunks are stored // would like to guarantee that all chunks are stored
Radius uint8 `json:"radius"` Radius uint8 `json:"radius"`
// StorageRadius is the de-facto storage radius tracked
// by monitoring the events communicated to the localstore
// reserve eviction worker.
StorageRadius uint8 `json:"storageRadius"`
// Available capacity of the reserve which can still be used. // Available capacity of the reserve which can still be used.
Available int64 `json:"available"` Available int64 `json:"available"`
Outer *big.Int `json:"outer"` // lower value limit for outer layer = the further half of chunks Outer *big.Int `json:"outer"` // lower value limit for outer layer = the further half of chunks
...@@ -61,9 +68,63 @@ type reserveState struct { ...@@ -61,9 +68,63 @@ type reserveState struct {
} }
// unreserve is called when the batchstore decides not to reserve a batch on a PO // unreserve is called when the batchstore decides not to reserve a batch on a PO
// i.e. chunk of the batch in bins [0 upto PO] (closed interval) are unreserved // i.e. chunk of the batch in bins [0 upto PO] (closed interval) are unreserved.
func (s *store) unreserve(b *postage.Batch, radius uint8) error { // this adds the batch at the mentioned PO to the unreserve fifo queue, that can be
return s.unreserveFunc(b.ID, radius) // dequeued by the localstore once the storage fills up.
func (s *store) unreserve(b []byte, radius uint8) error {
c := s.queueIdx
c++
v := make([]byte, 8)
binary.BigEndian.PutUint64(v, c)
i := &UnreserveItem{BatchID: b, Radius: radius}
if err := s.store.Put(fmt.Sprintf("%s_%s", unreserveQueueKey, string(v)), i); err != nil {
return err
}
if err := s.putQueueCardinality(c); err != nil {
return err
}
s.queueIdx = c
return nil
}
func (s *store) Unreserve(cb postage.UnreserveIteratorFn) error {
var entries []string // entries to clean up
defer func() {
for _, v := range entries {
if err := s.store.Delete(v); err != nil {
s.logger.Errorf("batchstore: unreserve entry delete: %v", err)
return
}
}
}()
return s.store.Iterate(unreserveQueueKey, func(key, val []byte) (bool, error) {
if !strings.HasPrefix(string(key), unreserveQueueKey) {
return true, nil
}
v := &UnreserveItem{}
err := v.UnmarshalBinary(val)
if err != nil {
return true, err
}
stop, err := cb(v.BatchID, v.Radius)
if err != nil {
return true, err
}
s.rsMtx.Lock()
defer s.rsMtx.Unlock()
if s.rs.StorageRadius+1 < v.Radius {
s.rs.StorageRadius = v.Radius - 1
if err = s.store.Put(reserveStateKey, s.rs); err != nil {
return true, err
}
}
entries = append(entries, string(key))
if stop {
return true, nil
}
return false, nil
})
} }
// evictExpired is called when PutChainState is called (and there is 'settlement') // evictExpired is called when PutChainState is called (and there is 'settlement')
...@@ -112,10 +173,11 @@ func (s *store) evictExpired() error { ...@@ -112,10 +173,11 @@ func (s *store) evictExpired() error {
} }
// unreserve batch fully // unreserve batch fully
err = s.unreserve(b, swarm.MaxPO+1) err = s.evictFn(b.ID)
if err != nil { if err != nil {
return true, err return true, err
} }
s.rs.Available += multiplier * exp2(b.Radius-s.rs.Radius-1) s.rs.Available += multiplier * exp2(b.Radius-s.rs.Radius-1)
// if batch has no value then delete it // if batch has no value then delete it
...@@ -250,7 +312,7 @@ func (s *store) update(b *postage.Batch, oldDepth uint8, oldValue *big.Int) erro ...@@ -250,7 +312,7 @@ func (s *store) update(b *postage.Batch, oldDepth uint8, oldValue *big.Int) erro
capacityChange, reserveRadius := s.rs.change(oldValue, newValue, oldDepth, newDepth) capacityChange, reserveRadius := s.rs.change(oldValue, newValue, oldDepth, newDepth)
s.rs.Available += capacityChange s.rs.Available += capacityChange
if err := s.unreserve(b, reserveRadius); err != nil { if err := s.unreserveFn(b.ID, reserveRadius); err != nil {
return err return err
} }
err := s.evictOuter(b) err := s.evictOuter(b)
...@@ -293,7 +355,7 @@ func (s *store) evictOuter(last *postage.Batch) error { ...@@ -293,7 +355,7 @@ func (s *store) evictOuter(last *postage.Batch) error {
// unreserve outer PO of the lowest priority batch until capacity is back to positive // unreserve outer PO of the lowest priority batch until capacity is back to positive
s.rs.Available += exp2(b.Depth - s.rs.Radius - 1) s.rs.Available += exp2(b.Depth - s.rs.Radius - 1)
s.rs.Outer.Set(b.Value) s.rs.Outer.Set(b.Value)
return false, s.unreserve(b, s.rs.Radius) return false, s.unreserveFn(b.ID, s.rs.Radius)
}) })
if err != nil { if err != nil {
return err return err
...@@ -310,6 +372,41 @@ func (s *store) evictOuter(last *postage.Batch) error { ...@@ -310,6 +372,41 @@ func (s *store) evictOuter(last *postage.Batch) error {
return s.store.Put(reserveStateKey, s.rs) return s.store.Put(reserveStateKey, s.rs)
} }
func (s *store) getQueueCardinality() (val uint64, err error) {
err = s.store.Get(ureserveQueueCardinalityKey, &val)
if errors.Is(err, storage.ErrNotFound) {
return 0, nil
}
return val, err
}
func (s *store) putQueueCardinality(val uint64) error {
return s.store.Put(ureserveQueueCardinalityKey, val)
}
type UnreserveItem struct {
BatchID []byte
Radius uint8
}
func (u *UnreserveItem) MarshalBinary() ([]byte, error) {
out := make([]byte, 32+1) // 32 byte batch ID + 1 byte uint8 radius
copy(out, u.BatchID)
out[32] = u.Radius
return out, nil
}
func (u *UnreserveItem) UnmarshalBinary(b []byte) error {
if len(b) != 33 {
return errors.New("invalid unreserve item length")
}
u.BatchID = make([]byte, 32)
copy(u.BatchID, b[:32])
u.Radius = b[32]
return nil
}
// exp2 returns the e-th power of 2 // exp2 returns the e-th power of 2
func exp2(e uint8) int64 { func exp2(e uint8) int64 {
if e == 0 { if e == 0 {
......
This diff is collapsed.
...@@ -10,34 +10,44 @@ import ( ...@@ -10,34 +10,44 @@ import (
"fmt" "fmt"
"math/big" "math/big"
"strings" "strings"
"sync"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/postage" "github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
) )
const ( const (
batchKeyPrefix = "batchstore_batch_" batchKeyPrefix = "batchstore_batch_"
valueKeyPrefix = "batchstore_value_" valueKeyPrefix = "batchstore_value_"
chainStateKey = "batchstore_chainstate" chainStateKey = "batchstore_chainstate"
reserveStateKey = "batchstore_reservestate" reserveStateKey = "batchstore_reservestate"
unreserveQueueKey = "batchstore_unreserve_queue_"
ureserveQueueCardinalityKey = "batchstore_queue_cardinality"
) )
type unreserveFn func(batchID []byte, radius uint8) error type unreserveFn func(batchID []byte, radius uint8) error
type evictFn func(batchID []byte) error
// store implements postage.Storer // store implements postage.Storer
type store struct { type store struct {
store storage.StateStorer // State store backend to persist batches. store storage.StateStorer // State store backend to persist batches.
cs *postage.ChainState // the chain state cs *postage.ChainState // the chain state
rs *reserveState // the reserve state
unreserveFunc unreserveFn // unreserve function rsMtx sync.Mutex
metrics metrics // metrics rs *reserveState // the reserve state
unreserveFn unreserveFn // unreserve function
evictFn evictFn // evict function
queueIdx uint64 // unreserve queue cardinality
metrics metrics // metrics
logger logging.Logger
radiusSetter postage.RadiusSetter // setter for radius notifications radiusSetter postage.RadiusSetter // setter for radius notifications
} }
// New constructs a new postage batch store. // New constructs a new postage batch store.
// It initialises both chain state and reserve state from the persistent state store // It initialises both chain state and reserve state from the persistent state store
func New(st storage.StateStorer, unreserveFunc unreserveFn) (postage.Storer, error) { func New(st storage.StateStorer, ev evictFn, logger logging.Logger) (postage.Storer, error) {
cs := &postage.ChainState{} cs := &postage.ChainState{}
err := st.Get(chainStateKey, cs) err := st.Get(chainStateKey, cs)
if err != nil { if err != nil {
...@@ -63,23 +73,33 @@ func New(st storage.StateStorer, unreserveFunc unreserveFn) (postage.Storer, err ...@@ -63,23 +73,33 @@ func New(st storage.StateStorer, unreserveFunc unreserveFn) (postage.Storer, err
Available: Capacity, Available: Capacity,
} }
} }
s := &store{ s := &store{
store: st, store: st,
cs: cs, cs: cs,
rs: rs, rs: rs,
unreserveFunc: unreserveFunc, evictFn: ev,
metrics: newMetrics(), metrics: newMetrics(),
logger: logger,
}
s.unreserveFn = s.unreserve
if s.queueIdx, err = s.getQueueCardinality(); err != nil {
return nil, err
} }
return s, nil return s, nil
} }
func (s *store) GetReserveState() *postage.ReserveState { func (s *store) GetReserveState() *postage.ReserveState {
s.rsMtx.Lock()
defer s.rsMtx.Unlock()
return &postage.ReserveState{ return &postage.ReserveState{
Radius: s.rs.Radius, Radius: s.rs.Radius,
Available: s.rs.Available, StorageRadius: s.rs.StorageRadius,
Outer: new(big.Int).Set(s.rs.Outer), Available: s.rs.Available,
Inner: new(big.Int).Set(s.rs.Inner), Outer: new(big.Int).Set(s.rs.Outer),
Inner: new(big.Int).Set(s.rs.Inner),
} }
} }
...@@ -90,7 +110,15 @@ func (s *store) Get(id []byte) (*postage.Batch, error) { ...@@ -90,7 +110,15 @@ func (s *store) Get(id []byte) (*postage.Batch, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("get batch %s: %w", hex.EncodeToString(id), err) return nil, fmt.Errorf("get batch %s: %w", hex.EncodeToString(id), err)
} }
b.Radius = s.rs.radius(s.rs.tier(b.Value))
s.rsMtx.Lock()
defer s.rsMtx.Unlock()
if s.rs.StorageRadius < s.rs.Radius {
b.Radius = s.rs.StorageRadius
} else {
b.Radius = s.rs.radius(s.rs.tier(b.Value))
}
return b, nil return b, nil
} }
...@@ -114,7 +142,9 @@ func (s *store) Put(b *postage.Batch, value *big.Int, depth uint8) error { ...@@ -114,7 +142,9 @@ func (s *store) Put(b *postage.Batch, value *big.Int, depth uint8) error {
} }
if s.radiusSetter != nil { if s.radiusSetter != nil {
s.rsMtx.Lock()
s.radiusSetter.SetRadius(s.rs.Radius) s.radiusSetter.SetRadius(s.rs.Radius)
s.rsMtx.Unlock()
} }
return s.store.Put(batchKey(b.ID), b) return s.store.Put(batchKey(b.ID), b)
} }
...@@ -150,7 +180,9 @@ func (s *store) PutChainState(cs *postage.ChainState) error { ...@@ -150,7 +180,9 @@ func (s *store) PutChainState(cs *postage.ChainState) error {
// this needs to be improved, since we can miss some calls on // this needs to be improved, since we can miss some calls on
// startup. the same goes for the other call to radiusSetter // startup. the same goes for the other call to radiusSetter
if s.radiusSetter != nil { if s.radiusSetter != nil {
s.rsMtx.Lock()
s.radiusSetter.SetRadius(s.rs.Radius) s.radiusSetter.SetRadius(s.rs.Radius)
s.rsMtx.Unlock()
} }
return s.store.Put(chainStateKey, cs) return s.store.Put(chainStateKey, cs)
......
...@@ -18,13 +18,14 @@ import ( ...@@ -18,13 +18,14 @@ import (
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
) )
func unreserve([]byte, uint8) error { return nil } var noopEvictFn = func([]byte) error { return nil }
func TestBatchStoreGet(t *testing.T) { func TestBatchStoreGet(t *testing.T) {
testBatch := postagetest.MustNewBatch() testBatch := postagetest.MustNewBatch()
key := batchstore.BatchKey(testBatch.ID) key := batchstore.BatchKey(testBatch.ID)
stateStore := mock.NewStateStore() stateStore := mock.NewStateStore()
batchStore, _ := batchstore.New(stateStore, nil) batchStore, _ := batchstore.New(stateStore, nil, logging.New(ioutil.Discard, 0))
stateStorePut(t, stateStore, key, testBatch) stateStorePut(t, stateStore, key, testBatch)
got := batchStoreGetBatch(t, batchStore, testBatch.ID) got := batchStoreGetBatch(t, batchStore, testBatch.ID)
...@@ -36,7 +37,7 @@ func TestBatchStorePut(t *testing.T) { ...@@ -36,7 +37,7 @@ func TestBatchStorePut(t *testing.T) {
key := batchstore.BatchKey(testBatch.ID) key := batchstore.BatchKey(testBatch.ID)
stateStore := mock.NewStateStore() stateStore := mock.NewStateStore()
batchStore, _ := batchstore.New(stateStore, unreserve) batchStore, _ := batchstore.New(stateStore, nil, logging.New(ioutil.Discard, 0))
batchStore.SetRadiusSetter(noopRadiusSetter{}) batchStore.SetRadiusSetter(noopRadiusSetter{})
batchStorePutBatch(t, batchStore, testBatch) batchStorePutBatch(t, batchStore, testBatch)
...@@ -49,7 +50,7 @@ func TestBatchStoreGetChainState(t *testing.T) { ...@@ -49,7 +50,7 @@ func TestBatchStoreGetChainState(t *testing.T) {
testChainState := postagetest.NewChainState() testChainState := postagetest.NewChainState()
stateStore := mock.NewStateStore() stateStore := mock.NewStateStore()
batchStore, _ := batchstore.New(stateStore, nil) batchStore, _ := batchstore.New(stateStore, nil, logging.New(ioutil.Discard, 0))
batchStore.SetRadiusSetter(noopRadiusSetter{}) batchStore.SetRadiusSetter(noopRadiusSetter{})
err := batchStore.PutChainState(testChainState) err := batchStore.PutChainState(testChainState)
...@@ -64,7 +65,7 @@ func TestBatchStorePutChainState(t *testing.T) { ...@@ -64,7 +65,7 @@ func TestBatchStorePutChainState(t *testing.T) {
testChainState := postagetest.NewChainState() testChainState := postagetest.NewChainState()
stateStore := mock.NewStateStore() stateStore := mock.NewStateStore()
batchStore, _ := batchstore.New(stateStore, nil) batchStore, _ := batchstore.New(stateStore, nil, logging.New(ioutil.Discard, 0))
batchStore.SetRadiusSetter(noopRadiusSetter{}) batchStore.SetRadiusSetter(noopRadiusSetter{})
batchStorePutChainState(t, batchStore, testChainState) batchStorePutChainState(t, batchStore, testChainState)
...@@ -89,7 +90,7 @@ func TestBatchStoreReset(t *testing.T) { ...@@ -89,7 +90,7 @@ func TestBatchStoreReset(t *testing.T) {
} }
defer stateStore.Close() defer stateStore.Close()
batchStore, _ := batchstore.New(stateStore, func([]byte, uint8) error { return nil }) batchStore, _ := batchstore.New(stateStore, noopEvictFn, logger)
batchStore.SetRadiusSetter(noopRadiusSetter{}) batchStore.SetRadiusSetter(noopRadiusSetter{})
err = batchStore.Put(testBatch, big.NewInt(15), 8) err = batchStore.Put(testBatch, big.NewInt(15), 8)
if err != nil { if err != nil {
......
...@@ -23,6 +23,8 @@ type EventUpdater interface { ...@@ -23,6 +23,8 @@ type EventUpdater interface {
TransactionEnd() error TransactionEnd() error
} }
type UnreserveIteratorFn func(id []byte, radius uint8) (bool, error)
// Storer represents the persistence layer for batches on the current (highest // Storer represents the persistence layer for batches on the current (highest
// available) block. // available) block.
type Storer interface { type Storer interface {
...@@ -32,6 +34,7 @@ type Storer interface { ...@@ -32,6 +34,7 @@ type Storer interface {
GetChainState() *ChainState GetChainState() *ChainState
GetReserveState() *ReserveState GetReserveState() *ReserveState
SetRadiusSetter(RadiusSetter) SetRadiusSetter(RadiusSetter)
Unreserve(UnreserveIteratorFn) error
Reset() error Reset() error
} }
......
...@@ -7,8 +7,9 @@ package postage ...@@ -7,8 +7,9 @@ package postage
import "math/big" import "math/big"
type ReserveState struct { type ReserveState struct {
Radius uint8 Radius uint8
Available int64 StorageRadius uint8
Outer *big.Int // lower value limit for outer layer = the further half of chunks Available int64
Inner *big.Int Outer *big.Int // lower value limit for outer layer = the further half of chunks
Inner *big.Int
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment