Commit 70b7a7e4 authored by acud's avatar acud Committed by GitHub

tags: remove anonymous field (#622)

* tags: remove anonymous field
parent 62e65995
...@@ -66,7 +66,7 @@ func (s *server) getOrCreateTag(tagUid string) (*tags.Tag, bool, error) { ...@@ -66,7 +66,7 @@ func (s *server) getOrCreateTag(tagUid string) (*tags.Tag, bool, error) {
if tagUid == "" { if tagUid == "" {
tagName := fmt.Sprintf("unnamed_tag_%d", time.Now().Unix()) tagName := fmt.Sprintf("unnamed_tag_%d", time.Now().Unix())
var err error var err error
tag, err := s.Tags.Create(tagName, 0, false) tag, err := s.Tags.Create(tagName, 0)
if err != nil { if err != nil {
return nil, false, fmt.Errorf("cannot create tag: %w", err) return nil, false, fmt.Errorf("cannot create tag: %w", err)
} }
......
...@@ -48,7 +48,6 @@ func newTagResponse(tag *tags.Tag) tagResponse { ...@@ -48,7 +48,6 @@ func newTagResponse(tag *tags.Tag) tagResponse {
Sent: tag.Sent, Sent: tag.Sent,
Synced: tag.Synced, Synced: tag.Synced,
Uid: tag.Uid, Uid: tag.Uid,
Anonymous: tag.Anonymous,
Name: tag.Name, Name: tag.Name,
Address: tag.Address, Address: tag.Address,
StartedAt: tag.StartedAt, StartedAt: tag.StartedAt,
...@@ -82,7 +81,7 @@ func (s *server) createTag(w http.ResponseWriter, r *http.Request) { ...@@ -82,7 +81,7 @@ func (s *server) createTag(w http.ResponseWriter, r *http.Request) {
tagr.Name = fmt.Sprintf("unnamed_tag_%d", time.Now().Unix()) tagr.Name = fmt.Sprintf("unnamed_tag_%d", time.Now().Unix())
} }
tag, err := s.Tags.Create(tagr.Name, 0, false) tag, err := s.Tags.Create(tagr.Name, 0)
if err != nil { if err != nil {
s.Logger.Debugf("create tag: tag create error: %v", err) s.Logger.Debugf("create tag: tag create error: %v", err)
s.Logger.Error("create tag: tag create error") s.Logger.Error("create tag: tag create error")
......
...@@ -212,14 +212,6 @@ func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed ...@@ -212,14 +212,6 @@ func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed
if exists { if exists {
return true, 0, nil return true, 0, nil
} }
anonymous := false
if db.tags != nil && item.Tag != 0 {
tag, err := db.tags.Get(item.Tag)
if err != nil {
return false, 0, err
}
anonymous = tag.Anonymous
}
item.StoreTimestamp = now() item.StoreTimestamp = now()
item.BinID, err = db.incBinID(binIDs, db.po(swarm.NewAddress(item.Address))) item.BinID, err = db.incBinID(binIDs, db.po(swarm.NewAddress(item.Address)))
...@@ -234,11 +226,9 @@ func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed ...@@ -234,11 +226,9 @@ func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed
if err != nil { if err != nil {
return false, 0, err return false, 0, err
} }
if !anonymous { err = db.pushIndex.PutInBatch(batch, item)
err = db.pushIndex.PutInBatch(batch, item) if err != nil {
if err != nil { return false, 0, err
return false, 0, err
}
} }
return false, 0, nil return false, 0, nil
......
...@@ -228,46 +228,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr swarm.Address, mode storage.Mod ...@@ -228,46 +228,7 @@ func (db *DB) setSync(batch *leveldb.Batch, addr swarm.Address, mode storage.Mod
item.StoreTimestamp = i.StoreTimestamp item.StoreTimestamp = i.StoreTimestamp
item.BinID = i.BinID item.BinID = i.BinID
switch mode { if mode == storage.ModeSetSyncPush {
case storage.ModeSetSyncPull:
// if we are setting a chunk for pullsync we expect it to be in the index
// if it has a tag - we increment it and set the index item to _not_ contain the tag reference
// this prevents duplicate increments
i, err := db.pullIndex.Get(item)
if err != nil {
if errors.Is(err, leveldb.ErrNotFound) {
// we handle this error internally, since this is an internal inconsistency of the indices
// if we return the error here - it means that for example, in stream protocol peers which we sync
// to would be dropped. this is possible when the chunk is put with ModePutRequest and ModeSetSyncPull is
// called on the same chunk (which should not happen)
db.logger.Debugf("localstore: chunk with address %s not found in pull index", addr)
break
}
return 0, err
}
if db.tags != nil && i.Tag != 0 {
t, err := db.tags.Get(i.Tag)
// increment if and only if tag is anonymous
if err == nil && t.Anonymous {
// since pull sync does not guarantee that
// a chunk has reached its NN, we can only mark
// it as Sent
t.Inc(tags.StateSent)
// setting the tag to zero makes sure that
// we don't increment the same tag twice when syncing
// the same chunk to different peers
item.Tag = 0
err = db.pullIndex.PutInBatch(batch, item)
if err != nil {
return 0, err
}
}
}
case storage.ModeSetSyncPush:
i, err := db.pushIndex.Get(item) i, err := db.pushIndex.Get(item)
if err != nil { if err != nil {
if errors.Is(err, leveldb.ErrNotFound) { if errors.Is(err, leveldb.ErrNotFound) {
...@@ -275,22 +236,17 @@ func (db *DB) setSync(batch *leveldb.Batch, addr swarm.Address, mode storage.Mod ...@@ -275,22 +236,17 @@ func (db *DB) setSync(batch *leveldb.Batch, addr swarm.Address, mode storage.Mod
// this error can happen if the chunk is put with ModePutRequest or ModePutSync // this error can happen if the chunk is put with ModePutRequest or ModePutSync
// but this function is called with ModeSetSyncPush // but this function is called with ModeSetSyncPush
db.logger.Debugf("localstore: chunk with address %s not found in push index", addr) db.logger.Debugf("localstore: chunk with address %s not found in push index", addr)
break } else {
return 0, err
} }
return 0, err
} }
if db.tags != nil && i.Tag != 0 { if err == nil && db.tags != nil && i.Tag != 0 {
t, err := db.tags.Get(i.Tag) t, err := db.tags.Get(i.Tag)
if err != nil { if err != nil {
// we cannot break or return here since the function needs to // we cannot break or return here since the function needs to
// run to end from db.pushIndex.DeleteInBatch // run to end from db.pushIndex.DeleteInBatch
db.logger.Errorf("localstore: get tags on push sync set uid %d: %v", i.Tag, err) db.logger.Errorf("localstore: get tags on push sync set uid %d: %v", i.Tag, err)
} else { } else {
// setting a chunk for push sync assumes the tag is not anonymous
if t.Anonymous {
return 0, errors.New("got an anonymous chunk in push sync index")
}
t.Inc(tags.StateSynced) t.Inc(tags.StateSynced)
} }
} }
......
...@@ -72,7 +72,7 @@ func TestModeSetAccess(t *testing.T) { ...@@ -72,7 +72,7 @@ func TestModeSetAccess(t *testing.T) {
func TestModeSetSyncPullNormalTag(t *testing.T) { func TestModeSetSyncPullNormalTag(t *testing.T) {
db := newTestDB(t, &Options{Tags: tags.NewTags()}) db := newTestDB(t, &Options{Tags: tags.NewTags()})
tag, err := db.tags.Create("test", 1, false) tag, err := db.tags.Create("test", 1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -120,131 +120,13 @@ func TestModeSetSyncPullNormalTag(t *testing.T) { ...@@ -120,131 +120,13 @@ func TestModeSetSyncPullNormalTag(t *testing.T) {
tagtesting.CheckTag(t, tag, 0, 1, 0, 1, 0, 1) tagtesting.CheckTag(t, tag, 0, 1, 0, 1, 0, 1)
} }
// TestModeSetSyncPullAnonymousTag checks that pull sync correcly increments
// counters on an anonymous tag which is expected to be handled only by pull sync
func TestModeSetSyncPullAnonymousTag(t *testing.T) {
db := newTestDB(t, &Options{Tags: tags.NewTags()})
tag, err := db.tags.Create("test", 1, true)
if err != nil {
t.Fatal(err)
}
ch := generateTestRandomChunk().WithTagID(tag.Uid)
_, err = db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
tag.Inc(tags.StateStored) // so we don't get an error on tag.Status later on
item, err := db.pullIndex.Get(shed.Item{
Address: ch.Address().Bytes(),
BinID: 1,
})
if err != nil {
t.Fatal(err)
}
if item.Tag != tag.Uid {
t.Fatalf("unexpected tag id value got %d want %d", item.Tag, tag.Uid)
}
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
item, err = db.pullIndex.Get(shed.Item{
Address: ch.Address().Bytes(),
BinID: 1,
})
if err != nil {
t.Fatal(err)
}
if item.Tag != 0 {
t.Fatalf("unexpected tag id value got %d want %d", item.Tag, 0)
}
// 1 stored (because incremented manually in test), 1 sent, 1 total
tagtesting.CheckTag(t, tag, 0, 1, 0, 1, 0, 1)
}
// TestModeSetSyncPullPushAnonymousTag creates an anonymous tag and a corresponding chunk
// then tries to Set both with push and pull Sync modes, but asserts that only the pull sync
// increments were done to the tag
func TestModeSetSyncPullPushAnonymousTag(t *testing.T) {
db := newTestDB(t, &Options{Tags: tags.NewTags()})
tag, err := db.tags.Create("test", 1, true)
if err != nil {
t.Fatal(err)
}
ch := generateTestRandomChunk().WithTagID(tag.Uid)
_, err = db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
tag.Inc(tags.StateStored) // so we don't get an error on tag.Status later on
item, err := db.pullIndex.Get(shed.Item{
Address: ch.Address().Bytes(),
BinID: 1,
})
if err != nil {
t.Fatal(err)
}
if item.Tag != tag.Uid {
t.Fatalf("unexpected tag id value got %d want %d", item.Tag, tag.Uid)
}
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
// expect no error here. if the item cannot be found in pushsync the rest of the
// setSync logic should be executed
err = db.Set(context.Background(), storage.ModeSetSyncPush, ch.Address())
if err != nil {
t.Fatal(err)
}
// check that the tag has been incremented
item, err = db.pullIndex.Get(shed.Item{
Address: ch.Address().Bytes(),
BinID: 1,
})
if err != nil {
t.Fatal(err)
}
if item.Tag != 0 {
t.Fatalf("unexpected tag id value got %d want %d", item.Tag, 0)
}
// 1 stored (because incremented manually in test), 1 sent, 1 total
tagtesting.CheckTag(t, tag, 0, 1, 0, 1, 0, 1)
// verify that the item does not exist in the push index
item, err = db.pushIndex.Get(shed.Item{
Address: ch.Address().Bytes(),
BinID: 1,
})
if err == nil {
t.Fatal("expected error but got none")
}
}
// TestModeSetSyncPushNormalTag makes sure that push sync increments tags // TestModeSetSyncPushNormalTag makes sure that push sync increments tags
// correctly on a normal tag (that is, a tag that is expected to show progress bars // correctly on a normal tag (that is, a tag that is expected to show progress bars
// according to push sync progress) // according to push sync progress)
func TestModeSetSyncPushNormalTag(t *testing.T) { func TestModeSetSyncPushNormalTag(t *testing.T) {
db := newTestDB(t, &Options{Tags: tags.NewTags()}) db := newTestDB(t, &Options{Tags: tags.NewTags()})
tag, err := db.tags.Create("test", 1, false) tag, err := db.tags.Create("test", 1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
......
...@@ -60,7 +60,7 @@ func TestSendChunkToPushSyncWithTag(t *testing.T) { ...@@ -60,7 +60,7 @@ func TestSendChunkToPushSyncWithTag(t *testing.T) {
mtags, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer)) mtags, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
defer storer.Close() defer storer.Close()
ta, err := mtags.Create("test", 1, false) ta, err := mtags.Create("test", 1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
......
...@@ -111,7 +111,7 @@ func TestPushChunkToClosest(t *testing.T) { ...@@ -111,7 +111,7 @@ func TestPushChunkToClosest(t *testing.T) {
psPivot, storerPivot, pivotTags, pivotAccounting := createPushSyncNode(t, pivotNode, recorder, nil, mock.WithClosestPeer(closestPeer)) psPivot, storerPivot, pivotTags, pivotAccounting := createPushSyncNode(t, pivotNode, recorder, nil, mock.WithClosestPeer(closestPeer))
defer storerPivot.Close() defer storerPivot.Close()
ta, err := pivotTags.Create("test", 1, false) ta, err := pivotTags.Create("test", 1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
......
...@@ -157,7 +157,7 @@ func TestResolve(t *testing.T) { ...@@ -157,7 +157,7 @@ func TestResolve(t *testing.T) {
tld: ".empty", tld: ".empty",
}, },
{ {
tld: ".fails", tld: ".errors",
res: []resolver.Interface{ res: []resolver.Interface{
newErrResolver(), newErrResolver(),
newErrResolver(), newErrResolver(),
...@@ -195,8 +195,8 @@ func TestResolve(t *testing.T) { ...@@ -195,8 +195,8 @@ func TestResolve(t *testing.T) {
wantErr: resolver.ErrResolverChainEmpty, wantErr: resolver.ErrResolverChainEmpty,
}, },
{ {
name: "this.fails", name: "this.errors",
wantErr: fmt.Errorf("name resolution failed for %q", "this.fails"), wantErr: fmt.Errorf("name resolution failed for %q", "this.errors"),
}, },
} }
......
...@@ -57,7 +57,6 @@ type Tag struct { ...@@ -57,7 +57,6 @@ type Tag struct {
Synced int64 // number of chunks synced with proof Synced int64 // number of chunks synced with proof
Uid uint32 // a unique identifier for this tag Uid uint32 // a unique identifier for this tag
Anonymous bool // indicates if the tag is anonymous (i.e. if only pull sync should be used)
Name string // a name tag for this tag Name string // a name tag for this tag
Address swarm.Address // the associated swarm hash for this tag Address swarm.Address // the associated swarm hash for this tag
StartedAt time.Time // tag started to calculate ETA StartedAt time.Time // tag started to calculate ETA
...@@ -69,10 +68,9 @@ type Tag struct { ...@@ -69,10 +68,9 @@ type Tag struct {
} }
// NewTag creates a new tag, and returns it // NewTag creates a new tag, and returns it
func NewTag(ctx context.Context, uid uint32, s string, total int64, anon bool, tracer *tracing.Tracer) *Tag { func NewTag(ctx context.Context, uid uint32, s string, total int64, tracer *tracing.Tracer) *Tag {
t := &Tag{ t := &Tag{
Uid: uid, Uid: uid,
Anonymous: anon,
Name: s, Name: s,
StartedAt: time.Now(), StartedAt: time.Now(),
Total: total, Total: total,
......
...@@ -144,7 +144,7 @@ func TestTagsMultipleConcurrentIncrementsSyncMap(t *testing.T) { ...@@ -144,7 +144,7 @@ func TestTagsMultipleConcurrentIncrementsSyncMap(t *testing.T) {
wg.Add(10 * 5 * n) wg.Add(10 * 5 * n)
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
s := string([]byte{uint8(i)}) s := string([]byte{uint8(i)})
tag, err := ts.Create(s, int64(n), false) tag, err := ts.Create(s, int64(n))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -185,7 +185,7 @@ func TestTagsMultipleConcurrentIncrementsSyncMap(t *testing.T) { ...@@ -185,7 +185,7 @@ func TestTagsMultipleConcurrentIncrementsSyncMap(t *testing.T) {
// TestMarshallingWithAddr tests that marshalling and unmarshalling is done correctly when the // TestMarshallingWithAddr tests that marshalling and unmarshalling is done correctly when the
// tag Address (byte slice) contains some arbitrary value // tag Address (byte slice) contains some arbitrary value
func TestMarshallingWithAddr(t *testing.T) { func TestMarshallingWithAddr(t *testing.T) {
tg := NewTag(context.Background(), 111, "test/tag", 10, false, nil) tg := NewTag(context.Background(), 111, "test/tag", 10, nil)
tg.Address = swarm.NewAddress([]byte{0, 1, 2, 3, 4, 5, 6}) tg.Address = swarm.NewAddress([]byte{0, 1, 2, 3, 4, 5, 6})
for _, f := range allStates { for _, f := range allStates {
...@@ -210,9 +210,6 @@ func TestMarshallingWithAddr(t *testing.T) { ...@@ -210,9 +210,6 @@ func TestMarshallingWithAddr(t *testing.T) {
if unmarshalledTag.Name != tg.Name { if unmarshalledTag.Name != tg.Name {
t.Fatalf("tag names not equal. want %s got %s", tg.Name, unmarshalledTag.Name) t.Fatalf("tag names not equal. want %s got %s", tg.Name, unmarshalledTag.Name)
} }
if unmarshalledTag.Anonymous != tg.Anonymous {
t.Fatalf("tag anon field not equal. want %t got %t", tg.Anonymous, unmarshalledTag.Anonymous)
}
for _, state := range allStates { for _, state := range allStates {
uv, tv := unmarshalledTag.Get(state), tg.Get(state) uv, tv := unmarshalledTag.Get(state), tg.Get(state)
...@@ -236,7 +233,7 @@ func TestMarshallingWithAddr(t *testing.T) { ...@@ -236,7 +233,7 @@ func TestMarshallingWithAddr(t *testing.T) {
// TestMarshallingNoAddress tests that marshalling and unmarshalling is done correctly // TestMarshallingNoAddress tests that marshalling and unmarshalling is done correctly
func TestMarshallingNoAddr(t *testing.T) { func TestMarshallingNoAddr(t *testing.T) {
tg := NewTag(context.Background(), 111, "test/tag", 10, false, nil) tg := NewTag(context.Background(), 111, "test/tag", 10, nil)
for _, f := range allStates { for _, f := range allStates {
tg.Inc(f) tg.Inc(f)
} }
......
...@@ -48,8 +48,8 @@ func NewTags() *Tags { ...@@ -48,8 +48,8 @@ func NewTags() *Tags {
// Create creates a new tag, stores it by the name and returns it // Create creates a new tag, stores it by the name and returns it
// it returns an error if the tag with this name already exists // it returns an error if the tag with this name already exists
func (ts *Tags) Create(s string, total int64, anon bool) (*Tag, error) { func (ts *Tags) Create(s string, total int64) (*Tag, error) {
t := NewTag(context.Background(), TagUidFunc(), s, total, anon, nil) t := NewTag(context.Background(), TagUidFunc(), s, total, nil)
if _, loaded := ts.tags.LoadOrStore(t.Uid, t); loaded { if _, loaded := ts.tags.LoadOrStore(t.Uid, t); loaded {
return nil, errExists return nil, errExists
......
...@@ -22,10 +22,10 @@ import ( ...@@ -22,10 +22,10 @@ import (
func TestAll(t *testing.T) { func TestAll(t *testing.T) {
ts := NewTags() ts := NewTags()
if _, err := ts.Create("1", 1, false); err != nil { if _, err := ts.Create("1", 1); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if _, err := ts.Create("2", 1, false); err != nil { if _, err := ts.Create("2", 1); err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -43,7 +43,7 @@ func TestAll(t *testing.T) { ...@@ -43,7 +43,7 @@ func TestAll(t *testing.T) {
t.Fatalf("expected tag 1 Total to be 1 got %d", n) t.Fatalf("expected tag 1 Total to be 1 got %d", n)
} }
if _, err := ts.Create("3", 1, false); err != nil { if _, err := ts.Create("3", 1); err != nil {
t.Fatal(err) t.Fatal(err)
} }
all = ts.All() all = ts.All()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment