Commit 6fcbd50c authored by Anatolie Lupacescu's avatar Anatolie Lupacescu Committed by GitHub

feat: schema migrations to clean interval keys (#1734)

parent ea178d68
......@@ -202,6 +202,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
stateStore, err := InitStateStore(logger, o.DataDir)
if err != nil {
_ = stateStore.Close()
return nil, err
}
b.stateStoreCloser = stateStore
......
......@@ -19,19 +19,24 @@ package leveldb
import (
"errors"
"fmt"
"strings"
)
var errMissingCurrentSchema = errors.New("could not find current db schema")
var errMissingTargetSchema = errors.New("could not find target db schema")
var (
errMissingCurrentSchema = errors.New("could not find current db schema")
errMissingTargetSchema = errors.New("could not find target db schema")
)
const (
dbSchemaKey = "statestore_schema"
dbSchemaGrace = "grace"
dbSchemaGrace = "grace"
dbSchemaDrain = "drain"
dbSchemaCleanInterval = "clean-interval"
)
var (
dbSchemaCurrent = dbSchemaGrace
dbSchemaCurrent = dbSchemaCleanInterval
)
type migration struct {
......@@ -43,6 +48,38 @@ type migration struct {
// in order to run data migrations in the correct sequence
var schemaMigrations = []migration{
{name: dbSchemaGrace, fn: func(s *store) error { return nil }},
{name: dbSchemaDrain, fn: migrateGrace},
{name: dbSchemaCleanInterval, fn: migrateGrace},
}
func migrateGrace(s *store) error {
var collectedKeys []string
mgfn := func(k, v []byte) (bool, error) {
stk := string(k)
if strings.Contains(stk, "|") &&
len(k) > 32 &&
!strings.Contains(stk, "swap") &&
!strings.Contains(stk, "peer") {
s.logger.Debugf("found key designated to deletion %s", k)
collectedKeys = append(collectedKeys, stk)
}
return false, nil
}
_ = s.Iterate("", mgfn)
for _, v := range collectedKeys {
err := s.Delete(v)
if err != nil {
s.logger.Debugf("error deleting key %s", v)
continue
}
s.logger.Debugf("deleted key %s", v)
}
s.logger.Debugf("deleted keys: %d", len(collectedKeys))
return nil
}
func (s *store) migrate(schemaName string) error {
......@@ -56,7 +93,7 @@ func (s *store) migrate(schemaName string) error {
return nil
}
s.logger.Infof("statestore: need to run %d data migrations to schema %s", len(migrations), schemaName)
s.logger.Debugf("statestore: need to run %d data migrations to schema %s", len(migrations), schemaName)
for i := 0; i < len(migrations); i++ {
err := migrations[i].fn(s)
if err != nil {
......@@ -70,7 +107,7 @@ func (s *store) migrate(schemaName string) error {
if err != nil {
return err
}
s.logger.Infof("statestore: successfully ran migration: id %d current schema: %s", i, schemaName)
s.logger.Debugf("statestore: successfully ran migration: id %d current schema: %s", i, schemaName)
}
return nil
}
......@@ -91,7 +128,7 @@ func getMigrations(currentSchema, targetSchema string, allSchemeMigrations []mig
return nil, errors.New("found schema name for the second time when looking for migrations")
}
foundCurrent = true
store.logger.Infof("statestore migration: found current schema %s, migrate to %s, total migrations %d", currentSchema, dbSchemaCurrent, len(allSchemeMigrations)-i)
store.logger.Debugf("statestore migration: found current schema %s, migrate to %s, total migrations %d", currentSchema, dbSchemaCurrent, len(allSchemeMigrations)-i)
continue // current schema migration should not be executed (already has been when schema was migrated to)
case targetSchema:
foundTarget = true
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment