// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package joiner provides implementations of the file.Joiner interface
package joiner

import (
	"context"
	"encoding/binary"
	"errors"
	"io"
	"sync"
	"sync/atomic"

	"github.com/ethersphere/bee/pkg/encryption/store"
	"github.com/ethersphere/bee/pkg/file"
	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/swarm"
	"golang.org/x/sync/errgroup"
)

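// joiner reads back the data addressed by a single swarm root chunk. It
// implements the file.Joiner interface.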
type joiner struct {
	addr      swarm.Address
	rootData  []byte
	span      int64
	off       int64
	refLength int

	ctx    context.Context
	getter storage.Getter
}

// New creates a new Joiner. A Joiner provides Read, Seek and Size functionalities.
func New(ctx context.Context, getter storage.Getter, address swarm.Address) (file.Joiner, int64, error) {
	getter = store.New(getter)
	// retrieve the root chunk to read the total data length to be retrieved
	rootChunk, err := getter.Get(ctx, storage.ModeGetRequest, address)
	if err != nil {
		return nil, 0, err
	}

	var chunkData = rootChunk.Data()

	span := int64(binary.LittleEndian.Uint64(chunkData[:swarm.SpanSize]))

	j := &joiner{
		addr:      rootChunk.Address(),
		refLength: len(address.Bytes()),
		ctx:       ctx,
		getter:    getter,
		span:      span,
		rootData:  chunkData[swarm.SpanSize:],
	}

	return j, span, nil
}

// Read is called by the consumer to retrieve the joined data.
// It must be called with a buffer equal to the maximum chunk size.
func (j *joiner) Read(b []byte) (n int, err error) {
	read, err := j.ReadAt(b, j.off)
	if err != nil && err != io.EOF {
		return read, err
	}

	j.off += int64(read)
	return read, err
}

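// ReadAt reads into b, starting at offset off within the joined data.
// Up to cap(b) bytes are copied; the chunk trie is traversed concurrently
// and the total number of bytes read is returned.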
func (j *joiner) ReadAt(b []byte, off int64) (read int, err error) {
	// since offset is an int64 and swarm spans are uint64, we cannot seek beyond the int64 max value
	if off >= j.span {
		return 0, io.EOF
	}

	readLen := int64(cap(b))
	if readLen > j.span-off {
		readLen = j.span - off
	}
	var bytesRead int64
	var eg errgroup.Group
	j.readAtOffset(b, j.rootData, 0, j.span, off, 0, readLen, &bytesRead, &eg)

	err = eg.Wait()
	if err != nil {
		return 0, err
	}

	return int(atomic.LoadInt64(&bytesRead)), nil
}

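// readAtOffset descends the chunk trie rooted at data, copying the requested
// byte range into b. Each referenced subtrie that overlaps the range is
// fetched and processed in its own errgroup goroutine, and the number of
// copied bytes is accumulated atomically in bytesRead.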
func (j *joiner) readAtOffset(b, data []byte, cur, subTrieSize, off, bufferOffset, bytesToRead int64, bytesRead *int64, eg *errgroup.Group) {
	// we are at a leaf data chunk
	if subTrieSize <= int64(len(data)) {
		dataOffsetStart := off - cur
		dataOffsetEnd := dataOffsetStart + bytesToRead

		if lenDataToCopy := int64(len(data)) - dataOffsetStart; bytesToRead > lenDataToCopy {
			dataOffsetEnd = dataOffsetStart + lenDataToCopy
		}

		bs := data[dataOffsetStart:dataOffsetEnd]
		n := copy(b[bufferOffset:bufferOffset+int64(len(bs))], bs)
		atomic.AddInt64(bytesRead, int64(n))
		return
	}

	for cursor := 0; cursor < len(data); cursor += j.refLength {
		if bytesToRead == 0 {
			break
		}

		// fast forward the cursor
		sec := subtrieSection(data, cursor, j.refLength, subTrieSize)
		if cur+sec < off {
			cur += sec
			continue
		}

		// if we are here it means that we are within the bounds of the data we need to read
		address := swarm.NewAddress(data[cursor : cursor+j.refLength])
		subtrieSpan := sec
		currentReadSize := subtrieSpan - (off - cur) // the size of the subtrie, minus the offset from the start of the trie

		// upper bound alignments
		if currentReadSize > bytesToRead {
			currentReadSize = bytesToRead
		}
		if currentReadSize > subtrieSpan {
			currentReadSize = subtrieSpan
		}

		func(address swarm.Address, b []byte, cur, subTrieSize, off, bufferOffset, bytesToRead int64) {
			eg.Go(func() error {
				ch, err := j.getter.Get(j.ctx, storage.ModeGetRequest, address)
				if err != nil {
					return err
				}

				chunkData := ch.Data()[swarm.SpanSize:]
				subtrieSpan := int64(chunkToSpan(ch.Data()))
				j.readAtOffset(b, chunkData, cur, subtrieSpan, off, bufferOffset, currentReadSize, bytesRead, eg)
				return nil
			})
		}(address, b, cur, subtrieSpan, off, bufferOffset, currentReadSize)

		bufferOffset += currentReadSize
		bytesToRead -= currentReadSize
		cur += subtrieSpan
		off = cur
	}
}

// brute-forces the subtrie size for each of the sections in this intermediate chunk
func subtrieSection(data []byte, startIdx, refLen int, subtrieSize int64) int64 {
	// assume we have a trie of size `y`, then we can assume that all of
	// the forks except for the last one on the right are of equal size.
	// this is due to how the splitter wraps levels.
	// so for the branches on the left, we can assume that
	// y = (refs - 1) * x + l
	// where y is the size of the subtrie, refs is the number of references,
	// x is constant (the brute-forced value) and l is the size of the last subtrie
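	// for example, with an unencrypted reference length of 32 the branching
	// factor is 4096/32 = 128; for an intermediate chunk with two references
	// and a subtrie size of 600000 bytes, the brute-forced branch size is
	// 4096*128 = 524288, so the first reference spans 524288 bytes and the
	// last one the remaining 75712 bytes.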
	var (
		refs       = int64(len(data) / refLen)       // how many references in the intermediate chunk
		branching  = int64(swarm.ChunkSize / refLen) // branching factor is chunk size divided by reference length
		branchSize = int64(swarm.ChunkSize)
	)
	for {
		whatsLeft := subtrieSize - (branchSize * (refs - 1))
		if whatsLeft <= branchSize {
			break
		}
		branchSize *= branching
	}

	// handle last branch edge case
	if startIdx == int(refs-1)*refLen {
		return subtrieSize - (refs-1)*branchSize
	}
	return branchSize
}

var errWhence = errors.New("seek: invalid whence")
var errOffset = errors.New("seek: invalid offset")

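// Seek sets the offset for the next Read. It implements io.Seeker semantics
// for whence; seeking beyond the data span results in io.EOF.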
func (j *joiner) Seek(offset int64, whence int) (int64, error) {
	switch whence {
	case io.SeekStart:
		// offset is already relative to the start of the data
	case io.SeekCurrent:
		offset += j.off
	case io.SeekEnd:
		offset = j.span - offset
		if offset < 0 {
			return 0, io.EOF
		}
	default:
		return 0, errWhence
	}

	if offset < 0 {
		return 0, errOffset
	}
	if offset > j.span {
		return 0, io.EOF
	}
	j.off = offset
	return offset, nil
}

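// IterateChunkAddresses calls fn with the address of every chunk in the trie,
// starting with the root chunk.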
func (j *joiner) IterateChunkAddresses(fn swarm.AddressIterFunc) error {
	// report root address
	err := fn(j.addr)
	if err != nil {
		return err
	}

	return j.processChunkAddresses(j.ctx, fn, j.rootData, j.span)
}

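// processChunkAddresses walks an intermediate chunk, reports each referenced
// address to fn and recurses into subtries larger than a single data chunk.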
func (j *joiner) processChunkAddresses(ctx context.Context, fn swarm.AddressIterFunc, data []byte, subTrieSize int64) error {
	// we are at a leaf data chunk
	if subTrieSize <= int64(len(data)) {
		return nil
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	eg, ectx := errgroup.WithContext(ctx)

	var wg sync.WaitGroup

	for cursor := 0; cursor < len(data); cursor += j.refLength {
		address := swarm.NewAddress(data[cursor : cursor+j.refLength])
		if err := fn(address); err != nil {
			return err
		}

		sec := subtrieSection(data, cursor, j.refLength, subTrieSize)
		if sec <= swarm.ChunkSize {
			// the subtrie fits in a single data chunk; nothing further to iterate
			continue
		}

		func(address swarm.Address, eg *errgroup.Group) {
			wg.Add(1)

			eg.Go(func() error {
				defer wg.Done()

				ch, err := j.getter.Get(ectx, storage.ModeGetRequest, address)
				if err != nil {
					return err
				}

				chunkData := ch.Data()[swarm.SpanSize:]
				subtrieSpan := int64(chunkToSpan(ch.Data()))

				return j.processChunkAddresses(ectx, fn, chunkData, subtrieSpan)
			})
		}(address, eg)

		wg.Wait()
	}

	return eg.Wait()
}

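// Size returns the length in bytes of the joined data.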
func (j *joiner) Size() int64 {
	return j.span
}

func chunkToSpan(data []byte) uint64 {
	return binary.LittleEndian.Uint64(data[:swarm.SpanSize])
}