Commit 9d8925d2 authored by Janoš Guljaš's avatar Janoš Guljaš Committed by GitHub

replace ethersphere/swarm/chunk and related types with bee ones (#89)

parent c464aec2
......@@ -28,12 +28,12 @@ import (
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/swarm/storage"
"github.com/ethersphere/bee/pkg/storage/testing"
"github.com/ethersphere/bee/pkg/swarm"
)
// Store holds fields and indexes (including their encoding functions)
// and defines operations on them by composing data from them.
// It implements storage.ChunkStore interface.
// It is just an example without any support for parallel operations
// or real world implementation.
type Store struct {
......@@ -145,9 +145,9 @@ func New(path string) (s *Store, err error) {
}
// Put stores the chunk and sets it store timestamp.
func (s *Store) Put(_ context.Context, ch storage.Chunk) (err error) {
func (s *Store) Put(_ context.Context, ch swarm.Chunk) (err error) {
return s.retrievalIndex.Put(shed.Item{
Address: ch.Address(),
Address: ch.Address().Bytes(),
Data: ch.Data(),
StoreTimestamp: time.Now().UTC().UnixNano(),
})
......@@ -157,12 +157,12 @@ func (s *Store) Put(_ context.Context, ch storage.Chunk) (err error) {
// It updates access and gc indexes by removing the previous
// items from them and adding new items as keys of index entries
// are changed.
func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, err error) {
func (s *Store) Get(_ context.Context, addr swarm.Address) (c swarm.Chunk, err error) {
batch := s.db.GetBatch(true)
// Get the chunk data and storage timestamp.
item, err := s.retrievalIndex.Get(shed.Item{
Address: addr,
Address: addr.Bytes(),
})
if err != nil {
return nil, err
......@@ -170,7 +170,7 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e
// Get the chunk access timestamp.
accessItem, err := s.accessIndex.Get(shed.Item{
Address: addr,
Address: addr.Bytes(),
})
switch err {
case nil:
......@@ -195,7 +195,7 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e
// Put new access timestamp in access index.
err = s.accessIndex.PutInBatch(batch, shed.Item{
Address: addr,
Address: addr.Bytes(),
AccessTimestamp: accessTimestamp,
})
if err != nil {
......@@ -226,7 +226,7 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e
}
// Return the chunk.
return storage.NewChunk(item.Address, item.Data), nil
return swarm.NewChunk(swarm.NewAddress(item.Address), item.Data), nil
}
// CollectGarbage is an example of index iteration.
......@@ -312,7 +312,7 @@ func Example_store() {
}
defer s.Close()
ch := storage.GenerateRandomChunk(1024)
ch := testing.GenerateTestRandomChunk()
err = s.Put(context.Background(), ch)
if err != nil {
log.Fatal(err)
......
......@@ -25,7 +25,8 @@ import (
"io/ioutil"
"sync"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/swarm/shed"
)
......@@ -141,12 +142,12 @@ func (db *DB) Import(r io.Reader, legacy bool) (count int64, err error) {
case <-ctx.Done():
}
}
key := chunk.Address(keybytes)
key := swarm.NewAddress(keybytes)
var ch chunk.Chunk
var ch swarm.Chunk
switch version {
case currentExportVersion:
ch = chunk.NewChunk(key, data)
ch = swarm.NewChunk(key, data)
default:
select {
case errC <- fmt.Errorf("unsupported export data version %q", version):
......@@ -157,14 +158,14 @@ func (db *DB) Import(r io.Reader, legacy bool) (count int64, err error) {
wg.Add(1)
go func() {
_, err := db.Put(ctx, chunk.ModePutUpload, ch)
_, err := db.Put(ctx, storage.ModePutUpload, ch)
select {
case errC <- err:
case <-ctx.Done():
wg.Done()
<-tokenPool
default:
_, err := db.Put(ctx, chunk.ModePutUpload, ch)
_, err := db.Put(ctx, storage.ModePutUpload, ch)
if err != nil {
errC <- err
}
......
......@@ -21,7 +21,8 @@ import (
"context"
"testing"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
// TestExportImport constructs two databases, one to put and export
......@@ -37,11 +38,11 @@ func TestExportImport(t *testing.T) {
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunk()
_, err := db1.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db1.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
chunks[string(ch.Address())] = ch.Data()
chunks[ch.Address().String()] = ch.Data()
}
var buf bytes.Buffer
......@@ -67,14 +68,14 @@ func TestExportImport(t *testing.T) {
}
for a, want := range chunks {
addr := chunk.Address([]byte(a))
ch, err := db2.Get(context.Background(), chunk.ModeGetRequest, addr)
addr := swarm.MustParseHexAddress(a)
ch, err := db2.Get(context.Background(), storage.ModeGetRequest, addr)
if err != nil {
t.Fatal(err)
}
got := ch.Data()
if !bytes.Equal(got, want) {
t.Fatalf("chunk %s: got data %x, want %x", addr.Hex(), got, want)
t.Fatalf("chunk %s: got data %x, want %x", addr, got, want)
}
}
}
......@@ -26,7 +26,8 @@ import (
"time"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/swarm/shed"
)
......@@ -66,18 +67,18 @@ func testDBCollectGarbageWorker(t *testing.T) {
})()
defer cleanupFunc()
addrs := make([]chunk.Address, 0)
addrs := make([]swarm.Address, 0)
// upload random chunks
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), chunk.ModeSetSyncPull, ch.Address())
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
......@@ -111,24 +112,24 @@ func testDBCollectGarbageWorker(t *testing.T) {
// the first synced chunk should be removed
t.Run("get the first synced chunk", func(t *testing.T) {
_, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[0])
if err != chunk.ErrChunkNotFound {
t.Errorf("got error %v, want %v", err, chunk.ErrChunkNotFound)
_, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[0])
if err != storage.ErrNotFound {
t.Errorf("got error %v, want %v", err, storage.ErrNotFound)
}
})
t.Run("only first inserted chunks should be removed", func(t *testing.T) {
for i := 0; i < (chunkCount - int(gcTarget)); i++ {
_, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[i])
if err != chunk.ErrChunkNotFound {
t.Errorf("got error %v, want %v", err, chunk.ErrChunkNotFound)
_, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[i])
if err != storage.ErrNotFound {
t.Errorf("got error %v, want %v", err, storage.ErrNotFound)
}
}
})
// last synced chunk should not be removed
t.Run("get most recent synced chunk", func(t *testing.T) {
_, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[len(addrs)-1])
_, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[len(addrs)-1])
if err != nil {
t.Fatal(err)
}
......@@ -155,19 +156,19 @@ func TestPinGC(t *testing.T) {
})()
defer cleanupFunc()
addrs := make([]chunk.Address, 0)
pinAddrs := make([]chunk.Address, 0)
addrs := make([]swarm.Address, 0)
pinAddrs := make([]swarm.Address, 0)
// upload random chunks
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), chunk.ModeSetSyncPull, ch.Address())
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
......@@ -176,7 +177,7 @@ func TestPinGC(t *testing.T) {
// Pin the chunks at the beginning to make sure they are not removed by GC
if i < pinChunksCount {
err = db.Set(context.Background(), chunk.ModeSetPin, ch.Address())
err = db.Set(context.Background(), storage.ModeSetPin, ch.Address())
if err != nil {
t.Fatal(err)
}
......@@ -213,7 +214,7 @@ func TestPinGC(t *testing.T) {
t.Run("pinned chunk not in gc Index", func(t *testing.T) {
err := db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
for _, pinHash := range pinAddrs {
if bytes.Equal(pinHash, item.Address) {
if bytes.Equal(pinHash.Bytes(), item.Address) {
t.Fatal("pin chunk present in gcIndex")
}
}
......@@ -226,7 +227,7 @@ func TestPinGC(t *testing.T) {
t.Run("pinned chunks exists", func(t *testing.T) {
for _, hash := range pinAddrs {
_, err := db.Get(context.Background(), chunk.ModeGetRequest, hash)
_, err := db.Get(context.Background(), storage.ModeGetRequest, hash)
if err != nil {
t.Fatal(err)
}
......@@ -235,8 +236,8 @@ func TestPinGC(t *testing.T) {
t.Run("first chunks after pinned chunks should be removed", func(t *testing.T) {
for i := pinChunksCount; i < (int(dbCapacity) - int(gcTarget)); i++ {
_, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[i])
if err != chunk.ErrChunkNotFound {
_, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[i])
if err != storage.ErrNotFound {
t.Fatal(err)
}
}
......@@ -254,25 +255,25 @@ func TestGCAfterPin(t *testing.T) {
})
defer cleanupFunc()
pinAddrs := make([]chunk.Address, 0)
pinAddrs := make([]swarm.Address, 0)
// upload random chunks
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
// Pin before adding to GC in ModeSetSyncPull
err = db.Set(context.Background(), chunk.ModeSetPin, ch.Address())
err = db.Set(context.Background(), storage.ModeSetPin, ch.Address())
if err != nil {
t.Fatal(err)
}
pinAddrs = append(pinAddrs, ch.Address())
err = db.Set(context.Background(), chunk.ModeSetSyncPull, ch.Address())
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
......@@ -285,7 +286,7 @@ func TestGCAfterPin(t *testing.T) {
t.Run("gc index count", newItemsCountTest(db.gcIndex, int(0)))
for _, hash := range pinAddrs {
_, err := db.Get(context.Background(), chunk.ModeGetRequest, hash)
_, err := db.Get(context.Background(), storage.ModeGetRequest, hash)
if err != nil {
t.Fatal(err)
}
......@@ -306,18 +307,18 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
testHookCollectGarbageChan <- collectedCount
})()
addrs := make([]chunk.Address, 0)
addrs := make([]swarm.Address, 0)
// upload random chunks just up to the capacity
for i := 0; i < int(db.capacity)-1; i++ {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), chunk.ModeSetSyncPull, ch.Address())
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
......@@ -336,7 +337,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
// request the latest synced chunk
// to prioritize it in the gc index
// not to be collected
_, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[0])
_, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[0])
if err != nil {
t.Fatal(err)
}
......@@ -355,11 +356,11 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
// upload and sync another chunk to trigger
// garbage collection
ch := generateTestRandomChunk()
_, err = db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err = db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), chunk.ModeSetSyncPull, ch.Address())
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
......@@ -399,7 +400,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
// requested chunk should not be removed
t.Run("get requested chunk", func(t *testing.T) {
_, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[0])
_, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[0])
if err != nil {
t.Fatal(err)
}
......@@ -407,15 +408,15 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
// the second synced chunk should be removed
t.Run("get gc-ed chunk", func(t *testing.T) {
_, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[1])
if err != chunk.ErrChunkNotFound {
t.Errorf("got error %v, want %v", err, chunk.ErrChunkNotFound)
_, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[1])
if err != storage.ErrNotFound {
t.Errorf("got error %v, want %v", err, storage.ErrNotFound)
}
})
// last synced chunk should not be removed
t.Run("get most recent synced chunk", func(t *testing.T) {
_, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[len(addrs)-1])
_, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[len(addrs)-1])
if err != nil {
t.Fatal(err)
}
......@@ -445,12 +446,12 @@ func TestDB_gcSize(t *testing.T) {
for i := 0; i < count; i++ {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), chunk.ModeSetSyncPull, ch.Address())
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
......
......@@ -22,7 +22,8 @@ import (
"math/rand"
"testing"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
// TestDB_pullIndex validates the ordering of keys in pull index.
......@@ -44,7 +45,7 @@ func TestDB_pullIndex(t *testing.T) {
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
......@@ -56,8 +57,8 @@ func TestDB_pullIndex(t *testing.T) {
}
testItemsOrder(t, db.pullIndex, chunks, func(i, j int) (less bool) {
poi := chunk.Proximity(db.baseKey, chunks[i].Address())
poj := chunk.Proximity(db.baseKey, chunks[j].Address())
poi := swarm.Proximity(db.baseKey, chunks[i].Address().Bytes())
poj := swarm.Proximity(db.baseKey, chunks[j].Address().Bytes())
if poi < poj {
return true
}
......@@ -70,7 +71,7 @@ func TestDB_pullIndex(t *testing.T) {
if chunks[i].binID > chunks[j].binID {
return false
}
return bytes.Compare(chunks[i].Address(), chunks[j].Address()) == -1
return bytes.Compare(chunks[i].Address().Bytes(), chunks[j].Address().Bytes()) == -1
})
}
......@@ -89,7 +90,7 @@ func TestDB_gcIndex(t *testing.T) {
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
......@@ -117,7 +118,7 @@ func TestDB_gcIndex(t *testing.T) {
t.Run("request unsynced", func(t *testing.T) {
ch := chunks[1]
_, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
_, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
......@@ -134,7 +135,7 @@ func TestDB_gcIndex(t *testing.T) {
t.Run("sync one chunk", func(t *testing.T) {
ch := chunks[0]
err := db.Set(context.Background(), chunk.ModeSetSyncPull, ch.Address())
err := db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
......@@ -147,7 +148,7 @@ func TestDB_gcIndex(t *testing.T) {
t.Run("sync all chunks", func(t *testing.T) {
for i := range chunks {
err := db.Set(context.Background(), chunk.ModeSetSyncPull, chunks[i].Address())
err := db.Set(context.Background(), storage.ModeSetSyncPull, chunks[i].Address())
if err != nil {
t.Fatal(err)
}
......@@ -161,7 +162,7 @@ func TestDB_gcIndex(t *testing.T) {
t.Run("request one chunk", func(t *testing.T) {
i := 6
_, err := db.Get(context.Background(), chunk.ModeGetRequest, chunks[i].Address())
_, err := db.Get(context.Background(), storage.ModeGetRequest, chunks[i].Address())
if err != nil {
t.Fatal(err)
}
......@@ -185,7 +186,7 @@ func TestDB_gcIndex(t *testing.T) {
})
for _, ch := range chunks {
_, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
_, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
......@@ -201,7 +202,7 @@ func TestDB_gcIndex(t *testing.T) {
t.Run("remove one chunk", func(t *testing.T) {
i := 3
err := db.Set(context.Background(), chunk.ModeSetRemove, chunks[i].Address())
err := db.Set(context.Background(), storage.ModeSetRemove, chunks[i].Address())
if err != nil {
t.Fatal(err)
}
......
......@@ -25,14 +25,16 @@ import (
"time"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/ethersphere/swarm/shed"
"github.com/ethersphere/swarm/storage/mock"
"github.com/prometheus/client_golang/prometheus"
)
// DB implements chunk.Store.
var _ chunk.Store = &DB{}
var _ storage.Store = &DB{}
var (
// ErrInvalidMode is retuned when an unknown Mode
......@@ -52,7 +54,7 @@ var (
// database related objects.
type DB struct {
shed *shed.DB
tags *chunk.Tags
tags *tags.Tags
// schema name of loaded data
schemaName shed.StringField
......@@ -141,7 +143,7 @@ type Options struct {
Capacity uint64
// MetricsPrefix defines a prefix for metrics names.
MetricsPrefix string
Tags *chunk.Tags
Tags *tags.Tags
// PutSetCheckFunc is a function called after a Put of a chunk
// to verify whether that chunk needs to be Set and added to
// garbage collection index too
......@@ -297,7 +299,7 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
db.pullIndex, err = db.shed.NewIndex("PO|BinID->Hash|Tag", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
key = make([]byte, 41)
key[0] = db.po(fields.Address)
key[0] = db.po(swarm.NewAddress(fields.Address))
binary.BigEndian.PutUint64(key[1:9], fields.BinID)
return key, nil
},
......@@ -468,8 +470,8 @@ func (db *DB) Close() (err error) {
// po computes the proximity order between the address
// and database base key.
func (db *DB) po(addr chunk.Address) (bin uint8) {
return uint8(chunk.Proximity(db.baseKey, addr))
func (db *DB) po(addr swarm.Address) (bin uint8) {
return uint8(swarm.Proximity(db.baseKey, addr.Bytes()))
}
// DebugIndices returns the index sizes for all indexes in localstore
......@@ -501,28 +503,28 @@ func (db *DB) DebugIndices() (indexInfo map[string]int, err error) {
}
// chunkToItem creates new Item with data provided by the Chunk.
func chunkToItem(ch chunk.Chunk) shed.Item {
func chunkToItem(ch swarm.Chunk) shed.Item {
return shed.Item{
Address: ch.Address(),
Address: ch.Address().Bytes(),
Data: ch.Data(),
Tag: ch.TagID(),
}
}
// addressToItem creates new Item with a provided address.
func addressToItem(addr chunk.Address) shed.Item {
func addressToItem(addr swarm.Address) shed.Item {
return shed.Item{
Address: addr,
Address: addr.Bytes(),
}
}
// addressesToItems constructs a slice of Items with only
// addresses set on them.
func addressesToItems(addrs ...chunk.Address) []shed.Item {
func addressesToItems(addrs ...swarm.Address) []shed.Item {
items := make([]shed.Item, len(addrs))
for i, addr := range addrs {
items[i] = shed.Item{
Address: addr,
Address: addr.Bytes(),
}
}
return items
......
......@@ -30,8 +30,9 @@ import (
"time"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/swarm/chunk"
chunktesting "github.com/ethersphere/swarm/chunk/testing"
"github.com/ethersphere/bee/pkg/storage"
chunktesting "github.com/ethersphere/bee/pkg/storage/testing"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/swarm/shed"
"github.com/syndtr/goleveldb/leveldb"
)
......@@ -64,17 +65,17 @@ func TestDB(t *testing.T) {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
got, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
got, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(got.Address(), ch.Address()) {
if !got.Address().Equal(ch.Address()) {
t.Errorf("got address %x, want %x", got.Address(), ch.Address())
}
if !bytes.Equal(got.Data(), ch.Data()) {
......@@ -118,7 +119,7 @@ func TestDB_updateGCSem(t *testing.T) {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
......@@ -126,7 +127,7 @@ func TestDB_updateGCSem(t *testing.T) {
// get more chunks then maxParallelUpdateGC
// in time shorter then updateGCSleep
for i := 0; i < 5; i++ {
_, err = db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
_, err = db.Get(context.Background(), storage.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
......@@ -174,8 +175,8 @@ var (
)
// chunkAddresses return chunk addresses of provided chunks.
func chunkAddresses(chunks []chunk.Chunk) []chunk.Address {
addrs := make([]chunk.Address, len(chunks))
func chunkAddresses(chunks []swarm.Chunk) []swarm.Address {
addrs := make([]swarm.Address, len(chunks))
for i, ch := range chunks {
addrs[i] = ch.Address()
}
......@@ -215,23 +216,23 @@ var multiChunkTestCases = []struct {
func TestGenerateTestRandomChunk(t *testing.T) {
c1 := generateTestRandomChunk()
c2 := generateTestRandomChunk()
addrLen := len(c1.Address())
addrLen := len(c1.Address().Bytes())
if addrLen != 32 {
t.Errorf("first chunk address length %v, want %v", addrLen, 32)
}
dataLen := len(c1.Data())
if dataLen != chunk.DefaultSize {
t.Errorf("first chunk data length %v, want %v", dataLen, chunk.DefaultSize)
if dataLen != swarm.ChunkSize {
t.Errorf("first chunk data length %v, want %v", dataLen, swarm.ChunkSize)
}
addrLen = len(c2.Address())
addrLen = len(c2.Address().Bytes())
if addrLen != 32 {
t.Errorf("second chunk address length %v, want %v", addrLen, 32)
}
dataLen = len(c2.Data())
if dataLen != chunk.DefaultSize {
t.Errorf("second chunk data length %v, want %v", dataLen, chunk.DefaultSize)
if dataLen != swarm.ChunkSize {
t.Errorf("second chunk data length %v, want %v", dataLen, swarm.ChunkSize)
}
if bytes.Equal(c1.Address(), c2.Address()) {
if c1.Address().Equal(c2.Address()) {
t.Error("fake chunks addresses do not differ")
}
if bytes.Equal(c1.Data(), c2.Data()) {
......@@ -241,7 +242,7 @@ func TestGenerateTestRandomChunk(t *testing.T) {
// newRetrieveIndexesTest returns a test function that validates if the right
// chunk values are in the retrieval indexes
func newRetrieveIndexesTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) {
func newRetrieveIndexesTest(db *DB, chunk swarm.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()
......@@ -249,7 +250,7 @@ func newRetrieveIndexesTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTim
if err != nil {
t.Fatal(err)
}
validateItem(t, item, chunk.Address(), chunk.Data(), storeTimestamp, 0)
validateItem(t, item, chunk.Address().Bytes(), chunk.Data(), storeTimestamp, 0)
// access index should not be set
wantErr := leveldb.ErrNotFound
......@@ -262,7 +263,7 @@ func newRetrieveIndexesTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTim
// newRetrieveIndexesTestWithAccess returns a test function that validates if the right
// chunk values are in the retrieval indexes when access time must be stored.
func newRetrieveIndexesTestWithAccess(db *DB, ch chunk.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) {
func newRetrieveIndexesTestWithAccess(db *DB, ch swarm.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()
......@@ -270,64 +271,64 @@ func newRetrieveIndexesTestWithAccess(db *DB, ch chunk.Chunk, storeTimestamp, ac
if err != nil {
t.Fatal(err)
}
validateItem(t, item, ch.Address(), ch.Data(), storeTimestamp, 0)
validateItem(t, item, ch.Address().Bytes(), ch.Data(), storeTimestamp, 0)
if accessTimestamp > 0 {
item, err = db.retrievalAccessIndex.Get(addressToItem(ch.Address()))
if err != nil {
t.Fatal(err)
}
validateItem(t, item, ch.Address(), nil, 0, accessTimestamp)
validateItem(t, item, ch.Address().Bytes(), nil, 0, accessTimestamp)
}
}
}
// newPullIndexTest returns a test function that validates if the right
// chunk values are in the pull index.
func newPullIndexTest(db *DB, ch chunk.Chunk, binID uint64, wantError error) func(t *testing.T) {
func newPullIndexTest(db *DB, ch swarm.Chunk, binID uint64, wantError error) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()
item, err := db.pullIndex.Get(shed.Item{
Address: ch.Address(),
Address: ch.Address().Bytes(),
BinID: binID,
})
if err != wantError {
t.Errorf("got error %v, want %v", err, wantError)
}
if err == nil {
validateItem(t, item, ch.Address(), nil, 0, 0)
validateItem(t, item, ch.Address().Bytes(), nil, 0, 0)
}
}
}
// newPushIndexTest returns a test function that validates if the right
// chunk values are in the push index.
func newPushIndexTest(db *DB, ch chunk.Chunk, storeTimestamp int64, wantError error) func(t *testing.T) {
func newPushIndexTest(db *DB, ch swarm.Chunk, storeTimestamp int64, wantError error) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()
item, err := db.pushIndex.Get(shed.Item{
Address: ch.Address(),
Address: ch.Address().Bytes(),
StoreTimestamp: storeTimestamp,
})
if err != wantError {
t.Errorf("got error %v, want %v", err, wantError)
}
if err == nil {
validateItem(t, item, ch.Address(), nil, storeTimestamp, 0)
validateItem(t, item, ch.Address().Bytes(), nil, storeTimestamp, 0)
}
}
}
// newGCIndexTest returns a test function that validates if the right
// chunk values are in the GC index.
func newGCIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp int64, binID uint64, wantError error) func(t *testing.T) {
func newGCIndexTest(db *DB, chunk swarm.Chunk, storeTimestamp, accessTimestamp int64, binID uint64, wantError error) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()
item, err := db.gcIndex.Get(shed.Item{
Address: chunk.Address(),
Address: chunk.Address().Bytes(),
BinID: binID,
AccessTimestamp: accessTimestamp,
})
......@@ -335,7 +336,7 @@ func newGCIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp i
t.Errorf("got error %v, want %v", err, wantError)
}
if err == nil {
validateItem(t, item, chunk.Address(), nil, 0, accessTimestamp)
validateItem(t, item, chunk.Address().Bytes(), nil, 0, accessTimestamp)
}
}
}
......@@ -387,7 +388,7 @@ func newIndexGCSizeTest(db *DB) func(t *testing.T) {
// testIndexChunk embeds storageChunk with additional data that is stored
// in database. It is used for index values validations.
type testIndexChunk struct {
chunk.Chunk
swarm.Chunk
binID uint64
}
......@@ -406,7 +407,7 @@ func testItemsOrder(t *testing.T, i shed.Index, chunks []testIndexChunk, sortFun
err := i.Iterate(func(item shed.Item) (stop bool, err error) {
want := chunks[cursor].Address()
got := item.Address
if !bytes.Equal(got, want) {
if !bytes.Equal(got, want.Bytes()) {
return true, fmt.Errorf("got address %x at position %v, want %x", got, cursor, want)
}
cursor++
......@@ -538,7 +539,7 @@ func TestDBDebugIndexes(t *testing.T) {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
......@@ -552,7 +553,7 @@ func TestDBDebugIndexes(t *testing.T) {
testIndexCounts(t, 1, 1, 0, 0, 0, 1, 0, indexCounts)
// set the chunk for pinning and expect the index count to grow
err = db.Set(context.Background(), chunk.ModeSetPin, ch.Address())
err = db.Set(context.Background(), storage.ModeSetPin, ch.Address())
if err != nil {
t.Fatal(err)
}
......@@ -566,7 +567,7 @@ func TestDBDebugIndexes(t *testing.T) {
testIndexCounts(t, 1, 1, 0, 1, 1, 1, 0, indexCounts)
// set the chunk as accessed and expect the access index to grow
err = db.Set(context.Background(), chunk.ModeSetAccess, ch.Address())
err = db.Set(context.Background(), storage.ModeSetAccess, ch.Address())
if err != nil {
t.Fatal(err)
}
......
......@@ -20,17 +20,18 @@ import (
"context"
"time"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/swarm/shed"
"github.com/syndtr/goleveldb/leveldb"
)
// Get returns a chunk from the database. If the chunk is
// not found chunk.ErrChunkNotFound will be returned.
// not found storage.ErrNotFound will be returned.
// All required indexes will be updated required by the
// Getter Mode. Get is required to implement chunk.Store
// interface.
func (db *DB) Get(ctx context.Context, mode chunk.ModeGet, addr chunk.Address) (ch chunk.Chunk, err error) {
func (db *DB) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address) (ch swarm.Chunk, err error) {
db.metrics.ModeGet.Inc()
defer totalTimeMetric(db.metrics.TotalTimeGet, time.Now())
......@@ -43,16 +44,16 @@ func (db *DB) Get(ctx context.Context, mode chunk.ModeGet, addr chunk.Address) (
out, err := db.get(mode, addr)
if err != nil {
if err == leveldb.ErrNotFound {
return nil, chunk.ErrChunkNotFound
return nil, storage.ErrNotFound
}
return nil, err
}
return chunk.NewChunk(out.Address, out.Data).WithPinCounter(out.PinCounter), nil
return swarm.NewChunk(swarm.NewAddress(out.Address), out.Data).WithPinCounter(out.PinCounter), nil
}
// get returns Item from the retrieval index
// and updates other indexes.
func (db *DB) get(mode chunk.ModeGet, addr chunk.Address) (out shed.Item, err error) {
func (db *DB) get(mode storage.ModeGet, addr swarm.Address) (out shed.Item, err error) {
item := addressToItem(addr)
out, err = db.retrievalDataIndex.Get(item)
......@@ -61,10 +62,10 @@ func (db *DB) get(mode chunk.ModeGet, addr chunk.Address) (out shed.Item, err er
}
switch mode {
// update the access timestamp and gc index
case chunk.ModeGetRequest:
case storage.ModeGetRequest:
db.updateGCItems(out)
case chunk.ModeGetPin:
case storage.ModeGetPin:
pinnedItem, err := db.pinIndex.Get(item)
if err != nil {
return out, err
......@@ -72,8 +73,8 @@ func (db *DB) get(mode chunk.ModeGet, addr chunk.Address) (out shed.Item, err er
return pinnedItem, nil
// no updates to indexes
case chunk.ModeGetSync:
case chunk.ModeGetLookup:
case storage.ModeGetSync:
case storage.ModeGetLookup:
default:
return out, ErrInvalidMode
}
......
......@@ -20,16 +20,17 @@ import (
"context"
"time"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/swarm/shed"
"github.com/syndtr/goleveldb/leveldb"
)
// GetMulti returns chunks from the database. If one of the chunks is not found
// chunk.ErrChunkNotFound will be returned. All required indexes will be updated
// storage.ErrNotFound will be returned. All required indexes will be updated
// required by the Getter Mode. GetMulti is required to implement chunk.Store
// interface.
func (db *DB) GetMulti(ctx context.Context, mode chunk.ModeGet, addrs ...chunk.Address) (chunks []chunk.Chunk, err error) {
func (db *DB) GetMulti(ctx context.Context, mode storage.ModeGet, addrs ...swarm.Address) (chunks []swarm.Chunk, err error) {
db.metrics.ModeGetMulti.Inc()
defer totalTimeMetric(db.metrics.TotalTimeGetMulti, time.Now())
......@@ -42,23 +43,23 @@ func (db *DB) GetMulti(ctx context.Context, mode chunk.ModeGet, addrs ...chunk.A
out, err := db.getMulti(mode, addrs...)
if err != nil {
if err == leveldb.ErrNotFound {
return nil, chunk.ErrChunkNotFound
return nil, storage.ErrNotFound
}
return nil, err
}
chunks = make([]chunk.Chunk, len(out))
chunks = make([]swarm.Chunk, len(out))
for i, ch := range out {
chunks[i] = chunk.NewChunk(ch.Address, ch.Data).WithPinCounter(ch.PinCounter)
chunks[i] = swarm.NewChunk(swarm.NewAddress(ch.Address), ch.Data).WithPinCounter(ch.PinCounter)
}
return chunks, nil
}
// getMulti returns Items from the retrieval index
// and updates other indexes.
func (db *DB) getMulti(mode chunk.ModeGet, addrs ...chunk.Address) (out []shed.Item, err error) {
func (db *DB) getMulti(mode storage.ModeGet, addrs ...swarm.Address) (out []shed.Item, err error) {
out = make([]shed.Item, len(addrs))
for i, addr := range addrs {
out[i].Address = addr
out[i].Address = addr.Bytes()
}
err = db.retrievalDataIndex.Fill(out)
......@@ -68,18 +69,18 @@ func (db *DB) getMulti(mode chunk.ModeGet, addrs ...chunk.Address) (out []shed.I
switch mode {
// update the access timestamp and gc index
case chunk.ModeGetRequest:
case storage.ModeGetRequest:
db.updateGCItems(out...)
case chunk.ModeGetPin:
case storage.ModeGetPin:
err := db.pinIndex.Fill(out)
if err != nil {
return nil, err
}
// no updates to indexes
case chunk.ModeGetSync:
case chunk.ModeGetLookup:
case storage.ModeGetSync:
case storage.ModeGetLookup:
default:
return out, ErrInvalidMode
}
......
......@@ -21,7 +21,7 @@ import (
"reflect"
"testing"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/bee/pkg/storage"
)
// TestModeGetMulti stores chunks and validates that GetMulti
......@@ -29,11 +29,11 @@ import (
func TestModeGetMulti(t *testing.T) {
const chunkCount = 10
for _, mode := range []chunk.ModeGet{
chunk.ModeGetRequest,
chunk.ModeGetSync,
chunk.ModeGetLookup,
chunk.ModeGetPin,
for _, mode := range []storage.ModeGet{
storage.ModeGetRequest,
storage.ModeGetSync,
storage.ModeGetLookup,
storage.ModeGetPin,
} {
t.Run(mode.String(), func(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
......@@ -41,15 +41,15 @@ func TestModeGetMulti(t *testing.T) {
chunks := generateTestRandomChunks(chunkCount)
_, err := db.Put(context.Background(), chunk.ModePutUpload, chunks...)
_, err := db.Put(context.Background(), storage.ModePutUpload, chunks...)
if err != nil {
t.Fatal(err)
}
if mode == chunk.ModeGetPin {
if mode == storage.ModeGetPin {
// pin chunks so that it is not returned as not found by pinIndex
for i, ch := range chunks {
err := db.Set(context.Background(), chunk.ModeSetPin, ch.Address())
err := db.Set(context.Background(), storage.ModeSetPin, ch.Address())
if err != nil {
t.Fatal(err)
}
......@@ -72,7 +72,7 @@ func TestModeGetMulti(t *testing.T) {
missingChunk := generateTestRandomChunk()
want := chunk.ErrChunkNotFound
want := storage.ErrNotFound
_, err = db.GetMulti(context.Background(), mode, append(addrs, missingChunk.Address())...)
if err != want {
t.Errorf("got error %v, want %v", err, want)
......
......@@ -22,7 +22,7 @@ import (
"testing"
"time"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/bee/pkg/storage"
)
// TestModeGetRequest validates ModeGetRequest index values on the provided DB.
......@@ -37,7 +37,7 @@ func TestModeGetRequest(t *testing.T) {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
......@@ -53,14 +53,14 @@ func TestModeGetRequest(t *testing.T) {
})()
t.Run("get unsynced", func(t *testing.T) {
got, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
got, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
// wait for update gc goroutine to be done
<-testHookUpdateGCChan
if !bytes.Equal(got.Address(), ch.Address()) {
if !got.Address().Equal(ch.Address()) {
t.Errorf("got chunk address %x, want %x", got.Address(), ch.Address())
}
......@@ -76,20 +76,20 @@ func TestModeGetRequest(t *testing.T) {
})
// set chunk to synced state
err = db.Set(context.Background(), chunk.ModeSetSyncPull, ch.Address())
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
t.Run("first get", func(t *testing.T) {
got, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
got, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
// wait for update gc goroutine to be done
<-testHookUpdateGCChan
if !bytes.Equal(got.Address(), ch.Address()) {
if !got.Address().Equal(ch.Address()) {
t.Errorf("got chunk address %x, want %x", got.Address(), ch.Address())
}
......@@ -112,14 +112,14 @@ func TestModeGetRequest(t *testing.T) {
return accessTimestamp
})()
got, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
got, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
// wait for update gc goroutine to be done
<-testHookUpdateGCChan
if !bytes.Equal(got.Address(), ch.Address()) {
if !got.Address().Equal(ch.Address()) {
t.Errorf("got chunk address %x, want %x", got.Address(), ch.Address())
}
......@@ -137,14 +137,14 @@ func TestModeGetRequest(t *testing.T) {
})
t.Run("multi", func(t *testing.T) {
got, err := db.GetMulti(context.Background(), chunk.ModeGetRequest, ch.Address())
got, err := db.GetMulti(context.Background(), storage.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
// wait for update gc goroutine to be done
<-testHookUpdateGCChan
if !bytes.Equal(got[0].Address(), ch.Address()) {
if !got[0].Address().Equal(ch.Address()) {
t.Errorf("got chunk address %x, want %x", got[0].Address(), ch.Address())
}
......@@ -174,17 +174,17 @@ func TestModeGetSync(t *testing.T) {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
got, err := db.Get(context.Background(), chunk.ModeGetSync, ch.Address())
got, err := db.Get(context.Background(), storage.ModeGetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(got.Address(), ch.Address()) {
if !got.Address().Equal(ch.Address()) {
t.Errorf("got chunk address %x, want %x", got.Address(), ch.Address())
}
......@@ -199,12 +199,12 @@ func TestModeGetSync(t *testing.T) {
t.Run("gc size", newIndexGCSizeTest(db))
t.Run("multi", func(t *testing.T) {
got, err := db.GetMulti(context.Background(), chunk.ModeGetSync, ch.Address())
got, err := db.GetMulti(context.Background(), storage.ModeGetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(got[0].Address(), ch.Address()) {
if !got[0].Address().Equal(ch.Address()) {
t.Errorf("got chunk address %x, want %x", got[0].Address(), ch.Address())
}
......
......@@ -20,11 +20,11 @@ import (
"context"
"time"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/bee/pkg/swarm"
)
// Has returns true if the chunk is stored in database.
func (db *DB) Has(ctx context.Context, addr chunk.Address) (bool, error) {
func (db *DB) Has(ctx context.Context, addr swarm.Address) (bool, error) {
db.metrics.ModeHas.Inc()
defer totalTimeMetric(db.metrics.TotalTimeHas, time.Now())
......@@ -38,7 +38,7 @@ func (db *DB) Has(ctx context.Context, addr chunk.Address) (bool, error) {
// HasMulti returns a slice of booleans which represent if the provided chunks
// are stored in database.
func (db *DB) HasMulti(ctx context.Context, addrs ...chunk.Address) ([]bool, error) {
func (db *DB) HasMulti(ctx context.Context, addrs ...swarm.Address) ([]bool, error) {
db.metrics.ModeHasMulti.Inc()
defer totalTimeMetric(db.metrics.TotalTimeHasMulti, time.Now())
......
......@@ -23,7 +23,7 @@ import (
"testing"
"time"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/bee/pkg/storage"
)
// TestHas validates that Has method is returning true for
......@@ -34,7 +34,7 @@ func TestHas(t *testing.T) {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
......@@ -75,7 +75,7 @@ func TestHasMulti(t *testing.T) {
// randomly exclude half of the chunks
continue
}
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
......
......@@ -17,20 +17,20 @@
package localstore
import (
"bytes"
"context"
"time"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/swarm/shed"
"github.com/syndtr/goleveldb/leveldb"
)
// Put stores Chunks to database and depending
// on the Putter mode, it updates required indexes.
// Put is required to implement chunk.Store
// Put is required to implement storage.Store
// interface.
func (db *DB) Put(ctx context.Context, mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err error) {
func (db *DB) Put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err error) {
db.metrics.ModePut.Inc()
defer totalTimeMetric(db.metrics.TotalTimePut, time.Now())
......@@ -50,7 +50,7 @@ func (db *DB) Put(ctx context.Context, mode chunk.ModePut, chs ...chunk.Chunk) (
// and following ones will have exist set to true for their index in exist
// slice. This is the same behaviour as if the same chunks are passed one by one
// in multiple put method calls.
func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err error) {
func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err error) {
// protect parallel updates
db.batchMu.Lock()
defer db.batchMu.Unlock()
......@@ -72,7 +72,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err
binIDs := make(map[uint8]uint64)
switch mode {
case chunk.ModePutRequest:
case storage.ModePutRequest:
for i, ch := range chs {
if containsChunk(ch.Address(), chs[:i]...) {
exist[i] = true
......@@ -86,7 +86,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err
gcSizeChange += c
}
case chunk.ModePutUpload:
case storage.ModePutUpload:
for i, ch := range chs {
if containsChunk(ch.Address(), chs[:i]...) {
exist[i] = true
......@@ -106,7 +106,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err
gcSizeChange += c
}
case chunk.ModePutSync:
case storage.ModePutSync:
for i, ch := range chs {
if containsChunk(ch.Address(), chs[:i]...) {
exist[i] = true
......@@ -174,7 +174,7 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she
item.StoreTimestamp = now()
}
if item.BinID == 0 {
item.BinID, err = db.incBinID(binIDs, db.po(item.Address))
item.BinID, err = db.incBinID(binIDs, db.po(swarm.NewAddress(item.Address)))
if err != nil {
return false, 0, err
}
......@@ -221,7 +221,7 @@ func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed
}
item.StoreTimestamp = now()
item.BinID, err = db.incBinID(binIDs, db.po(item.Address))
item.BinID, err = db.incBinID(binIDs, db.po(swarm.NewAddress(item.Address)))
if err != nil {
return false, 0, err
}
......@@ -276,7 +276,7 @@ func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.I
}
item.StoreTimestamp = now()
item.BinID, err = db.incBinID(binIDs, db.po(item.Address))
item.BinID, err = db.incBinID(binIDs, db.po(swarm.NewAddress(item.Address)))
if err != nil {
return false, 0, err
}
......@@ -363,9 +363,9 @@ func (db *DB) incBinID(binIDs map[uint8]uint64, po uint8) (id uint64, err error)
// containsChunk returns true if the chunk with a specific address
// is present in the provided chunk slice.
func containsChunk(addr chunk.Address, chs ...chunk.Chunk) bool {
func containsChunk(addr swarm.Address, chs ...swarm.Chunk) bool {
for _, c := range chs {
if bytes.Equal(addr, c.Address()) {
if addr.Equal(c.Address()) {
return true
}
}
......
......@@ -24,7 +24,8 @@ import (
"testing"
"time"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
)
......@@ -48,7 +49,7 @@ func TestModePutRequest(t *testing.T) {
storeTimestamp = wantTimestamp
_, err := db.Put(context.Background(), chunk.ModePutRequest, chunks...)
_, err := db.Put(context.Background(), storage.ModePutRequest, chunks...)
if err != nil {
t.Fatal(err)
}
......@@ -67,7 +68,7 @@ func TestModePutRequest(t *testing.T) {
return wantTimestamp
})()
_, err := db.Put(context.Background(), chunk.ModePutRequest, chunks...)
_, err := db.Put(context.Background(), storage.ModePutRequest, chunks...)
if err != nil {
t.Fatal(err)
}
......@@ -97,7 +98,7 @@ func TestModePutSync(t *testing.T) {
chunks := generateTestRandomChunks(tc.count)
_, err := db.Put(context.Background(), chunk.ModePutSync, chunks...)
_, err := db.Put(context.Background(), storage.ModePutSync, chunks...)
if err != nil {
t.Fatal(err)
}
......@@ -129,7 +130,7 @@ func TestModePutUpload(t *testing.T) {
chunks := generateTestRandomChunks(tc.count)
_, err := db.Put(context.Background(), chunk.ModePutUpload, chunks...)
_, err := db.Put(context.Background(), storage.ModePutUpload, chunks...)
if err != nil {
t.Fatal(err)
}
......@@ -175,7 +176,7 @@ func TestModePutUpload_parallel(t *testing.T) {
uploadsCount := 100
workerCount := 100
chunksChan := make(chan []chunk.Chunk)
chunksChan := make(chan []swarm.Chunk)
errChan := make(chan error)
doneChan := make(chan struct{})
defer close(doneChan)
......@@ -189,7 +190,7 @@ func TestModePutUpload_parallel(t *testing.T) {
if !ok {
return
}
_, err := db.Put(context.Background(), chunk.ModePutUpload, chunks...)
_, err := db.Put(context.Background(), storage.ModePutUpload, chunks...)
select {
case errChan <- err:
case <-doneChan:
......@@ -201,7 +202,7 @@ func TestModePutUpload_parallel(t *testing.T) {
}(i)
}
chunks := make([]chunk.Chunk, 0)
chunks := make([]swarm.Chunk, 0)
var chunksMu sync.Mutex
// send chunks to workers
......@@ -233,12 +234,12 @@ func TestModePutUpload_parallel(t *testing.T) {
chunksMu.Lock()
defer chunksMu.Unlock()
for _, ch := range chunks {
got, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
got, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(got.Data(), ch.Data()) {
t.Fatalf("got chunk %s data %x, want %x", ch.Address().Hex(), got.Data(), ch.Data())
t.Fatalf("got chunk %s data %x, want %x", ch.Address(), got.Data(), ch.Data())
}
}
})
......@@ -255,25 +256,25 @@ func TestModePut_sameChunk(t *testing.T) {
for _, tcn := range []struct {
name string
mode chunk.ModePut
mode storage.ModePut
pullIndex bool
pushIndex bool
}{
{
name: "ModePutRequest",
mode: chunk.ModePutRequest,
mode: storage.ModePutRequest,
pullIndex: false,
pushIndex: false,
},
{
name: "ModePutUpload",
mode: chunk.ModePutUpload,
mode: storage.ModePutUpload,
pullIndex: true,
pushIndex: true,
},
{
name: "ModePutSync",
mode: chunk.ModePutSync,
mode: storage.ModePutSync,
pullIndex: true,
pushIndex: false,
},
......@@ -327,14 +328,14 @@ func TestModePut_addToGc(t *testing.T) {
opts := &Options{PutToGCCheck: func(_ []byte) bool { return retVal }}
for _, m := range []struct {
mode chunk.ModePut
mode storage.ModePut
putToGc bool
}{
{mode: chunk.ModePutSync, putToGc: true},
{mode: chunk.ModePutSync, putToGc: false},
{mode: chunk.ModePutUpload, putToGc: true},
{mode: chunk.ModePutUpload, putToGc: false},
{mode: chunk.ModePutRequest, putToGc: true}, // in ModePutRequest we always insert to GC, so putToGc=false not needed
{mode: storage.ModePutSync, putToGc: true},
{mode: storage.ModePutSync, putToGc: false},
{mode: storage.ModePutUpload, putToGc: true},
{mode: storage.ModePutUpload, putToGc: false},
{mode: storage.ModePutRequest, putToGc: true}, // in ModePutRequest we always insert to GC, so putToGc=false not needed
} {
for _, tc := range multiChunkTestCases {
t.Run(tc.name, func(t *testing.T) {
......@@ -380,14 +381,14 @@ func TestModePut_addToGcExisting(t *testing.T) {
opts := &Options{PutToGCCheck: func(_ []byte) bool { return retVal }}
for _, m := range []struct {
mode chunk.ModePut
mode storage.ModePut
putToGc bool
}{
{mode: chunk.ModePutSync, putToGc: true},
{mode: chunk.ModePutSync, putToGc: false},
{mode: chunk.ModePutUpload, putToGc: true},
{mode: chunk.ModePutUpload, putToGc: false},
{mode: chunk.ModePutRequest, putToGc: true}, // in ModePutRequest we always insert to GC, so putToGc=false not needed
{mode: storage.ModePutSync, putToGc: true},
{mode: storage.ModePutSync, putToGc: false},
{mode: storage.ModePutUpload, putToGc: true},
{mode: storage.ModePutUpload, putToGc: false},
{mode: storage.ModePutRequest, putToGc: true}, // in ModePutRequest we always insert to GC, so putToGc=false not needed
} {
for _, tc := range multiChunkTestCases {
t.Run(tc.name, func(t *testing.T) {
......@@ -442,10 +443,10 @@ func TestModePut_addToGcExisting(t *testing.T) {
// TestPutDuplicateChunks validates the expected behaviour for
// passing duplicate chunks to the Put method.
func TestPutDuplicateChunks(t *testing.T) {
for _, mode := range []chunk.ModePut{
chunk.ModePutUpload,
chunk.ModePutRequest,
chunk.ModePutSync,
for _, mode := range []storage.ModePut{
storage.ModePutUpload,
storage.ModePutRequest,
storage.ModePutSync,
} {
t.Run(mode.String(), func(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
......@@ -466,12 +467,12 @@ func TestPutDuplicateChunks(t *testing.T) {
newItemsCountTest(db.retrievalDataIndex, 1)(t)
got, err := db.Get(context.Background(), chunk.ModeGetLookup, ch.Address())
got, err := db.Get(context.Background(), storage.ModeGetLookup, ch.Address())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(got.Address(), ch.Address()) {
t.Errorf("got chunk address %s, want %s", got.Address().Hex(), ch.Address().Hex())
if !got.Address().Equal(ch.Address()) {
t.Errorf("got chunk address %s, want %s", got.Address(), ch.Address())
}
})
}
......@@ -544,7 +545,7 @@ func benchmarkPutUpload(b *testing.B, o *Options, count, maxParallelUploads int)
db, cleanupFunc := newTestDB(b, o)
defer cleanupFunc()
chunks := make([]chunk.Chunk, count)
chunks := make([]swarm.Chunk, count)
for i := 0; i < count; i++ {
chunks[i] = generateTestRandomChunk()
}
......@@ -559,7 +560,7 @@ func benchmarkPutUpload(b *testing.B, o *Options, count, maxParallelUploads int)
go func(i int) {
defer func() { <-sem }()
_, err := db.Put(context.Background(), chunk.ModePutUpload, chunks[i])
_, err := db.Put(context.Background(), storage.ModePutUpload, chunks[i])
errs <- err
}(i)
}
......
......@@ -21,7 +21,9 @@ import (
"errors"
"time"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/syndtr/goleveldb/leveldb"
)
......@@ -29,7 +31,7 @@ import (
// chunks represented by provided addresses.
// Set is required to implement chunk.Store
// interface.
func (db *DB) Set(ctx context.Context, mode chunk.ModeSet, addrs ...chunk.Address) (err error) {
func (db *DB) Set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Address) (err error) {
db.metrics.ModePut.Inc()
defer totalTimeMetric(db.metrics.TotalTimeSet, time.Now())
err = db.set(mode, addrs...)
......@@ -43,7 +45,7 @@ func (db *DB) Set(ctx context.Context, mode chunk.ModeSet, addrs ...chunk.Addres
// chunks represented by provided addresses.
// It acquires lockAddr to protect two calls
// of this function for the same address in parallel.
func (db *DB) set(mode chunk.ModeSet, addrs ...chunk.Address) (err error) {
func (db *DB) set(mode storage.ModeSet, addrs ...swarm.Address) (err error) {
// protect parallel updates
db.batchMu.Lock()
defer db.batchMu.Unlock()
......@@ -56,7 +58,7 @@ func (db *DB) set(mode chunk.ModeSet, addrs ...chunk.Address) (err error) {
triggerPullFeed := make(map[uint8]struct{}) // signal pull feed subscriptions to iterate
switch mode {
case chunk.ModeSetAccess:
case storage.ModeSetAccess:
// A lazy populated map of bin ids to properly set
// BinID values for new chunks based on initial value from database
// and incrementing them.
......@@ -74,7 +76,7 @@ func (db *DB) set(mode chunk.ModeSet, addrs ...chunk.Address) (err error) {
db.binIDs.PutInBatch(batch, uint64(po), id)
}
case chunk.ModeSetSyncPush, chunk.ModeSetSyncPull:
case storage.ModeSetSyncPush, storage.ModeSetSyncPull:
for _, addr := range addrs {
c, err := db.setSync(batch, addr, mode)
if err != nil {
......@@ -83,7 +85,7 @@ func (db *DB) set(mode chunk.ModeSet, addrs ...chunk.Address) (err error) {
gcSizeChange += c
}
case chunk.ModeSetRemove:
case storage.ModeSetRemove:
for _, addr := range addrs {
c, err := db.setRemove(batch, addr)
if err != nil {
......@@ -92,14 +94,14 @@ func (db *DB) set(mode chunk.ModeSet, addrs ...chunk.Address) (err error) {
gcSizeChange += c
}
case chunk.ModeSetPin:
case storage.ModeSetPin:
for _, addr := range addrs {
err := db.setPin(batch, addr)
if err != nil {
return err
}
}
case chunk.ModeSetUnpin:
case storage.ModeSetUnpin:
for _, addr := range addrs {
err := db.setUnpin(batch, addr)
if err != nil {
......@@ -129,7 +131,7 @@ func (db *DB) set(mode chunk.ModeSet, addrs ...chunk.Address) (err error) {
// setAccess sets the chunk access time by updating required indexes:
// - add to pull, insert to gc
// Provided batch and binID map are updated.
func (db *DB) setAccess(batch *leveldb.Batch, binIDs map[uint8]uint64, addr chunk.Address, po uint8) (gcSizeChange int64, err error) {
func (db *DB) setAccess(batch *leveldb.Batch, binIDs map[uint8]uint64, addr swarm.Address, po uint8) (gcSizeChange int64, err error) {
item := addressToItem(addr)
......@@ -194,7 +196,7 @@ func (db *DB) setAccess(batch *leveldb.Batch, binIDs map[uint8]uint64, addr chun
// from push sync index
// - update to gc index happens given item does not exist in pin index
// Provided batch is updated.
func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeSet) (gcSizeChange int64, err error) {
func (db *DB) setSync(batch *leveldb.Batch, addr swarm.Address, mode storage.ModeSet) (gcSizeChange int64, err error) {
item := addressToItem(addr)
// need to get access timestamp here as it is not
......@@ -220,7 +222,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS
item.BinID = i.BinID
switch mode {
case chunk.ModeSetSyncPull:
case storage.ModeSetSyncPull:
// if we are setting a chunk for pullsync we expect it to be in the index
// if it has a tag - we increment it and set the index item to _not_ contain the tag reference
// this prevents duplicate increments
......@@ -245,7 +247,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS
// since pull sync does not guarantee that
// a chunk has reached its NN, we can only mark
// it as Sent
t.Inc(chunk.StateSent)
t.Inc(tags.StateSent)
// setting the tag to zero makes sure that
// we don't increment the same tag twice when syncing
......@@ -258,7 +260,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS
}
}
}
case chunk.ModeSetSyncPush:
case storage.ModeSetSyncPush:
i, err := db.pushIndex.Get(item)
if err != nil {
if err == leveldb.ErrNotFound {
......@@ -282,7 +284,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS
return 0, errors.New("got an anonymous chunk in push sync index")
}
t.Inc(chunk.StateSynced)
t.Inc(tags.StateSynced)
}
}
......@@ -331,7 +333,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS
// setRemove removes the chunk by updating indexes:
// - delete from retrieve, pull, gc
// Provided batch is updated.
func (db *DB) setRemove(batch *leveldb.Batch, addr chunk.Address) (gcSizeChange int64, err error) {
func (db *DB) setRemove(batch *leveldb.Batch, addr swarm.Address) (gcSizeChange int64, err error) {
item := addressToItem(addr)
// need to get access timestamp here as it is not
......@@ -381,7 +383,7 @@ func (db *DB) setRemove(batch *leveldb.Batch, addr chunk.Address) (gcSizeChange
// setPin increments pin counter for the chunk by updating
// pin index and sets the chunk to be excluded from garbage collection.
// Provided batch is updated.
func (db *DB) setPin(batch *leveldb.Batch, addr chunk.Address) (err error) {
func (db *DB) setPin(batch *leveldb.Batch, addr swarm.Address) (err error) {
item := addressToItem(addr)
// Get the existing pin counter of the chunk
......@@ -416,7 +418,7 @@ func (db *DB) setPin(batch *leveldb.Batch, addr chunk.Address) (err error) {
// setUnpin decrements pin counter for the chunk by updating pin index.
// Provided batch is updated.
func (db *DB) setUnpin(batch *leveldb.Batch, addr chunk.Address) (err error) {
func (db *DB) setUnpin(batch *leveldb.Batch, addr swarm.Address) (err error) {
item := addressToItem(addr)
// Get the existing pin counter of the chunk
......
......@@ -21,8 +21,9 @@ import (
"testing"
"time"
"github.com/ethersphere/swarm/chunk"
tagtesting "github.com/ethersphere/swarm/chunk/testing"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/tags"
tagtesting "github.com/ethersphere/bee/pkg/tags/testing"
"github.com/ethersphere/swarm/shed"
"github.com/syndtr/goleveldb/leveldb"
)
......@@ -41,7 +42,7 @@ func TestModeSetAccess(t *testing.T) {
return wantTimestamp
})()
err := db.Set(context.Background(), chunk.ModeSetAccess, chunkAddresses(chunks)...)
err := db.Set(context.Background(), storage.ModeSetAccess, chunkAddresses(chunks)...)
if err != nil {
t.Fatal(err)
}
......@@ -69,7 +70,7 @@ func TestModeSetAccess(t *testing.T) {
// as a result we should expect the tag value to remain in the pull index
// and we expect that the tag should not be incremented by pull sync set
func TestModeSetSyncPullNormalTag(t *testing.T) {
db, cleanupFunc := newTestDB(t, &Options{Tags: chunk.NewTags()})
db, cleanupFunc := newTestDB(t, &Options{Tags: tags.NewTags()})
defer cleanupFunc()
tag, err := db.tags.Create("test", 1, false)
......@@ -78,15 +79,15 @@ func TestModeSetSyncPullNormalTag(t *testing.T) {
}
ch := generateTestRandomChunk().WithTagID(tag.Uid)
_, err = db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err = db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
tag.Inc(chunk.StateStored) // so we don't get an error on tag.Status later on
tag.Inc(tags.StateStored) // so we don't get an error on tag.Status later on
item, err := db.pullIndex.Get(shed.Item{
Address: ch.Address(),
Address: ch.Address().Bytes(),
BinID: 1,
})
if err != nil {
......@@ -97,13 +98,13 @@ func TestModeSetSyncPullNormalTag(t *testing.T) {
t.Fatalf("unexpected tag id value got %d want %d", item.Tag, tag.Uid)
}
err = db.Set(context.Background(), chunk.ModeSetSyncPull, ch.Address())
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
item, err = db.pullIndex.Get(shed.Item{
Address: ch.Address(),
Address: ch.Address().Bytes(),
BinID: 1,
})
if err != nil {
......@@ -123,7 +124,7 @@ func TestModeSetSyncPullNormalTag(t *testing.T) {
// TestModeSetSyncPullAnonymousTag checks that pull sync correcly increments
// counters on an anonymous tag which is expected to be handled only by pull sync
func TestModeSetSyncPullAnonymousTag(t *testing.T) {
db, cleanupFunc := newTestDB(t, &Options{Tags: chunk.NewTags()})
db, cleanupFunc := newTestDB(t, &Options{Tags: tags.NewTags()})
defer cleanupFunc()
tag, err := db.tags.Create("test", 1, true)
......@@ -132,14 +133,14 @@ func TestModeSetSyncPullAnonymousTag(t *testing.T) {
}
ch := generateTestRandomChunk().WithTagID(tag.Uid)
_, err = db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err = db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
tag.Inc(chunk.StateStored) // so we don't get an error on tag.Status later on
tag.Inc(tags.StateStored) // so we don't get an error on tag.Status later on
item, err := db.pullIndex.Get(shed.Item{
Address: ch.Address(),
Address: ch.Address().Bytes(),
BinID: 1,
})
if err != nil {
......@@ -150,13 +151,13 @@ func TestModeSetSyncPullAnonymousTag(t *testing.T) {
t.Fatalf("unexpected tag id value got %d want %d", item.Tag, tag.Uid)
}
err = db.Set(context.Background(), chunk.ModeSetSyncPull, ch.Address())
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
item, err = db.pullIndex.Get(shed.Item{
Address: ch.Address(),
Address: ch.Address().Bytes(),
BinID: 1,
})
if err != nil {
......@@ -175,7 +176,7 @@ func TestModeSetSyncPullAnonymousTag(t *testing.T) {
// then tries to Set both with push and pull Sync modes, but asserts that only the pull sync
// increments were done to the tag
func TestModeSetSyncPullPushAnonymousTag(t *testing.T) {
db, cleanupFunc := newTestDB(t, &Options{Tags: chunk.NewTags()})
db, cleanupFunc := newTestDB(t, &Options{Tags: tags.NewTags()})
defer cleanupFunc()
tag, err := db.tags.Create("test", 1, true)
......@@ -184,14 +185,14 @@ func TestModeSetSyncPullPushAnonymousTag(t *testing.T) {
}
ch := generateTestRandomChunk().WithTagID(tag.Uid)
_, err = db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err = db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
tag.Inc(chunk.StateStored) // so we don't get an error on tag.Status later on
tag.Inc(tags.StateStored) // so we don't get an error on tag.Status later on
item, err := db.pullIndex.Get(shed.Item{
Address: ch.Address(),
Address: ch.Address().Bytes(),
BinID: 1,
})
if err != nil {
......@@ -202,21 +203,21 @@ func TestModeSetSyncPullPushAnonymousTag(t *testing.T) {
t.Fatalf("unexpected tag id value got %d want %d", item.Tag, tag.Uid)
}
err = db.Set(context.Background(), chunk.ModeSetSyncPull, ch.Address())
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
// expect no error here. if the item cannot be found in pushsync the rest of the
// setSync logic should be executed
err = db.Set(context.Background(), chunk.ModeSetSyncPush, ch.Address())
err = db.Set(context.Background(), storage.ModeSetSyncPush, ch.Address())
if err != nil {
t.Fatal(err)
}
// check that the tag has been incremented
item, err = db.pullIndex.Get(shed.Item{
Address: ch.Address(),
Address: ch.Address().Bytes(),
BinID: 1,
})
if err != nil {
......@@ -232,7 +233,7 @@ func TestModeSetSyncPullPushAnonymousTag(t *testing.T) {
// verify that the item does not exist in the push index
item, err = db.pushIndex.Get(shed.Item{
Address: ch.Address(),
Address: ch.Address().Bytes(),
BinID: 1,
})
if err == nil {
......@@ -244,7 +245,7 @@ func TestModeSetSyncPullPushAnonymousTag(t *testing.T) {
// correctly on a normal tag (that is, a tag that is expected to show progress bars
// according to push sync progress)
func TestModeSetSyncPushNormalTag(t *testing.T) {
db, cleanupFunc := newTestDB(t, &Options{Tags: chunk.NewTags()})
db, cleanupFunc := newTestDB(t, &Options{Tags: tags.NewTags()})
defer cleanupFunc()
tag, err := db.tags.Create("test", 1, false)
......@@ -253,14 +254,14 @@ func TestModeSetSyncPushNormalTag(t *testing.T) {
}
ch := generateTestRandomChunk().WithTagID(tag.Uid)
_, err = db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err = db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
tag.Inc(chunk.StateStored) // so we don't get an error on tag.Status later on
tag.Inc(tags.StateStored) // so we don't get an error on tag.Status later on
item, err := db.pullIndex.Get(shed.Item{
Address: ch.Address(),
Address: ch.Address().Bytes(),
BinID: 1,
})
if err != nil {
......@@ -272,13 +273,13 @@ func TestModeSetSyncPushNormalTag(t *testing.T) {
tagtesting.CheckTag(t, tag, 0, 1, 0, 0, 0, 1)
err = db.Set(context.Background(), chunk.ModeSetSyncPush, ch.Address())
err = db.Set(context.Background(), storage.ModeSetSyncPush, ch.Address())
if err != nil {
t.Fatal(err)
}
item, err = db.pullIndex.Get(shed.Item{
Address: ch.Address(),
Address: ch.Address().Bytes(),
BinID: 1,
})
if err != nil {
......@@ -292,13 +293,13 @@ func TestModeSetSyncPushNormalTag(t *testing.T) {
tagtesting.CheckTag(t, tag, 0, 1, 0, 0, 1, 1)
// call pull sync set, expect no changes
err = db.Set(context.Background(), chunk.ModeSetSyncPull, ch.Address())
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
item, err = db.pullIndex.Get(shed.Item{
Address: ch.Address(),
Address: ch.Address().Bytes(),
BinID: 1,
})
if err != nil {
......@@ -321,12 +322,12 @@ func TestModeSetRemove(t *testing.T) {
chunks := generateTestRandomChunks(tc.count)
_, err := db.Put(context.Background(), chunk.ModePutUpload, chunks...)
_, err := db.Put(context.Background(), storage.ModePutUpload, chunks...)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), chunk.ModeSetRemove, chunkAddresses(chunks)...)
err = db.Set(context.Background(), storage.ModeSetRemove, chunkAddresses(chunks)...)
if err != nil {
t.Fatal(err)
}
......
......@@ -21,7 +21,8 @@ import (
"strconv"
"testing"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
// BenchmarkRetrievalIndexes uploads a number of chunks in order to measure
......@@ -62,10 +63,10 @@ func benchmarkRetrievalIndexes(b *testing.B, o *Options, count int) {
b.StopTimer()
db, cleanupFunc := newTestDB(b, o)
defer cleanupFunc()
addrs := make([]chunk.Address, count)
addrs := make([]swarm.Address, count)
for i := 0; i < count; i++ {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
b.Fatal(err)
}
......@@ -83,12 +84,12 @@ func benchmarkRetrievalIndexes(b *testing.B, o *Options, count int) {
b.StartTimer()
for i := 0; i < count; i++ {
err := db.Set(context.Background(), chunk.ModeSetSyncPull, addrs[i])
err := db.Set(context.Background(), storage.ModeSetSyncPull, addrs[i])
if err != nil {
b.Fatal(err)
}
_, err = db.Get(context.Background(), chunk.ModeGetRequest, addrs[i])
_, err = db.Get(context.Background(), storage.ModeGetRequest, addrs[i])
if err != nil {
b.Fatal(err)
}
......@@ -131,7 +132,7 @@ func benchmarkUpload(b *testing.B, o *Options, count int) {
b.StopTimer()
db, cleanupFunc := newTestDB(b, o)
defer cleanupFunc()
chunks := make([]chunk.Chunk, count)
chunks := make([]swarm.Chunk, count)
for i := 0; i < count; i++ {
chunk := generateTestRandomChunk()
chunks[i] = chunk
......@@ -139,7 +140,7 @@ func benchmarkUpload(b *testing.B, o *Options, count int) {
b.StartTimer()
for i := 0; i < count; i++ {
_, err := db.Put(context.Background(), chunk.ModePutUpload, chunks[i])
_, err := db.Put(context.Background(), storage.ModePutUpload, chunks[i])
if err != nil {
b.Fatal(err)
}
......
......@@ -22,7 +22,8 @@ import (
"sync"
"time"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/swarm/shed"
"github.com/syndtr/goleveldb/leveldb"
)
......@@ -35,10 +36,10 @@ import (
// function will terminate current and further iterations without errors, and also close the returned channel.
// Make sure that you check the second returned parameter from the channel to stop iteration when its value
// is false.
func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan chunk.Descriptor, stop func()) {
func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan storage.Descriptor, stop func()) {
db.metrics.SubscribePull.Inc()
chunkDescriptors := make(chan chunk.Descriptor)
chunkDescriptors := make(chan storage.Descriptor)
trigger := make(chan struct{}, 1)
db.pullTriggersMu.Lock()
......@@ -62,7 +63,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
go func() {
defer db.subscritionsWG.Done()
db.metrics.SubscribePullStop.Inc()
// close the returned chunk.Descriptor channel at the end to
// close the returned store.Descriptor channel at the end to
// signal that the subscription is done
defer close(chunkDescriptors)
// sinceItem is the Item from which the next iteration
......@@ -70,7 +71,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
var sinceItem *shed.Item
if since > 0 {
sinceItem = &shed.Item{
Address: db.addressInBin(bin),
Address: db.addressInBin(bin).Bytes(),
BinID: since,
}
}
......@@ -93,8 +94,8 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
return true, errStopSubscription
}
select {
case chunkDescriptors <- chunk.Descriptor{
Address: item.Address,
case chunkDescriptors <- storage.Descriptor{
Address: swarm.NewAddress(item.Address),
BinID: item.BinID,
}:
if until > 0 && item.BinID == until {
......@@ -216,9 +217,9 @@ func (db *DB) triggerPullSubscriptions(bin uint8) {
// addressInBin returns an address that is in a specific
// proximity order bin from database base key.
func (db *DB) addressInBin(bin uint8) (addr chunk.Address) {
addr = append([]byte(nil), db.baseKey...)
func (db *DB) addressInBin(bin uint8) swarm.Address {
addr := append([]byte(nil), db.baseKey...)
b := bin / 8
addr[b] = addr[b] ^ (1 << (7 - bin%8))
return addr
return swarm.NewAddress(addr)
}
......@@ -22,7 +22,7 @@ import (
"sync"
"time"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/swarm/shed"
)
......@@ -30,10 +30,10 @@ import (
// Returned stop function will terminate current and further iterations, and also it will close
// the returned channel without any errors. Make sure that you check the second returned parameter
// from the channel to stop iteration when its value is false.
func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop func()) {
func (db *DB) SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, stop func()) {
db.metrics.SubscribePush.Inc()
chunks := make(chan chunk.Chunk)
chunks := make(chan swarm.Chunk)
trigger := make(chan struct{}, 1)
db.pushTriggersMu.Lock()
......@@ -75,7 +75,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
}
select {
case chunks <- chunk.NewChunk(dataItem.Address, dataItem.Data).WithTagID(item.Tag):
case chunks <- swarm.NewChunk(swarm.NewAddress(dataItem.Address), dataItem.Data).WithTagID(item.Tag):
count++
// set next iteration start item
// when its chunk is successfully sent to channel
......
......@@ -24,7 +24,8 @@ import (
"testing"
"time"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
// TestDB_SubscribePush uploads some chunks before and after
......@@ -34,7 +35,7 @@ func TestDB_SubscribePush(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
chunks := make([]chunk.Chunk, 0)
chunks := make([]swarm.Chunk, 0)
var chunksMu sync.Mutex
uploadRandomChunks := func(count int) {
......@@ -44,7 +45,7 @@ func TestDB_SubscribePush(t *testing.T) {
for i := 0; i < count; i++ {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
......@@ -84,8 +85,8 @@ func TestDB_SubscribePush(t *testing.T) {
if !bytes.Equal(got.Data(), want.Data()) {
err = fmt.Errorf("got chunk %v data %x, want %x", i, got.Data(), want.Data())
}
if !bytes.Equal(got.Address(), want.Address()) {
err = fmt.Errorf("got chunk %v address %s, want %s", i, got.Address().Hex(), want.Address().Hex())
if !got.Address().Equal(want.Address()) {
err = fmt.Errorf("got chunk %v address %s, want %s", i, got.Address(), want.Address())
}
i++
// send one and only one error per received address
......@@ -120,7 +121,7 @@ func TestDB_SubscribePush_multiple(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
addrs := make([]chunk.Address, 0)
addrs := make([]swarm.Address, 0)
var addrsMu sync.Mutex
uploadRandomChunks := func(count int) {
......@@ -130,7 +131,7 @@ func TestDB_SubscribePush_multiple(t *testing.T) {
for i := 0; i < count; i++ {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
......@@ -172,7 +173,7 @@ func TestDB_SubscribePush_multiple(t *testing.T) {
want := addrs[i]
addrsMu.Unlock()
var err error
if !bytes.Equal(got.Address(), want) {
if !got.Address().Equal(want) {
err = fmt.Errorf("got chunk %v address on subscription %v %s, want %s", i, j, got, want)
}
i++
......
......@@ -7,6 +7,8 @@ package storage
import (
"context"
"errors"
"fmt"
"io"
"github.com/ethersphere/bee/pkg/swarm"
)
......@@ -19,11 +21,131 @@ var (
// ChunkValidatorFunc validates Swarm chunk address and chunk data
type ChunkValidatorFunc func(swarm.Address, []byte) (valid bool)
// ModeGet enumerates different Getter modes.
type ModeGet int
func (m ModeGet) String() string {
switch m {
case ModeGetRequest:
return "Request"
case ModeGetSync:
return "Sync"
case ModeGetLookup:
return "Lookup"
case ModeGetPin:
return "PinLookup"
default:
return "Unknown"
}
}
// Getter modes.
const (
// ModeGetRequest: when accessed for retrieval
ModeGetRequest ModeGet = iota
// ModeGetSync: when accessed for syncing or proof of custody request
ModeGetSync
// ModeGetLookup: when accessed to lookup a a chunk in feeds or other places
ModeGetLookup
// ModeGetPin: used when a pinned chunk is accessed
ModeGetPin
)
// ModePut enumerates different Putter modes.
type ModePut int
func (m ModePut) String() string {
switch m {
case ModePutRequest:
return "Request"
case ModePutSync:
return "Sync"
case ModePutUpload:
return "Upload"
default:
return "Unknown"
}
}
// Putter modes.
const (
// ModePutRequest: when a chunk is received as a result of retrieve request and delivery
ModePutRequest ModePut = iota
// ModePutSync: when a chunk is received via syncing
ModePutSync
// ModePutUpload: when a chunk is created by local upload
ModePutUpload
)
// ModeSet enumerates different Setter modes.
type ModeSet int
func (m ModeSet) String() string {
switch m {
case ModeSetAccess:
return "Access"
case ModeSetSyncPush:
return "SyncPush"
case ModeSetSyncPull:
return "SyncPull"
case ModeSetRemove:
return "Remove"
case ModeSetPin:
return "ModeSetPin"
case ModeSetUnpin:
return "ModeSetUnpin"
default:
return "Unknown"
}
}
// Setter modes.
const (
// ModeSetAccess: when an update request is received for a chunk or chunk is retrieved for delivery
ModeSetAccess ModeSet = iota
// ModeSetSyncPush: when a push sync receipt is received for a chunk
ModeSetSyncPush
// ModeSetSyncPull: when a chunk is added to a pull sync batch
ModeSetSyncPull
// ModeSetRemove: when a chunk is removed
ModeSetRemove
// ModeSetPin: when a chunk is pinned during upload or separately
ModeSetPin
// ModeSetUnpin: when a chunk is unpinned using a command locally
ModeSetUnpin
)
// Descriptor holds information required for Pull syncing. This struct
// is provided by subscribing to pull index.
type Descriptor struct {
Address swarm.Address
BinID uint64
}
func (d *Descriptor) String() string {
if d == nil {
return ""
}
return fmt.Sprintf("%s bin id %v", d.Address, d.BinID)
}
type Storer interface {
Get(ctx context.Context, addr swarm.Address) (data []byte, err error)
Put(ctx context.Context, addr swarm.Address, data []byte) (err error)
}
type Store interface {
Get(ctx context.Context, mode ModeGet, addr swarm.Address) (ch swarm.Chunk, err error)
GetMulti(ctx context.Context, mode ModeGet, addrs ...swarm.Address) (ch []swarm.Chunk, err error)
Put(ctx context.Context, mode ModePut, chs ...swarm.Chunk) (exist []bool, err error)
Has(ctx context.Context, addr swarm.Address) (yes bool, err error)
HasMulti(ctx context.Context, addrs ...swarm.Address) (yes []bool, err error)
Set(ctx context.Context, mode ModeSet, addrs ...swarm.Address) (err error)
LastPullSubscriptionBinID(bin uint8) (id uint64, err error)
SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan Descriptor, stop func())
io.Closer
}
// StateStorer defines methods required to get, set, delete values for different keys
// and close the underlying resources.
type StateStorer interface {
......
// Copyright 2019 The Swarm Authors
// This file is part of the Swarm library.
//
// The Swarm library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Swarm library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
package testing
import (
"math/rand"
"time"
"github.com/ethersphere/bee/pkg/swarm"
)
func init() {
// needed for GenerateTestRandomChunk
rand.Seed(time.Now().UnixNano())
}
// GenerateTestRandomChunk generates a Chunk that is not
// valid, but it contains a random key and a random value.
// This function is faster then storage.GenerateRandomChunk
// which generates a valid chunk.
// Some tests in do not need valid chunks, just
// random data, and their execution time can be decreased
// using this function.
func GenerateTestRandomChunk() swarm.Chunk {
data := make([]byte, swarm.ChunkSize)
rand.Read(data)
key := make([]byte, 32)
rand.Read(key)
return swarm.NewChunk(swarm.NewAddress(key), data)
}
// GenerateTestRandomChunks generates a slice of random
// Chunks by using GenerateTestRandomChunk function.
func GenerateTestRandomChunks(count int) []swarm.Chunk {
chunks := make([]swarm.Chunk, count)
for i := 0; i < count; i++ {
chunks[i] = GenerateTestRandomChunk()
}
return chunks
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package swarm
// Proximity returns the proximity order of the MSB distance between x and y
//
// The distance metric MSB(x, y) of two equal length byte sequences x an y is the
// value of the binary integer cast of the x^y, ie., x and y bitwise xor-ed.
// the binary cast is big endian: most significant bit first (=MSB).
//
// Proximity(x, y) is a discrete logarithmic scaling of the MSB distance.
// It is defined as the reverse rank of the integer part of the base 2
// logarithm of the distance.
// It is calculated by counting the number of common leading zeros in the (MSB)
// binary representation of the x^y.
//
// (0 farthest, 255 closest, 256 self)
func Proximity(one, other []byte) (ret int) {
b := (MaxPO-1)/8 + 1
if b > len(one) {
b = len(one)
}
m := 8
for i := 0; i < b; i++ {
oxo := one[i] ^ other[i]
for j := 0; j < m; j++ {
if (oxo>>uint8(7-j))&0x01 != 0 {
return i*8 + j
}
}
}
return MaxPO
}
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package swarm
import (
"strconv"
"testing"
)
// TestProximity validates Proximity function with explicit
// values in a table-driven test. It is highly dependant on
// MaxPO constant and it validates cases up to MaxPO=32.
func TestProximity(t *testing.T) {
// integer from base2 encoded string
bx := func(s string) uint8 {
i, err := strconv.ParseUint(s, 2, 8)
if err != nil {
t.Fatal(err)
}
return uint8(i)
}
// adjust expected bins in respect to MaxPO
limitPO := func(po uint8) uint8 {
if po > MaxPO {
return MaxPO
}
return po
}
base := []byte{bx("00000000"), bx("00000000"), bx("00000000"), bx("00000000")}
for _, tc := range []struct {
addr []byte
po uint8
}{
{
addr: base,
po: MaxPO,
},
{
addr: []byte{bx("10000000"), bx("00000000"), bx("00000000"), bx("00000000")},
po: limitPO(0),
},
{
addr: []byte{bx("01000000"), bx("00000000"), bx("00000000"), bx("00000000")},
po: limitPO(1),
},
{
addr: []byte{bx("00100000"), bx("00000000"), bx("00000000"), bx("00000000")},
po: limitPO(2),
},
{
addr: []byte{bx("00010000"), bx("00000000"), bx("00000000"), bx("00000000")},
po: limitPO(3),
},
{
addr: []byte{bx("00001000"), bx("00000000"), bx("00000000"), bx("00000000")},
po: limitPO(4),
},
{
addr: []byte{bx("00000100"), bx("00000000"), bx("00000000"), bx("00000000")},
po: limitPO(5),
},
{
addr: []byte{bx("00000010"), bx("00000000"), bx("00000000"), bx("00000000")},
po: limitPO(6),
},
{
addr: []byte{bx("00000001"), bx("00000000"), bx("00000000"), bx("00000000")},
po: limitPO(7),
},
{
addr: []byte{bx("00000000"), bx("10000000"), bx("00000000"), bx("00000000")},
po: limitPO(8),
},
{
addr: []byte{bx("00000000"), bx("01000000"), bx("00000000"), bx("00000000")},
po: limitPO(9),
},
{
addr: []byte{bx("00000000"), bx("00100000"), bx("00000000"), bx("00000000")},
po: limitPO(10),
},
{
addr: []byte{bx("00000000"), bx("00010000"), bx("00000000"), bx("00000000")},
po: limitPO(11),
},
{
addr: []byte{bx("00000000"), bx("00001000"), bx("00000000"), bx("00000000")},
po: limitPO(12),
},
{
addr: []byte{bx("00000000"), bx("00000100"), bx("00000000"), bx("00000000")},
po: limitPO(13),
},
{
addr: []byte{bx("00000000"), bx("00000010"), bx("00000000"), bx("00000000")},
po: limitPO(14),
},
{
addr: []byte{bx("00000000"), bx("00000001"), bx("00000000"), bx("00000000")},
po: limitPO(15),
},
{
addr: []byte{bx("00000000"), bx("00000000"), bx("10000000"), bx("00000000")},
po: limitPO(16),
},
{
addr: []byte{bx("00000000"), bx("00000000"), bx("01000000"), bx("00000000")},
po: limitPO(17),
},
{
addr: []byte{bx("00000000"), bx("00000000"), bx("00100000"), bx("00000000")},
po: limitPO(18),
},
{
addr: []byte{bx("00000000"), bx("00000000"), bx("00010000"), bx("00000000")},
po: limitPO(19),
},
{
addr: []byte{bx("00000000"), bx("00000000"), bx("00001000"), bx("00000000")},
po: limitPO(20),
},
{
addr: []byte{bx("00000000"), bx("00000000"), bx("00000100"), bx("00000000")},
po: limitPO(21),
},
{
addr: []byte{bx("00000000"), bx("00000000"), bx("00000010"), bx("00000000")},
po: limitPO(22),
},
{
addr: []byte{bx("00000000"), bx("00000000"), bx("00000001"), bx("00000000")},
po: limitPO(23),
},
{
addr: []byte{bx("00000000"), bx("00000000"), bx("00000000"), bx("10000000")},
po: limitPO(24),
},
{
addr: []byte{bx("00000000"), bx("00000000"), bx("00000000"), bx("01000000")},
po: limitPO(25),
},
{
addr: []byte{bx("00000000"), bx("00000000"), bx("00000000"), bx("00100000")},
po: limitPO(26),
},
{
addr: []byte{bx("00000000"), bx("00000000"), bx("00000000"), bx("00010000")},
po: limitPO(27),
},
{
addr: []byte{bx("00000000"), bx("00000000"), bx("00000000"), bx("00001000")},
po: limitPO(28),
},
{
addr: []byte{bx("00000000"), bx("00000000"), bx("00000000"), bx("00000100")},
po: limitPO(29),
},
{
addr: []byte{bx("00000000"), bx("00000000"), bx("00000000"), bx("00000010")},
po: limitPO(30),
},
{
addr: []byte{bx("00000000"), bx("00000000"), bx("00000000"), bx("00000001")},
po: limitPO(31),
},
} {
got := uint8(Proximity(base, tc.addr))
if got != tc.po {
t.Errorf("got %v bin, want %v", got, tc.po)
}
}
}
......@@ -9,6 +9,12 @@ import (
"bytes"
"encoding/hex"
"encoding/json"
"fmt"
)
const (
ChunkSize = 4096
MaxPO = 16
)
// Address represents an address in Swarm metric space of
......@@ -83,3 +89,56 @@ func (a Address) MarshalJSON() ([]byte, error) {
// ZeroAddress is the address that has no value.
var ZeroAddress = NewAddress(nil)
type Chunk interface {
Address() Address
Data() []byte
PinCounter() uint64
WithPinCounter(p uint64) Chunk
TagID() uint32
WithTagID(t uint32) Chunk
}
type chunk struct {
addr Address
sdata []byte
pinCounter uint64
tagID uint32
}
func NewChunk(addr Address, data []byte) Chunk {
return &chunk{
addr: addr,
sdata: data,
}
}
func (c *chunk) WithPinCounter(p uint64) Chunk {
c.pinCounter = p
return c
}
func (c *chunk) WithTagID(t uint32) Chunk {
c.tagID = t
return c
}
func (c *chunk) Address() Address {
return c.addr
}
func (c *chunk) Data() []byte {
return c.sdata
}
func (c *chunk) PinCounter() uint64 {
return c.pinCounter
}
func (c *chunk) TagID() uint32 {
return c.tagID
}
func (self *chunk) String() string {
return fmt.Sprintf("Address: %v Chunksize: %v", self.addr.String(), len(self.sdata))
}
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package tags
import (
"context"
"encoding/binary"
"errors"
"sync"
"sync/atomic"
"time"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/swarm/spancontext"
"github.com/opentracing/opentracing-go"
)
var (
errExists = errors.New("already exists")
errNA = errors.New("not available yet")
errNoETA = errors.New("unable to calculate ETA")
errTagNotFound = errors.New("tag not found")
)
// State is the enum type for chunk states
type State = uint32
const (
StateSplit State = iota // chunk has been processed by filehasher/swarm safe call
StateStored // chunk stored locally
StateSeen // chunk previously seen
StateSent // chunk sent to neighbourhood
StateSynced // proof is received; chunk removed from sync db; chunk is available everywhere
)
// Tag represents info on the status of new chunks
type Tag struct {
Total int64 // total chunks belonging to a tag
Split int64 // number of chunks already processed by splitter for hashing
Seen int64 // number of chunks already seen
Stored int64 // number of chunks already stored locally
Sent int64 // number of chunks sent for push syncing
Synced int64 // number of chunks synced with proof
Uid uint32 // a unique identifier for this tag
Anonymous bool // indicates if the tag is anonymous (i.e. if only pull sync should be used)
Name string // a name tag for this tag
Address swarm.Address // the associated swarm hash for this tag
StartedAt time.Time // tag started to calculate ETA
// end-to-end tag tracing
ctx context.Context // tracing context
span opentracing.Span // tracing root span
spanOnce sync.Once // make sure we close root span only once
}
// NewTag creates a new tag, and returns it
func NewTag(uid uint32, s string, total int64, anon bool) *Tag {
t := &Tag{
Uid: uid,
Anonymous: anon,
Name: s,
StartedAt: time.Now(),
Total: total,
}
// context here is used only to store the root span `new.upload.tag` within Tag,
// we don't need any type of ctx Deadline or cancellation for this particular ctx
t.ctx, t.span = spancontext.StartSpan(context.Background(), "new.upload.tag")
return t
}
// Context accessor
func (t *Tag) Context() context.Context {
return t.ctx
}
// FinishRootSpan closes the pushsync span of the tags
func (t *Tag) FinishRootSpan() {
t.spanOnce.Do(func() {
t.span.Finish()
})
}
// IncN increments the count for a state
func (t *Tag) IncN(state State, n int) {
var v *int64
switch state {
case StateSplit:
v = &t.Split
case StateStored:
v = &t.Stored
case StateSeen:
v = &t.Seen
case StateSent:
v = &t.Sent
case StateSynced:
v = &t.Synced
}
atomic.AddInt64(v, int64(n))
}
// Inc increments the count for a state
func (t *Tag) Inc(state State) {
t.IncN(state, 1)
}
// Get returns the count for a state on a tag
func (t *Tag) Get(state State) int64 {
var v *int64
switch state {
case StateSplit:
v = &t.Split
case StateStored:
v = &t.Stored
case StateSeen:
v = &t.Seen
case StateSent:
v = &t.Sent
case StateSynced:
v = &t.Synced
}
return atomic.LoadInt64(v)
}
// GetTotal returns the total count
func (t *Tag) TotalCounter() int64 {
return atomic.LoadInt64(&t.Total)
}
// WaitTillDone returns without error once the tag is complete
// wrt the state given as argument
// it returns an error if the context is done
func (t *Tag) WaitTillDone(ctx context.Context, s State) error {
if t.Done(s) {
return nil
}
ticker := time.NewTicker(100 * time.Millisecond)
for {
select {
case <-ticker.C:
if t.Done(s) {
return nil
}
case <-ctx.Done():
return ctx.Err()
}
}
}
// Done returns true if tag is complete wrt the state given as argument
func (t *Tag) Done(s State) bool {
n, total, err := t.Status(s)
return err == nil && n == total
}
// DoneSplit sets total count to SPLIT count and sets the associated swarm hash for this tag
// is meant to be called when splitter finishes for input streams of unknown size
func (t *Tag) DoneSplit(address swarm.Address) int64 {
total := atomic.LoadInt64(&t.Split)
atomic.StoreInt64(&t.Total, total)
t.Address = address
return total
}
// Status returns the value of state and the total count
func (t *Tag) Status(state State) (int64, int64, error) {
count, seen, total := t.Get(state), atomic.LoadInt64(&t.Seen), atomic.LoadInt64(&t.Total)
if total == 0 {
return count, total, errNA
}
switch state {
case StateSplit, StateStored, StateSeen:
return count, total, nil
case StateSent, StateSynced:
stored := atomic.LoadInt64(&t.Stored)
if stored < total {
return count, total - seen, errNA
}
return count, total - seen, nil
}
return count, total, errNA
}
// ETA returns the time of completion estimated based on time passed and rate of completion
func (t *Tag) ETA(state State) (time.Time, error) {
cnt, total, err := t.Status(state)
if err != nil {
return time.Time{}, err
}
if cnt == 0 || total == 0 {
return time.Time{}, errNoETA
}
diff := time.Since(t.StartedAt)
dur := time.Duration(total) * diff / time.Duration(cnt)
return t.StartedAt.Add(dur), nil
}
// MarshalBinary marshals the tag into a byte slice
func (tag *Tag) MarshalBinary() (data []byte, err error) {
buffer := make([]byte, 4)
binary.BigEndian.PutUint32(buffer, tag.Uid)
encodeInt64Append(&buffer, tag.Total)
encodeInt64Append(&buffer, tag.Split)
encodeInt64Append(&buffer, tag.Seen)
encodeInt64Append(&buffer, tag.Stored)
encodeInt64Append(&buffer, tag.Sent)
encodeInt64Append(&buffer, tag.Synced)
intBuffer := make([]byte, 8)
n := binary.PutVarint(intBuffer, tag.StartedAt.Unix())
buffer = append(buffer, intBuffer[:n]...)
n = binary.PutVarint(intBuffer, int64(len(tag.Address.Bytes())))
buffer = append(buffer, intBuffer[:n]...)
buffer = append(buffer, tag.Address.Bytes()...)
buffer = append(buffer, []byte(tag.Name)...)
return buffer, nil
}
// UnmarshalBinary unmarshals a byte slice into a tag
func (tag *Tag) UnmarshalBinary(buffer []byte) error {
if len(buffer) < 13 {
return errors.New("buffer too short")
}
tag.Uid = binary.BigEndian.Uint32(buffer)
buffer = buffer[4:]
tag.Total = decodeInt64Splice(&buffer)
tag.Split = decodeInt64Splice(&buffer)
tag.Seen = decodeInt64Splice(&buffer)
tag.Stored = decodeInt64Splice(&buffer)
tag.Sent = decodeInt64Splice(&buffer)
tag.Synced = decodeInt64Splice(&buffer)
t, n := binary.Varint(buffer)
tag.StartedAt = time.Unix(t, 0)
buffer = buffer[n:]
t, n = binary.Varint(buffer)
buffer = buffer[n:]
if t > 0 {
tag.Address = swarm.NewAddress(buffer[:t])
}
tag.Name = string(buffer[t:])
return nil
}
func encodeInt64Append(buffer *[]byte, val int64) {
intBuffer := make([]byte, 8)
n := binary.PutVarint(intBuffer, val)
*buffer = append(*buffer, intBuffer[:n]...)
}
func decodeInt64Splice(buffer *[]byte) int64 {
val, n := binary.Varint((*buffer))
*buffer = (*buffer)[n:]
return val
}
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package tags
import (
"sync"
"testing"
"time"
"github.com/ethersphere/bee/pkg/swarm"
)
var (
allStates = []State{StateSplit, StateStored, StateSeen, StateSent, StateSynced}
)
// TestTagSingleIncrements tests if Inc increments the tag state value
func TestTagSingleIncrements(t *testing.T) {
tg := &Tag{Total: 10}
tc := []struct {
state uint32
inc int
expcount int64
exptotal int64
}{
{state: StateSplit, inc: 10, expcount: 10, exptotal: 10},
{state: StateStored, inc: 9, expcount: 9, exptotal: 9},
{state: StateSeen, inc: 1, expcount: 1, exptotal: 10},
{state: StateSent, inc: 9, expcount: 9, exptotal: 9},
{state: StateSynced, inc: 9, expcount: 9, exptotal: 9},
}
for _, tc := range tc {
for i := 0; i < tc.inc; i++ {
tg.Inc(tc.state)
}
}
for _, tc := range tc {
if tg.Get(tc.state) != tc.expcount {
t.Fatalf("not incremented")
}
}
}
// TestTagStatus is a unit test to cover Tag.Status method functionality
func TestTagStatus(t *testing.T) {
tg := &Tag{Total: 10}
tg.Inc(StateSeen)
tg.Inc(StateSent)
tg.Inc(StateSynced)
for i := 0; i < 10; i++ {
tg.Inc(StateSplit)
tg.Inc(StateStored)
}
for _, v := range []struct {
state State
expVal int64
expTotal int64
}{
{state: StateStored, expVal: 10, expTotal: 10},
{state: StateSplit, expVal: 10, expTotal: 10},
{state: StateSeen, expVal: 1, expTotal: 10},
{state: StateSent, expVal: 1, expTotal: 9},
{state: StateSynced, expVal: 1, expTotal: 9},
} {
val, total, err := tg.Status(v.state)
if err != nil {
t.Fatal(err)
}
if val != v.expVal {
t.Fatalf("should be %d, got %d", v.expVal, val)
}
if total != v.expTotal {
t.Fatalf("expected Total to be %d, got %d", v.expTotal, total)
}
}
}
// tests ETA is precise
func TestTagETA(t *testing.T) {
now := time.Now()
maxDiff := 100000 // 100 microsecond
tg := &Tag{Total: 10, StartedAt: now}
time.Sleep(100 * time.Millisecond)
tg.Inc(StateSplit)
eta, err := tg.ETA(StateSplit)
if err != nil {
t.Fatal(err)
}
diff := time.Until(eta) - 9*time.Since(now)
if int(diff) > maxDiff {
t.Fatalf("ETA is not precise, got diff %v > .1ms", diff)
}
}
// TestTagConcurrentIncrements tests Inc calls concurrently
func TestTagConcurrentIncrements(t *testing.T) {
tg := &Tag{}
n := 1000
wg := sync.WaitGroup{}
wg.Add(5 * n)
for _, f := range allStates {
go func(f State) {
for j := 0; j < n; j++ {
go func() {
tg.Inc(f)
wg.Done()
}()
}
}(f)
}
wg.Wait()
for _, f := range allStates {
v := tg.Get(f)
if v != int64(n) {
t.Fatalf("expected state %v to be %v, got %v", f, n, v)
}
}
}
// TestTagsMultipleConcurrentIncrements tests Inc calls concurrently
func TestTagsMultipleConcurrentIncrementsSyncMap(t *testing.T) {
ts := NewTags()
n := 100
wg := sync.WaitGroup{}
wg.Add(10 * 5 * n)
for i := 0; i < 10; i++ {
s := string([]byte{uint8(i)})
tag, err := ts.Create(s, int64(n), false)
if err != nil {
t.Fatal(err)
}
for _, f := range allStates {
go func(tag *Tag, f State) {
for j := 0; j < n; j++ {
go func() {
tag.Inc(f)
wg.Done()
}()
}
}(tag, f)
}
}
wg.Wait()
i := 0
ts.Range(func(k, v interface{}) bool {
i++
uid := k.(uint32)
for _, f := range allStates {
tag, err := ts.Get(uid)
if err != nil {
t.Fatal(err)
}
stateVal := tag.Get(f)
if stateVal != int64(n) {
t.Fatalf("expected tag %v state %v to be %v, got %v", uid, f, n, v)
}
}
return true
})
if i != 10 {
t.Fatal("not enough tagz")
}
}
// TestMarshallingWithAddr tests that marshalling and unmarshalling is done correctly when the
// tag Address (byte slice) contains some arbitrary value
func TestMarshallingWithAddr(t *testing.T) {
tg := NewTag(111, "test/tag", 10, false)
tg.Address = swarm.NewAddress([]byte{0, 1, 2, 3, 4, 5, 6})
for _, f := range allStates {
tg.Inc(f)
}
b, err := tg.MarshalBinary()
if err != nil {
t.Fatal(err)
}
unmarshalledTag := &Tag{}
err = unmarshalledTag.UnmarshalBinary(b)
if err != nil {
t.Fatal(err)
}
if unmarshalledTag.Uid != tg.Uid {
t.Fatalf("tag uids not equal. want %d got %d", tg.Uid, unmarshalledTag.Uid)
}
if unmarshalledTag.Name != tg.Name {
t.Fatalf("tag names not equal. want %s got %s", tg.Name, unmarshalledTag.Name)
}
if unmarshalledTag.Anonymous != tg.Anonymous {
t.Fatalf("tag anon field not equal. want %t got %t", tg.Anonymous, unmarshalledTag.Anonymous)
}
for _, state := range allStates {
uv, tv := unmarshalledTag.Get(state), tg.Get(state)
if uv != tv {
t.Fatalf("state %d inconsistent. expected %d to equal %d", state, uv, tv)
}
}
if unmarshalledTag.TotalCounter() != tg.TotalCounter() {
t.Fatalf("tag names not equal. want %d got %d", tg.TotalCounter(), unmarshalledTag.TotalCounter())
}
if len(unmarshalledTag.Address.Bytes()) != len(tg.Address.Bytes()) {
t.Fatalf("tag addresses length mismatch, want %d, got %d", len(tg.Address.Bytes()), len(unmarshalledTag.Address.Bytes()))
}
if !unmarshalledTag.Address.Equal(tg.Address) {
t.Fatalf("expected tag address to be %v got %v", unmarshalledTag.Address, tg.Address)
}
}
// TestMarshallingNoAddress tests that marshalling and unmarshalling is done correctly
func TestMarshallingNoAddr(t *testing.T) {
tg := NewTag(111, "test/tag", 10, false)
for _, f := range allStates {
tg.Inc(f)
}
b, err := tg.MarshalBinary()
if err != nil {
t.Fatal(err)
}
unmarshalledTag := &Tag{}
err = unmarshalledTag.UnmarshalBinary(b)
if err != nil {
t.Fatal(err)
}
if unmarshalledTag.Uid != tg.Uid {
t.Fatalf("tag uids not equal. want %d got %d", tg.Uid, unmarshalledTag.Uid)
}
if unmarshalledTag.Name != tg.Name {
t.Fatalf("tag names not equal. want %s got %s", tg.Name, unmarshalledTag.Name)
}
for _, state := range allStates {
uv, tv := unmarshalledTag.Get(state), tg.Get(state)
if uv != tv {
t.Fatalf("state %d inconsistent. expected %d to equal %d", state, uv, tv)
}
}
if unmarshalledTag.TotalCounter() != tg.TotalCounter() {
t.Fatalf("tag names not equal. want %d got %d", tg.TotalCounter(), unmarshalledTag.TotalCounter())
}
if len(unmarshalledTag.Address.Bytes()) != len(tg.Address.Bytes()) {
t.Fatalf("expected tag addresses to be equal length")
}
}
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package tags
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"strconv"
"sync"
"time"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/swarm/sctx"
)
var (
TagUidFunc = rand.Uint32
TagNotFoundErr = errors.New("tag not found")
)
// Tags hold tag information indexed by a unique random uint32
type Tags struct {
tags *sync.Map
}
// NewTags creates a tags object
func NewTags() *Tags {
return &Tags{
tags: &sync.Map{},
}
}
// Create creates a new tag, stores it by the name and returns it
// it returns an error if the tag with this name already exists
func (ts *Tags) Create(s string, total int64, anon bool) (*Tag, error) {
t := NewTag(TagUidFunc(), s, total, anon)
if _, loaded := ts.tags.LoadOrStore(t.Uid, t); loaded {
return nil, errExists
}
return t, nil
}
// All returns all existing tags in Tags' sync.Map
// Note that tags are returned in no particular order
func (ts *Tags) All() (t []*Tag) {
ts.tags.Range(func(k, v interface{}) bool {
t = append(t, v.(*Tag))
return true
})
return t
}
// Get returns the underlying tag for the uid or an error if not found
func (ts *Tags) Get(uid uint32) (*Tag, error) {
t, ok := ts.tags.Load(uid)
if !ok {
return nil, TagNotFoundErr
}
return t.(*Tag), nil
}
// GetByAddress returns the latest underlying tag for the address or an error if not found
func (ts *Tags) GetByAddress(address swarm.Address) (*Tag, error) {
var t *Tag
var lastTime time.Time
ts.tags.Range(func(key interface{}, value interface{}) bool {
rcvdTag := value.(*Tag)
if rcvdTag.Address.Equal(address) && rcvdTag.StartedAt.After(lastTime) {
t = rcvdTag
lastTime = rcvdTag.StartedAt
}
return true
})
if t == nil {
return nil, errTagNotFound
}
return t, nil
}
// GetFromContext gets a tag from the tag uid stored in the context
func (ts *Tags) GetFromContext(ctx context.Context) (*Tag, error) {
uid := sctx.GetTag(ctx)
t, ok := ts.tags.Load(uid)
if !ok {
return nil, errTagNotFound
}
return t.(*Tag), nil
}
// Range exposes sync.Map's iterator
func (ts *Tags) Range(fn func(k, v interface{}) bool) {
ts.tags.Range(fn)
}
func (ts *Tags) Delete(k interface{}) {
ts.tags.Delete(k)
}
func (ts *Tags) MarshalJSON() (out []byte, err error) {
m := make(map[string]*Tag)
ts.Range(func(k, v interface{}) bool {
key := fmt.Sprintf("%d", k)
val := v.(*Tag)
// don't persist tags which were already done
if !val.Done(StateSynced) {
m[key] = val
}
return true
})
return json.Marshal(m)
}
func (ts *Tags) UnmarshalJSON(value []byte) error {
m := make(map[string]*Tag)
err := json.Unmarshal(value, &m)
if err != nil {
return err
}
for k, v := range m {
key, err := strconv.ParseUint(k, 10, 32)
if err != nil {
return err
}
// prevent a condition where a chunk was sent before shutdown
// and the node was turned off before the receipt was received
v.Sent = v.Synced
ts.tags.Store(key, v)
}
return err
}
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package tags
import (
"testing"
)
func TestAll(t *testing.T) {
ts := NewTags()
if _, err := ts.Create("1", 1, false); err != nil {
t.Fatal(err)
}
if _, err := ts.Create("2", 1, false); err != nil {
t.Fatal(err)
}
all := ts.All()
if len(all) != 2 {
t.Fatalf("expected length to be 2 got %d", len(all))
}
if n := all[0].TotalCounter(); n != 1 {
t.Fatalf("expected tag 0 Total to be 1 got %d", n)
}
if n := all[1].TotalCounter(); n != 1 {
t.Fatalf("expected tag 1 Total to be 1 got %d", n)
}
if _, err := ts.Create("3", 1, false); err != nil {
t.Fatal(err)
}
all = ts.All()
if len(all) != 3 {
t.Fatalf("expected length to be 3 got %d", len(all))
}
}
// Copyright 2019 The Swarm Authors
// This file is part of the Swarm library.
//
// The Swarm library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Swarm library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
package testing
import (
"testing"
"github.com/ethersphere/bee/pkg/tags"
)
// CheckTag checks the first tag in the api struct to be in a certain state
func CheckTag(t *testing.T, tag *tags.Tag, split, stored, seen, sent, synced, total int64) {
t.Helper()
if tag == nil {
t.Fatal("no tag found")
}
tSplit := tag.Get(tags.StateSplit)
if tSplit != split {
t.Fatalf("should have had split chunks, got %d want %d", tSplit, split)
}
tSeen := tag.Get(tags.StateSeen)
if tSeen != seen {
t.Fatalf("should have had seen chunks, got %d want %d", tSeen, seen)
}
tStored := tag.Get(tags.StateStored)
if tStored != stored {
t.Fatalf("mismatch stored chunks, got %d want %d", tStored, stored)
}
tSent := tag.Get(tags.StateSent)
if tStored != stored {
t.Fatalf("mismatch sent chunks, got %d want %d", tSent, sent)
}
tSynced := tag.Get(tags.StateSynced)
if tSynced != synced {
t.Fatalf("mismatch synced chunks, got %d want %d", tSynced, synced)
}
tTotal := tag.TotalCounter()
if tTotal != total {
t.Fatalf("mismatch total chunks, got %d want %d", tTotal, total)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment