Commit 787130a2 authored by acud's avatar acud Committed by GitHub

addressbook: add support for persistence (#81)

* initial persistent addressbook
parent 8dd75ac0
......@@ -5,20 +5,142 @@
package addressbook
import (
"encoding/json"
"fmt"
"strings"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
ma "github.com/multiformats/go-multiaddr"
)
const keyPrefix = "addressbook_entry_"
var _ GetPutter = (*store)(nil)
type GetPutter interface {
Getter
Putter
}
type Getter interface {
Get(overlay swarm.Address) (addr ma.Multiaddr, exists bool)
Get(overlay swarm.Address) (addr ma.Multiaddr, err error)
}
type Putter interface {
Put(overlay swarm.Address, addr ma.Multiaddr) (exists bool)
Put(overlay swarm.Address, addr ma.Multiaddr) (err error)
}
type store struct {
store storage.StateStorer
}
func New(storer storage.StateStorer) GetPutter {
return &store{
store: storer,
}
}
func (s *store) Get(overlay swarm.Address) (ma.Multiaddr, error) {
key := keyPrefix + overlay.String()
v := PeerEntry{}
err := s.store.Get(key, &v)
if err != nil {
return nil, err
}
return v.Multiaddr, nil
}
func (s *store) Put(overlay swarm.Address, addr ma.Multiaddr) (err error) {
key := keyPrefix + overlay.String()
pe := &PeerEntry{Overlay: overlay, Multiaddr: addr}
return s.store.Put(key, pe)
}
func (s *store) Overlays() (overlays []swarm.Address, err error) {
err = s.store.Iterate(keyPrefix, func(key, _ []byte) (stop bool, err error) {
k := string(key)
if !strings.HasPrefix(k, keyPrefix) {
return true, nil
}
split := strings.SplitAfter(k, keyPrefix)
if len(split) != 2 {
return true, fmt.Errorf("invalid overlay key: %s", k)
}
addr, err := swarm.ParseHexAddress(split[1])
if err != nil {
return true, err
}
overlays = append(overlays, addr)
return false, nil
})
if err != nil {
return nil, err
}
return overlays, nil
}
func (s *store) Multiaddresses() (multis []ma.Multiaddr, err error) {
err = s.store.Iterate(keyPrefix, func(_, value []byte) (stop bool, err error) {
entry := &PeerEntry{}
err = entry.UnmarshalJSON(value)
if err != nil {
return true, err
}
multis = append(multis, entry.Multiaddr)
return false, nil
})
if err != nil {
return nil, err
}
return multis, nil
}
type PeerEntry struct {
Overlay swarm.Address
Multiaddr ma.Multiaddr
}
func (p *PeerEntry) MarshalJSON() ([]byte, error) {
v := struct {
Overlay string
Multiaddr string
}{
Overlay: p.Overlay.String(),
Multiaddr: p.Multiaddr.String(),
}
return json.Marshal(&v)
}
func (p *PeerEntry) UnmarshalJSON(b []byte) error {
v := struct {
Overlay string
Multiaddr string
}{}
err := json.Unmarshal(b, &v)
if err != nil {
return err
}
a, err := swarm.ParseHexAddress(v.Overlay)
if err != nil {
return err
}
p.Overlay = a
m, err := ma.NewMultiaddr(v.Multiaddr)
if err != nil {
return err
}
p.Multiaddr = m
return nil
}
......@@ -2,37 +2,53 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package inmem
package addressbook_test
import (
"testing"
"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/swarm"
ma "github.com/multiformats/go-multiaddr"
)
func TestInMemStore(t *testing.T) {
mem := New()
type bookFunc func(t *testing.T) (book addressbook.GetPutter, cleanup func())
func TestInMem(t *testing.T) {
run(t, func(t *testing.T) (addressbook.GetPutter, func()) {
store := mock.NewStateStore()
book := addressbook.New(store)
return book, func() {}
})
}
func run(t *testing.T, f bookFunc) {
store, cleanup := f(t)
defer cleanup()
addr1 := swarm.NewAddress([]byte{0, 1, 2, 3})
addr2 := swarm.NewAddress([]byte{0, 1, 2, 4})
multiaddr, err := ma.NewMultiaddr("/ip4/1.1.1.1")
if err != nil {
t.Fatal(err)
}
//var beep ma.Multiaddr
exists := mem.Put(addr1, multiaddr)
if exists {
t.Fatal("object exists in store but shouldnt")
err = store.Put(addr1, multiaddr)
if err != nil {
t.Fatal(err)
}
_, exists = mem.Get(addr2)
if exists {
t.Fatal("value found in store but should not have been")
v, err := store.Get(addr1)
if err != nil {
t.Fatal(err)
}
v, exists := mem.Get(addr1)
if !exists {
t.Fatal("value not found in store but should have been")
_, err = store.Get(addr2)
if err == nil {
t.Fatal("value found in store but should not have been")
}
if multiaddr.String() != v.String() {
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package inmem
import (
"sync"
"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/swarm"
ma "github.com/multiformats/go-multiaddr"
)
type inmem struct {
mtx sync.Mutex
entries map[string]peerEntry // key: overlay in string value, value: peerEntry
}
type peerEntry struct {
overlay swarm.Address
multiaddr ma.Multiaddr
}
func New() addressbook.GetPutter {
return &inmem{
entries: make(map[string]peerEntry),
}
}
func (i *inmem) Get(overlay swarm.Address) (addr ma.Multiaddr, exists bool) {
i.mtx.Lock()
defer i.mtx.Unlock()
val, exists := i.entries[overlay.String()]
return val.multiaddr, exists
}
func (i *inmem) Put(overlay swarm.Address, addr ma.Multiaddr) (exists bool) {
i.mtx.Lock()
defer i.mtx.Unlock()
_, e := i.entries[overlay.String()]
i.entries[overlay.String()] = peerEntry{overlay: overlay, multiaddr: addr}
return e
}
func (i *inmem) Overlays() []swarm.Address {
i.mtx.Lock()
defer i.mtx.Unlock()
keys := make([]swarm.Address, 0, len(i.entries))
for k := range i.entries {
keys = append(keys, swarm.MustParseHexAddress(k))
}
return keys
}
func (i *inmem) Multiaddresses() []ma.Multiaddr {
i.mtx.Lock()
defer i.mtx.Unlock()
values := make([]ma.Multiaddr, 0, len(i.entries))
for _, v := range i.entries {
values = append(values, v.multiaddr)
}
return values
}
......@@ -12,10 +12,10 @@ import (
"testing"
"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/addressbook/inmem"
"github.com/ethersphere/bee/pkg/debugapi"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
mockstore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology/mock"
"github.com/multiformats/go-multiaddr"
......@@ -35,7 +35,8 @@ type testServer struct {
}
func newTestServer(t *testing.T, o testServerOptions) *testServer {
addressbook := inmem.New()
statestore := mockstore.NewStateStore()
addressbook := addressbook.New(statestore)
topologyDriver := mock.NewTopologyDriver()
s := debugapi.New(debugapi.Options{
......
......@@ -35,7 +35,13 @@ func (s *server) peerConnectHandler(w http.ResponseWriter, r *http.Request) {
return
}
s.Addressbook.Put(address, addr)
err = s.Addressbook.Put(address, addr)
if err != nil {
s.Logger.Debugf("debug api: addressbook.put %s: %v", addr, err)
s.Logger.Errorf("unable to persist peer %s", addr)
jsonhttp.InternalServerError(w, err)
return
}
if err := s.TopologyDriver.AddPeer(r.Context(), address); err != nil {
_ = s.P2P.Disconnect(address)
s.Logger.Debugf("debug api: topologyDriver.AddPeer %s: %v", addr, err)
......
......@@ -15,6 +15,7 @@ import (
"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/mock"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
ma "github.com/multiformats/go-multiaddr"
)
......@@ -40,8 +41,8 @@ func TestConnect(t *testing.T) {
Address: overlay.String(),
})
multia, exists := testServer.Addressbook.Get(overlay)
if exists != true && underlay != multia.String() {
multia, err := testServer.Addressbook.Get(overlay)
if err != nil && errors.Is(err, storage.ErrNotFound) && underlay != multia.String() {
t.Fatalf("found wrong underlay. expected: %s, found: %s", underlay, multia.String())
}
})
......@@ -81,8 +82,8 @@ func TestConnect(t *testing.T) {
Message: testErr.Error(),
})
multia, exists := testServer.Addressbook.Get(overlay)
if exists != true && underlay != multia.String() {
multia, err := testServer.Addressbook.Get(overlay)
if err != nil && errors.Is(err, storage.ErrNotFound) && underlay != multia.String() {
t.Fatalf("found wrong underlay. expected: %s, found: %s", underlay, multia.String())
}
......
......@@ -6,6 +6,7 @@ package hive
import (
"context"
"errors"
"fmt"
"time"
......@@ -14,6 +15,7 @@ import (
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
ma "github.com/multiformats/go-multiaddr"
......@@ -91,10 +93,13 @@ func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swa
w, _ := protobuf.NewWriterAndReader(stream)
var peersRequest pb.Peers
for _, p := range peers {
addr, found := s.addressBook.Get(p)
if !found {
s.logger.Debugf("Peer not found %s", peer, err)
continue
addr, err := s.addressBook.Get(p)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
s.logger.Debugf("Peer not found %s", peer, err)
continue
}
return err
}
peersRequest.Peers = append(peersRequest.Peers, &pb.BzzAddress{
......@@ -126,7 +131,10 @@ func (s *Service) peersHandler(_ context.Context, peer p2p.Peer, stream p2p.Stre
continue
}
s.addressBook.Put(swarm.NewAddress(newPeer.Overlay), addr)
err = s.addressBook.Put(swarm.NewAddress(newPeer.Overlay), addr)
if err != nil {
return err
}
if s.peerHandler != nil {
if err := s.peerHandler(context.Background(), swarm.NewAddress(newPeer.Overlay)); err != nil {
return err
......
......@@ -18,24 +18,26 @@ import (
ma "github.com/multiformats/go-multiaddr"
"github.com/ethersphere/bee/pkg/addressbook/inmem"
book "github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/hive"
"github.com/ethersphere/bee/pkg/hive/pb"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p/protobuf"
"github.com/ethersphere/bee/pkg/p2p/streamtest"
"github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/swarm"
)
type AddressExporter interface {
Overlays() []swarm.Address
Multiaddresses() []ma.Multiaddr
Overlays() ([]swarm.Address, error)
Multiaddresses() ([]ma.Multiaddr, error)
}
func TestBroadcastPeers(t *testing.T) {
rand.Seed(time.Now().UnixNano())
logger := logging.New(ioutil.Discard, 0)
addressbook := inmem.New()
statestore := mock.NewStateStore()
addressbook := book.New(statestore)
// populate all expected and needed random resources for 2 full batches
// tests cases that uses fewer resources can use sub-slices of this data
......@@ -55,7 +57,10 @@ func TestBroadcastPeers(t *testing.T) {
multiaddrs = append(multiaddrs, ma)
addrs = append(addrs, swarm.NewAddress(createRandomBytes()))
addressbook.Put(addrs[i], multiaddrs[i])
err = addressbook.Put(addrs[i], multiaddrs[i])
if err != nil {
t.Fatal(err)
}
wantMsgs[i/hive.MaxBatchSize].Peers = append(wantMsgs[i/hive.MaxBatchSize].Peers, &pb.BzzAddress{Overlay: addrs[i].Bytes(), Underlay: multiaddrs[i].String()})
}
......@@ -105,7 +110,8 @@ func TestBroadcastPeers(t *testing.T) {
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
addressbookclean := inmem.New()
addressbookclean := book.New(mock.NewStateStore())
exporter, ok := addressbookclean.(AddressExporter)
if !ok {
t.Fatal("could not type assert AddressExporter")
......@@ -161,11 +167,14 @@ func TestBroadcastPeers(t *testing.T) {
}
func expectOverlaysEventually(t *testing.T, exporter AddressExporter, wantOverlays []swarm.Address) {
for i := 0; i < 100; i++ {
for i := 0; i < 10; i++ {
var stringOverlays []string
var stringWantOverlays []string
for _, k := range exporter.Overlays() {
o, err := exporter.Overlays()
if err != nil {
t.Fatal(err)
}
for _, k := range o {
stringOverlays = append(stringOverlays, k.String())
}
......@@ -182,13 +191,22 @@ func expectOverlaysEventually(t *testing.T, exporter AddressExporter, wantOverla
time.Sleep(50 * time.Millisecond)
}
t.Errorf("Overlays got %v, want %v", exporter.Overlays(), wantOverlays)
o, err := exporter.Overlays()
if err != nil {
t.Fatal(err)
}
t.Errorf("Overlays got %v, want %v", o, wantOverlays)
}
func expectMultiaddresessEventually(t *testing.T, exporter AddressExporter, wantMultiaddresses []ma.Multiaddr) {
for i := 0; i < 100; i++ {
for i := 0; i < 10; i++ {
var stringMultiaddresses []string
for _, v := range exporter.Multiaddresses() {
m, err := exporter.Multiaddresses()
if err != nil {
t.Fatal(err)
}
for _, v := range m {
stringMultiaddresses = append(stringMultiaddresses, v.String())
}
......@@ -206,7 +224,12 @@ func expectMultiaddresessEventually(t *testing.T, exporter AddressExporter, want
time.Sleep(50 * time.Millisecond)
}
t.Errorf("Multiaddresses got %v, want %v", exporter.Multiaddresses(), wantMultiaddresses)
m, err := exporter.Multiaddresses()
if err != nil {
t.Fatal(err)
}
t.Errorf("Multiaddresses got %v, want %v", m, wantMultiaddresses)
}
func readAndAssertPeersMsgs(in []byte, expectedLen int) ([]pb.Peers, error) {
......
......@@ -17,7 +17,7 @@ import (
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"github.com/ethersphere/bee/pkg/addressbook/inmem"
"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/api"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/debugapi"
......@@ -29,6 +29,9 @@ import (
"github.com/ethersphere/bee/pkg/metrics"
"github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/ethersphere/bee/pkg/pingpong"
"github.com/ethersphere/bee/pkg/statestore/leveldb"
mockinmem "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/topology/full"
"github.com/ethersphere/bee/pkg/tracing"
......@@ -36,12 +39,13 @@ import (
)
type Bee struct {
p2pService io.Closer
p2pCancel context.CancelFunc
apiServer *http.Server
debugAPIServer *http.Server
errorLogWriter *io.PipeWriter
tracerCloser io.Closer
p2pService io.Closer
p2pCancel context.CancelFunc
apiServer *http.Server
debugAPIServer *http.Server
errorLogWriter *io.PipeWriter
tracerCloser io.Closer
stateStoreCloser io.Closer
}
type Options struct {
......@@ -62,7 +66,6 @@ type Options struct {
func NewBee(o Options) (*Bee, error) {
logger := o.Logger
addressbook := inmem.New()
tracer, tracerCloser, err := tracing.NewTracer(&tracing.Options{
Enabled: o.TracingEnabled,
......@@ -109,6 +112,19 @@ func NewBee(o Options) (*Bee, error) {
logger.Infof("new libp2p key created")
}
var stateStore storage.StateStorer
if o.DataDir == "" {
stateStore = mockinmem.NewStateStore()
logger.Warning("using in-mem state store. no node state will be persisted")
} else {
stateStore, err = leveldb.NewStateStore(filepath.Join(o.DataDir, "statestore"))
if err != nil {
return nil, fmt.Errorf("statestore: %w", err)
}
}
b.stateStoreCloser = stateStore
addressbook := addressbook.New(stateStore)
p2ps, err := libp2p.New(p2pCtx, libp2p.Options{
PrivateKey: libp2pPrivateKey,
Overlay: address,
......@@ -253,7 +269,15 @@ func NewBee(o Options) (*Bee, error) {
return
}
addressbook.Put(overlay, addr)
err = addressbook.Put(overlay, addr)
if err != nil {
_ = p2ps.Disconnect(overlay)
logger.Debugf("addressboook error persisting %s %s: %v", aa, overlay, err)
logger.Errorf("persisting node %s", aa)
return
}
if err := topologyDriver.AddPeer(p2pCtx, overlay); err != nil {
_ = p2ps.Disconnect(overlay)
logger.Debugf("topology add peer fail %s %s: %v", aa, overlay, err)
......@@ -298,5 +322,9 @@ func (b *Bee) Shutdown(ctx context.Context) error {
return fmt.Errorf("tracer: %w", err)
}
if err := b.stateStoreCloser.Close(); err != nil {
return fmt.Errorf("statestore: %w", err)
}
return b.errorLogWriter.Close()
}
......@@ -204,7 +204,14 @@ func New(ctx context.Context, o Options) (*Service, error) {
return
}
s.addrssbook.Put(i.Address, remoteMultiaddr)
err = s.addrssbook.Put(i.Address, remoteMultiaddr)
if err != nil {
s.logger.Debugf("handshake: addressbook put error %s: %v", peerID, err)
s.logger.Errorf("unable to persist peer %v", peerID)
_ = s.disconnect(peerID)
return
}
if s.peerHandler != nil {
if err := s.peerHandler(ctx, i.Address); err != nil {
s.logger.Debugf("peerhandler error: %s: %v", peerID, err)
......
......@@ -12,11 +12,12 @@ import (
"testing"
"time"
"github.com/ethersphere/bee/pkg/addressbook/inmem"
"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/multiformats/go-multiaddr"
)
......@@ -51,7 +52,8 @@ func newService(t *testing.T, o libp2p.Options) (s *libp2p.Service, overlay swar
}
if o.Addressbook == nil {
o.Addressbook = inmem.New()
statestore := mock.NewStateStore()
o.Addressbook = addressbook.New(statestore)
}
ctx, cancel := context.WithCancel(context.Background())
......
......@@ -2,41 +2,42 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.package storage
package statestore
package leveldb
import (
"encoding"
"encoding/json"
"errors"
"github.com/ethersphere/bee/pkg/storage"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util"
)
var _ storage.StateStorer = (*Store)(nil)
var _ storage.StateStorer = (*store)(nil)
// Store uses LevelDB to store values.
type Store struct {
type store struct {
db *leveldb.DB
}
// New creates a new persistent state storage.
func New(path string) (storage.StateStorer, error) {
func NewStateStore(path string) (storage.StateStorer, error) {
db, err := leveldb.OpenFile(path, nil)
if err != nil {
return nil, err
}
return &Store{
return &store{
db: db,
}, nil
}
// Get retrieves a value of the requested key. If no results are found,
// storage.ErrNotFound will be returned.
func (s *Store) Get(key string, i interface{}) error {
func (s *store) Get(key string, i interface{}) error {
data, err := s.db.Get([]byte(key), nil)
if err != nil {
if err == leveldb.ErrNotFound {
if errors.Is(err, leveldb.ErrNotFound) {
return storage.ErrNotFound
}
return err
......@@ -52,7 +53,7 @@ func (s *Store) Get(key string, i interface{}) error {
// Put stores a value for an arbitrary key. BinaryMarshaler
// interface method will be called on the provided value
// with fallback to JSON serialization.
func (s *Store) Put(key string, i interface{}) (err error) {
func (s *store) Put(key string, i interface{}) (err error) {
var bytes []byte
if marshaler, ok := i.(encoding.BinaryMarshaler); ok {
if bytes, err = marshaler.MarshalBinary(); err != nil {
......@@ -61,16 +62,17 @@ func (s *Store) Put(key string, i interface{}) (err error) {
} else if bytes, err = json.Marshal(i); err != nil {
return err
}
return s.db.Put([]byte(key), bytes, nil)
}
// Delete removes entries stored under a specific key.
func (s *Store) Delete(key string) (err error) {
func (s *store) Delete(key string) (err error) {
return s.db.Delete([]byte(key), nil)
}
// Iterate entries that match the supplied prefix.
func (s *Store) Iterate(prefix string, iterFunc storage.StateIterFunc) (err error) {
func (s *store) Iterate(prefix string, iterFunc storage.StateIterFunc) (err error) {
iter := s.db.NewIterator(util.BytesPrefix([]byte(prefix)), nil)
defer iter.Release()
for iter.Next() {
......@@ -86,6 +88,6 @@ func (s *Store) Iterate(prefix string, iterFunc storage.StateIterFunc) (err erro
}
// Close releases the resources used by the store.
func (s *Store) Close() error {
func (s *store) Close() error {
return s.db.Close()
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.package storage
package leveldb_test
import (
"io/ioutil"
"os"
"testing"
"github.com/ethersphere/bee/pkg/statestore/leveldb"
"github.com/ethersphere/bee/pkg/statestore/test"
"github.com/ethersphere/bee/pkg/storage"
)
func TestPersistentStateStore(t *testing.T) {
test.Run(t, func(t *testing.T) (storage.StateStorer, func()) {
dir, err := ioutil.TempDir("", "statestore_test")
if err != nil {
t.Fatal(err)
}
store, err := leveldb.NewStateStore(dir)
if err != nil {
t.Fatal(err)
}
return store, func() { os.RemoveAll(dir) }
})
test.RunPersist(t, func(t *testing.T, dir string) storage.StateStorer {
store, err := leveldb.NewStateStore(dir)
if err != nil {
t.Fatal(err)
}
return store
})
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.package storage
package mock
import (
"encoding"
"encoding/json"
"strings"
"sync"
"github.com/ethersphere/bee/pkg/storage"
)
var _ storage.StateStorer = (*store)(nil)
type store struct {
store map[string][]byte
mtx sync.Mutex
}
func NewStateStore() storage.StateStorer {
return &store{
store: make(map[string][]byte),
}
}
func (s *store) Get(key string, i interface{}) (err error) {
s.mtx.Lock()
defer s.mtx.Unlock()
data, ok := s.store[key]
if !ok {
return storage.ErrNotFound
}
if unmarshaler, ok := i.(encoding.BinaryUnmarshaler); ok {
return unmarshaler.UnmarshalBinary(data)
}
return json.Unmarshal(data, i)
}
func (s *store) Put(key string, i interface{}) (err error) {
s.mtx.Lock()
defer s.mtx.Unlock()
var bytes []byte
if marshaler, ok := i.(encoding.BinaryMarshaler); ok {
if bytes, err = marshaler.MarshalBinary(); err != nil {
return err
}
} else if bytes, err = json.Marshal(i); err != nil {
return err
}
s.store[key] = bytes
return nil
}
func (s *store) Delete(key string) (err error) {
s.mtx.Lock()
defer s.mtx.Unlock()
delete(s.store, key)
return nil
}
func (s *store) Iterate(prefix string, iterFunc storage.StateIterFunc) (err error) {
s.mtx.Lock()
defer s.mtx.Unlock()
for k, v := range s.store {
if !strings.HasPrefix(k, prefix) {
continue
}
val := make([]byte, len(v))
copy(val, v)
stop, err := iterFunc([]byte(k), val)
if err != nil {
return err
}
if stop {
return nil
}
}
return nil
}
func (s *store) Close() (err error) {
return nil
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.package storage
package mock_test
import (
"testing"
"github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/statestore/test"
"github.com/ethersphere/bee/pkg/storage"
)
func TestMockStateStore(t *testing.T) {
test.Run(t, func(t *testing.T) (storage.StateStorer, func()) {
return mock.NewStateStore(), func() {}
})
}
......@@ -5,16 +5,25 @@
package test
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"reflect"
"strings"
"testing"
"github.com/ethersphere/bee/pkg/statestore"
"github.com/ethersphere/bee/pkg/storage"
)
const (
key1 = "key1" // stores the serialized type
key2 = "key2" // stores a json array
)
var (
value1 = &Serializing{value: "value1"}
value2 = []string{"a", "b", "c"}
)
type Serializing struct {
value string
marshalCalled bool
......@@ -34,50 +43,108 @@ func (st *Serializing) UnmarshalBinary(data []byte) (err error) {
return nil
}
func TestStore(t *testing.T) {
// RunPersist is a specific test case for the persistent state store.
// It tests that values persist across sessions.
func RunPersist(t *testing.T, f func(t *testing.T, dir string) storage.StateStorer) {
dir, err := ioutil.TempDir("", "statestore_test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
const (
key1 = "key1" // stores the serialized type
key2 = "key2" // stores a json array
)
store := f(t, dir)
var (
value1 = &Serializing{value: "value1"}
value2 = []string{"a", "b", "c"}
)
// insert some values
insert(t, store, "some_prefix", 1000)
// create a new persisted store
store, err := statestore.New(dir)
if err != nil {
t.Fatal(err)
}
// test that the iterator works
testStoreIterator(t, store, "some_prefix", 1000)
// close the store
store.Close()
// bootstrap with the same old dir
persistedStore := f(t, dir)
defer persistedStore.Close()
// test that the iterator works
testStoreIterator(t, persistedStore, "some_prefix", 1000)
// insert some more random entries
insert(t, persistedStore, "some_other_prefix", 1000)
// check again
testStoreIterator(t, persistedStore, "some_other_prefix", 1000)
}
func Run(t *testing.T, f func(t *testing.T) (storage.StateStorer, func())) {
t.Helper()
t.Run("test_put_get", func(t *testing.T) { testPutGet(t, f) })
t.Run("test_delete", func(t *testing.T) { testDelete(t, f) })
t.Run("test_iterator", func(t *testing.T) { testIterator(t, f) })
}
func testDelete(t *testing.T, f func(t *testing.T) (storage.StateStorer, func())) {
t.Helper()
// create a store
store, cleanup := f(t)
defer store.Close()
defer cleanup()
// insert some values
insertValues(t, store, key1, key2, value1, value2)
// close the persisted store
store.Close()
// check that the persisted values match
testPersistedValues(t, store, key1, key2, value1, value2)
// bootstrap a new store with the persisted data
persistedStore, err := statestore.New(dir)
err := store.Delete(key1)
if err != nil {
t.Fatal(err)
}
defer persistedStore.Close()
err = store.Delete(key2)
if err != nil {
t.Fatal(err)
}
// check that the store is empty
testEmpty(t, store)
}
func testPutGet(t *testing.T, f func(t *testing.T) (storage.StateStorer, func())) {
t.Helper()
// create a store
store, cleanup := f(t)
defer store.Close()
defer cleanup()
// insert some values
insertValues(t, store, key1, key2, value1, value2)
// check that the persisted values match
testPersistedValues(t, persistedStore, key1, key2, value1, value2)
testPersistedValues(t, store, key1, key2, value1, value2)
}
func testIterator(t *testing.T, f func(t *testing.T) (storage.StateStorer, func())) {
t.Helper()
// create a store
store, cleanup := f(t)
defer store.Close()
defer cleanup()
// insert some values
insert(t, store, "some_prefix", 1000)
// test that the iterator works
testStoreIterator(t, persistedStore)
testStoreIterator(t, store, "some_prefix", 1000)
testStoreIterator(t, store, "no_prefix", 0)
}
func insertValues(t *testing.T, store storage.StateStorer, key1, key2 string, value1 *Serializing, value2 []string) {
t.Helper()
err := store.Put(key1, value1)
if err != nil {
t.Fatal(err)
......@@ -93,7 +160,22 @@ func insertValues(t *testing.T, store storage.StateStorer, key1, key2 string, va
}
}
func insert(t *testing.T, store storage.StateStorer, prefix string, count int) {
t.Helper()
for i := 0; i < count; i++ {
k := prefix + string(i)
err := store.Put(k, i)
if err != nil {
t.Fatal(err)
}
}
}
func testPersistedValues(t *testing.T, store storage.StateStorer, key1, key2 string, value1 *Serializing, value2 []string) {
t.Helper()
v := &Serializing{}
err := store.Get(key1, v)
if err != nil {
......@@ -121,44 +203,31 @@ func testPersistedValues(t *testing.T, store storage.StateStorer, key1, key2 str
}
}
func testStoreIterator(t *testing.T, store storage.StateStorer) {
storePrefix := "test_"
err := store.Put(storePrefix+"key1", "value1")
if err != nil {
t.Fatal(err)
}
// do not include prefix in one of the entries
err = store.Put("key2", "value2")
if err != nil {
t.Fatal(err)
}
err = store.Put(storePrefix+"key3", "value3")
if err != nil {
t.Fatal(err)
}
entries := make(map[string]string)
func testStoreIterator(t *testing.T, store storage.StateStorer, prefix string, size int) {
t.Helper()
matching := 0
entriesIterFunction := func(key []byte, value []byte) (stop bool, err error) {
var entry string
err = json.Unmarshal(value, &entry)
if err != nil {
t.Fatal(err)
k := string(key)
if !strings.HasPrefix(k, prefix) {
return true, fmt.Errorf("iterator called callback with wrong key prefix. key: %s expected prefix: %s", k, prefix)
}
entries[string(key)] = entry
return stop, err
matching++
return false, nil
}
err = store.Iterate(storePrefix, entriesIterFunction)
err := store.Iterate(prefix, entriesIterFunction)
if err != nil {
t.Fatal(err)
}
expectedEntries := map[string]string{"test_key1": "value1", "test_key3": "value3"}
if !reflect.DeepEqual(entries, expectedEntries) {
t.Fatalf("expected store entries to be %v, are %v instead", expectedEntries, entries)
if matching != size {
t.Fatalf("entry number mismatch. want %d got %d", size, matching)
}
}
func testEmpty(t *testing.T, store storage.StateStorer) {
t.Helper()
testStoreIterator(t, store, "", 0)
}
......@@ -6,6 +6,7 @@ package full
import (
"context"
"errors"
"math/rand"
"sync"
"time"
......@@ -14,6 +15,7 @@ import (
"github.com/ethersphere/bee/pkg/discovery"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
)
......@@ -61,9 +63,12 @@ func (d *Driver) AddPeer(ctx context.Context, addr swarm.Address) error {
d.mtx.Unlock()
connectedPeers := d.p2pService.Peers()
ma, exists := d.addressBook.Get(addr)
if !exists {
return topology.ErrNotFound
ma, err := d.addressBook.Get(addr)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return topology.ErrNotFound
}
return err
}
if !isConnected(addr, connectedPeers) {
......@@ -75,7 +80,10 @@ func (d *Driver) AddPeer(ctx context.Context, addr swarm.Address) error {
// update addr if it is wrong or it has been changed
if !addr.Equal(peerAddr) {
addr = peerAddr
d.addressBook.Put(peerAddr, ma)
err := d.addressBook.Put(peerAddr, ma)
if err != nil {
return err
}
}
}
......
......@@ -11,15 +11,15 @@ import (
"io/ioutil"
"testing"
"github.com/ethersphere/bee/pkg/addressbook/inmem"
"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/discovery/mock"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
p2pmock "github.com/ethersphere/bee/pkg/p2p/mock"
mockstate "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/topology/full"
ma "github.com/multiformats/go-multiaddr"
)
......@@ -41,7 +41,9 @@ func TestAddPeer(t *testing.T) {
t.Run("OK - no connected peers", func(t *testing.T) {
discovery := mock.NewDiscovery()
addressbook := inmem.New()
statestore := mockstate.NewStateStore()
ab := addressbook.New(statestore)
p2p := p2pmock.New(p2pmock.WithConnectFunc(func(_ context.Context, addr ma.Multiaddr) (swarm.Address, error) {
if addr.String() != underlay {
t.Fatalf("expected multiaddr %s, got %s", addr.String(), underlay)
......@@ -49,13 +51,17 @@ func TestAddPeer(t *testing.T) {
return overlay, nil
}))
fullDriver := full.New(discovery, addressbook, p2p, logger)
fullDriver := full.New(discovery, ab, p2p, logger)
multiaddr, err := ma.NewMultiaddr(underlay)
if err != nil {
t.Fatal(err)
}
addressbook.Put(overlay, multiaddr)
err = ab.Put(overlay, multiaddr)
if err != nil {
t.Fatal(err)
}
err = fullDriver.AddPeer(context.Background(), overlay)
if err != nil {
t.Fatalf("full conn driver returned err %s", err.Error())
......@@ -68,13 +74,14 @@ func TestAddPeer(t *testing.T) {
t.Run("ERROR - peer not added", func(t *testing.T) {
discovery := mock.NewDiscovery()
addressbook := inmem.New()
statestore := mockstate.NewStateStore()
ab := addressbook.New(statestore)
p2p := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (swarm.Address, error) {
t.Fatal("should not be called")
return swarm.Address{}, nil
}))
fullDriver := full.New(discovery, addressbook, p2p, logger)
fullDriver := full.New(discovery, ab, p2p, logger)
err := fullDriver.AddPeer(context.Background(), overlay)
if !errors.Is(err, topology.ErrNotFound) {
t.Fatalf("full conn driver returned err %v", err)
......@@ -87,7 +94,8 @@ func TestAddPeer(t *testing.T) {
t.Run("OK - connected peers - peer already connected", func(t *testing.T) {
discovery := mock.NewDiscovery()
addressbook := inmem.New()
statestore := mockstate.NewStateStore()
ab := addressbook.New(statestore)
alreadyConnected := connectedPeers[0].Address
p2p := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (swarm.Address, error) {
......@@ -97,13 +105,17 @@ func TestAddPeer(t *testing.T) {
return connectedPeers
}))
fullDriver := full.New(discovery, addressbook, p2p, logger)
fullDriver := full.New(discovery, ab, p2p, logger)
multiaddr, err := ma.NewMultiaddr(underlay)
if err != nil {
t.Fatal("error creating multiaddr")
}
addressbook.Put(alreadyConnected, multiaddr)
err = ab.Put(alreadyConnected, multiaddr)
if err != nil {
t.Fatal(err)
}
err = fullDriver.AddPeer(context.Background(), alreadyConnected)
if err != nil {
t.Fatalf("full conn driver returned err %s", err.Error())
......@@ -128,7 +140,8 @@ func TestAddPeer(t *testing.T) {
t.Run("OK - connected peers - peer not already connected", func(t *testing.T) {
discovery := mock.NewDiscovery()
addressbook := inmem.New()
statestore := mockstate.NewStateStore()
ab := addressbook.New(statestore)
p2ps := p2pmock.New(p2pmock.WithConnectFunc(func(ctx context.Context, addr ma.Multiaddr) (swarm.Address, error) {
if addr.String() != underlay {
......@@ -139,13 +152,17 @@ func TestAddPeer(t *testing.T) {
return connectedPeers
}))
fullDriver := full.New(discovery, addressbook, p2ps, logger)
fullDriver := full.New(discovery, ab, p2ps, logger)
multiaddr, err := ma.NewMultiaddr(underlay)
if err != nil {
t.Fatal(err)
}
addressbook.Put(overlay, multiaddr)
err = ab.Put(overlay, multiaddr)
if err != nil {
t.Fatal(err)
}
err = fullDriver.AddPeer(context.Background(), overlay)
if err != nil {
t.Fatalf("full conn driver returned err %s", err.Error())
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment