Commit 00f4c06a authored by acud's avatar acud Committed by GitHub

pipeline: hasher bug (#1267)

parent f56d2edb
......@@ -71,7 +71,9 @@ func TestGatewayMode(t *testing.T) {
jsonhttptest.Request(t, client, http.MethodPost, "/chunks/0773a91efd6547c754fc1d95fb1c62c7d1b47f959c2caa685dfec8736da95c1c", http.StatusForbidden, forbiddenResponseOption, headerOption)
jsonhttptest.Request(t, client, http.MethodPost, "/bytes", http.StatusOK) // should work without pinning
jsonhttptest.Request(t, client, http.MethodPost, "/bytes", http.StatusOK,
jsonhttptest.WithRequestBody(bytes.NewReader(chunk.Data())),
) // should work without pinning
jsonhttptest.Request(t, client, http.MethodPost, "/bytes", http.StatusForbidden, forbiddenResponseOption, headerOption)
jsonhttptest.Request(t, client, http.MethodPost, "/files", http.StatusForbidden, forbiddenResponseOption, headerOption)
jsonhttptest.Request(t, client, http.MethodPost, "/dirs", http.StatusForbidden, forbiddenResponseOption, headerOption)
......@@ -85,7 +87,9 @@ func TestGatewayMode(t *testing.T) {
Code: http.StatusForbidden,
})
jsonhttptest.Request(t, client, http.MethodPost, "/bytes", http.StatusOK) // should work without pinning
jsonhttptest.Request(t, client, http.MethodPost, "/bytes", http.StatusOK,
jsonhttptest.WithRequestBody(bytes.NewReader(chunk.Data())),
) // should work without pinning
jsonhttptest.Request(t, client, http.MethodPost, "/bytes", http.StatusForbidden, forbiddenResponseOption, headerOption)
jsonhttptest.Request(t, client, http.MethodPost, "/files", http.StatusForbidden, forbiddenResponseOption, headerOption)
jsonhttptest.Request(t, client, http.MethodPost, "/dirs", http.StatusForbidden, forbiddenResponseOption, headerOption)
......
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package hashtrie
// ErrTrieFull re-exports the package-private errTrieFull sentinel so that
// code outside this package (the hashtrie_test package) can match it
// with errors.Is.
var ErrTrieFull = errTrieFull
......@@ -12,15 +12,21 @@ import (
"github.com/ethersphere/bee/pkg/swarm"
)
var errInconsistentRefs = errors.New("inconsistent reference lengths in level")
var (
// errInconsistentRefs is returned when the amount of data in a level
// is not a whole multiple of one reference entry (span + reference).
errInconsistentRefs = errors.New("inconsistent references")
// errTrieFull is returned by ChainWrite once the writer has been
// marked as full.
errTrieFull = errors.New("trie full")
)
// maxLevel is the topmost level of the trie; Sum walks levels 1 up to
// (but not including) maxLevel and returns the hash found at this level.
const maxLevel = 8
// hashTrieWriter collects span+reference entries for every trie level in
// one shared buffer and tracks where each level ends with a per-level cursor.
type hashTrieWriter struct {
branching int // number of entries a level holds before it is considered full and gets wrapped
chunkSize int // NOTE(review): not referenced in the visible code; presumably the data chunk size - confirm
refSize int // length of a reference; one level entry occupies refSize+swarm.SpanSize bytes
fullChunk int // full chunk size in terms of the data represented in the buffer (span+refsize)
cursors []int // level cursors, key is level. level 0 is data level
cursors []int // level cursors, key is level. level 0 is data level and is not represented in this package. writes always start at level 1. higher levels will always have LOWER cursor values.
buffer []byte // keeps all level data
full bool // indicates whether the trie is full. currently we support (128^7)*4096 = 2305843009213693952 bytes
pipelineFn pipeline.PipelineFunc // called to obtain a fresh chain writer for hashing a level's data
}
......@@ -44,17 +50,19 @@ func (h *hashTrieWriter) ChainWrite(p *pipeline.PipeWriteArgs) error {
if l%oneRef != 0 {
return errInconsistentRefs
}
if h.full {
return errTrieFull
}
return h.writeToLevel(1, p.Span, p.Ref, p.Key)
}
func (h *hashTrieWriter) writeToLevel(level int, span, ref, key []byte) error {
copy(h.buffer[h.cursors[level]:h.cursors[level]+len(span)], span) //copy the span slongside
copy(h.buffer[h.cursors[level]:h.cursors[level]+len(span)], span)
h.cursors[level] += len(span)
copy(h.buffer[h.cursors[level]:h.cursors[level]+len(ref)], ref)
h.cursors[level] += len(ref)
copy(h.buffer[h.cursors[level]:h.cursors[level]+len(key)], key)
h.cursors[level] += len(key)
howLong := (h.refSize + swarm.SpanSize) * h.branching
if h.levelSize(level) == howLong {
......@@ -72,7 +80,6 @@ func (h *hashTrieWriter) writeToLevel(level int, span, ref, key []byte) error {
// - call the short pipeline with the span and the buffer
// - get the hash that was created, append it one level above, and if necessary, wrap that level too
// - remove already hashed data from buffer
// assumes that the function has been called when refsize+span*branching has been reached
func (h *hashTrieWriter) wrapFullLevel(level int) error {
data := h.buffer[h.cursors[level+1]:h.cursors[level]]
......@@ -105,26 +112,66 @@ func (h *hashTrieWriter) wrapFullLevel(level int) error {
// this "truncates" the current level that was wrapped
// by setting the cursors to the cursors of one level above
h.cursors[level] = h.cursors[level+1]
if level+1 == 8 {
h.full = true
}
return nil
}
// pulls and potentially wraps all levels up to target
func (h *hashTrieWriter) hoistLevels(target int) ([]byte, error) {
// levelSize reports how many bytes of the shared buffer currently belong
// to the given level. A level's data ends at its own cursor and begins at
// the cursor of the level above it; level 8 has no level above and begins
// at offset zero, so its cursor alone is its size.
func (h *hashTrieWriter) levelSize(level int) int {
	end := h.cursors[level]
	if level != 8 {
		end -= h.cursors[level+1]
	}
	return end
}
// Sum returns the Swarm merkle-root content-addressed hash
// of an arbitrary-length binary data.
// The algorithm it uses is as follows:
// - From level 1 till maxLevel 8, iterate:
// - If level data length equals 0 then continue to next level
// - If level data length equals 1 reference then carry over level data to next
// - If level data length is bigger than 1 reference then sum the level and
// write the result to the next level
// - Return the hash in level 8
// the cases are as follows:
// - one hash in a given level, in which case we _do not_ perform a hashing operation, but just move
// the hash to the next level, potentially resulting in a level wrap
// - more than one hash, in which case we _do_ perform a hashing operation, appending the hash to
// the next level
func (h *hashTrieWriter) Sum() ([]byte, error) {
oneRef := h.refSize + swarm.SpanSize
for i := 1; i < target; i++ {
for i := 1; i < maxLevel; i++ {
l := h.levelSize(i)
if l%oneRef != 0 {
return nil, errInconsistentRefs
}
switch {
case l == 0:
// level empty, continue to the next.
continue
case l == h.fullChunk:
// this case is possible and necessary due to the carry over
// in the next switch case statement. normal writes done
// through writeToLevel will automatically wrap a full level.
err := h.wrapFullLevel(i)
if err != nil {
return nil, err
}
case l == oneRef:
// this cursor assignment basically means:
// take the hash|span|key from this level, and append it to
// the data of the next level. you may wonder how this works:
// every time we sum a level, the sum gets written into the next level
// and the level cursor gets set to the next level's cursor (see the
// truncating at the end of wrapFullLevel). there might (or not) be
// a hash at the next level, and the cursor of the next level is
// necessarily _smaller_ than the cursor of this level, so in fact what
// happens is that due to the shifting of the cursors, the data of this
// level will appear to be concatenated with the data of the next level.
// we therefore get a "carry-over" behavior between intermediate levels
// that might or might not have data. the eventual result is that the last
// hash generated will always be carried over to the last level (8), then returned.
h.cursors[i+1] = h.cursors[i]
default:
// more than 0 but smaller than chunk size - wrap the level to the one above it
......@@ -134,56 +181,12 @@ func (h *hashTrieWriter) hoistLevels(target int) ([]byte, error) {
}
}
}
level := target
tlen := h.levelSize(target)
data := h.buffer[h.cursors[level+1]:h.cursors[level]]
if tlen%oneRef != 0 {
levelLen := h.levelSize(8)
if levelLen != oneRef {
return nil, errInconsistentRefs
}
if tlen == oneRef {
return data[8:], nil
}
// here we are still with possible length of more than one ref in the highest+1 level
sp := uint64(0)
var hashes []byte
for i := 0; i < len(data); i += h.refSize + 8 {
// sum up the spans of the level, then we need to bmt them and store it as a chunk
// then write the chunk address to the next level up
sp += binary.LittleEndian.Uint64(data[i : i+8])
hash := data[i+8 : i+h.refSize+8]
hashes = append(hashes, hash...)
}
spb := make([]byte, 8)
binary.LittleEndian.PutUint64(spb, sp)
hashes = append(spb, hashes...)
writer := h.pipelineFn()
args := pipeline.PipeWriteArgs{
Data: hashes,
Span: spb,
}
err := writer.ChainWrite(&args)
ref := append(args.Ref, args.Key...)
return ref, err
}
// levelSize returns the number of buffer bytes currently held by level.
// For level 8 this is the level's cursor itself; for every other level it
// is the distance between the level's cursor and the cursor of level+1.
func (h *hashTrieWriter) levelSize(level int) int {
if level == 8 {
return h.cursors[level]
}
return h.cursors[level] - h.cursors[level+1]
}
func (h *hashTrieWriter) Sum() ([]byte, error) {
// look from the top down, to look for the highest hash of a balanced tree
// then, whatever is in the levels below that is necessarily unbalanced,
// so, we'd like to reduce those levels to one hash, then wrap it together
// with the balanced tree hash, to produce the root chunk
highest := 1
for i := 8; i > 0; i-- {
if h.levelSize(i) > 0 && i > highest {
highest = i
}
}
return h.hoistLevels(highest)
// return the hash in the highest level, that's all we need
data := h.buffer[0:h.cursors[8]]
return data[8:], nil
}
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package hashtrie_test
import (
"context"
"encoding/binary"
"errors"
"testing"
"github.com/ethersphere/bee/pkg/file/pipeline"
"github.com/ethersphere/bee/pkg/file/pipeline/bmt"
"github.com/ethersphere/bee/pkg/file/pipeline/hashtrie"
"github.com/ethersphere/bee/pkg/file/pipeline/store"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/mock"
"github.com/ethersphere/bee/pkg/swarm"
)
// shared fixtures for the tests in this file.
var (
addr swarm.Address // reference passed as Ref on every ChainWrite call; assigned in init
span []byte // span passed alongside addr; assigned in init
ctx = context.Background() // context handed to the store writer and to storer lookups
mode = storage.ModePutUpload // put mode handed to the store writer
)
func init() {
b := make([]byte, 32)
b[31] = 0x01
addr = swarm.NewAddress(b)
span = make([]byte, 8)
binary.LittleEndian.PutUint64(span, 1)
}
// TestLevels feeds a varying number of identical references into a hash
// trie writer and verifies that the reference returned by Sum resolves to
// a stored root chunk whose span equals the number of writes performed.
func TestLevels(t *testing.T) {
	var (
		branching = 4
		chunkSize = 128
		hashSize  = 32
	)

	// to create a level wrap we need to do branching^(level-1) writes
	for _, tc := range []struct {
		desc   string
		writes int
	}{
		{
			desc:   "2 at L1",
			writes: 2,
		},
		{
			desc:   "1 at L2, 1 at L1", // dangling chunk
			writes: 16 + 1,
		},
		{
			desc:   "1 at L3, 1 at L2, 1 at L1",
			writes: 64 + 16 + 1,
		},
		{
			desc:   "1 at L3, 2 at L2, 1 at L1",
			writes: 64 + 16 + 16 + 1,
		},
		{
			desc:   "1 at L5, 1 at L1",
			writes: 1024 + 1,
		},
		{
			// previously this case used 1024 + 1 writes, which made it an
			// exact duplicate of the case above and never exercised a
			// dangling intermediate-level reference.
			desc:   "1 at L5, 1 at L3",
			writes: 1024 + 64,
		},
		{
			desc:   "2 at L5, 1 at L1",
			writes: 1024 + 1024 + 1,
		},
		{
			desc:   "3 at L5, 2 at L3, 1 at L1",
			writes: 1024 + 1024 + 1024 + 64 + 64 + 1,
		},
		{
			desc:   "1 at L7, 1 at L1",
			writes: 4096 + 1,
		},
		{
			desc:   "1 at L8", // balanced trie - all good
			writes: 16384,
		},
	} {
		t.Run(tc.desc, func(t *testing.T) {
			s := mock.NewStorer()
			pf := func() pipeline.ChainWriter {
				lsw := store.NewStoreWriter(ctx, s, mode, nil)
				return bmt.NewBmtWriter(lsw)
			}

			ht := hashtrie.NewHashTrieWriter(chunkSize, branching, hashSize, pf)

			for i := 0; i < tc.writes; i++ {
				a := &pipeline.PipeWriteArgs{Ref: addr.Bytes(), Span: span}
				if err := ht.ChainWrite(a); err != nil {
					t.Fatal(err)
				}
			}

			ref, err := ht.Sum()
			if err != nil {
				t.Fatal(err)
			}

			// the root chunk must have been persisted under the returned reference
			rootch, err := s.Get(ctx, storage.ModeGetRequest, swarm.NewAddress(ref))
			if err != nil {
				t.Fatal(err)
			}

			// check the span: every write carries a span of 1, so the
			// span of the root chunk must equal tc.writes
			sp := binary.LittleEndian.Uint64(rootch.Data()[:swarm.SpanSize])
			if sp != uint64(tc.writes) {
				t.Fatalf("want span %d got %d", tc.writes, sp)
			}
		})
	}
}
// TestLevels_TrieFull fills the trie with exactly as many writes as a
// balanced trie can take, then checks that one further write is rejected
// with ErrTrieFull and that Sum still succeeds afterwards.
func TestLevels_TrieFull(t *testing.T) {
	var (
		branching = 4
		chunkSize = 128
		hashSize  = 32
		writes    = 16384 // this is to get a balanced trie
		s         = mock.NewStorer()
		pf        = func() pipeline.ChainWriter {
			lsw := store.NewStoreWriter(ctx, s, mode, nil)
			return bmt.NewBmtWriter(lsw)
		}

		ht = hashtrie.NewHashTrieWriter(chunkSize, branching, hashSize, pf)
	)

	// fill the trie up to capacity; every one of these writes must succeed
	for i := 0; i < writes; i++ {
		a := &pipeline.PipeWriteArgs{Ref: addr.Bytes(), Span: span}
		if err := ht.ChainWrite(a); err != nil {
			t.Fatal(err)
		}
	}

	// one write past capacity must be refused with the sentinel error.
	// report both the expected and the actual error so that an unexpected
	// nil does not surface as a bare "<nil>" failure message.
	a := &pipeline.PipeWriteArgs{Ref: addr.Bytes(), Span: span}
	err := ht.ChainWrite(a)
	if !errors.Is(err, hashtrie.ErrTrieFull) {
		t.Fatalf("want error %v, got %v", hashtrie.ErrTrieFull, err)
	}

	// it is questionable whether the writer should go into some
	// corrupt state after the last write which causes the trie full
	// error, in which case we would return an error on Sum()
	_, err = ht.Sum()
	if err != nil {
		t.Fatal(err)
	}
}
// TestRegression guards against the bug reported in
// https://github.com/ethersphere/bee/issues/1175.
// It writes 67100000/4096 references, each with a span of 4096, and expects
// the stored root chunk to carry the sum of all written spans.
func TestRegression(t *testing.T) {
	const (
		branching = 128
		chunkSize = 4096
		hashSize  = 32
		writes    = 67100000 / 4096
	)

	// span carried by every write; kept local so the shared fixture is untouched
	chunkSpan := make([]byte, 8)
	binary.LittleEndian.PutUint64(chunkSpan, 4096)

	storer := mock.NewStorer()
	newWriter := func() pipeline.ChainWriter {
		return bmt.NewBmtWriter(store.NewStoreWriter(ctx, storer, mode, nil))
	}
	trie := hashtrie.NewHashTrieWriter(chunkSize, branching, hashSize, newWriter)

	for left := writes; left > 0; left-- {
		args := &pipeline.PipeWriteArgs{Ref: addr.Bytes(), Span: chunkSpan}
		if err := trie.ChainWrite(args); err != nil {
			t.Fatal(err)
		}
	}

	root, err := trie.Sum()
	if err != nil {
		t.Fatal(err)
	}

	rootChunk, err := storer.Get(ctx, storage.ModeGetRequest, swarm.NewAddress(root))
	if err != nil {
		t.Fatal(err)
	}

	want := uint64(writes * 4096)
	got := binary.LittleEndian.Uint64(rootChunk.Data()[:swarm.SpanSize])
	if got != want {
		t.Fatalf("want span %d got %d", want, got)
	}
}
......@@ -45,7 +45,6 @@ func (w *storeWriter) ChainWrite(p *pipeline.PipeWriteArgs) error {
} else {
c = swarm.NewChunk(swarm.NewAddress(p.Ref), p.Data)
}
seen, err := w.l.Put(w.ctx, w.mode, c)
if err != nil {
return err
......
......@@ -93,7 +93,13 @@ func (m *MockStorer) Put(ctx context.Context, mode storage.ModePut, chs ...swarm
po := swarm.Proximity(ch.Address().Bytes(), m.baseAddress)
m.bins[po]++
}
m.store[ch.Address().String()] = ch.Data()
// this is needed since the chunk feeder shares memory across calls
// to the pipeline. this is in order to avoid multiple allocations.
// this change mimics the behavior of shed and localstore
// and copies the data from the call into the in-memory store
b := make([]byte, len(ch.Data()))
copy(b, ch.Data())
m.store[ch.Address().String()] = b
m.modePut[ch.Address().String()] = mode
// pin chunks if needed
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment