Commit 26eea2bd authored by Zahoor Mohamed's avatar Zahoor Mohamed Committed by GitHub

add localstore from swarm to bee (#82)

parent 0b2d9e34
......@@ -10,7 +10,7 @@ endif
all: build lint vet test binary
.PHONY: binary
binary: export CGO_ENABLED=0
binary: export CGO_ENABLED=1 # set to 0 when go-ethereum/metrics dependecy is removed
binary: dist FORCE
$(GO) version
$(GO) build -trimpath -ldflags "$(LDFLAGS)" -o dist/bee ./cmd/bee
......@@ -31,7 +31,7 @@ test:
$(GO) test -v -race ./...
.PHONY: build
build: export CGO_ENABLED=0
build: export CGO_ENABLED=1 # set to 0 when go-ethereum/metrics dependecy is removed
build:
$(GO) build -trimpath -ldflags "$(LDFLAGS)" ./...
......
......@@ -6,6 +6,7 @@ require (
github.com/btcsuite/btcd v0.20.1-beta
github.com/coreos/go-semver v0.3.0
github.com/dgraph-io/badger/v2 v2.0.3
github.com/ethereum/go-ethereum v1.9.2
github.com/ethersphere/swarm v0.5.7
github.com/gogo/protobuf v1.3.1
github.com/gorilla/handlers v1.4.2
......
This diff is collapsed.
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
/*
Package localstore provides disk storage layer for Swarm Chunk persistence.
It uses swarm/shed abstractions on top of github.com/syndtr/goleveldb LevelDB
implementation.
The main type is DB which manages the storage by providing methods to
access and add Chunks and to manage their status.
Modes are abstractions that do specific changes to Chunks. There are three
mode types:
- ModeGet, for Chunk access
- ModePut, for adding Chunks to the database
- ModeSet, for changing Chunk statuses
Every mode type has a corresponding type (Getter, Putter and Setter)
that provides adequate method to perform the opperation and that type
should be injected into localstore consumers instead the whole DB.
This provides more clear insight which operations consumer is performing
on the database.
Getters, Putters and Setters accept different get, put and set modes
to perform different actions. For example, ModeGet has two different
variables ModeGetRequest and ModeGetSync and two different Getters
can be constructed with them that are used when the chunk is requested
or when the chunk is synced as this two events are differently changing
the database.
Subscription methods are implemented for a specific purpose of
continuous iterations over Chunks that should be provided to
Push and Pull syncing.
DB implements an internal garbage collector that removes only synced
Chunks from the database based on their most recent access time.
Internally, DB stores Chunk data and any required information, such as
store and access timestamps in different shed indexes that can be
iterated on by garbage collector or subscriptions.
*/
package localstore
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"archive/tar"
"context"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"sync"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/shed"
)
const (
// filename in tar archive that holds the information
// about exported data format version
exportVersionFilename = ".swarm-export-version"
// legacy version for previous LDBStore
legacyExportVersion = "1"
// current export format version
currentExportVersion = "2"
)
// Export writes a tar structured data to the writer of
// all chunks in the retrieval data index. It returns the
// number of chunks exported.
func (db *DB) Export(w io.Writer) (count int64, err error) {
tw := tar.NewWriter(w)
defer tw.Close()
if err := tw.WriteHeader(&tar.Header{
Name: exportVersionFilename,
Mode: 0644,
Size: int64(len(currentExportVersion)),
}); err != nil {
return 0, err
}
if _, err := tw.Write([]byte(currentExportVersion)); err != nil {
return 0, err
}
err = db.retrievalDataIndex.Iterate(func(item shed.Item) (stop bool, err error) {
hdr := &tar.Header{
Name: hex.EncodeToString(item.Address),
Mode: 0644,
Size: int64(len(item.Data)),
}
if err := tw.WriteHeader(hdr); err != nil {
return false, err
}
if _, err := tw.Write(item.Data); err != nil {
return false, err
}
count++
return false, nil
}, nil)
return count, err
}
// Import reads a tar structured data from the reader and
// stores chunks in the database. It returns the number of
// chunks imported.
func (db *DB) Import(r io.Reader, legacy bool) (count int64, err error) {
tr := tar.NewReader(r)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errC := make(chan error)
doneC := make(chan struct{})
tokenPool := make(chan struct{}, 100)
var wg sync.WaitGroup
go func() {
var (
firstFile = true
// if exportVersionFilename file is not present
// assume legacy version
version = legacyExportVersion
)
for {
hdr, err := tr.Next()
if err != nil {
if err == io.EOF {
break
}
select {
case errC <- err:
case <-ctx.Done():
}
}
if firstFile {
firstFile = false
if hdr.Name == exportVersionFilename {
data, err := ioutil.ReadAll(tr)
if err != nil {
select {
case errC <- err:
case <-ctx.Done():
}
}
version = string(data)
continue
}
}
if len(hdr.Name) != 64 {
log.Warn("ignoring non-chunk file", "name", hdr.Name)
continue
}
keybytes, err := hex.DecodeString(hdr.Name)
if err != nil {
log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err)
continue
}
data, err := ioutil.ReadAll(tr)
if err != nil {
select {
case errC <- err:
case <-ctx.Done():
}
}
key := chunk.Address(keybytes)
var ch chunk.Chunk
switch version {
case legacyExportVersion:
// LDBStore Export exported chunk data prefixed with the chunk key.
// That is not necessary, as the key is in the chunk filename,
// but backward compatibility needs to be preserved.
ch = chunk.NewChunk(key, data[32:])
case currentExportVersion:
ch = chunk.NewChunk(key, data)
default:
select {
case errC <- fmt.Errorf("unsupported export data version %q", version):
case <-ctx.Done():
}
}
tokenPool <- struct{}{}
wg.Add(1)
go func() {
_, err := db.Put(ctx, chunk.ModePutUpload, ch)
select {
case errC <- err:
case <-ctx.Done():
wg.Done()
<-tokenPool
default:
_, err := db.Put(ctx, chunk.ModePutUpload, ch)
if err != nil {
errC <- err
}
wg.Done()
<-tokenPool
}
}()
count++
}
wg.Wait()
close(doneC)
}()
// wait for all chunks to be stored
for {
select {
case err := <-errC:
if err != nil {
return count, err
}
case <-ctx.Done():
return count, ctx.Err()
default:
select {
case <-doneC:
return count, nil
default:
}
}
}
}
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"bytes"
"context"
"testing"
"github.com/ethersphere/swarm/chunk"
)
// TestExportImport constructs two databases, one to put and export
// chunks and another one to import and validate that all chunks are
// imported.
func TestExportImport(t *testing.T) {
db1, cleanup1 := newTestDB(t, nil)
defer cleanup1()
var chunkCount = 100
chunks := make(map[string][]byte, chunkCount)
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunk()
_, err := db1.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
chunks[string(ch.Address())] = ch.Data()
}
var buf bytes.Buffer
c, err := db1.Export(&buf)
if err != nil {
t.Fatal(err)
}
wantChunksCount := int64(len(chunks))
if c != wantChunksCount {
t.Errorf("got export count %v, want %v", c, wantChunksCount)
}
db2, cleanup2 := newTestDB(t, nil)
defer cleanup2()
c, err = db2.Import(&buf, false)
if err != nil {
t.Fatal(err)
}
if c != wantChunksCount {
t.Errorf("got import count %v, want %v", c, wantChunksCount)
}
for a, want := range chunks {
addr := chunk.Address([]byte(a))
ch, err := db2.Get(context.Background(), chunk.ModeGetRequest, addr)
if err != nil {
t.Fatal(err)
}
got := ch.Data()
if !bytes.Equal(got, want) {
t.Fatalf("chunk %s: got data %x, want %x", addr.Hex(), got, want)
}
}
}
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethersphere/swarm/shed"
"github.com/syndtr/goleveldb/leveldb"
)
var (
// gcTargetRatio defines the target number of items
// in garbage collection index that will not be removed
// on garbage collection. The target number of items
// is calculated by gcTarget function. This value must be
// in range (0,1]. For example, with 0.9 value,
// garbage collection will leave 90% of defined capacity
// in database after its run. This prevents frequent
// garbage collection runs.
gcTargetRatio = 0.9
// gcBatchSize limits the number of chunks in a single
// leveldb batch on garbage collection.
gcBatchSize uint64 = 200
)
// collectGarbageWorker is a long running function that waits for
// collectGarbageTrigger channel to signal a garbage collection
// run. GC run iterates on gcIndex and removes older items
// form retrieval and other indexes.
func (db *DB) collectGarbageWorker() {
defer close(db.collectGarbageWorkerDone)
for {
select {
case <-db.collectGarbageTrigger:
// run a single collect garbage run and
// if done is false, gcBatchSize is reached and
// another collect garbage run is needed
collectedCount, done, err := db.collectGarbage()
if err != nil {
log.Error("localstore collect garbage", "err", err)
}
// check if another gc run is needed
if !done {
db.triggerGarbageCollection()
}
if testHookCollectGarbage != nil {
testHookCollectGarbage(collectedCount)
}
case <-db.close:
return
}
}
}
// collectGarbage removes chunks from retrieval and other
// indexes if maximal number of chunks in database is reached.
// This function returns the number of removed chunks. If done
// is false, another call to this function is needed to collect
// the rest of the garbage as the batch size limit is reached.
// This function is called in collectGarbageWorker.
func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
metricName := "localstore/gc"
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())
defer func() {
if err != nil {
metrics.GetOrRegisterCounter(metricName+"/error", nil).Inc(1)
}
}()
batch := new(leveldb.Batch)
target := db.gcTarget()
// protect database from changing idexes and gcSize
db.batchMu.Lock()
defer db.batchMu.Unlock()
// run through the recently pinned chunks and
// remove them from the gcIndex before iterating through gcIndex
err = db.removeChunksInExcludeIndexFromGC()
if err != nil {
log.Error("localstore exclude pinned chunks", "err", err)
return 0, true, err
}
gcSize, err := db.gcSize.Get()
if err != nil {
return 0, true, err
}
metrics.GetOrRegisterGauge(metricName+"/gcsize", nil).Update(int64(gcSize))
done = true
err = db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
if gcSize-collectedCount <= target {
return true, nil
}
metrics.GetOrRegisterGauge(metricName+"/storets", nil).Update(item.StoreTimestamp)
metrics.GetOrRegisterGauge(metricName+"/accessts", nil).Update(item.AccessTimestamp)
// delete from retrieve, pull, gc
err = db.retrievalDataIndex.DeleteInBatch(batch, item)
if err != nil {
return true, nil
}
err = db.retrievalAccessIndex.DeleteInBatch(batch, item)
if err != nil {
return true, nil
}
err = db.pullIndex.DeleteInBatch(batch, item)
if err != nil {
return true, nil
}
err = db.gcIndex.DeleteInBatch(batch, item)
if err != nil {
return true, nil
}
collectedCount++
if collectedCount >= gcBatchSize {
// bach size limit reached,
// another gc run is needed
done = false
return true, nil
}
return false, nil
}, nil)
if err != nil {
return 0, false, err
}
metrics.GetOrRegisterCounter(metricName+"/collected-count", nil).Inc(int64(collectedCount))
db.gcSize.PutInBatch(batch, gcSize-collectedCount)
err = db.shed.WriteBatch(batch)
if err != nil {
metrics.GetOrRegisterCounter(metricName+"/writebatch/err", nil).Inc(1)
return 0, false, err
}
return collectedCount, done, nil
}
// removeChunksInExcludeIndexFromGC removed any recently chunks in the exclude Index, from the gcIndex.
func (db *DB) removeChunksInExcludeIndexFromGC() (err error) {
metricName := "localstore/gc/exclude"
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())
defer func() {
if err != nil {
metrics.GetOrRegisterCounter(metricName+"/error", nil).Inc(1)
}
}()
batch := new(leveldb.Batch)
excludedCount := 0
var gcSizeChange int64
err = db.gcExcludeIndex.Iterate(func(item shed.Item) (stop bool, err error) {
// Get access timestamp
retrievalAccessIndexItem, err := db.retrievalAccessIndex.Get(item)
if err != nil {
return false, err
}
item.AccessTimestamp = retrievalAccessIndexItem.AccessTimestamp
// Get the binId
retrievalDataIndexItem, err := db.retrievalDataIndex.Get(item)
if err != nil {
return false, err
}
item.BinID = retrievalDataIndexItem.BinID
// Check if this item is in gcIndex and remove it
ok, err := db.gcIndex.Has(item)
if err != nil {
return false, nil
}
if ok {
err = db.gcIndex.DeleteInBatch(batch, item)
if err != nil {
return false, nil
}
if _, err := db.gcIndex.Get(item); err == nil {
gcSizeChange--
}
excludedCount++
err = db.gcExcludeIndex.DeleteInBatch(batch, item)
if err != nil {
return false, nil
}
}
return false, nil
}, nil)
if err != nil {
return err
}
// update the gc size based on the no of entries deleted in gcIndex
err = db.incGCSizeInBatch(batch, gcSizeChange)
if err != nil {
return err
}
metrics.GetOrRegisterCounter(metricName+"/excluded-count", nil).Inc(int64(excludedCount))
err = db.shed.WriteBatch(batch)
if err != nil {
metrics.GetOrRegisterCounter(metricName+"/writebatch/err", nil).Inc(1)
return err
}
return nil
}
// gcTrigger retruns the absolute value for garbage collection
// target value, calculated from db.capacity and gcTargetRatio.
func (db *DB) gcTarget() (target uint64) {
return uint64(float64(db.capacity) * gcTargetRatio)
}
// triggerGarbageCollection signals collectGarbageWorker
// to call collectGarbage.
func (db *DB) triggerGarbageCollection() {
select {
case db.collectGarbageTrigger <- struct{}{}:
case <-db.close:
default:
}
}
// incGCSizeInBatch changes gcSize field value
// by change which can be negative. This function
// must be called under batchMu lock.
func (db *DB) incGCSizeInBatch(batch *leveldb.Batch, change int64) (err error) {
if change == 0 {
return nil
}
gcSize, err := db.gcSize.Get()
if err != nil {
return err
}
var new uint64
if change > 0 {
new = gcSize + uint64(change)
} else {
// 'change' is an int64 and is negative
// a conversion is needed with correct sign
c := uint64(-change)
if c > gcSize {
// protect uint64 undeflow
return nil
}
new = gcSize - c
}
db.gcSize.PutInBatch(batch, new)
// trigger garbage collection if we reached the capacity
if new >= db.capacity {
db.triggerGarbageCollection()
}
return nil
}
// testHookCollectGarbage is a hook that can provide
// information when a garbage collection run is done
// and how many items it removed.
var testHookCollectGarbage func(collectedCount uint64)
This diff is collapsed.
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"bytes"
"context"
"math/rand"
"testing"
"github.com/ethersphere/swarm/chunk"
)
// TestDB_pullIndex validates the ordering of keys in pull index.
// Pull index key contains PO prefix which is calculated from
// DB base key and chunk address. This is not an Item field
// which are checked in Mode tests.
// This test uploads chunks, sorts them in expected order and
// validates that pull index iterator will iterate it the same
// order.
func TestDB_pullIndex(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
chunkCount := 50
chunks := make([]testIndexChunk, chunkCount)
// upload random chunks
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
chunks[i] = testIndexChunk{
Chunk: ch,
binID: uint64(i),
}
}
testItemsOrder(t, db.pullIndex, chunks, func(i, j int) (less bool) {
poi := chunk.Proximity(db.baseKey, chunks[i].Address())
poj := chunk.Proximity(db.baseKey, chunks[j].Address())
if poi < poj {
return true
}
if poi > poj {
return false
}
if chunks[i].binID < chunks[j].binID {
return true
}
if chunks[i].binID > chunks[j].binID {
return false
}
return bytes.Compare(chunks[i].Address(), chunks[j].Address()) == -1
})
}
// TestDB_gcIndex validates garbage collection index by uploading
// a chunk with and performing operations using synced, access and
// request modes.
func TestDB_gcIndex(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
chunkCount := 50
chunks := make([]testIndexChunk, chunkCount)
// upload random chunks
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
chunks[i] = testIndexChunk{
Chunk: ch,
}
}
// check if all chunks are stored
newItemsCountTest(db.pullIndex, chunkCount)(t)
// check that chunks are not collectable for garbage
newItemsCountTest(db.gcIndex, 0)(t)
// set update gc test hook to signal when
// update gc goroutine is done by sending to
// testHookUpdateGCChan channel, which is
// used to wait for indexes change verifications
testHookUpdateGCChan := make(chan struct{})
defer setTestHookUpdateGC(func() {
testHookUpdateGCChan <- struct{}{}
})()
t.Run("request unsynced", func(t *testing.T) {
ch := chunks[1]
_, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
// wait for update gc goroutine to be done
<-testHookUpdateGCChan
// the chunk is not synced
// should not be in the garbace collection index
newItemsCountTest(db.gcIndex, 0)(t)
newIndexGCSizeTest(db)(t)
})
t.Run("sync one chunk", func(t *testing.T) {
ch := chunks[0]
err := db.Set(context.Background(), chunk.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
// the chunk is synced and should be in gc index
newItemsCountTest(db.gcIndex, 1)(t)
newIndexGCSizeTest(db)(t)
})
t.Run("sync all chunks", func(t *testing.T) {
for i := range chunks {
err := db.Set(context.Background(), chunk.ModeSetSyncPull, chunks[i].Address())
if err != nil {
t.Fatal(err)
}
}
testItemsOrder(t, db.gcIndex, chunks, nil)
newIndexGCSizeTest(db)(t)
})
t.Run("request one chunk", func(t *testing.T) {
i := 6
_, err := db.Get(context.Background(), chunk.ModeGetRequest, chunks[i].Address())
if err != nil {
t.Fatal(err)
}
// wait for update gc goroutine to be done
<-testHookUpdateGCChan
// move the chunk to the end of the expected gc
c := chunks[i]
chunks = append(chunks[:i], chunks[i+1:]...)
chunks = append(chunks, c)
testItemsOrder(t, db.gcIndex, chunks, nil)
newIndexGCSizeTest(db)(t)
})
t.Run("random chunk request", func(t *testing.T) {
rand.Shuffle(len(chunks), func(i, j int) {
chunks[i], chunks[j] = chunks[j], chunks[i]
})
for _, ch := range chunks {
_, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
// wait for update gc goroutine to be done
<-testHookUpdateGCChan
}
testItemsOrder(t, db.gcIndex, chunks, nil)
newIndexGCSizeTest(db)(t)
})
t.Run("remove one chunk", func(t *testing.T) {
i := 3
err := db.Set(context.Background(), chunk.ModeSetRemove, chunks[i].Address())
if err != nil {
t.Fatal(err)
}
// remove the chunk from the expected chunks in gc index
chunks = append(chunks[:i], chunks[i+1:]...)
testItemsOrder(t, db.gcIndex, chunks, nil)
newIndexGCSizeTest(db)(t)
})
}
This diff is collapsed.
This diff is collapsed.
// Copyright 2019 The Swarm Authors
// This file is part of the Swarm library.
//
// The Swarm library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Swarm library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"encoding/binary"
"errors"
"fmt"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/shed"
"github.com/syndtr/goleveldb/leveldb"
)
var errMissingCurrentSchema = errors.New("could not find current db schema")
var errMissingTargetSchema = errors.New("could not find target db schema")
type migration struct {
name string // name of the schema
fn func(db *DB) error // the migration function that needs to be performed in order to get to the current schema name
}
// schemaMigrations contains an ordered list of the database schemes, that is
// in order to run data migrations in the correct sequence
var schemaMigrations = []migration{
{name: DbSchemaPurity, fn: func(db *DB) error { return nil }},
{name: DbSchemaHalloween, fn: func(db *DB) error { return nil }},
{name: DbSchemaSanctuary, fn: func(db *DB) error { return nil }},
{name: DbSchemaDiwali, fn: migrateSanctuary},
}
func (db *DB) migrate(schemaName string) error {
migrations, err := getMigrations(schemaName, DbSchemaCurrent, schemaMigrations)
if err != nil {
return fmt.Errorf("error getting migrations for current schema (%s): %v", schemaName, err)
}
// no migrations to run
if migrations == nil {
return nil
}
log.Info("need to run data migrations on localstore", "numMigrations", len(migrations), "schemaName", schemaName)
for i := 0; i < len(migrations); i++ {
err := migrations[i].fn(db)
if err != nil {
return err
}
err = db.schemaName.Put(migrations[i].name) // put the name of the current schema
if err != nil {
return err
}
schemaName, err = db.schemaName.Get()
if err != nil {
return err
}
log.Info("successfully ran migration", "migrationId", i, "currentSchema", schemaName)
}
return nil
}
// getMigrations returns an ordered list of migrations that need be executed
// with no errors in order to bring the localstore to the most up-to-date
// schema definition
func getMigrations(currentSchema, targetSchema string, allSchemeMigrations []migration) (migrations []migration, err error) {
foundCurrent := false
foundTarget := false
if currentSchema == DbSchemaCurrent {
return nil, nil
}
for i, v := range allSchemeMigrations {
switch v.name {
case currentSchema:
if foundCurrent {
return nil, errors.New("found schema name for the second time when looking for migrations")
}
foundCurrent = true
log.Info("found current localstore schema", "currentSchema", currentSchema, "migrateTo", DbSchemaCurrent, "total migrations", len(allSchemeMigrations)-i)
continue // current schema migration should not be executed (already has been when schema was migrated to)
case targetSchema:
foundTarget = true
}
if foundCurrent {
migrations = append(migrations, v)
}
}
if !foundCurrent {
return nil, errMissingCurrentSchema
}
if !foundTarget {
return nil, errMissingTargetSchema
}
return migrations, nil
}
// this function migrates Sanctuary schema to the Diwali schema
func migrateSanctuary(db *DB) error {
// just rename the pull index
renamed, err := db.shed.RenameIndex("PO|BinID->Hash", "PO|BinID->Hash|Tag")
if err != nil {
return err
}
if !renamed {
return errors.New("pull index was not successfully renamed")
}
if db.tags == nil {
return errors.New("had an error accessing the tags object")
}
batch := new(leveldb.Batch)
db.batchMu.Lock()
defer db.batchMu.Unlock()
// since pullIndex points to the Tag value, we should eliminate possible
// pushIndex leak due to items that were used by previous pull sync tag
// increment logic. we need to build the index first since db object is
// still not initialised at this stage
db.pushIndex, err = db.shed.NewIndex("StoreTimestamp|Hash->Tags", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
key = make([]byte, 40)
binary.BigEndian.PutUint64(key[:8], uint64(fields.StoreTimestamp))
copy(key[8:], fields.Address[:])
return key, nil
},
DecodeKey: func(key []byte) (e shed.Item, err error) {
e.Address = key[8:]
e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[:8]))
return e, nil
},
EncodeValue: func(fields shed.Item) (value []byte, err error) {
tag := make([]byte, 4)
binary.BigEndian.PutUint32(tag, fields.Tag)
return tag, nil
},
DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
if value != nil {
e.Tag = binary.BigEndian.Uint32(value)
}
return e, nil
},
})
if err != nil {
return err
}
err = db.pushIndex.Iterate(func(item shed.Item) (stop bool, err error) {
tag, err := db.tags.Get(item.Tag)
if err != nil {
if err == chunk.TagNotFoundErr {
return false, nil
}
return true, err
}
// anonymous tags should no longer appear in pushIndex
if tag != nil && tag.Anonymous {
err = db.pushIndex.DeleteInBatch(batch, item)
if err != nil {
return true, nil
}
}
return false, nil
}, nil)
if err != nil {
return err
}
return db.shed.WriteBatch(batch)
}
// Copyright 2019 The Swarm Authors
// This file is part of the Swarm library.
//
// The Swarm library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Swarm library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"io"
"io/ioutil"
"log"
"math/rand"
"os"
"path"
"strings"
"testing"
"github.com/ethersphere/swarm/chunk"
)
func TestOneMigration(t *testing.T) {
defer func(v []migration, s string) {
schemaMigrations = v
DbSchemaCurrent = s
}(schemaMigrations, DbSchemaCurrent)
DbSchemaCurrent = DbSchemaSanctuary
ran := false
shouldNotRun := false
schemaMigrations = []migration{
{name: DbSchemaSanctuary, fn: func(db *DB) error {
shouldNotRun = true // this should not be executed
return nil
}},
{name: DbSchemaDiwali, fn: func(db *DB) error {
ran = true
return nil
}},
}
dir, err := ioutil.TempDir("", "localstore-test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
baseKey := make([]byte, 32)
if _, err := rand.Read(baseKey); err != nil {
t.Fatal(err)
}
// start the fresh localstore with the sanctuary schema name
db, err := New(dir, baseKey, nil)
if err != nil {
t.Fatal(err)
}
err = db.Close()
if err != nil {
t.Fatal(err)
}
DbSchemaCurrent = DbSchemaDiwali
// start the existing localstore and expect the migration to run
db, err = New(dir, baseKey, nil)
if err != nil {
t.Fatal(err)
}
schemaName, err := db.schemaName.Get()
if err != nil {
t.Fatal(err)
}
if schemaName != DbSchemaDiwali {
t.Errorf("schema name mismatch. got '%s', want '%s'", schemaName, DbSchemaDiwali)
}
if !ran {
t.Errorf("expected migration did not run")
}
if shouldNotRun {
t.Errorf("migration ran but shouldnt have")
}
err = db.Close()
if err != nil {
t.Error(err)
}
}
func TestManyMigrations(t *testing.T) {
defer func(v []migration, s string) {
schemaMigrations = v
DbSchemaCurrent = s
}(schemaMigrations, DbSchemaCurrent)
DbSchemaCurrent = DbSchemaSanctuary
shouldNotRun := false
executionOrder := []int{-1, -1, -1, -1}
schemaMigrations = []migration{
{name: DbSchemaSanctuary, fn: func(db *DB) error {
shouldNotRun = true // this should not be executed
return nil
}},
{name: DbSchemaDiwali, fn: func(db *DB) error {
executionOrder[0] = 0
return nil
}},
{name: "coconut", fn: func(db *DB) error {
executionOrder[1] = 1
return nil
}},
{name: "mango", fn: func(db *DB) error {
executionOrder[2] = 2
return nil
}},
{name: "salvation", fn: func(db *DB) error {
executionOrder[3] = 3
return nil
}},
}
dir, err := ioutil.TempDir("", "localstore-test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
baseKey := make([]byte, 32)
if _, err := rand.Read(baseKey); err != nil {
t.Fatal(err)
}
// start the fresh localstore with the sanctuary schema name
db, err := New(dir, baseKey, nil)
if err != nil {
t.Fatal(err)
}
err = db.Close()
if err != nil {
t.Fatal(err)
}
DbSchemaCurrent = "salvation"
// start the existing localstore and expect the migration to run
db, err = New(dir, baseKey, nil)
if err != nil {
t.Fatal(err)
}
schemaName, err := db.schemaName.Get()
if err != nil {
t.Fatal(err)
}
if schemaName != "salvation" {
t.Errorf("schema name mismatch. got '%s', want '%s'", schemaName, "salvation")
}
if shouldNotRun {
t.Errorf("migration ran but shouldnt have")
}
for i, v := range executionOrder {
if i != v && i != len(executionOrder)-1 {
t.Errorf("migration did not run in sequence, slot %d value %d", i, v)
}
}
err = db.Close()
if err != nil {
t.Error(err)
}
}
// TestMigrationFailFrom checks that local store boot should fail when the schema we're migrating from cannot be found
func TestMigrationFailFrom(t *testing.T) {
defer func(v []migration, s string) {
schemaMigrations = v
DbSchemaCurrent = s
}(schemaMigrations, DbSchemaCurrent)
DbSchemaCurrent = "koo-koo-schema"
shouldNotRun := false
schemaMigrations = []migration{
{name: "langur", fn: func(db *DB) error {
shouldNotRun = true
return nil
}},
{name: "coconut", fn: func(db *DB) error {
shouldNotRun = true
return nil
}},
{name: "chutney", fn: func(db *DB) error {
shouldNotRun = true
return nil
}},
}
dir, err := ioutil.TempDir("", "localstore-test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
baseKey := make([]byte, 32)
if _, err := rand.Read(baseKey); err != nil {
t.Fatal(err)
}
// start the fresh localstore with the sanctuary schema name
db, err := New(dir, baseKey, nil)
if err != nil {
t.Fatal(err)
}
err = db.Close()
if err != nil {
t.Fatal(err)
}
DbSchemaCurrent = "foo"
// start the existing localstore and expect the migration to run
_, err = New(dir, baseKey, nil)
if !strings.Contains(err.Error(), errMissingCurrentSchema.Error()) {
t.Fatalf("expected errCannotFindSchema but got %v", err)
}
if shouldNotRun {
t.Errorf("migration ran but shouldnt have")
}
}
// TestMigrationFailTo checks that local store boot should fail when the schema we're migrating to cannot be found
func TestMigrationFailTo(t *testing.T) {
defer func(v []migration, s string) {
schemaMigrations = v
DbSchemaCurrent = s
}(schemaMigrations, DbSchemaCurrent)
DbSchemaCurrent = "langur"
shouldNotRun := false
schemaMigrations = []migration{
{name: "langur", fn: func(db *DB) error {
shouldNotRun = true
return nil
}},
{name: "coconut", fn: func(db *DB) error {
shouldNotRun = true
return nil
}},
{name: "chutney", fn: func(db *DB) error {
shouldNotRun = true
return nil
}},
}
dir, err := ioutil.TempDir("", "localstore-test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
baseKey := make([]byte, 32)
if _, err := rand.Read(baseKey); err != nil {
t.Fatal(err)
}
// start the fresh localstore with the sanctuary schema name
db, err := New(dir, baseKey, nil)
if err != nil {
t.Fatal(err)
}
err = db.Close()
if err != nil {
t.Fatal(err)
}
DbSchemaCurrent = "foo"
// start the existing localstore and expect the migration to run
_, err = New(dir, baseKey, nil)
if !strings.Contains(err.Error(), errMissingTargetSchema.Error()) {
t.Fatalf("expected errMissingTargetSchema but got %v", err)
}
if shouldNotRun {
t.Errorf("migration ran but shouldnt have")
}
}
// TestMigrateSanctuaryFixture migrates an actual Sanctuary localstore
// to the most recent schema.
func TestMigrateSanctuaryFixture(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "localstore-test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpdir)
dir := path.Join(".", "testdata", "sanctuary")
if err != nil {
t.Fatal(err)
}
files, err := ioutil.ReadDir(dir)
if err != nil {
log.Fatal(err)
}
for _, f := range files {
err = copyFileContents(path.Join(dir, f.Name()), path.Join(tmpdir, f.Name()))
if err != nil {
t.Fatal(err)
}
}
baseKey := make([]byte, 32)
if _, err := rand.Read(baseKey); err != nil {
t.Fatal(err)
}
// start localstore with the copied fixture
db, err := New(tmpdir, baseKey, &Options{Tags: chunk.NewTags()})
if err != nil {
t.Fatal(err)
}
schemaName, err := db.schemaName.Get()
if err != nil {
t.Fatal(err)
}
if schemaName != DbSchemaCurrent {
t.Fatalf("schema name mismatch, want '%s' got '%s'", DbSchemaCurrent, schemaName)
}
err = db.Close()
if err != nil {
t.Fatal(err)
}
}
func copyFileContents(src, dst string) (err error) {
in, err := os.Open(src)
if err != nil {
return err
}
defer in.Close()
out, err := os.Create(dst)
if err != nil {
return err
}
defer out.Close()
if _, err = io.Copy(out, in); err != nil {
return err
}
return out.Sync()
}
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"context"
"fmt"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/shed"
"github.com/syndtr/goleveldb/leveldb"
)
// Get returns a chunk from the database. If the chunk is
// not found chunk.ErrChunkNotFound will be returned.
// All required indexes will be updated required by the
// Getter Mode. Get is required to implement chunk.Store
// interface.
func (db *DB) Get(ctx context.Context, mode chunk.ModeGet, addr chunk.Address) (ch chunk.Chunk, err error) {
metricName := fmt.Sprintf("localstore/Get/%s", mode)
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())
defer func() {
if err != nil {
metrics.GetOrRegisterCounter(metricName+"/error", nil).Inc(1)
}
}()
out, err := db.get(mode, addr)
if err != nil {
if err == leveldb.ErrNotFound {
return nil, chunk.ErrChunkNotFound
}
return nil, err
}
return chunk.NewChunk(out.Address, out.Data).WithPinCounter(out.PinCounter), nil
}
// get returns Item from the retrieval index
// and updates other indexes.
func (db *DB) get(mode chunk.ModeGet, addr chunk.Address) (out shed.Item, err error) {
item := addressToItem(addr)
out, err = db.retrievalDataIndex.Get(item)
if err != nil {
return out, err
}
switch mode {
// update the access timestamp and gc index
case chunk.ModeGetRequest:
db.updateGCItems(out)
case chunk.ModeGetPin:
pinnedItem, err := db.pinIndex.Get(item)
if err != nil {
return out, err
}
return pinnedItem, nil
// no updates to indexes
case chunk.ModeGetSync:
case chunk.ModeGetLookup:
default:
return out, ErrInvalidMode
}
return out, nil
}
// updateGCItems is called when ModeGetRequest is used
// for Get or GetMulti to update access time and gc indexes
// for all returned chunks.
func (db *DB) updateGCItems(items ...shed.Item) {
if db.updateGCSem != nil {
// wait before creating new goroutines
// if updateGCSem buffer id full
db.updateGCSem <- struct{}{}
}
db.updateGCWG.Add(1)
go func() {
defer db.updateGCWG.Done()
if db.updateGCSem != nil {
// free a spot in updateGCSem buffer
// for a new goroutine
defer func() { <-db.updateGCSem }()
}
metricName := "localstore/updateGC"
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())
for _, item := range items {
err := db.updateGC(item)
if err != nil {
metrics.GetOrRegisterCounter(metricName+"/error", nil).Inc(1)
log.Error("localstore update gc", "err", err)
}
}
// if gc update hook is defined, call it
if testHookUpdateGC != nil {
testHookUpdateGC()
}
}()
}
// updateGC updates garbage collection index for
// a single item. Provided item is expected to have
// only Address and Data fields with non zero values,
// which is ensured by the get function.
func (db *DB) updateGC(item shed.Item) (err error) {
db.batchMu.Lock()
defer db.batchMu.Unlock()
batch := new(leveldb.Batch)
// update accessTimeStamp in retrieve, gc
i, err := db.retrievalAccessIndex.Get(item)
switch err {
case nil:
item.AccessTimestamp = i.AccessTimestamp
case leveldb.ErrNotFound:
// no chunk accesses
default:
return err
}
if item.AccessTimestamp == 0 {
// chunk is not yet synced
// do not add it to the gc index
return nil
}
// delete current entry from the gc index
err = db.gcIndex.DeleteInBatch(batch, item)
if err != nil {
return err
}
// update access timestamp
item.AccessTimestamp = now()
// update retrieve access index
err = db.retrievalAccessIndex.PutInBatch(batch, item)
if err != nil {
return err
}
// add new entry to gc index
err = db.gcIndex.PutInBatch(batch, item)
if err != nil {
return err
}
return db.shed.WriteBatch(batch)
}
// testHookUpdateGC is a hook that can provide
// information when a garbage collection index is updated.
var testHookUpdateGC func()
// Copyright 2019 The Swarm Authors
// This file is part of the Swarm library.
//
// The Swarm library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Swarm library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"context"
"fmt"
"time"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/shed"
"github.com/syndtr/goleveldb/leveldb"
)
// GetMulti returns chunks from the database. If one of the chunks is not found
// chunk.ErrChunkNotFound will be returned. All required indexes will be updated
// required by the Getter Mode. GetMulti is required to implement chunk.Store
// interface.
func (db *DB) GetMulti(ctx context.Context, mode chunk.ModeGet, addrs ...chunk.Address) (chunks []chunk.Chunk, err error) {
metricName := fmt.Sprintf("localstore/GetMulti/%s", mode)
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())
defer func() {
if err != nil {
metrics.GetOrRegisterCounter(metricName+"/error", nil).Inc(1)
}
}()
out, err := db.getMulti(mode, addrs...)
if err != nil {
if err == leveldb.ErrNotFound {
return nil, chunk.ErrChunkNotFound
}
return nil, err
}
chunks = make([]chunk.Chunk, len(out))
for i, ch := range out {
chunks[i] = chunk.NewChunk(ch.Address, ch.Data).WithPinCounter(ch.PinCounter)
}
return chunks, nil
}
// getMulti returns Items from the retrieval index
// and updates other indexes.
func (db *DB) getMulti(mode chunk.ModeGet, addrs ...chunk.Address) (out []shed.Item, err error) {
out = make([]shed.Item, len(addrs))
for i, addr := range addrs {
out[i].Address = addr
}
err = db.retrievalDataIndex.Fill(out)
if err != nil {
return nil, err
}
switch mode {
// update the access timestamp and gc index
case chunk.ModeGetRequest:
db.updateGCItems(out...)
case chunk.ModeGetPin:
err := db.pinIndex.Fill(out)
if err != nil {
return nil, err
}
// no updates to indexes
case chunk.ModeGetSync:
case chunk.ModeGetLookup:
default:
return out, ErrInvalidMode
}
return out, nil
}
// Copyright 2019 The Swarm Authors
// This file is part of the Swarm library.
//
// The Swarm library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Swarm library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"context"
"reflect"
"testing"
"github.com/ethersphere/swarm/chunk"
)
// TestModeGetMulti stores chunks and validates that GetMulti
// is returning them correctly.
func TestModeGetMulti(t *testing.T) {
const chunkCount = 10
for _, mode := range []chunk.ModeGet{
chunk.ModeGetRequest,
chunk.ModeGetSync,
chunk.ModeGetLookup,
chunk.ModeGetPin,
} {
t.Run(mode.String(), func(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
chunks := generateTestRandomChunks(chunkCount)
_, err := db.Put(context.Background(), chunk.ModePutUpload, chunks...)
if err != nil {
t.Fatal(err)
}
if mode == chunk.ModeGetPin {
// pin chunks so that it is not returned as not found by pinIndex
for i, ch := range chunks {
err := db.Set(context.Background(), chunk.ModeSetPin, ch.Address())
if err != nil {
t.Fatal(err)
}
chunks[i] = ch.WithPinCounter(1)
}
}
addrs := chunkAddresses(chunks)
got, err := db.GetMulti(context.Background(), mode, addrs...)
if err != nil {
t.Fatal(err)
}
for i := 0; i < chunkCount; i++ {
if !reflect.DeepEqual(got[i], chunks[i]) {
t.Errorf("got %v chunk %v, want %v", i, got[i], chunks[i])
}
}
missingChunk := generateTestRandomChunk()
want := chunk.ErrChunkNotFound
_, err = db.GetMulti(context.Background(), mode, append(addrs, missingChunk.Address())...)
if err != want {
t.Errorf("got error %v, want %v", err, want)
}
})
}
}
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"bytes"
"context"
"testing"
"time"
"github.com/ethersphere/swarm/chunk"
)
// TestModeGetRequest validates ModeGetRequest index values on the provided DB.
func TestModeGetRequest(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
uploadTimestamp := time.Now().UTC().UnixNano()
defer setNow(func() (t int64) {
return uploadTimestamp
})()
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
// set update gc test hook to signal when
// update gc goroutine is done by sending to
// testHookUpdateGCChan channel, which is
// used to wait for garbage colletion index
// changes
testHookUpdateGCChan := make(chan struct{})
defer setTestHookUpdateGC(func() {
testHookUpdateGCChan <- struct{}{}
})()
t.Run("get unsynced", func(t *testing.T) {
got, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
// wait for update gc goroutine to be done
<-testHookUpdateGCChan
if !bytes.Equal(got.Address(), ch.Address()) {
t.Errorf("got chunk address %x, want %x", got.Address(), ch.Address())
}
if !bytes.Equal(got.Data(), ch.Data()) {
t.Errorf("got chunk data %x, want %x", got.Data(), ch.Data())
}
t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, uploadTimestamp, 0))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 0))
t.Run("gc size", newIndexGCSizeTest(db))
})
// set chunk to synced state
err = db.Set(context.Background(), chunk.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
t.Run("first get", func(t *testing.T) {
got, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
// wait for update gc goroutine to be done
<-testHookUpdateGCChan
if !bytes.Equal(got.Address(), ch.Address()) {
t.Errorf("got chunk address %x, want %x", got.Address(), ch.Address())
}
if !bytes.Equal(got.Data(), ch.Data()) {
t.Errorf("got chunk data %x, want %x", got.Data(), ch.Data())
}
t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, uploadTimestamp, uploadTimestamp))
t.Run("gc index", newGCIndexTest(db, ch, uploadTimestamp, uploadTimestamp, 1, nil))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
t.Run("gc size", newIndexGCSizeTest(db))
})
t.Run("second get", func(t *testing.T) {
accessTimestamp := time.Now().UTC().UnixNano()
defer setNow(func() (t int64) {
return accessTimestamp
})()
got, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
// wait for update gc goroutine to be done
<-testHookUpdateGCChan
if !bytes.Equal(got.Address(), ch.Address()) {
t.Errorf("got chunk address %x, want %x", got.Address(), ch.Address())
}
if !bytes.Equal(got.Data(), ch.Data()) {
t.Errorf("got chunk data %x, want %x", got.Data(), ch.Data())
}
t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, uploadTimestamp, accessTimestamp))
t.Run("gc index", newGCIndexTest(db, ch, uploadTimestamp, accessTimestamp, 1, nil))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
t.Run("gc size", newIndexGCSizeTest(db))
})
t.Run("multi", func(t *testing.T) {
got, err := db.GetMulti(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
// wait for update gc goroutine to be done
<-testHookUpdateGCChan
if !bytes.Equal(got[0].Address(), ch.Address()) {
t.Errorf("got chunk address %x, want %x", got[0].Address(), ch.Address())
}
if !bytes.Equal(got[0].Data(), ch.Data()) {
t.Errorf("got chunk data %x, want %x", got[0].Data(), ch.Data())
}
t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, uploadTimestamp, uploadTimestamp))
t.Run("gc index", newGCIndexTest(db, ch, uploadTimestamp, uploadTimestamp, 1, nil))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
t.Run("gc size", newIndexGCSizeTest(db))
})
}
// TestModeGetSync validates ModeGetSync index values on the provided DB.
func TestModeGetSync(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
uploadTimestamp := time.Now().UTC().UnixNano()
defer setNow(func() (t int64) {
return uploadTimestamp
})()
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
got, err := db.Get(context.Background(), chunk.ModeGetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(got.Address(), ch.Address()) {
t.Errorf("got chunk address %x, want %x", got.Address(), ch.Address())
}
if !bytes.Equal(got.Data(), ch.Data()) {
t.Errorf("got chunk data %x, want %x", got.Data(), ch.Data())
}
t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, ch, uploadTimestamp, 0))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 0))
t.Run("gc size", newIndexGCSizeTest(db))
t.Run("multi", func(t *testing.T) {
got, err := db.GetMulti(context.Background(), chunk.ModeGetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(got[0].Address(), ch.Address()) {
t.Errorf("got chunk address %x, want %x", got[0].Address(), ch.Address())
}
if !bytes.Equal(got[0].Data(), ch.Data()) {
t.Errorf("got chunk data %x, want %x", got[0].Data(), ch.Data())
}
})
}
// setTestHookUpdateGC sets testHookUpdateGC and
// returns a function that will reset it to the
// value before the change.
func setTestHookUpdateGC(h func()) (reset func()) {
current := testHookUpdateGC
reset = func() { testHookUpdateGC = current }
testHookUpdateGC = h
return reset
}
// TestSetTestHookUpdateGC tests if setTestHookUpdateGC changes
// testHookUpdateGC function correctly and if its reset function
// resets the original function.
func TestSetTestHookUpdateGC(t *testing.T) {
// Set the current function after the test finishes.
defer func(h func()) { testHookUpdateGC = h }(testHookUpdateGC)
// expected value for the unchanged function
original := 1
// expected value for the changed function
changed := 2
// this variable will be set with two different functions
var got int
// define the original (unchanged) functions
testHookUpdateGC = func() {
got = original
}
// set got variable
testHookUpdateGC()
// test if got variable is set correctly
if got != original {
t.Errorf("got hook value %v, want %v", got, original)
}
// set the new function
reset := setTestHookUpdateGC(func() {
got = changed
})
// set got variable
testHookUpdateGC()
// test if got variable is set correctly to changed value
if got != changed {
t.Errorf("got hook value %v, want %v", got, changed)
}
// set the function to the original one
reset()
// set got variable
testHookUpdateGC()
// test if got variable is set correctly to original value
if got != original {
t.Errorf("got hook value %v, want %v", got, original)
}
}
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"context"
"time"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethersphere/swarm/chunk"
)
// Has returns true if the chunk is stored in database.
func (db *DB) Has(ctx context.Context, addr chunk.Address) (bool, error) {
metricName := "localstore/Has"
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())
has, err := db.retrievalDataIndex.Has(addressToItem(addr))
if err != nil {
metrics.GetOrRegisterCounter(metricName+"/error", nil).Inc(1)
}
return has, err
}
// HasMulti returns a slice of booleans which represent if the provided chunks
// are stored in database.
func (db *DB) HasMulti(ctx context.Context, addrs ...chunk.Address) ([]bool, error) {
metricName := "localstore/HasMulti"
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())
have, err := db.retrievalDataIndex.HasMulti(addressesToItems(addrs...)...)
if err != nil {
metrics.GetOrRegisterCounter(metricName+"/error", nil).Inc(1)
}
return have, err
}
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"context"
"fmt"
"math/rand"
"testing"
"time"
"github.com/ethersphere/swarm/chunk"
)
// TestHas validates that Has method is returning true for
// the stored chunk and false for one that is not stored.
func TestHas(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
has, err := db.Has(context.Background(), ch.Address())
if err != nil {
t.Fatal(err)
}
if !has {
t.Error("chunk not found")
}
missingChunk := generateTestRandomChunk()
has, err = db.Has(context.Background(), missingChunk.Address())
if err != nil {
t.Fatal(err)
}
if has {
t.Error("unexpected chunk is found")
}
}
// TestHasMulti validates that HasMulti method is returning correct boolean
// slice for stored chunks.
func TestHasMulti(t *testing.T) {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for _, tc := range multiChunkTestCases {
t.Run(tc.name, func(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
chunks := generateTestRandomChunks(tc.count)
want := make([]bool, tc.count)
for i, ch := range chunks {
if r.Intn(2) == 0 {
// randomly exclude half of the chunks
continue
}
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
want[i] = true
}
got, err := db.HasMulti(context.Background(), chunkAddresses(chunks)...)
if err != nil {
t.Fatal(err)
}
if fmt.Sprint(got) != fmt.Sprint(want) {
t.Errorf("got %v, want %v", got, want)
}
})
}
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
// Copyright 2019 The Swarm Authors
// This file is part of the Swarm library.
//
// The Swarm library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Swarm library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"github.com/ethersphere/swarm/log"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
)
// The DB schema we want to use. The actual/current DB schema might differ
// until migrations are run.
var DbSchemaCurrent = DbSchemaDiwali
// There was a time when we had no schema at all.
const DbSchemaNone = ""
// "purity" is the first formal schema of LevelDB we release together with Swarm 0.3.5
const DbSchemaPurity = "purity"
// "halloween" is here because we had a screw in the garbage collector index.
// Because of that we had to rebuild the GC index to get rid of erroneous
// entries and that takes a long time. This schema is used for bookkeeping,
// so rebuild index will run just once.
const DbSchemaHalloween = "halloween"
const DbSchemaSanctuary = "sanctuary"
// the "diwali" migration simply renames the pullIndex in localstore
const DbSchemaDiwali = "diwali"
// returns true if legacy database is in the datadir
func IsLegacyDatabase(datadir string) bool {
var (
legacyDbSchemaKey = []byte{8}
)
db, err := leveldb.OpenFile(datadir, &opt.Options{OpenFilesCacheCapacity: 128})
if err != nil {
log.Error("got an error while trying to open leveldb path", "path", datadir, "err", err)
return false
}
defer db.Close()
data, err := db.Get(legacyDbSchemaKey, nil)
if err != nil {
if err == leveldb.ErrNotFound {
// if we haven't found anything under the legacy db schema key- we are not on legacy
return false
}
log.Error("got an unexpected error fetching legacy name from the database", "err", err)
}
log.Trace("checking if database scheme is legacy", "schema name", string(data))
return string(data) == DbSchemaHalloween || string(data) == DbSchemaPurity
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment