Commit 7c694525 authored by brent's avatar brent

modify client api

parent 583ff673
......@@ -395,7 +395,7 @@ func (server *AppController) UpdateJWT() {
return
}
jwttoken := models.JwtToken{Id: appRequest.Id}
err = mysql.GetMysqlInstace().Ormer.Read(&token)
err = mysql.GetMysqlInstace().Ormer.Read(&jwttoken)
if err != nil {
server.respond(models.BusinessFailed, "JWT-token 不存在")
return
......@@ -406,7 +406,7 @@ func (server *AppController) UpdateJWT() {
}
jwttoken.Name = appRequest.Name
_, err = mysql.GetMysqlInstace().Ormer.Update(&token)
_, err = mysql.GetMysqlInstace().Ormer.Update(&jwttoken)
if err != nil {
server.respond(models.BusinessFailed, err.Error())
return
......
package controllers
import (
"ai_developer_admin/libs/ipgeo"
"ai_developer_admin/libs/odysseus"
"ai_developer_admin/libs/postgres"
"ai_developer_admin/libs/registry"
"ai_developer_admin/models"
"encoding/json"
"fmt"
......@@ -17,6 +19,54 @@ type ClientController struct {
MainController
}
func (server *ClientController) GlobalStatistics() {
body := server.Ctx.Input.RequestBody
appRequest := models.AppRequest{}
err := json.Unmarshal(body, &appRequest) //解析body中数据
logs.Debug("appRequest", appRequest)
if err != nil {
server.respond(models.NoRequestBody, err.Error())
return
}
//if appRequest.WorkerAcc == "" {
// server.respond(models.MissingParameter, "Missing worker_acc parameter")
// return
//}
countQB, _ := orm.NewQueryBuilder("mysql")
countQB.Select("count(*),sum(workload) AS workload").
From("bills")
sql := countQB.String()
taskCount, err := postgres.QueryBills(sql)
if err != nil {
server.respond(models.BusinessFailed, err.Error())
return
}
count := int64(0)
workload := int64(0)
if len(taskCount) > 0 {
task := taskCount[0]
count, _ = strconv.ParseInt(task.Count, 10, 64)
workload, _ = strconv.ParseInt(task.Workload, 10, 64)
}
_, total, _ := registry.NodeManagersByPage(0, 10)
responseData := struct {
TotalWorkload int64 `json:"total_workload"`
TotalTask int64 `json:"total_task"`
//TotalCalls int64 `json:"total_calls"`
NodeNums int `json:"node_nums"`
}{
TotalWorkload: workload,
TotalTask: count,
//TotalCalls: count,
NodeNums: total,
}
server.respond(http.StatusOK, "", responseData)
}
func (server *ClientController) Statistics() {
body := server.Ctx.Input.RequestBody
appRequest := models.AppRequest{}
......@@ -47,44 +97,44 @@ func (server *ClientController) Statistics() {
sql = sumQB.String()
workload, err := postgres.QueryTotal(sql)
currentTime := time.Now()
endTime := time.Date(currentTime.Year(), currentTime.Month(), currentTime.Day(), 23, 59, 59, 0, time.UTC)
temp := fmt.Sprintf("-%dh", 24*7)
m, _ := time.ParseDuration(temp)
tempTime := currentTime.Add(m)
startTime := time.Date(tempTime.Year(), tempTime.Month(), tempTime.Day(), 0, 0, 0, 0, time.UTC)
start := fmt.Sprintf(startTime.Format(format))
end := fmt.Sprintf(endTime.Format(format))
YCountQB, _ := orm.NewQueryBuilder("mysql")
YCountQB.Select("count(*)").
From("bills").Where(fmt.Sprintf("worker_acc = '%s'", appRequest.WorkerAcc)).
And(fmt.Sprintf("time >= '%s'", start)).And(fmt.Sprintf("time <= '%s'", end))
sql = YCountQB.String()
YCount, err := postgres.QueryTotal(sql)
if err != nil {
server.respond(models.BusinessFailed, err.Error())
return
}
YSumQB, _ := orm.NewQueryBuilder("mysql")
YSumQB.Select("sum(workload)").
From("bills").Where(fmt.Sprintf("worker_acc = '%s'", appRequest.WorkerAcc)).
And(fmt.Sprintf("time >= '%s'", start)).And(fmt.Sprintf("time <= '%s'", end))
sql = YSumQB.String()
YWorkload, err := postgres.QueryTotal(sql)
//currentTime := time.Now()
//endTime := time.Date(currentTime.Year(), currentTime.Month(), currentTime.Day(), 23, 59, 59, 0, time.UTC)
//temp := fmt.Sprintf("-%dh", 24*7)
//m, _ := time.ParseDuration(temp)
//tempTime := currentTime.Add(m)
//startTime := time.Date(tempTime.Year(), tempTime.Month(), tempTime.Day(), 0, 0, 0, 0, time.UTC)
//
//start := fmt.Sprintf(startTime.Format(format))
//end := fmt.Sprintf(endTime.Format(format))
//YCountQB, _ := orm.NewQueryBuilder("mysql")
//YCountQB.Select("count(*)").
// From("bills").Where(fmt.Sprintf("worker_acc = '%s'", appRequest.WorkerAcc)).
// And(fmt.Sprintf("time >= '%s'", start)).And(fmt.Sprintf("time <= '%s'", end))
//sql = YCountQB.String()
//YCount, err := postgres.QueryTotal(sql)
//if err != nil {
// server.respond(models.BusinessFailed, err.Error())
// return
//}
//YSumQB, _ := orm.NewQueryBuilder("mysql")
//YSumQB.Select("sum(workload)").
// From("bills").Where(fmt.Sprintf("worker_acc = '%s'", appRequest.WorkerAcc)).
// And(fmt.Sprintf("time >= '%s'", start)).And(fmt.Sprintf("time <= '%s'", end))
//sql = YSumQB.String()
//YWorkload, err := postgres.QueryTotal(sql)
responseData := struct {
TotalWorkload int64 `json:"total_workload"`
TotalTask int64 `json:"total_task"`
YesterdayWorkload int64 `json:"yesterday_workload"`
YesterdayTask int64 `json:"yesterday_task"`
//YesterdayWorkload int64 `json:"yesterday_workload"`
//YesterdayTask int64 `json:"yesterday_task"`
}{
TotalWorkload: workload,
TotalTask: count,
YesterdayWorkload: YWorkload,
YesterdayTask: YCount,
//YesterdayWorkload: YWorkload,
//YesterdayTask: YCount,
}
server.respond(http.StatusOK, "", responseData)
}
......@@ -140,7 +190,7 @@ func (server *ClientController) TasksPerDay() {
queryQB.And(fmt.Sprintf("time >= '%s'", startTime)).And(fmt.Sprintf("time <= '%s'", endTime))
}
sql := fmt.Sprintf("%s SAMPLE BY 1M ALIGN TO CALENDAR", countQB.String())
sql := fmt.Sprintf("%s SAMPLE BY 1d ALIGN TO CALENDAR", countQB.String())
sql = fmt.Sprintf("SELECT count(*) FROM (%s);", sql)
total, err := postgres.QueryTotal(sql)
......@@ -163,7 +213,7 @@ func (server *ClientController) TasksPerDay() {
}
//queryQB.OrderBy("time").Desc()
sql = fmt.Sprintf("%s SAMPLE BY 1M ALIGN TO CALENDAR ORDER BY time DESC LIMIT %d,%d;", queryQB.String(), offset, size)
sql = fmt.Sprintf("%s SAMPLE BY 1d ALIGN TO CALENDAR ORDER BY time DESC LIMIT %d,%d;", queryQB.String(), offset, size)
counts, err := postgres.QueryBills(sql)
if err != nil {
server.respond(models.BusinessFailed, err.Error())
......@@ -179,7 +229,97 @@ func (server *ClientController) TasksPerDay() {
server.respond(http.StatusOK, "", responseData)
}
func (server *ClientController) Tasks() {
func (server *ClientController) GlobalTasksPerDay() {
body := server.Ctx.Input.RequestBody
appRequest := models.AppRequest{}
err := json.Unmarshal(body, &appRequest) //解析body中数据
logs.Debug("appRequest", appRequest)
if appRequest.Page == 0 {
appRequest.Page = 1
}
if appRequest.Size == 0 {
appRequest.Size = 10
}
offset := (appRequest.Page - 1) * appRequest.Size
size := appRequest.Page * appRequest.Size
//if appRequest.WorkerAcc == "" && appRequest.ProfitAcc == "" {
// server.respond(models.MissingParameter, "At least one of worker_acc or profit_acc needs to be passed.")
// return
//}
countQB, _ := orm.NewQueryBuilder("mysql")
countQB.Select("count(*)").
From("bills")
queryQB, _ := orm.NewQueryBuilder("mysql")
queryQB.Select("count(*),sum(workload) AS workload", "sum(exec_duration) AS exec_duration", "time").
From("bills")
//if appRequest.WorkerAcc != "" {
// countQB.Where(fmt.Sprintf("worker_acc = '%s'", appRequest.WorkerAcc))
// queryQB.Where(fmt.Sprintf("worker_acc = '%s'", appRequest.WorkerAcc))
//}
//if appRequest.ProfitAcc != "" && appRequest.WorkerAcc == "" {
// countQB.Where(fmt.Sprintf("profit_acc = '%s'", appRequest.ProfitAcc))
// queryQB.Where(fmt.Sprintf("profit_acc = '%s'", appRequest.ProfitAcc))
//}
//if appRequest.ProfitAcc != "" && appRequest.WorkerAcc != "" {
// countQB.And(fmt.Sprintf("profit_acc = '%s'", appRequest.ProfitAcc))
// queryQB.And(fmt.Sprintf("profit_acc = '%s'", appRequest.ProfitAcc))
//}
if appRequest.StartTime != "" && appRequest.EndTime != "" {
temp, _ := time.Parse(layout, appRequest.StartTime)
startTime := fmt.Sprintf(temp.Format(format))
temp, _ = time.Parse(layout, appRequest.EndTime)
endTime := fmt.Sprintf(temp.Format(format))
countQB.And(fmt.Sprintf("time >= '%s'", startTime)).And(fmt.Sprintf("time <= '%s'", endTime))
queryQB.And(fmt.Sprintf("time >= '%s'", startTime)).And(fmt.Sprintf("time <= '%s'", endTime))
}
sql := fmt.Sprintf("%s SAMPLE BY 1d ALIGN TO CALENDAR", countQB.String())
sql = fmt.Sprintf("SELECT count(*) FROM (%s);", sql)
total, err := postgres.QueryTotal(sql)
if err != nil {
server.respond(models.BusinessFailed, err.Error())
return
}
logs.Debug("total = %d", total)
var responseTasks []models.Bills
if total == 0 {
responseData := struct {
Total int64 `json:"total"`
Data interface{} `json:"data,omitempty"`
}{
Total: total,
Data: responseTasks,
}
server.respond(http.StatusOK, "", responseData)
return
}
//queryQB.OrderBy("time").Desc()
sql = fmt.Sprintf("%s SAMPLE BY 1d ALIGN TO CALENDAR ORDER BY time DESC LIMIT %d,%d;", queryQB.String(), offset, size)
counts, err := postgres.QueryBills(sql)
if err != nil {
server.respond(models.BusinessFailed, err.Error())
return
}
responseData := struct {
Total int64 `json:"total"`
Data interface{} `json:"data,omitempty"`
}{
Total: total,
Data: counts,
}
server.respond(http.StatusOK, "", responseData)
}
func (server *ClientController) TaskRevenue() {
body := server.Ctx.Input.RequestBody
appRequest := models.AppRequest{}
err := json.Unmarshal(body, &appRequest) //解析body中数据
......@@ -194,18 +334,31 @@ func (server *ClientController) Tasks() {
offset := (appRequest.Page - 1) * appRequest.Size
size := appRequest.Page * appRequest.Size
if appRequest.WorkerAcc == "" {
server.respond(models.MissingParameter, "Missing worker_acc parameter")
if appRequest.WorkerAcc == "" && appRequest.ProfitAcc == "" {
server.respond(models.MissingParameter, "At least one of worker_acc or profit_acc needs to be passed.")
return
}
countQB, _ := orm.NewQueryBuilder("mysql")
countQB.Select("count(*)").
From("bills").Where(fmt.Sprintf("worker_acc = '%s'", appRequest.WorkerAcc))
From("bills")
queryQB, _ := orm.NewQueryBuilder("mysql")
queryQB.Select("id", "fee", "type", "time", "exec_duration", "workload", "profit_acc", "worker_acc", "result").
From("bills").Where(fmt.Sprintf("worker_acc = '%s'", appRequest.WorkerAcc))
queryQB.Select("id", "type", "time", "workload", "profit_acc", "worker_acc").
From("bills")
if appRequest.WorkerAcc != "" {
countQB.Where(fmt.Sprintf("worker_acc = '%s'", appRequest.WorkerAcc))
queryQB.Where(fmt.Sprintf("worker_acc = '%s'", appRequest.WorkerAcc))
}
if appRequest.ProfitAcc != "" && appRequest.WorkerAcc == "" {
countQB.Where(fmt.Sprintf("profit_acc = '%s'", appRequest.ProfitAcc))
queryQB.Where(fmt.Sprintf("profit_acc = '%s'", appRequest.ProfitAcc))
}
if appRequest.ProfitAcc != "" && appRequest.WorkerAcc != "" {
countQB.And(fmt.Sprintf("profit_acc = '%s'", appRequest.ProfitAcc))
queryQB.And(fmt.Sprintf("profit_acc = '%s'", appRequest.ProfitAcc))
}
if appRequest.StartTime != "" && appRequest.EndTime != "" {
temp, _ := time.Parse(layout, appRequest.StartTime)
......@@ -266,7 +419,7 @@ func (server *ClientController) Tasks() {
Id: task.Id,
Type: models.ModelType(typeDe).String(),
Time: task.Time,
Result: task.Result,
//Result: task.Result,
//ApiPath: apiPath,
//Model: model,
BaseModel: baseModel,
......@@ -288,6 +441,49 @@ func (server *ClientController) Tasks() {
server.respond(http.StatusOK, "", responseData)
}
func (server *ClientController) AllNode() {
data, err := registry.NodeManagers()
if err != nil {
server.respond(models.BusinessFailed, err.Error())
return
}
//var nodes []registor.RegistryInfo
//for _, value := range data {
// var node registor.RegistryInfo
// if err := json.Unmarshal([]byte(value), &node); err == nil {
// server.respond(models.BusinessFailed, err.Error())
// return
// }
// nodes = append(nodes, node)
//}
server.respond(http.StatusOK, "", data)
}
func (server *ClientController) AddressByIP() {
body := server.Ctx.Input.RequestBody
appRequest := models.AppRequest{}
_ = json.Unmarshal(body, &appRequest) //解析body中数据
logs.Debug("appRequest", appRequest, string(body))
if appRequest.Ips == nil || (appRequest.Ips != nil && len(appRequest.Ips) <= 0) {
server.respond(models.MissingParameter, "Missing ips parameter")
return
}
address := make(map[string]interface{})
for _, value := range appRequest.Ips {
temp := ipgeo.GetIpAddr(value)
logs.Debug("AddressByIP", temp)
address[value] = temp
}
//data, err := json.Marshal(address)
//if err != nil {
// server.respond(models.BusinessFailed, err.Error())
// return
//}
server.respond(http.StatusOK, "", address)
}
func (server *ClientController) RevenuePerDay() {
body := server.Ctx.Input.RequestBody
......@@ -311,11 +507,11 @@ func (server *ClientController) RevenuePerDay() {
countQB, _ := orm.NewQueryBuilder("mysql")
countQB.Select("count(*)").
From("bills")
From("revenues")
queryQB, _ := orm.NewQueryBuilder("mysql")
queryQB.Select("count(*),sum(workload) AS workload", "sum(exec_duration) AS exec_duration", "time").
From("bills")
queryQB.Select("*").
From("revenues")
if appRequest.WorkerAcc != "" {
countQB.Where(fmt.Sprintf("worker_acc = '%s'", appRequest.WorkerAcc))
......@@ -326,16 +522,17 @@ func (server *ClientController) RevenuePerDay() {
queryQB.Where(fmt.Sprintf("profit_acc = '%s'", appRequest.ProfitAcc))
}
sql := fmt.Sprintf("%s SAMPLE BY 1M ALIGN TO CALENDAR", countQB.String())
sql = fmt.Sprintf("SELECT count(*) FROM (%s);", sql)
//sql := fmt.Sprintf("%s SAMPLE BY 1M ALIGN TO CALENDAR", countQB.String())
//sql = fmt.Sprintf("SELECT count(*) FROM (%s);", sql)
sql := countQB.String()
total, err := postgres.QueryTotal(sql)
if err != nil {
server.respond(models.BusinessFailed, err.Error())
return
}
logs.Debug("total = %d", total)
var responseTasks []models.TaskCount
var responseTasks []interface{}
if total == 0 {
responseData := struct {
Total int64 `json:"total"`
......@@ -349,8 +546,9 @@ func (server *ClientController) RevenuePerDay() {
}
queryQB.OrderBy("time").Desc()
sql = fmt.Sprintf("%s SAMPLE BY 1M ALIGN TO CALENDAR LIMIT %d,%d;", queryQB.String(), offset, size)
counts, err := postgres.CountTasks(sql)
//sql = fmt.Sprintf("%s SAMPLE BY 1M ALIGN TO CALENDAR LIMIT %d,%d;", queryQB.String(), offset, size)
sql = fmt.Sprintf("%s LIMIT %d,%d;", queryQB.String(), offset, size)
revenues, err := postgres.QueryRevenues(sql)
if err != nil {
server.respond(models.BusinessFailed, err.Error())
return
......@@ -360,7 +558,7 @@ func (server *ClientController) RevenuePerDay() {
Data interface{} `json:"data,omitempty"`
}{
Total: total,
Data: counts,
Data: revenues,
}
server.respond(http.StatusOK, "", responseData)
}
......@@ -156,8 +156,8 @@ func (server *FavoriteController) Lists() {
queryQB, _ := orm.NewQueryBuilder("mysql")
countQB.Select("count(*) AS total").
From("favorite").
InnerJoin("task_type").On("favorite.task_type_id = task_type.id")
From("favorite")
//InnerJoin("task_type").On("favorite.task_type_id = task_type.id")
queryQB.Select("favorite.id", "favorite.task_type_id", "favorite.user_id", "favorite.created_time",
"task_type.name").
......
......@@ -398,8 +398,8 @@ func (server *FundsController) IncomeAndExpense() {
}
var ids []int
for _, fund := range data {
id, _ := strconv.Atoi(fund.Uid)
ids = append(ids, id)
//id, _ := strconv.Atoi(fund.Uid)
ids = append(ids, fund.Uid)
}
var users []models.User
if token.Role == 1 || token.Role == 2 {
......@@ -408,34 +408,30 @@ func (server *FundsController) IncomeAndExpense() {
var responseTypes []models.ResponseFunds
for _, fund := range data {
amountInt, _ := strconv.ParseInt(fund.Amount, 10, 64)
//amountInt, _ := strconv.ParseInt(fund.Amount, 10, 64)
amountInt := fund.Amount
amount := amountInt / 1000000
balanceInt, _ := strconv.ParseInt(fund.Balance, 10, 64)
//balanceInt, _ := strconv.ParseInt(fund.Balance, 10, 64)
balanceInt := fund.Balance
balance := balanceInt / 1000000
tradeChannel, _ := strconv.Atoi(fund.TradeChannel)
status, _ := strconv.Atoi(fund.Status)
tradeFlow, _ := strconv.Atoi(fund.TradeFlow)
tradeType, _ := strconv.Atoi(fund.TradeType)
id, _ := strconv.Atoi(fund.Uid)
username := FindName(users, id)
username := FindName(users, fund.Uid)
responseType := models.ResponseFunds{
Id: fund.Id, // int64
Uid: fund.Uid, // int
Amount: float64(amount), // int64
TradeChannel: models.PayMethodType(tradeChannel).String(), // int
TradeChannelEn: models.PayMethodType(tradeChannel).EnString(),
TradeChannel: models.PayMethodType(fund.TradeChannel).String(), // int
TradeChannelEn: models.PayMethodType(fund.TradeChannel).EnString(),
ChannelSerial: fund.ChannelSerial, // string
StatusDesc: models.PayStatus(status).String(),
StatusDescEn: models.PayStatus(status).EnString(), // int
Status: status,
StatusDesc: models.PayStatus(fund.Status).String(),
StatusDescEn: models.PayStatus(fund.Status).EnString(), // int
Status: fund.Status,
TradeTime: fund.TradeTime, // string
TradeFlow: models.TradeFlowType(tradeFlow).String(), // int
TradeFlowEn: models.TradeFlowType(tradeFlow).EnString(),
TradeType: models.TradeKind(tradeType).String(), // int
TradeTypeEn: models.TradeKind(tradeType).EnString(),
TradeFlow: models.TradeFlowType(fund.TradeFlow).String(), // int
TradeFlowEn: models.TradeFlowType(fund.TradeFlow).EnString(),
TradeType: models.TradeKind(fund.TradeType).String(), // int
TradeTypeEn: models.TradeKind(fund.TradeType).EnString(),
Balance: float64(balance), // int64
Remark: fund.Remark, // string
OrderId: fund.OrderId,
......
......@@ -218,7 +218,7 @@ func (server *TaskController) BillDetails() {
}
}
balance, _ := odysseus.GetUserBalance(int64(token.UserID))
//balance, _ := odysseus.GetUserBalance(int64(token.UserID))
reTask := models.Bills{
Id: task.Id,
......@@ -228,7 +228,6 @@ func (server *TaskController) BillDetails() {
Result: task.Result,
ApiPath: apiPath,
Desc: desc,
Balance: balance,
}
responseTasks = append(responseTasks, reTask)
}
......
......@@ -445,45 +445,45 @@ func regisgerUser(user models.User) (*models.User, error) {
user.Level = 0
user.Role = 4
_, err = mysql.GetMysqlInstace().Ormer.Insert(&user)
id, err := mysql.GetMysqlInstace().Ormer.Insert(&user)
if err != nil {
return nil, errors.New("create user failed")
}
checkUser := &models.User{Username: user.Username}
err = mysql.GetMysqlInstace().Ormer.Read(checkUser, "username")
if err != nil {
return nil, errors.New("create user failed")
}
checkUser.CustomId = strconv.Itoa(checkUser.Id)
//checkUser := &models.User{Username: user.Username}
//err = mysql.GetMysqlInstace().Ormer.Read(checkUser, "username")
//if err != nil {
// return nil, errors.New("create user failed")
//}
user.CustomId = strconv.Itoa(int(id))
data, err := kong.CreateUser(checkUser)
data, err := kong.CreateUser(&user)
if err != nil {
mysql.GetMysqlInstace().Ormer.Delete(checkUser)
mysql.GetMysqlInstace().Ormer.Delete(&user)
return nil, err
}
if data.Id == "" {
mysql.GetMysqlInstace().Ormer.Delete(checkUser)
mysql.GetMysqlInstace().Ormer.Delete(&user)
return nil, errors.New(data.Message)
}
checkUserLevel := &models.UserLevel{Level: checkUser.Level}
checkUserLevel := &models.UserLevel{Level: user.Level}
err = mysql.GetMysqlInstace().Ormer.Read(checkUserLevel, "level")
if err != nil {
logs.Debug("Recharge 用户等级查找失败")
} else {
plugin, err := kong.SetRateLimit(checkUser, checkUserLevel, "")
plugin, err := kong.SetRateLimit(&user, checkUserLevel, "")
if err == nil {
checkUser.RateLimitPluginId = plugin.Id
user.RateLimitPluginId = plugin.Id
}
}
mysql.GetMysqlInstace().Ormer.Update(checkUser)
mysql.GetMysqlInstace().Ormer.Update(&user)
createApiKey(checkUser)
createJWTToken(checkUser)
createApiKey(&user)
createJWTToken(&user)
return checkUser, nil
return &user, nil
}
func createApiKey(checkUser *models.User) {
......
......@@ -2,6 +2,7 @@ package cronjob
import (
"ai_developer_admin/libs/mysql"
"ai_developer_admin/libs/odysseus"
"ai_developer_admin/libs/postgres"
"ai_developer_admin/libs/redis"
"ai_developer_admin/libs/registry"
......@@ -22,6 +23,7 @@ import (
var loopCronTask = cron.New(cron.WithSeconds())
var debitTask = cron.New(cron.WithSeconds())
var revenueTask = cron.New(cron.WithSeconds())
var registryTask = cron.New(cron.WithSeconds())
var HeatKey = "task:heat"
......@@ -190,8 +192,8 @@ func startDebit() {
temp := fmt.Sprintf("-%dh", 2)
m, _ := time.ParseDuration(temp)
dayTime := currentTime.Add(m)
endTime := time.Date(dayTime.Year(), dayTime.Month(), dayTime.Day(), 23, 59, 59, 0, dayTime.Location())
startTime := time.Date(dayTime.Year(), dayTime.Month(), dayTime.Day(), 0, 0, 0, 0, dayTime.Location())
endTime := time.Date(dayTime.Year(), dayTime.Month(), dayTime.Day(), 23, 59, 59, 0, time.UTC)
startTime := time.Date(dayTime.Year(), dayTime.Month(), dayTime.Day(), 0, 0, 0, 0, time.UTC)
start := fmt.Sprintf(startTime.Format(format))
end := fmt.Sprintf(endTime.Format(format))
......@@ -256,6 +258,94 @@ func startDebit() {
debitTask.Start()
}
func startRevenue() {
logs.Debug("startRevenue")
//spec := "*/50 */23 * * * ?" //"@every 1h"
//spec := "01 01 00 * * ?"
spec, _ := beego.AppConfig.String("debitTime")
//spec := "@every 1m"
dbhost, _ := beego.AppConfig.String("postgreshost")
dbport, _ := beego.AppConfig.Int("senderport")
questAddr := fmt.Sprintf("%s:%d", dbhost, dbport)
revenueTask.AddFunc(spec, func() {
logs.Debug("startRevenue revenueTask")
ctx := context.TODO()
addrOpt := qdb.WithAddress(questAddr)
sender, err := qdb.NewLineSender(ctx, addrOpt)
if err != nil {
logs.Debug("startRevenue NewLineSender = %s", err.Error())
return
}
defer sender.Close()
// Make sure to close the sender on exit to release resources.
currentTime := time.Now()
temp := fmt.Sprintf("-%dh", 2)
m, _ := time.ParseDuration(temp)
dayTime := currentTime.Add(m)
endTime := time.Date(dayTime.Year(), dayTime.Month(), dayTime.Day(), 23, 59, 59, 0, time.UTC)
startTime := time.Date(dayTime.Year(), dayTime.Month(), dayTime.Day(), 0, 0, 0, 0, time.UTC)
start := fmt.Sprintf(startTime.Format(format))
end := fmt.Sprintf(endTime.Format(format))
logs.Debug("startRevenue before sql = %s", dayTime)
sql := fmt.Sprintf("SELECT id,time,type,workload,profit_acc,worker_acc FROM bills WHERE uid != '0' and time >= '%s' and time <= '%s';", start, end)
data, err := postgres.QueryBills(sql)
if err != nil {
logs.Debug("startRevenue postgres CountFunds = %s", err.Error())
return
}
if data == nil {
logs.Debug("startRevenue data = nil")
return
}
for _, bill := range data {
workload, _ := strconv.Atoi(bill.Workload)
//nanoseconds := int64(uint64(dayTime.UnixNano()))
//seconds := nanoseconds / 1e9
taskId, err := strconv.Atoi(bill.Type)
typeDesc := ""
baseModel := ""
if err == nil {
taskType, err1 := odysseus.GetTaskType(int64(taskId))
if err1 == nil {
typeDesc = models.ModelType(taskType.Type).String()
baseModel = taskType.BaseModel
}
}
err = sender.Table("revenues").
Symbol("id", bill.Id).
Symbol("type", typeDesc).
Symbol("base_model", baseModel).
Symbol("profit_acc", bill.ProfitAcc).
Symbol("worker_acc", bill.WorkerAcc).
Int64Column("workload", int64(workload)).
Int64Column("income", int64(0)).
TimestampColumn("time", bill.Time).
AtNow(ctx)
if err != nil {
logs.Debug("startRevenue sender = %s,id = %d", err.Error(), bill.Id)
}
//break
}
err = sender.Flush(ctx)
if err != nil {
logs.Debug("startRevenue Flush = %s", err.Error())
}
})
debitTask.Start()
}
func startRegistBackend() {
spec := "@every 1m"
registryTask.AddFunc(spec, func() {
......
......@@ -61,6 +61,7 @@ func init() {
panic(err)
}
createFundTable()
createRevenueTable()
}
func createFundTable() {
......@@ -72,6 +73,15 @@ func createFundTable() {
}
}
func createRevenueTable() {
sql := "CREATE TABLE IF NOT EXISTS 'revenues' (\nid LONG,\ntime TIMESTAMP, \ntype SYMBOL CAPACITY 128 CACHE INDEX CAPACITY 8192,\nbase_model SYMBOL CAPACITY 128 CACHE INDEX CAPACITY 8192,\nprofit_acc SYMBOL CAPACITY 128 CACHE INDEX CAPACITY 8192, \nworker_acc SYMBOL CAPACITY 128 CACHE INDEX CAPACITY 8192, \nworkload int,\nincome LONG) timestamp (time) PARTITION BY DAY WAL;"
qs := ormpost.Raw(sql)
_, err := qs.Exec()
if err != nil {
logs.Debug("createRevenueTable", err.Error())
}
}
func QueryTset(sql string, args ...interface{}) ([]models.Bills, error) {
logs.Debug("QueryBills = ", sql)
qs := ormpost.Raw(sql, args)
......@@ -168,6 +178,13 @@ func CountFunds(sql string) ([]models.IncomeAndExpenseRsponse, error) {
//return containers, nil
}
func QueryRevenues(sql string) ([]models.Revenues, error) {
logs.Debug("QueryFunds = ", sql)
var containers []models.Revenues
_, err := ormpost.Raw(sql).QueryRows(&containers)
return containers, err
}
func QueryTotal(sql string) (int64, error) {
logs.Debug("QueryBills = ", sql)
var count int64
......
package models
import "time"
type Revenues struct {
Id string `json:"id";orm:"column(id)"`
Time time.Time `json:"time";orm:"column(time);type(datetime)"`
Type string `json:"type";orm:"column(type)"`
BaseModel string `json:"base_model";orm:"column(base_model)"`
ProfitAcc string `json:"profit_acc";orm:"column(profit_acc)"`
WorkerAcc string `json:"worker_acc";orm:"column(worker_acc)"`
Workload int `json:"workload";orm:"column(workload)"`
Income int64 `json:"income";orm:"column(income)"`
}
......@@ -223,23 +223,23 @@ type ChargeRecord struct {
}
type Funds struct {
Id string `json:"id";orm:"column(id)"` // int64
Uid string `json:"uid";orm:"column(uid)"` // int
Amount string `json:"amount";orm:"column(amount)"` // int64
TradeChannel string `json:"trade_channel";orm:"column(trade_channel)"` // int
Id int64 `json:"id";orm:"column(id)"` // int64
Uid int `json:"uid";orm:"column(uid)"` // int
Amount int64 `json:"amount";orm:"column(amount)"` // int64
TradeChannel int `json:"trade_channel";orm:"column(trade_channel)"` // int
ChannelSerial string `json:"channel_serial";orm:"column(channel_serial)"` // string
Status string `json:"status";orm:"column(status);size(1)"` // int
Status int `json:"status";orm:"column(status);size(1)"` // int
TradeTime string `json:"trade_time";orm:"column(trade_time);type(datetime)"` // string
TradeFlow string `json:"trade_flow";orm:"column(trade_flow)"` // int
TradeType string `json:"trade_type";orm:"column(trade_type)"` // int
Balance string `json:"balance";orm:"column(balance)"` // int64
TradeFlow int `json:"trade_flow";orm:"column(trade_flow)"` // int
TradeType int `json:"trade_type";orm:"column(trade_type)"` // int
Balance int64 `json:"balance";orm:"column(balance)"` // int64
Remark string `json:"remark";orm:"column(remark)"` // string
OrderId string `json:"order_id";orm:"column(order_id)"` // string
}
type ResponseFunds struct {
Id string `json:"id";orm:"column(id)"` // int64
Uid string `json:"uid";orm:"column(uid)"` // int
Id int64 `json:"id";orm:"column(id)"` // int64
Uid int `json:"uid";orm:"column(uid)"` // int
Amount float64 `json:"amount";orm:"column(amount)"` // int64
TradeChannel string `json:"trade_channel";orm:"column(trade_channel)"`
TradeChannelEn string `json:"trade_channel_en";orm:"column(trade_channel)"` // int
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment