Commit ee75edbb authored by 贾浩@五瓣科技's avatar 贾浩@五瓣科技

update

parent 2a47dede
......@@ -4,7 +4,7 @@ WORKDIR /app
COPY . .
RUN go mod tidy && go build -v -o /tmp/api ./cmd/api
RUN go mod tidy && go build -v -o /tmp/api ./cmd/api && go build -v -o /tmp/sync ./cmd/sync
FROM alpine:latest
......@@ -14,4 +14,6 @@ COPY ./config.toml .
COPY --from=builder /tmp/api /usr/bin/api
COPY --from=builder /tmp/sync /usr/bin/sync
EXPOSE 8080
\ No newline at end of file
......@@ -4,10 +4,10 @@ import (
"flag"
"io"
"os"
"sdk_api/config"
"sdk_api/dao"
"sdk_api/server"
"sdk_api/service"
"taskcenter/config"
"taskcenter/dao"
"taskcenter/server"
"taskcenter/service"
log "github.com/sirupsen/logrus"
)
......@@ -41,6 +41,6 @@ func main() {
svs := service.New(cfg, da)
server.StartServer(svs, cfg)
server.StartServer(svs, da, cfg)
}
package main
import (
"flag"
"io"
"os"
"taskcenter/config"
"taskcenter/dao"
"taskcenter/sync"
log "github.com/sirupsen/logrus"
)
func initLog() {
file, _ := os.OpenFile("log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
mw := io.MultiWriter(os.Stdout, file)
log.SetOutput(mw)
log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
})
}
func main() {
initLog()
flag.Parse()
cfg, err := config.New()
if err != nil {
panic(err)
}
da, err := dao.New(cfg)
if err != nil {
panic(err)
}
if cfg.Debug {
log.SetLevel(log.DebugLevel)
}
syc := sync.NewSync(da)
syc.Start()
}
debug = true
[supabase]
jwt_secret = "ZHqJkt6h0rmjUOM3cKon/f4//XkJvsa9jaLRpXnBYV1LVY9ZfkF9vt7yCwGX6AfS+HW7vq7W+jwCHYSX2W4i7A=="
[pgsql]
host = "aws-0-ap-northeast-1.pooler.supabase.com"
port = 5432
......
......@@ -12,13 +12,13 @@ type Config struct {
Server ServerConfig `toml:"server"`
TGTask TGTaskConfig `toml:"tg_task"`
TwitterTask TwitterTaskConfig `toml:"twitter_task"`
Supabase SupabaseConfig `tomo:"supabase"`
}
type SupabaseConfig struct {
URL string `toml:"url"`
APIKey string `toml:"api_key"`
Email string `toml:"email"`
Password string `toml:"password"`
AuthURL string `toml:"auth_url"`
APIKey string `toml:"api_key"`
JWTSecret string `toml:"jwt_secret"`
}
type PGSQLConfig struct {
......
......@@ -66,7 +66,15 @@ var TaskAction = map[string][]string{
}
const (
TwitterAPIActionRetwitter = "retweeters"
TwitterAPIActionLike = "like"
TwitterAPIActionFollow = "follow"
TwitterAPIActionRetweet = "retweeters"
TwitterAPIActionLike = "tweet_liking_users"
TwitterAPIActionFollow = "followers"
)
const (
TaskHistoryStatusPending = "pending" // 点了检查,3分钟后检查
TaskHistoryStatusSuccess = "success" // 3分钟后检查成功
TaskHistoryStatusRetry = "retry" // 3分钟后检查未完成
TaskHistoryStatusUncompleted = "uncompleted"
TaskHistoryStatusNotExist = "not exist"
)
package dao
import (
dbModel "sdk_api/model/db"
dbModel "taskcenter/model/db"
"gorm.io/gorm"
)
func (d *Dao) GetProviderId(userId, platform string) (providerId string, err error) {
var temp dbModel.Identity
err = d.db.Model(&dbModel.Identity{}).Where("user_id = ? and provider = ?", userId, platform).First(&temp).Error
tx := d.db
if platform == "telegram" {
tx = d.db.Table((&dbModel.Identity{}).TelegramAuthTableName())
} else {
tx = d.db.Table((&dbModel.Identity{}).TableName())
}
err = tx.Select("provider_id").Where("user_id = ? and provider = ?", userId, platform).First(&providerId).Error
if err == gorm.ErrRecordNotFound {
return
return "", nil
}
return temp.ProviderId, nil
return providerId, err
}
......@@ -3,8 +3,8 @@ package dao
import (
"fmt"
"os"
"sdk_api/config"
dbModel "sdk_api/model/db"
"taskcenter/config"
dbModel "taskcenter/model/db"
"time"
"github.com/supabase-community/supabase-go"
......
package dao
import (
"sdk_api/constant"
dbModel "sdk_api/model/db"
"sdk_api/util"
"fmt"
"taskcenter/constant"
dbModel "taskcenter/model/db"
"taskcenter/util"
"time"
"gorm.io/gorm"
......@@ -61,6 +62,9 @@ func (d *Dao) GetGroupList(page, pageSize int) (list []*dbModel.TaskGroup, total
}
totalCount = int(tmpCount)
return list, totalCount, d.db.Limit(pageSize).Offset((page - 1) * pageSize).Find(&list).Error
// 性能优化
// sql := fmt.Sprintf("SELECT * FROM %[1]s t1 JOIN (SELECT id FROM %[1]s LIMIT ?, ?) t2 ON t1.id = t2.id", (&dbModel.Task{}).TableName())
// return list, totalCount, d.db.Raw(sql, (page-1)*pageSize, pageSize).Scan(&list).Error
}
func (d *Dao) GetGroup(id int) (g *dbModel.TaskGroup, err error) {
......@@ -89,9 +93,10 @@ func (d *Dao) GetTaskDetail(tid int) (t *dbModel.Task, err error) {
return t, err
}
func (d *Dao) IsDailyTask(tid int) (ok bool, err error) {
var count int64
err = d.db.Model(&dbModel.Task{}).Where("id = ? and daily = ?", tid, 1).Count(&count).Error
err = d.db.Model(&dbModel.Task{}).Where("id = ? and daily = ?", tid, true).Count(&count).Error
if err != nil {
return false, err
}
......@@ -117,7 +122,31 @@ func (d *Dao) IsTaskDone(tid int, userId string) (ok bool, err error) {
return count > 0, err
}
func (d *Dao) CreateTaskHistory(taskId int, userId string, isDailyTask bool) (err error) {
func (d *Dao) GetTaskResult(tid int, userId string) (status string, err error) {
isDailyTask, err := d.IsDailyTask(tid)
if err != nil {
return "", err
}
temp := &dbModel.TaskHistory{}
if !isDailyTask {
err = d.db.Model(&dbModel.TaskHistory{}).Where("task_id = ? and user_id = ?", tid, userId).First(temp).Error
if err == gorm.ErrRecordNotFound {
return constant.TaskHistoryStatusNotExist, nil
}
return temp.Status, err
}
err = d.db.Model(&dbModel.TaskHistory{}).
Where("task_id = ? and user_id = ? and created_at >= ?", tid, userId, time.Now().UTC().Truncate(24*time.Hour)).
First(temp).Error
if err == gorm.ErrRecordNotFound {
return constant.TaskHistoryStatusNotExist, nil
}
return temp.Status, err
}
func (d *Dao) CreateTaskHistory(taskId int, userId string, isDailyTask bool) (exist bool, err error) {
// 使用行锁,防止并发
tx := d.db.Begin()
defer func() {
......@@ -127,32 +156,65 @@ func (d *Dao) CreateTaskHistory(taskId int, userId string, isDailyTask bool) (er
tx.Commit()
}
}()
temp := &dbModel.TaskHistory{}
temp := make([]*dbModel.TaskHistory, 0)
if !isDailyTask {
err = tx.Set("gorm:query_option", "FOR UPDATE NOWAIT").Where("task_id = ? and user_id = ?", taskId, userId).First(temp).Error
err = tx.Set("gorm:query_option",
"FOR UPDATE").Where("task_id = ? and user_id = ?", taskId, userId).Take(temp).Error
if err == gorm.ErrRecordNotFound {
err = tx.Create(&dbModel.TaskHistory{
Id: util.GenFlakeID(),
TaskId: taskId,
UserId: userId,
Status: constant.TaskHistoryStatusPending,
}).Error
if err != nil {
return err
return false, err
}
}
return err
return err == nil, err
}
err = tx.Set("gorm:query_option", "FOR UPDATE NOWAIT").Where("task_id = ? and user_id = ? and created_at >= ?", taskId, userId, time.Now().UTC().Truncate(24*time.Hour)).First(temp).Error
err = tx.Set("gorm:query_option", "FOR UPDATE").Where("task_id = ? and user_id = ? and created_at >= ?", taskId, userId, time.Now().UTC().Truncate(24*time.Hour)).Take(temp).Error
if err == gorm.ErrRecordNotFound {
err = tx.Create(&dbModel.TaskHistory{
Id: util.GenFlakeID(),
TaskId: taskId,
UserId: userId,
Status: constant.TaskHistoryStatusPending,
}).Error
if err != nil {
return err
return false, err
}
}
return err
return err == nil, err
}
func (d *Dao) UpdateTaskHistory(id int, status string) (err error) {
err = d.db.Model(&dbModel.TaskHistory{}).Where("id = ?", id).Update("status", status).Error
return
}
func (d *Dao) GetUnprocessedTasks() (tasks []*dbModel.TaskHistory, err error) {
// twitter延迟3分钟,其余延迟1分钟
sql := fmt.Sprintf(
`SELECT TH.* FROM %s AS TH JOIN %s AS T ON TH.task_id = T.id WHERE TH.status = ? AND ((T.platform = ? AND TH.created_at <= NOW() - INTERVAL '3 minutes') OR (T.platform != ? AND TH.created_at <= NOW() - INTERVAL '1 minutes'))`,
(&dbModel.TaskHistory{}).TableName(), (&dbModel.Task{}).TableName(),
)
err = d.db.Raw(sql, constant.TaskHistoryStatusPending, constant.TaskPlatformTwitter, constant.TaskPlatformTwitter).Scan(&tasks).Error
// err = d.db.Model(&dbModel.TaskHistory{}).Where(
// "status = ? and created_at <= ?", constant.TaskHistoryStatusPending, gorm.Expr("NOW() - INTERVAL '3 minutes'"),
// ).Find(&tasks).Error
return
}
func (d *Dao) IsAdminUser(userId string) (ok bool, err error) {
var ct int64
err = d.db.Model(&dbModel.AdminUser{}).Where("user_id = ?", userId).Count(&ct).Error
return ct > 0, err
}
func (d *Dao) GetUnstoppedTasks() {
}
......@@ -7,6 +7,7 @@ import (
"fmt"
"strconv"
"strings"
"taskcenter/constant"
"time"
log "github.com/sirupsen/logrus"
......@@ -58,13 +59,17 @@ func (d *Dao) CreateTwitterProject(apiKey, apiSecret, accessToken, accessSecret,
return userId, respTemp.Data.Username, respTemp.Data.Name, nil
}
func (d *Dao) DoTweetTask(twitterUserId int, tweetId int, action string, start bool) (err error) {
func (d *Dao) DoTweetTask(twitterUserId int, tweetId int, twitterHandle, action string, start bool) (err error) {
body := map[string]string{
"user_id": fmt.Sprintf("%d", twitterUserId),
"task_id": fmt.Sprintf("%d", tweetId),
"task_type": action,
}
if twitterHandle != "" && action == constant.TwitterAPIActionFollow {
body["task_id"] = twitterHandle
}
buf := new(bytes.Buffer)
err = json.NewEncoder(buf).Encode(body)
if err != nil {
......@@ -90,8 +95,8 @@ func (d *Dao) DoTweetTask(twitterUserId int, tweetId int, action string, start b
return
}
func (d *Dao) CheckTwitterFollow(userId int, followerId string) (ok bool, err error) {
url := fmt.Sprintf("%s/verify/follow?user_id=%d&follower_id=%s", strings.TrimSuffix(d.c.TwitterTask.URL, "/"), userId, followerId)
func (d *Dao) CheckTwitterFollow(twitterUserHandle string, followerId string) (ok bool, err error) {
url := fmt.Sprintf("%s/verify/follower?task_id=%s&follower_id=%s", strings.TrimSuffix(d.c.TwitterTask.URL, "/"), twitterUserHandle, followerId)
log.WithField("url", url).Debug("check tweet follow")
data, err := httpGet(url)
if err != nil {
......
......@@ -9,10 +9,6 @@ services:
container_name: taskcenter-api
ports:
- "16670:8080"
depends_on:
aon-db:
condition: service_healthy
volumes:
- ./conf/taskcenter-api/config.toml:/config.toml
- ./data/taskcenter-api/api-log:/app
......@@ -20,5 +16,19 @@ services:
- "/bin/sh"
- "-c"
- "/usr/bin/api -c /config.toml"
restart:
unless-stopped
taskcenter-sync:
image: caduceus/taskcenter:latest
pull_policy: always
container_name: taskcenter-sync
volumes:
- ./conf/taskcenter-sync/config.toml:/config.toml
- ./data/taskcenter-sync/sync-log:/app
command:
- "/bin/sh"
- "-c"
- "/usr/bin/sync -c /config.toml"
restart:
unless-stopped
\ No newline at end of file
module sdk_api
module taskcenter
go 1.21.4
......
package middleware
import (
"sdk_api/util"
"taskcenter/util"
"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
)
func JWTMiddleware(c *gin.Context) {
tokenString := c.GetHeader("Authorization")
log.Debugln(tokenString)
if tokenString == "" || len(tokenString) < 7 {
c.JSON(200, gin.H{
"code": 1,
"msg": "invalid token",
"data": "",
})
c.Abort()
return
}
func JWTMiddleware(secret string, needAdmin bool) gin.HandlerFunc {
return func(c *gin.Context) {
tokenString := c.GetHeader("Authorization")
if tokenString == "" || len(tokenString) < 7 {
c.JSON(200, gin.H{
"code": 1,
"msg": "invalid token",
"data": "",
})
c.Abort()
return
}
ok, expired, uid, _, _ := util.ParseJWT(tokenString[7:])
if !ok {
c.JSON(200, gin.H{
"code": 1,
"msg": "invalid token",
"data": "",
})
c.Abort()
return
}
ok, expired, uid := util.ParseJWT(secret, tokenString[7:])
if !ok {
c.JSON(200, gin.H{
"code": 1,
"msg": "invalid token",
"data": "",
})
c.Abort()
return
}
if expired {
c.JSON(200, gin.H{
"code": 1,
"msg": "token expired",
"data": "",
})
c.Abort()
return
}
if expired {
c.JSON(200, gin.H{
"code": 1,
"msg": "token expired",
"data": "",
})
c.Abort()
return
}
log.WithField("uid", uid).Debug("jwt uid")
c.Set("jwt-uid", uid)
c.Next()
log.WithField("userId", uid).Debug("jwt uid")
c.Set("userId", uid)
c.Next()
}
}
package middleware
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"strings"
"taskcenter/config"
"taskcenter/constant"
"taskcenter/dao"
"time"
"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
)
func JWTAuthMiddleware(d *dao.Dao, cfg config.SupabaseConfig, needAdmin bool) gin.HandlerFunc {
return func(c *gin.Context) {
tokenString := c.GetHeader("Authorization")
log.Debugln(tokenString)
if tokenString == "" || len(tokenString) < 7 {
c.JSON(200, gin.H{
"code": 1,
"msg": "invalid token",
"data": "",
})
c.Abort()
return
}
timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
req, err := http.NewRequestWithContext(timeoutCtx, http.MethodGet, fmt.Sprintf("%s/auth/v1/user", strings.TrimRight(cfg.AuthURL, "/")), bytes.NewBuffer(nil))
if err != nil {
log.WithError(err).Error("auth middleware new request")
c.JSON(200, gin.H{
"code": 1,
"msg": constant.InternalError,
"data": "",
})
c.Abort()
return
}
req.Header.Set("apiKey", cfg.APIKey)
req.Header.Set("Authorization", tokenString)
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.WithError(err).Error("auth middleware do request")
c.JSON(200, gin.H{
"code": 1,
"msg": constant.InternalError,
"data": "",
})
c.Abort()
return
}
defer resp.Body.Close()
data, err := io.ReadAll(resp.Body)
if err != nil {
log.WithError(err).Error("auth middleware read all")
c.JSON(200, gin.H{
"code": 1,
"msg": constant.InternalError,
"data": "",
})
c.Abort()
return
}
userId := gjson.Get(string(data), "data.user.id").String()
isAnonymous := gjson.Get(string(data), "data.user.is_anonymous").Bool()
if userId == "" || isAnonymous {
log.WithField("resp", string(data)).Warn("parse data from bearer token")
c.JSON(200, gin.H{
"code": 1,
"msg": "invalid token",
"data": "",
})
c.Abort()
return
}
c.Set("userId", userId)
if needAdmin {
ok, err := d.IsAdminUser(userId)
if err != nil {
log.WithError(err).Error("auth is admin")
c.JSON(200, gin.H{
"code": 1,
"msg": constant.InternalError,
"data": "",
})
c.Abort()
return
}
if !ok {
c.JSON(200, gin.H{
"code": 1,
"msg": "no access permissions",
"data": "",
})
c.Abort()
return
}
}
c.Next()
}
}
......@@ -9,6 +9,7 @@ type CreateProjectRequest struct {
TwitterAccessSecret string `json:"twitterAccessSecret" binding:"required"`
TwitterToken string `json:"twitterToken" binding:"required"`
TelegramChatId int `json:"telegramChatId" binding:"required"`
UserId string `json:"-"`
}
type CreateProjectResponse struct {
......@@ -34,6 +35,7 @@ type GetProjectResponse struct {
TwitterAccessToken string `json:"twitterAccessToken,omitempty"`
TwitterAccessSecret string `json:"twitterAccessSecret,omitempty"`
TwitterToken string `json:"twitterToken,omitempty"`
UserId string `json:"-"`
}
type CreateGroupRequest struct {
......@@ -41,6 +43,7 @@ type CreateGroupRequest struct {
GroupId int `json:"groupId,omitempty"`
Description string `json:"description"`
Tasks []Task `json:"tasks"`
UserId string `json:"-"`
}
type Task struct {
......@@ -54,6 +57,7 @@ type Task struct {
Start int `json:"start" binding:"required"`
End int `json:"end" binding:"required"`
Daily bool `json:"daily" binding:"required"`
Status string `json:"status"`
}
type GetGroupResponse CreateGroupRequest
......
......@@ -21,3 +21,7 @@ type Identity struct {
func (*Identity) TableName() string {
return "auth.identities"
}
func (*Identity) TelegramAuthTableName() string {
return "telegram.identities"
}
package db_model
import (
"database/sql"
"time"
"gorm.io/gorm"
)
......@@ -17,6 +20,7 @@ func (t *TaskAction) TableName() string {
type Project struct {
Id int `gorm:"primaryKey;autoIncrement:false"`
UserId string `gorm:"type:uuid;not null;comment:关联用户id"`
Name string `gorm:"type:text;not null;comment:项目名称"`
Description string `gorm:"type:text;not null;comment:项目描述"`
TwitterAPIKey string `gorm:"type:text;not null;comment:tweet api key"`
......@@ -41,6 +45,7 @@ func (p *Project) TableName() string {
type TaskGroup struct {
Id int `gorm:"primaryKey;autoIncrement:false"`
UserId string `gorm:"type:uuid;not null;comment:关联用户id"`
ProjectId int `gorm:"type:int;not null;comment:项目id"`
Description string `gorm:"type:text;not null;comment:任务组描述"`
gorm.Model
......@@ -51,21 +56,22 @@ func (t *TaskGroup) TableName() string {
}
type Task struct {
Id int `gorm:"primaryKey;autoIncrement:false"`
GroupId int `gorm:"type:int;index;not null;comment:任务组id"`
Platform string `gorm:"type:text;not null;comment:任务平台"`
Action string `gorm:"type:text;not null;comment:任务动作"`
Url string `gorm:"type:text;not null;comment:任务链接"`
TwitterUserId int `gorm:"type:int;not null;comment:tweet user id,用于关注"`
TwitterHandle string `gorm:"type:text;not null;comment:tweet handle,用于关注"`
TweetId int `gorm:"type:int;not null;comment:推文id,用于转发点赞"`
TelegramChatId int `gorm:"type:int;not null;comment:telegram群id"`
TelegramChatUsername string `gorm:"type:text;not null;comment:telegram群用户名"`
Description string `gorm:"type:text;not null;comment:任务描述"`
Reward int `gorm:"type:int;not null;comment:任务奖励"`
Start int `gorm:"type:int;not null;comment:任务开始时间"`
End int `gorm:"type:int;not null;comment:任务结束时间"`
Daily int `gorm:"type:int;not null;comment:是否是每日任务"`
Id int `gorm:"primaryKey;autoIncrement:false"`
GroupId int `gorm:"type:int;index;not null;comment:任务组id"`
Platform string `gorm:"type:text;not null;comment:任务平台"`
Action string `gorm:"type:text;not null;comment:任务动作"`
Url string `gorm:"type:text;not null;comment:任务链接"`
TwitterUserId int `gorm:"type:int;not null;comment:tweet user id,用于关注"`
TwitterHandle string `gorm:"type:text;not null;comment:tweet handle,用于关注"`
TweetId int `gorm:"type:int;not null;comment:推文id,用于转发点赞"`
TelegramChatId int `gorm:"type:int;not null;comment:telegram群id"`
TelegramChatUsername string `gorm:"type:text;not null;comment:telegram群用户名"`
Description string `gorm:"type:text;not null;comment:任务描述"`
Reward int `gorm:"type:int;not null;comment:任务奖励"`
Start int `gorm:"type:int;not null;comment:任务开始时间"`
End int `gorm:"type:int;index;not null;comment:任务结束时间"`
Daily bool `gorm:"type:bool;not null;comment:是否是每日任务"`
TwitterTaskEndAt sql.NullTime `gorm:"index;comment:推特可用,推特任务中心已停止"`
gorm.Model
}
......@@ -74,13 +80,25 @@ func (t *Task) TableName() string {
}
type TaskHistory struct {
Id int `gorm:"primaryKey;autoIncrement:false"`
TaskId int `gorm:"type:int;index;not null;comment:任务id"`
UserId string `gorm:"type:uuid;index;not null;comment:用户id"`
IsReward int `gorm:"type:int;not null;comment:是否已发放奖励"`
Id int `gorm:"primaryKey;autoIncrement:false"`
TaskId int `gorm:"type:int;index;not null;comment:任务id"`
UserId string `gorm:"type:uuid;index;not null;comment:用户id"`
RewardTxHash string `gorm:"type:text;not null;index;comment:发放奖励的txHash"`
Status string `gorm:"type:text;not null;comment:任务状态"`
CreatedAt time.Time `gorm:"index"`
gorm.Model
}
func (t *TaskHistory) TableName() string {
return "taskcenter.task_history"
}
type AdminUser struct {
Id int `gorm:"primaryKey;autoIncrement:true"`
UserId string `gorm:"type:uuid;index;not null;comment:用户id"`
gorm.Model
}
func (a *AdminUser) TableName() string {
return "taskcenter.admin_user"
}
package server
import (
"sdk_api/constant"
apiModel "sdk_api/model/api"
"strconv"
"taskcenter/constant"
apiModel "taskcenter/model/api"
"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
)
func createGroup(c *gin.Context) {
req := &apiModel.CreateGroupRequest{}
if err := c.ShouldBindJSON(req); err != nil {
log.Info("11111", err)
c.JSON(200, withError(constant.InvalidParam))
return
}
req.UserId = c.GetString("userId")
if len(req.Tasks) == 0 {
c.JSON(200, withError(constant.InvalidParam))
......@@ -33,12 +32,17 @@ func createGroup(c *gin.Context) {
return
}
if project.UserId != req.UserId {
c.JSON(200, withError("permission denied"))
return
}
for _, task := range req.Tasks {
if !constant.IsValidAction(task.Action) || !constant.IsValidPlatform(task.Platform) {
log.Info("1111", task.Action, task.Platform)
c.JSON(200, withError(constant.InvalidParam))
return
}
// go wait retry done
// if task.Platform == constant.TaskPlatformTelegram && task.TelegramChatId == 0 {
// c.JSON(200, withError(constant.InvalidParam))
// return
......@@ -70,8 +74,9 @@ func getGroup(c *gin.Context) {
c.JSON(200, withError(constant.InvalidParam))
return
}
userId := c.GetString("userId")
resp, err := srv.GetGroup(gid)
resp, err := srv.GetGroup(gid, userId)
if err != nil {
c.JSON(200, withError(constant.InternalError))
return
......
package server
import (
"fmt"
"sdk_api/constant"
apiModel "sdk_api/model/api"
"strconv"
"taskcenter/constant"
apiModel "taskcenter/model/api"
"github.com/gin-gonic/gin"
)
......@@ -12,11 +11,11 @@ import (
func createProject(c *gin.Context) {
req := &apiModel.CreateProjectRequest{}
if err := c.ShouldBindJSON(&req); err != nil {
fmt.Println(err)
c.JSON(200, withError(constant.InvalidParam))
return
}
req.UserId = c.GetString("userId")
resp, err := srv.CreateProject(req)
if err != nil {
c.JSON(200, withError(constant.InternalError))
......
package server
import (
"sdk_api/middleware"
"taskcenter/middleware"
"github.com/gin-gonic/gin"
)
......@@ -12,22 +12,23 @@ func initRouter(e *gin.Engine) {
v1 := e.Group("/api/v1")
{
project := v1.Group("/project")
project := v1.Group("/project", middleware.JWTMiddleware(conf.Supabase.JWTSecret, true))
project.GET("/:pid", getProject) // 获取项目详情
project.GET("/list", listProject) // 获取项目列表
project.POST("/create", createProject) // 创建项目
}
{
group := v1.Group("/group")
group := v1.Group("/group", middleware.JWTMiddleware(conf.Supabase.JWTSecret, false))
group.GET("/:gid", getGroup) // 获取任务组任务详情
group.GET("/list", listGroup) // 获取任务组任务详情
group.POST("/create", createGroup) // 创建任务组
}
{
task := v1.Group("/task")
task.GET("/check/:tid", checkTask) // 检查任务是否完成
task := v1.Group("/task", middleware.JWTMiddleware(conf.Supabase.JWTSecret, false))
task.POST("/submit/:tid", submitTask) // 提交任务
task.GET("/check/:tid", checkTask) // 检查任务是否完成
}
}
package server
import (
"sdk_api/config"
"sdk_api/service"
"taskcenter/config"
"taskcenter/dao"
"taskcenter/service"
"github.com/gin-contrib/cors"
"github.com/gin-gonic/gin"
......@@ -10,10 +11,12 @@ import (
)
var srv *service.Service
var d *dao.Dao
var conf *config.Config
func StartServer(_srv *service.Service, _conf *config.Config) {
func StartServer(_srv *service.Service, _dao *dao.Dao, _conf *config.Config) {
srv = _srv
d = _dao
conf = _conf
if !conf.Debug {
gin.SetMode(gin.ReleaseMode)
......
package server
import (
"sdk_api/constant"
"strconv"
"taskcenter/constant"
"github.com/gin-gonic/gin"
)
......@@ -10,34 +10,56 @@ import (
func checkTask(c *gin.Context) {
_taskId := c.Param("tid")
taskId, _ := strconv.Atoi(_taskId)
userId := c.Query("userId")
// userId := c.Query("userId")
userId := c.GetString("userId")
if taskId == 0 || userId == "" {
c.JSON(200, withError(constant.InvalidParam))
return
}
exist, done, expired, err := srv.CheckTask(taskId, userId)
status, err := srv.GetTaskResult(taskId, userId)
if err != nil {
c.JSON(200, withError(constant.InternalError))
return
}
if !exist {
c.JSON(200, withError("task not found"))
c.JSON(200, withSuccess(gin.H{"status": status}))
}
// submitTask 提交任务,等待后台检查
func submitTask(c *gin.Context) {
_taskId := c.Param("tid")
taskId, _ := strconv.Atoi(_taskId)
// userId := c.Query("userId")
userId := c.GetString("userId")
if taskId == 0 || userId == "" {
c.JSON(200, withError(constant.InvalidParam))
return
}
if done || expired {
c.JSON(200, withSuccess(gin.H{"done": done}))
task, err := srv.GetTaskDetail(taskId)
if err != nil {
c.JSON(200, withError(constant.InternalError))
return
}
if task == nil {
c.JSON(200, withError("task not found"))
return
}
done, err = srv.SyncTask(taskId, userId)
exist, err := srv.SubmitTask(taskId, userId, task.Daily)
if err != nil {
c.JSON(200, withError(constant.InternalError))
return
}
c.JSON(200, withSuccess(gin.H{"done": done}))
if exist {
c.JSON(200, withError("task already submitted"))
return
}
c.JSON(200, withSuccess(gin.H{}))
}
......@@ -2,10 +2,10 @@ package service
import (
"fmt"
"sdk_api/constant"
apiModel "sdk_api/model/api"
dbModel "sdk_api/model/db"
"sdk_api/util"
"taskcenter/constant"
apiModel "taskcenter/model/api"
dbModel "taskcenter/model/db"
"taskcenter/util"
log "github.com/sirupsen/logrus"
)
......@@ -13,6 +13,7 @@ import (
func (s *Service) CreateGroup(req *apiModel.CreateGroupRequest) (gid int, err error) {
g := &dbModel.TaskGroup{
Id: util.GenFlakeID(),
UserId: req.UserId,
ProjectId: req.ProjectId,
Description: req.Description,
}
......@@ -29,24 +30,20 @@ func (s *Service) CreateGroup(req *apiModel.CreateGroupRequest) (gid int, err er
}
for _, task := range req.Tasks {
daily := 0
if task.Daily {
daily = 1
}
gt := &dbModel.Task{
Id: util.GenFlakeID(),
GroupId: g.Id,
Platform: task.Platform,
Action: task.Action,
TwitterHandle: project.TwitterHandle,
TwitterUserId: project.TwitterUserId,
Description: task.Description,
Reward: task.Reward,
Start: task.Start,
End: task.End,
Daily: daily,
Id: util.GenFlakeID(),
GroupId: g.Id,
Platform: task.Platform,
Action: task.Action,
Description: task.Description,
Reward: task.Reward,
Start: task.Start,
End: task.End,
Daily: task.Daily,
}
if task.Platform == constant.TaskPlatformTwitter {
gt.TwitterHandle = project.TwitterHandle
gt.TwitterUserId = project.TwitterUserId
var twitterAPIAction string
switch task.Action {
case constant.TaskActionFollow:
......@@ -57,16 +54,17 @@ func (s *Service) CreateGroup(req *apiModel.CreateGroupRequest) (gid int, err er
gt.TweetId = task.TweetId
gt.Url = fmt.Sprintf("https://x.com/intent/like?tweet_id=%d", task.TweetId)
case constant.TaskActionRetweet:
twitterAPIAction = constant.TwitterAPIActionRetwitter
twitterAPIAction = constant.TwitterAPIActionRetweet
gt.TweetId = task.TweetId
gt.Url = fmt.Sprintf("https://x.com/intent/retweet?tweet_id=%d", task.TweetId)
}
tweetId := gt.TweetId
var twitterHandle string
// 关注时传handle
if task.Action == constant.TaskActionFollow {
tweetId = gt.TwitterUserId
twitterHandle = project.TwitterHandle
}
// 推特任务中心,启动任务
err = s.d.DoTweetTask(gt.TwitterUserId, tweetId, twitterAPIAction, true)
err = s.d.DoTweetTask(gt.TwitterUserId, gt.TweetId, twitterHandle, twitterAPIAction, true)
if err != nil {
log.WithError(err).Error("do tweet task error")
return 0, err
......@@ -88,7 +86,7 @@ func (s *Service) CreateGroup(req *apiModel.CreateGroupRequest) (gid int, err er
return g.Id, nil
}
func (s *Service) GetGroup(gid int) (resp *apiModel.GetGroupResponse, err error) {
func (s *Service) GetGroup(gid int, userId string) (resp *apiModel.GetGroupResponse, err error) {
resp = &apiModel.GetGroupResponse{Tasks: make([]apiModel.Task, 0)}
g, err := s.d.GetGroup(gid)
if err != nil {
......@@ -110,7 +108,7 @@ func (s *Service) GetGroup(gid int) (resp *apiModel.GetGroupResponse, err error)
}
for _, task := range tasks {
resp.Tasks = append(resp.Tasks, apiModel.Task{
_task := apiModel.Task{
TaskId: task.Id,
Platform: task.Platform,
Action: task.Action,
......@@ -119,9 +117,17 @@ func (s *Service) GetGroup(gid int) (resp *apiModel.GetGroupResponse, err error)
Reward: task.Reward,
Start: task.Start,
End: task.End,
Daily: task.Daily == 1,
Daily: task.Daily,
TweetId: task.TweetId,
})
}
status, err := s.GetTaskResult(_task.TaskId, userId)
if err != nil {
log.WithError(err).Error("group check task")
return nil, err
}
_task.Status = status
resp.Tasks = append(resp.Tasks, _task)
}
return
}
......@@ -148,8 +154,6 @@ func (s *Service) GetGroupList(page, pageSize int) (resp *apiModel.GetGroupListR
return resp, err
}
log.Error("len: ", len(tasks))
for _, task := range tasks {
resp.Items[len(resp.Items)-1].Tasks = append(resp.Items[len(resp.Items)-1].Tasks, apiModel.Task{
TaskId: task.Id,
......@@ -160,7 +164,7 @@ func (s *Service) GetGroupList(page, pageSize int) (resp *apiModel.GetGroupListR
Reward: task.Reward,
Start: task.Start,
End: task.End,
Daily: task.Daily == 1,
Daily: task.Daily,
TweetId: task.TweetId,
})
}
......
package service
import (
apiModel "sdk_api/model/api"
dbModel "sdk_api/model/db"
"sdk_api/util"
apiModel "taskcenter/model/api"
dbModel "taskcenter/model/db"
"taskcenter/util"
log "github.com/sirupsen/logrus"
)
......@@ -24,6 +24,7 @@ func (s *Service) CreateProject(req *apiModel.CreateProjectRequest) (resp *apiMo
p := &dbModel.Project{
Id: util.GenFlakeID(),
UserId: req.UserId,
Name: req.ProjectName,
Description: req.Description,
TwitterAPIKey: req.TwitterAPIKey,
......@@ -102,5 +103,6 @@ func (s *Service) GetProject(pid int) (resp *apiModel.GetProjectResponse, err er
// TwitterAccessToken: p.TwitterAccessToken,
// TwitterAccessSecret: p.TwitterAccessSecret,
// TwitterToken: p.TwitterToken,
UserId: p.UserId,
}, nil
}
package service
import (
"sdk_api/config"
"sdk_api/dao"
"taskcenter/config"
"taskcenter/dao"
)
type Service struct {
......
package service
import (
"fmt"
"sdk_api/constant"
dbModel "taskcenter/model/db"
"time"
log "github.com/sirupsen/logrus"
......@@ -35,101 +34,26 @@ func (s *Service) CheckTask(taskId int, userId string) (exist, done bool, expire
return
}
func (s *Service) SyncTask(taskId int, userId string) (done bool, err error) {
task, err := s.d.GetTaskDetail(taskId)
func (s *Service) GetTaskDetail(taskId int) (task *dbModel.Task, err error) {
task, err = s.d.GetTaskDetail(taskId)
if err != nil {
log.WithError(err).Error("get task error")
return
}
if task == nil {
return
}
return
}
switch task.Platform {
case constant.TaskPlatformTelegram:
telegramUserId, err := s.d.GetProviderId(userId, constant.TaskPlatformTelegram)
if err != nil {
log.WithError(err).Error("get provider telegram user id error")
return false, err
}
if telegramUserId == "" {
return false, nil
}
switch task.Action {
case constant.TaskActionJoin, constant.TaskActionActive:
var taskDone bool
if task.Action == constant.TaskActionJoin {
taskDone, err = s.d.CheckTGJoin(telegramUserId, task.TelegramChatId)
} else {
taskDone, err = s.d.CheckTGActive(telegramUserId, task.TelegramChatId)
}
if err != nil {
log.WithError(err).Errorf("check tg %s error", task.Action)
return false, err
}
if !taskDone {
return false, nil
}
err = s.d.CreateTaskHistory(taskId, userId, task.Daily == 1)
if err != nil {
log.WithError(err).Error("create task history error")
return false, err
}
return true, nil
default:
return false, fmt.Errorf("unknown task action: %s", task.Action)
}
case constant.TaskPlatformTwitter:
twitterUserId, err := s.d.GetProviderId(userId, constant.TaskPlatformTwitter)
if err != nil {
log.WithError(err).Error("get provider twitter user id error")
return false, err
}
switch task.Action {
case constant.TaskActionFollow:
followed, err := s.d.CheckTwitterFollow(task.TwitterUserId, twitterUserId)
if err != nil {
log.WithError(err).Error("check twitter follow error")
return false, err
}
if !followed {
return false, nil
}
err = s.d.CreateTaskHistory(taskId, userId, task.Daily == 1)
if err != nil {
log.WithError(err).Error("create task history error")
return false, err
}
return true, nil
case constant.TaskActionLike, constant.TaskActionRetweet, constant.TaskActionReply:
var taskDone bool
if task.Action == constant.TaskActionLike {
taskDone, err = s.d.CheckTwitterLike(task.TweetId, twitterUserId, 0, 0)
} else {
taskDone, err = s.d.CheckTwitterRetweet(task.TweetId, twitterUserId, 0, 0)
}
if err != nil {
log.WithError(err).Errorf("check twitter %s error", task.Action)
return false, err
}
if !taskDone {
return false, nil
}
err = s.d.CreateTaskHistory(taskId, userId, task.Daily == 1)
if err != nil {
log.WithError(err).Error("create task history error")
return false, err
}
return true, nil
func (s *Service) GetTaskResult(taskId int, userId string) (status string, err error) {
status, err = s.d.GetTaskResult(taskId, userId)
if err != nil {
log.WithError(err).Error("get task result error")
}
return
}
default:
return false, fmt.Errorf("unknown task action: %s", task.Action)
}
default:
return false, fmt.Errorf("unknown task platform: %s", task.Platform)
func (s *Service) SubmitTask(taskId int, userId string, isDailyTask bool) (exist bool, err error) {
exist, err = s.d.CreateTaskHistory(taskId, userId, isDailyTask)
if err != nil {
log.WithError(err).Error("create task history error")
}
return
}
package sync
import (
"taskcenter/dao"
)
type Sync struct {
d *dao.Dao
}
func NewSync(d *dao.Dao) *Sync {
return &Sync{
d: d,
}
}
func (s *Sync) Start() {
s.ProcessTasks()
}
package sync
import (
"fmt"
"taskcenter/constant"
"time"
log "github.com/sirupsen/logrus"
)
func (s *Sync) ProcessTasks() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
tasks, err := s.d.GetUnprocessedTasks()
if err != nil {
log.WithError(err).Error("get unprocessed tasks error")
return
}
for _, task := range tasks {
status := constant.TaskHistoryStatusSuccess
ok, err := s.SyncTask(task.Id, task.UserId)
if err != nil {
log.WithError(err).Error("sync task error")
}
if !ok {
status = constant.TaskHistoryStatusRetry
}
log.WithFields(log.Fields{"task_id": task.Id, "error": err, "status": status}).Info("sync task")
err = s.d.UpdateTaskHistory(task.Id, status)
if err != nil {
log.WithError(err).Error("update task history error")
}
}
}
}
}
func (s *Sync) SyncTask(taskId int, userId string) (ok bool, err error) {
task, err := s.d.GetTaskDetail(taskId)
if err != nil {
log.WithError(err).Error("get task error")
return
}
if task == nil {
return
}
switch task.Platform {
case constant.TaskPlatformTelegram:
telegramUserId, err := s.d.GetProviderId(userId, constant.TaskPlatformTelegram)
if err != nil {
log.WithError(err).Error("get provider telegram user id error")
return false, err
}
if telegramUserId == "" {
return false, nil
}
switch task.Action {
case constant.TaskActionJoin, constant.TaskActionActive:
var taskDone bool
if task.Action == constant.TaskActionJoin {
taskDone, err = s.d.CheckTGJoin(telegramUserId, task.TelegramChatId)
} else {
taskDone, err = s.d.CheckTGActive(telegramUserId, task.TelegramChatId)
}
if err != nil {
log.WithError(err).Errorf("check tg %s error", task.Action)
return false, err
}
return taskDone, nil
default:
return false, fmt.Errorf("unknown task action: %s", task.Action)
}
case constant.TaskPlatformTwitter:
twitterUserId, err := s.d.GetProviderId(userId, constant.TaskPlatformTwitter)
if err != nil {
log.WithError(err).Error("get provider twitter user id error")
return false, err
}
switch task.Action {
case constant.TaskActionFollow:
followed, err := s.d.CheckTwitterFollow(task.TwitterHandle, twitterUserId)
if err != nil {
log.WithError(err).Error("check twitter follow error")
return false, err
}
if !followed {
return false, nil
}
return true, nil
case constant.TaskActionLike, constant.TaskActionRetweet, constant.TaskActionReply:
var taskDone bool
if task.Action == constant.TaskActionLike {
taskDone, err = s.d.CheckTwitterLike(task.TweetId, twitterUserId, 0, 0)
} else {
taskDone, err = s.d.CheckTwitterRetweet(task.TweetId, twitterUserId, 0, 0)
}
if err != nil {
log.WithError(err).Errorf("check twitter %s error", task.Action)
return false, err
}
return taskDone, nil
default:
return false, fmt.Errorf("unknown task action: %s", task.Action)
}
default:
return false, fmt.Errorf("unknown task platform: %s", task.Platform)
}
}
......@@ -6,21 +6,7 @@ import (
"github.com/golang-jwt/jwt/v5"
)
const secret = "cxcZa005Y5zWH1wFgXvPGDL02Ey4ZCLAh2XFcfp7HhG3wTg5TbcnhuYhNvN3YLgt"
func GenerateJWT(uid, platform, platformId string) string {
tk := jwt.NewWithClaims(jwt.SigningMethodHS256, jwt.MapClaims{
"uid": uid,
"platform": platform,
"platformId": platformId,
"iat": time.Now().Unix(),
"exp": time.Now().Add(7 * 24 * time.Hour).Unix(),
})
j, _ := tk.SignedString([]byte(secret))
return j
}
func ParseJWT(token string) (ok, expired bool, uid, platform, platformId string) {
func ParseJWT(secret, token string) (ok, expired bool, uid string) {
claims := jwt.MapClaims{}
tk, err := jwt.ParseWithClaims(token, claims, func(t *jwt.Token) (interface{}, error) {
return []byte(secret), nil
......@@ -33,14 +19,20 @@ func ParseJWT(token string) (ok, expired bool, uid, platform, platformId string)
return
}
uid = claims["uid"].(string)
platform = claims["platform"].(string)
platformId = claims["platformId"].(string)
exp := claims["exp"].(float64)
if time.Now().Unix() > int64(exp) {
exp, _ := claims.GetExpirationTime()
if exp == nil || exp.Before(time.Now()) {
expired = true
return
}
if claims["is_anonymous"].(bool) {
return
}
uid = claims["sub"].(string)
if uid == "" {
return
}
ok = true
return
......
......@@ -5,12 +5,9 @@ import (
)
func TestA(t *testing.T) {
token := GenerateJWT("12345", "telegram", "0x12345")
t.Logf("token: %s", token)
ParseJWT(token)
ParseJWT("token", "")
}
func TestB(t *testing.T) {
t.Log(ParseJWT("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3MTgwMTU2MTQsImlhdCI6MTcxNzQxMDgxNCwicGxhdGZvcm0iOiJ0ZWxlZ3JhbSIsInBsYXRmb3JtSWQiOiIweDEyMzQ1IiwidWlkIjoiMTIzNDUifQ.yZ1V_cGozBrwK55Y9iZsG4C-B5T96V2E3-AqP6CqkR8"))
t.Log(ParseJWT("", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3MTgwMTU2MTQsImlhdCI6MTcxNzQxMDgxNCwicGxhdGZvcm0iOiJ0ZWxlZ3JhbSIsInBsYXRmb3JtSWQiOiIweDEyMzQ1IiwidWlkIjoiMTIzNDUifQ.yZ1V_cGozBrwK55Y9iZsG4C-B5T96V2E3-AqP6CqkR8"))
}
package util
import (
"crypto/hmac"
"crypto/sha256"
"fmt"
"net/url"
"sort"
"strconv"
"strings"
"time"
"github.com/tidwall/gjson"
)
func VerifyInitData(initData, botToken string) (ok bool, botId, userId string) {
h := hmac.New(sha256.New, []byte("WebAppData"))
h.Write([]byte(botToken))
secret := h.Sum(nil)
h2 := hmac.New(sha256.New, secret)
params, err := url.ParseQuery(initData)
if err != nil {
return
}
var hashval string
var keys []string
for key := range params {
if key == "hash" {
hashval = params.Get(key)
continue
}
if key == "auth_date" {
authDate, _ := strconv.Atoi(params.Get(key))
if int64(authDate) < time.Now().Unix()-3600 || int64(authDate) > time.Now().Unix()+300 {
// todo 可以限制超时时间
return false, "", ""
}
}
if key == "user" {
userId = gjson.Get(params.Get(key), "id").String()
}
keys = append(keys, key)
}
sort.Strings(keys)
var payloads []string
for _, key := range keys {
payloads = append(payloads, fmt.Sprintf("%s=%s", key, params.Get(key)))
}
payload := strings.Join(payloads, "\n")
h2.Write([]byte(payload))
h2sum := h2.Sum(nil)
items := strings.Split(botToken, ":")
if len(items) != 2 {
return
}
ok = fmt.Sprintf("%x", h2sum) == hashval
botId = items[0]
return
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment