Commit c3c87289 authored by Peter Mrekaj's avatar Peter Mrekaj Committed by GitHub

feat(localstore): add migration for batch-index (#2112)

parent 2c84af85
......@@ -58,7 +58,7 @@ func dbExportCmd(cmd *cobra.Command) {
path := filepath.Join(dataDir, "localstore")
storer, err := localstore.New(path, nil, nil, logger)
storer, err := localstore.New(path, nil, nil, nil, logger)
if err != nil {
return fmt.Errorf("localstore: %w", err)
}
......@@ -118,7 +118,7 @@ func dbImportCmd(cmd *cobra.Command) {
path := filepath.Join(dataDir, "localstore")
storer, err := localstore.New(path, nil, nil, logger)
storer, err := localstore.New(path, nil, nil, nil, logger)
if err != nil {
return fmt.Errorf("localstore: %w", err)
}
......
......@@ -486,7 +486,7 @@ func TestDB_gcSize(t *testing.T) {
t.Fatal(err)
}
logger := logging.New(ioutil.Discard, 0)
db, err := New(dir, baseKey, nil, logger)
db, err := New(dir, baseKey, nil, nil, logger)
if err != nil {
t.Fatal(err)
}
......@@ -514,7 +514,7 @@ func TestDB_gcSize(t *testing.T) {
if err := db.Close(); err != nil {
t.Fatal(err)
}
db, err = New(dir, baseKey, nil, logger)
db, err = New(dir, baseKey, nil, nil, logger)
if err != nil {
t.Fatal(err)
}
......
......@@ -25,6 +25,7 @@ import (
"time"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/pinning"
"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/postage/batchstore"
"github.com/ethersphere/bee/pkg/shed"
......@@ -62,6 +63,9 @@ type DB struct {
shed *shed.DB
tags *tags.Tags
// stateStore is needed to access the pinning Service.Pins() method.
stateStore storage.StateStorer
// schema name of loaded data
schemaName shed.StringField
......@@ -176,7 +180,7 @@ type Options struct {
// New returns a new DB. All fields and indexes are initialized
// and possible conflicts with schema from existing database is checked.
// One goroutine for writing batches is created.
func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB, err error) {
func New(path string, baseKey []byte, ss storage.StateStorer, o *Options, logger logging.Logger) (db *DB, err error) {
if o == nil {
// default options
o = &Options{
......@@ -185,6 +189,7 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
}
db = &DB{
stateStore: ss,
cacheCapacity: o.Capacity,
baseKey: baseKey,
tags: o.Tags,
......@@ -241,7 +246,7 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
}
if schemaName == "" {
// initial new localstore run
err := db.schemaName.Put(DbSchemaCurrent)
err := db.schemaName.Put(DBSchemaCurrent)
if err != nil {
return nil, err
}
......@@ -585,6 +590,16 @@ func (db *DB) DebugIndices() (indexInfo map[string]int, err error) {
return indexInfo, err
}
// stateStoreHasPins returns true if the state-store
// contains any pins, otherwise false is returned.
func (db *DB) stateStoreHasPins() (bool, error) {
	// Construct a throwaway pinning service solely to read the pin
	// references held in the state-store; the nil dependencies are
	// presumably not needed by Pins() — confirm against pinning.NewService.
	pins, err := pinning.NewService(nil, db.stateStore, nil).Pins()
	if err != nil {
		return false, err
	}
	return len(pins) > 0, nil
}
// chunkToItem creates new Item with data provided by the Chunk.
func chunkToItem(ch swarm.Chunk) shed.Item {
return shed.Item{
......
......@@ -158,7 +158,7 @@ func newTestDB(t testing.TB, o *Options) *DB {
t.Fatal(err)
}
logger := logging.New(ioutil.Discard, 0)
db, err := New("", baseKey, o, logger)
db, err := New("", baseKey, nil, o, logger)
if err != nil {
t.Fatal(err)
}
......
This diff is collapsed.
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package localstore
import (
"encoding/binary"
"errors"
"fmt"
"time"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/swarm"
)
// DBSchemaBatchIndex is the bee schema identifier for batch index.
const DBSchemaBatchIndex = "batch-index"

// migrateBatchIndex removes all existing database content, unless pinned
// content is detected, in which case it aborts the operation for the user to
// resolve.
//
// The indexes re-declared below mirror the layout of the previous schema so
// that their on-disk entries can be located and truncated; their contents are
// not interpreted beyond that.
func migrateBatchIndex(db *DB) error {
	has, err := db.stateStoreHasPins()
	if err != nil {
		return err
	}
	if has {
		// Refuse to proceed: truncating the indexes below would destroy
		// pinned content, which only the operator can safely migrate.
		return errors.New("failed to update your node due to the existence of pinned content; please refer to the release notes on how to safely migrate your pinned content")
	}

	// Define the old indexes from the previous schema and swipe them clean.
	retrievalDataIndex, err := db.shed.NewIndex("Address->StoreTimestamp|BinID|BatchID|Sig|Data", shed.IndexFuncs{
		EncodeKey: func(fields shed.Item) (key []byte, err error) {
			return fields.Address, nil
		},
		DecodeKey: func(key []byte) (e shed.Item, err error) {
			e.Address = key
			return e, nil
		},
		// Values are never written back during this migration, so value
		// encoding is a no-op and decoding just echoes the key item.
		EncodeValue: func(fields shed.Item) (value []byte, err error) {
			return nil, nil
		},
		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
			return keyItem, nil
		},
	})
	if err != nil {
		return err
	}

	retrievalAccessIndex, err := db.shed.NewIndex("Address->AccessTimestamp", shed.IndexFuncs{
		EncodeKey: func(fields shed.Item) (key []byte, err error) {
			return fields.Address, nil
		},
		DecodeKey: func(key []byte) (e shed.Item, err error) {
			e.Address = key
			return e, nil
		},
		EncodeValue: func(fields shed.Item) (value []byte, err error) {
			b := make([]byte, 8)
			binary.BigEndian.PutUint64(b, uint64(fields.AccessTimestamp))
			return b, nil
		},
		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
			e.AccessTimestamp = int64(binary.BigEndian.Uint64(value))
			return e, nil
		},
	})
	if err != nil {
		return err
	}

	// pull index allows history and live syncing per po bin
	pullIndex, err := db.shed.NewIndex("PO|BinID->Hash|Tag", shed.IndexFuncs{
		EncodeKey: func(fields shed.Item) (key []byte, err error) {
			key = make([]byte, 41)
			key[0] = db.po(swarm.NewAddress(fields.Address))
			binary.BigEndian.PutUint64(key[1:9], fields.BinID)
			return key, nil
		},
		DecodeKey: func(key []byte) (e shed.Item, err error) {
			e.BinID = binary.BigEndian.Uint64(key[1:9])
			return e, nil
		},
		EncodeValue: func(fields shed.Item) (value []byte, err error) {
			value = make([]byte, 36) // 32 bytes address, 4 bytes tag
			copy(value, fields.Address)
			if fields.Tag != 0 {
				binary.BigEndian.PutUint32(value[32:], fields.Tag)
			}
			return value, nil
		},
		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
			e.Address = value[:32]
			if len(value) > 32 {
				e.Tag = binary.BigEndian.Uint32(value[32:])
			}
			return e, nil
		},
	})
	if err != nil {
		return err
	}

	// create a vector for bin IDs
	binIDs, err := db.shed.NewUint64Vector("bin-ids")
	if err != nil {
		return err
	}

	pushIndex, err := db.shed.NewIndex("StoreTimestamp|Hash->Tags", shed.IndexFuncs{
		EncodeKey: func(fields shed.Item) (key []byte, err error) {
			key = make([]byte, 40)
			binary.BigEndian.PutUint64(key[:8], uint64(fields.StoreTimestamp))
			copy(key[8:], fields.Address)
			return key, nil
		},
		DecodeKey: func(key []byte) (e shed.Item, err error) {
			e.Address = key[8:]
			e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[:8]))
			return e, nil
		},
		EncodeValue: func(fields shed.Item) (value []byte, err error) {
			tag := make([]byte, 4)
			binary.BigEndian.PutUint32(tag, fields.Tag)
			return tag, nil
		},
		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
			if len(value) == 4 { // only values with tag should be decoded
				e.Tag = binary.BigEndian.Uint32(value)
			}
			return e, nil
		},
	})
	if err != nil {
		return err
	}

	gcIndex, err := db.shed.NewIndex("AccessTimestamp|BinID|Hash->nil", shed.IndexFuncs{
		EncodeKey: func(fields shed.Item) (key []byte, err error) {
			b := make([]byte, 16, 16+len(fields.Address))
			binary.BigEndian.PutUint64(b[:8], uint64(fields.AccessTimestamp))
			binary.BigEndian.PutUint64(b[8:16], fields.BinID)
			key = append(b, fields.Address...)
			return key, nil
		},
		DecodeKey: func(key []byte) (e shed.Item, err error) {
			e.AccessTimestamp = int64(binary.BigEndian.Uint64(key[:8]))
			e.BinID = binary.BigEndian.Uint64(key[8:16])
			e.Address = key[16:]
			return e, nil
		},
		EncodeValue: func(fields shed.Item) (value []byte, err error) {
			return nil, nil
		},
		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
			return e, nil
		},
	})
	if err != nil {
		return err
	}

	// Create a index structure for excluding pinned chunks from gcIndex
	gcExcludeIndex, err := db.shed.NewIndex("Hash->nil", shed.IndexFuncs{
		EncodeKey: func(fields shed.Item) (key []byte, err error) {
			return fields.Address, nil
		},
		DecodeKey: func(key []byte) (e shed.Item, err error) {
			e.Address = key
			return e, nil
		},
		EncodeValue: func(fields shed.Item) (value []byte, err error) {
			return nil, nil
		},
		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
			return e, nil
		},
	})
	if err != nil {
		return err
	}

	start := time.Now()
	db.logger.Debug("truncating indexes")

	var count int
	for _, v := range []struct {
		name string
		idx  shed.Index
	}{
		{"pullsync", pullIndex},
		{"pushsync", pushIndex},
		{"gc", gcIndex},
		{"gcExclude", gcExcludeIndex},
		{"retrievalAccess", retrievalAccessIndex},
		{"retrievalData", retrievalDataIndex},
	} {
		db.logger.Debugf("truncating %s index", v.name)
		n, err := truncateIndex(db, v.idx)
		if err != nil {
			return fmt.Errorf("truncate %s index: %w", v.name, err)
		}
		count += n
		// Report the per-index entry count (n), not the running total,
		// so each log line matches the index it names.
		db.logger.Debugf("truncated %d %s index entries", n, v.name)
	}

	// Reset the garbage-collection size and all per-bin ID counters so the
	// fresh schema starts from a clean slate.
	gcSize, err := db.shed.NewUint64Field("gc-size")
	if err != nil {
		return fmt.Errorf("gc size index: %w", err)
	}
	if err := gcSize.Put(0); err != nil {
		return fmt.Errorf("put gcsize: %w", err)
	}
	for i := 0; i < int(swarm.MaxBins); i++ {
		if err := binIDs.Put(uint64(i), 0); err != nil {
			return fmt.Errorf("zero binsIDs: %w", err)
		}
	}
	db.logger.Debugf("done truncating indexes (%d entries). took %s", count, time.Since(start))
	return nil
}
......@@ -29,20 +29,20 @@ import (
func TestOneMigration(t *testing.T) {
defer func(v []migration, s string) {
schemaMigrations = v
DbSchemaCurrent = s
}(schemaMigrations, DbSchemaCurrent)
DBSchemaCurrent = s
}(schemaMigrations, DBSchemaCurrent)
DbSchemaCurrent = DbSchemaCode
DBSchemaCurrent = DBSchemaCode
dbSchemaNext := "dbSchemaNext"
ran := false
shouldNotRun := false
schemaMigrations = []migration{
{name: DbSchemaCode, fn: func(db *DB) error {
{schemaName: DBSchemaCode, fn: func(db *DB) error {
shouldNotRun = true // this should not be executed
return nil
}},
{name: dbSchemaNext, fn: func(db *DB) error {
{schemaName: dbSchemaNext, fn: func(db *DB) error {
ran = true
return nil
}},
......@@ -61,7 +61,7 @@ func TestOneMigration(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
// start the fresh localstore with the sanctuary schema name
db, err := New(dir, baseKey, nil, logger)
db, err := New(dir, baseKey, nil, nil, logger)
if err != nil {
t.Fatal(err)
}
......@@ -71,10 +71,10 @@ func TestOneMigration(t *testing.T) {
t.Fatal(err)
}
DbSchemaCurrent = dbSchemaNext
DBSchemaCurrent = dbSchemaNext
// start the existing localstore and expect the migration to run
db, err = New(dir, baseKey, nil, logger)
db, err = New(dir, baseKey, nil, nil, logger)
if err != nil {
t.Fatal(err)
}
......@@ -105,32 +105,32 @@ func TestOneMigration(t *testing.T) {
func TestManyMigrations(t *testing.T) {
defer func(v []migration, s string) {
schemaMigrations = v
DbSchemaCurrent = s
}(schemaMigrations, DbSchemaCurrent)
DBSchemaCurrent = s
}(schemaMigrations, DBSchemaCurrent)
DbSchemaCurrent = DbSchemaCode
DBSchemaCurrent = DBSchemaCode
shouldNotRun := false
executionOrder := []int{-1, -1, -1, -1}
schemaMigrations = []migration{
{name: DbSchemaCode, fn: func(db *DB) error {
{schemaName: DBSchemaCode, fn: func(db *DB) error {
shouldNotRun = true // this should not be executed
return nil
}},
{name: "keju", fn: func(db *DB) error {
{schemaName: "keju", fn: func(db *DB) error {
executionOrder[0] = 0
return nil
}},
{name: "coconut", fn: func(db *DB) error {
{schemaName: "coconut", fn: func(db *DB) error {
executionOrder[1] = 1
return nil
}},
{name: "mango", fn: func(db *DB) error {
{schemaName: "mango", fn: func(db *DB) error {
executionOrder[2] = 2
return nil
}},
{name: "salvation", fn: func(db *DB) error {
{schemaName: "salvation", fn: func(db *DB) error {
executionOrder[3] = 3
return nil
}},
......@@ -148,7 +148,7 @@ func TestManyMigrations(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
// start the fresh localstore with the sanctuary schema name
db, err := New(dir, baseKey, nil, logger)
db, err := New(dir, baseKey, nil, nil, logger)
if err != nil {
t.Fatal(err)
}
......@@ -158,10 +158,10 @@ func TestManyMigrations(t *testing.T) {
t.Fatal(err)
}
DbSchemaCurrent = "salvation"
DBSchemaCurrent = "salvation"
// start the existing localstore and expect the migration to run
db, err = New(dir, baseKey, nil, logger)
db, err = New(dir, baseKey, nil, nil, logger)
if err != nil {
t.Fatal(err)
}
......@@ -195,22 +195,22 @@ func TestManyMigrations(t *testing.T) {
func TestMigrationErrorFrom(t *testing.T) {
defer func(v []migration, s string) {
schemaMigrations = v
DbSchemaCurrent = s
}(schemaMigrations, DbSchemaCurrent)
DBSchemaCurrent = s
}(schemaMigrations, DBSchemaCurrent)
DbSchemaCurrent = "koo-koo-schema"
DBSchemaCurrent = "koo-koo-schema"
shouldNotRun := false
schemaMigrations = []migration{
{name: "langur", fn: func(db *DB) error {
{schemaName: "langur", fn: func(db *DB) error {
shouldNotRun = true
return nil
}},
{name: "coconut", fn: func(db *DB) error {
{schemaName: "coconut", fn: func(db *DB) error {
shouldNotRun = true
return nil
}},
{name: "chutney", fn: func(db *DB) error {
{schemaName: "chutney", fn: func(db *DB) error {
shouldNotRun = true
return nil
}},
......@@ -228,7 +228,7 @@ func TestMigrationErrorFrom(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
// start the fresh localstore with the sanctuary schema name
db, err := New(dir, baseKey, nil, logger)
db, err := New(dir, baseKey, nil, nil, logger)
if err != nil {
t.Fatal(err)
}
......@@ -238,10 +238,10 @@ func TestMigrationErrorFrom(t *testing.T) {
t.Fatal(err)
}
DbSchemaCurrent = "foo"
DBSchemaCurrent = "foo"
// start the existing localstore and expect the migration to run
_, err = New(dir, baseKey, nil, logger)
_, err = New(dir, baseKey, nil, nil, logger)
if !strings.Contains(err.Error(), errMissingCurrentSchema.Error()) {
t.Fatalf("expected errCannotFindSchema but got %v", err)
}
......@@ -255,22 +255,22 @@ func TestMigrationErrorFrom(t *testing.T) {
func TestMigrationErrorTo(t *testing.T) {
defer func(v []migration, s string) {
schemaMigrations = v
DbSchemaCurrent = s
}(schemaMigrations, DbSchemaCurrent)
DBSchemaCurrent = s
}(schemaMigrations, DBSchemaCurrent)
DbSchemaCurrent = "langur"
DBSchemaCurrent = "langur"
shouldNotRun := false
schemaMigrations = []migration{
{name: "langur", fn: func(db *DB) error {
{schemaName: "langur", fn: func(db *DB) error {
shouldNotRun = true
return nil
}},
{name: "coconut", fn: func(db *DB) error {
{schemaName: "coconut", fn: func(db *DB) error {
shouldNotRun = true
return nil
}},
{name: "chutney", fn: func(db *DB) error {
{schemaName: "chutney", fn: func(db *DB) error {
shouldNotRun = true
return nil
}},
......@@ -289,7 +289,7 @@ func TestMigrationErrorTo(t *testing.T) {
logger := logging.New(ioutil.Discard, 0)
// start the fresh localstore with the sanctuary schema name
db, err := New(dir, baseKey, nil, logger)
db, err := New(dir, baseKey, nil, nil, logger)
if err != nil {
t.Fatal(err)
}
......@@ -299,10 +299,10 @@ func TestMigrationErrorTo(t *testing.T) {
t.Fatal(err)
}
DbSchemaCurrent = "foo"
DBSchemaCurrent = "foo"
// start the existing localstore and expect the migration to run
_, err = New(dir, baseKey, nil, logger)
_, err = New(dir, baseKey, nil, nil, logger)
if !strings.Contains(err.Error(), errMissingTargetSchema.Error()) {
t.Fatalf("expected errMissingTargetSchema but got %v", err)
}
......
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package localstore
import (
"encoding/binary"
"errors"
"fmt"
"time"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/swarm"
)
// DBSchemaYuj is the bee schema identifier for storage incentives initial iteration.
const DBSchemaYuj = "yuj"

// migrateYuj removes all existing database content, unless
// pinned content is detected, in which case it aborts the
// operation for the user to resolve.
//
// The indexes re-declared below mirror the layout of the previous schema so
// that their on-disk entries can be located and truncated; their contents are
// not interpreted beyond that.
func migrateYuj(db *DB) error {
	// The old schema kept pin counters in a dedicated index; its mere
	// non-emptiness means the user has pinned content.
	pinIndex, err := db.shed.NewIndex("Hash->PinCounter", shed.IndexFuncs{
		EncodeKey: func(fields shed.Item) (key []byte, err error) {
			return fields.Address, nil
		},
		DecodeKey: func(key []byte) (e shed.Item, err error) {
			e.Address = key
			return e, nil
		},
		EncodeValue: func(fields shed.Item) (value []byte, err error) {
			b := make([]byte, 8)
			binary.BigEndian.PutUint64(b[:8], fields.PinCounter)
			return b, nil
		},
		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
			e.PinCounter = binary.BigEndian.Uint64(value[:8])
			return e, nil
		},
	})
	if err != nil {
		return err
	}

	hasPins := false
	// Propagate iteration errors instead of discarding them: an early
	// failure here could otherwise let the migration wipe pinned content.
	err = pinIndex.Iterate(func(item shed.Item) (stop bool, err error) {
		hasPins = true
		return true, nil // stop at the first entry; existence is all we need
	}, nil)
	if err != nil {
		return err
	}
	if hasPins {
		return errors.New("failed to update your node due to the existence of pinned content; please refer to the release notes on how to safely migrate your pinned content")
	}

	// Define the old indexes from the previous schema and swipe them clean.
	retrievalDataIndex, err := db.shed.NewIndex("Address->StoreTimestamp|BinID|Data", shed.IndexFuncs{
		EncodeKey: func(fields shed.Item) (key []byte, err error) {
			return fields.Address, nil
		},
		DecodeKey: func(key []byte) (e shed.Item, err error) {
			e.Address = key
			return e, nil
		},
		EncodeValue: func(fields shed.Item) (value []byte, err error) {
			b := make([]byte, 16)
			binary.BigEndian.PutUint64(b[:8], fields.BinID)
			binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
			value = append(b, fields.Data...)
			return value, nil
		},
		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
			e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16]))
			e.BinID = binary.BigEndian.Uint64(value[:8])
			e.Data = value[16:]
			return e, nil
		},
	})
	if err != nil {
		return err
	}

	retrievalAccessIndex, err := db.shed.NewIndex("Address->AccessTimestamp", shed.IndexFuncs{
		EncodeKey: func(fields shed.Item) (key []byte, err error) {
			return fields.Address, nil
		},
		DecodeKey: func(key []byte) (e shed.Item, err error) {
			e.Address = key
			return e, nil
		},
		EncodeValue: func(fields shed.Item) (value []byte, err error) {
			b := make([]byte, 8)
			binary.BigEndian.PutUint64(b, uint64(fields.AccessTimestamp))
			return b, nil
		},
		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
			e.AccessTimestamp = int64(binary.BigEndian.Uint64(value))
			return e, nil
		},
	})
	if err != nil {
		return err
	}

	// pull index allows history and live syncing per po bin
	pullIndex, err := db.shed.NewIndex("PO|BinID->Hash|Tag", shed.IndexFuncs{
		EncodeKey: func(fields shed.Item) (key []byte, err error) {
			key = make([]byte, 41)
			key[0] = db.po(swarm.NewAddress(fields.Address))
			binary.BigEndian.PutUint64(key[1:9], fields.BinID)
			return key, nil
		},
		DecodeKey: func(key []byte) (e shed.Item, err error) {
			e.BinID = binary.BigEndian.Uint64(key[1:9])
			return e, nil
		},
		EncodeValue: func(fields shed.Item) (value []byte, err error) {
			value = make([]byte, 36) // 32 bytes address, 4 bytes tag
			copy(value, fields.Address)
			if fields.Tag != 0 {
				binary.BigEndian.PutUint32(value[32:], fields.Tag)
			}
			return value, nil
		},
		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
			e.Address = value[:32]
			if len(value) > 32 {
				e.Tag = binary.BigEndian.Uint32(value[32:])
			}
			return e, nil
		},
	})
	if err != nil {
		return err
	}

	// create a vector for bin IDs
	binIDs, err := db.shed.NewUint64Vector("bin-ids")
	if err != nil {
		return err
	}

	pushIndex, err := db.shed.NewIndex("StoreTimestamp|Hash->Tags", shed.IndexFuncs{
		EncodeKey: func(fields shed.Item) (key []byte, err error) {
			key = make([]byte, 40)
			binary.BigEndian.PutUint64(key[:8], uint64(fields.StoreTimestamp))
			copy(key[8:], fields.Address)
			return key, nil
		},
		DecodeKey: func(key []byte) (e shed.Item, err error) {
			e.Address = key[8:]
			e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[:8]))
			return e, nil
		},
		EncodeValue: func(fields shed.Item) (value []byte, err error) {
			tag := make([]byte, 4)
			binary.BigEndian.PutUint32(tag, fields.Tag)
			return tag, nil
		},
		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
			if len(value) == 4 { // only values with tag should be decoded
				e.Tag = binary.BigEndian.Uint32(value)
			}
			return e, nil
		},
	})
	if err != nil {
		return err
	}

	gcIndex, err := db.shed.NewIndex("AccessTimestamp|BinID|Hash->nil", shed.IndexFuncs{
		EncodeKey: func(fields shed.Item) (key []byte, err error) {
			b := make([]byte, 16, 16+len(fields.Address))
			binary.BigEndian.PutUint64(b[:8], uint64(fields.AccessTimestamp))
			binary.BigEndian.PutUint64(b[8:16], fields.BinID)
			key = append(b, fields.Address...)
			return key, nil
		},
		DecodeKey: func(key []byte) (e shed.Item, err error) {
			e.AccessTimestamp = int64(binary.BigEndian.Uint64(key[:8]))
			e.BinID = binary.BigEndian.Uint64(key[8:16])
			e.Address = key[16:]
			return e, nil
		},
		EncodeValue: func(fields shed.Item) (value []byte, err error) {
			return nil, nil
		},
		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
			return e, nil
		},
	})
	if err != nil {
		return err
	}

	// Create a index structure for excluding pinned chunks from gcIndex
	gcExcludeIndex, err := db.shed.NewIndex("Hash->nil", shed.IndexFuncs{
		EncodeKey: func(fields shed.Item) (key []byte, err error) {
			return fields.Address, nil
		},
		DecodeKey: func(key []byte) (e shed.Item, err error) {
			e.Address = key
			return e, nil
		},
		EncodeValue: func(fields shed.Item) (value []byte, err error) {
			return nil, nil
		},
		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
			return e, nil
		},
	})
	if err != nil {
		return err
	}

	start := time.Now()
	db.logger.Debug("truncating indexes")

	var count int
	for _, v := range []struct {
		name string
		idx  shed.Index
	}{
		{"pullsync", pullIndex},
		{"pushsync", pushIndex},
		{"gc", gcIndex},
		{"gcExclude", gcExcludeIndex},
		{"retrievalAccess", retrievalAccessIndex},
		{"retrievalData", retrievalDataIndex},
	} {
		db.logger.Debugf("truncating %s index", v.name)
		n, err := truncateIndex(db, v.idx)
		if err != nil {
			return fmt.Errorf("truncate %s index: %w", v.name, err)
		}
		count += n
		// Report the per-index entry count (n), not the running total,
		// so each log line matches the index it names.
		db.logger.Debugf("truncated %d %s index entries", n, v.name)
	}

	// Reset the garbage-collection size and all per-bin ID counters so the
	// fresh schema starts from a clean slate.
	gcSize, err := db.shed.NewUint64Field("gc-size")
	if err != nil {
		return fmt.Errorf("gc size index: %w", err)
	}
	if err := gcSize.Put(0); err != nil {
		return fmt.Errorf("put gcsize: %w", err)
	}
	for i := 0; i < int(swarm.MaxBins); i++ {
		if err := binIDs.Put(uint64(i), 0); err != nil {
			return fmt.Errorf("zero binsIDs: %w", err)
		}
	}
	db.logger.Debugf("done truncating indexes (%d entries). took %s", count, time.Since(start))
	return nil
}
......@@ -16,16 +16,9 @@
package localstore
// The DB schema we want to use. The actual/current DB schema might differ
// until migrations are run.
var DbSchemaCurrent = DbSchemaYuj
// DBSchemaCode is the first bee schema identifier.
const DBSchemaCode = "code"
// There was a time when we had no schema at all.
const DbSchemaNone = ""
// DbSchemaCode is the first bee schema identifier
const DbSchemaCode = "code"
// DbSchemaYuj is the bee schema identifier for storage incentives
// initial iteration.
const DbSchemaYuj = "yuj"
// DBSchemaCurrent represents the DB schema we want to use.
// The actual/current DB schema might differ until migrations are run.
var DBSchemaCurrent = DBSchemaBatchIndex
......@@ -348,7 +348,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
DisableSeeksCompaction: o.DBDisableSeeksCompaction,
}
storer, err := localstore.New(path, swarmAddress.Bytes(), lo, logger)
storer, err := localstore.New(path, swarmAddress.Bytes(), stateStore, lo, logger)
if err != nil {
return nil, fmt.Errorf("localstore: %w", err)
}
......
......@@ -562,7 +562,7 @@ func newTestDB(t testing.TB, o *localstore.Options) (baseKey []byte, db *localst
}
logger := logging.New(ioutil.Discard, 0)
db, err := localstore.New("", baseKey, o, logger)
db, err := localstore.New("", baseKey, nil, o, logger)
if err != nil {
t.Fatal(err)
}
......
......@@ -420,7 +420,7 @@ func TestPusherRetryShallow(t *testing.T) {
func createPusher(t *testing.T, addr swarm.Address, pushSyncService pushsync.PushSyncer, mockOpts ...mock.Option) (*tags.Tags, *pusher.Service, *Store) {
t.Helper()
logger := logging.New(ioutil.Discard, 0)
storer, err := localstore.New("", addr.Bytes(), nil, logger)
storer, err := localstore.New("", addr.Bytes(), nil, nil, logger)
if err != nil {
t.Fatal(err)
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment