Commit 98d77422 authored by Ubuntu's avatar Ubuntu

add twitter api

parent 3af78dfd
......@@ -50,6 +50,8 @@ type AddTaskReq struct {
TaskId string `json:"task_id"`
}
var taskIn chan<- taskInterface
func TaskAdd(c *fiber.Ctx) error {
//fmt.Println(string(c.Request().Body()))
......@@ -74,6 +76,19 @@ func TaskAdd(c *fiber.Ctx) error {
Msg: "must provide TaskId and TaskType",
})
}
var task taskInterface
if req.TaskType == FollowType {
task = NewFollowTask(req.TaskId, req.User, req.TaskType)
}
if req.TaskType == RetweetType {
task = NewRetweetTask(req.TaskId, req.User, req.TaskType)
}
taskIn <- task
//Todo
// 校验任务 条件是否存在;
......
package main
import "encoding/json"
type Task struct {
// ID int `json:"id"`
User string `json:"user"`
TaskType string `json:"task_type"`
TaskId string `json:"task_id"`
Start bool `json:"start"`
Stop bool `json:"stop"`
}
func QueryAllTask() ([]Task, error) {
data, count, err := client.From("tasks").Select("*", "exact", false).
Eq("start", "true").Neq("stop", "true").
Execute()
if err != nil {
return nil, err
}
_ = count
// fmt.Println(count, string(data))
res := make([]Task, 0, count)
if err := json.Unmarshal(data, &res); err != nil {
return nil, err
}
return res, nil
}
func VerifyFollowerInDb(userId, followerId string) (bool, error) {
_, count, err := client.From("followers").Select("*", "exact", false).Eq("user_id", userId).Eq("follower_id", followerId).Execute()
......
......@@ -123,13 +123,13 @@ paths:
required: true
explode: true
type: string
example: available
example: "1570057485914087429"
- name: follower_id
in: query
required: true
explode: true
type: string
example: available
example: "1577163456993558529"
- name: follower_username
in: query
required: false
......@@ -164,14 +164,14 @@ paths:
explode: true
type: string
# default: available
example: available
example: "1807764329489375585"
- name: retweeter_id
in: query
required: true
explode: true
type: string
# default: available
example: available
example: "1686945219571589120"
- name: begin_time
in: query
type: string
......
......@@ -9,6 +9,7 @@ require (
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
github.com/g8rswimmer/go-twitter/v2 v2.1.5 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.21.0 // indirect
github.com/go-openapi/spec v0.21.0 // indirect
......@@ -23,6 +24,7 @@ require (
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.16 // indirect
github.com/michimani/gotwi v0.15.0 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
......@@ -48,5 +50,4 @@ require (
sigs.k8s.io/yaml v1.4.0 // indirect
)
replace github.com/imperatrona/twitter-scraper => ../twitter-scraper
replace github.com/imperatrona/twitter-scraper => ../twitter-scraper
......@@ -17,6 +17,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/g8rswimmer/go-twitter/v2 v2.1.5 h1:Uj9Yuof2UducrP4Xva7irnUJfB9354/VyUXKmc2D5gg=
github.com/g8rswimmer/go-twitter/v2 v2.1.5/go.mod h1:/55xWb313KQs25X7oZrNSEwLQNkYHhPsDwFstc45vhc=
github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/jsonpointer v0.19.5 h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY=
github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
......@@ -72,6 +74,8 @@ github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZ
github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/michimani/gotwi v0.15.0 h1:fD7606Lkm8u+aM5Y2pgpWp8jujGm/HqUXyoK8WCDuUY=
github.com/michimani/gotwi v0.15.0/go.mod h1:yz1cyV/30Uy/KGQyN8BVfXFPt/63Imzonykny8/SMi0=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
......
......@@ -25,11 +25,14 @@ func main() {
defer close(done)
go func() {
if err := newSync(done); err != nil {
panic(err)
}
}()
//go func() {
taskInStream, err := newSync(done)
if err != nil {
panic(err)
}
//}()
taskIn = taskInStream
app := fiber.New()
app.Use(cors.New())
......
......@@ -12,7 +12,7 @@ import (
twitterscraper "github.com/imperatrona/twitter-scraper"
)
func newSync(done <-chan interface{}) error {
func newSync(done <-chan interface{}) (chan<- taskInterface, error) {
//done := make(<-chan interface{})
......@@ -20,7 +20,7 @@ func newSync(done <-chan interface{}) error {
tasks, err := GetTasksIdx()
if err != nil {
return err
return nil, err
}
for _, task := range tasks {
connStream <- task
......@@ -31,12 +31,12 @@ func newSync(done <-chan interface{}) error {
usersStream := BackListToQueue(done, listStream)
if err := InsertOrUpdateUsers(done, usersStream); err != nil {
return err
return nil, err
}
resStream, err := InitResource()
if err != nil {
return err
return nil, err
}
availRes := AvailableResource(done, resStream)
......@@ -49,7 +49,7 @@ func newSync(done <-chan interface{}) error {
//select {}
return nil
return connStream, nil
}
......
package main
import (
"context"
"fmt"
"net/http"
twitter "github.com/g8rswimmer/go-twitter/v2"
)
//https://github.com/michimani/gotwi.git
const (
OAuthTokenEnvKeyName = "GOTWI_ACCESS_TOKEN"
OAuthTokenSecretEnvKeyName = "GOTWI_ACCESS_TOKEN_SECRET"
)
type authorize struct {
Token string
}
func (a authorize) Add(req *http.Request) {
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", a.Token))
}
type Client struct {
*twitter.Client
}
func NewOAuth2Client() *Client {
client := &twitter.Client{
Authorizer: authorize{
Token: "",
},
Client: http.DefaultClient,
Host: "https://api.twitter.com",
}
return &Client{
Client: client,
}
}
func (c *Client) Followers(userId string, next string) ([]*twitter.UserObj, string, *twitter.RateLimit, error) {
opts := twitter.UserFollowersLookupOpts{
Expansions: []twitter.Expansion{twitter.ExpansionPinnedTweetID},
TweetFields: []twitter.TweetField{twitter.TweetFieldContextAnnotations},
PaginationToken: next,
}
fmt.Println("Callout to user followers lookup callout")
userResponse, err := c.Client.UserFollowersLookup(context.Background(), userId, opts)
if err != nil {
return nil, "", nil, err
}
return userResponse.Raw.Users, userResponse.Meta.NextToken, userResponse.RateLimit, nil
}
func (c *Client) Retweeters(tweetId string, next string) ([]*twitter.UserObj, string, *twitter.RateLimit, error) {
opts := twitter.UserRetweetLookupOpts{
Expansions: []twitter.Expansion{twitter.ExpansionPinnedTweetID},
PaginationToken: next,
}
userResponse, err := c.Client.UserRetweetLookup(context.Background(), tweetId, opts)
if err != nil {
return nil, "", nil, err
}
return userResponse.Raw.Users, userResponse.Meta.NextToken, userResponse.RateLimit, nil
}
func (c *Client) TweetLikingUsers(tweetId string, next string) ([]*twitter.UserObj, string, *twitter.RateLimit, error) {
opts := twitter.TweetLikesLookupOpts{
Expansions: []twitter.Expansion{twitter.ExpansionPinnedTweetID},
TweetFields: []twitter.TweetField{twitter.TweetFieldCreatedAt, twitter.TweetFieldConversationID, twitter.TweetFieldAttachments},
PaginationToken: next,
}
fmt.Println("Callout to tweet like lookup callout")
tweetResponse, err := c.Client.TweetLikesLookup(context.Background(), tweetId, opts)
if err != nil {
return nil, "", nil, err
}
return tweetResponse.Raw.Users, tweetResponse.Meta.NextToken, tweetResponse.RateLimit, nil
}
package main
func Controller(done <-chan interface{}, inStream <-chan taskInterface) (<-chan taskInterface, <-chan taskInterface) {
return nil, nil
}
func Idx(done <-chan interface{}, inStream <-chan taskInterface) (<-chan taskInterface, <-chan taskInterface) {
return nil, nil
}
func Scheduler(done <-chan interface{}, inStream <-chan taskInterface) (<-chan taskInterface, <-chan taskInterface) {
return nil, nil
}
func Request(done <-chan interface{}, inStream <-chan taskInterface) (<-chan taskInterface, <-chan taskInterface) {
return nil, nil
}
func ConnectTask(done <-chan interface{}, beginStream chan<- taskInterface, tailStream <-chan taskInterface) {
go func() {
for {
select {
case <-done:
return
case beginStream <- <-tailStream:
}
}
}()
}
package main
import (
b64 "encoding/base64"
"encoding/json"
"fmt"
"log/slog"
"strings"
"github.com/supabase-community/postgrest-go"
)
const FollowType = "followers"
const RetweetType = "retweeters"
const TweetLikingUsersType = "tweet_liking_users"
func ListToQueue(done <-chan interface{}, inStream <-chan TaskIdAndList) <-chan TaskIdAndProfiles {
outStream := make(chan TaskIdAndProfiles, 1)
go func() {
for {
select {
case <-done:
return
case users, ok := <-inStream:
if ok == false {
return
}
c := 0
// if c < 100 {
// c = c + 1
res := make([]Profile, 0, users.List.Len())
for e := users.List.Front(); e != nil; e = e.Next() {
if user, ok := e.Value.(Profile); ok {
//fmt.Printf("The data is a string: %s\n", str)
res = append(res, user)
c = c + 1
if c%100 == 0 {
fmt.Println("BackListToQueue", "len(inStream)", len(inStream), "len(outStream)", len(outStream), "len(res)", len(res))
select {
case <-done:
return
case outStream <- TaskIdAndProfiles{
Profiles: res,
TaskId: users.TaskId,
TaskType: users.TaskType,
}:
res = make([]Profile, 0, users.List.Len())
}
}
}
}
fmt.Println("BackListToQueue", "len(inStream)", len(inStream), "len(outStream)", len(outStream), "len(res)", len(res))
select {
case <-done:
return
case outStream <- TaskIdAndProfiles{
Profiles: res,
TaskId: users.TaskId,
TaskType: users.TaskType,
}:
}
// } else {
// c = 0
// }
}
}
}()
return outStream
}
func InsertOrUpdateFinishTask(done <-chan interface{}, inStream <-chan TaskIdAndProfiles) error {
go func() {
for {
select {
case <-done:
return
case users, ok := <-inStream:
if ok == false {
return
}
var res []byte
var err error
//if users.TaskType == FollowType {
rows := make([]Follower, 0, len(users.Profiles))
for _, user := range users.Profiles {
sDec, _ := b64.StdEncoding.DecodeString(user.UserID)
userId, _ := strings.CutPrefix(string(sDec), "User:")
row := Follower{
Follower: userId,
UserName: user.Username,
UserId: users.TaskId,
}
rows = append(rows, row)
}
res, _, err = client.From(users.TaskType).Insert(rows, true, "", "representation", "").Execute()
//}
if err != nil {
slog.Error("insert into followers or retweeters ", err)
for _, user := range users.Profiles {
usersAsJson, err := json.Marshal(user)
if err != nil {
slog.Error("insert into followers or retweeters json.Marshal", err)
continue
}
sDec, _ := b64.StdEncoding.DecodeString(user.UserID)
userId, _ := strings.CutPrefix(string(sDec), "User:")
slog.Error("insert into followers or retweeters error", string(usersAsJson), userId)
}
} else {
slog.Info("insert into followers or retweeters", string(res), err)
}
fmt.Println("InsertOrUpdateUsers", "len(inStream)", len(inStream))
}
}
}()
return nil
}
type UserTaskIdAndTime struct {
UserTask
Id int `json:"id"`
CreatedAt string `json:"created_at"`
}
type UserTask struct {
//user_id
TaskId string `json:"task_id"`
UserId string `json:"user_id"`
UserName string `json:"user_name"`
}
type Task struct {
// ID int `json:"id"`
User string `json:"user"`
TaskType string `json:"task_type"`
TaskId string `json:"task_id"`
Start bool `json:"start"`
Stop bool `json:"stop"`
}
func QueryAllTask() ([]Task, error) {
data, count, err := client.From("tasks").Select("*", "exact", false).
Eq("start", "true").Neq("stop", "true").
Execute()
if err != nil {
return nil, err
}
_ = count
// fmt.Println(count, string(data))
res := make([]Task, 0, count)
if err := json.Unmarshal(data, &res); err != nil {
return nil, err
}
return res, nil
}
func GetTasksIdxWithTaskType() ([]*TaskJob, error) {
tasks, err := QueryAllTask()
if err != nil {
return nil, err
}
res := make([]*TaskJob, 0, 10)
for _, task := range tasks {
//if task.TaskType == FollowType {
data, count, err := client.From(task.TaskType).Select("", "user_id", false).
Eq("user_id", task.TaskId).
Order("id", &postgrest.OrderOpts{
Ascending: false,
// NullsFirst bool
// ForeignTable string
}).Range(0, 10, "").Execute()
if err != nil {
slog.Error("select * from followers error", err)
return nil, err
}
_ = count
slog.Info("idx data", "user id", task.TaskId, "user name", task.User, "idx", data)
fmt.Println("idx data", string(data))
userRes := make([]UserTaskIdAndTime, 0, 10)
if err := json.Unmarshal(data, &userRes); err != nil {
return nil, err
}
taskJob := NewTaskJob(task.TaskId, task.User, task.TaskType)
taskJob.Idx = userRes
res = append(res, taskJob)
}
return res, nil
}
......@@ -4,10 +4,57 @@ import (
"encoding/json"
"fmt"
"log/slog"
"time"
twitter "github.com/g8rswimmer/go-twitter/v2"
"github.com/supabase-community/postgrest-go"
)
type streamIdx struct {
cli Client
idx []Task
taskId string
toDbQueue chan []UserTask
// timer time.Timer
}
func (s *streamIdx) Request(taskType, next string) ([]*twitter.UserObj, string, *twitter.RateLimit, error) {
switch taskType {
case FollowType:
return s.cli.Followers(s.taskId, next)
case RetweetType:
return s.cli.Retweeters(s.taskId, next)
case TweetLikingUsersType:
return s.cli.TweetLikingUsers(s.taskId, next)
}
return nil, "", nil, fmt.Errorf("currently not suport the task type %v", taskType)
}
func (s *streamIdx) Idx() {
}
func (s *streamIdx) Scheduler(timer *time.Timer) {
for t := range timer.C {
_ = t
s.Idx()
//s.Scheduler(time.NewTimer(10 * time.Second))
}
}
// func scheduler(tick *time.Ticker) {
// for t := range tick.C {
// task(t)
// }
// }
// func task(t time.Time) {
// fmt.Println("hello! printed at ", t)
// }
// func GetFollowTasks() ([]Task, error) {
// data, count, err := client.From("tasks").Select("*", "exact", false).
......@@ -28,8 +75,13 @@ import (
// }
// follow, retweet
const FollowType = "follow"
const RetweetType = "retweet"
// const FollowType = "follow"
// const RetweetType = "retweet"
// const FollowType = "follow"
// followers
// const RetweetType = "retweet"
type FollowerId struct {
Follower
......
package main
import (
"time"
"github.com/g8rswimmer/go-twitter/v2"
)
type Token struct {
Bearer string
ConsumerKey string
ConsumerSecret string
AccessToken string
AccessSecret string
//MonthRateLimit
}
// const FollowRateLimit
type Fee struct {
RateLimits map[string]twitter.RateLimit
Apps int
}
// type RateLimit struct {
// Duration time.Duration
// Num int
// }
type Project struct {
name string
token Token
fee Fee
}
var rateLimit map[string]twitter.RateLimit
const ProjectMonthKey = "ProjectMonth"
// const RetweetPerApp = "RetweetPerApp"
// const RetweetPerUser = "RetweetPerUser"
func getMonthEnd(t time.Time) time.Time {
//func getMonthStartAndEnd(t time.Time) (time.Time, time.Time) {
year, month, _ := t.Date()
start := time.Date(year, month, 1, 0, 0, 0, 0, t.Location())
end := start.AddDate(0, 1, -1).Add(23 * time.Hour).Add(59 * time.Minute).Add(59 * time.Second).Add(999 * time.Nanosecond)
//return start, end
return end
}
func init() {
rateLimit[ProjectMonthKey] = twitter.RateLimit{
Limit: 10000,
Reset: twitter.Epoch(getMonthEnd(time.Now()).Unix()),
}
}
package main
import "container/list"
type TaskJob struct {
//URL string
TaskTypeStr string
TaskId string
UserNameStr string
Next string
Idx []UserTaskIdAndTime
//[]RetweeterId
// backPushPop = list.New()
backPushPop *list.List
NewIdx []UserTaskIdAndTime
Res Users
}
// func (r *RetweetTask) URl() string {
// return r.URL
// }
func NewTaskJob(Id, userName, taskType string) *TaskJob {
return &TaskJob{
TaskTypeStr: taskType,
TaskId: Id,
UserNameStr: userName,
backPushPop: list.New(),
Idx: make([]UserTaskIdAndTime, 0),
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment