Commit f28ff0db authored by acud's avatar acud Committed by GitHub

tags: delete persisted tag (#1255)

parent 60facaea
...@@ -36,6 +36,7 @@ import ( ...@@ -36,6 +36,7 @@ import (
const ( const (
maxPage = 1000 // hard limit of page size maxPage = 1000 // hard limit of page size
tagKeyPrefix = "tags_"
) )
var ( var (
...@@ -125,6 +126,13 @@ func (ts *Tags) Range(fn func(k, v interface{}) bool) { ...@@ -125,6 +126,13 @@ func (ts *Tags) Range(fn func(k, v interface{}) bool) {
func (ts *Tags) Delete(k interface{}) { func (ts *Tags) Delete(k interface{}) {
ts.tags.Delete(k) ts.tags.Delete(k)
// k is a uint32, try to create the tag key and remove
// from statestore
if uid, ok := k.(uint32); ok && uid != 0 {
key := tagKey(uid)
_ = ts.stateStore.Delete(key)
}
} }
func (ts *Tags) MarshalJSON() (out []byte, err error) { func (ts *Tags) MarshalJSON() (out []byte, err error) {
...@@ -192,7 +200,7 @@ func (ts *Tags) ListAll(ctx context.Context, offset, limit int) (t []*Tag, err e ...@@ -192,7 +200,7 @@ func (ts *Tags) ListAll(ctx context.Context, offset, limit int) (t []*Tag, err e
} }
// and then from statestore // and then from statestore
err = ts.stateStore.Iterate("tags_", func(key, value []byte) (stop bool, err error) { err = ts.stateStore.Iterate(tagKeyPrefix, func(key, value []byte) (stop bool, err error) {
if offset > 0 { if offset > 0 {
offset-- offset--
return false, nil return false, nil
...@@ -239,7 +247,7 @@ func decodeTagValueFromStore(value []byte) (*Tag, error) { ...@@ -239,7 +247,7 @@ func decodeTagValueFromStore(value []byte) (*Tag, error) {
// getTagFromStore get a given tag from the state store. // getTagFromStore get a given tag from the state store.
func (ts *Tags) getTagFromStore(uid uint32) (*Tag, error) { func (ts *Tags) getTagFromStore(uid uint32) (*Tag, error) {
key := "tags_" + strconv.Itoa(int(uid)) key := tagKey(uid)
var data []byte var data []byte
err := ts.stateStore.Get(key, &data) err := ts.stateStore.Get(key, &data)
if err != nil { if err != nil {
...@@ -266,3 +274,7 @@ func (ts *Tags) Close() (err error) { ...@@ -266,3 +274,7 @@ func (ts *Tags) Close() (err error) {
} }
return nil return nil
} }
func tagKey(uid uint32) string {
return tagKeyPrefix + strconv.Itoa(int(uid))
}
...@@ -18,6 +18,7 @@ package tags ...@@ -18,6 +18,7 @@ package tags
import ( import (
"context" "context"
"errors"
"io/ioutil" "io/ioutil"
"sort" "sort"
"testing" "testing"
...@@ -210,4 +211,21 @@ func TestPersistence(t *testing.T) { ...@@ -210,4 +211,21 @@ func TestPersistence(t *testing.T) {
if ta.Synced != rcvd2.Synced { if ta.Synced != rcvd2.Synced {
t.Fatalf("invalid synced: expected %d got %d", ta.Synced, rcvd2.Synced) t.Fatalf("invalid synced: expected %d got %d", ta.Synced, rcvd2.Synced)
} }
// regression test case: make sure that a persisted tag
// is flushed from statestore on Delete
ts.Delete(ta.Uid)
// simulate node closing down and booting up
err = ts.Close()
if err != nil {
t.Fatal(err)
}
ts = NewTags(mockStatestore, logger)
// get the tag after the node boot up
_, err = ts.Get(ta.Uid)
if !errors.Is(err, ErrNotFound) {
t.Fatal(err)
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment