Commit 490125b3 authored by Zahoor Mohamed's avatar Zahoor Mohamed Committed by GitHub

P8 - Add Bee's logging to localstore (#84)

* Added bee's logging to localstore
Co-authored-by: default avataracud <12988138+acud@users.noreply.github.com>
parent dee4a635
...@@ -318,7 +318,7 @@ func (db *DB) Last(prefix []byte) (key []byte, value []byte, err error) { ...@@ -318,7 +318,7 @@ func (db *DB) Last(prefix []byte) (key []byte, value []byte, err error) {
o := badger.DefaultIteratorOptions o := badger.DefaultIteratorOptions
o.PrefetchValues = true o.PrefetchValues = true
o.PrefetchSize = 1024 o.PrefetchSize = 1024
o.Reverse = true // iterate backwards o.Reverse = true // iterate backwards
i := txn.NewIterator(o) i := txn.NewIterator(o)
defer i.Close() defer i.Close()
...@@ -335,7 +335,7 @@ func (db *DB) Last(prefix []byte) (key []byte, value []byte, err error) { ...@@ -335,7 +335,7 @@ func (db *DB) Last(prefix []byte) (key []byte, value []byte, err error) {
// If there is a no key which starts which nextPrefix, badger moves the // If there is a no key which starts which nextPrefix, badger moves the
// cursor to the previous key (which should be our key). // cursor to the previous key (which should be our key).
i.Seek(nextPrefix) i.Seek(nextPrefix)
if bytes.HasPrefix(i.Item().Key(),prefix) { if bytes.HasPrefix(i.Item().Key(), prefix) {
key = i.Item().Key() key = i.Item().Key()
value, err = i.Item().ValueCopy(nil) value, err = i.Item().ValueCopy(nil)
if err != nil { if err != nil {
...@@ -345,7 +345,7 @@ func (db *DB) Last(prefix []byte) (key []byte, value []byte, err error) { ...@@ -345,7 +345,7 @@ func (db *DB) Last(prefix []byte) (key []byte, value []byte, err error) {
// If there is a key which starts with nextPrefix, we do reverse Next() to // If there is a key which starts with nextPrefix, we do reverse Next() to
// reach our key and pick that up. // reach our key and pick that up.
i.Next() i.Next()
if bytes.HasPrefix(i.Item().Key(),prefix) { if bytes.HasPrefix(i.Item().Key(), prefix) {
key = i.Item().Key() key = i.Item().Key()
value, err = i.Item().ValueCopy(nil) value, err = i.Item().ValueCopy(nil)
if err != nil { if err != nil {
......
...@@ -67,7 +67,7 @@ func (f Uint64Field) Put(val uint64) (err error) { ...@@ -67,7 +67,7 @@ func (f Uint64Field) Put(val uint64) (err error) {
// PutInBatch stores a uint64 value in a batch // PutInBatch stores a uint64 value in a batch
// that can be saved later in the database. // that can be saved later in the database.
func (f Uint64Field) PutInBatch(batch *badger.Txn, val uint64) (err error){ func (f Uint64Field) PutInBatch(batch *badger.Txn, val uint64) (err error) {
return batch.Set(f.key, encodeUint64(val)) return batch.Set(f.key, encodeUint64(val))
} }
......
...@@ -256,7 +256,7 @@ func (f Index) PutInBatch(batch *badger.Txn, i Item) (err error) { ...@@ -256,7 +256,7 @@ func (f Index) PutInBatch(batch *badger.Txn, i Item) (err error) {
if err != nil { if err != nil {
return err return err
} }
return batch.Set(key, value) return batch.Set(key, value)
} }
// Delete accepts Item to remove a key/value pair // Delete accepts Item to remove a key/value pair
......
...@@ -217,7 +217,7 @@ func TestIndex(t *testing.T) { ...@@ -217,7 +217,7 @@ func TestIndex(t *testing.T) {
} }
has, err = index.Has(dontWant) has, err = index.Has(dontWant)
if err != nil && err != ErrNotFound{ if err != nil && err != ErrNotFound {
t.Fatal(err) t.Fatal(err)
} }
if has { if has {
......
...@@ -66,7 +66,7 @@ func (f Uint64Vector) Put(i, val uint64) (err error) { ...@@ -66,7 +66,7 @@ func (f Uint64Vector) Put(i, val uint64) (err error) {
// PutInBatch stores a uint64 value at index i in a batch // PutInBatch stores a uint64 value at index i in a batch
// that can be saved later in the database. // that can be saved later in the database.
func (f Uint64Vector) PutInBatch(batch *badger.Txn, i, val uint64) (err error){ func (f Uint64Vector) PutInBatch(batch *badger.Txn, i, val uint64) (err error) {
return batch.Set(f.indexKey(i), encodeUint64(val)) return batch.Set(f.indexKey(i), encodeUint64(val))
} }
......
...@@ -26,7 +26,6 @@ import ( ...@@ -26,7 +26,6 @@ import (
"sync" "sync"
"github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/shed" "github.com/ethersphere/swarm/shed"
) )
...@@ -126,13 +125,13 @@ func (db *DB) Import(r io.Reader, legacy bool) (count int64, err error) { ...@@ -126,13 +125,13 @@ func (db *DB) Import(r io.Reader, legacy bool) (count int64, err error) {
} }
if len(hdr.Name) != 64 { if len(hdr.Name) != 64 {
log.Warn("ignoring non-chunk file", "name", hdr.Name) db.logger.Warningf("ignoring non-chunk file, name : %s", hdr.Name)
continue continue
} }
keybytes, err := hex.DecodeString(hdr.Name) keybytes, err := hex.DecodeString(hdr.Name)
if err != nil { if err != nil {
log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err) db.logger.Warningf("ignoring invalid chunk file. name : %s , Error : %s", hdr.Name, err)
continue continue
} }
......
...@@ -19,7 +19,6 @@ package localstore ...@@ -19,7 +19,6 @@ package localstore
import ( import (
"time" "time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethersphere/swarm/shed" "github.com/ethersphere/swarm/shed"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
) )
...@@ -54,7 +53,7 @@ func (db *DB) collectGarbageWorker() { ...@@ -54,7 +53,7 @@ func (db *DB) collectGarbageWorker() {
// another collect garbage run is needed // another collect garbage run is needed
collectedCount, done, err := db.collectGarbage() collectedCount, done, err := db.collectGarbage()
if err != nil { if err != nil {
log.Error("localstore collect garbage", "err", err) db.logger.Errorf("localstore collect garbage. Error : %s", err.Error())
} }
// check if another gc run is needed // check if another gc run is needed
if !done { if !done {
...@@ -96,7 +95,7 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { ...@@ -96,7 +95,7 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
// remove them from the gcIndex before iterating through gcIndex // remove them from the gcIndex before iterating through gcIndex
err = db.removeChunksInExcludeIndexFromGC() err = db.removeChunksInExcludeIndexFromGC()
if err != nil { if err != nil {
log.Error("localstore exclude pinned chunks", "err", err) db.logger.Errorf("localstore exclude pinned chunks. Error : %s", err)
return 0, true, err return 0, true, err
} }
......
...@@ -25,6 +25,7 @@ import ( ...@@ -25,6 +25,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/shed" "github.com/ethersphere/swarm/shed"
) )
...@@ -433,7 +434,8 @@ func TestDB_gcSize(t *testing.T) { ...@@ -433,7 +434,8 @@ func TestDB_gcSize(t *testing.T) {
if _, err := rand.Read(baseKey); err != nil { if _, err := rand.Read(baseKey); err != nil {
t.Fatal(err) t.Fatal(err)
} }
db, err := New(dir, baseKey, nil) logger := logging.New(ioutil.Discard, 0)
db, err := New(dir, baseKey, nil, logger)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -457,8 +459,7 @@ func TestDB_gcSize(t *testing.T) { ...@@ -457,8 +459,7 @@ func TestDB_gcSize(t *testing.T) {
if err := db.Close(); err != nil { if err := db.Close(); err != nil {
t.Fatal(err) t.Fatal(err)
} }
db, err = New(dir, baseKey, nil, logger)
db, err = New(dir, baseKey, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
......
...@@ -19,16 +19,16 @@ package localstore ...@@ -19,16 +19,16 @@ package localstore
import ( import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"github.com/prometheus/client_golang/prometheus"
"os" "os"
"runtime/pprof" "runtime/pprof"
"sync" "sync"
"time" "time"
"github.com/ethereum/go-ethereum/log" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/shed" "github.com/ethersphere/swarm/shed"
"github.com/ethersphere/swarm/storage/mock" "github.com/ethersphere/swarm/storage/mock"
"github.com/prometheus/client_golang/prometheus"
) )
// DB implements chunk.Store. // DB implements chunk.Store.
...@@ -38,10 +38,6 @@ var ( ...@@ -38,10 +38,6 @@ var (
// ErrInvalidMode is retuned when an unknown Mode // ErrInvalidMode is retuned when an unknown Mode
// is provided to the function. // is provided to the function.
ErrInvalidMode = errors.New("invalid mode") ErrInvalidMode = errors.New("invalid mode")
// ErrAddressLockTimeout is returned when the same chunk
// is updated in parallel and one of the updates
// takes longer then the configured timeout duration.
ErrAddressLockTimeout = errors.New("address lock timeout")
) )
var ( var (
...@@ -128,6 +124,8 @@ type DB struct { ...@@ -128,6 +124,8 @@ type DB struct {
subscritionsWG sync.WaitGroup subscritionsWG sync.WaitGroup
metrics metrics metrics metrics
logger logging.Logger
} }
// Options struct holds optional parameters for configuring DB. // Options struct holds optional parameters for configuring DB.
...@@ -153,7 +151,7 @@ type Options struct { ...@@ -153,7 +151,7 @@ type Options struct {
// New returns a new DB. All fields and indexes are initialized // New returns a new DB. All fields and indexes are initialized
// and possible conflicts with schema from existing database is checked. // and possible conflicts with schema from existing database is checked.
// One goroutine for writing batches is created. // One goroutine for writing batches is created.
func New(path string, baseKey []byte, o *Options) (db *DB, err error) { func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB, err error) {
if o == nil { if o == nil {
// default options // default options
o = &Options{ o = &Options{
...@@ -178,6 +176,7 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { ...@@ -178,6 +176,7 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
collectGarbageWorkerDone: make(chan struct{}), collectGarbageWorkerDone: make(chan struct{}),
putToGCCheck: o.PutToGCCheck, putToGCCheck: o.PutToGCCheck,
metrics: newMetrics(), metrics: newMetrics(),
logger: logger,
} }
if db.capacity == 0 { if db.capacity == 0 {
db.capacity = defaultCapacity db.capacity = defaultCapacity
...@@ -455,7 +454,7 @@ func (db *DB) Close() (err error) { ...@@ -455,7 +454,7 @@ func (db *DB) Close() (err error) {
select { select {
case <-done: case <-done:
case <-time.After(5 * time.Second): case <-time.After(5 * time.Second):
log.Error("localstore closed with still active goroutines") db.logger.Errorf("localstore closed with still active goroutines")
// Print a full goroutine dump to debug blocking. // Print a full goroutine dump to debug blocking.
// TODO: use a logger to write a goroutine profile // TODO: use a logger to write a goroutine profile
prof := pprof.Lookup("goroutine") prof := pprof.Lookup("goroutine")
......
...@@ -29,6 +29,7 @@ import ( ...@@ -29,6 +29,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/chunk"
chunktesting "github.com/ethersphere/swarm/chunk/testing" chunktesting "github.com/ethersphere/swarm/chunk/testing"
"github.com/ethersphere/swarm/shed" "github.com/ethersphere/swarm/shed"
...@@ -151,7 +152,8 @@ func newTestDB(t testing.TB, o *Options) (db *DB, cleanupFunc func()) { ...@@ -151,7 +152,8 @@ func newTestDB(t testing.TB, o *Options) (db *DB, cleanupFunc func()) {
if _, err := rand.Read(baseKey); err != nil { if _, err := rand.Read(baseKey); err != nil {
t.Fatal(err) t.Fatal(err)
} }
db, err = New(dir, baseKey, o) logger := logging.New(ioutil.Discard, 0)
db, err = New(dir, baseKey, o, logger)
if err != nil { if err != nil {
cleanupFunc() cleanupFunc()
t.Fatal(err) t.Fatal(err)
......
...@@ -22,7 +22,6 @@ import ( ...@@ -22,7 +22,6 @@ import (
"fmt" "fmt"
"github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/shed" "github.com/ethersphere/swarm/shed"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
) )
...@@ -45,7 +44,7 @@ var schemaMigrations = []migration{ ...@@ -45,7 +44,7 @@ var schemaMigrations = []migration{
} }
func (db *DB) migrate(schemaName string) error { func (db *DB) migrate(schemaName string) error {
migrations, err := getMigrations(schemaName, DbSchemaCurrent, schemaMigrations) migrations, err := getMigrations(schemaName, DbSchemaCurrent, schemaMigrations, db)
if err != nil { if err != nil {
return fmt.Errorf("error getting migrations for current schema (%s): %v", schemaName, err) return fmt.Errorf("error getting migrations for current schema (%s): %v", schemaName, err)
} }
...@@ -55,7 +54,7 @@ func (db *DB) migrate(schemaName string) error { ...@@ -55,7 +54,7 @@ func (db *DB) migrate(schemaName string) error {
return nil return nil
} }
log.Info("need to run data migrations on localstore", "numMigrations", len(migrations), "schemaName", schemaName) db.logger.Infof("need to run data migrations on localstore. numMigrations : %s, schemaName : %s ", len(migrations), schemaName)
for i := 0; i < len(migrations); i++ { for i := 0; i < len(migrations); i++ {
err := migrations[i].fn(db) err := migrations[i].fn(db)
if err != nil { if err != nil {
...@@ -69,7 +68,7 @@ func (db *DB) migrate(schemaName string) error { ...@@ -69,7 +68,7 @@ func (db *DB) migrate(schemaName string) error {
if err != nil { if err != nil {
return err return err
} }
log.Info("successfully ran migration", "migrationId", i, "currentSchema", schemaName) db.logger.Infof("successfully ran migration. migrationId : %s, currentSchema : %s", i, schemaName)
} }
return nil return nil
} }
...@@ -77,7 +76,7 @@ func (db *DB) migrate(schemaName string) error { ...@@ -77,7 +76,7 @@ func (db *DB) migrate(schemaName string) error {
// getMigrations returns an ordered list of migrations that need be executed // getMigrations returns an ordered list of migrations that need be executed
// with no errors in order to bring the localstore to the most up-to-date // with no errors in order to bring the localstore to the most up-to-date
// schema definition // schema definition
func getMigrations(currentSchema, targetSchema string, allSchemeMigrations []migration) (migrations []migration, err error) { func getMigrations(currentSchema, targetSchema string, allSchemeMigrations []migration, db *DB) (migrations []migration, err error) {
foundCurrent := false foundCurrent := false
foundTarget := false foundTarget := false
if currentSchema == DbSchemaCurrent { if currentSchema == DbSchemaCurrent {
...@@ -90,7 +89,7 @@ func getMigrations(currentSchema, targetSchema string, allSchemeMigrations []mig ...@@ -90,7 +89,7 @@ func getMigrations(currentSchema, targetSchema string, allSchemeMigrations []mig
return nil, errors.New("found schema name for the second time when looking for migrations") return nil, errors.New("found schema name for the second time when looking for migrations")
} }
foundCurrent = true foundCurrent = true
log.Info("found current localstore schema", "currentSchema", currentSchema, "migrateTo", DbSchemaCurrent, "total migrations", len(allSchemeMigrations)-i) db.logger.Infof("found current localstore schema. currentSchema : %s , migrateTo : %s, total migrations : %d", currentSchema, DbSchemaCurrent, len(allSchemeMigrations)-i)
continue // current schema migration should not be executed (already has been when schema was migrated to) continue // current schema migration should not be executed (already has been when schema was migrated to)
case targetSchema: case targetSchema:
foundTarget = true foundTarget = true
......
...@@ -26,6 +26,7 @@ import ( ...@@ -26,6 +26,7 @@ import (
"strings" "strings"
"testing" "testing"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/chunk"
) )
...@@ -60,8 +61,10 @@ func TestOneMigration(t *testing.T) { ...@@ -60,8 +61,10 @@ func TestOneMigration(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
logger := logging.New(ioutil.Discard, 0)
// start the fresh localstore with the sanctuary schema name // start the fresh localstore with the sanctuary schema name
db, err := New(dir, baseKey, nil) db, err := New(dir, baseKey, nil, logger)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -74,7 +77,7 @@ func TestOneMigration(t *testing.T) { ...@@ -74,7 +77,7 @@ func TestOneMigration(t *testing.T) {
DbSchemaCurrent = DbSchemaDiwali DbSchemaCurrent = DbSchemaDiwali
// start the existing localstore and expect the migration to run // start the existing localstore and expect the migration to run
db, err = New(dir, baseKey, nil) db, err = New(dir, baseKey, nil, logger)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -145,9 +148,10 @@ func TestManyMigrations(t *testing.T) { ...@@ -145,9 +148,10 @@ func TestManyMigrations(t *testing.T) {
if _, err := rand.Read(baseKey); err != nil { if _, err := rand.Read(baseKey); err != nil {
t.Fatal(err) t.Fatal(err)
} }
logger := logging.New(ioutil.Discard, 0)
// start the fresh localstore with the sanctuary schema name // start the fresh localstore with the sanctuary schema name
db, err := New(dir, baseKey, nil) db, err := New(dir, baseKey, nil, logger)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -160,7 +164,7 @@ func TestManyMigrations(t *testing.T) { ...@@ -160,7 +164,7 @@ func TestManyMigrations(t *testing.T) {
DbSchemaCurrent = "salvation" DbSchemaCurrent = "salvation"
// start the existing localstore and expect the migration to run // start the existing localstore and expect the migration to run
db, err = New(dir, baseKey, nil) db, err = New(dir, baseKey, nil, logger)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -224,9 +228,10 @@ func TestMigrationFailFrom(t *testing.T) { ...@@ -224,9 +228,10 @@ func TestMigrationFailFrom(t *testing.T) {
if _, err := rand.Read(baseKey); err != nil { if _, err := rand.Read(baseKey); err != nil {
t.Fatal(err) t.Fatal(err)
} }
logger := logging.New(ioutil.Discard, 0)
// start the fresh localstore with the sanctuary schema name // start the fresh localstore with the sanctuary schema name
db, err := New(dir, baseKey, nil) db, err := New(dir, baseKey, nil, logger)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -239,7 +244,7 @@ func TestMigrationFailFrom(t *testing.T) { ...@@ -239,7 +244,7 @@ func TestMigrationFailFrom(t *testing.T) {
DbSchemaCurrent = "foo" DbSchemaCurrent = "foo"
// start the existing localstore and expect the migration to run // start the existing localstore and expect the migration to run
_, err = New(dir, baseKey, nil) _, err = New(dir, baseKey, nil, logger)
if !strings.Contains(err.Error(), errMissingCurrentSchema.Error()) { if !strings.Contains(err.Error(), errMissingCurrentSchema.Error()) {
t.Fatalf("expected errCannotFindSchema but got %v", err) t.Fatalf("expected errCannotFindSchema but got %v", err)
} }
...@@ -284,8 +289,10 @@ func TestMigrationFailTo(t *testing.T) { ...@@ -284,8 +289,10 @@ func TestMigrationFailTo(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
logger := logging.New(ioutil.Discard, 0)
// start the fresh localstore with the sanctuary schema name // start the fresh localstore with the sanctuary schema name
db, err := New(dir, baseKey, nil) db, err := New(dir, baseKey, nil, logger)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -298,7 +305,7 @@ func TestMigrationFailTo(t *testing.T) { ...@@ -298,7 +305,7 @@ func TestMigrationFailTo(t *testing.T) {
DbSchemaCurrent = "foo" DbSchemaCurrent = "foo"
// start the existing localstore and expect the migration to run // start the existing localstore and expect the migration to run
_, err = New(dir, baseKey, nil) _, err = New(dir, baseKey, nil, logger)
if !strings.Contains(err.Error(), errMissingTargetSchema.Error()) { if !strings.Contains(err.Error(), errMissingTargetSchema.Error()) {
t.Fatalf("expected errMissingTargetSchema but got %v", err) t.Fatalf("expected errMissingTargetSchema but got %v", err)
} }
...@@ -339,9 +346,10 @@ func TestMigrateSanctuaryFixture(t *testing.T) { ...@@ -339,9 +346,10 @@ func TestMigrateSanctuaryFixture(t *testing.T) {
if _, err := rand.Read(baseKey); err != nil { if _, err := rand.Read(baseKey); err != nil {
t.Fatal(err) t.Fatal(err)
} }
logger := logging.New(ioutil.Discard, 0)
// start localstore with the copied fixture // start localstore with the copied fixture
db, err := New(tmpdir, baseKey, &Options{Tags: chunk.NewTags()}) db, err := New(tmpdir, baseKey, &Options{Tags: chunk.NewTags()}, logger)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
......
...@@ -20,7 +20,6 @@ import ( ...@@ -20,7 +20,6 @@ import (
"context" "context"
"time" "time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/shed" "github.com/ethersphere/swarm/shed"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
...@@ -106,7 +105,7 @@ func (db *DB) updateGCItems(items ...shed.Item) { ...@@ -106,7 +105,7 @@ func (db *DB) updateGCItems(items ...shed.Item) {
err := db.updateGC(item) err := db.updateGC(item)
if err != nil { if err != nil {
db.metrics.GCUpdateError.Inc() db.metrics.GCUpdateError.Inc()
log.Error("localstore update gc", "err", err) db.logger.Errorf("localstore update gc. Error : %s", err.Error())
} }
} }
// if gc update hook is defined, call it // if gc update hook is defined, call it
......
...@@ -227,16 +227,16 @@ func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed ...@@ -227,16 +227,16 @@ func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed
} }
err = db.retrievalDataIndex.PutInBatch(batch, item) err = db.retrievalDataIndex.PutInBatch(batch, item)
if err != nil { if err != nil {
return false, 0 , err return false, 0, err
} }
err = db.pullIndex.PutInBatch(batch, item) err = db.pullIndex.PutInBatch(batch, item)
if err != nil { if err != nil {
return false, 0 , err return false, 0, err
} }
if !anonymous { if !anonymous {
err = db.pushIndex.PutInBatch(batch, item) err = db.pushIndex.PutInBatch(batch, item)
if err != nil { if err != nil {
return false, 0 , err return false, 0, err
} }
} }
...@@ -282,11 +282,11 @@ func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.I ...@@ -282,11 +282,11 @@ func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.I
} }
err = db.retrievalDataIndex.PutInBatch(batch, item) err = db.retrievalDataIndex.PutInBatch(batch, item)
if err != nil { if err != nil {
return false, 0 , err return false, 0, err
} }
err = db.pullIndex.PutInBatch(batch, item) err = db.pullIndex.PutInBatch(batch, item)
if err != nil { if err != nil {
return false, 0 , err return false, 0, err
} }
if db.putToGCCheck(item.Address) { if db.putToGCCheck(item.Address) {
...@@ -323,7 +323,7 @@ func (db *DB) setGC(batch *leveldb.Batch, item shed.Item) (gcSizeChange int64, e ...@@ -323,7 +323,7 @@ func (db *DB) setGC(batch *leveldb.Batch, item shed.Item) (gcSizeChange int64, e
item.AccessTimestamp = i.AccessTimestamp item.AccessTimestamp = i.AccessTimestamp
err = db.gcIndex.DeleteInBatch(batch, item) err = db.gcIndex.DeleteInBatch(batch, item)
if err != nil { if err != nil {
return 0 , err return 0, err
} }
gcSizeChange-- gcSizeChange--
case leveldb.ErrNotFound: case leveldb.ErrNotFound:
...@@ -334,12 +334,12 @@ func (db *DB) setGC(batch *leveldb.Batch, item shed.Item) (gcSizeChange int64, e ...@@ -334,12 +334,12 @@ func (db *DB) setGC(batch *leveldb.Batch, item shed.Item) (gcSizeChange int64, e
item.AccessTimestamp = now() item.AccessTimestamp = now()
err = db.retrievalAccessIndex.PutInBatch(batch, item) err = db.retrievalAccessIndex.PutInBatch(batch, item)
if err != nil { if err != nil {
return 0 , err return 0, err
} }
err = db.gcIndex.PutInBatch(batch, item) err = db.gcIndex.PutInBatch(batch, item)
if err != nil { if err != nil {
return 0 , err return 0, err
} }
gcSizeChange++ gcSizeChange++
......
...@@ -22,7 +22,6 @@ import ( ...@@ -22,7 +22,6 @@ import (
"time" "time"
"github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
) )
...@@ -145,7 +144,7 @@ func (db *DB) setAccess(batch *leveldb.Batch, binIDs map[uint8]uint64, addr chun ...@@ -145,7 +144,7 @@ func (db *DB) setAccess(batch *leveldb.Batch, binIDs map[uint8]uint64, addr chun
case leveldb.ErrNotFound: case leveldb.ErrNotFound:
err = db.pushIndex.DeleteInBatch(batch, item) err = db.pushIndex.DeleteInBatch(batch, item)
if err != nil { if err != nil {
return 0 , err return 0, err
} }
item.StoreTimestamp = now() item.StoreTimestamp = now()
item.BinID, err = db.incBinID(binIDs, po) item.BinID, err = db.incBinID(binIDs, po)
...@@ -162,7 +161,7 @@ func (db *DB) setAccess(batch *leveldb.Batch, binIDs map[uint8]uint64, addr chun ...@@ -162,7 +161,7 @@ func (db *DB) setAccess(batch *leveldb.Batch, binIDs map[uint8]uint64, addr chun
item.AccessTimestamp = i.AccessTimestamp item.AccessTimestamp = i.AccessTimestamp
err = db.gcIndex.DeleteInBatch(batch, item) err = db.gcIndex.DeleteInBatch(batch, item)
if err != nil { if err != nil {
return 0 , err return 0, err
} }
gcSizeChange-- gcSizeChange--
case leveldb.ErrNotFound: case leveldb.ErrNotFound:
...@@ -173,15 +172,15 @@ func (db *DB) setAccess(batch *leveldb.Batch, binIDs map[uint8]uint64, addr chun ...@@ -173,15 +172,15 @@ func (db *DB) setAccess(batch *leveldb.Batch, binIDs map[uint8]uint64, addr chun
item.AccessTimestamp = now() item.AccessTimestamp = now()
err = db.retrievalAccessIndex.PutInBatch(batch, item) err = db.retrievalAccessIndex.PutInBatch(batch, item)
if err != nil { if err != nil {
return 0 , err return 0, err
} }
err = db.pullIndex.PutInBatch(batch, item) err = db.pullIndex.PutInBatch(batch, item)
if err != nil { if err != nil {
return 0 , err return 0, err
} }
err = db.gcIndex.PutInBatch(batch, item) err = db.gcIndex.PutInBatch(batch, item)
if err != nil { if err != nil {
return 0 , err return 0, err
} }
gcSizeChange++ gcSizeChange++
...@@ -211,7 +210,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS ...@@ -211,7 +210,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS
// if it is there // if it is there
err = db.pushIndex.DeleteInBatch(batch, item) err = db.pushIndex.DeleteInBatch(batch, item)
if err != nil { if err != nil {
return 0 , err return 0, err
} }
return 0, nil return 0, nil
} }
...@@ -232,7 +231,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS ...@@ -232,7 +231,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS
// if we return the error here - it means that for example, in stream protocol peers which we sync // if we return the error here - it means that for example, in stream protocol peers which we sync
// to would be dropped. this is possible when the chunk is put with ModePutRequest and ModeSetSyncPull is // to would be dropped. this is possible when the chunk is put with ModePutRequest and ModeSetSyncPull is
// called on the same chunk (which should not happen) // called on the same chunk (which should not happen)
log.Error("chunk not found in pull index", "addr", addr) db.logger.Errorf("chunk not found in pull index. addr: %s", addr.String())
break break
} }
return 0, err return 0, err
...@@ -266,7 +265,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS ...@@ -266,7 +265,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS
// we handle this error internally, since this is an internal inconsistency of the indices // we handle this error internally, since this is an internal inconsistency of the indices
// this error can happen if the chunk is put with ModePutRequest or ModePutSync // this error can happen if the chunk is put with ModePutRequest or ModePutSync
// but this function is called with ModeSetSyncPush // but this function is called with ModeSetSyncPush
log.Error("chunk not found in push index", "addr", addr) db.logger.Errorf("chunk not found in push index. addr : %s", addr.String())
break break
} }
return 0, err return 0, err
...@@ -276,7 +275,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS ...@@ -276,7 +275,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS
if err != nil { if err != nil {
// we cannot break or return here since the function needs to // we cannot break or return here since the function needs to
// run to end from db.pushIndex.DeleteInBatch // run to end from db.pushIndex.DeleteInBatch
log.Error("error getting tags on push sync set", "uid", i.Tag) db.logger.Errorf("error getting tags on push sync set. uid : %d", i.Tag)
} else { } else {
// setting a chunk for push sync assumes the tag is not anonymous // setting a chunk for push sync assumes the tag is not anonymous
if t.Anonymous { if t.Anonymous {
...@@ -289,7 +288,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS ...@@ -289,7 +288,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS
err = db.pushIndex.DeleteInBatch(batch, item) err = db.pushIndex.DeleteInBatch(batch, item)
if err != nil { if err != nil {
return 0 , err return 0, err
} }
} }
...@@ -299,7 +298,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS ...@@ -299,7 +298,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS
item.AccessTimestamp = i.AccessTimestamp item.AccessTimestamp = i.AccessTimestamp
err = db.gcIndex.DeleteInBatch(batch, item) err = db.gcIndex.DeleteInBatch(batch, item)
if err != nil { if err != nil {
return 0 , err return 0, err
} }
gcSizeChange-- gcSizeChange--
case leveldb.ErrNotFound: case leveldb.ErrNotFound:
...@@ -310,7 +309,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS ...@@ -310,7 +309,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS
item.AccessTimestamp = now() item.AccessTimestamp = now()
err = db.retrievalAccessIndex.PutInBatch(batch, item) err = db.retrievalAccessIndex.PutInBatch(batch, item)
if err != nil { if err != nil {
return 0 , err return 0, err
} }
// Add in gcIndex only if this chunk is not pinned // Add in gcIndex only if this chunk is not pinned
...@@ -321,7 +320,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS ...@@ -321,7 +320,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS
if !ok { if !ok {
err = db.gcIndex.PutInBatch(batch, item) err = db.gcIndex.PutInBatch(batch, item)
if err != nil { if err != nil {
return 0 , err return 0, err
} }
gcSizeChange++ gcSizeChange++
} }
...@@ -355,19 +354,19 @@ func (db *DB) setRemove(batch *leveldb.Batch, addr chunk.Address) (gcSizeChange ...@@ -355,19 +354,19 @@ func (db *DB) setRemove(batch *leveldb.Batch, addr chunk.Address) (gcSizeChange
err = db.retrievalDataIndex.DeleteInBatch(batch, item) err = db.retrievalDataIndex.DeleteInBatch(batch, item)
if err != nil { if err != nil {
return 0 , err return 0, err
} }
err = db.retrievalAccessIndex.DeleteInBatch(batch, item) err = db.retrievalAccessIndex.DeleteInBatch(batch, item)
if err != nil { if err != nil {
return 0 , err return 0, err
} }
err = db.pullIndex.DeleteInBatch(batch, item) err = db.pullIndex.DeleteInBatch(batch, item)
if err != nil { if err != nil {
return 0 , err return 0, err
} }
err = db.gcIndex.DeleteInBatch(batch, item) err = db.gcIndex.DeleteInBatch(batch, item)
if err != nil { if err != nil {
return 0 , err return 0, err
} }
// a check is needed for decrementing gcSize // a check is needed for decrementing gcSize
// as delete is not reporting if the key/value pair // as delete is not reporting if the key/value pair
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
package localstore package localstore
import ( import (
"github.com/ethersphere/swarm/log"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/opt"
) )
...@@ -52,7 +51,6 @@ func IsLegacyDatabase(datadir string) bool { ...@@ -52,7 +51,6 @@ func IsLegacyDatabase(datadir string) bool {
db, err := leveldb.OpenFile(datadir, &opt.Options{OpenFilesCacheCapacity: 128}) db, err := leveldb.OpenFile(datadir, &opt.Options{OpenFilesCacheCapacity: 128})
if err != nil { if err != nil {
log.Error("got an error while trying to open leveldb path", "path", datadir, "err", err)
return false return false
} }
defer db.Close() defer db.Close()
...@@ -64,8 +62,6 @@ func IsLegacyDatabase(datadir string) bool { ...@@ -64,8 +62,6 @@ func IsLegacyDatabase(datadir string) bool {
return false return false
} }
log.Error("got an unexpected error fetching legacy name from the database", "err", err)
} }
log.Trace("checking if database scheme is legacy", "schema name", string(data))
return string(data) == DbSchemaHalloween || string(data) == DbSchemaPurity return string(data) == DbSchemaHalloween || string(data) == DbSchemaPurity
} }
...@@ -22,7 +22,6 @@ import ( ...@@ -22,7 +22,6 @@ import (
"sync" "sync"
"time" "time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/shed" "github.com/ethersphere/swarm/shed"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
...@@ -136,7 +135,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) ...@@ -136,7 +135,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
return return
} }
db.metrics.SubscribePullIterationFailure.Inc() db.metrics.SubscribePullIterationFailure.Inc()
log.Error("localstore pull subscription iteration", "bin", bin, "since", since, "until", until, "err", err) db.logger.Errorf("localstore pull subscription iteration. bin: %d, since: %d, until: %d. Error : %s", bin, since, until, err.Error())
return return
} }
if count > 0 { if count > 0 {
...@@ -153,7 +152,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) ...@@ -153,7 +152,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
case <-ctx.Done(): case <-ctx.Done():
err := ctx.Err() err := ctx.Err()
if err != nil { if err != nil {
log.Error("localstore pull subscription", "bin", bin, "since", since, "until", until, "err", err) db.logger.Errorf("localstore pull subscription. bin: %d, since: %d, until: %d. Error : %s", bin, since, until, err.Error())
} }
return return
} }
......
...@@ -22,7 +22,6 @@ import ( ...@@ -22,7 +22,6 @@ import (
"sync" "sync"
"time" "time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/shed" "github.com/ethersphere/swarm/shed"
) )
...@@ -81,7 +80,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun ...@@ -81,7 +80,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
// set next iteration start item // set next iteration start item
// when its chunk is successfully sent to channel // when its chunk is successfully sent to channel
sinceItem = &item sinceItem = &item
log.Trace("subscribe.push", "ref", fmt.Sprintf("%x", sinceItem.Address), "binid", sinceItem.BinID) db.logger.Tracef("subscribe.push. ref : %s, binId : %d", fmt.Sprintf("%x", sinceItem.Address), sinceItem.BinID)
return false, nil return false, nil
case <-stopChan: case <-stopChan:
// gracefully stop the iteration // gracefully stop the iteration
...@@ -105,7 +104,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun ...@@ -105,7 +104,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
if err != nil { if err != nil {
db.metrics.SubscribePushIterationFailure.Inc() db.metrics.SubscribePushIterationFailure.Inc()
log.Error("localstore push subscription iteration", "err", err) db.logger.Errorf("localstore push subscription iteration. Error : %s", err.Error())
return return
} }
case <-stopChan: case <-stopChan:
...@@ -119,7 +118,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun ...@@ -119,7 +118,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
case <-ctx.Done(): case <-ctx.Done():
err := ctx.Err() err := ctx.Err()
if err != nil { if err != nil {
log.Error("localstore push subscription", "err", err) db.logger.Errorf("localstore push subscription. Error : %s", err.Error())
} }
return return
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment