Commit 77dd5628 authored by Nemanja Zbiljić's avatar Nemanja Zbiljić Committed by GitHub

Chunk traversal service (#924)

parent 09aa147e
......@@ -10,7 +10,7 @@ require (
github.com/ethereum/go-ethereum v1.9.20
github.com/ethersphere/bmt v0.1.4
github.com/ethersphere/langos v1.0.0
github.com/ethersphere/manifest v0.3.2
github.com/ethersphere/manifest v0.3.3
github.com/ethersphere/sw3-bindings/v2 v2.1.0
github.com/gogo/protobuf v1.3.1
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
......
......@@ -167,8 +167,8 @@ github.com/ethersphere/bmt v0.1.4 h1:+rkWYNtMgDx6bkNqGdWu+U9DgGI1rRZplpSW3YhBr1Q
github.com/ethersphere/bmt v0.1.4/go.mod h1:Yd8ft1U69WDuHevZc/rwPxUv1rzPSMpMnS6xbU53aY8=
github.com/ethersphere/langos v1.0.0 h1:NBtNKzXTTRSue95uOlzPN4py7Aofs0xWPzyj4AI1Vcc=
github.com/ethersphere/langos v1.0.0/go.mod h1:dlcN2j4O8sQ+BlCaxeBu43bgr4RQ+inJ+pHwLeZg5Tw=
github.com/ethersphere/manifest v0.3.2 h1:IusNNfpqde2F7uWZ2DE9eyo9PMwUAMop3Ws1NBcdMyM=
github.com/ethersphere/manifest v0.3.2/go.mod h1:ygAx0KLhXYmKqsjUab95RCbXf8UcO7yMDjyfP0lY76Y=
github.com/ethersphere/manifest v0.3.3 h1:Fc4nE1c28v9j2IOGHdpaU7DQLjDWSJxXjCHL0Vl/9pQ=
github.com/ethersphere/manifest v0.3.3/go.mod h1:ygAx0KLhXYmKqsjUab95RCbXf8UcO7yMDjyfP0lY76Y=
github.com/ethersphere/sw3-bindings/v2 v2.1.0 h1:QefDtzU94UelICMPXWr7m52E2oj6r018Yc0XLoCWOxw=
github.com/ethersphere/sw3-bindings/v2 v2.1.0/go.mod h1:ozMVBZZlAirS/FcUpFwzV60v8gC0nVbA/5ZXtCX3xCc=
github.com/fatih/color v1.3.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
......
......@@ -6,6 +6,7 @@ package entry
import (
"errors"
"math"
"github.com/ethersphere/bee/pkg/collection"
"github.com/ethersphere/bee/pkg/encryption"
......@@ -33,6 +34,18 @@ func New(reference, metadata swarm.Address) *Entry {
}
}
// CanUnmarshal returns whether the entry may be might be unmarshaled based on
// the size.
func CanUnmarshal(size int64) bool {
if size < math.MaxInt32 {
switch int(size) {
case serializedDataSize, encryptedSerializedDataSize:
return true
}
}
return false
}
// Reference implements collection.Entry
func (e *Entry) Reference() swarm.Address {
return e.reference
......
......@@ -10,6 +10,7 @@ import (
"encoding/binary"
"errors"
"io"
"sync"
"sync/atomic"
"github.com/ethersphere/bee/pkg/encryption/store"
......@@ -229,6 +230,8 @@ func (j *joiner) processChunkAddresses(fn swarm.AddressIterFunc, data []byte, su
return
}
var wg sync.WaitGroup
for cursor := 0; cursor < len(data); cursor += j.refLength {
select {
case <-j.ctx.Done():
......@@ -249,7 +252,11 @@ func (j *joiner) processChunkAddresses(fn swarm.AddressIterFunc, data []byte, su
}
func(address swarm.Address, eg *errgroup.Group) {
wg.Add(1)
eg.Go(func() error {
defer wg.Done()
ch, err := j.getter.Get(j.ctx, storage.ModeGetRequest, address)
if err != nil {
return err
......@@ -261,6 +268,8 @@ func (j *joiner) processChunkAddresses(fn swarm.AddressIterFunc, data []byte, su
return nil
})
}(address, eg)
wg.Wait()
}
}
......
......@@ -12,7 +12,6 @@ import (
"io"
"io/ioutil"
mrand "math/rand"
"sync"
"testing"
"time"
......@@ -801,12 +800,9 @@ func TestJoinerIterateChunkAddresses(t *testing.T) {
}
foundAddresses := make(map[string]struct{})
var foundAddressesMu sync.Mutex
err = j.IterateChunkAddresses(func(addr swarm.Address) (stop bool) {
foundAddressesMu.Lock()
foundAddresses[addr.String()] = struct{}{}
foundAddressesMu.Unlock()
return false
})
if err != nil {
......
......@@ -21,6 +21,14 @@ var (
// ErrInvalidManifestType is returned when an unknown manifest type
// is provided to the function.
ErrInvalidManifestType = errors.New("manifest: invalid type")
// ErrMissingReference is returned when the reference for the manifest file
// is missing.
ErrMissingReference = errors.New("manifest: missing reference")
)
var (
errStopIterator = errors.New("manifest: stop iterator")
)
// Interface for operations with manifest.
......@@ -37,6 +45,10 @@ type Interface interface {
HasPrefix(string) (bool, error)
// Store stores the manifest, returning the resulting address.
Store(context.Context, storage.ModePut) (swarm.Address, error)
// IterateAddresses is used to iterate over chunks addresses for
// the manifest.
IterateAddresses(context.Context, swarm.AddressIterFunc) error
}
// Entry represents a single manifest entry.
......
......@@ -45,6 +45,24 @@ func NewMantarayManifest(
}, nil
}
// NewMantarayManifestWithObfuscationKeyFn creates a new mantaray-based manifest
// with configured obfuscation key
//
// NOTE: This should only be used in tests.
func NewMantarayManifestWithObfuscationKeyFn(
encrypted bool,
storer storage.Storer,
obfuscationKeyFn func([]byte) (int, error),
) (Interface, error) {
mm := &mantarayManifest{
trie: mantaray.New(),
encrypted: encrypted,
storer: storer,
}
mantaray.SetObfuscationKeyFn(obfuscationKeyFn)
return mm, nil
}
// NewMantarayManifestReference loads existing mantaray-based manifest.
func NewMantarayManifestReference(
ctx context.Context,
......@@ -113,6 +131,7 @@ func (m *mantarayManifest) HasPrefix(prefix string) (bool, error) {
func (m *mantarayManifest) Store(ctx context.Context, mode storage.ModePut) (swarm.Address, error) {
saver := newMantaraySaver(ctx, m.encrypted, m.storer, mode)
m.loader = saver
err := m.trie.Save(saver)
if err != nil {
......@@ -124,6 +143,53 @@ func (m *mantarayManifest) Store(ctx context.Context, mode storage.ModePut) (swa
return address, nil
}
func (m *mantarayManifest) IterateAddresses(ctx context.Context, fn swarm.AddressIterFunc) error {
reference := swarm.NewAddress(m.trie.Reference())
if swarm.ZeroAddress.Equal(reference) {
return ErrMissingReference
}
walker := func(path []byte, node *mantaray.Node, err error) error {
if err != nil {
return err
}
if node != nil {
var stop bool
if node.Reference() != nil {
ref := swarm.NewAddress(node.Reference())
stop = fn(ref)
if stop {
return errStopIterator
}
}
if node.IsValueType() && node.Entry() != nil {
entry := swarm.NewAddress(node.Entry())
stop = fn(entry)
if stop {
return errStopIterator
}
}
}
return nil
}
err := m.trie.WalkNode([]byte{}, m.loader, walker)
if err != nil {
if !errors.Is(err, errStopIterator) {
return fmt.Errorf("manifest iterate addresses: %w", err)
}
// ignore error if interation stopped by caller
}
return nil
}
// mantarayLoadSaver implements required interface 'mantaray.LoadSaver'
type mantarayLoadSaver struct {
ctx context.Context
......
......@@ -27,6 +27,7 @@ const (
type simpleManifest struct {
manifest simple.Manifest
reference swarm.Address
encrypted bool
storer storage.Storer
}
......@@ -52,6 +53,7 @@ func NewSimpleManifestReference(
) (Interface, error) {
m := &simpleManifest{
manifest: simple.NewManifest(),
reference: reference,
encrypted: encrypted,
storer: storer,
}
......@@ -116,9 +118,51 @@ func (m *simpleManifest) Store(ctx context.Context, mode storage.ModePut) (swarm
return swarm.ZeroAddress, fmt.Errorf("manifest save error: %w", err)
}
m.reference = address
return address, nil
}
func (m *simpleManifest) IterateAddresses(ctx context.Context, fn swarm.AddressIterFunc) error {
if swarm.ZeroAddress.Equal(m.reference) {
return ErrMissingReference
}
// NOTE: making it behave same for all manifest implementation
stop := fn(m.reference)
if stop {
return nil
}
walker := func(path string, entry simple.Entry, err error) error {
if err != nil {
return err
}
ref, err := swarm.ParseHexAddress(entry.Reference())
if err != nil {
return err
}
stop := fn(ref)
if stop {
return errStopIterator
}
return nil
}
err := m.manifest.WalkEntry("", walker)
if err != nil {
if !errors.Is(err, errStopIterator) {
return fmt.Errorf("manifest iterate addresses: %w", err)
}
// ignore error if interation stopped by caller
}
return nil
}
func (m *simpleManifest) load(ctx context.Context, reference swarm.Address) error {
j, _, err := joiner.New(ctx, m.storer, reference)
if err != nil {
......
......@@ -107,6 +107,25 @@ func (m *MockStorer) Put(ctx context.Context, mode storage.ModePut, chs ...swarm
}
m.store[ch.Address().String()] = ch.Data()
m.modePut[ch.Address().String()] = mode
// pin chunks if needed
switch mode {
case storage.ModePutUploadPin:
// if mode is set pin, increment the pin counter
var found bool
addr := ch.Address()
for i, ad := range m.pinnedAddress {
if addr.String() == ad.String() {
m.pinnedCounter[i] = m.pinnedCounter[i] + 1
found = true
}
}
if !found {
m.pinnedAddress = append(m.pinnedAddress, addr)
m.pinnedCounter = append(m.pinnedCounter, uint64(1))
}
default:
}
}
return exist, nil
}
......
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package traversal
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"github.com/ethersphere/bee/pkg/collection/entry"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/joiner"
"github.com/ethersphere/bee/pkg/manifest"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
var (
// ErrInvalidType is returned when the reference was not expected type.
ErrInvalidType = errors.New("traversal: invalid type")
)
// Service is the service to find dependent chunks for an address.
type Service interface {
// TraverseAddresses iterates through each address related to the supplied
// one, if possible.
TraverseAddresses(context.Context, swarm.Address, swarm.AddressIterFunc) error
// TraverseBytesAddresses iterates through each address of a bytes.
TraverseBytesAddresses(context.Context, swarm.Address, swarm.AddressIterFunc) error
// TraverseFileAddresses iterates through each address of a file.
TraverseFileAddresses(context.Context, swarm.Address, swarm.AddressIterFunc) error
// TraverseManifestAddresses iterates through each address of a manifest,
// as well as each entry found in it.
TraverseManifestAddresses(context.Context, swarm.Address, swarm.AddressIterFunc) error
}
type traversalService struct {
storer storage.Storer
}
func NewService(storer storage.Storer) Service {
return &traversalService{
storer: storer,
}
}
func (s *traversalService) TraverseAddresses(
ctx context.Context,
reference swarm.Address,
chunkAddressFunc swarm.AddressIterFunc,
) error {
isFile, e, metadata, err := s.checkIsFile(ctx, reference)
if err != nil {
return err
}
// reference address could be missrepresented as file when:
// - content size is 64 bytes (or 128 for encrypted reference)
// - second reference exists and is JSON (and not actually file metadata)
if isFile {
isManifest, m, err := s.checkIsManifest(ctx, reference, e, metadata)
if err != nil {
return err
}
// reference address could be missrepresented as manifest when:
// - file content type is actually on of manifest type (manually set)
// - content was unmarshalled
//
// even though content could be unmarshaled in some case, iteration
// through addresses will not be possible
if isManifest {
// process as manifest
err = m.IterateAddresses(ctx, func(manifestNodeAddr swarm.Address) (stop bool) {
err := s.traverseChunkAddressesFromManifest(ctx, manifestNodeAddr, chunkAddressFunc)
if err != nil {
stop = true
}
return
})
if err != nil {
return fmt.Errorf("traversal: iterate chunks: %s: %w", reference, err)
}
metadataReference := e.Metadata()
err = s.processBytes(ctx, metadataReference, chunkAddressFunc)
if err != nil {
return err
}
_ = chunkAddressFunc(reference)
} else {
return s.traverseChunkAddressesAsFile(ctx, reference, chunkAddressFunc, e)
}
} else {
return s.processBytes(ctx, reference, chunkAddressFunc)
}
return nil
}
func (s *traversalService) TraverseBytesAddresses(
ctx context.Context,
reference swarm.Address,
chunkAddressFunc swarm.AddressIterFunc,
) error {
return s.processBytes(ctx, reference, chunkAddressFunc)
}
func (s *traversalService) TraverseFileAddresses(
ctx context.Context,
reference swarm.Address,
chunkAddressFunc swarm.AddressIterFunc,
) error {
isFile, e, _, err := s.checkIsFile(ctx, reference)
if err != nil {
return err
}
// reference address could be missrepresented as file when:
// - content size is 64 bytes (or 128 for encrypted reference)
// - second reference exists and is JSON (and not actually file metadata)
if !isFile {
return ErrInvalidType
}
return s.traverseChunkAddressesAsFile(ctx, reference, chunkAddressFunc, e)
}
func (s *traversalService) TraverseManifestAddresses(
ctx context.Context,
reference swarm.Address,
chunkAddressFunc swarm.AddressIterFunc,
) error {
isFile, e, metadata, err := s.checkIsFile(ctx, reference)
if err != nil {
return err
}
if !isFile {
return ErrInvalidType
}
isManifest, m, err := s.checkIsManifest(ctx, reference, e, metadata)
if err != nil {
return err
}
// reference address could be missrepresented as manifest when:
// - file content type is actually on of manifest type (manually set)
// - content was unmarshalled
//
// even though content could be unmarshaled in some case, iteration
// through addresses will not be possible
if !isManifest {
return ErrInvalidType
}
err = m.IterateAddresses(ctx, func(manifestNodeAddr swarm.Address) (stop bool) {
err := s.traverseChunkAddressesFromManifest(ctx, manifestNodeAddr, chunkAddressFunc)
if err != nil {
stop = true
}
return
})
if err != nil {
return fmt.Errorf("traversal: iterate chunks: %s: %w", reference, err)
}
metadataReference := e.Metadata()
err = s.processBytes(ctx, metadataReference, chunkAddressFunc)
if err != nil {
return err
}
_ = chunkAddressFunc(reference)
return nil
}
func (s *traversalService) traverseChunkAddressesFromManifest(
ctx context.Context,
reference swarm.Address,
chunkAddressFunc swarm.AddressIterFunc,
) error {
isFile, e, _, err := s.checkIsFile(ctx, reference)
if err != nil {
return err
}
if isFile {
return s.traverseChunkAddressesAsFile(ctx, reference, chunkAddressFunc, e)
}
return s.processBytes(ctx, reference, chunkAddressFunc)
}
func (s *traversalService) traverseChunkAddressesAsFile(
ctx context.Context,
reference swarm.Address,
chunkAddressFunc swarm.AddressIterFunc,
e *entry.Entry,
) (err error) {
bytesReference := e.Reference()
err = s.processBytes(ctx, bytesReference, chunkAddressFunc)
if err != nil {
// possible it was custom JSON bytes, which matches entry JSON
// but in fact is not file, and does not contain reference to
// existing address, which is why it was not found in storage
if !errors.Is(err, storage.ErrNotFound) {
return nil
}
// ignore
}
metadataReference := e.Metadata()
err = s.processBytes(ctx, metadataReference, chunkAddressFunc)
if err != nil {
return
}
_ = chunkAddressFunc(reference)
return nil
}
// checkIsFile checks if the content is file.
func (s *traversalService) checkIsFile(
ctx context.Context,
reference swarm.Address,
) (isFile bool, e *entry.Entry, metadata *entry.Metadata, err error) {
var (
j file.Joiner
span int64
)
j, span, err = joiner.New(ctx, s.storer, reference)
if err != nil {
err = fmt.Errorf("traversal: joiner: %s: %w", reference, err)
return
}
maybeIsFile := entry.CanUnmarshal(span)
if maybeIsFile {
buf := bytes.NewBuffer(nil)
_, err = file.JoinReadAll(ctx, j, buf)
if err != nil {
err = fmt.Errorf("traversal: read entry: %s: %w", reference, err)
return
}
e = &entry.Entry{}
err = e.UnmarshalBinary(buf.Bytes())
if err != nil {
err = fmt.Errorf("traversal: unmarshal entry: %s: %w", reference, err)
return
}
// NOTE: any bytes will unmarshall to addresses; we need to check metadata
// read metadata
j, _, err = joiner.New(ctx, s.storer, e.Metadata())
if err != nil {
// ignore
err = nil
return
}
buf = bytes.NewBuffer(nil)
_, err = file.JoinReadAll(ctx, j, buf)
if err != nil {
err = fmt.Errorf("traversal: read metadata: %s: %w", reference, err)
return
}
metadata = &entry.Metadata{}
dec := json.NewDecoder(buf)
dec.DisallowUnknownFields()
err = dec.Decode(metadata)
if err != nil {
// may not be metadata JSON
err = nil
return
}
isFile = true
}
return
}
// checkIsManifest checks if the content is manifest.
func (s *traversalService) checkIsManifest(
ctx context.Context,
reference swarm.Address,
e *entry.Entry,
metadata *entry.Metadata,
) (isManifest bool, m manifest.Interface, err error) {
// NOTE: 'encrypted' parameter only used for saving manifest
m, err = manifest.NewManifestReference(
ctx,
metadata.MimeType,
e.Reference(),
false,
s.storer,
)
if err != nil {
if err == manifest.ErrInvalidManifestType {
// ignore
err = nil
return
}
err = fmt.Errorf("traversal: read manifest: %s: %w", reference, err)
return
}
isManifest = true
return
}
func (s *traversalService) processBytes(
ctx context.Context,
reference swarm.Address,
chunkAddressFunc swarm.AddressIterFunc,
) error {
j, _, err := joiner.New(ctx, s.storer, reference)
if err != nil {
return fmt.Errorf("traversal: joiner: %s: %w", reference, err)
}
err = j.IterateChunkAddresses(chunkAddressFunc)
if err != nil {
return fmt.Errorf("traversal: iterate chunks: %s: %w", reference, err)
}
return nil
}
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment