Commit 32f0689d authored by Janoš Guljaš's avatar Janoš Guljaš Committed by GitHub

set chunk as synced only by push sync (#908)

parent 141f5f8e
...@@ -81,7 +81,7 @@ func testDBCollectGarbageWorker(t *testing.T) { ...@@ -81,7 +81,7 @@ func testDBCollectGarbageWorker(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address()) err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -173,7 +173,7 @@ func TestPinGC(t *testing.T) { ...@@ -173,7 +173,7 @@ func TestPinGC(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address()) err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -270,14 +270,14 @@ func TestGCAfterPin(t *testing.T) { ...@@ -270,14 +270,14 @@ func TestGCAfterPin(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
// Pin before adding to GC in ModeSetSyncPull // Pin before adding to GC in ModeSetSync
err = db.Set(context.Background(), storage.ModeSetPin, ch.Address()) err = db.Set(context.Background(), storage.ModeSetPin, ch.Address())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
pinAddrs = append(pinAddrs, ch.Address()) pinAddrs = append(pinAddrs, ch.Address())
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address()) err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -321,7 +321,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) { ...@@ -321,7 +321,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address()) err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -363,7 +363,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) { ...@@ -363,7 +363,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address()) err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -454,7 +454,7 @@ func TestDB_gcSize(t *testing.T) { ...@@ -454,7 +454,7 @@ func TestDB_gcSize(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address()) err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -549,7 +549,7 @@ func TestPinAfterMultiGC(t *testing.T) { ...@@ -549,7 +549,7 @@ func TestPinAfterMultiGC(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address()) err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -565,7 +565,7 @@ func TestPinAfterMultiGC(t *testing.T) { ...@@ -565,7 +565,7 @@ func TestPinAfterMultiGC(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address()) err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -576,7 +576,7 @@ func TestPinAfterMultiGC(t *testing.T) { ...@@ -576,7 +576,7 @@ func TestPinAfterMultiGC(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address()) err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -612,7 +612,7 @@ func generateAndPinAChunk(t *testing.T, db *DB) swarm.Chunk { ...@@ -612,7 +612,7 @@ func generateAndPinAChunk(t *testing.T, db *DB) swarm.Chunk {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
err = db.Set(context.Background(), storage.ModeSetSyncPull, pinnedChunk.Address()) err = db.Set(context.Background(), storage.ModeSetSync, pinnedChunk.Address())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -698,7 +698,7 @@ func addRandomChunks(t *testing.T, count int, db *DB, pin bool) []swarm.Chunk { ...@@ -698,7 +698,7 @@ func addRandomChunks(t *testing.T, count int, db *DB, pin bool) []swarm.Chunk {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address()) err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -713,7 +713,7 @@ func addRandomChunks(t *testing.T, count int, db *DB, pin bool) []swarm.Chunk { ...@@ -713,7 +713,7 @@ func addRandomChunks(t *testing.T, count int, db *DB, pin bool) []swarm.Chunk {
} else { } else {
// Non pinned chunks could be GC'd by the time they reach here. // Non pinned chunks could be GC'd by the time they reach here.
// so it is okay to ignore the error // so it is okay to ignore the error
_ = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address()) _ = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
_ = db.Set(context.Background(), storage.ModeSetAccess, ch.Address()) _ = db.Set(context.Background(), storage.ModeSetAccess, ch.Address())
_, _ = db.Get(context.Background(), storage.ModeGetRequest, ch.Address()) _, _ = db.Get(context.Background(), storage.ModeGetRequest, ch.Address())
} }
......
...@@ -133,7 +133,7 @@ func TestDB_gcIndex(t *testing.T) { ...@@ -133,7 +133,7 @@ func TestDB_gcIndex(t *testing.T) {
t.Run("sync one chunk", func(t *testing.T) { t.Run("sync one chunk", func(t *testing.T) {
ch := chunks[0] ch := chunks[0]
err := db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address()) err := db.Set(context.Background(), storage.ModeSetSync, ch.Address())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -146,7 +146,7 @@ func TestDB_gcIndex(t *testing.T) { ...@@ -146,7 +146,7 @@ func TestDB_gcIndex(t *testing.T) {
t.Run("sync all chunks", func(t *testing.T) { t.Run("sync all chunks", func(t *testing.T) {
for i := range chunks { for i := range chunks {
err := db.Set(context.Background(), storage.ModeSetSyncPull, chunks[i].Address()) err := db.Set(context.Background(), storage.ModeSetSync, chunks[i].Address())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
......
...@@ -75,7 +75,7 @@ func TestModeGetRequest(t *testing.T) { ...@@ -75,7 +75,7 @@ func TestModeGetRequest(t *testing.T) {
}) })
// set chunk to synced state // set chunk to synced state
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address()) err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
......
...@@ -77,7 +77,7 @@ func (db *DB) set(mode storage.ModeSet, addrs ...swarm.Address) (err error) { ...@@ -77,7 +77,7 @@ func (db *DB) set(mode storage.ModeSet, addrs ...swarm.Address) (err error) {
db.binIDs.PutInBatch(batch, uint64(po), id) db.binIDs.PutInBatch(batch, uint64(po), id)
} }
case storage.ModeSetSyncPush, storage.ModeSetSyncPull: case storage.ModeSetSync:
for _, addr := range addrs { for _, addr := range addrs {
c, err := db.setSync(batch, addr, mode) c, err := db.setSync(batch, addr, mode)
if err != nil { if err != nil {
...@@ -197,9 +197,7 @@ func (db *DB) setAccess(batch *leveldb.Batch, binIDs map[uint8]uint64, addr swar ...@@ -197,9 +197,7 @@ func (db *DB) setAccess(batch *leveldb.Batch, binIDs map[uint8]uint64, addr swar
} }
// setSync adds the chunk to the garbage collection after syncing by updating indexes // setSync adds the chunk to the garbage collection after syncing by updating indexes
// - ModeSetSyncPull - the corresponding tag is incremented, pull index item tag value // - ModeSetSync - the corresponding tag is incremented, then item is removed
// is then set to 0 to prevent duplicate increments for the same chunk synced multiple times
// - ModeSetSyncPush - the corresponding tag is incremented, then item is removed
// from push sync index // from push sync index
// - update to gc index happens given item does not exist in pin index // - update to gc index happens given item does not exist in pin index
// Provided batch is updated. // Provided batch is updated.
...@@ -228,13 +226,12 @@ func (db *DB) setSync(batch *leveldb.Batch, addr swarm.Address, mode storage.Mod ...@@ -228,13 +226,12 @@ func (db *DB) setSync(batch *leveldb.Batch, addr swarm.Address, mode storage.Mod
item.StoreTimestamp = i.StoreTimestamp item.StoreTimestamp = i.StoreTimestamp
item.BinID = i.BinID item.BinID = i.BinID
if mode == storage.ModeSetSyncPush { i, err = db.pushIndex.Get(item)
i, err := db.pushIndex.Get(item)
if err != nil { if err != nil {
if errors.Is(err, leveldb.ErrNotFound) { if errors.Is(err, leveldb.ErrNotFound) {
// we handle this error internally, since this is an internal inconsistency of the indices // we handle this error internally, since this is an internal inconsistency of the indices
// this error can happen if the chunk is put with ModePutRequest or ModePutSync // this error can happen if the chunk is put with ModePutRequest or ModePutSync
// but this function is called with ModeSetSyncPush // but this function is called with ModeSetSync
db.logger.Debugf("localstore: chunk with address %s not found in push index", addr) db.logger.Debugf("localstore: chunk with address %s not found in push index", addr)
} else { } else {
return 0, err return 0, err
...@@ -258,7 +255,6 @@ func (db *DB) setSync(batch *leveldb.Batch, addr swarm.Address, mode storage.Mod ...@@ -258,7 +255,6 @@ func (db *DB) setSync(batch *leveldb.Batch, addr swarm.Address, mode storage.Mod
if err != nil { if err != nil {
return 0, err return 0, err
} }
}
i, err = db.retrievalAccessIndex.Get(item) i, err = db.retrievalAccessIndex.Get(item)
switch { switch {
......
...@@ -19,12 +19,13 @@ package localstore ...@@ -19,12 +19,13 @@ package localstore
import ( import (
"context" "context"
"errors" "errors"
"github.com/ethersphere/bee/pkg/logging"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"io/ioutil" "io/ioutil"
"testing" "testing"
"time" "time"
"github.com/ethersphere/bee/pkg/logging"
statestore "github.com/ethersphere/bee/pkg/statestore/mock"
"github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/tags" "github.com/ethersphere/bee/pkg/tags"
...@@ -72,7 +73,7 @@ func TestModeSetAccess(t *testing.T) { ...@@ -72,7 +73,7 @@ func TestModeSetAccess(t *testing.T) {
// here we try to set a normal tag (that should be handled by pushsync) // here we try to set a normal tag (that should be handled by pushsync)
// as a result we should expect the tag value to remain in the pull index // as a result we should expect the tag value to remain in the pull index
// and we expect that the tag should not be incremented by pull sync set // and we expect that the tag should not be incremented by pull sync set
func TestModeSetSyncPullNormalTag(t *testing.T) { func TestModeSetSyncNormalTag(t *testing.T) {
mockStatestore := statestore.NewStateStore() mockStatestore := statestore.NewStateStore()
logger := logging.New(ioutil.Discard, 0) logger := logging.New(ioutil.Discard, 0)
db := newTestDB(t, &Options{Tags: tags.NewTags(mockStatestore, logger)}) db := newTestDB(t, &Options{Tags: tags.NewTags(mockStatestore, logger)})
...@@ -105,7 +106,7 @@ func TestModeSetSyncPullNormalTag(t *testing.T) { ...@@ -105,7 +106,7 @@ func TestModeSetSyncPullNormalTag(t *testing.T) {
t.Fatalf("unexpected tag id value got %d want %d", item.Tag, tag.Uid) t.Fatalf("unexpected tag id value got %d want %d", item.Tag, tag.Uid)
} }
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address()) err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -124,85 +125,8 @@ func TestModeSetSyncPullNormalTag(t *testing.T) { ...@@ -124,85 +125,8 @@ func TestModeSetSyncPullNormalTag(t *testing.T) {
t.Fatalf("unexpected tag id value got %d want %d", item.Tag, tag.Uid) t.Fatalf("unexpected tag id value got %d want %d", item.Tag, tag.Uid)
} }
// 1 stored (because incremented manually in test), 1 sent, 1 total // 1 stored (because incremented manually in test), 1 sent, 1 synced, 1 total
tagtesting.CheckTag(t, tag, 0, 1, 0, 1, 0, 1) tagtesting.CheckTag(t, tag, 0, 1, 0, 1, 1, 1)
}
// TestModeSetSyncPushNormalTag makes sure that push sync increments tags
// correctly on a normal tag (that is, a tag that is expected to show progress bars
// according to push sync progress)
func TestModeSetSyncPushNormalTag(t *testing.T) {
mockStatestore := statestore.NewStateStore()
logger := logging.New(ioutil.Discard, 0)
db := newTestDB(t, &Options{Tags: tags.NewTags(mockStatestore, logger)})
tag, err := db.tags.Create("test", 1)
if err != nil {
t.Fatal(err)
}
ch := generateTestRandomChunk().WithTagID(tag.Uid)
_, err = db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = tag.Inc(tags.StateStored) // so we don't get an error on tag.Status later on
if err != nil {
t.Fatal(err)
}
item, err := db.pullIndex.Get(shed.Item{
Address: ch.Address().Bytes(),
BinID: 1,
})
if err != nil {
t.Fatal(err)
}
if item.Tag != tag.Uid {
t.Fatalf("unexpected tag id value got %d want %d", item.Tag, tag.Uid)
}
tagtesting.CheckTag(t, tag, 0, 1, 0, 0, 0, 1)
err = db.Set(context.Background(), storage.ModeSetSyncPush, ch.Address())
if err != nil {
t.Fatal(err)
}
item, err = db.pullIndex.Get(shed.Item{
Address: ch.Address().Bytes(),
BinID: 1,
})
if err != nil {
t.Fatal(err)
}
if item.Tag != tag.Uid {
t.Fatalf("unexpected tag id value got %d want %d", item.Tag, tag.Uid)
}
tagtesting.CheckTag(t, tag, 0, 1, 0, 0, 1, 1)
// call pull sync set, expect no changes
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
item, err = db.pullIndex.Get(shed.Item{
Address: ch.Address().Bytes(),
BinID: 1,
})
if err != nil {
t.Fatal(err)
}
tagtesting.CheckTag(t, tag, 0, 1, 0, 0, 1, 1)
if item.Tag != tag.Uid {
t.Fatalf("unexpected tag id value got %d want %d", item.Tag, tag.Uid)
}
} }
// TestModeSetRemove validates ModeSetRemove index values on the provided DB. // TestModeSetRemove validates ModeSetRemove index values on the provided DB.
......
...@@ -83,7 +83,7 @@ func benchmarkRetrievalIndexes(b *testing.B, o *Options, count int) { ...@@ -83,7 +83,7 @@ func benchmarkRetrievalIndexes(b *testing.B, o *Options, count int) {
b.StartTimer() b.StartTimer()
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
err := db.Set(context.Background(), storage.ModeSetSyncPull, addrs[i]) err := db.Set(context.Background(), storage.ModeSetSync, addrs[i])
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }
......
...@@ -267,7 +267,7 @@ func (s *Syncer) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (er ...@@ -267,7 +267,7 @@ func (s *Syncer) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (er
} }
// make an offer to the upstream peer in return for the requested range // make an offer to the upstream peer in return for the requested range
offer, addrs, err := s.makeOffer(ctx, rn) offer, _, err := s.makeOffer(ctx, rn)
if err != nil { if err != nil {
return fmt.Errorf("make offer: %w", err) return fmt.Errorf("make offer: %w", err)
} }
...@@ -287,12 +287,6 @@ func (s *Syncer) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (er ...@@ -287,12 +287,6 @@ func (s *Syncer) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (er
return fmt.Errorf("read want: %w", err) return fmt.Errorf("read want: %w", err)
} }
// empty bitvector implies downstream peer does not want
// any chunks (it has them already). mark chunks as synced
if len(want.BitVector) == 0 {
return s.setChunks(ctx, addrs...)
}
chs, err := s.processWant(ctx, offer, &want) chs, err := s.processWant(ctx, offer, &want)
if err != nil { if err != nil {
return fmt.Errorf("process want: %w", err) return fmt.Errorf("process want: %w", err)
...@@ -305,20 +299,10 @@ func (s *Syncer) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (er ...@@ -305,20 +299,10 @@ func (s *Syncer) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (er
} }
} }
err = s.setChunks(ctx, addrs...)
if err != nil {
return err
}
time.Sleep(50 * time.Millisecond) //because of test, getting EOF w/o time.Sleep(50 * time.Millisecond) //because of test, getting EOF w/o
return nil return nil
} }
func (s *Syncer) setChunks(ctx context.Context, addrs ...swarm.Address) error {
s.metrics.DbOpsCounter.Inc()
return s.storage.Set(ctx, storage.ModeSetSyncPull, addrs...)
}
// makeOffer tries to assemble an offer for a given requested interval. // makeOffer tries to assemble an offer for a given requested interval.
func (s *Syncer) makeOffer(ctx context.Context, rn pb.GetRange) (o *pb.Offer, addrs []swarm.Address, err error) { func (s *Syncer) makeOffer(ctx context.Context, rn pb.GetRange) (o *pb.Offer, addrs []swarm.Address, err error) {
chs, top, err := s.storage.IntervalChunks(ctx, uint8(rn.Bin), rn.From, rn.To, maxPage) chs, top, err := s.storage.IntervalChunks(ctx, uint8(rn.Bin), rn.From, rn.To, maxPage)
......
...@@ -11,7 +11,6 @@ import ( ...@@ -11,7 +11,6 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"testing" "testing"
"time"
"github.com/ethersphere/bee/pkg/logging" "github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p" "github.com/ethersphere/bee/pkg/p2p"
...@@ -56,7 +55,7 @@ func init() { ...@@ -56,7 +55,7 @@ func init() {
func TestIncoming_WantEmptyInterval(t *testing.T) { func TestIncoming_WantEmptyInterval(t *testing.T) {
var ( var (
mockTopmost = uint64(5) mockTopmost = uint64(5)
ps, serverDb = newPullSync(nil, mock.WithIntervalsResp([]swarm.Address{}, mockTopmost, nil)) ps, _ = newPullSync(nil, mock.WithIntervalsResp([]swarm.Address{}, mockTopmost, nil))
recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol()))
psClient, clientDb = newPullSync(recorder) psClient, clientDb = newPullSync(recorder)
) )
...@@ -74,12 +73,11 @@ func TestIncoming_WantEmptyInterval(t *testing.T) { ...@@ -74,12 +73,11 @@ func TestIncoming_WantEmptyInterval(t *testing.T) {
t.Fatal("too many puts") t.Fatal("too many puts")
} }
waitSet(t, serverDb, 0)
} }
func TestIncoming_WantNone(t *testing.T) { func TestIncoming_WantNone(t *testing.T) {
var ( var (
mockTopmost = uint64(5) mockTopmost = uint64(5)
ps, serverDb = newPullSync(nil, mock.WithIntervalsResp(addrs, mockTopmost, nil), mock.WithChunks(chunks...)) ps, _ = newPullSync(nil, mock.WithIntervalsResp(addrs, mockTopmost, nil), mock.WithChunks(chunks...))
recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol()))
psClient, clientDb = newPullSync(recorder, mock.WithChunks(chunks...)) psClient, clientDb = newPullSync(recorder, mock.WithChunks(chunks...))
) )
...@@ -95,14 +93,12 @@ func TestIncoming_WantNone(t *testing.T) { ...@@ -95,14 +93,12 @@ func TestIncoming_WantNone(t *testing.T) {
if clientDb.PutCalls() > 0 { if clientDb.PutCalls() > 0 {
t.Fatal("too many puts") t.Fatal("too many puts")
} }
waitSet(t, serverDb, 1)
} }
func TestIncoming_WantOne(t *testing.T) { func TestIncoming_WantOne(t *testing.T) {
var ( var (
mockTopmost = uint64(5) mockTopmost = uint64(5)
ps, serverDb = newPullSync(nil, mock.WithIntervalsResp(addrs, mockTopmost, nil), mock.WithChunks(chunks...)) ps, _ = newPullSync(nil, mock.WithIntervalsResp(addrs, mockTopmost, nil), mock.WithChunks(chunks...))
recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol()))
psClient, clientDb = newPullSync(recorder, mock.WithChunks(someChunks(1, 2, 3, 4)...)) psClient, clientDb = newPullSync(recorder, mock.WithChunks(someChunks(1, 2, 3, 4)...))
) )
...@@ -121,13 +117,12 @@ func TestIncoming_WantOne(t *testing.T) { ...@@ -121,13 +117,12 @@ func TestIncoming_WantOne(t *testing.T) {
if clientDb.PutCalls() > 1 { if clientDb.PutCalls() > 1 {
t.Fatal("too many puts") t.Fatal("too many puts")
} }
waitSet(t, serverDb, 1)
} }
func TestIncoming_WantAll(t *testing.T) { func TestIncoming_WantAll(t *testing.T) {
var ( var (
mockTopmost = uint64(5) mockTopmost = uint64(5)
ps, serverDb = newPullSync(nil, mock.WithIntervalsResp(addrs, mockTopmost, nil), mock.WithChunks(chunks...)) ps, _ = newPullSync(nil, mock.WithIntervalsResp(addrs, mockTopmost, nil), mock.WithChunks(chunks...))
recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol()))
psClient, clientDb = newPullSync(recorder) psClient, clientDb = newPullSync(recorder)
) )
...@@ -146,7 +141,6 @@ func TestIncoming_WantAll(t *testing.T) { ...@@ -146,7 +141,6 @@ func TestIncoming_WantAll(t *testing.T) {
if p := clientDb.PutCalls(); p != 5 { if p := clientDb.PutCalls(); p != 5 {
t.Fatalf("want %d puts but got %d", 5, p) t.Fatalf("want %d puts but got %d", 5, p)
} }
waitSet(t, serverDb, 1)
} }
func TestIncoming_UnsolicitedChunk(t *testing.T) { func TestIncoming_UnsolicitedChunk(t *testing.T) {
...@@ -226,20 +220,3 @@ func newPullSync(s p2p.Streamer, o ...mock.Option) (*pullsync.Syncer, *mock.Pull ...@@ -226,20 +220,3 @@ func newPullSync(s p2p.Streamer, o ...mock.Option) (*pullsync.Syncer, *mock.Pull
logger := logging.New(ioutil.Discard, 0) logger := logging.New(ioutil.Discard, 0)
return pullsync.New(s, storage, logger), storage return pullsync.New(s, storage, logger), storage
} }
func waitSet(t *testing.T, db *mock.PullStorage, v int) {
time.Sleep(10 * time.Millisecond) // give leeway for the case where v==0
var s int
for i := 0; i < 10; i++ {
s = db.SetCalls()
switch {
case s > v:
t.Fatalf("too many Set calls: got %d want %d", s, v)
case s == v:
return
default:
time.Sleep(10 * time.Millisecond)
}
}
t.Fatalf("timed out waiting for set to be called. got %d calls want %d", s, v)
}
...@@ -199,7 +199,7 @@ LOOP: ...@@ -199,7 +199,7 @@ LOOP:
} }
func (s *Service) setChunkAsSynced(ctx context.Context, ch swarm.Chunk) error { func (s *Service) setChunkAsSynced(ctx context.Context, ch swarm.Chunk) error {
if err := s.storer.Set(ctx, storage.ModeSetSyncPush, ch.Address()); err != nil { if err := s.storer.Set(ctx, storage.ModeSetSync, ch.Address()); err != nil {
s.logger.Errorf("pusher: error setting chunk as synced: %v", err) s.logger.Errorf("pusher: error setting chunk as synced: %v", err)
s.metrics.ErrorSettingChunkToSynced.Inc() s.metrics.ErrorSettingChunkToSynced.Inc()
} }
......
...@@ -40,7 +40,7 @@ type Store struct { ...@@ -40,7 +40,7 @@ type Store struct {
setAfterCloseCount int setAfterCloseCount int
} }
// Override the Set function to capture the ModeSetSyncPush // Override the Set function to capture the ModeSetSync
func (s *Store) Set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Address) error { func (s *Store) Set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Address) error {
s.modeSetMu.Lock() s.modeSetMu.Lock()
defer s.modeSetMu.Unlock() defer s.modeSetMu.Unlock()
...@@ -67,8 +67,8 @@ func (s *Store) Close() error { ...@@ -67,8 +67,8 @@ func (s *Store) Close() error {
// TestSendChunkToPushSync sends a chunk to pushsync to be sent ot its closest peer and get a receipt. // TestSendChunkToPushSync sends a chunk to pushsync to be sent ot its closest peer and get a receipt.
// once the receipt is got this check to see if the localstore is updated to see if the chunk is set // once the receipt is got this check to see if the localstore is updated to see if the chunk is set
// as ModeSetSyncPush status. // as ModeSetSync status.
func TestSendChunkToPushSyncWithTag(t *testing.T) { func TestSendChunkToSyncWithTag(t *testing.T) {
// create a trigger and a closestpeer // create a trigger and a closestpeer
triggerPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000") triggerPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
closestPeer := swarm.MustParseHexAddress("f000000000000000000000000000000000000000000000000000000000000000") closestPeer := swarm.MustParseHexAddress("f000000000000000000000000000000000000000000000000000000000000000")
...@@ -99,7 +99,7 @@ func TestSendChunkToPushSyncWithTag(t *testing.T) { ...@@ -99,7 +99,7 @@ func TestSendChunkToPushSyncWithTag(t *testing.T) {
// Give some time for chunk to be pushed and receipt to be received // Give some time for chunk to be pushed and receipt to be received
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
err = checkIfModeSet(chunk.Address(), storage.ModeSetSyncPush, storer) err = checkIfModeSet(chunk.Address(), storage.ModeSetSync, storer)
if err == nil { if err == nil {
break break
} }
...@@ -144,7 +144,7 @@ func TestSendChunkToPushSyncWithoutTag(t *testing.T) { ...@@ -144,7 +144,7 @@ func TestSendChunkToPushSyncWithoutTag(t *testing.T) {
// Give some time for chunk to be pushed and receipt to be received // Give some time for chunk to be pushed and receipt to be received
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
err = checkIfModeSet(chunk.Address(), storage.ModeSetSyncPush, storer) err = checkIfModeSet(chunk.Address(), storage.ModeSetSync, storer)
if err == nil { if err == nil {
break break
} }
...@@ -157,7 +157,7 @@ func TestSendChunkToPushSyncWithoutTag(t *testing.T) { ...@@ -157,7 +157,7 @@ func TestSendChunkToPushSyncWithoutTag(t *testing.T) {
// TestSendChunkAndReceiveInvalidReceipt sends a chunk to pushsync to be sent ot its closest peer and // TestSendChunkAndReceiveInvalidReceipt sends a chunk to pushsync to be sent ot its closest peer and
// get a invalid receipt (not with the address of the chunk sent). The test makes sure that this error // get a invalid receipt (not with the address of the chunk sent). The test makes sure that this error
// is received and the ModeSetSyncPush is not set for the chunk. // is received and the ModeSetSync is not set for the chunk.
func TestSendChunkAndReceiveInvalidReceipt(t *testing.T) { func TestSendChunkAndReceiveInvalidReceipt(t *testing.T) {
chunk := createChunk() chunk := createChunk()
...@@ -182,7 +182,7 @@ func TestSendChunkAndReceiveInvalidReceipt(t *testing.T) { ...@@ -182,7 +182,7 @@ func TestSendChunkAndReceiveInvalidReceipt(t *testing.T) {
// Give some time for chunk to be pushed and receipt to be received // Give some time for chunk to be pushed and receipt to be received
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
err = checkIfModeSet(chunk.Address(), storage.ModeSetSyncPush, storer) err = checkIfModeSet(chunk.Address(), storage.ModeSetSync, storer)
if err != nil { if err != nil {
continue continue
} }
...@@ -195,7 +195,7 @@ func TestSendChunkAndReceiveInvalidReceipt(t *testing.T) { ...@@ -195,7 +195,7 @@ func TestSendChunkAndReceiveInvalidReceipt(t *testing.T) {
// TestSendChunkAndTimeoutinReceivingReceipt sends a chunk to pushsync to be sent ot its closest peer and // TestSendChunkAndTimeoutinReceivingReceipt sends a chunk to pushsync to be sent ot its closest peer and
// expects a timeout to get instead of getting a receipt. The test makes sure that timeout error // expects a timeout to get instead of getting a receipt. The test makes sure that timeout error
// is received and the ModeSetSyncPush is not set for the chunk. // is received and the ModeSetSync is not set for the chunk.
func TestSendChunkAndTimeoutinReceivingReceipt(t *testing.T) { func TestSendChunkAndTimeoutinReceivingReceipt(t *testing.T) {
chunk := createChunk() chunk := createChunk()
...@@ -224,7 +224,7 @@ func TestSendChunkAndTimeoutinReceivingReceipt(t *testing.T) { ...@@ -224,7 +224,7 @@ func TestSendChunkAndTimeoutinReceivingReceipt(t *testing.T) {
// Give some time for chunk to be pushed and receipt to be received // Give some time for chunk to be pushed and receipt to be received
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
err = checkIfModeSet(chunk.Address(), storage.ModeSetSyncPush, storer) err = checkIfModeSet(chunk.Address(), storage.ModeSetSync, storer)
if err != nil { if err != nil {
continue continue
} }
......
...@@ -86,10 +86,8 @@ func (m ModeSet) String() string { ...@@ -86,10 +86,8 @@ func (m ModeSet) String() string {
switch m { switch m {
case ModeSetAccess: case ModeSetAccess:
return "Access" return "Access"
case ModeSetSyncPush: case ModeSetSync:
return "SyncPush" return "Sync"
case ModeSetSyncPull:
return "SyncPull"
case ModeSetRemove: case ModeSetRemove:
return "Remove" return "Remove"
case ModeSetPin: case ModeSetPin:
...@@ -105,10 +103,8 @@ func (m ModeSet) String() string { ...@@ -105,10 +103,8 @@ func (m ModeSet) String() string {
const ( const (
// ModeSetAccess: when an update request is received for a chunk or chunk is retrieved for delivery // ModeSetAccess: when an update request is received for a chunk or chunk is retrieved for delivery
ModeSetAccess ModeSet = iota ModeSetAccess ModeSet = iota
// ModeSetSyncPush: when a push sync receipt is received for a chunk // ModeSetSync: when a push sync receipt is received for a chunk
ModeSetSyncPush ModeSetSync
// ModeSetSyncPull: when a chunk is added to a pull sync batch
ModeSetSyncPull
// ModeSetRemove: when a chunk is removed // ModeSetRemove: when a chunk is removed
ModeSetRemove ModeSetRemove
// ModeSetPin: when a chunk is pinned during upload or separately // ModeSetPin: when a chunk is pinned during upload or separately
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment