Commit 6282daf5 authored by brent

modify client interface

parent 572ac87f
@@ -12,6 +12,7 @@ import (
     "github.com/beego/beego/orm"
     "github.com/beego/beego/v2/core/logs"
     beego "github.com/beego/beego/v2/server/web"
+    "github.com/odysseus/cache/model"
     "io"
     "math/big"
     "net/http"
@@ -238,7 +239,7 @@ func (server *ClientController) GlobalTasksPerDay() {
     body := server.Ctx.Input.RequestBody
     appRequest := models.AppRequest{}
     err := json.Unmarshal(body, &appRequest) // parse the request body
-    logs.Debug("appRequest", appRequest)
+    logs.Debug("appRequest", body, appRequest)
     if appRequest.Page == 0 {
         appRequest.Page = 1
@@ -276,12 +277,12 @@ func (server *ClientController) GlobalTasksPerDay() {
     //}
     if appRequest.StartTime != "" && appRequest.EndTime != "" {
-        temp, _ := time.Parse(layout, appRequest.StartTime)
-        startTime := fmt.Sprintf(temp.Format(format))
-        temp, _ = time.Parse(layout, appRequest.EndTime)
-        endTime := fmt.Sprintf(temp.Format(format))
-        countQB.And(fmt.Sprintf("time >= '%s'", startTime)).And(fmt.Sprintf("time <= '%s'", endTime))
-        queryQB.And(fmt.Sprintf("time >= '%s'", startTime)).And(fmt.Sprintf("time <= '%s'", endTime))
+        //temp, _ := time.Parse("2006-01-02T15:04:05Z", appRequest.StartTime)
+        //startTime := fmt.Sprintf(temp.Format(format))
+        //temp, _ = time.Parse(layout, appRequest.EndTime)
+        //endTime := fmt.Sprintf(temp.Format(format))
+        countQB.Where(fmt.Sprintf("time >= '%s'", appRequest.StartTime)).And(fmt.Sprintf("time <= '%s'", appRequest.EndTime))
+        queryQB.Where(fmt.Sprintf("time >= '%s'", appRequest.StartTime)).And(fmt.Sprintf("time <= '%s'", appRequest.EndTime))
     }
     sql := fmt.Sprintf("%s SAMPLE BY 1d ALIGN TO CALENDAR", countQB.String())
@@ -381,8 +382,8 @@ func (server *ClientController) TaskReward() {
     offset := (appRequest.Page - 1) * appRequest.Size
     size := appRequest.Page * appRequest.Size
-    if appRequest.ProfitAcc == "" {
-        server.respond(models.MissingParameter, "Missing profit_acc.")
+    if appRequest.WorkerAcc == "" {
+        server.respond(models.MissingParameter, "Missing worker_acc.")
         return
     }
@@ -394,9 +395,9 @@ func (server *ClientController) TaskReward() {
     queryQB.Select("id", "type", "time", "workload", "profit_acc", "worker_acc").
         From("bills")
-    if appRequest.ProfitAcc != "" {
-        countQB.Where(fmt.Sprintf("profit_acc = '%s'", appRequest.ProfitAcc))
-        queryQB.Where(fmt.Sprintf("profit_acc = '%s'", appRequest.ProfitAcc))
+    if appRequest.WorkerAcc != "" {
+        countQB.Where(fmt.Sprintf("worker_acc = '%s'", appRequest.WorkerAcc))
+        queryQB.Where(fmt.Sprintf("worker_acc = '%s'", appRequest.WorkerAcc))
     }
     if appRequest.StartTime != "" && appRequest.EndTime != "" {
@@ -440,25 +441,30 @@ func (server *ClientController) TaskReward() {
     // iterate over the object array and group the items by date
     var dates []string
+    dateString := ""
     for _, item := range data {
         // normalize the timestamp to 00:00 of its day
         date := time.Date(item.Time.Year(), item.Time.Month(), item.Time.Day(), 0, 0, 0, 0, time.UTC)
         // add the item to the group for that date
-        key := date.Format("2006-01-02")
+        key := date.Format(format)
+        //key := date.Format("2006-01-02")
         if !contains(dates, key) {
             dates = append(dates, key)
+            dateString = dateString + "'" + key + "'" + ","
         }
         baseModel := ""
-        typeDe := 1
+        typeDe := 0
         taskId := item.Type
+        var taskType *model.TaskType
        if err == nil {
-            taskType, _ := odysseus.GetTaskType(int64(taskId))
+            taskType, _ = odysseus.GetTaskType(int64(taskId))
             if taskType != nil {
                 baseModel = taskType.BaseModel
                 typeDe = taskType.Type
             }
         }
         reTask := &models.Revenues{
             Id:   item.Id,
             Type: models.ModelType(typeDe).String(),
@@ -470,8 +476,11 @@ func (server *ClientController) TaskReward() {
         responseTasks = append(responseTasks, reTask)
         groupedItems[key] = append(groupedItems[key], reTask)
     }
-    wei, err := getWeiPerWorkload(dates)
+    if len(dates) > 0 {
+        dateString = dateString[:len(dateString)-1]
+    }
+    //wei, err := getWeiPerWorkload(dates)
     //wei := []models.WeiPerWorkloadStruct{
     //    {
     //        Date: "2024-05-08",
@@ -480,52 +489,21 @@ func (server *ClientController) TaskReward() {
     //        Reward: "10000",
     //    },
     //}
+    sql = fmt.Sprintf("SELECT * FROM reward_multiplier WHERE time in(%s) ORDER BY time DESC;", dateString)
+    wei, err := postgres.QueryWei(sql)
+    //return total, data, err
     for _, value := range wei {
-        bills := groupedItems[value.Date]
+        key := value.Time.Format(format)
+        bills := groupedItems[key]
         for _, item := range bills {
             workload := big.NewInt(item.Workload)
-            weiPerWorkload, _ := strconv.Atoi(value.WeiPerWorkload)
+            weiPerWorkload, _ := strconv.Atoi(value.Wei)
             weiPerWorkloadBig := big.NewInt(int64(weiPerWorkload))
             incomeWei := new(big.Int).Mul(workload, weiPerWorkloadBig)
             income := weiToAGI(incomeWei)
             item.Income = income
         }
     }
-    //for _, task := range data {
-    //    //apiPath := ""
-    //    //model := ""
-    //    baseModel := ""
-    //    //kind := 1
-    //    typeDe := 1
-    //    taskId, err := strconv.Atoi(task.Type)
-    //    if err == nil {
-    //        taskType, _ := odysseus.GetTaskType(int64(taskId))
-    //        if taskType != nil {
-    //            //apiPath = taskType.ApiPath
-    //            //model = taskType.Model
-    //            baseModel = taskType.BaseModel
-    //            //kind = taskType.Kind
-    //            typeDe = taskType.Type
-    //        }
-    //    }
-    //
-    //    reTask := models.Bills{
-    //        Id: task.Id,
-    //        Type: models.ModelType(typeDe).String(),
-    //        Time: task.Time,
-    //        //Result: task.Result,
-    //        //ApiPath: apiPath,
-    //        //Model: model,
-    //        BaseModel: baseModel,
-    //        //Kind: models.TaskKind(kind).EnString(),
-    //        //Desc: desc,
-    //        Workload: task.Workload,
-    //        ProfitAcc: task.ProfitAcc,
-    //        //WorkerAcc: task.WorkerAcc,
-    //    }
-    //    responseTasks = append(responseTasks, reTask)
-    //}
     responseData := struct {
         Total int64       `json:"total"`
         Data  interface{} `json:"data,omitempty"`
......
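A minimal sketch (not from this commit) of the income calculation the new loop performs: workload times the day's wei-per-workload multiplier, converted from wei to AGI. The weiToAGI body and the sample figures are assumptions; only the big.Int arithmetic mirrors the controller code above.

```go
package main

import (
	"fmt"
	"math/big"
	"strconv"
)

// weiToAGI is assumed here to scale wei down by 1e18; the real helper is not shown in this commit.
func weiToAGI(wei *big.Int) *big.Float {
	return new(big.Float).Quo(new(big.Float).SetInt(wei), big.NewFloat(1e18))
}

func main() {
	workload := big.NewInt(1500)                  // item.Workload for one bill (hypothetical)
	weiPerWorkload, _ := strconv.Atoi("20000000") // value.Wei for that day (hypothetical)
	incomeWei := new(big.Int).Mul(workload, big.NewInt(int64(weiPerWorkload)))
	fmt.Println(weiToAGI(incomeWei).Text('f', 18)) // income credited to each bill in the group
}
```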
@@ -2,19 +2,22 @@ package cronjob
 import (
     "ai_developer_admin/libs/mysql"
-    "ai_developer_admin/libs/odysseus"
     "ai_developer_admin/libs/postgres"
     "ai_developer_admin/libs/redis"
     "ai_developer_admin/libs/registry"
     "ai_developer_admin/libs/snowflake"
     "ai_developer_admin/models"
+    "bytes"
     "context"
     "encoding/json"
+    "errors"
     "fmt"
     "github.com/beego/beego/v2/core/logs"
     beego "github.com/beego/beego/v2/server/web"
     qdb "github.com/questdb/go-questdb-client/v2"
     "github.com/robfig/cron/v3"
+    "io"
+    "net/http"
     "sort"
     "strconv"
     "strings"
@@ -34,6 +37,7 @@ func Start() {
     //defer loopCronTask.Stop()
     startHeatKey()
     startDebit()
+    startRevenue()
     //startRegistBackend()
 }
@@ -262,86 +266,211 @@ func startRevenue() {
     logs.Debug("startRevenue")
     //spec := "*/50 */23 * * * ?" //"@every 1h"
     //spec := "01 01 00 * * ?"
-    spec, _ := beego.AppConfig.String("debitTime")
-    //spec := "@every 1m"
-    dbhost, _ := beego.AppConfig.String("postgreshost")
-    dbport, _ := beego.AppConfig.Int("senderport")
-    questAddr := fmt.Sprintf("%s:%d", dbhost, dbport)
-    revenueTask.AddFunc(spec, func() {
-        logs.Debug("startRevenue revenueTask")
-        ctx := context.TODO()
-        addrOpt := qdb.WithAddress(questAddr)
-        sender, err := qdb.NewLineSender(ctx, addrOpt)
-        if err != nil {
-            logs.Debug("startRevenue NewLineSender = %s", err.Error())
-            return
-        }
-        defer sender.Close()
-        // Make sure to close the sender on exit to release resources.
-        currentTime := time.Now()
-        temp := fmt.Sprintf("-%dh", 2)
-        m, _ := time.ParseDuration(temp)
-        dayTime := currentTime.Add(m)
-        endTime := time.Date(dayTime.Year(), dayTime.Month(), dayTime.Day(), 23, 59, 59, 0, time.UTC)
-        startTime := time.Date(dayTime.Year(), dayTime.Month(), dayTime.Day(), 0, 0, 0, 0, time.UTC)
-        start := fmt.Sprintf(startTime.Format(format))
-        end := fmt.Sprintf(endTime.Format(format))
-        logs.Debug("startRevenue before sql = %s", dayTime)
-        sql := fmt.Sprintf("SELECT id,time,type,workload,profit_acc,worker_acc FROM bills WHERE uid != '0' and time >= '%s' and time <= '%s';", start, end)
-        data, err := postgres.QueryBills(sql)
-        if err != nil {
-            logs.Debug("startRevenue postgres CountFunds = %s", err.Error())
-            return
-        }
-        if data == nil {
-            logs.Debug("startRevenue data = nil")
-            return
-        }
-        for _, bill := range data {
-            //nanoseconds := int64(uint64(dayTime.UnixNano()))
-            //seconds := nanoseconds / 1e9
-            taskId := bill.Type
-            typeDesc := ""
-            baseModel := ""
-            if err == nil {
-                taskType, err1 := odysseus.GetTaskType(int64(taskId))
-                if err1 == nil {
-                    typeDesc = models.ModelType(taskType.Type).String()
-                    baseModel = taskType.BaseModel
-                }
-            }
-            err = sender.Table("revenues").
-                Symbol("id", bill.Id).
-                Symbol("type", typeDesc).
-                Symbol("base_model", baseModel).
-                Symbol("profit_acc", bill.ProfitAcc).
-                Symbol("worker_acc", bill.WorkerAcc).
-                Int64Column("workload", bill.Workload).
-                Int64Column("income", int64(0)).
-                TimestampColumn("time", bill.Time).
-                AtNow(ctx)
-            if err != nil {
-                logs.Debug("startRevenue sender = %s,id = %d", err.Error(), bill.Id)
-            }
-            //break
-        }
-        err = sender.Flush(ctx)
-        if err != nil {
-            logs.Debug("startRevenue Flush = %s", err.Error())
-        }
-    })
-    debitTask.Start()
+    //spec, _ := beego.AppConfig.String("debitTime")
+    //spec := "@every 1m"
+    spec := "@every 6h"
+    //dbhost, _ := beego.AppConfig.String("postgreshost")
+    //dbport, _ := beego.AppConfig.Int("senderport")
+    //questAddr := fmt.Sprintf("%s:%d", dbhost, dbport)
+    revenueTask.AddFunc(spec, func() {
+        logs.Debug("startRevenue revenueTask")
+        _, multipliers, err := checkMultiplier()
+        if err != nil {
+            return
+        }
+        var dates []string
+        for _, value := range multipliers {
+            temp := fmt.Sprintf("+%dh", 24)
+            m, _ := time.ParseDuration(temp)
+            time := value.Time.Add(m)
+            date := fmt.Sprintf(time.Format("2006-01-02"))
+            dates = append(dates, date)
+        }
+        if len(dates) <= 0 {
+            dates = checkDate()
+        }
+        reaponse, err := getWeiPerWorkload(dates)
+        if err != nil {
+            return
+        }
+        err = insertMultiplier(reaponse)
+        if err != nil {
+            return
+        }
+        //ctx := context.TODO()
+        //addrOpt := qdb.WithAddress(questAddr)
+        //sender, err := qdb.NewLineSender(ctx, addrOpt)
+        //
+        //if err != nil {
+        //    logs.Debug("startRevenue NewLineSender = %s", err.Error())
+        //    return
+        //}
+        //defer sender.Close()
+        // Make sure to close the sender on exit to release resources.
+        //temp, _ := time.Parse("2006-01-02", value.Date)
+        //startTime :=
+        //endTime := time.Date(dayTime.Year(), dayTime.Month(), dayTime.Day(), 23, 59, 59, 0, time.UTC)
+        //startTime := time.Date(dayTime.Year(), dayTime.Month(), dayTime.Day(), 0, 0, 0, 0, time.UTC)
+        //start := fmt.Sprintf(startTime.Format(format))
+        //end := fmt.Sprintf(endTime.Format(format))
+        //logs.Debug("startRevenue before sql = %s", dayTime)
+        //sql := fmt.Sprintf("SELECT id,time,type,workload,profit_acc,worker_acc FROM bills WHERE time >= '%s' and time <= '%s';", start, end)
+        //data, err := postgres.QueryBills(sql)
+        //if err != nil {
+        //    logs.Debug("startRevenue postgres CountFunds = %s", err.Error())
+        //    return
+        //}
+        //if data == nil {
+        //    logs.Debug("startRevenue data = nil")
+        //    return
+        //}
+        //for _, bill := range data {
+        //
+        //    //nanoseconds := int64(uint64(dayTime.UnixNano()))
+        //    //seconds := nanoseconds / 1e9
+        //
+        //    taskId := bill.Type
+        //    typeDesc := ""
+        //    baseModel := ""
+        //    if err == nil {
+        //        taskType, err1 := odysseus.GetTaskType(int64(taskId))
+        //        if err1 == nil {
+        //            typeDesc = models.ModelType(taskType.Type).String()
+        //            baseModel = taskType.BaseModel
+        //        }
+        //    }
+        //
+        //    err = sender.Table("revenues").
+        //        Symbol("id", bill.Id).
+        //        Symbol("type", typeDesc).
+        //        Symbol("base_model", baseModel).
+        //        Symbol("profit_acc", bill.ProfitAcc).
+        //        Symbol("worker_acc", bill.WorkerAcc).
+        //        Int64Column("workload", bill.Workload).
+        //        Int64Column("income", int64(0)).
+        //        TimestampColumn("time", bill.Time).
+        //        AtNow(ctx)
+        //
+        //    if err != nil {
+        //        logs.Debug("startRevenue sender = %s,id = %d", err.Error(), bill.Id)
+        //    }
+        //    //break
+        //}
+        //
+        //err = sender.Flush(ctx)
+        //if err != nil {
+        //    logs.Debug("startRevenue Flush = %s", err.Error())
+        //}
+    })
+    revenueTask.Start()
 }
+func checkDate() []string {
+    sql := "SELECT count(*),time FROM bills SAMPLE BY 1d ALIGN TO CALENDAR ORDER BY time DESC;"
+    date, err := postgres.QueryBills(sql)
+    if err != nil {
+        return nil
+    }
+    var dates []string
+    for _, value := range date {
+        dateString := fmt.Sprintf(value.Time.Format("2006-01-02"))
+        dates = append(dates, dateString)
+    }
+    return dates
+}
+func getWeiPerWorkload(dates []string) ([]models.WeiPerWorkloadStruct, error) {
+    host, _ := beego.AppConfig.String("rewardUrl")
+    url := host + "/api/v1/workload"
+    payload := new(bytes.Buffer)
+    json.NewEncoder(payload).Encode(dates)
+    //if len(dates) <= 0 {
+    //    payload = nil
+    //}
+    resp, err := http.Post(url, "application/json;charset=UTF-8", payload)
+    if err != nil {
+        logs.Info("Error sending request:", err)
+        return nil, err
+    }
+    defer resp.Body.Close()
+    logs.Info("getWeiPerWorkload resp code", resp.StatusCode)
+    body, err := io.ReadAll(resp.Body)
+    if err != nil {
+        logs.Info("Error reading response:", err)
+        return nil, err
+    }
+    var response models.WeiPerWorkloadResponse
+    if err := json.Unmarshal(body, &response); err != nil {
+        logs.Info("Error Unmarshal response:", err)
+        return nil, err
+    }
+    return response.Data, nil
+}
+func checkMultiplier() (int64, []models.Revenues, error) {
+    sql := "SELECT count(*) FROM reward_multiplier;"
+    total, err := postgres.QueryTotal(sql)
+    if total <= 0 {
+        return total, nil, err
+    }
+    sql = "SELECT * FROM reward_multiplier ORDER BY time DESC LIMIT 0,1;"
+    data, err := postgres.QueryRevenues(sql)
+    return total, data, err
+}
+func insertMultiplier(data []models.WeiPerWorkloadStruct) error {
+    if len(data) <= 0 {
+        return errors.New("data len < 0")
+    }
+    dbhost, _ := beego.AppConfig.String("postgreshost")
+    dbport, _ := beego.AppConfig.Int("senderport")
+    questAddr := fmt.Sprintf("%s:%d", dbhost, dbport)
+    ctx := context.TODO()
+    addrOpt := qdb.WithAddress(questAddr)
+    sender, err := qdb.NewLineSender(ctx, addrOpt)
+    if err != nil {
+        logs.Debug("startRevenue NewLineSender = %s", err.Error())
+        return err
+    }
+    defer sender.Close()
+    for _, value := range data {
+        if strings.Compare(value.WeiPerWorkload, "0") == 0 {
+            continue
+        }
+        temp, _ := time.Parse("2006-01-02", value.Date)
+        nanoseconds := int64(uint64(temp.UnixNano()))
+        seconds := nanoseconds / 1e9
+        id, _ := snowflake.NextId()
+        err = sender.Table("reward_multiplier").
+            Symbol("wei", value.WeiPerWorkload).
+            Int64Column("id", int64(id)).
+            TimestampColumn("time", time.Unix(seconds, nanoseconds%1e9)).
+            AtNow(ctx)
+        if err != nil {
+            logs.Debug("startRevenue sender = %s,id = %d", err.Error(), id)
+        }
+        //break
+    }
+    err = sender.Flush(ctx)
+    if err != nil {
+        logs.Debug("startRevenue Flush = %s", err.Error())
+        return err
+    }
+    return nil
+}
 func startRegistBackend() {
......
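A minimal sketch (not from this commit) of the go-questdb-client/v2 write pattern that insertMultiplier relies on: build a row with Table, Symbol and column calls, finish it with AtNow, then Flush. The address "localhost:9009" and the sample values are assumptions; the table and column names are taken from the code above.

```go
package main

import (
	"context"
	"log"
	"time"

	qdb "github.com/questdb/go-questdb-client/v2"
)

func main() {
	ctx := context.TODO()
	// The address is an assumption; the cron job reads host/port from app.conf.
	sender, err := qdb.NewLineSender(ctx, qdb.WithAddress("localhost:9009"))
	if err != nil {
		log.Fatal(err)
	}
	defer sender.Close()

	// One row per day: the wei-per-workload multiplier keyed by its date.
	err = sender.Table("reward_multiplier").
		Symbol("wei", "20000000").
		Int64Column("id", 1).
		TimestampColumn("time", time.Date(2024, 5, 8, 0, 0, 0, 0, time.UTC)).
		AtNow(ctx)
	if err != nil {
		log.Fatal(err)
	}
	// Flush sends the buffered rows to QuestDB over ILP.
	if err := sender.Flush(ctx); err != nil {
		log.Fatal(err)
	}
}
```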
@@ -62,6 +62,7 @@ func init() {
     }
     createFundTable()
     createRevenueTable()
+    createRewardMultiplierTable()
 }
 func createFundTable() {
@@ -82,6 +83,15 @@ func createRevenueTable() {
     }
 }
+func createRewardMultiplierTable() {
+    sql := "CREATE TABLE IF NOT EXISTS 'reward_multiplier' (\nid LONG,\ntime TIMESTAMP, \nwei SYMBOL CAPACITY 128 CACHE INDEX CAPACITY 8192) timestamp (time) PARTITION BY DAY WAL;"
+    qs := ormpost.Raw(sql)
+    _, err := qs.Exec()
+    if err != nil {
+        logs.Debug("createRewardMultiplierTable", err.Error())
+    }
+}
 func QueryTset(sql string, args ...interface{}) ([]models.Bills, error) {
     logs.Debug("QueryBills = ", sql)
     qs := ormpost.Raw(sql, args)
@@ -185,6 +195,13 @@ func QueryRevenues(sql string) ([]models.Revenues, error) {
     return containers, err
 }
+func QueryWei(sql string) ([]models.WeiStruct, error) {
+    logs.Debug("QueryWei = ", sql)
+    var containers []models.WeiStruct
+    _, err := ormpost.Raw(sql).QueryRows(&containers)
+    return containers, err
+}
 func QueryTotal(sql string) (int64, error) {
     logs.Debug("QueryBills = ", sql)
     var count int64
......
@@ -16,6 +16,12 @@ type Revenues struct {
     Income *big.Float `json:"income,omitempty";orm:"column(income)"`
 }
+type WeiStruct struct {
+    Id   string    `json:"id,omitempty";orm:"column(id)"`
+    Time time.Time `json:"time,omitempty";orm:"column(time);type(datetime)"`
+    Wei  string    `json:"wei,omitempty";orm:"column(wei)"`
+}
 type WeiPerWorkloadStruct struct {
     Date           string `json:"date,omitempty"`
     WeiPerWorkload string `json:"weiPerWorkload,omitempty"`
......
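A minimal sketch (not from this commit) of how a WeiStruct row returned by QueryWei is matched back to the per-day bill groups in TaskReward, assuming format is the "2006-01-02" layout used elsewhere in the commit; the sample data is hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// format mirrors the layout assumed for the per-day grouping keys.
const format = "2006-01-02"

type WeiStruct struct {
	Id   string
	Time time.Time
	Wei  string
}

func main() {
	// groupedItems stands in for the map of bills that TaskReward builds per day.
	groupedItems := map[string][]string{"2024-05-08": {"bill-1", "bill-2"}}
	row := WeiStruct{Id: "1", Time: time.Date(2024, 5, 8, 0, 0, 0, 0, time.UTC), Wei: "20000000"}
	key := row.Time.Format(format) // same key the controller derives for each bill
	fmt.Println(key, groupedItems[key])
}
```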
@@ -283,6 +283,7 @@ type TaskHeat struct {
     EstimatExeTime int `json:"estimat_exe_time"`
     StartUpTime    int `json:"start_up_time"`
     RunningMem     int `json:"running_mem";orm:"column(running_mem)"`
+    CMD            interface{} `json:"cmd";orm:"column(cmd)"`
 }
 type AddTaskType struct {
@@ -334,7 +335,7 @@ func (m ModelType) String() string {
     case AUDIOTOAUDIO:
         return "audio2audio"
     default:
-        return "未知类型"
+        return "unknown"
     }
 }
......