Commit 54b400eb authored by acud's avatar acud Committed by GitHub

feat: reupload to network (#1705)

* feat: reupload local content

* feat: reupload. address comments
parent 825c7592
...@@ -30,6 +30,7 @@ import ( ...@@ -30,6 +30,7 @@ import (
"github.com/ethersphere/bee/pkg/postage/postagecontract" "github.com/ethersphere/bee/pkg/postage/postagecontract"
"github.com/ethersphere/bee/pkg/pss" "github.com/ethersphere/bee/pkg/pss"
"github.com/ethersphere/bee/pkg/resolver" "github.com/ethersphere/bee/pkg/resolver"
"github.com/ethersphere/bee/pkg/steward"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags" "github.com/ethersphere/bee/pkg/tags"
...@@ -91,6 +92,7 @@ type server struct { ...@@ -91,6 +92,7 @@ type server struct {
pss pss.Interface pss pss.Interface
traversal traversal.Traverser traversal traversal.Traverser
pinning pinning.Interface pinning pinning.Interface
steward steward.Reuploader
logger logging.Logger logger logging.Logger
tracer *tracing.Tracer tracer *tracing.Tracer
feedFactory feeds.Factory feedFactory feeds.Factory
...@@ -117,7 +119,7 @@ const ( ...@@ -117,7 +119,7 @@ const (
) )
// New will create a and initialize a new API service. // New will create a and initialize a new API service.
func New(tags *tags.Tags, storer storage.Storer, resolver resolver.Interface, pss pss.Interface, traversalService traversal.Traverser, pinning pinning.Interface, feedFactory feeds.Factory, post postage.Service, postageContract postagecontract.Interface, signer crypto.Signer, logger logging.Logger, tracer *tracing.Tracer, o Options) Service { func New(tags *tags.Tags, storer storage.Storer, resolver resolver.Interface, pss pss.Interface, traversalService traversal.Traverser, pinning pinning.Interface, feedFactory feeds.Factory, post postage.Service, postageContract postagecontract.Interface, steward steward.Reuploader, signer crypto.Signer, logger logging.Logger, tracer *tracing.Tracer, o Options) Service {
s := &server{ s := &server{
tags: tags, tags: tags,
storer: storer, storer: storer,
...@@ -128,6 +130,7 @@ func New(tags *tags.Tags, storer storage.Storer, resolver resolver.Interface, ps ...@@ -128,6 +130,7 @@ func New(tags *tags.Tags, storer storage.Storer, resolver resolver.Interface, ps
feedFactory: feedFactory, feedFactory: feedFactory,
post: post, post: post,
postageContract: postageContract, postageContract: postageContract,
steward: steward,
signer: signer, signer: signer,
Options: o, Options: o,
logger: logger, logger: logger,
......
...@@ -30,6 +30,7 @@ import ( ...@@ -30,6 +30,7 @@ import (
"github.com/ethersphere/bee/pkg/resolver" "github.com/ethersphere/bee/pkg/resolver"
resolverMock "github.com/ethersphere/bee/pkg/resolver/mock" resolverMock "github.com/ethersphere/bee/pkg/resolver/mock"
statestore "github.com/ethersphere/bee/pkg/statestore/mock" statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/steward"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock" "github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
...@@ -68,6 +69,7 @@ type testServerOptions struct { ...@@ -68,6 +69,7 @@ type testServerOptions struct {
CORSAllowedOrigins []string CORSAllowedOrigins []string
PostageContract postagecontract.Interface PostageContract postagecontract.Interface
Post postage.Service Post postage.Service
Steward steward.Reuploader
} }
func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.Conn, string) { func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.Conn, string) {
...@@ -87,7 +89,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket. ...@@ -87,7 +89,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.
if o.Post == nil { if o.Post == nil {
o.Post = mockpost.New() o.Post = mockpost.New()
} }
s := api.New(o.Tags, o.Storer, o.Resolver, o.Pss, o.Traversal, o.Pinning, o.Feeds, o.Post, o.PostageContract, signer, o.Logger, nil, api.Options{ s := api.New(o.Tags, o.Storer, o.Resolver, o.Pss, o.Traversal, o.Pinning, o.Feeds, o.Post, o.PostageContract, o.Steward, signer, o.Logger, nil, api.Options{
CORSAllowedOrigins: o.CORSAllowedOrigins, CORSAllowedOrigins: o.CORSAllowedOrigins,
GatewayMode: o.GatewayMode, GatewayMode: o.GatewayMode,
WsPingPeriod: o.WsPingPeriod, WsPingPeriod: o.WsPingPeriod,
...@@ -206,7 +208,7 @@ func TestParseName(t *testing.T) { ...@@ -206,7 +208,7 @@ func TestParseName(t *testing.T) {
signer := crypto.NewDefaultSigner(pk) signer := crypto.NewDefaultSigner(pk)
mockPostage := mockpost.New() mockPostage := mockpost.New()
s := api.New(nil, nil, tC.res, nil, nil, nil, nil, mockPostage, nil, signer, log, nil, api.Options{}).(*api.Server) s := api.New(nil, nil, tC.res, nil, nil, nil, nil, mockPostage, nil, nil, signer, log, nil, api.Options{}).(*api.Server)
t.Run(tC.desc, func(t *testing.T) { t.Run(tC.desc, func(t *testing.T) {
got, err := s.ResolveNameOrAddress(tC.name) got, err := s.ResolveNameOrAddress(tC.name)
......
...@@ -503,3 +503,22 @@ func (s *server) manifestFeed( ...@@ -503,3 +503,22 @@ func (s *server) manifestFeed(
f := feeds.New(topic, common.BytesToAddress(owner)) f := feeds.New(topic, common.BytesToAddress(owner))
return s.feedFactory.NewLookup(*t, f) return s.feedFactory.NewLookup(*t, f)
} }
func (s *server) bzzPatchHandler(w http.ResponseWriter, r *http.Request) {
nameOrHex := mux.Vars(r)["address"]
address, err := s.resolveNameOrAddress(nameOrHex)
if err != nil {
s.logger.Debugf("bzz patch: parse address %s: %v", nameOrHex, err)
s.logger.Error("bzz patch: parse address")
jsonhttp.NotFound(w, nil)
return
}
err = s.steward.Reupload(r.Context(), address)
if err != nil {
s.logger.Debugf("bzz patch: reupload %s: %v", address.String(), err)
s.logger.Error("bzz patch: reupload")
jsonhttp.InternalServerError(w, nil)
return
}
jsonhttp.OK(w, nil)
}
...@@ -600,3 +600,37 @@ func TestFeedIndirection(t *testing.T) { ...@@ -600,3 +600,37 @@ func TestFeedIndirection(t *testing.T) {
jsonhttptest.WithExpectedResponse(updateData), jsonhttptest.WithExpectedResponse(updateData),
) )
} }
func TestBzzReupload(t *testing.T) {
var (
logger = logging.New(ioutil.Discard, 0)
mockStatestore = statestore.NewStateStore()
m = &mockSteward{}
storer = smock.NewStorer()
addr = swarm.NewAddress([]byte{31: 128})
)
client, _, _ := newTestServer(t, testServerOptions{
Storer: storer,
Tags: tags.NewTags(mockStatestore, logger),
Logger: logger,
Steward: m,
})
jsonhttptest.Request(t, client, http.MethodPatch, "/v1/bzz/"+addr.String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
Message: http.StatusText(http.StatusOK),
Code: http.StatusOK,
}),
)
if !m.addr.Equal(addr) {
t.Fatalf("got address %s want %s", m.addr.String(), addr.String())
}
}
type mockSteward struct {
addr swarm.Address
}
func (m *mockSteward) Reupload(_ context.Context, addr swarm.Address) error {
m.addr = addr
return nil
}
...@@ -83,7 +83,7 @@ func TestPinHandlers(t *testing.T) { ...@@ -83,7 +83,7 @@ func TestPinHandlers(t *testing.T) {
storerMock = mock.NewStorer() storerMock = mock.NewStorer()
client, _, _ = newTestServer(t, testServerOptions{ client, _, _ = newTestServer(t, testServerOptions{
Storer: storerMock, Storer: storerMock,
Traversal: traversal.NewService(storerMock), Traversal: traversal.New(storerMock),
Tags: tags.NewTags(statestore.NewStateStore(), logging.New(ioutil.Discard, 0)), Tags: tags.NewTags(statestore.NewStateStore(), logging.New(ioutil.Discard, 0)),
Pinning: pinning.NewServiceMock(), Pinning: pinning.NewServiceMock(),
Logger: logging.New(ioutil.Discard, 5), Logger: logging.New(ioutil.Discard, 5),
......
...@@ -98,6 +98,10 @@ func (s *server) setupRouting() { ...@@ -98,6 +98,10 @@ func (s *server) setupRouting() {
s.newTracingHandler("bzz-download"), s.newTracingHandler("bzz-download"),
web.FinalHandlerFunc(s.bzzDownloadHandler), web.FinalHandlerFunc(s.bzzDownloadHandler),
), ),
"PATCH": web.ChainHandlers(
s.newTracingHandler("bzz-patch"),
web.FinalHandlerFunc(s.bzzPatchHandler),
),
}) })
handle("/pss/send/{topic}/{targets}", web.ChainHandlers( handle("/pss/send/{topic}/{targets}", web.ChainHandlers(
......
...@@ -59,6 +59,7 @@ import ( ...@@ -59,6 +59,7 @@ import (
"github.com/ethersphere/bee/pkg/settlement/swap" "github.com/ethersphere/bee/pkg/settlement/swap"
"github.com/ethersphere/bee/pkg/settlement/swap/chequebook" "github.com/ethersphere/bee/pkg/settlement/swap/chequebook"
"github.com/ethersphere/bee/pkg/settlement/swap/transaction" "github.com/ethersphere/bee/pkg/settlement/swap/transaction"
"github.com/ethersphere/bee/pkg/steward"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags" "github.com/ethersphere/bee/pkg/tags"
...@@ -525,7 +526,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, ...@@ -525,7 +526,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
ns = netstore.New(storer, validStamp, nil, retrieve, logger) ns = netstore.New(storer, validStamp, nil, retrieve, logger)
} }
traversalService := traversal.NewService(ns) traversalService := traversal.New(ns)
pinningService := pinning.NewService(storer, stateStore, traversalService) pinningService := pinning.NewService(storer, stateStore, traversalService)
...@@ -584,7 +585,8 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, ...@@ -584,7 +585,8 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
if o.APIAddr != "" { if o.APIAddr != "" {
// API server // API server
feedFactory := factory.New(ns) feedFactory := factory.New(ns)
apiService = api.New(tagService, ns, multiResolver, pssService, traversalService, pinningService, feedFactory, post, postageContractService, signer, logger, tracer, api.Options{ steward := steward.New(storer, traversalService, pushSyncProtocol)
apiService = api.New(tagService, ns, multiResolver, pssService, traversalService, pinningService, feedFactory, post, postageContractService, steward, signer, logger, tracer, api.Options{
CORSAllowedOrigins: o.CORSAllowedOrigins, CORSAllowedOrigins: o.CORSAllowedOrigins,
GatewayMode: o.GatewayMode, GatewayMode: o.GatewayMode,
WsPingPeriod: 60 * time.Second, WsPingPeriod: 60 * time.Second,
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style // Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
package pinning package pinning_test
import ( import (
"context" "context"
...@@ -10,24 +10,23 @@ import ( ...@@ -10,24 +10,23 @@ import (
"testing" "testing"
"github.com/ethersphere/bee/pkg/file/pipeline/builder" "github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/pinning"
statestorem "github.com/ethersphere/bee/pkg/statestore/mock" statestorem "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
storagem "github.com/ethersphere/bee/pkg/storage/mock" storagem "github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/traversal" "github.com/ethersphere/bee/pkg/traversal"
) )
var _ Interface = (*Service)(nil)
func TestPinningService(t *testing.T) { func TestPinningService(t *testing.T) {
const content = "Hello, Bee!" const content = "Hello, Bee!"
var ( var (
ctx = context.Background() ctx = context.Background()
storerMock = storagem.NewStorer() storerMock = storagem.NewStorer()
service = NewService( service = pinning.NewService(
storerMock, storerMock,
statestorem.NewStateStore(), statestorem.NewStateStore(),
traversal.NewService(storerMock), traversal.New(storerMock),
) )
) )
......
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.package storage
// Package stewardess provides convenience methods
// for reseeding content on Swarm.
package steward
import (
"context"
"fmt"
"github.com/ethersphere/bee/pkg/pushsync"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/traversal"
"golang.org/x/sync/errgroup"
)
// how many parallel push operations
const parallelPush = 5
type Reuploader interface {
// Reupload root hash and all of its underlying
// associated chunks to the network.
Reupload(context.Context, swarm.Address) error
}
type steward struct {
getter storage.Getter
push pushsync.PushSyncer
traverser traversal.Traverser
}
func New(getter storage.Getter, t traversal.Traverser, p pushsync.PushSyncer) Reuploader {
return &steward{getter: getter, push: p, traverser: t}
}
// Reupload content with the given root hash to the network.
// The service will automatically dereference and traverse all
// addresses and push every chunk individually to the network.
// It assumes all chunks are available locally. It is therefore
// advisable to pin the content locally before trying to reupload it.
func (s *steward) Reupload(ctx context.Context, root swarm.Address) error {
sem := make(chan struct{}, parallelPush)
eg, _ := errgroup.WithContext(ctx)
fn := func(addr swarm.Address) error {
c, err := s.getter.Get(ctx, storage.ModeGetSync, addr)
if err != nil {
return err
}
sem <- struct{}{}
eg.Go(func() error {
defer func() { <-sem }()
_, err := s.push.PushChunkToClosest(ctx, c)
if err != nil {
return err
}
return nil
})
return nil
}
if err := s.traverser.Traverse(ctx, root, fn); err != nil {
return fmt.Errorf("traversal of %s failed: %w", root.String(), err)
}
if err := eg.Wait(); err != nil {
return fmt.Errorf("push error during reupload: %w", err)
}
return nil
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.package storage
package steward_test
import (
"bytes"
"context"
"crypto/rand"
"sync"
"testing"
"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/pushsync"
psmock "github.com/ethersphere/bee/pkg/pushsync/mock"
"github.com/ethersphere/bee/pkg/steward"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/traversal"
)
func TestSteward(t *testing.T) {
var (
ctx = context.Background()
chunks = 1000
data = make([]byte, chunks*4096) //1k chunks
store = mock.NewStorer()
traverser = traversal.New(store)
traversedAddrs = make(map[string]struct{})
mu sync.Mutex
fn = func(_ context.Context, ch swarm.Chunk) (*pushsync.Receipt, error) {
mu.Lock()
traversedAddrs[ch.Address().String()] = struct{}{}
mu.Unlock()
return nil, nil
}
ps = psmock.New(fn)
s = steward.New(store, traverser, ps)
)
n, err := rand.Read(data)
if n != cap(data) {
t.Fatal("short read")
}
if err != nil {
t.Fatal(err)
}
l := &loggingStore{Storer: store}
pipe := builder.NewPipelineBuilder(ctx, l, storage.ModePutUpload, false)
addr, err := builder.FeedPipeline(ctx, pipe, bytes.NewReader(data))
if err != nil {
t.Fatal(err)
}
err = s.Reupload(ctx, addr)
if err != nil {
t.Fatal(err)
}
mu.Lock()
defer mu.Unlock()
// check that everything that was stored is also traversed
for _, a := range l.addrs {
if _, ok := traversedAddrs[a.String()]; !ok {
t.Fatalf("expected address %s to be traversed", a.String())
}
}
}
type loggingStore struct {
storage.Storer
addrs []swarm.Address
}
func (l *loggingStore) Put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err error) {
for _, c := range chs {
l.addrs = append(l.addrs, c.Address())
}
return l.Storer.Put(ctx, mode, chs...)
}
...@@ -27,18 +27,18 @@ type Traverser interface { ...@@ -27,18 +27,18 @@ type Traverser interface {
Traverse(context.Context, swarm.Address, swarm.AddressIterFunc) error Traverse(context.Context, swarm.Address, swarm.AddressIterFunc) error
} }
// NewService is a convenient constructor for Service. // New constructs for a new Traverser.
func NewService(store storage.Storer) *Service { func New(store storage.Storer) Traverser {
return &Service{store: store} return &service{store: store}
} }
// Service is implementation of Interface using storage.Storer as its storage. // service is implementation of Traverser using storage.Storer as its storage.
type Service struct { type service struct {
store storage.Storer store storage.Storer
} }
// Traverse implements Traverser.Traverse method. // Traverse implements Traverser.Traverse method.
func (s *Service) Traverse(ctx context.Context, addr swarm.Address, iterFn swarm.AddressIterFunc) error { func (s *service) Traverse(ctx context.Context, addr swarm.Address, iterFn swarm.AddressIterFunc) error {
processBytes := func(ref swarm.Address) error { processBytes := func(ref swarm.Address) error {
j, _, err := joiner.New(ctx, s.store, ref) j, _, err := joiner.New(ctx, s.store, ref)
if err != nil { if err != nil {
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style // Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
package traversal package traversal_test
import ( import (
"bytes" "bytes"
...@@ -20,6 +20,7 @@ import ( ...@@ -20,6 +20,7 @@ import (
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock" "github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/traversal"
) )
const ( const (
...@@ -67,8 +68,6 @@ func (i *addressIterator) Next(addr swarm.Address) error { ...@@ -67,8 +68,6 @@ func (i *addressIterator) Next(addr swarm.Address) error {
return nil return nil
} }
var _ Traverser = (*Service)(nil)
func TestTraversalBytes(t *testing.T) { func TestTraversalBytes(t *testing.T) {
testCases := []struct { testCases := []struct {
dataSize int dataSize int
...@@ -161,7 +160,7 @@ func TestTraversalBytes(t *testing.T) { ...@@ -161,7 +160,7 @@ func TestTraversalBytes(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
err = NewService(storerMock).Traverse(ctx, address, iter.Next) err = traversal.New(storerMock).Traverse(ctx, address, iter.Next)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -283,7 +282,7 @@ func TestTraversalFiles(t *testing.T) { ...@@ -283,7 +282,7 @@ func TestTraversalFiles(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
err = NewService(storerMock).Traverse(ctx, address, iter.Next) err = traversal.New(storerMock).Traverse(ctx, address, iter.Next)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -436,7 +435,7 @@ func TestTraversalManifest(t *testing.T) { ...@@ -436,7 +435,7 @@ func TestTraversalManifest(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
err = NewService(storerMock).Traverse(ctx, address, iter.Next) err = traversal.New(storerMock).Traverse(ctx, address, iter.Next)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment