Commit 54000bad authored by brent's avatar brent

Initial commit

parents
Pipeline #782 failed with stages
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
<?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4">
<component name="Go" enabled="true" />
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/aon_app_server.iml" filepath="$PROJECT_DIR$/.idea/aon_app_server.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
FROM golang:1.20-alpine AS build
# Install dependencies
RUN apk update && \
apk upgrade && \
apk add --no-cache bash git openssh make build-base
RUN go env -w CGO_ENABLED="1"
WORKDIR /build
RUN cd /build/aon-app-server && go build -o /aon-app-server
FROM alpine
WORKDIR /root
COPY --from=build /aon-app-server /usr/bin/aon-app-server
ENTRYPOINT [ "aon-app-server" ]
\ No newline at end of file
appname = aon_app_server
httpport = 8088
runmode = dev
copyrequestbody = true
[dev]
apikey = "Rbhpcp0FKNrYNA1nZkrwrIbD0YSSRlVG"
taskUrl = "https://api.aonet.ai/api/v1"
imageUrl = "https://tmp-file.aigic.ai/api/v1/upload/persistence"
[test]
apikey = "Rbhpcp0FKNrYNA1nZkrwrIbD0YSSRlVG"
taskUrl = "https://api.aonet.ai/api/v1"
imageUrl = "https://tmp-file.aigic.ai/api/v1/upload/persistence"
[prod]
apikey = "Rbhpcp0FKNrYNA1nZkrwrIbD0YSSRlVG"
taskUrl = "https://api.aonet.ai/api/v1"
imageUrl = "https://tmp-file.aigic.ai/api/v1/upload/persistence"
include "mysql.conf"
include "mongo.conf"
\ No newline at end of file
[dev]
mongo.user = "aon_demo"
mongo.pass = "7Aj2Cke7RYbAzpsy"
mongo.host = "127.0.0.1"
mongo.port = 27017
mongo.db = "aon_demo"
[test]
mongo.user = "aon_demo"
mongo.pass = "7Aj2Cke7RYbAzpsy"
mongo.host = "18.167.203.17"
mongo.port = 27017
mongo.db = "aon_demo"
[prod]
mongo.user = "aon_demo"
mongo.pass = "7Aj2Cke7RYbAzpsy"
mongo.host = "aon-app-server-db"
mongo.port = 27017
mongo.db = "aon_demo"
\ No newline at end of file
[dev]
mysql.user = "aon_demo"
mysql.pass = "7Aj2Cke7RYbAzpsy"
mysql.host = "18.167.203.17"
mysql.port = 3306
mysql.db = "aon_demo"
[test]
mysql.user = "aon_demo"
mysql.pass = "7Aj2Cke7RYbAzpsy"
mysql.host = "18.167.203.17"
mysql.port = 3306
mysql.db = "aon_demo"
[prod]
mysql.user = "aon_demo"
mysql.pass = "7Aj2Cke7RYbAzpsy"
mysql.host = "18.167.203.17"
mysql.port = 3306
mysql.db = "aon_demo"
\ No newline at end of file
package controllers
import (
beego "github.com/beego/beego/v2/server/web"
"net/http"
)
type MainController struct {
beego.Controller
}
func (server *MainController) Get() {
data := struct {
Website string `json:"website"`
Email string `json:"email"`
}{
Website: "aon app",
Email: "aon@aonet.ai",
}
server.respond(http.StatusOK, "", data)
}
func (server *MainController) respond(code int, message string, data ...interface{}) {
status := 200
if code == 401 {
status = code
}
server.Ctx.Output.SetStatus(status)
var d interface{}
if len(data) > 0 {
d = data[0]
}
server.Data["json"] = struct {
Code int `json:"code"`
Message string `json:"message"`
Data interface{} `json:"data,omitempty"`
}{
Code: code,
Message: message,
Data: d,
}
server.ServeJSON()
}
package controllers
import (
"aon_app_server/models"
"aon_app_server/utils/mongo"
"bytes"
"encoding/json"
"github.com/beego/beego/v2/core/logs"
beego "github.com/beego/beego/v2/server/web"
"go.mongodb.org/mongo-driver/bson"
"io"
"net/http"
"time"
)
type TaskController struct {
MainController
}
var execTasks = make(chan *models.Task, 100)
// Prediction @router /prediction/ [post]
func (server *TaskController) Prediction() {
body := server.Ctx.Input.RequestBody
task := models.Task{}
err := json.Unmarshal(body, &task) //解析body中数据
logs.Debug("appRequest", task)
if err != nil {
server.respond(models.NoRequestBody, err.Error())
return
}
task.CreatedTime = time.Now().UTC()
id, err := mongo.Insert(&task)
if err != nil {
server.respond(models.BusinessFailed, err.Error())
return
}
task.Id = id
//execTasks <- &task
result, err := sendTask(&task)
if err != nil {
server.respond(http.StatusOK, err.Error())
return
}
server.respond(http.StatusOK, "", result)
}
func copyImages(images []string) []string {
imagesToCopy := models.ImagesToCopy{
Sources: images,
}
url, _ := beego.AppConfig.String("imageUrl")
payload := new(bytes.Buffer)
json.NewEncoder(payload).Encode(imagesToCopy)
resp, err := http.Post(url, "application/json;charset=UTF-8", payload)
if err != nil {
logs.Info("Error sending request:", err)
return images
}
defer resp.Body.Close()
logs.Info("copyImages resp code", resp.StatusCode)
body, err := io.ReadAll(resp.Body)
if err != nil {
logs.Info("Error reading response:", err)
return images
}
logs.Debug("copyImages body", string(body))
var response []models.ImageCopied
if err := json.Unmarshal(body, &response); err != nil {
return images
}
var backImages []string
for _, value := range response {
if value.Code == 200 {
backImages = append(backImages, value.Data)
}
}
if len(backImages) > 0 {
return backImages
}
return images
}
func sendTask(task *models.Task) (*models.TaskResponse, error) {
host, _ := beego.AppConfig.String("taskUrl")
url := host + task.ApiPath
payload := new(bytes.Buffer)
//var input interface{}
//if err := json.Unmarshal([]byte(task.Input), &input); err != nil {
// setError(task, "task.Input Unmarshal error:"+err.Error())
// logs.Info("sendTask task.Input Unmarshal response:", err)
// return nil, err
//}
json.NewEncoder(payload).Encode(task.Input)
client := &http.Client{}
request, err := http.NewRequest("POST", url, payload)
if err != nil {
setError(task, "sendTask request create error:"+err.Error())
logs.Info("sendTask Error NewRequest request:", err)
return nil, err
}
apikey, _ := beego.AppConfig.String("apikey")
request.Header.Add("apikey", apikey)
logs.Info("sendTask client sending request:")
resp, err := client.Do(request)
if err != nil {
setError(task, "Task sending error:"+err.Error())
logs.Info("sendTask Error sending request:", err)
return nil, err
}
defer resp.Body.Close()
logs.Info("sendTask resp code", resp.StatusCode)
body, err := io.ReadAll(resp.Body)
logs.Info("sendTask body", string(body))
if resp.StatusCode == 200 && body == nil {
setError(task, "Task response body null")
logs.Info("sendTask Body reading response:", err)
return nil, err
}
if err != nil {
setError(task, "Task read response body error:"+err.Error())
logs.Info("sendTask Error reading response:", err)
return nil, err
}
var response models.TaskResponse
if err = json.Unmarshal(body, &response); err != nil {
setError(task, "Task response Unmarshal error:"+err.Error())
logs.Info("sendTask Error Unmarshal response:", err)
return nil, err
}
taskId := resp.Header.Get("task-id")
task.TaskId = taskId
task.UpdatedTime = time.Now().UTC()
if response.Task.IsSuccess {
task.Status = 2
task.Output = copyImages(response.Output)
mongo.Update(task)
response.Output = task.Output
//data, err := json.Marshal(response.Output)
//if err != nil {
// logs.Info("sendTask Output Unmarshal response:", err)
// //return nil, err
// task.Error = "Output json Unmarshal error"
// mongo.Update(task)
//} else {
// task.Output = string(data)
// mongo.Update(task)
//}
} else {
task.Status = 3
task.Error = response.Task
_, err = mongo.Update(task)
if err != nil {
logs.Info("Update Task Error:", err)
}
//data, err := json.Marshal(response.Task)
//if err != nil {
// logs.Info("sendTask response.Task Unmarshal response:", err)
// //return
// task.Error = "response task json Unmarshal error"
//} else {
// task.Error = string(data)
//}
//mongo.Update(task)
}
return &response, nil
}
func setError(task *models.Task, error string) {
task.Status = 3
task.Error = error
_, err := mongo.Update(task)
if err != nil {
logs.Info("Update Task Error:", err)
}
}
// Result @router /result/:task_id [get]
func (server *TaskController) Result() {
taskId := server.GetString("task_id")
data := struct {
Result string `json:"result"`
TaskId string `json:"task_id"`
}{
Result: "success",
TaskId: taskId,
}
server.respond(http.StatusOK, "", data)
}
// List @router /list/ [post]
func (server *TaskController) List() {
body := server.Ctx.Input.RequestBody
request := models.ListRequest{}
err := json.Unmarshal(body, &request) //解析body中数据
logs.Debug("Request", string(body), request)
if err != nil {
server.respond(models.NoRequestBody, err.Error())
return
}
if request.Page == 0 {
request.Page = 1
}
if request.Size == 0 {
request.Size = 10
}
total, data, err := mongo.Query("Task", request.Page, request.Size, request.Filter)
if err != nil {
logs.Info("List Error:", err)
}
var tasks []models.Task
for _, bsonD := range data {
var task models.Task
// 将 bson.D 转换为 bson.Raw
bsonRaw, err := bson.Marshal(bsonD)
if err != nil {
server.respond(models.BusinessFailed, err.Error())
}
// 将 bson.Raw 解码为 User 结构体
err = bson.Unmarshal(bsonRaw, &task)
if err != nil {
server.respond(models.BusinessFailed, err.Error())
}
if task.Input != nil {
raw, err := bson.Marshal(task.Input)
if err != nil {
server.respond(models.BusinessFailed, err.Error())
}
// 将 bson.Raw 解码为 User 结构体
var input map[string]interface{}
err = bson.Unmarshal(raw, &input)
if err != nil {
server.respond(models.BusinessFailed, err.Error())
}
task.Input = input
}
if task.Error != nil {
raw, err := bson.Marshal(task.Error)
if err != nil {
server.respond(models.BusinessFailed, err.Error())
}
// 将 bson.Raw 解码为 User 结构体
var error1 models.TaskReturn
err = bson.Unmarshal(raw, &error1)
if err != nil {
server.respond(models.BusinessFailed, err.Error())
}
task.Error = error1
}
tasks = append(tasks, task)
}
responseData := struct {
Total int64 `json:"total"`
Data interface{} `json:"data,omitempty"`
}{
Total: total,
Data: tasks,
}
server.respond(http.StatusOK, "", responseData)
}
module aon_app_server
go 1.19
require github.com/beego/beego/v2 v2.0.1
require (
github.com/beego/beego v1.12.12
github.com/go-sql-driver/mysql v1.5.0
github.com/robfig/cron/v3 v3.0.1
github.com/smartystreets/goconvey v1.6.4
go.mongodb.org/mongo-driver v1.15.1
)
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/golang/protobuf v1.4.2 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/jtolds/gls v4.20.0+incompatible // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mitchellh/mapstructure v1.3.3 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.7.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.10.0 // indirect
github.com/prometheus/procfs v0.1.3 // indirect
github.com/shiena/ansicolor v0.0.0-20151119151921-a422bbe96644 // indirect
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.6.0 // indirect
google.golang.org/protobuf v1.23.0 // indirect
gopkg.in/yaml.v2 v2.2.8 // indirect
)
This diff is collapsed.
{"/Users/brent/Documents/wubanWork/aon_app_server/controllers":1719017412063939595}
\ No newline at end of file
package main
import (
_ "aon_app_server/routers"
beego "github.com/beego/beego/v2/server/web"
"github.com/beego/beego/v2/server/web/filter/cors"
)
func init() {
beego.InsertFilter("*", beego.BeforeRouter, cors.Allow(&cors.Options{
AllowAllOrigins: true,
AllowMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
AllowHeaders: []string{"Origin", "Authorization", "Access-Control-Allow-Origin", "Access-Control-Allow-Headers", "Content-Type", "X-Xsrf-Token"},
ExposeHeaders: []string{"Content-Length", "Access-Control-Allow-Origin", "Access-Control-Allow-Headers", "Content-Type", "X-Xsrf-Token", "Authorization"},
AllowCredentials: true,
}))
}
func main() {
//cronjob.Start()
beego.Run()
}
package models
import "time"
const (
NoRequestBody = iota + 5000
MissingParameter
CreateUserFailed
LoginFailed
BusinessFailed
Web3VerifyFailed
Web3LoginTimeout
Web3ClinetIdError
)
type Task struct {
Id interface{} `json:"id" bson:"_id,omitempty"`
TaskId string `json:"task_id,omitempty" bson:"task_id"`
Input interface{} `json:"input" bson:"input"`
ApiPath string `json:"api_path" bson:"api_path"`
UserId string `json:"user_id" bson:"user_id"`
Status int `json:"status" bson:"status"`
Output []string `json:"output" bson:"output"`
Error interface{} `json:"error" bson:"error"`
ExcuteId string `json:"excute_id" bson:"excute_id"`
CreatedTime time.Time `json:"created_time" bson:"created_time"`
UpdatedTime time.Time `json:"updated_time" bson:"updated_time"`
Deleted int `json:"deleted" bson:"deleted"`
}
type TaskReturn struct {
TaskId string `json:"task_id"`
TaskUid string `json:"task_uid"`
TaskFee string `json:"task_fee"`
IsSuccess bool `json:"is_success"`
TaskError string `json:"task_error"`
ExecCode string `json:"exec_code"`
ExecError string `json:"exec_error"`
ApiError struct {
RequestId string `json:"request_id"`
Message string `json:"message"`
} `json:"api_error"`
}
type TaskResponse struct {
Task TaskReturn `json:"task"`
Output []string `json:"output"`
}
type TaskResult struct {
Id int `json:"id";orm:"column(id);auto"`
TaskId string `json:"task_id,omitempty";orm:"column(task_id)"`
Fee int64 `json:"fee";orm:"column(fee)"`
OutLen int `json:"out_len";orm:"column(out_len)"`
workload int `json:"workload";orm:"column(workload)"`
TaskDuration int `json:"task_duration";orm:"column(task_duration)"`
ExecDuration int `json:"exec_duration";orm:"column(exec_duration)"`
Result string `json:"result";orm:"column(result)"`
Output string `json:"output";orm:"column(output)"`
Error string `json:"error";orm:"column(error);type(json)"`
CreatedTime time.Time `json:"created_time";orm:"column(created_time);auto_now_add;type(datetime)"`
UpdatedTime time.Time `json:"updated_time";orm:"column(updated_time);auto_now;type(datetime)"`
Deleted int `json:"deleted";orm:"column(deleted);size(1)"`
}
type ListRequest struct {
Page int `json:"page"`
Size int `json:"size"`
Filter interface{} `json:"filter"`
}
type ImageCopied struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data string `json:"data"`
}
type ImagesToCopy struct {
Sources []string `json:"sources"`
}
package routers
import (
"aon_app_server/controllers"
beego "github.com/beego/beego/v2/server/web"
)
func init() {
beego.Router("/", &controllers.MainController{})
beego.AutoPrefix("app/api/v1", &controllers.TaskController{})
//ns := beego.NewNamespace("app",
// beego.NSNamespace("api",
// beego.NSNamespace("v1",
// beego.NSNamespace("task",
// beego.NSInclude(&controllers.TaskController{})))))
//beego.AddNamespace(ns)
}
function b(a){var c=new WebSocket(a);c.onclose=function(){setTimeout(function(){b(a)},2E3)};c.onmessage=function(){location.reload()}}try{if(window.WebSocket)try{b("ws://localhost:12450/reload")}catch(a){console.error(a)}else console.log("Your browser does not support WebSockets.")}catch(a){console.error("Exception during connecting to Reload:",a)};
package test
import (
"net/http"
"net/http/httptest"
"testing"
"runtime"
"path/filepath"
"github.com/beego/beego/v2/core/logs"
_ "aon_app_server/routers"
beego "github.com/beego/beego/v2/server/web"
. "github.com/smartystreets/goconvey/convey"
)
func init() {
_, file, _, _ := runtime.Caller(0)
apppath, _ := filepath.Abs(filepath.Dir(filepath.Join(file, ".." + string(filepath.Separator))))
beego.TestBeegoInit(apppath)
}
// TestBeego is a sample to run an endpoint test
func TestBeego(t *testing.T) {
r, _ := http.NewRequest("GET", "/", nil)
w := httptest.NewRecorder()
beego.BeeApp.Handlers.ServeHTTP(w, r)
logs.Trace("testing", "TestBeego", "Code[%d]\n%s", w.Code, w.Body.String())
Convey("Subject: Test Station Endpoint\n", t, func() {
Convey("Status Code Should Be 200", func() {
So(w.Code, ShouldEqual, 200)
})
Convey("The Result Should Not Be Empty", func() {
So(w.Body.Len(), ShouldBeGreaterThan, 0)
})
})
}
package cronjob
import (
"aon_app_server/models"
"aon_app_server/utils/mysql"
"bytes"
"encoding/json"
"github.com/beego/beego/v2/core/logs"
beego "github.com/beego/beego/v2/server/web"
"github.com/robfig/cron/v3"
"io"
"net/http"
"sync"
)
var sendTask = cron.New(cron.WithSeconds())
var resultTask = cron.New(cron.WithSeconds())
var sendOffsetMutex sync.Mutex
var resultOffsetMutex sync.Mutex
func Start() {
//defer loopCronTask.Stop()
startSendTask()
//startResultTask()
}
func startSendTask() {
logs.Debug("startSendTask")
//spec := "0 0 * * * ?" //"@every 1h"
spec := "@every 1m" //"@every 1h"
offset := int64(0)
execTasks := make(chan *models.Task, 100)
//spec := "*/1 * * * * ?" //"@every 1h"
sendTask.AddFunc(spec, func() {
logs.Debug("sendTask")
tasks := queryPendingTask(offset)
for _, value := range tasks {
execTasks <- value
}
sendOffsetMutex.Lock()
defer sendOffsetMutex.Unlock()
offset += int64(len(tasks))
})
go func() {
for task := range execTasks {
go sendPendingTask(task)
}
}()
sendTask.Start()
}
func queryPendingTask(offset int64) []*models.Task {
qs := mysql.GetOrmer().QueryTable("task").
Filter("status", 0).Offset(offset).Limit(5)
var tasks []*models.Task
qs.All(&tasks)
return tasks
}
func sendPendingTask(task *models.Task) {
host, _ := beego.AppConfig.String("taskUrl")
url := host + task.ApiPath
payload := new(bytes.Buffer)
var input interface{}
if err := json.Unmarshal([]byte(task.Input), &input); err != nil {
logs.Info("sendPendingTask task.Input Unmarshal response:", err)
return
}
json.NewEncoder(payload).Encode(input)
client := &http.Client{}
request, err := http.NewRequest("POST", url, payload)
if err != nil {
logs.Info("sendPendingTask Error NewRequest request:", err)
return
}
if task.Auth != "" && len(task.Auth) > 32 {
request.Header.Add("Authorization", "Bearer "+task.Auth)
} else {
request.Header.Add("apikey", task.Auth)
}
logs.Info("sendPendingTask client sending request:")
resp, err := client.Do(request)
if err != nil {
logs.Info("sendPendingTask Error sending request:", err)
return
}
defer resp.Body.Close()
logs.Info("sendPendingTask resp code", resp.StatusCode)
body, err := io.ReadAll(resp.Body)
logs.Info("sendPendingTask body", string(body))
if err != nil {
logs.Info("sendPendingTask Error reading response:", err)
return
}
var response models.TaskResponse
if err = json.Unmarshal(body, &response); err != nil {
logs.Info("sendPendingTask Error Unmarshal response:", err)
return
}
if response.Task.IsSuccess {
task.Status = 2
data, err := json.Marshal(response.Output)
if err != nil {
logs.Info("sendPendingTask Output Unmarshal response:", err)
return
}
task.Output = string(data)
mysql.GetOrmer().Update(task, "status",
"output")
} else {
task.Status = 3
data, err := json.Marshal(response.Task)
if err != nil {
logs.Info("sendPendingTask response.Task Unmarshal response:", err)
return
}
task.Error = string(data)
mysql.GetOrmer().Update(task, "status",
"error")
}
}
func startResultTask() {
logs.Debug("startResultTask")
//spec := "0 0 * * * ?" //"@every 1h"
spec := "@every 1h" //"@every 1h"
offset := int64(0)
//spec := "*/1 * * * * ?" //"@every 1h"
resultTask.AddFunc(spec, func() {
logs.Debug("resultTask")
tasks := queryExecingTask(offset)
for _, value := range tasks {
go queryResult(value)
}
resultOffsetMutex.Lock()
defer resultOffsetMutex.Unlock()
offset += 5
})
resultTask.Start()
}
func queryExecingTask(offset int64) []*models.Task {
qs := mysql.GetOrmer().QueryTable("task").
Filter("status", 1).Offset(offset).Limit(5)
var tasks []*models.Task
qs.All(&tasks)
return tasks
}
func queryResult(task *models.Task) {
host, _ := beego.AppConfig.String("taskUrl")
url := host + "/query/v1/" + task.TaskId
payload := new(bytes.Buffer)
json.NewEncoder(payload).Encode(task.Input)
client := &http.Client{}
request, err := http.NewRequest("POST", url, payload)
if err != nil {
logs.Info("queryResult Error NewRequest request:", err)
return
}
if task.Auth != "" && len(task.Auth) > 32 {
request.Header.Add("Authorization", "Bearer "+task.Auth)
} else {
request.Header.Add("apikey", task.Auth)
}
resp, err := client.Do(request)
if err != nil {
logs.Info("queryResult Error sending request:", err)
return
}
defer resp.Body.Close()
logs.Info("queryResult resp code", resp.StatusCode)
body, err := io.ReadAll(resp.Body)
if err != nil {
logs.Info("queryResult Error reading response:", err)
return
}
var response models.TaskResponse
if err = json.Unmarshal(body, &response); err != nil {
logs.Info("queryResult Error Unmarshal response:", err)
return
}
task.Status = 1
//mysql.GetOrmer().Update(task, "status")
}
package mongo
import (
"context"
"errors"
"fmt"
"github.com/beego/beego/v2/core/logs"
beego "github.com/beego/beego/v2/server/web"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"reflect"
"time"
)
//type Singleton struct {
// client *mongo.Client
// col *mongo.Collection
//}
//
//var instance *Singleton
//var once sync.Once
//
//func GetInstace() *Singleton {
// once.Do(func() {
// instance = &Singleton{
// Ormer: orm.NewOrm(),
// }
// })
// return instance
//}
var Client *mongo.Client
var database *mongo.Database
func init() {
logs.Debug("mongo init")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
username, _ := beego.AppConfig.String("mongo.user")
passwd, _ := beego.AppConfig.String("mongo.pass")
host, _ := beego.AppConfig.String("mongo.host")
port, _ := beego.AppConfig.String("mongo.port")
db, _ := beego.AppConfig.String("mongo.db")
logs.Debug("mongo info", username, passwd, host, port, db)
uri := fmt.Sprintf("mongodb://%s:%s", host, port)
//client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri).SetAuth(options.Credential{
// Username: username,
// Password: passwd,
//}))
client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri))
//defer func() {
// if err = client.Disconnect(ctx); err != nil {
// panic(err)
// }
//}()
if err != nil {
logs.Debug("mongo db Connect err ", err.Error())
panic(err.Error())
}
//err = client.Ping(ctx, readpref.Primary())
//if err != nil {
// logs.Debug("mongo db Connect err ", err.Error())
// panic(err.Error())
//}
Client = client
database = client.Database(db)
}
func Insert(i interface{}) (interface{}, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
t := reflect.TypeOf(i)
// 检查是否是指针,如果是指针则获取指针指向的元素类型
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
// 打印结构体的名称
logs.Debug("结构体名称 = ", t.Name())
collection := database.Collection(t.Name())
res, err := collection.InsertOne(ctx, i)
if err != nil {
return nil, err
}
return res.InsertedID, err
}
func Update(i interface{}) (interface{}, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
t := reflect.TypeOf(i)
v := reflect.ValueOf(i)
// 检查是否是指针,如果是指针则获取指针指向的元素类型
if t.Kind() == reflect.Ptr {
t = t.Elem()
v = v.Elem()
}
// 打印结构体的名称
logs.Debug("结构体名称 =", t.Name())
collection := database.Collection(t.Name())
idField := v.FieldByName("Id")
if !(idField.IsValid() && idField.CanInterface()) {
logs.Debug("结构体中没有 ID 字段或无法访问")
return nil, errors.New("_id field must set")
}
filter := bson.M{"_id": idField.Interface()}
bsonData, err := bson.Marshal(i)
if err != nil {
logs.Debug("BSON 编码失败", err)
return nil, err
}
var bsonDoc bson.D
err = bson.Unmarshal(bsonData, &bsonDoc)
if err != nil {
logs.Debug("BSON 反序列化失败", err)
return nil, err
}
updateData := bson.M{}
for _, elem := range bsonDoc {
updateData[elem.Key] = elem.Value
}
update := bson.M{"$set": updateData}
res, err := collection.UpdateOne(ctx, filter, update)
if err != nil {
logs.Debug("UpdateOne", err)
}
return res.UpsertedID, err
}
func Read(i interface{}, filter interface{}) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
t := reflect.TypeOf(i)
// 检查是否是指针,如果是指针则获取指针指向的元素类型
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
// 打印结构体的名称
logs.Debug("结构体名称 =", t.Name())
collection := database.Collection(t.Name())
finalFilter := bson.M{}
if filter != nil {
v := reflect.ValueOf(filter)
if v.Kind() == reflect.Ptr {
v = v.Elem()
}
switch v.Kind() {
case reflect.Struct:
// 遍历结构体字段
for i := 0; i < v.NumField(); i++ {
fieldName := v.Type().Field(i).Name
fieldValue := v.Field(i).Interface()
finalFilter[fieldName] = fieldValue
logs.Debug("fieldName: fieldValue\n", fieldName, fieldValue)
}
case reflect.Map:
// 遍历 map 键和值
for _, key := range v.MapKeys() {
value := v.MapIndex(key).Interface()
keyStr := fmt.Sprintf("%v", key.Interface())
logs.Debug("keyStr: value\n", keyStr, value)
finalFilter[keyStr] = value
}
default:
logs.Debug("Unsupported type")
return errors.New("filter type is Unsupported")
}
}
err := collection.FindOne(ctx, finalFilter).Decode(i)
if err == mongo.ErrNoDocuments {
return nil
} else if err != nil {
return err
}
return err
}
func Query(collectionName string, page int, size int, filter interface{}) (int64, []bson.D, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
collection := database.Collection(collectionName)
finalFilter := bson.M{}
if filter != nil {
v := reflect.ValueOf(filter)
if v.Kind() == reflect.Ptr {
v = v.Elem()
}
switch v.Kind() {
case reflect.Struct:
// 遍历结构体字段
for i := 0; i < v.NumField(); i++ {
fieldName := v.Type().Field(i).Name
fieldValue := v.Field(i).Interface()
if fieldName == "created_time" {
finalFilter[fieldName] = bson.M{
"$gte": "",
"$lt": "",
}
continue
}
finalFilter[fieldName] = fieldValue
logs.Debug("fieldName: fieldValue\n", fieldName, fieldValue)
}
case reflect.Map:
// 遍历 map 键和值
for _, key := range v.MapKeys() {
value := v.MapIndex(key).Interface()
keyStr := fmt.Sprintf("%v", key.Interface())
logs.Debug("keyStr: value\n", keyStr, value)
if keyStr == "created_time" {
finalFilter[keyStr] = bson.M{
"$gte": "",
"$lt": "",
}
continue
}
finalFilter[keyStr] = value
}
default:
logs.Debug("Unsupported type")
return 0, nil, errors.New("filter type is Unsupported")
}
}
total, err := collection.CountDocuments(ctx, finalFilter)
if err != nil {
logs.Debug("Failed to count documents: %v", err)
}
fmt.Printf("Total documents count: %d\n", total)
skip := (page - 1) * size
findOptions := options.Find()
findOptions.SetSkip(int64(skip))
findOptions.SetLimit(int64(size))
logs.Debug("finalFilter ", finalFilter)
cur, err := collection.Find(ctx, finalFilter, findOptions)
if err != nil {
logs.Debug("Find cur", err)
return 0, nil, err
}
var back []bson.D
for cur.Next(ctx) {
var result bson.D
err = cur.Decode(&result)
if err != nil {
logs.Debug("cur.Next", err)
return 0, nil, err
}
back = append(back, result)
// do something with result....
}
if err = cur.Err(); err != nil {
logs.Debug("cur.Err()", err)
return 0, nil, err
}
//jsonData, err := json.Marshal(back)
//if err != nil {
// fmt.Println("JSON marshaling failed:", err)
// return nil, err
//}
//logs.Debug("jsonData", jsonData)
return total, back, nil
}
package mysql
//import (
// "aon_app_server/models"
// "fmt"
// "github.com/beego/beego/orm"
// "github.com/beego/beego/v2/core/logs"
// beego "github.com/beego/beego/v2/server/web"
// _ "github.com/go-sql-driver/mysql"
//)
//
////Ormer orm.Ormer
//
//var mysqlorm orm.Ormer
//
//func init() {
// logs.Debug("mysql init")
// //return
// orm.Debug = true
// if err := orm.RegisterDriver("mysql", orm.DRMySQL); err != nil {
// logs.Error(err.Error())
// }
//
// orm.RegisterModel(new(models.Task))
// //orm.RegisterModel(new(models.App))
// orm.RegisterModel(new(models.TaskResult))
//
// logs.Debug("AppConfig", beego.AppConfig)
//
// user, _ := beego.AppConfig.String("mysql.user")
// pass, _ := beego.AppConfig.String("mysql.pass")
// host, _ := beego.AppConfig.String("mysql.host")
// port, _ := beego.AppConfig.String("mysql.port")
// db, _ := beego.AppConfig.String("mysql.db")
// logs.Debug("mysql info", user, pass, host, port, db)
//
// dbURL := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=true", user, pass, host, port, db)
// logs.Debug("Will connect to mysql url", dbURL)
//
// if err := orm.RegisterDataBase("default", "mysql", dbURL); err != nil {
// logs.Error(err.Error())
// panic(err.Error())
// }
// orm.SetMaxIdleConns("default", 10)
// orm.SetMaxOpenConns("default", 100)
//
// mysqlorm = orm.NewOrm()
//}
//
//func GetOrmer() orm.Ormer {
// return mysqlorm
//}
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment