Commit 7459b900 authored by acud's avatar acud Committed by GitHub

Revert badgerDB to levelDB (#113)

* Revert "shed: Replace levelDB with badgerDB (#75)"
parent f3a6d2e6
/dist /dist
/.idea /.idea
/.vscode /.vscode
vendor/*
# Compiled Object files, Static and Dynamic libs (Shared Objects) # Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o *.o
......
...@@ -6,29 +6,32 @@ require ( ...@@ -6,29 +6,32 @@ require (
github.com/btcsuite/btcd v0.20.1-beta github.com/btcsuite/btcd v0.20.1-beta
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect
github.com/coreos/go-semver v0.3.0 github.com/coreos/go-semver v0.3.0
github.com/dgraph-io/badger/v2 v2.0.3
github.com/ethersphere/bmt v0.1.0 github.com/ethersphere/bmt v0.1.0
github.com/gogo/protobuf v1.3.1 github.com/gogo/protobuf v1.3.1
github.com/gorilla/handlers v1.4.2 github.com/gorilla/handlers v1.4.2
github.com/gorilla/mux v1.7.4 github.com/gorilla/mux v1.7.3
github.com/libp2p/go-libp2p v0.7.4 github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/libp2p/go-libp2p v0.5.1
github.com/libp2p/go-libp2p-autonat-svc v0.1.0 github.com/libp2p/go-libp2p-autonat-svc v0.1.0
github.com/libp2p/go-libp2p-core v0.5.1 github.com/libp2p/go-libp2p-core v0.3.0
github.com/libp2p/go-libp2p-peerstore v0.2.3 github.com/libp2p/go-libp2p-peerstore v0.1.4
github.com/libp2p/go-libp2p-quic-transport v0.3.3 github.com/libp2p/go-libp2p-quic-transport v0.2.2
github.com/libp2p/go-tcp-transport v0.2.0 github.com/libp2p/go-tcp-transport v0.1.1
github.com/libp2p/go-ws-transport v0.3.0 github.com/libp2p/go-ws-transport v0.2.0
github.com/multiformats/go-multiaddr v0.2.1 github.com/mattn/go-colorable v0.1.2 // indirect
github.com/multiformats/go-multistream v0.1.1 github.com/multiformats/go-multiaddr v0.2.0
github.com/multiformats/go-multistream v0.1.0
github.com/opentracing/opentracing-go v1.1.0 github.com/opentracing/opentracing-go v1.1.0
github.com/prometheus/client_golang v1.5.1 github.com/prometheus/client_golang v1.3.0
github.com/sirupsen/logrus v1.5.0 github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v0.0.7 github.com/spf13/afero v1.2.2 // indirect
github.com/spf13/viper v1.6.3 github.com/spf13/cobra v0.0.5
github.com/syndtr/goleveldb v1.0.0 github.com/spf13/viper v1.6.2
github.com/syndtr/goleveldb v1.0.1-0.20190923125748-758128399b1d
github.com/uber/jaeger-client-go v2.22.1+incompatible github.com/uber/jaeger-client-go v2.22.1+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible // indirect github.com/uber/jaeger-lib v2.2.0+incompatible // indirect
golang.org/x/crypto v0.0.0-20200406173513-056763e48d71 golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
resenje.org/web v0.4.3 resenje.org/web v0.4.0
) )
This diff is collapsed.
...@@ -20,8 +20,8 @@ import ( ...@@ -20,8 +20,8 @@ import (
"errors" "errors"
"time" "time"
"github.com/dgraph-io/badger/v2"
"github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/shed"
"github.com/syndtr/goleveldb/leveldb"
) )
var ( var (
...@@ -85,7 +85,7 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { ...@@ -85,7 +85,7 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
} }
}() }()
batch := db.shed.GetBatch(true) batch := new(leveldb.Batch)
target := db.gcTarget() target := db.gcTarget()
// protect database from changing idexes and gcSize // protect database from changing idexes and gcSize
...@@ -146,10 +146,7 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { ...@@ -146,10 +146,7 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
} }
db.metrics.GCCollectedCounter.Inc() db.metrics.GCCollectedCounter.Inc()
if err := db.gcSize.PutInBatch(batch, gcSize-collectedCount); err != nil { db.gcSize.PutInBatch(batch, gcSize-collectedCount)
return 0, false, err
}
err = db.shed.WriteBatch(batch) err = db.shed.WriteBatch(batch)
if err != nil { if err != nil {
db.metrics.GCExcludeWriteBatchError.Inc() db.metrics.GCExcludeWriteBatchError.Inc()
...@@ -168,7 +165,7 @@ func (db *DB) removeChunksInExcludeIndexFromGC() (err error) { ...@@ -168,7 +165,7 @@ func (db *DB) removeChunksInExcludeIndexFromGC() (err error) {
} }
}() }()
batch := db.shed.GetBatch(true) batch := new(leveldb.Batch)
excludedCount := 0 excludedCount := 0
var gcSizeChange int64 var gcSizeChange int64
err = db.gcExcludeIndex.Iterate(func(item shed.Item) (stop bool, err error) { err = db.gcExcludeIndex.Iterate(func(item shed.Item) (stop bool, err error) {
...@@ -247,18 +244,18 @@ func (db *DB) triggerGarbageCollection() { ...@@ -247,18 +244,18 @@ func (db *DB) triggerGarbageCollection() {
// incGCSizeInBatch changes gcSize field value // incGCSizeInBatch changes gcSize field value
// by change which can be negative. This function // by change which can be negative. This function
// must be called under batchMu lock. // must be called under batchMu lock.
func (db *DB) incGCSizeInBatch(batch *badger.Txn, change int64) (err error) { func (db *DB) incGCSizeInBatch(batch *leveldb.Batch, change int64) (err error) {
if change == 0 { if change == 0 {
return nil return nil
} }
gcSize, err := db.gcSize.Get() gcSize, err := db.gcSize.Get()
if err != nil && !errors.Is(err, shed.ErrNotFound) { if err != nil && !errors.Is(err, leveldb.ErrNotFound) {
return err return err
} }
var new uint64 var newSize uint64
if change > 0 { if change > 0 {
new = gcSize + uint64(change) newSize = gcSize + uint64(change)
} else { } else {
// 'change' is an int64 and is negative // 'change' is an int64 and is negative
// a conversion is needed with correct sign // a conversion is needed with correct sign
...@@ -267,14 +264,12 @@ func (db *DB) incGCSizeInBatch(batch *badger.Txn, change int64) (err error) { ...@@ -267,14 +264,12 @@ func (db *DB) incGCSizeInBatch(batch *badger.Txn, change int64) (err error) {
// protect uint64 undeflow // protect uint64 undeflow
return nil return nil
} }
new = gcSize - c newSize = gcSize - c
}
if err := db.gcSize.PutInBatch(batch, new); err != nil {
return err
} }
db.gcSize.PutInBatch(batch, newSize)
// trigger garbage collection if we reached the capacity // trigger garbage collection if we reached the capacity
if new >= db.capacity { if newSize >= db.capacity {
db.triggerGarbageCollection() db.triggerGarbageCollection()
} }
return nil return nil
......
...@@ -29,6 +29,7 @@ import ( ...@@ -29,6 +29,7 @@ import (
"github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
) )
// TestDB_collectGarbageWorker tests garbage collection runs // TestDB_collectGarbageWorker tests garbage collection runs
...@@ -237,7 +238,7 @@ func TestPinGC(t *testing.T) { ...@@ -237,7 +238,7 @@ func TestPinGC(t *testing.T) {
t.Run("first chunks after pinned chunks should be removed", func(t *testing.T) { t.Run("first chunks after pinned chunks should be removed", func(t *testing.T) {
for i := pinChunksCount; i < (int(dbCapacity) - int(gcTarget)); i++ { for i := pinChunksCount; i < (int(dbCapacity) - int(gcTarget)); i++ {
_, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[i]) _, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[i])
if err != storage.ErrNotFound { if err != leveldb.ErrNotFound {
t.Fatal(err) t.Fatal(err)
} }
} }
......
...@@ -30,6 +30,7 @@ import ( ...@@ -30,6 +30,7 @@ import (
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags" "github.com/ethersphere/bee/pkg/tags"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/syndtr/goleveldb/leveldb"
) )
var _ storage.Storer = &DB{} var _ storage.Storer = &DB{}
...@@ -191,7 +192,7 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB ...@@ -191,7 +192,7 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
return nil, err return nil, err
} }
schemaName, err := db.schemaName.Get() schemaName, err := db.schemaName.Get()
if err != nil && !errors.Is(err, shed.ErrNotFound) { if err != nil && !errors.Is(err, leveldb.ErrNotFound) {
return nil, err return nil, err
} }
if schemaName == "" { if schemaName == "" {
......
...@@ -33,6 +33,7 @@ import ( ...@@ -33,6 +33,7 @@ import (
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
chunktesting "github.com/ethersphere/bee/pkg/storage/testing" chunktesting "github.com/ethersphere/bee/pkg/storage/testing"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
) )
func init() { func init() {
...@@ -245,7 +246,7 @@ func newRetrieveIndexesTest(db *DB, chunk swarm.Chunk, storeTimestamp, accessTim ...@@ -245,7 +246,7 @@ func newRetrieveIndexesTest(db *DB, chunk swarm.Chunk, storeTimestamp, accessTim
validateItem(t, item, chunk.Address().Bytes(), chunk.Data(), storeTimestamp, 0) validateItem(t, item, chunk.Address().Bytes(), chunk.Data(), storeTimestamp, 0)
// access index should not be set // access index should not be set
wantErr := shed.ErrNotFound wantErr := leveldb.ErrNotFound
_, err = db.retrievalAccessIndex.Get(addressToItem(chunk.Address())) _, err = db.retrievalAccessIndex.Get(addressToItem(chunk.Address()))
if err != wantErr { if err != wantErr {
t.Errorf("got error %v, want %v", err, wantErr) t.Errorf("got error %v, want %v", err, wantErr)
......
...@@ -23,6 +23,7 @@ import ( ...@@ -23,6 +23,7 @@ import (
"github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
) )
// Get returns a chunk from the database. If the chunk is // Get returns a chunk from the database. If the chunk is
...@@ -42,7 +43,7 @@ func (db *DB) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address) ...@@ -42,7 +43,7 @@ func (db *DB) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address)
out, err := db.get(mode, addr) out, err := db.get(mode, addr)
if err != nil { if err != nil {
if err == shed.ErrNotFound { if err == leveldb.ErrNotFound {
return nil, storage.ErrNotFound return nil, storage.ErrNotFound
} }
return nil, err return nil, err
...@@ -123,7 +124,7 @@ func (db *DB) updateGC(item shed.Item) (err error) { ...@@ -123,7 +124,7 @@ func (db *DB) updateGC(item shed.Item) (err error) {
db.batchMu.Lock() db.batchMu.Lock()
defer db.batchMu.Unlock() defer db.batchMu.Unlock()
batch := db.shed.GetBatch(true) batch := new(leveldb.Batch)
// update accessTimeStamp in retrieve, gc // update accessTimeStamp in retrieve, gc
...@@ -131,7 +132,7 @@ func (db *DB) updateGC(item shed.Item) (err error) { ...@@ -131,7 +132,7 @@ func (db *DB) updateGC(item shed.Item) (err error) {
switch err { switch err {
case nil: case nil:
item.AccessTimestamp = i.AccessTimestamp item.AccessTimestamp = i.AccessTimestamp
case shed.ErrNotFound: case leveldb.ErrNotFound:
// no chunk accesses // no chunk accesses
default: default:
return err return err
......
...@@ -23,6 +23,7 @@ import ( ...@@ -23,6 +23,7 @@ import (
"github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
) )
// GetMulti returns chunks from the database. If one of the chunks is not found // GetMulti returns chunks from the database. If one of the chunks is not found
...@@ -41,7 +42,7 @@ func (db *DB) GetMulti(ctx context.Context, mode storage.ModeGet, addrs ...swarm ...@@ -41,7 +42,7 @@ func (db *DB) GetMulti(ctx context.Context, mode storage.ModeGet, addrs ...swarm
out, err := db.getMulti(mode, addrs...) out, err := db.getMulti(mode, addrs...)
if err != nil { if err != nil {
if err == shed.ErrNotFound { if err == leveldb.ErrNotFound {
return nil, storage.ErrNotFound return nil, storage.ErrNotFound
} }
return nil, err return nil, err
......
...@@ -20,10 +20,10 @@ import ( ...@@ -20,10 +20,10 @@ import (
"context" "context"
"time" "time"
"github.com/dgraph-io/badger/v2"
"github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
) )
// Put stores Chunks to database and depending // Put stores Chunks to database and depending
...@@ -55,7 +55,7 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e ...@@ -55,7 +55,7 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e
db.batchMu.Lock() db.batchMu.Lock()
defer db.batchMu.Unlock() defer db.batchMu.Unlock()
batch := db.shed.GetBatch(true) batch := new(leveldb.Batch)
// variables that provide information for operations // variables that provide information for operations
// to be done after write batch function successfully executes // to be done after write batch function successfully executes
...@@ -130,9 +130,7 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e ...@@ -130,9 +130,7 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e
} }
for po, id := range binIDs { for po, id := range binIDs {
if err := db.binIDs.PutInBatch(batch, uint64(po), id); err != nil { db.binIDs.PutInBatch(batch, uint64(po), id)
return nil, err
}
} }
err = db.incGCSizeInBatch(batch, gcSizeChange) err = db.incGCSizeInBatch(batch, gcSizeChange)
...@@ -159,14 +157,14 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e ...@@ -159,14 +157,14 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e
// - it does not enter the syncpool // - it does not enter the syncpool
// The batch can be written to the database. // The batch can be written to the database.
// Provided batch and binID map are updated. // Provided batch and binID map are updated.
func (db *DB) putRequest(batch *badger.Txn, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) { func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) {
i, err := db.retrievalDataIndex.Get(item) i, err := db.retrievalDataIndex.Get(item)
switch err { switch err {
case nil: case nil:
exists = true exists = true
item.StoreTimestamp = i.StoreTimestamp item.StoreTimestamp = i.StoreTimestamp
item.BinID = i.BinID item.BinID = i.BinID
case shed.ErrNotFound: case leveldb.ErrNotFound:
// no chunk accesses // no chunk accesses
exists = false exists = false
default: default:
...@@ -199,7 +197,7 @@ func (db *DB) putRequest(batch *badger.Txn, binIDs map[uint8]uint64, item shed.I ...@@ -199,7 +197,7 @@ func (db *DB) putRequest(batch *badger.Txn, binIDs map[uint8]uint64, item shed.I
// - put to indexes: retrieve, push, pull // - put to indexes: retrieve, push, pull
// The batch can be written to the database. // The batch can be written to the database.
// Provided batch and binID map are updated. // Provided batch and binID map are updated.
func (db *DB) putUpload(batch *badger.Txn, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) { func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) {
exists, err = db.retrievalDataIndex.Has(item) exists, err = db.retrievalDataIndex.Has(item)
if err != nil { if err != nil {
return false, 0, err return false, 0, err
...@@ -261,7 +259,7 @@ func (db *DB) putUpload(batch *badger.Txn, binIDs map[uint8]uint64, item shed.It ...@@ -261,7 +259,7 @@ func (db *DB) putUpload(batch *badger.Txn, binIDs map[uint8]uint64, item shed.It
// - put to indexes: retrieve, pull // - put to indexes: retrieve, pull
// The batch can be written to the database. // The batch can be written to the database.
// Provided batch and binID map are updated. // Provided batch and binID map are updated.
func (db *DB) putSync(batch *badger.Txn, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) { func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) {
exists, err = db.retrievalDataIndex.Has(item) exists, err = db.retrievalDataIndex.Has(item)
if err != nil { if err != nil {
return false, 0, err return false, 0, err
...@@ -311,7 +309,7 @@ func (db *DB) putSync(batch *badger.Txn, binIDs map[uint8]uint64, item shed.Item ...@@ -311,7 +309,7 @@ func (db *DB) putSync(batch *badger.Txn, binIDs map[uint8]uint64, item shed.Item
// a chunk is added to a node's localstore and given that the chunk is // a chunk is added to a node's localstore and given that the chunk is
// already within that node's NN (thus, it can be added to the gc index // already within that node's NN (thus, it can be added to the gc index
// safely) // safely)
func (db *DB) setGC(batch *badger.Txn, item shed.Item) (gcSizeChange int64, err error) { func (db *DB) setGC(batch *leveldb.Batch, item shed.Item) (gcSizeChange int64, err error) {
if item.BinID == 0 { if item.BinID == 0 {
i, err := db.retrievalDataIndex.Get(item) i, err := db.retrievalDataIndex.Get(item)
if err != nil { if err != nil {
...@@ -328,7 +326,7 @@ func (db *DB) setGC(batch *badger.Txn, item shed.Item) (gcSizeChange int64, err ...@@ -328,7 +326,7 @@ func (db *DB) setGC(batch *badger.Txn, item shed.Item) (gcSizeChange int64, err
return 0, err return 0, err
} }
gcSizeChange-- gcSizeChange--
case shed.ErrNotFound: case leveldb.ErrNotFound:
// the chunk is not accessed before // the chunk is not accessed before
default: default:
return 0, err return 0, err
......
...@@ -24,9 +24,9 @@ import ( ...@@ -24,9 +24,9 @@ import (
"testing" "testing"
"time" "time"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
) )
// TestModePutRequest validates ModePutRequest index values on the provided DB. // TestModePutRequest validates ModePutRequest index values on the provided DB.
...@@ -362,7 +362,7 @@ func TestModePut_addToGc(t *testing.T) { ...@@ -362,7 +362,7 @@ func TestModePut_addToGc(t *testing.T) {
binIDs[po]++ binIDs[po]++
var wantErr error var wantErr error
if !m.putToGc { if !m.putToGc {
wantErr = shed.ErrNotFound wantErr = leveldb.ErrNotFound
} }
newRetrieveIndexesTestWithAccess(db, ch, wantTimestamp, wantTimestamp) newRetrieveIndexesTestWithAccess(db, ch, wantTimestamp, wantTimestamp)
newGCIndexTest(db, ch, wantTimestamp, wantTimestamp, binIDs[po], wantErr)(t) newGCIndexTest(db, ch, wantTimestamp, wantTimestamp, binIDs[po], wantErr)(t)
...@@ -428,7 +428,7 @@ func TestModePut_addToGcExisting(t *testing.T) { ...@@ -428,7 +428,7 @@ func TestModePut_addToGcExisting(t *testing.T) {
binIDs[po]++ binIDs[po]++
var wantErr error var wantErr error
if !m.putToGc { if !m.putToGc {
wantErr = shed.ErrNotFound wantErr = leveldb.ErrNotFound
} }
newRetrieveIndexesTestWithAccess(db, ch, wantStoreTimestamp, wantAccessTimestamp) newRetrieveIndexesTestWithAccess(db, ch, wantStoreTimestamp, wantAccessTimestamp)
......
...@@ -21,11 +21,10 @@ import ( ...@@ -21,11 +21,10 @@ import (
"errors" "errors"
"time" "time"
"github.com/dgraph-io/badger/v2"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags" "github.com/ethersphere/bee/pkg/tags"
"github.com/syndtr/goleveldb/leveldb"
) )
// Set updates database indexes for // Set updates database indexes for
...@@ -51,7 +50,7 @@ func (db *DB) set(mode storage.ModeSet, addrs ...swarm.Address) (err error) { ...@@ -51,7 +50,7 @@ func (db *DB) set(mode storage.ModeSet, addrs ...swarm.Address) (err error) {
db.batchMu.Lock() db.batchMu.Lock()
defer db.batchMu.Unlock() defer db.batchMu.Unlock()
batch := db.shed.GetBatch(true) batch := new(leveldb.Batch)
// variables that provide information for operations // variables that provide information for operations
// to be done after write batch function successfully executes // to be done after write batch function successfully executes
...@@ -74,9 +73,7 @@ func (db *DB) set(mode storage.ModeSet, addrs ...swarm.Address) (err error) { ...@@ -74,9 +73,7 @@ func (db *DB) set(mode storage.ModeSet, addrs ...swarm.Address) (err error) {
triggerPullFeed[po] = struct{}{} triggerPullFeed[po] = struct{}{}
} }
for po, id := range binIDs { for po, id := range binIDs {
if err := db.binIDs.PutInBatch(batch, uint64(po), id); err != nil { db.binIDs.PutInBatch(batch, uint64(po), id)
return err
}
} }
case storage.ModeSetSyncPush, storage.ModeSetSyncPull: case storage.ModeSetSyncPush, storage.ModeSetSyncPull:
...@@ -134,7 +131,7 @@ func (db *DB) set(mode storage.ModeSet, addrs ...swarm.Address) (err error) { ...@@ -134,7 +131,7 @@ func (db *DB) set(mode storage.ModeSet, addrs ...swarm.Address) (err error) {
// setAccess sets the chunk access time by updating required indexes: // setAccess sets the chunk access time by updating required indexes:
// - add to pull, insert to gc // - add to pull, insert to gc
// Provided batch and binID map are updated. // Provided batch and binID map are updated.
func (db *DB) setAccess(batch *badger.Txn, binIDs map[uint8]uint64, addr swarm.Address, po uint8) (gcSizeChange int64, err error) { func (db *DB) setAccess(batch *leveldb.Batch, binIDs map[uint8]uint64, addr swarm.Address, po uint8) (gcSizeChange int64, err error) {
item := addressToItem(addr) item := addressToItem(addr)
...@@ -146,7 +143,7 @@ func (db *DB) setAccess(batch *badger.Txn, binIDs map[uint8]uint64, addr swarm.A ...@@ -146,7 +143,7 @@ func (db *DB) setAccess(batch *badger.Txn, binIDs map[uint8]uint64, addr swarm.A
case nil: case nil:
item.StoreTimestamp = i.StoreTimestamp item.StoreTimestamp = i.StoreTimestamp
item.BinID = i.BinID item.BinID = i.BinID
case shed.ErrNotFound: case leveldb.ErrNotFound:
err = db.pushIndex.DeleteInBatch(batch, item) err = db.pushIndex.DeleteInBatch(batch, item)
if err != nil { if err != nil {
return 0, err return 0, err
...@@ -169,7 +166,7 @@ func (db *DB) setAccess(batch *badger.Txn, binIDs map[uint8]uint64, addr swarm.A ...@@ -169,7 +166,7 @@ func (db *DB) setAccess(batch *badger.Txn, binIDs map[uint8]uint64, addr swarm.A
return 0, err return 0, err
} }
gcSizeChange-- gcSizeChange--
case shed.ErrNotFound: case leveldb.ErrNotFound:
// the chunk is not accessed before // the chunk is not accessed before
default: default:
return 0, err return 0, err
...@@ -199,7 +196,7 @@ func (db *DB) setAccess(batch *badger.Txn, binIDs map[uint8]uint64, addr swarm.A ...@@ -199,7 +196,7 @@ func (db *DB) setAccess(batch *badger.Txn, binIDs map[uint8]uint64, addr swarm.A
// from push sync index // from push sync index
// - update to gc index happens given item does not exist in pin index // - update to gc index happens given item does not exist in pin index
// Provided batch is updated. // Provided batch is updated.
func (db *DB) setSync(batch *badger.Txn, addr swarm.Address, mode storage.ModeSet) (gcSizeChange int64, err error) { func (db *DB) setSync(batch *leveldb.Batch, addr swarm.Address, mode storage.ModeSet) (gcSizeChange int64, err error) {
item := addressToItem(addr) item := addressToItem(addr)
// need to get access timestamp here as it is not // need to get access timestamp here as it is not
...@@ -208,7 +205,7 @@ func (db *DB) setSync(batch *badger.Txn, addr swarm.Address, mode storage.ModeSe ...@@ -208,7 +205,7 @@ func (db *DB) setSync(batch *badger.Txn, addr swarm.Address, mode storage.ModeSe
i, err := db.retrievalDataIndex.Get(item) i, err := db.retrievalDataIndex.Get(item)
if err != nil { if err != nil {
if err == shed.ErrNotFound { if err == leveldb.ErrNotFound {
// chunk is not found, // chunk is not found,
// no need to update gc index // no need to update gc index
// just delete from the push index // just delete from the push index
...@@ -231,7 +228,7 @@ func (db *DB) setSync(batch *badger.Txn, addr swarm.Address, mode storage.ModeSe ...@@ -231,7 +228,7 @@ func (db *DB) setSync(batch *badger.Txn, addr swarm.Address, mode storage.ModeSe
// this prevents duplicate increments // this prevents duplicate increments
i, err := db.pullIndex.Get(item) i, err := db.pullIndex.Get(item)
if err != nil { if err != nil {
if err == shed.ErrNotFound { if err == leveldb.ErrNotFound {
// we handle this error internally, since this is an internal inconsistency of the indices // we handle this error internally, since this is an internal inconsistency of the indices
// if we return the error here - it means that for example, in stream protocol peers which we sync // if we return the error here - it means that for example, in stream protocol peers which we sync
// to would be dropped. this is possible when the chunk is put with ModePutRequest and ModeSetSyncPull is // to would be dropped. this is possible when the chunk is put with ModePutRequest and ModeSetSyncPull is
...@@ -266,7 +263,7 @@ func (db *DB) setSync(batch *badger.Txn, addr swarm.Address, mode storage.ModeSe ...@@ -266,7 +263,7 @@ func (db *DB) setSync(batch *badger.Txn, addr swarm.Address, mode storage.ModeSe
case storage.ModeSetSyncPush: case storage.ModeSetSyncPush:
i, err := db.pushIndex.Get(item) i, err := db.pushIndex.Get(item)
if err != nil { if err != nil {
if err == shed.ErrNotFound { if err == leveldb.ErrNotFound {
// we handle this error internally, since this is an internal inconsistency of the indices // we handle this error internally, since this is an internal inconsistency of the indices
// this error can happen if the chunk is put with ModePutRequest or ModePutSync // this error can happen if the chunk is put with ModePutRequest or ModePutSync
// but this function is called with ModeSetSyncPush // but this function is called with ModeSetSyncPush
...@@ -306,7 +303,7 @@ func (db *DB) setSync(batch *badger.Txn, addr swarm.Address, mode storage.ModeSe ...@@ -306,7 +303,7 @@ func (db *DB) setSync(batch *badger.Txn, addr swarm.Address, mode storage.ModeSe
return 0, err return 0, err
} }
gcSizeChange-- gcSizeChange--
case shed.ErrNotFound: case leveldb.ErrNotFound:
// the chunk is not accessed before // the chunk is not accessed before
default: default:
return 0, err return 0, err
...@@ -336,7 +333,7 @@ func (db *DB) setSync(batch *badger.Txn, addr swarm.Address, mode storage.ModeSe ...@@ -336,7 +333,7 @@ func (db *DB) setSync(batch *badger.Txn, addr swarm.Address, mode storage.ModeSe
// setRemove removes the chunk by updating indexes: // setRemove removes the chunk by updating indexes:
// - delete from retrieve, pull, gc // - delete from retrieve, pull, gc
// Provided batch is updated. // Provided batch is updated.
func (db *DB) setRemove(batch *badger.Txn, addr swarm.Address) (gcSizeChange int64, err error) { func (db *DB) setRemove(batch *leveldb.Batch, addr swarm.Address) (gcSizeChange int64, err error) {
item := addressToItem(addr) item := addressToItem(addr)
// need to get access timestamp here as it is not // need to get access timestamp here as it is not
...@@ -346,7 +343,7 @@ func (db *DB) setRemove(batch *badger.Txn, addr swarm.Address) (gcSizeChange int ...@@ -346,7 +343,7 @@ func (db *DB) setRemove(batch *badger.Txn, addr swarm.Address) (gcSizeChange int
switch err { switch err {
case nil: case nil:
item.AccessTimestamp = i.AccessTimestamp item.AccessTimestamp = i.AccessTimestamp
case shed.ErrNotFound: case leveldb.ErrNotFound:
default: default:
return 0, err return 0, err
} }
...@@ -386,14 +383,14 @@ func (db *DB) setRemove(batch *badger.Txn, addr swarm.Address) (gcSizeChange int ...@@ -386,14 +383,14 @@ func (db *DB) setRemove(batch *badger.Txn, addr swarm.Address) (gcSizeChange int
// setPin increments pin counter for the chunk by updating // setPin increments pin counter for the chunk by updating
// pin index and sets the chunk to be excluded from garbage collection. // pin index and sets the chunk to be excluded from garbage collection.
// Provided batch is updated. // Provided batch is updated.
func (db *DB) setPin(batch *badger.Txn, addr swarm.Address) (err error) { func (db *DB) setPin(batch *leveldb.Batch, addr swarm.Address) (err error) {
item := addressToItem(addr) item := addressToItem(addr)
// Get the existing pin counter of the chunk // Get the existing pin counter of the chunk
existingPinCounter := uint64(0) existingPinCounter := uint64(0)
pinnedChunk, err := db.pinIndex.Get(item) pinnedChunk, err := db.pinIndex.Get(item)
if err != nil { if err != nil {
if err == shed.ErrNotFound { if err == leveldb.ErrNotFound {
// If this Address is not present in DB, then its a new entry // If this Address is not present in DB, then its a new entry
existingPinCounter = 0 existingPinCounter = 0
...@@ -421,7 +418,7 @@ func (db *DB) setPin(batch *badger.Txn, addr swarm.Address) (err error) { ...@@ -421,7 +418,7 @@ func (db *DB) setPin(batch *badger.Txn, addr swarm.Address) (err error) {
// setUnpin decrements pin counter for the chunk by updating pin index. // setUnpin decrements pin counter for the chunk by updating pin index.
// Provided batch is updated. // Provided batch is updated.
func (db *DB) setUnpin(batch *badger.Txn, addr swarm.Address) (err error) { func (db *DB) setUnpin(batch *leveldb.Batch, addr swarm.Address) (err error) {
item := addressToItem(addr) item := addressToItem(addr)
// Get the existing pin counter of the chunk // Get the existing pin counter of the chunk
......
...@@ -25,6 +25,7 @@ import ( ...@@ -25,6 +25,7 @@ import (
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/tags" "github.com/ethersphere/bee/pkg/tags"
tagtesting "github.com/ethersphere/bee/pkg/tags/testing" tagtesting "github.com/ethersphere/bee/pkg/tags/testing"
"github.com/syndtr/goleveldb/leveldb"
) )
// TestModeSetAccess validates ModeSetAccess index values on the provided DB. // TestModeSetAccess validates ModeSetAccess index values on the provided DB.
...@@ -333,7 +334,7 @@ func TestModeSetRemove(t *testing.T) { ...@@ -333,7 +334,7 @@ func TestModeSetRemove(t *testing.T) {
t.Run("retrieve indexes", func(t *testing.T) { t.Run("retrieve indexes", func(t *testing.T) {
for _, ch := range chunks { for _, ch := range chunks {
wantErr := shed.ErrNotFound wantErr := leveldb.ErrNotFound
_, err := db.retrievalDataIndex.Get(addressToItem(ch.Address())) _, err := db.retrievalDataIndex.Get(addressToItem(ch.Address()))
if err != wantErr { if err != wantErr {
t.Errorf("got error %v, want %v", err, wantErr) t.Errorf("got error %v, want %v", err, wantErr)
...@@ -352,7 +353,7 @@ func TestModeSetRemove(t *testing.T) { ...@@ -352,7 +353,7 @@ func TestModeSetRemove(t *testing.T) {
}) })
for _, ch := range chunks { for _, ch := range chunks {
newPullIndexTest(db, ch, 0, shed.ErrNotFound)(t) newPullIndexTest(db, ch, 0, leveldb.ErrNotFound)(t)
} }
t.Run("pull index count", newItemsCountTest(db.pullIndex, 0)) t.Run("pull index count", newItemsCountTest(db.pullIndex, 0))
......
...@@ -25,6 +25,7 @@ import ( ...@@ -25,6 +25,7 @@ import (
"github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
) )
// SubscribePull returns a channel that provides chunk addresses and stored times from pull syncing index. // SubscribePull returns a channel that provides chunk addresses and stored times from pull syncing index.
...@@ -186,7 +187,7 @@ func (db *DB) LastPullSubscriptionBinID(bin uint8) (id uint64, err error) { ...@@ -186,7 +187,7 @@ func (db *DB) LastPullSubscriptionBinID(bin uint8) (id uint64, err error) {
item, err := db.pullIndex.Last([]byte{bin}) item, err := db.pullIndex.Last([]byte{bin})
if err != nil { if err != nil {
if err == shed.ErrNotFound { if err == leveldb.ErrNotFound {
return 0, nil return 0, nil
} }
return 0, err return 0, err
......
...@@ -32,8 +32,6 @@ import ( ...@@ -32,8 +32,6 @@ import (
// push syncing subscription is created and validates if // push syncing subscription is created and validates if
// all addresses are received in the right order. // all addresses are received in the right order.
func TestDB_SubscribePush(t *testing.T) { func TestDB_SubscribePush(t *testing.T) {
t.Skip("fails with badger shed")
db, cleanupFunc := newTestDB(t, nil) db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc() defer cleanupFunc()
...@@ -120,8 +118,6 @@ func TestDB_SubscribePush(t *testing.T) { ...@@ -120,8 +118,6 @@ func TestDB_SubscribePush(t *testing.T) {
// multiple push syncing subscriptions are created and // multiple push syncing subscriptions are created and
// validates if all addresses are received in the right order. // validates if all addresses are received in the right order.
func TestDB_SubscribePush_multiple(t *testing.T) { func TestDB_SubscribePush_multiple(t *testing.T) {
t.Skip("fails with badger shed")
db, cleanupFunc := newTestDB(t, nil) db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc() defer cleanupFunc()
......
This diff is collapsed.
...@@ -27,8 +27,10 @@ import ( ...@@ -27,8 +27,10 @@ import (
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/testing" "github.com/ethersphere/bee/pkg/storage/testing"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
) )
// Store holds fields and indexes (including their encoding functions) // Store holds fields and indexes (including their encoding functions)
...@@ -157,13 +159,16 @@ func (s *Store) Put(_ context.Context, ch swarm.Chunk) (err error) { ...@@ -157,13 +159,16 @@ func (s *Store) Put(_ context.Context, ch swarm.Chunk) (err error) {
// items from them and adding new items as keys of index entries // items from them and adding new items as keys of index entries
// are changed. // are changed.
func (s *Store) Get(_ context.Context, addr swarm.Address) (c swarm.Chunk, err error) { func (s *Store) Get(_ context.Context, addr swarm.Address) (c swarm.Chunk, err error) {
batch := s.db.GetBatch(true) batch := new(leveldb.Batch)
// Get the chunk data and storage timestamp. // Get the chunk data and storage timestamp.
item, err := s.retrievalIndex.Get(shed.Item{ item, err := s.retrievalIndex.Get(shed.Item{
Address: addr.Bytes(), Address: addr.Bytes(),
}) })
if err != nil { if err != nil {
if err == leveldb.ErrNotFound {
return nil, storage.ErrNotFound
}
return nil, err return nil, err
} }
...@@ -182,7 +187,7 @@ func (s *Store) Get(_ context.Context, addr swarm.Address) (c swarm.Chunk, err e ...@@ -182,7 +187,7 @@ func (s *Store) Get(_ context.Context, addr swarm.Address) (c swarm.Chunk, err e
if err != nil { if err != nil {
return nil, err return nil, err
} }
case shed.ErrNotFound: case leveldb.ErrNotFound:
// Access timestamp is not found. Do not do anything. // Access timestamp is not found. Do not do anything.
// This is the firs get request. // This is the firs get request.
default: default:
...@@ -238,7 +243,7 @@ func (s *Store) CollectGarbage() (err error) { ...@@ -238,7 +243,7 @@ func (s *Store) CollectGarbage() (err error) {
for roundCount := 0; roundCount < maxRounds; roundCount++ { for roundCount := 0; roundCount < maxRounds; roundCount++ {
var garbageCount int var garbageCount int
// New batch for a new cg round. // New batch for a new cg round.
trash := s.db.GetBatch(true) trash := new(leveldb.Batch)
// Iterate through all index items and break when needed. // Iterate through all index items and break when needed.
err = s.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) { err = s.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
// Remove the chunk. // Remove the chunk.
...@@ -280,7 +285,7 @@ func (s *Store) CollectGarbage() (err error) { ...@@ -280,7 +285,7 @@ func (s *Store) CollectGarbage() (err error) {
// string from a database field. // string from a database field.
func (s *Store) GetSchema() (name string, err error) { func (s *Store) GetSchema() (name string, err error) {
name, err = s.schemaName.Get() name, err = s.schemaName.Get()
if err == shed.ErrNotFound { if err == leveldb.ErrNotFound {
return "", nil return "", nil
} }
return name, err return name, err
......
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
package shed package shed
import ( import (
"github.com/dgraph-io/badger/v2"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/syndtr/goleveldb/leveldb"
) )
// StringField is the most simple field implementation // StringField is the most simple field implementation
...@@ -49,6 +49,10 @@ func (db *DB) NewStringField(name string) (f StringField, err error) { ...@@ -49,6 +49,10 @@ func (db *DB) NewStringField(name string) (f StringField, err error) {
func (f StringField) Get() (val string, err error) { func (f StringField) Get() (val string, err error) {
b, err := f.db.Get(f.key) b, err := f.db.Get(f.key)
if err != nil { if err != nil {
if err == leveldb.ErrNotFound {
f.logger.Errorf("key %s not found", string(f.key))
return "", nil
}
return "", err return "", err
} }
return string(b), nil return string(b), nil
...@@ -61,6 +65,6 @@ func (f StringField) Put(val string) (err error) { ...@@ -61,6 +65,6 @@ func (f StringField) Put(val string) (err error) {
// PutInBatch stores a string in a batch that can be // PutInBatch stores a string in a batch that can be
// saved later in database. // saved later in database.
func (f StringField) PutInBatch(batch *badger.Txn, val string) (err error) { func (f StringField) PutInBatch(batch *leveldb.Batch, val string) {
return batch.Set(f.key, []byte(val)) batch.Put(f.key, []byte(val))
} }
...@@ -18,6 +18,8 @@ package shed ...@@ -18,6 +18,8 @@ package shed
import ( import (
"testing" "testing"
"github.com/syndtr/goleveldb/leveldb"
) )
// TestStringField validates put and get operations // TestStringField validates put and get operations
...@@ -33,7 +35,7 @@ func TestStringField(t *testing.T) { ...@@ -33,7 +35,7 @@ func TestStringField(t *testing.T) {
t.Run("get empty", func(t *testing.T) { t.Run("get empty", func(t *testing.T) {
got, err := simpleString.Get() got, err := simpleString.Get()
if err == nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
want := "" want := ""
...@@ -73,12 +75,9 @@ func TestStringField(t *testing.T) { ...@@ -73,12 +75,9 @@ func TestStringField(t *testing.T) {
}) })
t.Run("put in batch", func(t *testing.T) { t.Run("put in batch", func(t *testing.T) {
batch := db.GetBatch(true) batch := new(leveldb.Batch)
want := "simple string batch value" want := "simple string batch value"
err = simpleString.PutInBatch(batch, want) simpleString.PutInBatch(batch, want)
if err != nil {
t.Fatal(err)
}
err = db.WriteBatch(batch) err = db.WriteBatch(batch)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -92,12 +91,9 @@ func TestStringField(t *testing.T) { ...@@ -92,12 +91,9 @@ func TestStringField(t *testing.T) {
} }
t.Run("overwrite", func(t *testing.T) { t.Run("overwrite", func(t *testing.T) {
batch := db.GetBatch(true) batch := new(leveldb.Batch)
want := "overwritten string batch value" want := "overwritten string batch value"
err = simpleString.PutInBatch(batch, want) simpleString.PutInBatch(batch, want)
if err != nil {
t.Fatal(err)
}
err = db.WriteBatch(batch) err = db.WriteBatch(batch)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
......
...@@ -19,8 +19,8 @@ package shed ...@@ -19,8 +19,8 @@ package shed
import ( import (
"encoding/json" "encoding/json"
"github.com/dgraph-io/badger/v2"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/syndtr/goleveldb/leveldb"
) )
// StructField is a helper to store complex structure by // StructField is a helper to store complex structure by
...@@ -46,7 +46,7 @@ func (db *DB) NewStructField(name string) (f StructField, err error) { ...@@ -46,7 +46,7 @@ func (db *DB) NewStructField(name string) (f StructField, err error) {
} }
// Get unmarshals data from the database to a provided val. // Get unmarshals data from the database to a provided val.
// If the data is not found ErrNotFound is returned. // If the data is not found leveldb.ErrNotFound is returned.
func (f StructField) Get(val interface{}) (err error) { func (f StructField) Get(val interface{}) (err error) {
b, err := f.db.Get(f.key) b, err := f.db.Get(f.key)
if err != nil { if err != nil {
...@@ -67,14 +67,12 @@ func (f StructField) Put(val interface{}) (err error) { ...@@ -67,14 +67,12 @@ func (f StructField) Put(val interface{}) (err error) {
} }
// PutInBatch marshals provided val and puts it into the batch. // PutInBatch marshals provided val and puts it into the batch.
func (f StructField) PutInBatch(batch *badger.Txn, val interface{}) (err error) { func (f StructField) PutInBatch(batch *leveldb.Batch, val interface{}) (err error) {
b, err := json.Marshal(val) b, err := json.Marshal(val)
if err != nil { if err != nil {
f.logger.Debugf("could not PUT key %s in batch", string(f.key))
return err return err
} }
err = batch.Set(f.key, b) batch.Put(f.key, b)
if err != nil {
return err
}
return nil return nil
} }
...@@ -18,6 +18,8 @@ package shed ...@@ -18,6 +18,8 @@ package shed
import ( import (
"testing" "testing"
"github.com/syndtr/goleveldb/leveldb"
) )
// TestStructField validates put and get operations // TestStructField validates put and get operations
...@@ -38,8 +40,8 @@ func TestStructField(t *testing.T) { ...@@ -38,8 +40,8 @@ func TestStructField(t *testing.T) {
t.Run("get empty", func(t *testing.T) { t.Run("get empty", func(t *testing.T) {
var s complexStructure var s complexStructure
err := complexField.Get(&s) err := complexField.Get(&s)
if err != ErrNotFound { if err != leveldb.ErrNotFound {
t.Fatalf("got error %v, want %v", err, ErrNotFound) t.Fatalf("got error %v, want %v", err, leveldb.ErrNotFound)
} }
want := "" want := ""
if s.A != want { if s.A != want {
...@@ -84,7 +86,7 @@ func TestStructField(t *testing.T) { ...@@ -84,7 +86,7 @@ func TestStructField(t *testing.T) {
}) })
t.Run("put in batch", func(t *testing.T) { t.Run("put in batch", func(t *testing.T) {
batch := db.GetBatch(true) batch := new(leveldb.Batch)
want := complexStructure{ want := complexStructure{
A: "simple string batch value", A: "simple string batch value",
} }
...@@ -106,7 +108,7 @@ func TestStructField(t *testing.T) { ...@@ -106,7 +108,7 @@ func TestStructField(t *testing.T) {
} }
t.Run("overwrite", func(t *testing.T) { t.Run("overwrite", func(t *testing.T) {
batch := db.GetBatch(true) batch := new(leveldb.Batch)
want := complexStructure{ want := complexStructure{
A: "overwritten string batch value", A: "overwritten string batch value",
} }
......
...@@ -19,8 +19,8 @@ package shed ...@@ -19,8 +19,8 @@ package shed
import ( import (
"encoding/binary" "encoding/binary"
"github.com/dgraph-io/badger/v2"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/syndtr/goleveldb/leveldb"
) )
// Uint64Field provides a way to have a simple counter in the database. // Uint64Field provides a way to have a simple counter in the database.
...@@ -51,7 +51,7 @@ func (db *DB) NewUint64Field(name string) (f Uint64Field, err error) { ...@@ -51,7 +51,7 @@ func (db *DB) NewUint64Field(name string) (f Uint64Field, err error) {
func (f Uint64Field) Get() (val uint64, err error) { func (f Uint64Field) Get() (val uint64, err error) {
b, err := f.db.Get(f.key) b, err := f.db.Get(f.key)
if err != nil { if err != nil {
if err == ErrNotFound { if err == leveldb.ErrNotFound {
f.logger.Errorf("key %s not found", string(f.key)) f.logger.Errorf("key %s not found", string(f.key))
return 0, nil return 0, nil
} }
...@@ -67,8 +67,8 @@ func (f Uint64Field) Put(val uint64) (err error) { ...@@ -67,8 +67,8 @@ func (f Uint64Field) Put(val uint64) (err error) {
// PutInBatch stores a uint64 value in a batch // PutInBatch stores a uint64 value in a batch
// that can be saved later in the database. // that can be saved later in the database.
func (f Uint64Field) PutInBatch(batch *badger.Txn, val uint64) (err error) { func (f Uint64Field) PutInBatch(batch *leveldb.Batch, val uint64) {
return batch.Set(f.key, encodeUint64(val)) batch.Put(f.key, encodeUint64(val))
} }
// Inc increments a uint64 value in the database. // Inc increments a uint64 value in the database.
...@@ -76,8 +76,14 @@ func (f Uint64Field) PutInBatch(batch *badger.Txn, val uint64) (err error) { ...@@ -76,8 +76,14 @@ func (f Uint64Field) PutInBatch(batch *badger.Txn, val uint64) (err error) {
func (f Uint64Field) Inc() (val uint64, err error) { func (f Uint64Field) Inc() (val uint64, err error) {
val, err = f.Get() val, err = f.Get()
if err != nil { if err != nil {
if err == leveldb.ErrNotFound {
f.logger.Debugf("key %s not found", string(f.key))
val = 0
} else {
f.logger.Errorf("key %s not found. Error: %s", string(f.key), err.Error())
return 0, err return 0, err
} }
}
val++ val++
return val, f.Put(val) return val, f.Put(val)
} }
...@@ -85,16 +91,19 @@ func (f Uint64Field) Inc() (val uint64, err error) { ...@@ -85,16 +91,19 @@ func (f Uint64Field) Inc() (val uint64, err error) {
// IncInBatch increments a uint64 value in the batch // IncInBatch increments a uint64 value in the batch
// by retreiving a value from the database, not the same batch. // by retreiving a value from the database, not the same batch.
// This operation is not goroutine save. // This operation is not goroutine save.
func (f Uint64Field) IncInBatch(batch *badger.Txn) (val uint64, err error) { func (f Uint64Field) IncInBatch(batch *leveldb.Batch) (val uint64, err error) {
val, err = f.Get() val, err = f.Get()
if err != nil { if err != nil {
if err == leveldb.ErrNotFound {
f.logger.Debugf("key %s not found", string(f.key))
val = 0
} else {
f.logger.Errorf("key %s not found. Error: %s", string(f.key), err.Error())
return 0, err return 0, err
} }
val++
err = f.PutInBatch(batch, val)
if err != nil {
return 0, err
} }
val++
f.PutInBatch(batch, val)
return val, nil return val, nil
} }
...@@ -104,8 +113,14 @@ func (f Uint64Field) IncInBatch(batch *badger.Txn) (val uint64, err error) { ...@@ -104,8 +113,14 @@ func (f Uint64Field) IncInBatch(batch *badger.Txn) (val uint64, err error) {
func (f Uint64Field) Dec() (val uint64, err error) { func (f Uint64Field) Dec() (val uint64, err error) {
val, err = f.Get() val, err = f.Get()
if err != nil { if err != nil {
if err == leveldb.ErrNotFound {
f.logger.Debugf("key %s not found", string(f.key))
val = 0
} else {
f.logger.Errorf("key %s not found. Error: %s", string(f.key), err.Error())
return 0, err return 0, err
} }
}
if val != 0 { if val != 0 {
val-- val--
} }
...@@ -116,18 +131,21 @@ func (f Uint64Field) Dec() (val uint64, err error) { ...@@ -116,18 +131,21 @@ func (f Uint64Field) Dec() (val uint64, err error) {
// by retreiving a value from the database, not the same batch. // by retreiving a value from the database, not the same batch.
// This operation is not goroutine save. // This operation is not goroutine save.
// The field is protected from overflow to a negative value. // The field is protected from overflow to a negative value.
func (f Uint64Field) DecInBatch(batch *badger.Txn) (val uint64, err error) { func (f Uint64Field) DecInBatch(batch *leveldb.Batch) (val uint64, err error) {
val, err = f.Get() val, err = f.Get()
if err != nil { if err != nil {
if err == leveldb.ErrNotFound {
f.logger.Debugf("key %s not found", string(f.key))
val = 0
} else {
f.logger.Errorf("key %s not found. Error: %s", string(f.key), err.Error())
return 0, err return 0, err
} }
}
if val != 0 { if val != 0 {
val-- val--
} }
err = f.PutInBatch(batch, val) f.PutInBatch(batch, val)
if err != nil {
return 0, err
}
return val, nil return val, nil
} }
......
...@@ -18,6 +18,8 @@ package shed ...@@ -18,6 +18,8 @@ package shed
import ( import (
"testing" "testing"
"github.com/syndtr/goleveldb/leveldb"
) )
// TestUint64Field validates put and get operations // TestUint64Field validates put and get operations
...@@ -73,12 +75,9 @@ func TestUint64Field(t *testing.T) { ...@@ -73,12 +75,9 @@ func TestUint64Field(t *testing.T) {
}) })
t.Run("put in batch", func(t *testing.T) { t.Run("put in batch", func(t *testing.T) {
batch := db.GetBatch(true) batch := new(leveldb.Batch)
var want uint64 = 42 var want uint64 = 42
err = counter.PutInBatch(batch, want) counter.PutInBatch(batch, want)
if err != nil {
t.Fatal(err)
}
err = db.WriteBatch(batch) err = db.WriteBatch(batch)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -92,12 +91,9 @@ func TestUint64Field(t *testing.T) { ...@@ -92,12 +91,9 @@ func TestUint64Field(t *testing.T) {
} }
t.Run("overwrite", func(t *testing.T) { t.Run("overwrite", func(t *testing.T) {
batch := db.GetBatch(true) batch := new(leveldb.Batch)
var want uint64 = 84 var want uint64 = 84
err = counter.PutInBatch(batch, want) counter.PutInBatch(batch, want)
if err != nil {
t.Fatal(err)
}
err = db.WriteBatch(batch) err = db.WriteBatch(batch)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -154,7 +150,7 @@ func TestUint64Field_IncInBatch(t *testing.T) { ...@@ -154,7 +150,7 @@ func TestUint64Field_IncInBatch(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
batch := db.GetBatch(true) batch := new(leveldb.Batch)
var want uint64 = 1 var want uint64 = 1
got, err := counter.IncInBatch(batch) got, err := counter.IncInBatch(batch)
if err != nil { if err != nil {
...@@ -175,7 +171,7 @@ func TestUint64Field_IncInBatch(t *testing.T) { ...@@ -175,7 +171,7 @@ func TestUint64Field_IncInBatch(t *testing.T) {
t.Errorf("got uint64 %v, want %v", got, want) t.Errorf("got uint64 %v, want %v", got, want)
} }
batch2 := db.GetBatch(true) batch2 := new(leveldb.Batch)
want = 2 want = 2
got, err = counter.IncInBatch(batch2) got, err = counter.IncInBatch(batch2)
if err != nil { if err != nil {
...@@ -245,7 +241,7 @@ func TestUint64Field_DecInBatch(t *testing.T) { ...@@ -245,7 +241,7 @@ func TestUint64Field_DecInBatch(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
batch := db.GetBatch(true) batch := new(leveldb.Batch)
var want uint64 var want uint64
got, err := counter.DecInBatch(batch) got, err := counter.DecInBatch(batch)
if err != nil { if err != nil {
...@@ -266,12 +262,9 @@ func TestUint64Field_DecInBatch(t *testing.T) { ...@@ -266,12 +262,9 @@ func TestUint64Field_DecInBatch(t *testing.T) {
t.Errorf("got uint64 %v, want %v", got, want) t.Errorf("got uint64 %v, want %v", got, want)
} }
batch2 := db.GetBatch(true) batch2 := new(leveldb.Batch)
want = 42 want = 42
err = counter.PutInBatch(batch2, want) counter.PutInBatch(batch2, want)
if err != nil {
t.Fatal(err)
}
err = db.WriteBatch(batch2) err = db.WriteBatch(batch2)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -284,7 +277,7 @@ func TestUint64Field_DecInBatch(t *testing.T) { ...@@ -284,7 +277,7 @@ func TestUint64Field_DecInBatch(t *testing.T) {
t.Errorf("got uint64 %v, want %v", got, want) t.Errorf("got uint64 %v, want %v", got, want)
} }
batch3 := db.GetBatch(true) batch3 := new(leveldb.Batch)
want = 41 want = 41
got, err = counter.DecInBatch(batch3) got, err = counter.DecInBatch(batch3)
if err != nil { if err != nil {
......
This diff is collapsed.
...@@ -23,6 +23,8 @@ import ( ...@@ -23,6 +23,8 @@ import (
"sort" "sort"
"testing" "testing"
"time" "time"
"github.com/syndtr/goleveldb/leveldb"
) )
// Index functions for the index that is used in tests in this file. // Index functions for the index that is used in tests in this file.
...@@ -104,7 +106,7 @@ func TestIndex(t *testing.T) { ...@@ -104,7 +106,7 @@ func TestIndex(t *testing.T) {
StoreTimestamp: time.Now().UTC().UnixNano(), StoreTimestamp: time.Now().UTC().UnixNano(),
} }
batch := db.GetBatch(true) batch := new(leveldb.Batch)
err = index.PutInBatch(batch, want) err = index.PutInBatch(batch, want)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -128,7 +130,7 @@ func TestIndex(t *testing.T) { ...@@ -128,7 +130,7 @@ func TestIndex(t *testing.T) {
StoreTimestamp: time.Now().UTC().UnixNano(), StoreTimestamp: time.Now().UTC().UnixNano(),
} }
batch := db.GetBatch(true) batch := new(leveldb.Batch)
err = index.PutInBatch(batch, want) err = index.PutInBatch(batch, want)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -150,7 +152,7 @@ func TestIndex(t *testing.T) { ...@@ -150,7 +152,7 @@ func TestIndex(t *testing.T) {
t.Run("put in batch twice", func(t *testing.T) { t.Run("put in batch twice", func(t *testing.T) {
// ensure that the last item of items with the same db keys // ensure that the last item of items with the same db keys
// is actually saved // is actually saved
batch := db.GetBatch(true) batch := new(leveldb.Batch)
address := []byte("put-in-batch-twice-hash") address := []byte("put-in-batch-twice-hash")
// put the first item // put the first item
...@@ -214,7 +216,7 @@ func TestIndex(t *testing.T) { ...@@ -214,7 +216,7 @@ func TestIndex(t *testing.T) {
} }
has, err = index.Has(dontWant) has, err = index.Has(dontWant)
if err != nil && err != ErrNotFound { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if has { if has {
...@@ -248,7 +250,7 @@ func TestIndex(t *testing.T) { ...@@ -248,7 +250,7 @@ func TestIndex(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
wantErr := ErrNotFound wantErr := leveldb.ErrNotFound
_, err = index.Get(Item{ _, err = index.Get(Item{
Address: want.Address, Address: want.Address,
}) })
...@@ -276,7 +278,7 @@ func TestIndex(t *testing.T) { ...@@ -276,7 +278,7 @@ func TestIndex(t *testing.T) {
} }
checkItem(t, got, want) checkItem(t, got, want)
batch := db.GetBatch(true) batch := new(leveldb.Batch)
err = index.DeleteInBatch(batch, Item{ err = index.DeleteInBatch(batch, Item{
Address: want.Address, Address: want.Address,
}) })
...@@ -288,7 +290,7 @@ func TestIndex(t *testing.T) { ...@@ -288,7 +290,7 @@ func TestIndex(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
wantErr := ErrNotFound wantErr := leveldb.ErrNotFound
_, err = index.Get(Item{ _, err = index.Get(Item{
Address: want.Address, Address: want.Address,
}) })
...@@ -351,7 +353,7 @@ func TestIndex(t *testing.T) { ...@@ -351,7 +353,7 @@ func TestIndex(t *testing.T) {
items = append(items, Item{ items = append(items, Item{
Address: []byte("put-hash-missing"), Address: []byte("put-hash-missing"),
}) })
want := ErrNotFound want := leveldb.ErrNotFound
err := index.Fill(items) err := index.Fill(items)
if err != want { if err != want {
t.Errorf("got error %v, want %v", err, want) t.Errorf("got error %v, want %v", err, want)
...@@ -393,7 +395,7 @@ func TestIndex_Iterate(t *testing.T) { ...@@ -393,7 +395,7 @@ func TestIndex_Iterate(t *testing.T) {
Data: []byte("data1"), Data: []byte("data1"),
}, },
} }
batch := db.GetBatch(true) batch := new(leveldb.Batch)
for _, i := range items { for _, i := range items {
err = index.PutInBatch(batch, i) err = index.PutInBatch(batch, i)
if err != nil { if err != nil {
...@@ -567,7 +569,7 @@ func TestIndex_Iterate_withPrefix(t *testing.T) { ...@@ -567,7 +569,7 @@ func TestIndex_Iterate_withPrefix(t *testing.T) {
{Address: []byte("want-hash-09"), Data: []byte("data89")}, {Address: []byte("want-hash-09"), Data: []byte("data89")},
{Address: []byte("skip-hash-10"), Data: []byte("data90")}, {Address: []byte("skip-hash-10"), Data: []byte("data90")},
} }
batch := db.GetBatch(true) batch := new(leveldb.Batch)
for _, i := range allItems { for _, i := range allItems {
err = index.PutInBatch(batch, i) err = index.PutInBatch(batch, i)
if err != nil { if err != nil {
...@@ -763,7 +765,7 @@ func TestIndex_count(t *testing.T) { ...@@ -763,7 +765,7 @@ func TestIndex_count(t *testing.T) {
Data: []byte("data1"), Data: []byte("data1"),
}, },
} }
batch := db.GetBatch(true) batch := new(leveldb.Batch)
for _, i := range items { for _, i := range items {
err = index.PutInBatch(batch, i) err = index.PutInBatch(batch, i)
if err != nil { if err != nil {
...@@ -936,7 +938,7 @@ func TestIndex_firstAndLast(t *testing.T) { ...@@ -936,7 +938,7 @@ func TestIndex_firstAndLast(t *testing.T) {
return bytes.Compare(addrs[i], addrs[j]) == -1 return bytes.Compare(addrs[i], addrs[j]) == -1
}) })
batch := db.GetBatch(true) batch := new(leveldb.Batch)
for _, addr := range addrs { for _, addr := range addrs {
err = index.PutInBatch(batch, Item{ err = index.PutInBatch(batch, Item{
Address: addr, Address: addr,
...@@ -998,11 +1000,11 @@ func TestIndex_firstAndLast(t *testing.T) { ...@@ -998,11 +1000,11 @@ func TestIndex_firstAndLast(t *testing.T) {
}, },
{ {
prefix: []byte{0, 3}, prefix: []byte{0, 3},
err: ErrNotFound, err: leveldb.ErrNotFound,
}, },
{ {
prefix: []byte{222}, prefix: []byte{222},
err: ErrNotFound, err: leveldb.ErrNotFound,
}, },
} { } {
got, err := index.Last(tc.prefix) got, err := index.Last(tc.prefix)
...@@ -1087,7 +1089,7 @@ func TestIndex_HasMulti(t *testing.T) { ...@@ -1087,7 +1089,7 @@ func TestIndex_HasMulti(t *testing.T) {
Data: []byte("data0"), Data: []byte("data0"),
} }
batch := db.GetBatch(true) batch := new(leveldb.Batch)
for _, i := range items { for _, i := range items {
err = index.PutInBatch(batch, i) err = index.PutInBatch(batch, i)
if err != nil { if err != nil {
......
...@@ -13,179 +13,95 @@ type metrics struct { ...@@ -13,179 +13,95 @@ type metrics struct {
// all metrics fields must be exported // all metrics fields must be exported
// to be able to return them by Metrics() // to be able to return them by Metrics()
// using reflection // using reflection
GetCount prometheus.Counter PutCounter prometheus.Counter
GetFailCount prometheus.Counter PutFailCounter prometheus.Counter
GetNotFoundCount prometheus.Counter GetCounter prometheus.Counter
PutCount prometheus.Counter GetFailCounter prometheus.Counter
PutFailCount prometheus.Counter GetNotFoundCounter prometheus.Counter
HasCount prometheus.Counter HasCounter prometheus.Counter
HasFailCount prometheus.Counter HasFailCounter prometheus.Counter
DeleteCount prometheus.Counter DeleteCounter prometheus.Counter
DeleteFailCount prometheus.Counter DeleteFailCounter prometheus.Counter
TotalCount prometheus.Counter IteratorCounter prometheus.Counter
TotalFailCount prometheus.Counter WriteBatchCounter prometheus.Counter
CountPrefixCount prometheus.Counter WriteBatchFailCounter prometheus.Counter
CountPrefixFailCount prometheus.Counter
CountFromCount prometheus.Counter
CountFromFailCount prometheus.Counter
IterationCount prometheus.Counter
IterationFailCount prometheus.Counter
FirstCount prometheus.Counter
FirstFailCount prometheus.Counter
LastCount prometheus.Counter
LastFailCount prometheus.Counter
GetBatchCount prometheus.Counter
WriteBatchCount prometheus.Counter
WriteBatchFailCount prometheus.Counter
} }
func newMetrics() metrics { func newMetrics() metrics {
subsystem := "shed" subsystem := "shed"
return metrics{ return metrics{
GetCount: prometheus.NewCounter(prometheus.CounterOpts{ PutCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace, Namespace: m.Namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "get_count", Name: "put_count",
Help: "Number of times a GET operation is performed.", Help: "Number of times the PUT operation is done.",
}), }),
GetFailCount: prometheus.NewCounter(prometheus.CounterOpts{ PutFailCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace, Namespace: m.Namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "get_failure_count", Name: "put_fail_count",
Help: "Number of times a GET operation failed.", Help: "Number of times the PUT operation failed.",
}), }),
GetNotFoundCount: prometheus.NewCounter(prometheus.CounterOpts{ GetCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace, Namespace: m.Namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "get_not_found_count", Name: "get_count",
Help: "Number of times a GET operation failed.", Help: "Number of times the GET operation is done.",
}), }),
PutCount: prometheus.NewCounter(prometheus.CounterOpts{ GetNotFoundCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace, Namespace: m.Namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "put_count", Name: "get_not_found_count",
Help: "Number of times a PUT operation is performed.", Help: "Number of times the GET operation could not find key.",
}), }),
PutFailCount: prometheus.NewCounter(prometheus.CounterOpts{ GetFailCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace, Namespace: m.Namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "put_failure_count", Name: "get_fail_count",
Help: "Number of times a PUT operation failed.", Help: "Number of times the GET operation is failed.",
}), }),
HasCount: prometheus.NewCounter(prometheus.CounterOpts{ HasCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace, Namespace: m.Namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "has_count", Name: "has_count",
Help: "Number of times a HAS operation is performed.", Help: "Number of times the HAS operation is done.",
}), }),
HasFailCount: prometheus.NewCounter(prometheus.CounterOpts{ HasFailCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace, Namespace: m.Namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "has_failure_count", Name: "has_fail_count",
Help: "Number of times a HAS operation failed.", Help: "Number of times the HAS operation failed.",
}), }),
DeleteCount: prometheus.NewCounter(prometheus.CounterOpts{ DeleteCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace, Namespace: m.Namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "delete_count", Name: "delete_count",
Help: "Number of times a DELETE operation is performed.", Help: "Number of times the DELETE operation is done.",
}),
DeleteFailCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "delete_failure_count",
Help: "Number of times a DELETE operation failed.",
}),
TotalCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "total_count",
Help: "Number of times a COUNT operation is performed.",
}),
TotalFailCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "total_failure_count",
Help: "Number of times a COUNT operation failed.",
}),
CountPrefixCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "count_prefix_count",
Help: "Number of times a COUNT_PREFIX operation is performed.",
}),
CountFromFailCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "count_from_failure_count",
Help: "Number of times a COUNT_FROM operation failed.",
}),
CountFromCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "count_from_count",
Help: "Number of times a COUNT_FROM operation is performed.",
}),
CountPrefixFailCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "count_prefix_failure_count",
Help: "Number of times a COUNT_PREFIX operation failed.",
}),
IterationCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "iteration_count",
Help: "Number of times a ITERATION operation is performed.",
}),
IterationFailCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "iteration_failure_count",
Help: "Number of times a ITERATION operation failed.",
}),
FirstCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "first_count",
Help: "Number of times a FIRST operation is performed.",
}),
FirstFailCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "first_failure_count",
Help: "Number of times a FIRST operation failed.",
}),
LastCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "last_count",
Help: "Number of times a LAST operation is performed.",
}), }),
LastFailCount: prometheus.NewCounter(prometheus.CounterOpts{ DeleteFailCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace, Namespace: m.Namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "last_failure_count", Name: "delete_fail_count",
Help: "Number of times a LAST operation failed.", Help: "Number of times the DELETE operation failed.",
}), }),
GetBatchCount: prometheus.NewCounter(prometheus.CounterOpts{ IteratorCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace, Namespace: m.Namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "getbatch_count", Name: "iterator_count",
Help: "Number of times a GET_BATCH operation is performed.", Help: "Number of times the ITERATOR operation is done.",
}), }),
WriteBatchCount: prometheus.NewCounter(prometheus.CounterOpts{ WriteBatchCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace, Namespace: m.Namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "write_batch_count", Name: "write_batch_count",
Help: "Number of times a WRITE_BATCH operation is performed.", Help: "Number of times the WRITE_BATCH operation is done.",
}), }),
WriteBatchFailCount: prometheus.NewCounter(prometheus.CounterOpts{ WriteBatchFailCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace, Namespace: m.Namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "write_batch_failure_count", Name: "write_batch_fail_count",
Help: "Number of times a WRITE_BATCH operation failed.", Help: "Number of times the WRITE_BATCH operation failed.",
}), }),
} }
} }
......
...@@ -19,8 +19,8 @@ package shed ...@@ -19,8 +19,8 @@ package shed
import ( import (
"encoding/binary" "encoding/binary"
"github.com/dgraph-io/badger/v2"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/syndtr/goleveldb/leveldb"
) )
// Uint64Vector provides a way to have multiple counters in the database. // Uint64Vector provides a way to have multiple counters in the database.
...@@ -51,7 +51,7 @@ func (db *DB) NewUint64Vector(name string) (f Uint64Vector, err error) { ...@@ -51,7 +51,7 @@ func (db *DB) NewUint64Vector(name string) (f Uint64Vector, err error) {
func (f Uint64Vector) Get(i uint64) (val uint64, err error) { func (f Uint64Vector) Get(i uint64) (val uint64, err error) {
b, err := f.db.Get(f.indexKey(i)) b, err := f.db.Get(f.indexKey(i))
if err != nil { if err != nil {
if err == ErrNotFound { if err == leveldb.ErrNotFound {
return 0, nil return 0, nil
} }
return 0, err return 0, err
...@@ -66,8 +66,8 @@ func (f Uint64Vector) Put(i, val uint64) (err error) { ...@@ -66,8 +66,8 @@ func (f Uint64Vector) Put(i, val uint64) (err error) {
// PutInBatch stores a uint64 value at index i in a batch // PutInBatch stores a uint64 value at index i in a batch
// that can be saved later in the database. // that can be saved later in the database.
func (f Uint64Vector) PutInBatch(batch *badger.Txn, i, val uint64) (err error) { func (f Uint64Vector) PutInBatch(batch *leveldb.Batch, i, val uint64) {
return batch.Set(f.indexKey(i), encodeUint64(val)) batch.Put(f.indexKey(i), encodeUint64(val))
} }
// Inc increments a uint64 value in the database. // Inc increments a uint64 value in the database.
...@@ -75,8 +75,13 @@ func (f Uint64Vector) PutInBatch(batch *badger.Txn, i, val uint64) (err error) { ...@@ -75,8 +75,13 @@ func (f Uint64Vector) PutInBatch(batch *badger.Txn, i, val uint64) (err error) {
func (f Uint64Vector) Inc(i uint64) (val uint64, err error) { func (f Uint64Vector) Inc(i uint64) (val uint64, err error) {
val, err = f.Get(i) val, err = f.Get(i)
if err != nil { if err != nil {
if err == leveldb.ErrNotFound {
val = 0
} else {
f.logger.Debugf("error getiing value while doing Inc. Error: %s", err.Error())
return 0, err return 0, err
} }
}
val++ val++
return val, f.Put(i, val) return val, f.Put(i, val)
} }
...@@ -84,16 +89,18 @@ func (f Uint64Vector) Inc(i uint64) (val uint64, err error) { ...@@ -84,16 +89,18 @@ func (f Uint64Vector) Inc(i uint64) (val uint64, err error) {
// IncInBatch increments a uint64 value at index i in the batch // IncInBatch increments a uint64 value at index i in the batch
// by retreiving a value from the database, not the same batch. // by retreiving a value from the database, not the same batch.
// This operation is not goroutine safe. // This operation is not goroutine safe.
func (f Uint64Vector) IncInBatch(batch *badger.Txn, i uint64) (val uint64, err error) { func (f Uint64Vector) IncInBatch(batch *leveldb.Batch, i uint64) (val uint64, err error) {
val, err = f.Get(i) val, err = f.Get(i)
if err != nil { if err != nil {
if err == leveldb.ErrNotFound {
val = 0
} else {
f.logger.Debugf("error getiing value while doing IncInBatch. Error: %s", err.Error())
return 0, err return 0, err
} }
val++
err = f.PutInBatch(batch, i, val)
if err != nil {
return 0, err
} }
val++
f.PutInBatch(batch, i, val)
return val, nil return val, nil
} }
...@@ -103,7 +110,7 @@ func (f Uint64Vector) IncInBatch(batch *badger.Txn, i uint64) (val uint64, err e ...@@ -103,7 +110,7 @@ func (f Uint64Vector) IncInBatch(batch *badger.Txn, i uint64) (val uint64, err e
func (f Uint64Vector) Dec(i uint64) (val uint64, err error) { func (f Uint64Vector) Dec(i uint64) (val uint64, err error) {
val, err = f.Get(i) val, err = f.Get(i)
if err != nil { if err != nil {
if err == ErrNotFound { if err == leveldb.ErrNotFound {
val = 0 val = 0
} else { } else {
f.logger.Debugf("error getiing value while doing Dec. Error: %s", err.Error()) f.logger.Debugf("error getiing value while doing Dec. Error: %s", err.Error())
...@@ -120,18 +127,20 @@ func (f Uint64Vector) Dec(i uint64) (val uint64, err error) { ...@@ -120,18 +127,20 @@ func (f Uint64Vector) Dec(i uint64) (val uint64, err error) {
// by retreiving a value from the database, not the same batch. // by retreiving a value from the database, not the same batch.
// This operation is not goroutine safe. // This operation is not goroutine safe.
// The field is protected from overflow to a negative value. // The field is protected from overflow to a negative value.
func (f Uint64Vector) DecInBatch(batch *badger.Txn, i uint64) (val uint64, err error) { func (f Uint64Vector) DecInBatch(batch *leveldb.Batch, i uint64) (val uint64, err error) {
val, err = f.Get(i) val, err = f.Get(i)
if err != nil { if err != nil {
if err == leveldb.ErrNotFound {
val = 0
} else {
f.logger.Debugf("error getiing value while doing DecInBatch. Error: %s", err.Error())
return 0, err return 0, err
} }
}
if val != 0 { if val != 0 {
val-- val--
} }
err = f.PutInBatch(batch, i, val) f.PutInBatch(batch, i, val)
if err != nil {
return 0, err
}
return val, nil return val, nil
} }
......
...@@ -18,6 +18,8 @@ package shed ...@@ -18,6 +18,8 @@ package shed
import ( import (
"testing" "testing"
"github.com/syndtr/goleveldb/leveldb"
) )
// TestUint64Vector validates put and get operations // TestUint64Vector validates put and get operations
...@@ -76,12 +78,9 @@ func TestUint64Vector(t *testing.T) { ...@@ -76,12 +78,9 @@ func TestUint64Vector(t *testing.T) {
t.Run("put in batch", func(t *testing.T) { t.Run("put in batch", func(t *testing.T) {
for _, index := range []uint64{0, 1, 2, 3, 5, 10} { for _, index := range []uint64{0, 1, 2, 3, 5, 10} {
batch := db.GetBatch(true) batch := new(leveldb.Batch)
var want uint64 = 43 + index var want uint64 = 43 + index
err = bins.PutInBatch(batch, index, want) bins.PutInBatch(batch, index, want)
if err != nil {
t.Fatal(err)
}
err = db.WriteBatch(batch) err = db.WriteBatch(batch)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -95,12 +94,9 @@ func TestUint64Vector(t *testing.T) { ...@@ -95,12 +94,9 @@ func TestUint64Vector(t *testing.T) {
} }
t.Run("overwrite", func(t *testing.T) { t.Run("overwrite", func(t *testing.T) {
batch := db.GetBatch(true) batch := new(leveldb.Batch)
var want uint64 = 85 + index var want uint64 = 85 + index
err = bins.PutInBatch(batch, index, want) bins.PutInBatch(batch, index, want)
if err != nil {
t.Fatal(err)
}
err = db.WriteBatch(batch) err = db.WriteBatch(batch)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -161,7 +157,7 @@ func TestUint64Vector_IncInBatch(t *testing.T) { ...@@ -161,7 +157,7 @@ func TestUint64Vector_IncInBatch(t *testing.T) {
} }
for _, index := range []uint64{0, 1, 2, 3, 5, 10} { for _, index := range []uint64{0, 1, 2, 3, 5, 10} {
batch := db.GetBatch(true) batch := new(leveldb.Batch)
var want uint64 = 1 var want uint64 = 1
got, err := bins.IncInBatch(batch, index) got, err := bins.IncInBatch(batch, index)
if err != nil { if err != nil {
...@@ -182,7 +178,7 @@ func TestUint64Vector_IncInBatch(t *testing.T) { ...@@ -182,7 +178,7 @@ func TestUint64Vector_IncInBatch(t *testing.T) {
t.Errorf("got %v uint64 %v, want %v", index, got, want) t.Errorf("got %v uint64 %v, want %v", index, got, want)
} }
batch2 := db.GetBatch(true) batch2 := new(leveldb.Batch)
want = 2 want = 2
got, err = bins.IncInBatch(batch2, index) got, err = bins.IncInBatch(batch2, index)
if err != nil { if err != nil {
...@@ -256,7 +252,7 @@ func TestUint64Vector_DecInBatch(t *testing.T) { ...@@ -256,7 +252,7 @@ func TestUint64Vector_DecInBatch(t *testing.T) {
} }
for _, index := range []uint64{0, 1, 2, 3, 5, 10} { for _, index := range []uint64{0, 1, 2, 3, 5, 10} {
batch := db.GetBatch(true) batch := new(leveldb.Batch)
var want uint64 var want uint64
got, err := bins.DecInBatch(batch, index) got, err := bins.DecInBatch(batch, index)
if err != nil { if err != nil {
...@@ -277,12 +273,9 @@ func TestUint64Vector_DecInBatch(t *testing.T) { ...@@ -277,12 +273,9 @@ func TestUint64Vector_DecInBatch(t *testing.T) {
t.Errorf("got %v uint64 %v, want %v", index, got, want) t.Errorf("got %v uint64 %v, want %v", index, got, want)
} }
batch2 := db.GetBatch(true) batch2 := new(leveldb.Batch)
want = 42 + index want = 42 + index
err = bins.PutInBatch(batch2, index, want) bins.PutInBatch(batch2, index, want)
if err != nil {
t.Fatal(err)
}
err = db.WriteBatch(batch2) err = db.WriteBatch(batch2)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -295,7 +288,7 @@ func TestUint64Vector_DecInBatch(t *testing.T) { ...@@ -295,7 +288,7 @@ func TestUint64Vector_DecInBatch(t *testing.T) {
t.Errorf("got %v uint64 %v, want %v", index, got, want) t.Errorf("got %v uint64 %v, want %v", index, got, want)
} }
batch3 := db.GetBatch(true) batch3 := new(leveldb.Batch)
want = 41 + index want = 41 + index
got, err = bins.DecInBatch(batch3, index) got, err = bins.DecInBatch(batch3, index)
if err != nil { if err != nil {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment