Commit 052f7515 authored by acud's avatar acud Committed by GitHub

[bee #48, #49, #40]: introduce netstore (#104)

* netstore: introduce netstore
parent 7459b900
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package netstore
import (
"context"
"errors"
"github.com/ethersphere/bee/pkg/retrieval"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
type store struct {
storage.Storer
retrieval retrieval.Interface
validators []swarm.ChunkValidator
}
// New returns a new NetStore that wraps a given Storer.
func New(s storage.Storer, r retrieval.Interface, validators ...swarm.ChunkValidator) storage.Storer {
return &store{Storer: s, retrieval: r, validators: validators}
}
// Get retrieves a given chunk address.
// It will request a chunk from the network whenever it cannot be found locally.
func (s *store) Get(ctx context.Context, mode storage.ModeGet, addr swarm.Address) (ch swarm.Chunk, err error) {
ch, err = s.Storer.Get(ctx, mode, addr)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
// request from network
data, err := s.retrieval.RetrieveChunk(ctx, addr)
if err != nil {
return nil, err
}
ch = swarm.NewChunk(addr, data)
if err != nil {
return nil, err
}
if !s.valid(ch) {
return nil, storage.ErrInvalidChunk
}
_, err = s.Storer.Put(ctx, storage.ModePutRequest, ch)
if err != nil {
return nil, err
}
return ch, nil
}
return nil, err
}
return ch, nil
}
// Put stores a given chunk in the local storage.
// returns a storage.ErrInvalidChunk error when
// encountering an invalid chunk.
func (s *store) Put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err error) {
for _, ch := range chs {
if !s.valid(ch) {
return nil, storage.ErrInvalidChunk
}
}
return s.Storer.Put(ctx, mode, chs...)
}
// checks if a particular chunk is valid using the built in validators
func (s *store) valid(ch swarm.Chunk) (ok bool) {
for _, v := range s.validators {
if ok = v.Validate(ch); ok {
return true
}
}
return false
}
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package netstore_test
import (
"bytes"
"context"
"sync/atomic"
"testing"
"github.com/ethersphere/bee/pkg/netstore"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
)
var chunkData = []byte("mockdata")
type mockValidator struct{}
func (_ mockValidator) Validate(_ swarm.Chunk) bool { return true }
// TestNetstoreRetrieval verifies that a chunk is asked from the network whenever
// it is not found locally
func TestNetstoreRetrieval(t *testing.T) {
retrieve, store, nstore := newRetrievingNetstore()
addr := swarm.MustParseHexAddress("000001")
_, err := nstore.Get(context.Background(), storage.ModeGetRequest, addr)
if err != nil {
t.Fatal(err)
}
if !retrieve.called {
t.Fatal("retrieve request not issued")
}
if retrieve.callCount != 1 {
t.Fatalf("call count %d", retrieve.callCount)
}
if !retrieve.addr.Equal(addr) {
t.Fatalf("addresses not equal. got %s want %s", retrieve.addr, addr)
}
// store should have the chunk now
d, err := store.Get(context.Background(), storage.ModeGetRequest, addr)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(d.Data(), chunkData) {
t.Fatal("chunk data not equal to expected data")
}
// check that the second call does not result in another retrieve request
d, err = nstore.Get(context.Background(), storage.ModeGetRequest, addr)
if err != nil {
t.Fatal(err)
}
if retrieve.callCount != 1 {
t.Fatalf("call count %d", retrieve.callCount)
}
if !bytes.Equal(d.Data(), chunkData) {
t.Fatal("chunk data not equal to expected data")
}
}
// TestNetstoreNoRetrieval verifies that a chunk is not requested from the network
// whenever it is found locally.
func TestNetstoreNoRetrieval(t *testing.T) {
retrieve, store, nstore := newRetrievingNetstore()
addr := swarm.MustParseHexAddress("000001")
// store should have the chunk in advance
_, err := store.Put(context.Background(), storage.ModePutUpload, swarm.NewChunk(addr, chunkData))
if err != nil {
t.Fatal(err)
}
c, err := nstore.Get(context.Background(), storage.ModeGetRequest, addr)
if err != nil {
t.Fatal(err)
}
if retrieve.called {
t.Fatal("retrieve request issued but shouldn't")
}
if retrieve.callCount != 0 {
t.Fatalf("call count %d", retrieve.callCount)
}
if !bytes.Equal(c.Data(), chunkData) {
t.Fatal("chunk data mismatch")
}
}
// returns a mock retrieval protocol, a mock local storage and a netstore
func newRetrievingNetstore() (ret *retrievalMock, mockStore storage.Storer, ns storage.Storer) {
retrieve := &retrievalMock{}
store := mock.NewStorer()
nstore := netstore.New(store, retrieve, mockValidator{})
return retrieve, store, nstore
}
type retrievalMock struct {
called bool
callCount int32
addr swarm.Address
}
func (r *retrievalMock) RetrieveChunk(ctx context.Context, addr swarm.Address) (data []byte, err error) {
r.called = true
atomic.AddInt32(&r.callCount, 1)
r.addr = addr
return chunkData, nil
}
......@@ -28,14 +28,17 @@ import (
"github.com/ethersphere/bee/pkg/localstore"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/metrics"
"github.com/ethersphere/bee/pkg/netstore"
"github.com/ethersphere/bee/pkg/p2p/libp2p"
"github.com/ethersphere/bee/pkg/pingpong"
"github.com/ethersphere/bee/pkg/retrieval"
"github.com/ethersphere/bee/pkg/statestore/leveldb"
mockinmem "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology/full"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/ethersphere/bee/pkg/validator"
ma "github.com/multiformats/go-multiaddr"
)
......@@ -190,12 +193,21 @@ func NewBee(o Options) (*Bee, error) {
}
b.localstoreCloser = storer
retrieve := retrieval.New(retrieval.Options{
Streamer: p2ps,
ChunkPeerer: topologyDriver,
Storer: storer,
Logger: logger,
})
ns := netstore.New(storer, retrieve, validator.NewContentAddressValidator())
var apiService api.Service
if o.APIAddr != "" {
// API server
apiService = api.New(api.Options{
Pingpong: pingPong,
Storer: storer,
Storer: ns,
Logger: logger,
Tracer: tracer,
})
......
......@@ -5,11 +5,10 @@ package pb
import (
fmt "fmt"
proto "github.com/gogo/protobuf/proto"
io "io"
math "math"
math_bits "math/bits"
proto "github.com/gogo/protobuf/proto"
)
// Reference imports to suppress errors if they are not otherwise used.
......@@ -112,8 +111,8 @@ func (m *Delivery) GetData() []byte {
}
func init() {
proto.RegisterType((*Request)(nil), "retrieval.Request")
proto.RegisterType((*Delivery)(nil), "retrieval.Delivery")
proto.RegisterType((*Request)(nil), "pb.Request")
proto.RegisterType((*Delivery)(nil), "pb.Delivery")
}
func init() { proto.RegisterFile("retrieval.proto", fileDescriptor_fcade0a564e5dcd4) }
......@@ -121,13 +120,13 @@ func init() { proto.RegisterFile("retrieval.proto", fileDescriptor_fcade0a564e5d
var fileDescriptor_fcade0a564e5dcd4 = []byte{
// 127 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2f, 0x4a, 0x2d, 0x29,
0xca, 0x4c, 0x2d, 0x4b, 0xcc, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x84, 0x0b, 0x28,
0xc9, 0x72, 0xb1, 0x07, 0xa5, 0x16, 0x96, 0xa6, 0x16, 0x97, 0x08, 0x09, 0x71, 0xb1, 0x38, 0xa6,
0xa4, 0x14, 0x49, 0x30, 0x2a, 0x30, 0x6a, 0xf0, 0x04, 0x81, 0xd9, 0x4a, 0x72, 0x5c, 0x1c, 0x2e,
0xa9, 0x39, 0x99, 0x65, 0xa9, 0x45, 0x95, 0x20, 0x79, 0x97, 0xc4, 0x92, 0x44, 0x98, 0x3c, 0x88,
0xed, 0x24, 0x71, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, 0x0f, 0x1e, 0xc9, 0x31, 0x4e,
0x78, 0x2c, 0xc7, 0x70, 0xe1, 0xb1, 0x1c, 0xc3, 0x8d, 0xc7, 0x72, 0x0c, 0x49, 0x6c, 0x60, 0xab,
0x8c, 0x01, 0x01, 0x00, 0x00, 0xff, 0xff, 0x00, 0x18, 0xd7, 0x30, 0x7d, 0x00, 0x00, 0x00,
0xca, 0x4c, 0x2d, 0x4b, 0xcc, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2a, 0x48, 0x52,
0x92, 0xe5, 0x62, 0x0f, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x11, 0x12, 0xe2, 0x62, 0x71, 0x4c,
0x49, 0x29, 0x92, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x09, 0x02, 0xb3, 0x95, 0xe4, 0xb8, 0x38, 0x5c,
0x52, 0x73, 0x32, 0xcb, 0x52, 0x8b, 0x2a, 0x41, 0xf2, 0x2e, 0x89, 0x25, 0x89, 0x30, 0x79, 0x10,
0xdb, 0x49, 0xe2, 0xc4, 0x23, 0x39, 0xc6, 0x0b, 0x8f, 0xe4, 0x18, 0x1f, 0x3c, 0x92, 0x63, 0x9c,
0xf0, 0x58, 0x8e, 0xe1, 0xc2, 0x63, 0x39, 0x86, 0x1b, 0x8f, 0xe5, 0x18, 0x92, 0xd8, 0xc0, 0x76,
0x18, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0x4f, 0xfc, 0x90, 0x25, 0x76, 0x00, 0x00, 0x00,
}
func (m *Request) Marshal() (dAtA []byte, err error) {
......
......@@ -23,6 +23,12 @@ const (
streamName = "retrieval"
)
var _ Interface = (*Service)(nil)
type Interface interface {
RetrieveChunk(ctx context.Context, addr swarm.Address) (data []byte, err error)
}
type Service struct {
streamer p2p.Streamer
peerSuggester topology.ClosestPeerer
......@@ -37,9 +43,6 @@ type Options struct {
Logger logging.Logger
}
type Storer interface {
}
func New(o Options) *Service {
return &Service{
streamer: o.Streamer,
......
......@@ -9,9 +9,9 @@ import (
"encoding/binary"
"hash"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bmt"
"github.com/ethersphere/bee/pkg/logging"
bmtlegacy "github.com/ethersphere/bmt/legacy"
"golang.org/x/crypto/sha3"
)
......@@ -30,7 +30,7 @@ type ContentAddressValidator struct {
}
// New constructs a new ContentAddressValidator
func NewContentAddressValidator() *ContentAddressValidator {
func NewContentAddressValidator() swarm.ChunkValidator {
p := bmtlegacy.NewTreePool(hashFunc, swarm.SectionSize, bmtlegacy.PoolSize)
return &ContentAddressValidator{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment