Commit f8843d15 authored by Nemanja Zbiljić, committed by GitHub

Reverse the order of the queue for push sync index (#827)

parent 5f33df8b
......@@ -17,6 +17,7 @@
package localstore
import (
"bytes"
"context"
"sync"
"time"
......@@ -52,14 +53,15 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, stop fun
// close the returned chunkInfo channel at the end to
// signal that the subscription is done
defer close(chunks)
// sinceItem is the Item from which the next iteration
// should start. The first iteration starts from the first Item.
var sinceItem *shed.Item
// lastItem is the first Item received in the last iteration.
var lastItem *shed.Item
// toItemKey is the key for the Item that was oldest in the last iteration.
var toItemKey []byte
for {
select {
case <-trigger:
// iterate until:
// - last index Item is reached
// - last non-processed Item is reached
// - subscription stop is called
// - context is done
db.metrics.SubscribePushIteration.Inc()
......@@ -73,12 +75,27 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, stop fun
return true, err
}
// check if we reached item that was already processed
// and stop
if toItemKey != nil {
dataItemKey, err := db.pushIndex.ItemKey(dataItem)
if err != nil {
return true, err
}
if bytes.Equal(dataItemKey, toItemKey) {
toItemKey = nil
return true, nil
}
}
select {
case chunks <- swarm.NewChunk(swarm.NewAddress(dataItem.Address), dataItem.Data).WithTagID(item.Tag):
count++
// set next iteration start item
// when its chunk is successfully sent to channel
sinceItem = &item
// we set first one sent, which is "oldest" at that point
if lastItem == nil {
lastItem = &item
}
return false, nil
case <-stopChan:
// gracefully stop the iteration
......@@ -92,10 +109,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, stop fun
return true, ctx.Err()
}
}, &shed.IterateOptions{
StartFrom: sinceItem,
// sinceItem was sent as the last Address in the previous
// iterator call, skip it in this one
SkipStartFromItem: true,
Reverse: true,
})
totalTimeMetric(db.metrics.TotalTimeSubscribePushIteration, iterStart)
......@@ -105,6 +119,20 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, stop fun
db.logger.Debugf("localstore push subscription iteration: %v", err)
return
}
// save last Item from this iteration in order to know where
// to stop on next iteration
if lastItem != nil && toItemKey == nil {
// move 'toItemKey' to point to last item in previous iteration
toItemKey, err = db.pushIndex.ItemKey(*lastItem)
if err != nil {
return
}
}
// 'lastItem' should be populated on next iteration again
lastItem = nil
case <-stopChan:
// terminate the subscription
// on stop
......
......@@ -37,6 +37,8 @@ func TestDB_SubscribePush(t *testing.T) {
chunks := make([]swarm.Chunk, 0)
var chunksMu sync.Mutex
chunkProcessedTimes := make([]int, 0)
uploadRandomChunks := func(count int) {
chunksMu.Lock()
defer chunksMu.Unlock()
......@@ -50,9 +52,22 @@ func TestDB_SubscribePush(t *testing.T) {
}
chunks = append(chunks, ch)
chunkProcessedTimes = append(chunkProcessedTimes, 0)
}
}
// caller expected to hold lock on chunksMu
findChunkIndex := func(chunk swarm.Chunk) int {
for i, c := range chunks {
if chunk.Address().Equal(c.Address()) {
return i
}
}
return -1
}
// prepopulate database with some chunks
// before the subscription
uploadRandomChunks(10)
......@@ -68,8 +83,11 @@ func TestDB_SubscribePush(t *testing.T) {
ch, stop := db.SubscribePush(ctx)
defer stop()
var lastStartIndex int = -1
// receive and validate addresses from the subscription
go func() {
var err error
var i int // address index
for {
select {
......@@ -78,9 +96,19 @@ func TestDB_SubscribePush(t *testing.T) {
return
}
chunksMu.Lock()
want := chunks[i]
if i > lastStartIndex {
// no way to know which chunk will come first here
gotIndex := findChunkIndex(got)
if gotIndex <= lastStartIndex {
err = fmt.Errorf("got index %v, expected index above %v", gotIndex, lastStartIndex)
}
lastStartIndex = gotIndex
i = 0
}
cIndex := lastStartIndex - i
want := chunks[cIndex]
chunkProcessedTimes[cIndex]++
chunksMu.Unlock()
var err error
if !bytes.Equal(got.Data(), want.Data()) {
err = fmt.Errorf("got chunk %v data %x, want %x", i, got.Data(), want.Data())
}
......@@ -111,6 +139,18 @@ func TestDB_SubscribePush(t *testing.T) {
uploadRandomChunks(3)
checkErrChan(ctx, t, errChan, len(chunks))
chunksMu.Lock()
if lastStartIndex != len(chunks)-1 {
t.Fatalf("got %d chunks, expected %d", lastStartIndex, len(chunks))
}
for i, pc := range chunkProcessedTimes {
if pc != 1 {
t.Fatalf("chunk on address %s processed %d times, should be only once", chunks[i].Address(), pc)
}
}
chunksMu.Unlock()
}
// TestDB_SubscribePush_multiple uploads chunks before and after
......@@ -138,6 +178,17 @@ func TestDB_SubscribePush_multiple(t *testing.T) {
}
}
// caller expected to hold lock on addrsMu
findAddressIndex := func(address swarm.Address) int {
for i, a := range addrs {
if a.Equal(address) {
return i
}
}
return -1
}
// prepopulate database with some chunks
// before the subscription
uploadRandomChunks(10)
......@@ -152,6 +203,8 @@ func TestDB_SubscribePush_multiple(t *testing.T) {
subsCount := 10
lastStartIndexSlice := make([]int, subsCount)
// start a number of subscriptions,
// each of which will write the error for every address to errChan
for j := 0; j < subsCount; j++ {
......@@ -160,7 +213,9 @@ func TestDB_SubscribePush_multiple(t *testing.T) {
// receive and validate addresses from the subscription
go func(j int) {
var err error
var i int // address index
lastStartIndexSlice[j] = -1
for {
select {
case got, ok := <-ch:
......@@ -168,9 +223,18 @@ func TestDB_SubscribePush_multiple(t *testing.T) {
return
}
addrsMu.Lock()
want := addrs[i]
if i > lastStartIndexSlice[j] {
// no way to know which chunk will come first here
gotIndex := findAddressIndex(got.Address())
if gotIndex <= lastStartIndexSlice[j] {
err = fmt.Errorf("got index %v, expected index above %v", gotIndex, lastStartIndexSlice[j])
}
lastStartIndexSlice[j] = gotIndex
i = 0
}
aIndex := lastStartIndexSlice[j] - i
want := addrs[aIndex]
addrsMu.Unlock()
var err error
if !got.Address().Equal(want) {
err = fmt.Errorf("got chunk %v address on subscription %v %s, want %s", i, j, got, want)
}
......@@ -202,4 +266,12 @@ func TestDB_SubscribePush_multiple(t *testing.T) {
wantedChunksCount := len(addrs) * subsCount
checkErrChan(ctx, t, errChan, wantedChunksCount)
for j := 0; j < subsCount; j++ {
addrsMu.Lock()
if lastStartIndexSlice[j] != len(addrs)-1 {
t.Fatalf("got %d chunks, expected %d", lastStartIndexSlice[j], len(addrs))
}
addrsMu.Unlock()
}
}
......@@ -134,6 +134,11 @@ func (db *DB) NewIndex(name string, funcs IndexFuncs) (f Index, err error) {
}, nil
}
// ItemKey encodes the provided Item with the index's key encoding
// function and returns the resulting key, or the error reported by
// that function.
func (f Index) ItemKey(item Item) (key []byte, err error) {
	key, err = f.encodeKeyFunc(item)
	return key, err
}
// Get accepts key fields represented as Item to retrieve a
// value from the index and return maximum available information
// from the index represented as another Item.
......@@ -284,6 +289,8 @@ type IterateOptions struct {
SkipStartFromItem bool
// Iterate over items which keys have a common prefix.
Prefix []byte
// Iterate over items in reverse order.
Reverse bool
}
// Iterate function iterates over keys of the Index.
......@@ -303,21 +310,67 @@ func (f Index) Iterate(fn IndexIterFunc, options *IterateOptions) (err error) {
return fmt.Errorf("encode key: %w", err)
}
}
it := f.db.NewIterator()
defer it.Release()
var ok bool
// move the cursor to the start key
ok := it.Seek(startKey)
if !ok {
// stop iterator if seek has failed
return it.Error()
ok = it.Seek(startKey)
if !options.Reverse {
if !ok {
// stop iterator if seek has failed
return it.Error()
}
} else {
// reverse seeker
if options.StartFrom != nil {
if !ok {
return it.Error()
}
} else {
// find last key for this index (and prefix)
// move cursor to last key
ok = it.Last()
if !ok {
return it.Error()
}
if lastKeyHasPrefix := bytes.HasPrefix(it.Key(), prefix); !lastKeyHasPrefix {
// increment last prefix byte (that is not 0xFF) to try to find last key
incrementedPrefix := bytesIncrement(prefix)
if incrementedPrefix == nil {
return fmt.Errorf("index iterator invalid prefix: %v -> %v", prefix, string(prefix))
}
// should find first key after prefix (same or different index)
ok = it.Seek(incrementedPrefix)
if !ok {
return it.Error()
}
// previous key should have proper prefix
ok = it.Prev()
if !ok {
return it.Error()
}
}
}
}
itSeekerFn := it.Next
if options.Reverse {
itSeekerFn = it.Prev
}
if options.SkipStartFromItem && bytes.Equal(startKey, it.Key()) {
// skip the start from Item if it is the first key
// and it is explicitly configured to skip it
ok = it.Next()
ok = itSeekerFn()
}
for ; ok; ok = it.Next() {
for ; ok; ok = itSeekerFn() {
item, err := f.itemFromIterator(it, prefix)
if err != nil {
if errors.Is(err, leveldb.ErrNotFound) {
......@@ -336,6 +389,26 @@ func (f Index) Iterate(fn IndexIterFunc, options *IterateOptions) (err error) {
return it.Error()
}
// bytesIncrement increments the last byte of b that is not 0xFF and returns
// a new slice truncated right after the incremented position. Trailing 0xFF
// bytes are dropped, because they cannot be incremented without overflow.
//
// The input slice is never modified. If b is empty or consists solely of
// 0xFF bytes there is nothing to increment and nil is returned.
func bytesIncrement(b []byte) []byte {
	// scan from the end for the first byte that can be incremented
	for i := len(b) - 1; i >= 0; i-- {
		if b[i] == 0xFF {
			continue
		}
		// found byte smaller than 0xFF: copy only the part that is kept,
		// so that the caller's slice stays untouched, then increment
		out := make([]byte, i+1)
		copy(out, b)
		out[i]++
		return out
	}
	// input was empty or contained only 0xFF bytes
	return nil
}
// First returns the first item in the Index which encoded key starts with a prefix.
// If the prefix is nil, the first element of the whole index is returned.
// If Index has no elements, a leveldb.ErrNotFound error is returned.
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment