Commit 55dfb842 authored by Ubuntu's avatar Ubuntu

add main stream

parent a6620d2c
......@@ -6,14 +6,14 @@ import (
func TestTwitterTasks(t *testing.T) {
tasks, err := GetTasksIdx()
// tasks, err := GetTasksIdx()
if err != nil {
t.Fatal(err)
}
// if err != nil {
// t.Fatal(err)
// }
for k, v := range tasks {
t.Log(k, "v.Project", v.Project, "v.TweetId", v.TweetId, "v.Follow", v.Follow)
}
// for k, v := range tasks {
// t.Log(k, "v.Project", v.Project, "v.TweetId", v.TweetId, "v.Follow", v.Follow)
// }
}
package main
import (
"log/slog"
"time"
twitterscraper "github.com/imperatrona/twitter-scraper"
)
func newSync() error {
done := make(<-chan interface{})
connStream := make(chan taskInterface, 1000)
idxStream := TaskIdx(done, connStream)
resStream, err := InitResource(users)
if err != nil {
return err
}
taskFetchStream, resTailStream := TaskImplement(done, idxStream, resStream)
ConnectTailTask(done, connStream, taskFetchStream)
ConnectTailResource(done, resStream, resTailStream)
select {}
}
func TaskIdx(done <-chan interface{}, inStream <-chan taskInterface) <-chan taskInterface {
outStream := make(chan taskInterface, 1000)
return outStream
}
type ScraperTimer struct {
Scraper *twitterscraper.Scraper
Timer time.Timer
}
func InitResource(users []TwitterAccount) (chan ScraperTimer, error) {
outStream := make(chan ScraperTimer, 1000)
for _, v := range users {
scraper, err := InitScraper(v.User, v.PassWd)
if err != nil {
return nil, err
}
newScraperTimer := ScraperTimer{
Scraper: scraper,
Timer: *time.NewTimer(0),
}
outStream <- newScraperTimer
}
return outStream, nil
}
func AvailableResource(done <-chan interface{}, inStream <-chan ScraperTimer) <-chan ScraperTimer {
outStream := make(chan ScraperTimer, 1000)
go func() {
for {
select {
case <-done:
return
case scraper := <-inStream:
//TODO
// 1. 携带web服务器的rate limit 做判断;
// 2. 校验 是否可用
<-scraper.Timer.C
//scraper.Timer = *time.NewTimer(30 * time.Second)
scraper.Timer.Reset(30 * time.Second)
outStream <- scraper
}
}
}()
return outStream
}
func TaskImplement(done <-chan interface{}, inTaskStream <-chan taskInterface, inResourceStream <-chan ScraperTimer) (<-chan taskInterface, <-chan ScraperTimer) {
taskOutStream := make(chan taskInterface, 1000)
scraperOutStream := make(chan ScraperTimer, 1000)
go func() {
for {
select {
case <-done:
return
case task := <-inTaskStream:
select {
case <-done:
return
case res := <-inResourceStream:
if err := task.Fetch(res.Scraper); err != nil {
slog.Error("task.Fetch", "err", err.Error())
}
}
}
}
}()
return taskOutStream, scraperOutStream
}
func ConnectTailTask(done <-chan interface{}, beginStream chan<- taskInterface, tailStream <-chan taskInterface) {
go func() {
for {
select {
case <-done:
return
case beginStream <- <-tailStream:
}
}
}()
}
func ConnectTailResource(done <-chan interface{}, beginStream chan<- ScraperTimer, tailStream <-chan ScraperTimer) {
go func() {
for {
select {
case <-done:
return
case beginStream <- <-tailStream:
}
}
}()
}
package main
import "testing"
func add1[T int | float32 | float64](a, b T) T {
c := a + b
return c
}
func TestChan(t *testing.T) {
// 创建一个可以容纳任何类型的通道
ch := make(chan any, 10)
// 使用通道
ch <- 123 // 可以存放int类型
number := <-ch // 读取通道,number类型为any
t.Logf("type %T %v \n", number, number)
ch <- "hello" // 可以存放string类型
number = <-ch // 读取通道,number类型为any
t.Logf("type %T %v \n", number, number)
}
/*
// 定义
type Struct1 [T string|int|float64] struct {
Title string
Content T
}
// 实例化
test:= Struct1[string]{
Title:"aaa"
Content :"bbb"
}
// 定义
type MyMap[K string | int, V string | int] map[K]V
// 实例化
test := MyMap[string, int]{
"1": 1,
"2": 2,
}
fmt.Println(test["1"])
type gchan[T any] chan T
实例化
strchan := make(gchan[string])
*/
//type gchan[T FollowTask | RetweetTask] chan T
//NewTask
//func InitTask(done <-chan interface{}, instream <-chan any) {
package main
import (
b64 "encoding/base64"
"strings"
twitterscraper "github.com/imperatrona/twitter-scraper"
)
......@@ -24,3 +27,111 @@ type Profile struct {
UserIdAsNumber string
*twitterscraper.Profile
}
type NewTask[T FollowTask | RetweetTask] struct {
Task T
Init bool
}
type FollowTask struct {
URL string
UserId string
Next string
Res Users
Scraper *twitterscraper.Scraper
}
type taskInterface interface {
Fetch(scraper *twitterscraper.Scraper) error
SetScraper(scraper *twitterscraper.Scraper)
SetScraperNil()
}
// func (f *FollowTask) URl() string {
// return f.URL
// }
func (f *FollowTask) ID() string {
return f.UserId
}
func (f *FollowTask) SetScraper(scraper *twitterscraper.Scraper) {
f.Scraper = scraper
}
func (f *FollowTask) SetScraperNil() {
f.Scraper = nil
}
func (f *FollowTask) Fetch(scraper *twitterscraper.Scraper) error {
users, newNext, err := scraper.FetchFollowers(f.UserId, 20, f.Next)
if err != nil {
return err
}
usersWithUserNumber := make([]Profile, 0, len(users))
for _, v := range users {
sDec, _ := b64.StdEncoding.DecodeString(v.UserID)
userId, _ := strings.CutPrefix(string(sDec), "User:")
item := Profile{
UserIdAsNumber: userId,
Profile: v,
}
usersWithUserNumber = append(usersWithUserNumber, item)
}
res := Users{
Profiles: usersWithUserNumber,
Current: f.Next,
Next: newNext,
}
f.Res = res
return nil
}
type RetweetTask struct {
URL string
TweetId string
Next string
Scraper *twitterscraper.Scraper
}
// func (r *RetweetTask) URl() string {
// return r.URL
// }
func (f *RetweetTask) ID() string {
return f.TweetId
}
func (f *RetweetTask) SetScraper(scraper *twitterscraper.Scraper) {
f.Scraper = scraper
}
func (f *RetweetTask) Fetch(scraper *twitterscraper.Scraper) error {
return scraper.RetweetsUsers(f.TweetId)
}
func (f *RetweetTask) SetScraperNil() {
f.Scraper = nil
}
type TwitterAccount struct {
User string
PassWd string
}
var users []TwitterAccount = []TwitterAccount{
TwitterAccount{
User: "Wade_Leeeee",
PassWd: "923881393time",
},
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment