Commit 1219bc0b authored by Ubuntu

sync followers ok

parent c8ae0af4
...@@ -39,7 +39,9 @@ func newSync(done <-chan interface{}) error { ...@@ -39,7 +39,9 @@ func newSync(done <-chan interface{}) error {
return err return err
} }
taskFetchStream, resTailStream := TaskImplement(done, idxStream, resStream) availRes := AvailableResource(done, resStream)
taskFetchStream, resTailStream := TaskImplement(done, idxStream, availRes)
ConnectTailTask(done, connStream, taskFetchStream) ConnectTailTask(done, connStream, taskFetchStream)
...@@ -64,29 +66,29 @@ func TaskIdx(done <-chan interface{}, inStream <-chan taskInterface) (<-chan tas ...@@ -64,29 +66,29 @@ func TaskIdx(done <-chan interface{}, inStream <-chan taskInterface) (<-chan tas
return return
case task := <-inStream: case task := <-inStream:
slog.Info("TaskIdx", "task.ID()", task.ID(), "task.InitIdx()", task.InitIdx()) slog.Info("TaskIdx", "task.ID()", task.ID(), "task.UserName()", task.UserName(), "task.InitIdx()", task.InitIdx())
if task.InitIdx() { if !task.InitIdx() {
select {
case <-done:
return
case outStream <- task:
}
continue
}
list, ok := task.UpdateIdx() list, ok := task.UpdateIdx()
if ok {
select {
case <-done:
return
case outListStream <- TaskIdAndList{
TaskId: task.ID(),
List: list,
}:
}
if ok {
select {
case <-done:
return
case outListStream <- TaskIdAndList{
TaskId: task.ID(),
List: list,
}:
} }
}
select {
case <-done:
return
case outStream <- task:
} }
} }
...@@ -96,18 +98,16 @@ func TaskIdx(done <-chan interface{}, inStream <-chan taskInterface) (<-chan tas ...@@ -96,18 +98,16 @@ func TaskIdx(done <-chan interface{}, inStream <-chan taskInterface) (<-chan tas
return outStream, outListStream return outStream, outListStream
} }
type TaskIdAndList struct {
type TaskIdAndList struct{ TaskId string
TaskId string List *list.List
List *list.List
} }
type TaskIdAndProfiles struct{ type TaskIdAndProfiles struct {
TaskId string TaskId string
Profiles []Profile Profiles []Profile
} }
func BackListToQueue(done <-chan interface{}, inStream <-chan TaskIdAndList) <-chan TaskIdAndProfiles { func BackListToQueue(done <-chan interface{}, inStream <-chan TaskIdAndList) <-chan TaskIdAndProfiles {
outStream := make(chan TaskIdAndProfiles, 1) outStream := make(chan TaskIdAndProfiles, 1)
...@@ -145,7 +145,7 @@ func BackListToQueue(done <-chan interface{}, inStream <-chan TaskIdAndList) <-c ...@@ -145,7 +145,7 @@ func BackListToQueue(done <-chan interface{}, inStream <-chan TaskIdAndList) <-c
return return
case outStream <- TaskIdAndProfiles{ case outStream <- TaskIdAndProfiles{
Profiles: res, Profiles: res,
TaskId: users.TaskId, TaskId: users.TaskId,
}: }:
res = make([]Profile, 0, users.List.Len()) res = make([]Profile, 0, users.List.Len())
...@@ -162,7 +162,7 @@ func BackListToQueue(done <-chan interface{}, inStream <-chan TaskIdAndList) <-c ...@@ -162,7 +162,7 @@ func BackListToQueue(done <-chan interface{}, inStream <-chan TaskIdAndList) <-c
return return
case outStream <- TaskIdAndProfiles{ case outStream <- TaskIdAndProfiles{
Profiles: res, Profiles: res,
TaskId: users.TaskId, TaskId: users.TaskId,
}: }:
} }
...@@ -177,7 +177,7 @@ func BackListToQueue(done <-chan interface{}, inStream <-chan TaskIdAndList) <-c ...@@ -177,7 +177,7 @@ func BackListToQueue(done <-chan interface{}, inStream <-chan TaskIdAndList) <-c
return outStream return outStream
} }
func InsertOrUpdateUsers(done <-chan interface{}, inStream <-chan TaskIdAndProfiles ) error { func InsertOrUpdateUsers(done <-chan interface{}, inStream <-chan TaskIdAndProfiles) error {
// client, err := supabase.NewClient(API_URL, API_KEY, nil) // client, err := supabase.NewClient(API_URL, API_KEY, nil)
// if err != nil { // if err != nil {
...@@ -207,7 +207,7 @@ func InsertOrUpdateUsers(done <-chan interface{}, inStream <-chan TaskIdAndProfi ...@@ -207,7 +207,7 @@ func InsertOrUpdateUsers(done <-chan interface{}, inStream <-chan TaskIdAndProfi
row := Follower{ row := Follower{
Follower: userId, Follower: userId,
UserName: user.Username, UserName: user.Username,
UserId:users.TaskId, UserId: users.TaskId,
} }
rows = append(rows, row) rows = append(rows, row)
} }
...@@ -301,9 +301,12 @@ func AvailableResource(done <-chan interface{}, inStream <-chan ScraperTimer) <- ...@@ -301,9 +301,12 @@ func AvailableResource(done <-chan interface{}, inStream <-chan ScraperTimer) <-
//TODO //TODO
// 1. 携带web服务器的rate limit 做判断; // 1. 携带web服务器的rate limit 做判断;
// 2. 校验 是否可用 // 2. 校验 是否可用
slog.Info("AvailableResource------------------------------")
<-scraper.Timer.C <-scraper.Timer.C
//scraper.Timer = *time.NewTimer(30 * time.Second) scraper.Timer = *time.NewTimer(60 * time.Second)
scraper.Timer.Reset(30 * time.Second) //scraper.Timer.Reset(30 * time.Second)
outStream <- scraper outStream <- scraper
} }
} }
...@@ -331,7 +334,8 @@ func TaskImplement(done <-chan interface{}, inTaskStream <-chan taskInterface, i ...@@ -331,7 +334,8 @@ func TaskImplement(done <-chan interface{}, inTaskStream <-chan taskInterface, i
return return
case res := <-inResourceStream: case res := <-inResourceStream:
fmt.Println("TaskImplement", task.ID()) slog.Info("TaskImplement", "task.ID()", task.ID(), "task.UserName()", task.UserName(), "len(inResourceStream)", len(inResourceStream))
if err := task.Fetch(res.Scraper); err != nil { if err := task.Fetch(res.Scraper); err != nil {
slog.Error("task.Fetch", "err", err.Error()) slog.Error("task.Fetch", "err", err.Error())
} }
......
...@@ -82,8 +82,8 @@ func GetTasksIdx() ([]taskInterface, error) { ...@@ -82,8 +82,8 @@ func GetTasksIdx() ([]taskInterface, error) {
return nil, err return nil, err
} }
followTask := NewFollowTask() followTask := NewFollowTask(task.TaskId, task.User)
followTask.UserId = task.TaskId
followTask.Idx = userRes followTask.Idx = userRes
if len(userRes) == 0 { if len(userRes) == 0 {
......
...@@ -39,7 +39,8 @@ type FollowTask struct { ...@@ -39,7 +39,8 @@ type FollowTask struct {
// URL string // URL string
Init bool Init bool
UserId string UserIdStr string
UserNameStr string
Next string Next string
Idx []FollowerId Idx []FollowerId
NewIdx []FollowerId NewIdx []FollowerId
...@@ -50,16 +51,19 @@ type FollowTask struct { ...@@ -50,16 +51,19 @@ type FollowTask struct {
//Scraper *twitterscraper.Scraper //Scraper *twitterscraper.Scraper
} }
func NewFollowTask() *FollowTask { func NewFollowTask(userId, userName string) *FollowTask {
return &FollowTask{ return &FollowTask{
backPushPop: list.New(), backPushPop: list.New(),
Idx: make([]FollowerId, 0), Idx: make([]FollowerId, 0),
UserIdStr: userId,
UserNameStr: userName,
} }
} }
type taskInterface interface { type taskInterface interface {
ID() string ID() string
UserName() string
Fetch(scraper *twitterscraper.Scraper) error Fetch(scraper *twitterscraper.Scraper) error
InitIdx() bool InitIdx() bool
UpdateIdx() (*list.List, bool) UpdateIdx() (*list.List, bool)
...@@ -79,7 +83,11 @@ func (f *FollowTask) InitIdx() bool { ...@@ -79,7 +83,11 @@ func (f *FollowTask) InitIdx() bool {
} }
func (f *FollowTask) ID() string { func (f *FollowTask) ID() string {
return f.UserId return f.UserIdStr
}
func (f *FollowTask) UserName() string {
return f.UserNameStr
} }
func (f *FollowTask) UpdateIdx() (*list.List, bool) { func (f *FollowTask) UpdateIdx() (*list.List, bool) {
...@@ -106,6 +114,8 @@ func (f *FollowTask) UpdateIdx() (*list.List, bool) { ...@@ -106,6 +114,8 @@ func (f *FollowTask) UpdateIdx() (*list.List, bool) {
f.backPushPop.PushFront(v) f.backPushPop.PushFront(v)
} }
slog.Info("MatchIdx", "f.UserIdStr", f.UserIdStr, "f.UserNameStr", f.UserNameStr, "match", ok, "len(profiles)", len(profiles))
if !ok { if !ok {
f.Next = f.Res.Next f.Next = f.Res.Next
return nil, false return nil, false
...@@ -149,7 +159,7 @@ func (f *FollowTask) Fetch(scraper *twitterscraper.Scraper) error { ...@@ -149,7 +159,7 @@ func (f *FollowTask) Fetch(scraper *twitterscraper.Scraper) error {
// f.UserId = "OnlyDD_D" // f.UserId = "OnlyDD_D"
users, newNext, err := scraper.FetchFollowersByUserID(f.UserId, 20, f.Next) users, newNext, err := scraper.FetchFollowersByUserID(f.UserIdStr, 20, f.Next)
//users, newNext, err := scraper.FetchFollowers(f.UserId, 20, f.Next) //users, newNext, err := scraper.FetchFollowers(f.UserId, 20, f.Next)
...@@ -175,7 +185,7 @@ func (f *FollowTask) Fetch(scraper *twitterscraper.Scraper) error { ...@@ -175,7 +185,7 @@ func (f *FollowTask) Fetch(scraper *twitterscraper.Scraper) error {
Next: newNext, Next: newNext,
} }
slog.Info("follow fetch", "res.Current", res.Current, "res.Next", res.Next, "len(res.Profiles)", len(res.Profiles)) slog.Info("follow fetch", "f.UserIdStr", f.UserIdStr, "f.UserNameStr", f.UserNameStr, "res.Current", res.Current, "res.Next", res.Next, "len(res.Profiles)", len(res.Profiles))
f.Res = res f.Res = res
...@@ -184,10 +194,11 @@ func (f *FollowTask) Fetch(scraper *twitterscraper.Scraper) error { ...@@ -184,10 +194,11 @@ func (f *FollowTask) Fetch(scraper *twitterscraper.Scraper) error {
} }
type RetweetTask struct { type RetweetTask struct {
URL string //URL string
TweetId string TweetId string
Next string UserNameStr string
Idx []FollowerId Next string
Idx []FollowerId
// backPushPop = list.New() // backPushPop = list.New()
backPushPop list.List backPushPop list.List
...@@ -197,6 +208,10 @@ type RetweetTask struct { ...@@ -197,6 +208,10 @@ type RetweetTask struct {
// return r.URL // return r.URL
// } // }
func (f *RetweetTask) UserName() string {
return f.UserNameStr
}
func (f *RetweetTask) ID() string { func (f *RetweetTask) ID() string {
return f.TweetId return f.TweetId
} }
......
Markdown is supported
0% Try again or attach a new file
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment