Odysseus / twitter_syncer · Commits

Commit c8ae0af4, authored Aug 03, 2024 by Ubuntu

    follow stream init ok

Parent: ff5874c9

Showing 8 changed files with 904 additions and 350 deletions (+904 −350)
Files changed:
  cookies.json        +210  −0
  docs/swagger.yaml   +1    −1
  idx.go              +0    −294
  main.go             +12   −2
  stream.go           +236  −5
  stream_idx.go       +297  −0
  task.go             +30   −30
  type.go             +118  −18
cookies.json — new file (0 → 100644) @ c8ae0af4

[
  {"domain": ".twitter.com", "hostOnly": false, "httpOnly": true, "name": "_twitter_sess", "path": "/", "sameSite": "unspecified", "secure": true, "session": true, "storeId": "0", "value": "BAh7BiIKZmxhc2hJQzonQWN0aW9uQ29udHJvbGxlcjo6Rmxhc2g6OkZsYXNo%250ASGFzaHsABjoKQHVzZWR7AA%253D%253D--1164b91ac812d853b877e93ddb612b7471bebc74", "id": 1},
  {"domain": ".twitter.com", "expirationDate": 1722772659.123745, "hostOnly": false, "httpOnly": true, "name": "att", "path": "/", "sameSite": "no_restriction", "secure": true, "session": false, "storeId": "0", "value": "1-t27ASoVioPwK483s1zzCcXADK63c5SMNU0SbLt3i", "id": 2},
  {"domain": ".twitter.com", "expirationDate": 1756900658.284772, "hostOnly": false, "httpOnly": true, "name": "auth_token", "path": "/", "sameSite": "no_restriction", "secure": true, "session": false, "storeId": "0", "value": "b3cb77558ba11670d592387937b2cd86355f8925", "id": 3},
  {"domain": ".twitter.com", "expirationDate": 1756900658.690693, "hostOnly": false, "httpOnly": false, "name": "ct0", "path": "/", "sameSite": "lax", "secure": true, "session": false, "storeId": "0", "value": "e7d3edebd701cd68c1d9ca4ac755e71b896d1a3e03c4b7f77610f687fd6365967f61aaf5b6ab9bfb21fe9d7a38e4b92bf63c9193c2d7a0be42ae2e9f2252b9e789b2bc0917f70648f8c8be44645ccbea", "id": 4},
  {"domain": ".twitter.com", "expirationDate": 1756895711.574034, "hostOnly": false, "httpOnly": false, "name": "dnt", "path": "/", "sameSite": "no_restriction", "secure": true, "session": false, "storeId": "0", "value": "1", "id": 5},
  {"domain": ".twitter.com", "expirationDate": 1723289601.759261, "hostOnly": false, "httpOnly": false, "name": "external_referer", "path": "/", "sameSite": "unspecified", "secure": true, "session": false, "storeId": "0", "value": "padhuUp37zjgzgv1mFWxJ5Xq0CLV%2BbpWuS41v6lN3QU%3D|0|8e8t2xd8A2w%3D", "id": 6},
  {"domain": ".twitter.com", "expirationDate": 1722688497.654729, "hostOnly": false, "httpOnly": false, "name": "gt", "path": "/", "sameSite": "unspecified", "secure": true, "session": false, "storeId": "0", "value": "1819675816772730888", "id": 7},
  {"domain": ".twitter.com", "expirationDate": 1756895712.329647, "hostOnly": false, "httpOnly": false, "name": "guest_id", "path": "/", "sameSite": "no_restriction", "secure": true, "session": false, "storeId": "0", "value": "v1%3A172268131226928865", "id": 8},
  {"domain": ".twitter.com", "expirationDate": 1757239567.820636, "hostOnly": false, "httpOnly": false, "name": "guest_id_ads", "path": "/", "sameSite": "no_restriction", "secure": true, "session": false, "storeId": "0", "value": "v1%3A172267949548993887", "id": 9},
  {"domain": ".twitter.com", "expirationDate": 1757239567.820538, "hostOnly": false, "httpOnly": false, "name": "guest_id_marketing", "path": "/", "sameSite": "no_restriction", "secure": true, "session": false, "storeId": "0", "value": "v1%3A172267949548993887", "id": 10},
  {"domain": ".twitter.com", "expirationDate": 1756900658.284168, "hostOnly": false, "httpOnly": true, "name": "kdt", "path": "/", "sameSite": "unspecified", "secure": true, "session": false, "storeId": "0", "value": "n4OBqkWAyTvlv8c0AVr7eJ07cldRIlGw9rVRpcHI", "id": 11},
  {"domain": ".twitter.com", "expirationDate": 1754222260.197705, "hostOnly": false, "httpOnly": false, "name": "night_mode", "path": "/", "sameSite": "no_restriction", "secure": true, "session": false, "storeId": "0", "value": "2", "id": 12},
  {"domain": ".twitter.com", "expirationDate": 1757239567.820814, "hostOnly": false, "httpOnly": false, "name": "personalization_id", "path": "/", "sameSite": "no_restriction", "secure": true, "session": false, "storeId": "0", "value": "\"v1_ZCUHat1WJYkTwmbDxH++aA==\"", "id": 13},
  {"domain": ".twitter.com", "expirationDate": 1754222269.877218, "hostOnly": false, "httpOnly": false, "name": "twid", "path": "/", "sameSite": "no_restriction", "secure": true, "session": false, "storeId": "0", "value": "u%3D1535642152566259712", "id": 14},
  {"domain": "twitter.com", "hostOnly": true, "httpOnly": false, "name": "lang", "path": "/", "sameSite": "unspecified", "secure": false, "session": true, "storeId": "0", "value": "en", "id": 15}
]
\ No newline at end of file
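The InitScraper change in task.go further down is what consumes this export. As a standalone illustration only, a minimal sketch of loading such a cookies.json with the imperatrona/twitter-scraper client could look like the following; the helper name scraperFromCookies is hypothetical, and the error handling is added here (the committed code ignores the os.Open error):

// Hedged sketch (not part of the commit): load a browser-exported cookies.json
// and attach the cookies to a scraper instead of calling Login(user, password).
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"os"

	twitterscraper "github.com/imperatrona/twitter-scraper"
)

// scraperFromCookies is a hypothetical helper; the commit performs the same steps inline in InitScraper.
func scraperFromCookies(path string) (*twitterscraper.Scraper, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	var cookies []*http.Cookie
	if err := json.NewDecoder(f).Decode(&cookies); err != nil {
		return nil, err
	}

	s := twitterscraper.New()
	s.SetCookies(cookies) // reuse the exported session
	if !s.IsLoggedIn() {
		return nil, fmt.Errorf("cookies did not produce a logged-in session")
	}
	return s, nil
}

func main() {
	s, err := scraperFromCookies("cookies.json")
	if err != nil {
		fmt.Println("cookie login failed:", err)
		return
	}
	fmt.Println("logged in:", s.IsLoggedIn())
}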
docs/swagger.yaml @ c8ae0af4

@@ -240,7 +240,7 @@ components:
          - retweet
        example: follow
      task_id:
        description: user id or full name (do not submit one of each; there is no validation yet) or tweet id;
        description: user id (use only the numeric uid, https://twiteridfinder.com/) or tweet id;
        type: string
        example: "1570057485914087429"
    StopTaskReq:
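For illustration, a request that satisfies the updated description would send the numeric uid as a string in task_id, for example to the /task/add route registered in main.go below. The name of the enum field is not visible in this hunk, so "type" here is an assumption:

{
  "type": "follow",
  "task_id": "1570057485914087429"
}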
idx.go — deleted file (100644 → 0) @ ff5874c9

package main

import (
	"container/list"
	"encoding/json"
	"fmt"
	"log/slog"
	"time"

	twitterscraper "github.com/imperatrona/twitter-scraper"
	"github.com/supabase-community/postgrest-go"
)

func GetFollowTasks() ([]Task, error) {
	data, count, err := client.From("tasks").Select("*", "exact", false).
		Eq("follow", "true").Neq("follow_stop", "true").
		Execute()
	if err != nil {
		return nil, err
	}
	res := make([]Task, 0, count)
	if err := json.Unmarshal(data, &res); err != nil {
		return nil, err
	}
	return res, nil
}

func GetTasksFollowIdx() ([][]FollowerId, error) {
	tasks, err := GetFollowTasks()
	if err != nil {
		return nil, err
	}
	res := make([][]FollowerId, 0, 10)
	for _, task := range tasks {
		data, count, err := client.From("followers").
			Select("", "user_id", false).
			Eq("user_id", task.TaskId).
			Order("id", &postgrest.OrderOpts{
				Ascending: false,
				// NullsFirst bool
				// ForeignTable string
			}).
			Range(0, 10, "").
			Execute()
		if err != nil {
			slog.Error("select * from followers error", err)
			return nil, err
		}
		_ = count
		fmt.Println("data", string(data))
		userRes := make([]FollowerId, 0, 10)
		if err := json.Unmarshal(data, &userRes); err != nil {
			return nil, err
		}
		res = append(res, userRes)
	}
	return res, nil
}

//////////////////
// ///////////////

func GetTasks() ([]Task, error) {
	data, count, err := client.From("tasks").Select("*", "exact", false).
		Eq("retweet", "true").Neq("retweet_stop", "true").
		Eq("follow", "true").Neq("follow_stop", "true").
		Execute()
	if err != nil {
		return nil, err
	}
	res := make([]Task, 0, count)
	if err := json.Unmarshal(data, &res); err != nil {
		return nil, err
	}
	return res, nil
}

func FollowersToBackList(done <-chan interface{}, idxss [][]FollowerId) (<-chan *list.List, error) {
	scraper := twitterscraper.New()
	//err := scraper.Login("Wade_Leeeee", "923881393time")
	err := scraper.Login("wuban358369", "123456789T")
	if err != nil {
		fmt.Println("scraper", err.Error())
		return nil, err
	}
	/*
		wuban01@gmail.com
		wuban01
		1234567890Wuban
		1234567890Wuban
		// 2346
		980b 641a 774d 39fb 9425 c6c5 3140 b469 0b93 d1c6 6101 ef6f 9fb6 ae94 ee6f
		//wuban001
		wuban01@tutamail.com
		1234567890Wuban
	*/
	scraper3 := twitterscraper.New()
	//err = scraper3.Login("tifawe2861@stikezz.com", "123456789T")
	//err = scraper3.Login("wuban358369", "123456789T")
	err = scraper3.Login("Wade_Leeeee", "923881393time")
	if err != nil {
		fmt.Println("scraper3", err.Error())
		return nil, err
	}
	fmt.Println("scraper.IsLoggedIn()", scraper.IsLoggedIn())
	fmt.Println("scraper3.IsLoggedIn()", scraper3.IsLoggedIn())
	userId := "Bitcoin"
	next := ""
	var backPushPop = list.New()
	outStream := make(chan *list.List, 1)
	go func() {
		//newBegin := true
		for _, idxs := range idxss {
			newIdxs := make([]FollowerId, 0, len(idxs))
			c := true
			for {
				c = !c
				var err error
				var users Users
				if c {
					users, err = FetchFollowers(scraper, userId, next)
					if err != nil {
						slog.Error("FetchFollowers", err)
						time.Sleep(time.Second)
						continue
					}
				} else {
					users, err = FetchFollowers(scraper3, userId, next)
					if err != nil {
						slog.Error("FetchFollowers", err)
						time.Sleep(time.Second)
						continue
					}
				}
				// users, err := FetchFollowers(scraper, userId, next)
				// if err != nil {
				// 	slog.Error("FetchFollowers", err)
				// 	time.Sleep(time.Second)
				// 	continue
				// }
				fmt.Println("len(users)-----------------", len(users.Profiles))
				for k, v := range users.Profiles {
					fmt.Println("k", k, "v", v.UserIdAsNumber, v.Username)
				}
				if len(newIdxs) == 0 {
					for k, v := range users.Profiles {
						newIdxs = append(newIdxs, FollowerId{
							Follower: Follower{
								Follower: v.UserIdAsNumber,
								UserName: v.Username,
							},
						})
						if k > 5 {
							break
						}
					}
				}
				profiles, ok := MatchIdx(users, idxs)
				fmt.Println("len(profiles)-----------------", len(profiles))
				for _, v := range profiles {
					backPushPop.PushFront(v)
				}
				fmt.Printf("!ok %v users.Next %s %d len(outStream) %d backPushPop len %d\n", !ok, users.Next, len(users.Next), len(outStream), backPushPop.Len())
				time.Sleep(time.Second * 2)
				if !ok && len(users.Next) != 0 {
					next = users.Next
					continue
				}
				/*
					Api has a global limit on how many requests per second are allowed, don't make requests more than once per 1.5 seconds from one account.
					Also each endpoint has its own limits, most of them are 150 requests per 15 minutes.
				*/
				if backPushPop.Len() == 0 {
					next = ""
					//time.Sleep(time.Second * 20)
					continue
				}
				idxs = newIdxs
				newIdxs = make([]FollowerId, 0, len(idxs))
				select {
				case <-done:
					return
				case outStream <- backPushPop:
					fmt.Println("case outStream <- backPushPop: ------------")
					backPushPop = list.New()
					//time.Sleep(time.Second * 20)
				}
				next = ""
				//time.Sleep(time.Second * 20)
			}
		}
	}()
	return outStream, nil
}

func MatchIdx(data Users, idxs []FollowerId) ([]Profile, bool) {
	if len(idxs) == 0 {
		return data.Profiles, false
	}
	for k, v := range data.Profiles {
		for _, idx := range idxs {
			if v.UserIdAsNumber == idx.Follower.Follower {
				return data.Profiles[:k], true
			}
		}
	}
	return data.Profiles, false
}
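The comment block preserved above records the scraper's limits: no more than one request per 1.5 seconds from a single account, and roughly 150 requests per 15 minutes per endpoint. The deleted code spaces its calls with fixed time.Sleep calls and by alternating two accounts; as an illustration only (not part of this commit), enforcing the per-account spacing with a time.Ticker under those stated limits could look like this:

// Hedged sketch (not part of the commit): keep requests from one account
// at least 1.5 s apart, per the rate-limit note in the deleted idx.go above.
package main

import (
	"fmt"
	"time"
)

func main() {
	// One tick every 1.5 s caps an account at ~600 requests per 15 minutes,
	// so callers must still respect the ~150/15min per-endpoint limits.
	tick := time.NewTicker(1500 * time.Millisecond)
	defer tick.Stop()

	for page := 0; page < 3; page++ {
		<-tick.C // block until the next allowed slot
		fmt.Println("fetch page", page, "at", time.Now().Format(time.RFC3339Nano))
		// the real code would call e.g. FetchFollowers(scraper, userId, next) here
	}
}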
main.go @ c8ae0af4

@@ -21,6 +21,16 @@ import (

func main() {
	done := make(chan interface{})
	defer close(done)
	go func() {
		if err := newSync(done); err != nil {
			panic(err)
		}
	}()
	app := fiber.New()
	app.Use(cors.New())

@@ -42,8 +52,8 @@ func main() {
	app.Post("/task/add", TaskAdd)
	app.Post("/task/stop", TaskStop)
	// app.Get("/verify/follower", VerifyFollower)
	// app.Get("/verify/retweeter", VerifyRetweeter)
	app.Get("/verify/follower", VerifyFollower)
	app.Get("/verify/retweeter", VerifyRetweeter)
	if err := app.Listen(":8001"); err != nil {
		slog.Error(err.Error())
...
stream.go @ c8ae0af4

package main

import (
	"container/list"
	b64 "encoding/base64"
	"encoding/json"
	"fmt"
	"log/slog"
	"strings"
	"time"

	twitterscraper "github.com/imperatrona/twitter-scraper"
)

func newSync() error {
func newSync(done <-chan interface{}) error {
	done := make(<-chan interface{})
	// done := make(<-chan interface{})
	connStream := make(chan taskInterface, 1000)
	idxStream := TaskIdx(done, connStream)
	tasks, err := GetTasksIdx()
	if err != nil {
		return err
	}
	for _, task := range tasks {
		connStream <- task
	}
	idxStream, listStream := TaskIdx(done, connStream)
	usersStream := BackListToQueue(done, listStream)
	if err := InsertOrUpdateUsers(done, usersStream); err != nil {
		return err
	}
	resStream, err := InitResource()
	if err != nil {

@@ -26,17 +45,208 @@ func newSync() error {
	ConnectTailResource(done, resStream, resTailStream)
	select {}
	//select {}
	return nil
}

func TaskIdx(done <-chan interface{}, inStream <-chan taskInterface) <-chan taskInterface {
func TaskIdx(done <-chan interface{}, inStream <-chan taskInterface) (<-chan taskInterface, <-chan TaskIdAndList) {
	outStream := make(chan taskInterface, 1000)
	outListStream := make(chan TaskIdAndList, 1)
	go func() {
		for {
			select {
			case <-done:
				return
			case task := <-inStream:
				slog.Info("TaskIdx", "task.ID()", task.ID(), "task.InitIdx()", task.InitIdx())
				if task.InitIdx() {
					select {
					case <-done:
						return
					case outStream <- task:
					}
					continue
				}
				list, ok := task.UpdateIdx()
				if ok {
					select {
					case <-done:
						return
					case outListStream <- TaskIdAndList{
						TaskId: task.ID(),
						List:   list,
					}:
					}
				}
			}
		}
	}()
	return outStream, outListStream
}

type TaskIdAndList struct {
	TaskId string
	List   *list.List
}

type TaskIdAndProfiles struct {
	TaskId   string
	Profiles []Profile
}

func BackListToQueue(done <-chan interface{}, inStream <-chan TaskIdAndList) <-chan TaskIdAndProfiles {
	outStream := make(chan TaskIdAndProfiles, 1)
	go func() {
		for {
			select {
			case <-done:
				return
			case users, ok := <-inStream:
				if ok == false {
					return
				}
				c := 0
				// if c < 100 {
				// c = c + 1
				res := make([]Profile, 0, users.List.Len())
				for e := users.List.Front(); e != nil; e = e.Next() {
					if user, ok := e.Value.(Profile); ok {
						//fmt.Printf("The data is a string: %s\n", str)
						res = append(res, user)
						c = c + 1
						if c%100 == 0 {
							fmt.Println("BackListToQueue", "len(inStream)", len(inStream), "len(outStream)", len(outStream))
							select {
							case <-done:
								return
							case outStream <- TaskIdAndProfiles{
								Profiles: res,
								TaskId:   users.TaskId,
							}:
								res = make([]Profile, 0, users.List.Len())
							}
						}
					}
				}
				fmt.Println("BackListToQueue", "len(inStream)", len(inStream), "len(outStream)", len(outStream))
				select {
				case <-done:
					return
				case outStream <- TaskIdAndProfiles{
					Profiles: res,
					TaskId:   users.TaskId,
				}:
				}
				// } else {
				// c = 0
				// }
			}
		}
	}()
	return outStream
}

func InsertOrUpdateUsers(done <-chan interface{}, inStream <-chan TaskIdAndProfiles) error {
	// client, err := supabase.NewClient(API_URL, API_KEY, nil)
	// if err != nil {
	// 	return fmt.Errorf("cannot initalize client: %v .", err)
	// }
	go func() {
		for {
			select {
			case <-done:
				return
			case users, ok := <-inStream:
				if ok == false {
					return
				}
				//rows := make([]map[string]string, 0, len(users.Profiles))
				rows := make([]Follower, 0, len(users.Profiles))
				for _, user := range users.Profiles {
					sDec, _ := b64.StdEncoding.DecodeString(user.UserID)
					userId, _ := strings.CutPrefix(string(sDec), "User:")
					row := Follower{
						Follower: userId,
						UserName: user.Username,
						UserId:   users.TaskId,
					}
					rows = append(rows, row)
				}
				//res, c, err := client.From("followers").Insert(rows, true, "", "representation", "").Execute()
				res, _, err := client.From("followers").Insert(rows, true, "", "representation", "").Execute()
				if err != nil {
					slog.Error("insert into followers", err)
					for _, user := range users.Profiles {
						usersAsJson, err := json.Marshal(user)
						if err != nil {
							slog.Error("insert into followers json.Marshal", err)
							continue
						}
						sDec, _ := b64.StdEncoding.DecodeString(user.UserID)
						userId, _ := strings.CutPrefix(string(sDec), "User:")
						slog.Error("insert into followers error", string(usersAsJson), userId)
					}
				} else {
					slog.Info("insert into followers", string(res), err)
				}
				fmt.Println("InsertOrUpdateUsers", "len(inStream)", len(inStream))
			}
		}
	}()
	return nil
}

type ScraperTimer struct {
	Scraper *twitterscraper.Scraper
	Timer   time.Timer
...

@@ -54,6 +264,7 @@ func InitResource() (chan ScraperTimer, error) {
	for _, v := range accounts {
		fmt.Println(v.User, v.PassWd)
		scraper, err := InitScraper(v.User, v.PassWd)
		if err != nil {
...

@@ -65,10 +276,14 @@ func InitResource() (chan ScraperTimer, error) {
			Timer: *time.NewTimer(0),
		}
		//
		outStream <- newScraperTimer
	}
	fmt.Println("twitter init ok")
	return outStream, nil
}
...

@@ -115,9 +330,25 @@ func TaskImplement(done <-chan interface{}, inTaskStream <-chan taskInterface, i
		case <-done:
			return
		case res := <-inResourceStream:
			fmt.Println("TaskImplement", task.ID())
			if err := task.Fetch(res.Scraper); err != nil {
				slog.Error("task.Fetch", "err", err.Error())
			}
			select {
			case <-done:
				return
			case taskOutStream <- task:
			}
			select {
			case <-done:
				return
			case scraperOutStream <- res:
			}
		}
	}
}
...
stream_idx.go — new file (0 → 100644) @ c8ae0af4

package main

import (
	"encoding/json"
	"fmt"
	"log/slog"

	"github.com/supabase-community/postgrest-go"
)

// func GetFollowTasks() ([]Task, error) {
// 	data, count, err := client.From("tasks").Select("*", "exact", false).
// 		Eq("follow", "true").Neq("follow_stop", "true").
// 		Execute()
// 	if err != nil {
// 		return nil, err
// 	}
// 	res := make([]Task, 0, count)
// 	if err := json.Unmarshal(data, &res); err != nil {
// 		return nil, err
// 	}
// 	return res, nil
// }

// follow, retweet
const FollowType = "follow"
const RetweetType = "retweet"

type FollowerId struct {
	Follower
	Id        int    `json:"id"`
	CreatedAt string `json:"created_at"`
}

type Follower struct {
	//user_id
	UserId   string `json:"user_id"`
	Follower string `json:"follower_id"`
	UserName string `json:"follower_username"`
}

func GetTasksIdx() ([]taskInterface, error) {
	tasks, err := QueryAllTask()
	if err != nil {
		return nil, err
	}
	res := make([]taskInterface, 0, 10)
	for _, task := range tasks {
		if task.TaskType == FollowType {
			data, count, err := client.From("followers").
				Select("", "user_id", false).
				Eq("user_id", task.TaskId).
				Order("id", &postgrest.OrderOpts{
					Ascending: false,
					// NullsFirst bool
					// ForeignTable string
				}).
				Range(0, 10, "").
				Execute()
			if err != nil {
				slog.Error("select * from followers error", err)
				return nil, err
			}
			_ = count
			slog.Info("idx data", "user id", task.TaskId, "user name", task.User, "idx", data)
			fmt.Println("idx data", string(data))
			userRes := make([]FollowerId, 0, 10)
			if err := json.Unmarshal(data, &userRes); err != nil {
				return nil, err
			}
			followTask := NewFollowTask()
			followTask.UserId = task.TaskId
			followTask.Idx = userRes
			if len(userRes) == 0 {
				followTask.Init = true
			}
			res = append(res, followTask)
		}
		if task.TaskType == RetweetType {
		}
	}
	return res, nil
}
// func FollowersToBackList(done <-chan interface{}, idxss [][]FollowerId) (<-chan *list.List, error) {
// scraper := twitterscraper.New()
// //err := scraper.Login("Wade_Leeeee", "923881393time")
// err := scraper.Login("wuban358369", "123456789T")
// if err != nil {
// fmt.Println("scraper", err.Error())
// return nil, err
// }
// /*
// wuban01@gmail.com
// wuban01
// 1234567890Wuban
// 1234567890Wuban
// // 2346
// 980b
// 641a
// 774d
// 39fb
// 9425
// c6c5
// 3140
// b469
// 0b93
// d1c6
// 6101
// ef6f
// 9fb6
// ae94
// ee6f
// //wuban001
// wuban01@tutamail.com
// 1234567890Wuban
// */
// scraper3 := twitterscraper.New()
// //err = scraper3.Login("tifawe2861@stikezz.com", "123456789T")
// //err = scraper3.Login("wuban358369", "123456789T")
// err = scraper3.Login("Wade_Leeeee", "923881393time")
// if err != nil {
// fmt.Println("scraper3", err.Error())
// return nil, err
// }
// fmt.Println("scraper.IsLoggedIn()", scraper.IsLoggedIn())
// fmt.Println("scraper3.IsLoggedIn()", scraper3.IsLoggedIn())
// userId := "Bitcoin"
// next := ""
// var backPushPop = list.New()
// outStream := make(chan *list.List, 1)
// go func() {
// //newBegin := true
// for _, idxs := range idxss {
// newIdxs := make([]FollowerId, 0, len(idxs))
// c := true
// for {
// c = !c
// var err error
// var users Users
// if c {
// users, err = FetchFollowers(scraper, userId, next)
// if err != nil {
// slog.Error("FetchFollowers", err)
// time.Sleep(time.Second)
// continue
// }
// } else {
// users, err = FetchFollowers(scraper3, userId, next)
// if err != nil {
// slog.Error("FetchFollowers", err)
// time.Sleep(time.Second)
// continue
// }
// }
// // users, err := FetchFollowers(scraper, userId, next)
// // if err != nil {
// // slog.Error("FetchFollowers", err)
// // time.Sleep(time.Second)
// // continue
// // }
// fmt.Println("len(users)-----------------", len(users.Profiles))
// for k, v := range users.Profiles {
// fmt.Println("k", k, "v", v.UserIdAsNumber, v.Username)
// }
// if len(newIdxs) == 0 {
// for k, v := range users.Profiles {
// newIdxs = append(newIdxs, FollowerId{
// Follower: Follower{
// Follower: v.UserIdAsNumber,
// UserName: v.Username,
// },
// })
// if k > 5 {
// break
// }
// }
// }
// profiles, ok := MatchIdx(users, idxs)
// fmt.Println("len(profiles)-----------------", len(profiles))
// for _, v := range profiles {
// backPushPop.PushFront(v)
// }
// fmt.Printf("!ok %v users.Next %s %d len(outStream) %d backPushPop len %d\n", !ok, users.Next, len(users.Next), len(outStream), backPushPop.Len())
// time.Sleep(time.Second * 2)
// if !ok && len(users.Next) != 0 {
// next = users.Next
// continue
// }
// /*
// Api has a global limit on how many requests per second are allowed, don’t make requests more than once per 1.5 seconds from one account.
// Also each endpoint has its own limits, most of them are 150 requests per 15 minutes.
// */
// if backPushPop.Len() == 0 {
// next = ""
// //time.Sleep(time.Second * 20)
// continue
// }
// idxs = newIdxs
// newIdxs = make([]FollowerId, 0, len(idxs))
// select {
// case <-done:
// return
// case outStream <- backPushPop:
// fmt.Println("case outStream <- backPushPop: ------------")
// backPushPop = list.New()
// //time.Sleep(time.Second * 20)
// }
// next = ""
// //time.Sleep(time.Second * 20)
// }
// }
// }()
// return outStream, nil
// }
// func MatchIdx(data Users, idxs []FollowerId) ([]Profile, bool) {
// if len(idxs) == 0 {
// return data.Profiles, false
// }
// for k, v := range data.Profiles {
// for _, idx := range idxs {
// if v.UserIdAsNumber == idx.Follower.Follower {
// return data.Profiles[:k], true
// }
// }
// }
// return data.Profiles, false
// }
task.go @ c8ae0af4

package main

import (
	"net/http"
	"os"
	"strings"
	"time"

	b64 "encoding/base64"
	"encoding/json"

	twitterscraper "github.com/imperatrona/twitter-scraper"
)

@@ -38,28 +41,28 @@ import (
// }

func sync() error {
// func sync() error {
	idxs, err := GetTasksFollowIdx()
	// idxs, err := GetTasksFollowIdx()
	//idxs, err := GetIndexList()
	// //idxs, err := GetIndexList()
	if err != nil {
		return err
	}
	// if err != nil {
	// 	return err
	// }
	done := make(chan interface{})
	// done := make(chan interface{})
	defer close(done)
	// defer close(done)
	backListStream, err := FollowersToBackList(done, idxs)
	if err != nil {
		return err
	}
	// backListStream, err := FollowersToBackList(done, idxs)
	// if err != nil {
	// 	return err
	// }
	_ = backListStream
	return nil
}
// _ = backListStream
// return nil
// }

func tasks(done <-chan interface{}, inStream <-chan TaskParam) (<-chan TaskParam, <-chan TaskParam) {
...

@@ -119,8 +122,18 @@ func InitScraper(user, password string) (*twitterscraper.Scraper, error) {
	//err := scraper3.Login("Wade_Leeeee", "923881393time")
	if err := scraper3.Login(user, password); err != nil {
		return nil, err
	// if err := scraper3.Login(user, password); err != nil {
	// 	return nil, err
	// }
	// Deserialize from JSON
	var cookies []*http.Cookie
	f, _ := os.Open("cookies.json")
	json.NewDecoder(f).Decode(&cookies)
	scraper3.SetCookies(cookies)
	if !scraper3.IsLoggedIn() {
		panic("Invalid cookies")
	}
	return scraper3, nil
...

@@ -181,19 +194,6 @@ func ImplementTask(done <-chan interface{}, scraper3 *twitterscraper.Scraper, ti
	}()
}

type FollowerId struct {
	Follower
	Id        int    `json:"id"`
	CreatedAt string `json:"created_at"`
}

type Follower struct {
	//user_id
	UserId   string `json:"user_id"`
	Follower string `json:"follower"`
	UserName string `json:"follower_username"`
}

func FetchFollowers(scraper *twitterscraper.Scraper, user, next string) (Users, error) {
	users, newNext, err := scraper.FetchFollowers(user, 20, next)
...
type.go @ c8ae0af4

package main

import (
	"container/list"
	b64 "encoding/base64"
	"log/slog"
	"strings"

	twitterscraper "github.com/imperatrona/twitter-scraper"
...

@@ -34,38 +36,122 @@ type NewTask[T FollowTask | RetweetTask] struct {
}

type FollowTask struct {
	URL string
	// URL string
	Init   bool
	UserId string
	Next   string
	Idx    []FollowerId
	NewIdx []FollowerId
	backPushPop *list.List
	Res     Users
	Scraper *twitterscraper.Scraper
	//Scraper *twitterscraper.Scraper
}

func NewFollowTask() *FollowTask {
	return &FollowTask{
		backPushPop: list.New(),
		Idx:         make([]FollowerId, 0),
	}
}

type taskInterface interface {
	ID() string
	Fetch(scraper *twitterscraper.Scraper) error
	SetScraper(scraper *twitterscraper.Scraper)
	SetScraperNil()
	InitIdx() bool
	UpdateIdx() (*list.List, bool)
}

// func (f *FollowTask) URl() string {
// 	return f.URL
// }

func (f *FollowTask) InitIdx() bool {
	if len(f.Res.Profiles) > 0 {
		return false
	}
	return true
}

func (f *FollowTask) ID() string {
	return f.UserId
}

func (f *FollowTask) SetScraper(scraper *twitterscraper.Scraper) {
	f.Scraper = scraper

func (f *FollowTask) UpdateIdx() (*list.List, bool) {
	//newIdxs := make([]FollowerId, 0, len(f.Idx))
	if len(f.NewIdx) == 0 {
		for k, v := range f.Res.Profiles {
			f.NewIdx = append(f.NewIdx, FollowerId{
				Follower: Follower{
					Follower: v.UserIdAsNumber,
					UserName: v.Username,
				},
			})
			if k > 5 {
				break
			}
		}
	}
	profiles, ok := MatchIdx(f.Res, f.Idx)
	for _, v := range profiles {
		f.backPushPop.PushFront(v)
	}
	if !ok {
		f.Next = f.Res.Next
		return nil, false
	}
	// think this one over
	if f.backPushPop.Len() == 0 {
		f.Next = ""
		//time.Sleep(time.Second * 20)
		return nil, false
	}
	f.Idx = f.NewIdx
	f.NewIdx = make([]FollowerId, 0, len(f.Idx))
	resList := f.backPushPop
	f.backPushPop = list.New()
	return resList, true
}

func (f *FollowTask) SetScraperNil() {
	f.Scraper = nil

func MatchIdx(data Users, idxs []FollowerId) ([]Profile, bool) {
	if len(idxs) == 0 {
		return data.Profiles, true
	}
	for k, v := range data.Profiles {
		for _, idx := range idxs {
			if v.UserIdAsNumber == idx.Follower.Follower {
				return data.Profiles[:k], true
			}
		}
	}
	return data.Profiles, false
}

func (f *FollowTask) Fetch(scraper *twitterscraper.Scraper) error {
	users, newNext, err := scraper.FetchFollowers(f.UserId, 20, f.Next)
	// f.UserId = "OnlyDD_D"
	users, newNext, err := scraper.FetchFollowersByUserID(f.UserId, 20, f.Next)
	//users, newNext, err := scraper.FetchFollowers(f.UserId, 20, f.Next)
	if err != nil {
		return err
...

@@ -89,6 +175,8 @@ func (f *FollowTask) Fetch(scraper *twitterscraper.Scraper) error {
		Next: newNext,
	}
	slog.Info("follow fetch", "res.Current", res.Current, "res.Next", res.Next, "len(res.Profiles)", len(res.Profiles))
	f.Res = res
	return nil
...

@@ -99,7 +187,10 @@ type RetweetTask struct {
	URL     string
	TweetId string
	Next    string
	Scraper *twitterscraper.Scraper
	Idx     []FollowerId
	// backPushPop = list.New()
	backPushPop list.List
}

// func (r *RetweetTask) URl() string {
...

@@ -110,9 +201,9 @@ func (f *RetweetTask) ID() string {
	return f.TweetId
}

func (f *RetweetTask) SetScraper(scraper *twitterscraper.Scraper) {
	f.Scraper = scraper
}
// func (f *RetweetTask) SetScraper(scraper *twitterscraper.Scraper) {
// 	f.Scraper = scraper
// }

func (f *RetweetTask) Fetch(scraper *twitterscraper.Scraper) error {
...

@@ -120,8 +211,17 @@ func (f *RetweetTask) Fetch(scraper *twitterscraper.Scraper) error {
}

func (f *RetweetTask) SetScraperNil() {
	f.Scraper = nil
// func (f *RetweetTask) SetScraperNil() {
// 	f.Scraper = nil
// }

func (f *RetweetTask) InitIdx() bool {
	return f.Idx == nil || len(f.Idx) == 0
}

func (f *RetweetTask) UpdateIdx() (*list.List, bool) {
	return nil, false
}

type TwitterAccount struct {
...