Commit 7bc8253a authored by Ubuntu's avatar Ubuntu

retweet wait 5m

parent fa3106c2
...@@ -88,7 +88,7 @@ func Project(c *fiber.Ctx) error { ...@@ -88,7 +88,7 @@ func Project(c *fiber.Ctx) error {
} }
cli := NewClient(req.Config) cli := NewClient(req.Config, nil)
me, err := cli.Me() me, err := cli.Me()
...@@ -264,6 +264,13 @@ func TaskStop(c *fiber.Ctx) error { ...@@ -264,6 +264,13 @@ func TaskStop(c *fiber.Ctx) error {
}) })
} }
if err := Worker.StopJob(req.User, req.TaskType); err != nil {
return c.JSON(Res{
Code: 500,
Msg: err.Error(),
})
}
err := StopTaskUpdate(req) err := StopTaskUpdate(req)
if err != nil { if err != nil {
......
...@@ -38,7 +38,7 @@ PER USER ...@@ -38,7 +38,7 @@ PER USER
*/ */
var LikeRateLimit *rate.Limiter = rate.NewLimiter(rate.Every(3*time.Minute), 1) var LikeRateLimit *rate.Limiter = rate.NewLimiter(rate.Every(3*time.Minute), 1)
var RetweetRateLimit *rate.Limiter = rate.NewLimiter(rate.Every(3*time.Minute), 1) var RetweetRateLimit *rate.Limiter = rate.NewLimiter(rate.Every(5*time.Minute), 1)
type authorize struct { type authorize struct {
Token string Token string
...@@ -65,17 +65,17 @@ type Config struct { ...@@ -65,17 +65,17 @@ type Config struct {
Token string `json:"token"` Token string `json:"token"`
} }
// func NewLikeClient(cfg Config) *Client { func NewLikeClient(cfg Config) *Client {
// return NewClient(cfg, LikeRateLimit) return NewClient(cfg, LikeRateLimit)
// } }
// func NewRetweeterClient(cfg Config) *Client { func NewRetweeterClient(cfg Config) *Client {
// return NewClient(cfg, RetweetRateLimit) return NewClient(cfg, RetweetRateLimit)
// } }
//func NewClient(cfg Config, rts *rate.Limiter) *Client { func NewClient(cfg Config, rts *rate.Limiter) *Client {
func NewClient(cfg Config) *Client { //func NewClient(cfg Config) *Client {
//twitterAPIKey := "lVnj6Ox9HPcI4LwArSSYU7Pba" //os.Getenv("TWITTER_API_KEY") //twitterAPIKey := "lVnj6Ox9HPcI4LwArSSYU7Pba" //os.Getenv("TWITTER_API_KEY")
//twitterAPIKeySecret := "QMSnWG4QwyXWBVW2hQazzxhw9cSjd32CDfXGkg2DEaUUdscCRZ" //os.Getenv("TWITTER_API_KEY_SECRET") //twitterAPIKeySecret := "QMSnWG4QwyXWBVW2hQazzxhw9cSjd32CDfXGkg2DEaUUdscCRZ" //os.Getenv("TWITTER_API_KEY_SECRET")
...@@ -100,10 +100,10 @@ func NewClient(cfg Config) *Client { ...@@ -100,10 +100,10 @@ func NewClient(cfg Config) *Client {
} }
return &Client{ return &Client{
Client: twitterClient, Client: twitterClient,
// Ratelimiter: rts, Ratelimiter: rts,
RetweeterRatelimiter: RetweetRateLimit, //RetweeterRatelimiter: RetweetRateLimit,
LikingUserRatelimiter: LikeRateLimit, //LikingUserRatelimiter: LikeRateLimit,
} }
} }
...@@ -142,9 +142,9 @@ func (c *Client) Retweeters(tweetId string, next string) ([]*twitter.UserObj, st ...@@ -142,9 +142,9 @@ func (c *Client) Retweeters(tweetId string, next string) ([]*twitter.UserObj, st
// TODO: Fix performance by removing unneeded allocaton here // TODO: Fix performance by removing unneeded allocaton here
ctx := context.Background() ctx := context.Background()
//err := c.Ratelimiter.Wait(ctx) // This is a blocking call. err := c.Ratelimiter.Wait(ctx) // This is a blocking call.
err := c.RetweeterRatelimiter.Wait(ctx) // This is a blocking call. //err := c.RetweeterRatelimiter.Wait(ctx) // This is a blocking call.
if err != nil { if err != nil {
return nil, "", nil, err return nil, "", nil, err
} }
...@@ -169,8 +169,8 @@ func (c *Client) TweetLikingUsers(tweetId string, next string) ([]*twitter.UserO ...@@ -169,8 +169,8 @@ func (c *Client) TweetLikingUsers(tweetId string, next string) ([]*twitter.UserO
// TODO: Fix performance by removing unneeded allocaton here // TODO: Fix performance by removing unneeded allocaton here
ctx := context.Background() ctx := context.Background()
//err := c.Ratelimiter.Wait(ctx) // This is a blocking call. err := c.Ratelimiter.Wait(ctx) // This is a blocking call.
err := c.LikingUserRatelimiter.Wait(ctx) // This is a blocking call. //err := c.LikingUserRatelimiter.Wait(ctx) // This is a blocking call.
if err != nil { if err != nil {
return nil, "", nil, err return nil, "", nil, err
} }
...@@ -192,7 +192,7 @@ func (c *Client) TweetLikingUsers(tweetId string, next string) ([]*twitter.UserO ...@@ -192,7 +192,7 @@ func (c *Client) TweetLikingUsers(tweetId string, next string) ([]*twitter.UserO
} }
func (c *Client) Me() (map[string]*twitter.UserDictionary, error) { func (c *Client) Usage() (map[string]*twitter.UserDictionary, error) {
// ctx is generated here only to use with Ratelimiter // ctx is generated here only to use with Ratelimiter
// TODO: Fix performance by removing unneeded allocaton here // TODO: Fix performance by removing unneeded allocaton here
......
...@@ -358,7 +358,7 @@ components: ...@@ -358,7 +358,7 @@ components:
example: "1800805503066661056" example: "1800805503066661056"
StopTaskReq: StopTaskReq:
required: required:
- task_id - user_id
- task_type - task_type
type: object type: object
properties: properties:
...@@ -374,7 +374,6 @@ components: ...@@ -374,7 +374,6 @@ components:
type: string type: string
example: "1800805503066661056" example: "1800805503066661056"
user_id: user_id:
description: 方便识别用户身份信息
type: string type: string
example: "1783145144700874752" example: "1783145144700874752"
# start: # start:
......
No preview for this file type
...@@ -9,19 +9,35 @@ import ( ...@@ -9,19 +9,35 @@ import (
type Work struct { type Work struct {
Lock sync.Mutex Lock sync.Mutex
Task map[string]chan<- interface{} Task map[string]chan<- interface{}
userClient map[string]*Client // userClient map[string]*Client
} }
var Worker Work var Worker Work
func init() { func init() {
Worker = Work{ Worker = Work{
Task: make(map[string]chan<- interface{}), Task: make(map[string]chan<- interface{}),
userClient: make(map[string]*Client), //userClient: make(map[string]*Client),
} }
} }
func (w *Work) StopJob(userId, taskType string) error {
w.Lock.Lock()
if v, ok := w.Task[userId+"-"+taskType]; ok {
close(v)
} else {
return fmt.Errorf("%s do not run", userId+"-"+taskType)
}
// done := w.RunJob(t)
// w.Task[t.UserId+"-"+t.TaskType] = done
w.Lock.Unlock()
return nil
}
func (w *Work) AddJob(t TaskJob) error { func (w *Work) AddJob(t TaskJob) error {
w.Lock.Lock() w.Lock.Lock()
...@@ -44,11 +60,10 @@ func (w *Work) RunJob(t TaskJob) chan<- interface{} { ...@@ -44,11 +60,10 @@ func (w *Work) RunJob(t TaskJob) chan<- interface{} {
go func() { go func() {
var cli *Client var cli *Client
if v, ok := w.userClient[t.UserId]; ok { if t.TaskType == RetweetType {
cli = v cli = NewRetweeterClient(t.Config)
} else { } else {
cli = NewClient(t.Config) cli = NewLikeClient(t.Config)
w.userClient[t.UserId] = cli
} }
page := NewPageUsers(NewIdx(t.Idx)) page := NewPageUsers(NewIdx(t.Idx))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment