Commit fadd4f42 authored by Ubuntu's avatar Ubuntu

add project keys and token ok

parent 98d77422
package main
import (
"encoding/json"
"testing"
)
func TestMe(t *testing.T) {
//cli := NewOAuth2Client()
cli := NewOAuth2Client0816()
me, err := cli.Me()
if err != nil {
t.Error(err)
}
t.Log("Me")
meAsJson, err := json.Marshal(me)
if err != nil {
t.Error(err)
}
t.Log(string(meAsJson))
}
// func TestCreateTweet(t *testing.T) {
// cli := NewOAuth2Client()
// //cli := NewOAuth2ClientSelf()
// res, err := cli.CreateTweet("Hello world!")
// if err != nil {
// t.Error(err)
// }
// t.Log("Me")
// meAsJson, err := json.Marshal(res)
// if err != nil {
// t.Error(err)
// }
// t.Log(string(meAsJson))
// }
// func TestFollowers(t *testing.T) {
// //cli := NewOAuth2Client()
// //cli = NewOAuth2ClientSelf()
// cli := NewOAuth2Client0816()
// users, rateLinmit, next, err := cli.Followers("1783145144700874752", "")
// if err != nil {
// t.Error(err)
// }
// t.Log("next", next)
// meAsJson, err := json.Marshal(users)
// if err != nil {
// t.Error(err)
// }
// t.Log(string(meAsJson))
// rAsJson, err := json.Marshal(rateLinmit)
// if err != nil {
// t.Error(err)
// }
// t.Log(string(rAsJson))
// }
//1800805503066661056
func TestLike(t *testing.T) {
//cli := NewOAuth2Client()
cli := NewOAuth2Client0817()
users, next, rateLimit, err := cli.TweetLikedUsers("1800805503066661056", "")
if err != nil {
t.Error(err)
}
t.Log("next", next)
for k, v := range users {
t.Logf("k %v v %v \n", k, v)
}
rAsJson, err := json.Marshal(rateLimit)
if err != nil {
t.Error(err)
}
t.Log(string(rAsJson))
}
func TestRetweet(t *testing.T) {
//cli := NewOAuth2Client()
cli := NewOAuth2Client0816()
users, next, rateLimit, err := cli.Retweeters("1800805503066661056", "")
if err != nil {
t.Error(err)
}
t.Log("next", next)
for k, v := range users {
t.Logf("k %v v %v \n", k, v)
}
rAsJson, err := json.Marshal(rateLimit)
if err != nil {
t.Error(err)
}
t.Log(string(rAsJson))
}
// TweetSearchAll
func TestTweetSearchAll(t *testing.T) {
cli := NewOAuth2Client()
//cli := NewOAuth2Client2()
users, err := cli.TweetTweetLookup("1800805503066661056")
if err != nil {
t.Error(err)
}
rAsJson, err := json.Marshal(users)
if err != nil {
t.Error(err)
}
t.Log(string(rAsJson))
}
...@@ -5,10 +5,10 @@ func Controller(done <-chan interface{}, inStream <-chan taskInterface) (<-chan ...@@ -5,10 +5,10 @@ func Controller(done <-chan interface{}, inStream <-chan taskInterface) (<-chan
return nil, nil return nil, nil
} }
func Idx(done <-chan interface{}, inStream <-chan taskInterface) (<-chan taskInterface, <-chan taskInterface) { // func Idx(done <-chan interface{}, inStream <-chan taskInterface) (<-chan taskInterface, <-chan taskInterface) {
return nil, nil // return nil, nil
} // }
func Scheduler(done <-chan interface{}, inStream <-chan taskInterface) (<-chan taskInterface, <-chan taskInterface) { func Scheduler(done <-chan interface{}, inStream <-chan taskInterface) (<-chan taskInterface, <-chan taskInterface) {
......
...@@ -53,6 +53,8 @@ func getMonthEnd(t time.Time) time.Time { ...@@ -53,6 +53,8 @@ func getMonthEnd(t time.Time) time.Time {
func init() { func init() {
rateLimit = make(map[string]twitter.RateLimit)
rateLimit[ProjectMonthKey] = twitter.RateLimit{ rateLimit[ProjectMonthKey] = twitter.RateLimit{
Limit: 10000, Limit: 10000,
Reset: twitter.Epoch(getMonthEnd(time.Now()).Unix()), Reset: twitter.Epoch(getMonthEnd(time.Now()).Unix()),
......
...@@ -30,10 +30,10 @@ type Profile struct { ...@@ -30,10 +30,10 @@ type Profile struct {
*twitterscraper.Profile *twitterscraper.Profile
} }
type NewTask[T FollowTask | RetweetTask] struct { // type NewTask[T FollowTask | RetweetTask] struct {
Task T // Task T
Init bool // Init bool
} // }
type FollowTask struct { type FollowTask struct {
// URL string // URL string
......
...@@ -24,7 +24,7 @@ func VerifyRetweeterInDb(tweetId, retweeter string) (bool, error) { ...@@ -24,7 +24,7 @@ func VerifyRetweeterInDb(tweetId, retweeter string) (bool, error) {
func AddTaskInsertOrUpdate(req AddTaskReq) error { func AddTaskInsertOrUpdate(req AddTaskReq) error {
task := Task{ task := TaskInDB{
User: req.User, User: req.User,
TaskType: req.TaskType, TaskType: req.TaskType,
TaskId: req.TaskId, TaskId: req.TaskId,
...@@ -55,3 +55,42 @@ func StopTaskUpdate(req AddTaskReq) error { ...@@ -55,3 +55,42 @@ func StopTaskUpdate(req AddTaskReq) error {
return err return err
} }
type ProjectInDb struct {
ProjectReq
UserInfo
Available bool `json:"available"`
}
type UserInfo struct {
UserId string `json:"user_id"`
UserName string `json:"username"`
Name string `json:"name"`
}
func AddOrUpdateProject(cfg ProjectReq, user UserInfo) error {
project := ProjectInDb{
ProjectReq: cfg,
UserInfo: user,
Available: true,
}
res, _, err := client.From("project").Insert(project, true, "", "representation", "").Execute()
_ = res
return err
}
func QueryProject(cfg ProjectReq) (bool, error) {
_, count, err := client.From("project").Select("*", "exact", false).Eq("api_key", cfg.ApiKey).Eq("api_key_secret", cfg.ApiKeySecrect).Eq("token", cfg.Token).Eq("access_token", cfg.AccessToken).Eq("access_token_secret", cfg.AccessTokenSecret).Execute()
if err != nil {
return false, err
}
return count == 1, nil
}
package main
import "golang.org/x/time/rate"
type Task struct {
Idx *PageUsers
cli *Client
}
func NewTask() *Task {
p := NewPageUsers(NewIdx([]UserTask{}))
return &Task{
Idx: p,
}
}
type LikingUserTask struct {
task *Task
taskId string
Ratelimiter *rate.Limiter // 需要和单个或两个的cli rate limit一致;
}
func NewLikingTask() {
}
func (l *LikingUserTask) Request() {
users, err := l.task.Idx.Request("", "", l.task.cli.TweetLikingUsers)
_, _ = users, err
}
type RetweeterTask struct {
task *Task
taskId string
Ratelimiter *rate.Limiter // 需要和单个或两个的cli rate limit一致;
}
func NewRetweeterTask() {
}
func (r *RetweeterTask) Request() {
users, err := r.task.Idx.Request("", "", r.task.cli.TweetLikingUsers)
_, _ = users, err
}
...@@ -32,6 +32,12 @@ type VerifyRes struct { ...@@ -32,6 +32,12 @@ type VerifyRes struct {
} `json:"data"` } `json:"data"`
} }
type ProjectRes struct {
Code int64 `json:"code"`
Msg string `json:"msg"`
Data UserInfo `json:"data"`
}
/* /*
{ {
...@@ -43,76 +49,96 @@ type VerifyRes struct { ...@@ -43,76 +49,96 @@ type VerifyRes struct {
*/ */
type AddTaskReq struct { type ProjectReq struct {
// AddOrStop bool Config
User string `json:"user"` Project string `json:"project"`
TaskType string `json:"task_type"`
TaskId string `json:"task_id"`
} }
var taskIn chan<- taskInterface func Project(c *fiber.Ctx) error {
func TaskAdd(c *fiber.Ctx) error {
//fmt.Println(string(c.Request().Body()))
slog.Info(c.Route().Path, "body", string(c.Request().Body())) slog.Info(c.Route().Path, "body", string(c.Request().Body()))
req := AddTaskReq{} req := ProjectReq{}
if err := json.Unmarshal(c.Request().Body(), &req); err != nil { if err := json.Unmarshal(c.Request().Body(), &req); err != nil {
slog.Error("json.Unmarshal(c.Request().Body(), &req)", "err", err.Error()) slog.Error("json.Unmarshal(c.Request().Body(), &req)", "err", err.Error())
return c.JSON(Res{ return c.JSON(ProjectRes{
Code: 500, Code: 500,
Msg: err.Error(), Msg: err.Error(),
}) })
} }
slog.Info(c.Route().Path, "user", req.User, "TaskType", req.TaskType, "TaskId", req.TaskId) slog.Info("cfg", "project", req.Project, "ApiKey", req.ApiKey, "req.ApiKeySecrect", req.ApiKeySecrect, "req.AccessToken", req.AccessToken, "req.AccessTokenSecret", req.AccessTokenSecret, "req.Token", req.Token)
if req.TaskType == "" || req.TaskId == "" { ok, err := QueryProject(req)
return c.JSON(Res{
if err != nil {
slog.Error("QueryProject", "err", err.Error())
return c.JSON(ProjectRes{
Code: 500, Code: 500,
Msg: "must provide TaskId and TaskType", Msg: err.Error(),
}) })
} }
var task taskInterface if ok {
return c.JSON(ProjectRes{
if req.TaskType == FollowType { Code: 500,
task = NewFollowTask(req.TaskId, req.User, req.TaskType) Msg: "already existed",
} })
if req.TaskType == RetweetType {
task = NewRetweetTask(req.TaskId, req.User, req.TaskType)
} }
taskIn <- task cli := NewClient(req.Config, nil)
//Todo
// 校验任务 条件是否存在;
// req.AddOrStop = true
err := AddTaskInsertOrUpdate(req)
//res, _, err := client.From("twitter_task").Insert(req, true, "", "representation", "").Execute() me, err := cli.Me()
if err != nil { if err != nil {
slog.Error("me", "err", err.Error())
slog.Error("twitter_syncer insert", "err", err.Error()) return c.JSON(ProjectRes{
return c.JSON(Res{
Code: 500, Code: 500,
Msg: err.Error(), Msg: err.Error(),
}) })
} }
//slog.Info("twitter_syncer insert", "res", string(res)) for _, v := range me {
user := UserInfo{
UserId: v.User.ID,
UserName: v.User.UserName,
Name: v.User.Name,
}
if err := AddOrUpdateProject(req, user); err != nil {
slog.Error("insert db ", "err", err.Error())
return c.JSON(ProjectRes{
Code: 500,
Msg: err.Error(),
})
}
return c.JSON(ProjectRes{
Code: 200,
Data: UserInfo{
UserId: v.User.ID,
UserName: v.User.UserName,
Name: v.User.Name,
},
})
}
return c.JSON(Res{ return c.JSON(ProjectRes{
Code: 200, Code: 500,
Msg: "can not find out the user info with me API",
}) })
} }
type AddTaskReq struct {
// AddOrStop bool
User string `json:"user"`
TaskType string `json:"task_type"`
TaskId string `json:"task_id"`
}
func TaskStop(c *fiber.Ctx) error { func TaskStop(c *fiber.Ctx) error {
slog.Info(c.Route().Path, "body", string(c.Request().Body())) slog.Info(c.Route().Path, "body", string(c.Request().Body()))
...@@ -153,30 +179,29 @@ func TaskStop(c *fiber.Ctx) error { ...@@ -153,30 +179,29 @@ func TaskStop(c *fiber.Ctx) error {
} }
func VerifyFollower(c *fiber.Ctx) error { func VerifyRetweeter(c *fiber.Ctx) error {
userId := c.Query("user_id") tweetId := c.Query("tweet_id")
followerId := c.Query("follower_id") retweeterId := c.Query("retweeter_id")
if len(userId) == 0 || len(followerId) == 0 { if len(tweetId) == 0 || len(retweeterId) == 0 {
slog.Error("VerifyFollower", "userId", userId, "followerId", followerId) slog.Error("VerifyFollower", "tweetId", tweetId, "retweeterId", retweeterId)
return c.JSON(Res{ return c.JSON(Res{
Code: 500, Code: 500,
Msg: fmt.Sprintf("must provide userId [%v] and followerId [%v]", userId, followerId), Msg: fmt.Sprintf("must provide tweetId [%v] and retweeterId [%v]", tweetId, retweeterId),
}) })
} }
followerUserName := c.Query("follower_username") slog.Info(c.Route().Path, "tweetId", tweetId, "retweeterId", retweeterId)
slog.Info(c.Route().Path, "user_id", userId, "followerId", followerId, "followerUserName", followerUserName)
ok, err := VerifyFollowerInDb(userId, followerId) ok, err := VerifyRetweeterInDb(tweetId, retweeterId)
if err != nil { if err != nil {
slog.Error("VerifyFollowerInDb", "userId", userId, "followerId", followerId, "err", err.Error()) slog.Error("VerifyRetweeter", "tweetId", tweetId, "retweeterId", retweeterId, "err", err.Error())
return c.JSON(Res{ return c.JSON(Res{
Code: 500, Code: 500,
Msg: fmt.Sprintf("VerifyFollowerInDb userId %v followerId %v err %v", userId, followerId, err.Error()), //Msg: fmt.Sprint("VerifyFollowerInDb", "tweetId", tweetId, "retweeterId", retweeterId, "err", err.Error()),
Msg: fmt.Sprintf("VerifyRetweeter tweetId %v retweeterId %v err %v", tweetId, retweeterId, err.Error()),
}) })
} }
...@@ -191,29 +216,30 @@ func VerifyFollower(c *fiber.Ctx) error { ...@@ -191,29 +216,30 @@ func VerifyFollower(c *fiber.Ctx) error {
} }
func VerifyRetweeter(c *fiber.Ctx) error { func VerifyFollower(c *fiber.Ctx) error {
tweetId := c.Query("tweet_id") userId := c.Query("user_id")
retweeterId := c.Query("retweeter_id") followerId := c.Query("follower_id")
if len(tweetId) == 0 || len(retweeterId) == 0 { if len(userId) == 0 || len(followerId) == 0 {
slog.Error("VerifyFollower", "tweetId", tweetId, "retweeterId", retweeterId) slog.Error("VerifyFollower", "userId", userId, "followerId", followerId)
return c.JSON(Res{ return c.JSON(Res{
Code: 500, Code: 500,
Msg: fmt.Sprintf("must provide tweetId [%v] and retweeterId [%v]", tweetId, retweeterId), Msg: fmt.Sprintf("must provide userId [%v] and followerId [%v]", userId, followerId),
}) })
} }
slog.Info(c.Route().Path, "tweetId", tweetId, "retweeterId", retweeterId) followerUserName := c.Query("follower_username")
ok, err := VerifyRetweeterInDb(tweetId, retweeterId) slog.Info(c.Route().Path, "user_id", userId, "followerId", followerId, "followerUserName", followerUserName)
ok, err := VerifyFollowerInDb(userId, followerId)
if err != nil { if err != nil {
slog.Error("VerifyRetweeter", "tweetId", tweetId, "retweeterId", retweeterId, "err", err.Error()) slog.Error("VerifyFollowerInDb", "userId", userId, "followerId", followerId, "err", err.Error())
return c.JSON(Res{ return c.JSON(Res{
Code: 500, Code: 500,
//Msg: fmt.Sprint("VerifyFollowerInDb", "tweetId", tweetId, "retweeterId", retweeterId, "err", err.Error()), Msg: fmt.Sprintf("VerifyFollowerInDb userId %v followerId %v err %v", userId, followerId, err.Error()),
Msg: fmt.Sprintf("VerifyRetweeter tweetId %v retweeterId %v err %v", tweetId, retweeterId, err.Error()),
}) })
} }
......
package main
import (
"context"
"fmt"
"net/http"
"time"
"github.com/dghubble/oauth1"
twitter "github.com/g8rswimmer/go-twitter/v2"
"golang.org/x/time/rate"
)
/*
GET /2/tweets/:id/liking_users
5 requests / 15 mins
PER USER
25 requests / 15 mins
PER APP
GET /2/users/:id/liked_tweets
5 requests / 15 mins
PER APP
5 requests / 15 mins
PER USER
*/
/*
GET /2/tweets/:id/retweeted_by
5 requests / 15 mins
PER APP
5 requests / 15 mins
PER USER
*/
var LikeRateLimit *rate.Limiter = rate.NewLimiter(rate.Every(3*time.Minute), 1)
var RetweetRateLimit *rate.Limiter = rate.NewLimiter(rate.Every(3*time.Minute), 1)
type authorize struct {
Token string
}
func (a authorize) Add(req *http.Request) {
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", a.Token))
}
type Client struct {
*twitter.Client
Ratelimiter *rate.Limiter
}
type Config struct {
ApiKey string `json:"api_key"`
ApiKeySecrect string `json:"api_key_secret"`
AccessToken string `json:"access_token"`
AccessTokenSecret string `json:"access_token_secret"`
Token string `json:"token"`
}
func NewLikeClient(cfg Config) *Client {
return NewClient(cfg, LikeRateLimit)
}
func NewRetweeterClient(cfg Config) *Client {
return NewClient(cfg, RetweetRateLimit)
}
func NewClient(cfg Config, rts *rate.Limiter) *Client {
//twitterAPIKey := "lVnj6Ox9HPcI4LwArSSYU7Pba" //os.Getenv("TWITTER_API_KEY")
//twitterAPIKeySecret := "QMSnWG4QwyXWBVW2hQazzxhw9cSjd32CDfXGkg2DEaUUdscCRZ" //os.Getenv("TWITTER_API_KEY_SECRET")
oauth1Config := oauth1.NewConfig(cfg.ApiKey, cfg.ApiKeySecrect)
//twitterAccessToken := "1783145144700874752-TqHrsFUL20fEz4nz71yYlYVihkGmZn" //os.Getenv("TWITTER_ACCESS_TOKEN")
//twitterAccessTokenSecret := "QDmtDfsiMigTJk1iqoyq8zCHNQJq5zCeC560NH9T5yUZl" //os.Getenv("TWITTER_ACCESS_TOKEN_SECRET")
twitterHttpClient := oauth1Config.Client(oauth1.NoContext, &oauth1.Token{
Token: cfg.AccessToken,
TokenSecret: cfg.AccessTokenSecret,
})
twitterClient := &twitter.Client{
Authorizer: authorize{
Token: cfg.Token,
//Token: "AAAAAAAAAAAAAAAAAAAAAEaPvQEAAAAAWDyrWaIbZHPYeg3ifnvXWdlylvs%3D7XFIO4y2HA0suNxLv570AsaIfmWD4x6XB64zHd9saEqVAhuTMq",
},
Client: twitterHttpClient, //http.DefaultClient,
Host: "https://api.twitter.com",
}
return &Client{
Client: twitterClient,
Ratelimiter: rts,
}
}
func NewOAuth2Client0817() *Client {
twitterAPIKey := "lVnj6Ox9HPcI4LwArSSYU7Pba" //os.Getenv("TWITTER_API_KEY")
twitterAPIKeySecret := "QMSnWG4QwyXWBVW2hQazzxhw9cSjd32CDfXGkg2DEaUUdscCRZ" //os.Getenv("TWITTER_API_KEY_SECRET")
oauth1Config := oauth1.NewConfig(twitterAPIKey, twitterAPIKeySecret)
twitterAccessToken := "1783145144700874752-TqHrsFUL20fEz4nz71yYlYVihkGmZn" //os.Getenv("TWITTER_ACCESS_TOKEN")
twitterAccessTokenSecret := "QDmtDfsiMigTJk1iqoyq8zCHNQJq5zCeC560NH9T5yUZl" //os.Getenv("TWITTER_ACCESS_TOKEN_SECRET")
twitterHttpClient := oauth1Config.Client(oauth1.NoContext, &oauth1.Token{
Token: twitterAccessToken,
TokenSecret: twitterAccessTokenSecret,
})
twitterClient := &twitter.Client{
Authorizer: authorize{
Token: "AAAAAAAAAAAAAAAAAAAAAEaPvQEAAAAAWDyrWaIbZHPYeg3ifnvXWdlylvs%3D7XFIO4y2HA0suNxLv570AsaIfmWD4x6XB64zHd9saEqVAhuTMq",
},
Client: twitterHttpClient, //http.DefaultClient,
Host: "https://api.twitter.com",
}
return &Client{
Client: twitterClient,
}
}
func (c *Client) Retweeters(tweetId string, next string) ([]*twitter.UserObj, string, *twitter.RateLimit, error) {
// ctx is generated here only to use with Ratelimiter
// TODO: Fix performance by removing unneeded allocaton here
ctx := context.Background()
err := c.Ratelimiter.Wait(ctx) // This is a blocking call.
if err != nil {
return nil, "", nil, err
}
opts := twitter.UserRetweetLookupOpts{
Expansions: []twitter.Expansion{twitter.ExpansionPinnedTweetID},
PaginationToken: next,
}
userResponse, err := c.Client.UserRetweetLookup(context.Background(), tweetId, opts)
if err != nil {
return nil, "", nil, err
}
return userResponse.Raw.Users, userResponse.Meta.NextToken, userResponse.RateLimit, nil
}
func (c *Client) TweetLikingUsers(tweetId string, next string) ([]*twitter.UserObj, string, *twitter.RateLimit, error) {
// ctx is generated here only to use with Ratelimiter
// TODO: Fix performance by removing unneeded allocaton here
ctx := context.Background()
err := c.Ratelimiter.Wait(ctx) // This is a blocking call.
if err != nil {
return nil, "", nil, err
}
opts := twitter.TweetLikesLookupOpts{
Expansions: []twitter.Expansion{twitter.ExpansionPinnedTweetID},
TweetFields: []twitter.TweetField{twitter.TweetFieldCreatedAt, twitter.TweetFieldConversationID, twitter.TweetFieldAttachments},
PaginationToken: next,
}
fmt.Println("Callout to tweet like lookup callout")
tweetResponse, err := c.Client.TweetLikesLookup(context.Background(), tweetId, opts)
if err != nil {
return nil, "", nil, err
}
return tweetResponse.Raw.Users, tweetResponse.Meta.NextToken, tweetResponse.RateLimit, nil
}
func (c *Client) Me() (map[string]*twitter.UserDictionary, error) {
// ctx is generated here only to use with Ratelimiter
// TODO: Fix performance by removing unneeded allocaton here
if c.Ratelimiter != nil {
ctx := context.Background()
err := c.Ratelimiter.Wait(ctx) // This is a blocking call.
if err != nil {
return nil, err
}
}
opts := twitter.UserLookupOpts{
Expansions: []twitter.Expansion{twitter.ExpansionPinnedTweetID},
}
userResponse, err := c.Client.AuthUserLookup(context.Background(), opts)
if err != nil {
return nil, err
}
dictionaries := userResponse.Raw.UserDictionaries()
return dictionaries, nil
}
/*
先不考虑单个app cli异常;
确认根据数量 交替,还是根据时间;优先时间;
*/
type TwoClient struct {
cli1 *Client
cli2 *Client
//queue chan *Client
count int
}
func NewTwoClient() {
//先确认,周期是怎么计算的,确定能不能交叉;
}
func (c *TwoClient) Retweeters(tweetId string, next string) ([]*twitter.UserObj, string, *twitter.RateLimit, error) {
//for c := range c.queue {
// return c.Retweeters(tweetId, next)
//}
if c.count < 6 {
return c.cli1.Retweeters(tweetId, next)
}
return c.cli1.Retweeters(tweetId, next)
//return nil, "", nil, fmt.Errorf("two client queue has closed")
}
func (c *TwoClient) TweetLikingUsers(tweetId string, next string) ([]*twitter.UserObj, string, *twitter.RateLimit, error) {
return nil, "", nil, fmt.Errorf("two client queue has closed")
}
func (c *TwoClient) Me() (map[string]*twitter.UserDictionary, error) {
// for c := range c.queue {
// return c.Me()
// }
return nil, fmt.Errorf("two client queue has closed")
}
/*
func NewOAuth2ClientSelf() *Client {
twitterAPIKey := "Ufhj9NggOmRb61LTYUinaDHws" //os.Getenv("TWITTER_API_KEY")
twitterAPIKeySecret := "IfsfhxpyKqmYaEkyB89uH5tT8Ma77FJqrB0BsFN7uUnNX0UZ4B" //os.Getenv("TWITTER_API_KEY_SECRET")
oauth1Config := oauth1.NewConfig(twitterAPIKey, twitterAPIKeySecret)
twitterAccessToken := "1823984946710765569-9Nj7JZaBKQQiTnSyFiOBC3ADnepQqR" //os.Getenv("TWITTER_ACCESS_TOKEN")
twitterAccessTokenSecret := "MltXbwW8Rrb6DJKJo3qnG3lHMUWZ6ILCcqFnujfHrZ875" //os.Getenv("TWITTER_ACCESS_TOKEN_SECRET")
twitterHttpClient := oauth1Config.Client(oauth1.NoContext, &oauth1.Token{
Token: twitterAccessToken,
TokenSecret: twitterAccessTokenSecret,
})
twitterClient := &twitter.Client{
Authorizer: authorize{
Token: "AAAAAAAAAAAAAAAAAAAAAD6AvQEAAAAApN304Hsb89%2FMWG2RoLfSEgb2RS0%3DVQin0pVyPOsOBkCPFoy4wQKOXh3nBvoxMqQ6dc7ulaJ2anvoCm",
},
Client: twitterHttpClient,
Host: "https://api.twitter.com",
}
return &Client{
Client: twitterClient,
}
}
func NewOAuth2Client0816() *Client {
twitterAPIKey := "lVnj6Ox9HPcI4LwArSSYU7Pba" //os.Getenv("TWITTER_API_KEY")
twitterAPIKeySecret := "QMSnWG4QwyXWBVW2hQazzxhw9cSjd32CDfXGkg2DEaUUdscCRZ" //os.Getenv("TWITTER_API_KEY_SECRET")
oauth1Config := oauth1.NewConfig(twitterAPIKey, twitterAPIKeySecret)
twitterAccessToken := "1783145144700874752-TqHrsFUL20fEz4nz71yYlYVihkGmZn" //os.Getenv("TWITTER_ACCESS_TOKEN")
twitterAccessTokenSecret := "QDmtDfsiMigTJk1iqoyq8zCHNQJq5zCeC560NH9T5yUZl" //os.Getenv("TWITTER_ACCESS_TOKEN_SECRET")
twitterHttpClient := oauth1Config.Client(oauth1.NoContext, &oauth1.Token{
Token: twitterAccessToken,
TokenSecret: twitterAccessTokenSecret,
})
twitterClient := &twitter.Client{
Authorizer: authorize{
Token: "AAAAAAAAAAAAAAAAAAAAAEaPvQEAAAAAWDyrWaIbZHPYeg3ifnvXWdlylvs%3D7XFIO4y2HA0suNxLv570AsaIfmWD4x6XB64zHd9saEqVAhuTMq",
},
Client: twitterHttpClient,
Host: "https://api.twitter.com",
}
return &Client{
Client: twitterClient,
}
}
*/
package main
import (
"encoding/json"
"testing"
)
func TestMe(t *testing.T) {
//cli := NewOAuth2Client()
cli := NewOAuth2Client0817()
me, err := cli.Me()
if err != nil {
t.Error(err)
}
t.Log("Me")
meAsJson, err := json.Marshal(me)
if err != nil {
t.Error(err)
}
t.Log(string(meAsJson))
}
func TestCfg(t *testing.T) {
cfg := Config{
ApiKey: "lVnj6Ox9HPcI4LwArSSYU7Pba",
ApiKeySecrect: "QMSnWG4QwyXWBVW2hQazzxhw9cSjd32CDfXGkg2DEaUUdscCRZ",
AccessToken: "1783145144700874752-TqHrsFUL20fEz4nz71yYlYVihkGmZn",
AccessTokenSecret: "QDmtDfsiMigTJk1iqoyq8zCHNQJq5zCeC560NH9T5yUZl",
Token: "AAAAAAAAAAAAAAAAAAAAAEaPvQEAAAAAWDyrWaIbZHPYeg3ifnvXWdlylvs%3D7XFIO4y2HA0suNxLv570AsaIfmWD4x6XB64zHd9saEqVAhuTMq",
}
cfgAsJson, err := json.Marshal(cfg)
if err != nil {
t.Fatal(err)
}
t.Log(string(cfgAsJson))
}
package main package main
import ( import (
"encoding/json"
"log/slog" "log/slog"
_ "code.wuban.net.cn/odysseus/twitter_syncer/docs" _ "code.wuban.net.cn/odysseus/twitter_syncer/docs"
...@@ -33,21 +32,21 @@ func init() { ...@@ -33,21 +32,21 @@ func init() {
} }
} }
func TwitterAccountFromDB() ([]TwitterAccount, error) { // func TwitterAccountFromDB() ([]TwitterAccount, error) {
data, count, err := client.From("account").Select("*", "exact", false).Eq("available", "true").Execute() // data, count, err := client.From("account").Select("*", "exact", false).Eq("available", "true").Execute()
if err != nil { // if err != nil {
return nil, err // return nil, err
} // }
slog.Info("TwitterAccountFromDB", "count", count) // slog.Info("TwitterAccountFromDB", "count", count)
res := make([]TwitterAccount, 0, count) // res := make([]TwitterAccount, 0, count)
if err := json.Unmarshal(data, &res); err != nil { // if err := json.Unmarshal(data, &res); err != nil {
return nil, err // return nil, err
} // }
return res, nil // return res, nil
} // }
package main package main
import ( // func TestQueryTask(t *testing.T) {
"testing"
)
func TestQueryTask(t *testing.T) { // tasks, err := QueryAllTask()
tasks, err := QueryAllTask() // if err != nil {
// t.Fatal(err)
if err != nil { // }
t.Fatal(err)
}
for k, v := range tasks { // for k, v := range tasks {
t.Log(k, "v.User", v.User, "v.TaskType", v.TaskType, "v.TaskId", v.TaskId) // t.Log(k, "v.User", v.User, "v.TaskType", v.TaskType, "v.TaskId", v.TaskId)
} // }
} // }
// func TestInsertTask(t *testing.T) { // func TestInsertTask(t *testing.T) {
...@@ -34,16 +30,16 @@ func TestQueryTask(t *testing.T) { ...@@ -34,16 +30,16 @@ func TestQueryTask(t *testing.T) {
// // fmt.Println("found", ok, task) // // fmt.Println("found", ok, task)
// } // }
func TestTwitterAccountFromDB(t *testing.T) { // func TestTwitterAccountFromDB(t *testing.T) {
accounts, err := TwitterAccountFromDB() // accounts, err := TwitterAccountFromDB()
if err != nil { // if err != nil {
t.Fatal(err) // t.Fatal(err)
} // }
for k, v := range accounts { // for k, v := range accounts {
t.Log(k, v.User, v.PassWd) // t.Log(k, v.User, v.PassWd)
} // }
// fmt.Println("found", ok, task) // // fmt.Println("found", ok, task)
} // }
...@@ -61,6 +61,34 @@ x-tagGroups: ...@@ -61,6 +61,34 @@ x-tagGroups:
- store_model - store_model
paths: paths:
/project:
post:
tags:
- Task
requestBody:
content:
application/json:
schema:
type: object
$ref: "#/components/schemas/Project"
responses:
"200":
description: successful operation
content:
application/json:
schema:
type: object
properties:
code:
type: integer
format: int64
enum: [200, 500]
msg:
type: string
data:
type: object
$ref: "#/components/schemas/Me"
/task/add: /task/add:
summary: Create or update a new task in the twitter syncer summary: Create or update a new task in the twitter syncer
post: post:
...@@ -223,6 +251,38 @@ components: ...@@ -223,6 +251,38 @@ components:
write:pets: modify pets in your account write:pets: modify pets in your account
type: oauth2 type: oauth2
schemas: schemas:
Project:
required:
- api_key
- api_key_secret
- access_token
- access_token_secret
- token
- project
type: object
properties:
api_key:
type: string
api_key_secret:
type: string
access_token:
type: string
access_token_secret:
type: string
token:
type: string
project:
type: string
Me:
type: object
properties:
user_id:
type: string
username:
type: string
name:
type: string
AddTaskReq: AddTaskReq:
required: required:
- task_id - task_id
......
...@@ -9,6 +9,7 @@ require ( ...@@ -9,6 +9,7 @@ require (
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/andybalholm/brotli v1.1.0 // indirect github.com/andybalholm/brotli v1.1.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
github.com/dghubble/oauth1 v0.7.3 // indirect
github.com/g8rswimmer/go-twitter/v2 v2.1.5 // indirect github.com/g8rswimmer/go-twitter/v2 v2.1.5 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.21.0 // indirect github.com/go-openapi/jsonreference v0.21.0 // indirect
...@@ -42,8 +43,10 @@ require ( ...@@ -42,8 +43,10 @@ require (
github.com/valyala/tcplisten v1.0.0 // indirect github.com/valyala/tcplisten v1.0.0 // indirect
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
golang.org/x/net v0.27.0 // indirect golang.org/x/net v0.27.0 // indirect
golang.org/x/oauth2 v0.22.0 // indirect
golang.org/x/sys v0.22.0 // indirect golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.6.0 // indirect
golang.org/x/tools v0.23.0 // indirect golang.org/x/tools v0.23.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
......
...@@ -17,6 +17,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t ...@@ -17,6 +17,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dghubble/oauth1 v0.7.3 h1:EkEM/zMDMp3zOsX2DC/ZQ2vnEX3ELK0/l9kb+vs4ptE=
github.com/dghubble/oauth1 v0.7.3/go.mod h1:oxTe+az9NSMIucDPDCCtzJGsPhciJV33xocHfcR2sVY=
github.com/g8rswimmer/go-twitter/v2 v2.1.5 h1:Uj9Yuof2UducrP4Xva7irnUJfB9354/VyUXKmc2D5gg= github.com/g8rswimmer/go-twitter/v2 v2.1.5 h1:Uj9Yuof2UducrP4Xva7irnUJfB9354/VyUXKmc2D5gg=
github.com/g8rswimmer/go-twitter/v2 v2.1.5/go.mod h1:/55xWb313KQs25X7oZrNSEwLQNkYHhPsDwFstc45vhc= github.com/g8rswimmer/go-twitter/v2 v2.1.5/go.mod h1:/55xWb313KQs25X7oZrNSEwLQNkYHhPsDwFstc45vhc=
github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
...@@ -138,6 +140,8 @@ golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= ...@@ -138,6 +140,8 @@ golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
golang.org/x/oauth2 v0.22.0 h1:BzDx2FehcG7jJwgWLELCdmLuxk2i+x9UDpSiss2u0ZA=
golang.org/x/oauth2 v0.22.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
...@@ -174,6 +178,8 @@ golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= ...@@ -174,6 +178,8 @@ golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U=
golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
......
package main package main
import ( import (
"encoding/json" "container/list"
"fmt"
"log/slog"
"time" "time"
twitter "github.com/g8rswimmer/go-twitter/v2" twitter "github.com/g8rswimmer/go-twitter/v2"
"github.com/supabase-community/postgrest-go"
) )
type streamIdx struct { /*
cli Client
idx []Task 处理三种情况:
taskId string
toDbQueue chan []UserTask 1. idx 为空;
// timer time.Timer 2. idx 正常匹配
3. idx 没有匹配上
x 4. 到最后也没匹配上,或者超过了多少页。 外面处理;
*/
type Idx struct {
idx []UserTask
newIdx []UserTask
//taskId string
List *list.List
}
func NewIdx(i []UserTask) *Idx {
return &Idx{
idx: i,
newIdx: make([]UserTask, 0, 10),
}
} }
func (s *streamIdx) Request(taskType, next string) ([]*twitter.UserObj, string, *twitter.RateLimit, error) { func (s *Idx) Idx(page []UserTask) (bool, *list.List) {
if len(s.newIdx) == 0 {
for k, v := range page {
s.newIdx = append(s.newIdx, v)
if k > 5 {
break
}
}
}
if s.idx != nil && len(s.idx) == 0 {
newList := list.New()
for _, v := range page {
newList.PushFront(v)
}
s.idx = s.newIdx
s.newIdx = make([]UserTask, 0, 10)
switch taskType { return true, newList
case FollowType:
return s.cli.Followers(s.taskId, next)
case RetweetType:
return s.cli.Retweeters(s.taskId, next)
case TweetLikingUsersType:
return s.cli.TweetLikingUsers(s.taskId, next)
} }
return nil, "", nil, fmt.Errorf("currently not suport the task type %v", taskType) //TODO 匹配多个元素,防止用户取消;
for k, v := range page {
for ik, iv := range s.idx {
if v.UserId == iv.UserId {
newList := s.List
s.List = list.New()
//idx
s.idx = s.newIdx
s.newIdx = make([]UserTask, 0, 10)
return true, newList
} else {
s.List.PushFront(v)
}
_, _ = k, ik
}
}
return false, nil
} }
func (s *streamIdx) Idx() { type PageUsers struct {
//cli *Client
idx *Idx
}
func NewPageUsers(idx *Idx) *PageUsers {
return &PageUsers{
//idx: NewIdx(idx),
idx: idx,
// cli: cli,
}
} }
func (s *streamIdx) Scheduler(timer *time.Timer) { type req func(tweetId string, next string) ([]*twitter.UserObj, string, *twitter.RateLimit, error)
for t := range timer.C { /*
_ = t TODO 最后一页的标识,没有处理;
s.Idx() */
//s.Scheduler(time.NewTimer(10 * time.Second)) func (p *PageUsers) Request(tweetId string, next string, f req) ([]UserTask, error) {
//users, next, rt, err := p.cli.TweetLikingUsers(tweetId, next)
users, newNext, rt, err := f(tweetId, next)
if err != nil {
return nil, err
}
if rt.Remaining == 0 {
time.Sleep(time.Until(rt.Reset.Time().Add(500 * time.Millisecond)))
}
taskUser := userObjectToUserTask(users)
ok, l := p.idx.Idx(taskUser)
if ok {
res := make([]UserTask, 0, l.Len())
for e := l.Front(); e != nil; e = e.Next() {
if user, ok := e.Value.(UserTask); ok {
res = append(res, user)
}
}
return res, nil
} }
return p.Request(tweetId, newNext, f)
}
func userObjectToUserTask(pageUsers []*twitter.UserObj) []UserTask {
res := make([]UserTask, 0, len(pageUsers))
for _, v := range pageUsers {
res = append(res, UserTask{
//TaskId string `json:"task_id"`
UserId: v.ID, //string `json:"user_id"`
UserName: v.UserName, //string `json:"user_name"`
})
}
return res
} }
// func scheduler(tick *time.Ticker) { // func scheduler(tick *time.Ticker) {
...@@ -83,123 +183,123 @@ func (s *streamIdx) Scheduler(timer *time.Timer) { ...@@ -83,123 +183,123 @@ func (s *streamIdx) Scheduler(timer *time.Timer) {
// const RetweetType = "retweet" // const RetweetType = "retweet"
type FollowerId struct { // type FollowerId struct {
Follower // Follower
Id int `json:"id"` // Id int `json:"id"`
CreatedAt string `json:"created_at"` // CreatedAt string `json:"created_at"`
} // }
type Follower struct { // type Follower struct {
//user_id // //user_id
UserId string `json:"user_id"` // UserId string `json:"user_id"`
Follower string `json:"follower_id"` // Follower string `json:"follower_id"`
UserName string `json:"follower_username"` // UserName string `json:"follower_username"`
} // }
type RetweeterId struct { // type RetweeterId struct {
Retweeter // Retweeter
Id int `json:"id"` // Id int `json:"id"`
CreatedAt string `json:"created_at"` // CreatedAt string `json:"created_at"`
} // }
type Retweeter struct { // type Retweeter struct {
TweetId string `json:"tweet_id"` // TweetId string `json:"tweet_id"`
RetweeterId string `json:"retweeter_id"` // RetweeterId string `json:"retweeter_id"`
RetweeterUserName string `json:"retweeter_username"` // RetweeterUserName string `json:"retweeter_username"`
} // }
func GetTasksIdx() ([]taskInterface, error) { // func GetTasksIdx() ([]taskInterface, error) {
tasks, err := QueryAllTask() // tasks, err := QueryAllTask()
if err != nil { // if err != nil {
return nil, err // return nil, err
} // }
res := make([]taskInterface, 0, 10) // res := make([]taskInterface, 0, 10)
for _, task := range tasks { // for _, task := range tasks {
if task.TaskType == FollowType { // if task.TaskType == FollowType {
data, count, err := client.From("followers").Select("", "user_id", false). // data, count, err := client.From("followers").Select("", "user_id", false).
Eq("user_id", task.TaskId). // Eq("user_id", task.TaskId).
Order("id", &postgrest.OrderOpts{ // Order("id", &postgrest.OrderOpts{
Ascending: false, // Ascending: false,
// NullsFirst bool // // NullsFirst bool
// ForeignTable string // // ForeignTable string
}).Range(0, 10, "").Execute() // }).Range(0, 10, "").Execute()
if err != nil { // if err != nil {
slog.Error("select * from followers error", err) // slog.Error("select * from followers error", err)
return nil, err // return nil, err
} // }
_ = count // _ = count
slog.Info("idx data", "user id", task.TaskId, "user name", task.User, "idx", data) // slog.Info("idx data", "user id", task.TaskId, "user name", task.User, "idx", data)
fmt.Println("idx data", string(data)) // fmt.Println("idx data", string(data))
userRes := make([]FollowerId, 0, 10) // userRes := make([]FollowerId, 0, 10)
if err := json.Unmarshal(data, &userRes); err != nil { // if err := json.Unmarshal(data, &userRes); err != nil {
return nil, err // return nil, err
} // }
followTask := NewFollowTask(task.TaskId, task.User, task.TaskType) // followTask := NewFollowTask(task.TaskId, task.User, task.TaskType)
followTask.Idx = userRes // followTask.Idx = userRes
// if len(userRes) == 0 { // // if len(userRes) == 0 {
// followTask.Init = true // // followTask.Init = true
// } // // }
res = append(res, followTask) // res = append(res, followTask)
} // }
if task.TaskType == RetweetType { // if task.TaskType == RetweetType {
data, count, err := client.From("retweeters").Select("", "tweet_id", false). // data, count, err := client.From("retweeters").Select("", "tweet_id", false).
Eq("tweet_id", task.TaskId). // Eq("tweet_id", task.TaskId).
Order("id", &postgrest.OrderOpts{ // Order("id", &postgrest.OrderOpts{
Ascending: false, // Ascending: false,
// NullsFirst bool // // NullsFirst bool
// ForeignTable string // // ForeignTable string
}).Range(0, 10, "").Execute() // }).Range(0, 10, "").Execute()
if err != nil { // if err != nil {
slog.Error("select * from retweeters error", err) // slog.Error("select * from retweeters error", err)
return nil, err // return nil, err
} // }
_ = count // _ = count
slog.Info("idx data", "tweet id", task.TaskId, "user name", task.User, "idx", data) // slog.Info("idx data", "tweet id", task.TaskId, "user name", task.User, "idx", data)
fmt.Println("idx data", string(data)) // fmt.Println("idx data", string(data))
userRes := make([]RetweeterId, 0, 10) // userRes := make([]RetweeterId, 0, 10)
if err := json.Unmarshal(data, &userRes); err != nil { // if err := json.Unmarshal(data, &userRes); err != nil {
return nil, err // return nil, err
} // }
retweetTask := NewRetweetTask(task.TaskId, task.User, task.TaskType) // retweetTask := NewRetweetTask(task.TaskId, task.User, task.TaskType)
//followTask := NewFollowTask(task.TaskId, task.User, task.TaskType) // //followTask := NewFollowTask(task.TaskId, task.User, task.TaskType)
retweetTask.Idx = userRes // retweetTask.Idx = userRes
// if len(userRes) == 0 { // // if len(userRes) == 0 {
// followTask.Init = true // // followTask.Init = true
// } // // }
res = append(res, retweetTask) // res = append(res, retweetTask)
} // }
} // }
return res, nil // return res, nil
} // }
// func FollowersToBackList(done <-chan interface{}, idxss [][]FollowerId) (<-chan *list.List, error) { // func FollowersToBackList(done <-chan interface{}, idxss [][]FollowerId) (<-chan *list.List, error) {
......
package main
import (
"log/slog"
"time"
// "github.com/gofiber/fiber/v2"
// "github.com/gofiber/fiber/v2/middleware/cors"
// "github.com/gofiber/swagger"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/cors"
"github.com/gofiber/swagger"
// docs are generated by Swag CLI, you have to import them.
// replace with your own docs folder, usually "github.com/username/reponame/docs"
//_ "github.com/gofiber/swagger/example/docs"
)
func mainBak() {
app := fiber.New()
app.Use(cors.New())
// app.Get("/", func(c *fiber.Ctx) error {
// return c.SendString("OK")
// })
app.Static("/*", "./public", fiber.Static{
Compress: true,
ByteRange: true,
Browse: true,
CacheDuration: 10 * time.Second,
MaxAge: 3600,
Download: true,
})
// app.Static("/*", "./public/")
// app.Static("/swagger/docs", "./docs")
// app.Static("/swagger/docs", "./docs")
app.Get("/swagger/*", swagger.New(swagger.Config{ // custom
URL: "http://43.198.54.207:8001/swagger/docs/swagger.yaml", //http://124.193.167.71:8000/
DeepLinking: false,
// Expand ("list") or Collapse ("none") tag groups by default
DocExpansion: "none",
// Prefill OAuth ClientId on Authorize popup
// OAuth: &swagger.OAuthConfig{
// AppName: "OAuth Provider",
// ClientId: "21bb4edc-05a7-4afc-86f1-2e151e4ba6e2",
// },
// Ability to change OAuth2 redirect uri location
//OAuth2RedirectUrl: "http://localhost:8080/swagger/oauth2-redirect.html",
}))
// app.Static("/swagger/docs", "./docs")
// app.Get("/swagger/*", swagger.New(swagger.Config{ // custom
// URL: "http://43.198.54.207:8001/swagger/docs/swagger.yaml", //http://124.193.167.71:8000/
// DeepLinking: false,
// // Expand ("list") or Collapse ("none") tag groups by default
// DocExpansion: "none",
// // Prefill OAuth ClientId on Authorize popup
// // OAuth: &swagger.OAuthConfig{
// // AppName: "OAuth Provider",
// // ClientId: "21bb4edc-05a7-4afc-86f1-2e151e4ba6e2",
// // },
// // Ability to change OAuth2 redirect uri location
// //OAuth2RedirectUrl: "http://localhost:8080/swagger/oauth2-redirect.html",
// }))
if err := app.Listen(":8001"); err != nil {
slog.Error(err.Error())
}
}
...@@ -26,13 +26,13 @@ func main() { ...@@ -26,13 +26,13 @@ func main() {
defer close(done) defer close(done)
//go func() { //go func() {
taskInStream, err := newSync(done) // taskInStream, err := newSync(done)
if err != nil { // if err != nil {
panic(err) // panic(err)
} // }
//}() // //}()
taskIn = taskInStream // taskIn = taskInStream
app := fiber.New() app := fiber.New()
app.Use(cors.New()) app.Use(cors.New())
...@@ -53,7 +53,8 @@ func main() { ...@@ -53,7 +53,8 @@ func main() {
//OAuth2RedirectUrl: "http://localhost:8080/swagger/oauth2-redirect.html", //OAuth2RedirectUrl: "http://localhost:8080/swagger/oauth2-redirect.html",
})) }))
app.Post("/task/add", TaskAdd) app.Post("/project", Project)
//app.Post("/task/add", TaskAdd)
app.Post("/task/stop", TaskStop) app.Post("/task/stop", TaskStop)
app.Get("/verify/follower", VerifyFollower) app.Get("/verify/follower", VerifyFollower)
app.Get("/verify/retweeter", VerifyRetweeter) app.Get("/verify/retweeter", VerifyRetweeter)
......
package main
import (
"context"
"fmt"
"net/http"
twitter "github.com/g8rswimmer/go-twitter/v2"
)
//https://github.com/michimani/gotwi.git
const (
OAuthTokenEnvKeyName = "GOTWI_ACCESS_TOKEN"
OAuthTokenSecretEnvKeyName = "GOTWI_ACCESS_TOKEN_SECRET"
)
type authorize struct {
Token string
}
func (a authorize) Add(req *http.Request) {
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", a.Token))
}
type Client struct {
*twitter.Client
}
func NewOAuth2Client() *Client {
client := &twitter.Client{
Authorizer: authorize{
Token: "",
},
Client: http.DefaultClient,
Host: "https://api.twitter.com",
}
return &Client{
Client: client,
}
}
func (c *Client) Followers(userId string, next string) ([]*twitter.UserObj, string, *twitter.RateLimit, error) {
opts := twitter.UserFollowersLookupOpts{
Expansions: []twitter.Expansion{twitter.ExpansionPinnedTweetID},
TweetFields: []twitter.TweetField{twitter.TweetFieldContextAnnotations},
PaginationToken: next,
}
fmt.Println("Callout to user followers lookup callout")
userResponse, err := c.Client.UserFollowersLookup(context.Background(), userId, opts)
if err != nil {
return nil, "", nil, err
}
return userResponse.Raw.Users, userResponse.Meta.NextToken, userResponse.RateLimit, nil
}
func (c *Client) Retweeters(tweetId string, next string) ([]*twitter.UserObj, string, *twitter.RateLimit, error) {
opts := twitter.UserRetweetLookupOpts{
Expansions: []twitter.Expansion{twitter.ExpansionPinnedTweetID},
PaginationToken: next,
}
userResponse, err := c.Client.UserRetweetLookup(context.Background(), tweetId, opts)
if err != nil {
return nil, "", nil, err
}
return userResponse.Raw.Users, userResponse.Meta.NextToken, userResponse.RateLimit, nil
}
func (c *Client) TweetLikingUsers(tweetId string, next string) ([]*twitter.UserObj, string, *twitter.RateLimit, error) {
opts := twitter.TweetLikesLookupOpts{
Expansions: []twitter.Expansion{twitter.ExpansionPinnedTweetID},
TweetFields: []twitter.TweetField{twitter.TweetFieldCreatedAt, twitter.TweetFieldConversationID, twitter.TweetFieldAttachments},
PaginationToken: next,
}
fmt.Println("Callout to tweet like lookup callout")
tweetResponse, err := c.Client.TweetLikesLookup(context.Background(), tweetId, opts)
if err != nil {
return nil, "", nil, err
}
return tweetResponse.Raw.Users, tweetResponse.Meta.NextToken, tweetResponse.RateLimit, nil
}
package main package main
import (
b64 "encoding/base64"
"encoding/json"
"fmt"
"log/slog"
"strings"
"github.com/supabase-community/postgrest-go"
)
const FollowType = "followers" const FollowType = "followers"
const RetweetType = "retweeters" const RetweetType = "retweeters"
const TweetLikingUsersType = "tweet_liking_users" const TweetLikingUsersType = "tweet_liking_users"
func ListToQueue(done <-chan interface{}, inStream <-chan TaskIdAndList) <-chan TaskIdAndProfiles { // func ListToQueue(done <-chan interface{}, inStream <-chan TaskIdAndList) <-chan TaskIdAndProfiles {
outStream := make(chan TaskIdAndProfiles, 1) // outStream := make(chan TaskIdAndProfiles, 1)
go func() { // go func() {
for { // for {
select { // select {
case <-done: // case <-done:
return // return
case users, ok := <-inStream: // case users, ok := <-inStream:
if ok == false { // if ok == false {
return // return
} // }
c := 0 // c := 0
// if c < 100 { // // if c < 100 {
// c = c + 1 // // c = c + 1
res := make([]Profile, 0, users.List.Len()) // res := make([]Profile, 0, users.List.Len())
for e := users.List.Front(); e != nil; e = e.Next() { // for e := users.List.Front(); e != nil; e = e.Next() {
if user, ok := e.Value.(Profile); ok { // if user, ok := e.Value.(Profile); ok {
//fmt.Printf("The data is a string: %s\n", str) // //fmt.Printf("The data is a string: %s\n", str)
res = append(res, user) // res = append(res, user)
c = c + 1 // c = c + 1
if c%100 == 0 { // if c%100 == 0 {
fmt.Println("BackListToQueue", "len(inStream)", len(inStream), "len(outStream)", len(outStream), "len(res)", len(res)) // fmt.Println("BackListToQueue", "len(inStream)", len(inStream), "len(outStream)", len(outStream), "len(res)", len(res))
select { // select {
case <-done: // case <-done:
return // return
case outStream <- TaskIdAndProfiles{ // case outStream <- TaskIdAndProfiles{
Profiles: res, // Profiles: res,
TaskId: users.TaskId, // TaskId: users.TaskId,
TaskType: users.TaskType, // TaskType: users.TaskType,
}: // }:
res = make([]Profile, 0, users.List.Len()) // res = make([]Profile, 0, users.List.Len())
} // }
} // }
} // }
} // }
fmt.Println("BackListToQueue", "len(inStream)", len(inStream), "len(outStream)", len(outStream), "len(res)", len(res)) // fmt.Println("BackListToQueue", "len(inStream)", len(inStream), "len(outStream)", len(outStream), "len(res)", len(res))
select { // select {
case <-done: // case <-done:
return // return
case outStream <- TaskIdAndProfiles{ // case outStream <- TaskIdAndProfiles{
Profiles: res, // Profiles: res,
TaskId: users.TaskId, // TaskId: users.TaskId,
TaskType: users.TaskType, // TaskType: users.TaskType,
}: // }:
} // }
// } else { // // } else {
// c = 0 // // c = 0
// } // // }
} // }
} // }
}() // }()
return outStream
}
func InsertOrUpdateFinishTask(done <-chan interface{}, inStream <-chan TaskIdAndProfiles) error { // return outStream
// }
go func() { // func InsertOrUpdateFinishTask(done <-chan interface{}, inStream <-chan TaskIdAndProfiles) error {
for { // go func() {
select {
case <-done:
return
case users, ok := <-inStream:
if ok == false {
return
}
var res []byte // for {
var err error // select {
// case <-done:
// return
// case users, ok := <-inStream:
// if ok == false {
// return
// }
//if users.TaskType == FollowType { // var res []byte
rows := make([]Follower, 0, len(users.Profiles)) // var err error
for _, user := range users.Profiles { // //if users.TaskType == FollowType {
// rows := make([]Follower, 0, len(users.Profiles))
sDec, _ := b64.StdEncoding.DecodeString(user.UserID) // for _, user := range users.Profiles {
userId, _ := strings.CutPrefix(string(sDec), "User:")
row := Follower{ // sDec, _ := b64.StdEncoding.DecodeString(user.UserID)
Follower: userId, // userId, _ := strings.CutPrefix(string(sDec), "User:")
UserName: user.Username,
UserId: users.TaskId,
}
rows = append(rows, row)
}
res, _, err = client.From(users.TaskType).Insert(rows, true, "", "representation", "").Execute()
//}
if err != nil { // row := Follower{
slog.Error("insert into followers or retweeters ", err) // Follower: userId,
// UserName: user.Username,
// UserId: users.TaskId,
// }
// rows = append(rows, row)
// }
// res, _, err = client.From(users.TaskType).Insert(rows, true, "", "representation", "").Execute()
// //}
for _, user := range users.Profiles { // if err != nil {
usersAsJson, err := json.Marshal(user) // slog.Error("insert into followers or retweeters ", err)
if err != nil {
slog.Error("insert into followers or retweeters json.Marshal", err)
continue
}
sDec, _ := b64.StdEncoding.DecodeString(user.UserID) // for _, user := range users.Profiles {
userId, _ := strings.CutPrefix(string(sDec), "User:") // usersAsJson, err := json.Marshal(user)
// if err != nil {
// slog.Error("insert into followers or retweeters json.Marshal", err)
// continue
// }
slog.Error("insert into followers or retweeters error", string(usersAsJson), userId) // sDec, _ := b64.StdEncoding.DecodeString(user.UserID)
} // userId, _ := strings.CutPrefix(string(sDec), "User:")
} else { // slog.Error("insert into followers or retweeters error", string(usersAsJson), userId)
slog.Info("insert into followers or retweeters", string(res), err) // }
}
fmt.Println("InsertOrUpdateUsers", "len(inStream)", len(inStream)) // } else {
// slog.Info("insert into followers or retweeters", string(res), err)
// }
} // fmt.Println("InsertOrUpdateUsers", "len(inStream)", len(inStream))
}
}() // }
// }
return nil // }()
} // return nil
type UserTaskIdAndTime struct { // }
UserTask
Id int `json:"id"`
CreatedAt string `json:"created_at"`
}
type UserTask struct { // type UserTaskIdAndTime struct {
//user_id // UserTask
TaskId string `json:"task_id"` // Id int `json:"id"`
UserId string `json:"user_id"` // CreatedAt string `json:"created_at"`
UserName string `json:"user_name"` // }
}
type Task struct { type TaskInDB struct {
// ID int `json:"id"` // ID int `json:"id"`
User string `json:"user"` User string `json:"user"`
TaskType string `json:"task_type"` TaskType string `json:"task_type"`
...@@ -172,75 +155,94 @@ type Task struct { ...@@ -172,75 +155,94 @@ type Task struct {
Stop bool `json:"stop"` Stop bool `json:"stop"`
} }
func QueryAllTask() ([]Task, error) { // func QueryAllTask() ([]TaskInDB, error) {
data, count, err := client.From("tasks").Select("*", "exact", false). // data, count, err := client.From("tasks").Select("*", "exact", false).
Eq("start", "true").Neq("stop", "true"). // Eq("start", "true").Neq("stop", "true").
Execute() // Execute()
if err != nil { // if err != nil {
return nil, err // return nil, err
} // }
_ = count // _ = count
// fmt.Println(count, string(data)) // // fmt.Println(count, string(data))
res := make([]Task, 0, count) // res := make([]TaskInDB, 0, count)
if err := json.Unmarshal(data, &res); err != nil { // if err := json.Unmarshal(data, &res); err != nil {
return nil, err // return nil, err
} // }
return res, nil // return res, nil
} // }
func GetTasksIdxWithTaskType() ([]*TaskJob, error) { // func GetTasksIdxWithTaskType() ([]*TaskJob, error) {
tasks, err := QueryAllTask() // tasks, err := QueryAllTask()
if err != nil { // if err != nil {
return nil, err // return nil, err
} // }
// res := make([]*TaskJob, 0, 10)
// for _, task := range tasks {
// //if task.TaskType == FollowType {
// data, count, err := client.From(task.TaskType).Select("", "user_id", false).
// Eq("user_id", task.TaskId).
// Order("id", &postgrest.OrderOpts{
// Ascending: false,
// // NullsFirst bool
// // ForeignTable string
// }).Range(0, 10, "").Execute()
// if err != nil {
// slog.Error("select * from followers error", err)
// return nil, err
// }
// _ = count
res := make([]*TaskJob, 0, 10) // slog.Info("idx data", "user id", task.TaskId, "user name", task.User, "idx", data)
for _, task := range tasks { // fmt.Println("idx data", string(data))
//if task.TaskType == FollowType { // userRes := make([]UserTaskIdAndTime, 0, 10)
data, count, err := client.From(task.TaskType).Select("", "user_id", false). // if err := json.Unmarshal(data, &userRes); err != nil {
Eq("user_id", task.TaskId). // return nil, err
Order("id", &postgrest.OrderOpts{ // }
Ascending: false,
// NullsFirst bool
// ForeignTable string
}).Range(0, 10, "").Execute()
if err != nil { // taskJob := NewTaskJob(task.TaskId, task.User, task.TaskType)
slog.Error("select * from followers error", err)
return nil, err
}
_ = count
slog.Info("idx data", "user id", task.TaskId, "user name", task.User, "idx", data) // taskJob.Idx = userRes
fmt.Println("idx data", string(data)) // res = append(res, taskJob)
userRes := make([]UserTaskIdAndTime, 0, 10) // }
if err := json.Unmarshal(data, &userRes); err != nil { // return res, nil
return nil, err // }
}
taskJob := NewTaskJob(task.TaskId, task.User, task.TaskType) type UserTask struct {
//user_id
TaskId string `json:"task_id"`
UserId string `json:"user_id"`
UserName string `json:"user_name"`
}
taskJob.Idx = userRes func InsertTaskRes(content UserTask, tableName string) error {
res = append(res, taskJob) res, _, err := client.From("retweeters").Insert(content, true, "", "representation", "").Execute()
if err != nil {
return err
} }
return res, nil _ = res
return nil
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment