Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
T
twitter_syncer
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Odysseus
twitter_syncer
Commits
86bf7697
Commit
86bf7697
authored
Aug 18, 2024
by
Ubuntu
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
retweeter ok
parent
fadd4f42
Changes
10
Hide whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
539 additions
and
399 deletions
+539
-399
api_db.go
api_db.go
+22
-2
api_service.go
api_service.go
+50
-2
db.go
db.go
+296
-0
db_test.go
db_test.go
+28
-9
swagger.yaml
docs/swagger.yaml
+19
-12
idx.go
idx.go
+20
-10
main.go
main.go
+16
-7
scraper.go.bak
scraper.go.bak
+0
-109
stream_db.go
stream_db.go
+0
-248
task.go
task.go
+88
-0
No files found.
api_db.go
View file @
86bf7697
package
main
import
"encoding/json"
func
VerifyFollowerInDb
(
userId
,
followerId
string
)
(
bool
,
error
)
{
_
,
count
,
err
:=
client
.
From
(
"followers"
)
.
Select
(
"*"
,
"exact"
,
false
)
.
Eq
(
"user_id"
,
userId
)
.
Eq
(
"follower_id"
,
followerId
)
.
Execute
()
...
...
@@ -13,7 +15,7 @@ func VerifyFollowerInDb(userId, followerId string) (bool, error) {
func
VerifyRetweeterInDb
(
tweetId
,
retweeter
string
)
(
bool
,
error
)
{
_
,
count
,
err
:=
client
.
From
(
"retweeters"
)
.
Select
(
"*"
,
"exact"
,
false
)
.
Eq
(
"t
weet_id"
,
tweetId
)
.
Eq
(
"retweet
er_id"
,
retweeter
)
.
Execute
()
_
,
count
,
err
:=
client
.
From
(
"retweeters"
)
.
Select
(
"*"
,
"exact"
,
false
)
.
Eq
(
"t
ask_id"
,
tweetId
)
.
Eq
(
"us
er_id"
,
retweeter
)
.
Execute
()
if
err
!=
nil
{
return
false
,
err
...
...
@@ -83,7 +85,7 @@ func AddOrUpdateProject(cfg ProjectReq, user UserInfo) error {
return
err
}
func
QueryProject
(
cfg
ProjectReq
)
(
bool
,
error
)
{
func
QueryProject
ByKeysAndToken
(
cfg
ProjectReq
)
(
bool
,
error
)
{
_
,
count
,
err
:=
client
.
From
(
"project"
)
.
Select
(
"*"
,
"exact"
,
false
)
.
Eq
(
"api_key"
,
cfg
.
ApiKey
)
.
Eq
(
"api_key_secret"
,
cfg
.
ApiKeySecrect
)
.
Eq
(
"token"
,
cfg
.
Token
)
.
Eq
(
"access_token"
,
cfg
.
AccessToken
)
.
Eq
(
"access_token_secret"
,
cfg
.
AccessTokenSecret
)
.
Execute
()
...
...
@@ -94,3 +96,21 @@ func QueryProject(cfg ProjectReq) (bool, error) {
return
count
==
1
,
nil
}
func
QueryAvailableProject
()
([]
ProjectInDb
,
error
)
{
data
,
count
,
err
:=
client
.
From
(
"project"
)
.
Select
(
"*"
,
"exact"
,
false
)
.
Eq
(
"available"
,
"true"
)
.
Execute
()
if
err
!=
nil
{
return
nil
,
err
}
res
:=
make
([]
ProjectInDb
,
0
,
count
)
if
err
:=
json
.
Unmarshal
(
data
,
&
res
);
err
!=
nil
{
return
nil
,
err
}
return
res
,
nil
}
api_service.go
View file @
86bf7697
...
...
@@ -70,7 +70,7 @@ func Project(c *fiber.Ctx) error {
slog
.
Info
(
"cfg"
,
"project"
,
req
.
Project
,
"ApiKey"
,
req
.
ApiKey
,
"req.ApiKeySecrect"
,
req
.
ApiKeySecrect
,
"req.AccessToken"
,
req
.
AccessToken
,
"req.AccessTokenSecret"
,
req
.
AccessTokenSecret
,
"req.Token"
,
req
.
Token
)
ok
,
err
:=
QueryProject
(
req
)
ok
,
err
:=
QueryProject
ByKeysAndToken
(
req
)
if
err
!=
nil
{
slog
.
Error
(
"QueryProject"
,
"err"
,
err
.
Error
())
...
...
@@ -134,11 +134,59 @@ func Project(c *fiber.Ctx) error {
type
AddTaskReq
struct
{
// AddOrStop bool
User
string
`json:"user"`
User
string
`json:"user
_id
"`
TaskType
string
`json:"task_type"`
TaskId
string
`json:"task_id"`
}
func
TaskAdd
(
c
*
fiber
.
Ctx
)
error
{
//fmt.Println(string(c.Request().Body()))
slog
.
Info
(
c
.
Route
()
.
Path
,
"body"
,
string
(
c
.
Request
()
.
Body
()))
req
:=
AddTaskReq
{}
if
err
:=
json
.
Unmarshal
(
c
.
Request
()
.
Body
(),
&
req
);
err
!=
nil
{
slog
.
Error
(
"json.Unmarshal(c.Request().Body(), &req)"
,
"err"
,
err
.
Error
())
return
c
.
JSON
(
Res
{
Code
:
500
,
Msg
:
err
.
Error
(),
})
}
slog
.
Info
(
c
.
Route
()
.
Path
,
"user"
,
req
.
User
,
"TaskType"
,
req
.
TaskType
,
"TaskId"
,
req
.
TaskId
)
if
req
.
TaskType
==
""
||
req
.
TaskId
==
""
{
return
c
.
JSON
(
Res
{
Code
:
500
,
Msg
:
"must provide TaskId and TaskType"
,
})
}
//Todo
// 校验任务 条件是否存在;
// req.AddOrStop = true
err
:=
AddTaskInsertOrUpdate
(
req
)
//res, _, err := client.From("twitter_task").Insert(req, true, "", "representation", "").Execute()
if
err
!=
nil
{
slog
.
Error
(
"twitter_syncer insert"
,
"err"
,
err
.
Error
())
return
c
.
JSON
(
Res
{
Code
:
500
,
Msg
:
err
.
Error
(),
})
}
//slog.Info("twitter_syncer insert", "res", string(res))
return
c
.
JSON
(
Res
{
Code
:
200
,
})
}
func
TaskStop
(
c
*
fiber
.
Ctx
)
error
{
slog
.
Info
(
c
.
Route
()
.
Path
,
"body"
,
string
(
c
.
Request
()
.
Body
()))
...
...
db.go
View file @
86bf7697
package
main
import
(
"encoding/json"
"fmt"
"log/slog"
_
"code.wuban.net.cn/odysseus/twitter_syncer/docs"
...
...
@@ -8,6 +10,7 @@ import (
// replace with your own docs folder, usually "github.com/username/reponame/docs"
//_ "github.com/gofiber/swagger/example/docs"
"github.com/supabase-community/postgrest-go"
"github.com/supabase-community/supabase-go"
)
...
...
@@ -50,3 +53,296 @@ func init() {
// return res, nil
// }
const
FollowType
=
"followers"
const
RetweetType
=
"retweeters"
const
TweetLikingUsersType
=
"tweet_liking_users"
// func ListToQueue(done <-chan interface{}, inStream <-chan TaskIdAndList) <-chan TaskIdAndProfiles {
// outStream := make(chan TaskIdAndProfiles, 1)
// go func() {
// for {
// select {
// case <-done:
// return
// case users, ok := <-inStream:
// if ok == false {
// return
// }
// c := 0
// // if c < 100 {
// // c = c + 1
// res := make([]Profile, 0, users.List.Len())
// for e := users.List.Front(); e != nil; e = e.Next() {
// if user, ok := e.Value.(Profile); ok {
// //fmt.Printf("The data is a string: %s\n", str)
// res = append(res, user)
// c = c + 1
// if c%100 == 0 {
// fmt.Println("BackListToQueue", "len(inStream)", len(inStream), "len(outStream)", len(outStream), "len(res)", len(res))
// select {
// case <-done:
// return
// case outStream <- TaskIdAndProfiles{
// Profiles: res,
// TaskId: users.TaskId,
// TaskType: users.TaskType,
// }:
// res = make([]Profile, 0, users.List.Len())
// }
// }
// }
// }
// fmt.Println("BackListToQueue", "len(inStream)", len(inStream), "len(outStream)", len(outStream), "len(res)", len(res))
// select {
// case <-done:
// return
// case outStream <- TaskIdAndProfiles{
// Profiles: res,
// TaskId: users.TaskId,
// TaskType: users.TaskType,
// }:
// }
// // } else {
// // c = 0
// // }
// }
// }
// }()
// return outStream
// }
// func InsertOrUpdateFinishTask(done <-chan interface{}, inStream <-chan TaskIdAndProfiles) error {
// go func() {
// for {
// select {
// case <-done:
// return
// case users, ok := <-inStream:
// if ok == false {
// return
// }
// var res []byte
// var err error
// //if users.TaskType == FollowType {
// rows := make([]Follower, 0, len(users.Profiles))
// for _, user := range users.Profiles {
// sDec, _ := b64.StdEncoding.DecodeString(user.UserID)
// userId, _ := strings.CutPrefix(string(sDec), "User:")
// row := Follower{
// Follower: userId,
// UserName: user.Username,
// UserId: users.TaskId,
// }
// rows = append(rows, row)
// }
// res, _, err = client.From(users.TaskType).Insert(rows, true, "", "representation", "").Execute()
// //}
// if err != nil {
// slog.Error("insert into followers or retweeters ", err)
// for _, user := range users.Profiles {
// usersAsJson, err := json.Marshal(user)
// if err != nil {
// slog.Error("insert into followers or retweeters json.Marshal", err)
// continue
// }
// sDec, _ := b64.StdEncoding.DecodeString(user.UserID)
// userId, _ := strings.CutPrefix(string(sDec), "User:")
// slog.Error("insert into followers or retweeters error", string(usersAsJson), userId)
// }
// } else {
// slog.Info("insert into followers or retweeters", string(res), err)
// }
// fmt.Println("InsertOrUpdateUsers", "len(inStream)", len(inStream))
// }
// }
// }()
// return nil
// }
//UserTask
type
TaskRes
struct
{
UserTask
Id
int
`json:"id"`
CreatedAt
string
`json:"created_at"`
}
type
UserTaskIdAndTime
struct
{
TaskInDB
Id
int
`json:"id"`
CreatedAt
string
`json:"created_at"`
}
type
TaskInDB
struct
{
// ID int `json:"id"`
User
string
`json:"user_id"`
TaskType
string
`json:"task_type"`
TaskId
string
`json:"task_id"`
Start
bool
`json:"start"`
Stop
bool
`json:"stop"`
}
func
QueryAllTask
()
([]
TaskInDB
,
error
)
{
data
,
count
,
err
:=
client
.
From
(
"tasks"
)
.
Select
(
"*"
,
"exact"
,
false
)
.
Eq
(
"start"
,
"true"
)
.
Neq
(
"stop"
,
"true"
)
.
Execute
()
if
err
!=
nil
{
return
nil
,
err
}
_
=
count
// fmt.Println(count, string(data))
res
:=
make
([]
TaskInDB
,
0
,
count
)
if
err
:=
json
.
Unmarshal
(
data
,
&
res
);
err
!=
nil
{
return
nil
,
err
}
return
res
,
nil
}
type
TaskJob
struct
{
TaskId
string
Idx
[]
UserTask
UserId
string
TaskType
string
Config
}
func
GetTasks
()
([]
TaskJob
,
error
)
{
tasks
,
err
:=
QueryAllTask
()
if
err
!=
nil
{
return
nil
,
err
}
res
:=
make
([]
TaskJob
,
0
,
10
)
for
_
,
task
:=
range
tasks
{
data
,
count
,
err
:=
client
.
From
(
task
.
TaskType
)
.
Select
(
""
,
"user_id"
,
false
)
.
Eq
(
"task_id"
,
task
.
TaskId
)
.
Order
(
"id"
,
&
postgrest
.
OrderOpts
{
Ascending
:
false
,
// NullsFirst bool
// ForeignTable string
})
.
Range
(
0
,
10
,
""
)
.
Execute
()
if
err
!=
nil
{
slog
.
Error
(
"select * from followers error"
,
err
)
return
nil
,
err
}
_
=
count
slog
.
Info
(
"idx data"
,
"user id"
,
task
.
TaskId
,
"user name"
,
task
.
User
,
"idx"
,
data
)
fmt
.
Println
(
"idx data"
,
string
(
data
))
userRes
:=
make
([]
UserTask
,
0
,
10
)
if
err
:=
json
.
Unmarshal
(
data
,
&
userRes
);
err
!=
nil
{
return
nil
,
err
}
taskJob
:=
TaskJob
{
Idx
:
userRes
,
UserId
:
task
.
User
,
TaskType
:
task
.
TaskType
,
TaskId
:
task
.
TaskId
,
}
// taskJob := NewTaskJob(task.TaskId, task.User, task.TaskType)
taskJob
.
Idx
=
userRes
res
=
append
(
res
,
taskJob
)
}
projects
,
err
:=
QueryAvailableProject
()
if
err
!=
nil
{
return
nil
,
err
}
resWithKeys
:=
make
([]
TaskJob
,
0
,
10
)
for
_
,
v
:=
range
projects
{
for
_
,
r
:=
range
res
{
if
v
.
UserId
==
r
.
UserId
{
r
.
Config
=
v
.
Config
resWithKeys
=
append
(
resWithKeys
,
r
)
}
}
}
return
resWithKeys
,
nil
}
type
UserTask
struct
{
//user_id
TaskId
string
`json:"task_id"`
UserId
string
`json:"user_id"`
UserName
string
`json:"user_name"`
}
func
InsertTaskRes
(
content
[]
UserTask
,
tableName
string
,
taskId
string
)
error
{
rows
:=
make
([]
UserTask
,
0
,
len
(
content
))
for
_
,
v
:=
range
content
{
v
.
TaskId
=
taskId
rows
=
append
(
rows
,
v
)
}
res
,
_
,
err
:=
client
.
From
(
"retweeters"
)
.
Insert
(
rows
,
true
,
""
,
"representation"
,
""
)
.
Execute
()
if
err
!=
nil
{
return
err
}
_
=
res
return
nil
}
db_test.go
View file @
86bf7697
package
main
// func TestQueryTask(t *testing.T) {
import
"testing"
// tasks, err := QueryAllTask()
func
TestQueryTask
(
t
*
testing
.
T
)
{
// if err != nil {
// t.Fatal(err)
// }
tasks
,
err
:=
QueryAllTask
()
// for k, v := range tasks {
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
// t.Log(k, "v.User", v.User, "v.TaskType", v.TaskType, "v.TaskId", v.TaskId)
// }
for
k
,
v
:=
range
tasks
{
// }
t
.
Log
(
k
,
"v.User"
,
v
.
User
,
"v.TaskType"
,
v
.
TaskType
,
"v.TaskId"
,
v
.
TaskId
)
}
}
//GetTasks()
func
TestGetTasks
(
t
*
testing
.
T
)
{
tasks
,
err
:=
GetTasks
()
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
for
k
,
v
:=
range
tasks
{
t
.
Log
(
k
,
"v.User"
,
v
.
UserId
,
"v.TaskType"
,
v
.
TaskType
,
"v.TaskId"
,
v
.
TaskId
,
"Token"
,
v
.
Token
,
"accessToken"
,
v
.
AccessToken
)
}
}
// func TestInsertTask(t *testing.T) {
...
...
docs/swagger.yaml
View file @
86bf7697
...
...
@@ -287,38 +287,45 @@ components:
required
:
-
task_id
-
task_type
-
user_id
type
:
object
properties
:
user
:
user
_id
:
description
:
方便识别用户身份信息
type
:
string
example
:
"
OnlyDD_D
"
example
:
"
1783145144700874752
"
task_type
:
type
:
string
enum
:
-
follow
-
retweet
example
:
follow
-
retweeters
-
tweet_liking_users
-
followers
example
:
retweeters
task_id
:
description
:
user id (只使用uid https://twiteridfinder.com/) or
tweet id;
description
:
tweet id;
type
:
string
example
:
"
1
570057485914087429
"
example
:
"
1
800805503066661056
"
StopTaskReq
:
required
:
-
project
-
task_id
-
task_type
type
:
object
properties
:
task_type
:
type
:
string
enum
:
-
follow
-
retweet
example
:
follow
-
retweeters
-
tweet_liking_users
-
followers
example
:
retweeters
task_id
:
description
:
user id or full name for follow, tweet id for retweet.
type
:
string
example
:
"
1570057485914087429"
example
:
"
1800805503066661056"
user_id
:
description
:
方便识别用户身份信息
type
:
string
example
:
"
1783145144700874752"
# start:
# type: boolean
# example: true
...
...
idx.go
View file @
86bf7697
...
...
@@ -2,6 +2,7 @@ package main
import
(
"container/list"
"log/slog"
"time"
twitter
"github.com/g8rswimmer/go-twitter/v2"
...
...
@@ -29,6 +30,7 @@ func NewIdx(i []UserTask) *Idx {
return
&
Idx
{
idx
:
i
,
newIdx
:
make
([]
UserTask
,
0
,
10
),
List
:
list
.
New
(),
}
}
...
...
@@ -60,20 +62,28 @@ func (s *Idx) Idx(page []UserTask) (bool, *list.List) {
//TODO 匹配多个元素,防止用户取消;
for
k
,
v
:=
range
page
{
match
:=
false
for
ik
,
iv
:=
range
s
.
idx
{
slog
.
Info
(
"match"
,
"idx"
,
iv
.
UserId
,
"page user id"
,
v
.
UserId
,
"page user name"
,
v
.
UserName
)
if
v
.
UserId
==
iv
.
UserId
{
match
=
true
_
,
_
=
k
,
ik
}
}
newList
:=
s
.
List
s
.
List
=
list
.
New
()
//idx
s
.
idx
=
s
.
newIdx
s
.
newIdx
=
make
([]
UserTask
,
0
,
10
)
if
match
{
return
true
,
newList
}
else
{
s
.
List
.
PushFront
(
v
)
}
_
,
_
=
k
,
ik
newList
:=
s
.
List
s
.
List
=
list
.
New
()
//idx
s
.
idx
=
s
.
newIdx
s
.
newIdx
=
make
([]
UserTask
,
0
,
10
)
return
true
,
newList
}
else
{
s
.
List
.
PushFront
(
v
)
}
}
return
false
,
nil
...
...
main.go
View file @
86bf7697
...
...
@@ -25,12 +25,21 @@ func main() {
defer
close
(
done
)
//go func() {
// taskInStream, err := newSync(done)
// if err != nil {
// panic(err)
// }
// //}()
tasks
,
err
:=
GetTasks
()
if
err
!=
nil
{
slog
.
Error
(
err
.
Error
())
return
}
go
func
()
{
for
_
,
task
:=
range
tasks
{
if
err
:=
Worker
.
AddJob
(
task
);
err
!=
nil
{
slog
.
Error
(
err
.
Error
())
}
}
}()
// taskIn = taskInStream
...
...
@@ -54,7 +63,7 @@ func main() {
}))
app
.
Post
(
"/project"
,
Project
)
//
app.Post("/task/add", TaskAdd)
app
.
Post
(
"/task/add"
,
TaskAdd
)
app
.
Post
(
"/task/stop"
,
TaskStop
)
app
.
Get
(
"/verify/follower"
,
VerifyFollower
)
app
.
Get
(
"/verify/retweeter"
,
VerifyRetweeter
)
...
...
scraper.go.bak
deleted
100644 → 0
View file @
fadd4f42
package
main
//
import
(
//
//
"strings"
//
//
"time"
//
//
b64
"encoding/base64"
//
twitterscraper
"github.com/imperatrona/twitter-scraper"
//
)
//
type
NewScraper
struct
{
//
twitterscraper
.
Scraper
//
}
//
func
(
s
*
NewScraper
)
RetweetsUsers
(
tweetId
string
)
error
{
//
//
https
://
x
.
com
/
i
/
api
/
graphql
/
_D6CD5GX3VTL4ZzoAjJjGA
/
Retweeters
//
//
req
,
err
:=
s
.
newRequest
(
"Get"
,
"https://twitter.com/i/api/graphql/95Bp6m3-FTFVeZ_rPD3mZQ/Favoriters"
)
//
req
,
err
:=
s
.
newRequest
(
"Get"
,
"https://twitter.com/i/api/graphql/_D6CD5GX3VTL4ZzoAjJjGA/Retweeters"
)
//
if
err
!= nil {
//
return
err
//
}
//
req
.
Header
.
Set
(
"content-type"
,
"application/json"
)
//
/*
//
{
"tweetId"
:
"1814231701658444272"
,
"count"
:
20
,
"includePromotedContent"
:
true
}
//
*/
//
variables
:=
map
[
string
]
interface
{}{
//
"tweetId"
:
tweetId
,
//
"count"
:
20
,
//
"includePromotedContent"
:
true
,
//
}
//
features
:=
map
[
string
]
interface
{}{
//
"rweb_tipjar_consumption_enabled"
:
true
,
//
"responsive_web_graphql_exclude_directive_enabled"
:
true
,
//
"verified_phone_label_enabled"
:
false
,
//
"creator_subscriptions_tweet_preview_api_enabled"
:
true
,
//
"responsive_web_graphql_timeline_navigation_enabled"
:
true
,
//
"responsive_web_graphql_skip_user_profile_image_extensions_enabled"
:
false
,
//
"communities_web_enable_tweet_community_results_fetch"
:
true
,
//
"c9s_tweet_anatomy_moderator_badge_enabled"
:
true
,
//
"articles_preview_enabled"
:
true
,
//
"tweetypie_unmention_optimization_enabled"
:
true
,
//
"responsive_web_edit_tweet_api_enabled"
:
true
,
//
"graphql_is_translatable_rweb_tweet_is_translatable_enabled"
:
true
,
//
"view_counts_everywhere_api_enabled"
:
true
,
//
"longform_notetweets_consumption_enabled"
:
true
,
//
"responsive_web_twitter_article_tweet_consumption_enabled"
:
true
,
//
"tweet_awards_web_tipping_enabled"
:
false
,
//
"creator_subscriptions_quote_tweet_preview_enabled"
:
false
,
//
"freedom_of_speech_not_reach_fetch_enabled"
:
true
,
//
"standardized_nudges_misinfo"
:
true
,
//
"tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled"
:
true
,
//
"rweb_video_timestamps_enabled"
:
true
,
//
"longform_notetweets_rich_text_read_enabled"
:
true
,
//
"longform_notetweets_inline_media_enabled"
:
true
,
//
"responsive_web_enhance_cards_enabled"
:
false
,
//
}
//
//
query
:=
url
.
Values
{}
//
//
query
.
Set
(
"variables"
,
mapToJSONString
(
variables
))
//
//
query
.
Set
(
"features"
,
mapToJSONString
(
features
))
//
//
req
.
URL
.
RawQuery
=
query
.
Encode
()
//
query
:=
url
.
Values
{}
//
query
.
Set
(
"variables"
,
mapToJSONString
(
variables
))
//
query
.
Set
(
"features"
,
mapToJSONString
(
features
))
//
req
.
URL
.
RawQuery
=
query
.
Encode
()
//
//
body
:=
map
[
string
]
interface
{}{
//
//
"variables"
:
variables
,
//
//
"queryId"
:
"ZYKSe-w7KEslx3JhSIk5LA"
,
//
//
}
//
//
b
,
_
:=
json
.
Marshal
(
body
)
//
//
req
.
Body
=
io
.
NopCloser
(
bytes
.
NewReader
(
b
))
//
//
var
response
struct
{
//
//
Data
struct
{
//
//
UnfavoriteTweet
string
`
json
:
"unfavorite_tweet"
`
//
//
}
`
json
:
"data"
`
//
//
Errors
[]
struct
{
//
//
Message
string
`
json
:
"message"
`
//
//
Code
int
`
json
:
"code"
`
//
//
}
`
json
:
"errors"
`
//
//
}
//
err
=
s
.
RequestAPI
(
req
,
nil
)
//
if
err
!= nil {
//
return
err
//
}
//
//
if
len
(
response
.
Errors
)
>
0
&&
response
.
Errors
[
0
].
Code
==
144
{
//
//
return
errors
.
New
(
"tweet already not liked"
)
//
//
}
//
//
if
response
.
Data
.
UnfavoriteTweet
!= "Done" {
//
//
return
errors
.
New
(
"unknown error"
)
//
//
}
//
return
nil
//
}
stream_db.go
deleted
100644 → 0
View file @
fadd4f42
package
main
const
FollowType
=
"followers"
const
RetweetType
=
"retweeters"
const
TweetLikingUsersType
=
"tweet_liking_users"
// func ListToQueue(done <-chan interface{}, inStream <-chan TaskIdAndList) <-chan TaskIdAndProfiles {
// outStream := make(chan TaskIdAndProfiles, 1)
// go func() {
// for {
// select {
// case <-done:
// return
// case users, ok := <-inStream:
// if ok == false {
// return
// }
// c := 0
// // if c < 100 {
// // c = c + 1
// res := make([]Profile, 0, users.List.Len())
// for e := users.List.Front(); e != nil; e = e.Next() {
// if user, ok := e.Value.(Profile); ok {
// //fmt.Printf("The data is a string: %s\n", str)
// res = append(res, user)
// c = c + 1
// if c%100 == 0 {
// fmt.Println("BackListToQueue", "len(inStream)", len(inStream), "len(outStream)", len(outStream), "len(res)", len(res))
// select {
// case <-done:
// return
// case outStream <- TaskIdAndProfiles{
// Profiles: res,
// TaskId: users.TaskId,
// TaskType: users.TaskType,
// }:
// res = make([]Profile, 0, users.List.Len())
// }
// }
// }
// }
// fmt.Println("BackListToQueue", "len(inStream)", len(inStream), "len(outStream)", len(outStream), "len(res)", len(res))
// select {
// case <-done:
// return
// case outStream <- TaskIdAndProfiles{
// Profiles: res,
// TaskId: users.TaskId,
// TaskType: users.TaskType,
// }:
// }
// // } else {
// // c = 0
// // }
// }
// }
// }()
// return outStream
// }
// func InsertOrUpdateFinishTask(done <-chan interface{}, inStream <-chan TaskIdAndProfiles) error {
// go func() {
// for {
// select {
// case <-done:
// return
// case users, ok := <-inStream:
// if ok == false {
// return
// }
// var res []byte
// var err error
// //if users.TaskType == FollowType {
// rows := make([]Follower, 0, len(users.Profiles))
// for _, user := range users.Profiles {
// sDec, _ := b64.StdEncoding.DecodeString(user.UserID)
// userId, _ := strings.CutPrefix(string(sDec), "User:")
// row := Follower{
// Follower: userId,
// UserName: user.Username,
// UserId: users.TaskId,
// }
// rows = append(rows, row)
// }
// res, _, err = client.From(users.TaskType).Insert(rows, true, "", "representation", "").Execute()
// //}
// if err != nil {
// slog.Error("insert into followers or retweeters ", err)
// for _, user := range users.Profiles {
// usersAsJson, err := json.Marshal(user)
// if err != nil {
// slog.Error("insert into followers or retweeters json.Marshal", err)
// continue
// }
// sDec, _ := b64.StdEncoding.DecodeString(user.UserID)
// userId, _ := strings.CutPrefix(string(sDec), "User:")
// slog.Error("insert into followers or retweeters error", string(usersAsJson), userId)
// }
// } else {
// slog.Info("insert into followers or retweeters", string(res), err)
// }
// fmt.Println("InsertOrUpdateUsers", "len(inStream)", len(inStream))
// }
// }
// }()
// return nil
// }
// type UserTaskIdAndTime struct {
// UserTask
// Id int `json:"id"`
// CreatedAt string `json:"created_at"`
// }
type
TaskInDB
struct
{
// ID int `json:"id"`
User
string
`json:"user"`
TaskType
string
`json:"task_type"`
TaskId
string
`json:"task_id"`
Start
bool
`json:"start"`
Stop
bool
`json:"stop"`
}
// func QueryAllTask() ([]TaskInDB, error) {
// data, count, err := client.From("tasks").Select("*", "exact", false).
// Eq("start", "true").Neq("stop", "true").
// Execute()
// if err != nil {
// return nil, err
// }
// _ = count
// // fmt.Println(count, string(data))
// res := make([]TaskInDB, 0, count)
// if err := json.Unmarshal(data, &res); err != nil {
// return nil, err
// }
// return res, nil
// }
// func GetTasksIdxWithTaskType() ([]*TaskJob, error) {
// tasks, err := QueryAllTask()
// if err != nil {
// return nil, err
// }
// res := make([]*TaskJob, 0, 10)
// for _, task := range tasks {
// //if task.TaskType == FollowType {
// data, count, err := client.From(task.TaskType).Select("", "user_id", false).
// Eq("user_id", task.TaskId).
// Order("id", &postgrest.OrderOpts{
// Ascending: false,
// // NullsFirst bool
// // ForeignTable string
// }).Range(0, 10, "").Execute()
// if err != nil {
// slog.Error("select * from followers error", err)
// return nil, err
// }
// _ = count
// slog.Info("idx data", "user id", task.TaskId, "user name", task.User, "idx", data)
// fmt.Println("idx data", string(data))
// userRes := make([]UserTaskIdAndTime, 0, 10)
// if err := json.Unmarshal(data, &userRes); err != nil {
// return nil, err
// }
// taskJob := NewTaskJob(task.TaskId, task.User, task.TaskType)
// taskJob.Idx = userRes
// res = append(res, taskJob)
// }
// return res, nil
// }
type
UserTask
struct
{
//user_id
TaskId
string
`json:"task_id"`
UserId
string
`json:"user_id"`
UserName
string
`json:"user_name"`
}
func
InsertTaskRes
(
content
UserTask
,
tableName
string
)
error
{
res
,
_
,
err
:=
client
.
From
(
"retweeters"
)
.
Insert
(
content
,
true
,
""
,
"representation"
,
""
)
.
Execute
()
if
err
!=
nil
{
return
err
}
_
=
res
return
nil
}
task.go
0 → 100644
View file @
86bf7697
package
main
import
(
"fmt"
"log/slog"
"sync"
)
type
Work
struct
{
Lock
sync
.
Mutex
Task
map
[
string
]
chan
<-
interface
{}
}
var
Worker
Work
func
init
()
{
Worker
=
Work
{
Task
:
make
(
map
[
string
]
chan
<-
interface
{}),
}
}
func
(
w
*
Work
)
AddJob
(
t
TaskJob
)
error
{
w
.
Lock
.
Lock
()
if
_
,
ok
:=
w
.
Task
[
t
.
UserId
+
"-"
+
t
.
TaskType
];
ok
{
return
fmt
.
Errorf
(
"%s has run"
,
t
.
UserId
+
"-"
+
t
.
TaskId
+
"-"
+
t
.
TaskType
)
}
done
:=
w
.
RunJob
(
t
)
w
.
Task
[
t
.
UserId
+
"-"
+
t
.
TaskType
]
=
done
w
.
Lock
.
Unlock
()
return
nil
}
func
(
w
*
Work
)
RunJob
(
t
TaskJob
)
chan
<-
interface
{}
{
done
:=
make
(
chan
interface
{})
go
func
()
{
cli
:=
NewRetweeterClient
(
t
.
Config
)
page
:=
NewPageUsers
(
NewIdx
(
t
.
Idx
))
for
{
select
{
case
_
,
ok
:=
<-
done
:
if
!
ok
{
return
}
default
:
var
f
req
if
t
.
TaskType
==
RetweetType
{
f
=
cli
.
Retweeters
}
else
{
f
=
cli
.
TweetLikingUsers
}
users
,
err
:=
page
.
Request
(
t
.
TaskId
,
""
,
f
)
if
err
!=
nil
{
slog
.
Error
(
" page.Request"
,
"task id"
,
t
.
TaskId
,
"t.TaskType"
,
t
.
TaskType
,
"err"
,
err
.
Error
())
continue
}
if
err
:=
InsertTaskRes
(
users
,
t
.
TaskType
,
t
.
TaskId
);
err
!=
nil
{
for
k
,
v
:=
range
users
{
fmt
.
Println
(
k
,
v
.
UserId
,
v
.
UserName
)
}
slog
.
Error
(
"InsertTaskRes"
,
"task id"
,
t
.
TaskId
,
"t.TaskType"
,
t
.
TaskType
,
"len(users)"
,
len
(
users
),
"err"
,
err
.
Error
())
}
slog
.
Info
(
"InsertTaskRes"
,
"task id"
,
t
.
TaskId
,
"t.TaskType"
,
t
.
TaskType
,
"len(users)"
,
len
(
users
))
}
}
}()
return
done
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment