Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
A
ai-api-mgr
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Odysseus
ai-api-mgr
Commits
be1df639
Commit
be1df639
authored
Feb 28, 2024
by
Your Name
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
del comment
parent
ba095504
Changes
2
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
47 additions
and
110 deletions
+47
-110
main.go
main.go
+1
-63
service-register.go
service-register.go
+46
-47
No files found.
main.go
View file @
be1df639
...
@@ -282,11 +282,7 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUps
...
@@ -282,11 +282,7 @@ func kafkaConsumerBytes(done chan interface{}, req, resTopic string) (chan pbUps
func
batchToQuestDb
(
done
chan
interface
{},
reqStream
chan
pbUpstream
.
TaskContent
,
resStream
chan
pbUpstream
.
TaskReceipt
,
reqTableName
string
,
resTableName
,
questAddr
string
)
{
func
batchToQuestDb
(
done
chan
interface
{},
reqStream
chan
pbUpstream
.
TaskContent
,
resStream
chan
pbUpstream
.
TaskReceipt
,
reqTableName
string
,
resTableName
,
questAddr
string
)
{
//tableName := "tasks"
ctx
:=
context
.
TODO
()
ctx
:=
context
.
TODO
()
// Connect to QuestDB running on 127.0.0.1:9009
//addrOpt := qdb.WithAddress("192.168.1.220:9009")
addrOpt
:=
qdb
.
WithAddress
(
questAddr
)
addrOpt
:=
qdb
.
WithAddress
(
questAddr
)
var
sender
*
qdb
.
LineSender
var
sender
*
qdb
.
LineSender
...
@@ -327,7 +323,6 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
...
@@ -327,7 +323,6 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
Int64Column
(
"workload"
,
int64
(
task
.
TaskWorkload
))
.
Int64Column
(
"workload"
,
int64
(
task
.
TaskWorkload
))
.
Int64Column
(
"out_len"
,
int64
(
task
.
TaskOutLen
))
.
Int64Column
(
"out_len"
,
int64
(
task
.
TaskOutLen
))
.
Int64Column
(
"task_duration"
,
int64
(
task
.
TaskDuration
))
.
Int64Column
(
"task_duration"
,
int64
(
task
.
TaskDuration
))
.
//task_execute_duration
Int64Column
(
"exec_duration"
,
int64
(
task
.
TaskExecuteDuration
))
.
Int64Column
(
"exec_duration"
,
int64
(
task
.
TaskExecuteDuration
))
.
AtNow
(
ctx
)
AtNow
(
ctx
)
...
@@ -343,17 +338,6 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
...
@@ -343,17 +338,6 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
case
task
:=
<-
reqStream
:
case
task
:=
<-
reqStream
:
// sender.
// Table(reqTableName).
// Symbol("type", fmt.Sprintf("%d", randomType())).
// Symbol("uid", fmt.Sprintf("%d", randomUID())).
// StringColumn("id", uuid.New().String()).
// TimestampColumn("time", time.Now()).
// Int64Column("fee", int64(randomFeeAndWorkload())).
// Int64Column("in_len", int64(randomInput())).
// AtNow(ctx)
_
=
task
taskFeeAsInt
,
err
:=
strconv
.
Atoi
(
task
.
TaskFee
)
taskFeeAsInt
,
err
:=
strconv
.
Atoi
(
task
.
TaskFee
)
if
err
!=
nil
{
if
err
!=
nil
{
slog
.
Error
(
"task.TaskFee string to int"
,
"error"
,
err
)
slog
.
Error
(
"task.TaskFee string to int"
,
"error"
,
err
)
...
@@ -365,10 +349,6 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
...
@@ -365,10 +349,6 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
nanoseconds
:=
int64
(
task
.
TaskTimestamp
)
nanoseconds
:=
int64
(
task
.
TaskTimestamp
)
seconds
:=
nanoseconds
/
1e9
seconds
:=
nanoseconds
/
1e9
// 使用Unix函数转换为time.Time
// t := time.Unix(seconds, nanoseconds%1e9)
// time.Unix()
for
{
for
{
sender
.
sender
.
...
@@ -408,20 +388,15 @@ type ResponseJson struct {
...
@@ -408,20 +388,15 @@ type ResponseJson struct {
func
newCache
(
redisAddr
,
redisPass
,
mysqlIP
,
dbName
,
user
,
passwd
string
,
port
int
)
*
cachedata
.
CacheData
{
func
newCache
(
redisAddr
,
redisPass
,
mysqlIP
,
dbName
,
user
,
passwd
string
,
port
int
)
*
cachedata
.
CacheData
{
_cache
:=
cachedata
.
NewCacheData
(
context
.
Background
(),
cachedata
.
RedisConnParam
{
_cache
:=
cachedata
.
NewCacheData
(
context
.
Background
(),
cachedata
.
RedisConnParam
{
//Addr: "192.168.1.10:6379",
Addr
:
redisAddr
,
Addr
:
redisAddr
,
Password
:
redisPass
,
Password
:
redisPass
,
DbIndex
:
0
,
DbIndex
:
0
,
},
model
.
DbConfig
{
},
model
.
DbConfig
{
//Host: "192.168.1.211",
Host
:
mysqlIP
,
Host
:
mysqlIP
,
Port
:
port
,
Port
:
port
,
DbName
:
dbName
,
DbName
:
dbName
,
Passwd
:
passwd
,
Passwd
:
passwd
,
User
:
user
,
User
:
user
,
// DbName: "liuxuzhong",
// Passwd: "12345678",
// User: "root",
})
})
return
_cache
return
_cache
}
}
...
@@ -550,7 +525,7 @@ func main() {
...
@@ -550,7 +525,7 @@ func main() {
slog
.
Warn
(
"start param"
,
"!!!!!!!!!!!!!!!!!!!!**********With Bill DB**********!!!!!!!!!!!!!!!!!!!!"
,
withBillDb
)
slog
.
Warn
(
"start param"
,
"!!!!!!!!!!!!!!!!!!!!**********With Bill DB**********!!!!!!!!!!!!!!!!!!!!"
,
withBillDb
)
slog
.
Warn
(
"start param"
,
"!!!!!!!!!!!!!!!!!!!!**********With Bill DB**********!!!!!!!!!!!!!!!!!!!!"
,
withBillDb
)
slog
.
Warn
(
"start param"
,
"!!!!!!!!!!!!!!!!!!!!**********With Bill DB**********!!!!!!!!!!!!!!!!!!!!"
,
withBillDb
)
go
Registry
(
redisAddr
,
redisPassWd
)
//
go Registry(redisAddr, redisPassWd)
kafkaBrokers
=
[]
string
{
kafkaBroker
}
kafkaBrokers
=
[]
string
{
kafkaBroker
}
callbackAddr
=
callbackAddrP
callbackAddr
=
callbackAddrP
...
@@ -564,10 +539,6 @@ func main() {
...
@@ -564,10 +539,6 @@ func main() {
createTable
(
questAddr
[
:
idx
],
createTaskTableSql
)
createTable
(
questAddr
[
:
idx
],
createTaskTableSql
)
createTable
(
questAddr
[
:
idx
],
createBillsTableSql
)
createTable
(
questAddr
[
:
idx
],
createBillsTableSql
)
// DbName: "liuxuzhong",
// Passwd: "12345678",
// User: "root",
cache
=
newCache
(
redisAddr
,
redisPassWd
,
mysqlAddr
,
mysqlDbName
,
mysqlUser
,
mysqlPassWd
,
mysqlPort
)
cache
=
newCache
(
redisAddr
,
redisPassWd
,
mysqlAddr
,
mysqlDbName
,
mysqlUser
,
mysqlPassWd
,
mysqlPort
)
// Create a WaitGroup to synchronize goroutines.
// Create a WaitGroup to synchronize goroutines.
...
@@ -588,15 +559,6 @@ func main() {
...
@@ -588,15 +559,6 @@ func main() {
// Create a new instance of the Fiber web framework.
// Create a new instance of the Fiber web framework.
app
:=
fiber
.
New
()
app
:=
fiber
.
New
()
// cfg := swagger.Config{
// BasePath: "/",
// FilePath: "./docs/swagger.json",
// Path: "swagger",
// Title: "Swagger API Docs",
// }
// app.Use(swagger.New(cfg))
apiGroup
:=
app
.
Group
(
"/api"
)
apiGroup
:=
app
.
Group
(
"/api"
)
jwtGroup
:=
app
.
Group
(
"/jwt"
)
jwtGroup
:=
app
.
Group
(
"/jwt"
)
callbackGroup
:=
app
.
Group
(
"/callback"
)
callbackGroup
:=
app
.
Group
(
"/callback"
)
...
@@ -643,24 +605,10 @@ func main() {
...
@@ -643,24 +605,10 @@ func main() {
var
replanceQueryTask
*
model
.
TaskType
var
replanceQueryTask
*
model
.
TaskType
//var m = make([]byte, 1024*1024+1024*512)
//var m = make([]byte, 1024*1024+1024*512)
var
mypool
=
&
sync
.
Pool
{
New
:
func
()
interface
{}
{
mem
:=
make
([]
byte
,
1024
*
1024
+
1024
*
512
)
return
&
mem
},
}
func
ApiAndJWT
(
c
*
fiber
.
Ctx
)
error
{
func
ApiAndJWT
(
c
*
fiber
.
Ctx
)
error
{
slog
.
Debug
(
"ApiAndJWT"
,
"path"
,
c
.
Route
()
.
Path
)
slog
.
Debug
(
"ApiAndJWT"
,
"path"
,
c
.
Route
()
.
Path
)
//m := mypool.Get().(*[]byte)
//c, err := w.Write(*m)
//return c.Send(*m)
reqHeaders
:=
c
.
GetReqHeaders
()
reqHeaders
:=
c
.
GetReqHeaders
()
//return c.SendString(reqHeaders["Task-Id"][0])
//return c.SendString(reqHeaders["Task-Id"][0])
...
@@ -698,9 +646,6 @@ func ApiAndJWT(c *fiber.Ctx) error {
...
@@ -698,9 +646,6 @@ func ApiAndJWT(c *fiber.Ctx) error {
}
}
//var cmd string
//cmd = task.Cmd
pbMsg
:=
pbUpstream
.
TaskContent
{
pbMsg
:=
pbUpstream
.
TaskContent
{
TaskId
:
reqHeaders
[
"Task-Id"
][
0
],
TaskId
:
reqHeaders
[
"Task-Id"
][
0
],
TaskType
:
uint64
(
task
.
ID
),
TaskType
:
uint64
(
task
.
ID
),
...
@@ -741,8 +686,6 @@ func ApiAndJWT(c *fiber.Ctx) error {
...
@@ -741,8 +686,6 @@ func ApiAndJWT(c *fiber.Ctx) error {
Bytes
:
pbBytes
,
Bytes
:
pbBytes
,
}
}
//fmt.Println("pbMsg.TaskUid--------------", pbMsg.TaskId)
asyncMode
:=
false
asyncMode
:=
false
// for k, v := range reqHeaders {
// for k, v := range reqHeaders {
...
@@ -750,7 +693,6 @@ func ApiAndJWT(c *fiber.Ctx) error {
...
@@ -750,7 +693,6 @@ func ApiAndJWT(c *fiber.Ctx) error {
// }
// }
if
headerSync
,
ok
:=
reqHeaders
[
"Async"
];
ok
{
if
headerSync
,
ok
:=
reqHeaders
[
"Async"
];
ok
{
//fmt.Println("sync-----------------sync", headerSync)
for
_
,
syncAsString
:=
range
headerSync
{
for
_
,
syncAsString
:=
range
headerSync
{
if
syncAsString
==
"true"
{
if
syncAsString
==
"true"
{
asyncMode
=
true
asyncMode
=
true
...
@@ -761,8 +703,6 @@ func ApiAndJWT(c *fiber.Ctx) error {
...
@@ -761,8 +703,6 @@ func ApiAndJWT(c *fiber.Ctx) error {
slog
.
Info
(
"task info"
,
"taskid"
,
pbMsg
.
TaskId
,
"asyncMode"
,
asyncMode
)
slog
.
Info
(
"task info"
,
"taskid"
,
pbMsg
.
TaskId
,
"asyncMode"
,
asyncMode
)
//fmt.Println("asyncMode-----------", asyncMode)
if
asyncMode
{
if
asyncMode
{
//time.Sleep(10 * time.Second)
//time.Sleep(10 * time.Second)
//m := make([]byte, 1024*1024+1024*512)
//m := make([]byte, 1024*1024+1024*512)
...
@@ -795,8 +735,6 @@ func syncModeF(c *fiber.Ctx, taskid string) error {
...
@@ -795,8 +735,6 @@ func syncModeF(c *fiber.Ctx, taskid string) error {
slog
.
Debug
(
"resAsPb.TaskResultHeader"
,
"resAsPb.TaskResultHeader"
,
resAsPb
.
TaskResultHeader
)
slog
.
Debug
(
"resAsPb.TaskResultHeader"
,
"resAsPb.TaskResultHeader"
,
resAsPb
.
TaskResultHeader
)
//fmt.Println("resAsPb.TaskResultHeader", string(resAsPb.TaskResultHeader), len(resAsPb.TaskResultHeader))
if
resAsPb
.
TaskResultHeader
!=
nil
{
if
resAsPb
.
TaskResultHeader
!=
nil
{
if
len
(
resAsPb
.
TaskResultHeader
)
!=
0
{
if
len
(
resAsPb
.
TaskResultHeader
)
!=
0
{
...
...
service-register.go
View file @
be1df639
package
main
package
main
import
(
// import (
"encoding/json"
// "encoding/json"
"time"
// "time"
"github.com/odysseus/service-registry/common"
// "github.com/odysseus/service-registry/common"
"github.com/odysseus/service-registry/query"
// registry "github.com/odysseus/service-registry/registry"
registry
"github.com/odysseus/service-registry/registry"
// )
)
// type demoService struct {
type
demoService
struct
{
// }
}
// func (d demoService) ServiceType() common.ServiceType {
func
(
d
demoService
)
ServiceType
()
common
.
ServiceType
{
// return common.SERVICE_API_GATEWAY
return
common
.
SERVICE_API_GATEWAY
// }
}
// func (d demoService) Endpoint() string {
func
(
d
demoService
)
Endpoint
()
string
{
// return "http://service-endpoint_ai-api-magr:6001"
return
"http://service-endpoint_ai-api-magr:6001"
// }
}
// func (d demoService) DetailInfo() (json.RawMessage, error) {
func
(
d
demoService
)
DetailInfo
()
(
json
.
RawMessage
,
error
)
{
// //return nil, nil
//return nil, nil
// detail := struct {
detail
:=
struct
{
// IP string `json:"ip"`
IP
string
`json:"ip"`
// WorkerCount int `json:"worker_count"`
WorkerCount
int
`json:"worker_count"`
// }{}
}{}
// detail.IP = ""
detail
.
IP
=
""
// detail.WorkerCount = 10
detail
.
WorkerCount
=
10
// return json.Marshal(detail)
return
json
.
Marshal
(
detail
)
// }
}
// func Registry(redisAddr, passWd string) {
func
Registry
(
redisAddr
,
passWd
string
)
{
// // _ = query.GatewayInfo{
_
=
query
.
GatewayInfo
{
// // Endpoint: "http://query-endpoint_ai-api-magr:6001",
Endpoint
:
"http://query-endpoint_ai-api-magr:6001"
,
// // }
}
// r := registry.NewRegistry(registry.RedisConnParam{
r
:=
registry
.
NewRegistry
(
registry
.
RedisConnParam
{
// Addr: redisAddr,
Addr
:
redisAddr
,
// Password: passWd,
Password
:
passWd
,
// DbIndex: 0,
DbIndex
:
0
,
// }, demoService{})
},
demoService
{})
// r.Start()
r
.
Start
()
// time.Sleep(time.Second * 5)
time
.
Sleep
(
time
.
Second
*
5
)
// r.Stop()
r
.
Stop
()
// }
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment