Commit ff5874c9 authored by Ubuntu's avatar Ubuntu

update task table

parent 55dfb842
...@@ -32,13 +32,22 @@ type VerifyRes struct { ...@@ -32,13 +32,22 @@ type VerifyRes struct {
} `json:"data"` } `json:"data"`
} }
type Req struct { /*
{
"user_id": "1570057485914087429",
"project": "OnlyDD_D",
"task_type": "follow",
"start": true
}
*/
type AddTaskReq struct {
// AddOrStop bool // AddOrStop bool
Project string `json:"project"` User string `json:"user"`
TweetId string `json:"tweet_id"` TaskType string `json:"task_type"`
// Like bool `json:"like"` TaskId string `json:"task_id"`
Follow bool `json:"follow"`
Retweet bool `json:"retweet"`
} }
func TaskAdd(c *fiber.Ctx) error { func TaskAdd(c *fiber.Ctx) error {
...@@ -47,7 +56,7 @@ func TaskAdd(c *fiber.Ctx) error { ...@@ -47,7 +56,7 @@ func TaskAdd(c *fiber.Ctx) error {
slog.Info(c.Route().Path, "body", string(c.Request().Body())) slog.Info(c.Route().Path, "body", string(c.Request().Body()))
req := Req{} req := AddTaskReq{}
if err := json.Unmarshal(c.Request().Body(), &req); err != nil { if err := json.Unmarshal(c.Request().Body(), &req); err != nil {
slog.Error("json.Unmarshal(c.Request().Body(), &req)", "err", err.Error()) slog.Error("json.Unmarshal(c.Request().Body(), &req)", "err", err.Error())
...@@ -57,24 +66,18 @@ func TaskAdd(c *fiber.Ctx) error { ...@@ -57,24 +66,18 @@ func TaskAdd(c *fiber.Ctx) error {
}) })
} }
slog.Info(c.Route().Path, "Project", req.Project, "TweetId", req.TweetId, "Retweet", req.Retweet, "Follow", req.Follow) slog.Info(c.Route().Path, "user", req.User, "TaskType", req.TaskType, "TaskId", req.TaskId)
if req.Project == "" || req.TweetId == "" {
return c.JSON(Res{
Code: 500,
Msg: "must provide project and tweet_id",
})
}
if !req.Retweet && !req.Follow { if req.TaskType == "" || req.TaskId == "" {
return c.JSON(Res{ return c.JSON(Res{
Code: 500, Code: 500,
Msg: "like or retweet must be true", Msg: "must provide TaskId and TaskType",
}) })
} }
//Todo
// 校验任务 条件是否存在;
// req.AddOrStop = true // req.AddOrStop = true
err := AddTaskInsertOrUpdate(req) err := AddTaskInsertOrUpdate(req)
//res, _, err := client.From("twitter_task").Insert(req, true, "", "representation", "").Execute() //res, _, err := client.From("twitter_task").Insert(req, true, "", "representation", "").Execute()
...@@ -99,33 +102,26 @@ func TaskStop(c *fiber.Ctx) error { ...@@ -99,33 +102,26 @@ func TaskStop(c *fiber.Ctx) error {
slog.Info(c.Route().Path, "body", string(c.Request().Body())) slog.Info(c.Route().Path, "body", string(c.Request().Body()))
// fmt.Println(string(c.Request().Body())) req := AddTaskReq{}
req := Req{}
if err := json.Unmarshal(c.Request().Body(), &req); err != nil { if err := json.Unmarshal(c.Request().Body(), &req); err != nil {
slog.Error("json.Unmarshal(c.Request().Body(), &req)", "err", err.Error())
return c.JSON(Res{ return c.JSON(Res{
Code: 500, Code: 500,
Msg: err.Error(), Msg: err.Error(),
}) })
} }
slog.Info(c.Route().Path, "Project", req.Project, "TweetId", req.TweetId, "Retweet", req.Retweet, "Follow", req.Follow) slog.Info(c.Route().Path, "user", req.User, "TaskType", req.TaskType, "TaskId", req.TaskId)
if req.Project == "" || req.TweetId == "" { if req.TaskType == "" || req.TaskId == "" {
return c.JSON(Res{ return c.JSON(Res{
Code: 500, Code: 500,
Msg: "must provide project and tweet_id", Msg: "must provide TaskId and TaskType",
})
}
if !req.Retweet && !req.Follow {
return c.JSON(Res{
Code: 200,
}) })
} }
err := StopTaskInsertOrUpdate(req) err := StopTaskUpdate(req)
if err != nil { if err != nil {
......
package main package main
import ( import "encoding/json"
"encoding/json"
"fmt"
_ "code.wuban.net.cn/odysseus/twitter_syncer/docs" type Task struct {
// docs are generated by Swag CLI, you have to import them. // ID int `json:"id"`
// replace with your own docs folder, usually "github.com/username/reponame/docs" User string `json:"user"`
//_ "github.com/gofiber/swagger/example/docs" TaskType string `json:"task_type"`
) TaskId string `json:"task_id"`
Start bool `json:"start"`
Stop bool `json:"stop"`
}
// var client *supabase.Client func QueryAllTask() ([]Task, error) {
// func init() { data, count, err := client.From("tasks").Select("*", "exact", false).
Eq("start", "true").Neq("stop", "true").
Execute()
// var API_URL = "http://43.198.54.207:8000" if err != nil {
// var API_KEY = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyAgCiAgICAicm9sZSI6ICJzZXJ2aWNlX3JvbGUiLAogICAgImlzcyI6ICJzdXBhYmFzZS1kZW1vIiwKICAgICJpYXQiOiAxNjQxNzY5MjAwLAogICAgImV4cCI6IDE3OTk1MzU2MDAKfQ.DaYlNEoUrrEn2Ig7tqibS-PHK5vgusbcbo7X36XVt4Q" return nil, err
}
// for { _ = count
// cli, err := supabase.NewClient(API_URL, API_KEY, &supabase.ClientOptions{
// Schema: "twitter",
// })
// if err != nil {
// slog.Error("supabase.NewClient", "err", err.Error())
// return
// } else {
// client = cli
// break
// }
// }
// }
func AddTaskInsertOrUpdate(req Req) error { // fmt.Println(count, string(data))
task, ok, err := Query(req) res := make([]Task, 0, count)
if err != nil { if err := json.Unmarshal(data, &res); err != nil {
return err return nil, err
} }
_ = task return res, nil
if ok {
//update
return AddTaskUpdate(req, task)
} else {
//insert
return Insert(req)
}
return nil
} }
func AddTaskUpdate(req Req, task Task) error { func VerifyFollowerInDb(userId, followerId string) (bool, error) {
if !task.Follow && req.Follow { _, count, err := client.From("followers").Select("*", "exact", false).Eq("user_id", userId).Eq("follower_id", followerId).Execute()
task.Follow = true
}
if !task.Retweet && req.Retweet { if err != nil {
task.Retweet = true return false, err
} }
res, _, err := client.From("tasks").Update(&struct { return count == 1, nil
Retweet bool `json:"retweet"`
Follow bool `json:"follow"`
}{
Retweet: task.Retweet,
Follow: task.Follow,
}, "", "exact").Eq("project", req.Project).Eq("tweet_id", req.TweetId).Execute()
_ = res
return err
} }
func StopTaskInsertOrUpdate(req Req) error { func VerifyRetweeterInDb(tweetId, retweeter string) (bool, error) {
task, ok, err := Query(req) _, count, err := client.From("retweeters").Select("*", "exact", false).Eq("tweet_id", tweetId).Eq("retweeter_id", retweeter).Execute()
if err != nil { if err != nil {
return err return false, err
}
if !ok {
return fmt.Errorf("can not stop the task,no record")
} }
return StopTaskUpdate(req, task) return count == 1, nil
} }
func StopTaskUpdate(req Req, task Task) error { func AddTaskInsertOrUpdate(req AddTaskReq) error {
if !task.FollowStop && req.Follow { task := Task{
task.FollowStop = true User: req.User,
TaskType: req.TaskType,
TaskId: req.TaskId,
Start: true,
Stop: false,
} }
if !task.RetweetStop && req.Retweet { res, _, err := client.From("tasks").Insert(task, true, "", "representation", "").Execute()
task.RetweetStop = true
}
res, _, err := client.From("tasks").Update(&struct {
Retweet bool `json:"retweet_stop"`
Follow bool `json:"follow_stop"`
}{
Retweet: task.RetweetStop,
Follow: task.FollowStop,
}, "", "exact").Eq("project", req.Project).Eq("tweet_id", req.TweetId).Execute()
_ = res _ = res
return err return err
//return nil
} }
func Insert(req Req) error { func StopTaskUpdate(req AddTaskReq) error {
res, _, err := client.From("tasks").Insert(req, true, "", "representation", "").Execute() //res, _, err := client.From("tasks").Insert(task, true, "", "representation", "").Execute()
res, _, err := client.From("tasks").Update(&struct {
Stop bool `json:"stop"`
}{
Stop: true,
}, "", "exact").Eq("task_type", req.TaskType).Eq("task_id", req.TaskId).Execute()
_ = res _ = res
return err return err
} }
func Query(req Req) (Task, bool, error) {
data, count, err := client.From("tasks").Select("*", "exact", false).Eq("project", req.Project).Eq("tweet_id", req.TweetId).Execute()
if err != nil {
return Task{}, false, err
}
if count == 0 {
return Task{}, false, nil
}
tasks := make([]Task, 0)
if err := json.Unmarshal(data, &tasks); err != nil {
return Task{}, false, err
}
var task Task
if len(tasks) == 1 {
task = tasks[0]
} else {
return Task{}, false, fmt.Errorf("expected raws len ==1, but actually %d", len(tasks))
}
return task, true, nil
}
/*
[{"id":3,"created_at":"2024-07-30T09:18:22.110223+00:00","project":"1","tweet_id":"1","like":true,"retweet":true,"like_stop":null,"retweet_stop":null}]
*/
// type Tasks []Task
type Task struct {
Project string `json:"project"`
TweetId string `json:"tweet_id"`
Like bool `json:"like"`
Follow bool `json:"follow"`
Retweet bool `json:"retweet"`
LikeStop bool `json:"like_stop"`
FollowStop bool `json:"follow_stop"`
RetweetStop bool `json:"retweet_stop"`
}
func VerifyFollowerInDb(userId, followerId string) (bool, error) {
_, count, err := client.From("followers").Select("*", "exact", false).Eq("user_id", userId).Eq("follower_id", followerId).Execute()
if err != nil {
return false, err
}
return count == 1, nil
}
func VerifyRetweeterInDb(tweetId, retweeter string) (bool, error) {
_, count, err := client.From("retweeters").Select("*", "exact", false).Eq("tweet_id", tweetId).Eq("retweeter_id", retweeter).Execute()
if err != nil {
return false, err
}
return count == 1, nil
}
package main package main
import ( import (
"encoding/json"
"log/slog" "log/slog"
_ "code.wuban.net.cn/odysseus/twitter_syncer/docs" _ "code.wuban.net.cn/odysseus/twitter_syncer/docs"
...@@ -31,3 +32,22 @@ func init() { ...@@ -31,3 +32,22 @@ func init() {
} }
} }
} }
func TwitterAccountFromDB() ([]TwitterAccount, error) {
data, count, err := client.From("account").Select("*", "exact", false).Eq("available", "true").Execute()
if err != nil {
return nil, err
}
slog.Info("TwitterAccountFromDB", "count", count)
res := make([]TwitterAccount, 0, count)
if err := json.Unmarshal(data, &res); err != nil {
return nil, err
}
return res, nil
}
package main package main
import ( import (
"fmt"
"testing" "testing"
) )
func TestQueryTask(t *testing.T) { func TestQueryTask(t *testing.T) {
req := Req{ tasks, err := QueryAllTask()
Project: "1",
TweetId: "1",
}
task, ok, err := Query(req)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
fmt.Println("found", ok, task) for k, v := range tasks {
t.Log(k, "v.User", v.User, "v.TaskType", v.TaskType, "v.TaskId", v.TaskId)
}
} }
func TestInsertTask(t *testing.T) { // func TestInsertTask(t *testing.T) {
req := Req{ // req := Req{
Project: "1", // Project: "1",
TweetId: "11", // TweetId: "11",
} // }
err := Insert(req) // err := Insert(req)
// if err != nil {
// t.Fatal(err)
// }
// // fmt.Println("found", ok, task)
// }
func TestTwitterAccountFromDB(t *testing.T) {
accounts, err := TwitterAccountFromDB()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
for k, v := range accounts {
t.Log(k, v.User, v.PassWd)
}
// fmt.Println("found", ok, task) // fmt.Println("found", ok, task)
} }
...@@ -67,12 +67,11 @@ paths: ...@@ -67,12 +67,11 @@ paths:
tags: tags:
- Task - Task
requestBody: requestBody:
description: like 或 retweet 为true的时候,开启对应的同步任务;
content: content:
application/json: application/json:
schema: schema:
type: object type: object
$ref: "#/components/schemas/task" $ref: "#/components/schemas/AddTaskReq"
responses: responses:
"200": "200":
description: successful operation description: successful operation
...@@ -94,12 +93,11 @@ paths: ...@@ -94,12 +93,11 @@ paths:
tags: tags:
- Task - Task
requestBody: requestBody:
description: like 或 retweet 为true的时候,停止指定任务like或retweet;
content: content:
application/json: application/json:
schema: schema:
type: object type: object
$ref: "#/components/schemas/task" $ref: "#/components/schemas/StopTaskReq"
responses: responses:
"200": "200":
description: successful operation description: successful operation
...@@ -225,22 +223,45 @@ components: ...@@ -225,22 +223,45 @@ components:
write:pets: modify pets in your account write:pets: modify pets in your account
type: oauth2 type: oauth2
schemas: schemas:
task: AddTaskReq:
required:
- task_id
- task_type
type: object
properties:
user:
description: 方便识别用户身份信息
type: string
example: "OnlyDD_D"
task_type:
type: string
enum:
- follow
- retweet
example: follow
task_id:
description: user id or full name(不要各提交一次,现在没有校验。) or tweet id;
type: string
example: "1570057485914087429"
StopTaskReq:
required: required:
- project - project
- tweet_id - task_type
type: object type: object
properties: properties:
project: task_type:
type: string type: string
tweet_id: enum:
- follow
- retweet
example: follow
task_id:
description: user id or full name for follow, tweet id for retweet.
type: string type: string
like_cancel: example: "1570057485914087429"
type: boolean # start:
follow: # type: boolean
type: boolean # example: true
retweet:
type: boolean
verify_res: verify_res:
required: required:
- ok - ok
......
...@@ -11,10 +11,9 @@ import ( ...@@ -11,10 +11,9 @@ import (
"github.com/supabase-community/postgrest-go" "github.com/supabase-community/postgrest-go"
) )
func GetTasks() ([]Task, error) { func GetFollowTasks() ([]Task, error) {
data, count, err := client.From("tasks").Select("*", "exact", false). data, count, err := client.From("tasks").Select("*", "exact", false).
Eq("retweet", "true").Neq("retweet_stop", "true").
Eq("follow", "true").Neq("follow_stop", "true"). Eq("follow", "true").Neq("follow_stop", "true").
Execute() Execute()
...@@ -44,7 +43,7 @@ func GetTasksFollowIdx() ([][]FollowerId, error) { ...@@ -44,7 +43,7 @@ func GetTasksFollowIdx() ([][]FollowerId, error) {
for _, task := range tasks { for _, task := range tasks {
data, count, err := client.From("followers").Select("", "user_id", false). data, count, err := client.From("followers").Select("", "user_id", false).
Eq("user_id", task.Project). Eq("user_id", task.TaskId).
Order("id", &postgrest.OrderOpts{ Order("id", &postgrest.OrderOpts{
Ascending: false, Ascending: false,
// NullsFirst bool // NullsFirst bool
...@@ -71,9 +70,13 @@ func GetTasksFollowIdx() ([][]FollowerId, error) { ...@@ -71,9 +70,13 @@ func GetTasksFollowIdx() ([][]FollowerId, error) {
return res, nil return res, nil
} }
func GetFollowTasks() ([]Task, error) { //////////////////
// ///////////////
func GetTasks() ([]Task, error) {
data, count, err := client.From("tasks").Select("*", "exact", false). data, count, err := client.From("tasks").Select("*", "exact", false).
Eq("retweet", "true").Neq("retweet_stop", "true").
Eq("follow", "true").Neq("follow_stop", "true"). Eq("follow", "true").Neq("follow_stop", "true").
Execute() Execute()
......
...@@ -42,8 +42,8 @@ func main() { ...@@ -42,8 +42,8 @@ func main() {
app.Post("/task/add", TaskAdd) app.Post("/task/add", TaskAdd)
app.Post("/task/stop", TaskStop) app.Post("/task/stop", TaskStop)
app.Get("/verify/follower", VerifyFollower) // app.Get("/verify/follower", VerifyFollower)
app.Get("/verify/retweeter", VerifyRetweeter) // app.Get("/verify/retweeter", VerifyRetweeter)
if err := app.Listen(":8001"); err != nil { if err := app.Listen(":8001"); err != nil {
slog.Error(err.Error()) slog.Error(err.Error())
......
...@@ -15,7 +15,7 @@ func newSync() error { ...@@ -15,7 +15,7 @@ func newSync() error {
idxStream := TaskIdx(done, connStream) idxStream := TaskIdx(done, connStream)
resStream, err := InitResource(users) resStream, err := InitResource()
if err != nil { if err != nil {
return err return err
} }
...@@ -42,11 +42,17 @@ type ScraperTimer struct { ...@@ -42,11 +42,17 @@ type ScraperTimer struct {
Timer time.Timer Timer time.Timer
} }
func InitResource(users []TwitterAccount) (chan ScraperTimer, error) { func InitResource() (chan ScraperTimer, error) {
outStream := make(chan ScraperTimer, 1000) outStream := make(chan ScraperTimer, 1000)
for _, v := range users { accounts, err := TwitterAccountFromDB()
if err != nil {
return nil, err
}
for _, v := range accounts {
scraper, err := InitScraper(v.User, v.PassWd) scraper, err := InitScraper(v.User, v.PassWd)
......
package main package main
import "testing" import (
"fmt"
"testing"
"time"
)
func add1[T int | float32 | float64](a, b T) T { func add1[T int | float32 | float64](a, b T) T {
c := a + b c := a + b
...@@ -65,3 +69,21 @@ strchan := make(gchan[string]) ...@@ -65,3 +69,21 @@ strchan := make(gchan[string])
//NewTask //NewTask
//func InitTask(done <-chan interface{}, instream <-chan any) { //func InitTask(done <-chan interface{}, instream <-chan any) {
func TestTimer(t *testing.T) {
timer1 := time.NewTimer(2 * time.Second)
time.Sleep(4 * time.Second)
<-timer1.C
fmt.Println("Timer 1 fired")
timer1.Reset(2 * time.Second)
time.Sleep(4 * time.Second)
<-timer1.C
fmt.Println("Timer 1 fired")
}
...@@ -129,9 +129,9 @@ type TwitterAccount struct { ...@@ -129,9 +129,9 @@ type TwitterAccount struct {
PassWd string PassWd string
} }
var users []TwitterAccount = []TwitterAccount{ // var users []TwitterAccount = []TwitterAccount{
TwitterAccount{ // TwitterAccount{
User: "Wade_Leeeee", // User: "Wade_Leeeee",
PassWd: "923881393time", // PassWd: "923881393time",
}, // },
} // }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment