Commit 09aa147e authored by Nemanja Zbiljić's avatar Nemanja Zbiljić Committed by GitHub

Add file joiner function for iterating through chunk addresses (#923)

parent a3fdf8bc
...@@ -20,6 +20,8 @@ type Reader interface { ...@@ -20,6 +20,8 @@ type Reader interface {
// Joiner provides the inverse functionality of the Splitter. // Joiner provides the inverse functionality of the Splitter.
type Joiner interface { type Joiner interface {
Reader Reader
// IterateChunkAddresses is used to iterate over chunks addresses of some root hash.
IterateChunkAddresses(swarm.AddressIterFunc) error
// Size returns the span of the hash trie represented by the joiner's root hash. // Size returns the span of the hash trie represented by the joiner's root hash.
Size() int64 Size() int64
} }
......
...@@ -210,6 +210,60 @@ func (j *joiner) Seek(offset int64, whence int) (int64, error) { ...@@ -210,6 +210,60 @@ func (j *joiner) Seek(offset int64, whence int) (int64, error) {
} }
func (j *joiner) IterateChunkAddresses(fn swarm.AddressIterFunc) error {
// report root address
stop := fn(j.addr)
if stop {
return nil
}
var eg errgroup.Group
j.processChunkAddresses(fn, j.rootData, j.span, &eg)
return eg.Wait()
}
func (j *joiner) processChunkAddresses(fn swarm.AddressIterFunc, data []byte, subTrieSize int64, eg *errgroup.Group) {
// we are at a leaf data chunk
if subTrieSize <= int64(len(data)) {
return
}
for cursor := 0; cursor < len(data); cursor += j.refLength {
select {
case <-j.ctx.Done():
return
default:
}
address := swarm.NewAddress(data[cursor : cursor+j.refLength])
stop := fn(address)
if stop {
break
}
sec := subtrieSection(data, cursor, j.refLength, subTrieSize)
if sec <= 4096 {
continue
}
func(address swarm.Address, eg *errgroup.Group) {
eg.Go(func() error {
ch, err := j.getter.Get(j.ctx, storage.ModeGetRequest, address)
if err != nil {
return err
}
chunkData := ch.Data()[8:]
subtrieSpan := int64(chunkToSpan(ch.Data()))
j.processChunkAddresses(fn, chunkData, subtrieSpan, eg)
return nil
})
}(address, eg)
}
}
func (j *joiner) Size() int64 { func (j *joiner) Size() int64 {
return j.span return j.span
} }
......
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
mrand "math/rand" mrand "math/rand"
"sync"
"testing" "testing"
"time" "time"
...@@ -764,3 +765,67 @@ func TestJoinerTwoLevelsAcrossChunk(t *testing.T) { ...@@ -764,3 +765,67 @@ func TestJoinerTwoLevelsAcrossChunk(t *testing.T) {
t.Fatalf("last chunk expected read %d bytes; got %d", 42, c) t.Fatalf("last chunk expected read %d bytes; got %d", 42, c)
} }
} }
func TestJoinerIterateChunkAddresses(t *testing.T) {
store := mock.NewStorer()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// create root chunk with 2 references and the referenced data chunks
rootChunk := filetest.GenerateTestRandomFileChunk(swarm.ZeroAddress, swarm.ChunkSize*2, swarm.SectionSize*2)
_, err := store.Put(ctx, storage.ModePutUpload, rootChunk)
if err != nil {
t.Fatal(err)
}
firstAddress := swarm.NewAddress(rootChunk.Data()[8 : swarm.SectionSize+8])
firstChunk := filetest.GenerateTestRandomFileChunk(firstAddress, swarm.ChunkSize, swarm.ChunkSize)
_, err = store.Put(ctx, storage.ModePutUpload, firstChunk)
if err != nil {
t.Fatal(err)
}
secondAddress := swarm.NewAddress(rootChunk.Data()[swarm.SectionSize+8:])
secondChunk := filetest.GenerateTestRandomFileChunk(secondAddress, swarm.ChunkSize, swarm.ChunkSize)
_, err = store.Put(ctx, storage.ModePutUpload, secondChunk)
if err != nil {
t.Fatal(err)
}
createdAddresses := []swarm.Address{rootChunk.Address(), firstAddress, secondAddress}
j, _, err := joiner.New(ctx, store, rootChunk.Address())
if err != nil {
t.Fatal(err)
}
foundAddresses := make(map[string]struct{})
var foundAddressesMu sync.Mutex
err = j.IterateChunkAddresses(func(addr swarm.Address) (stop bool) {
foundAddressesMu.Lock()
foundAddresses[addr.String()] = struct{}{}
foundAddressesMu.Unlock()
return false
})
if err != nil {
t.Fatal(err)
}
if len(createdAddresses) != len(foundAddresses) {
t.Fatalf("expected to find %d addresses, got %d", len(createdAddresses), len(foundAddresses))
}
checkAddressFound := func(t *testing.T, foundAddresses map[string]struct{}, address swarm.Address) {
t.Helper()
if _, ok := foundAddresses[address.String()]; !ok {
t.Fatalf("expected address %s not found", address.String())
}
}
for _, createdAddress := range createdAddresses {
checkAddressFound(t, foundAddresses, createdAddress)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment