Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
T
twitter_syncer
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Odysseus
twitter_syncer
Commits
e72fa1c2
Commit
e72fa1c2
authored
Oct 18, 2024
by
vicotor
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
update task param
parent
b425d85a
Changes
9
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
313 additions
and
409 deletions
+313
-409
accounts_test.go
accounts_test.go
+109
-5
api_db.go
api_db.go
+2
-2
api_service.go
api_service.go
+62
-105
client.go
client.go
+20
-111
client_test.go
client_test.go
+25
-5
db.go
db.go
+36
-179
db_test.go
db_test.go
+57
-1
main.go
main.go
+1
-0
task.go
task.go
+1
-1
No files found.
accounts_test.go
View file @
e72fa1c2
package
main
import
(
"context"
"encoding/json"
"golang.org/x/time/rate"
"net/http"
"os"
"strings"
"testing"
"time"
...
...
@@ -98,12 +101,9 @@ func TestGetLoginAccount(t *testing.T) {
t
.
Log
(
"login account ok"
)
for
k
,
v
:=
range
accounts
{
for
_
,
v
:=
range
accounts
{
//v.Timer = time.NewTimer(time.Duration(k) * time.Duration(5) * time.Minute)
accChan
<-
ScraperWithTimer
{
Scraper
:
v
,
Timer
:
time
.
NewTimer
(
time
.
Duration
(
k
)
*
time
.
Duration
(
5
)
*
time
.
Minute
),
}
accChan
<-
v
}
for
{
...
...
@@ -152,3 +152,107 @@ func TestCookies(t *testing.T) {
// }
}
func
TestRateLimiter
(
t
*
testing
.
T
)
{
rl
:=
rate
.
NewLimiter
(
rate
.
Limit
(
1.0
/
60
),
10
)
ctx
:=
context
.
TODO
()
for
i
:=
0
;
i
<
20
;
i
++
{
rl
.
Wait
(
ctx
)
t
.
Log
(
i
,
time
.
Now
())
time
.
Sleep
(
1
*
time
.
Second
)
}
}
func
TestLogin
(
t
*
testing
.
T
)
{
//userCookies := make(map[string][]*http.Cookie)
info
:=
[]
struct
{
Username
string
`json:"username"`
Password
string
`json:"password"`
Email
string
`json:"email"`
F2A
string
`json:"f2a"`
}{
//{"EulaSusann56500", "SuIZROXhN14D", "szahcqejrt@rambler.ru", "XDC6WMJEH4ILTTXN"},
{
"MShondra82485"
,
"B7P38LYzz"
,
"mvwadpmfxr@rambler.ru"
,
"I6XQJHQMUUL5MDXX"
},
//{"KylieBritt10079", "up1kj10rgjkry", "xttuuxbtdc@rambler.ru", "GDZ6QWDQ5Q2KDOE5"},
}
tasks
:=
make
(
chan
string
,
1000
)
type
HistoryInfo
struct
{
Users
[]
string
Next
string
}
filter
:=
func
(
users
[]
string
,
history
*
HistoryInfo
)
[]
string
{
if
history
==
nil
{
return
users
}
m
:=
make
(
map
[
string
]
bool
)
for
_
,
v
:=
range
history
.
Users
{
m
[
v
]
=
true
}
newusers
:=
make
([]
string
,
0
,
1000
)
for
_
,
v
:=
range
users
{
if
_
,
ok
:=
m
[
v
];
!
ok
{
newusers
=
append
(
newusers
,
v
)
}
}
return
newusers
}
history
:=
make
(
map
[
string
]
*
HistoryInfo
)
tasks
<-
"sager"
accounts
:=
make
(
map
[
string
]
*
twitterscraper
.
Scraper
)
for
_
,
v
:=
range
info
{
scraper
:=
twitterscraper
.
New
()
code
,
err
:=
generateTOTP
(
v
.
F2A
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
{
if
err
:=
scraper
.
AutoLogin
(
v
.
Username
,
v
.
Password
,
v
.
Email
,
code
);
err
!=
nil
{
t
.
Error
(
err
)
continue
}
else
{
t
.
Log
(
"login success"
)
accounts
[
v
.
Username
]
=
scraper
}
}
}
for
{
select
{
case
task
:=
<-
tasks
:
tHistory
:=
history
[
task
]
if
tHistory
==
nil
{
tHistory
=
&
HistoryInfo
{}
}
for
k
,
v
:=
range
accounts
{
newusers
:=
make
([]
string
,
0
,
1000
)
users
,
next
,
err
:=
v
.
FetchFollowers
(
task
,
1000
,
tHistory
.
Next
)
if
err
!=
nil
{
t
.
Error
(
"fetch followers error"
,
err
,
"now"
,
time
.
Now
())
if
strings
.
Contains
(
err
.
Error
(),
"429"
)
{
time
.
Sleep
(
1
*
time
.
Minute
)
}
continue
}
for
_
,
u
:=
range
users
{
newusers
=
append
(
newusers
,
u
.
UserID
)
}
tHistory
.
Users
=
append
(
tHistory
.
Users
,
filter
(
newusers
,
tHistory
)
...
)
tHistory
.
Next
=
next
if
len
(
users
)
==
0
{
t
.
Logf
(
"query follower with username:%s, next:%s, newuser=0"
,
k
,
next
)
}
else
{
t
.
Logf
(
"query follower with username:%s, next:%s, newuser[0]=%s, newusercount=%d"
,
k
,
next
,
users
[
0
]
.
Username
,
len
(
users
))
}
time
.
Sleep
(
1
*
time
.
Second
)
}
history
[
task
]
=
tHistory
tasks
<-
task
}
}
}
api_db.go
View file @
e72fa1c2
...
...
@@ -36,13 +36,13 @@ func VerifyLikeInDb(tweetId, userId string) (bool, error) {
}
func
AddTaskInsert
(
req
AddTaskReq
,
followerCount
int
)
error
{
task
:=
TaskInDB
{
User
:
req
.
User
,
TaskType
:
req
.
TaskType
,
TaskId
:
req
.
TaskId
,
Start
:
true
,
Stop
:
false
,
ApiConfig
:
parseToApiConfig
(
req
.
ApiConfig
),
FollowerCount
:
followerCount
,
}
...
...
@@ -66,7 +66,7 @@ func UpdateFollowerTaskCount(taskId, taskType, userId string, fc int) error {
return
err
}
func
StopTaskUpdate
(
req
Add
TaskReq
)
error
{
func
StopTaskUpdate
(
req
Stop
TaskReq
)
error
{
//res, _, err := client.From("tasks").Insert(task, true, "", "representation", "").Execute()
...
...
api_service.go
View file @
e72fa1c2
...
...
@@ -54,6 +54,36 @@ type ProjectReq struct {
Project
string
`json:"project"`
}
func
GetConfigOwner
(
c
*
fiber
.
Ctx
)
error
{
slog
.
Info
(
c
.
Route
()
.
Path
,
"body"
,
string
(
c
.
Request
()
.
Body
()))
reqCfg
:=
Config
{}
if
err
:=
json
.
Unmarshal
(
c
.
Request
()
.
Body
(),
&
reqCfg
);
err
!=
nil
{
slog
.
Error
(
"json.Unmarshal(c.Request().Body(), &req)"
,
"err"
,
err
.
Error
())
return
c
.
JSON
(
Res
{
Code
:
500
,
Msg
:
err
.
Error
(),
})
}
slog
.
Info
(
"get config owner info"
,
"config"
,
reqCfg
)
user
,
err
:=
NewClient
(
reqCfg
,
nil
)
.
MyInfo
()
if
err
!=
nil
{
slog
.
Error
(
"MyInfo"
,
"err"
,
err
.
Error
())
return
c
.
JSON
(
Res
{
Code
:
500
,
Msg
:
err
.
Error
(),
})
}
return
c
.
JSON
(
ProjectRes
{
Code
:
200
,
Data
:
user
,
})
}
func
Project
(
c
*
fiber
.
Ctx
)
error
{
slog
.
Info
(
c
.
Route
()
.
Path
,
"body"
,
string
(
c
.
Request
()
.
Body
()))
...
...
@@ -89,62 +119,36 @@ func Project(c *fiber.Ctx) error {
}
cli
:=
NewClient
(
req
.
Config
,
nil
)
me
,
err
:=
cli
.
Me
()
user
,
err
:=
NewClient
(
req
.
Config
,
nil
)
.
MyInfo
()
if
err
!=
nil
{
slog
.
Error
(
"
me
"
,
"err"
,
err
.
Error
())
slog
.
Error
(
"
MyInfo
"
,
"err"
,
err
.
Error
())
return
c
.
JSON
(
ProjectRes
{
Code
:
500
,
Msg
:
err
.
Error
(),
})
}
for
_
,
v
:=
range
me
{
user
:=
UserInfo
{
UserId
:
v
.
User
.
ID
,
UserName
:
v
.
User
.
UserName
,
Name
:
v
.
User
.
Name
,
}
if
err
:=
AddOrUpdateProject
(
req
,
user
);
err
!=
nil
{
slog
.
Error
(
"insert db "
,
"err"
,
err
.
Error
())
return
c
.
JSON
(
ProjectRes
{
Code
:
500
,
Msg
:
err
.
Error
(),
})
}
if
err
:=
AddOrUpdateProject
(
req
,
user
);
err
!=
nil
{
slog
.
Error
(
"insert db "
,
"err"
,
err
.
Error
())
return
c
.
JSON
(
ProjectRes
{
Code
:
200
,
Data
:
UserInfo
{
UserId
:
v
.
User
.
ID
,
UserName
:
v
.
User
.
UserName
,
Name
:
v
.
User
.
Name
,
},
Code
:
500
,
Msg
:
err
.
Error
(),
})
}
return
c
.
JSON
(
ProjectRes
{
Code
:
5
00
,
Msg
:
"can not find out the user info with me API"
,
Code
:
2
00
,
Data
:
user
,
})
}
type
AddTaskReq
struct
{
// AddOrStop bool
User
string
`json:"user_id"`
TaskType
string
`json:"task_type"`
TaskId
string
`json:"task_id"`
//FollowCount int
User
string
`json:"user_id"`
TaskType
string
`json:"task_type"`
TaskId
string
`json:"task_id"`
ApiConfig
Config
`json:"config"`
}
func
TaskAdd
(
c
*
fiber
.
Ctx
)
error
{
//fmt.Println(string(c.Request().Body()))
slog
.
Info
(
c
.
Route
()
.
Path
,
"body"
,
string
(
c
.
Request
()
.
Body
()))
req
:=
AddTaskReq
{}
...
...
@@ -157,7 +161,7 @@ func TaskAdd(c *fiber.Ctx) error {
})
}
slog
.
Info
(
c
.
Route
()
.
Path
,
"user"
,
req
.
User
,
"TaskType"
,
req
.
TaskType
,
"TaskId"
,
req
.
TaskId
)
slog
.
Info
(
c
.
Route
()
.
Path
,
"user"
,
req
.
User
,
"TaskType"
,
req
.
TaskType
,
"TaskId"
,
req
.
TaskId
,
"config"
,
req
.
ApiConfig
)
if
req
.
TaskType
==
""
||
req
.
TaskId
==
""
{
return
c
.
JSON
(
Res
{
...
...
@@ -165,83 +169,50 @@ func TaskAdd(c *fiber.Ctx) error {
Msg
:
"must provide TaskId and TaskType"
,
})
}
//Todo
// 校验任务 条件是否存在;
projects
,
ok
,
err
:=
QueryProjectByUserId
(
req
.
User
)
// check the config is valid or not.
user
,
err
:=
NewClient
(
req
.
ApiConfig
,
nil
)
.
MyInfo
()
if
err
!=
nil
{
return
c
.
JSON
(
Res
{
Code
:
500
,
Msg
:
"
QueryProjectByUserId "
+
req
.
User
,
Msg
:
"
get api owner info failed "
+
err
.
Error
()
,
})
}
if
!
ok
{
if
user
.
UserId
!=
req
.
User
{
return
c
.
JSON
(
Res
{
Code
:
500
,
Msg
:
"
can not find the user keys and token info "
+
req
.
User
,
Msg
:
"
user_id not match with the api owner"
,
})
}
ok
,
err
=
CheckTaskExist
(
req
.
User
,
req
.
TaskId
,
req
.
TaskType
)
if
err
!=
nil
{
// 校验任务 条件是否存在;
if
exist
,
err
:=
CheckTaskExist
(
req
.
User
,
req
.
TaskId
,
req
.
TaskType
);
err
!=
nil
{
return
c
.
JSON
(
Res
{
Code
:
500
,
Msg
:
"QCheckTask "
+
req
.
User
,
})
}
if
ok
{
}
else
if
exist
{
return
c
.
JSON
(
Res
{
Code
:
500
,
Msg
:
"task already existed"
,
})
}
fc
:=
0
fc
:=
0
if
req
.
TaskType
==
FollowType
{
// do some check.
// follower task count < available account count.
ok
,
err
:=
CheckFollowerTaskAndAccountCount
()
if
err
!=
nil
{
if
passed
,
err
:=
CheckFollowerTaskAndAccountCount
();
err
!=
nil
{
return
c
.
JSON
(
Res
{
Code
:
500
,
Msg
:
"CheckFollowerTaskAndAccountCount "
+
req
.
User
,
})
}
if
!
ok
{
}
else
if
!
passed
{
return
c
.
JSON
(
Res
{
Code
:
500
,
Msg
:
"CheckFollowerTaskAndAccountCount !ok"
,
})
}
ok
,
err
=
QueryProjectByUserIdAndName
(
req
.
TaskId
,
req
.
User
)
if
err
!=
nil
{
return
c
.
JSON
(
Res
{
Code
:
500
,
Msg
:
"QueryProjectByUserIdAndName "
+
req
.
User
,
})
}
if
!
ok
{
return
c
.
JSON
(
Res
{
Code
:
500
,
Msg
:
"QueryProjectByUserIdAndName !ok"
,
})
}
fc
,
err
=
NewFollowerOb
()
.
TryProfileFollowerCount
(
req
.
TaskId
)
if
err
!=
nil
{
return
c
.
JSON
(
Res
{
Code
:
500
,
...
...
@@ -253,9 +224,8 @@ func TaskAdd(c *fiber.Ctx) error {
// create a taskJob.
job
:=
TaskJob
{
Config
:
projects
[
0
]
.
Config
,
Idx
:
make
([]
UserTask
,
0
),
Config
:
req
.
ApiConfig
,
Idx
:
make
([]
UserTask
,
0
),
UserId
:
req
.
User
,
TaskId
:
req
.
TaskId
,
TaskType
:
req
.
TaskType
,
...
...
@@ -288,30 +258,17 @@ func TaskAdd(c *fiber.Ctx) error {
})
}
// func CheckErrOk(ok bool, err error, c *fiber.Ctx) error {
// if err != nil {
// return c.JSON(Res{
// Code: 500,
// Msg: "QCheckTask " + req.User,
// })
// }
// if ok {
// return c.JSON(Res{
// Code: 500,
// Msg: "task already existed",
// })
// }
// }
type
StopTaskReq
struct
{
User
string
`json:"user_id"`
TaskType
string
`json:"task_type"`
TaskId
string
`json:"task_id"`
}
func
TaskStop
(
c
*
fiber
.
Ctx
)
error
{
slog
.
Info
(
c
.
Route
()
.
Path
,
"body"
,
string
(
c
.
Request
()
.
Body
()))
req
:=
Add
TaskReq
{}
req
:=
Stop
TaskReq
{}
if
err
:=
json
.
Unmarshal
(
c
.
Request
()
.
Body
(),
&
req
);
err
!=
nil
{
slog
.
Error
(
"json.Unmarshal(c.Request().Body(), &req)"
,
"err"
,
err
.
Error
())
...
...
client.go
View file @
e72fa1c2
...
...
@@ -145,6 +145,26 @@ func NewOAuth2Client0817() *Client {
}
}
func
(
c
*
Client
)
MyInfo
()
(
UserInfo
,
error
)
{
me
,
err
:=
c
.
Me
()
if
err
!=
nil
{
return
UserInfo
{},
err
}
for
_
,
v
:=
range
me
{
user
:=
UserInfo
{
UserId
:
v
.
User
.
ID
,
UserName
:
v
.
User
.
UserName
,
Name
:
v
.
User
.
Name
,
}
return
user
,
nil
}
return
UserInfo
{},
fmt
.
Errorf
(
"no user info found"
)
}
func
(
c
*
Client
)
Retweeters
(
tweetId
string
,
next
string
)
([]
*
twitter
.
UserObj
,
string
,
*
twitter
.
RateLimit
,
error
)
{
// ctx is generated here only to use with Ratelimiter
...
...
@@ -227,114 +247,3 @@ func (c *Client) Me() (map[string]*twitter.UserDictionary, error) {
return
dictionaries
,
nil
}
/*
先不考虑单个app cli异常;
确认根据数量 交替,还是根据时间;优先时间;
*/
type
TwoClient
struct
{
cli1
*
Client
cli2
*
Client
//queue chan *Client
count
int
}
func
NewTwoClient
()
{
//先确认,周期是怎么计算的,确定能不能交叉;
}
func
(
c
*
TwoClient
)
Retweeters
(
tweetId
string
,
next
string
)
([]
*
twitter
.
UserObj
,
string
,
*
twitter
.
RateLimit
,
error
)
{
//for c := range c.queue {
// return c.Retweeters(tweetId, next)
//}
if
c
.
count
<
6
{
return
c
.
cli1
.
Retweeters
(
tweetId
,
next
)
}
return
c
.
cli1
.
Retweeters
(
tweetId
,
next
)
//return nil, "", nil, fmt.Errorf("two client queue has closed")
}
func
(
c
*
TwoClient
)
TweetLikingUsers
(
tweetId
string
,
next
string
)
([]
*
twitter
.
UserObj
,
string
,
*
twitter
.
RateLimit
,
error
)
{
return
nil
,
""
,
nil
,
fmt
.
Errorf
(
"two client queue has closed"
)
}
func
(
c
*
TwoClient
)
Me
()
(
map
[
string
]
*
twitter
.
UserDictionary
,
error
)
{
// for c := range c.queue {
// return c.Me()
// }
return
nil
,
fmt
.
Errorf
(
"two client queue has closed"
)
}
/*
func NewOAuth2ClientSelf() *Client {
twitterAPIKey := "Ufhj9NggOmRb61LTYUinaDHws" //os.Getenv("TWITTER_API_KEY")
twitterAPIKeySecret := "IfsfhxpyKqmYaEkyB89uH5tT8Ma77FJqrB0BsFN7uUnNX0UZ4B" //os.Getenv("TWITTER_API_KEY_SECRET")
oauth1Config := oauth1.NewConfig(twitterAPIKey, twitterAPIKeySecret)
twitterAccessToken := "1823984946710765569-9Nj7JZaBKQQiTnSyFiOBC3ADnepQqR" //os.Getenv("TWITTER_ACCESS_TOKEN")
twitterAccessTokenSecret := "MltXbwW8Rrb6DJKJo3qnG3lHMUWZ6ILCcqFnujfHrZ875" //os.Getenv("TWITTER_ACCESS_TOKEN_SECRET")
twitterHttpClient := oauth1Config.Client(oauth1.NoContext, &oauth1.Token{
Token: twitterAccessToken,
TokenSecret: twitterAccessTokenSecret,
})
twitterClient := &twitter.Client{
Authorizer: authorize{
Token: "AAAAAAAAAAAAAAAAAAAAAD6AvQEAAAAApN304Hsb89%2FMWG2RoLfSEgb2RS0%3DVQin0pVyPOsOBkCPFoy4wQKOXh3nBvoxMqQ6dc7ulaJ2anvoCm",
},
Client: twitterHttpClient,
Host: "https://api.twitter.com",
}
return &Client{
Client: twitterClient,
}
}
func NewOAuth2Client0816() *Client {
twitterAPIKey := "lVnj6Ox9HPcI4LwArSSYU7Pba" //os.Getenv("TWITTER_API_KEY")
twitterAPIKeySecret := "QMSnWG4QwyXWBVW2hQazzxhw9cSjd32CDfXGkg2DEaUUdscCRZ" //os.Getenv("TWITTER_API_KEY_SECRET")
oauth1Config := oauth1.NewConfig(twitterAPIKey, twitterAPIKeySecret)
twitterAccessToken := "1783145144700874752-TqHrsFUL20fEz4nz71yYlYVihkGmZn" //os.Getenv("TWITTER_ACCESS_TOKEN")
twitterAccessTokenSecret := "QDmtDfsiMigTJk1iqoyq8zCHNQJq5zCeC560NH9T5yUZl" //os.Getenv("TWITTER_ACCESS_TOKEN_SECRET")
twitterHttpClient := oauth1Config.Client(oauth1.NoContext, &oauth1.Token{
Token: twitterAccessToken,
TokenSecret: twitterAccessTokenSecret,
})
twitterClient := &twitter.Client{
Authorizer: authorize{
Token: "AAAAAAAAAAAAAAAAAAAAAEaPvQEAAAAAWDyrWaIbZHPYeg3ifnvXWdlylvs%3D7XFIO4y2HA0suNxLv570AsaIfmWD4x6XB64zHd9saEqVAhuTMq",
},
Client: twitterHttpClient,
Host: "https://api.twitter.com",
}
return &Client{
Client: twitterClient,
}
}
*/
client_test.go
View file @
e72fa1c2
...
...
@@ -2,6 +2,7 @@ package main
import
(
"encoding/json"
"fmt"
"testing"
)
...
...
@@ -32,11 +33,11 @@ func TestMe(t *testing.T) {
func
TestCfg
(
t
*
testing
.
T
)
{
cfg
:=
Config
{
ApiKey
:
"
lVnj6Ox9HPcI4LwArSSYU7Pba
"
,
ApiKeySecrect
:
"
QMSnWG4QwyXWBVW2hQazzxhw9cSjd32CDfXGkg2DEaUUdscCRZ
"
,
AccessToken
:
"1783145144700874752-
TqHrsFUL20fEz4nz71yYlYVihkGmZn
"
,
AccessTokenSecret
:
"
QDmtDfsiMigTJk1iqoyq8zCHNQJq5zCeC560NH9T5yUZl
"
,
Token
:
"AAAAAAAAAAAAAAAAAAAAA
EaPvQEAAAAAWDyrWaIbZHPYeg3ifnvXWdlylvs%3D7XFIO4y2HA0suNxLv570AsaIfmWD4x6XB64zHd9saEqVAhuTMq
"
,
ApiKey
:
"
u5HOlaBhMFNqXbs7lznEuUQVx
"
,
ApiKeySecrect
:
"
VhpTsl4TJUQi9FSAymajDCWfpgyJoK2d18i4lX9sPGiWc462nR
"
,
AccessToken
:
"1783145144700874752-
F38iX5PVmnOe3RHKwB0mc8tj4vObaT
"
,
AccessTokenSecret
:
"
O8BhSezZA8M8Jlz0rZJReBsV9HZO420iWdU58wCG2BXaA
"
,
Token
:
"AAAAAAAAAAAAAAAAAAAAA
IZTwQEAAAAApSMHaI24HWCEVYV0mx%2F2HauZHI0%3D6GrtIbJbiAwGJr4rZIz4WdrTLO6qI4zRGN4ZKpaOqUbwRRauPI
"
,
}
cfgAsJson
,
err
:=
json
.
Marshal
(
cfg
)
...
...
@@ -48,3 +49,22 @@ func TestCfg(t *testing.T) {
t
.
Log
(
string
(
cfgAsJson
))
}
func
TestGetLikeUsers
(
t
*
testing
.
T
)
{
cfg
:=
Config
{
ApiKey
:
"u5HOlaBhMFNqXbs7lznEuUQVx"
,
ApiKeySecrect
:
"VhpTsl4TJUQi9FSAymajDCWfpgyJoK2d18i4lX9sPGiWc462nR"
,
AccessToken
:
"1783145144700874752-F38iX5PVmnOe3RHKwB0mc8tj4vObaT"
,
AccessTokenSecret
:
"O8BhSezZA8M8Jlz0rZJReBsV9HZO420iWdU58wCG2BXaA"
,
Token
:
"AAAAAAAAAAAAAAAAAAAAAIZTwQEAAAAApSMHaI24HWCEVYV0mx%2F2HauZHI0%3D6GrtIbJbiAwGJr4rZIz4WdrTLO6qI4zRGN4ZKpaOqUbwRRauPI"
,
}
cc
:=
NewLikeClient
(
cfg
)
users
,
nextToken
,
nextTime
,
err
:=
cc
.
TweetLikingUsers
(
"1843181642300674228"
,
""
)
if
err
!=
nil
{
t
.
Error
(
err
)
}
t
.
Log
(
"nextToken"
,
nextToken
,
"nextTime"
,
nextTime
)
for
_
,
user
:=
range
users
{
fmt
.
Printf
(
"user: %s, %s
\n
"
,
user
.
ID
,
user
.
UserName
)
}
}
db.go
View file @
e72fa1c2
...
...
@@ -46,165 +46,10 @@ func init() {
}
}
// func TwitterAccountFromDB() ([]TwitterAccount, error) {
// data, count, err := client.From("account").Select("*", "exact", false).Eq("available", "true").Execute()
// if err != nil {
// return nil, err
// }
// slog.Info("TwitterAccountFromDB", "count", count)
// res := make([]TwitterAccount, 0, count)
// if err := json.Unmarshal(data, &res); err != nil {
// return nil, err
// }
// return res, nil
// }
const
FollowType
=
"followers"
const
RetweetType
=
"retweeters"
const
TweetLikingUsersType
=
"tweet_liking_users"
// func ListToQueue(done <-chan interface{}, inStream <-chan TaskIdAndList) <-chan TaskIdAndProfiles {
// outStream := make(chan TaskIdAndProfiles, 1)
// go func() {
// for {
// select {
// case <-done:
// return
// case users, ok := <-inStream:
// if ok == false {
// return
// }
// c := 0
// // if c < 100 {
// // c = c + 1
// res := make([]Profile, 0, users.List.Len())
// for e := users.List.Front(); e != nil; e = e.Next() {
// if user, ok := e.Value.(Profile); ok {
// //fmt.Printf("The data is a string: %s\n", str)
// res = append(res, user)
// c = c + 1
// if c%100 == 0 {
// fmt.Println("BackListToQueue", "len(inStream)", len(inStream), "len(outStream)", len(outStream), "len(res)", len(res))
// select {
// case <-done:
// return
// case outStream <- TaskIdAndProfiles{
// Profiles: res,
// TaskId: users.TaskId,
// TaskType: users.TaskType,
// }:
// res = make([]Profile, 0, users.List.Len())
// }
// }
// }
// }
// fmt.Println("BackListToQueue", "len(inStream)", len(inStream), "len(outStream)", len(outStream), "len(res)", len(res))
// select {
// case <-done:
// return
// case outStream <- TaskIdAndProfiles{
// Profiles: res,
// TaskId: users.TaskId,
// TaskType: users.TaskType,
// }:
// }
// // } else {
// // c = 0
// // }
// }
// }
// }()
// return outStream
// }
// func InsertOrUpdateFinishTask(done <-chan interface{}, inStream <-chan TaskIdAndProfiles) error {
// go func() {
// for {
// select {
// case <-done:
// return
// case users, ok := <-inStream:
// if ok == false {
// return
// }
// var res []byte
// var err error
// //if users.TaskType == FollowType {
// rows := make([]Follower, 0, len(users.Profiles))
// for _, user := range users.Profiles {
// sDec, _ := b64.StdEncoding.DecodeString(user.UserID)
// userId, _ := strings.CutPrefix(string(sDec), "User:")
// row := Follower{
// Follower: userId,
// UserName: user.Username,
// UserId: users.TaskId,
// }
// rows = append(rows, row)
// }
// res, _, err = client.From(users.TaskType).Insert(rows, true, "", "representation", "").Execute()
// //}
// if err != nil {
// slog.Error("insert into followers or retweeters ", err)
// for _, user := range users.Profiles {
// usersAsJson, err := json.Marshal(user)
// if err != nil {
// slog.Error("insert into followers or retweeters json.Marshal", err)
// continue
// }
// sDec, _ := b64.StdEncoding.DecodeString(user.UserID)
// userId, _ := strings.CutPrefix(string(sDec), "User:")
// slog.Error("insert into followers or retweeters error", string(usersAsJson), userId)
// }
// } else {
// slog.Info("insert into followers or retweeters", string(res), err)
// }
// fmt.Println("InsertOrUpdateUsers", "len(inStream)", len(inStream))
// }
// }
// }()
// return nil
// }
//UserTask
type
TaskRes
struct
{
...
...
@@ -219,11 +64,20 @@ type UserTaskIdAndTime struct {
CreatedAt
string
`json:"created_at"`
}
type
ApiConfig
struct
{
ApiKey
string
`json:"api_key"`
ApiKeySecrect
string
`json:"api_key_secret"`
AccessToken
string
`json:"access_token"`
AccessTokenSecret
string
`json:"access_token_secret"`
Token
string
`json:"token"`
}
type
TaskInDB
struct
{
// ID int `json:"id"`
User
string
`json:"user_id"`
TaskType
string
`json:"task_type"`
TaskId
string
`json:"task_id"`
ApiConfig
string
`json:"api_config"`
Start
bool
`json:"start"`
Stop
bool
`json:"stop"`
FollowerCount
int
`json:"follower_count"`
...
...
@@ -268,6 +122,27 @@ func (job *TaskJob) String() string {
return
string
(
jobAsJson
)
}
func
parseToConfig
(
api
string
)
Config
{
cfg
:=
Config
{}
if
err
:=
json
.
Unmarshal
([]
byte
(
api
),
&
cfg
);
err
!=
nil
{
return
Config
{}
}
else
{
return
cfg
}
}
func
parseToApiConfig
(
cfg
Config
)
string
{
api
:=
ApiConfig
{
ApiKey
:
cfg
.
ApiKey
,
ApiKeySecrect
:
cfg
.
ApiKeySecrect
,
AccessToken
:
cfg
.
AccessToken
,
AccessTokenSecret
:
cfg
.
AccessTokenSecret
,
Token
:
cfg
.
Token
,
}
d
,
_
:=
json
.
Marshal
(
api
)
return
string
(
d
)
}
func
GetTasks
()
([]
TaskJob
,
error
)
{
tasks
,
err
:=
QueryAllTask
()
...
...
@@ -282,7 +157,10 @@ func GetTasks() ([]TaskJob, error) {
res
:=
make
([]
TaskJob
,
0
,
10
)
for
_
,
task
:=
range
tasks
{
if
len
(
task
.
ApiConfig
)
==
0
{
continue
}
// get idx data from each task table.
data
,
count
,
err
:=
client
.
From
(
task
.
TaskType
)
.
Select
(
""
,
"user_id"
,
false
)
.
Eq
(
"task_id"
,
task
.
TaskId
)
.
Order
(
"id"
,
&
postgrest
.
OrderOpts
{
...
...
@@ -313,35 +191,14 @@ func GetTasks() ([]TaskJob, error) {
TaskType
:
task
.
TaskType
,
TaskId
:
task
.
TaskId
,
FollowerCount
:
task
.
FollowerCount
,
Config
:
parseToConfig
(
task
.
ApiConfig
),
}
// taskJob := NewTaskJob(task.TaskId, task.User, task.TaskType)
taskJob
.
Idx
=
userRes
res
=
append
(
res
,
taskJob
)
}
projects
,
err
:=
QueryAvailableProject
()
if
err
!=
nil
{
return
nil
,
err
}
resWithKeys
:=
make
([]
TaskJob
,
0
,
10
)
for
_
,
v
:=
range
projects
{
for
_
,
r
:=
range
res
{
if
v
.
UserId
==
r
.
UserId
{
r
.
Config
=
v
.
Config
resWithKeys
=
append
(
resWithKeys
,
r
)
}
}
}
return
resWithKeys
,
nil
return
res
,
nil
}
type
UserTask
struct
{
...
...
db_test.go
View file @
e72fa1c2
package
main
import
"testing"
import
(
"encoding/json"
"fmt"
"testing"
)
func
TestQueryTask
(
t
*
testing
.
T
)
{
...
...
@@ -19,6 +23,52 @@ func TestQueryTask(t *testing.T) {
//GetTasks()
func
TestAddTasks
(
t
*
testing
.
T
)
{
cfg
:=
Config
{
ApiKey
:
"u5HOlaBhMFNqXbs7lznEuUQVx"
,
ApiKeySecrect
:
"VhpTsl4TJUQi9FSAymajDCWfpgyJoK2d18i4lX9sPGiWc462nR"
,
AccessToken
:
"1783145144700874752-F38iX5PVmnOe3RHKwB0mc8tj4vObaT"
,
AccessTokenSecret
:
"O8BhSezZA8M8Jlz0rZJReBsV9HZO420iWdU58wCG2BXaA"
,
Token
:
"AAAAAAAAAAAAAAAAAAAAAIZTwQEAAAAApSMHaI24HWCEVYV0mx%2F2HauZHI0%3D6GrtIbJbiAwGJr4rZIz4WdrTLO6qI4zRGN4ZKpaOqUbwRRauPI"
,
}
task
:=
TaskInDB
{
User
:
"1783145144700874752"
,
TaskType
:
"tweet_liking_users"
,
TaskId
:
"1800805503066661056"
,
ApiConfig
:
parseToApiConfig
(
cfg
),
Start
:
true
,
Stop
:
true
,
FollowerCount
:
0
,
}
res
,
_
,
err
:=
client
.
From
(
"tasks"
)
.
Insert
(
task
,
true
,
""
,
"representation"
,
""
)
.
Execute
()
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
t
.
Log
(
res
)
}
func
TestGetTaskInDB
(
t
*
testing
.
T
)
{
data
,
count
,
err
:=
client
.
From
(
"tasks"
)
.
Select
(
"*"
,
"exact"
,
false
)
.
Eq
(
"start"
,
"true"
)
.
Eq
(
"stop"
,
"true"
)
.
Execute
()
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
fmt
.
Println
(
"found count"
,
count
)
res
:=
make
([]
TaskInDB
,
0
,
count
)
if
err
:=
json
.
Unmarshal
(
data
,
&
res
);
err
!=
nil
{
t
.
Fatal
(
err
)
}
for
k
,
v
:=
range
res
{
t
.
Log
(
k
,
"v.User"
,
v
.
User
,
"v.TaskType"
,
v
.
TaskType
,
"v.TaskId"
,
v
.
TaskId
,
"v.Config"
,
v
.
ApiConfig
)
}
}
func
TestGetTasks
(
t
*
testing
.
T
)
{
tasks
,
err
:=
GetTasks
()
...
...
@@ -32,6 +82,12 @@ func TestGetTasks(t *testing.T) {
t
.
Log
(
k
,
"v.User"
,
v
.
UserId
,
"v.TaskType"
,
v
.
TaskType
,
"v.TaskId"
,
v
.
TaskId
,
"Token"
,
v
.
Token
,
"accessToken"
,
v
.
AccessToken
)
}
//exist, err := CheckTaskExist("1783145144700874752", "aon_aonet", "followers")
//if err != nil {
// t.Fatal(err)
//}
//t.Log("exist", exist)
}
// func TestInsertTask(t *testing.T) {
...
...
main.go
View file @
e72fa1c2
...
...
@@ -61,6 +61,7 @@ func main() {
app
.
Use
(
swagger
.
New
(
cfg
))
app
.
Post
(
"/project"
,
Project
)
app
.
Get
(
"/apikey/owner"
,
GetConfigOwner
)
app
.
Post
(
"/task/add"
,
TaskAdd
)
app
.
Post
(
"/task/stop"
,
TaskStop
)
app
.
Get
(
"/verify/follower"
,
VerifyFollower
)
...
...
task.go
View file @
e72fa1c2
...
...
@@ -23,7 +23,7 @@ func init() {
}
}
func
(
w
*
Work
)
StopJob
(
req
Add
TaskReq
)
error
{
func
(
w
*
Work
)
StopJob
(
req
Stop
TaskReq
)
error
{
w
.
Lock
.
Lock
()
defer
w
.
Lock
.
Unlock
()
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment