Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
M
mybee
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
vicotor
mybee
Commits
70ff4fa9
Unverified
Commit
70ff4fa9
authored
May 22, 2020
by
Janoš Guljaš
Committed by
GitHub
May 22, 2020
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
annotate errors in shed (#200)
parent
5b58b937
Changes
22
Hide whitespace changes
Inline
Side-by-side
Showing
22 changed files
with
162 additions
and
209 deletions
+162
-209
gc_test.go
pkg/localstore/gc_test.go
+5
-4
localstore.go
pkg/localstore/localstore.go
+1
-1
localstore_test.go
pkg/localstore/localstore_test.go
+5
-4
mode_get.go
pkg/localstore/mode_get.go
+5
-4
mode_get_multi.go
pkg/localstore/mode_get_multi.go
+2
-1
mode_get_multi_test.go
pkg/localstore/mode_get_multi_test.go
+2
-1
mode_put.go
pkg/localstore/mode_put.go
+7
-6
mode_set.go
pkg/localstore/mode_set.go
+16
-16
mode_set_test.go
pkg/localstore/mode_set_test.go
+3
-2
pin.go
pkg/localstore/pin.go
+1
-1
pin_test.go
pkg/localstore/pin_test.go
+3
-2
subscription_pull.go
pkg/localstore/subscription_pull.go
+1
-1
db.go
pkg/shed/db.go
+5
-6
db_test.go
pkg/shed/db_test.go
+3
-7
example_store_test.go
pkg/shed/example_store_test.go
+18
-20
field_string.go
pkg/shed/field_string.go
+9
-10
field_struct.go
pkg/shed/field_struct.go
+6
-11
field_uint64.go
pkg/shed/field_uint64.go
+16
-26
index.go
pkg/shed/index.go
+28
-52
index_test.go
pkg/shed/index_test.go
+10
-13
schema.go
pkg/shed/schema.go
+4
-4
vector_uint64.go
pkg/shed/vector_uint64.go
+12
-17
No files found.
pkg/localstore/gc_test.go
View file @
70ff4fa9
...
@@ -19,6 +19,7 @@ package localstore
...
@@ -19,6 +19,7 @@ package localstore
import
(
import
(
"bytes"
"bytes"
"context"
"context"
"errors"
"io/ioutil"
"io/ioutil"
"math/rand"
"math/rand"
"os"
"os"
...
@@ -114,7 +115,7 @@ func testDBCollectGarbageWorker(t *testing.T) {
...
@@ -114,7 +115,7 @@ func testDBCollectGarbageWorker(t *testing.T) {
// the first synced chunk should be removed
// the first synced chunk should be removed
t
.
Run
(
"get the first synced chunk"
,
func
(
t
*
testing
.
T
)
{
t
.
Run
(
"get the first synced chunk"
,
func
(
t
*
testing
.
T
)
{
_
,
err
:=
db
.
Get
(
context
.
Background
(),
storage
.
ModeGetRequest
,
addrs
[
0
])
_
,
err
:=
db
.
Get
(
context
.
Background
(),
storage
.
ModeGetRequest
,
addrs
[
0
])
if
err
!=
storage
.
ErrNotFound
{
if
!
errors
.
Is
(
err
,
storage
.
ErrNotFound
)
{
t
.
Errorf
(
"got error %v, want %v"
,
err
,
storage
.
ErrNotFound
)
t
.
Errorf
(
"got error %v, want %v"
,
err
,
storage
.
ErrNotFound
)
}
}
})
})
...
@@ -122,7 +123,7 @@ func testDBCollectGarbageWorker(t *testing.T) {
...
@@ -122,7 +123,7 @@ func testDBCollectGarbageWorker(t *testing.T) {
t
.
Run
(
"only first inserted chunks should be removed"
,
func
(
t
*
testing
.
T
)
{
t
.
Run
(
"only first inserted chunks should be removed"
,
func
(
t
*
testing
.
T
)
{
for
i
:=
0
;
i
<
(
chunkCount
-
int
(
gcTarget
));
i
++
{
for
i
:=
0
;
i
<
(
chunkCount
-
int
(
gcTarget
));
i
++
{
_
,
err
:=
db
.
Get
(
context
.
Background
(),
storage
.
ModeGetRequest
,
addrs
[
i
])
_
,
err
:=
db
.
Get
(
context
.
Background
(),
storage
.
ModeGetRequest
,
addrs
[
i
])
if
err
!=
storage
.
ErrNotFound
{
if
!
errors
.
Is
(
err
,
storage
.
ErrNotFound
)
{
t
.
Errorf
(
"got error %v, want %v"
,
err
,
storage
.
ErrNotFound
)
t
.
Errorf
(
"got error %v, want %v"
,
err
,
storage
.
ErrNotFound
)
}
}
}
}
...
@@ -238,7 +239,7 @@ func TestPinGC(t *testing.T) {
...
@@ -238,7 +239,7 @@ func TestPinGC(t *testing.T) {
t
.
Run
(
"first chunks after pinned chunks should be removed"
,
func
(
t
*
testing
.
T
)
{
t
.
Run
(
"first chunks after pinned chunks should be removed"
,
func
(
t
*
testing
.
T
)
{
for
i
:=
pinChunksCount
;
i
<
(
int
(
dbCapacity
)
-
int
(
gcTarget
));
i
++
{
for
i
:=
pinChunksCount
;
i
<
(
int
(
dbCapacity
)
-
int
(
gcTarget
));
i
++
{
_
,
err
:=
db
.
Get
(
context
.
Background
(),
storage
.
ModeGetRequest
,
addrs
[
i
])
_
,
err
:=
db
.
Get
(
context
.
Background
(),
storage
.
ModeGetRequest
,
addrs
[
i
])
if
err
!=
leveldb
.
ErrNotFound
{
if
!
errors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
{
t
.
Fatal
(
err
)
t
.
Fatal
(
err
)
}
}
}
}
...
@@ -410,7 +411,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
...
@@ -410,7 +411,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
// the second synced chunk should be removed
// the second synced chunk should be removed
t
.
Run
(
"get gc-ed chunk"
,
func
(
t
*
testing
.
T
)
{
t
.
Run
(
"get gc-ed chunk"
,
func
(
t
*
testing
.
T
)
{
_
,
err
:=
db
.
Get
(
context
.
Background
(),
storage
.
ModeGetRequest
,
addrs
[
1
])
_
,
err
:=
db
.
Get
(
context
.
Background
(),
storage
.
ModeGetRequest
,
addrs
[
1
])
if
err
!=
storage
.
ErrNotFound
{
if
!
errors
.
Is
(
err
,
storage
.
ErrNotFound
)
{
t
.
Errorf
(
"got error %v, want %v"
,
err
,
storage
.
ErrNotFound
)
t
.
Errorf
(
"got error %v, want %v"
,
err
,
storage
.
ErrNotFound
)
}
}
})
})
...
...
pkg/localstore/localstore.go
View file @
70ff4fa9
...
@@ -181,7 +181,7 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
...
@@ -181,7 +181,7 @@ func New(path string, baseKey []byte, o *Options, logger logging.Logger) (db *DB
db
.
updateGCSem
=
make
(
chan
struct
{},
maxParallelUpdateGC
)
db
.
updateGCSem
=
make
(
chan
struct
{},
maxParallelUpdateGC
)
}
}
db
.
shed
,
err
=
shed
.
NewDB
(
path
,
logger
)
db
.
shed
,
err
=
shed
.
NewDB
(
path
)
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
err
return
nil
,
err
}
}
...
...
pkg/localstore/localstore_test.go
View file @
70ff4fa9
...
@@ -19,6 +19,7 @@ package localstore
...
@@ -19,6 +19,7 @@ package localstore
import
(
import
(
"bytes"
"bytes"
"context"
"context"
"errors"
"fmt"
"fmt"
"io/ioutil"
"io/ioutil"
"math/rand"
"math/rand"
...
@@ -248,7 +249,7 @@ func newRetrieveIndexesTest(db *DB, chunk swarm.Chunk, storeTimestamp, accessTim
...
@@ -248,7 +249,7 @@ func newRetrieveIndexesTest(db *DB, chunk swarm.Chunk, storeTimestamp, accessTim
// access index should not be set
// access index should not be set
wantErr
:=
leveldb
.
ErrNotFound
wantErr
:=
leveldb
.
ErrNotFound
_
,
err
=
db
.
retrievalAccessIndex
.
Get
(
addressToItem
(
chunk
.
Address
()))
_
,
err
=
db
.
retrievalAccessIndex
.
Get
(
addressToItem
(
chunk
.
Address
()))
if
err
!=
wantErr
{
if
!
errors
.
Is
(
err
,
wantErr
)
{
t
.
Errorf
(
"got error %v, want %v"
,
err
,
wantErr
)
t
.
Errorf
(
"got error %v, want %v"
,
err
,
wantErr
)
}
}
}
}
...
@@ -286,7 +287,7 @@ func newPullIndexTest(db *DB, ch swarm.Chunk, binID uint64, wantError error) fun
...
@@ -286,7 +287,7 @@ func newPullIndexTest(db *DB, ch swarm.Chunk, binID uint64, wantError error) fun
Address
:
ch
.
Address
()
.
Bytes
(),
Address
:
ch
.
Address
()
.
Bytes
(),
BinID
:
binID
,
BinID
:
binID
,
})
})
if
err
!=
wantError
{
if
!
errors
.
Is
(
err
,
wantError
)
{
t
.
Errorf
(
"got error %v, want %v"
,
err
,
wantError
)
t
.
Errorf
(
"got error %v, want %v"
,
err
,
wantError
)
}
}
if
err
==
nil
{
if
err
==
nil
{
...
@@ -305,7 +306,7 @@ func newPushIndexTest(db *DB, ch swarm.Chunk, storeTimestamp int64, wantError er
...
@@ -305,7 +306,7 @@ func newPushIndexTest(db *DB, ch swarm.Chunk, storeTimestamp int64, wantError er
Address
:
ch
.
Address
()
.
Bytes
(),
Address
:
ch
.
Address
()
.
Bytes
(),
StoreTimestamp
:
storeTimestamp
,
StoreTimestamp
:
storeTimestamp
,
})
})
if
err
!=
wantError
{
if
!
errors
.
Is
(
err
,
wantError
)
{
t
.
Errorf
(
"got error %v, want %v"
,
err
,
wantError
)
t
.
Errorf
(
"got error %v, want %v"
,
err
,
wantError
)
}
}
if
err
==
nil
{
if
err
==
nil
{
...
@@ -325,7 +326,7 @@ func newGCIndexTest(db *DB, chunk swarm.Chunk, storeTimestamp, accessTimestamp i
...
@@ -325,7 +326,7 @@ func newGCIndexTest(db *DB, chunk swarm.Chunk, storeTimestamp, accessTimestamp i
BinID
:
binID
,
BinID
:
binID
,
AccessTimestamp
:
accessTimestamp
,
AccessTimestamp
:
accessTimestamp
,
})
})
if
err
!=
wantError
{
if
!
errors
.
Is
(
err
,
wantError
)
{
t
.
Errorf
(
"got error %v, want %v"
,
err
,
wantError
)
t
.
Errorf
(
"got error %v, want %v"
,
err
,
wantError
)
}
}
if
err
==
nil
{
if
err
==
nil
{
...
...
pkg/localstore/mode_get.go
View file @
70ff4fa9
...
@@ -18,6 +18,7 @@ package localstore
...
@@ -18,6 +18,7 @@ package localstore
import
(
import
(
"context"
"context"
"errors"
"time"
"time"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/shed"
...
@@ -43,7 +44,7 @@ func (db *DB) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address)
...
@@ -43,7 +44,7 @@ func (db *DB) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address)
out
,
err
:=
db
.
get
(
mode
,
addr
)
out
,
err
:=
db
.
get
(
mode
,
addr
)
if
err
!=
nil
{
if
err
!=
nil
{
if
err
==
leveldb
.
ErrNotFound
{
if
err
ors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
{
return
nil
,
storage
.
ErrNotFound
return
nil
,
storage
.
ErrNotFound
}
}
return
nil
,
err
return
nil
,
err
...
@@ -129,10 +130,10 @@ func (db *DB) updateGC(item shed.Item) (err error) {
...
@@ -129,10 +130,10 @@ func (db *DB) updateGC(item shed.Item) (err error) {
// update accessTimeStamp in retrieve, gc
// update accessTimeStamp in retrieve, gc
i
,
err
:=
db
.
retrievalAccessIndex
.
Get
(
item
)
i
,
err
:=
db
.
retrievalAccessIndex
.
Get
(
item
)
switch
err
{
switch
{
case
nil
:
case
err
==
nil
:
item
.
AccessTimestamp
=
i
.
AccessTimestamp
item
.
AccessTimestamp
=
i
.
AccessTimestamp
case
leveldb
.
ErrNotFound
:
case
errors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
:
// no chunk accesses
// no chunk accesses
default
:
default
:
return
err
return
err
...
...
pkg/localstore/mode_get_multi.go
View file @
70ff4fa9
...
@@ -18,6 +18,7 @@ package localstore
...
@@ -18,6 +18,7 @@ package localstore
import
(
import
(
"context"
"context"
"errors"
"time"
"time"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/shed"
...
@@ -42,7 +43,7 @@ func (db *DB) GetMulti(ctx context.Context, mode storage.ModeGet, addrs ...swarm
...
@@ -42,7 +43,7 @@ func (db *DB) GetMulti(ctx context.Context, mode storage.ModeGet, addrs ...swarm
out
,
err
:=
db
.
getMulti
(
mode
,
addrs
...
)
out
,
err
:=
db
.
getMulti
(
mode
,
addrs
...
)
if
err
!=
nil
{
if
err
!=
nil
{
if
err
==
leveldb
.
ErrNotFound
{
if
err
ors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
{
return
nil
,
storage
.
ErrNotFound
return
nil
,
storage
.
ErrNotFound
}
}
return
nil
,
err
return
nil
,
err
...
...
pkg/localstore/mode_get_multi_test.go
View file @
70ff4fa9
...
@@ -18,6 +18,7 @@ package localstore
...
@@ -18,6 +18,7 @@ package localstore
import
(
import
(
"context"
"context"
"errors"
"reflect"
"reflect"
"testing"
"testing"
...
@@ -74,7 +75,7 @@ func TestModeGetMulti(t *testing.T) {
...
@@ -74,7 +75,7 @@ func TestModeGetMulti(t *testing.T) {
want
:=
storage
.
ErrNotFound
want
:=
storage
.
ErrNotFound
_
,
err
=
db
.
GetMulti
(
context
.
Background
(),
mode
,
append
(
addrs
,
missingChunk
.
Address
())
...
)
_
,
err
=
db
.
GetMulti
(
context
.
Background
(),
mode
,
append
(
addrs
,
missingChunk
.
Address
())
...
)
if
err
!=
want
{
if
!
errors
.
Is
(
err
,
want
)
{
t
.
Errorf
(
"got error %v, want %v"
,
err
,
want
)
t
.
Errorf
(
"got error %v, want %v"
,
err
,
want
)
}
}
})
})
...
...
pkg/localstore/mode_put.go
View file @
70ff4fa9
...
@@ -18,6 +18,7 @@ package localstore
...
@@ -18,6 +18,7 @@ package localstore
import
(
import
(
"context"
"context"
"errors"
"time"
"time"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/shed"
...
@@ -159,12 +160,12 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e
...
@@ -159,12 +160,12 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e
// Provided batch and binID map are updated.
// Provided batch and binID map are updated.
func
(
db
*
DB
)
putRequest
(
batch
*
leveldb
.
Batch
,
binIDs
map
[
uint8
]
uint64
,
item
shed
.
Item
)
(
exists
bool
,
gcSizeChange
int64
,
err
error
)
{
func
(
db
*
DB
)
putRequest
(
batch
*
leveldb
.
Batch
,
binIDs
map
[
uint8
]
uint64
,
item
shed
.
Item
)
(
exists
bool
,
gcSizeChange
int64
,
err
error
)
{
i
,
err
:=
db
.
retrievalDataIndex
.
Get
(
item
)
i
,
err
:=
db
.
retrievalDataIndex
.
Get
(
item
)
switch
err
{
switch
{
case
nil
:
case
err
==
nil
:
exists
=
true
exists
=
true
item
.
StoreTimestamp
=
i
.
StoreTimestamp
item
.
StoreTimestamp
=
i
.
StoreTimestamp
item
.
BinID
=
i
.
BinID
item
.
BinID
=
i
.
BinID
case
leveldb
.
ErrNotFound
:
case
errors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
:
// no chunk accesses
// no chunk accesses
exists
=
false
exists
=
false
default
:
default
:
...
@@ -318,15 +319,15 @@ func (db *DB) setGC(batch *leveldb.Batch, item shed.Item) (gcSizeChange int64, e
...
@@ -318,15 +319,15 @@ func (db *DB) setGC(batch *leveldb.Batch, item shed.Item) (gcSizeChange int64, e
item
.
BinID
=
i
.
BinID
item
.
BinID
=
i
.
BinID
}
}
i
,
err
:=
db
.
retrievalAccessIndex
.
Get
(
item
)
i
,
err
:=
db
.
retrievalAccessIndex
.
Get
(
item
)
switch
err
{
switch
{
case
nil
:
case
err
==
nil
:
item
.
AccessTimestamp
=
i
.
AccessTimestamp
item
.
AccessTimestamp
=
i
.
AccessTimestamp
err
=
db
.
gcIndex
.
DeleteInBatch
(
batch
,
item
)
err
=
db
.
gcIndex
.
DeleteInBatch
(
batch
,
item
)
if
err
!=
nil
{
if
err
!=
nil
{
return
0
,
err
return
0
,
err
}
}
gcSizeChange
--
gcSizeChange
--
case
leveldb
.
ErrNotFound
:
case
errors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
:
// the chunk is not accessed before
// the chunk is not accessed before
default
:
default
:
return
0
,
err
return
0
,
err
...
...
pkg/localstore/mode_set.go
View file @
70ff4fa9
...
@@ -139,11 +139,11 @@ func (db *DB) setAccess(batch *leveldb.Batch, binIDs map[uint8]uint64, addr swar
...
@@ -139,11 +139,11 @@ func (db *DB) setAccess(batch *leveldb.Batch, binIDs map[uint8]uint64, addr swar
// provided by the access function, and it is not
// provided by the access function, and it is not
// a property of a chunk provided to Accessor.Put.
// a property of a chunk provided to Accessor.Put.
i
,
err
:=
db
.
retrievalDataIndex
.
Get
(
item
)
i
,
err
:=
db
.
retrievalDataIndex
.
Get
(
item
)
switch
err
{
switch
{
case
nil
:
case
err
==
nil
:
item
.
StoreTimestamp
=
i
.
StoreTimestamp
item
.
StoreTimestamp
=
i
.
StoreTimestamp
item
.
BinID
=
i
.
BinID
item
.
BinID
=
i
.
BinID
case
leveldb
.
ErrNotFound
:
case
errors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
:
err
=
db
.
pushIndex
.
DeleteInBatch
(
batch
,
item
)
err
=
db
.
pushIndex
.
DeleteInBatch
(
batch
,
item
)
if
err
!=
nil
{
if
err
!=
nil
{
return
0
,
err
return
0
,
err
...
@@ -158,15 +158,15 @@ func (db *DB) setAccess(batch *leveldb.Batch, binIDs map[uint8]uint64, addr swar
...
@@ -158,15 +158,15 @@ func (db *DB) setAccess(batch *leveldb.Batch, binIDs map[uint8]uint64, addr swar
}
}
i
,
err
=
db
.
retrievalAccessIndex
.
Get
(
item
)
i
,
err
=
db
.
retrievalAccessIndex
.
Get
(
item
)
switch
err
{
switch
{
case
nil
:
case
err
==
nil
:
item
.
AccessTimestamp
=
i
.
AccessTimestamp
item
.
AccessTimestamp
=
i
.
AccessTimestamp
err
=
db
.
gcIndex
.
DeleteInBatch
(
batch
,
item
)
err
=
db
.
gcIndex
.
DeleteInBatch
(
batch
,
item
)
if
err
!=
nil
{
if
err
!=
nil
{
return
0
,
err
return
0
,
err
}
}
gcSizeChange
--
gcSizeChange
--
case
leveldb
.
ErrNotFound
:
case
errors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
:
// the chunk is not accessed before
// the chunk is not accessed before
default
:
default
:
return
0
,
err
return
0
,
err
...
@@ -205,7 +205,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr swarm.Address, mode storage.Mod
...
@@ -205,7 +205,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr swarm.Address, mode storage.Mod
i
,
err
:=
db
.
retrievalDataIndex
.
Get
(
item
)
i
,
err
:=
db
.
retrievalDataIndex
.
Get
(
item
)
if
err
!=
nil
{
if
err
!=
nil
{
if
err
==
leveldb
.
ErrNotFound
{
if
err
ors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
{
// chunk is not found,
// chunk is not found,
// no need to update gc index
// no need to update gc index
// just delete from the push index
// just delete from the push index
...
@@ -228,7 +228,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr swarm.Address, mode storage.Mod
...
@@ -228,7 +228,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr swarm.Address, mode storage.Mod
// this prevents duplicate increments
// this prevents duplicate increments
i
,
err
:=
db
.
pullIndex
.
Get
(
item
)
i
,
err
:=
db
.
pullIndex
.
Get
(
item
)
if
err
!=
nil
{
if
err
!=
nil
{
if
err
==
leveldb
.
ErrNotFound
{
if
err
ors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
{
// we handle this error internally, since this is an internal inconsistency of the indices
// we handle this error internally, since this is an internal inconsistency of the indices
// if we return the error here - it means that for example, in stream protocol peers which we sync
// if we return the error here - it means that for example, in stream protocol peers which we sync
// to would be dropped. this is possible when the chunk is put with ModePutRequest and ModeSetSyncPull is
// to would be dropped. this is possible when the chunk is put with ModePutRequest and ModeSetSyncPull is
...
@@ -263,7 +263,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr swarm.Address, mode storage.Mod
...
@@ -263,7 +263,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr swarm.Address, mode storage.Mod
case
storage
.
ModeSetSyncPush
:
case
storage
.
ModeSetSyncPush
:
i
,
err
:=
db
.
pushIndex
.
Get
(
item
)
i
,
err
:=
db
.
pushIndex
.
Get
(
item
)
if
err
!=
nil
{
if
err
!=
nil
{
if
err
==
leveldb
.
ErrNotFound
{
if
err
ors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
{
// we handle this error internally, since this is an internal inconsistency of the indices
// we handle this error internally, since this is an internal inconsistency of the indices
// this error can happen if the chunk is put with ModePutRequest or ModePutSync
// this error can happen if the chunk is put with ModePutRequest or ModePutSync
// but this function is called with ModeSetSyncPush
// but this function is called with ModeSetSyncPush
...
@@ -295,15 +295,15 @@ func (db *DB) setSync(batch *leveldb.Batch, addr swarm.Address, mode storage.Mod
...
@@ -295,15 +295,15 @@ func (db *DB) setSync(batch *leveldb.Batch, addr swarm.Address, mode storage.Mod
}
}
i
,
err
=
db
.
retrievalAccessIndex
.
Get
(
item
)
i
,
err
=
db
.
retrievalAccessIndex
.
Get
(
item
)
switch
err
{
switch
{
case
nil
:
case
err
==
nil
:
item
.
AccessTimestamp
=
i
.
AccessTimestamp
item
.
AccessTimestamp
=
i
.
AccessTimestamp
err
=
db
.
gcIndex
.
DeleteInBatch
(
batch
,
item
)
err
=
db
.
gcIndex
.
DeleteInBatch
(
batch
,
item
)
if
err
!=
nil
{
if
err
!=
nil
{
return
0
,
err
return
0
,
err
}
}
gcSizeChange
--
gcSizeChange
--
case
leveldb
.
ErrNotFound
:
case
errors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
:
// the chunk is not accessed before
// the chunk is not accessed before
default
:
default
:
return
0
,
err
return
0
,
err
...
@@ -340,10 +340,10 @@ func (db *DB) setRemove(batch *leveldb.Batch, addr swarm.Address) (gcSizeChange
...
@@ -340,10 +340,10 @@ func (db *DB) setRemove(batch *leveldb.Batch, addr swarm.Address) (gcSizeChange
// provided by the access function, and it is not
// provided by the access function, and it is not
// a property of a chunk provided to Accessor.Put.
// a property of a chunk provided to Accessor.Put.
i
,
err
:=
db
.
retrievalAccessIndex
.
Get
(
item
)
i
,
err
:=
db
.
retrievalAccessIndex
.
Get
(
item
)
switch
err
{
switch
{
case
nil
:
case
err
==
nil
:
item
.
AccessTimestamp
=
i
.
AccessTimestamp
item
.
AccessTimestamp
=
i
.
AccessTimestamp
case
leveldb
.
ErrNotFound
:
case
errors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
:
default
:
default
:
return
0
,
err
return
0
,
err
}
}
...
@@ -390,7 +390,7 @@ func (db *DB) setPin(batch *leveldb.Batch, addr swarm.Address) (err error) {
...
@@ -390,7 +390,7 @@ func (db *DB) setPin(batch *leveldb.Batch, addr swarm.Address) (err error) {
existingPinCounter
:=
uint64
(
0
)
existingPinCounter
:=
uint64
(
0
)
pinnedChunk
,
err
:=
db
.
pinIndex
.
Get
(
item
)
pinnedChunk
,
err
:=
db
.
pinIndex
.
Get
(
item
)
if
err
!=
nil
{
if
err
!=
nil
{
if
err
==
leveldb
.
ErrNotFound
{
if
err
ors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
{
// If this Address is not present in DB, then its a new entry
// If this Address is not present in DB, then its a new entry
existingPinCounter
=
0
existingPinCounter
=
0
...
...
pkg/localstore/mode_set_test.go
View file @
70ff4fa9
...
@@ -18,6 +18,7 @@ package localstore
...
@@ -18,6 +18,7 @@ package localstore
import
(
import
(
"context"
"context"
"errors"
"testing"
"testing"
"time"
"time"
...
@@ -336,13 +337,13 @@ func TestModeSetRemove(t *testing.T) {
...
@@ -336,13 +337,13 @@ func TestModeSetRemove(t *testing.T) {
for
_
,
ch
:=
range
chunks
{
for
_
,
ch
:=
range
chunks
{
wantErr
:=
leveldb
.
ErrNotFound
wantErr
:=
leveldb
.
ErrNotFound
_
,
err
:=
db
.
retrievalDataIndex
.
Get
(
addressToItem
(
ch
.
Address
()))
_
,
err
:=
db
.
retrievalDataIndex
.
Get
(
addressToItem
(
ch
.
Address
()))
if
err
!=
wantErr
{
if
!
errors
.
Is
(
err
,
wantErr
)
{
t
.
Errorf
(
"got error %v, want %v"
,
err
,
wantErr
)
t
.
Errorf
(
"got error %v, want %v"
,
err
,
wantErr
)
}
}
// access index should not be set
// access index should not be set
_
,
err
=
db
.
retrievalAccessIndex
.
Get
(
addressToItem
(
ch
.
Address
()))
_
,
err
=
db
.
retrievalAccessIndex
.
Get
(
addressToItem
(
ch
.
Address
()))
if
err
!=
wantErr
{
if
!
errors
.
Is
(
err
,
wantErr
)
{
t
.
Errorf
(
"got error %v, want %v"
,
err
,
wantErr
)
t
.
Errorf
(
"got error %v, want %v"
,
err
,
wantErr
)
}
}
}
}
...
...
pkg/localstore/pin.go
View file @
70ff4fa9
...
@@ -30,7 +30,7 @@ func (db *DB) PinnedChunks(ctx context.Context, cursor swarm.Address) (pinnedChu
...
@@ -30,7 +30,7 @@ func (db *DB) PinnedChunks(ctx context.Context, cursor swarm.Address) (pinnedChu
it
,
err
:=
db
.
pinIndex
.
First
(
prefix
)
it
,
err
:=
db
.
pinIndex
.
First
(
prefix
)
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"
pin chunks
: %w"
,
err
)
return
nil
,
fmt
.
Errorf
(
"
get first pin
: %w"
,
err
)
}
}
err
=
db
.
pinIndex
.
Iterate
(
func
(
item
shed
.
Item
)
(
stop
bool
,
err
error
)
{
err
=
db
.
pinIndex
.
Iterate
(
func
(
item
shed
.
Item
)
(
stop
bool
,
err
error
)
{
pinnedChunks
=
append
(
pinnedChunks
,
pinnedChunks
=
append
(
pinnedChunks
,
...
...
pkg/localstore/pin_test.go
View file @
70ff4fa9
...
@@ -6,6 +6,7 @@ package localstore
...
@@ -6,6 +6,7 @@ package localstore
import
(
import
(
"context"
"context"
"errors"
"sort"
"sort"
"testing"
"testing"
...
@@ -29,7 +30,7 @@ func TestPinning(t *testing.T) {
...
@@ -29,7 +30,7 @@ func TestPinning(t *testing.T) {
// Nothing should be there in the pinned DB
// Nothing should be there in the pinned DB
_
,
err
:=
db
.
PinnedChunks
(
context
.
Background
(),
swarm
.
NewAddress
([]
byte
{
0
}))
_
,
err
:=
db
.
PinnedChunks
(
context
.
Background
(),
swarm
.
NewAddress
([]
byte
{
0
}))
if
err
!=
nil
{
if
err
!=
nil
{
if
err
.
Error
()
!=
"pin chunks: leveldb: not found"
{
if
!
errors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
{
t
.
Fatal
(
err
)
t
.
Fatal
(
err
)
}
}
}
}
...
@@ -119,7 +120,7 @@ func TestPinInfo(t *testing.T) {
...
@@ -119,7 +120,7 @@ func TestPinInfo(t *testing.T) {
}
}
_
,
err
=
db
.
PinInfo
(
swarm
.
NewAddress
(
chunk
.
Address
()
.
Bytes
()))
_
,
err
=
db
.
PinInfo
(
swarm
.
NewAddress
(
chunk
.
Address
()
.
Bytes
()))
if
err
!=
nil
{
if
err
!=
nil
{
if
err
!=
leveldb
.
ErrNotFound
{
if
!
errors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
{
t
.
Fatal
(
err
)
t
.
Fatal
(
err
)
}
}
}
}
...
...
pkg/localstore/subscription_pull.go
View file @
70ff4fa9
...
@@ -187,7 +187,7 @@ func (db *DB) LastPullSubscriptionBinID(bin uint8) (id uint64, err error) {
...
@@ -187,7 +187,7 @@ func (db *DB) LastPullSubscriptionBinID(bin uint8) (id uint64, err error) {
item
,
err
:=
db
.
pullIndex
.
Last
([]
byte
{
bin
})
item
,
err
:=
db
.
pullIndex
.
Last
([]
byte
{
bin
})
if
err
!=
nil
{
if
err
!=
nil
{
if
err
==
leveldb
.
ErrNotFound
{
if
err
ors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
{
return
0
,
nil
return
0
,
nil
}
}
return
0
,
err
return
0
,
err
...
...
pkg/shed/db.go
View file @
70ff4fa9
...
@@ -23,7 +23,8 @@
...
@@ -23,7 +23,8 @@
package
shed
package
shed
import
(
import
(
"github.com/ethersphere/bee/pkg/logging"
"errors"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/opt"
...
@@ -41,14 +42,13 @@ var (
...
@@ -41,14 +42,13 @@ var (
type
DB
struct
{
type
DB
struct
{
ldb
*
leveldb
.
DB
ldb
*
leveldb
.
DB
metrics
metrics
metrics
metrics
logger
logging
.
Logger
quit
chan
struct
{}
// Quit channel to stop the metrics collection before closing the database
quit
chan
struct
{}
// Quit channel to stop the metrics collection before closing the database
}
}
// NewDB constructs a new DB and validates the schema
// NewDB constructs a new DB and validates the schema
// if it exists in database on the given path.
// if it exists in database on the given path.
// metricsPrefix is used for metrics collection for the given DB.
// metricsPrefix is used for metrics collection for the given DB.
func
NewDB
(
path
string
,
logger
logging
.
Logger
)
(
db
*
DB
,
err
error
)
{
func
NewDB
(
path
string
)
(
db
*
DB
,
err
error
)
{
var
ldb
*
leveldb
.
DB
var
ldb
*
leveldb
.
DB
if
path
==
""
{
if
path
==
""
{
ldb
,
err
=
leveldb
.
Open
(
storage
.
NewMemStorage
(),
nil
)
ldb
,
err
=
leveldb
.
Open
(
storage
.
NewMemStorage
(),
nil
)
...
@@ -65,11 +65,10 @@ func NewDB(path string, logger logging.Logger) (db *DB, err error) {
...
@@ -65,11 +65,10 @@ func NewDB(path string, logger logging.Logger) (db *DB, err error) {
db
=
&
DB
{
db
=
&
DB
{
ldb
:
ldb
,
ldb
:
ldb
,
metrics
:
newMetrics
(),
metrics
:
newMetrics
(),
logger
:
logger
,
}
}
if
_
,
err
=
db
.
getSchema
();
err
!=
nil
{
if
_
,
err
=
db
.
getSchema
();
err
!=
nil
{
if
err
==
leveldb
.
ErrNotFound
{
if
err
ors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
{
// save schema with initialized default fields
// save schema with initialized default fields
if
err
=
db
.
putSchema
(
schema
{
if
err
=
db
.
putSchema
(
schema
{
Fields
:
make
(
map
[
string
]
fieldSpec
),
Fields
:
make
(
map
[
string
]
fieldSpec
),
...
@@ -102,7 +101,7 @@ func (db *DB) Put(key []byte, value []byte) (err error) {
...
@@ -102,7 +101,7 @@ func (db *DB) Put(key []byte, value []byte) (err error) {
// Get wraps LevelDB Get method to increment metrics counter.
// Get wraps LevelDB Get method to increment metrics counter.
func
(
db
*
DB
)
Get
(
key
[]
byte
)
(
value
[]
byte
,
err
error
)
{
func
(
db
*
DB
)
Get
(
key
[]
byte
)
(
value
[]
byte
,
err
error
)
{
value
,
err
=
db
.
ldb
.
Get
(
key
,
nil
)
value
,
err
=
db
.
ldb
.
Get
(
key
,
nil
)
if
err
==
leveldb
.
ErrNotFound
{
if
err
ors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
{
db
.
metrics
.
GetNotFoundCounter
.
Inc
()
db
.
metrics
.
GetNotFoundCounter
.
Inc
()
return
nil
,
err
return
nil
,
err
}
else
{
}
else
{
...
...
pkg/shed/db_test.go
View file @
70ff4fa9
...
@@ -20,8 +20,6 @@ import (
...
@@ -20,8 +20,6 @@ import (
"io/ioutil"
"io/ioutil"
"os"
"os"
"testing"
"testing"
"github.com/ethersphere/bee/pkg/logging"
)
)
// TestNewDB constructs a new DB
// TestNewDB constructs a new DB
...
@@ -56,9 +54,8 @@ func TestDB_persistence(t *testing.T) {
...
@@ -56,9 +54,8 @@ func TestDB_persistence(t *testing.T) {
t
.
Fatal
(
err
)
t
.
Fatal
(
err
)
}
}
defer
os
.
RemoveAll
(
dir
)
defer
os
.
RemoveAll
(
dir
)
logger
:=
logging
.
New
(
ioutil
.
Discard
,
0
)
db
,
err
:=
NewDB
(
dir
,
logger
)
db
,
err
:=
NewDB
(
dir
)
if
err
!=
nil
{
if
err
!=
nil
{
t
.
Fatal
(
err
)
t
.
Fatal
(
err
)
}
}
...
@@ -76,7 +73,7 @@ func TestDB_persistence(t *testing.T) {
...
@@ -76,7 +73,7 @@ func TestDB_persistence(t *testing.T) {
t
.
Fatal
(
err
)
t
.
Fatal
(
err
)
}
}
db2
,
err
:=
NewDB
(
dir
,
logger
)
db2
,
err
:=
NewDB
(
dir
)
if
err
!=
nil
{
if
err
!=
nil
{
t
.
Fatal
(
err
)
t
.
Fatal
(
err
)
}
}
...
@@ -98,8 +95,7 @@ func TestDB_persistence(t *testing.T) {
...
@@ -98,8 +95,7 @@ func TestDB_persistence(t *testing.T) {
// be called to remove the data.
// be called to remove the data.
func
newTestDB
(
t
*
testing
.
T
)
(
db
*
DB
,
cleanupFunc
func
())
{
func
newTestDB
(
t
*
testing
.
T
)
(
db
*
DB
,
cleanupFunc
func
())
{
t
.
Helper
()
t
.
Helper
()
logger
:=
logging
.
New
(
ioutil
.
Discard
,
0
)
db
,
err
:=
NewDB
(
""
)
db
,
err
:=
NewDB
(
""
,
logger
)
if
err
!=
nil
{
if
err
!=
nil
{
t
.
Fatal
(
err
)
t
.
Fatal
(
err
)
}
}
...
...
pkg/shed/example_store_test.go
View file @
70ff4fa9
...
@@ -20,12 +20,11 @@ import (
...
@@ -20,12 +20,11 @@ import (
"bytes"
"bytes"
"context"
"context"
"encoding/binary"
"encoding/binary"
"errors"
"fmt"
"fmt"
"io/ioutil"
"log"
"log"
"time"
"time"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/testing"
"github.com/ethersphere/bee/pkg/storage/testing"
...
@@ -52,8 +51,7 @@ type Store struct {
...
@@ -52,8 +51,7 @@ type Store struct {
// and possible conflicts with schema from existing database is checked
// and possible conflicts with schema from existing database is checked
// automatically.
// automatically.
func
New
(
path
string
)
(
s
*
Store
,
err
error
)
{
func
New
(
path
string
)
(
s
*
Store
,
err
error
)
{
logger
:=
logging
.
New
(
ioutil
.
Discard
,
0
)
db
,
err
:=
shed
.
NewDB
(
path
)
db
,
err
:=
shed
.
NewDB
(
path
,
logger
)
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
err
return
nil
,
err
}
}
...
@@ -166,18 +164,18 @@ func (s *Store) Get(_ context.Context, addr swarm.Address) (c swarm.Chunk, err e
...
@@ -166,18 +164,18 @@ func (s *Store) Get(_ context.Context, addr swarm.Address) (c swarm.Chunk, err e
Address
:
addr
.
Bytes
(),
Address
:
addr
.
Bytes
(),
})
})
if
err
!=
nil
{
if
err
!=
nil
{
if
err
==
leveldb
.
ErrNotFound
{
if
err
ors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
{
return
nil
,
storage
.
ErrNotFound
return
nil
,
storage
.
ErrNotFound
}
}
return
nil
,
err
return
nil
,
fmt
.
Errorf
(
"retrieval index get: %w"
,
err
)
}
}
// Get the chunk access timestamp.
// Get the chunk access timestamp.
accessItem
,
err
:=
s
.
accessIndex
.
Get
(
shed
.
Item
{
accessItem
,
err
:=
s
.
accessIndex
.
Get
(
shed
.
Item
{
Address
:
addr
.
Bytes
(),
Address
:
addr
.
Bytes
(),
})
})
switch
err
{
switch
{
case
nil
:
case
err
==
nil
:
// Remove gc index entry if access timestamp is found.
// Remove gc index entry if access timestamp is found.
err
=
s
.
gcIndex
.
DeleteInBatch
(
batch
,
shed
.
Item
{
err
=
s
.
gcIndex
.
DeleteInBatch
(
batch
,
shed
.
Item
{
Address
:
item
.
Address
,
Address
:
item
.
Address
,
...
@@ -185,13 +183,13 @@ func (s *Store) Get(_ context.Context, addr swarm.Address) (c swarm.Chunk, err e
...
@@ -185,13 +183,13 @@ func (s *Store) Get(_ context.Context, addr swarm.Address) (c swarm.Chunk, err e
AccessTimestamp
:
item
.
StoreTimestamp
,
AccessTimestamp
:
item
.
StoreTimestamp
,
})
})
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
err
return
nil
,
fmt
.
Errorf
(
"gc index delete in batch: %w"
,
err
)
}
}
case
leveldb
.
ErrNotFound
:
case
errors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
:
// Access timestamp is not found. Do not do anything.
// Access timestamp is not found. Do not do anything.
// This is the firs
get request.
// This is the first
get request.
default
:
default
:
return
nil
,
err
return
nil
,
fmt
.
Errorf
(
"access index get: %w"
,
err
)
}
}
// Specify new access timestamp
// Specify new access timestamp
...
@@ -203,7 +201,7 @@ func (s *Store) Get(_ context.Context, addr swarm.Address) (c swarm.Chunk, err e
...
@@ -203,7 +201,7 @@ func (s *Store) Get(_ context.Context, addr swarm.Address) (c swarm.Chunk, err e
AccessTimestamp
:
accessTimestamp
,
AccessTimestamp
:
accessTimestamp
,
})
})
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
err
return
nil
,
fmt
.
Errorf
(
"access index put in batch: %w"
,
err
)
}
}
// Put new access timestamp in gc index.
// Put new access timestamp in gc index.
...
@@ -213,20 +211,20 @@ func (s *Store) Get(_ context.Context, addr swarm.Address) (c swarm.Chunk, err e
...
@@ -213,20 +211,20 @@ func (s *Store) Get(_ context.Context, addr swarm.Address) (c swarm.Chunk, err e
StoreTimestamp
:
item
.
StoreTimestamp
,
StoreTimestamp
:
item
.
StoreTimestamp
,
})
})
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
err
return
nil
,
fmt
.
Errorf
(
"gc index put in batch: %w"
,
err
)
}
}
// Increment access counter.
// Increment access counter.
// Currently this information is not used anywhere.
// Currently this information is not used anywhere.
_
,
err
=
s
.
accessCounter
.
IncInBatch
(
batch
)
_
,
err
=
s
.
accessCounter
.
IncInBatch
(
batch
)
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
err
return
nil
,
fmt
.
Errorf
(
"access counter inc in batch: %w"
,
err
)
}
}
// Write the batch.
// Write the batch.
err
=
s
.
db
.
WriteBatch
(
batch
)
err
=
s
.
db
.
WriteBatch
(
batch
)
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
err
return
nil
,
fmt
.
Errorf
(
"write batch: %w"
,
err
)
}
}
// Return the chunk.
// Return the chunk.
...
@@ -285,7 +283,7 @@ func (s *Store) CollectGarbage() (err error) {
...
@@ -285,7 +283,7 @@ func (s *Store) CollectGarbage() (err error) {
// string from a database field.
// string from a database field.
func
(
s
*
Store
)
GetSchema
()
(
name
string
,
err
error
)
{
func
(
s
*
Store
)
GetSchema
()
(
name
string
,
err
error
)
{
name
,
err
=
s
.
schemaName
.
Get
()
name
,
err
=
s
.
schemaName
.
Get
()
if
err
==
leveldb
.
ErrNotFound
{
if
err
ors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
{
return
""
,
nil
return
""
,
nil
}
}
return
name
,
err
return
name
,
err
...
@@ -313,13 +311,13 @@ func Example_store() {
...
@@ -313,13 +311,13 @@ func Example_store() {
ch
:=
testing
.
GenerateTestRandomChunk
()
ch
:=
testing
.
GenerateTestRandomChunk
()
err
=
s
.
Put
(
context
.
Background
(),
ch
)
err
=
s
.
Put
(
context
.
Background
(),
ch
)
if
err
!=
nil
{
if
err
!=
nil
{
fmt
.
Println
(
err
)
fmt
.
Println
(
"put chunk:"
,
err
)
return
return
}
}
got
,
err
:=
s
.
Get
(
context
.
Background
(),
ch
.
Address
())
got
,
err
:=
s
.
Get
(
context
.
Background
(),
ch
.
Address
())
if
err
!=
nil
{
if
err
!=
nil
{
fmt
.
Println
(
err
)
fmt
.
Println
(
"get chunk:"
,
err
)
return
return
}
}
...
...
pkg/shed/field_string.go
View file @
70ff4fa9
...
@@ -17,16 +17,17 @@
...
@@ -17,16 +17,17 @@
package
shed
package
shed
import
(
import
(
"github.com/ethersphere/bee/pkg/logging"
"errors"
"fmt"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb"
)
)
// StringField is the most simple field implementation
// StringField is the most simple field implementation
// that stores an arbitrary string under a specific LevelDB key.
// that stores an arbitrary string under a specific LevelDB key.
type
StringField
struct
{
type
StringField
struct
{
db
*
DB
db
*
DB
key
[]
byte
key
[]
byte
logger
logging
.
Logger
}
}
// NewStringField retruns a new Instance of StringField.
// NewStringField retruns a new Instance of StringField.
...
@@ -34,12 +35,11 @@ type StringField struct {
...
@@ -34,12 +35,11 @@ type StringField struct {
func
(
db
*
DB
)
NewStringField
(
name
string
)
(
f
StringField
,
err
error
)
{
func
(
db
*
DB
)
NewStringField
(
name
string
)
(
f
StringField
,
err
error
)
{
key
,
err
:=
db
.
schemaFieldKey
(
name
,
"string"
)
key
,
err
:=
db
.
schemaFieldKey
(
name
,
"string"
)
if
err
!=
nil
{
if
err
!=
nil
{
return
f
,
err
return
f
,
fmt
.
Errorf
(
"get schema key: %w"
,
err
)
}
}
return
StringField
{
return
StringField
{
db
:
db
,
db
:
db
,
key
:
key
,
key
:
key
,
logger
:
db
.
logger
,
},
nil
},
nil
}
}
...
@@ -49,8 +49,7 @@ func (db *DB) NewStringField(name string) (f StringField, err error) {
...
@@ -49,8 +49,7 @@ func (db *DB) NewStringField(name string) (f StringField, err error) {
func
(
f
StringField
)
Get
()
(
val
string
,
err
error
)
{
func
(
f
StringField
)
Get
()
(
val
string
,
err
error
)
{
b
,
err
:=
f
.
db
.
Get
(
f
.
key
)
b
,
err
:=
f
.
db
.
Get
(
f
.
key
)
if
err
!=
nil
{
if
err
!=
nil
{
if
err
==
leveldb
.
ErrNotFound
{
if
errors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
{
f
.
logger
.
Errorf
(
"key %s not found"
,
string
(
f
.
key
))
return
""
,
nil
return
""
,
nil
}
}
return
""
,
err
return
""
,
err
...
...
pkg/shed/field_struct.go
View file @
70ff4fa9
...
@@ -18,17 +18,16 @@ package shed
...
@@ -18,17 +18,16 @@ package shed
import
(
import
(
"encoding/json"
"encoding/json"
"fmt"
"github.com/ethersphere/bee/pkg/logging"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb"
)
)
// StructField is a helper to store complex structure by
// StructField is a helper to store complex structure by
// encoding it in RLP format.
// encoding it in RLP format.
type
StructField
struct
{
type
StructField
struct
{
db
*
DB
db
*
DB
key
[]
byte
key
[]
byte
logger
logging
.
Logger
}
}
// NewStructField returns a new StructField.
// NewStructField returns a new StructField.
...
@@ -36,12 +35,11 @@ type StructField struct {
...
@@ -36,12 +35,11 @@ type StructField struct {
func
(
db
*
DB
)
NewStructField
(
name
string
)
(
f
StructField
,
err
error
)
{
func
(
db
*
DB
)
NewStructField
(
name
string
)
(
f
StructField
,
err
error
)
{
key
,
err
:=
db
.
schemaFieldKey
(
name
,
"struct-rlp"
)
key
,
err
:=
db
.
schemaFieldKey
(
name
,
"struct-rlp"
)
if
err
!=
nil
{
if
err
!=
nil
{
return
f
,
err
return
f
,
fmt
.
Errorf
(
"get schema key: %w"
,
err
)
}
}
return
StructField
{
return
StructField
{
db
:
db
,
db
:
db
,
key
:
key
,
key
:
key
,
logger
:
db
.
logger
,
},
nil
},
nil
}
}
...
@@ -50,7 +48,6 @@ func (db *DB) NewStructField(name string) (f StructField, err error) {
...
@@ -50,7 +48,6 @@ func (db *DB) NewStructField(name string) (f StructField, err error) {
func
(
f
StructField
)
Get
(
val
interface
{})
(
err
error
)
{
func
(
f
StructField
)
Get
(
val
interface
{})
(
err
error
)
{
b
,
err
:=
f
.
db
.
Get
(
f
.
key
)
b
,
err
:=
f
.
db
.
Get
(
f
.
key
)
if
err
!=
nil
{
if
err
!=
nil
{
f
.
logger
.
Debugf
(
"could not GET key %s"
,
string
(
f
.
key
))
return
err
return
err
}
}
return
json
.
Unmarshal
(
b
,
val
)
return
json
.
Unmarshal
(
b
,
val
)
...
@@ -60,7 +57,6 @@ func (f StructField) Get(val interface{}) (err error) {
...
@@ -60,7 +57,6 @@ func (f StructField) Get(val interface{}) (err error) {
func
(
f
StructField
)
Put
(
val
interface
{})
(
err
error
)
{
func
(
f
StructField
)
Put
(
val
interface
{})
(
err
error
)
{
b
,
err
:=
json
.
Marshal
(
val
)
b
,
err
:=
json
.
Marshal
(
val
)
if
err
!=
nil
{
if
err
!=
nil
{
f
.
logger
.
Debugf
(
"could not PUT key %s"
,
string
(
f
.
key
))
return
err
return
err
}
}
return
f
.
db
.
Put
(
f
.
key
,
b
)
return
f
.
db
.
Put
(
f
.
key
,
b
)
...
@@ -70,7 +66,6 @@ func (f StructField) Put(val interface{}) (err error) {
...
@@ -70,7 +66,6 @@ func (f StructField) Put(val interface{}) (err error) {
func
(
f
StructField
)
PutInBatch
(
batch
*
leveldb
.
Batch
,
val
interface
{})
(
err
error
)
{
func
(
f
StructField
)
PutInBatch
(
batch
*
leveldb
.
Batch
,
val
interface
{})
(
err
error
)
{
b
,
err
:=
json
.
Marshal
(
val
)
b
,
err
:=
json
.
Marshal
(
val
)
if
err
!=
nil
{
if
err
!=
nil
{
f
.
logger
.
Debugf
(
"could not PUT key %s in batch"
,
string
(
f
.
key
))
return
err
return
err
}
}
batch
.
Put
(
f
.
key
,
b
)
batch
.
Put
(
f
.
key
,
b
)
...
...
pkg/shed/field_uint64.go
View file @
70ff4fa9
...
@@ -18,17 +18,17 @@ package shed
...
@@ -18,17 +18,17 @@ package shed
import
(
import
(
"encoding/binary"
"encoding/binary"
"errors"
"fmt"
"github.com/ethersphere/bee/pkg/logging"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb"
)
)
// Uint64Field provides a way to have a simple counter in the database.
// Uint64Field provides a way to have a simple counter in the database.
// It transparently encodes uint64 type value to bytes.
// It transparently encodes uint64 type value to bytes.
type
Uint64Field
struct
{
type
Uint64Field
struct
{
db
*
DB
db
*
DB
key
[]
byte
key
[]
byte
logger
logging
.
Logger
}
}
// NewUint64Field returns a new Uint64Field.
// NewUint64Field returns a new Uint64Field.
...
@@ -36,12 +36,11 @@ type Uint64Field struct {
...
@@ -36,12 +36,11 @@ type Uint64Field struct {
func
(
db
*
DB
)
NewUint64Field
(
name
string
)
(
f
Uint64Field
,
err
error
)
{
func
(
db
*
DB
)
NewUint64Field
(
name
string
)
(
f
Uint64Field
,
err
error
)
{
key
,
err
:=
db
.
schemaFieldKey
(
name
,
"uint64"
)
key
,
err
:=
db
.
schemaFieldKey
(
name
,
"uint64"
)
if
err
!=
nil
{
if
err
!=
nil
{
return
f
,
err
return
f
,
fmt
.
Errorf
(
"get schema key: %w"
,
err
)
}
}
return
Uint64Field
{
return
Uint64Field
{
db
:
db
,
db
:
db
,
key
:
key
,
key
:
key
,
logger
:
db
.
logger
,
},
nil
},
nil
}
}
...
@@ -51,8 +50,7 @@ func (db *DB) NewUint64Field(name string) (f Uint64Field, err error) {
...
@@ -51,8 +50,7 @@ func (db *DB) NewUint64Field(name string) (f Uint64Field, err error) {
func
(
f
Uint64Field
)
Get
()
(
val
uint64
,
err
error
)
{
func
(
f
Uint64Field
)
Get
()
(
val
uint64
,
err
error
)
{
b
,
err
:=
f
.
db
.
Get
(
f
.
key
)
b
,
err
:=
f
.
db
.
Get
(
f
.
key
)
if
err
!=
nil
{
if
err
!=
nil
{
if
err
==
leveldb
.
ErrNotFound
{
if
errors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
{
f
.
logger
.
Errorf
(
"key %s not found"
,
string
(
f
.
key
))
return
0
,
nil
return
0
,
nil
}
}
return
0
,
err
return
0
,
err
...
@@ -76,12 +74,10 @@ func (f Uint64Field) PutInBatch(batch *leveldb.Batch, val uint64) {
...
@@ -76,12 +74,10 @@ func (f Uint64Field) PutInBatch(batch *leveldb.Batch, val uint64) {
func
(
f
Uint64Field
)
Inc
()
(
val
uint64
,
err
error
)
{
func
(
f
Uint64Field
)
Inc
()
(
val
uint64
,
err
error
)
{
val
,
err
=
f
.
Get
()
val
,
err
=
f
.
Get
()
if
err
!=
nil
{
if
err
!=
nil
{
if
err
==
leveldb
.
ErrNotFound
{
if
errors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
{
f
.
logger
.
Debugf
(
"key %s not found"
,
string
(
f
.
key
))
val
=
0
val
=
0
}
else
{
}
else
{
f
.
logger
.
Errorf
(
"key %s not found. Error: %s"
,
string
(
f
.
key
),
err
.
Error
())
return
0
,
fmt
.
Errorf
(
"get value: %w"
,
err
)
return
0
,
err
}
}
}
}
val
++
val
++
...
@@ -94,12 +90,10 @@ func (f Uint64Field) Inc() (val uint64, err error) {
...
@@ -94,12 +90,10 @@ func (f Uint64Field) Inc() (val uint64, err error) {
func
(
f
Uint64Field
)
IncInBatch
(
batch
*
leveldb
.
Batch
)
(
val
uint64
,
err
error
)
{
func
(
f
Uint64Field
)
IncInBatch
(
batch
*
leveldb
.
Batch
)
(
val
uint64
,
err
error
)
{
val
,
err
=
f
.
Get
()
val
,
err
=
f
.
Get
()
if
err
!=
nil
{
if
err
!=
nil
{
if
err
==
leveldb
.
ErrNotFound
{
if
errors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
{
f
.
logger
.
Debugf
(
"key %s not found"
,
string
(
f
.
key
))
val
=
0
val
=
0
}
else
{
}
else
{
f
.
logger
.
Errorf
(
"key %s not found. Error: %s"
,
string
(
f
.
key
),
err
.
Error
())
return
0
,
fmt
.
Errorf
(
"get value: %w"
,
err
)
return
0
,
err
}
}
}
}
val
++
val
++
...
@@ -113,12 +107,10 @@ func (f Uint64Field) IncInBatch(batch *leveldb.Batch) (val uint64, err error) {
...
@@ -113,12 +107,10 @@ func (f Uint64Field) IncInBatch(batch *leveldb.Batch) (val uint64, err error) {
func
(
f
Uint64Field
)
Dec
()
(
val
uint64
,
err
error
)
{
func
(
f
Uint64Field
)
Dec
()
(
val
uint64
,
err
error
)
{
val
,
err
=
f
.
Get
()
val
,
err
=
f
.
Get
()
if
err
!=
nil
{
if
err
!=
nil
{
if
err
==
leveldb
.
ErrNotFound
{
if
errors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
{
f
.
logger
.
Debugf
(
"key %s not found"
,
string
(
f
.
key
))
val
=
0
val
=
0
}
else
{
}
else
{
f
.
logger
.
Errorf
(
"key %s not found. Error: %s"
,
string
(
f
.
key
),
err
.
Error
())
return
0
,
fmt
.
Errorf
(
"get value: %w"
,
err
)
return
0
,
err
}
}
}
}
if
val
!=
0
{
if
val
!=
0
{
...
@@ -134,12 +126,10 @@ func (f Uint64Field) Dec() (val uint64, err error) {
...
@@ -134,12 +126,10 @@ func (f Uint64Field) Dec() (val uint64, err error) {
func
(
f
Uint64Field
)
DecInBatch
(
batch
*
leveldb
.
Batch
)
(
val
uint64
,
err
error
)
{
func
(
f
Uint64Field
)
DecInBatch
(
batch
*
leveldb
.
Batch
)
(
val
uint64
,
err
error
)
{
val
,
err
=
f
.
Get
()
val
,
err
=
f
.
Get
()
if
err
!=
nil
{
if
err
!=
nil
{
if
err
==
leveldb
.
ErrNotFound
{
if
errors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
{
f
.
logger
.
Debugf
(
"key %s not found"
,
string
(
f
.
key
))
val
=
0
val
=
0
}
else
{
}
else
{
f
.
logger
.
Errorf
(
"key %s not found. Error: %s"
,
string
(
f
.
key
),
err
.
Error
())
return
0
,
fmt
.
Errorf
(
"get value: %w"
,
err
)
return
0
,
err
}
}
}
}
if
val
!=
0
{
if
val
!=
0
{
...
...
pkg/shed/index.go
View file @
70ff4fa9
...
@@ -18,8 +18,9 @@ package shed
...
@@ -18,8 +18,9 @@ package shed
import
(
import
(
"bytes"
"bytes"
"errors"
"fmt"
"github.com/ethersphere/bee/pkg/logging"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/iterator"
)
)
...
@@ -49,7 +50,7 @@ type Item struct {
...
@@ -49,7 +50,7 @@ type Item struct {
// Merge is a helper method to construct a new
// Merge is a helper method to construct a new
// Item by filling up fields with default values
// Item by filling up fields with default values
// of a particular Item with values from another one.
// of a particular Item with values from another one.
func
(
i
Item
)
Merge
(
i2
Item
)
(
new
Item
)
{
func
(
i
Item
)
Merge
(
i2
Item
)
Item
{
if
i
.
Address
==
nil
{
if
i
.
Address
==
nil
{
i
.
Address
=
i2
.
Address
i
.
Address
=
i2
.
Address
}
}
...
@@ -83,7 +84,6 @@ func (i Item) Merge(i2 Item) (new Item) {
...
@@ -83,7 +84,6 @@ func (i Item) Merge(i2 Item) (new Item) {
// It implements IndexIteratorInterface interface.
// It implements IndexIteratorInterface interface.
type
Index
struct
{
type
Index
struct
{
db
*
DB
db
*
DB
logger
logging
.
Logger
prefix
[]
byte
prefix
[]
byte
encodeKeyFunc
func
(
fields
Item
)
(
key
[]
byte
,
err
error
)
encodeKeyFunc
func
(
fields
Item
)
(
key
[]
byte
,
err
error
)
decodeKeyFunc
func
(
key
[]
byte
)
(
e
Item
,
err
error
)
decodeKeyFunc
func
(
key
[]
byte
)
(
e
Item
,
err
error
)
...
@@ -106,12 +106,11 @@ type IndexFuncs struct {
...
@@ -106,12 +106,11 @@ type IndexFuncs struct {
func
(
db
*
DB
)
NewIndex
(
name
string
,
funcs
IndexFuncs
)
(
f
Index
,
err
error
)
{
func
(
db
*
DB
)
NewIndex
(
name
string
,
funcs
IndexFuncs
)
(
f
Index
,
err
error
)
{
id
,
err
:=
db
.
schemaIndexPrefix
(
name
)
id
,
err
:=
db
.
schemaIndexPrefix
(
name
)
if
err
!=
nil
{
if
err
!=
nil
{
return
f
,
err
return
f
,
fmt
.
Errorf
(
"get schema index prefix: %w"
,
err
)
}
}
prefix
:=
[]
byte
{
id
}
prefix
:=
[]
byte
{
id
}
return
Index
{
return
Index
{
db
:
db
,
db
:
db
,
logger
:
db
.
logger
,
prefix
:
prefix
,
prefix
:
prefix
,
// This function adjusts Index LevelDB key
// This function adjusts Index LevelDB key
// by appending the provided index id byte.
// by appending the provided index id byte.
...
@@ -141,18 +140,15 @@ func (db *DB) NewIndex(name string, funcs IndexFuncs) (f Index, err error) {
...
@@ -141,18 +140,15 @@ func (db *DB) NewIndex(name string, funcs IndexFuncs) (f Index, err error) {
func
(
f
Index
)
Get
(
keyFields
Item
)
(
out
Item
,
err
error
)
{
func
(
f
Index
)
Get
(
keyFields
Item
)
(
out
Item
,
err
error
)
{
key
,
err
:=
f
.
encodeKeyFunc
(
keyFields
)
key
,
err
:=
f
.
encodeKeyFunc
(
keyFields
)
if
err
!=
nil
{
if
err
!=
nil
{
f
.
logger
.
Debugf
(
"keyfields encoding error in Get. Error: %s"
,
err
.
Error
())
return
out
,
fmt
.
Errorf
(
"encode key: %w"
,
err
)
return
out
,
err
}
}
value
,
err
:=
f
.
db
.
Get
(
key
)
value
,
err
:=
f
.
db
.
Get
(
key
)
if
err
!=
nil
{
if
err
!=
nil
{
f
.
logger
.
Debugf
(
"error getting key %s in Get. Error: %s"
,
string
(
key
),
err
.
Error
())
return
out
,
fmt
.
Errorf
(
"get value: %w"
,
err
)
return
out
,
err
}
}
out
,
err
=
f
.
decodeValueFunc
(
keyFields
,
value
)
out
,
err
=
f
.
decodeValueFunc
(
keyFields
,
value
)
if
err
!=
nil
{
if
err
!=
nil
{
f
.
logger
.
Debugf
(
"error decofing keyfields in Get. Error: %s"
,
err
.
Error
())
return
out
,
fmt
.
Errorf
(
"decode value: %w"
,
err
)
return
out
,
err
}
}
return
out
.
Merge
(
keyFields
),
nil
return
out
.
Merge
(
keyFields
),
nil
}
}
...
@@ -166,26 +162,22 @@ func (f Index) Get(keyFields Item) (out Item, err error) {
...
@@ -166,26 +162,22 @@ func (f Index) Get(keyFields Item) (out Item, err error) {
func
(
f
Index
)
Fill
(
items
[]
Item
)
(
err
error
)
{
func
(
f
Index
)
Fill
(
items
[]
Item
)
(
err
error
)
{
snapshot
,
err
:=
f
.
db
.
ldb
.
GetSnapshot
()
snapshot
,
err
:=
f
.
db
.
ldb
.
GetSnapshot
()
if
err
!=
nil
{
if
err
!=
nil
{
f
.
logger
.
Debugf
(
"error getting snapshot in Fill. Error: %s"
,
err
.
Error
())
return
fmt
.
Errorf
(
"get snapshot: %w"
,
err
)
return
err
}
}
defer
snapshot
.
Release
()
defer
snapshot
.
Release
()
for
i
,
item
:=
range
items
{
for
i
,
item
:=
range
items
{
key
,
err
:=
f
.
encodeKeyFunc
(
item
)
key
,
err
:=
f
.
encodeKeyFunc
(
item
)
if
err
!=
nil
{
if
err
!=
nil
{
f
.
logger
.
Debugf
(
"keyfields encoding error in Fill. Error: %s"
,
err
.
Error
())
return
fmt
.
Errorf
(
"encode key: %w"
,
err
)
return
err
}
}
value
,
err
:=
snapshot
.
Get
(
key
,
nil
)
value
,
err
:=
snapshot
.
Get
(
key
,
nil
)
if
err
!=
nil
{
if
err
!=
nil
{
f
.
logger
.
Debugf
(
"error getting key %s in Fill. Error: %s"
,
string
(
key
),
err
.
Error
())
return
fmt
.
Errorf
(
"get value: %w"
,
err
)
return
err
}
}
v
,
err
:=
f
.
decodeValueFunc
(
item
,
value
)
v
,
err
:=
f
.
decodeValueFunc
(
item
,
value
)
if
err
!=
nil
{
if
err
!=
nil
{
f
.
logger
.
Debugf
(
"error decofing keyfields in Fill . Error: %s"
,
err
.
Error
())
return
fmt
.
Errorf
(
"decode value: %w"
,
err
)
return
err
}
}
items
[
i
]
=
v
.
Merge
(
item
)
items
[
i
]
=
v
.
Merge
(
item
)
}
}
...
@@ -198,8 +190,7 @@ func (f Index) Fill(items []Item) (err error) {
...
@@ -198,8 +190,7 @@ func (f Index) Fill(items []Item) (err error) {
func
(
f
Index
)
Has
(
keyFields
Item
)
(
bool
,
error
)
{
func
(
f
Index
)
Has
(
keyFields
Item
)
(
bool
,
error
)
{
key
,
err
:=
f
.
encodeKeyFunc
(
keyFields
)
key
,
err
:=
f
.
encodeKeyFunc
(
keyFields
)
if
err
!=
nil
{
if
err
!=
nil
{
f
.
logger
.
Debugf
(
"keyfields encoding error in Has. Error: %s"
,
err
.
Error
())
return
false
,
fmt
.
Errorf
(
"encode key: %w"
,
err
)
return
false
,
err
}
}
return
f
.
db
.
Has
(
key
)
return
f
.
db
.
Has
(
key
)
}
}
...
@@ -210,20 +201,17 @@ func (f Index) HasMulti(items ...Item) ([]bool, error) {
...
@@ -210,20 +201,17 @@ func (f Index) HasMulti(items ...Item) ([]bool, error) {
have
:=
make
([]
bool
,
len
(
items
))
have
:=
make
([]
bool
,
len
(
items
))
snapshot
,
err
:=
f
.
db
.
ldb
.
GetSnapshot
()
snapshot
,
err
:=
f
.
db
.
ldb
.
GetSnapshot
()
if
err
!=
nil
{
if
err
!=
nil
{
f
.
logger
.
Debugf
(
"error getting snapshot in HasMulti. Error: %s"
,
err
.
Error
())
return
nil
,
fmt
.
Errorf
(
"get snapshot: %w"
,
err
)
return
nil
,
err
}
}
defer
snapshot
.
Release
()
defer
snapshot
.
Release
()
for
i
,
keyFields
:=
range
items
{
for
i
,
keyFields
:=
range
items
{
key
,
err
:=
f
.
encodeKeyFunc
(
keyFields
)
key
,
err
:=
f
.
encodeKeyFunc
(
keyFields
)
if
err
!=
nil
{
if
err
!=
nil
{
f
.
logger
.
Debugf
(
"keyfields encoding error in HasMulti. Error: %s"
,
err
.
Error
())
return
nil
,
fmt
.
Errorf
(
"encode key for address %x: %w"
,
keyFields
.
Address
,
err
)
return
nil
,
err
}
}
have
[
i
],
err
=
snapshot
.
Has
(
key
,
nil
)
have
[
i
],
err
=
snapshot
.
Has
(
key
,
nil
)
if
err
!=
nil
{
if
err
!=
nil
{
f
.
logger
.
Debugf
(
"snaoshot Has error in HasMulti. Error: %s"
,
err
.
Error
())
return
nil
,
fmt
.
Errorf
(
"has key for address %x: %w"
,
keyFields
.
Address
,
err
)
return
nil
,
err
}
}
}
}
return
have
,
nil
return
have
,
nil
...
@@ -234,13 +222,11 @@ func (f Index) HasMulti(items ...Item) ([]bool, error) {
...
@@ -234,13 +222,11 @@ func (f Index) HasMulti(items ...Item) ([]bool, error) {
func
(
f
Index
)
Put
(
i
Item
)
(
err
error
)
{
func
(
f
Index
)
Put
(
i
Item
)
(
err
error
)
{
key
,
err
:=
f
.
encodeKeyFunc
(
i
)
key
,
err
:=
f
.
encodeKeyFunc
(
i
)
if
err
!=
nil
{
if
err
!=
nil
{
f
.
logger
.
Debugf
(
"keyfields encoding error in Put. Error: %s"
,
err
.
Error
())
return
fmt
.
Errorf
(
"encode key: %w"
,
err
)
return
err
}
}
value
,
err
:=
f
.
encodeValueFunc
(
i
)
value
,
err
:=
f
.
encodeValueFunc
(
i
)
if
err
!=
nil
{
if
err
!=
nil
{
f
.
logger
.
Debugf
(
"keyfields encoding error in Put. Error: %s"
,
err
.
Error
())
return
fmt
.
Errorf
(
"encode value: %w"
,
err
)
return
err
}
}
return
f
.
db
.
Put
(
key
,
value
)
return
f
.
db
.
Put
(
key
,
value
)
}
}
...
@@ -251,13 +237,11 @@ func (f Index) Put(i Item) (err error) {
...
@@ -251,13 +237,11 @@ func (f Index) Put(i Item) (err error) {
func
(
f
Index
)
PutInBatch
(
batch
*
leveldb
.
Batch
,
i
Item
)
(
err
error
)
{
func
(
f
Index
)
PutInBatch
(
batch
*
leveldb
.
Batch
,
i
Item
)
(
err
error
)
{
key
,
err
:=
f
.
encodeKeyFunc
(
i
)
key
,
err
:=
f
.
encodeKeyFunc
(
i
)
if
err
!=
nil
{
if
err
!=
nil
{
f
.
logger
.
Debugf
(
"keyfields encoding error in PutInBatch. Error: %s"
,
err
.
Error
())
return
fmt
.
Errorf
(
"encode key: %w"
,
err
)
return
err
}
}
value
,
err
:=
f
.
encodeValueFunc
(
i
)
value
,
err
:=
f
.
encodeValueFunc
(
i
)
if
err
!=
nil
{
if
err
!=
nil
{
f
.
logger
.
Debugf
(
"keyfields encoding error in PutInBatch. Error: %s"
,
err
.
Error
())
return
fmt
.
Errorf
(
"encode value: %w"
,
err
)
return
err
}
}
batch
.
Put
(
key
,
value
)
batch
.
Put
(
key
,
value
)
return
nil
return
nil
...
@@ -268,8 +252,7 @@ func (f Index) PutInBatch(batch *leveldb.Batch, i Item) (err error) {
...
@@ -268,8 +252,7 @@ func (f Index) PutInBatch(batch *leveldb.Batch, i Item) (err error) {
func
(
f
Index
)
Delete
(
keyFields
Item
)
(
err
error
)
{
func
(
f
Index
)
Delete
(
keyFields
Item
)
(
err
error
)
{
key
,
err
:=
f
.
encodeKeyFunc
(
keyFields
)
key
,
err
:=
f
.
encodeKeyFunc
(
keyFields
)
if
err
!=
nil
{
if
err
!=
nil
{
f
.
logger
.
Debugf
(
"keyfields encoding error in Delete. Error: %s"
,
err
.
Error
())
return
fmt
.
Errorf
(
"encode key: %w"
,
err
)
return
err
}
}
return
f
.
db
.
Delete
(
key
)
return
f
.
db
.
Delete
(
key
)
}
}
...
@@ -279,8 +262,7 @@ func (f Index) Delete(keyFields Item) (err error) {
...
@@ -279,8 +262,7 @@ func (f Index) Delete(keyFields Item) (err error) {
func
(
f
Index
)
DeleteInBatch
(
batch
*
leveldb
.
Batch
,
keyFields
Item
)
(
err
error
)
{
func
(
f
Index
)
DeleteInBatch
(
batch
*
leveldb
.
Batch
,
keyFields
Item
)
(
err
error
)
{
key
,
err
:=
f
.
encodeKeyFunc
(
keyFields
)
key
,
err
:=
f
.
encodeKeyFunc
(
keyFields
)
if
err
!=
nil
{
if
err
!=
nil
{
f
.
logger
.
Debugf
(
"keyfields encoding error in DeleteInBatch. Error: %s"
,
err
.
Error
())
return
fmt
.
Errorf
(
"encode key: %w"
,
err
)
return
err
}
}
batch
.
Delete
(
key
)
batch
.
Delete
(
key
)
return
nil
return
nil
...
@@ -318,8 +300,7 @@ func (f Index) Iterate(fn IndexIterFunc, options *IterateOptions) (err error) {
...
@@ -318,8 +300,7 @@ func (f Index) Iterate(fn IndexIterFunc, options *IterateOptions) (err error) {
// start from the provided StartFrom Item key value
// start from the provided StartFrom Item key value
startKey
,
err
=
f
.
encodeKeyFunc
(
*
options
.
StartFrom
)
startKey
,
err
=
f
.
encodeKeyFunc
(
*
options
.
StartFrom
)
if
err
!=
nil
{
if
err
!=
nil
{
f
.
logger
.
Debugf
(
"keyfields encoding error in Iterate. Error: %s"
,
err
.
Error
())
return
fmt
.
Errorf
(
"encode key: %w"
,
err
)
return
err
}
}
}
}
it
:=
f
.
db
.
NewIterator
()
it
:=
f
.
db
.
NewIterator
()
...
@@ -328,7 +309,6 @@ func (f Index) Iterate(fn IndexIterFunc, options *IterateOptions) (err error) {
...
@@ -328,7 +309,6 @@ func (f Index) Iterate(fn IndexIterFunc, options *IterateOptions) (err error) {
// move the cursor to the start key
// move the cursor to the start key
ok
:=
it
.
Seek
(
startKey
)
ok
:=
it
.
Seek
(
startKey
)
if
!
ok
{
if
!
ok
{
f
.
logger
.
Debugf
(
"seek error in Iterate. Error: %s"
,
it
.
Error
())
// stop iterator if seek has failed
// stop iterator if seek has failed
return
it
.
Error
()
return
it
.
Error
()
}
}
...
@@ -340,15 +320,14 @@ func (f Index) Iterate(fn IndexIterFunc, options *IterateOptions) (err error) {
...
@@ -340,15 +320,14 @@ func (f Index) Iterate(fn IndexIterFunc, options *IterateOptions) (err error) {
for
;
ok
;
ok
=
it
.
Next
()
{
for
;
ok
;
ok
=
it
.
Next
()
{
item
,
err
:=
f
.
itemFromIterator
(
it
,
prefix
)
item
,
err
:=
f
.
itemFromIterator
(
it
,
prefix
)
if
err
!=
nil
{
if
err
!=
nil
{
if
err
==
leveldb
.
ErrNotFound
{
if
err
ors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
{
break
break
}
}
return
err
return
fmt
.
Errorf
(
"get item from iterator: %w"
,
err
)
}
}
stop
,
err
:=
fn
(
item
)
stop
,
err
:=
fn
(
item
)
if
err
!=
nil
{
if
err
!=
nil
{
f
.
logger
.
Debugf
(
"error executing callback function in Iterate. Error: %s"
,
err
.
Error
())
return
fmt
.
Errorf
(
"index iterator function: %w"
,
err
)
return
err
}
}
if
stop
{
if
stop
{
break
break
...
@@ -382,14 +361,12 @@ func (f Index) itemFromIterator(it iterator.Iterator, totalPrefix []byte) (i Ite
...
@@ -382,14 +361,12 @@ func (f Index) itemFromIterator(it iterator.Iterator, totalPrefix []byte) (i Ite
// create a copy of key byte slice not to share leveldb underlaying slice array
// create a copy of key byte slice not to share leveldb underlaying slice array
keyItem
,
err
:=
f
.
decodeKeyFunc
(
append
([]
byte
(
nil
),
key
...
))
keyItem
,
err
:=
f
.
decodeKeyFunc
(
append
([]
byte
(
nil
),
key
...
))
if
err
!=
nil
{
if
err
!=
nil
{
f
.
logger
.
Debugf
(
"error decoding key in itemFromIterator. Error: %s"
,
err
.
Error
())
return
i
,
fmt
.
Errorf
(
"decode key: %w"
,
err
)
return
i
,
err
}
}
// create a copy of value byte slice not to share leveldb underlaying slice array
// create a copy of value byte slice not to share leveldb underlaying slice array
valueItem
,
err
:=
f
.
decodeValueFunc
(
keyItem
,
append
([]
byte
(
nil
),
it
.
Value
()
...
))
valueItem
,
err
:=
f
.
decodeValueFunc
(
keyItem
,
append
([]
byte
(
nil
),
it
.
Value
()
...
))
if
err
!=
nil
{
if
err
!=
nil
{
f
.
logger
.
Debugf
(
"error decoding value in itemFromIterator. Error: %s"
,
err
.
Error
())
return
i
,
fmt
.
Errorf
(
"decode value: %w"
,
err
)
return
i
,
err
}
}
return
keyItem
.
Merge
(
valueItem
),
it
.
Error
()
return
keyItem
.
Merge
(
valueItem
),
it
.
Error
()
}
}
...
@@ -460,8 +437,7 @@ func (f Index) Count() (count int, err error) {
...
@@ -460,8 +437,7 @@ func (f Index) Count() (count int, err error) {
func
(
f
Index
)
CountFrom
(
start
Item
)
(
count
int
,
err
error
)
{
func
(
f
Index
)
CountFrom
(
start
Item
)
(
count
int
,
err
error
)
{
startKey
,
err
:=
f
.
encodeKeyFunc
(
start
)
startKey
,
err
:=
f
.
encodeKeyFunc
(
start
)
if
err
!=
nil
{
if
err
!=
nil
{
f
.
logger
.
Debugf
(
"error encoding item in CountFrom. Error: %s"
,
err
.
Error
())
return
0
,
fmt
.
Errorf
(
"encode key: %w"
,
err
)
return
0
,
err
}
}
it
:=
f
.
db
.
NewIterator
()
it
:=
f
.
db
.
NewIterator
()
defer
it
.
Release
()
defer
it
.
Release
()
...
...
pkg/shed/index_test.go
View file @
70ff4fa9
...
@@ -19,6 +19,7 @@ package shed
...
@@ -19,6 +19,7 @@ package shed
import
(
import
(
"bytes"
"bytes"
"encoding/binary"
"encoding/binary"
"errors"
"fmt"
"fmt"
"sort"
"sort"
"testing"
"testing"
...
@@ -254,7 +255,7 @@ func TestIndex(t *testing.T) {
...
@@ -254,7 +255,7 @@ func TestIndex(t *testing.T) {
_
,
err
=
index
.
Get
(
Item
{
_
,
err
=
index
.
Get
(
Item
{
Address
:
want
.
Address
,
Address
:
want
.
Address
,
})
})
if
err
!=
wantErr
{
if
!
errors
.
Is
(
err
,
wantErr
)
{
t
.
Fatalf
(
"got error %v, want %v"
,
err
,
wantErr
)
t
.
Fatalf
(
"got error %v, want %v"
,
err
,
wantErr
)
}
}
})
})
...
@@ -294,7 +295,7 @@ func TestIndex(t *testing.T) {
...
@@ -294,7 +295,7 @@ func TestIndex(t *testing.T) {
_
,
err
=
index
.
Get
(
Item
{
_
,
err
=
index
.
Get
(
Item
{
Address
:
want
.
Address
,
Address
:
want
.
Address
,
})
})
if
err
!=
wantErr
{
if
!
errors
.
Is
(
err
,
wantErr
)
{
t
.
Fatalf
(
"got error %v, want %v"
,
err
,
wantErr
)
t
.
Fatalf
(
"got error %v, want %v"
,
err
,
wantErr
)
}
}
})
})
...
@@ -355,7 +356,7 @@ func TestIndex(t *testing.T) {
...
@@ -355,7 +356,7 @@ func TestIndex(t *testing.T) {
})
})
want
:=
leveldb
.
ErrNotFound
want
:=
leveldb
.
ErrNotFound
err
:=
index
.
Fill
(
items
)
err
:=
index
.
Fill
(
items
)
if
err
!=
want
{
if
!
errors
.
Is
(
err
,
want
)
{
t
.
Errorf
(
"got error %v, want %v"
,
err
,
want
)
t
.
Errorf
(
"got error %v, want %v"
,
err
,
want
)
}
}
})
})
...
@@ -1008,21 +1009,17 @@ func TestIndex_firstAndLast(t *testing.T) {
...
@@ -1008,21 +1009,17 @@ func TestIndex_firstAndLast(t *testing.T) {
},
},
}
{
}
{
got
,
err
:=
index
.
Last
(
tc
.
prefix
)
got
,
err
:=
index
.
Last
(
tc
.
prefix
)
if
tc
.
err
!=
err
{
if
!
errors
.
Is
(
err
,
tc
.
err
)
{
t
.
Errorf
(
"got error %v for Last with prefix %v, want %v"
,
err
,
tc
.
prefix
,
tc
.
err
)
t
.
Errorf
(
"got error %v for Last with prefix %v, want %v"
,
err
,
tc
.
prefix
,
tc
.
err
)
}
else
{
}
else
if
!
bytes
.
Equal
(
got
.
Address
,
tc
.
last
)
{
if
!
bytes
.
Equal
(
got
.
Address
,
tc
.
last
)
{
t
.
Errorf
(
"got %v for Last with prefix %v, want %v"
,
got
.
Address
,
tc
.
prefix
,
tc
.
last
)
t
.
Errorf
(
"got %v for Last with prefix %v, want %v"
,
got
.
Address
,
tc
.
prefix
,
tc
.
last
)
}
}
}
got
,
err
=
index
.
First
(
tc
.
prefix
)
got
,
err
=
index
.
First
(
tc
.
prefix
)
if
tc
.
err
!=
err
{
if
!
errors
.
Is
(
err
,
tc
.
err
)
{
t
.
Errorf
(
"got error %v for First with prefix %v, want %v"
,
err
,
tc
.
prefix
,
tc
.
err
)
t
.
Errorf
(
"got error %v for First with prefix %v, want %v"
,
err
,
tc
.
prefix
,
tc
.
err
)
}
else
{
}
else
if
!
bytes
.
Equal
(
got
.
Address
,
tc
.
first
)
{
if
!
bytes
.
Equal
(
got
.
Address
,
tc
.
first
)
{
t
.
Errorf
(
"got %v for First with prefix %v, want %v"
,
got
.
Address
,
tc
.
prefix
,
tc
.
first
)
t
.
Errorf
(
"got %v for First with prefix %v, want %v"
,
got
.
Address
,
tc
.
prefix
,
tc
.
first
)
}
}
}
}
}
}
}
...
...
pkg/shed/schema.go
View file @
70ff4fa9
...
@@ -63,7 +63,7 @@ func (db *DB) schemaFieldKey(name, fieldType string) (key []byte, err error) {
...
@@ -63,7 +63,7 @@ func (db *DB) schemaFieldKey(name, fieldType string) (key []byte, err error) {
}
}
s
,
err
:=
db
.
getSchema
()
s
,
err
:=
db
.
getSchema
()
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
err
return
nil
,
fmt
.
Errorf
(
"get schema: %w"
,
err
)
}
}
var
found
bool
var
found
bool
for
n
,
f
:=
range
s
.
Fields
{
for
n
,
f
:=
range
s
.
Fields
{
...
@@ -80,7 +80,7 @@ func (db *DB) schemaFieldKey(name, fieldType string) (key []byte, err error) {
...
@@ -80,7 +80,7 @@ func (db *DB) schemaFieldKey(name, fieldType string) (key []byte, err error) {
}
}
err
:=
db
.
putSchema
(
s
)
err
:=
db
.
putSchema
(
s
)
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
err
return
nil
,
fmt
.
Errorf
(
"put schema: %w"
,
err
)
}
}
}
}
return
append
([]
byte
{
keyPrefixFields
},
[]
byte
(
name
)
...
),
nil
return
append
([]
byte
{
keyPrefixFields
},
[]
byte
(
name
)
...
),
nil
...
@@ -102,7 +102,7 @@ func (db *DB) RenameIndex(name, newName string) (renamed bool, err error) {
...
@@ -102,7 +102,7 @@ func (db *DB) RenameIndex(name, newName string) (renamed bool, err error) {
}
}
s
,
err
:=
db
.
getSchema
()
s
,
err
:=
db
.
getSchema
()
if
err
!=
nil
{
if
err
!=
nil
{
return
false
,
err
return
false
,
fmt
.
Errorf
(
"get schema: %w"
,
err
)
}
}
for
i
,
f
:=
range
s
.
Indexes
{
for
i
,
f
:=
range
s
.
Indexes
{
if
f
.
Name
==
name
{
if
f
.
Name
==
name
{
...
@@ -126,7 +126,7 @@ func (db *DB) schemaIndexPrefix(name string) (id byte, err error) {
...
@@ -126,7 +126,7 @@ func (db *DB) schemaIndexPrefix(name string) (id byte, err error) {
}
}
s
,
err
:=
db
.
getSchema
()
s
,
err
:=
db
.
getSchema
()
if
err
!=
nil
{
if
err
!=
nil
{
return
0
,
err
return
0
,
fmt
.
Errorf
(
"get schema: %w"
,
err
)
}
}
nextID
:=
keyPrefixIndexStart
nextID
:=
keyPrefixIndexStart
for
i
,
f
:=
range
s
.
Indexes
{
for
i
,
f
:=
range
s
.
Indexes
{
...
...
pkg/shed/vector_uint64.go
View file @
70ff4fa9
...
@@ -18,17 +18,17 @@ package shed
...
@@ -18,17 +18,17 @@ package shed
import
(
import
(
"encoding/binary"
"encoding/binary"
"errors"
"fmt"
"github.com/ethersphere/bee/pkg/logging"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb"
)
)
// Uint64Vector provides a way to have multiple counters in the database.
// Uint64Vector provides a way to have multiple counters in the database.
// It transparently encodes uint64 type value to bytes.
// It transparently encodes uint64 type value to bytes.
type
Uint64Vector
struct
{
type
Uint64Vector
struct
{
db
*
DB
db
*
DB
key
[]
byte
key
[]
byte
logger
logging
.
Logger
}
}
// NewUint64Vector returns a new Uint64Vector.
// NewUint64Vector returns a new Uint64Vector.
...
@@ -36,12 +36,11 @@ type Uint64Vector struct {
...
@@ -36,12 +36,11 @@ type Uint64Vector struct {
func
(
db
*
DB
)
NewUint64Vector
(
name
string
)
(
f
Uint64Vector
,
err
error
)
{
func
(
db
*
DB
)
NewUint64Vector
(
name
string
)
(
f
Uint64Vector
,
err
error
)
{
key
,
err
:=
db
.
schemaFieldKey
(
name
,
"vector-uint64"
)
key
,
err
:=
db
.
schemaFieldKey
(
name
,
"vector-uint64"
)
if
err
!=
nil
{
if
err
!=
nil
{
return
f
,
err
return
f
,
fmt
.
Errorf
(
"get schema key: %w"
,
err
)
}
}
return
Uint64Vector
{
return
Uint64Vector
{
db
:
db
,
db
:
db
,
key
:
key
,
key
:
key
,
logger
:
db
.
logger
,
},
nil
},
nil
}
}
...
@@ -51,7 +50,7 @@ func (db *DB) NewUint64Vector(name string) (f Uint64Vector, err error) {
...
@@ -51,7 +50,7 @@ func (db *DB) NewUint64Vector(name string) (f Uint64Vector, err error) {
func
(
f
Uint64Vector
)
Get
(
i
uint64
)
(
val
uint64
,
err
error
)
{
func
(
f
Uint64Vector
)
Get
(
i
uint64
)
(
val
uint64
,
err
error
)
{
b
,
err
:=
f
.
db
.
Get
(
f
.
indexKey
(
i
))
b
,
err
:=
f
.
db
.
Get
(
f
.
indexKey
(
i
))
if
err
!=
nil
{
if
err
!=
nil
{
if
err
==
leveldb
.
ErrNotFound
{
if
err
ors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
{
return
0
,
nil
return
0
,
nil
}
}
return
0
,
err
return
0
,
err
...
@@ -75,10 +74,9 @@ func (f Uint64Vector) PutInBatch(batch *leveldb.Batch, i, val uint64) {
...
@@ -75,10 +74,9 @@ func (f Uint64Vector) PutInBatch(batch *leveldb.Batch, i, val uint64) {
func
(
f
Uint64Vector
)
Inc
(
i
uint64
)
(
val
uint64
,
err
error
)
{
func
(
f
Uint64Vector
)
Inc
(
i
uint64
)
(
val
uint64
,
err
error
)
{
val
,
err
=
f
.
Get
(
i
)
val
,
err
=
f
.
Get
(
i
)
if
err
!=
nil
{
if
err
!=
nil
{
if
err
==
leveldb
.
ErrNotFound
{
if
err
ors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
{
val
=
0
val
=
0
}
else
{
}
else
{
f
.
logger
.
Debugf
(
"error getiing value while doing Inc. Error: %s"
,
err
.
Error
())
return
0
,
err
return
0
,
err
}
}
}
}
...
@@ -92,10 +90,9 @@ func (f Uint64Vector) Inc(i uint64) (val uint64, err error) {
...
@@ -92,10 +90,9 @@ func (f Uint64Vector) Inc(i uint64) (val uint64, err error) {
func
(
f
Uint64Vector
)
IncInBatch
(
batch
*
leveldb
.
Batch
,
i
uint64
)
(
val
uint64
,
err
error
)
{
func
(
f
Uint64Vector
)
IncInBatch
(
batch
*
leveldb
.
Batch
,
i
uint64
)
(
val
uint64
,
err
error
)
{
val
,
err
=
f
.
Get
(
i
)
val
,
err
=
f
.
Get
(
i
)
if
err
!=
nil
{
if
err
!=
nil
{
if
err
==
leveldb
.
ErrNotFound
{
if
err
ors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
{
val
=
0
val
=
0
}
else
{
}
else
{
f
.
logger
.
Debugf
(
"error getiing value while doing IncInBatch. Error: %s"
,
err
.
Error
())
return
0
,
err
return
0
,
err
}
}
}
}
...
@@ -110,10 +107,9 @@ func (f Uint64Vector) IncInBatch(batch *leveldb.Batch, i uint64) (val uint64, er
...
@@ -110,10 +107,9 @@ func (f Uint64Vector) IncInBatch(batch *leveldb.Batch, i uint64) (val uint64, er
func
(
f
Uint64Vector
)
Dec
(
i
uint64
)
(
val
uint64
,
err
error
)
{
func
(
f
Uint64Vector
)
Dec
(
i
uint64
)
(
val
uint64
,
err
error
)
{
val
,
err
=
f
.
Get
(
i
)
val
,
err
=
f
.
Get
(
i
)
if
err
!=
nil
{
if
err
!=
nil
{
if
err
==
leveldb
.
ErrNotFound
{
if
err
ors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
{
val
=
0
val
=
0
}
else
{
}
else
{
f
.
logger
.
Debugf
(
"error getiing value while doing Dec. Error: %s"
,
err
.
Error
())
return
0
,
err
return
0
,
err
}
}
}
}
...
@@ -130,10 +126,9 @@ func (f Uint64Vector) Dec(i uint64) (val uint64, err error) {
...
@@ -130,10 +126,9 @@ func (f Uint64Vector) Dec(i uint64) (val uint64, err error) {
func
(
f
Uint64Vector
)
DecInBatch
(
batch
*
leveldb
.
Batch
,
i
uint64
)
(
val
uint64
,
err
error
)
{
func
(
f
Uint64Vector
)
DecInBatch
(
batch
*
leveldb
.
Batch
,
i
uint64
)
(
val
uint64
,
err
error
)
{
val
,
err
=
f
.
Get
(
i
)
val
,
err
=
f
.
Get
(
i
)
if
err
!=
nil
{
if
err
!=
nil
{
if
err
==
leveldb
.
ErrNotFound
{
if
err
ors
.
Is
(
err
,
leveldb
.
ErrNotFound
)
{
val
=
0
val
=
0
}
else
{
}
else
{
f
.
logger
.
Debugf
(
"error getiing value while doing DecInBatch. Error: %s"
,
err
.
Error
())
return
0
,
err
return
0
,
err
}
}
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment