Commit 4742eb73 authored by Nemanja Zbiljić, committed by GitHub

Revert reversing the iteration order from pushsync (#880)

* Revert "Reverse the order of the queue for push sync index (#827)"
parent d316a59e
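
For context, this revert restores the original forward iteration in SubscribePush: each iterator pass resumes from the last item whose chunk was successfully sent (sinceItem) and skips it, instead of iterating the push index in reverse and tracking a stop key (lastItem/toItemKey) as #827 did. Below is a minimal, self-contained sketch of that resume-and-skip pattern; the item type and the iterateFrom helper are illustrative stand-ins, not code from the repository.

```go
package main

import "fmt"

// item stands in for a push-index entry; entries are assumed to be ordered
// ascending by key, which is what the restored forward iteration relies on.
type item int

// iterateFrom walks items in ascending order, resuming after *since when it is
// set. This mirrors the StartFrom / SkipStartFromItem options in the diff below.
func iterateFrom(items []item, since *item, fn func(item) (stop bool)) {
	for _, it := range items {
		if since != nil && it <= *since {
			continue // already sent in a previous pass, skip it
		}
		if fn(it) {
			return
		}
	}
}

func main() {
	index := []item{1, 2, 3, 4, 5}
	var since *item

	// First pass: the subscriber stops after item 3 (e.g. the channel blocks).
	iterateFrom(index, since, func(it item) bool {
		fmt.Println("pass 1 sent:", it)
		v := it
		since = &v // remember the last successfully sent item
		return it == 3
	})

	// Next pass: resumes after item 3 instead of re-sending items 1 through 3.
	iterateFrom(index, since, func(it item) bool {
		fmt.Println("pass 2 sent:", it)
		v := it
		since = &v
		return false
	})
}
```

In the sketch, entries appended after the resume point are still visited on the next pass, which is the property the push-sync subscription relies on.
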
@@ -17,7 +17,6 @@
package localstore
import (
"bytes"
"context"
"sync"
"time"
@@ -53,15 +52,14 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, stop fun
// close the returned chunkInfo channel at the end to
// signal that the subscription is done
defer close(chunks)
// lastItem is the first Item received in the last iteration.
var lastItem *shed.Item
// toItemKey is the key for the Item that was oldest in the last iteration.
var toItemKey []byte
// sinceItem is the Item from which the next iteration
// should start. The first iteration starts from the first Item.
var sinceItem *shed.Item
for {
select {
case <-trigger:
// iterate until:
// - last non-processed Item is reached
// - last index Item is reached
// - subscription stop is called
// - context is done.
db.metrics.SubscribePushIteration.Inc()
@@ -75,27 +73,12 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, stop fun
return true, err
}
// check if we reached item that was already processed
// and stop
if toItemKey != nil {
dataItemKey, err := db.pushIndex.ItemKey(dataItem)
if err != nil {
return true, err
}
if bytes.Equal(dataItemKey, toItemKey) {
toItemKey = nil
return true, nil
}
}
select {
case chunks <- swarm.NewChunk(swarm.NewAddress(dataItem.Address), dataItem.Data).WithTagID(item.Tag):
count++
// we set first one sent, which is "oldest" at that point
if lastItem == nil {
lastItem = &item
}
// set next iteration start item
// when its chunk is successfully sent to channel
sinceItem = &item
return false, nil
case <-stopChan:
// gracefully stop the iteration
@@ -109,7 +92,10 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, stop fun
return true, ctx.Err()
}
}, &shed.IterateOptions{
Reverse: true,
StartFrom: sinceItem,
// sinceItem was sent as the last Address in the previous
// iterator call, skip it in this one
SkipStartFromItem: true,
})
totalTimeMetric(db.metrics.TotalTimeSubscribePushIteration, iterStart)
@@ -120,19 +106,6 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, stop fun
return
}
// save last Item from this iteration in order to know where
// to stop on next iteration
if lastItem != nil && toItemKey == nil {
// move 'toItemKey' to point to last item in previous iteration
toItemKey, err = db.pushIndex.ItemKey(*lastItem)
if err != nil {
return
}
}
// 'lastItem' should be populated on next iteration again
lastItem = nil
case <-stopChan:
// terminate the subscription
// on stop
@@ -57,17 +57,6 @@ func TestDB_SubscribePush(t *testing.T) {
}
}
// caller expected to hold lock on chunksMu
findChunkIndex := func(chunk swarm.Chunk) int {
for i, c := range chunks {
if chunk.Address().Equal(c.Address()) {
return i
}
}
return -1
}
// prepopulate database with some chunks
// before the subscription
uploadRandomChunks(10)
@@ -83,8 +72,6 @@ func TestDB_SubscribePush(t *testing.T) {
ch, stop := db.SubscribePush(ctx)
defer stop()
var lastStartIndex int = -1
// receive and validate addresses from the subscription
go func() {
var err error
@@ -96,16 +83,7 @@ func TestDB_SubscribePush(t *testing.T) {
return
}
chunksMu.Lock()
if i > lastStartIndex {
// no way to know which chunk will come first here
gotIndex := findChunkIndex(got)
if gotIndex <= lastStartIndex {
err = fmt.Errorf("got index %v, expected index above %v", gotIndex, lastStartIndex)
}
lastStartIndex = gotIndex
i = 0
}
cIndex := lastStartIndex - i
cIndex := i
want := chunks[cIndex]
chunkProcessedTimes[cIndex]++
chunksMu.Unlock()
@@ -141,10 +119,6 @@ func TestDB_SubscribePush(t *testing.T) {
checkErrChan(ctx, t, errChan, len(chunks))
chunksMu.Lock()
if lastStartIndex != len(chunks)-1 {
t.Fatalf("got %d chunks, expected %d", lastStartIndex, len(chunks))
}
for i, pc := range chunkProcessedTimes {
if pc != 1 {
t.Fatalf("chunk on address %s processed %d times, should be only once", chunks[i].Address(), pc)
@@ -178,17 +152,6 @@ func TestDB_SubscribePush_multiple(t *testing.T) {
}
}
// caller expected to hold lock on addrsMu
findAddressIndex := func(address swarm.Address) int {
for i, a := range addrs {
if a.Equal(address) {
return i
}
}
return -1
}
// prepopulate database with some chunks
// before the subscription
uploadRandomChunks(10)
@@ -203,8 +166,6 @@ func TestDB_SubscribePush_multiple(t *testing.T) {
subsCount := 10
lastStartIndexSlice := make([]int, subsCount)
// start a number of subscriptions
// that all of them will write every addresses error to errChan
for j := 0; j < subsCount; j++ {
@@ -215,7 +176,6 @@ func TestDB_SubscribePush_multiple(t *testing.T) {
go func(j int) {
var err error
var i int // address index
lastStartIndexSlice[j] = -1
for {
select {
case got, ok := <-ch:
@@ -223,16 +183,7 @@ func TestDB_SubscribePush_multiple(t *testing.T) {
return
}
addrsMu.Lock()
if i > lastStartIndexSlice[j] {
// no way to know which chunk will come first here
gotIndex := findAddressIndex(got.Address())
if gotIndex <= lastStartIndexSlice[j] {
err = fmt.Errorf("got index %v, expected index above %v", gotIndex, lastStartIndexSlice[j])
}
lastStartIndexSlice[j] = gotIndex
i = 0
}
aIndex := lastStartIndexSlice[j] - i
aIndex := i
want := addrs[aIndex]
addrsMu.Unlock()
if !got.Address().Equal(want) {
@@ -266,12 +217,4 @@ func TestDB_SubscribePush_multiple(t *testing.T) {
wantedChunksCount := len(addrs) * subsCount
checkErrChan(ctx, t, errChan, wantedChunksCount)
for j := 0; j < subsCount; j++ {
addrsMu.Lock()
if lastStartIndexSlice[j] != len(addrs)-1 {
t.Fatalf("got %d chunks, expected %d", lastStartIndexSlice[j], len(addrs))
}
addrsMu.Unlock()
}
}
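
With forward iteration restored, the tests above can again compare the i-th received chunk directly against the i-th uploaded one (cIndex := i, aIndex := i); the deleted findChunkIndex / lastStartIndex bookkeeping was only needed because, with reverse iteration, there was "no way to know which chunk will come first" in each pass, as the removed comments note. A small sketch of that direct in-order check; checkInOrder and the channel setup are illustrative, not code from the test file.

```go
package main

import "fmt"

// checkInOrder is an illustrative helper (not from the test file): it reads
// len(want) addresses from a subscription-like channel and requires that the
// i-th received address matches the i-th uploaded one.
func checkInOrder(want []string, got <-chan string) error {
	for i := range want {
		g, ok := <-got
		if !ok {
			return fmt.Errorf("channel closed after %d of %d addresses", i, len(want))
		}
		if g != want[i] {
			return fmt.Errorf("address %d: got %q, want %q", i, g, want[i])
		}
	}
	return nil
}

func main() {
	want := []string{"a1", "a2", "a3"}
	got := make(chan string, len(want))
	for _, a := range want {
		got <- a // simulate in-order delivery from the subscription
	}
	close(got)
	fmt.Println("in-order check:", checkInOrder(want, got))
}
```
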