Commit 7b42ae38 authored by acud, committed by GitHub

seekjoiner: fill read buffer, integrate langos (#667)

* seekjoiner: prefetch entire read buffer
* integrate langos (#674)
parent c4d5b031
@@ -9,6 +9,7 @@ require (
     github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
     github.com/ethereum/go-ethereum v1.9.20
     github.com/ethersphere/bmt v0.1.2
+    github.com/ethersphere/langos v1.0.0
     github.com/ethersphere/manifest v0.3.0
     github.com/ethersphere/sw3-bindings/v2 v2.1.0
     github.com/gogo/protobuf v1.3.1
...
@@ -165,6 +165,8 @@ github.com/ethereum/go-ethereum v1.9.20 h1:kk/J5OIoaoz3DRrCXznz3RGi212mHHXwzXlY/
 github.com/ethereum/go-ethereum v1.9.20/go.mod h1:JSSTypSMTkGZtAdAChH2wP5dZEvPGh3nUTuDpH+hNrg=
 github.com/ethersphere/bmt v0.1.2 h1:FEuvQY9xuK+rDp3VwDVyde8T396Matv/u9PdtKa2r9Q=
 github.com/ethersphere/bmt v0.1.2/go.mod h1:fqRBDmYwn3lX2MH4lkImXQgFWeNP8ikLkS/hgi/HRws=
+github.com/ethersphere/langos v1.0.0 h1:NBtNKzXTTRSue95uOlzPN4py7Aofs0xWPzyj4AI1Vcc=
+github.com/ethersphere/langos v1.0.0/go.mod h1:dlcN2j4O8sQ+BlCaxeBu43bgr4RQ+inJ+pHwLeZg5Tw=
github.com/ethersphere/manifest v0.3.0 h1:+QRXY/AQ17mg0x3e20gvn4aAOHsZpm3rzi930bsOlro=
 github.com/ethersphere/manifest v0.3.0/go.mod h1:ygAx0KLhXYmKqsjUab95RCbXf8UcO7yMDjyfP0lY76Y=
 github.com/ethersphere/sw3-bindings/v2 v2.1.0 h1:QefDtzU94UelICMPXWr7m52E2oj6r018Yc0XLoCWOxw=
...
@@ -32,6 +32,18 @@ const (
     SwarmErrorDocumentHeader = "Swarm-Error-Document"
 )
 
+// The size of the buffer used for prefetching content with langos.
+// Warning: This value influences the number of chunk requests and chunker
+// join goroutines per file request.
+// The recommended value is 8 or 16 times the io.Copy default buffer size,
+// which is 32 kB, depending on the file size. Use lookaheadBufferSize()
+// to get the correct buffer size for the request.
+const (
+    smallFileBufferSize          = 8 * 32 * 1024
+    largeFileBufferSize          = 16 * 32 * 1024
+    largeBufferFilesizeThreshold = 10 * 1000000 // ten megs
+)
+
 var (
     errInvalidNameOrAddress = errors.New("invalid name or bzz address")
     errNoResolver           = errors.New("no resolver connected")
@@ -178,3 +190,10 @@ func (s *server) newTracingHandler(spanName string) func(h http.Handler) http.Ha
     })
 }
 }
+
+func lookaheadBufferSize(size int64) int {
+    if size <= largeBufferFilesizeThreshold {
+        return smallFileBufferSize
+    }
+    return largeFileBufferSize
+}
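For reference, the two constants above work out to 256 KiB and 512 KiB of lookahead, switching at the 10 MB threshold. A standalone sketch of the selection behaviour (the sample sizes in main are illustrative, the constants and function are copied from the hunk above):

    package main

    import "fmt"

    const (
        smallFileBufferSize          = 8 * 32 * 1024  // 262144 bytes, 8x the io.Copy default
        largeFileBufferSize          = 16 * 32 * 1024 // 524288 bytes, 16x the io.Copy default
        largeBufferFilesizeThreshold = 10 * 1000000   // ten megs
    )

    func lookaheadBufferSize(size int64) int {
        if size <= largeBufferFilesizeThreshold {
            return smallFileBufferSize
        }
        return largeFileBufferSize
    }

    func main() {
        for _, size := range []int64{4096, 10 * 1000000, 10*1000000 + 1} {
            fmt.Printf("size %8d -> buffer %d\n", size, lookaheadBufferSize(size))
        }
        // size     4096 -> buffer 262144
        // size 10000000 -> buffer 262144
        // size 10000001 -> buffer 524288
    }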
@@ -28,6 +28,7 @@ import (
     "github.com/ethersphere/bee/pkg/storage"
     "github.com/ethersphere/bee/pkg/swarm"
     "github.com/ethersphere/bee/pkg/tracing"
+    "github.com/ethersphere/langos"
     "github.com/gorilla/mux"
 )
@@ -336,5 +337,5 @@ func (s *server) downloadHandler(w http.ResponseWriter, r *http.Request, referen
     w.Header().Set("Decompressed-Content-Length", fmt.Sprintf("%d", l))
     w.Header().Set(TargetsRecoveryHeader, targets)
-    http.ServeContent(w, r, "", time.Now(), reader)
+    http.ServeContent(w, r, "", time.Now(), langos.NewBufferedLangos(reader, lookaheadBufferSize(l)))
 }
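The one-line handler change above is the whole integration point: the joiner's reader is wrapped once and handed to http.ServeContent, which drives the sequential reads while langos prefetches ahead. A hedged sketch of the equivalent wiring (serveFile and its parameters are our names, not part of the commit; lookaheadBufferSize is the helper added above in the same package):

    package api

    import (
        "net/http"
        "time"

        "github.com/ethersphere/bee/pkg/file"
        "github.com/ethersphere/langos"
    )

    // serveFile mirrors the call in downloadHandler: wrap the seekable,
    // offset-addressable reader in a buffered langos so that the next
    // lookaheadBufferSize(size) bytes are fetched while the current ones
    // are being written to the response.
    func serveFile(w http.ResponseWriter, r *http.Request, reader file.Reader, size int64) {
        http.ServeContent(w, r, "", time.Now(), langos.NewBufferedLangos(reader, lookaheadBufferSize(size)))
    }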
@@ -14,9 +14,14 @@ import (
     "github.com/ethersphere/bee/pkg/swarm"
 )
 
+type Reader interface {
+    io.ReadSeeker
+    io.ReaderAt
+}
+
 // JoinSeeker provides a Joiner that can seek.
 type JoinSeeker interface {
-    Join(ctx context.Context, address swarm.Address) (dataOut io.ReadSeeker, dataLength int64, err error)
+    Join(ctx context.Context, address swarm.Address) (dataOut Reader, dataLength int64, err error)
     Size(ctx context.Context, address swarm.Address) (dataLength int64, err error)
 }
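Adding io.ReaderAt to the returned reader is what makes concurrent lookahead safe: positional reads carry their own offset, so langos can fetch ahead on goroutines without disturbing the cursor that io.ReadSeeker consumers such as http.ServeContent rely on. A small illustrative fragment (the placement is hypothetical, e.g. inside pkg/file/joiner; SimpleJoiner is the concrete type changed below):

    // Compile-time check that the joiner satisfies both halves of file.Reader.
    var _ file.Reader = (*internal.SimpleJoiner)(nil)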
@@ -9,9 +9,11 @@ import (
     "encoding/binary"
     "errors"
     "io"
+    "sync/atomic"
 
     "github.com/ethersphere/bee/pkg/storage"
     "github.com/ethersphere/bee/pkg/swarm"
+    "golang.org/x/sync/errgroup"
 )
 
 type SimpleJoiner struct {
@@ -63,48 +65,115 @@ func (j *SimpleJoiner) Read(b []byte) (n int, err error) {
 func (j *SimpleJoiner) ReadAt(b []byte, off int64) (read int, err error) {
     // since offset is int64 and swarm spans are uint64 it means we cannot seek beyond int64 max value
-    return j.readAtOffset(b, j.rootData, 0, j.span, off)
-}
-
-func (j *SimpleJoiner) readAtOffset(b, data []byte, cur, subTrieSize, off int64) (read int, err error) {
     if off >= j.span {
         return 0, io.EOF
     }
+
+    readLen := int64(cap(b))
+    if readLen > j.span-off {
+        readLen = j.span - off
+    }
+    var bytesRead int64
+    var eg errgroup.Group
+    j.readAtOffset(b, j.rootData, 0, j.span, off, 0, readLen, &bytesRead, &eg)
+
+    err = eg.Wait()
+    if err != nil {
+        return 0, err
+    }
+
+    return int(atomic.LoadInt64(&bytesRead)), nil
+}
+
+func (j *SimpleJoiner) readAtOffset(b, data []byte, cur, subTrieSize, off, bufferOffset, bytesToRead int64, bytesRead *int64, eg *errgroup.Group) {
+    // we are at a leaf data chunk
     if subTrieSize <= int64(len(data)) {
-        capacity := int64(cap(b))
         dataOffsetStart := off - cur
-        dataOffsetEnd := dataOffsetStart + capacity
+        dataOffsetEnd := dataOffsetStart + bytesToRead
 
-        if lenDataToCopy := int64(len(data)) - dataOffsetStart; capacity > lenDataToCopy {
+        if lenDataToCopy := int64(len(data)) - dataOffsetStart; bytesToRead > lenDataToCopy {
             dataOffsetEnd = dataOffsetStart + lenDataToCopy
         }
 
         bs := data[dataOffsetStart:dataOffsetEnd]
-        n := copy(b, bs)
-        return n, nil
+        n := copy(b[bufferOffset:bufferOffset+int64(len(bs))], bs)
+        atomic.AddInt64(bytesRead, int64(n))
+        return
     }
 
     for cursor := 0; cursor < len(data); cursor += j.refLength {
+        if bytesToRead == 0 {
+            break
+        }
+
+        // fast forward the cursor
+        sec := subtrieSection(data, cursor, j.refLength, subTrieSize)
+        if cur+sec < off {
+            cur += sec
+            continue
+        }
+
+        // if we are here it means that we are within the bounds of the data we need to read
         address := swarm.NewAddress(data[cursor : cursor+j.refLength])
-        ch, err := j.getter.Get(j.ctx, storage.ModeGetRequest, address)
-        if err != nil {
-            return 0, err
-        }
-
-        chunkData := ch.Data()[8:]
-        subtrieSpan := int64(chunkToSpan(ch.Data()))
-
-        // we have the size of the subtrie now, if the read offset is within this chunk,
-        // then we drilldown more
-        if off < cur+subtrieSpan {
-            return j.readAtOffset(b, chunkData, cur, subtrieSpan, off)
-        }
+
+        subtrieSpan := sec
+        currentReadSize := subtrieSpan - (off - cur) // the size of the subtrie, minus the offset from the start of the trie
+
+        // upper bound alignments
+        if currentReadSize > bytesToRead {
+            currentReadSize = bytesToRead
+        }
+        if currentReadSize > subtrieSpan {
+            currentReadSize = subtrieSpan
+        }
+
+        func(address swarm.Address, b []byte, cur, subTrieSize, off, bufferOffset, bytesToRead int64) {
+            eg.Go(func() error {
+                ch, err := j.getter.Get(j.ctx, storage.ModeGetRequest, address)
+                if err != nil {
+                    return err
+                }
+
+                chunkData := ch.Data()[8:]
+                subtrieSpan := int64(chunkToSpan(ch.Data()))
+
+                j.readAtOffset(b, chunkData, cur, subtrieSpan, off, bufferOffset, currentReadSize, bytesRead, eg)
+                return nil
+            })
+        }(address, b, cur, subtrieSpan, off, bufferOffset, currentReadSize)
+
+        bufferOffset += currentReadSize
+        bytesToRead -= currentReadSize
         cur += subtrieSpan
+        off = cur
     }
-
-    return 0, errOffset
 }
+
+// brute-forces the subtrie size for each of the sections in this intermediate chunk
+func subtrieSection(data []byte, startIdx, refLen int, subtrieSize int64) int64 {
+    // assume we have a trie of size `y` then we can assume that all of
+    // the forks except for the last one on the right are of equal size
+    // this is due to how the splitter wraps levels.
+    // so for the branches on the left, we can assume that
+    // y = (refs - 1) * x + l
+    // where y is the size of the subtrie, refs are the number of references
+    // x is constant (the brute forced value) and l is the size of the last subtrie
+    var (
+        refs       = int64(len(data) / refLen) // how many references in the intermediate chunk
+        branching  = int64(4096 / refLen)      // branching factor is chunkSize divided by reference length
+        branchSize = int64(4096)
+    )
+    for {
+        whatsLeft := subtrieSize - (branchSize * (refs - 1))
+        if whatsLeft <= branchSize {
+            break
+        }
+        branchSize *= branching
+    }
+
+    // handle last branch edge case
+    if startIdx == int(refs-1)*refLen {
+        return subtrieSize - (refs-1)*branchSize
+    }
+    return branchSize
+}
 
 var errWhence = errors.New("seek: invalid whence")
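To make the brute force concrete: with unencrypted 32-byte references, branching = 4096/32 = 128, so a fully packed level-one branch spans 4096 * 128 = 524,288 bytes. A runnable sketch of the same loop (the signature takes the reference count directly instead of raw chunk bytes, purely for illustration):

    package main

    import "fmt"

    // subtrieSectionSize mirrors subtrieSection above, but takes refs (the
    // number of references in the intermediate chunk) instead of the chunk
    // data. refLen is assumed to be 32 (unencrypted references).
    func subtrieSectionSize(refs int64, startIdx, refLen int, subtrieSize int64) int64 {
        var (
            branching  = int64(4096 / refLen)
            branchSize = int64(4096)
        )
        for {
            whatsLeft := subtrieSize - (branchSize * (refs - 1))
            if whatsLeft <= branchSize {
                break
            }
            branchSize *= branching
        }
        if startIdx == int(refs-1)*refLen {
            return subtrieSize - (refs-1)*branchSize
        }
        return branchSize
    }

    func main() {
        // A 1,000,000-byte subtrie whose root holds refs = 2 references.
        // branchSize grows 4096 -> 524288, at which point the remainder
        // (475,712 bytes) fits into the last branch, satisfying
        // y = (refs - 1) * x + l with x = 524,288 and l = 475,712.
        fmt.Println(subtrieSectionSize(2, 0, 32, 1000000))  // 524288: any left branch
        fmt.Println(subtrieSectionSize(2, 32, 32, 1000000)) // 475712: last, partial branch
    }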
@@ -99,7 +99,7 @@ func TestSeek(t *testing.T) {
             got = got[:count]
             want := data[i : i+count]
             if !bytes.Equal(got, want) {
-                t.Errorf("read on seek to %v from %v: got data %x, want %s", name, i, got, want)
+                t.Fatal("data mismatch")
             }
         }
@@ -176,6 +176,208 @@
     }
 }
// TestPrefetch tests that prefetching chunks fills the entire read buffer
func TestPrefetch(t *testing.T) {
    seed := time.Now().UnixNano()
    r := mrand.New(mrand.NewSource(seed))

    for _, tc := range []struct {
        name       string
        size       int64
        bufferSize int
        readOffset int64
        expRead    int
    }{
        {
            name:       "one byte",
            size:       1,
            bufferSize: 1,
            readOffset: 0,
            expRead:    1,
        },
        {
            name:       "one byte",
            size:       1,
            bufferSize: 10,
            readOffset: 0,
            expRead:    1,
        },
        {
            name:       "ten bytes",
            size:       10,
            bufferSize: 5,
            readOffset: 0,
            expRead:    5,
        },
        {
            name:       "thousand bytes",
            size:       1000,
            bufferSize: 100,
            readOffset: 0,
            expRead:    100,
        },
        {
            name:       "thousand bytes",
            size:       1000,
            bufferSize: 100,
            readOffset: 900,
            expRead:    100,
        },
        {
            name:       "thousand bytes",
            size:       1000,
            bufferSize: 100,
            readOffset: 800,
            expRead:    100,
        },
        {
            name:       "one chunk",
            size:       4096,
            bufferSize: 4096,
            readOffset: 0,
            expRead:    4096,
        },
        {
            name:       "one chunk minus a few",
            size:       4096,
            bufferSize: 4093,
            readOffset: 0,
            expRead:    4093,
        },
        {
            name:       "one chunk minus a few",
            size:       4096,
            bufferSize: 4093,
            readOffset: 3,
            expRead:    4093,
        },
        {
            name:       "one byte at the end",
            size:       4096,
            bufferSize: 1,
            readOffset: 4095,
            expRead:    1,
        },
        {
            name:       "one byte at the end",
            size:       8192,
            bufferSize: 1,
            readOffset: 8191,
            expRead:    1,
        },
        {
            name:       "one byte at the end",
            size:       8192,
            bufferSize: 1,
            readOffset: 8190,
            expRead:    1,
        },
        {
            name:       "one byte at the end",
            size:       100000,
            bufferSize: 1,
            readOffset: 99999,
            expRead:    1,
        },
        {
            name:       "10kb",
            size:       10000,
            bufferSize: 5,
            readOffset: 5,
            expRead:    5,
        },
        {
            name:       "10kb",
            size:       10000,
            bufferSize: 1500,
            readOffset: 5,
            expRead:    1500,
        },
        {
            name:       "100kb",
            size:       100000,
            bufferSize: 8000,
            readOffset: 100,
            expRead:    8000,
        },
        {
            name:       "100kb",
            size:       100000,
            bufferSize: 80000,
            readOffset: 100,
            expRead:    80000,
        },
        {
            name:       "10megs",
            size:       10000000,
            bufferSize: 8000,
            readOffset: 990000,
            expRead:    8000,
        },
        {
            name:       "10megs",
            size:       10000000,
            bufferSize: 80000,
            readOffset: 900000,
            expRead:    80000,
        },
        {
            name:       "10megs",
            size:       10000000,
            bufferSize: 8000000,
            readOffset: 900000,
            expRead:    8000000,
        },
        {
            name:       "one meg, buffer larger than file",
            size:       1000000,
            bufferSize: 2000000,
            readOffset: 900000,
            expRead:    100000,
        },
    } {
        t.Run(tc.name, func(t *testing.T) {
            ctx := context.Background()
            store := mock.NewStorer()
            defer store.Close()

            data, err := ioutil.ReadAll(io.LimitReader(r, tc.size))
            if err != nil {
                t.Fatal(err)
            }

            s := splitter.NewSimpleSplitter(store, storage.ModePutUpload)
            addr, err := s.Split(ctx, ioutil.NopCloser(bytes.NewReader(data)), tc.size, false)
            if err != nil {
                t.Fatal(err)
            }

            j, _, err := internal.NewSimpleJoiner(ctx, store, addr)
            if err != nil {
                t.Fatal(err)
            }

            b := make([]byte, tc.bufferSize)
            n, err := j.ReadAt(b, tc.readOffset)
            if err != nil {
                t.Fatal(err)
            }
            if n != tc.expRead {
                t.Errorf("read %d bytes out of %d", n, tc.expRead)
            }

            ro := int(tc.readOffset)
            if !bytes.Equal(b[:n], data[ro:ro+n]) {
                t.Error("buffer does not match generated data")
            }
        })
    }
}
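In every case above, expRead is simply the buffer size clamped by the bytes remaining past the read offset. A one-line formulation of that invariant (our helper, not part of the commit):

    // expectedRead reproduces the expRead column: ReadAt fills the buffer
    // unless fewer than bufferSize bytes remain between readOffset and EOF.
    func expectedRead(size, readOffset int64, bufferSize int) int {
        if remaining := size - readOffset; remaining < int64(bufferSize) {
            return int(remaining)
        }
        return bufferSize
    }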
// TestSimpleJoinerReadAt
func TestSimpleJoinerReadAt(t *testing.T) {
    store := mock.NewStorer()
...
@@ -9,7 +9,6 @@ import (
     "context"
     "encoding/binary"
     "fmt"
-    "io"
 
     "github.com/ethersphere/bee/pkg/encryption/store"
     "github.com/ethersphere/bee/pkg/file"
@@ -50,6 +49,6 @@ func (s *simpleJoiner) Size(ctx context.Context, address swarm.Address) (int64,
 //
 // It uses a non-optimized internal component that only retrieves a data chunk
 // after the previous has been read.
-func (s *simpleJoiner) Join(ctx context.Context, address swarm.Address) (dataOut io.ReadSeeker, dataSize int64, err error) {
+func (s *simpleJoiner) Join(ctx context.Context, address swarm.Address) (dataOut file.Reader, dataSize int64, err error) {
     return internal.NewSimpleJoiner(ctx, s.getter, address)
 }