Commit b6278435 authored by vicotor

add comment and update code.

parent 606ed130
......@@ -75,15 +75,6 @@ func GetLoginAccount() ([]*twitterscraper.Scraper, error) {
}
cookies := scraper.GetCookies()
// data, err := json.Marshal(cookies)
// if err != nil {
// slog.Error("cookies json.Marshal", "err", err.Error())
// continue
// }
//fmt.Println("user cookies", v.Username, v.Email, string(data))
if err := UpdateCookies(v.Username, cookies); err != nil {
slog.Error("cookies UpdateCookies", "err", err.Error())
continue
......@@ -122,10 +113,9 @@ type FollowerRateLimit struct {
func (f *FollowerRateLimit) TryProfileFollowerCount(username string) (int, error) {
i := 0
tryCount := 0
for {
if i > 10 {
if tryCount > 10 {
return 0, fmt.Errorf("can not get the %v follower count", username)
}
......@@ -137,7 +127,9 @@ func (f *FollowerRateLimit) TryProfileFollowerCount(username string) (int, error
// twitterscraper.GetGuestToken()
// c.Scraper = twitterscraper.New()
slog.Error("ProfileFollowerCount", "err", err.Error())
time.Sleep(time.Second * time.Duration(i))
tryCount++
time.Sleep(time.Second * time.Duration(tryCount))
continue
}
......@@ -153,8 +145,6 @@ func (f *FollowerRateLimit) ProfileFollowerCount(username string) (int, error) {
f.Scraper = twitterscraper.New()
}
//c.Scraper.GetProfile("aon_aonet")
pro, err := f.Scraper.GetProfile(username)
if err != nil {
......@@ -167,26 +157,19 @@ func (f *FollowerRateLimit) ProfileFollowerCount(username string) (int, error) {
//= rate.NewLimiter(rate.Every(15*time.Minute), 40)
func (f *FollowerRateLimit) Follower(userName string, next string) ([]*twitter.UserObj, string, *twitter.RateLimit, error) {
func (f *FollowerRateLimit) Follower(userName string, cursor string) ([]*twitter.UserObj, string, *twitter.RateLimit, error) {
fmt.Println("Follower request-------------")
slog.Info("Follower request", "userName", userName, "cursor", cursor)
//FollowerRateLimit.
ctx := context.Background()
err := f.RateLimit.Wait(ctx) // This is a blocking call.
//err := c.RetweeterRatelimiter.Wait(ctx) // This is a blocking call.
if err != nil {
if err := f.RateLimit.Wait(ctx); err != nil { // This is a blocking call.
return nil, "", nil, err
}
select {
case account := <-accChan:
//users, next, err := account.WithDelay(300).FetchFollowers("bitcoin", 20, "")
users, next, err := account.FetchFollowers(userName, 20, "")
users, next, err := account.FetchFollowers(userName, 1000, cursor)
if err != nil {
slog.Error(err.Error())
}
......@@ -194,7 +177,6 @@ func (f *FollowerRateLimit) Follower(userName string, next string) ([]*twitter.U
res := make([]*twitter.UserObj, 0, len(users))
for _, v := range users {
sDec, _ := base64.StdEncoding.DecodeString(v.UserID)
userId, _ := strings.CutPrefix(string(sDec), "User:")
......@@ -206,7 +188,6 @@ func (f *FollowerRateLimit) Follower(userName string, next string) ([]*twitter.U
}
accChan <- account
return res, next, nil, nil
return res, next, nil, err
}
}
......@@ -147,7 +147,7 @@ func CheckTaskExist(userId, taskId, taskType string) (bool, error) {
}
func CheckFollowerTaskAndAccountCount() (bool, error) {
// check follower task count need < available account count.
fc, err := FollowerTaskCount()
if err != nil {
return false, err
......
......@@ -203,8 +203,9 @@ func TaskAdd(c *fiber.Ctx) error {
}
fc := 0
if req.TaskType == FollowType {
if req.TaskType == FollowType { // do some check.
// follower task count < available account count.
ok, err := CheckFollowerTaskAndAccountCount()
if err != nil {
......@@ -238,9 +239,6 @@ func TaskAdd(c *fiber.Ctx) error {
Msg: "QueryProjectByUserIdAndName !ok",
})
}
//NewFollowerOb()
//var err error
//fc, err = NewFollowClient().TryProfileFollowerCount(req.TaskId)
fc, err = NewFollowerOb().TryProfileFollowerCount(req.TaskId)
......@@ -253,6 +251,7 @@ func TaskAdd(c *fiber.Ctx) error {
}
// create a taskJob.
job := TaskJob{
Config: projects[0].Config,
Idx: make([]UserTask, 0),
......@@ -263,10 +262,9 @@ func TaskAdd(c *fiber.Ctx) error {
FollowerCount: fc,
}
fmt.Println(job.String())
slog.Info("new", "job-", job.String())
err = Worker.AddJob(job)
if err != nil {
slog.Error(" Worker.AddJob", "err", err.Error())
......@@ -277,12 +275,7 @@ func TaskAdd(c *fiber.Ctx) error {
}
slog.Info("add task into db", "user", req.User, "TaskType", req.TaskType, "TaskId", req.TaskId)
// req.AddOrStop = true
err = AddTaskInsert(req, fc)
if err != nil {
if err = AddTaskInsert(req, fc); err != nil {
slog.Error("twitter_syncer insert", "err", err.Error())
return c.JSON(Res{
Code: 500,
......@@ -290,8 +283,6 @@ func TaskAdd(c *fiber.Ctx) error {
})
}
//slog.Info("twitter_syncer insert", "res", string(res))
return c.JSON(Res{
Code: 200,
})
......
......@@ -94,16 +94,13 @@ func (s *Idx) Idx(page []UserTask) (bool, *list.List) {
}
// PageUsers walks pages of users and matches each fetched page against a
// stored index (see Request, which calls idx.Idx on every page).
type PageUsers struct {
// idx is the index that each fetched page of users is matched against.
idx *Idx
}
// NewPageUsers returns a PageUsers whose pages are matched against idx.
// The given index is stored as-is; it is not copied or wrapped.
func NewPageUsers(idx *Idx) *PageUsers {
	pager := new(PageUsers)
	pager.idx = idx
	return pager
}
......@@ -120,19 +117,20 @@ func (p *PageUsers) Request(tweetId string, next string, f req) ([]UserTask, err
if err != nil {
return nil, err
}
slog.Debug("Request", "tweet id", tweetId, "next", newNext, "len(users)", len(users))
// wait rate limit.
if rt != nil && rt.Remaining == 0 {
time.Sleep(time.Until(rt.Reset.Time().Add(500 * time.Millisecond)))
}
taskUser := userObjectToUserTask(users)
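// Fetch the latest page of followers and match it against the history.
// If the page already connects with the history records, the query is complete and there is no need to keep requesting the next page.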
ok, l := p.idx.Idx(taskUser)
if ok {
res := make([]UserTask, 0, l.Len())
for e := l.Front(); e != nil; e = e.Next() {
if user, ok := e.Value.(UserTask); ok {
res = append(res, user)
......@@ -141,7 +139,6 @@ func (p *PageUsers) Request(tweetId string, next string, f req) ([]UserTask, err
return res, nil
}
return p.Request(tweetId, newNext, f)
}
func userObjectToUserTask(pageUsers []*twitter.UserObj) []UserTask {
......@@ -158,354 +155,3 @@ func userObjectToUserTask(pageUsers []*twitter.UserObj) []UserTask {
return res
}
// func scheduler(tick *time.Ticker) {
// for t := range tick.C {
// task(t)
// }
// }
// func task(t time.Time) {
// fmt.Println("hello! printed at ", t)
// }
// func GetFollowTasks() ([]Task, error) {
// data, count, err := client.From("tasks").Select("*", "exact", false).
// Eq("follow", "true").Neq("follow_stop", "true").
// Execute()
// if err != nil {
// return nil, err
// }
// res := make([]Task, 0, count)
// if err := json.Unmarshal(data, &res); err != nil {
// return nil, err
// }
// return res, nil
// }
// follow, retweet
// const FollowType = "follow"
// const RetweetType = "retweet"
// const FollowType = "follow"
// followers
// const RetweetType = "retweet"
// type FollowerId struct {
// Follower
// Id int `json:"id"`
// CreatedAt string `json:"created_at"`
// }
// type Follower struct {
// //user_id
// UserId string `json:"user_id"`
// Follower string `json:"follower_id"`
// UserName string `json:"follower_username"`
// }
// type RetweeterId struct {
// Retweeter
// Id int `json:"id"`
// CreatedAt string `json:"created_at"`
// }
// type Retweeter struct {
// TweetId string `json:"tweet_id"`
// RetweeterId string `json:"retweeter_id"`
// RetweeterUserName string `json:"retweeter_username"`
// }
// func GetTasksIdx() ([]taskInterface, error) {
// tasks, err := QueryAllTask()
// if err != nil {
// return nil, err
// }
// res := make([]taskInterface, 0, 10)
// for _, task := range tasks {
// if task.TaskType == FollowType {
// data, count, err := client.From("followers").Select("", "user_id", false).
// Eq("user_id", task.TaskId).
// Order("id", &postgrest.OrderOpts{
// Ascending: false,
// // NullsFirst bool
// // ForeignTable string
// }).Range(0, 10, "").Execute()
// if err != nil {
// slog.Error("select * from followers error", err)
// return nil, err
// }
// _ = count
// slog.Info("idx data", "user id", task.TaskId, "user name", task.User, "idx", data)
// fmt.Println("idx data", string(data))
// userRes := make([]FollowerId, 0, 10)
// if err := json.Unmarshal(data, &userRes); err != nil {
// return nil, err
// }
// followTask := NewFollowTask(task.TaskId, task.User, task.TaskType)
// followTask.Idx = userRes
// // if len(userRes) == 0 {
// // followTask.Init = true
// // }
// res = append(res, followTask)
// }
// if task.TaskType == RetweetType {
// data, count, err := client.From("retweeters").Select("", "tweet_id", false).
// Eq("tweet_id", task.TaskId).
// Order("id", &postgrest.OrderOpts{
// Ascending: false,
// // NullsFirst bool
// // ForeignTable string
// }).Range(0, 10, "").Execute()
// if err != nil {
// slog.Error("select * from retweeters error", err)
// return nil, err
// }
// _ = count
// slog.Info("idx data", "tweet id", task.TaskId, "user name", task.User, "idx", data)
// fmt.Println("idx data", string(data))
// userRes := make([]RetweeterId, 0, 10)
// if err := json.Unmarshal(data, &userRes); err != nil {
// return nil, err
// }
// retweetTask := NewRetweetTask(task.TaskId, task.User, task.TaskType)
// //followTask := NewFollowTask(task.TaskId, task.User, task.TaskType)
// retweetTask.Idx = userRes
// // if len(userRes) == 0 {
// // followTask.Init = true
// // }
// res = append(res, retweetTask)
// }
// }
// return res, nil
// }
// func FollowersToBackList(done <-chan interface{}, idxss [][]FollowerId) (<-chan *list.List, error) {
// scraper := twitterscraper.New()
// //err := scraper.Login("Wade_Leeeee", "923881393time")
// err := scraper.Login("wuban358369", "123456789T")
// if err != nil {
// fmt.Println("scraper", err.Error())
// return nil, err
// }
// /*
// wuban01@gmail.com
// wuban01
// 1234567890Wuban
// 1234567890Wuban
// // 2346
// 980b
// 641a
// 774d
// 39fb
// 9425
// c6c5
// 3140
// b469
// 0b93
// d1c6
// 6101
// ef6f
// 9fb6
// ae94
// ee6f
// //wuban001
// wuban01@tutamail.com
// 1234567890Wuban
// */
// scraper3 := twitterscraper.New()
// //err = scraper3.Login("tifawe2861@stikezz.com", "123456789T")
// //err = scraper3.Login("wuban358369", "123456789T")
// err = scraper3.Login("Wade_Leeeee", "923881393time")
// if err != nil {
// fmt.Println("scraper3", err.Error())
// return nil, err
// }
// fmt.Println("scraper.IsLoggedIn()", scraper.IsLoggedIn())
// fmt.Println("scraper3.IsLoggedIn()", scraper3.IsLoggedIn())
// userId := "Bitcoin"
// next := ""
// var backPushPop = list.New()
// outStream := make(chan *list.List, 1)
// go func() {
// //newBegin := true
// for _, idxs := range idxss {
// newIdxs := make([]FollowerId, 0, len(idxs))
// c := true
// for {
// c = !c
// var err error
// var users Users
// if c {
// users, err = FetchFollowers(scraper, userId, next)
// if err != nil {
// slog.Error("FetchFollowers", err)
// time.Sleep(time.Second)
// continue
// }
// } else {
// users, err = FetchFollowers(scraper3, userId, next)
// if err != nil {
// slog.Error("FetchFollowers", err)
// time.Sleep(time.Second)
// continue
// }
// }
// // users, err := FetchFollowers(scraper, userId, next)
// // if err != nil {
// // slog.Error("FetchFollowers", err)
// // time.Sleep(time.Second)
// // continue
// // }
// fmt.Println("len(users)-----------------", len(users.Profiles))
// for k, v := range users.Profiles {
// fmt.Println("k", k, "v", v.UserIdAsNumber, v.Username)
// }
// if len(newIdxs) == 0 {
// for k, v := range users.Profiles {
// newIdxs = append(newIdxs, FollowerId{
// Follower: Follower{
// Follower: v.UserIdAsNumber,
// UserName: v.Username,
// },
// })
// if k > 5 {
// break
// }
// }
// }
// profiles, ok := MatchIdx(users, idxs)
// fmt.Println("len(profiles)-----------------", len(profiles))
// for _, v := range profiles {
// backPushPop.PushFront(v)
// }
// fmt.Printf("!ok %v users.Next %s %d len(outStream) %d backPushPop len %d\n", !ok, users.Next, len(users.Next), len(outStream), backPushPop.Len())
// time.Sleep(time.Second * 2)
// if !ok && len(users.Next) != 0 {
// next = users.Next
// continue
// }
// /*
// Api has a global limit on how many requests per second are allowed, don’t make requests more than once per 1.5 seconds from one account.
// Also each endpoint has its own limits, most of them are 150 requests per 15 minutes.
// */
// if backPushPop.Len() == 0 {
// next = ""
// //time.Sleep(time.Second * 20)
// continue
// }
// idxs = newIdxs
// newIdxs = make([]FollowerId, 0, len(idxs))
// select {
// case <-done:
// return
// case outStream <- backPushPop:
// fmt.Println("case outStream <- backPushPop: ------------")
// backPushPop = list.New()
// //time.Sleep(time.Second * 20)
// }
// next = ""
// //time.Sleep(time.Second * 20)
// }
// }
// }()
// return outStream, nil
// }
// func MatchIdx(data Users, idxs []FollowerId) ([]Profile, bool) {
// if len(idxs) == 0 {
// return data.Profiles, false
// }
// for k, v := range data.Profiles {
// for _, idx := range idxs {
// if v.UserIdAsNumber == idx.Follower.Follower {
// return data.Profiles[:k], true
// }
// }
// }
// return data.Profiles, false
// }
......@@ -37,9 +37,6 @@ func (w *Work) StopJob(req AddTaskReq) error {
return fmt.Errorf("%s do not run", key)
}
// done := w.RunJob(t)
// w.Task[t.UserId+"-"+t.TaskType] = done
return nil
}
......@@ -67,7 +64,7 @@ type TimeAndFollowCount struct {
func (w *Work) RunJob(t TaskJob) chan<- interface{} {
slog.Info("exect job", "userid", t.UserId, "task type", t.TaskType, "task id", t.TaskId, "t.FollowerCount", t.FollowerCount)
slog.Info("exec job", "userid", t.UserId, "task type", t.TaskType, "task id", t.TaskId, "t.FollowerCount", t.FollowerCount)
done := make(chan interface{})
go func() {
......@@ -86,7 +83,7 @@ func (w *Work) RunJob(t TaskJob) chan<- interface{} {
cli := NewFollowerOb()
secondTicker := time.NewTicker(time.Second * 3)
fiveMinutesTicker := time.NewTicker(time.Minute * 5)
fiveMinutesTicker := time.NewTicker(time.Minute * 1)
halfHourTicker := time.NewTicker(time.Minute * 30)
//recordFc := make(map[string]int)
......@@ -95,17 +92,16 @@ func (w *Work) RunJob(t TaskJob) chan<- interface{} {
for {
select {
case _, ok := <-done:
case _, ok := <-done: // task is stopped by api.
if !ok {
return
}
case <-fiveMinutesTicker.C:
slog.Info("case <-fiveMinutesTicker.C:")
maybeFound := false
maybeFound := false // trigger followers request when maybeFound is true.
for k, v := range recordFc {
fmt.Println(k, v)
if v.FollowCount != t.FollowerCount {
maybeFound = true
......@@ -118,6 +114,7 @@ func (w *Work) RunJob(t TaskJob) chan<- interface{} {
fmt.Println(" t.FollowerCount", t.FollowerCount, "maybeFound", maybeFound)
if maybeFound {
fiveMinutesTicker.Reset(time.Minute * 5)
halfHourTicker.Reset(time.Minute * 30)
if err := Request(cli.Follower, page, t); err != nil {
slog.Error(" page.Request", "task id", t.TaskId, "t.TaskType", t.TaskType, "err", err.Error())
......@@ -150,10 +147,6 @@ func (w *Work) RunJob(t TaskJob) chan<- interface{} {
case <-halfHourTicker.C:
slog.Info("case <-halfHourTicker.C:")
//recordFc = make(map[string]int)
//maybeFound := false
for k, v := range recordFc {
fmt.Println(k, v)
......@@ -164,8 +157,6 @@ func (w *Work) RunJob(t TaskJob) chan<- interface{} {
fmt.Println(" t.FollowerCount", t.FollowerCount)
// if maybeFound {
//halfHourTicker = (*time.Ticker)(time.NewTimer()) //Reset(time.Minute * 30)
if err := Request(cli.Follower, page, t); err != nil {
slog.Error(" page.Request", "task id", t.TaskId, "t.TaskType", t.TaskType, "err", err.Error())
continue
......@@ -239,21 +230,3 @@ func Request(f req, page *PageUsers, t TaskJob) error {
return nil
}
// users, err := page.Request(t.TaskId, "", f)
// if err != nil {
// slog.Error(" page.Request", "task id", t.TaskId, "t.TaskType", t.TaskType, "err", err.Error())
// continue
// }
// if err := InsertTaskRes(users, t.TaskType, t.TaskId); err != nil {
// for k, v := range users {
// fmt.Println(k, v.UserId, v.UserName)
// }
// slog.Error("InsertTaskRes", "task id", t.TaskId, "t.TaskType", t.TaskType, "len(users)", len(users), "err", err.Error())
// }
// slog.Info("InsertTaskRes", "task id", t.TaskId, "t.TaskType", t.TaskType, "len(users)", len(users))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment