Commit b99bf09b authored by vicotor's avatar vicotor

update

parent 8466a42e
...@@ -10,9 +10,19 @@ default: twitter_syncer ...@@ -10,9 +10,19 @@ default: twitter_syncer
all: twitter_syncer docker all: twitter_syncer docker
twitter_syncer: twitter_syncer:
go build -o=${GOBIN}/$@ -gcflags "all=-N -l" . go build -o=${GOBIN}/$@ -gcflags "all=-N -l" ./cmd/syncer
@echo "Done building." @echo "Done building."
manual:
go build -o=${GOBIN}/$@ -gcflags "all=-N -l" ./cmd/manual
@echo "Done building."
.PHNOY: manual
write:
go build -o=${GOBIN}/$@ -gcflags "all=-N -l" ./cmd/write
@echo "Done building."
.PHNOY: write
docker: docker:
docker build -t twitter_syncer:${TAG} . docker build -t twitter_syncer:${TAG} .
......
package main
import (
"code.wuban.net.cn/odysseus/twitter_syncer/core"
"code.wuban.net.cn/odysseus/twitter_syncer/swarm"
"encoding/json"
"flag"
"fmt"
"github.com/g8rswimmer/go-twitter/v2"
"log/slog"
"os"
"time"
)
func manual(name string, bee string) {
allTask, err := core.GetTasks()
if err != nil {
slog.Error("GetTasks", "err", err.Error())
return
}
var todoTask *core.TaskJob
for _, task := range allTask {
if task.TaskType == core.FollowType && task.TaskId == name {
todoTask = &task
break
}
}
if todoTask.TaskId == "" {
slog.Error("GetTasks", "err", "not found")
return
}
swarm.InitSwarm([]string{bee})
cli := swarm.GetSwarm()
page := core.NewPageUsers(core.NewIdx(todoTask.Idx))
var (
cursor = ""
newCursor = ""
users []*twitter.UserObj
)
//var maxTime = 20
for {
users, newCursor, _, err = cli.GetFollowerList(todoTask.TaskId, todoTask.UserId, cursor)
if err != nil {
slog.Error("GetFollowerList", "task id", todoTask.TaskId, "cursor", cursor, "err", err.Error())
break
}
taskUser := core.UserObjectToUserTask(users)
slog.Info("GetFollowerList", "task id", todoTask.TaskId, "cursor", cursor, "len(users)", len(users), "newCursor", newCursor)
if (cursor != "" && newCursor == "") || (cursor != "" && newCursor == "0") {
slog.Info("Get all followers finished")
l := page.GetIdx().List
res := make([]core.UserTask, 0, l.Len())
for e := l.Front(); e != nil; e = e.Next() {
if user, ok := e.Value.(core.UserTask); ok {
res = append(res, user)
}
}
// save it to json file.
d, _ := json.MarshalIndent(res, "", " ")
filename := fmt.Sprintf("follower-%s.json", todoTask.TaskId)
err = os.WriteFile(filename, d, 0644)
if err != nil {
slog.Error("write followers", "err", err.Error())
} else {
slog.Info("write followers", "task id", todoTask.TaskId, "len(users)", len(res))
}
if err := core.InsertTaskRes(res, todoTask.TaskType, todoTask.TaskId); err != nil {
slog.Error("InsertTaskRes", "task id", todoTask.TaskId,
"t.TaskType", todoTask.TaskType, "len(users)", len(users), "err", err.Error())
}
return
} else {
ok, l := page.GetIdx().Idx(taskUser)
if ok {
res := make([]core.UserTask, 0, l.Len())
for e := l.Front(); e != nil; e = e.Next() {
if user, ok := e.Value.(core.UserTask); ok {
res = append(res, user)
}
}
if err := core.InsertTaskRes(res, todoTask.TaskType, todoTask.TaskId); err != nil {
slog.Error("InsertTaskRes", "task id", todoTask.TaskId,
"t.TaskType", todoTask.TaskType, "len(users)", len(users), "err", err.Error())
} else {
slog.Info("InsertTaskRes", "task id", todoTask.TaskId, "len(users)", len(res))
}
return
}
}
if newCursor == "" || newCursor == "0" {
break
}
cursor = newCursor
time.Sleep(time.Second * 1)
}
}
var (
taskname = flag.String("task", "", "task name")
bee = flag.String("bee", "http://127.0.0.1:8088", "bee url")
)
func main() {
flag.Parse()
manual(*taskname, *bee)
}
...@@ -16,6 +16,7 @@ package main ...@@ -16,6 +16,7 @@ package main
import ( import (
"code.wuban.net.cn/odysseus/twitter_syncer/acmanager" "code.wuban.net.cn/odysseus/twitter_syncer/acmanager"
"code.wuban.net.cn/odysseus/twitter_syncer/core"
"code.wuban.net.cn/odysseus/twitter_syncer/swarm" "code.wuban.net.cn/odysseus/twitter_syncer/swarm"
"flag" "flag"
"fmt" "fmt"
...@@ -23,7 +24,6 @@ import ( ...@@ -23,7 +24,6 @@ import (
"log/slog" "log/slog"
"strings" "strings"
//"github.com/gofiber/contrib/swagger"
"github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/cors" "github.com/gofiber/fiber/v2/middleware/cors"
...@@ -43,7 +43,7 @@ func main() { ...@@ -43,7 +43,7 @@ func main() {
defer close(done) defer close(done)
tasks, err := GetTasks() tasks, err := core.GetTasks()
if err != nil { if err != nil {
slog.Error(err.Error()) slog.Error(err.Error())
...@@ -54,13 +54,13 @@ func main() { ...@@ -54,13 +54,13 @@ func main() {
for _, task := range tasks { for _, task := range tasks {
fmt.Println(task.String()) fmt.Println(task.String())
if err := Worker.AddJob(task); err != nil { if err := core.Worker.AddJob(task); err != nil {
slog.Error(err.Error()) slog.Error(err.Error())
} }
} }
}() }()
manager := acmanager.NewManager(client) manager := acmanager.NewManager(core.DBClient)
manager.Start() manager.Start()
// taskIn = taskInStream // taskIn = taskInStream
...@@ -81,16 +81,16 @@ func main() { ...@@ -81,16 +81,16 @@ func main() {
//app.Use(swagger.New(cfg)) //app.Use(swagger.New(cfg))
app.Get("/swagger/*", swagger.HandlerDefault) app.Get("/swagger/*", swagger.HandlerDefault)
app.Post("/project", Project) app.Post("/project", core.Project)
app.Get("/apikey/owner", GetConfigOwner) app.Get("/apikey/owner", core.GetConfigOwner)
app.Post("/task/add", TaskAdd) app.Post("/task/add", core.TaskAdd)
app.Post("/task/stop", TaskStop) app.Post("/task/stop", core.TaskStop)
app.Get("/verify/follower", VerifyFollower) app.Get("/verify/follower", core.VerifyFollower)
app.Get("/verify/retweeter", VerifyRetweeter) app.Get("/verify/retweeter", core.VerifyRetweeter)
app.Post("/bee/add", BeeAdd) app.Post("/bee/add", core.BeeAdd)
app.Post("/bee/del", BeeDel) app.Post("/bee/del", core.BeeDel)
//VerifyLike //VerifyLike
app.Get("/verify/like", VerifyLike) app.Get("/verify/like", core.VerifyLike)
if err := app.Listen(":8001"); err != nil { if err := app.Listen(":8001"); err != nil {
slog.Error(err.Error()) slog.Error(err.Error())
......
package main
import (
"code.wuban.net.cn/odysseus/twitter_syncer/core"
"encoding/json"
"flag"
"log/slog"
"os"
)
func write(name string, file string) {
allTask, err := core.GetTasks()
if err != nil {
slog.Error("GetTasks", "err", err.Error())
return
}
var todoTask *core.TaskJob
for _, task := range allTask {
if task.TaskType == core.FollowType && task.TaskId == name {
todoTask = &task
break
}
}
if todoTask.TaskId == "" {
slog.Error("GetTasks", "err", "not found")
return
}
data, err := os.ReadFile(file)
if err != nil {
slog.Error("read file", "err", err.Error())
return
}
var users []core.UserTask
err = json.Unmarshal(data, &users)
if err != nil {
slog.Error("unmarshal", "err", err.Error())
return
}
if err := core.InsertTaskRes(users, todoTask.TaskType, todoTask.TaskId); err != nil {
slog.Error("InsertTaskRes", "task id", todoTask.TaskId,
"t.TaskType", todoTask.TaskType, "len(users)", len(users), "err", err.Error())
}
}
var (
taskname = flag.String("task", "", "task name")
file = flag.String("file", "", "write file name")
)
func main() {
flag.Parse()
write(*taskname, *file)
}
package main package core
import ( import (
"context" "context"
......
package main package core
import "encoding/json" import "encoding/json"
func VerifyFollowerInDb(userId, followerId string) (bool, error) { func VerifyFollowerInDb(userId, followerId string) (bool, error) {
_, count, err := client.From("followers").Select("*", "exact", false).Eq("task_id", userId).Eq("user_id", followerId).Execute() _, count, err := DBClient.From("followers").Select("*", "exact", false).Eq("task_id", userId).Eq("user_id", followerId).Execute()
if err != nil { if err != nil {
return false, err return false, err
...@@ -15,7 +15,7 @@ func VerifyFollowerInDb(userId, followerId string) (bool, error) { ...@@ -15,7 +15,7 @@ func VerifyFollowerInDb(userId, followerId string) (bool, error) {
func VerifyRetweeterInDb(tweetId, retweeter string) (bool, error) { func VerifyRetweeterInDb(tweetId, retweeter string) (bool, error) {
_, count, err := client.From("retweeters").Select("*", "exact", false).Eq("task_id", tweetId).Eq("user_id", retweeter).Execute() _, count, err := DBClient.From("retweeters").Select("*", "exact", false).Eq("task_id", tweetId).Eq("user_id", retweeter).Execute()
if err != nil { if err != nil {
return false, err return false, err
...@@ -26,7 +26,7 @@ func VerifyRetweeterInDb(tweetId, retweeter string) (bool, error) { ...@@ -26,7 +26,7 @@ func VerifyRetweeterInDb(tweetId, retweeter string) (bool, error) {
func VerifyLikeInDb(tweetId, userId string) (bool, error) { func VerifyLikeInDb(tweetId, userId string) (bool, error) {
_, count, err := client.From("tweet_liking_users").Select("*", "exact", false).Eq("task_id", tweetId).Eq("user_id", userId).Execute() _, count, err := DBClient.From("tweet_liking_users").Select("*", "exact", false).Eq("task_id", tweetId).Eq("user_id", userId).Execute()
if err != nil { if err != nil {
return false, err return false, err
...@@ -46,7 +46,7 @@ func AddTaskInsert(req AddTaskReq, followerCount int) error { ...@@ -46,7 +46,7 @@ func AddTaskInsert(req AddTaskReq, followerCount int) error {
FollowerCount: followerCount, FollowerCount: followerCount,
} }
res, _, err := client.From("tasks").Insert(task, true, "", "representation", "").Execute() res, _, err := DBClient.From("tasks").Insert(task, true, "", "representation", "").Execute()
_ = res _ = res
...@@ -55,7 +55,7 @@ func AddTaskInsert(req AddTaskReq, followerCount int) error { ...@@ -55,7 +55,7 @@ func AddTaskInsert(req AddTaskReq, followerCount int) error {
func UpdateFollowerTaskCount(taskId, taskType, userId string, fc int) error { func UpdateFollowerTaskCount(taskId, taskType, userId string, fc int) error {
res, _, err := client.From("tasks").Update(&struct { res, _, err := DBClient.From("tasks").Update(&struct {
FC int `json:"follower_count"` FC int `json:"follower_count"`
}{ }{
FC: fc, FC: fc,
...@@ -68,9 +68,9 @@ func UpdateFollowerTaskCount(taskId, taskType, userId string, fc int) error { ...@@ -68,9 +68,9 @@ func UpdateFollowerTaskCount(taskId, taskType, userId string, fc int) error {
func StopTaskUpdate(req StopTaskReq) error { func StopTaskUpdate(req StopTaskReq) error {
//res, _, err := client.From("tasks").Insert(task, true, "", "representation", "").Execute() //res, _, err := DBClient.From("tasks").Insert(task, true, "", "representation", "").Execute()
res, _, err := client.From("tasks").Update(&struct { res, _, err := DBClient.From("tasks").Update(&struct {
Stop bool `json:"stop"` Stop bool `json:"stop"`
}{ }{
Stop: true, Stop: true,
...@@ -102,7 +102,7 @@ func AddOrUpdateProject(cfg ProjectReq, user UserInfo) error { ...@@ -102,7 +102,7 @@ func AddOrUpdateProject(cfg ProjectReq, user UserInfo) error {
Available: true, Available: true,
} }
res, _, err := client.From("project").Insert(project, true, "", "representation", "").Execute() res, _, err := DBClient.From("project").Insert(project, true, "", "representation", "").Execute()
_ = res _ = res
...@@ -111,7 +111,7 @@ func AddOrUpdateProject(cfg ProjectReq, user UserInfo) error { ...@@ -111,7 +111,7 @@ func AddOrUpdateProject(cfg ProjectReq, user UserInfo) error {
func QueryProjectByKeysAndToken(cfg ProjectReq) (UserInfo, bool, error) { func QueryProjectByKeysAndToken(cfg ProjectReq) (UserInfo, bool, error) {
data, count, err := client.From("project").Select("*", "exact", false).Eq("api_key", cfg.ApiKey).Eq("api_key_secret", cfg.ApiKeySecrect).Eq("token", cfg.Token).Eq("access_token", cfg.AccessToken).Eq("access_token_secret", cfg.AccessTokenSecret).Execute() data, count, err := DBClient.From("project").Select("*", "exact", false).Eq("api_key", cfg.ApiKey).Eq("api_key_secret", cfg.ApiKeySecrect).Eq("token", cfg.Token).Eq("access_token", cfg.AccessToken).Eq("access_token_secret", cfg.AccessTokenSecret).Execute()
if err != nil { if err != nil {
return UserInfo{}, false, err return UserInfo{}, false, err
...@@ -136,7 +136,7 @@ func QueryProjectByKeysAndToken(cfg ProjectReq) (UserInfo, bool, error) { ...@@ -136,7 +136,7 @@ func QueryProjectByKeysAndToken(cfg ProjectReq) (UserInfo, bool, error) {
func CheckTaskExist(userId, taskId, taskType string) (bool, error) { func CheckTaskExist(userId, taskId, taskType string) (bool, error) {
_, count, err := client.From("tasks").Select("*", "exact", false).Eq("user_id", userId).Eq("task_id", taskId).Eq("task_type", taskType).Eq("stop", "false").Execute() _, count, err := DBClient.From("tasks").Select("*", "exact", false).Eq("user_id", userId).Eq("task_id", taskId).Eq("task_type", taskType).Eq("stop", "false").Execute()
if err != nil { if err != nil {
return false, err return false, err
...@@ -165,7 +165,7 @@ func CheckFollowerTaskAndAccountCount() (bool, error) { ...@@ -165,7 +165,7 @@ func CheckFollowerTaskAndAccountCount() (bool, error) {
func FollowerTaskCount() (int64, error) { func FollowerTaskCount() (int64, error) {
_, count, err := client.From("tasks").Select("*", "exact", false).Eq("stop", "false").Execute() _, count, err := DBClient.From("tasks").Select("*", "exact", false).Eq("stop", "false").Execute()
if err != nil { if err != nil {
return 0, err return 0, err
...@@ -177,7 +177,7 @@ func FollowerTaskCount() (int64, error) { ...@@ -177,7 +177,7 @@ func FollowerTaskCount() (int64, error) {
func AvailableAccountCount() (int64, error) { func AvailableAccountCount() (int64, error) {
_, count, err := client.From("accounts").Select("*", "exact", false).Eq("available", "false").Execute() _, count, err := DBClient.From("accounts").Select("*", "exact", false).Eq("available", "false").Execute()
if err != nil { if err != nil {
return 0, err return 0, err
...@@ -188,7 +188,7 @@ func AvailableAccountCount() (int64, error) { ...@@ -188,7 +188,7 @@ func AvailableAccountCount() (int64, error) {
func QueryProjectByUserId(userId string) ([]ProjectInDb, bool, error) { func QueryProjectByUserId(userId string) ([]ProjectInDb, bool, error) {
data, count, err := client.From("project").Select("*", "exact", false).Eq("user_id", userId).Eq("available", "true").Execute() data, count, err := DBClient.From("project").Select("*", "exact", false).Eq("user_id", userId).Eq("available", "true").Execute()
if err != nil { if err != nil {
return nil, false, err return nil, false, err
...@@ -206,7 +206,7 @@ func QueryProjectByUserId(userId string) ([]ProjectInDb, bool, error) { ...@@ -206,7 +206,7 @@ func QueryProjectByUserId(userId string) ([]ProjectInDb, bool, error) {
func QueryProjectByUserIdAndName(userName, userId string) (bool, error) { func QueryProjectByUserIdAndName(userName, userId string) (bool, error) {
_, count, err := client.From("project").Select("*", "exact", false).Eq("user_id", userId).Eq("username", userName).Execute() _, count, err := DBClient.From("project").Select("*", "exact", false).Eq("user_id", userId).Eq("username", userName).Execute()
if err != nil { if err != nil {
return false, err return false, err
...@@ -218,7 +218,7 @@ func QueryProjectByUserIdAndName(userName, userId string) (bool, error) { ...@@ -218,7 +218,7 @@ func QueryProjectByUserIdAndName(userName, userId string) (bool, error) {
func QueryAvailableProject() ([]ProjectInDb, error) { func QueryAvailableProject() ([]ProjectInDb, error) {
data, count, err := client.From("project").Select("*", "exact", false).Eq("available", "true").Execute() data, count, err := DBClient.From("project").Select("*", "exact", false).Eq("available", "true").Execute()
if err != nil { if err != nil {
return nil, err return nil, err
......
package main package core
import ( import (
"testing" "testing"
...@@ -7,7 +7,7 @@ import ( ...@@ -7,7 +7,7 @@ import (
func TestTime(t *testing.T) { func TestTime(t *testing.T) {
data, count, err := client.From("tweet_liking_users").Select("*", "exact", false).Eq("task_id", "1800805503066661056").Eq("user_id", "1823984946710765569").Gt("created_at", "2024-08-17T13:18:18.505072+00:00").Lt("created_at", "2024-08-18T13:18:19.505072+00:00").Execute() data, count, err := DBClient.From("tweet_liking_users").Select("*", "exact", false).Eq("task_id", "1800805503066661056").Eq("user_id", "1823984946710765569").Gt("created_at", "2024-08-17T13:18:18.505072+00:00").Lt("created_at", "2024-08-18T13:18:19.505072+00:00").Execute()
if err != nil { if err != nil {
t.Fatal(err.Error()) t.Fatal(err.Error())
......
package main package core
import ( import (
"code.wuban.net.cn/odysseus/twitter_syncer/swarm" "code.wuban.net.cn/odysseus/twitter_syncer/swarm"
...@@ -536,7 +536,7 @@ type BeeReq struct { ...@@ -536,7 +536,7 @@ type BeeReq struct {
// BeeAdd godoc // BeeAdd godoc
// @Summary BeeAdd // @Summary BeeAdd
// @Description add a bee client. // @Description add a bee DBClient.
// @Tags bee // @Tags bee
// @Accept json // @Accept json
// @Produce json // @Produce json
...@@ -572,7 +572,7 @@ func BeeAdd(c *fiber.Ctx) error { ...@@ -572,7 +572,7 @@ func BeeAdd(c *fiber.Ctx) error {
// BeeDel godoc // BeeDel godoc
// @Summary BeeDel // @Summary BeeDel
// @Description add a bee client. // @Description add a bee DBClient.
// @Tags bee // @Tags bee
// @Accept json // @Accept json
// @Produce json // @Produce json
......
package main package core
import ( import (
"context" "context"
......
package main package core
import ( import (
"encoding/json" "encoding/json"
......
package main package core
import ( import (
_ "code.wuban.net.cn/odysseus/twitter_syncer/docs" _ "code.wuban.net.cn/odysseus/twitter_syncer/docs"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log/slog" "log/slog"
"time"
// docs are generated by Swag CLI, you have to import them. // docs are generated by Swag CLI, you have to import them.
// replace with your own docs folder, usually "github.com/username/reponame/docs" // replace with your own docs folder, usually "github.com/username/reponame/docs"
//_ "github.com/gofiber/swagger/example/docs" //_ "github.com/gofiber/swagger/example/docs"
...@@ -13,7 +15,7 @@ import ( ...@@ -13,7 +15,7 @@ import (
"github.com/supabase-community/supabase-go" "github.com/supabase-community/supabase-go"
) )
var client *supabase.Client var DBClient *supabase.Client
func init() { func init() {
...@@ -29,7 +31,7 @@ func init() { ...@@ -29,7 +31,7 @@ func init() {
slog.Error("supabase.NewClient", "err", err.Error()) slog.Error("supabase.NewClient", "err", err.Error())
return return
} else { } else {
client = cli DBClient = cli
break break
} }
} }
...@@ -75,7 +77,7 @@ type TaskInDB struct { ...@@ -75,7 +77,7 @@ type TaskInDB struct {
func QueryAllTask() ([]TaskInDB, error) { func QueryAllTask() ([]TaskInDB, error) {
data, count, err := client.From("tasks").Select("*", "exact", false). data, count, err := DBClient.From("tasks").Select("*", "exact", false).
Eq("start", "true").Neq("stop", "true"). Eq("start", "true").Neq("stop", "true").
Execute() Execute()
...@@ -151,7 +153,7 @@ func GetTasks() ([]TaskJob, error) { ...@@ -151,7 +153,7 @@ func GetTasks() ([]TaskJob, error) {
continue continue
} }
// get idx data from each task table. // get idx data from each task table.
data, count, err := client.From(task.TaskType).Select("", "user_id", false). data, count, err := DBClient.From(task.TaskType).Select("", "user_id", false).
Eq("task_id", task.TaskId). Eq("task_id", task.TaskId).
Order("id", &postgrest.OrderOpts{ Order("id", &postgrest.OrderOpts{
Ascending: false, Ascending: false,
...@@ -199,20 +201,34 @@ type UserTask struct { ...@@ -199,20 +201,34 @@ type UserTask struct {
} }
func InsertTaskRes(content []UserTask, tableName string, taskId string) error { func InsertTaskRes(content []UserTask, tableName string, taskId string) error {
page := 5000
rows := make([]UserTask, 0, len(content)) if len(content) > page {
var newcontent []UserTask
for _, v := range content { for i := 0; i < len(content); i += page {
v.TaskId = taskId end := i + page
rows = append(rows, v) if end > len(content) {
} end = len(content)
}
res, _, err := client.From(tableName).Insert(rows, true, "", "representation", "").Execute() newcontent = make([]UserTask, len(content[i:end]))
copy(newcontent, content[i:end])
if err != nil { rows := make([]UserTask, 0, len(newcontent))
return err
for _, v := range newcontent {
v.TaskId = taskId
rows = append(rows, v)
}
res, _, err := DBClient.From(tableName).Insert(rows, true, "", "representation", "").Execute()
if err != nil {
slog.Error("InsertTaskRes", "err", err.Error())
return err
}
slog.Info("InsertTaskRes one page finished", "data count", len(rows))
time.Sleep(1 * time.Second)
_ = res
}
} }
_ = res
return nil return nil
} }
package main package core
import ( import (
"encoding/json" "encoding/json"
...@@ -41,7 +41,7 @@ func TestAddTasks(t *testing.T) { ...@@ -41,7 +41,7 @@ func TestAddTasks(t *testing.T) {
Stop: true, Stop: true,
FollowerCount: 0, FollowerCount: 0,
} }
res, _, err := client.From("tasks").Insert(task, true, "", "representation", "").Execute() res, _, err := DBClient.From("tasks").Insert(task, true, "", "representation", "").Execute()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -49,7 +49,7 @@ func TestAddTasks(t *testing.T) { ...@@ -49,7 +49,7 @@ func TestAddTasks(t *testing.T) {
} }
func TestGetTaskInDB(t *testing.T) { func TestGetTaskInDB(t *testing.T) {
data, count, err := client.From("tasks").Select("*", "exact", false). data, count, err := DBClient.From("tasks").Select("*", "exact", false).
Eq("start", "true").Eq("stop", "true"). Eq("start", "true").Eq("stop", "true").
Execute() Execute()
...@@ -149,7 +149,7 @@ func TestUpdateTask(t *testing.T) { ...@@ -149,7 +149,7 @@ func TestUpdateTask(t *testing.T) {
idstr := strconv.Itoa(task.ID) idstr := strconv.Itoa(task.ID)
t.Log("id str ", idstr, "apiconfig", task.ApiConfig) t.Log("id str ", idstr, "apiconfig", task.ApiConfig)
// update task. // update task.
_, _, err := client.From("tasks").Update(&task, "", "exact").Eq("id", idstr).Execute() _, _, err := DBClient.From("tasks").Update(&task, "", "exact").Eq("id", idstr).Execute()
if err != nil { if err != nil {
t.Error("update task error:", err) t.Error("update task error:", err)
} }
......
package main package core
import ( import (
"container/list" "container/list"
...@@ -36,15 +36,6 @@ func NewIdx(i []UserTask) *Idx { ...@@ -36,15 +36,6 @@ func NewIdx(i []UserTask) *Idx {
func (s *Idx) Idx(page []UserTask) (bool, *list.List) { func (s *Idx) Idx(page []UserTask) (bool, *list.List) {
//if len(s.newIdx) == 0 {
// for k, v := range page {
// s.newIdx = append(s.newIdx, v)
// if k > 5 {
// break
// }
// }
//}
if s.idx != nil && len(s.idx) == 0 { if s.idx != nil && len(s.idx) == 0 {
newList := list.New() newList := list.New()
...@@ -88,44 +79,16 @@ func (s *Idx) Idx(page []UserTask) (bool, *list.List) { ...@@ -88,44 +79,16 @@ func (s *Idx) Idx(page []UserTask) (bool, *list.List) {
} }
return false, nil return false, nil
} }
////TODO 匹配多个元素,防止用户取消;
//for k, v := range page {
//
// match := false
//
// for ik, iv := range s.idx {
// //slog.Info("match", "idx", iv.UserId, "page user id", v.UserId, "page user name", v.UserName)
// if v.UserId == iv.UserId {
// match = true
// break
// _, _ = k, ik
// }
// }
//
// if match {
//
// newList := s.List
// s.List = list.New()
// //idx
// s.idx = s.newIdx
// s.newIdx = make([]UserTask, 0, 10)
//
// fmt.Println("newList", "new users", newList.Len())
//
// return true, newList
//
// } else {
// s.List.PushFront(v)
// }
//}
//return false, nil
} }
type PageUsers struct { type PageUsers struct {
idx *Idx idx *Idx
} }
func (p *PageUsers) GetIdx() *Idx {
return p.idx
}
func NewPageUsers(idx *Idx) *PageUsers { func NewPageUsers(idx *Idx) *PageUsers {
return &PageUsers{ return &PageUsers{
...@@ -145,7 +108,7 @@ func (p *PageUsers) Request(tweetId string, id string, next string, f req) ([]Us ...@@ -145,7 +108,7 @@ func (p *PageUsers) Request(tweetId string, id string, next string, f req) ([]Us
} }
slog.Info("Request", "tweet id", tweetId, "next", next, "newnext", newNext, "len(users)", len(users)) slog.Info("Request", "tweet id", tweetId, "next", next, "newnext", newNext, "len(users)", len(users))
taskUser := userObjectToUserTask(users) taskUser := UserObjectToUserTask(users)
// 查询最新的一页 followers, 去历史中匹配。 // 查询最新的一页 followers, 去历史中匹配。
// 如果已经与历史记录相连接,表明已经完成了查询,不必再继续查next. // 如果已经与历史记录相连接,表明已经完成了查询,不必再继续查next.
...@@ -170,7 +133,7 @@ func (p *PageUsers) Request(tweetId string, id string, next string, f req) ([]Us ...@@ -170,7 +133,7 @@ func (p *PageUsers) Request(tweetId string, id string, next string, f req) ([]Us
return p.Request(tweetId, id, newNext, f) return p.Request(tweetId, id, newNext, f)
} }
func userObjectToUserTask(pageUsers []*twitter.UserObj) []UserTask { func UserObjectToUserTask(pageUsers []*twitter.UserObj) []UserTask {
res := make([]UserTask, 0, len(pageUsers)) res := make([]UserTask, 0, len(pageUsers))
......
package main package core
import ( import (
"testing" "testing"
......
package main package core
import ( import (
"code.wuban.net.cn/odysseus/twitter_syncer/swarm" "code.wuban.net.cn/odysseus/twitter_syncer/swarm"
...@@ -86,6 +86,7 @@ func (w *Work) RunJob(t TaskJob) chan<- interface{} { ...@@ -86,6 +86,7 @@ func (w *Work) RunJob(t TaskJob) chan<- interface{} {
secondTicker := time.NewTicker(time.Minute * 3) secondTicker := time.NewTicker(time.Minute * 3)
fiveMinutesTicker := time.NewTicker(time.Minute * 1) fiveMinutesTicker := time.NewTicker(time.Minute * 1)
halfHourTicker := time.NewTicker(time.Minute * 30) halfHourTicker := time.NewTicker(time.Minute * 30)
first := true
recordFc := make([]TimeAndFollowCount, 0, 100) recordFc := make([]TimeAndFollowCount, 0, 100)
...@@ -109,6 +110,10 @@ func (w *Work) RunJob(t TaskJob) chan<- interface{} { ...@@ -109,6 +110,10 @@ func (w *Work) RunJob(t TaskJob) chan<- interface{} {
t.FollowerCount = v.FollowCount t.FollowerCount = v.FollowCount
} }
} }
if first {
maybeFound = true
first = false
}
fmt.Println(" t.FollowerCount", t.FollowerCount, "maybeFound", maybeFound) fmt.Println(" t.FollowerCount", t.FollowerCount, "maybeFound", maybeFound)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment