Commit bc67a6bd authored by acud's avatar acud Committed by GitHub

Integrate localstore to bee (#94)

* integrate bee to use localstore
parent 0751607e
......@@ -38,7 +38,7 @@ func (s *server) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
}
err = s.Storer.Put(ctx, address, data)
_, err = s.Storer.Put(ctx, storage.ModePutUpload, swarm.NewChunk(address, data))
if err != nil {
s.Logger.Debugf("bzz-chunk: chunk write error: %v, addr %s", err, address)
s.Logger.Error("bzz-chunk: chunk write error")
......@@ -61,7 +61,7 @@ func (s *server) chunkGetHandler(w http.ResponseWriter, r *http.Request) {
return
}
data, err := s.Storer.Get(ctx, address)
chunk, err := s.Storer.Get(ctx, storage.ModeGetRequest, address)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
s.Logger.Trace("bzz-chunk: chunk not found. addr %s", address)
......@@ -75,5 +75,5 @@ func (s *server) chunkGetHandler(w http.ResponseWriter, r *http.Request) {
return
}
w.Header().Set("Content-Type", "binary/octet-stream")
_, _ = io.Copy(w, bytes.NewReader(data))
_, _ = io.Copy(w, bytes.NewReader(chunk.Data()))
}
......@@ -32,8 +32,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
)
// DB implements chunk.Store.
var _ storage.Store = &DB{}
var _ storage.Storer = &DB{}
var (
// ErrInvalidMode is retuned when an unknown Mode
......@@ -104,6 +103,7 @@ type DB struct {
// are done before closing the database
updateGCWG sync.WaitGroup
// baseKey is the overlay address
baseKey []byte
batchMu sync.Mutex
......
......@@ -25,6 +25,7 @@ import (
"github.com/ethersphere/bee/pkg/keystore"
filekeystore "github.com/ethersphere/bee/pkg/keystore/file"
memkeystore "github.com/ethersphere/bee/pkg/keystore/mem"
"github.com/ethersphere/bee/pkg/localstore"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/metrics"
"github.com/ethersphere/bee/pkg/p2p/libp2p"
......@@ -32,7 +33,6 @@ import (
"github.com/ethersphere/bee/pkg/statestore/leveldb"
mockinmem "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/topology/full"
"github.com/ethersphere/bee/pkg/tracing"
ma "github.com/multiformats/go-multiaddr"
......@@ -46,6 +46,7 @@ type Bee struct {
errorLogWriter *io.PipeWriter
tracerCloser io.Closer
stateStoreCloser io.Closer
localstoreCloser io.Closer
}
type Options struct {
......@@ -174,8 +175,16 @@ func NewBee(o Options) (*Bee, error) {
logger.Infof("p2p address: %s", addr)
}
// for now, storer is an in-memory store.
storer := mock.NewStorer()
var storer storage.Storer
if o.DataDir == "" {
// TODO: this needs to support in-mem localstore implementation somehow
} else {
storer, err = localstore.New(filepath.Join(o.DataDir, "localstore"), address.Bytes(), nil, logger)
if err != nil {
return nil, fmt.Errorf("localstore: %w", err)
}
}
b.localstoreCloser = storer
var apiService api.Service
if o.APIAddr != "" {
......@@ -326,5 +335,9 @@ func (b *Bee) Shutdown(ctx context.Context) error {
return fmt.Errorf("statestore: %w", err)
}
if err := b.localstoreCloser.Close(); err != nil {
return fmt.Errorf("localstore: %w", err)
}
return b.errorLogWriter.Close()
}
......@@ -97,13 +97,13 @@ func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) er
return err
}
data, err := s.storer.Get(context.TODO(), swarm.NewAddress(req.Addr))
chunk, err := s.storer.Get(ctx, storage.ModeGetRequest, swarm.NewAddress(req.Addr))
if err != nil {
return err
}
if err := w.WriteMsgWithContext(ctx, &pb.Delivery{
Data: data,
Data: chunk.Data(),
}); err != nil {
return err
}
......
......@@ -17,6 +17,7 @@ import (
"github.com/ethersphere/bee/pkg/p2p/streamtest"
"github.com/ethersphere/bee/pkg/retrieval"
pb "github.com/ethersphere/bee/pkg/retrieval/pb"
"github.com/ethersphere/bee/pkg/storage"
storemock "github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
)
......@@ -35,7 +36,10 @@ func TestDelivery(t *testing.T) {
reqData := []byte("data data data")
// put testdata in the mock store of the server
_ = mockStorer.Put(context.TODO(), reqAddr, reqData)
_, err = mockStorer.Put(context.Background(), storage.ModePutUpload, swarm.NewChunk(reqAddr, reqData))
if err != nil {
t.Fatal(err)
}
// create the server that will handle the request and will serve the response
server := retrieval.New(retrieval.Options{
......
......@@ -28,21 +28,50 @@ func NewValidatingStorer(f storage.ChunkValidatorFunc) storage.Storer {
validator: f,
}
}
func (m *mockStorer) Get(ctx context.Context, addr swarm.Address) (data []byte, err error) {
func (m *mockStorer) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address) (ch swarm.Chunk, err error) {
v, has := m.store[addr.String()]
if !has {
return nil, storage.ErrNotFound
}
return v, nil
return swarm.NewChunk(addr, v), nil
}
func (m *mockStorer) Put(ctx context.Context, addr swarm.Address, data []byte) error {
if m.validator != nil {
if !m.validator(addr, data) {
return storage.ErrInvalidChunk
func (m *mockStorer) Put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err error) {
for _, ch := range chs {
if m.validator != nil {
if !m.validator(ch.Address(), ch.Data()) {
return nil, storage.ErrInvalidChunk
}
}
m.store[ch.Address().String()] = ch.Data()
}
m.store[addr.String()] = data
return nil
return nil, nil
}
func (m *mockStorer) GetMulti(ctx context.Context, mode storage.ModeGet, addrs ...swarm.Address) (ch []swarm.Chunk, err error) {
panic("not implemented") // TODO: Implement
}
func (m *mockStorer) Has(ctx context.Context, addr swarm.Address) (yes bool, err error) {
panic("not implemented") // TODO: Implement
}
func (m *mockStorer) HasMulti(ctx context.Context, addrs ...swarm.Address) (yes []bool, err error) {
panic("not implemented") // TODO: Implement
}
func (m *mockStorer) Set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Address) (err error) {
panic("not implemented") // TODO: Implement
}
func (m *mockStorer) LastPullSubscriptionBinID(bin uint8) (id uint64, err error) {
panic("not implemented") // TODO: Implement
}
func (m *mockStorer) SubscribePull(ctx context.Context, bin uint8, since uint64, until uint64) (c <-chan storage.Descriptor, stop func()) {
panic("not implemented") // TODO: Implement
}
func (m *mockStorer) Close() error {
panic("not implemented") // TODO: Implement
}
......@@ -25,24 +25,24 @@ func TestMockStorer(t *testing.T) {
valueFound := []byte("data data data")
ctx := context.Background()
if _, err := s.Get(ctx, keyFound); err != storage.ErrNotFound {
if _, err := s.Get(ctx, storage.ModeGetRequest, keyFound); err != storage.ErrNotFound {
t.Fatalf("expected ErrNotFound, got %v", err)
}
if _, err := s.Get(ctx, keyNotFound); err != storage.ErrNotFound {
if _, err := s.Get(ctx, storage.ModeGetRequest, keyNotFound); err != storage.ErrNotFound {
t.Fatalf("expected ErrNotFound, got %v", err)
}
if err := s.Put(ctx, keyFound, valueFound); err != nil {
if _, err := s.Put(ctx, storage.ModePutUpload, swarm.NewChunk(keyFound, valueFound)); err != nil {
t.Fatalf("expected not error but got: %v", err)
}
if data, err := s.Get(ctx, keyFound); err != nil {
if chunk, err := s.Get(ctx, storage.ModeGetRequest, keyFound); err != nil {
t.Fatalf("expected not error but got: %v", err)
} else {
if !bytes.Equal(data, valueFound) {
t.Fatalf("expected value %s but got %s", valueFound, data)
if !bytes.Equal(chunk.Data(), valueFound) {
t.Fatalf("expected value %s but got %s", valueFound, chunk.Data())
}
}
}
......@@ -77,27 +77,27 @@ func TestMockValidatingStorer(t *testing.T) {
ctx := context.Background()
if err := s.Put(ctx, keyValid, validContent); err != nil {
if _, err := s.Put(ctx, storage.ModePutUpload, swarm.NewChunk(keyValid, validContent)); err != nil {
t.Fatalf("expected not error but got: %v", err)
}
if err := s.Put(ctx, keyInvalid, validContent); err == nil {
if _, err := s.Put(ctx, storage.ModePutUpload, swarm.NewChunk(keyInvalid, validContent)); err == nil {
t.Fatalf("expected error but got none")
}
if err := s.Put(ctx, keyInvalid, invalidContent); err == nil {
if _, err := s.Put(ctx, storage.ModePutUpload, swarm.NewChunk(keyInvalid, invalidContent)); err == nil {
t.Fatalf("expected error but got none")
}
if data, err := s.Get(ctx, keyValid); err != nil {
if chunk, err := s.Get(ctx, storage.ModeGetRequest, keyValid); err != nil {
t.Fatalf("got error on get but expected none: %v", err)
} else {
if !bytes.Equal(data, validContent) {
if !bytes.Equal(chunk.Data(), validContent) {
t.Fatal("stored content not identical to input data")
}
}
if _, err := s.Get(ctx, keyInvalid); err == nil {
if _, err := s.Get(ctx, storage.ModeGetRequest, keyInvalid); err == nil {
t.Fatal("got no error on get but expected one")
}
......
......@@ -130,11 +130,6 @@ func (d *Descriptor) String() string {
}
type Storer interface {
Get(ctx context.Context, addr swarm.Address) (data []byte, err error)
Put(ctx context.Context, addr swarm.Address, data []byte) (err error)
}
type Store interface {
Get(ctx context.Context, mode ModeGet, addr swarm.Address) (ch swarm.Chunk, err error)
GetMulti(ctx context.Context, mode ModeGet, addrs ...swarm.Address) (ch []swarm.Chunk, err error)
Put(ctx context.Context, mode ModePut, chs ...swarm.Chunk) (exist []bool, err error)
......@@ -153,7 +148,7 @@ type StateStorer interface {
Put(key string, i interface{}) (err error)
Delete(key string) (err error)
Iterate(prefix string, iterFunc StateIterFunc) (err error)
Close() (err error)
io.Closer
}
// StateIterFunc is used when iterating through StateStorer key/value pairs
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment