Commit aa45aa15 authored by acud's avatar acud Committed by GitHub

statestore: add migration capabilities (#1392)

parent 545df010
package leveldb
var DbSchemaCurrent = dbSchemaCurrent
func (s *store) GetSchemaName() (string, error) {
return s.getSchemaName()
}
...@@ -19,7 +19,7 @@ import ( ...@@ -19,7 +19,7 @@ import (
var _ storage.StateStorer = (*store)(nil) var _ storage.StateStorer = (*store)(nil)
// Store uses LevelDB to store values. // store uses LevelDB to store values.
type store struct { type store struct {
db *leveldb.DB db *leveldb.DB
logger logging.Logger logger logging.Logger
...@@ -40,10 +40,32 @@ func NewStateStore(path string, l logging.Logger) (storage.StateStorer, error) { ...@@ -40,10 +40,32 @@ func NewStateStore(path string, l logging.Logger) (storage.StateStorer, error) {
} }
l.Warning("statestore recovery ok! you are kindly request to inform us about the steps that preceded the last Bee shutdown.") l.Warning("statestore recovery ok! you are kindly request to inform us about the steps that preceded the last Bee shutdown.")
} }
return &store{
s := &store{
db: db, db: db,
logger: l, logger: l,
}, nil }
sn, err := s.getSchemaName()
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
_ = s.Close()
return nil, fmt.Errorf("get schema name: %w", err)
}
// new statestore - put schema key with current name
if err := s.putSchemaName(dbSchemaCurrent); err != nil {
_ = s.Close()
return nil, fmt.Errorf("put schema name: %w", err)
}
sn = dbSchemaCurrent
}
if err = s.migrate(sn); err != nil {
_ = s.Close()
return nil, fmt.Errorf("migrate: %w", err)
}
return s, nil
} }
// Get retrieves a value of the requested key. If no results are found, // Get retrieves a value of the requested key. If no results are found,
...@@ -101,6 +123,21 @@ func (s *store) Iterate(prefix string, iterFunc storage.StateIterFunc) (err erro ...@@ -101,6 +123,21 @@ func (s *store) Iterate(prefix string, iterFunc storage.StateIterFunc) (err erro
return iter.Error() return iter.Error()
} }
func (s *store) getSchemaName() (string, error) {
name, err := s.db.Get([]byte(dbSchemaKey), nil)
if err != nil {
if errors.Is(err, leveldb.ErrNotFound) {
return "", storage.ErrNotFound
}
return "", err
}
return string(name), nil
}
func (s *store) putSchemaName(val string) error {
return s.db.Put([]byte(dbSchemaKey), []byte(val), nil)
}
// Close releases the resources used by the store. // Close releases the resources used by the store.
func (s *store) Close() error { func (s *store) Close() error {
return s.db.Close() return s.db.Close()
......
...@@ -48,3 +48,35 @@ func TestPersistentStateStore(t *testing.T) { ...@@ -48,3 +48,35 @@ func TestPersistentStateStore(t *testing.T) {
return store return store
}) })
} }
func TestGetSchemaName(t *testing.T) {
dir, err := ioutil.TempDir("", "statestore_test")
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
if err := os.RemoveAll(dir); err != nil {
t.Fatal(err)
}
})
store, err := leveldb.NewStateStore(dir, nil)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
if err := store.Close(); err != nil {
t.Fatal(err)
}
})
sn := store.(interface {
GetSchemaName() (string, error)
})
n, err := sn.GetSchemaName() // expect current
if err != nil {
t.Fatal(err)
}
if n != leveldb.DbSchemaCurrent {
t.Fatalf("wanted current db schema but got '%s'", n)
}
}
// Copyright 2019 The Swarm Authors
// This file is part of the Swarm library.
//
// The Swarm library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Swarm library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
package leveldb
import (
"errors"
"fmt"
)
var errMissingCurrentSchema = errors.New("could not find current db schema")
var errMissingTargetSchema = errors.New("could not find target db schema")
const (
dbSchemaKey = "statestore_schema"
dbSchemaGrace = "grace"
)
var (
dbSchemaCurrent = dbSchemaGrace
)
type migration struct {
name string // name of the schema
fn func(s *store) error // the migration function that needs to be performed in order to get to the current schema name
}
// schemaMigrations contains an ordered list of the database schemes, that is
// in order to run data migrations in the correct sequence
var schemaMigrations = []migration{
{name: dbSchemaGrace, fn: func(s *store) error { return nil }},
}
func (s *store) migrate(schemaName string) error {
migrations, err := getMigrations(schemaName, dbSchemaCurrent, schemaMigrations, s)
if err != nil {
return fmt.Errorf("error getting migrations for current schema (%s): %w", schemaName, err)
}
// no migrations to run
if migrations == nil {
return nil
}
s.logger.Infof("statestore: need to run %d data migrations to schema %s", len(migrations), schemaName)
for i := 0; i < len(migrations); i++ {
err := migrations[i].fn(s)
if err != nil {
return err
}
err = s.putSchemaName(migrations[i].name) // put the name of the current schema
if err != nil {
return err
}
schemaName, err = s.getSchemaName()
if err != nil {
return err
}
s.logger.Infof("statestore: successfully ran migration: id %d current schema: %s", i, schemaName)
}
return nil
}
// getMigrations returns an ordered list of migrations that need be executed
// with no errors in order to bring the statestore to the most up-to-date
// schema definition
func getMigrations(currentSchema, targetSchema string, allSchemeMigrations []migration, store *store) (migrations []migration, err error) {
foundCurrent := false
foundTarget := false
if currentSchema == dbSchemaCurrent {
return nil, nil
}
for i, v := range allSchemeMigrations {
switch v.name {
case currentSchema:
if foundCurrent {
return nil, errors.New("found schema name for the second time when looking for migrations")
}
foundCurrent = true
store.logger.Infof("statestore migration: found current schema %s, migrate to %s, total migrations %d", currentSchema, dbSchemaCurrent, len(allSchemeMigrations)-i)
continue // current schema migration should not be executed (already has been when schema was migrated to)
case targetSchema:
foundTarget = true
}
if foundCurrent {
migrations = append(migrations, v)
}
}
if !foundCurrent {
return nil, errMissingCurrentSchema
}
if !foundTarget {
return nil, errMissingTargetSchema
}
return migrations, nil
}
// Copyright 2019 The Swarm Authors
// This file is part of the Swarm library.
//
// The Swarm library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Swarm library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
package leveldb
import (
"errors"
"io/ioutil"
"testing"
"github.com/ethersphere/bee/pkg/logging"
)
func TestOneMigration(t *testing.T) {
defer func(v []migration, s string) {
schemaMigrations = v
dbSchemaCurrent = s
}(schemaMigrations, dbSchemaCurrent)
dbSchemaCode := "code"
dbSchemaCurrent = dbSchemaCode
dbSchemaNext := "dbSchemaNext"
ran := false
shouldNotRun := false
schemaMigrations = []migration{
{name: dbSchemaCode, fn: func(db *store) error {
shouldNotRun = true // this should not be executed
return nil
}},
{name: dbSchemaNext, fn: func(db *store) error {
ran = true
return nil
}},
}
dir := t.TempDir()
logger := logging.New(ioutil.Discard, 0)
// start the fresh statestore with the sanctuary schema name
db, err := NewStateStore(dir, logger)
if err != nil {
t.Fatal(err)
}
err = db.Close()
if err != nil {
t.Fatal(err)
}
dbSchemaCurrent = dbSchemaNext
// start the existing statestore and expect the migration to run
db, err = NewStateStore(dir, logger)
if err != nil {
t.Fatal(err)
}
sn := db.(interface {
GetSchemaName() (string, error)
})
schemaName, err := sn.GetSchemaName()
if err != nil {
t.Fatal(err)
}
if schemaName != dbSchemaNext {
t.Errorf("schema name mismatch. got '%s', want '%s'", schemaName, dbSchemaNext)
}
if !ran {
t.Errorf("expected migration did not run")
}
if shouldNotRun {
t.Errorf("migration ran but shouldnt have")
}
err = db.Close()
if err != nil {
t.Error(err)
}
}
func TestManyMigrations(t *testing.T) {
defer func(v []migration, s string) {
schemaMigrations = v
dbSchemaCurrent = s
}(schemaMigrations, dbSchemaCurrent)
dbSchemaCode := "code"
dbSchemaCurrent = dbSchemaCode
shouldNotRun := false
executionOrder := []int{-1, -1, -1, -1}
schemaMigrations = []migration{
{name: dbSchemaCode, fn: func(db *store) error {
shouldNotRun = true // this should not be executed
return nil
}},
{name: "keju", fn: func(db *store) error {
executionOrder[0] = 0
return nil
}},
{name: "coconut", fn: func(db *store) error {
executionOrder[1] = 1
return nil
}},
{name: "mango", fn: func(db *store) error {
executionOrder[2] = 2
return nil
}},
{name: "salvation", fn: func(db *store) error {
executionOrder[3] = 3
return nil
}},
}
dir := t.TempDir()
logger := logging.New(ioutil.Discard, 0)
// start the fresh statestore with the sanctuary schema name
db, err := NewStateStore(dir, logger)
if err != nil {
t.Fatal(err)
}
err = db.Close()
if err != nil {
t.Fatal(err)
}
dbSchemaCurrent = "salvation"
// start the existing statestore and expect the migration to run
db, err = NewStateStore(dir, logger)
if err != nil {
t.Fatal(err)
}
sn := db.(interface {
GetSchemaName() (string, error)
})
schemaName, err := sn.GetSchemaName()
if err != nil {
t.Fatal(err)
}
if schemaName != "salvation" {
t.Errorf("schema name mismatch. got '%s', want '%s'", schemaName, "salvation")
}
if shouldNotRun {
t.Errorf("migration ran but shouldnt have")
}
for i, v := range executionOrder {
if i != v && i != len(executionOrder)-1 {
t.Errorf("migration did not run in sequence, slot %d value %d", i, v)
}
}
err = db.Close()
if err != nil {
t.Error(err)
}
}
// TestMigrationErrorFrom checks that local store boot should fail when the schema we're migrating from cannot be found
func TestMigrationErrorFrom(t *testing.T) {
defer func(v []migration, s string) {
schemaMigrations = v
dbSchemaCurrent = s
}(schemaMigrations, dbSchemaCurrent)
dbSchemaCurrent = "koo-koo-schema"
shouldNotRun := false
schemaMigrations = []migration{
{name: "langur", fn: func(db *store) error {
shouldNotRun = true
return nil
}},
{name: "coconut", fn: func(db *store) error {
shouldNotRun = true
return nil
}},
{name: "chutney", fn: func(db *store) error {
shouldNotRun = true
return nil
}},
}
dir := t.TempDir()
logger := logging.New(ioutil.Discard, 0)
// start the fresh statestore with the sanctuary schema name
db, err := NewStateStore(dir, logger)
if err != nil {
t.Fatal(err)
}
err = db.Close()
if err != nil {
t.Fatal(err)
}
dbSchemaCurrent = "foo"
// start the existing statestore and expect the migration to run
_, err = NewStateStore(dir, logger)
if !errors.Is(err, errMissingCurrentSchema) {
t.Fatalf("expected errCannotFindSchema but got %v", err)
}
if shouldNotRun {
t.Errorf("migration ran but shouldnt have")
}
}
// TestMigrationErrorTo checks that local store boot should fail when the schema we're migrating to cannot be found
func TestMigrationErrorTo(t *testing.T) {
defer func(v []migration, s string) {
schemaMigrations = v
dbSchemaCurrent = s
}(schemaMigrations, dbSchemaCurrent)
dbSchemaCurrent = "langur"
shouldNotRun := false
schemaMigrations = []migration{
{name: "langur", fn: func(db *store) error {
shouldNotRun = true
return nil
}},
{name: "coconut", fn: func(db *store) error {
shouldNotRun = true
return nil
}},
{name: "chutney", fn: func(db *store) error {
shouldNotRun = true
return nil
}},
}
dir := t.TempDir()
logger := logging.New(ioutil.Discard, 0)
// start the fresh statestore with the sanctuary schema name
db, err := NewStateStore(dir, logger)
if err != nil {
t.Fatal(err)
}
err = db.Close()
if err != nil {
t.Fatal(err)
}
dbSchemaCurrent = "foo"
// start the existing statestore and expect the migration to run
_, err = NewStateStore(dir, logger)
if !errors.Is(err, errMissingTargetSchema) {
t.Fatalf("expected errMissingTargetSchema but got %v", err)
}
if shouldNotRun {
t.Errorf("migration ran but shouldnt have")
}
}
...@@ -7,6 +7,7 @@ package mock ...@@ -7,6 +7,7 @@ package mock
import ( import (
"encoding" "encoding"
"encoding/json" "encoding/json"
"fmt"
"strings" "strings"
"sync" "sync"
...@@ -15,15 +16,23 @@ import ( ...@@ -15,15 +16,23 @@ import (
var _ storage.StateStorer = (*store)(nil) var _ storage.StateStorer = (*store)(nil)
const mockSchemaNameKey = "schema_name"
type store struct { type store struct {
store map[string][]byte store map[string][]byte
mtx sync.RWMutex mtx sync.RWMutex
} }
func NewStateStore() storage.StateStorer { func NewStateStore() storage.StateStorer {
return &store{ s := &store{
store: make(map[string][]byte), store: make(map[string][]byte),
} }
if err := s.Put(mockSchemaNameKey, "mock_schema"); err != nil {
panic(fmt.Errorf("put schema name: %w", err))
}
return s
} }
func (s *store) Get(key string, i interface{}) (err error) { func (s *store) Get(key string, i interface{}) (err error) {
......
...@@ -46,6 +46,7 @@ func (st *Serializing) UnmarshalBinary(data []byte) (err error) { ...@@ -46,6 +46,7 @@ func (st *Serializing) UnmarshalBinary(data []byte) (err error) {
// RunPersist is a specific test case for the persistent state store. // RunPersist is a specific test case for the persistent state store.
// It tests that values persist across sessions. // It tests that values persist across sessions.
func RunPersist(t *testing.T, f func(t *testing.T, dir string) storage.StateStorer) { func RunPersist(t *testing.T, f func(t *testing.T, dir string) storage.StateStorer) {
t.Helper()
dir, err := ioutil.TempDir("", "statestore_test") dir, err := ioutil.TempDir("", "statestore_test")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -225,5 +226,5 @@ func testStoreIterator(t *testing.T, store storage.StateStorer, prefix string, s ...@@ -225,5 +226,5 @@ func testStoreIterator(t *testing.T, store storage.StateStorer, prefix string, s
func testEmpty(t *testing.T, store storage.StateStorer) { func testEmpty(t *testing.T, store storage.StateStorer) {
t.Helper() t.Helper()
testStoreIterator(t, store, "", 0) testStoreIterator(t, store, "", 1)
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment