Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
A
ApiToKafkaBill
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
李伟@五瓣科技
ApiToKafkaBill
Commits
4748c851
Commit
4748c851
authored
Jan 29, 2024
by
Your Name
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
fix route path match
parent
173de253
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
29 additions
and
289 deletions
+29
-289
main.go
main.go
+29
-289
No files found.
main.go
View file @
4748c851
...
...
@@ -7,6 +7,7 @@ import (
"log"
"math/rand"
"strconv"
"strings"
"sync"
"time"
...
...
@@ -332,32 +333,37 @@ func main() {
_
=
cache
// Wait for the consumer goroutine to be ready
//wg.Wait()
app
.
Get
(
"/*"
,
func
(
c
*
fiber
.
Ctx
)
error
{
//fmt.Println("c.BaseURL() get", c.BaseURL())
fmt
.
Println
(
"c.Path()"
,
c
.
Path
())
v1
:=
app
.
Group
(
"/v1"
)
return
nil
})
v1
.
Post
(
"/callback"
,
func
(
c
*
fiber
.
Ctx
)
error
{
body
:=
c
.
Body
()
app
.
Post
(
"/start/*"
,
func
(
c
*
fiber
.
Ctx
)
error
{
//fmt.Println("c.BaseURL() post", c.BaseURL())
fmt
.
Println
(
"c.Path()"
,
c
.
Path
())
fmt
.
Println
(
"callback body-----"
,
string
(
body
))
var
resbody
pbUpstream
.
TaskResponse
paths
:=
[]
string
{
c
.
Path
(),
if
err
:=
gogoPbProto
.
Unmarshal
(
body
,
&
resbody
);
err
!=
nil
{
return
c
.
SendString
(
fmt
.
Sprintf
(
"Unmarshal error %v"
,
err
.
Error
()))
}
re
qHeaders
:=
c
.
GetReqHeaders
(
)
re
s
(
resbody
)
// for k, v := range reqHeaders {
// fmt.Printf("k %v v %v \n", k, v)
// }
return
c
.
SendStatus
(
200
)
})
uid
:=
reqHeaders
[
"X-Consumer-Custom-Id"
]
v1
.
Post
(
"/*"
,
func
(
c
*
fiber
.
Ctx
)
error
{
fmt
.
Println
(
"c.Route().Path"
,
c
.
Route
()
.
Path
)
routePath
:=
c
.
Route
()
.
Path
routePathWithoutStar
:=
strings
.
TrimSuffix
(
routePath
,
"/*"
)
fmt
.
Println
(
"uid"
,
uid
)
path
:=
c
.
Path
()
pathInDB
:=
strings
.
TrimPrefix
(
path
,
routePathWithoutStar
)
reqHeaders
:=
c
.
GetReqHeaders
()
uid
:=
reqHeaders
[
"X-Consumer-Custom-Id"
]
if
uid
==
nil
{
return
c
.
SendString
(
fmt
.
Sprintf
(
"uid can not be nil"
))
...
...
@@ -369,57 +375,17 @@ func main() {
uidAsInt
,
err
:=
strconv
.
Atoi
(
uid
[
0
])
// uids := []int64{
// int64(uidAsInt),
// }
_
=
paths
fmt
.
Println
(
"c.Path(), int64(uidAsInt)"
,
c
.
Path
(),
int64
(
uidAsInt
))
fmt
.
Println
(
"req pathInDB, int64(uidAsInt)"
,
pathInDB
,
int64
(
uidAsInt
))
task
,
err
:=
cache
.
Query
(
c
.
Path
()
,
int64
(
uidAsInt
))
task
,
err
:=
cache
.
Query
(
pathInDB
,
int64
(
uidAsInt
))
if
err
!=
nil
{
return
c
.
SendString
(
fmt
.
Sprintf
(
"cache.Query %v"
,
err
.
Error
()))
}
// taskChan, err := cache.MQuery(paths, uids)
// if err != nil {
// //t.Error(err)
// fmt.Println("cache.MQuery", err.Error())
// }
// count := 0
// for task := range taskChan {
// count++
// t.Log(task)
// if count == len(testPaths) {
// break
// }
// }
// type TaskType struct {
// ID int64 `orm:"column(id)" db:"id" json:"id" form:"id"` // 任务类型id
// Desc string `orm:"column(desc)" db:"desc" json:"desc" form:"desc"` // 任务类型描述
// Price int64 `orm:"column(price)" db:"price" json:"price" form:"price"` // 该任务类型的费用
// Complexity int64 `orm:"column(complexity)" db:"complexity" json:"complexity" form:"complexity"`
// HardwareRequire string `orm:"column(hardware_require)" db:"hardware_require" json:"hardware_require" form:"hardware_require"`
// ImageId string `orm:"column(image_id)" db:"image_id" json:"image_id" form:"image_id"`
// ImageUrl string `orm:"column(image_url)" db:"image_url" json:"image_url" form:"image_url"`
// Cmd string `orm:"column(cmd)" db:"cmd" json:"cmd" form:"cmd"`
// Workload int64 `orm:"column(workload)" db:"workload" json:"workload" form:"workload"`
// ApiPath string `orm:"column(api_path)" db:"api_path" json:"api_path" form:"api_path"`
// CreatedTime time.Time `orm:"column(created_time)" db:"created_time" json:"created_time" form:"created_time"` // 创建时间
// UpdatedTime time.Time `orm:"column(updated_time)" db:"updated_time" json:"updated_time" form:"updated_time"` // 更新时间
// Deleted int64 `orm:"column(deleted)" db:"deleted" json:"deleted" form:"deleted"` // 逻辑删除
// }
//var taskType pbUpstream.TaskType
var
cmd
string
var
fee
int64
//for task := range taskChan {
//taskId = pbUpstream.TaskType(task.ID)
cmd
=
task
.
Cmd
//}
pbMsg
:=
pbUpstream
.
TaskContent
{
TaskUuid
:
reqHeaders
[
"Task-Id"
][
0
],
...
...
@@ -428,7 +394,7 @@ func main() {
TaskCmd
:
cmd
,
TaskParam
:
[]
byte
(
reqHeaders
[
"Task-Id"
][
0
]),
TaskTimestamp
:
uint64
(
time
.
Now
()
.
UnixMilli
()),
TaskCallback
:
"http://192.168.1.220:6000/callback"
,
TaskCallback
:
"http://192.168.1.220:6000/
v1/
callback"
,
TaskUid
:
reqHeaders
[
"X-Consumer-Custom-Id"
][
0
],
TaskFee
:
fmt
.
Sprintf
(
"%d"
,
fee
),
}
...
...
@@ -440,7 +406,7 @@ func main() {
return
c
.
SendString
(
fmt
.
Sprintf
(
"json.Marshal %v"
,
err
.
Error
()))
}
fmt
.
Println
(
"msgAsJson"
,
string
(
msgAsJson
))
fmt
.
Println
(
"msgAsJson
to kafka
"
,
string
(
msgAsJson
))
pbBytes
,
err
:=
gogoPbProto
.
Marshal
(
&
pbMsg
)
...
...
@@ -451,7 +417,6 @@ func main() {
producerMessagesBytes
<-
bytesAndHeader
{
Bytes
:
pbBytes
,
//HttpHeader: reqHeaders,
}
fmt
.
Println
(
"pbMsg.TaskUid--------------"
,
pbMsg
.
TaskUuid
)
...
...
@@ -461,7 +426,6 @@ func main() {
resAsJson
:=
ResponseJson
{
TaskUUID
:
resAsPb
.
TaskUuid
,
//TaskId: resAsPb.TaskId,
TaskResult
:
resAsPb
.
TaskResult
,
TaskUid
:
resAsPb
.
TaskUid
,
TaskFee
:
resAsPb
.
TaskFee
,
...
...
@@ -473,229 +437,5 @@ func main() {
})
// app.Post("/chat/completions", func(c *fiber.Ctx) error {
// body := c.Body()
// fmt.Println("body", string(body))
// reqHeaders := c.GetReqHeaders()
// // fmt.Println("reqHeaders[\"Task-Id\"]", reqHeaders["Task-Id"])
// //prefix := fmt.Sprintf("%02x", ImagesGenerations)
// // _ =pbUpstream
// pbMsg := pbUpstream.TaskContent{
// TaskId: reqHeaders["Task-Id"],
// TaskType: ChatCompletionsType,
// TaskCmd: "not provide",
// TaskParam: body,
// TaskTimestamp: uint64(time.Now().UnixMilli()),
// TaskCallback: "http://192.168.1.220:6000/callback",
// TaskUid: reqHeaders["X-Consumer-Custom-Id"],
// TaskFee: ChatCompletionsFeeStr,
// }
// pbBytes, err := gogoPbProto.Marshal(&pbMsg)
// if err != nil {
// fmt.Println("pb error", err.Error())
// return c.SendString(fmt.Sprintf("pb error: %v", err.Error()))
// }
// // res := make([]byte, 0, len(prefix)+len(reqHeaders["Task-Id"])+len(body))
// // res = append(res, []byte(prefix)...)
// // res = append(res, reqHeaders["Task-Id"]...)
// // res = append(res, body...)
// accept := Bill.Meter(UserFee{
// User: reqHeaders["X-Consumer-Custom-Id"],
// Fee: decimal.NewFromInt(ChatCompletionsFee),
// })
// if accept {
// producerMessagesBytes <- bytesAndHeader{
// Bytes: pbBytes,
// HttpHeader: reqHeaders,
// }
// wait := req(pbMsg.TaskId)
// resAsPb := <-wait
// resAsJson := ResponseJson{
// TaskId: resAsPb.TaskId,
// TaskResult: resAsPb.TaskResult,
// TaskUid: resAsPb.TaskUid,
// TaskFee: resAsPb.TaskFee,
// }
// return c.JSON(resAsJson)
// //return c.SendStatus(200)
// } else {
// return c.SendString("your balance can not pay the request fee")
// }
// //return c.SendString("Message sent to Kafka producer.")
// })
// app.Post("/images/generations", func(c *fiber.Ctx) error {
// body := c.Body()
// fmt.Println("body", string(body))
// reqHeaders := c.GetReqHeaders()
// fmt.Println("reqHeaders[\"Task-Id\"]", reqHeaders["Task-Id"])
// //prefix := fmt.Sprintf("%02x", ImagesGenerations)
// // _ =pbUpstream
// pbMsg := pbUpstream.TaskContent{
// TaskId: reqHeaders["Task-Id"],
// TaskType: ImagesGenerationsType,
// TaskCmd: "not provide",
// TaskParam: body,
// TaskTimestamp: uint64(time.Now().UnixMilli()),
// TaskCallback: "http://192.168.1.220:6000/callback",
// TaskUid: reqHeaders["X-Consumer-Custom-Id"],
// TaskFee: ImagesGenerationsFeeStr,
// }
// pbBytes, err := gogoPbProto.Marshal(&pbMsg)
// if err != nil {
// fmt.Println("pb error", err.Error())
// return c.SendString(fmt.Sprintf("pb error: %v", err.Error()))
// }
// accept := Bill.Meter(UserFee{
// User: reqHeaders["X-Consumer-Custom-Id"],
// Fee: decimal.NewFromInt(ChatCompletionsFee),
// })
// if accept {
// producerMessagesBytes <- bytesAndHeader{
// Bytes: pbBytes,
// HttpHeader: reqHeaders,
// }
// wait := req(pbMsg.TaskId)
// resAsPb := <-wait
// resAsJson := ResponseJson{
// TaskId: resAsPb.TaskId,
// TaskResult: resAsPb.TaskResult,
// TaskUid: resAsPb.TaskUid,
// TaskFee: resAsPb.TaskFee,
// }
// return c.JSON(resAsJson)
// //return c.SendStatus(200)
// } else {
// return c.SendString("your balance can not pay the request fee")
// }
// // res := make([]byte, 0, len(prefix)+len(reqHeaders["Task-Id"])+len(body))
// // res = append(res, []byte(prefix)...)
// // res = append(res, reqHeaders["Task-Id"]...)
// // res = append(res, body...)
// // producerMessagesBytes <- bytesAndHeader{
// // Bytes: pbBytes,
// // HttpHeader: reqHeaders,
// // }
// // return c.SendStatus(200)
// // return c.SendStatus(200)
// // wait := req(pbMsg.TaskId)
// // resAsPb := <-wait
// // resAsJson := ResponseJson{
// // TaskId: resAsPb.TaskId,
// // TaskResult: resAsPb.TaskResult,
// // TaskUid: resAsPb.TaskUid,
// // TaskFee: resAsPb.TaskFee,
// // }
// // return c.JSON(resAsJson)
// //return c.SendStatus(200)
// //return c.SendString("Message sent to Kafka producer.")
// })
// app.Post("/images/variations", func(c *fiber.Ctx) error {
// body := c.Body()
// fmt.Println("body", string(body))
// return c.SendStatus(200)
// // message := c.Params("message")
// // // Sending message to the Kafka producer via the producerMessages channel
// // producerMessages <- message
// // return c.SendString("Message sent to Kafka producer.")
// })
// app.Post("/images/edits", func(c *fiber.Ctx) error {
// body := c.Body()
// fmt.Println("body", string(body))
// return c.SendStatus(200)
// // message := c.Params("message")
// // // Sending message to the Kafka producer via the producerMessages channel
// // producerMessages <- message
// // return c.SendString("Message sent to Kafka producer.")
// })
// // The /producer endpoint for sending messages to the Kafka producer
// app.Get("/producer/:message", func(c *fiber.Ctx) error {
// message := c.Params("message")
// // Sending message to the Kafka producer via the producerMessages channel
// producerMessages <- message
// return c.SendString("Message sent to Kafka producer.")
// })
// // The /consumer endpoint for receiving messages from the Kafka consumer
// app.Get("/consumer", func(c *fiber.Ctx) error {
// select {
// case msg := <-consumerMessages:
// // If a message is available in the consumerMessages channel, return it as the response.
// return c.SendString(fmt.Sprintf("Received from Kafka consumer: %s", msg))
// case <-time.After(4 * time.Second):
// // If no message is available within 4 seconds, respond with a default message.
// return c.SendString("No messages available at the moment. Please try again later.")
// }
// })
app
.
Post
(
"/callback"
,
func
(
c
*
fiber
.
Ctx
)
error
{
body
:=
c
.
Body
()
fmt
.
Println
(
"body----------------"
,
string
(
body
))
fmt
.
Println
(
"end"
)
var
resbody
pbUpstream
.
TaskResponse
if
err
:=
gogoPbProto
.
Unmarshal
(
body
,
&
resbody
);
err
!=
nil
{
return
c
.
SendString
(
fmt
.
Sprintf
(
"Unmarshal error %v"
,
err
.
Error
()))
}
res
(
resbody
)
return
c
.
SendStatus
(
200
)
// message := c.Params("message")
// // Sending message to the Kafka producer via the producerMessages channel
// producerMessages <- message
// return c.SendString("Message sent to Kafka producer.")
})
log
.
Fatal
(
app
.
Listen
(
"0.0.0.0:6000"
))
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment