Commit db94164e authored by vicotor's avatar vicotor

update syncer

parent b99bf09b
......@@ -12,7 +12,7 @@ import (
"time"
)
func manual(name string, bee string) {
func manual(name string, bee string, sync_all bool) {
allTask, err := core.GetTasks()
if err != nil {
slog.Error("GetTasks", "err", err.Error())
......@@ -33,6 +33,15 @@ func manual(name string, bee string) {
swarm.InitSwarm([]string{bee})
cli := swarm.GetSwarm()
if sync_all {
todoTask.Idx = []core.UserTask{
{
// an non-exist user id.
UserId: "1",
},
}
}
page := core.NewPageUsers(core.NewIdx(todoTask.Idx))
var (
......@@ -105,9 +114,10 @@ func manual(name string, bee string) {
var (
taskname = flag.String("task", "", "task name")
bee = flag.String("bee", "http://127.0.0.1:8088", "bee url")
syncAll = flag.Bool("a", false, "sync all followers of the task")
)
func main() {
flag.Parse()
manual(*taskname, *bee)
manual(*taskname, *bee, *syncAll)
}
......@@ -218,16 +218,32 @@ func InsertTaskRes(content []UserTask, tableName string, taskId string) error {
rows = append(rows, v)
}
res, _, err := DBClient.From(tableName).Insert(rows, true, "", "representation", "").Execute()
res, cnt, err := DBClient.From(tableName).Insert(rows, true, "", "representation", "").Execute()
if err != nil {
slog.Error("InsertTaskRes", "err", err.Error())
return err
}
slog.Info("InsertTaskRes one page finished", "data count", len(rows))
slog.Info("InsertTaskRes one page finished", "data count", len(rows), "insert count", cnt)
time.Sleep(1 * time.Second)
_ = res
}
} else {
rows := make([]UserTask, 0, len(content))
for _, v := range content {
v.TaskId = taskId
rows = append(rows, v)
}
res, cnt, err := DBClient.From(tableName).Insert(rows, true, "", "representation", "").Execute()
if err != nil {
slog.Error("InsertTaskRes", "err", err.Error())
return err
}
slog.Info("InsertTaskRes", "data count", len(rows), "insert count", cnt)
_ = res
}
return nil
......
......@@ -54,6 +54,7 @@ func (s *Idx) Idx(page []UserTask) (bool, *list.List) {
match := false
for ik, iv := range s.idx {
if v.UserId == iv.UserId {
slog.Info("Idx", "pageIndex", k, "pageUserId", v.UserId, "idxIndex", ik, "idxUserId", iv.UserId)
match = true
break
_, _ = k, ik
......@@ -61,6 +62,9 @@ func (s *Idx) Idx(page []UserTask) (bool, *list.List) {
}
if !match {
newItems.PushFront(v)
} else {
// stop, all new items has added to newItems.
break
}
}
if newItems.Len() < len(page) {
......
......@@ -223,9 +223,9 @@ func Request(f req, page *PageUsers, t TaskJob) error {
if err := InsertTaskRes(users, t.TaskType, t.TaskId); err != nil {
for k, v := range users {
fmt.Println(k, v.UserId, v.UserName)
}
//for k, v := range users {
// fmt.Println(k, v.UserId, v.UserName)
//}
slog.Error("InsertTaskRes", "task id", t.TaskId, "t.TaskType", t.TaskType, "len(users)", len(users), "err", err.Error())
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment