Commit fa3106c2 authored by Ubuntu's avatar Ubuntu

add new task ok

parent 86bf7697
...@@ -24,7 +24,18 @@ func VerifyRetweeterInDb(tweetId, retweeter string) (bool, error) { ...@@ -24,7 +24,18 @@ func VerifyRetweeterInDb(tweetId, retweeter string) (bool, error) {
return count == 1, nil return count == 1, nil
} }
func AddTaskInsertOrUpdate(req AddTaskReq) error { func VerifyLikeInDb(tweetId, userId string) (bool, error) {
_, count, err := client.From("tweet_liking_users").Select("*", "exact", false).Eq("task_id", tweetId).Eq("user_id", userId).Execute()
if err != nil {
return false, err
}
return count == 1, nil
}
func AddTaskInsert(req AddTaskReq) error {
task := TaskInDB{ task := TaskInDB{
User: req.User, User: req.User,
...@@ -34,7 +45,7 @@ func AddTaskInsertOrUpdate(req AddTaskReq) error { ...@@ -34,7 +45,7 @@ func AddTaskInsertOrUpdate(req AddTaskReq) error {
Stop: false, Stop: false,
} }
res, _, err := client.From("tasks").Insert(task, true, "", "representation", "").Execute() res, _, err := client.From("tasks").Insert(task, false, "", "representation", "").Execute()
_ = res _ = res
...@@ -97,6 +108,36 @@ func QueryProjectByKeysAndToken(cfg ProjectReq) (bool, error) { ...@@ -97,6 +108,36 @@ func QueryProjectByKeysAndToken(cfg ProjectReq) (bool, error) {
} }
func CheckTask(userId, taskId, taskType string) (bool, error) {
_, count, err := client.From("tasks").Select("*", "exact", false).Eq("user_id", userId).Eq("task_id", taskId).Eq("task_type", taskType).Eq("stop", "false").Execute()
if err != nil {
return false, err
}
return count >= 1, nil
}
func QueryProjectByUserId(userId string) ([]ProjectInDb, bool, error) {
data, count, err := client.From("project").Select("*", "exact", false).Eq("user_id", userId).Execute()
if err != nil {
return nil, false, err
}
res := make([]ProjectInDb, 0, count)
if err := json.Unmarshal(data, &res); err != nil {
return nil, false, err
}
return res, count >= 1, nil
}
func QueryAvailableProject() ([]ProjectInDb, error) { func QueryAvailableProject() ([]ProjectInDb, error) {
data, count, err := client.From("project").Select("*", "exact", false).Eq("available", "true").Execute() data, count, err := client.From("project").Select("*", "exact", false).Eq("available", "true").Execute()
......
...@@ -88,7 +88,7 @@ func Project(c *fiber.Ctx) error { ...@@ -88,7 +88,7 @@ func Project(c *fiber.Ctx) error {
} }
cli := NewClient(req.Config, nil) cli := NewClient(req.Config)
me, err := cli.Me() me, err := cli.Me()
...@@ -166,10 +166,64 @@ func TaskAdd(c *fiber.Ctx) error { ...@@ -166,10 +166,64 @@ func TaskAdd(c *fiber.Ctx) error {
//Todo //Todo
// 校验任务 条件是否存在; // 校验任务 条件是否存在;
// req.AddOrStop = true projects, ok, err := QueryProjectByUserId(req.User)
err := AddTaskInsertOrUpdate(req)
if err != nil {
return c.JSON(Res{
Code: 500,
Msg: "QueryProjectByUserId " + req.User,
})
}
if !ok {
return c.JSON(Res{
Code: 500,
Msg: "can not find the user keys and token info " + req.User,
})
}
ok, err = CheckTask(req.User, req.TaskId, req.TaskType)
if err != nil {
return c.JSON(Res{
Code: 500,
Msg: "QCheckTask " + req.User,
})
}
if ok {
return c.JSON(Res{
Code: 500,
Msg: "task already existed",
})
}
//res, _, err := client.From("twitter_task").Insert(req, true, "", "representation", "").Execute() job := TaskJob{
Config: projects[0].Config,
Idx: make([]UserTask, 0),
UserId: req.User,
TaskId: req.TaskId,
TaskType: req.TaskType,
}
fmt.Println(job.String())
err = Worker.AddJob(job)
if err != nil {
slog.Error(" Worker.AddJob", "err", err.Error())
return c.JSON(Res{
Code: 500,
Msg: err.Error(),
})
}
// req.AddOrStop = true
err = AddTaskInsert(req)
if err != nil { if err != nil {
...@@ -267,7 +321,7 @@ func VerifyRetweeter(c *fiber.Ctx) error { ...@@ -267,7 +321,7 @@ func VerifyRetweeter(c *fiber.Ctx) error {
func VerifyFollower(c *fiber.Ctx) error { func VerifyFollower(c *fiber.Ctx) error {
userId := c.Query("user_id") userId := c.Query("user_id")
followerId := c.Query("follower_id") followerId := c.Query("user_id")
if len(userId) == 0 || len(followerId) == 0 { if len(userId) == 0 || len(followerId) == 0 {
slog.Error("VerifyFollower", "userId", userId, "followerId", followerId) slog.Error("VerifyFollower", "userId", userId, "followerId", followerId)
...@@ -301,3 +355,40 @@ func VerifyFollower(c *fiber.Ctx) error { ...@@ -301,3 +355,40 @@ func VerifyFollower(c *fiber.Ctx) error {
}) })
} }
func VerifyLike(c *fiber.Ctx) error {
tweetId := c.Query("tweet_id")
retweeterId := c.Query("user_id")
if len(tweetId) == 0 || len(retweeterId) == 0 {
slog.Error("VerifyFollower", "tweetId", tweetId, "user_id", retweeterId)
return c.JSON(Res{
Code: 500,
Msg: fmt.Sprintf("must provide tweetId [%v] and user_id [%v]", tweetId, retweeterId),
})
}
slog.Info(c.Route().Path, "tweetId", tweetId, "user_id", retweeterId)
ok, err := VerifyRetweeterInDb(tweetId, retweeterId)
if err != nil {
slog.Error("VerifyRetweeter", "tweetId", tweetId, "user_id", retweeterId, "err", err.Error())
return c.JSON(Res{
Code: 500,
//Msg: fmt.Sprint("VerifyFollowerInDb", "tweetId", tweetId, "retweeterId", retweeterId, "err", err.Error()),
Msg: fmt.Sprintf("VerifyRetweeter tweetId %v user_id %v err %v", tweetId, retweeterId, err.Error()),
})
}
return c.JSON(VerifyRes{
Code: 200,
Data: struct {
Ok bool "json:\"ok\""
}{
Ok: ok,
},
})
}
...@@ -52,6 +52,9 @@ type Client struct { ...@@ -52,6 +52,9 @@ type Client struct {
*twitter.Client *twitter.Client
Ratelimiter *rate.Limiter Ratelimiter *rate.Limiter
RetweeterRatelimiter *rate.Limiter
LikingUserRatelimiter *rate.Limiter
} }
type Config struct { type Config struct {
...@@ -62,15 +65,17 @@ type Config struct { ...@@ -62,15 +65,17 @@ type Config struct {
Token string `json:"token"` Token string `json:"token"`
} }
func NewLikeClient(cfg Config) *Client { // func NewLikeClient(cfg Config) *Client {
return NewClient(cfg, LikeRateLimit) // return NewClient(cfg, LikeRateLimit)
} // }
func NewRetweeterClient(cfg Config) *Client { // func NewRetweeterClient(cfg Config) *Client {
return NewClient(cfg, RetweetRateLimit) // return NewClient(cfg, RetweetRateLimit)
} // }
func NewClient(cfg Config, rts *rate.Limiter) *Client { //func NewClient(cfg Config, rts *rate.Limiter) *Client {
func NewClient(cfg Config) *Client {
//twitterAPIKey := "lVnj6Ox9HPcI4LwArSSYU7Pba" //os.Getenv("TWITTER_API_KEY") //twitterAPIKey := "lVnj6Ox9HPcI4LwArSSYU7Pba" //os.Getenv("TWITTER_API_KEY")
//twitterAPIKeySecret := "QMSnWG4QwyXWBVW2hQazzxhw9cSjd32CDfXGkg2DEaUUdscCRZ" //os.Getenv("TWITTER_API_KEY_SECRET") //twitterAPIKeySecret := "QMSnWG4QwyXWBVW2hQazzxhw9cSjd32CDfXGkg2DEaUUdscCRZ" //os.Getenv("TWITTER_API_KEY_SECRET")
...@@ -95,8 +100,10 @@ func NewClient(cfg Config, rts *rate.Limiter) *Client { ...@@ -95,8 +100,10 @@ func NewClient(cfg Config, rts *rate.Limiter) *Client {
} }
return &Client{ return &Client{
Client: twitterClient, Client: twitterClient,
Ratelimiter: rts, // Ratelimiter: rts,
RetweeterRatelimiter: RetweetRateLimit,
LikingUserRatelimiter: LikeRateLimit,
} }
} }
...@@ -135,7 +142,9 @@ func (c *Client) Retweeters(tweetId string, next string) ([]*twitter.UserObj, st ...@@ -135,7 +142,9 @@ func (c *Client) Retweeters(tweetId string, next string) ([]*twitter.UserObj, st
// TODO: Fix performance by removing unneeded allocaton here // TODO: Fix performance by removing unneeded allocaton here
ctx := context.Background() ctx := context.Background()
err := c.Ratelimiter.Wait(ctx) // This is a blocking call. //err := c.Ratelimiter.Wait(ctx) // This is a blocking call.
err := c.RetweeterRatelimiter.Wait(ctx) // This is a blocking call.
if err != nil { if err != nil {
return nil, "", nil, err return nil, "", nil, err
} }
...@@ -160,7 +169,8 @@ func (c *Client) TweetLikingUsers(tweetId string, next string) ([]*twitter.UserO ...@@ -160,7 +169,8 @@ func (c *Client) TweetLikingUsers(tweetId string, next string) ([]*twitter.UserO
// TODO: Fix performance by removing unneeded allocaton here // TODO: Fix performance by removing unneeded allocaton here
ctx := context.Background() ctx := context.Background()
err := c.Ratelimiter.Wait(ctx) // This is a blocking call. //err := c.Ratelimiter.Wait(ctx) // This is a blocking call.
err := c.LikingUserRatelimiter.Wait(ctx) // This is a blocking call.
if err != nil { if err != nil {
return nil, "", nil, err return nil, "", nil, err
} }
......
...@@ -249,6 +249,12 @@ type TaskJob struct { ...@@ -249,6 +249,12 @@ type TaskJob struct {
Config Config
} }
func (job *TaskJob) String() string {
jobAsJson, _ := json.Marshal(job)
return string(jobAsJson)
}
func GetTasks() ([]TaskJob, error) { func GetTasks() ([]TaskJob, error) {
tasks, err := QueryAllTask() tasks, err := QueryAllTask()
...@@ -337,7 +343,7 @@ func InsertTaskRes(content []UserTask, tableName string, taskId string) error { ...@@ -337,7 +343,7 @@ func InsertTaskRes(content []UserTask, tableName string, taskId string) error {
rows = append(rows, v) rows = append(rows, v)
} }
res, _, err := client.From("retweeters").Insert(rows, true, "", "representation", "").Execute() res, _, err := client.From(tableName).Insert(rows, true, "", "representation", "").Execute()
if err != nil { if err != nil {
return err return err
......
...@@ -180,6 +180,57 @@ paths: ...@@ -180,6 +180,57 @@ paths:
type: string type: string
data: data:
$ref: '#/components/schemas/verify_res' $ref: '#/components/schemas/verify_res'
/verify/like:
get:
tags:
- Verify
summary: Verify the provide user
parameters:
- name: tweet_id
in: query
required: true
explode: true
type: string
# default: available
example: "1800805503066661056"
- name: user_id
in: query
required: true
explode: true
type: string
# default: available
example: "1823984946710765569"
- name: begin_time
in: query
type: string
format: date-time
example: "2024-01-01T00:00:00Z"
# default: "2024-10-01T00:00:00Z"
- name: end_time
in: query
type: string
format: date-time
example: "2024-12-01T00:00:00Z"
# default: "2024-10-01T00:00:00Z"
responses:
"200":
description: successful operation
content:
application/json:
schema:
type: object
properties:
code:
type: integer
format: int64
enum: [200, 500]
msg:
type: string
data:
$ref: '#/components/schemas/verify_res'
/verify/retweeter: /verify/retweeter:
get: get:
tags: tags:
...@@ -192,14 +243,14 @@ paths: ...@@ -192,14 +243,14 @@ paths:
explode: true explode: true
type: string type: string
# default: available # default: available
example: "1807764329489375585" example: "1800805503066661056"
- name: retweeter_id - name: retweeter_id
in: query in: query
required: true required: true
explode: true explode: true
type: string type: string
# default: available # default: available
example: "1686945219571589120" example: "1823984946710765569"
- name: begin_time - name: begin_time
in: query in: query
type: string type: string
......
...@@ -65,7 +65,7 @@ func (s *Idx) Idx(page []UserTask) (bool, *list.List) { ...@@ -65,7 +65,7 @@ func (s *Idx) Idx(page []UserTask) (bool, *list.List) {
match := false match := false
for ik, iv := range s.idx { for ik, iv := range s.idx {
slog.Info("match", "idx", iv.UserId, "page user id", v.UserId, "page user name", v.UserName) slog.Debug("match", "idx", iv.UserId, "page user id", v.UserId, "page user name", v.UserName)
if v.UserId == iv.UserId { if v.UserId == iv.UserId {
match = true match = true
_, _ = k, ik _, _ = k, ik
......
package main package main
import ( import (
"fmt"
"log/slog" "log/slog"
// "github.com/gofiber/fiber/v2" // "github.com/gofiber/fiber/v2"
...@@ -35,6 +36,7 @@ func main() { ...@@ -35,6 +36,7 @@ func main() {
go func() { go func() {
for _, task := range tasks { for _, task := range tasks {
fmt.Println(task.String())
if err := Worker.AddJob(task); err != nil { if err := Worker.AddJob(task); err != nil {
slog.Error(err.Error()) slog.Error(err.Error())
} }
...@@ -67,6 +69,8 @@ func main() { ...@@ -67,6 +69,8 @@ func main() {
app.Post("/task/stop", TaskStop) app.Post("/task/stop", TaskStop)
app.Get("/verify/follower", VerifyFollower) app.Get("/verify/follower", VerifyFollower)
app.Get("/verify/retweeter", VerifyRetweeter) app.Get("/verify/retweeter", VerifyRetweeter)
//VerifyLike
app.Get("/verify/like", VerifyLike)
if err := app.Listen(":8001"); err != nil { if err := app.Listen(":8001"); err != nil {
slog.Error(err.Error()) slog.Error(err.Error())
......
File added
...@@ -8,14 +8,17 @@ import ( ...@@ -8,14 +8,17 @@ import (
type Work struct { type Work struct {
Lock sync.Mutex Lock sync.Mutex
Task map[string]chan<- interface{}
Task map[string]chan<- interface{}
userClient map[string]*Client
} }
var Worker Work var Worker Work
func init() { func init() {
Worker = Work{ Worker = Work{
Task: make(map[string]chan<- interface{}), Task: make(map[string]chan<- interface{}),
userClient: make(map[string]*Client),
} }
} }
...@@ -39,8 +42,14 @@ func (w *Work) RunJob(t TaskJob) chan<- interface{} { ...@@ -39,8 +42,14 @@ func (w *Work) RunJob(t TaskJob) chan<- interface{} {
done := make(chan interface{}) done := make(chan interface{})
go func() { go func() {
var cli *Client
cli := NewRetweeterClient(t.Config) if v, ok := w.userClient[t.UserId]; ok {
cli = v
} else {
cli = NewClient(t.Config)
w.userClient[t.UserId] = cli
}
page := NewPageUsers(NewIdx(t.Idx)) page := NewPageUsers(NewIdx(t.Idx))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment