Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
A
ApiToKafkaBill
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
李伟@五瓣科技
ApiToKafkaBill
Commits
7967213e
Commit
7967213e
authored
Feb 02, 2024
by
Your Name
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
add docker file
parent
180276e9
Changes
6
Show whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
379 additions
and
97 deletions
+379
-97
dockerfile
dockerfile
+40
-0
docs.go
docs/docs.go
+0
-0
swagger.json
docs/swagger.json
+108
-0
swagger.yaml
docs/swagger.yaml
+0
-0
swagger.json
docs/v1/swagger.json
+0
-7
main.go
main.go
+231
-90
No files found.
dockerfile
0 → 100644
View file @
7967213e
# Build stage: compile the service with the full Go toolchain.
FROM golang:1.21.3 AS base

WORKDIR /go/src/github.com/odysseus/

# Proxy used only while downloading modules during the build. Supplied as a
# build ARG (same default as before) instead of ENV so the LAN address is not
# baked into the final image environment.
ARG https_proxy=http://192.168.1.180:1080
ENV https_proxy=${https_proxy}

RUN mkdir -p ./go-kafka/
COPY ./ ./go-kafka
COPY ./payment ./payment
COPY ./odysseus-protocol ./odysseus-protocol

# CGO_ENABLED=0 produces a statically linked binary. Without it, the
# glibc-linked binary built on the Debian-based golang:1.21.3 image cannot run
# on the musl-based alpine runtime image below.
RUN cd go-kafka && go mod tidy && CGO_ENABLED=0 go build -o /ai-api-mgr

# Runtime stage: minimal image containing only the compiled binary.
FROM alpine
WORKDIR /root
COPY --from=base /ai-api-mgr /usr/bin/
docs/
v1/
docs.go
→
docs/docs.go
View file @
7967213e
File moved
docs/swagger.json
0 → 100644
View file @
7967213e
{
"swagger"
:
"2.0"
,
"info"
:
{
"title"
:
"GPU+Model service"
,
"version"
:
"1.0.0"
},
"paths"
:
{
"/{TaskType}"
:
{
"post"
:
{
"summary"
:
"customer private api"
,
"operationId"
:
"place a task"
,
"parameters"
:
[
{
"name"
:
"TaskType"
,
"in"
:
"path"
,
"required"
:
true
,
"type"
:
"string"
,
"default"
:
"startdocker"
},
{
"name"
:
"apikey"
,
"in"
:
"header"
,
"required"
:
true
,
"type"
:
"string"
,
"maxLength"
:
32
,
"default"
:
"YNExx9qR8aBoZuxfNTBCDkVJ3EiedIma"
},
{
"name"
:
"body"
,
"in"
:
"body"
,
"schema"
:{
"$ref"
:
"#/definitions/ReqTask"
},
"examples"
:{
"modelType"
:{
"value"
:
"language"
},
"model"
:{
"value"
:
"chat"
}
}
}
],
"responses"
:
{
"200"
:
{
"description"
:
"success"
,
"schema"
:
{
"$ref"
:
"#/definitions/User"
}
}
}
}
},
"/callback"
:
{
"post"
:
{}
}
},
"definitions"
:
{
"ReqTask"
:{
"type"
:
"object"
,
"properties"
:
{
"modelType"
:
{
"type"
:
"string"
},
"model"
:{
"type"
:
"string"
},
"desc"
:{
"type"
:
"string"
},
"token"
:{
"type"
:
"string"
}
}
},
"User"
:
{
"type"
:
"object"
,
"properties"
:
{
"id"
:
{
"type"
:
"integer"
,
"format"
:
"int64"
},
"username"
:
{
"type"
:
"string"
},
"email"
:
{
"type"
:
"string"
}
}
},
"Error"
:
{
"type"
:
"object"
,
"properties"
:
{
"code"
:
{
"type"
:
"integer"
,
"format"
:
"int32"
},
"message"
:
{
"type"
:
"string"
}
}
}
}
}
docs/
v1/
swagger.yaml
→
docs/swagger.yaml
View file @
7967213e
File moved
docs/v1/swagger.json
deleted
100644 → 0
View file @
180276e9
{
"swagger"
:
"2.0"
,
"info"
:
{
"contact"
:
{}
},
"paths"
:
{}
}
\ No newline at end of file
main.go
View file @
7967213e
...
...
@@ -4,13 +4,18 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"math/rand"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
"flag"
"github.com/IBM/sarama"
"github.com/gofiber/fiber/v2"
...
...
@@ -20,11 +25,16 @@ import (
"github.com/odysseus/payment/cachedata"
"github.com/odysseus/payment/model"
"github.com/gofiber/contrib/swagger"
)
var
(
// Update with your Kafka broker addresses
kafkaBrokers
=
[]
string
{
"localhost:9092"
}
callbackAddr
=
""
// Update with your Kafka topic name
topic
=
"test_topic"
// Set an appropriate buffer size based on your requirements
...
...
@@ -212,7 +222,7 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
select
{
case
task
:=
<-
resStream
:
nanoseconds
:=
int64
(
task
.
Task
FinishTime
)
nanoseconds
:=
int64
(
task
.
Task
Timestamp
)
seconds
:=
nanoseconds
/
1e9
sender
.
Table
(
resTableName
)
.
...
...
@@ -226,7 +236,8 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
Int64Column
(
"fee"
,
task
.
TaskFee
)
.
Int64Column
(
"workload"
,
int64
(
task
.
TaskWorkload
))
.
Int64Column
(
"out_len"
,
int64
(
task
.
TaskOutLen
))
.
Int64Column
(
"duration"
,
int64
(
task
.
TaskDuration
))
.
Int64Column
(
"task_duration"
,
int64
(
task
.
TaskDuration
))
.
Int64Column
(
"exec_duration"
,
int64
(
0
))
.
AtNow
(
ctx
)
err
=
sender
.
Flush
(
ctx
)
...
...
@@ -283,22 +294,6 @@ func batchToQuestDb(done chan interface{}, reqStream chan pbUpstream.TaskContent
}
}
// randomInput picks a pseudo-random simulated input length in [0, 1536).
func randomInput() int {
	const upper = 1536
	return rand.Intn(upper)
}
// randomUID picks a pseudo-random user ID in [0, 10000).
func randomUID() int {
	const maxUID = 10000
	return rand.Intn(maxUID)
}
// randomFeeAndWorkload picks a pseudo-random fee/workload value in [0, 100).
func randomFeeAndWorkload() int {
	const limit = 100
	return rand.Intn(limit)
}
// randomType picks a pseudo-random task type in [0, 50).
func randomType() int {
	const typeCount = 50
	return rand.Intn(typeCount)
}
type
ResponseJson
struct
{
TaskUUID
string
TaskResult
[]
byte
...
...
@@ -324,9 +319,107 @@ func newCache() *cachedata.CacheData {
return
_cache
}
// cache is the shared payment cache; it is assigned in main via newCache().
// NOTE(review): package-level mutable state — presumably only written once at
// startup; confirm no concurrent writers.
var cache *cachedata.CacheData

// createTaskTableSql creates the QuestDB 'tasks' table (one row per submitted
// task), partitioned by day with WAL enabled.
var createTaskTableSql string = `CREATE TABLE IF NOT EXISTS
'tasks' (
id UUID,
time TIMESTAMP,
type symbol CAPACITY 128 INDEX CAPACITY 1048576,
uid symbol CAPACITY 1024 INDEX CAPACITY 1048576,
fee INT,
in_len INT
) timestamp (time) PARTITION BY DAY WAL;`

// createBillsTableSql creates the QuestDB 'bills' table (one row per completed
// task receipt), partitioned by day with WAL enabled.
var createBillsTableSql string = `CREATE TABLE IF NOT EXISTS
'bills' (
id UUID,
time TIMESTAMP,
type symbol CAPACITY 128 CACHE INDEX CAPACITY 8192,
uid symbol CAPACITY 128 CACHE INDEX CAPACITY 8192,
fee INT,
out_len INT,
workload INT,
task_duration INT,
exec_duration INT,
profit_acc symbol CAPACITY 128 CACHE INDEX CAPACITY 8192,
worker_acc symbol CAPACITY 128 CACHE INDEX CAPACITY 8192,
result symbol CAPACITY 128 CACHE INDEX CAPACITY 8192
) timestamp (time) PARTITION BY DAY WAL;`
func
createTable
(
ipAddr
,
tableSql
string
)
{
//"http://localhost:9000"
u
,
err
:=
url
.
Parse
(
"http://"
+
ipAddr
+
":9000"
)
checkErr
(
err
)
u
.
Path
+=
"exec"
params
:=
url
.
Values
{}
params
.
Add
(
"query"
,
tableSql
)
u
.
RawQuery
=
params
.
Encode
()
url
:=
fmt
.
Sprintf
(
"%v"
,
u
)
res
,
err
:=
http
.
Get
(
url
)
checkErr
(
err
)
defer
res
.
Body
.
Close
()
body
,
err
:=
io
.
ReadAll
(
res
.
Body
)
checkErr
(
err
)
log
.
Println
(
string
(
body
))
}
// checkErr panics when err is non-nil and is a no-op otherwise. It is the
// fail-fast error handler used during startup.
func checkErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}
func
main
()
{
// Create a new instance of the Fiber web framework.
app
:=
fiber
.
New
()
var
questAddr
,
kafkaBroker
,
callbackAddrP
,
listenIpPort
,
aigcProduceTopic
,
aigcConsumerTopic
string
var
redisAddr
,
mysqlAddr
string
//flag.StringVar(&questAddr, "questAddr", "192.168.1.10:9009", "questDbAddr")
flag
.
StringVar
(
&
questAddr
,
"questAddr"
,
"localhost:9009"
,
"questDbAddr"
)
flag
.
StringVar
(
&
kafkaBroker
,
"kafkaBroker"
,
"localhost:9092"
,
"single kafka broker"
)
flag
.
StringVar
(
&
redisAddr
,
"redisAddr"
,
"localhost:6379"
,
"Redis Addr"
)
flag
.
StringVar
(
&
mysqlAddr
,
"mysqlAddr"
,
"localhost:3306"
,
"Mysql Addr"
)
flag
.
StringVar
(
&
callbackAddrP
,
"callbackIpAddr"
,
"localhost:6001"
,
"ip:port"
)
flag
.
StringVar
(
&
listenIpPort
,
"listenIpPort"
,
"0.0.0.0:6001"
,
"api listen on ip:port"
)
flag
.
StringVar
(
&
aigcProduceTopic
,
"aigcProduceTopic"
,
"pbaigc"
,
"produce topic, default value is: pbaigc"
)
flag
.
StringVar
(
&
aigcConsumerTopic
,
"aigcConsumerTopic"
,
"taskreceipt"
,
"consumer topic, default value is: taskreceipt"
)
flag
.
Parse
()
fmt
.
Println
(
"questAddr|"
,
questAddr
)
fmt
.
Println
(
"redisAddr|"
,
redisAddr
)
fmt
.
Println
(
"mysqlAddr|"
,
mysqlAddr
)
fmt
.
Println
(
"kafkaBroker|"
,
kafkaBroker
)
fmt
.
Println
(
"callbackIpAddr|"
,
callbackAddrP
)
fmt
.
Println
(
"listenIpPort|"
,
listenIpPort
)
fmt
.
Println
(
"aigcProduceTopic|"
,
aigcProduceTopic
)
fmt
.
Println
(
"aigcConsumerTopic|"
,
aigcConsumerTopic
)
kafkaBrokers
=
[]
string
{
kafkaBroker
}
callbackAddr
=
callbackAddrP
idx
:=
strings
.
Index
(
questAddr
,
":"
)
if
idx
==
-
1
{
fmt
.
Println
(
"please use the format: 1.1.1.1:9009"
)
return
}
createTable
(
questAddr
[
:
idx
],
createTaskTableSql
)
createTable
(
questAddr
[
:
idx
],
createBillsTableSql
)
cache
=
newCache
()
// Create a WaitGroup to synchronize goroutines.
wg
:=
&
sync
.
WaitGroup
{}
...
...
@@ -340,22 +433,40 @@ func main() {
done
:=
make
(
chan
interface
{})
reqToQuestDb
,
resToQuestDb
:=
kafkaConsumerBytes
(
done
,
aigc
Topic
,
"taskreceipt"
)
reqToQuestDb
,
resToQuestDb
:=
kafkaConsumerBytes
(
done
,
aigc
ProduceTopic
,
aigcConsumerTopic
)
go
batchToQuestDb
(
done
,
reqToQuestDb
,
resToQuestDb
,
"tasks"
,
"bills"
,
"192.168.1.220:9009"
)
go
batchToQuestDb
(
done
,
reqToQuestDb
,
resToQuestDb
,
"tasks"
,
"bills"
,
questAddr
)
go
recordUUID
()
cache
:=
newCache
()
_
=
cache
// Create a new instance of the Fiber web framework.
app
:=
fiber
.
New
()
v1
:=
app
.
Group
(
"/v1"
)
cfg
:=
swagger
.
Config
{
BasePath
:
"/"
,
FilePath
:
"./docs/swagger.json"
,
Path
:
"swagger"
,
Title
:
"Swagger API Docs"
,
}
v1
.
Post
(
"/callback"
,
func
(
c
*
fiber
.
Ctx
)
error
{
app
.
Use
(
swagger
.
New
(
cfg
))
body
:=
c
.
Body
()
apiGroup
:=
app
.
Group
(
"/api"
)
jwtGroup
:=
app
.
Group
(
"/jwt"
)
callbackGroup
:=
app
.
Group
(
"/callback"
)
apiGroupV1
:=
apiGroup
.
Group
(
"/v1"
)
apiGroupV1
.
Post
(
"/*"
,
ApiAndJWT
)
fmt
.
Println
(
"callback body-----"
,
string
(
body
))
jwtGroupV1
:=
jwtGroup
.
Group
(
"/v1"
)
jwtGroupV1
.
Post
(
"/*"
,
ApiAndJWT
)
callbackGroupV1
:=
callbackGroup
.
Group
(
"/v1"
)
callbackGroupV1
.
Post
(
"/"
,
func
(
c
*
fiber
.
Ctx
)
error
{
fmt
.
Println
(
"c.Route().Path"
,
c
.
Route
()
.
Path
)
body
:=
c
.
Body
()
var
resbody
pbUpstream
.
TaskResponse
...
...
@@ -368,7 +479,11 @@ func main() {
return
c
.
SendStatus
(
200
)
})
v1
.
Post
(
"/*"
,
func
(
c
*
fiber
.
Ctx
)
error
{
log
.
Fatal
(
app
.
Listen
(
"0.0.0.0:6001"
))
}
func
ApiAndJWT
(
c
*
fiber
.
Ctx
)
error
{
fmt
.
Println
(
"c.Route().Path"
,
c
.
Route
()
.
Path
)
routePath
:=
c
.
Route
()
.
Path
...
...
@@ -407,9 +522,9 @@ func main() {
TaskId
:
uint64
(
task
.
ID
),
TaskType
:
1
,
TaskCmd
:
cmd
,
TaskParam
:
[]
byte
(
reqHeaders
[
"Task-Id"
][
0
]),
TaskParam
:
c
.
Body
(),
//
[]byte(reqHeaders["Task-Id"][0]),
TaskTimestamp
:
uint64
(
time
.
Now
()
.
UnixNano
()),
TaskCallback
:
"http://192.168.1.220:6000/v1/callback
"
,
TaskCallback
:
"http://"
+
callbackAddr
+
"/callback/v1"
,
//"http://192.168.1.10:6001/callback/v1
",
TaskUid
:
reqHeaders
[
"X-Consumer-Custom-Id"
][
0
],
TaskFee
:
fmt
.
Sprintf
(
"%d"
,
task
.
Price
),
TaskInLen
:
int32
(
len
(
c
.
Body
())),
...
...
@@ -440,9 +555,38 @@ func main() {
wait
:=
req
(
pbMsg
.
TaskUuid
)
resAsPb
:=
<-
wait
fmt
.
Println
(
"resAsPb.TaskResultHeader"
,
string
(
resAsPb
.
TaskResultHeader
),
len
(
resAsPb
.
TaskResultHeader
))
if
resAsPb
.
TaskResultHeader
!=
nil
{
if
len
(
resAsPb
.
TaskResultHeader
)
!=
0
{
headers
:=
make
(
map
[
string
][]
string
)
if
err
:=
json
.
Unmarshal
(
resAsPb
.
TaskResultHeader
,
&
headers
);
err
!=
nil
{
fmt
.
Println
(
"json.Unmarshal(resAsPb.TaskResultHeader--------------err"
,
err
.
Error
())
return
c
.
SendString
(
fmt
.
Sprintf
(
"json.Unmarshal(resAsPb.TaskResultHeader--------------err %v"
,
err
.
Error
()))
}
for
k
,
vs
:=
range
headers
{
for
_
,
v
:=
range
vs
{
if
k
==
"Content-Encoding"
{
c
.
Response
()
.
Header
.
SetContentEncoding
(
v
)
}
if
k
==
"Content-Type"
{
c
.
Response
()
.
Header
.
SetContentType
(
v
)
}
}
}
}
}
resAsJson
:=
ResponseJson
{
TaskUUID
:
resAsPb
.
TaskUuid
,
TaskResult
:
resAsPb
.
TaskResult
,
TaskResult
:
resAsPb
.
GetTaskResultBody
()
,
TaskUid
:
resAsPb
.
TaskUid
,
TaskFee
:
resAsPb
.
TaskFee
,
IsSuccess
:
resAsPb
.
TaskIsSucceed
,
...
...
@@ -451,9 +595,6 @@ func main() {
return
c
.
JSON
(
resAsJson
)
})
log
.
Fatal
(
app
.
Listen
(
"0.0.0.0:6000"
))
}
/////////////////////////////
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment