Commit 8466a42e authored by vicotor's avatar vicotor

update client to rapid

parent 80bc1592
...@@ -234,6 +234,7 @@ func TaskAdd(c *fiber.Ctx) error { ...@@ -234,6 +234,7 @@ func TaskAdd(c *fiber.Ctx) error {
} }
fc := 0 fc := 0
uid := req.User
if req.TaskType == FollowType { // do some check. if req.TaskType == FollowType { // do some check.
// follower task count < available account count. // follower task count < available account count.
if passed, err := CheckFollowerTaskAndAccountCount(); err != nil { if passed, err := CheckFollowerTaskAndAccountCount(); err != nil {
...@@ -248,7 +249,7 @@ func TaskAdd(c *fiber.Ctx) error { ...@@ -248,7 +249,7 @@ func TaskAdd(c *fiber.Ctx) error {
}) })
} }
fc, err = swarm.GetSwarm().GetFollowerCount(req.TaskId) uid, fc, err = swarm.GetSwarm().GetUserProfile(req.TaskId)
if err != nil { if err != nil {
return c.JSON(Res{ return c.JSON(Res{
Code: 500, Code: 500,
...@@ -277,7 +278,7 @@ func TaskAdd(c *fiber.Ctx) error { ...@@ -277,7 +278,7 @@ func TaskAdd(c *fiber.Ctx) error {
job := TaskJob{ job := TaskJob{
Config: req.ApiConfig, Config: req.ApiConfig,
Idx: make([]UserTask, 0), Idx: make([]UserTask, 0),
UserId: req.User, UserId: uid,
TaskId: req.TaskId, TaskId: req.TaskId,
TaskType: req.TaskType, TaskType: req.TaskType,
FollowerCount: fc, FollowerCount: fc,
......
...@@ -156,7 +156,7 @@ func (c *Client) MyInfo() (UserInfo, error) { ...@@ -156,7 +156,7 @@ func (c *Client) MyInfo() (UserInfo, error) {
return UserInfo{}, fmt.Errorf("no user info found") return UserInfo{}, fmt.Errorf("no user info found")
} }
func (c *Client) Retweeters(tweetId string, next string) ([]*twitter.UserObj, string, *twitter.RateLimit, error) { func (c *Client) Retweeters(tweetId string, userId string, next string) ([]*twitter.UserObj, string, *twitter.RateLimit, error) {
// ctx is generated here only to use with Ratelimiter // ctx is generated here only to use with Ratelimiter
// TODO: Fix performance by removing unneeded allocaton here // TODO: Fix performance by removing unneeded allocaton here
...@@ -183,7 +183,7 @@ func (c *Client) Retweeters(tweetId string, next string) ([]*twitter.UserObj, st ...@@ -183,7 +183,7 @@ func (c *Client) Retweeters(tweetId string, next string) ([]*twitter.UserObj, st
} }
func (c *Client) TweetLikingUsers(tweetId string, next string) ([]*twitter.UserObj, string, *twitter.RateLimit, error) { func (c *Client) TweetLikingUsers(tweetId string, userId string, next string) ([]*twitter.UserObj, string, *twitter.RateLimit, error) {
// ctx is generated here only to use with Ratelimiter // ctx is generated here only to use with Ratelimiter
// TODO: Fix performance by removing unneeded allocaton here // TODO: Fix performance by removing unneeded allocaton here
......
...@@ -38,6 +38,7 @@ require ( ...@@ -38,6 +38,7 @@ require (
github.com/go-playground/validator/v10 v10.20.0 // indirect github.com/go-playground/validator/v10 v10.20.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect github.com/goccy/go-json v0.10.2 // indirect
github.com/google/uuid v1.6.0 // indirect github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect github.com/josharian/intern v1.0.0 // indirect
...@@ -77,7 +78,7 @@ require ( ...@@ -77,7 +78,7 @@ require (
github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.56.0 // indirect github.com/valyala/fasthttp v1.56.0 // indirect
github.com/valyala/tcplisten v1.0.0 // indirect github.com/valyala/tcplisten v1.0.0 // indirect
github.com/xueqianLu/twitter-bee v0.0.0-20250111162052-eadf23586222 // indirect github.com/xueqianLu/twitter-bee v0.1.0-r // indirect
go.uber.org/atomic v1.9.0 // indirect go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect go.uber.org/multierr v1.9.0 // indirect
golang.org/x/arch v0.8.0 // indirect golang.org/x/arch v0.8.0 // indirect
......
...@@ -55,6 +55,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN ...@@ -55,6 +55,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
...@@ -173,6 +175,8 @@ github.com/xueqianLu/twitter-bee v0.0.0-20241213092233-9a0472c44890 h1:DrNzTS0w4 ...@@ -173,6 +175,8 @@ github.com/xueqianLu/twitter-bee v0.0.0-20241213092233-9a0472c44890 h1:DrNzTS0w4
github.com/xueqianLu/twitter-bee v0.0.0-20241213092233-9a0472c44890/go.mod h1:gPCAcKZyfZpAg1WkDpOQWUnN0ZOJTMmwczlnGqQ+xYU= github.com/xueqianLu/twitter-bee v0.0.0-20241213092233-9a0472c44890/go.mod h1:gPCAcKZyfZpAg1WkDpOQWUnN0ZOJTMmwczlnGqQ+xYU=
github.com/xueqianLu/twitter-bee v0.0.0-20250111162052-eadf23586222 h1:7Sy/YL75xYart8lBx41cjc2kD6v9btDLabFaFsqMRrg= github.com/xueqianLu/twitter-bee v0.0.0-20250111162052-eadf23586222 h1:7Sy/YL75xYart8lBx41cjc2kD6v9btDLabFaFsqMRrg=
github.com/xueqianLu/twitter-bee v0.0.0-20250111162052-eadf23586222/go.mod h1:gPCAcKZyfZpAg1WkDpOQWUnN0ZOJTMmwczlnGqQ+xYU= github.com/xueqianLu/twitter-bee v0.0.0-20250111162052-eadf23586222/go.mod h1:gPCAcKZyfZpAg1WkDpOQWUnN0ZOJTMmwczlnGqQ+xYU=
github.com/xueqianLu/twitter-bee v0.1.0-r h1:48ORd/qMjaYX2antV+IY5oX0ba+KAxh7zrngFe0ORqI=
github.com/xueqianLu/twitter-bee v0.1.0-r/go.mod h1:OjMtMK7G+4dVFJWtY/suKIpIV5h0HteKXqAG2cMo7Ug=
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
......
...@@ -133,25 +133,17 @@ func NewPageUsers(idx *Idx) *PageUsers { ...@@ -133,25 +133,17 @@ func NewPageUsers(idx *Idx) *PageUsers {
} }
} }
type req func(tweetId string, next string) ([]*twitter.UserObj, string, *twitter.RateLimit, error) type req func(tweetId string, id string, next string) ([]*twitter.UserObj, string, *twitter.RateLimit, error)
/* /*
TODO 最后一页的标识,没有处理; TODO 最后一页的标识,没有处理;
*/ */
func (p *PageUsers) Request(tweetId string, next string, f req) ([]UserTask, error) { func (p *PageUsers) Request(tweetId string, id string, next string, f req) ([]UserTask, error) {
users, newNext, rt, err := f(tweetId, id, next)
//users, next, rt, err := p.cli.TweetLikingUsers(tweetId, next)
users, newNext, rt, err := f(tweetId, next)
if err != nil { if err != nil {
return nil, err return nil, err
} }
slog.Info("Request", "tweet id", tweetId, "next", next, "newnext", newNext, "len(users)", len(users)) slog.Info("Request", "tweet id", tweetId, "next", next, "newnext", newNext, "len(users)", len(users))
//if len(users) > 0 && tweetId == "gokunocool" {
// for _, v := range users {
// fmt.Println("quest user id", v.ID, "user name", v.UserName)
// }
//}
taskUser := userObjectToUserTask(users) taskUser := userObjectToUserTask(users)
...@@ -175,7 +167,7 @@ func (p *PageUsers) Request(tweetId string, next string, f req) ([]UserTask, err ...@@ -175,7 +167,7 @@ func (p *PageUsers) Request(tweetId string, next string, f req) ([]UserTask, err
time.Sleep(waitTime) time.Sleep(waitTime)
} }
return p.Request(tweetId, newNext, f) return p.Request(tweetId, id, newNext, f)
} }
func userObjectToUserTask(pageUsers []*twitter.UserObj) []UserTask { func userObjectToUserTask(pageUsers []*twitter.UserObj) []UserTask {
......
...@@ -4,20 +4,17 @@ import ( ...@@ -4,20 +4,17 @@ import (
"fmt" "fmt"
"github.com/g8rswimmer/go-twitter/v2" "github.com/g8rswimmer/go-twitter/v2"
"github.com/xueqianLu/twitter-bee/client" "github.com/xueqianLu/twitter-bee/client"
"golang.org/x/time/rate"
"sync" "sync"
"time"
) )
type ClientWithRateLimiter struct { type ClientWithRateLimiter struct {
*client.BeeClient *client.BeeClient
RateLimit *rate.Limiter
} }
type Swarm struct { type Swarm struct {
clients map[string]*ClientWithRateLimiter clients map[string]*ClientWithRateLimiter
mu sync.Mutex mu sync.Mutex
countmu sync.Mutex profilemu sync.Mutex
followermu sync.Mutex followermu sync.Mutex
} }
...@@ -58,7 +55,6 @@ func (s *Swarm) AddClient(url string) { ...@@ -58,7 +55,6 @@ func (s *Swarm) AddClient(url string) {
defer s.mu.Unlock() defer s.mu.Unlock()
cli := new(ClientWithRateLimiter) cli := new(ClientWithRateLimiter)
cli.BeeClient = client.NewBeeClient(url) cli.BeeClient = client.NewBeeClient(url)
cli.RateLimit = rate.NewLimiter(rate.Every(15*time.Minute), 40)
s.clients[url] = cli s.clients[url] = cli
} }
...@@ -69,34 +65,31 @@ func (s *Swarm) RemoveClient(url string) { ...@@ -69,34 +65,31 @@ func (s *Swarm) RemoveClient(url string) {
delete(s.clients, url) delete(s.clients, url)
} }
func (s *Swarm) GetFollowerCount(userID string) (int, error) { func (s *Swarm) GetUserProfile(name string) (string, int, error) {
clients := s.copyedClients() clients := s.copyedClients()
s.countmu.Lock() s.profilemu.Lock()
defer s.countmu.Unlock() defer s.profilemu.Unlock()
for _, cli := range clients { for _, cli := range clients {
res, err := cli.GetFollowerCount(userID) res, err := cli.GetUserProfile(name)
if err == nil { if err == nil {
fmt.Printf("get %v follower count %d\n", userID, res.Count) fmt.Println("get user profile", res)
return res.Count, nil return res.Id, res.Follower, nil
} else { } else {
fmt.Println("get follower count failed with err", err.Error()) fmt.Println("get user profile failed with err", err.Error())
} }
} }
return 0, fmt.Errorf("can not get the %v follower count", userID) return "", 0, fmt.Errorf("can not get the %v profile", name)
} }
func (s *Swarm) GetFollowerList(user string, cursor string) ([]*twitter.UserObj, string, *twitter.RateLimit, error) { func (s *Swarm) GetFollowerList(user string, id string, cursor string) ([]*twitter.UserObj, string, *twitter.RateLimit, error) {
clients := s.copyedClients() clients := s.copyedClients()
s.followermu.Lock() s.followermu.Lock()
defer s.followermu.Unlock() defer s.followermu.Unlock()
for _, cli := range clients { for _, cli := range clients {
//if cli.RateLimit.Allow() == false { res, err := cli.GetFollowerList(user, id, cursor)
// continue
//}
res, err := cli.GetFollowerList(user, cursor)
if err == nil { if err == nil {
list := make([]*twitter.UserObj, 0, len(res.List)) list := make([]*twitter.UserObj, 0, len(res.List))
for _, u := range res.List { for _, u := range res.List {
......
...@@ -83,7 +83,7 @@ func (w *Work) RunJob(t TaskJob) chan<- interface{} { ...@@ -83,7 +83,7 @@ func (w *Work) RunJob(t TaskJob) chan<- interface{} {
cli := swarm.GetSwarm() cli := swarm.GetSwarm()
secondTicker := time.NewTicker(time.Second * 3) secondTicker := time.NewTicker(time.Minute * 3)
fiveMinutesTicker := time.NewTicker(time.Minute * 1) fiveMinutesTicker := time.NewTicker(time.Minute * 1)
halfHourTicker := time.NewTicker(time.Minute * 30) halfHourTicker := time.NewTicker(time.Minute * 30)
...@@ -130,7 +130,7 @@ func (w *Work) RunJob(t TaskJob) chan<- interface{} { ...@@ -130,7 +130,7 @@ func (w *Work) RunJob(t TaskJob) chan<- interface{} {
//recordFc = make(map[string]int) //recordFc = make(map[string]int)
case <-secondTicker.C: case <-secondTicker.C:
fc, err := cli.GetFollowerCount(t.TaskId) _, fc, err := cli.GetUserProfile(t.TaskId)
if err != nil { if err != nil {
slog.Error("TryProfileFollowerCount", "err", err.Error()) slog.Error("TryProfileFollowerCount", "err", err.Error())
...@@ -141,7 +141,7 @@ func (w *Work) RunJob(t TaskJob) chan<- interface{} { ...@@ -141,7 +141,7 @@ func (w *Work) RunJob(t TaskJob) chan<- interface{} {
Date: time.Now(), Date: time.Now(),
FollowCount: fc, FollowCount: fc,
}) })
secondTicker.Reset(time.Minute * 1) secondTicker.Reset(time.Minute * 3)
// 先用和like retweet一样的周期模式; // 先用和like retweet一样的周期模式;
...@@ -210,7 +210,7 @@ func (w *Work) RunJob(t TaskJob) chan<- interface{} { ...@@ -210,7 +210,7 @@ func (w *Work) RunJob(t TaskJob) chan<- interface{} {
func Request(f req, page *PageUsers, t TaskJob) error { func Request(f req, page *PageUsers, t TaskJob) error {
users, err := page.Request(t.TaskId, "", f) users, err := page.Request(t.TaskId, t.UserId, "", f)
if err != nil { if err != nil {
return err return err
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment